Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Joining on ROWKEY #2735

merged 9 commits into from May 1, 2019


Copy link

commented Apr 25, 2019


Fixes #2636
Fixes #2525
Fixes #2470

This continues the work to clean up key field handling in the existing code.

This PR addresses issues when joining by ROWKEY.

  • Previous versions of KSQL haven't handled this too well as they introduce unnecessary repartition steps. This is now resolved.
  • Plus there was an outstanding issue from earlier work where the 'new' key field was incorrectly being set to empty on the source added to the metastore when joining on ROWKEY. This is also now resolved.
Review notes:
  1. The first commit is just switching a test to Mockito and refactoring it to keep the next commit clean, so review it by itself and ignore in the main review.
  2. I'd start by looking at joins.json to see the tests that have been fixed and added.
  3. Then look at Analyzer, which now builds the KeyFields that the JoinNode needs.
  4. Then look at JoinNode for the wiring. Note: getKeyField now returns leftKeyField, which may feel wrong, but this is refactored existing functionality. I'll be looking into this more soon, as there are issues around key fields where only the right join key is in the projection.
  5. The look at SchemaKStream, specifically selectKey as this is where the magic happens. This will, if using new key fields i.e. if a new query, early out and not repartition is selecting rowkey. It still repartitions if using legacy key fields. This method also builds a new key field, replacing ROWKEY in the new key, and uses this to create new SchemaKStream, which fixes the empty key issue.
  6. Take a look at PhysicalPlanBuilderTest as this adds functional tests around the different combinations on join keys.
  7. The rest is just plumbing...

Testing done

Extensive JSON and Unit tests added.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

big-andy-coates added some commits Apr 25, 2019

Fix the handling of ROWKEY in joins - sources added to the metastore …
…now have the correct keyfield - unnecessary repartition steps of streams now avoided in new queries, but maintained in existing queries.

@big-andy-coates big-andy-coates requested a review from confluentinc/ksql as a code owner Apr 25, 2019

big-andy-coates added some commits Apr 25, 2019

@agavra agavra added this to the 5.3 milestone Apr 25, 2019


agavra approved these changes Apr 25, 2019

Copy link

left a comment

Really nice change @big-andy-coates. Took me a while to get my head wrapped around exactly what was changing despite it being, in essence, a pretty simple change... The data flow has such high complexity around this part of the code I wish there was a simpler way to model it!

@agavra agavra requested a review from confluentinc/ksql Apr 25, 2019

big-andy-coates added some commits Apr 28, 2019

Switch `JoinNode` to take left and right _join keys_ as constructor a…
…rgs, as this is what they are. They are not KeyFields.

 Clean up some naming to make the code easier to understand.
Copy link

left a comment

Thanks @big-andy-coates

@@ -104,12 +107,38 @@
private static final String FILTER_MAPVALUES_NODE = "KSTREAM-MAPVALUES-0000000004";
private static final String FOREACH_NODE = "KSTREAM-FOREACH-0000000005";

private static final String createStream = "CREATE STREAM TEST1 (COL0 BIGINT, COL1 VARCHAR, COL2 DOUBLE) WITH ( "
+ "KAFKA_TOPIC = 'test1', VALUE_FORMAT = 'JSON' );";
private static final String CREATE_STREAM_TEST1 = "CREATE STREAM TEST1 "

This comment has been minimized.

Copy link

hjafarpour Apr 29, 2019


For better readability you could include stream/table in the name. For instance, Create Stream Stream_test1 ...

This comment has been minimized.

Copy link

big-andy-coates Apr 30, 2019

Author Member

That's what I've already done...


@big-andy-coates big-andy-coates merged commit aa512cb into confluentinc:master May 1, 2019

1 check passed

continuous-integration/jenkins/pr-merge This commit looks good

@big-andy-coates big-andy-coates deleted the big-andy-coates:rowkey_join branch May 1, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
None yet
3 participants
You can’t perform that action at this time.