From 377da9ea25abc24b5f74eed4acd513de620bf7d6 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sat, 4 Mar 2017 07:41:36 +0200 Subject: [PATCH 1/3] [BEAM-463] BoundedHeapCoder should be a StandardCoder --- .../main/java/org/apache/beam/sdk/transforms/Top.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java index 47be9b9c7a3b2..0f8e026bece1a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import java.io.IOException; import java.io.InputStream; @@ -33,6 +34,7 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn; import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator; import org.apache.beam.sdk.transforms.Combine.PerKey; @@ -525,7 +527,7 @@ private List asList() { * A {@link Coder} for {@link BoundedHeap}, using Java serialization via {@link CustomCoder}. */ private static class BoundedHeapCoder & Serializable> - extends CustomCoder> { + extends StandardCoder> { private final Coder> listCoder; private final ComparatorT compareFn; private final int maximumSize; @@ -549,6 +551,12 @@ public BoundedHeap decode(InputStream inStream, Coder.Context co return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream, context)); } + @Override + public List> getCoderArguments() { + Coder elementCoder = listCoder.getCoderArguments().get(0); + return ImmutableList.of(elementCoder); + } + @Override public void verifyDeterministic() throws NonDeterministicException { verifyDeterministic( From bcf823a4a181aab204616a228ef4193761f1f9bd Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sat, 4 Mar 2017 07:42:07 +0200 Subject: [PATCH 2/3] [BEAM-464] HolderCoder should be a StandardCoder --- .../java/org/apache/beam/sdk/transforms/Combine.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 51c5e71a1fad7..233b17fac31ab 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.DelegateCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -43,7 +42,6 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; @@ -646,7 +644,7 @@ private void set(V value) { /** * A {@link Coder} for a {@link Holder}. */ - private static class HolderCoder extends CustomCoder> { + private static class HolderCoder extends StandardCoder> { private Coder valueCoder; @@ -675,6 +673,11 @@ public Holder decode(InputStream inStream, Context context) } } + @Override + public List> getCoderArguments() { + return ImmutableList.of(valueCoder); + } + @Override public void verifyDeterministic() throws NonDeterministicException { valueCoder.verifyDeterministic(); From c18c3e85ed58074285e46ba3b2d6d85655283d02 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sat, 4 Mar 2017 07:42:23 +0200 Subject: [PATCH 3/3] [BEAM-466] QuantileStateCoder should be a StandardCoder --- .../beam/sdk/transforms/ApproximateQuantiles.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java index ed3a253e10162..f1d81858dccf9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.UnmodifiableIterator; @@ -40,8 +41,8 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn; import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -668,7 +669,7 @@ public boolean hasNext() { * Coder for QuantileState. */ private static class QuantileStateCoder & Serializable> - extends CustomCoder> { + extends StandardCoder> { private final ComparatorT compareFn; private final Coder elementCoder; private final Coder> elementListCoder; @@ -718,6 +719,11 @@ public QuantileState decode(InputStream inStream, Coder.Context compareFn, numQuantiles, min, max, numBuffers, bufferSize, unbufferedElements, buffers); } + @Override + public List> getCoderArguments() { + return ImmutableList.of(elementCoder); + } + private void encodeBuffer( QuantileBuffer buffer, OutputStream outStream, Coder.Context context) throws CoderException, IOException {