Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5394. KafkaAdminClient#timeoutCallsInFlight does not work as ex… #3250

Closed
wants to merge 5 commits into from

Conversation

cmccabe
Copy link
Contributor

@cmccabe cmccabe commented Jun 6, 2017

…pected

  • Rename KafkaClient#close to KafkaClient#forget to emphasize that it forgets the requests on a given connection.
  • Create KafkaClient#disconnect to tear down a connection and deliver disconnects to all the requests on it.
  • AdminClient.java: fix mismatched braces in JavaDoc.
  • Make the AdminClientConfig constructor visible for testing.
  • KafkaAdminClient: add TimeoutProcessorFactory to make the TimeoutProcessor swappable for testing.
  • Make TimeoutProcessor a static class rather than an inner class.

*
* @param nodeId The id of the node
*/
void close(String nodeId);
void forget(String nodeId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this rename? NetworkClient.close() is consistent with Selector.close() that doesn't populate the disconnected list. I'd rather keep this as close() for now. We can reconsider later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close implies cleaning up resources, which this does not. Similarly, I would expect AdminClient#close to either complete or abort any futures that are in progress, not just to forget about them. FileOutputStrem#close flushes any modifications to disk; it doesn't just drop them.

In the interests of time I will leave this as close and we can discuss it later.

*/
@Override
public void disconnect(String nodeId) {
selector.close(nodeId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered implementing Selector.disconnect that would call Selector.close(channel, true) instead of keeping the state in abortedSends?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the current approach is better because it's simpler. NC already needs to track responses that were triggered because of version issues, so we're just using that same queue. We can revisit this later, of course.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The advantage of doing it via the Selector is that all disconnections are handled together. abortedSends are for sends that are not even attempted (and hence Selector doesn't know about them). I agree with you that this solution is lower risk and simpler though, so let's go with this PR and consider the Selector option in a follow-up.

@asfbot
Copy link

asfbot commented Jun 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4968/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Jun 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4984/
Test FAILed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jun 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5003/
Test FAILed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jun 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4987/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Jun 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5009/
Test FAILed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jun 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4993/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Jun 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5014/
Test FAILed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jun 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4998/
Test FAILed (JDK 8 and Scala 2.12).

*
* @param nodeId The id of the node
*/
void disconnect(String nodeId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach seem fine, but I am wondering if it's truly needed for KafkaAdmin. In KafkaAdminClient, we are already maintaining callsInFlight. So, if we explicit close a connection, we know all the inflight calls on that connection even if networkClient.poll() doesn't tell us, and can therefore handle them accordingly. This is how producer handles timeout as well. The slight downside of this approach is that now there are two different ways of closing a connection and the caller has to figure out which one to use. It's probably simpler to have just one way of closing a connection and be consistent whether it's the caller's responsibility to deal with inflight requests or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao One difference is that the producer only handles time outs before it sends a request. The reason why the AdminClient has to do a little more is that it has per request timeouts. I think the right solution is to add per request timeouts to NetworkClient, but that was a bigger change and we postponed that for a subsequent release.

Having said that, it's a good question if we could just invoke Call.fail by retrieving it from callsInFlight. I had a look at the code and it's unclear if it makes things simpler. It certainly keeps NetworkClient simpler, but AdminClient becomes a little more complex. Do you think it's worth trying to do this now? One option is to go with what's in the PR and then implement per request timeouts in NetworkClient.

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now AdminClient has the invariant that if it submits a request to NetworkClient, that request will eventually be returned from NetworkClient#poll... possibly with an error status, possibly after a timeout, but in any case returned. If we have cases where that doesn't happen, the code becomes a lot harder to reason about. There are certainly other ways to structure things, but I think they are more complex and error-prone.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, as @ijuma said, when we implement per-request timeouts in NetworkClient, AdminClient won't need to call disconnect any more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree that we can commit the patch and clean things up in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I pushed a new version that fixes checkstyle.

Copy link
Contributor

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates. I created a separate PR #3258 with checkstyle fixes to verify that the tests pass. Aside from that, LGTM. I will merge if the tests pass in #3258.

@@ -416,6 +418,15 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
*/
@Override
public List<ClientResponse> poll(long timeout, long now) {
if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, this is the issue I was seeing before when I was testing the version downgrade of metadata requests. It would wait in Selector.poll instead of returning immediately. Your other timeout changes made the issue go away, but I think this is fixing the root cause.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

@@ -227,7 +227,39 @@ public boolean ready(Node node, long now) {
}

/**
* Diconnects the connection to a particular node, if there is one.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo, fixing before merging.

*/
@Override
public void disconnect(String nodeId) {
selector.close(nodeId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The advantage of doing it via the Selector is that all disconnections are handled together. abortedSends are for sends that are not even attempted (and hence Selector doesn't know about them). I agree with you that this solution is lower risk and simpler though, so let's go with this PR and consider the Selector option in a follow-up.

@cmccabe
Copy link
Contributor Author

cmccabe commented Jun 7, 2017

Filed https://issues.apache.org/jira/browse/KAFKA-5400 for the per-request timeouts

asfgit pushed a commit that referenced this pull request Jun 7, 2017
KAFKA-5394; Fix disconnections due to timeouts in AdminClient

* Create KafkaClient#disconnect to tear down a connection and
deliver disconnects to all the requests on it.
* AdminClient.java: fix mismatched braces in JavaDoc.
* Make the AdminClientConfig constructor visible for testing.
* KafkaAdminClient: add TimeoutProcessorFactory to make the
TimeoutProcessor swappable for testing.
* Make TimeoutProcessor a static class rather than an inner
class.

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3250 from cmccabe/KAFKA-5394

(cherry picked from commit 38ae746)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
@asfgit asfgit closed this in 38ae746 Jun 7, 2017
@asfbot
Copy link

asfbot commented Jun 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5033/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jun 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5017/
Test FAILed (JDK 8 and Scala 2.12).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants