New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SAMZA-1728: BootstrappingChooser - Call checkOffset only for a lagging partition while choosing. #533
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure what are the changes here..
@sborya Moved checkOffset into the if block. |
// If the offset we just read is the same as the offset for the last | ||
// message (newest) in this system stream partition, then we have read | ||
// all messages, and can mark this SSP as bootstrapped. | ||
checkOffset(systemStreamPartition, offset, OffsetType.NEWEST) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Invoking the checkOffset method from if block will prevent marking a SystemStreamPartition
as caught up in some scenarios(systemStreamLagCounts
will not be set to zero for a SystemStreamPartition
).
systemStreamLagCounts
reports a metric to indicate if a SystemStreamPartition
from a bootstrap stream has caught up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this ssp is not a lagging partition, it means that it has already been accounted for (wrt sspLagCounts), as part of earlier call to checkOffset. I do not know in what other scenario the metric would be incorrect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a test to verify that checkOffset is called only for lagging SystemStreamPartitions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the test. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Build fails in jenkins with compilation errors.
[ant:scalac] Element '/home/jenkins/jenkins-slave/workspace/samza-pr-checks/samza-core/build/resources/main' does not exist.
[ant:scalac] Element '/home/jenkins/jenkins-slave/workspace/samza-pr-checks/samza-api/build/resources/test' does not exist.
:samza-core_2.11:compileTestScala FAILED
FAILURE: Build failed with an exception.
Can we please fix that.
// message (newest) in this system stream partition, then we have read | ||
// all messages, and can mark this SSP as bootstrapped. | ||
checkOffset(systemStreamPartition, offset, OffsetType.NEWEST) | ||
// If the offset we just read is the same as the offset for the last |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NEWEST
offsetType denotes the offset at the tail-end of the system.
If we receive a offset equal to NEWEST from a system, then it will be sufficient to mark the SSP of the system as caught-up(the check doesn't have to be greater than or equal to) in BootstrappingChooser
.
Instead of the following check in checkOffset
:
val comparatorResult = systemAdmin.offsetComparator(offset, offsetToCheck)
if (comparatorResult != null && comparatorResult.intValue() >= 0) {
...
...
}
I think it should be like this:
val comparatorResult = systemAdmin.offsetComparator(offset, offsetToCheck)
if (comparatorResult != null && comparatorResult.intValue() == 0) {
...
...
}
I think this should solve the problem reported in SAMZA-1728
.
Do you see any issues in this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There could be systems where there could be two different topics: one for bootstrap and the other for change capture. When systemAdmin advertises it's newest offset, it would be that of change capture's latest offset, when the job started. There is this corner case: By the time we end up consuming all of bootstrap topic and we switch to change capture, the initially advertised offset could be out of retention. In that case, greater than would be required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand why we need this check. Thanks for the explanation. Makes sense.
…g partition while choosing.
…g partition while choosing.
…g partition while choosing.
…g partition while choosing.
…g partition while choosing.
…g partition while choosing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
No description provided.