KAFKA-5362: Add EOS system tests for Streams API #3201
Conversation
Call for review @guozhangwang @bbejeck @dguy
Refer to this link for build results (access rights to CI server needed):
Refer to this link for build results (access rights to CI server needed):
I'm wondering if we can simplify this test to only do a single operation, i.e., sum? Do we really need four different operations to prove that it works? The test seems complicated by the fact that it is trying to verify the various different results and, to me at least, that adds a bunch of noise that I'm struggling to see through.
private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getOutputRecords(final KafkaConsumer<byte[], byte[]> consumer,
                                                                                                        final Map<TopicPartition, Long> committedOffsets,
                                                                                                        final List<TopicPartition> partitions) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partitions is not used
final List<TopicPartition> partitions) {
    final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
long maxWaitTime = System.currentTimeMillis() + 30000;
extract 30000 to a constant maybe?
I designed the test similar to SmokeTest, which does even more stuff... I still think it makes sense to test a simple "copy" and an "aggregation". I also want to make the test more complex and add a repartitioning step for one of the aggregates -- but it did not work yet, so I just removed the repartitioning... Maybe we should split this into individual tests though. WDYT?
I guess it becomes a trade-off between system test runtime and code complexity. My preference would be to have simple tests that are easy to follow. If time becomes a problem, then merge them.
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
Why do we want to disable caching? I think it is valuable to have at least one test case with caching turned on, to see if EOS is still guaranteed.
With caching, we cannot know the result (i.e., not each individual record that gets written to the output), as we don't know when we will flush. For this test, I want to check all intermediate records, too, so I need to disable caching.
But we can add a second test with caching and just make sure we don't over-count, for example: ignore all intermediate results and just check the latest result per key.
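For that second test, a rough sketch of what a caching-enabled verification could look like (class and parameter names are my own, not part of this PR): consume the full output topic, keep only the last value seen per key, and compare against the expected final results.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;

final class CachedResultVerifierSketch {
    // Sketch only: with caching enabled we cannot predict which intermediate
    // records get flushed, so we only compare the final value per key.
    static void verifyLatestValuePerKey(final List<ConsumerRecord<String, Long>> outputRecords,
                                        final Map<String, Long> expectedPerKey) {
        final Map<String, Long> latestPerKey = new HashMap<>();
        for (final ConsumerRecord<String, Long> record : outputRecords) {
            latestPerKey.put(record.key(), record.value()); // later records overwrite earlier ones
        }
        if (!latestPerKey.equals(expectedPerKey)) {
            throw new RuntimeException("Expected " + expectedPerKey + " but got " + latestPerKey);
        }
    }
}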
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
Do you need to set the txnId also?
We write non-transactional data. Actually, we might not even need the idempotent producer, as we assume that there is no error writing data for this test case.
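For context, a hedged sketch of the distinction discussed here (property values and helper names are illustrative, not copied from the PR): the test input is written by a plain idempotent producer, whereas a transactional producer would additionally require a transactional.id plus the transaction API calls.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

final class ProducerConfigSketch {
    // Non-transactional but idempotent: enough for writing the test input,
    // since the test assumes writes never fail anyway.
    static Properties idempotentProps(final String bootstrapServers) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return props;
    }

    // Transactional: only needed for atomic multi-record writes; it additionally
    // requires initTransactions()/beginTransaction()/commitTransaction() calls.
    static Properties transactionalProps(final String bootstrapServers, final String txnId) {
        final Properties props = idempotentProps(bootstrapServers);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId);
        return props;
    }
}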
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
    final Long committed = committedOffsets.get(partitionRecords.getKey());
    if (partitionRecords.getValue().size() < (committed == null ? 0 : committed)) {
Is this okay? If committed == null it means no offsets get committed for this partition in getCommittedOffsets, and we should check that we do not get any data in partitionRecords; but currently this check will always pass even if partitionRecords.getValue().size() > 0.
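One possible way to tighten that check, as a sketch only (the helper name and return-value convention are mine, not the PR's actual fix): if nothing was committed for a partition, the partition must be empty; otherwise keep polling until at least the committed number of records has been received.

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

final class CommittedOffsetCheckSketch {
    // Returns true if more records are still expected for this partition.
    // If no offset was committed for it, the partition must not contain any records at all.
    static boolean needMoreRecords(final TopicPartition partition,
                                   final List<ConsumerRecord<byte[], byte[]>> records,
                                   final Long committedOffset) {
        if (committedOffset == null) {
            if (!records.isEmpty()) {
                throw new RuntimeException("Got " + records.size() + " records for " + partition
                    + " although no offset was committed for it");
            }
            return false;
        }
        return records.size() < committedOffset;
    }
}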
verifyAllTransactionFinished(consumer, kafka);
// no not modify: required test output
no -> do?
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : topicRecords.entrySet()) {
    final TopicPartition tp = partitionRecords.getKey();
    truncatedTopicRecords.put(tp, partitionRecords.getValue().subList(0, committedOffsets.get(tp).intValue()));
Similarly, committedOffsets.get(tp).intValue() could throw NPE?
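A null-safe variant, again just a sketch with a hypothetical helper (not the PR's actual fix): treat a missing committed offset as zero, so the partition contributes an empty sub-list instead of throwing a NullPointerException.

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

final class TruncateSketch {
    // Keeps only the records up to the committed offset; a missing committed
    // offset is treated as zero instead of dereferencing null.
    static List<ConsumerRecord<byte[], byte[]>> truncateToCommitted(final Map<TopicPartition, Long> committedOffsets,
                                                                    final TopicPartition tp,
                                                                    final List<ConsumerRecord<byte[], byte[]>> records) {
        final Long committed = committedOffsets.get(tp);
        return records.subList(0, committed == null ? 0 : committed.intValue());
    }
}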
Some further improvements (321c6a2 to 925da6c)
Updated. 3 consecutive local runs passed.
Triggered branch builder at: https://jenkins.confluent.io/job/system-test-kafka-branch-builder-2/306/
LGTM
Refer to this link for build results (access rights to CI server needed):
Refer to this link for build results (access rights to CI server needed):
Branch builder failed... Investigating.
Refer to this link for build results (access rights to CI server needed):
Refer to this link for build results (access rights to CI server needed):
Refer to this link for build results (access rights to CI server needed):
Refer to this link for build results (access rights to CI server needed):
Branch builder passed: https://jenkins.confluent.io/job/system-test-kafka-branch-builder-2/311/
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes #3201 from mjsax/kafka-5362-add-eos-system-tests-for-streams-api
(cherry picked from commit ba07d82)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
Merged to trunk and 0.11.0