From d8b61969f6adc276741db95567af0cb98f2ff6f9 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 26 Apr 2017 20:12:04 -0700 Subject: [PATCH] Revert "Make WindowedValueCoder an Interface" This reverts commit 691269565b537370c133598c2c32bd3f87eb8c29. --- .../apache/beam/sdk/util/WindowedValue.java | 74 +++++++++++-------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 5d692f2623cc..6b7595119959 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CollectionCoder; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -573,24 +572,35 @@ public static ValueOnlyWindowedValueCoder getValueOnlyCoder(Coder valu return ValueOnlyWindowedValueCoder.of(valueCoder); } - /** Abstract class for {@code WindowedValue} coder. */ - public interface WindowedValueCoder extends Coder> { + /** + * Abstract class for {@code WindowedValue} coder. + */ + public abstract static class WindowedValueCoder + extends StandardCoder> { + final Coder valueCoder; + + WindowedValueCoder(Coder valueCoder) { + this.valueCoder = checkNotNull(valueCoder); + } + /** - * Returns the coder used to encode the values within this {@link WindowedValueCoder}. + * Returns the value coder. */ - Coder getValueCoder(); + public Coder getValueCoder() { + return valueCoder; + } /** - * Returns a new {@code WindowedValueCoder} that is a copy of this one, but with a different - * value coder. + * Returns a new {@code WindowedValueCoder} that is a copy of this one, + * but with a different value coder. */ - WindowedValueCoder withValueCoder(Coder valueCoder); + public abstract WindowedValueCoder withValueCoder(Coder valueCoder); } - /** Coder for {@code WindowedValue}. */ - public static class FullWindowedValueCoder extends StandardCoder> - implements WindowedValueCoder { - private final Coder valueCoder; + /** + * Coder for {@code WindowedValue}. + */ + public static class FullWindowedValueCoder extends WindowedValueCoder { private final Coder windowCoder; // Precompute and cache the coder for a list of windows. private final Coder> windowsCoder; @@ -614,7 +624,7 @@ public static FullWindowedValueCoder of( FullWindowedValueCoder(Coder valueCoder, Coder windowCoder) { - this.valueCoder = checkNotNull(valueCoder); + super(valueCoder); this.windowCoder = checkNotNull(windowCoder); // It's not possible to statically type-check correct use of the // windowCoder (we have to ensure externally that we only get @@ -635,14 +645,6 @@ public Coder> getWindowsCoder() { return windowsCoder; } - /** - * Returns the value coder. - */ - @Override - public Coder getValueCoder() { - return valueCoder; - } - @Override public WindowedValueCoder withValueCoder(Coder valueCoder) { return new FullWindowedValueCoder<>(valueCoder, windowCoder); @@ -715,23 +717,24 @@ public List> getComponents() { * Coder for {@code WindowedValue}. * *

A {@code ValueOnlyWindowedValueCoder} only encodes and decodes the value. It drops - * timestamps and windows when encoding, and uses a default timestamp and window when decoding. + * timestamp and windows for encoding, and uses defaults timestamp, and windows for decoding. */ - public static class ValueOnlyWindowedValueCoder extends CustomCoder> - implements WindowedValueCoder { - public static ValueOnlyWindowedValueCoder of(Coder valueCoder) { + public static class ValueOnlyWindowedValueCoder extends WindowedValueCoder { + public static ValueOnlyWindowedValueCoder of( + Coder valueCoder) { return new ValueOnlyWindowedValueCoder<>(valueCoder); } - private final Coder valueCoder; - - ValueOnlyWindowedValueCoder(Coder valueCoder) { - this.valueCoder = valueCoder; + @JsonCreator + public static ValueOnlyWindowedValueCoder of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List> components) { + checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size()); + return of(components.get(0)); } - @Override - public Coder getValueCoder() { - return valueCoder; + ValueOnlyWindowedValueCoder(Coder valueCoder) { + super(valueCoder); } @Override @@ -766,6 +769,13 @@ public void registerByteSizeObserver( valueCoder.registerByteSizeObserver(value.getValue(), observer, context); } + @Override + public CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); + addBoolean(result, PropertyNames.IS_WRAPPER, true); + return result; + } + @Override public List> getCoderArguments() { return Arrays.>asList(valueCoder);