diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index 7e54803593f88..89d2e3d1ccc72 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -73,6 +73,8 @@ public AbstractStream(final AbstractStream stream) { this.valueSerde = valueSerde; this.subTopologySourceNodes = subTopologySourceNodes; this.graphNode = graphNode; + + this.graphNode.setValueSerde(valueSerde); } // This method allows to expose the InternalTopologyBuilder instance diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 6460313a62bf9..330854f572ac9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -280,7 +280,7 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) { if (node.isKeyChangingOperation()) { keyChangingOperationsToOptimizableRepartitionNodes.put(node, new LinkedHashSet<>()); } else if (node instanceof OptimizableRepartitionNode) { - final GraphNode parentNode = getKeyChangingParentNode(node); + final GraphNode parentNode = findParentNodeMatching(node, GraphNode::isKeyChangingOperation); if (parentNode != null) { keyChangingOperationsToOptimizableRepartitionNodes.get(parentNode).add((OptimizableRepartitionNode) node); } @@ -488,7 +488,8 @@ private void mergeRepartitionTopics() { continue; } - final GroupedInternal groupedInternal = new GroupedInternal<>(getRepartitionSerdes(entry.getValue())); + // Resolve key value serdes for merging repartition nodes under the common key changing node + final GroupedInternal groupedInternal = getRepartitionSerdes(entry.getKey(), entry.getValue()); final String repartitionTopicName = getFirstRepartitionTopicName(entry.getValue()); //passing in the name of the first repartition topic, re-used to create the optimized repartition topic @@ -503,19 +504,19 @@ private void mergeRepartitionTopics() { final GraphNode keyChangingNodeChild = findParentNodeMatching(repartitionNodeToBeReplaced, gn -> gn.parentNodes().contains(keyChangingNode)); - if (keyChangingNodeChild == null) { - throw new StreamsException(String.format("Found a null keyChangingChild node for %s", repartitionNodeToBeReplaced)); - } - - LOG.debug("Found the child node of the key changer {} from the repartition {}.", keyChangingNodeChild, repartitionNodeToBeReplaced); + // Remove any child to the key-changing node + // Such child may be common to all corresponding repartition nodes, so only remove it once + if (keyChangingNodeChild != null) { + LOG.debug("Found the child node of the key changer {} from the repartition {}.", keyChangingNodeChild, repartitionNodeToBeReplaced); - // need to add children of key-changing node as children of optimized repartition - // in order to process records from re-partitioning - optimizedSingleRepartition.addChild(keyChangingNodeChild); + // need to add children of key-changing node as children of optimized repartition + // in order to process records from re-partitioning + optimizedSingleRepartition.addChild(keyChangingNodeChild); - LOG.debug("Removing {} from {} children {}", keyChangingNodeChild, keyChangingNode, keyChangingNode.children()); - // now remove children from key-changing node - keyChangingNode.removeChild(keyChangingNodeChild); + LOG.debug("Removing {} from {} children {}", keyChangingNodeChild, keyChangingNode, keyChangingNode.children()); + // now remove children from key-changing node + keyChangingNode.removeChild(keyChangingNodeChild); + } // now need to get children of repartition node so we can remove repartition node final Collection repartitionNodeToBeReplacedChildren = repartitionNodeToBeReplaced.children(); @@ -607,39 +608,26 @@ private OptimizableRepartitionNode createRepartitionNode(final Stri } - private GraphNode getKeyChangingParentNode(final GraphNode repartitionNode) { - final GraphNode shouldBeKeyChangingNode = findParentNodeMatching(repartitionNode, n -> n.isKeyChangingOperation() || n.isValueChangingOperation()); - - final GraphNode keyChangingNode = findParentNodeMatching(repartitionNode, GraphNode::isKeyChangingOperation); - if (shouldBeKeyChangingNode != null && shouldBeKeyChangingNode.equals(keyChangingNode)) { - return keyChangingNode; - } - return null; - } - private String getFirstRepartitionTopicName(final Collection> repartitionNodes) { return repartitionNodes.iterator().next().repartitionTopic(); } @SuppressWarnings("unchecked") - private GroupedInternal getRepartitionSerdes(final Collection> repartitionNodes) { + private GroupedInternal getRepartitionSerdes(final GraphNode keyChangingNode, final Collection> repartitionNodes) { Serde keySerde = null; - Serde valueSerde = null; for (final OptimizableRepartitionNode repartitionNode : repartitionNodes) { if (keySerde == null && repartitionNode.keySerde() != null) { keySerde = (Serde) repartitionNode.keySerde(); } - - if (valueSerde == null && repartitionNode.valueSerde() != null) { - valueSerde = (Serde) repartitionNode.valueSerde(); - } - - if (keySerde != null && valueSerde != null) { - break; - } } + // Resolve repartition nodes' value serde for any value-changing nodes upstream until key changing node + Serde valueSerde = (Serde) keyChangingNode.valueSerde(); + if (valueSerde == null) { + final GraphNode parent = findParentNodeMatching(keyChangingNode, gn -> gn.valueSerde() != null); + valueSerde = parent == null ? null : (Serde) parent.valueSerde(); + } return new GroupedInternal<>(Grouped.with(keySerde, valueSerde)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java index 6894e0dcb0ba1..5944ee166a703 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals.graph; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import java.util.Arrays; @@ -42,6 +43,7 @@ public abstract class GraphNode { // explicitly materialized (as either a versioned or an unversioned store) and therefore // whether the output is to be considered versioned or not depends on its parent(s) private Optional outputVersioned = Optional.empty(); + private Serde valueSerde = null; public GraphNode(final String nodeName) { this.nodeName = nodeName; @@ -143,6 +145,14 @@ public void setOutputVersioned(final boolean outputVersioned) { this.outputVersioned = Optional.of(outputVersioned); } + public void setValueSerde(final Serde valueSerde) { + this.valueSerde = valueSerde; + } + + public Serde valueSerde() { + return valueSerde; + } + @Override public String toString() { final String[] parentNames = parentNodeNames(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java index 266cadb290bce..b057fb3d6ae83 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java @@ -50,10 +50,6 @@ public Serde keySerde() { return keySerde; } - public Serde valueSerde() { - return valueSerde; - } - public String repartitionTopic() { return repartitionTopic; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index 7f11cc32451ef..83e88162d3b17 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -331,17 +331,16 @@ public void shouldPartiallyOptimizeWithValueOrKeyChangingOperatorsAfterInitialKe " <-- KTABLE-TOSTREAM-0000000024\n\n", noOptimization.describe().toString() ); - assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); + assertEquals(1, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); } @Test - public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixEnabled() { + public void shouldOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixEnabled() { final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE, true); final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION, true); - assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString()); - assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); + assertEquals(1, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); } @@ -578,13 +577,13 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) { " --> KSTREAM-MERGE-0000000022\n" + " <-- KSTREAM-PEEK-0000000020\n" + " Processor: KSTREAM-MERGE-0000000022 (stores: [])\n" + - " --> KSTREAM-FILTER-0000000024\n" + + " --> KSTREAM-MERGE-0000000022-repartition-filter\n" + " <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n" + - " Processor: KSTREAM-FILTER-0000000024 (stores: [])\n" + - " --> KSTREAM-SINK-0000000023\n" + + " Processor: KSTREAM-MERGE-0000000022-repartition-filter (stores: [])\n" + + " --> KSTREAM-MERGE-0000000022-repartition-sink\n" + " <-- KSTREAM-MERGE-0000000022\n" + - " Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n" + - " <-- KSTREAM-FILTER-0000000024\n" + + " Sink: KSTREAM-MERGE-0000000022-repartition-sink (topic: KSTREAM-MERGE-0000000022-repartition)\n" + + " <-- KSTREAM-MERGE-0000000022-repartition-filter\n" + "\n" + " Sub-topology: 2\n" + " Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n" + @@ -599,11 +598,11 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) { " <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n" + "\n" + " Sub-topology: 3\n" + - " Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n" + + " Source: KSTREAM-MERGE-0000000022-repartition-source (topics: [KSTREAM-MERGE-0000000022-repartition])\n" + " --> KSTREAM-LEFTJOIN-0000000026\n" + " Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" + " --> KSTREAM-BRANCH-0000000027\n" + - " <-- KSTREAM-SOURCE-0000000025\n" + + " <-- KSTREAM-MERGE-0000000022-repartition-source\n" + " Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n" + " --> KSTREAM-BRANCH-00000000270, KSTREAM-BRANCH-00000000271\n" + " <-- KSTREAM-LEFTJOIN-0000000026\n" + diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java new file mode 100644 index 0000000000000..cc6ee02cac374 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.StreamsTestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Test KAFAK-19669 merge.repartition.topic optimization when value-changing operations are present. + */ +public class RepartitionMergeAcrossValueChangingOperationTest { + + private static final String INPUT_TOPIC = "input"; + private static final String OUTPUT_TOPIC_1 = "output1"; + private static final String OUTPUT_TOPIC_2 = "output2"; + + private final Serializer stringSerializer = new StringSerializer(); + private final Deserializer stringDeserializer = new StringDeserializer(); + private final Deserializer integerDeserializer = new IntegerDeserializer(); + + private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); + + private Properties streamsConfiguration; + private TopologyTestDriver topologyTestDriver; + + @BeforeEach + public void before() { + streamsConfiguration = StreamsTestUtils.getStreamsConfig( + Serdes.String(), + Serdes.String() + ); + } + + @AfterEach + public void after() { + if (topologyTestDriver != null) { + topologyTestDriver.close(); + } + } + + /** + * Test Case 1: Demonstrates current limitation + * + * Topology: + * source[String, Integer] + * → map(key-changing: lowercase key) + * → mapValues(int → string) [VALUE-CHANGING] + * → groupByKey → count → output1 + * → filter → groupByKey → count → output2 + * + * - Should create 1 repartition topic (with Integer value serde from before mapValues) + * - Push repartition above mapValues + * - Both branches read from same repartition and apply mapValues afterward + */ + @Test + public void singleValueChangingOperation() { + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + + final StreamsBuilder builder = new StreamsBuilder(); + + // Source produces + final KStream source = builder.stream( + INPUT_TOPIC, + Consumed.with(Serdes.String(), Serdes.Integer()) + ); + + // Key-changing operation to trigger repartitioning + final KStream rekeyed = source.map( + (k, v) -> KeyValue.pair(k.toLowerCase(Locale.getDefault()), v) + ); + + // mapValues is VALUE-CHANGING: Integer → String + final KStream mapped = rekeyed.mapValues( + value -> "value-" + value + ); + + // Branch 1: groupByKey → count + // Creates repartition-1 because key was changed above + mapped + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .count(Materialized.as(Stores.inMemoryKeyValueStore("count-store-1"))) + .toStream() + .to(OUTPUT_TOPIC_1, Produced.with(Serdes.String(), Serdes.Long())); + + // Branch 2: filter → groupByKey → count + // Creates repartition-2 (prior to KAFKA-19669, cannot merge with repartition-1 due to mapValues) + mapped + .filter((k, v) -> v.length() > 5) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .count(Materialized.as(Stores.inMemoryKeyValueStore("count-store-2"))) + .toStream() + .to(OUTPUT_TOPIC_2, Produced.with(Serdes.String(), Serdes.Long())); + + final Topology topology = builder.build(streamsConfiguration); + final String topologyString = topology.describe().toString(); + + // Print topology for inspection + System.out.println("=== TOPOLOGY WITH VALUE-CHANGING OPERATION ==="); + System.out.println(topologyString); + System.out.println("=============================================="); + + // Count repartition topics + final int repartitionCount = countRepartitionTopics(topologyString); + + // Prior to KAFKA-19669: 2 repartition topics created + // This is because optimization cannot push repartition above mapValues + // KAFKA-19669 merges these two topics + assertEquals(1, repartitionCount, + "KAFKA-19669 behavior: 1 merged repartition topic created because optimization " + + "can pass any value-changing operation (mapValues)"); + + // Verify topology still works correctly + topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); + + final TestInputTopic inputTopic = + topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, Serdes.Integer().serializer()); + final TestOutputTopic outputTopic1 = + topologyTestDriver.createOutputTopic(OUTPUT_TOPIC_1, stringDeserializer, Serdes.Long().deserializer()); + final TestOutputTopic outputTopic2 = + topologyTestDriver.createOutputTopic(OUTPUT_TOPIC_2, stringDeserializer, Serdes.Long().deserializer()); + + // Input data (uppercase keys will be lowercased by map()) + inputTopic.pipeKeyValueList(Arrays.asList( + KeyValue.pair("A", 1), + KeyValue.pair("B", 2), + KeyValue.pair("A", 3), + KeyValue.pair("B", 4) + )); + + // Verify outputs (keys are now lowercase: "a", "b") + final Map output1 = outputTopic1.readKeyValuesToMap(); + final Map output2 = outputTopic2.readKeyValuesToMap(); + + // Branch 1: counts all records (a=2, b=2) + assertThat(output1.get("a"), equalTo(2L)); + assertThat(output1.get("b"), equalTo(2L)); + + // Branch 2: counts only records with value.length() > 5 + // "value-1" = 7 chars, "value-2" = 7 chars, "value-3" = 7 chars, "value-4" = 7 chars + // All pass the filter + assertThat(output2.get("a"), equalTo(2L)); + assertThat(output2.get("b"), equalTo(2L)); + } + + /** + * Test Case 2: Shows optimization works WITHOUT value-changing operation + * + * Topology: + * source[String, Integer] + * → map(key-changing: lowercase key) + * → groupByKey → count → output1 + * → filter → groupByKey → count → output2 + * + * Prior to KAFKA-19669, repartition applies to this case: + * - Creates 1 repartition topic (optimization successful) + * - Both branches share the same repartition topic + */ + @Test + public void withoutValueChangingOperation() { + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream source = builder.stream( + INPUT_TOPIC, + Consumed.with(Serdes.String(), Serdes.Integer()) + ); + + // Key-changing operation to trigger repartitioning + final KStream rekeyed = source.map( + (k, v) -> KeyValue.pair(k.toLowerCase(Locale.getDefault()), v) + ); + + // NO value-changing operation here + + // Branch 1: groupByKey → count + rekeyed + .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer())) + .count(Materialized.as(Stores.inMemoryKeyValueStore("count-store-1"))) + .toStream() + .to(OUTPUT_TOPIC_1, Produced.with(Serdes.String(), Serdes.Long())); + + // Branch 2: filter → groupByKey → count + rekeyed + .filter((k, v) -> v > 2) + .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer())) + .count(Materialized.as(Stores.inMemoryKeyValueStore("count-store-2"))) + .toStream() + .to(OUTPUT_TOPIC_2, Produced.with(Serdes.String(), Serdes.Long())); + + final Topology topology = builder.build(streamsConfiguration); + final String topologyString = topology.describe().toString(); + + System.out.println("=== TOPOLOGY WITHOUT VALUE-CHANGING OPERATION ==="); + System.out.println(topologyString); + System.out.println("=================================================="); + + final int repartitionCount = countRepartitionTopics(topologyString); + + // Without KAFKA-19669: 1 repartition topic (optimization successful!) + assertEquals(1, repartitionCount, + "Prior-KAFKA-19669 Optimization still works: " + + "1 merged repartition topic when no value-changing operation present"); + + // Verify topology works correctly + topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); + + final TestInputTopic inputTopic = + topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, Serdes.Integer().serializer()); + final TestOutputTopic outputTopic1 = + topologyTestDriver.createOutputTopic(OUTPUT_TOPIC_1, stringDeserializer, Serdes.Long().deserializer()); + final TestOutputTopic outputTopic2 = + topologyTestDriver.createOutputTopic(OUTPUT_TOPIC_2, stringDeserializer, Serdes.Long().deserializer()); + + inputTopic.pipeKeyValueList(Arrays.asList( + KeyValue.pair("A", 1), + KeyValue.pair("B", 2), + KeyValue.pair("A", 3), + KeyValue.pair("B", 4) + )); + + final Map output1 = outputTopic1.readKeyValuesToMap(); + final Map output2 = outputTopic2.readKeyValuesToMap(); + + // Branch 1: counts all (a=2, b=2) + assertThat(output1.get("a"), equalTo(2L)); + assertThat(output1.get("b"), equalTo(2L)); + + // Branch 2: counts only > 2 (a:3, b:4 = a=1, b=1) + assertThat(output2.get("a"), equalTo(1L)); + assertThat(output2.get("b"), equalTo(1L)); + } + + /** + * Test Case 3: Multiple value-changing operations in sequence + * + * More complex scenario where multiple value transformations do not block the optimization. + */ + @Test + public void multipleValueChangingOperations() { + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream source = builder.stream( + INPUT_TOPIC, + Consumed.with(Serdes.String(), Serdes.Integer()) + ); + + // Key-changing operation to trigger repartitioning + final KStream rekeyed = source.map( + (k, v) -> KeyValue.pair(k.toLowerCase(Locale.getDefault()), v) + ); + + // Chain of value-changing operations: Integer → String → String + final KStream transformed = rekeyed + .mapValues(value -> "step1-" + value) // Integer → String + .mapValues(value -> value.toUpperCase(Locale.getDefault())); // String → String + + // Two branches that could share repartition if pushed above transformations + transformed + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .count(Materialized.as(Stores.inMemoryKeyValueStore("count-store-1"))) + .toStream() + .to(OUTPUT_TOPIC_1, Produced.with(Serdes.String(), Serdes.Long())); + + transformed + .filter((k, v) -> true) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .count(Materialized.as(Stores.inMemoryKeyValueStore("count-store-2"))) + .toStream() + .to(OUTPUT_TOPIC_2, Produced.with(Serdes.String(), Serdes.Long())); + + final Topology topology = builder.build(streamsConfiguration); + final String topologyString = topology.describe().toString(); + + System.out.println("=== TOPOLOGY WITH CHAINED VALUE-CHANGING OPERATIONS ==="); + System.out.println(topologyString); + System.out.println("========================================================"); + + final int repartitionCount = countRepartitionTopics(topologyString); + + // KAFKA-19669: push repartition all the way to source with Integer serde + assertEquals(1, repartitionCount, + "KAFKA19669: merge 2 repartition topics to 1 due to value-changing operations chain"); + + topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); + + final TestInputTopic inputTopic = + topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, Serdes.Integer().serializer()); + final TestOutputTopic outputTopic1 = + topologyTestDriver.createOutputTopic(OUTPUT_TOPIC_1, stringDeserializer, Serdes.Long().deserializer()); + final TestOutputTopic outputTopic2 = + topologyTestDriver.createOutputTopic(OUTPUT_TOPIC_2, stringDeserializer, Serdes.Long().deserializer()); + + inputTopic.pipeKeyValueList(Arrays.asList( + KeyValue.pair("A", 1), + KeyValue.pair("B", 2) + )); + + final Map output1 = outputTopic1.readKeyValuesToMap(); + final Map output2 = outputTopic2.readKeyValuesToMap(); + + assertThat(output1.get("a"), equalTo(1L)); + assertThat(output1.get("b"), equalTo(1L)); + assertThat(output2.get("a"), equalTo(1L)); + assertThat(output2.get("b"), equalTo(1L)); + } + + private int countRepartitionTopics(final String topologyString) { + final Matcher matcher = repartitionTopicPattern.matcher(topologyString); + final List repartitionTopics = new ArrayList<>(); + while (matcher.find()) { + repartitionTopics.add(matcher.group()); + } + System.out.println("Repartition topics found: " + repartitionTopics.size()); + for (final String topic : repartitionTopics) { + System.out.println(" - " + topic); + } + return repartitionTopics.size(); + } +}