From a2118e18846811a7b2040fa3f58d4af29610f291 Mon Sep 17 00:00:00 2001 From: Leen Toelen Date: Mon, 13 Aug 2018 17:07:15 +0200 Subject: [PATCH 1/2] Make PTransform names stable in Join/CoGroupByKey --- .../beam/sdk/transforms/join/CoGroupByKey.java | 5 +++-- .../apache/beam/sdk/extensions/joinlibrary/Join.java | 12 ++++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java index 29ae180224a64..9053ad0b1723d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java @@ -111,10 +111,11 @@ public PCollection> expand(KeyedPCollectionTuple input) { unionTables = unionTables.and(unionTable); } - PCollection> flattenedTable = unionTables.apply(Flatten.pCollections()); + PCollection> flattenedTable = + unionTables.apply("Flatten", Flatten.pCollections()); PCollection>> groupedTable = - flattenedTable.apply(GroupByKey.create()); + flattenedTable.apply("GBK", GroupByKey.create()); CoGbkResultSchema tupleTags = input.getCoGbkResultSchema(); PCollection> result = diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java index 68598d68138fd..9c370406e964c 100644 --- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java +++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java @@ -57,10 +57,11 @@ public static PCollection>> innerJoin( PCollection> coGbkResultCollection = KeyedPCollectionTuple.of(v1Tuple, leftCollection) .and(v2Tuple, rightCollection) - .apply(CoGroupByKey.create()); + .apply("GBK", CoGroupByKey.create()); return coGbkResultCollection .apply( + "Join", ParDo.of( new DoFn, KV>>() { @ProcessElement @@ -111,10 +112,11 @@ public static PCollection>> leftOuterJoin( PCollection> coGbkResultCollection = KeyedPCollectionTuple.of(v1Tuple, leftCollection) .and(v2Tuple, rightCollection) - .apply(CoGroupByKey.create()); + .apply("GBK", CoGroupByKey.create()); return coGbkResultCollection .apply( + "Join", ParDo.of( new DoFn, KV>>() { @ProcessElement @@ -169,10 +171,11 @@ public static PCollection>> rightOuterJoin( PCollection> coGbkResultCollection = KeyedPCollectionTuple.of(v1Tuple, leftCollection) .and(v2Tuple, rightCollection) - .apply(CoGroupByKey.create()); + .apply("GBK", CoGroupByKey.create()); return coGbkResultCollection .apply( + "Join", ParDo.of( new DoFn, KV>>() { @ProcessElement @@ -231,10 +234,11 @@ public static PCollection>> fullOuterJoin( PCollection> coGbkResultCollection = KeyedPCollectionTuple.of(v1Tuple, leftCollection) .and(v2Tuple, rightCollection) - .apply(CoGroupByKey.create()); + .apply("GBK", CoGroupByKey.create()); return coGbkResultCollection .apply( + "Join", ParDo.of( new DoFn, KV>>() { @ProcessElement From eeb9ff084c405a33257e2a9b682e6b3f153e5a23 Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Mon, 13 Aug 2018 14:13:44 -0700 Subject: [PATCH 2/2] Rename "GBK" -> "CoGBK" --- .../org/apache/beam/sdk/extensions/joinlibrary/Join.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java index 9c370406e964c..42d1ed67e4279 100644 --- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java +++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java @@ -57,7 +57,7 @@ public static PCollection>> innerJoin( PCollection> coGbkResultCollection = KeyedPCollectionTuple.of(v1Tuple, leftCollection) .and(v2Tuple, rightCollection) - .apply("GBK", CoGroupByKey.create()); + .apply("CoGBK", CoGroupByKey.create()); return coGbkResultCollection .apply( @@ -112,7 +112,7 @@ public static PCollection>> leftOuterJoin( PCollection> coGbkResultCollection = KeyedPCollectionTuple.of(v1Tuple, leftCollection) .and(v2Tuple, rightCollection) - .apply("GBK", CoGroupByKey.create()); + .apply("CoGBK", CoGroupByKey.create()); return coGbkResultCollection .apply( @@ -171,7 +171,7 @@ public static PCollection>> rightOuterJoin( PCollection> coGbkResultCollection = KeyedPCollectionTuple.of(v1Tuple, leftCollection) .and(v2Tuple, rightCollection) - .apply("GBK", CoGroupByKey.create()); + .apply("CoGBK", CoGroupByKey.create()); return coGbkResultCollection .apply( @@ -234,7 +234,7 @@ public static PCollection>> fullOuterJoin( PCollection> coGbkResultCollection = KeyedPCollectionTuple.of(v1Tuple, leftCollection) .and(v2Tuple, rightCollection) - .apply("GBK", CoGroupByKey.create()); + .apply("CoGBK", CoGroupByKey.create()); return coGbkResultCollection .apply(