KAFKA-2677: ensure consumer sees coordinator disconnects #349
Conversation
Force-pushed from f7888ff to 7177352.
@ijuma @guozhangwang ping for review
One alternative is to only have the listener at the … The code compiles, although I commented out some test code, since I just meant to demonstrate the idea. Naming could probably be improved. I think this approach fits better with the NetworkClient design and, as Jason pointed out, we don't have to worry about semantics for the case where the listener invokes NetworkClient methods. Jason suggested that I should ask for @guozhangwang's input.
+1 on @ijuma's suggestion. I like this idea since it keeps the NetworkClient API "in front of you," so to speak. I wasn't sure listener semantics were really something we want to introduce to NetworkClient, since it is so widely used in the codebase.
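For concreteness, the listener idea being debated might look something like the sketch below. Every name here (`DisconnectListener`, `ListenableClient`, `addDisconnectListener`) is hypothetical and illustrative, not taken from the patch:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the listener idea under discussion; the actual
// patch may wire this up differently.
interface DisconnectListener {
    // Invoked when the connection to the given node id is closed.
    void onDisconnect(String nodeId);
}

class ListenableClient {
    private final List<DisconnectListener> listeners = new ArrayList<>();

    void addDisconnectListener(DisconnectListener listener) {
        listeners.add(listener);
    }

    // Called from the poll loop for each node the selector reported as disconnected.
    void notifyDisconnect(String nodeId) {
        for (DisconnectListener listener : listeners)
            listener.onDisconnect(nodeId);
    }
}
```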
@ijuma @hachikuji Just another idea I had before: we can check "coordinator alive" on each poll by calling connectionFailed() on the coordinator's connection.
@guozhangwang That might work. The tricky thing about using connectionFailed() is that the disconnected state will persist until it is cleared with a call to ready() (after the backoff time has elapsed). But maybe we could solve that by always calling ready() when we discover the coordinator in the group metadata response handler. |
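A rough sketch of that combination, assuming fields (`coordinator`, `client`, `time`) and methods (`connectionFailed`, `coordinatorDead`, `ready`) along the lines named in this discussion, not necessarily the final patch:

```java
// Sketch only: surface selector-level disconnects so the coordinator is
// marked unknown and rediscovered.
private void checkCoordinatorConnection() {
    if (coordinator != null && client.connectionFailed(coordinator))
        coordinatorDead();
}

// In the group metadata response handler, after (re)discovering the
// coordinator: ready() clears the lingering DISCONNECTED state and initiates
// a reconnect, while still honoring the reconnect backoff.
private void onCoordinatorDiscovered(Node coordinator) {
    client.ready(coordinator, time.milliseconds());
}
```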
@guozhangwang I think the tradeoff with this approach is that we'll only detect disconnects after we try to send something, which means detection will typically be limited by the heartbeat duration. That might be a reasonable trade since the approach is simpler. |
@hachikuji I think (a) we need to respect the connection backoff even for the coordinator, which means that if the coordinator has disconnected and we discover it is still the coordinator, we still need to wait until the backoff has elapsed before reconnecting; and (b) since disconnects are only detected inside the selector, all of these approaches are limited by the selector's behavior: we can only detect a disconnection when we select on that channel's key.
@guozhangwang I think that suggestion works. Have a look and let me know what you think.
@@ -63,6 +63,10 @@ public void close(String id) {
         }
     }

+    public void disconnect(String id) {
Is this used?
Good catch.
Force-pushed from 7f4c587 to 863e09a.
@@ -240,7 +240,7 @@
                 FETCH_MAX_WAIT_MS_DOC)
         .define(RECONNECT_BACKOFF_MS_CONFIG,
                 Type.LONG,
-                50L,
+                500L,
For my benefit, why are we increasing the reconnect backoff?
I'll revert this change since I didn't mean to commit it. 50ms seems a little low, but I'm not sure what a reasonable value would be.
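For what it's worth, users can tune this independently of the default via the standard `reconnect.backoff.ms` consumer config; a minimal sketch:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Back off 500 ms after a connection failure before attempting to reconnect.
props.put("reconnect.backoff.ms", "500");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```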
@@ -464,6 +464,8 @@ private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void
                 groupMetadataResponse.node().host(),
                 groupMetadataResponse.node().port());
+
+        client.tryConnect(coordinator);
Is this just for saving one more poll turn-around?
It's to ensure that the connection doesn't stay in the DISCONNECTED state indefinitely. The only way to break out of that state is to call NetworkClient.ready() (tryConnect delegates to NetworkClient.ready()). Otherwise, the coordinatorUnknown() check in ensureCoordinatorKnown() would always return true after the connection had failed.
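In other words, tryConnect is essentially a thin wrapper over ready(); a sketch of the delegation described above, not the literal patch:

```java
// Sketch: initiate a connection to the node if one is needed. ready() is a
// no-op when the node is already connected and it respects the reconnect
// backoff, so calling it here cannot hammer a dead coordinator; it simply
// clears the lingering DISCONNECTED state.
public void tryConnect(Node node) {
    client.ready(node, time.milliseconds());
}
```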
LGTM except one minor comment about adding comments on …
If the fetch response has no data, then the log append currently fails with an `Append failed unexpectedly` error. The problem is that there is no start offset for an empty append. This patch fixes the problem by adding a check in the response handler and skipping the append if the record set is empty. We also formally make empty appends invalid in the API and add some testing for this.
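A minimal sketch of the described guard, with assumed names (`partitionData`, `log`) for the handler-side variables:

```java
// Sketch: skip the append when the fetched record set is empty. An empty
// append has no start offset, which is what previously surfaced as the
// "Append failed unexpectedly" error.
MemoryRecords records = partitionData.records;
if (records.sizeInBytes() > 0)
    log.append(records);
// else: nothing was fetched for this partition, so there is nothing to append
```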