From da5805e382788227804ca655f556b80a1145d4f5 Mon Sep 17 00:00:00 2001 From: Kamal C Date: Fri, 30 Jun 2017 17:51:25 +0530 Subject: [PATCH 1/4] MINOR: InFlightRequests#isEmpty(node) method corrected. - In clearAll method, get operation is removed. - variable name `requestTimeout` changed to `requestTimeoutMs` for clarity --- .../apache/kafka/clients/InFlightRequests.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index 642f0284acaa7..f9773297dbba3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -110,7 +110,7 @@ public int count(String node) { */ public boolean isEmpty(String node) { Deque queue = requests.get(node); - return queue != null && !queue.isEmpty(); + return queue == null || queue.isEmpty(); } /** @@ -141,22 +141,18 @@ public boolean isEmpty() { * @return All the in-flight requests for that node that have been removed */ public Iterable clearAll(String node) { - Deque reqs = requests.get(node); - if (reqs == null) { - return Collections.emptyList(); - } else { - return requests.remove(node); - } + Deque reqs = requests.remove(node); + return (reqs == null) ? Collections.emptyList() : reqs; } /** * Returns a list of nodes with pending in-flight request, that need to be timed out * * @param now current time in milliseconds - * @param requestTimeout max time to wait for the request to be completed + * @param requestTimeoutMs max time to wait for the request to be completed * @return list of nodes */ - public List getNodesWithTimedOutRequests(long now, int requestTimeout) { + public List getNodesWithTimedOutRequests(long now, int requestTimeoutMs) { List nodeIds = new LinkedList<>(); for (Map.Entry> requestEntry : requests.entrySet()) { String nodeId = requestEntry.getKey(); @@ -165,11 +161,10 @@ public List getNodesWithTimedOutRequests(long now, int requestTimeout) { if (!deque.isEmpty()) { NetworkClient.InFlightRequest request = deque.peekLast(); long timeSinceSend = now - request.sendTimeMs; - if (timeSinceSend > requestTimeout) + if (timeSinceSend > requestTimeoutMs) nodeIds.add(nodeId); } } - return nodeIds; } From bb552c76944efa9e964481acdc50be8ec47728bb Mon Sep 17 00:00:00 2001 From: Kamal C Date: Fri, 30 Jun 2017 17:57:26 +0530 Subject: [PATCH 2/4] Used count() method to find whether a in-flight request is empty. --- .../main/java/org/apache/kafka/clients/InFlightRequests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index f9773297dbba3..ca803d8a2494c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -109,8 +109,7 @@ public int count(String node) { * Return true if there is no in-flight request directed at the given node and false otherwise */ public boolean isEmpty(String node) { - Deque queue = requests.get(node); - return queue == null || queue.isEmpty(); + return count(node) == 0; } /** From 4f71405d1d0d8062526fb431f3db92981d00d5d4 Mon Sep 17 00:00:00 2001 From: Kamal C Date: Fri, 30 Jun 2017 18:05:55 +0530 Subject: [PATCH 3/4] Updated the hasInFlightRequests method. --- .../src/main/java/org/apache/kafka/clients/NetworkClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 59c606f8fabd7..60b15980406e8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -482,7 +482,7 @@ public int inFlightRequestCount(String node) { @Override public boolean hasInFlightRequests(String node) { - return this.inFlightRequests.isEmpty(node); + return !this.inFlightRequests.isEmpty(node); } @Override From dae6ecd2650837be40679b68d73411f7b58c8aec Mon Sep 17 00:00:00 2001 From: Kamal C Date: Fri, 30 Jun 2017 18:11:59 +0530 Subject: [PATCH 4/4] Revert "Used count() method to find whether a in-flight request is empty." This reverts commit bb552c76944efa9e964481acdc50be8ec47728bb. --- .../main/java/org/apache/kafka/clients/InFlightRequests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index ca803d8a2494c..f9773297dbba3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -109,7 +109,8 @@ public int count(String node) { * Return true if there is no in-flight request directed at the given node and false otherwise */ public boolean isEmpty(String node) { - return count(node) == 0; + Deque queue = requests.get(node); + return queue == null || queue.isEmpty(); } /**