-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support online consumer cursor reset #139
Conversation
CLA is valid! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change looks good! Please go ahead
if (log.isDebugEnabled()) { | ||
log.debug("[{}][{}] Successfully reset subscription to timestamp {}", topicName, subName, | ||
timestamp); | ||
if (isFenced.compareAndSet(false, true)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of indenting the whole code, I'd prefer :
if (!isFenced.compareAndSet(false, true)) {
// fail
return;
}
/// continue
@merlimat regarding my comments:
Do you think it's really necessary to disconnect all consumers? |
dc30a70
to
c56511b
Compare
c56511b
to
5af1bd9
Compare
@merlimat I did some more changes, mainly removing CountDownLatch in favor of full async. I also adapted the tests to perform online resets. Could you give the new changes a look? By the way, test failure is unrelated to this changes, could you re-run travis please? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sschepens The change looks good.
About the need for disconnecting the consumers, I think the trickiest part is how to treat the in-flight read operations. Typically, a read operation will move the read position cursor when it succeeds.
If you reset the cursor back to position 0, but there was a read from 10 to 15, the reset might go through, but then read operation will jump again on position 15, invalidating the reset.
The disconnection should ensure that doesn't happen.
We should have some test to verify that the reset works while a consumer is actively receiving messages.
@@ -1264,20 +1240,8 @@ public void partitionedTopicsCursorReset() throws Exception { | |||
} | |||
// messages should still be available due to retention | |||
|
|||
try { | |||
admin.persistentTopics().resetCursor(topicName, "my-sub", timestamp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this test still be valid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? resetCursor should not throw an exception now if consumers are online.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. @rdhabalia Can you give it another look?
return future; | ||
} | ||
if (!isFenced.compareAndSet(false, true)) { | ||
future.completeExceptionally(new SubscriptionBusyException("Failed to fence subscription")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some failure path during close() and disconnect() reset the fenced state. But I think in those cases pending reads are cancelled, so we are still ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should be fine as we disconnect the consumers and reconnection of the consumer will take care of it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
* Feature rich python instance * Seperate out python instance from main * Functionally complete python * Fix syntax * Fix errors * More fixes * Semi working version * Fixed more errors in the produce path * User userCodeFile instead of jarFile * ProcessContainer launches correct python process * Stupid identation bug * Downloaded file should have the same name as the module * Added more comments * Fix a log issue * Add licence header
Motivation
A consumer should be able to reset it's cursor without taking down the process.
Modifications
Allow for reset when consumers are connected.
Fence subscription, disconnect all consumer, reset cursor, the un-fence the subscription.
I'm a bit weary of fencing the subscription, if something would go wrong, we would leave the subscription locked, but I think it's the only way of making sure no consumer connects while we reset the cursor.
Anyway, What could happen if consumers are online while cursor is being reset? Maybe it's not such a bad thing, and disconnecting consumers is not a pretty way to handle this.
Edit: cursor reset seems to be pretty safe with respect to acks.
Just want a review to see if this is the correct way to do it, then I'll try to add a test case.
Result
Consumer should now be able to reset cursors without taking down processes