From 70a2e718bbd8b50f561002751e968392e54af280 Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Mon, 25 Apr 2016 14:56:07 -0700 Subject: [PATCH] KAFKA-3614: Consolidate duplicate code in KGroupedTableImpl Two methods aggregate() and reduce() share common code that is consolidated in this patch. --- .../kstream/internals/KGroupedTableImpl.java | 72 ++++++------------- 1 file changed, 23 insertions(+), 49 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index f2e2eed580dc..cb4f963a9873 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -60,16 +60,10 @@ public KGroupedTableImpl(KStreamBuilder topology, this.valSerde = valSerde; } - @Override - public KTable aggregate(Initializer initializer, - Aggregator adder, - Aggregator subtractor, - Serde aggValueSerde, - String name) { - + private KTable getKTableImplObj(String name, String kTableAction, ProcessorSupplier> aggregateSupplier, StateStoreSupplier aggregateStore) { String sinkName = topology.newName(KStreamImpl.SINK_NAME); String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); - String aggregateName = topology.newName(AGGREGATE_NAME); + String actionName = topology.newName(kTableAction); String topic = name + REPARTITION_TOPIC_SUFFIX; @@ -81,14 +75,6 @@ public KTable aggregate(Initializer initializer, ChangedSerializer changedValueSerializer = new ChangedSerializer<>(valueSerializer); ChangedDeserializer changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); - ProcessorSupplier> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor); - - StateStoreSupplier aggregateStore = Stores.create(name) - .withKeys(keySerde) - .withValues(aggValueSerde) - .persistent() - 
.build(); - // send the aggregate key-value pairs to the intermediate topic for partitioning topology.addInternalTopic(topic); topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name); @@ -97,11 +83,27 @@ public KTable aggregate(Initializer initializer, topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); // aggregate the values with the aggregator and local store - topology.addProcessor(aggregateName, aggregateSupplier, sourceName); - topology.addStateStore(aggregateStore, aggregateName); + topology.addProcessor(actionName, aggregateSupplier, sourceName); + topology.addStateStore(aggregateStore, actionName); // return the KTable representation with the intermediate topic as the sources - return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName)); + return new KTableImpl<>(topology, actionName, aggregateSupplier, Collections.singleton(sourceName)); + } + + @Override + public KTable aggregate(Initializer initializer, + Aggregator adder, + Aggregator subtractor, + Serde aggValueSerde, + String name) { + ProcessorSupplier> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor); + StateStoreSupplier aggregateStore = Stores.create(name) + .withKeys(keySerde) + .withValues(aggValueSerde) + .persistent() + .build(); + + return getKTableImplObj(name, AGGREGATE_NAME, aggregateSupplier, aggregateStore); } @Override @@ -117,42 +119,14 @@ public KTable aggregate(Initializer initializer, public KTable reduce(Reducer adder, Reducer subtractor, String name) { - - String sinkName = topology.newName(KStreamImpl.SINK_NAME); - String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); - String reduceName = topology.newName(REDUCE_NAME); - - String topic = name + REPARTITION_TOPIC_SUFFIX; - - Serializer keySerializer = keySerde == null ? null : keySerde.serializer(); - Deserializer keyDeserializer = keySerde == null ? 
null : keySerde.deserializer(); - Serializer valueSerializer = valSerde == null ? null : valSerde.serializer(); - Deserializer valueDeserializer = valSerde == null ? null : valSerde.deserializer(); - - ChangedSerializer changedValueSerializer = new ChangedSerializer<>(valueSerializer); - ChangedDeserializer changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); - - ProcessorSupplier> aggregateSupplier = new KTableReduce<>(name, adder, subtractor); - + ProcessorSupplier> aggregateSupplier = new KTableReduce<>(name, adder, subtractor); StateStoreSupplier aggregateStore = Stores.create(name) .withKeys(keySerde) .withValues(valSerde) .persistent() .build(); - // send the aggregate key-value pairs to the intermediate topic for partitioning - topology.addInternalTopic(topic); - topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name); - - // read the intermediate topic - topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); - - // aggregate the values with the aggregator and local store - topology.addProcessor(reduceName, aggregateSupplier, sourceName); - topology.addStateStore(aggregateStore, reduceName); - - // return the KTable representation with the intermediate topic as the sources - return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName)); + return getKTableImplObj(name, REDUCE_NAME, aggregateSupplier, aggregateStore); } @Override