[KAFKA-5417] Clients get inconsistent connection states when SASL/SSL… #3282

Closed
wants to merge 1 commit into from

dongeforever
Member

… connection is marked CONNECTED and DISCONNECTED at the same time

details are in:
https://issues.apache.org/jira/browse/KAFKA-5417

@dongeforever
Member Author

@guozhangwang please take a look at your convenience.

@ijuma
Contributor

ijuma commented Jun 9, 2017

cc @rajinisivaram @junrao

@asfbot

asfbot commented Jun 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5104/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented Jun 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5088/
Test PASSed (JDK 8 and Scala 2.12).

@junrao
Contributor

junrao commented Jun 9, 2017

@dongeforever : Thanks for the finding. Great catch. The patch looks good. Perhaps we could add a comment in ConnectionState about state transition (e.g., we can transition from any of CONNECTING, CHECKING_API_VERSIONS, READY directly to DISCONNECTED).
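
The transition rule suggested above could be documented, and checked, along these lines. This is a minimal sketch, not the actual `ConnectionState` source: the enum name `SketchConnectionState` and its two methods are hypothetical, and the forward path (DISCONNECTED to CONNECTING to CHECKING_API_VERSIONS to READY) is an assumption. Only the direct transitions to DISCONNECTED come from the comment above.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch; not the actual Kafka ConnectionState enum.
enum SketchConnectionState {
    DISCONNECTED, CONNECTING, CHECKING_API_VERSIONS, READY;

    /**
     * Returns the states reachable from this state in one step.
     * CONNECTING, CHECKING_API_VERSIONS and READY can each go directly to DISCONNECTED.
     * The forward path is an assumption made for this sketch.
     */
    Set<SketchConnectionState> nextStates() {
        switch (this) {
            case DISCONNECTED:
                return EnumSet.of(CONNECTING);
            case CONNECTING:
                return EnumSet.of(CHECKING_API_VERSIONS, DISCONNECTED);
            case CHECKING_API_VERSIONS:
                return EnumSet.of(READY, DISCONNECTED);
            case READY:
                return EnumSet.of(DISCONNECTED);
            default:
                throw new IllegalStateException("Unknown state: " + this);
        }
    }

    /** True if moving from this state to the target is an allowed single step. */
    boolean canTransitionTo(SketchConnectionState target) {
        return nextStates().contains(target);
    }
}
```

For example, `SketchConnectionState.CONNECTING.canTransitionTo(SketchConnectionState.DISCONNECTED)` is true, which is the case this PR runs into.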

Iterator<String> connectedIt = connected.iterator();
while (connectedIt.hasNext()) {
    if (disconnected.containsKey(connectedIt.next())) {
        log.warn("Channel {} is marked connected and disconnected at the same time");

Contributor

What does this warning indicate? Should users be doing something about it? If not, maybe it should be logged at a lower level?

Member Author

Yeah. It is an internal class, though, so users do not need to do anything. I will update it.

while (connectedIt.hasNext()) {
    if (disconnected.containsKey(connectedIt.next())) {
        log.warn("Channel {} is marked connected and disconnected at the same time");
        connectedIt.remove();

Contributor

Since this channel was added to the connected set only in this poll(), no one knows about this channel yet. So perhaps we should remove it from disconnected as well to avoid notifying disconnection of an unknown channel?

Contributor

I was debating the same thing. Won't NetworkClient keep the node under the CONNECTING state though? It seems like either approach involves a change in the contract that could affect users who are not expecting it. It's an internal class though, so we just need to make sure that the affected Kafka code is updated (if necessary).

It would be nice to include a test for this so that we can verify that things truly work under this scenario.

Contributor

@ijuma Yes, you are right, NetworkClient does need to be notified. Ignore my previous comment.

Member Author

@ijuma I want to add a test, but unfortunately it is hard to mock such a network environment.
I have tested it many times in my company's LVS proxy environment.
Do you have any suggestions?

Contributor

Maybe we could keep it simple and add a test for Selector.pollSelectionKeys where SelectionKey and the channel returned by the SelectionKey are mocked. What do you think?

@asfbot

asfbot commented Jun 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5218/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented Jun 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5202/
Test PASSed (JDK 8 and Scala 2.12).

Iterator<String> connectedIt = connected.iterator();
while (connectedIt.hasNext()) {
    if (disconnected.containsKey(connectedIt.next())) {
        log.debug("Channel {} is marked connected and disconnected at the same time");

Contributor

The channel is not being passed to log.debug.

Member Author

@ijuma Thanks. It has been fixed.
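
For reference, the reconciliation loop under review can be sketched so that the channel id is kept in a local variable and actually reaches the message. This is an illustration only: the class `ReconcileSketch` and the method `dropDisconnected` are hypothetical names, the collection types are assumptions, and the messages are collected in a list instead of going through a logger so that the block stays self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not part of Kafka's Selector.
class ReconcileSketch {
    /**
     * Removes from `connected` every id that is also a key of `disconnected`
     * and returns one message per removed id.
     */
    static List<String> dropDisconnected(List<String> connected, Map<String, ?> disconnected) {
        List<String> messages = new ArrayList<>();
        Iterator<String> connectedIt = connected.iterator();
        while (connectedIt.hasNext()) {
            String id = connectedIt.next(); // keep the id so it can be reported
            if (disconnected.containsKey(id)) {
                // With SLF4J the id would be passed as the placeholder argument:
                // log.debug("Channel {} is marked connected and disconnected at the same time", id);
                messages.add("Channel " + id + " is marked connected and disconnected at the same time");
                connectedIt.remove();
            }
        }
        return messages;
    }
}
```

Calling `next()` once and storing the result also avoids advancing the iterator twice if the id is needed both for the lookup and for the message.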

@asfbot

asfbot commented Jun 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5284/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented Jun 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5268/
Test FAILed (JDK 8 and Scala 2.12).

@@ -335,6 +335,15 @@ public void poll(long timeout) throws IOException {
        // we use the time at the end of select to ensure that we don't close any connections that
        // have just been processed in pollSelectionKeys
        maybeCloseOldestConnection(endSelect);

        Iterator<String> connectedIt = connected.iterator();

Contributor

It might be a little cleaner to avoid the inconsistency in the first place rather than fixing it after the fact. Have you considered calling connected.remove(channel) when we add the channel to disconnected in doClose?

Contributor

@dongeforever, what do you think?

Member Author

@hachikuji @ijuma Agreed, it is better to avoid the inconsistency in the first place. It may be slightly better to do it in close rather than in doClose, since doClose may be a delayed operation and close is the first place to notify the ConnectionStates.

Member Author

@ijuma @hachikuji what do you think about the updated version?

@asfgit

asfgit commented Jun 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5603/
Test FAILed (JDK 7 and Scala 2.11).

@asfgit

asfgit commented Jun 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5589/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit

asfgit commented Jun 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5695/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit

asfgit commented Jun 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5681/
Test FAILed (JDK 8 and Scala 2.12).

@@ -555,6 +555,9 @@ private void close(KafkaChannel channel, boolean processOutstanding) {

        channel.disconnect();

        // avoid inconsistent connection states, see KAFKA-5417
        connected.remove(channel.id());

Contributor

I was debating whether doClose is the more appropriate spot for this since that is where we actually close the channel and add the id to the disconnected collection. We have this "closing" state below in which we await pending receives before closing the channel. I am not sure if it is better to allow a connection in that state to be reported as connected or not. It probably doesn't matter too much for the specific bug reported since we wouldn't have any pending receives, but I guess we should still make sure the state transitions are consistent. Thoughts?

Member Author

IMO, there is not much difference between doing it in "doClose" and doing it in "close". Maybe both are OK.
Any channel in "connected" will be reported as connected.
Since the channel is going to be closed, it is better to prevent it from being marked as connected than to close it after it has actually been marked.

Contributor

Yeah, I can't think of a strong reason for either option, so I guess we can leave it here.
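
The difference between the two spots can be illustrated with a small stand-in for the bookkeeping. This is a sketch under stated assumptions, not Kafka's `Selector`: the class `CloseSketch`, its fields, and the `hasPendingReceives` flag are hypothetical, and the two-stage close only loosely mirrors the "closing" state mentioned above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the bookkeeping discussed above; not Kafka's Selector.
class CloseSketch {
    final List<String> connected = new ArrayList<>();
    final Map<String, String> disconnected = new HashMap<>(); // id -> reason
    final List<String> closing = new ArrayList<>();           // ids awaiting pending receives

    /** Records a connection that completed during the current poll. */
    void markConnected(String id) {
        connected.add(id);
    }

    /** First stage: stop reporting the id as connected right away. */
    void close(String id, boolean hasPendingReceives) {
        connected.remove(id); // List.remove(Object); a no-op if the id is absent
        if (hasPendingReceives) {
            closing.add(id);  // defer the actual close
        } else {
            doClose(id);
        }
    }

    /** Second stage: the channel is actually closed and reported as disconnected. */
    void doClose(String id) {
        closing.remove(id);
        disconnected.put(id, "closed");
    }
}
```

With the removal in `close`, an id in the deferred state appears in neither `connected` nor `disconnected` until `doClose` runs. Moving the removal into `doClose` would instead keep it in `connected` during that window.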

@hachikuji
Contributor

@dongeforever Don't forget about Jun's comment from above:

Perhaps we could add a comment in ConnectionState about state transition (e.g., we can transition from any of CONNECTING, CHECKING_API_VERSIONS, READY directly to DISCONNECTED).

We also should try to come up with a reasonable test case. Ismael suggested previously:

Maybe we could keep it simple and add a test for Selector.pollSelectionKeys where SelectionKey and the channel returned by the SelectionKey are mocked.

Would that work?

@dongeforever
Member Author

@hachikuji It seems that Kafka does not use any mock framework such as Mockito.
So without a mock framework, could you give me any advice on how to write such a unit test?

@rajinisivaram
Contributor

@dongeforever You can use easymock/powermock for the unit tests. There are already tests in clients which use these.

@ijuma
Contributor

ijuma commented Jul 18, 2017

@dongeforever, do you think you'll be able to provide a test? If not, we can help.

@dongeforever
Member Author

@ijuma Thank you. I have been a little busy these days, but I would like to try it myself first. I will come back to ask for your help if I run into trouble.

@ijuma
Contributor

ijuma commented Aug 17, 2017

@dongeforever any luck writing the test? We are planning to do a 0.11.0.1 release soon and it would be good to include this fix.

@asfgit

asfgit commented Aug 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6841/
Test FAILed (JDK 7 and Scala 2.11).

@asfgit

asfgit commented Aug 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6827/
Test PASSed (JDK 8 and Scala 2.12).

… connection is marked CONNECTED and DISCONNECTED at the same time
@dongeforever
Member Author

@ijuma Sorry for being so late. I have added a unit test for this issue, named "testAvoidInconsisConnectionStates", in SelectorTest.

IMO, the easiest way is to test the private method "pollSelectionKeys" with a mocked SelectionKey and a mocked KafkaChannel.
You can find more details in that unit test.
Please feel free to reach out with any questions.
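
The scenario can also be reproduced without a mock framework by hand-writing a stub `SelectionKey`. The sketch below is an illustration only and is not the test that was added to SelectorTest: `ConnectableKeyStub`, `ChannelSketch`, `FailingPrepareChannel` and `PollSketch` are hypothetical names, and `pollSelectionKey` is a heavily simplified stand-in for the real method. The channel is attached to the key and `prepare` fails in the same call in which the connection completes.

```java
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hand-written stub: reports the key as connectable without any real socket.
class ConnectableKeyStub extends SelectionKey {
    @Override public SelectableChannel channel() { return null; }
    @Override public Selector selector() { return null; }
    @Override public boolean isValid() { return true; }
    @Override public void cancel() { }
    @Override public int interestOps() { return OP_CONNECT; }
    @Override public SelectionKey interestOps(int ops) { return this; }
    @Override public int readyOps() { return OP_CONNECT; } // drives isConnectable()
}

// Hypothetical channel abstraction used only by this sketch.
interface ChannelSketch {
    String id();
    boolean finishConnect() throws IOException;
    void prepare() throws IOException;
}

// Connects successfully, then fails during the handshake step.
class FailingPrepareChannel implements ChannelSketch {
    public String id() { return "node-1"; }
    public boolean finishConnect() { return true; }
    public void prepare() throws IOException { throw new IOException("handshake failed"); }
}

class PollSketch {
    final List<String> connected = new ArrayList<>();
    final Map<String, String> disconnected = new HashMap<>(); // id -> reason

    void pollSelectionKey(SelectionKey key) {
        ChannelSketch channel = (ChannelSketch) key.attachment();
        try {
            if (key.isConnectable() && channel.finishConnect()) {
                connected.add(channel.id());
            }
            channel.prepare(); // may throw in the same poll
        } catch (IOException e) {
            connected.remove(channel.id()); // keep the two collections consistent
            disconnected.put(channel.id(), e.getMessage());
        }
    }
}
```

After attaching a `FailingPrepareChannel` to a `ConnectableKeyStub` and calling `pollSelectionKey`, the id should be present in `disconnected` and absent from `connected`.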

Contributor

@ijuma ijuma left a comment

Thanks for the test. I've made some minor changes (avoid reflection, tweak comments, remove some unnecessary code) and merged to trunk and 0.11.0.

@asfgit asfgit closed this in ea21752 Aug 23, 2017
asfgit pushed a commit that referenced this pull request Aug 23, 2017
If prepare throws an exception in the same poll when the connection
is established, the channel id should be in `disconnected`, but
not in `connected`.

Author: dongeforever <dongeforever@apache.org>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3282 from dongeforever/KAFKA-5417

(cherry picked from commit ea21752)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
@ijuma
Contributor

ijuma commented Aug 23, 2017

Sorry, I made a mistake during merging and didn't mention all the reviewers (Jun, Jason, Rajini).

@dongeforever
Member Author

@ijuma Thanks
