From 2cf2334b524af2596b316140eb6471193a432711 Mon Sep 17 00:00:00 2001 From: Daniel Mescheder Date: Mon, 11 Feb 2019 10:24:50 +0100 Subject: [PATCH 1/3] wrap join in PTransform to allow multiple joins in same pipeline --- .../beam/sdk/extensions/joinlibrary/Join.java | 673 +++++++++++------- .../extensions/joinlibrary/InnerJoinTest.java | 49 +- .../joinlibrary/OuterFullJoinTest.java | 47 +- .../joinlibrary/OuterLeftJoinTest.java | 47 +- .../joinlibrary/OuterRightJoinTest.java | 47 +- 5 files changed, 568 insertions(+), 295 deletions(-) diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java index 704450156c09..70ab00fa1547 100644 --- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java +++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java @@ -17,10 +17,9 @@ */ package org.apache.beam.sdk.extensions.joinlibrary; -import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; - import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; @@ -29,249 +28,441 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; + /** * Utility class with different versions of joins. All methods join two collections of key/value * pairs (KV). */ public class Join { - /** - * Inner join of two collections of KV elements. - * - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a KV where Key is of type - * V1 and Value is type V2. - */ - public static PCollection>> innerJoin( - final PCollection> leftCollection, final PCollection> rightCollection) { - checkNotNull(leftCollection); - checkNotNull(rightCollection); - - final TupleTag v1Tuple = new TupleTag<>(); - final TupleTag v2Tuple = new TupleTag<>(); - - PCollection> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply("CoGBK", CoGroupByKey.create()); - - return coGbkResultCollection - .apply( - "Join", - ParDo.of( - new DoFn, KV>>() { - @ProcessElement - public void processElement(ProcessContext c) { - KV e = c.element(); - - Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); - - for (V1 leftValue : leftValuesIterable) { - for (V2 rightValue : rightValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } - } - })) - .setCoder( - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } - - /** - * Left Outer Join of two collections of KV elements. - * - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param nullValue Value to use as null value when right side do not match left side. - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a KV where Key is of type - * V1 and Value is type V2. Values that should be null or empty is replaced with nullValue. - */ - public static PCollection>> leftOuterJoin( - final PCollection> leftCollection, - final PCollection> rightCollection, - final V2 nullValue) { - checkNotNull(leftCollection); - checkNotNull(rightCollection); - checkNotNull(nullValue); - - final TupleTag v1Tuple = new TupleTag<>(); - final TupleTag v2Tuple = new TupleTag<>(); - - PCollection> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply("CoGBK", CoGroupByKey.create()); - - return coGbkResultCollection - .apply( - "Join", - ParDo.of( - new DoFn, KV>>() { - @ProcessElement - public void processElement(ProcessContext c) { - KV e = c.element(); - - Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); - - for (V1 leftValue : leftValuesIterable) { - if (rightValuesIterable.iterator().hasNext()) { - for (V2 rightValue : rightValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } else { - c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue))); - } - } - } - })) - .setCoder( - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } - - /** - * Right Outer Join of two collections of KV elements. - * - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param nullValue Value to use as null value when left side do not match right side. - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a KV where Key is of type - * V1 and Value is type V2. Values that should be null or empty is replaced with nullValue. - */ - public static PCollection>> rightOuterJoin( - final PCollection> leftCollection, - final PCollection> rightCollection, - final V1 nullValue) { - checkNotNull(leftCollection); - checkNotNull(rightCollection); - checkNotNull(nullValue); - - final TupleTag v1Tuple = new TupleTag<>(); - final TupleTag v2Tuple = new TupleTag<>(); - - PCollection> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply("CoGBK", CoGroupByKey.create()); - - return coGbkResultCollection - .apply( - "Join", - ParDo.of( - new DoFn, KV>>() { - @ProcessElement - public void processElement(ProcessContext c) { - KV e = c.element(); - - Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); - - for (V2 rightValue : rightValuesIterable) { - if (leftValuesIterable.iterator().hasNext()) { - for (V1 leftValue : leftValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } else { - c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue))); - } - } - } - })) - .setCoder( - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } - - /** - * Full Outer Join of two collections of KV elements. - * - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param leftNullValue Value to use as null value when left side do not match right side. - * @param rightNullValue Value to use as null value when right side do not match right side. - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a KV where Key is of type - * V1 and Value is type V2. Values that should be null or empty is replaced with - * leftNullValue/rightNullValue. - */ - public static PCollection>> fullOuterJoin( - final PCollection> leftCollection, - final PCollection> rightCollection, - final V1 leftNullValue, - final V2 rightNullValue) { - checkNotNull(leftCollection); - checkNotNull(rightCollection); - checkNotNull(leftNullValue); - checkNotNull(rightNullValue); - - final TupleTag v1Tuple = new TupleTag<>(); - final TupleTag v2Tuple = new TupleTag<>(); - - PCollection> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply("CoGBK", CoGroupByKey.create()); - - return coGbkResultCollection - .apply( - "Join", - ParDo.of( - new DoFn, KV>>() { - @ProcessElement - public void processElement(ProcessContext c) { - KV e = c.element(); - - Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); - if (leftValuesIterable.iterator().hasNext() - && rightValuesIterable.iterator().hasNext()) { - for (V2 rightValue : rightValuesIterable) { - for (V1 leftValue : leftValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } - } else if (leftValuesIterable.iterator().hasNext() - && !rightValuesIterable.iterator().hasNext()) { - for (V1 leftValue : leftValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightNullValue))); - } - } else if (!leftValuesIterable.iterator().hasNext() - && rightValuesIterable.iterator().hasNext()) { - for (V2 rightValue : rightValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftNullValue, rightValue))); - } - } - } - })) - .setCoder( - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } + /** + * PTransform representing an inner join of two collections of KV elements. + * + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + */ + public static class InnerJoin extends PTransform>, PCollection>>> { + + private transient PCollection> rightCollection; + + private InnerJoin(PCollection> rightCollection) { + this.rightCollection = rightCollection; + } + + public static InnerJoin with(PCollection> rightCollection) { + return new InnerJoin<>(rightCollection); + } + + @Override + public PCollection>> expand(PCollection> leftCollection) { + checkNotNull(leftCollection); + checkNotNull(rightCollection); + + final TupleTag v1Tuple = new TupleTag<>(); + final TupleTag v2Tuple = new TupleTag<>(); + + PCollection> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply("CoGBK", CoGroupByKey.create()); + + return coGbkResultCollection + .apply( + "Join", + ParDo.of( + new DoFn, KV>>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV e = c.element(); + + Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); + + for (V1 leftValue : leftValuesIterable) { + for (V2 rightValue : rightValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } + } + })) + .setCoder( + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); + } + + } + + /** + * PTransform representing a left outer join of two collections of KV elements. + * + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + */ + public static class LeftOuterJoin extends PTransform>, PCollection>>> { + + private transient PCollection> rightCollection; + private V2 nullValue; + + private LeftOuterJoin(PCollection> rightCollection, V2 nullValue) { + this.rightCollection = rightCollection; + this.nullValue = nullValue; + } + + public static LeftOuterJoin with(PCollection> rightCollection, V2 nullValue) { + return new LeftOuterJoin<>(rightCollection, nullValue); + } + + @Override + public PCollection>> expand(PCollection> leftCollection) { + checkNotNull(leftCollection); + checkNotNull(rightCollection); + checkNotNull(nullValue); + final TupleTag v1Tuple = new TupleTag<>(); + final TupleTag v2Tuple = new TupleTag<>(); + + PCollection> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply("CoGBK", CoGroupByKey.create()); + + return coGbkResultCollection + .apply( + "Join", + ParDo.of( + new DoFn, KV>>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV e = c.element(); + + Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); + + for (V1 leftValue : leftValuesIterable) { + if (rightValuesIterable.iterator().hasNext()) { + for (V2 rightValue : rightValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } else { + c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue))); + } + } + } + })) + .setCoder( + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); + } + + } + + /** + * PTransform representing a right outer join of two collections of KV elements. + * + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + */ + public static class RightOuterJoin extends PTransform>, PCollection>>> { + + private transient PCollection> rightCollection; + private V1 nullValue; + + private RightOuterJoin(PCollection> rightCollection, V1 nullValue) { + this.rightCollection = rightCollection; + this.nullValue = nullValue; + } + + public static RightOuterJoin with(PCollection> rightCollection, V1 nullValue) { + return new RightOuterJoin<>(rightCollection, nullValue); + } + + @Override + public PCollection>> expand(PCollection> leftCollection) { + checkNotNull(leftCollection); + checkNotNull(rightCollection); + checkNotNull(nullValue); + + final TupleTag v1Tuple = new TupleTag<>(); + final TupleTag v2Tuple = new TupleTag<>(); + + PCollection> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply("CoGBK", CoGroupByKey.create()); + + return coGbkResultCollection + .apply( + "Join", + ParDo.of( + new DoFn, KV>>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV e = c.element(); + + Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); + + for (V2 rightValue : rightValuesIterable) { + if (leftValuesIterable.iterator().hasNext()) { + for (V1 leftValue : leftValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } else { + c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue))); + } + } + } + })) + .setCoder( + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); + } + + } + + /** + * PTransform representing a full outer join of two collections of KV elements. + * + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + */ + public static class FullOuterJoin extends PTransform>, PCollection>>> { + + private transient PCollection> rightCollection; + private V1 leftNullValue; + private V2 rightNullValue; + + private FullOuterJoin(PCollection> rightCollection, V1 leftNullValue, V2 rightNullValue) { + this.rightCollection = rightCollection; + this.leftNullValue = leftNullValue; + this.rightNullValue = rightNullValue; + } + + public static FullOuterJoin with(PCollection> rightCollection, V1 leftNullValue, V2 rightNullValue) { + return new FullOuterJoin<>(rightCollection, leftNullValue, rightNullValue); + } + + @Override + public PCollection>> expand(PCollection> leftCollection) { + checkNotNull(leftCollection); + checkNotNull(rightCollection); + checkNotNull(leftNullValue); + checkNotNull(rightNullValue); + + + final TupleTag v1Tuple = new TupleTag<>(); + final TupleTag v2Tuple = new TupleTag<>(); + + PCollection> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply("CoGBK", CoGroupByKey.create()); + + return coGbkResultCollection + .apply( + "Join", + ParDo.of( + new DoFn, KV>>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV e = c.element(); + + Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); + if (leftValuesIterable.iterator().hasNext() + && rightValuesIterable.iterator().hasNext()) { + for (V2 rightValue : rightValuesIterable) { + for (V1 leftValue : leftValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } + } else if (leftValuesIterable.iterator().hasNext() + && !rightValuesIterable.iterator().hasNext()) { + for (V1 leftValue : leftValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightNullValue))); + } + } else if (!leftValuesIterable.iterator().hasNext() + && rightValuesIterable.iterator().hasNext()) { + for (V2 rightValue : rightValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftNullValue, rightValue))); + } + } + } + })) + .setCoder( + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); + } + + } + + + /** + * Inner join of two collections of KV elements. + * + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. + */ + public static PCollection>> innerJoin( + final PCollection> leftCollection, final PCollection> rightCollection) { + return innerJoin("InnerJoin", leftCollection, rightCollection); + } + + /** + * Inner join of two collections of KV elements. + * + * @param name Name of the PTransform. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. + */ + public static PCollection>> innerJoin( + final String name, + final PCollection> leftCollection, final PCollection> rightCollection) { + return leftCollection.apply(name, InnerJoin.with(rightCollection)); + } + + + /** + * Left Outer Join of two collections of KV elements. + * + * @param name Name of the PTransform. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param nullValue Value to use as null value when right side do not match left side. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. Values that should be null or empty is replaced with nullValue. + */ + public static PCollection>> leftOuterJoin( + final String name, + final PCollection> leftCollection, + final PCollection> rightCollection, + final V2 nullValue) { + return leftCollection.apply(name, LeftOuterJoin.with(rightCollection, nullValue)); + } + + public static PCollection>> leftOuterJoin( + final PCollection> leftCollection, + final PCollection> rightCollection, + final V2 nullValue) { + return leftOuterJoin("LeftOuterJoin", leftCollection, rightCollection, nullValue); + } + + /** + * Right Outer Join of two collections of KV elements. + * + * @param name Name of the PTransform. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param nullValue Value to use as null value when left side do not match right side. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. Values that should be null or empty is replaced with nullValue. + */ + public static PCollection>> rightOuterJoin( + final String name, + final PCollection> leftCollection, + final PCollection> rightCollection, + final V1 nullValue) { + return leftCollection.apply(name, RightOuterJoin.with(rightCollection, nullValue)); + + } + + /** + * Right Outer Join of two collections of KV elements. + * + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param nullValue Value to use as null value when left side do not match right side. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. Values that should be null or empty is replaced with nullValue. + */ + public static PCollection>> rightOuterJoin( + final PCollection> leftCollection, + final PCollection> rightCollection, + final V1 nullValue) { + return rightOuterJoin("RightOuterJoin", leftCollection, rightCollection, nullValue); + + } + + /** + * Full Outer Join of two collections of KV elements. + * + * @param name Name of the PTransform. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param leftNullValue Value to use as null value when left side do not match right side. + * @param rightNullValue Value to use as null value when right side do not match right side. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. Values that should be null or empty is replaced with + * leftNullValue/rightNullValue. + */ + public static PCollection>> fullOuterJoin( + final String name, + final PCollection> leftCollection, + final PCollection> rightCollection, + final V1 leftNullValue, + final V2 rightNullValue) { + return leftCollection.apply(name, FullOuterJoin.with(rightCollection, leftNullValue, rightNullValue)); + + } + + /** + * Full Outer Join of two collections of KV elements. + * + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param leftNullValue Value to use as null value when left side do not match right side. + * @param rightNullValue Value to use as null value when right side do not match right side. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. Values that should be null or empty is replaced with + * leftNullValue/rightNullValue. + */ + public static PCollection>> fullOuterJoin( + final PCollection> leftCollection, + final PCollection> rightCollection, + final V1 leftNullValue, + final V2 rightNullValue) { + return fullOuterJoin("FullOuterJoin", leftCollection, rightCollection, leftNullValue, rightNullValue); + + } } diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java index e32e2f50bf82..7348a8ab463f 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java @@ -34,7 +34,7 @@ /** This test Inner Join functionality. */ public class InnerJoinTest { private List> leftListOfKv; - private List> listRightOfKv; + private List> rightListOfKv; private List>> expectedResult; @Rule public final transient TestPipeline p = TestPipeline.create(); @@ -43,7 +43,7 @@ public class InnerJoinTest { public void setup() { leftListOfKv = new ArrayList<>(); - listRightOfKv = new ArrayList<>(); + rightListOfKv = new ArrayList<>(); expectedResult = new ArrayList<>(); } @@ -54,10 +54,10 @@ public void testJoinOneToOneMapping() { leftListOfKv.add(KV.of("Key2", 4L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key1", "foo")); - listRightOfKv.add(KV.of("Key2", "bar")); + rightListOfKv.add(KV.of("Key1", "foo")); + rightListOfKv.add(KV.of("Key2", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.innerJoin(leftCollection, rightCollection); @@ -74,10 +74,9 @@ public void testJoinOneToManyMapping() { leftListOfKv.add(KV.of("Key2", 4L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key2", "bar")); - listRightOfKv.add(KV.of("Key2", "gazonk")); - PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + rightListOfKv.add(KV.of("Key2", "bar")); + rightListOfKv.add(KV.of("Key2", "gazonk")); + PCollection> rightCollection = p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.innerJoin(leftCollection, rightCollection); @@ -95,9 +94,8 @@ public void testJoinManyToOneMapping() { leftListOfKv.add(KV.of("Key2", 6L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + rightListOfKv.add(KV.of("Key2", "bar")); + PCollection> rightCollection = p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.innerJoin(leftCollection, rightCollection); @@ -114,9 +112,9 @@ public void testJoinNoneToNoneMapping() { leftListOfKv.add(KV.of("Key2", 4L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key3", "bar")); + rightListOfKv.add(KV.of("Key3", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.innerJoin(leftCollection, rightCollection); @@ -125,13 +123,34 @@ public void testJoinNoneToNoneMapping() { p.run(); } + @Test + public void testMultipleJoinsInSamePipeline() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); + + rightListOfKv.add(KV.of("Key2", "bar")); + PCollection> rightCollection = + p.apply("CreateRight", Create.of(rightListOfKv)); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + + PCollection>> output1 = + Join.innerJoin("Join1", leftCollection, rightCollection); + PCollection>> output2 = + Join.innerJoin("Join2", leftCollection, rightCollection); + PAssert.that(output1).containsInAnyOrder(expectedResult); + PAssert.that(output2).containsInAnyOrder(expectedResult); + + p.run(); + } + @Test(expected = NullPointerException.class) public void testJoinLeftCollectionNull() { p.enableAbandonedNodeEnforcement(false); Join.innerJoin( null, p.apply( - Create.of(listRightOfKv) + Create.of(rightListOfKv) .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))); } diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java index ba4d0b49e9b4..b23475781785 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java @@ -34,7 +34,7 @@ /** This test Outer Full Join functionality. */ public class OuterFullJoinTest { private List> leftListOfKv; - private List> listRightOfKv; + private List> rightListOfKv; private List>> expectedResult; @Rule public final transient TestPipeline p = TestPipeline.create(); @@ -43,7 +43,7 @@ public class OuterFullJoinTest { public void setup() { leftListOfKv = new ArrayList<>(); - listRightOfKv = new ArrayList<>(); + rightListOfKv = new ArrayList<>(); expectedResult = new ArrayList<>(); } @@ -54,10 +54,10 @@ public void testJoinOneToOneMapping() { leftListOfKv.add(KV.of("Key2", 4L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key1", "foo")); - listRightOfKv.add(KV.of("Key2", "bar")); + rightListOfKv.add(KV.of("Key1", "foo")); + rightListOfKv.add(KV.of("Key2", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.fullOuterJoin(leftCollection, rightCollection, -1L, ""); @@ -74,10 +74,10 @@ public void testJoinOneToManyMapping() { leftListOfKv.add(KV.of("Key2", 4L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key2", "bar")); - listRightOfKv.add(KV.of("Key2", "gazonk")); + rightListOfKv.add(KV.of("Key2", "bar")); + rightListOfKv.add(KV.of("Key2", "gazonk")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.fullOuterJoin(leftCollection, rightCollection, -1L, ""); @@ -95,9 +95,9 @@ public void testJoinManyToOneMapping() { leftListOfKv.add(KV.of("Key2", 6L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key2", "bar")); + rightListOfKv.add(KV.of("Key2", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.fullOuterJoin(leftCollection, rightCollection, -1L, ""); @@ -114,9 +114,9 @@ public void testJoinNoneToNoneMapping() { leftListOfKv.add(KV.of("Key2", 4L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key3", "bar")); + rightListOfKv.add(KV.of("Key3", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.fullOuterJoin(leftCollection, rightCollection, -1L, ""); @@ -127,13 +127,34 @@ public void testJoinNoneToNoneMapping() { p.run(); } + @Test + public void testMultipleJoinsInSamePipeline() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); + + rightListOfKv.add(KV.of("Key2", "bar")); + PCollection> rightCollection = + p.apply("CreateRight", Create.of(rightListOfKv)); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + + PCollection>> output1 = + Join.fullOuterJoin("Join1", leftCollection, rightCollection,-1L, ""); + PCollection>> output2 = + Join.fullOuterJoin("Join2", leftCollection, rightCollection, -1L, ""); + PAssert.that(output1).containsInAnyOrder(expectedResult); + PAssert.that(output2).containsInAnyOrder(expectedResult); + + p.run(); + } + @Test(expected = NullPointerException.class) public void testJoinLeftCollectionNull() { p.enableAbandonedNodeEnforcement(false); Join.fullOuterJoin( null, p.apply( - Create.of(listRightOfKv) + Create.of(rightListOfKv) .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))), "", ""); diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java index f94f3618fbdd..0f6cdf22a567 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java @@ -34,7 +34,7 @@ /** This test Outer Left Join functionality. */ public class OuterLeftJoinTest { private List> leftListOfKv; - private List> listRightOfKv; + private List> rightListOfKv; private List>> expectedResult; @Rule public final transient TestPipeline p = TestPipeline.create(); @@ -43,7 +43,7 @@ public class OuterLeftJoinTest { public void setup() { leftListOfKv = new ArrayList<>(); - listRightOfKv = new ArrayList<>(); + rightListOfKv = new ArrayList<>(); expectedResult = new ArrayList<>(); } @@ -54,10 +54,10 @@ public void testJoinOneToOneMapping() { leftListOfKv.add(KV.of("Key2", 4L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key1", "foo")); - listRightOfKv.add(KV.of("Key2", "bar")); + rightListOfKv.add(KV.of("Key1", "foo")); + rightListOfKv.add(KV.of("Key2", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.leftOuterJoin(leftCollection, rightCollection, ""); @@ -74,10 +74,10 @@ public void testJoinOneToManyMapping() { leftListOfKv.add(KV.of("Key2", 4L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key2", "bar")); - listRightOfKv.add(KV.of("Key2", "gazonk")); + rightListOfKv.add(KV.of("Key2", "bar")); + rightListOfKv.add(KV.of("Key2", "gazonk")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.leftOuterJoin(leftCollection, rightCollection, ""); @@ -95,9 +95,9 @@ public void testJoinManyToOneMapping() { leftListOfKv.add(KV.of("Key2", 6L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key2", "bar")); + rightListOfKv.add(KV.of("Key2", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.leftOuterJoin(leftCollection, rightCollection, ""); @@ -114,9 +114,9 @@ public void testJoinOneToNoneMapping() { leftListOfKv.add(KV.of("Key2", 4L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key3", "bar")); + rightListOfKv.add(KV.of("Key3", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.leftOuterJoin(leftCollection, rightCollection, ""); @@ -126,13 +126,34 @@ public void testJoinOneToNoneMapping() { p.run(); } + @Test + public void testMultipleJoinsInSamePipeline() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); + + rightListOfKv.add(KV.of("Key2", "bar")); + PCollection> rightCollection = + p.apply("CreateRight", Create.of(rightListOfKv)); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + + PCollection>> output1 = + Join.leftOuterJoin("Join1", leftCollection, rightCollection,""); + PCollection>> output2 = + Join.leftOuterJoin("Join2", leftCollection, rightCollection,""); + PAssert.that(output1).containsInAnyOrder(expectedResult); + PAssert.that(output2).containsInAnyOrder(expectedResult); + + p.run(); + } + @Test(expected = NullPointerException.class) public void testJoinLeftCollectionNull() { p.enableAbandonedNodeEnforcement(false); Join.leftOuterJoin( null, p.apply( - Create.of(listRightOfKv) + Create.of(rightListOfKv) .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))), ""); } diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java index 050a182bb8ba..5c808cb083a7 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java @@ -34,7 +34,7 @@ /** This test Outer Right Join functionality. */ public class OuterRightJoinTest { private List> leftListOfKv; - private List> listRightOfKv; + private List> rightListOfKv; private List>> expectedResult; @Rule public final transient TestPipeline p = TestPipeline.create(); @@ -43,7 +43,7 @@ public class OuterRightJoinTest { public void setup() { leftListOfKv = new ArrayList<>(); - listRightOfKv = new ArrayList<>(); + rightListOfKv = new ArrayList<>(); expectedResult = new ArrayList<>(); } @@ -54,10 +54,10 @@ public void testJoinOneToOneMapping() { leftListOfKv.add(KV.of("Key2", 4L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key1", "foo")); - listRightOfKv.add(KV.of("Key2", "bar")); + rightListOfKv.add(KV.of("Key1", "foo")); + rightListOfKv.add(KV.of("Key2", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.rightOuterJoin(leftCollection, rightCollection, -1L); @@ -74,10 +74,10 @@ public void testJoinOneToManyMapping() { leftListOfKv.add(KV.of("Key2", 4L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key2", "bar")); - listRightOfKv.add(KV.of("Key2", "gazonk")); + rightListOfKv.add(KV.of("Key2", "bar")); + rightListOfKv.add(KV.of("Key2", "gazonk")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.rightOuterJoin(leftCollection, rightCollection, -1L); @@ -95,9 +95,9 @@ public void testJoinManyToOneMapping() { leftListOfKv.add(KV.of("Key2", 6L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key2", "bar")); + rightListOfKv.add(KV.of("Key2", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.rightOuterJoin(leftCollection, rightCollection, -1L); @@ -114,9 +114,9 @@ public void testJoinNoneToOneMapping() { leftListOfKv.add(KV.of("Key2", 4L)); PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); - listRightOfKv.add(KV.of("Key3", "bar")); + rightListOfKv.add(KV.of("Key3", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.rightOuterJoin(leftCollection, rightCollection, -1L); @@ -126,13 +126,34 @@ public void testJoinNoneToOneMapping() { p.run(); } + @Test + public void testMultipleJoinsInSamePipeline() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); + + rightListOfKv.add(KV.of("Key2", "bar")); + PCollection> rightCollection = + p.apply("CreateRight", Create.of(rightListOfKv)); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + + PCollection>> output1 = + Join.rightOuterJoin("Join1", leftCollection, rightCollection,-1L); + PCollection>> output2 = + Join.rightOuterJoin("Join2", leftCollection, rightCollection,-1L); + PAssert.that(output1).containsInAnyOrder(expectedResult); + PAssert.that(output2).containsInAnyOrder(expectedResult); + + p.run(); + } + @Test(expected = NullPointerException.class) public void testJoinLeftCollectionNull() { p.enableAbandonedNodeEnforcement(false); Join.rightOuterJoin( null, p.apply( - Create.of(listRightOfKv) + Create.of(rightListOfKv) .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))), ""); } From 0fc7bfc60ecb2d0604e5cfe2745599ce37fb43ac Mon Sep 17 00:00:00 2001 From: Daniel Mescheder Date: Mon, 11 Feb 2019 22:42:54 +0100 Subject: [PATCH 2/3] fix stylecheck issues --- .../beam/sdk/extensions/joinlibrary/InnerJoinTest.java | 6 ++---- .../beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java | 4 ++-- .../beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java | 6 ++---- .../beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java | 6 ++---- 4 files changed, 8 insertions(+), 14 deletions(-) diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java index 7348a8ab463f..b13748780ea7 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java @@ -134,10 +134,8 @@ public void testMultipleJoinsInSamePipeline() { expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - PCollection>> output1 = - Join.innerJoin("Join1", leftCollection, rightCollection); - PCollection>> output2 = - Join.innerJoin("Join2", leftCollection, rightCollection); + PCollection>> output1 = Join.innerJoin("Join1", leftCollection, rightCollection); + PCollection>> output2 = Join.innerJoin("Join2", leftCollection, rightCollection); PAssert.that(output1).containsInAnyOrder(expectedResult); PAssert.that(output2).containsInAnyOrder(expectedResult); diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java index b23475781785..6a36ffa61d6f 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java @@ -139,9 +139,9 @@ public void testMultipleJoinsInSamePipeline() { expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); PCollection>> output1 = - Join.fullOuterJoin("Join1", leftCollection, rightCollection,-1L, ""); + Join.fullOuterJoin("Join1", leftCollection, rightCollection,-1L, ""); PCollection>> output2 = - Join.fullOuterJoin("Join2", leftCollection, rightCollection, -1L, ""); + Join.fullOuterJoin("Join2", leftCollection, rightCollection, -1L, ""); PAssert.that(output1).containsInAnyOrder(expectedResult); PAssert.that(output2).containsInAnyOrder(expectedResult); diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java index 0f6cdf22a567..edc5fdf3591f 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java @@ -137,10 +137,8 @@ public void testMultipleJoinsInSamePipeline() { expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - PCollection>> output1 = - Join.leftOuterJoin("Join1", leftCollection, rightCollection,""); - PCollection>> output2 = - Join.leftOuterJoin("Join2", leftCollection, rightCollection,""); + PCollection>> output1 = Join.leftOuterJoin("Join1", leftCollection, rightCollection,""); + PCollection>> output2 = Join.leftOuterJoin("Join2", leftCollection, rightCollection,""); PAssert.that(output1).containsInAnyOrder(expectedResult); PAssert.that(output2).containsInAnyOrder(expectedResult); diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java index 5c808cb083a7..d2c0de920407 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java @@ -137,10 +137,8 @@ public void testMultipleJoinsInSamePipeline() { expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - PCollection>> output1 = - Join.rightOuterJoin("Join1", leftCollection, rightCollection,-1L); - PCollection>> output2 = - Join.rightOuterJoin("Join2", leftCollection, rightCollection,-1L); + PCollection>> output1 = Join.rightOuterJoin("Join1", leftCollection, rightCollection,-1L); + PCollection>> output2 = Join.rightOuterJoin("Join2", leftCollection, rightCollection,-1L); PAssert.that(output1).containsInAnyOrder(expectedResult); PAssert.that(output2).containsInAnyOrder(expectedResult); From 27810d1b6e10e5482b59cf5941194131c9b79942 Mon Sep 17 00:00:00 2001 From: Daniel Mescheder Date: Mon, 11 Feb 2019 22:50:44 +0100 Subject: [PATCH 3/3] actually fix stylecheck issues --- .../beam/sdk/extensions/joinlibrary/Join.java | 820 +++++++++--------- .../extensions/joinlibrary/InnerJoinTest.java | 16 +- .../joinlibrary/OuterFullJoinTest.java | 4 +- .../joinlibrary/OuterLeftJoinTest.java | 8 +- .../joinlibrary/OuterRightJoinTest.java | 9 +- 5 files changed, 433 insertions(+), 424 deletions(-) diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java index 70ab00fa1547..73386b200042 100644 --- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java +++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.extensions.joinlibrary; +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; + import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -28,441 +30,439 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; -import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; - /** * Utility class with different versions of joins. All methods join two collections of key/value * pairs (KV). */ public class Join { - /** - * PTransform representing an inner join of two collections of KV elements. - * - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - */ - public static class InnerJoin extends PTransform>, PCollection>>> { - - private transient PCollection> rightCollection; - - private InnerJoin(PCollection> rightCollection) { - this.rightCollection = rightCollection; - } - - public static InnerJoin with(PCollection> rightCollection) { - return new InnerJoin<>(rightCollection); - } - - @Override - public PCollection>> expand(PCollection> leftCollection) { - checkNotNull(leftCollection); - checkNotNull(rightCollection); - - final TupleTag v1Tuple = new TupleTag<>(); - final TupleTag v2Tuple = new TupleTag<>(); - - PCollection> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply("CoGBK", CoGroupByKey.create()); - - return coGbkResultCollection - .apply( - "Join", - ParDo.of( - new DoFn, KV>>() { - @ProcessElement - public void processElement(ProcessContext c) { - KV e = c.element(); - - Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); - - for (V1 leftValue : leftValuesIterable) { - for (V2 rightValue : rightValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } - } - })) - .setCoder( - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } + /** + * PTransform representing an inner join of two collections of KV elements. + * + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + */ + public static class InnerJoin + extends PTransform>, PCollection>>> { - } - - /** - * PTransform representing a left outer join of two collections of KV elements. - * - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - */ - public static class LeftOuterJoin extends PTransform>, PCollection>>> { - - private transient PCollection> rightCollection; - private V2 nullValue; - - private LeftOuterJoin(PCollection> rightCollection, V2 nullValue) { - this.rightCollection = rightCollection; - this.nullValue = nullValue; - } - - public static LeftOuterJoin with(PCollection> rightCollection, V2 nullValue) { - return new LeftOuterJoin<>(rightCollection, nullValue); - } - - @Override - public PCollection>> expand(PCollection> leftCollection) { - checkNotNull(leftCollection); - checkNotNull(rightCollection); - checkNotNull(nullValue); - final TupleTag v1Tuple = new TupleTag<>(); - final TupleTag v2Tuple = new TupleTag<>(); - - PCollection> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply("CoGBK", CoGroupByKey.create()); - - return coGbkResultCollection - .apply( - "Join", - ParDo.of( - new DoFn, KV>>() { - @ProcessElement - public void processElement(ProcessContext c) { - KV e = c.element(); - - Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); - - for (V1 leftValue : leftValuesIterable) { - if (rightValuesIterable.iterator().hasNext()) { - for (V2 rightValue : rightValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } else { - c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue))); - } - } - } - })) - .setCoder( - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } + private transient PCollection> rightCollection; + private InnerJoin(PCollection> rightCollection) { + this.rightCollection = rightCollection; } - /** - * PTransform representing a right outer join of two collections of KV elements. - * - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - */ - public static class RightOuterJoin extends PTransform>, PCollection>>> { - - private transient PCollection> rightCollection; - private V1 nullValue; - - private RightOuterJoin(PCollection> rightCollection, V1 nullValue) { - this.rightCollection = rightCollection; - this.nullValue = nullValue; - } - - public static RightOuterJoin with(PCollection> rightCollection, V1 nullValue) { - return new RightOuterJoin<>(rightCollection, nullValue); - } - - @Override - public PCollection>> expand(PCollection> leftCollection) { - checkNotNull(leftCollection); - checkNotNull(rightCollection); - checkNotNull(nullValue); - - final TupleTag v1Tuple = new TupleTag<>(); - final TupleTag v2Tuple = new TupleTag<>(); - - PCollection> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply("CoGBK", CoGroupByKey.create()); - - return coGbkResultCollection - .apply( - "Join", - ParDo.of( - new DoFn, KV>>() { - @ProcessElement - public void processElement(ProcessContext c) { - KV e = c.element(); - - Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); - - for (V2 rightValue : rightValuesIterable) { - if (leftValuesIterable.iterator().hasNext()) { - for (V1 leftValue : leftValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } else { - c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue))); - } - } - } - })) - .setCoder( - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } - + public static InnerJoin with(PCollection> rightCollection) { + return new InnerJoin<>(rightCollection); } - /** - * PTransform representing a full outer join of two collections of KV elements. - * - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - */ - public static class FullOuterJoin extends PTransform>, PCollection>>> { - - private transient PCollection> rightCollection; - private V1 leftNullValue; - private V2 rightNullValue; - - private FullOuterJoin(PCollection> rightCollection, V1 leftNullValue, V2 rightNullValue) { - this.rightCollection = rightCollection; - this.leftNullValue = leftNullValue; - this.rightNullValue = rightNullValue; - } - - public static FullOuterJoin with(PCollection> rightCollection, V1 leftNullValue, V2 rightNullValue) { - return new FullOuterJoin<>(rightCollection, leftNullValue, rightNullValue); - } - - @Override - public PCollection>> expand(PCollection> leftCollection) { - checkNotNull(leftCollection); - checkNotNull(rightCollection); - checkNotNull(leftNullValue); - checkNotNull(rightNullValue); - - - final TupleTag v1Tuple = new TupleTag<>(); - final TupleTag v2Tuple = new TupleTag<>(); - - PCollection> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply("CoGBK", CoGroupByKey.create()); - - return coGbkResultCollection - .apply( - "Join", - ParDo.of( - new DoFn, KV>>() { - @ProcessElement - public void processElement(ProcessContext c) { - KV e = c.element(); - - Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); - if (leftValuesIterable.iterator().hasNext() - && rightValuesIterable.iterator().hasNext()) { - for (V2 rightValue : rightValuesIterable) { - for (V1 leftValue : leftValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } - } else if (leftValuesIterable.iterator().hasNext() - && !rightValuesIterable.iterator().hasNext()) { - for (V1 leftValue : leftValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightNullValue))); - } - } else if (!leftValuesIterable.iterator().hasNext() - && rightValuesIterable.iterator().hasNext()) { - for (V2 rightValue : rightValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftNullValue, rightValue))); - } - } - } - })) - .setCoder( - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of( - ((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } - + @Override + public PCollection>> expand(PCollection> leftCollection) { + checkNotNull(leftCollection); + checkNotNull(rightCollection); + + final TupleTag v1Tuple = new TupleTag<>(); + final TupleTag v2Tuple = new TupleTag<>(); + + PCollection> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply("CoGBK", CoGroupByKey.create()); + + return coGbkResultCollection + .apply( + "Join", + ParDo.of( + new DoFn, KV>>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV e = c.element(); + + Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); + + for (V1 leftValue : leftValuesIterable) { + for (V2 rightValue : rightValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } + } + })) + .setCoder( + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); } - - - /** - * Inner join of two collections of KV elements. - * - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a KV where Key is of type - * V1 and Value is type V2. - */ - public static PCollection>> innerJoin( - final PCollection> leftCollection, final PCollection> rightCollection) { - return innerJoin("InnerJoin", leftCollection, rightCollection); + } + + /** + * PTransform representing a left outer join of two collections of KV elements. + * + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + */ + public static class LeftOuterJoin + extends PTransform>, PCollection>>> { + + private transient PCollection> rightCollection; + private V2 nullValue; + + private LeftOuterJoin(PCollection> rightCollection, V2 nullValue) { + this.rightCollection = rightCollection; + this.nullValue = nullValue; } - /** - * Inner join of two collections of KV elements. - * - * @param name Name of the PTransform. - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a KV where Key is of type - * V1 and Value is type V2. - */ - public static PCollection>> innerJoin( - final String name, - final PCollection> leftCollection, final PCollection> rightCollection) { - return leftCollection.apply(name, InnerJoin.with(rightCollection)); + public static LeftOuterJoin with( + PCollection> rightCollection, V2 nullValue) { + return new LeftOuterJoin<>(rightCollection, nullValue); } - - /** - * Left Outer Join of two collections of KV elements. - * - * @param name Name of the PTransform. - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param nullValue Value to use as null value when right side do not match left side. - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a KV where Key is of type - * V1 and Value is type V2. Values that should be null or empty is replaced with nullValue. - */ - public static PCollection>> leftOuterJoin( - final String name, - final PCollection> leftCollection, - final PCollection> rightCollection, - final V2 nullValue) { - return leftCollection.apply(name, LeftOuterJoin.with(rightCollection, nullValue)); + @Override + public PCollection>> expand(PCollection> leftCollection) { + checkNotNull(leftCollection); + checkNotNull(rightCollection); + checkNotNull(nullValue); + final TupleTag v1Tuple = new TupleTag<>(); + final TupleTag v2Tuple = new TupleTag<>(); + + PCollection> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply("CoGBK", CoGroupByKey.create()); + + return coGbkResultCollection + .apply( + "Join", + ParDo.of( + new DoFn, KV>>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV e = c.element(); + + Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); + + for (V1 leftValue : leftValuesIterable) { + if (rightValuesIterable.iterator().hasNext()) { + for (V2 rightValue : rightValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } else { + c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue))); + } + } + } + })) + .setCoder( + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); } - - public static PCollection>> leftOuterJoin( - final PCollection> leftCollection, - final PCollection> rightCollection, - final V2 nullValue) { - return leftOuterJoin("LeftOuterJoin", leftCollection, rightCollection, nullValue); + } + + /** + * PTransform representing a right outer join of two collections of KV elements. + * + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + */ + public static class RightOuterJoin + extends PTransform>, PCollection>>> { + + private transient PCollection> rightCollection; + private V1 nullValue; + + private RightOuterJoin(PCollection> rightCollection, V1 nullValue) { + this.rightCollection = rightCollection; + this.nullValue = nullValue; } - /** - * Right Outer Join of two collections of KV elements. - * - * @param name Name of the PTransform. - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param nullValue Value to use as null value when left side do not match right side. - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a KV where Key is of type - * V1 and Value is type V2. Values that should be null or empty is replaced with nullValue. - */ - public static PCollection>> rightOuterJoin( - final String name, - final PCollection> leftCollection, - final PCollection> rightCollection, - final V1 nullValue) { - return leftCollection.apply(name, RightOuterJoin.with(rightCollection, nullValue)); - + public static RightOuterJoin with( + PCollection> rightCollection, V1 nullValue) { + return new RightOuterJoin<>(rightCollection, nullValue); } - /** - * Right Outer Join of two collections of KV elements. - * - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param nullValue Value to use as null value when left side do not match right side. - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a KV where Key is of type - * V1 and Value is type V2. Values that should be null or empty is replaced with nullValue. - */ - public static PCollection>> rightOuterJoin( - final PCollection> leftCollection, - final PCollection> rightCollection, - final V1 nullValue) { - return rightOuterJoin("RightOuterJoin", leftCollection, rightCollection, nullValue); - + @Override + public PCollection>> expand(PCollection> leftCollection) { + checkNotNull(leftCollection); + checkNotNull(rightCollection); + checkNotNull(nullValue); + + final TupleTag v1Tuple = new TupleTag<>(); + final TupleTag v2Tuple = new TupleTag<>(); + + PCollection> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply("CoGBK", CoGroupByKey.create()); + + return coGbkResultCollection + .apply( + "Join", + ParDo.of( + new DoFn, KV>>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV e = c.element(); + + Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); + + for (V2 rightValue : rightValuesIterable) { + if (leftValuesIterable.iterator().hasNext()) { + for (V1 leftValue : leftValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } else { + c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue))); + } + } + } + })) + .setCoder( + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); } - - /** - * Full Outer Join of two collections of KV elements. - * - * @param name Name of the PTransform. - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param leftNullValue Value to use as null value when left side do not match right side. - * @param rightNullValue Value to use as null value when right side do not match right side. - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a KV where Key is of type - * V1 and Value is type V2. Values that should be null or empty is replaced with - * leftNullValue/rightNullValue. - */ - public static PCollection>> fullOuterJoin( - final String name, - final PCollection> leftCollection, - final PCollection> rightCollection, - final V1 leftNullValue, - final V2 rightNullValue) { - return leftCollection.apply(name, FullOuterJoin.with(rightCollection, leftNullValue, rightNullValue)); - + } + + /** + * PTransform representing a full outer join of two collections of KV elements. + * + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + */ + public static class FullOuterJoin + extends PTransform>, PCollection>>> { + + private transient PCollection> rightCollection; + private V1 leftNullValue; + private V2 rightNullValue; + + private FullOuterJoin( + PCollection> rightCollection, V1 leftNullValue, V2 rightNullValue) { + this.rightCollection = rightCollection; + this.leftNullValue = leftNullValue; + this.rightNullValue = rightNullValue; } - /** - * Full Outer Join of two collections of KV elements. - * - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param leftNullValue Value to use as null value when left side do not match right side. - * @param rightNullValue Value to use as null value when right side do not match right side. - * @param Type of the key for both collections - * @param Type of the values for the left collection. - * @param Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a KV where Key is of type - * V1 and Value is type V2. Values that should be null or empty is replaced with - * leftNullValue/rightNullValue. - */ - public static PCollection>> fullOuterJoin( - final PCollection> leftCollection, - final PCollection> rightCollection, - final V1 leftNullValue, - final V2 rightNullValue) { - return fullOuterJoin("FullOuterJoin", leftCollection, rightCollection, leftNullValue, rightNullValue); + public static FullOuterJoin with( + PCollection> rightCollection, V1 leftNullValue, V2 rightNullValue) { + return new FullOuterJoin<>(rightCollection, leftNullValue, rightNullValue); + } + @Override + public PCollection>> expand(PCollection> leftCollection) { + checkNotNull(leftCollection); + checkNotNull(rightCollection); + checkNotNull(leftNullValue); + checkNotNull(rightNullValue); + + final TupleTag v1Tuple = new TupleTag<>(); + final TupleTag v2Tuple = new TupleTag<>(); + + PCollection> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply("CoGBK", CoGroupByKey.create()); + + return coGbkResultCollection + .apply( + "Join", + ParDo.of( + new DoFn, KV>>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV e = c.element(); + + Iterable leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable rightValuesIterable = e.getValue().getAll(v2Tuple); + if (leftValuesIterable.iterator().hasNext() + && rightValuesIterable.iterator().hasNext()) { + for (V2 rightValue : rightValuesIterable) { + for (V1 leftValue : leftValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } + } else if (leftValuesIterable.iterator().hasNext() + && !rightValuesIterable.iterator().hasNext()) { + for (V1 leftValue : leftValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightNullValue))); + } + } else if (!leftValuesIterable.iterator().hasNext() + && rightValuesIterable.iterator().hasNext()) { + for (V2 rightValue : rightValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftNullValue, rightValue))); + } + } + } + })) + .setCoder( + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of( + ((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); } + } + + /** + * Inner join of two collections of KV elements. + * + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. + */ + public static PCollection>> innerJoin( + final PCollection> leftCollection, final PCollection> rightCollection) { + return innerJoin("InnerJoin", leftCollection, rightCollection); + } + + /** + * Inner join of two collections of KV elements. + * + * @param name Name of the PTransform. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. + */ + public static PCollection>> innerJoin( + final String name, + final PCollection> leftCollection, + final PCollection> rightCollection) { + return leftCollection.apply(name, InnerJoin.with(rightCollection)); + } + + /** + * Left Outer Join of two collections of KV elements. + * + * @param name Name of the PTransform. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param nullValue Value to use as null value when right side do not match left side. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. Values that should be null or empty is replaced with nullValue. + */ + public static PCollection>> leftOuterJoin( + final String name, + final PCollection> leftCollection, + final PCollection> rightCollection, + final V2 nullValue) { + return leftCollection.apply(name, LeftOuterJoin.with(rightCollection, nullValue)); + } + + public static PCollection>> leftOuterJoin( + final PCollection> leftCollection, + final PCollection> rightCollection, + final V2 nullValue) { + return leftOuterJoin("LeftOuterJoin", leftCollection, rightCollection, nullValue); + } + + /** + * Right Outer Join of two collections of KV elements. + * + * @param name Name of the PTransform. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param nullValue Value to use as null value when left side do not match right side. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. Values that should be null or empty is replaced with nullValue. + */ + public static PCollection>> rightOuterJoin( + final String name, + final PCollection> leftCollection, + final PCollection> rightCollection, + final V1 nullValue) { + return leftCollection.apply(name, RightOuterJoin.with(rightCollection, nullValue)); + } + + /** + * Right Outer Join of two collections of KV elements. + * + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param nullValue Value to use as null value when left side do not match right side. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. Values that should be null or empty is replaced with nullValue. + */ + public static PCollection>> rightOuterJoin( + final PCollection> leftCollection, + final PCollection> rightCollection, + final V1 nullValue) { + return rightOuterJoin("RightOuterJoin", leftCollection, rightCollection, nullValue); + } + + /** + * Full Outer Join of two collections of KV elements. + * + * @param name Name of the PTransform. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param leftNullValue Value to use as null value when left side do not match right side. + * @param rightNullValue Value to use as null value when right side do not match right side. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. Values that should be null or empty is replaced with + * leftNullValue/rightNullValue. + */ + public static PCollection>> fullOuterJoin( + final String name, + final PCollection> leftCollection, + final PCollection> rightCollection, + final V1 leftNullValue, + final V2 rightNullValue) { + return leftCollection.apply( + name, FullOuterJoin.with(rightCollection, leftNullValue, rightNullValue)); + } + + /** + * Full Outer Join of two collections of KV elements. + * + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param leftNullValue Value to use as null value when left side do not match right side. + * @param rightNullValue Value to use as null value when right side do not match right side. + * @param Type of the key for both collections + * @param Type of the values for the left collection. + * @param Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a KV where Key is of type + * V1 and Value is type V2. Values that should be null or empty is replaced with + * leftNullValue/rightNullValue. + */ + public static PCollection>> fullOuterJoin( + final PCollection> leftCollection, + final PCollection> rightCollection, + final V1 leftNullValue, + final V2 rightNullValue) { + return fullOuterJoin( + "FullOuterJoin", leftCollection, rightCollection, leftNullValue, rightNullValue); + } } diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java index b13748780ea7..5da0d1f8536e 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java @@ -76,7 +76,8 @@ public void testJoinOneToManyMapping() { rightListOfKv.add(KV.of("Key2", "bar")); rightListOfKv.add(KV.of("Key2", "gazonk")); - PCollection> rightCollection = p.apply("CreateRight", Create.of(rightListOfKv)); + PCollection> rightCollection = + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.innerJoin(leftCollection, rightCollection); @@ -95,7 +96,8 @@ public void testJoinManyToOneMapping() { PCollection> leftCollection = p.apply("CreateLeft", Create.of(leftListOfKv)); rightListOfKv.add(KV.of("Key2", "bar")); - PCollection> rightCollection = p.apply("CreateRight", Create.of(rightListOfKv)); + PCollection> rightCollection = + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.innerJoin(leftCollection, rightCollection); @@ -114,7 +116,7 @@ public void testJoinNoneToNoneMapping() { rightListOfKv.add(KV.of("Key3", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(rightListOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); PCollection>> output = Join.innerJoin(leftCollection, rightCollection); @@ -130,12 +132,14 @@ public void testMultipleJoinsInSamePipeline() { rightListOfKv.add(KV.of("Key2", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(rightListOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - PCollection>> output1 = Join.innerJoin("Join1", leftCollection, rightCollection); - PCollection>> output2 = Join.innerJoin("Join2", leftCollection, rightCollection); + PCollection>> output1 = + Join.innerJoin("Join1", leftCollection, rightCollection); + PCollection>> output2 = + Join.innerJoin("Join2", leftCollection, rightCollection); PAssert.that(output1).containsInAnyOrder(expectedResult); PAssert.that(output2).containsInAnyOrder(expectedResult); diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java index 6a36ffa61d6f..472076627302 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java @@ -134,12 +134,12 @@ public void testMultipleJoinsInSamePipeline() { rightListOfKv.add(KV.of("Key2", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(rightListOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); PCollection>> output1 = - Join.fullOuterJoin("Join1", leftCollection, rightCollection,-1L, ""); + Join.fullOuterJoin("Join1", leftCollection, rightCollection, -1L, ""); PCollection>> output2 = Join.fullOuterJoin("Join2", leftCollection, rightCollection, -1L, ""); PAssert.that(output1).containsInAnyOrder(expectedResult); diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java index edc5fdf3591f..d7592f270e21 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java @@ -133,12 +133,14 @@ public void testMultipleJoinsInSamePipeline() { rightListOfKv.add(KV.of("Key2", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(rightListOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - PCollection>> output1 = Join.leftOuterJoin("Join1", leftCollection, rightCollection,""); - PCollection>> output2 = Join.leftOuterJoin("Join2", leftCollection, rightCollection,""); + PCollection>> output1 = + Join.leftOuterJoin("Join1", leftCollection, rightCollection, ""); + PCollection>> output2 = + Join.leftOuterJoin("Join2", leftCollection, rightCollection, ""); PAssert.that(output1).containsInAnyOrder(expectedResult); PAssert.that(output2).containsInAnyOrder(expectedResult); diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java index d2c0de920407..e84d40aa1d1e 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java @@ -133,12 +133,15 @@ public void testMultipleJoinsInSamePipeline() { rightListOfKv.add(KV.of("Key2", "bar")); PCollection> rightCollection = - p.apply("CreateRight", Create.of(rightListOfKv)); + p.apply("CreateRight", Create.of(rightListOfKv)); expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - PCollection>> output1 = Join.rightOuterJoin("Join1", leftCollection, rightCollection,-1L); - PCollection>> output2 = Join.rightOuterJoin("Join2", leftCollection, rightCollection,-1L); + PCollection>> output1 = + Join.rightOuterJoin("Join1", leftCollection, rightCollection, -1L); + PCollection>> output2 = + Join.rightOuterJoin("Join2", leftCollection, rightCollection, -1L); + PAssert.that(output1).containsInAnyOrder(expectedResult); PAssert.that(output2).containsInAnyOrder(expectedResult);