
KAFKA-7548: KafkaConsumer should not throw away already fetched data for paused partitions (v2) #6988

Merged
8 commits merged into apache:trunk from seglo:seglo/KAFKA-7548 on Aug 2, 2019

Conversation

@seglo (Contributor) commented Jun 23, 2019

This is an updated implementation of #5844 by @MayureshGharat (with Mayuresh's permission).

Today when we call KafkaConsumer.poll(), it fetches data from Kafka asynchronously and puts it into a local buffer (completedFetches).

If we then pause some TopicPartitions and call KafkaConsumer.poll(), we may throw away any data buffered locally for those TopicPartitions. Generally, an application that pauses some TopicPartitions is likely to resume them in the near future, at which point the KafkaConsumer has to re-issue a fetch for the same data it had already buffered. From the application's point of view this is wasted effort.
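For context, here is a minimal sketch of the pause/resume pattern that exercises this code path. The broker address, topic, and group id are hypothetical; the comments describe the behavior before and after this patch.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseResumeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("group.id", "pause-resume-demo");        // hypothetical group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("events", 0); // hypothetical topic
            consumer.assign(Collections.singletonList(tp));

            // This poll may leave data for tp buffered in the fetcher's completedFetches.
            ConsumerRecords<String, String> first = consumer.poll(Duration.ofMillis(500));

            // Apply back-pressure while the application catches up.
            consumer.pause(Collections.singletonList(tp));

            // Before this patch, polling while paused could discard the data already
            // buffered for tp, forcing a re-fetch of the same offsets after resume.
            consumer.poll(Duration.ofMillis(500));

            // With this patch, the buffered data is still available here and is
            // returned without issuing a new fetch for the same offsets.
            consumer.resume(Collections.singletonList(tp));
            ConsumerRecords<String, String> afterResume = consumer.poll(Duration.ofMillis(500));
        }
    }
}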

I've reviewed the original PR's feedback from @hachikuji and reimplemented this solution to add completed fetches that belong to paused partitions back to the queue. I also rebased against the latest trunk, which caused more changes as a result of subscription event handlers being removed from the fetcher class.

You can find more details in my updated notes in the original Jira issue KAFKA-7548.

@seglo (Contributor, Author) commented Jun 28, 2019

@hachikuji @ijuma Do either of you have some time to review this PR?

@gwenshap gwenshap requested review from hachikuji and rhauch Jun 29, 2019

@gwenshap (Contributor) commented Jun 29, 2019

@rhauch looks like it will be useful for Kafka Connect too?

@seglo (Contributor, Author) commented Jun 30, 2019

Thanks @gwenshap.

@hachikuji hachikuji self-assigned this Jul 3, 2019

@seglo (Contributor, Author) commented Jul 4, 2019

Retest this please

@seglo seglo force-pushed the seglo:seglo/KAFKA-7548 branch Jul 4, 2019

@seglo (Contributor, Author) commented Jul 11, 2019

Hi @hachikuji. I'm just putting this on your radar again for a review. Thanks for picking it up.

@seglo (Contributor, Author) commented Jul 23, 2019

Hi @gwenshap @hachikuji @rhauch, please let me know if you have any questions about the PR or how to reproduce the issue. Thanks.

@hachikuji (Contributor) left a comment

Sorry for the delay. The approach makes sense. I had a couple of ideas on how to simplify it. Let me know what you think.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
@@ -223,6 +227,11 @@ public boolean hasCompletedFetches() {
return !completedFetches.isEmpty();
}

// Visibility for testing
protected boolean hasParsedFetchesCache() {

@hachikuji (Contributor) commented Jul 26, 2019

nit: could be package private

@seglo (Contributor, Author) commented Jul 28, 2019

Removed.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
@@ -577,33 +586,42 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
*/
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();

@hachikuji (Contributor) commented Jul 26, 2019

I have a few suggestions to simplify this a little bit:

  1. The PartitionRecords parses records in a streaming fashion. The constructor doesn't do any parsing, so there's no harm constructing it immediately when we find a CompletedFetch for a paused partition. So rather than leaving the completed fetch inside completedFetches, it seems like we could keep them all in the other queue (maybe pausedCompletedFetches instead of parsedFetchesCache)? As a matter of fact, I think we can change the type of completedFetches to ConcurrentLinkedQueue<PartitionRecords>.

  2. Below we have logic to basically iterate through the parsedFetchesCache and move all the partitions which are still paused to the end. Would it be simpler to leave the collection in the same order?

  3. Once a partition is unpaused, does it make sense to just move it back to completedFetches? Then there is only a single collection that we are pulling the next records from.

So then the logic would be just to iterate through the paused data and check for unpaused partitions which can be moved over to completedFetches. Then everything is done as usual.
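To illustrate the direction of these suggestions, here is a rough, self-contained sketch that uses simplified stand-in types rather than the actual Fetcher internals; BufferedPartitionData and moveResumedBack are hypothetical names.

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.common.TopicPartition;

class PausedFetchBufferSketch {
    // Stand-in for the fetcher's per-partition buffered, not-yet-parsed data.
    static class BufferedPartitionData {
        final TopicPartition partition;
        BufferedPartitionData(TopicPartition partition) { this.partition = partition; }
    }

    // Main queue drained by fetchedRecords(), as in the Fetcher.
    final Queue<BufferedPartitionData> completedFetches = new ConcurrentLinkedQueue<>();
    // Side buffer holding data for partitions that are currently paused.
    final Queue<BufferedPartitionData> pausedCompletedFetches = new ArrayDeque<>();

    // Move buffered data for partitions that are no longer paused back into the
    // main queue, so it is returned to the user without re-fetching from the broker.
    void moveResumedBack(Set<TopicPartition> pausedPartitions) {
        Iterator<BufferedPartitionData> it = pausedCompletedFetches.iterator();
        while (it.hasNext()) {
            BufferedPartitionData data = it.next();
            if (!pausedPartitions.contains(data.partition)) {
                completedFetches.add(data);
                it.remove();
            }
        }
    }
}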

@seglo (Contributor, Author) commented Jul 26, 2019

Thanks for the review, @hachikuji! I think it would be nice to have the single completedFetches of type ConcurrentLinkedQueue<PartitionRecords>. To do this we would have to change the fetch parsing workflow a little bit, but I'm not sure what ramifications that may have. I've made some assumptions below that I hope you can confirm before I refactor the PR.

  • In the callback for sendFetches we would immediately call parseCompletedFetch to create a PartitionRecords and add it to completedFetches, instead of calling parseCompletedFetch later in fetchedRecords. The javadoc comment on the parseCompletedFetch method suggests that at one point in the past this may have been the case: "The callback for fetch completion". If we move the call here I don't think there's any reason to have a CompletedFetch type at all.

  • The callsite to parseCompletedFetch is wrapped in exception handling which discards the fetch in some scenarios when an exception is raised. If we move the callsite to sendFetches then I assume we would move the exception handling along with it. One issue is that in this exception handling we check to see if we've accumulated any parsed records to return to the user if(fetched.isEmpty()), but since we haven't parsed any records in the sendFetches callback I'm not sure how to translate this logic, or if it's still necessary.

    EDIT: I looked at this again and the comment says this check is just there to make sure data from a previous fetch is still returned when an exception is raised, but that's not applicable when we move the callsite to sendFetches because no records have been parsed yet.

  • In parseCompletedFetch if the partition is unfetchable then no records are returned. Based on comments in this method this could be caused by a rebalance that happened while a fetch request was in flight, or when a partition is paused. We would need to change this so that the PartitionRecords are still created when a partition is paused, but not in other unfetchable scenarios.

@seglo (Contributor, Author) commented Jul 26, 2019

I tried refactoring the PR with the assumptions I made above, but it began to reveal a lot of regression bugs when running FetcherTest. My idea about moving the parseCompletedFetch callsite to sendFetches seems to miss a lot of nuance accounted for in the tests.

@seglo (Contributor, Author) commented Jul 26, 2019

I created a PR on my fork with the refactor: https://github.com/seglo/kafka/pull/3/files

@hachikuji (Contributor) commented Jul 28, 2019

@seglo Yeah, this is a little trickier than I suggested. The draft looks like it's heading in the right direction. One thought I had is to take the parseCompletedFetch logic and turn it into an initializePartitionRecords function. So when we first construct PartitionRecords, we consider it uninitialized. Then the logic in fetchedRecords could look something like this:

nextInLineRecords = completedFetches.peek();
if (nextInLineRecords.notInitialized()) {
  try {
    initializePartitionRecords(nextInLineRecords);
  } catch (Exception e) {
    // handle/propagate the exception as fetchedRecords does today
  }
}
// now check if nextInLineRecords has any data to fetch

The benefit is that the same logic is still executed from the same location. Would that help?

@seglo (Contributor, Author) commented Jul 28, 2019

@hachikuji Yes, that suggestion helped, thanks. I no longer have any FetcherTest regression failures. I split up parseCompletedFetch into:

  • A parseCompletedFetch that always returns a PartitionRecords, but does no other validation. Called from sendFetches.
  • An initializePartitionRecords which does all the validation that parseCompletedFetch did before. Called from fetchedRecords where exceptions are handled.

completedFetches is now used not just as a queue, but as a cache. The queue usage semantics aren't really applicable anymore (we no longer peek or poll), but since we want the ordering I don't think there's a better data structure in java.util.concurrent to use.

I've updated this PR.

@seglo (Contributor, Author) commented Jul 29, 2019

I made an update to cache each paused completed fetch for the lifetime of a call to fetchRecords and then add them back at the end so that completedFetches.poll() can be used instead of removing by object reference completedFetches.remove(records). I'm not sure if this is any more efficient, but it preserves the original implementation of fetchedRecords better. The change is in this commit: 7cb7943 .
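As a rough illustration of that drain-and-re-add pattern, here is a simplified, self-contained sketch with stand-in types (the real fetchedRecords also enforces max.poll.records, validates fetch positions, and handles errors; the names here are hypothetical).

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.common.TopicPartition;

class FetchDrainSketch {
    // Stand-in for a batch of buffered records for one partition.
    static class Buffered {
        final TopicPartition partition;
        final List<String> records;
        Buffered(TopicPartition partition, List<String> records) {
            this.partition = partition;
            this.records = records;
        }
    }

    final Queue<Buffered> completedFetches = new ConcurrentLinkedQueue<>();

    List<String> fetchedRecords(Set<TopicPartition> pausedPartitions) {
        List<String> fetched = new ArrayList<>();
        // Data for paused partitions is parked here for the duration of this call.
        Queue<Buffered> pausedCompletedFetches = new ArrayDeque<>();

        Buffered next;
        while ((next = completedFetches.poll()) != null) {
            if (pausedPartitions.contains(next.partition)) {
                // Keep the buffered data instead of discarding it.
                pausedCompletedFetches.add(next);
            } else {
                fetched.addAll(next.records);
            }
        }
        // Re-add the paused data so it survives this call and can be returned
        // after the partition is resumed, avoiding a redundant re-fetch.
        completedFetches.addAll(pausedCompletedFetches);
        return fetched;
    }
}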

@seglo seglo force-pushed the seglo:seglo/KAFKA-7548 branch 3 times, most recently to 55f3fab Jul 26, 2019

@seglo (Contributor, Author) commented Jul 29, 2019

Retest this please

@seglo (Contributor, Author) commented Jul 30, 2019

It's interesting that all the tests ran with no failures, but the check still failed.

@hachikuji (Contributor) commented Jul 31, 2019

@seglo Thanks for the updates. I think there might be some redundant checking. What do you think about this? hachikuji@d46d000.

I think the only other thing is that it seems like we have a chance to consolidate CompletedFetch and PartitionRecords into a single class, but potentially we could leave that for a follow-up patch.

@seglo seglo force-pushed the seglo:seglo/KAFKA-7548 branch from 55f3fab to ded3f61 Jul 31, 2019

@seglo (Contributor, Author) commented Jul 31, 2019

@hachikuji Good find. I wish I had seen that! I included your commit.

I can follow up with another PR for consolidating CompletedFetch into PartitionRecords, along with changing some of the comments and variable names to match. Is that alright with you?

@hachikuji (Contributor) left a comment

Thanks, I think this is just about ready. Just a couple more comments.

@@ -897,6 +897,145 @@ public void testFetchOnPausedPartition() {
assertTrue(client.requests().isEmpty());
}

@Test

@hachikuji (Contributor) commented Aug 1, 2019

Do we have a test case which covers the case where the user seeks to a new offset while a partition is paused with data available to return? In this case, we expect the data to be discarded when the partition is resumed.

@seglo (Contributor, Author) commented Aug 1, 2019

I did a pass over FetcherTest and didn't see this scenario exactly. I added another test called testFetchDiscardedAfterPausedPartitionResumedAndSeekedToNewOffset (a bit of a mouthful). Does it cover the scenario you were thinking of?
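For reference, the scenario at the consumer API level looks roughly like this; it is a hypothetical illustration of the expected semantics, not the FetcherTest code itself (topic name and offset are made up).

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekWhilePausedExample {
    // Assumes the consumer is already assigned to the partition below and has
    // buffered (but not yet returned) data for it.
    static void seekWhilePaused(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("events", 0);

        consumer.pause(Collections.singletonList(tp));
        consumer.seek(tp, 42L); // move the fetch position while the partition is paused
        consumer.resume(Collections.singletonList(tp));

        // The previously buffered data no longer matches the new position, so it
        // should be discarded; the next poll re-fetches starting at offset 42.
        consumer.poll(Duration.ofMillis(500));
    }
}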

@ijuma (Contributor) commented Aug 1, 2019

This could be done as a separate PR, but would it be helpful to have a benchmark that exercises this use case? Based on the previous discussion, the perf improvement could be significant, and it would be nice to be able to quantify it.

@seglo (Contributor, Author) commented Aug 1, 2019

@ijuma I can take a crack at implementing a performance test.

Before I created the PR, I wrote a small test app and measured it before and after the patch. You can find a reference to the test app and the Grafana dashboard with the comparison in my update on the original Jira issue, KAFKA-7548.

@ijuma (Contributor) commented Aug 1, 2019

That's great!

@hachikuji (Contributor) left a comment

LGTM. Thanks for the contribution! I will merge after the build completes.

@seglo (Contributor, Author) commented Aug 1, 2019

Thanks for your review and advice, @hachikuji. It was fun working on this PR. It gave me a great opportunity to dig into the Consumer internals. I'll follow up with you soon on some of the loose ends.

@seglo (Contributor, Author) commented Aug 1, 2019

Retest this please

@hachikuji (Contributor) commented Aug 1, 2019

The two failures are known to be flaky. I will go ahead and merge.

@hachikuji hachikuji merged commit 1546fc3 into apache:trunk Aug 2, 2019

2 of 3 checks passed

JDK 11 and Scala 2.13: FAILURE (11743 tests run, 67 skipped, 2 failed)
JDK 11 and Scala 2.12: SUCCESS (11743 tests run, 67 skipped, 0 failed)
JDK 8 and Scala 2.11: SUCCESS (11743 tests run, 67 skipped, 0 failed)