From 8ff38fee7c194901cdcf168dd41eb969c6aaca47 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 4 Sep 2015 15:56:08 +0100 Subject: [PATCH] NetworkClient.close should remove node from inFlightRequests --- .../apache/kafka/clients/NetworkClient.java | 1 + .../kafka/clients/NetworkClientTest.java | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 0a6f9522239d7..049b22eadd549 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -152,6 +152,7 @@ public boolean ready(Node node, long now) { @Override public void close(String nodeId) { selector.close(nodeId); + inFlightRequests.clearAll(nodeId); connectionStates.remove(nodeId); } diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index ce6328a3a9e11..69c93c3adf674 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -94,6 +94,25 @@ public void testSimpleRequestResponseWithStaticNodes() { checkSimpleRequestResponse(clientWithStaticNodes); } + @Test + public void testClose() { + client.ready(node, time.milliseconds()); + awaitReady(client, node); + client.poll(1, time.milliseconds()); + assertTrue("The client should be ready", client.isReady(node, time.milliseconds())); + + ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.emptyMap()); + RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE); + RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct()); + ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null); + client.send(request); + assertEquals("There should be 1 in-flight request after send", 1, client.inFlightRequestCount(node.idString())); + + client.close(node.idString()); + assertEquals("There should be no in-flight request after close", 0, client.inFlightRequestCount(node.idString())); + assertFalse("Connection should not be ready after close", client.isReady(node, 0)); + } + private void checkSimpleRequestResponse(NetworkClient networkClient) { ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.emptyMap()); RequestHeader reqHeader = networkClient.nextRequestHeader(ApiKeys.PRODUCE);