New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-7658: Add KStream#toTable to the Streams DSL #7985
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR!
I think we need to add a test that verifies that a KTable
is only materialized if we need to materialize it (eg, only if Materialized.as(name/storeSupplier)
is used, or if the KTable
is used in a join) -- the simplest way might be to verify if a StateStore
is added to the processor via Topology#describe()
?
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
972f4fa
to
7c83fc3
Compare
@mjsax And update the code thank you |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the quick update. I think we need to be a little bit smarter about the materialization decision and add more test for it.
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
Thanks for pointing out the auto-materialization use case, @mjsax . I've been bitten by failing to get this right in a couple of recent features. A good, realistic, test seems to be just attempting to use the new operator as one join input. |
a0e83df
to
0467a61
Compare
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding more test -- left couple of comments. (Did only look at the tests.)
I think it would be good to add an aggregation test, too: stream.toTable().groupBy(...).count()
to verify that it works... Not sure if we would need a KTable#filter()
(or `mapValue) test? Maybe to verify that a filter/mapValue does not trigger a materialization?
I had one more thought (\cc @vvcephei): how should we handle the case if the KStream
key was changes when toTable()
is called? We can either insert a repartition topic automatically or we can disallow this operation (because we cannot guarantee the order or the update to the table). Personally, I think that inserting a repartition topic automatically should be fine. In any case, we also need a test for this case, too.
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
7356184
to
7908066
Compare
@mjsax |
b4282c5
to
43c219b
Compare
@mjsax I added a test case and added logic to repartition when calling toTable. Thank you!! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding more tests -- I think the DSL code is still not translated correctly into a Topology
-- details comments on the tests.
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Outdated
Show resolved
Hide resolved
43c219b
to
395dfb7
Compare
f071490
to
a1cab2e
Compare
@mjsax By fixing the test, I could understand what you had said before. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks updating the PR.
I reviewed the main code, too, this time. Couple of minor comments. Also some follow up comments on the tests (overall they look good now).
@mumrah -- This PR is basically ready and I would love to get it into 2.5
release -- when do you plan to cut the release branch? I don't want to cherry-pick a feature into the release branch hence I think there are two options:
- we get the PR finalized before you cut the release branch
- we merge the PR as-is, and do a follow up to address the open comments (cherry-picking a follow up seems ok, as the feature itself would be merged already).
Thoughts?
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
Show resolved
Hide resolved
final Collection<Set<String>> copartitionGroups = | ||
TopologyWrapper.getInternalTopologyBuilder(topology).copartitionGroups(); | ||
|
||
assertEquals(1, copartitionGroups.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to verify the whole TopologyDescription
-- as you reuse this method for two different test, you could pass the "expectedTopology" String as parameter into this method.
However, reading the test code below, the case for table-table join, for which the right-hand table is updated to trigger a join result is missing (this case only makes sense for table-table join, as for stream-table join the right hand side table updates only update the rhs table, but never trigger a join result). Hence, I think it would be best to not share code but have two test and add the missing case for the table-table join.
.toStream() | ||
.to(output); | ||
|
||
testCountHelper(builder, input, output); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is only used once -- we should inline the code and remove the method.
mkProperties(mkMap( | ||
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"), | ||
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"), | ||
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this and cannot pass props
as in the other tests?
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"), | ||
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath()) | ||
)), | ||
Instant.ofEpochMilli(0L))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to set wall-clock time?
); | ||
|
||
nextNode = repartitionNodeBuilder.build(); | ||
builder.addGraphNode(this.streamsGraphNode, nextNode); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: avoid unnecessary this.
prefix
builder.addGraphNode(this.streamsGraphNode, nextNode); | ||
nodes = Collections.singleton(sourceName); | ||
} else { | ||
nextNode = this.streamsGraphNode; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: avoid unnecessary this.
prefix
nodes = Collections.singleton(sourceName); | ||
} else { | ||
nextNode = this.streamsGraphNode; | ||
nodes = this.sourceNodes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: avoid unnecessary this.
prefix
final NamedInternal namedInternal = new NamedInternal(named); | ||
final String name = namedInternal.orElseGenerateWithPrefix(builder, TO_KTABLE_NAME); | ||
final Set<String> nodes; | ||
final StreamsGraphNode nextNode; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think nextNode
is a confusion name; better tableParentNode
?
|
||
final NamedInternal namedInternal = new NamedInternal(named); | ||
final String name = namedInternal.orElseGenerateWithPrefix(builder, TO_KTABLE_NAME); | ||
final Set<String> nodes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nodes
is not a good name -> subTopologySourceNodes
is better.
a1cab2e
to
79ebe16
Compare
@mjsax the plan is to cut the branch tomorrow (1/30). Looks like @highluck has made some changes since your last review. Barring any issues, let's merge this tonight/tomorrow and open follow up PRs to address any outstanding minor issues. |
79ebe16
to
604a182
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM -- I will do a minor follow up PR the next days (there are some typos in the JavaDocs). Merging to meet the feature freeze deadline.
Thanks for the PR @highluck!
@mjsax |
Conflicts and/or compiler errors due to the fact that we temporarily reverted the commit that removes Scala 2.11 support: * SslAdminIntegrationTest: keep using JAdminClient, take upstream changes otherwise. * ReassignPartitionsClusterTest: keep using JAdminClient, take upstream changes otherwise. * KafkaApis: use `asScala.foreach` instead of `forEach`. # By Ismael Juma (3) and others # Via GitHub * apache-github/trunk: (22 commits) KAFKA-9437; Make the Kafka Protocol Friendlier with L7 Proxies [KIP-559] (apache#7994) KAFKA-9375: Add names to all Connect threads (apache#7901) MINOR: Introduce 2.5-IV0 IBP (apache#8010) KAFKA-8503; Add default api timeout to AdminClient (KIP-533) (apache#8011) Add retries to release.py script (apache#8021) KAFKA-8162: IBM JDK Class not found error when handling SASL (apache#6524) MINOR: Add explicit result type in public defs/vals (apache#7993) KAFKA-9408: Use StandardCharsets.UTF-8 instead of "UTF-8" (apache#7940) KAFKA-9474: Adds 'float64' to the RPC protocol types (apache#8012) KAFKA-9360: Allow disabling MM2 heartbeat and checkpoint emissions (apache#7887) KAFKA-7658: Add KStream#toTable to the Streams DSL (apache#7985) KAFKA-9445: Allow adding changes to allow serving from a specific partition (apache#7984) KAFKA-9422: Track the set of topics a connector is using (KIP-558) (apache#8017) KAFKA-9040; Add --all option to config command (apache#7607) KAFKA-4203: Align broker default for max.message.bytes with Java producer default (apache#4154) KAFKA-9426: Use switch instead of chained if/else in OffsetsForLeaderEpochClient (apache#7959) KAFKA-9405: Use Map.computeIfAbsent where applicable (apache#7937) KAFKA-9026: Use automatic RPC generation in DescribeAcls (apache#7560) MINOR: Remove unused fields in StreamsMetricsImpl (apache#7992) KAFKA-9460: Enable only TLSv1.2 by default and disable other TLS protocol versions (KIP-553) (apache#7998) ...
Follow up to original PR #7985 for KIP-523 (adding `KStream#toTable()` operator) - improve JavaDocs - add more unit tests - fix bug for auto-repartitioning - some code cleanup Reviewers: High Lee <yello1109@daum.net>, John Roesler <john@confluent.io>
Follow up to original PR #7985 for KIP-523 (adding `KStream#toTable()` operator) - improve JavaDocs - add more unit tests - fix bug for auto-repartitioning - some code cleanup Reviewers: High Lee <yello1109@daum.net>, John Roesler <john@confluent.io>
Follow up to original PR apache#7985 for KIP-523 (adding `KStream#toTable()` operator) - improve JavaDocs - add more unit tests - fix bug for auto-repartitioning - some code cleanup Reviewers: High Lee <yello1109@daum.net>, John Roesler <john@confluent.io>
Add KStream#toTable to the Streams DSL
Committer Checklist (excluded from commit message)