KAFKA-14209 : Rewrite self joins to use single state store 2/3 #12644

Merged on Oct 5, 2022 (4 commits)

vpapavas
Contributor

KIP can be found here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join

The optimization applies only to Stream-Stream self-joins; n-way self-joins are not covered.

This is the inner-join topology without the optimization:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [topic2])
      --> KSTREAM-WINDOWED-0000000001, KSTREAM-WINDOWED-0000000002
    Processor: KSTREAM-WINDOWED-0000000001 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-JOINTHIS-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINOTHER-0000000004-store])
      --> KSTREAM-JOINOTHER-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-JOINOTHER-0000000004 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-WINDOWED-0000000002
    Processor: KSTREAM-JOINTHIS-0000000003 (stores: [KSTREAM-JOINOTHER-0000000004-store])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-WINDOWED-0000000001
    Processor: KSTREAM-MERGE-0000000005 (stores: [])
      --> KSTREAM-PROCESSOR-0000000006
      <-- KSTREAM-JOINTHIS-0000000003, KSTREAM-JOINOTHER-0000000004
    Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
      --> none
      <-- KSTREAM-MERGE-0000000005

[Image: inner-join]

And this is the optimized self-join topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [topic1])
      --> KSTREAM-WINDOWED-0000000001
    Processor: KSTREAM-WINDOWED-0000000001 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MERGE-0000000005 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-PROCESSOR-0000000006
      <-- KSTREAM-WINDOWED-0000000001
    Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
      --> none
      <-- KSTREAM-MERGE-0000000005

[Image: self-join]
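To make the difference between the two topologies concrete, here is a minimal, self-contained sketch of a self-join that uses a single window store. It is a simplified model, not the code from this PR: the `Event` class, the list-backed store, the string joiner, and the absence of retention and eviction are all assumptions made for illustration. Each record is stored once and then joined against every record with the same key inside the window, in both left/right orders.

```java
import java.util.ArrayList;
import java.util.List;

public class SelfJoinSketch {
    /** Hypothetical stand-in for a keyed, timestamped stream record. */
    static final class Event {
        final String key;
        final String value;
        final long ts;

        Event(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // The single window store shared by both "sides" of the self-join.
    // Assumption: a plain list with no retention or eviction.
    private final List<Event> windowStore = new ArrayList<>();
    private final long windowMs;

    SelfJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Stores the record once, then joins it against everything in the window. */
    List<String> process(Event in) {
        windowStore.add(in);
        List<String> out = new ArrayList<>();
        for (Event other : windowStore) {
            boolean sameKey = other.key.equals(in.key);
            boolean inWindow = Math.abs(in.ts - other.ts) <= windowMs;
            if (!sameKey || !inWindow) {
                continue;
            }
            if (other == in) {
                // A record always matches itself in a self-join.
                out.add(join(in, in));
            } else {
                // Emit both orders, as the two-store inner join would.
                out.add(join(in, other));
                out.add(join(other, in));
            }
        }
        return out;
    }

    /** Hypothetical value joiner: concatenates the two values. */
    static String join(Event left, Event right) {
        return left.value + "+" + right.value;
    }

    public static void main(String[] args) {
        SelfJoinSketch join = new SelfJoinSketch(10L);
        System.out.println(join.process(new Event("k", "A", 0L))); // [A+A]
        System.out.println(join.process(new Event("k", "B", 5L))); // [B+A, A+B, B+B]
    }
}
```

The point of the sketch is only that one store is enough: because both inputs are the same stream, a second store would hold an identical copy of the data. The order in which the real processor emits results may differ from this model.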

Testing: unit tests. Integration and upgrade tests will come in a follow-up PR.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

final GraphNode parent = joinNode.parentNodes().stream().findFirst().get();
GraphNode left = null, right = null;
for (final GraphNode child: parent.children()) {
    if (child instanceof WindowedStreamProcessorNode
Contributor Author

The current JoinNode might have other JoinNodes as siblings. We need to distinguish the WindowedStreamProcessorNode nodes that belong to the current node from those that belong to other join nodes.
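One way to make that distinction is to keep only the windowed children of the shared parent that actually feed the join node in question. The sketch below illustrates the idea with a hypothetical `Node` class; the `windowed` flag stands in for the `instanceof WindowedStreamProcessorNode` check, and none of these names come from the Kafka code base.

```java
import java.util.ArrayList;
import java.util.List;

public class SiblingJoinSketch {
    /** Hypothetical, minimal graph node. */
    static final class Node {
        final String name;
        final boolean windowed; // stands in for `instanceof WindowedStreamProcessorNode`
        final List<Node> children = new ArrayList<>();

        Node(String name, boolean windowed) {
            this.name = name;
            this.windowed = windowed;
        }

        Node addChild(Node child) {
            children.add(child);
            return this;
        }
    }

    /** Returns the windowed children of {@code parent} that feed {@code joinNode}. */
    static List<Node> windowedInputsOf(Node parent, Node joinNode) {
        List<Node> result = new ArrayList<>();
        for (Node child : parent.children) {
            // Skip windowed nodes that belong to a sibling join.
            if (child.windowed && child.children.contains(joinNode)) {
                result.add(child);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Node source = new Node("source", false);
        Node joinA = new Node("joinA", false);
        Node joinB = new Node("joinB", false);
        Node w1 = new Node("w1", true).addChild(joinA);
        Node w2 = new Node("w2", true).addChild(joinA);
        Node w3 = new Node("w3", true).addChild(joinB);
        source.addChild(w1).addChild(w2).addChild(w3);

        for (Node n : windowedInputsOf(source, joinA)) {
            System.out.println(n.name); // w1, then w2
        }
    }
}
```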

@vpapavas
Contributor Author

I opened a follow-up ticket for improving the runtime by using a single loop: https://issues.apache.org/jira/browse/KAFKA-14251.

@@ -198,7 +207,8 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
     .withOtherWindowedStreamProcessorParameters(otherWindowStreamProcessorParams)
     .withOuterJoinWindowStoreBuilder(outerJoinWindowStore)
     .withValueJoiner(joiner)
-    .withNodeName(joinMergeName);
+    .withNodeName(joinMergeName)
+    .withSelfJoinProcessorParameters(selfJoinProcessorParams);
Contributor

Not a strong opinion, since I know we are already mixing physical info into the logical planner quite badly, but I'm wondering if we could defer this to StreamStreamJoinNode#writeToTopology, inside the isSelfJoin condition. We still have all the pieces we need from the other params: 1) the left store name, 2) both join window specs, 3) the joiner, and 4) the time tracker. So in writeToTopology we can still generate the KStreamKStreamSelfJoin object if self-join is enabled.

My motivation is just to avoid spilling more physical node info into the logical planning phase.
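The suggestion can be pictured roughly as follows. This is a hedged sketch with hypothetical classes (`LogicalJoinNode`, `SelfJoinProcessor`), not the real StreamStreamJoinNode or KStreamKStreamSelfJoin: the logical node keeps only the ingredients (store name, window size, joiner; the time tracker is omitted for brevity) and creates the physical processor only when the plan is written out.

```java
import java.util.Optional;
import java.util.function.BiFunction;

public class DeferredSelfJoinSketch {
    /** Hypothetical stand-in for the physical self-join processor. */
    static final class SelfJoinProcessor {
        final String storeName;
        final long windowMs;
        final BiFunction<String, String, String> joiner;

        SelfJoinProcessor(String storeName, long windowMs,
                          BiFunction<String, String, String> joiner) {
            this.storeName = storeName;
            this.windowMs = windowMs;
            this.joiner = joiner;
        }
    }

    /** Hypothetical logical node: it holds the ingredients, not the processor. */
    static final class LogicalJoinNode {
        final String leftStoreName;
        final long windowMs;
        final BiFunction<String, String, String> joiner;
        boolean selfJoin; // assumed to be set later by an optimization pass

        LogicalJoinNode(String leftStoreName, long windowMs,
                        BiFunction<String, String, String> joiner) {
            this.leftStoreName = leftStoreName;
            this.windowMs = windowMs;
            this.joiner = joiner;
        }

        /** The physical object is created only here, when the plan is written out. */
        Optional<SelfJoinProcessor> writeToTopology() {
            if (!selfJoin) {
                return Optional.empty(); // the regular two-sided join path is omitted
            }
            return Optional.of(new SelfJoinProcessor(leftStoreName, windowMs, joiner));
        }
    }

    public static void main(String[] args) {
        LogicalJoinNode node = new LogicalJoinNode("left-store", 10L, (a, b) -> a + b);
        System.out.println(node.writeToTopology().isPresent()); // false
        node.selfJoin = true;
        System.out.println(node.writeToTopology().isPresent()); // true
    }
}
```

The trade-off is that the logical node then needs access to every ingredient of the physical processor, which may not be visible from where the node is written out.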

Contributor Author

Hey @guozhangwang! You mean moving the code that creates the self-join node into StreamStreamJoinNode#writeToTopology:

final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new KStreamKStreamSelfJoin<>(
    thisWindowStore.name(),
    internalWindows,
    joiner,
    sharedTimeTracker
);

I tried it, but I don't think it makes the code clearer. All the components of the StreamStreamJoinNode are built in the KStreamJoinImpl and are only visible there because they are not public. So I would have to special-case the self-join part, which would fix the problem of spilling physical information into the logical plan, but only for one small special case. I am worried this would be confusing.

Contributor

Ack, thanks a lot for letting me know :)

Contributor

@guozhangwang guozhangwang left a comment

Thanks @vpapavas, I have no further comments on the non-testing part. I made a final pass on the testing part and have just one more question. Otherwise I think it's good to go.

@vvcephei
Contributor

vvcephei commented Oct 5, 2022

These test failures are unrelated:

[Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12644/7/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplication__/)
[Build / JDK 11 and Scala 2.13 / kafka.api.PlaintextAdminIntegrationTest.testCreatePartitions(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12644/7/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_11_and_Scala_2_13___testCreatePartitions_String__quorum_kraft/)
[Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsTest.testAbortTransactionTimeout(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12644/7/testReport/junit/kafka.api/TransactionsTest/Build___JDK_8_and_Scala_2_12___testAbortTransactionTimeout_String__quorum_kraft_2/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[3] tlsProtocol=TLSv1.3, useInlinePem=false](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12644/7/testReport/junit/org.apache.kafka.common.network/SslTransportLayerTest/Build___JDK_17_and_Scala_2_13____3__tlsProtocol_TLSv1_3__useInlinePem_false/)

@vvcephei vvcephei merged commit 21a15c6 into apache:trunk Oct 5, 2022
guozhangwang pushed a commit to guozhangwang/kafka that referenced this pull request Jan 25, 2023
…e#12644)

Implements KIP-862: https://cwiki.apache.org/confluence/x/WSf1D

Reviewers: Guozhang Wang <guozhang@apache.org>,  Austin Heyne <aheyne>, John Roesler <vvcephei@apache.org>
rutvijmehta-harness added a commit to rutvijmehta-harness/kafka that referenced this pull request Feb 9, 2024
…e#12644) (#35)

Co-authored-by: Vicky Papavasileiou <vpapavas@users.noreply.github.com>