From 0fece10db85c858c36a56ff7a463636cdf335d70 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 5 Apr 2017 17:01:35 -0700 Subject: [PATCH 1/3] HOTFIX: break infinite loop if Stream input topics are not available --- .../internals/StreamPartitionAssignor.java | 14 +-- .../StreamPartitionAssignorTest.java | 108 ++++++++++++++---- 2 files changed, 90 insertions(+), 32 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 004926fd8c956..bd725e4d91c96 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -333,11 +333,11 @@ public Map assign(Cluster metadata, Map numPartitions) { + if (numPartitionsCandidate == NOT_AVAILABLE || numPartitionsCandidate > numPartitions) { numPartitions = numPartitionsCandidate; } } @@ -354,6 +354,11 @@ public Map assign(Cluster metadata, Map allRepartitionTopicPartitions = new HashMap<>(); @@ -367,11 +372,6 @@ public Map assign(Cluster metadata, Map stream1 = builder - // Task 1 (should get created): - .stream("topic1") + // Task 1 (should get created if `sourceTopic1` is known) + .stream(sourceTopic1) + // force repartitioning for aggregation + // -> should create internal repartitioning topic only if `sourceTopic1` is known .selectKey(new KeyValueMapper() { @Override public Object apply(Object key, Object value) { @@ -817,12 +843,13 @@ public Object apply(Object key, Object value) { }) .groupByKey() - // Task 2 (should get created): - // create repartioning and changelog topic as task 1 exists + // Task 2 (should only get created if `sourceTopic1` is known): + // -> create repartitioning and changelog topic if task2 is created .count("count") - // force repartitioning for join, but second join input topic 
unknown - // -> internal repartitioning topic should not get created + // force repartitioning for join + // -> internal repartitioning topic should only be created if `sourceTopic1` and `sourceTopic2` are both known + // (cf. Task4 below) .toStream() .map(new KeyValueMapper>() { @Override @@ -832,11 +859,11 @@ public KeyValue apply(Object key, Long value) { }); builder - // Task 3 (should not get created because input topic unknown) - .stream("unknownTopic") + // Task 3 (should get created if `sourceTopic2` is known) + .stream(sourceTopic2) - // force repartitioning for join, but input topic unknown - // -> thus should not create internal repartitioning topic + // force repartitioning for join + // -> should create internal repartitioning topic only if `sourceTopic2` is known .selectKey(new KeyValueMapper() { @Override public Object apply(Object key, Object value) { @@ -844,8 +871,8 @@ public Object apply(Object key, Object value) { } }) - // Task 4 (should not get created because input topics unknown) - // should not create any of both input repartition topics or any of both changelog topics + // Task 4 (should only get created if `sourceTopic1` and `sourceTopic2` are both known) + // -> should create both input repartition topics if task4 is created .join( stream1, new ValueJoiner() { @@ -877,20 +904,51 @@ public Object apply(Object value1, Object value2) { ); final Map assignment = partitionAssignor.assign(metadata, subscriptions); + final HashSet expectedAssignment = new HashSet<>(); final Map expectedCreatedInternalTopics = new HashMap<>(); - expectedCreatedInternalTopics.put(applicationId + "-count-repartition", 3); - expectedCreatedInternalTopics.put(applicationId + "-count-changelog", 3); - assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics)); + if (src1Known) { + expectedAssignment.add(new TopicPartition(sourceTopic1, 0)); + expectedAssignment.add(new TopicPartition(sourceTopic1, 1)); + expectedAssignment.add(new 
TopicPartition(sourceTopic1, 2)); + + // force repartitioning for aggregation (selectKey() on `sourceTopic1`) + expectedAssignment.add(new TopicPartition(applicationId + "-count-repartition", 0)); + expectedAssignment.add(new TopicPartition(applicationId + "-count-repartition", 1)); + expectedAssignment.add(new TopicPartition(applicationId + "-count-repartition", 2)); + expectedCreatedInternalTopics.put(applicationId + "-count-repartition", 3); + + // changelog topic for groupBy-count on `sourceTopic1` + expectedCreatedInternalTopics.put(applicationId + "-count-changelog", 3); + } + if (src2Known) { + expectedAssignment.add(new TopicPartition(sourceTopic2, 0)); + expectedAssignment.add(new TopicPartition(sourceTopic2, 1)); + expectedAssignment.add(new TopicPartition(sourceTopic2, 2)); + expectedAssignment.add(new TopicPartition(sourceTopic2, 3)); + } + if (src1Known && src2Known) { + // force repartitioning for join (map() after groupBy-count on `sourceTopic1`) + expectedAssignment.add(new TopicPartition(applicationId + "-KSTREAM-MAP-0000000007-repartition", 0)); + expectedAssignment.add(new TopicPartition(applicationId + "-KSTREAM-MAP-0000000007-repartition", 1)); + expectedAssignment.add(new TopicPartition(applicationId + "-KSTREAM-MAP-0000000007-repartition", 2)); + expectedAssignment.add(new TopicPartition(applicationId + "-KSTREAM-MAP-0000000007-repartition", 3)); + expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-MAP-0000000007-repartition", 4); + + // force repartitioning for join (selectKey() on `sourceTopic2`) + expectedAssignment.add(new TopicPartition(applicationId + "-KSTREAM-KEY-SELECT-0000000009-repartition", 0)); + expectedAssignment.add(new TopicPartition(applicationId + "-KSTREAM-KEY-SELECT-0000000009-repartition", 1)); + expectedAssignment.add(new TopicPartition(applicationId + "-KSTREAM-KEY-SELECT-0000000009-repartition", 2)); + expectedAssignment.add(new TopicPartition(applicationId + "-KSTREAM-KEY-SELECT-0000000009-repartition", 3)); 
+ expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-KEY-SELECT-0000000009-repartition", 4); + + // both join changelog topics + expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-JOINTHIS-0000000018-store-changelog", 4); + expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-JOINOTHER-0000000019-store-changelog", 4); + } - final List expectedAssignment = Arrays.asList( - new TopicPartition("topic1", 0), - new TopicPartition("topic1", 1), - new TopicPartition("topic1", 2), - new TopicPartition(applicationId + "-count-repartition", 0), - new TopicPartition(applicationId + "-count-repartition", 1), - new TopicPartition(applicationId + "-count-repartition", 2) - ); - assertThat(new HashSet(assignment.get(client).partitions()), equalTo(new HashSet(expectedAssignment))); + assertThat(new HashSet<>(assignment.get(client).partitions()), + equalTo(expectedAssignment)); + assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics)); } @Test From c8f59cb5908cd996dcd10e2935da7632ce764b62 Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Thu, 6 Apr 2017 13:57:44 -0700 Subject: [PATCH 2/3] Github comments --- .../processor/internals/StreamPartitionAssignor.java | 8 +++++--- .../internals/StreamPartitionAssignorTest.java | 10 +++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index bd725e4d91c96..b87c3c7aa1df7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -337,6 +337,7 @@ public Map assign(Cluster metadata, Map numPartitions) { numPartitions = numPartitionsCandidate; } @@ -345,16 +346,17 @@ public Map assign(Cluster metadata, Map Date: Thu, 6 Apr 2017 16:12:38 -0700 Subject: [PATCH 3/3] WIP --- .../processor/internals/StreamPartitionAssignor.java | 2 +- .../internals/CopartitionedTopicsValidatorTest.java | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index b87c3c7aa1df7..53007e378d364 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -739,7 +739,7 @@ void validate(final Set copartitionGroup, final Integer partitions = metadata.partitionCountForTopic(topic); if (partitions == null) { - throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", threadName, topic)); + continue; } if (numPartitions == UNKNOWN) { diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java index 77001ce72f16c..587ecc6006b57 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java @@ -46,13 +46,6 @@ public void before() { partitions.put(new TopicPartition("second", 1), new PartitionInfo("second", 1, null, null, null)); } - @Test(expected = TopologyBuilderException.class) - public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() throws Exception { - validator.validate(Collections.singleton("topic"), - Collections.emptyMap(), - cluster); - } - @Test(expected = TopologyBuilderException.class) public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() throws Exception { partitions.remove(new TopicPartition("second", 0));