From 6ef55eeb2363b46e99d8630121c8772b4193234a Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sat, 10 Oct 2015 23:41:18 -0700 Subject: [PATCH 1/3] KAFKA-2632.v1 --- .../apache/kafka/clients/consumer/internals/Fetcher.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 4608959f01434..f0de6ebf0e16d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -302,11 +302,6 @@ public Map>> fetchedRecords() { throwIfOffsetOutOfRange(); for (PartitionRecords part : this.records) { - if (!subscriptions.isFetchable(part.partition)) { - log.debug("Ignoring fetched records for {} since it is no longer fetchable", part.partition); - continue; - } - Long consumed = subscriptions.consumed(part.partition); if (consumed != null && part.fetchOffset == consumed) { List> records = drained.get(part.partition); @@ -446,7 +441,11 @@ private void handleFetchResponse(ClientResponse resp, FetchRequest request) { TopicPartition tp = entry.getKey(); FetchResponse.PartitionData partition = entry.getValue(); if (!subscriptions.assignedPartitions().contains(tp)) { + // this can happen when a rebalance happened while fetch is still in-flight log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp); + } else if (!subscriptions.isFetchable(tp)) { + // this can happen when user called pause while fetch is still in-flight + log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp); } else if (partition.errorCode == Errors.NONE.code()) { long fetchOffset = request.fetchData().get(tp).offset; From 48ddf18e0d6fdabdcf312a40967bcad25b3fe427 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 12 Oct 2015 13:37:39 -0700 Subject: [PATCH 2/3] address Jason's comments --- .../clients/consumer/internals/Fetcher.java | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index f0de6ebf0e16d..3041fb8f5ba48 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -302,6 +302,18 @@ public Map>> fetchedRecords() { throwIfOffsetOutOfRange(); for (PartitionRecords part : this.records) { + if (!subscriptions.isFetchable(part.partition)) { + // this can happen when a rebalance happened or a partition consumption paused + // before fetched records are returned to the consumer's poll call + log.debug("Not returning fetched records for partition {} since it is no longer fetchable", part.partition); + + // we also need to reset the fetch positions to pretend we did not fetch + // this partition in the previous request at all + subscriptions.fetched(part.partition, part.records.get(0).offset()); + + continue; + } + Long consumed = subscriptions.consumed(part.partition); if (consumed != null && part.fetchOffset == consumed) { List> records = drained.get(part.partition); @@ -440,11 +452,9 @@ private void handleFetchResponse(ClientResponse resp, FetchRequest request) { for (Map.Entry entry : response.responseData().entrySet()) { TopicPartition tp = entry.getKey(); FetchResponse.PartitionData partition = entry.getValue(); - if (!subscriptions.assignedPartitions().contains(tp)) { - // this can happen when a rebalance happened while fetch is still in-flight - log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp); - } else if (!subscriptions.isFetchable(tp)) { - // this can happen when user called pause while fetch is still in-flight + if (!subscriptions.isFetchable(tp)) { + // this can happen when a rebalance happened or a partition consumption paused + // while fetch is still in-flight log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp); } else if (partition.errorCode == Errors.NONE.code()) { long fetchOffset = request.fetchData().get(tp).offset; From 9fa10344861559e6075975b6e83aa2f28423618b Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 12 Oct 2015 18:01:54 -0700 Subject: [PATCH 3/3] some further refactoring --- .../clients/consumer/internals/Fetcher.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 3041fb8f5ba48..4d68e74d517cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -302,20 +302,23 @@ public Map>> fetchedRecords() { throwIfOffsetOutOfRange(); for (PartitionRecords part : this.records) { + if (!subscriptions.isAssigned(part.partition)) { + // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call + log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition); + continue; + } + + // note that the consumed position should always be available + // as long as the partition is still assigned + long consumed = subscriptions.consumed(part.partition); if (!subscriptions.isFetchable(part.partition)) { - // this can happen when a rebalance happened or a partition consumption paused - // before fetched records are returned to the consumer's poll call - log.debug("Not returning fetched records for partition {} since it is no longer fetchable", part.partition); + // this can happen when a partition consumption paused before fetched records are returned to the consumer's poll call + log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition); // we also need to reset the fetch positions to pretend we did not fetch // this partition in the previous request at all - subscriptions.fetched(part.partition, part.records.get(0).offset()); - - continue; - } - - Long consumed = subscriptions.consumed(part.partition); - if (consumed != null && part.fetchOffset == consumed) { + subscriptions.fetched(part.partition, consumed); + } else if (part.fetchOffset == consumed) { List> records = drained.get(part.partition); if (records == null) { records = part.records;