Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5124: autocommit reset earliest fixes race condition #2921

Closed

Conversation

original-brownbear
Copy link
Contributor

Fixes org.apache.kafka.streams.integration.utils.IntegrationTestUtils#readKeyValues potentially starting to poll for stream output after the stream finished sending the test data and hence missing it when working with latest offsets.

@original-brownbear
Copy link
Contributor Author

original-brownbear commented Apr 26, 2017

Fix can be verified to work and the issue reproduced directly by putting a sleep in the test after the stream thread starts the test like so:

    private void verifyKTableKTableJoin(final JoinType joinType1,
                                        final JoinType joinType2,
                                        final List<KeyValue<String, String>> expectedResult) throws Exception {
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");

        streams = prepareTopology(joinType1, joinType2);
        streams.start();
        TimeUnit.SECONDS.sleep(5L); // makes trunk fail 100% of runs but is green with this fix

        final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                CONSUMER_CONFIG,
                OUTPUT,
                expectedResult.size());

        assertThat(result, equalTo(expectedResult));
    }

@enothereska
Copy link
Contributor

Thanks @original-brownbear . I noticed that the default in StreamsConfig.java is also "earliest". : tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

I'm wondering why we need to explicitly put it in the config too? Curious to hear your thoughts. Thanks.

@original-brownbear
Copy link
Contributor Author

@enothereska I was wondering the same (that's why it took me 1h+ rather than minutes to figure this out) :D it's really simple though :(

We use org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest#CONSUMER_CONFIG and not the stream config when we call org.apache.kafka.streams.integration.utils.IntegrationTestUtils#waitUntilMinKeyValueRecordsReceived(java.util.Properties, java.lang.String, int) (which then sets up a fresh Consumer from it) -> stream config doesn't come into play here.

@enothereska
Copy link
Contributor

Doh! LGTM, thanks!

@enothereska
Copy link
Contributor

cc @mjsax @guozhangwang for checkin

@mjsax
Copy link
Member

mjsax commented Apr 26, 2017

Can we extend this, and apply to all tests? We have couple of flaky tests... (I actually think, some test to set "earliest" for consumer already -- too bad that we missed to set the config everywhere)

@original-brownbear
Copy link
Contributor Author

@mjsax sure on it :)

@mjsax
Copy link
Member

mjsax commented Apr 26, 2017

Retest this please.

@original-brownbear
Copy link
Contributor Author

@mjsax separate PR for the extending or do it here? :)

@mjsax
Copy link
Member

mjsax commented Apr 26, 2017

Not sure... It's actually good to have a PR per JIRA. However, personally I think, we can do one PR, and mark all other JIRAs we "resolved by"... It's just single line fixes anyway.

@enothereska @guozhangwang @dguy WDYT?

@guozhangwang
Copy link
Contributor

Let's do separate PRs for other tests.

@guozhangwang
Copy link
Contributor

LGTM.

One thing to note that, this is not introduced in this PR but: each time we call readKeyValues as the condition we are creating a new consumer with the given consumer configs, and the condition is called multiple times. So we are effectively relying on the consumer config to have ENABLE_AUTO_COMMIT_CONFIG turned on so that when the consumer is closing each time within the readKeyValues it will commit offset so it will not duplicate fetching.

I think we can do a follow up PR for fixing the waitUntilMinXXXRecordsReceived functions in a way that

  1. it only requires the passed consumer config to contain bootstrap servers and key-value serdes; while all other configs will be overridden internally that "reset" is earliest, and "auto commit" is turned on, etc.
  2. we will re-use a single consumer across multiple calls of readKeyValues / readValues so we do not need to create a new one on each call.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merged to trunk. @original-brownbear could you consider working on the follow-up PR as I mentioned? Thanks!

@original-brownbear
Copy link
Contributor Author

@guozhangwang sure, will work the above into #2920 tomorrow :)

asfgit pushed a commit that referenced this pull request Apr 27, 2017
…se consumer

* Producer and Consumer `close` calls were not handled via `try-with-resources`
* `cleanRun` unused field removed
* Refactored handling of Consumer configuration in `IntegrationTestUtils` to ensure auto-committing of offsets and starting from `earliest`
  * As a result reverted #2921 since it's redundant now

Author: Armin Braun <me@obrown.io>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2920 from original-brownbear/cleanup-it-utils-closing
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants