Skip to content

Commit

Permalink
MINOR: rename internal FK-join processor classes (#13589)
Browse files Browse the repository at this point in the history
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>
  • Loading branch information
mjsax committed Apr 18, 2023
1 parent 4b6dcf1 commit 9992f43
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKey;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchema;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionJoinForeignProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignTableJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionSendProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ResponseJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapperSerde;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionReceiveProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
Expand Down Expand Up @@ -1139,9 +1139,9 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
);

final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
final StatefulProcessorNode<K, Change<V>> subscriptionNode = new StatefulProcessorNode<>(
final StatefulProcessorNode<K, Change<V>> subscriptionSendNode = new StatefulProcessorNode<>(
new ProcessorParameters<>(
new ForeignJoinSubscriptionSendProcessorSupplier<>(
new SubscriptionSendProcessorSupplier<>(
foreignKeyExtractor,
subscriptionForeignKeySerdePseudoTopic,
valueHashSerdePseudoTopic,
Expand All @@ -1155,7 +1155,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
Collections.emptySet(),
Collections.singleton(primaryKeyValueGetter)
);
builder.addGraphNode(graphNode, subscriptionNode);
builder.addGraphNode(graphNode, subscriptionSendNode);

final StreamPartitioner<KO, SubscriptionWrapper<K>> subscriptionSinkPartitioner =
tableJoinedInternal.otherPartitioner() == null
Expand All @@ -1167,7 +1167,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
new StaticTopicNameExtractor<>(subscriptionTopicName),
new ProducedInternal<>(Produced.with(foreignKeySerde, subscriptionWrapperSerde, subscriptionSinkPartitioner))
);
builder.addGraphNode(subscriptionNode, subscriptionSink);
builder.addGraphNode(subscriptionSendNode, subscriptionSink);

final StreamSourceNode<KO, SubscriptionWrapper<K>> subscriptionSource = new StreamSourceNode<>(
renamed.suffixWithOrElseGet("-subscription-registration-source", builder, SOURCE_NAME),
Expand Down Expand Up @@ -1197,7 +1197,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
final StatefulProcessorNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode =
new StatefulProcessorNode<>(
new ProcessorParameters<>(
new SubscriptionStoreReceiveProcessorSupplier<>(subscriptionStore, combinedKeySchema),
new SubscriptionReceiveProcessorSupplier<>(subscriptionStore, combinedKeySchema),
renamed.suffixWithOrElseGet("-subscription-receive", builder, SUBSCRIPTION_PROCESSOR)
),
Collections.singleton(subscriptionStore),
Expand All @@ -1206,28 +1206,28 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);

final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter = ((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier();
final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinForeignNode =
final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =
new StatefulProcessorNode<>(
new ProcessorParameters<>(
new SubscriptionJoinForeignProcessorSupplier<>(
new SubscriptionJoinProcessorSupplier<>(
foreignKeyValueGetter
),
renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR)
),
Collections.emptySet(),
Collections.singleton(foreignKeyValueGetter)
);
builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinForeignNode);
builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinNode);

final StatefulProcessorNode<KO, Change<VO>> foreignJoinSubscriptionNode = new StatefulProcessorNode<>(
final StatefulProcessorNode<KO, Change<VO>> foreignTableJoinNode = new StatefulProcessorNode<>(
new ProcessorParameters<>(
new ForeignJoinSubscriptionProcessorSupplier<>(subscriptionStore, combinedKeySchema, foreignKeyValueGetter),
new ForeignTableJoinProcessorSupplier<>(subscriptionStore, combinedKeySchema, foreignKeyValueGetter),
renamed.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR)
),
Collections.singleton(subscriptionStore),
Collections.singleton(foreignKeyValueGetter)
);
builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).graphNode, foreignJoinSubscriptionNode);
builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).graphNode, foreignTableJoinNode);


final String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX;
Expand All @@ -1244,8 +1244,8 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
new StaticTopicNameExtractor<>(finalRepartitionTopicName),
new ProducedInternal<>(Produced.with(keySerde, responseWrapperSerde, foreignResponseSinkPartitioner))
);
builder.addGraphNode(subscriptionJoinForeignNode, foreignResponseSink);
builder.addGraphNode(foreignJoinSubscriptionNode, foreignResponseSink);
builder.addGraphNode(subscriptionJoinNode, foreignResponseSink);
builder.addGraphNode(foreignTableJoinNode, foreignResponseSink);

final StreamSourceNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSource = new StreamSourceNode<>(
renamed.suffixWithOrElseGet("-subscription-response-source", builder, SOURCE_NAME),
Expand All @@ -1259,22 +1259,21 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
resultSourceNodes.add(foreignResponseSource.nodeName());
builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);

final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>(
primaryKeyValueGetter,
valueSerde == null ? null : valueSerde.serializer(),
valueHashSerdePseudoTopic,
joiner,
leftJoin
);
final StatefulProcessorNode<K, SubscriptionResponseWrapper<VO>> resolverNode = new StatefulProcessorNode<>(
final StatefulProcessorNode<K, SubscriptionResponseWrapper<VO>> responseJoinNode = new StatefulProcessorNode<>(
new ProcessorParameters<>(
resolverProcessorSupplier,
new ResponseJoinProcessorSupplier<>(
primaryKeyValueGetter,
valueSerde == null ? null : valueSerde.serializer(),
valueHashSerdePseudoTopic,
joiner,
leftJoin
),
renamed.suffixWithOrElseGet("-subscription-response-resolver", builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)
),
Collections.emptySet(),
Collections.singleton(primaryKeyValueGetter)
);
builder.addGraphNode(foreignResponseSource, resolverNode);
builder.addGraphNode(foreignResponseSource, responseJoinNode);

final String resultProcessorName = renamed.suffixWithOrElseGet("-result", builder, FK_JOIN_OUTPUT_NAME);

Expand Down Expand Up @@ -1308,7 +1307,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
resultStore
);
resultNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier);
builder.addGraphNode(resolverNode, resultNode);
builder.addGraphNode(responseJoinNode, resultNode);

return new KTableImpl<K, V, VR>(
resultProcessorName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@

import java.nio.ByteBuffer;

public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements
public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements
ProcessorSupplier<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionProcessorSupplier.class);
private static final Logger LOG = LoggerFactory.getLogger(ForeignTableJoinProcessorSupplier.class);
private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
private final CombinedKeySchema<KO, K> keySchema;
private final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetterSupplier;

public ForeignJoinSubscriptionProcessorSupplier(
public ForeignTableJoinProcessorSupplier(
final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder,
final CombinedKeySchema<KO, K> keySchema,
final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetterSupplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@
* @param <VO> Type of foreign values
* @param <VR> Type of joined result of primary and foreign values
*/
public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>, K, VR> {
public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>, K, VR> {
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
private final Serializer<V> constructionTimeValueSerializer;
private final Supplier<String> valueHashSerdePseudoTopicSupplier;
private final ValueJoiner<V, VO, VR> joiner;
private final boolean leftJoin;

public SubscriptionResolverJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
final Serializer<V> valueSerializer,
final Supplier<String> valueHashSerdePseudoTopicSupplier,
final ValueJoiner<V, VO, VR> joiner,
final boolean leftJoin) {
public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
final Serializer<V> valueSerializer,
final Supplier<String> valueHashSerdePseudoTopicSupplier,
final ValueJoiner<V, VO, VR> joiner,
final boolean leftJoin) {
this.valueGetterSupplier = valueGetterSupplier;
constructionTimeValueSerializer = valueSerializer;
this.valueHashSerdePseudoTopicSupplier = valueHashSerdePseudoTopicSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@
* @param <KO> Type of foreign key
* @param <VO> Type of foreign value
*/
public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
public class SubscriptionJoinProcessorSupplier<K, KO, VO>
implements ProcessorSupplier<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>, K, SubscriptionResponseWrapper<VO>> {

private final KTableValueGetterSupplier<KO, VO> foreignValueGetterSupplier;

public SubscriptionJoinForeignProcessorSupplier(final KTableValueGetterSupplier<KO, VO> foreignValueGetterSupplier) {
public SubscriptionJoinProcessorSupplier(final KTableValueGetterSupplier<KO, VO> foreignValueGetterSupplier) {
this.foreignValueGetterSupplier = foreignValueGetterSupplier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
public class SubscriptionReceiveProcessorSupplier<K, KO>
implements ProcessorSupplier<KO, SubscriptionWrapper<K>, CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> {
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStoreReceiveProcessorSupplier.class);
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionReceiveProcessorSupplier.class);

private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
private final CombinedKeySchema<KO, K> keySchema;

public SubscriptionStoreReceiveProcessorSupplier(
public SubscriptionReceiveProcessorSupplier(
final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder,
final CombinedKeySchema<KO, K> keySchema) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;

public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSupplier<K, Change<V>, KO, SubscriptionWrapper<K>> {
private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class);
public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSupplier<K, Change<V>, KO, SubscriptionWrapper<K>> {
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionSendProcessorSupplier.class);

private final Function<V, KO> foreignKeyExtractor;
private final Supplier<String> foreignKeySerdeTopicSupplier;
Expand All @@ -56,13 +56,13 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
private Serializer<KO> foreignKeySerializer;
private Serializer<V> valueSerializer;

public ForeignJoinSubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
final Supplier<String> foreignKeySerdeTopicSupplier,
final Supplier<String> valueSerdeTopicSupplier,
final Serde<KO> foreignKeySerde,
final Serializer<V> valueSerializer,
final boolean leftJoin,
final KTableValueGetterSupplier<K, V> primaryKeyValueGetterSupplier) {
public SubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
final Supplier<String> foreignKeySerdeTopicSupplier,
final Supplier<String> valueSerdeTopicSupplier,
final Serde<KO> foreignKeySerde,
final Serializer<V> valueSerializer,
final boolean leftJoin,
final KTableValueGetterSupplier<K, V> primaryKeyValueGetterSupplier) {
this.foreignKeyExtractor = foreignKeyExtractor;
this.foreignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
this.valueSerdeTopicSupplier = valueSerdeTopicSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.kafka.common.utils.BytesTest;
import org.apache.kafka.streams.kstream.internals.KTableKTableForeignKeyJoinScenarioTest;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchemaTest;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplierTest;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ResponseJoinProcessorSupplierTest;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapperSerdeTest;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerdeTest;
import org.junit.runner.RunWith;
Expand All @@ -44,7 +44,7 @@
CombinedKeySchemaTest.class,
SubscriptionWrapperSerdeTest.class,
SubscriptionResponseWrapperSerdeTest.class,
SubscriptionResolverJoinProcessorSupplierTest.class
ResponseJoinProcessorSupplierTest.class
})
public class ForeignKeyJoinSuite {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.junit.Assert;
import org.junit.Test;

public class ForeignJoinSubscriptionProcessorSupplierTest {
public class ForeignTableJoinProcessorSupplierTest {
final Map<String, ValueAndTimestamp<String>> fks = Collections.singletonMap(
"fk1", ValueAndTimestamp.make("foo", 1L)
);
Expand Down Expand Up @@ -375,8 +375,8 @@ public String[] storeNames() {
Change<ValueAndTimestamp<SubscriptionWrapper<String>>>,
String,
SubscriptionResponseWrapper<String>> processor(final KTableValueGetterSupplier<String, String> valueGetterSupplier) {
final SubscriptionJoinForeignProcessorSupplier<String, String, String> supplier =
new SubscriptionJoinForeignProcessorSupplier<>(valueGetterSupplier);
final SubscriptionJoinProcessorSupplier<String, String, String> supplier =
new SubscriptionJoinProcessorSupplier<>(valueGetterSupplier);
return supplier.get();
}
}

0 comments on commit 9992f43

Please sign in to comment.