From e8c708ad196645ddf5f01694e89f8d1ca651d315 Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Thu, 18 Aug 2016 13:50:52 -0700 Subject: [PATCH 1/5] Fixed Combine display data --- .../apache/beam/sdk/transforms/Combine.java | 53 +++++++++++++++++-- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 26f0f660f074..d432e15dc71e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -1815,7 +1816,14 @@ public PerKeyWithHotKeyFanout withHotKeyFanout( */ public PerKeyWithHotKeyFanout withHotKeyFanout(final int hotKeyFanout) { return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData, - new SerializableFunction() { + new SimpleFunction() { + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder.addIfNotDefault(DisplayData.item("fanout", hotKeyFanout) + .withLabel("Key Fanout Size"), 0); + } + @Override public Integer apply(K unused) { return hotKeyFanout; @@ -1904,7 +1912,7 @@ private PCollection> applyHelper(PCollection( inputCoder.getValueCoder(), accumCoder); - // A CombineFn's mergeAccumulator can be applied in a tree-like fashon. + // A CombineFn's mergeAccumulator can be applied in a tree-like fashion. // Here we shard the key using an integer nonce, combine on that partial // set of values, then drop the nonce and do a final combine of the // aggregates. We do this by splitting the original CombineFn into two, @@ -1944,6 +1952,16 @@ public Coder getAccumulatorCoder( throws CannotProvideCoderException { return accumCoder; } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) + .withLabel("Fanout Function")); + if (hotKeyFanout instanceof HasDisplayData) { + ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); + } + } }; postCombine = new KeyedCombineFn, AccumT, OutputT>() { @@ -1988,6 +2006,15 @@ public Coder getAccumulatorCoder(CoderRegistry registry, Coder keyCod throws CannotProvideCoderException { return accumCoder; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) + .withLabel("Fanout Function")); + if (hotKeyFanout instanceof HasDisplayData) { + ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); + } + } }; } else { final KeyedCombineFnWithContext keyedFnWithContext = @@ -2028,6 +2055,15 @@ public Coder getAccumulatorCoder( throws CannotProvideCoderException { return accumCoder; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) + .withLabel("Fanout Function")); + if (hotKeyFanout instanceof HasDisplayData) { + ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); + } + } }; postCombine = new KeyedCombineFnWithContext, AccumT, OutputT>() { @@ -2073,6 +2109,15 @@ public Coder getAccumulatorCoder(CoderRegistry registry, Coder keyCod throws CannotProvideCoderException { return accumCoder; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) + .withLabel("Fanout Function")); + if (hotKeyFanout instanceof HasDisplayData) { + ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); + } + } }; } @@ -2117,7 +2162,7 @@ public void processElement(ProcessContext c) { .setCoder(KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()), inputCoder.getValueCoder())) .setWindowingStrategyInternal(preCombineStrategy) - .apply("PreCombineHot", Combine.perKey(hotPreCombine)) + .apply("PreCombineHot", Combine.perKey(hotPreCombine, fnDisplayData)) .apply("StripNonce", MapElements.via( new SimpleFunction, AccumT>, KV>>() { @@ -2147,7 +2192,7 @@ public KV> apply(KV element) { // Combine the union of the pre-processed hot and cold key results. return PCollectionList.of(precombinedHot).and(preprocessedCold) .apply(Flatten.>>pCollections()) - .apply("PostCombine", Combine.perKey(postCombine)); + .apply("PostCombine", Combine.perKey(postCombine, fnDisplayData)); } @Override From fcabbfc843be1d5eef7c34362ec697bfe5456daf Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Fri, 19 Aug 2016 11:26:38 -0700 Subject: [PATCH 2/5] Added DisplayData test --- .../beam/sdk/transforms/CombineTest.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 77a1d6b03e69..be061af51c3b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -731,6 +731,25 @@ public void testCombinePerKeyPrimitiveDisplayData() { displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); } + @Test + @Category(RunnableOnService.class) + public void testCombinePerKeyWithHotKeyFanoutPrimitiveDisplayData() { + int hotKeyFanout = 2; + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts(); + PTransform>, PCollection>>> combine = + Combine.>perKey(combineFn).withHotKeyFanout(hotKeyFanout); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(combine, + KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); + + assertThat("Combine.perKey.withHotKeyFanout should include the combineFn in its primitive " + + "transform", displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); + assertThat("Combine.perKey.withHotKeyFanout(int) should include the fanout in its primitive " + + "transform", displayData, hasItem(hasDisplayItem("fanout", hotKeyFanout))); + } + //////////////////////////////////////////////////////////////////////////// // Test classes, for different kinds of combining fns. From 3fc6e45425e9bcfa095803add7555710a8cef2cc Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 19 Aug 2016 13:32:45 -0700 Subject: [PATCH 3/5] Delegate populateDipslayData to wrapped combineFn's --- .../apache/beam/sdk/transforms/Combine.java | 38 +++---------------- 1 file changed, 5 insertions(+), 33 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index d432e15dc71e..8f8047fd729f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -51,7 +51,6 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -1816,14 +1815,7 @@ public PerKeyWithHotKeyFanout withHotKeyFanout( */ public PerKeyWithHotKeyFanout withHotKeyFanout(final int hotKeyFanout) { return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData, - new SimpleFunction() { - @Override - public void populateDisplayData(Builder builder) { - super.populateDisplayData(builder); - builder.addIfNotDefault(DisplayData.item("fanout", hotKeyFanout) - .withLabel("Key Fanout Size"), 0); - } - + new SerializableFunction() { @Override public Integer apply(K unused) { return hotKeyFanout; @@ -1955,12 +1947,7 @@ public Coder getAccumulatorCoder( @Override public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) - .withLabel("Fanout Function")); - if (hotKeyFanout instanceof HasDisplayData) { - ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); - } + builder.include(keyedFn); } }; postCombine = @@ -2008,12 +1995,7 @@ public Coder getAccumulatorCoder(CoderRegistry registry, Coder keyCod } @Override public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) - .withLabel("Fanout Function")); - if (hotKeyFanout instanceof HasDisplayData) { - ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); - } + builder.include(keyedFn); } }; } else { @@ -2057,12 +2039,7 @@ public Coder getAccumulatorCoder( } @Override public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) - .withLabel("Fanout Function")); - if (hotKeyFanout instanceof HasDisplayData) { - ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); - } + builder.include(keyedFnWithContext); } }; postCombine = @@ -2111,12 +2088,7 @@ public Coder getAccumulatorCoder(CoderRegistry registry, Coder keyCod } @Override public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) - .withLabel("Fanout Function")); - if (hotKeyFanout instanceof HasDisplayData) { - ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); - } + builder.include(keyedFnWithContext); } }; } From 7ffdf23e5e0f01f6aa0a9e810b2cbd4182f8158f Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Fri, 19 Aug 2016 15:02:28 -0700 Subject: [PATCH 4/5] Minor changes --- .../apache/beam/sdk/transforms/Combine.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 8f8047fd729f..49328e84d553 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -1815,7 +1816,14 @@ public PerKeyWithHotKeyFanout withHotKeyFanout( */ public PerKeyWithHotKeyFanout withHotKeyFanout(final int hotKeyFanout) { return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData, - new SerializableFunction() { + new SimpleFunction() { + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanout", hotKeyFanout) + .withLabel("Key Fanout Size")); + } + @Override public Integer apply(K unused) { return hotKeyFanout; @@ -1947,7 +1955,7 @@ public Coder getAccumulatorCoder( @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(keyedFn); + builder.include(PerKeyWithHotKeyFanout.this); } }; postCombine = @@ -1995,7 +2003,7 @@ public Coder getAccumulatorCoder(CoderRegistry registry, Coder keyCod } @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(keyedFn); + builder.include(PerKeyWithHotKeyFanout.this); } }; } else { @@ -2039,7 +2047,7 @@ public Coder getAccumulatorCoder( } @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(keyedFnWithContext); + builder.include(PerKeyWithHotKeyFanout.this); } }; postCombine = @@ -2088,7 +2096,7 @@ public Coder getAccumulatorCoder(CoderRegistry registry, Coder keyCod } @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(keyedFnWithContext); + builder.include(PerKeyWithHotKeyFanout.this); } }; } @@ -2172,6 +2180,9 @@ public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); Combine.populateDisplayData(builder, fn, fnDisplayData); + if (hotKeyFanout instanceof HasDisplayData) { + builder.include((HasDisplayData) hotKeyFanout); + } builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) .withLabel("Fanout Function")); } From 5079a9487f6653522dc001adbba7b4f86b824345 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Mon, 29 Aug 2016 15:42:23 -0700 Subject: [PATCH 5/5] fixup! make KeyedCombineFn non-transient to it is available for display data --- .../src/main/java/org/apache/beam/sdk/transforms/Combine.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 49328e84d553..2b89372a47a8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -1740,7 +1740,7 @@ protected SimpleCombineFn(SerializableFunction, V> combiner) { public static class PerKey extends PTransform>, PCollection>> { - private final transient PerKeyCombineFn fn; + private final PerKeyCombineFn fn; private final DisplayData.Item> fnDisplayData; private final boolean fewKeys; private final List> sideInputs; @@ -1866,7 +1866,7 @@ public void populateDisplayData(DisplayData.Builder builder) { public static class PerKeyWithHotKeyFanout extends PTransform>, PCollection>> { - private final transient PerKeyCombineFn fn; + private final PerKeyCombineFn fn; private final DisplayData.Item> fnDisplayData; private final SerializableFunction hotKeyFanout;