From ccba1a4ee7626230d9a6aed1e966d54d6e509c26 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 14 Oct 2016 13:29:06 -0700 Subject: [PATCH 1/2] KAFKA-4303: Ensure commitSync does not block indefinitely in poll without in-flight requests --- .../kafka/clients/InFlightRequests.java | 4 +-- .../internals/ConsumerNetworkClient.java | 8 ++++-- .../internals/ConsumerNetworkClientTest.java | 27 ++++++++++++++++++- 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index 8de19ee4f11f5..c7b3bf9f922cc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -26,7 +26,7 @@ final class InFlightRequests { private final int maxInFlightRequestsPerConnection; - private final Map> requests = new HashMap>(); + private final Map> requests = new HashMap<>(); public InFlightRequests(int maxInFlightRequestsPerConnection) { this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection; @@ -133,7 +133,7 @@ public Iterable clearAll(String node) { * @return list of nodes */ public List getNodesWithTimedOutRequests(long now, int requestTimeout) { - List nodeIds = new LinkedList(); + List nodeIds = new LinkedList<>(); for (String nodeId : requests.keySet()) { if (inFlightRequestCount(nodeId) > 0) { ClientRequest request = requests.get(nodeId).peekLast(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 21fe0b8adc1a0..2495b23db4606 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -47,6 +47,7 @@ */ public class ConsumerNetworkClient implements Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class); + private static final long MAX_POLL_TIMEOUT_MS = 5000L; // the mutable state of this class is protected by the object's monitor (excluding the wakeup // flag and the request completion queue below). @@ -176,7 +177,7 @@ public void wakeup() { */ public void poll(RequestFuture future) { while (!future.isDone()) - poll(Long.MAX_VALUE, time.milliseconds(), future); + poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future); } /** @@ -225,7 +226,10 @@ public void poll(long timeout, long now, PollCondition pollCondition) { // condition becomes satisfied after the call to shouldBlock() (because of a fired completion // handler), the client will be woken up. if (pollCondition == null || pollCondition.shouldBlock()) { - client.poll(timeout, now); + // if there are no requests in flight, do not block longer than the retry backoff + if (client.inFlightRequestCount() == 0) + timeout = Math.min(timeout, retryBackoffMs); + client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now); now = time.milliseconds(); } else { client.poll(0, now); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index f90cd636926dd..f8ad3caaaf11b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -100,10 +100,35 @@ public boolean shouldBlock() { @Test public void blockWhenPollConditionNotSatisfied() { + long timeout = 4000L; + NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient, metadata, time, 100, 1000); - EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(Long.MAX_VALUE), EasyMock.anyLong())).andReturn(Collections.emptyList()); + EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(1); + EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(timeout), EasyMock.anyLong())).andReturn(Collections.emptyList()); + + EasyMock.replay(mockNetworkClient); + + consumerClient.poll(timeout, time.milliseconds(), new ConsumerNetworkClient.PollCondition() { + @Override + public boolean shouldBlock() { + return true; + } + }); + + EasyMock.verify(mockNetworkClient); + } + + @Test + public void blockOnlyForRetryBackoffIfNoInflightRequests() { + long retryBackoffMs = 100L; + + NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); + ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient, metadata, time, retryBackoffMs, 1000L); + + EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(0); + EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(retryBackoffMs), EasyMock.anyLong())).andReturn(Collections.emptyList()); EasyMock.replay(mockNetworkClient); From cbf3e113e834549adff30234e841b29d48fed952 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 14 Oct 2016 14:23:40 -0700 Subject: [PATCH 2/2] We no longer need to track pending fetches --- .../apache/kafka/clients/InFlightRequests.java | 12 +++++++----- .../kafka/clients/consumer/KafkaConsumer.java | 8 +------- .../clients/consumer/internals/Fetcher.java | 16 +--------------- .../clients/consumer/internals/FetcherTest.java | 4 ---- 4 files changed, 9 insertions(+), 31 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index c7b3bf9f922cc..91b9dba72e5e7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -134,13 +134,15 @@ public Iterable clearAll(String node) { */ public List getNodesWithTimedOutRequests(long now, int requestTimeout) { List nodeIds = new LinkedList<>(); - for (String nodeId : requests.keySet()) { - if (inFlightRequestCount(nodeId) > 0) { - ClientRequest request = requests.get(nodeId).peekLast(); + for (Map.Entry> requestEntry : requests.entrySet()) { + String nodeId = requestEntry.getKey(); + Deque deque = requestEntry.getValue(); + + if (!deque.isEmpty()) { + ClientRequest request = deque.peekLast(); long timeSinceSend = now - request.sendTimeMs(); - if (timeSinceSend > requestTimeout) { + if (timeSinceSend > requestTimeout) nodeIds.add(nodeId); - } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index b2b4bf0d4e47b..b384211e3d8d1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1025,12 +1025,6 @@ private Map>> pollOnce(long timeout) { // send any new fetches (won't resend pending fetches) fetcher.sendFetches(); - // if no fetches could be sent at the moment (which can happen if a partition leader is in the - // blackout period following a disconnect, or if the partition leader is unknown), then we don't - // block for longer than the retry backoff duration. - if (!fetcher.hasInFlightFetches()) - timeout = Math.min(timeout, retryBackoffMs); - long now = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout); @@ -1039,7 +1033,7 @@ private Map>> pollOnce(long timeout) { public boolean shouldBlock() { // since a fetch might be completed by the background thread, we need this poll condition // to ensure that we do not block unnecessarily in poll() - return !fetcher.hasCompletedFetches() && fetcher.hasInFlightFetches(); + return !fetcher.hasCompletedFetches(); } }); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 9e9ae925eaa72..bfc1a0b01f2d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -17,6 +17,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; @@ -41,7 +42,6 @@ import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.LogEntry; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.FetchRequest; @@ -91,7 +91,6 @@ public class Fetcher { private final FetchManagerMetrics sensors; private final SubscriptionState subscriptions; private final ConcurrentLinkedQueue completedFetches; - private final AtomicInteger numInFlightFetches = new AtomicInteger(0); private final Deserializer keyDeserializer; private final Deserializer valueDeserializer; @@ -137,15 +136,6 @@ public boolean hasCompletedFetches() { return !completedFetches.isEmpty(); } - /** - * Check whether there are in-flight fetches. This is used to avoid unnecessary blocking in - * {@link ConsumerNetworkClient#poll(long)} if there are no fetches to wait for. This method is thread-safe. - * @return true if there are, false otherwise - */ - public boolean hasInFlightFetches() { - return numInFlightFetches.get() > 0; - } - private boolean matchesRequestedPartitions(FetchRequest request, FetchResponse response) { Set requestedPartitions = request.fetchData().keySet(); Set fetchedPartitions = response.responseData().keySet(); @@ -161,13 +151,10 @@ public void sendFetches() { final FetchRequest request = fetchEntry.getValue(); final Node fetchTarget = fetchEntry.getKey(); - numInFlightFetches.incrementAndGet(); client.send(fetchTarget, ApiKeys.FETCH, request) .addListener(new RequestFutureListener() { @Override public void onSuccess(ClientResponse resp) { - numInFlightFetches.decrementAndGet(); - FetchResponse response = new FetchResponse(resp.responseBody()); if (!matchesRequestedPartitions(request, response)) { // obviously we expect the broker to always send us valid responses, so this check @@ -194,7 +181,6 @@ public void onSuccess(ClientResponse resp) { @Override public void onFailure(RuntimeException e) { - numInFlightFetches.decrementAndGet(); log.debug("Fetch request to {} failed", fetchTarget, e); } }); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index faf6efa5d4773..5822646dd8989 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -128,13 +128,11 @@ public void testFetchNormal() { // normal fetch fetcher.sendFetches(); - assertTrue(fetcher.hasInFlightFetches()); assertFalse(fetcher.hasCompletedFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); - assertFalse(fetcher.hasInFlightFetches()); Map>> partitionRecords = fetcher.fetchedRecords(); assertTrue(partitionRecords.containsKey(tp)); @@ -155,13 +153,11 @@ public void testFetchError() { subscriptions.seek(tp, 0); fetcher.sendFetches(); - assertTrue(fetcher.hasInFlightFetches()); assertFalse(fetcher.hasCompletedFetches()); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); - assertFalse(fetcher.hasInFlightFetches()); Map>> partitionRecords = fetcher.fetchedRecords(); assertFalse(partitionRecords.containsKey(tp));