New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-5557: Supply partition when comiting offsets with source database #3828
Conversation
ee157c6
to
fee02e8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ramanenka LGTM thanks, I've left two comments to address.
previousOutputBatchSize += batchSize; | ||
if (pollOutputDelay.hasElapsed()) { | ||
// We want to record the status ... | ||
final Instant currentTime = clock.currentTime(); | ||
LOGGER.info("{} records sent during previous {}, last recorded offset: {}", previousOutputBatchSize, | ||
Strings.duration(Duration.between(previousOutputInstant, currentTime).toMillis()), lastOffset); | ||
Strings.duration(Duration.between(previousOutputInstant, currentTime).toMillis()), lastRecord.sourceOffset()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It probably makes sense to log partition here too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the partition to the log message.
coordinator.commitOffset(partition, lastOffset); | ||
iterator.remove(); | ||
} | ||
catch (RuntimeException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why only RuntimeException? And why should be unsuccessfull commit ignored?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why only RuntimeException?
There are no checked exceptions according to coordinator.commitOffset
method signature. Which leaves us with runtime exceptions only.
And why should be unsuccessfull commit ignored?
I was thinking that an unsuccessful commit shouldn't fail the connector as it will have a chance to retry on the next commit attempt. In the use case we're trying to implement (to delete consumed capture instances) it makes sense to retry until the commit is successful. Alternatively we could just let the exception bubble up and fail the connector which is also a valid option to choose. Which of the two approaches make more sense to you?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ramanenka I'd keep the behaviour as is now. Looking at commitOffsets()
I've a feeling that in that case in an unpredicted state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @jpechane. Let's focus on introducing the necessary API changes with just as little implementation as necessary.
Once we finalize the implementation for the SQL Server connector, we'll have a better understanding of the needed error handling. If any changes in the implementation of this method will become necessary, we will be able to articulate them better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed the try-catch
here. Also I've simplified the code a bit by getting the key set iterator as opposed to the entry set iterator.
Keep track of partitions offsets being committed belong to in `BaseSourceTask`. Supply the partition along with the offset in calls to `commitOffset`.
@ramanenka Applied, thanks |
@ramanenka is this a breaking change when upgrading existing versions? |
@tooptoop4 unless you have custom code that overrides |
Keep track of partitions offsets being committed belong to in
BaseSourceTask
. Supply the partition along with the offset in calls tocommitOffset
.