
KAFKA-4841; NetworkClient should only consider a connection to have failed after attempt to connect #2641

Closed
lindong28 wants to merge 6 commits into trunk from KAFKA-4820-followup

Conversation

lindong28 (Member)

No description provided.

lindong28 (Member Author) commented Mar 5, 2017

KAFKA-4820 allows a new request to be enqueued into unsent by a user thread while another thread is in poll(...). This causes a problem in the following scenario:

  • Thread A calls poll(...) and is blocked on select(...)
  • Thread B enqueues a request into unsent of ConsumerNetworkClient for node N
  • Thread A calls checkDisconnects(now) -> client.connectionFailed(N)
  • Because no attempt has been made to connect to node N yet, there is no state for node N, and connectionFailed(N) throws an exception.

The solution is to consider a connection to have failed only if an attempt has been made to connect to the node AND the connection state is DISCONNECTED. Note that this problem only occurs when one thread can enqueue requests while another thread is in the middle of poll(...).
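For illustration, here is a minimal, self-contained model of the semantics described above. The names and state are simplified (the real NetworkClient and ClusterConnectionStates track much more), so treat this as a sketch of the intended behaviour rather than the patch itself:

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectionFailedSketch {
    enum ConnectionState { DISCONNECTED, CONNECTING, CONNECTED }

    static class ClusterConnectionStates {
        // Hypothetical per-node state map; an entry exists only after a connect attempt.
        private final Map<String, ConnectionState> nodeState = new HashMap<>();

        // Old-style lookup: throws if we never attempted to connect to this node.
        ConnectionState connectionState(String id) {
            ConnectionState state = nodeState.get(id);
            if (state == null)
                throw new IllegalStateException("No entry found for node " + id);
            return state;
        }

        // The behaviour this PR wants: a node we never tried to connect to is
        // not considered disconnected, so connectionFailed(N) returns false
        // instead of throwing.
        boolean isDisconnected(String id) {
            // A missing entry yields null here, which is simply not DISCONNECTED.
            return nodeState.get(id) == ConnectionState.DISCONNECTED;
        }
    }

    public static void main(String[] args) {
        ClusterConnectionStates states = new ClusterConnectionStates();
        // Thread B enqueued a request for node "N" before any connect attempt,
        // so there is no state entry for "N" yet.
        System.out.println(states.isDisconnected("N")); // false, rather than an exception
    }
}
```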

@ijuma @becketqin Can you review this patch?

lindong28 changed the title from "KAFKA-4586 followup; Fix testMaxPollIntervalMsDelayInRevocation test failure" to "KAFKA-4841 followup; NetworkClient should only consider a connection to be fail after attempt to connect" on Mar 5, 2017
lindong28 changed the title from "KAFKA-4841 followup; NetworkClient should only consider a connection to be fail after attempt to connect" to "KAFKA-4841; NetworkClient should only consider a connection to be fail after attempt to connect" on Mar 5, 2017
lindong28 changed the title from "KAFKA-4841; NetworkClient should only consider a connection to be fail after attempt to connect" to "KAFKA-4841; NetworkClient should only consider a connection to have failed after attempt to connect" on Mar 5, 2017

asfbot commented Mar 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2011/
Test PASSed (JDK 8 and Scala 2.11).


asfbot commented Mar 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2008/
Test PASSed (JDK 7 and Scala 2.10).


asfbot commented Mar 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2009/
Test FAILed (JDK 8 and Scala 2.12).

     * @param id node id
     * @return true if we have attempted to connect to the node
     */
    public boolean connectionAttempted(String id) {
becketqin (Contributor) commented Mar 6, 2017

This method is a little weird. In isConnected() and isReady() we explicitly verify state != null. Maybe it is more consistent to just add an isDisconnected() method to ClusterConnectionStates and let NetworkClient.connectionFailed() call that method.

lindong28 (Member Author)

Good point. I think your suggestion is indeed more consistent with the current methods in ClusterConnectionStates. I have updated the patch accordingly.


asfbot commented Mar 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2020/
Test PASSed (JDK 7 and Scala 2.10).


asfbot commented Mar 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2023/
Test PASSed (JDK 8 and Scala 2.11).


asfbot commented Mar 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2021/
Test FAILed (JDK 8 and Scala 2.12).

ijuma (Contributor) left a comment

Thanks for the PR. I left one comment.

@@ -229,7 +229,7 @@ public long connectionDelay(Node node, long now) {
      */
     @Override
     public boolean connectionFailed(Node node) {
-        return connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED);
+        return connectionStates.isDisconnected(node.idString());
ijuma (Contributor)

I'm not sure about this. It's weird to say that a connection that has never been attempted has failed. This kind of situation only arises due to specifics of how ConsumerNetworkClient is implemented, so we should perhaps handle it there. cc @hachikuji

hachikuji (Contributor)

I agree it's weird. An alternative in ConsumerNetworkClient is to keep a set of the nodes that are currently connected and just use that when checking failed connections. That would also eliminate the annoying copy in UnsentRequests.nodes().
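As a rough sketch, the alternative described here might look like the following. Everything in it is hypothetical — ConnectedNodeTracker, onConnect, onDisconnect and snapshot are invented names for illustration and are not part of this PR or the codebase:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class ConnectedNodeTracker {
    // Node ids for which a connection was actually established at some point.
    private final Set<String> connectedNodeIds = Collections.synchronizedSet(new HashSet<String>());

    void onConnect(String nodeId) {
        connectedNodeIds.add(nodeId);
    }

    void onDisconnect(String nodeId) {
        connectedNodeIds.remove(nodeId);
    }

    // checkDisconnects(...) could iterate over a snapshot of this set instead of
    // unsent.nodes(), so it never asks the NetworkClient about a node that has
    // no connection state yet.
    Set<String> snapshot() {
        synchronized (connectedNodeIds) {
            return new HashSet<>(connectedNodeIds);
        }
    }
}
```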

becketqin (Contributor)

Just want to understand the weirdness here. Is it because isDisconnected could mean either "used to be connected" or "never connected"? In our current context it means the former. I am not sure we want to keep a separate set of supposedly-connected nodes, which introduces another piece of state to maintain.

We can add a new state (something like NO_STATE) to ConnectionState to indicate "never connected" and let ConsumerNetworkClient use it, if that would help disambiguate.

lindong28 (Member Author) commented Mar 7, 2017

@hachikuji @ijuma I am concerned that maintaining a separate set of nodes only makes the code more complex and harder to maintain. Ideally we want to keep this information in only one place (i.e. unsent) instead of duplicating it in two.

Maybe we can just define ConnectionState.DISCONNECTED to mean "disconnected after connection", as suggested by Becket. If this doesn't work for you, then we can still define ConnectionState.DISCONNECTED to mean both. And I can also revert to the first version by adding ClusterConnectionStates.connectionAttempted(...). I think either solution is better than maintaining a separate set of nodes that duplicates the information in unsent. What do you think?

ijuma (Contributor) commented Mar 7, 2017

To clarify: the issue is the connectionFailed method; changing the meaning of that method to also cover "have not attempted a connection" is surprising and likely to lead to bugs in the future.

Another option is to use the result of the poll method. Each ClientResponse has a disconnected boolean and we could iterate over the responses instead of the unsent nodes. Is there any reason why this would not work?
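Roughly, this option would look like the sketch below. It assumes the KafkaClient.poll(long, long) and ClientResponse.wasDisconnected() API of this client version and is illustrative only, not code from the PR:

```java
import java.util.List;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;

class PollBasedDisconnectCheck {
    // React to disconnects reported by poll(...) itself instead of probing
    // per-node connection state for every node that has unsent requests.
    static void pollAndHandleDisconnects(KafkaClient client, long pollTimeout, long now) {
        List<ClientResponse> responses = client.poll(pollTimeout, now);
        for (ClientResponse response : responses) {
            if (response.wasDisconnected()) {
                // The node that owned this request was disconnected; the unsent
                // requests queued for that node could be failed here.
            }
        }
    }
}
```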

hachikuji (Contributor)

@ijuma I'm not sure, but I think that might not be sufficient. The problem is that we'd only see disconnects for connections with in-flight requests. However, we need to see all disconnects in order to be able to clear out the unsent requests.

Maybe we just need to expose the disconnected nodes directly from NetworkClient?

List<Node> disconnected();

lindong28 (Member Author) commented Mar 7, 2017

@ijuma You mentioned in your first comment that "It's weird to say that a connection that has never been attempted has failed", which seems to suggest that a connection that has never been attempted should NOT be considered failed. I agree with you on this. The current patch actually enforces this understanding for connectionFailed(...), right?

Thus I think the question is whether we should consider a connection disconnected (i.e. connectionStates.isDisconnected returns true) when a connection has not even been attempted yet. Here are my thoughts:

  • I prefer not to consider a connection disconnected if no attempt has been made to connect to the node, e.g. via trySend(). In this case, we can just use the existing patch.
  • If we are not OK with the above definition, then we need a separate method ClusterConnectionStates.connectionAttempted(...) to distinguish between the states "connection not attempted" and "connection attempted and failed". In this case, I will revert to the first commit of this patch.
  • As Jason mentioned, using the results from poll(...) may not be sufficient.

@hachikuji Following your suggestion of adding List<Node> disconnected(), does this method return nodes which we haven't even attempted to connect to? If not, then we can probably just add isDisconnected(...) to be consistent with the existing methods isReady(...) and isConnecting(...). If so, then we still need a separate method such as connectionAttempted(...) to distinguish between "connection not attempted" and "connection attempted and failed".

Contributor

OK, maybe we can leave as is. @lindong28, can you address the other comment? Let's get this merged and we can do other improvements later.


asfbot commented Mar 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2028/
Test FAILed (JDK 7 and Scala 2.10).


asfbot commented Mar 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2031/
Test FAILed (JDK 8 and Scala 2.11).


asfbot commented Mar 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2029/
Test FAILed (JDK 8 and Scala 2.12).


asfbot commented Mar 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2030/
Test FAILed (JDK 8 and Scala 2.12).


asfbot commented Mar 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2032/
Test FAILed (JDK 8 and Scala 2.11).


asfbot commented Mar 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2029/
Test FAILed (JDK 7 and Scala 2.10).

-    public synchronized List<ClientRequest> removeExpiredRequests(long now, long unsentExpiryMs) {
-        List<ClientRequest> expiredRequests = new ArrayList<>();
-        Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
+    public synchronized ConcurrentLinkedQueue<ClientRequest> removeExpiredRequests(long now, long unsentExpiryMs) {
Contributor

Why does this need to return a ConcurrentLinkedQueue? Wouldn't a list be sufficient?

lindong28 (Member Author)

Sorry, my bad. LinkedIn organized a trip to Tahoe today and I wanted to address Ismael's comments before the bus leaves at 5:30 pm. I was in a hurry to address all the comments and simply replaced many of the lists with ConcurrentLinkedQueue.

It is fixed now.
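For reference, a sketch of the shape being asked for here, with a plain List return type. The Node and ClientRequest stand-ins below (and the createdTimeMs-based expiry check) are assumptions made to keep the sketch self-contained; this is an illustration, not the committed method:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified stand-ins for the real types, so the sketch compiles on its own.
class Node { }
class ClientRequest {
    private final long createdTimeMs;
    ClientRequest(long createdTimeMs) { this.createdTimeMs = createdTimeMs; }
    long createdTimeMs() { return createdTimeMs; }
}

class UnsentRequestsSketch {
    private final Map<Node, ConcurrentLinkedQueue<ClientRequest>> unsent = new ConcurrentHashMap<>();

    // A plain List return type is enough for callers that only iterate the expired requests.
    public synchronized List<ClientRequest> removeExpiredRequests(long now, long unsentExpiryMs) {
        List<ClientRequest> expiredRequests = new ArrayList<>();
        Iterator<Map.Entry<Node, ConcurrentLinkedQueue<ClientRequest>>> iterator = unsent.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Node, ConcurrentLinkedQueue<ClientRequest>> entry = iterator.next();
            Iterator<ClientRequest> requestIterator = entry.getValue().iterator();
            while (requestIterator.hasNext()) {
                ClientRequest request = requestIterator.next();
                // A request expires once it has waited in the unsent queue longer than unsentExpiryMs.
                if (request.createdTimeMs() < now - unsentExpiryMs) {
                    expiredRequests.add(request);
                    requestIterator.remove();
                }
            }
            // Drop the node entry once its queue is empty.
            if (entry.getValue().isEmpty())
                iterator.remove();
        }
        return expiredRequests;
    }
}
```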

-        return requests == null ? Collections.<ClientRequest>emptyList() : requests;
+    public synchronized ConcurrentLinkedQueue<ClientRequest> remove(Node node) {
+        ConcurrentLinkedQueue<ClientRequest> requests = unsent.remove(node);
+        return requests == null ? new ConcurrentLinkedQueue<ClientRequest>() : requests;
Contributor

Seems there's no need for this method to declare ConcurrentLinkedQueue as the return type. You can use a normal Collection, which would then allow you to use Collections.emptyList() instead of pointlessly creating an empty queue.

lindong28 (Member Author)

Good point. Fixed now.
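Concretely, the shape being suggested could look like this, continuing the UnsentRequestsSketch class from the earlier sketch (with java.util.Collection and java.util.Collections imported); again an illustration rather than the committed code:

```java
    // Widening the return type to Collection lets callers get the shared,
    // immutable empty list instead of a freshly allocated empty queue.
    public synchronized Collection<ClientRequest> remove(Node node) {
        ConcurrentLinkedQueue<ClientRequest> requests = unsent.remove(node);
        return requests == null ? Collections.<ClientRequest>emptyList() : requests;
    }
```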


    public synchronized Iterator<ClientRequest> requestIterator(Node node) {
        ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
        return requests == null ? null : requests.iterator();
hachikuji (Contributor)

Doesn't it seem a bit inconsistent that we return null here, but we return empty in remove above? Maybe remove should also return null?

lindong28 (Member Author)

The benefit of having remove(...) return an empty list instead of null is that failUnsentRequests(...) and checkDisconnects(...) can be simpler, since they don't need to check whether the result is null. I think it is fine since UnsentRequests is just a private class used only in ConsumerNetworkClient. I will change it so that remove() returns null if you prefer that.

ijuma (Contributor)

@hachikuji is suggesting that requestIterator should return an empty iterator instead of null to be consistent with the other methods.

lindong28 (Member Author)

@ijuma I think @hachikuji was directly suggesting that "Maybe remove should also return null". That said, I think it is good to return an empty iterator in requestIterator(...).

ijuma (Contributor)

Re-reading, he did say that, but I think his main point is that it would be good to be consistent. :)

lindong28 (Member Author)

@ijuma Sure, this makes sense. :) I will update the patch after the issue around isDisconnected is resolved.
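The agreed-upon shape for requestIterator(...) would then mirror remove(...) above, returning an empty iterator rather than null. A sketch, continuing the same sketch class:

```java
    // Never hand callers null; a node with no entry simply has no unsent requests.
    public synchronized Iterator<ClientRequest> requestIterator(Node node) {
        ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
        return requests == null ? Collections.<ClientRequest>emptyIterator() : requests.iterator();
    }
```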


asfbot commented Mar 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2045/
Test FAILed (JDK 8 and Scala 2.12).


asfbot commented Mar 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2047/
Test FAILed (JDK 8 and Scala 2.11).


asfbot commented Mar 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2044/
Test FAILed (JDK 7 and Scala 2.10).


asfbot commented Mar 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2045/
Test PASSed (JDK 7 and Scala 2.10).


asfbot commented Mar 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2048/
Test FAILed (JDK 8 and Scala 2.11).


asfbot commented Mar 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2046/
Test FAILed (JDK 8 and Scala 2.12).

hachikuji (Contributor) left a comment

LGTM

        for (Node node : unsent.nodes()) {
            Iterator<ClientRequest> iterator = unsent.requestIterator(node);
            if (iterator == null)
                continue;
ijuma (Contributor)

We can remove this too, right?

lindong28 (Member Author)

@ijuma My bad. I should have removed that earlier. Problem fixed now.
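With requestIterator(...) guaranteed to be non-null, the loop above no longer needs the guard. A sketch of the resulting shape (the loop body is elided and stands in for the existing per-request handling):

```java
        for (Node node : unsent.nodes()) {
            Iterator<ClientRequest> iterator = unsent.requestIterator(node);
            while (iterator.hasNext()) {
                ClientRequest request = iterator.next();
                // ... handle the unsent request for this node as before ...
            }
        }
```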

ijuma (Contributor) left a comment

Thanks, LGTM. I'll merge it to trunk once Jenkins gives the go ahead.


asfbot commented Mar 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2058/
Test PASSed (JDK 7 and Scala 2.10).


asfbot commented Mar 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2059/
Test PASSed (JDK 8 and Scala 2.12).


asfbot commented Mar 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2061/
Test PASSed (JDK 8 and Scala 2.11).

@asfgit asfgit closed this in c6bccdd Mar 8, 2017
lindong28 (Member Author) commented Mar 8, 2017

@hachikuji @ijuma @becketqin Thank you for the review!

@lindong28 lindong28 deleted the KAFKA-4820-followup branch March 15, 2017 03:12