-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-3443 [Kafka Stream] support for adding sources to KafkaStreams via Pattern #1477
KAFKA-3443 [Kafka Stream] support for adding sources to KafkaStreams via Pattern #1477
Conversation
@guozhangwang
Two remaining issues per your suggestions for creating integration tests:
Should these be included in this PR or addressed as a separate issue? |
Tests run fine locally. Looks like the build stalled for some reason:
|
@ijuma @hachikuji Do you know if this test could be transiently stalled?
|
@bbejeck Thanks! I will review the patch soon. |
Thoughts on the topic removal - store TopicPartitions when assigned. On partitionsRevoked compare what's being revoked vs what was assigned, if any partitions left, then don't remove stream tasks? |
|
I cannot find anything from #1410's logs you posted. As I mentioned both creating / deleting a topic should trigger a rebalance, which triggers the |
|
||
public String[] getTopics(Collection<String> subscribedTopics) { | ||
List<String> matchedTopics = new ArrayList<>(); | ||
for (String update : subscribedTopics) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do the following instead?
for (String update : subscribedTopics) {
// already matched from previous rebalances
if (this.pattern == topicToPatterns.get(update)) {
matchedTopics.add(update);
} else if (topicToPatterns.get(update) != null) {
throw new TopologyBuilderException("Topic " + update + " already matched with another regex pattern");
} else if (this.pattern.matcher(update).matches()) {
topicToPatterns.put(update, this.pattern);
matchedTopics.add(update);
}
}
I looked through the diff but cannot fine obvious bugs that could cause your integration tests execute unexpectedly. One thing we would suggest is to print out the |
@bbejeck I think I get the root cause of integration test issues (kudos to @hachikuji): as mentioned the consumer depend on metadata refresh to detect newly added topics / deleted topics, and by default this Now about why topic deletion does not cause newly created tasks in |
I observed this behavior as well when running a KStreams program locally. After deleting a topic, I let the program continue to run for some time (30+ minutes) and the remaining topics were never re-assigned. As for adding a new topic I did the same thing, add a topic and let the program run for a while and a new assignment never happened. I can see the new topic in the KafkaServer logs, but the changes don't propagate over to the Streams program. I'm thinking this could have something to do with my environment and there is a silent failure somewhere. I'll add some detailed logging and see if I can get some more information. EDIT: Fixed the solution was to update the |
…new topic after streams started, fixed potential NullPointerException that can occur if a matching topic is removed while a KStreams program is running.
@guozhangwang I need some guidance for a counter test - removing a matching topic once a KStreams program has started. Very tough to get to the subscription updates without resorting to some messy reflection work. EDIT: To be more clear, I'm looking for another way to prove the regex subscriptions are correct after a rebalance, the case of adding a topic was easier, just confirm a message recieved. Would this be a test for the StreamsPartitionAssignor? |
@@ -292,7 +292,10 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) { | |||
unauthorizedTopics.retainAll(this.topics.keySet()); | |||
|
|||
for (String topic : this.topics.keySet()) { | |||
partitionInfos.addAll(cluster.partitionsForTopic(topic)); | |||
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic); | |||
if (partitionInfoList != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was necessary as deleting a matching topic causes a NullPointerException when KStreams is running.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch!
@bbejeck I think one way to validate that added / removed topics are handled is to check the removed / created tasks instead of checking the processed messages; i.e. if a new topic is added, the new tasks corresponding to this new topic should be created in the rebalance callback (i.e. We can override the |
…ics added and/or deleted when a streams instance is running.
@guozhangwang Rebalancing when deleting works fine, just needed to set |
pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); | ||
|
||
// Remove any state from previous test runs | ||
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of purge for previous runs, could you use @After
to call this function at the end of each test case?
Thanks @bbejeck ! LGTM overall, just left some minor comments about unit tests. |
…ed integration test, removed Thread.sleep calls
@guozhangwang EDIT: I realized that my replacement for Thread.sleep needs a timeout added, I'll update that. |
|
||
void waitUntilTasksUpdated() { | ||
while (!streamTaskUpdated) { | ||
//empty loop just waiting for update |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs a check for total time elapsed here, default to 30 seconds then return.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good.
@guozhangwang changes made per suggestions. |
@@ -249,7 +249,7 @@ public void testShouldReadFromRegexAndNamedTopics() throws Exception { | |||
assertThat(actualValues, equalTo(expectedReceivedValues)); | |||
} | |||
|
|||
//TODO should be updated to expected = TopologyBuilderException after KAFKA-3708 | |||
// todo should be updated to expected = TopologyBuilderException after KAFKA-3708 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm actually more considering about the comment in line 264-267, and probably line 276 for de-capitalizing. As for TODO
it is usually treated as a marker in most IDEs for easy finding and we'd keep it that way. For example:
One minor comment regarding comments, otherwise LGTM, running unit / integration test locally now. |
LGTM and merged to trunk. Thanks @bbejeck for this patch! It's a big contribution which drags long :) |
This PR is the follow on to the closed PR #1410.