Skip to content

Conversation

@DanielMe
Copy link

@DanielMe DanielMe commented Feb 11, 2019

This change wraps the joins from the joinlibrary extension in individual PTransforms. I also provide overloaded methods which allow to name the corresponding nodes in the graph.

The background of this change is that currently it is not possible to have multiple joins in the same pipeline without wrapping them in individual PTransforms as this would generate name clashes.

Consider the following test case:

 @Test
  public void testMultipleJoinsInSamePipeline() {
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv));

    rightListOfKv.add(KV.of("Key2", "bar"));
    PCollection<KV<String, String>> rightCollection =
            p.apply("CreateRight", Create.of(rightListOfKv));

    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));

    PCollection<KV<String, KV<Long, String>>> output1 =
            Join.innerJoin(leftCollection, rightCollection);
    PCollection<KV<String, KV<Long, String>>> output2 =
            Join.innerJoin(leftCollection, rightCollection);
    PAssert.that(output1).containsInAnyOrder(expectedResult);
    PAssert.that(output2).containsInAnyOrder(expectedResult);

    p.run();
  }

With the change contained in this PR, the same code would still fail but there is now an overloaded call to Join.innerJoin so that the corresponding nodes in the execution graph receive different names (see test case below).

The change is backwards compatible. Two other side benefits are:

  • The naming of the transformation is slightly more intuitive in case you want to debug / look at the execution graph it's now easier to see which steps correspond to the join.

  • It also allows using the PTransform directly writing something like myCollection1.apply(InnerJoin.with(myCollection2)) which some people might find more intuitive.

  • Choose reviewer(s) and mention them in a comment (R: @username).

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.

  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- --- --- --- ---
Java Build Status Build Status Build Status Build Status
Build Status
Build Status
Build Status Build Status Build Status
Python Build Status --- Build Status
Build Status
Build Status --- --- ---

@DanielMe
Copy link
Author

R: @lukecwik

@DanielMe DanielMe changed the title Allow multiple Joins in the same pipeline [BEAM-6719] Allow multiple Joins in the same pipeline Feb 20, 2019
@DanielMe
Copy link
Author

See the matching JIRA ticket here: https://issues.apache.org/jira/browse/BEAM-6719

@pabloem
Copy link
Member

pabloem commented Mar 11, 2019

Hello Daniel! I'm so sorry that we did not pick this up. Luke is away on leave, so we'd need to get you a new reviewer.

@pabloem pabloem requested review from iemejia and pabloem March 11, 2019 17:56
@pabloem
Copy link
Member

pabloem commented Mar 11, 2019

I've requested myself and ismael as reviewers. I'll take a look soon.

@DanielMe
Copy link
Author

No worries @pabloem, thanks for picking this up.

@pabloem
Copy link
Member

pabloem commented Mar 25, 2019

Hi Daniel! Sorry it took so long. This change looks good, and I appreciate the other pluses provided by it. LGTM.

@pabloem pabloem merged commit 3b03106 into apache:master Mar 25, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants