
KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder #3602

Closed
wants to merge 3 commits into from

Conversation

@mjsax mjsax commented Jul 31, 2017

No description provided.

@mjsax (Member, Author) commented Jul 31, 2017

Call for review @bbejeck @guozhangwang @enothereska @dguy

@asfgit commented Jul 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6439/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit commented Jul 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6424/
Test PASSed (JDK 8 and Scala 2.12).

@dguy (Contributor) left a comment:

Thanks @mjsax, my main concern is with the use of reflection. I'd like to think we can somehow avoid it, but I think the package structure etc. is working against us quite a bit.

* Long valueForKey = localStore.get(key);
* }</pre>
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the speci
Contributor:

Do we need addStateStore on the DSL?

Member Author:

This is what we decided on (cf. the KIP). Strictly speaking it's not required, as people could do builder.build().addStateStore(), but that is quite clumsy. And you need to add stores manually for transform/transformValues/process.
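The convenience argument above can be sketched with simplified stand-in classes (names like MiniStreamsBuilder and MiniTopology are hypothetical, not the real Kafka Streams API): the DSL builder delegates addStateStore to its topology, so users registering stores for transform()/process() never have to call build() early.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Topology: collects registered state stores.
class MiniTopology {
    final List<String> stateStores = new ArrayList<>();

    MiniTopology addStateStore(final String storeName) {
        stateStores.add(storeName);
        return this;
    }
}

// Simplified stand-in for StreamsBuilder: delegates for convenience,
// avoiding the clumsy builder.build().addStateStore(...) pattern.
public class MiniStreamsBuilder {
    private final MiniTopology topology = new MiniTopology();

    public MiniStreamsBuilder addStateStore(final String storeName) {
        topology.addStateStore(storeName);
        return this;
    }

    public MiniTopology build() {
        return topology;
    }

    public static void main(final String[] args) {
        final MiniStreamsBuilder builder = new MiniStreamsBuilder();
        // Stores must be added manually before use in transform()/process().
        builder.addStateStore("my-store");
        System.out.println(builder.build().stateStores);
    }
}
```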

* Long valueForKey = localStore.get(key);
* }</pre>
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the speci
Contributor:

Not sure we need this on the DSL? It isn't used anywhere and seems more like it should be on Topology?

Member Author:

Both are available in Topology, too. And we add to the DSL for convenience (cf. my other comment). Also, one might want to add a global store for use in transform/transformValues/process, too.

* Long valueForKey = localStore.get(key);
* }</pre>
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the speci
Contributor:

Ditto

}

public KStreamBuilder() {
// TODO: we should refactor this to avoid usage of reflection
Contributor:

We need to think about how we can avoid this. The package structure appears to be working against us.

Member Author:

I don't like this either; to me, it should be a temporary workaround only. The PR is already huge and I think it's ok to accept this for now to finish the KIP. Refactoring will be internal cleanup only and can be done as a follow-up. WDYT?

Contributor:

Actually I'm not clear why we cannot keep the current impl of KStreamBuilder as is, i.e. leverage the old deprecated TopologyBuilder's APIs in the processor package rather than introducing an InternalStreamsBuilder and reflection?

Member Author:

As discussed, we can't keep it as-is because, e.g., KStreamImpl was updated to accept an InternalStreamsBuilder and no longer accepts a KStreamBuilder.

private final AtomicInteger index = new AtomicInteger(0);

public InternalStreamsBuilder() {
// TODO: we should refactor this to avoid usage of reflection
Contributor:

Again, I think we need to sort this out. Seems to me that somehow we need to pass in both InternalTopologyBuilder and Topology. Using reflection for this is a bit grim.

Contributor:

I think we should be able to just call Topology's public APIs to construct the DAG, e.g. topology.addSource instead of internalTopologyBuilder.addSource, if we believe the DSL could be "independently" layered on top of the PAPI layer instead of integrated with it.

I understand that for some other classes like KStreamImpl there are some places where the public APIs of Topology are not sufficient, and we need to access its internal topology builder for getting internal topics / marker co-partitions / etc. I will leave separate comments on those places. But for this class it seems we do not need to access any of its internal functions.

Member Author:

I agree in general. The goal is to layer the DSL on top of Topology -- but there are still some other "leaking" abstractions in the way of the separation as it "should be". For now, it seems ok to me to work with InternalTopologyBuilder to get access to those "internal methods". I would prefer to do some more refactoring as a follow-up.

}

@Test
public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() throws Exception {
Contributor:

Not sure about the name? In particular the WithKeyValSerde part -- we aren't adding any serdes?

}

@Test
public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() throws Exception {
Contributor:

Same here. Also, is this really any different from the test above?

@@ -67,11 +68,24 @@ public void setUp() throws IOException {
stateDir = TestUtils.tempDirectory("kafka-test");
}

private void doTestJoin(final KStreamBuilder builder,
public static Collection<Set<String>> getCopartitionedGroups(StreamsBuilder builder) {
Contributor:

As above. Seems something is not quite right if we are resorting to reflection.

Contributor:

Will try to remove it in a follow-up PR.

final StreamsBuilder builder = new StreamsBuilder();

// TODO: we should refactor this to avoid usage of reflection
final Field internalStreamsBuilderField = builder.getClass().getDeclaredField("internalStreamsBuilder");
Contributor:

Is this the same as in StandbyTaskTest?

final StreamsBuilder builder = new StreamsBuilder();

// TODO: we should refactor this to avoid usage of reflection
final Field internalStreamsBuilderField = builder.getClass().getDeclaredField("internalStreamsBuilder");
Contributor:

Same as above block of code?
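Since the same reflection block appears in several tests, the duplication could be factored into one test helper. A minimal sketch (the Holder class and helper name are hypothetical; the reflection calls mirror the getDeclaredField/setAccessible pattern quoted above):

```java
import java.lang.reflect.Field;

// Stand-in for StreamsBuilder: a class with a private field the tests reach into.
class Holder {
    @SuppressWarnings("unused")
    private final String internalStreamsBuilder = "internal";
}

public class ReflectionHelper {
    // Reads a private field by name, as the duplicated test blocks do
    // with getDeclaredField + setAccessible.
    static Object getPrivateField(final Object target, final String fieldName) throws Exception {
        final Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        return field.get(target);
    }

    public static void main(final String[] args) throws Exception {
        System.out.println(getPrivateField(new Holder(), "internalStreamsBuilder"));
    }
}
```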

*
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
* offsets are available
* @param topic the topic name; cannot be {@code null}
@bbejeck (Contributor) Jul 31, 2017:

nit: missing javadoc for timestampExtractor parameter

*
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
* offsets are available
* @param keySerde key serde used to send key-value pairs,
Contributor:

nit: missing javadoc param tag for timestampExtractor

* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
* @param valueSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
Contributor:

nit: missing javadoc param tag for timestampExtractor


addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
Contributor:

nit: with this many parameters including ternary statements maybe place the params on a separate line for readability?


addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
Contributor:

nit: same as above

throw new TopologyBuilderException(message);
builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
} catch (final FileNotFoundException | UnsupportedEncodingException e) {
throw new TopologyException("Unable to write stream to file at [" + filePath + "] " + e.getMessage());
Contributor:

nit: use String.format for error message
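The nit above can be illustrated with a small self-contained sketch (method and class names hypothetical): String.format keeps the message template in one readable piece instead of concatenation.

```java
public class ErrorMessageFormat {
    // String.format keeps the template and its arguments together,
    // instead of splicing them with '+' as in the quoted code.
    static String unableToWrite(final String filePath, final String cause) {
        return String.format("Unable to write stream to file at [%s] %s", filePath, cause);
    }

    public static void main(final String[] args) {
        System.out.println(unableToWrite("/tmp/out.txt", "Permission denied"));
    }
}
```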

@mjsax (Member, Author) commented Jul 31, 2017

Updated to address @dguy and @bbejeck comments.

@guozhangwang (Contributor) left a comment:

Leave two general comments on trying to remove reflections.

@@ -1094,8 +1094,8 @@
//
// OR
//
KStreamBuilder builder = ...; // when using the Kafka Streams DSL
Topology topology = builder.topology();
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
Contributor:

As mentioned in the previous PR, we should add the paragraph for using describe() as well as building incrementally with the describe -> build -> describe -> build pattern.

Contributor:

Ack.

* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
* ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
Contributor:

nit: space after the comma; ditto in similar javadoc elsewhere.

@@ -36,8 +37,8 @@
* For example a user X might buy two items I1 and I2, and thus there might be two records {@code <K:I1>, <K:I2>}
* in the stream.
* <p>
* A {@code KStream} is either {@link KStreamBuilder#stream(String...) defined from one or multiple Kafka topics} that
* are consumed message by message or the result of a {@code KStream} transformation.
* A {@code KStream} is either {@link org.apache.kafka.streams.StreamsBuilder#stream(String...) defined from one or
Contributor:

Do we need to reference the whole path here, since we already import the class above? Ditto elsewhere.


: Collections.singleton(sourceName),
storeSupplier.name(),
isQueryable);
builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
Contributor:

Same here. Could we just use InternalStreamsBuilder#topology#addProcessor instead?

Member Author:

I don't think the code would be simpler with the suggested change. ATM, the code has a nested "hierarchy" from top level to inner: Topology -> InternalTopologyBuilder -- thus, InternalTopologyBuilder does not even know it's "wrapped" by Topology.

Also, IMHO, in internal classes it's ok to use InternalTopologyBuilder.

Contributor:

Makes sense. Thanks for the explanation.

@@ -132,18 +131,18 @@ private void determineIsQueryable(final String queryableStoreName) {
ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);

// send the aggregate key-value pairs to the intermediate topic for partitioning
topology.addInternalTopic(topic);
topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
builder.internalTopologyBuilder.addInternalTopic(topic);
Contributor:

Okay, this is the place I mentioned where Topology's public API is not sufficient. One way we can work around it is to introduce a constructor in Topology which takes an InternalTopologyBuilder, while the current constructor explicitly creates one:

final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();

The added constructor would also be package-private so it is not exposed to users, but o.a.k.streams.StreamsBuilder can use it: it can create the internalTopologyBuilder and pass it to the Topology in its own constructor, so that it can then hold on to its reference and use it in places like here.

BTW, as I mentioned above, for classes that do not really need to access the internalTopologyBuilder we should restrict ourselves to only calling Topology's public APIs.
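The constructor arrangement suggested above can be sketched with simplified stand-ins (all class names here are hypothetical, not the real Kafka Streams classes): the topology gains a package-private constructor taking the internal builder, the DSL builder creates the internal builder itself and keeps a reference, and no reflection is needed.

```java
// Stand-in for InternalTopologyBuilder: exposes an internal-only operation.
class InternalBuilderSketch {
    int sourceCount = 0;

    void addSource(final String name) {
        sourceCount++;
    }
}

// Stand-in for Topology.
class TopologySketch {
    final InternalBuilderSketch internalTopologyBuilder;

    // The existing public constructor now creates the internal builder explicitly.
    public TopologySketch() {
        this(new InternalBuilderSketch());
    }

    // Package-private: hidden from users, visible to the DSL builder.
    TopologySketch(final InternalBuilderSketch internalTopologyBuilder) {
        this.internalTopologyBuilder = internalTopologyBuilder;
    }
}

// Stand-in for StreamsBuilder: creates the internal builder, hands it to the
// topology, and keeps its own reference for internal-only calls.
public class BuilderSketch {
    final InternalBuilderSketch internal = new InternalBuilderSketch();
    final TopologySketch topology = new TopologySketch(internal);

    public void stream(final String topic) {
        // Internal-only call, reachable without reflection.
        internal.addSource(topic);
    }

    public static void main(final String[] args) {
        final BuilderSketch builder = new BuilderSketch();
        builder.stream("topic-a");
        System.out.println(builder.topology.internalTopologyBuilder.sourceCount);
    }
}
```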

Member Author:

I did some of the refactoring you suggested already. But it's easier and cleaner to go from Topology to InternalTopologyBuilder in one place. For internals, we don't lose anything if we use the internal topology builder IMHO.

@asfgit commented Jul 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6448/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit commented Jul 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6433/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit commented Jul 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6449/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit commented Jul 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6434/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang (Contributor) left a comment:

Follow-up comments that will be wrapped in a separate PR.

@@ -51,7 +52,7 @@
* @param <V> Type of values
* @see KTable
* @see KGroupedStream
* @see KStreamBuilder#stream(String...)
* @see org.apache.kafka.streams.StreamsBuilder#stream(String...)
Contributor:

We can just use the class name itself.


/**
* Create a {@link GlobalKTable} for the specified topic.
* The default {@link TimestampExtractor} as specified
Contributor:

We can call the static function of KStreamImpl directly and get rid of the additional function in InternalStreamsBuilder.



private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";

private static final String PRINTING_NAME = "KSTREAM-PRINTER-";

private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";

public static final String SINK_NAME = "KSTREAM-SINK-";
static final String SINK_NAME = "KSTREAM-SINK-";
Contributor:

Ideally we should make SOURCE_NAME package-private as well. I'll see if that's doable.

public void to(final Serde<K> keySerde, final Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, final String topic) {
public void to(final Serde<K> keySerde,
final Serde<V> valSerde,
StreamPartitioner<? super K, ? super V> partitioner,
Contributor:

This is not introduced by this PR: we should not reassign the passed-in reference but just pass a different object, if needed, in addSink.
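The point above can be sketched as follows (names hypothetical; the real code substitutes a default partitioner in to()/addSink): rather than reassigning the parameter, derive a distinct local value so the substitution is explicit.

```java
public class PartitionerChoice {
    interface Partitioner {
        int partition(String key);
    }

    // Assumed stand-in for whatever default the real code falls back to.
    static final Partitioner DEFAULT = key -> Math.abs(key.hashCode()) % 4;

    // Reassigning the 'partitioner' parameter inside the caller would work,
    // but hides the mutation; returning a distinct value makes it explicit.
    static Partitioner effectivePartitioner(final Partitioner partitioner) {
        return partitioner != null ? partitioner : DEFAULT;
    }

    public static void main(final String[] args) {
        System.out.println(effectivePartitioner(null) == DEFAULT);
    }
}
```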

@@ -218,7 +218,7 @@ public Integer apply(final String aggKey, final Integer aggOne, final Integer ag
return aggOne + aggTwo;
}
}, SessionWindows.with(30), Serdes.Integer(), "session-store");
table.foreach(new ForeachAction<Windowed<String>, Integer>() {
table.toStream().foreach(new ForeachAction<Windowed<String>, Integer>() {
Contributor:

Good catch!


final KStreamBuilder builder = new KStreamBuilder();
builder.stream("topic").groupByKey().count("my-store");
final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0);
final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
Contributor:

Ditto, we can create the internal topology builder directly.

builder.setApplicationId(applicationId);

// TODO: we should refactor this to avoid usage of reflection
final Field internalTopologyBuilderField = internalStreamsBuilder.getClass().getDeclaredField("internalTopologyBuilder");
Contributor:

Ditto.

@@ -83,6 +87,58 @@ public KStreamTestDriver(final KStreamBuilder builder,
initTopology(topology, topology.stateStores());
}

public KStreamTestDriver(final StreamsBuilder builder) {
Contributor:

I think we should mark the above constructors as deprecated.
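The deprecation suggestion looks like this in miniature (class and constructor shapes are simplified stand-ins, not the real KStreamTestDriver): annotate the old KStreamBuilder-based constructors so callers get a compiler warning steering them to the StreamsBuilder-based one.

```java
public class MiniTestDriver {
    final String source;

    // Old KStreamBuilder-based path: kept for compatibility, marked deprecated.
    @Deprecated
    public MiniTestDriver(final Object oldStyleBuilder) {
        this.source = "deprecated";
    }

    // New StreamsBuilder-based path (parameter type simplified here).
    public MiniTestDriver(final String newStyleBuilder) {
        this.source = "current";
    }

    public static void main(final String[] args) {
        // Overload resolution picks the most specific (non-deprecated) constructor.
        System.out.println(new MiniTestDriver("builder").source);
    }
}
```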

@guozhangwang (Contributor) commented:

Merged to trunk.

@asfgit asfgit closed this in da22055 Jul 31, 2017
@mjsax mjsax deleted the kafka-5671-add-streamsbuilder branch June 5, 2018 23:50
5 participants