From eb9020e65ca09d0aedf78ea8c8cf1bc8f2e35926 Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Mon, 12 Jun 2017 16:41:39 -0700 Subject: [PATCH 1/3] MINOR: NetworkClient#disconnect should not erase connection info NetworkClient#disconnect should not erase the connection information. This will allow exponential backoff to occur. --- .../src/main/java/org/apache/kafka/clients/NetworkClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index a0730ca3d4820..af96575ae7d13 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -249,7 +249,7 @@ public void disconnect(String nodeId) { true, null, null)); } } - connectionStates.remove(nodeId); + connectionStates.disconnected(nodeId, now); if (log.isDebugEnabled()) { log.debug("Manually disconnected from {}. Removed requests: {}.", nodeId, Utils.join(requestTypes, ", ")); From cb1b620a24d8257e185263b87b32fddc76745d81 Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Tue, 13 Jun 2017 13:16:34 -0700 Subject: [PATCH 2/3] Add unit test --- .../kafka/clients/NetworkClientTest.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 0de76a1a6a04f..9fd3dd8470e22 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -29,6 +29,8 @@ import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.DelayedReceive; import org.apache.kafka.test.MockSelector; import org.apache.kafka.test.TestUtils; @@ -264,7 +266,21 @@ public void testDisconnectDuringUserMetadataRequest() { assertEquals(1, responses.size()); assertTrue(responses.iterator().next().wasDisconnected()); } - + + @Test + public void testCallDisconnect() throws Exception { + awaitReady(client, node); + assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(), + client.isReady(node, Time.SYSTEM.milliseconds())); + assertFalse("Did not expect connection to node " + node.idString() + " to be failed", + client.connectionFailed(node)); + client.disconnect(node.idString()); + assertFalse("Expected node " + node.idString() + " to be disconnected.", + client.isReady(node, Time.SYSTEM.milliseconds())); + assertTrue("Expected connection to node " + node.idString() + " to be failed after disconnect", + client.connectionFailed(node)); + } + private static class TestCallbackHandler implements RequestCompletionHandler { public boolean executed = false; public ClientResponse response; From 1c2d1244798fe68d1c1d2154cffc3e15c4905480 Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Tue, 13 Jun 2017 16:08:20 -0700 Subject: [PATCH 3/3] fix checkstyle --- .../test/java/org/apache/kafka/clients/NetworkClientTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 9fd3dd8470e22..77960e1123d76 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.DelayedReceive; import org.apache.kafka.test.MockSelector; import org.apache.kafka.test.TestUtils;