From ce1d5f124d4ca2e3aa9ed9d7c2ccd89f4ebec8ee Mon Sep 17 00:00:00 2001 From: Guang Zhao Date: Fri, 31 Oct 2025 07:20:11 +1100 Subject: [PATCH 1/6] Add unit test and todo --- .../internals/InternalStreamsBuilder.java | 9 +- ...MergeAcrossValueChangingOperationTest.java | 414 ++++++++++++++++++ 2 files changed, 419 insertions(+), 4 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 6460313a62bf9..780c1eaa234a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -608,7 +608,7 @@ private OptimizableRepartitionNode createRepartitionNode(final Stri } private GraphNode getKeyChangingParentNode(final GraphNode repartitionNode) { - final GraphNode shouldBeKeyChangingNode = findParentNodeMatching(repartitionNode, n -> n.isKeyChangingOperation() || n.isValueChangingOperation()); + final GraphNode shouldBeKeyChangingNode = findParentNodeMatching(repartitionNode, n -> n.isKeyChangingOperation() || n.isValueChangingOperation()); /// todo: remove isValueChangingOperation() condition final GraphNode keyChangingNode = findParentNodeMatching(repartitionNode, GraphNode::isKeyChangingOperation); if (shouldBeKeyChangingNode != null && shouldBeKeyChangingNode.equals(keyChangingNode)) { @@ -623,6 +623,10 @@ private String getFirstRepartitionTopicName(final Collection GroupedInternal getRepartitionSerdes(final Collection> repartitionNodes) { + /// guang: instead of getting the repartition node's serde, determine the serde as: + /// between this repartition node and it's key changing parent: + /// - the earliest value changing operation's input serde, if any, or + /// - the key value changing operation serde, which is equal to the repartition node's Serde keySerde = null; Serde valueSerde = null; @@ -635,9 +639,6 @@ private GroupedInternal getRepartitionSerdes(final Collection) repartitionNode.valueSerde(); } - if (keySerde != null && valueSerde != null) { - break; - } } return new GroupedInternal<>(Grouped.with(keySerde, valueSerde)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java new file mode 100644 index 0000000000000..9116b53765215 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.StreamsTestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Test demonstrating the current limitation of merge.repartition.topic optimization + * when value-changing operations are present. + * + * CURRENT BEHAVIOR: + * - Optimization stops at value-changing operations (mapValues, flatMapValues, etc.) + * - Cannot merge repartition topics that are separated by value-changing operations + * - Results in multiple repartition topics even when they could be merged + * + * PROPOSED ENHANCEMENT (KAFKA-7138): + * - Track input/output serdes at each node + * - Allow pushing repartition upstream past value-changing operations + * - Switch to upstream serdes when merging repartitions + * - Would enable merging repartitions across value-changing boundaries + */ +public class RepartitionMergeAcrossValueChangingOperationTest { + + /// key-changing operation -> repartition + /// value-changing operation -> cannot reorder repartition past this point + + /// Critical locations to debug: + /// - InternalStreamBuilder.getKeyChangingParentNode() - The decision point + /// - InternalStreamBuilder.mergeRepartitionTopics() - Entry point for optimization + /// - isValueChangingOperation() - Flag checking + /// - setValueChangingOperation() - Where flags are set + + + private static final String INPUT_TOPIC = "input"; + private static final String OUTPUT_TOPIC_1 = "output1"; + private static final String OUTPUT_TOPIC_2 = "output2"; + + private final Serializer stringSerializer = new StringSerializer(); + private final Deserializer stringDeserializer = new StringDeserializer(); + private final Deserializer integerDeserializer = new IntegerDeserializer(); + + private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); + + private Properties streamsConfiguration; + private TopologyTestDriver topologyTestDriver; + + @BeforeEach + public void before() { + streamsConfiguration = StreamsTestUtils.getStreamsConfig( + Serdes.String(), + Serdes.String() + ); + } + + @AfterEach + public void after() { + if (topologyTestDriver != null) { + topologyTestDriver.close(); + } + } + + /** + * Test Case 1: Demonstrates current limitation + * + * Topology: + * source[String, Integer] + * → map(key-changing: lowercase key) + * → mapValues(int → string) [VALUE-CHANGING] + * → groupByKey → count → output1 + * → filter → groupByKey → count → output2 + * + * CURRENT BEHAVIOR: + * - Creates 2 repartition topics (both with String value serde) + * - Optimization blocked by mapValues operation + * + * DESIRED BEHAVIOR (with enhancement): + * - Could create 1 repartition topic (with Integer value serde from before mapValues) + * - Push repartition above mapValues + * - Both branches read from same repartition and apply mapValues afterward + */ + @Test + public void shouldCreateTwoRepartitionTopicsDueToValueChangingOperation() { + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + + final StreamsBuilder builder = new StreamsBuilder(); + + // Source produces + final KStream source = builder.stream( + INPUT_TOPIC, + Consumed.with(Serdes.String(), Serdes.Integer()) + ); + + // KEY-CHANGING operation to trigger repartitioning + final KStream rekeyed = source.map( + (k, v) -> KeyValue.pair(k.toLowerCase(Locale.getDefault()), v) + ); + + // mapValues is VALUE-CHANGING: Integer → String + final KStream mapped = rekeyed.mapValues( + value -> "value-" + value + ); + + // Branch 1: groupByKey → count + // Creates repartition-1 because key was changed above + mapped + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .count(Materialized.as(Stores.inMemoryKeyValueStore("count-store-1"))) + .toStream() + .to(OUTPUT_TOPIC_1, Produced.with(Serdes.String(), Serdes.Long())); + + // Branch 2: filter → groupByKey → count + // Creates repartition-2 (cannot merge with repartition-1 due to mapValues) + mapped + .filter((k, v) -> v.length() > 5) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .count(Materialized.as(Stores.inMemoryKeyValueStore("count-store-2"))) + .toStream() + .to(OUTPUT_TOPIC_2, Produced.with(Serdes.String(), Serdes.Long())); + + final Topology topology = builder.build(streamsConfiguration); + final String topologyString = topology.describe().toString(); + + // Print topology for inspection + System.out.println("=== TOPOLOGY WITH VALUE-CHANGING OPERATION ==="); + System.out.println(topologyString); + System.out.println("=============================================="); + + // Count repartition topics + final int repartitionCount = countRepartitionTopics(topologyString); + + // CURRENT BEHAVIOR: 2 repartition topics created + // This is because optimization cannot push repartition above mapValues + assertEquals(2, repartitionCount, + "Current behavior: 2 repartition topics created because optimization " + + "is blocked by value-changing operation (mapValues)"); + + // Verify topology still works correctly + topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); + + final TestInputTopic inputTopic = + topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, Serdes.Integer().serializer()); + final TestOutputTopic outputTopic1 = + topologyTestDriver.createOutputTopic(OUTPUT_TOPIC_1, stringDeserializer, Serdes.Long().deserializer()); + final TestOutputTopic outputTopic2 = + topologyTestDriver.createOutputTopic(OUTPUT_TOPIC_2, stringDeserializer, Serdes.Long().deserializer()); + + // Input data (uppercase keys will be lowercased by map()) + inputTopic.pipeKeyValueList(Arrays.asList( + KeyValue.pair("A", 1), + KeyValue.pair("B", 2), + KeyValue.pair("A", 3), + KeyValue.pair("B", 4) + )); + + // Verify outputs (keys are now lowercase: "a", "b") + final Map output1 = outputTopic1.readKeyValuesToMap(); + final Map output2 = outputTopic2.readKeyValuesToMap(); + + // Branch 1: counts all records (a=2, b=2) + assertThat(output1.get("a"), equalTo(2L)); + assertThat(output1.get("b"), equalTo(2L)); + + // Branch 2: counts only records with value.length() > 5 + // "value-1" = 7 chars, "value-2" = 7 chars, "value-3" = 7 chars, "value-4" = 7 chars + // All pass the filter + assertThat(output2.get("a"), equalTo(2L)); + assertThat(output2.get("b"), equalTo(2L)); + } + + /** + * Test Case 2: Shows optimization works WITHOUT value-changing operation + * + * Topology: + * source[String, Integer] + * → map(key-changing: lowercase key) + * → groupByKey → count → output1 + * → filter → groupByKey → count → output2 + * + * CURRENT BEHAVIOR: + * - Creates 1 repartition topic (optimization successful) + * - Both branches share the same repartition topic + */ + @Test + public void shouldMergeRepartitionTopicsWithoutValueChangingOperation() { + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream source = builder.stream( + INPUT_TOPIC, + Consumed.with(Serdes.String(), Serdes.Integer()) + ); + + // KEY-CHANGING operation to trigger repartitioning + final KStream rekeyed = source.map( + (k, v) -> KeyValue.pair(k.toLowerCase(Locale.getDefault()), v) + ); + + // NO value-changing operation here + + // Branch 1: groupByKey → count + rekeyed + .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer())) + .count(Materialized.as(Stores.inMemoryKeyValueStore("count-store-1"))) + .toStream() + .to(OUTPUT_TOPIC_1, Produced.with(Serdes.String(), Serdes.Long())); + + // Branch 2: filter → groupByKey → count + rekeyed + .filter((k, v) -> v > 2) + .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer())) + .count(Materialized.as(Stores.inMemoryKeyValueStore("count-store-2"))) + .toStream() + .to(OUTPUT_TOPIC_2, Produced.with(Serdes.String(), Serdes.Long())); + + final Topology topology = builder.build(streamsConfiguration); + final String topologyString = topology.describe().toString(); + + System.out.println("=== TOPOLOGY WITHOUT VALUE-CHANGING OPERATION ==="); + System.out.println(topologyString); + System.out.println("=================================================="); + + final int repartitionCount = countRepartitionTopics(topologyString); + + // CURRENT BEHAVIOR: 1 repartition topic (optimization successful!) + assertEquals(1, repartitionCount, + "Optimization works: 1 merged repartition topic when no value-changing operation present"); + + // Verify topology works correctly + topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); + + final TestInputTopic inputTopic = + topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, Serdes.Integer().serializer()); + final TestOutputTopic outputTopic1 = + topologyTestDriver.createOutputTopic(OUTPUT_TOPIC_1, stringDeserializer, Serdes.Long().deserializer()); + final TestOutputTopic outputTopic2 = + topologyTestDriver.createOutputTopic(OUTPUT_TOPIC_2, stringDeserializer, Serdes.Long().deserializer()); + + inputTopic.pipeKeyValueList(Arrays.asList( + KeyValue.pair("A", 1), + KeyValue.pair("B", 2), + KeyValue.pair("A", 3), + KeyValue.pair("B", 4) + )); + + final Map output1 = outputTopic1.readKeyValuesToMap(); + final Map output2 = outputTopic2.readKeyValuesToMap(); + + // Branch 1: counts all (a=2, b=2) + assertThat(output1.get("a"), equalTo(2L)); + assertThat(output1.get("b"), equalTo(2L)); + + // Branch 2: counts only > 2 (a:3, b:4 = a=1, b=1) + assertThat(output2.get("a"), equalTo(1L)); + assertThat(output2.get("b"), equalTo(1L)); + } + + /** + * Test Case 3: Multiple value-changing operations in sequence + * + * Demonstrates even more complex scenario where multiple value transformations + * block the optimization. + */ + @Test + public void shouldCreateTwoRepartitionTopicsWithMultipleValueChangingOperations() { + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream source = builder.stream( + INPUT_TOPIC, + Consumed.with(Serdes.String(), Serdes.Integer()) + ); + + // KEY-CHANGING operation to trigger repartitioning + final KStream rekeyed = source.map( + (k, v) -> KeyValue.pair(k.toLowerCase(Locale.getDefault()), v) + ); + + // Chain of value-changing operations: Integer → String → String + final KStream transformed = rekeyed + .mapValues(value -> "step1-" + value) // Integer → String + .mapValues(value -> value.toUpperCase(Locale.getDefault())); // String → String + + // Two branches that could share repartition if pushed above transformations + transformed + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .count(Materialized.as(Stores.inMemoryKeyValueStore("count-store-1"))) + .toStream() + .to(OUTPUT_TOPIC_1, Produced.with(Serdes.String(), Serdes.Long())); + + transformed + .filter((k, v) -> true) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .count(Materialized.as(Stores.inMemoryKeyValueStore("count-store-2"))) + .toStream() + .to(OUTPUT_TOPIC_2, Produced.with(Serdes.String(), Serdes.Long())); + + final Topology topology = builder.build(streamsConfiguration); + final String topologyString = topology.describe().toString(); + + System.out.println("=== TOPOLOGY WITH CHAINED VALUE-CHANGING OPERATIONS ==="); + System.out.println(topologyString); + System.out.println("========================================================"); + + final int repartitionCount = countRepartitionTopics(topologyString); + + // CURRENT: 2 repartition topics + // DESIRED: Could push repartition all the way to source with Integer serde + assertEquals(2, repartitionCount, + "Current: 2 repartition topics due to value-changing operations chain"); + + topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); + + final TestInputTopic inputTopic = + topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, Serdes.Integer().serializer()); + final TestOutputTopic outputTopic1 = + topologyTestDriver.createOutputTopic(OUTPUT_TOPIC_1, stringDeserializer, Serdes.Long().deserializer()); + final TestOutputTopic outputTopic2 = + topologyTestDriver.createOutputTopic(OUTPUT_TOPIC_2, stringDeserializer, Serdes.Long().deserializer()); + + inputTopic.pipeKeyValueList(Arrays.asList( + KeyValue.pair("A", 1), + KeyValue.pair("B", 2) + )); + + final Map output1 = outputTopic1.readKeyValuesToMap(); + final Map output2 = outputTopic2.readKeyValuesToMap(); + + assertThat(output1.get("a"), equalTo(1L)); + assertThat(output1.get("b"), equalTo(1L)); + assertThat(output2.get("a"), equalTo(1L)); + assertThat(output2.get("b"), equalTo(1L)); + } + + /** + * Helper method to count repartition topics in the topology + */ + private int countRepartitionTopics(final String topologyString) { + final Matcher matcher = repartitionTopicPattern.matcher(topologyString); + final List repartitionTopics = new ArrayList<>(); + while (matcher.find()) { + repartitionTopics.add(matcher.group()); + } + System.out.println("Repartition topics found: " + repartitionTopics.size()); + for (final String topic : repartitionTopics) { + System.out.println(" - " + topic); + } + return repartitionTopics.size(); + } + + /** + * Helper to convert KeyValue list to Map + */ + private Map keyValueListToMap(final List> keyValuePairs) { + final Map map = new HashMap<>(); + for (final KeyValue pair : keyValuePairs) { + map.put(pair.key, pair.value); + } + return map; + } +} From ce485a2e546ab9eda084ec868e4c326306188eef Mon Sep 17 00:00:00 2001 From: Guang Zhao Date: Mon, 3 Nov 2025 06:13:25 +1100 Subject: [PATCH 2/6] Add value serde tracking at graph node --- .../kstream/internals/AbstractStream.java | 2 ++ .../internals/InternalStreamsBuilder.java | 33 ++++++++++++------- .../kstream/internals/graph/GraphNode.java | 10 ++++++ ...MergeAcrossValueChangingOperationTest.java | 4 +-- 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index 7e54803593f88..89d2e3d1ccc72 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -73,6 +73,8 @@ public AbstractStream(final AbstractStream stream) { this.valueSerde = valueSerde; this.subTopologySourceNodes = subTopologySourceNodes; this.graphNode = graphNode; + + this.graphNode.setValueSerde(valueSerde); } // This method allows to expose the InternalTopologyBuilder instance diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 780c1eaa234a0..e7c511db8c2a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -280,7 +280,8 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) { if (node.isKeyChangingOperation()) { keyChangingOperationsToOptimizableRepartitionNodes.put(node, new LinkedHashSet<>()); } else if (node instanceof OptimizableRepartitionNode) { - final GraphNode parentNode = getKeyChangingParentNode(node); + // final GraphNode parentNode = getKeyChangingParentNode(node); + final GraphNode parentNode = findParentNodeMatching(node, GraphNode::isKeyChangingOperation); if (parentNode != null) { keyChangingOperationsToOptimizableRepartitionNodes.get(parentNode).add((OptimizableRepartitionNode) node); } @@ -488,7 +489,14 @@ private void mergeRepartitionTopics() { continue; } - final GroupedInternal groupedInternal = new GroupedInternal<>(getRepartitionSerdes(entry.getValue())); + final Serde keySerde = getRepartitionSerdes(entry.getValue()).keySerde(); + // guang: can repartition node miss key serde? + Serde valueSerde = keyChangingNode.valueSerde(); + if (valueSerde == null) { + final GraphNode parent = findParentNodeMatching(keyChangingNode, gn -> gn.valueSerde() != null); + valueSerde = parent == null ? null : parent.valueSerde(); + } + final GroupedInternal groupedInternal = new GroupedInternal<>(Grouped.with(keySerde, valueSerde)); final String repartitionTopicName = getFirstRepartitionTopicName(entry.getValue()); //passing in the name of the first repartition topic, re-used to create the optimized repartition topic @@ -503,19 +511,20 @@ private void mergeRepartitionTopics() { final GraphNode keyChangingNodeChild = findParentNodeMatching(repartitionNodeToBeReplaced, gn -> gn.parentNodes().contains(keyChangingNode)); - if (keyChangingNodeChild == null) { + /*if (keyChangingNodeChild == null) { throw new StreamsException(String.format("Found a null keyChangingChild node for %s", repartitionNodeToBeReplaced)); - } + }*/ + if (keyChangingNodeChild != null) { + LOG.debug("Found the child node of the key changer {} from the repartition {}.", keyChangingNodeChild, repartitionNodeToBeReplaced); - LOG.debug("Found the child node of the key changer {} from the repartition {}.", keyChangingNodeChild, repartitionNodeToBeReplaced); + // need to add children of key-changing node as children of optimized repartition + // in order to process records from re-partitioning + optimizedSingleRepartition.addChild(keyChangingNodeChild); - // need to add children of key-changing node as children of optimized repartition - // in order to process records from re-partitioning - optimizedSingleRepartition.addChild(keyChangingNodeChild); - - LOG.debug("Removing {} from {} children {}", keyChangingNodeChild, keyChangingNode, keyChangingNode.children()); - // now remove children from key-changing node - keyChangingNode.removeChild(keyChangingNodeChild); + LOG.debug("Removing {} from {} children {}", keyChangingNodeChild, keyChangingNode, keyChangingNode.children()); + // now remove children from key-changing node + keyChangingNode.removeChild(keyChangingNodeChild); + } // now need to get children of repartition node so we can remove repartition node final Collection repartitionNodeToBeReplacedChildren = repartitionNodeToBeReplaced.children(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java index 6894e0dcb0ba1..5944ee166a703 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals.graph; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import java.util.Arrays; @@ -42,6 +43,7 @@ public abstract class GraphNode { // explicitly materialized (as either a versioned or an unversioned store) and therefore // whether the output is to be considered versioned or not depends on its parent(s) private Optional outputVersioned = Optional.empty(); + private Serde valueSerde = null; public GraphNode(final String nodeName) { this.nodeName = nodeName; @@ -143,6 +145,14 @@ public void setOutputVersioned(final boolean outputVersioned) { this.outputVersioned = Optional.of(outputVersioned); } + public void setValueSerde(final Serde valueSerde) { + this.valueSerde = valueSerde; + } + + public Serde valueSerde() { + return valueSerde; + } + @Override public String toString() { final String[] parentNames = parentNodeNames(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java index 9116b53765215..b7e6a71ee13fb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java @@ -182,7 +182,7 @@ public void shouldCreateTwoRepartitionTopicsDueToValueChangingOperation() { // CURRENT BEHAVIOR: 2 repartition topics created // This is because optimization cannot push repartition above mapValues - assertEquals(2, repartitionCount, + assertEquals(1, repartitionCount, "Current behavior: 2 repartition topics created because optimization " + "is blocked by value-changing operation (mapValues)"); @@ -359,7 +359,7 @@ public void shouldCreateTwoRepartitionTopicsWithMultipleValueChangingOperations( // CURRENT: 2 repartition topics // DESIRED: Could push repartition all the way to source with Integer serde - assertEquals(2, repartitionCount, + assertEquals(1, repartitionCount, "Current: 2 repartition topics due to value-changing operations chain"); topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); From 751d481ae8c61909237e831923c015830b46b6f7 Mon Sep 17 00:00:00 2001 From: Guang Zhao Date: Mon, 3 Nov 2025 06:35:37 +1100 Subject: [PATCH 3/6] Fix tests --- .../internals/graph/StreamsGraphTest.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index 7f11cc32451ef..83e88162d3b17 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -331,17 +331,16 @@ public void shouldPartiallyOptimizeWithValueOrKeyChangingOperatorsAfterInitialKe " <-- KTABLE-TOSTREAM-0000000024\n\n", noOptimization.describe().toString() ); - assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); + assertEquals(1, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); } @Test - public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixEnabled() { + public void shouldOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixEnabled() { final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE, true); final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION, true); - assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString()); - assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); + assertEquals(1, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); } @@ -578,13 +577,13 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) { " --> KSTREAM-MERGE-0000000022\n" + " <-- KSTREAM-PEEK-0000000020\n" + " Processor: KSTREAM-MERGE-0000000022 (stores: [])\n" + - " --> KSTREAM-FILTER-0000000024\n" + + " --> KSTREAM-MERGE-0000000022-repartition-filter\n" + " <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n" + - " Processor: KSTREAM-FILTER-0000000024 (stores: [])\n" + - " --> KSTREAM-SINK-0000000023\n" + + " Processor: KSTREAM-MERGE-0000000022-repartition-filter (stores: [])\n" + + " --> KSTREAM-MERGE-0000000022-repartition-sink\n" + " <-- KSTREAM-MERGE-0000000022\n" + - " Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n" + - " <-- KSTREAM-FILTER-0000000024\n" + + " Sink: KSTREAM-MERGE-0000000022-repartition-sink (topic: KSTREAM-MERGE-0000000022-repartition)\n" + + " <-- KSTREAM-MERGE-0000000022-repartition-filter\n" + "\n" + " Sub-topology: 2\n" + " Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n" + @@ -599,11 +598,11 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) { " <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n" + "\n" + " Sub-topology: 3\n" + - " Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n" + + " Source: KSTREAM-MERGE-0000000022-repartition-source (topics: [KSTREAM-MERGE-0000000022-repartition])\n" + " --> KSTREAM-LEFTJOIN-0000000026\n" + " Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" + " --> KSTREAM-BRANCH-0000000027\n" + - " <-- KSTREAM-SOURCE-0000000025\n" + + " <-- KSTREAM-MERGE-0000000022-repartition-source\n" + " Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n" + " --> KSTREAM-BRANCH-00000000270, KSTREAM-BRANCH-00000000271\n" + " <-- KSTREAM-LEFTJOIN-0000000026\n" + From 134c96b9c65cbd5fd157facebf44a7f65a9b531f Mon Sep 17 00:00:00 2001 From: Guang Zhao Date: Mon, 3 Nov 2025 08:15:20 +1100 Subject: [PATCH 4/6] Refactor and clean up --- .../internals/InternalStreamsBuilder.java | 43 ++++---------- .../graph/OptimizableRepartitionNode.java | 4 -- ...MergeAcrossValueChangingOperationTest.java | 57 ++++--------------- 3 files changed, 23 insertions(+), 81 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index e7c511db8c2a5..c4f350d2b4423 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -489,14 +489,8 @@ private void mergeRepartitionTopics() { continue; } - final Serde keySerde = getRepartitionSerdes(entry.getValue()).keySerde(); - // guang: can repartition node miss key serde? - Serde valueSerde = keyChangingNode.valueSerde(); - if (valueSerde == null) { - final GraphNode parent = findParentNodeMatching(keyChangingNode, gn -> gn.valueSerde() != null); - valueSerde = parent == null ? null : parent.valueSerde(); - } - final GroupedInternal groupedInternal = new GroupedInternal<>(Grouped.with(keySerde, valueSerde)); + // Resolve key value serdes for merging repartition nodes under the common key changing node + final GroupedInternal groupedInternal = getRepartitionSerdes(entry.getKey(), entry.getValue()); final String repartitionTopicName = getFirstRepartitionTopicName(entry.getValue()); //passing in the name of the first repartition topic, re-used to create the optimized repartition topic @@ -511,9 +505,8 @@ private void mergeRepartitionTopics() { final GraphNode keyChangingNodeChild = findParentNodeMatching(repartitionNodeToBeReplaced, gn -> gn.parentNodes().contains(keyChangingNode)); - /*if (keyChangingNodeChild == null) { - throw new StreamsException(String.format("Found a null keyChangingChild node for %s", repartitionNodeToBeReplaced)); - }*/ + // Remove any child to the key-changing node + // Such child may be common to all corresponding repartition nodes, so only remove it once if (keyChangingNodeChild != null) { LOG.debug("Found the child node of the key changer {} from the repartition {}.", keyChangingNodeChild, repartitionNodeToBeReplaced); @@ -616,40 +609,26 @@ private OptimizableRepartitionNode createRepartitionNode(final Stri } - private GraphNode getKeyChangingParentNode(final GraphNode repartitionNode) { - final GraphNode shouldBeKeyChangingNode = findParentNodeMatching(repartitionNode, n -> n.isKeyChangingOperation() || n.isValueChangingOperation()); /// todo: remove isValueChangingOperation() condition - - final GraphNode keyChangingNode = findParentNodeMatching(repartitionNode, GraphNode::isKeyChangingOperation); - if (shouldBeKeyChangingNode != null && shouldBeKeyChangingNode.equals(keyChangingNode)) { - return keyChangingNode; - } - return null; - } - private String getFirstRepartitionTopicName(final Collection> repartitionNodes) { return repartitionNodes.iterator().next().repartitionTopic(); } @SuppressWarnings("unchecked") - private GroupedInternal getRepartitionSerdes(final Collection> repartitionNodes) { - /// guang: instead of getting the repartition node's serde, determine the serde as: - /// between this repartition node and it's key changing parent: - /// - the earliest value changing operation's input serde, if any, or - /// - the key value changing operation serde, which is equal to the repartition node's + private GroupedInternal getRepartitionSerdes(final GraphNode keyChangingNode, final Collection> repartitionNodes) { Serde keySerde = null; - Serde valueSerde = null; for (final OptimizableRepartitionNode repartitionNode : repartitionNodes) { if (keySerde == null && repartitionNode.keySerde() != null) { keySerde = (Serde) repartitionNode.keySerde(); } - - if (valueSerde == null && repartitionNode.valueSerde() != null) { - valueSerde = (Serde) repartitionNode.valueSerde(); - } - } + // Resolve repartition nodes' value serde for any value-changing nodes upstream until key changing node + Serde valueSerde = (Serde) keyChangingNode.valueSerde(); + if (valueSerde == null) { + final GraphNode parent = findParentNodeMatching(keyChangingNode, gn -> gn.valueSerde() != null); + valueSerde = parent == null ? null : (Serde) parent.valueSerde(); + } return new GroupedInternal<>(Grouped.with(keySerde, valueSerde)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java index 266cadb290bce..b057fb3d6ae83 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java @@ -50,10 +50,6 @@ public Serde keySerde() { return keySerde; } - public Serde valueSerde() { - return valueSerde; - } - public String repartitionTopic() { return repartitionTopic; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java index b7e6a71ee13fb..c74802a540f38 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java @@ -44,7 +44,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -57,32 +56,21 @@ import static org.junit.jupiter.api.Assertions.assertEquals; /** - * Test demonstrating the current limitation of merge.repartition.topic optimization - * when value-changing operations are present. + * Test KAFAK-19669 merge.repartition.topic optimization when value-changing operations are present. * - * CURRENT BEHAVIOR: + * Without optimization (KAFKA-19669): * - Optimization stops at value-changing operations (mapValues, flatMapValues, etc.) * - Cannot merge repartition topics that are separated by value-changing operations * - Results in multiple repartition topics even when they could be merged * - * PROPOSED ENHANCEMENT (KAFKA-7138): + * With optimization (KAFKA-19669): * - Track input/output serdes at each node * - Allow pushing repartition upstream past value-changing operations * - Switch to upstream serdes when merging repartitions - * - Would enable merging repartitions across value-changing boundaries + * - Enable merging repartitions across value-changing boundaries */ public class RepartitionMergeAcrossValueChangingOperationTest { - /// key-changing operation -> repartition - /// value-changing operation -> cannot reorder repartition past this point - - /// Critical locations to debug: - /// - InternalStreamBuilder.getKeyChangingParentNode() - The decision point - /// - InternalStreamBuilder.mergeRepartitionTopics() - Entry point for optimization - /// - isValueChangingOperation() - Flag checking - /// - setValueChangingOperation() - Where flags are set - - private static final String INPUT_TOPIC = "input"; private static final String OUTPUT_TOPIC_1 = "output1"; private static final String OUTPUT_TOPIC_2 = "output2"; @@ -121,17 +109,12 @@ public void after() { * → groupByKey → count → output1 * → filter → groupByKey → count → output2 * - * CURRENT BEHAVIOR: - * - Creates 2 repartition topics (both with String value serde) - * - Optimization blocked by mapValues operation - * - * DESIRED BEHAVIOR (with enhancement): - * - Could create 1 repartition topic (with Integer value serde from before mapValues) + * - Should create 1 repartition topic (with Integer value serde from before mapValues) * - Push repartition above mapValues * - Both branches read from same repartition and apply mapValues afterward */ @Test - public void shouldCreateTwoRepartitionTopicsDueToValueChangingOperation() { + public void singleValueChangingOperation() { streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final StreamsBuilder builder = new StreamsBuilder(); @@ -228,12 +211,12 @@ public void shouldCreateTwoRepartitionTopicsDueToValueChangingOperation() { * → groupByKey → count → output1 * → filter → groupByKey → count → output2 * - * CURRENT BEHAVIOR: + * Prior to KAFKA-19669, repartition applies to this case: * - Creates 1 repartition topic (optimization successful) * - Both branches share the same repartition topic */ @Test - public void shouldMergeRepartitionTopicsWithoutValueChangingOperation() { + public void withoutValueChangingOperation() { streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final StreamsBuilder builder = new StreamsBuilder(); @@ -274,7 +257,7 @@ public void shouldMergeRepartitionTopicsWithoutValueChangingOperation() { final int repartitionCount = countRepartitionTopics(topologyString); - // CURRENT BEHAVIOR: 1 repartition topic (optimization successful!) + // Without KAFKA-19669: 1 repartition topic (optimization successful!) assertEquals(1, repartitionCount, "Optimization works: 1 merged repartition topic when no value-changing operation present"); @@ -310,11 +293,10 @@ public void shouldMergeRepartitionTopicsWithoutValueChangingOperation() { /** * Test Case 3: Multiple value-changing operations in sequence * - * Demonstrates even more complex scenario where multiple value transformations - * block the optimization. + * More complex scenario where multiple value transformations do not block the optimization. */ @Test - public void shouldCreateTwoRepartitionTopicsWithMultipleValueChangingOperations() { + public void multipleValueChangingOperations() { streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final StreamsBuilder builder = new StreamsBuilder(); @@ -357,8 +339,7 @@ public void shouldCreateTwoRepartitionTopicsWithMultipleValueChangingOperations( final int repartitionCount = countRepartitionTopics(topologyString); - // CURRENT: 2 repartition topics - // DESIRED: Could push repartition all the way to source with Integer serde + // Optimize: push repartition all the way to source with Integer serde assertEquals(1, repartitionCount, "Current: 2 repartition topics due to value-changing operations chain"); @@ -385,9 +366,6 @@ public void shouldCreateTwoRepartitionTopicsWithMultipleValueChangingOperations( assertThat(output2.get("b"), equalTo(1L)); } - /** - * Helper method to count repartition topics in the topology - */ private int countRepartitionTopics(final String topologyString) { final Matcher matcher = repartitionTopicPattern.matcher(topologyString); final List repartitionTopics = new ArrayList<>(); @@ -400,15 +378,4 @@ private int countRepartitionTopics(final String topologyString) { } return repartitionTopics.size(); } - - /** - * Helper to convert KeyValue list to Map - */ - private Map keyValueListToMap(final List> keyValuePairs) { - final Map map = new HashMap<>(); - for (final KeyValue pair : keyValuePairs) { - map.put(pair.key, pair.value); - } - return map; - } } From 9ce61c0ee3ed1f4db6a75b855838037c90a8ccec Mon Sep 17 00:00:00 2001 From: Guang Zhao Date: Mon, 3 Nov 2025 08:24:39 +1100 Subject: [PATCH 5/6] Clean up --- .../internals/InternalStreamsBuilder.java | 1 - ...ionMergeAcrossValueChangingOperationTest.java | 16 +++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index c4f350d2b4423..330854f572ac9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -280,7 +280,6 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) { if (node.isKeyChangingOperation()) { keyChangingOperationsToOptimizableRepartitionNodes.put(node, new LinkedHashSet<>()); } else if (node instanceof OptimizableRepartitionNode) { - // final GraphNode parentNode = getKeyChangingParentNode(node); final GraphNode parentNode = findParentNodeMatching(node, GraphNode::isKeyChangingOperation); if (parentNode != null) { keyChangingOperationsToOptimizableRepartitionNodes.get(parentNode).add((OptimizableRepartitionNode) node); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java index c74802a540f38..22c07417cd5d5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java @@ -144,7 +144,7 @@ public void singleValueChangingOperation() { .to(OUTPUT_TOPIC_1, Produced.with(Serdes.String(), Serdes.Long())); // Branch 2: filter → groupByKey → count - // Creates repartition-2 (cannot merge with repartition-1 due to mapValues) + // Creates repartition-2 (prior to KAFKA-19669, cannot merge with repartition-1 due to mapValues) mapped .filter((k, v) -> v.length() > 5) .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) @@ -163,11 +163,12 @@ public void singleValueChangingOperation() { // Count repartition topics final int repartitionCount = countRepartitionTopics(topologyString); - // CURRENT BEHAVIOR: 2 repartition topics created + // Prior to KAFKA-19669: 2 repartition topics created // This is because optimization cannot push repartition above mapValues + // KAFKA-19669 merges these two topics assertEquals(1, repartitionCount, - "Current behavior: 2 repartition topics created because optimization " + - "is blocked by value-changing operation (mapValues)"); + "KAFKA-19669 behavior: 1 merged repartition topic created because optimization " + + "can pass any value-changing operation (mapValues)"); // Verify topology still works correctly topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); @@ -259,7 +260,8 @@ public void withoutValueChangingOperation() { // Without KAFKA-19669: 1 repartition topic (optimization successful!) assertEquals(1, repartitionCount, - "Optimization works: 1 merged repartition topic when no value-changing operation present"); + "Prior-KAFKA-19669 Optimization still works: " + + "1 merged repartition topic when no value-changing operation present"); // Verify topology works correctly topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); @@ -339,9 +341,9 @@ public void multipleValueChangingOperations() { final int repartitionCount = countRepartitionTopics(topologyString); - // Optimize: push repartition all the way to source with Integer serde + // KAFKA-19669: push repartition all the way to source with Integer serde assertEquals(1, repartitionCount, - "Current: 2 repartition topics due to value-changing operations chain"); + "KAFKA19669: merge 2 repartition topics to 1 due to value-changing operations chain"); topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); From 5310f95e3af3248e2fe27ca70e2163c2dfcd0525 Mon Sep 17 00:00:00 2001 From: Guang Zhao Date: Mon, 3 Nov 2025 08:27:08 +1100 Subject: [PATCH 6/6] Clean up --- ...onMergeAcrossValueChangingOperationTest.java | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java index 22c07417cd5d5..cc6ee02cac374 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionMergeAcrossValueChangingOperationTest.java @@ -57,17 +57,6 @@ /** * Test KAFAK-19669 merge.repartition.topic optimization when value-changing operations are present. - * - * Without optimization (KAFKA-19669): - * - Optimization stops at value-changing operations (mapValues, flatMapValues, etc.) - * - Cannot merge repartition topics that are separated by value-changing operations - * - Results in multiple repartition topics even when they could be merged - * - * With optimization (KAFKA-19669): - * - Track input/output serdes at each node - * - Allow pushing repartition upstream past value-changing operations - * - Switch to upstream serdes when merging repartitions - * - Enable merging repartitions across value-changing boundaries */ public class RepartitionMergeAcrossValueChangingOperationTest { @@ -125,7 +114,7 @@ public void singleValueChangingOperation() { Consumed.with(Serdes.String(), Serdes.Integer()) ); - // KEY-CHANGING operation to trigger repartitioning + // Key-changing operation to trigger repartitioning final KStream rekeyed = source.map( (k, v) -> KeyValue.pair(k.toLowerCase(Locale.getDefault()), v) ); @@ -227,7 +216,7 @@ public void withoutValueChangingOperation() { Consumed.with(Serdes.String(), Serdes.Integer()) ); - // KEY-CHANGING operation to trigger repartitioning + // Key-changing operation to trigger repartitioning final KStream rekeyed = source.map( (k, v) -> KeyValue.pair(k.toLowerCase(Locale.getDefault()), v) ); @@ -308,7 +297,7 @@ public void multipleValueChangingOperations() { Consumed.with(Serdes.String(), Serdes.Integer()) ); - // KEY-CHANGING operation to trigger repartitioning + // Key-changing operation to trigger repartitioning final KStream rekeyed = source.map( (k, v) -> KeyValue.pair(k.toLowerCase(Locale.getDefault()), v) );