-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6391 ensure topics are created with correct partitions BEFORE building the… #4347
Conversation
… metadata for our stream tasks
@guozhangwang Could you have a look on this as well since you seem to have contributed a lot to this class before? |
@cvaliente Thx for the PR. Can you elaborate on this change? The return value of |
@mjsax
previously, |
With this PR, we first validate our |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation. I missed the fact that we indeed modify repartitionTopicMetadata
. Good catch and thanks for the patch!
LGMT.
Call for second review @bbejeck @dguy @guozhangwang
Thanks for the patch @cvaliente. LGTM, but I think we should have an integration test for this. |
The test should fail with the old logic, because: While stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition is created correctly with four partitions, the StreamPartitionAssignor will only assign three tasks to the topic. Test passes with the new logic.
new TopicPartition("topic1", 0), | ||
new TopicPartition("topic1", 1), | ||
new TopicPartition("topic1", 2), | ||
new TopicPartition("topic3", 0), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider naming the topic "topic2" since there are only two topics in the test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am using the source topics defined in the test class for this. Since topic2 has the same number of partitions as topic1, it is not suitable for this test case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding a test!
// joining the stream and the table | ||
// this triggers the enforceCopartitioning() routine in the StreamPartitionAssignor, | ||
// forcing the stream.map to get repartitioned to a topic with four partitions. | ||
stream1.join(table1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move table1
to next line, or fix indention below to align with table1
final UUID uuid = UUID.randomUUID(); | ||
final String client = "client1"; | ||
|
||
mockTaskManager(Collections.<TaskId>emptySet(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: as above
subscriptions.put( | ||
client, | ||
new PartitionAssignor.Subscription( | ||
Collections.singletonList("unknownTopic"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we put "unknownTopic" here? Should we subscribe to "topic1" and "topic3"? Or we can actually pass in an empty list?
I guess this is copied from shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks
-- there we need to pass in unknownTopic
as this topic does not exist in cluster metadata.
expectedCreatedInternalTopics.put(applicationId + "-topic3-STATE-STORE-0000000002-changelog", 4); | ||
|
||
// check if all internal topics were created as expected | ||
assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: assertThat
take expected result as first parameter IIRC (otherwise error message on failing test is "reversed")
); | ||
|
||
// check if we created a task for all expected topicpartitions. | ||
assertThat(new HashSet<>(assignment.get(client).partitions()), equalTo(new HashSet<>(expectedAssignment))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above. (please fix in shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks
, too)
@mjsax thanks for the feedback, done! |
@cvaliente thanks for adding the test, LGTM. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the forth and back :(
return new KeyValue<>(key, value); | ||
} | ||
}) | ||
.count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We actually don't need to name the store. This could be .count()
plus updating the name for the repartitioning and changelog topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
know the naming was not required. My thinking is it would be preferable to explicitly choose the name so I don't have to rely on internal logic for naming the repartition topic that I later refer to. Unfortunately there's no explicit naming for the streams.map() repartition topic so it is all moot anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand your point -- however, naming must be deterministic and should not change from release to release (otherwise, upgrading hard harder and we need to mention it in the upgrade docs). Thus, if we use internal names in tests, we have some implicit testing that naming does not change. That's why I prefer to not name the operator, too, if not required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I guess that's ok. I agree that an upgrade should not require a reset of the streams application's topology unless absolutely necessary. However I think in that case we should also have some explicit testing for the naming of itnernal topics (not sure if those exist yet?)
Either way I will fix this PR up once I am back in the office.
expectedCreatedInternalTopics.put(applicationId + "-topic3-STATE-STORE-0000000002-changelog", 4); | ||
|
||
// check if all internal topics were created as expected | ||
assertThat(expectedCreatedInternalTopics, equalTo(mockInternalTopicManager.readyTopics)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the forth and back -- for assertThat
you original code was correct and expected argument is second one... (it different for assertEquals
-- my bad(!)).
); | ||
|
||
// check if we created a task for all expected topicpartitions. | ||
assertThat(new HashSet<>(expectedAssignment), equalTo(new HashSet<>(assignment.get(client).partitions()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above: flip arguments
Collections.<TaskId>emptySet(), | ||
uuid1, | ||
builder); | ||
mockTaskManager( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the cleanup! Really appreciated!
LGTM! Merged to trunk. |
ensure topics are created with correct partitions BEFORE building the metadata for our stream tasks
First ensureCoPartitioning() on repartitionTopicMetadata before creating allRepartitionTopicPartitions
Committer Checklist (excluded from commit message)