From 83765ab259b6ef231dbca88eb479280ecbaf792c Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 5 May 2017 10:08:32 -0700 Subject: [PATCH 1/5] Re-add AtomicCoder This is a moderately useful base class for coders which take no configuration. --- .../apache/beam/sdk/coders/AtomicCoder.java | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java new file mode 100644 index 000000000000..528cfb06d47b --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.coders; + +import java.util.Collections; +import java.util.List; + +/** + * A {@link Coder} that has no component {@link Coder Coders} or other state. + * + *

Note that, unless the behavior is overridden, atomic coders are presumed to be deterministic. + * + *

All atomic coders of the same class are considered to be equal to each other. As a result, + * an {@link AtomicCoder} should have no associated state. + * + * @param the type of the values being transcoded + */ +public abstract class AtomicCoder extends StructuredCoder { + /** + * Returns an empty list. + * + *

{@link CoderFactories#fromStaticMethods(Class)} builds a {@link CoderFactory} from the + * {@code #of()} method and this method, used to determine the components of an object. Because + * {@link AtomicCoder} has no components, always returns an empty list. + * + * @param exampleValue unused, but part of the latent interface expected by {@link + * CoderFactories#fromStaticMethods} + */ + public static List getInstanceComponents(T exampleValue) { + return Collections.emptyList(); + } + + /** + * {@inheritDoc}. + * + * @throws NonDeterministicException + */ + @Override + public void verifyDeterministic() throws NonDeterministicException {} + + @Override + public List> getCoderArguments() { + return null; + } + + /** + * {@inheritDoc}. + * + * @return the empty {@link List}. + */ + @Override + public final List> getComponents() { + return Collections.emptyList(); + } + + /** + * {@inheritDoc}. + * + * @return true if the other object has the same class as this {@link AtomicCoder}. + */ + @Override + public final boolean equals(Object other) { + return other != null && this.getClass().equals(other.getClass()); + } + + @Override + public final int hashCode() { + return this.getClass().hashCode(); + } +} From 83ecd4b4750f4c00da580a581d1fbd645256689a Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 5 May 2017 10:10:40 -0700 Subject: [PATCH 2/5] Add default implementations of Coder methods to Coder Remove from StructuredCoder. These are sensible defaults implemented in terms of other Coder methods. --- .../org/apache/beam/sdk/coders/Coder.java | 121 +++++++++++++++--- .../beam/sdk/coders/StructuredCoder.java | 51 +------- 2 files changed, 108 insertions(+), 64 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java index 061e9e535338..41e83acf6e50 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java @@ -22,6 +22,9 @@ import com.google.common.base.Joiner; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; +import com.google.common.io.ByteStreams; +import com.google.common.io.CountingOutputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -205,6 +208,30 @@ public abstract T decode(InputStream inStream, Context context) */ public abstract void verifyDeterministic() throws Coder.NonDeterministicException; + /** + * Verifies all of the provided coders are deterministic. If any are not, throws a {@link + * NonDeterministicException} for the {@code target} {@link Coder}. + */ + public static void verifyDeterministic(Coder target, String message, Iterable> coders) + throws NonDeterministicException { + for (Coder coder : coders) { + try { + coder.verifyDeterministic(); + } catch (NonDeterministicException e) { + throw new NonDeterministicException(target, message, e); + } + } + } + + /** + * Verifies all of the provided coders are deterministic. If any are not, throws a {@link + * NonDeterministicException} for the {@code target} {@link Coder}. + */ + public static void verifyDeterministic(Coder target, String message, Coder... coders) + throws NonDeterministicException { + verifyDeterministic(target, message, Arrays.asList(coders)); + } + /** * Returns {@code true} if this {@link Coder} is injective with respect to {@link Objects#equals}. * @@ -214,28 +241,50 @@ public abstract T decode(InputStream inStream, Context context) *

This condition is most notably false for arrays. More generally, this condition is false * whenever {@code equals()} compares object identity, rather than performing a * semantic/structural comparison. + * + *

By default, returns false. */ - public abstract boolean consistentWithEquals(); + public boolean consistentWithEquals() { + return false; + } /** - * Returns an object with an {@code Object.equals()} method that represents structural equality - * on the argument. + * Returns an object with an {@code Object.equals()} method that represents structural equality on + * the argument. * *

For any two values {@code x} and {@code y} of type {@code T}, if their encoded bytes are the * same, then it must be the case that {@code structuralValue(x).equals(@code structuralValue(y)}. * *

Most notably: + * *

    *
  • The structural value for an array coder should perform a structural comparison of the - * contents of the arrays, rather than the default behavior of comparing according to object - * identity. - *
  • The structural value for a coder accepting {@code null} should be a proper object with - * an {@code equals()} method, even if the input value is {@code null}. + * contents of the arrays, rather than the default behavior of comparing according to object + * identity. + *
  • The structural value for a coder accepting {@code null} should be a proper object with an + * {@code equals()} method, even if the input value is {@code null}. *
* *

See also {@link #consistentWithEquals()}. + * + *

By default, if this coder is {@link #consistentWithEquals()}, and the value is not null, + * returns the provided object. Otherwise, encodes the value into a {@code byte[]}, and returns + * an object that performs array equality on the encoded bytes. */ - public abstract Object structuralValue(T value); + public Object structuralValue(T value) { + if (value != null && consistentWithEquals()) { + return value; + } else { + try { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + encode(value, os, Context.OUTER); + return new StructuralByteArray(os.toByteArray()); + } catch (Exception exn) { + throw new IllegalArgumentException( + "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); + } + } + } /** * Returns whether {@link #registerByteSizeObserver} cheap enough to @@ -246,21 +295,44 @@ public abstract T decode(InputStream inStream, Context context) *

Not intended to be called by user code, but instead by * {@link PipelineRunner} * implementations. + * + *

By default, returns false. The default {@link #registerByteSizeObserver} implementation + * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element + * unless it is overridden. This is considered expensive. */ - public abstract boolean isRegisterByteSizeObserverCheap(T value); + public boolean isRegisterByteSizeObserverCheap(T value) { + return isRegisterByteSizeObserverCheap(value, Context.NESTED); + } /** - * Returns whether {@link #registerByteSizeObserver} cheap enough to - * call for every element, that is, if this {@code Coder} can - * calculate the byte size of the element to be coded in roughly - * constant time (or lazily). + * {@inheritDoc} * *

Not intended to be called by user code, but instead by * {@link PipelineRunner} * implementations. + * + * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver} + * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element + * unless it is overridden. This is considered expensive. */ @Deprecated - public abstract boolean isRegisterByteSizeObserverCheap(T value, Context context); + public boolean isRegisterByteSizeObserverCheap(T value, Context context) { + return false; + } + + /** + * Returns the size in bytes of the encoded value using this coder. + */ + protected long getEncodedElementByteSize(T value, Context context) + throws Exception { + try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) { + encode(value, os, context); + return os.getCount(); + } catch (Exception exn) { + throw new IllegalArgumentException( + "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); + } + } /** * Notifies the {@code ElementByteSizeObserver} about the byte size @@ -269,10 +341,14 @@ public abstract T decode(InputStream inStream, Context context) *

Not intended to be called by user code, but instead by * {@link PipelineRunner} * implementations. + * + *

By default, this notifies {@code observer} about the byte size + * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}. */ - public abstract void registerByteSizeObserver( - T value, ElementByteSizeObserver observer) - throws Exception; + public void registerByteSizeObserver(T value, ElementByteSizeObserver observer) + throws Exception { + registerByteSizeObserver(value, observer, Context.NESTED); + } /** * Notifies the {@code ElementByteSizeObserver} about the byte size @@ -283,15 +359,20 @@ public abstract void registerByteSizeObserver( * implementations. */ @Deprecated - public abstract void registerByteSizeObserver( + public void registerByteSizeObserver( T value, ElementByteSizeObserver observer, Context context) - throws Exception; + throws Exception { + observer.update(getEncodedElementByteSize(value, context)); + } /** * Returns the {@link TypeDescriptor} for the type encoded. */ @Experimental(Kind.CODER_TYPE_ENCODING) - public abstract TypeDescriptor getEncodedTypeDescriptor(); + public TypeDescriptor getEncodedTypeDescriptor(){ + return (TypeDescriptor) + TypeDescriptor.of(getClass()).resolveType(new TypeDescriptor() {}.getType()); + } /** * Exception thrown by {@link Coder#verifyDeterministic()} if the encoding is diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java index 0c72618c2108..af92e931e4ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java @@ -26,13 +26,15 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.TypeDescriptor; /** * An abstract base class to implement a {@link Coder} that defines equality, hashing, and printing * via the class name and recursively using {@link #getComponents}. * + *

A {@link StructuredCoder} should be defined purely in terms of its component coders, and + * contain no additional configuration. + * *

To extend {@link StructuredCoder}, override the following methods as appropriate: * *

    @@ -101,12 +103,14 @@ public String toString() { return builder.toString(); } + @Override public void encode(T value, OutputStream outStream) throws CoderException, IOException { encode(value, outStream, Coder.Context.NESTED); } @Deprecated + @Override public void encodeOuter(T value, OutputStream outStream) throws CoderException, IOException { encode(value, outStream, Coder.Context.OUTER); @@ -122,11 +126,13 @@ public void encode(T value, OutputStream outStream, Coder.Context context) } } + @Override public T decode(InputStream inStream) throws CoderException, IOException { return decode(inStream, Coder.Context.NESTED); } @Deprecated + @Override public T decodeOuter(InputStream inStream) throws CoderException, IOException { return decode(inStream, Coder.Context.OUTER); } @@ -141,30 +147,6 @@ public T decode(InputStream inStream, Coder.Context context) } } - /** - * {@inheritDoc} - * - * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver} - * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element - * unless it is overridden. This is considered expensive. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(T value) { - return isRegisterByteSizeObserverCheap(value, Context.NESTED); - } - - /** - * {@inheritDoc} - * - * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver} - * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element - * unless it is overridden. This is considered expensive. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(T value, Context context) { - return false; - } - /** * Returns the size in bytes of the encoded value using this coder. */ @@ -179,25 +161,6 @@ protected long getEncodedElementByteSize(T value, Context context) } } - @Override - public void registerByteSizeObserver(T value, ElementByteSizeObserver observer) - throws Exception { - registerByteSizeObserver(value, observer, Context.NESTED); - } - - /** - * {@inheritDoc} - * - *

    For {@link StructuredCoder} subclasses, this notifies {@code observer} about the byte size - * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}. - */ - @Override - public void registerByteSizeObserver( - T value, ElementByteSizeObserver observer, Context context) - throws Exception { - observer.update(getEncodedElementByteSize(value, context)); - } - protected void verifyDeterministic(String message, Iterable> coders) throws NonDeterministicException { for (Coder coder : coders) { From 2d47ecb2b372c31fd504ac25c708d84ba3d63ecf Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 5 May 2017 10:13:05 -0700 Subject: [PATCH 3/5] Reparent many Coders to Atomic or StructuredCoder These coders do not take configuration, or take configuration only in terms of other Coders, and are appropriate to reparent. --- .../translation/utils/ApexStreamTuple.java | 5 ++- .../UnboundedReadFromBoundedSource.java | 4 +- .../runners/core/construction/CodersTest.java | 14 +------ .../core/construction/PCollectionsTest.java | 3 +- .../core/ElementAndRestrictionCoder.java | 17 +++++++- .../beam/runners/core/KeyedWorkItemCoder.java | 4 +- .../beam/runners/core/TimerInternals.java | 6 +-- .../direct/CloningBundleFactoryTest.java | 10 ++--- .../beam/runners/direct/DirectRunnerTest.java | 5 +-- .../UnboundedReadEvaluatorFactoryTest.java | 4 +- .../SingletonKeyedWorkItemCoder.java | 4 +- .../runners/dataflow/BatchViewOverrides.java | 8 ++-- .../runners/dataflow/internal/IsmFormat.java | 40 +++++++++++++++---- .../dataflow/util/RandomAccessData.java | 4 +- .../org/apache/beam/sdk/coders/AvroCoder.java | 19 +++++++++ .../beam/sdk/coders/BigDecimalCoder.java | 2 +- .../sdk/coders/BigEndianIntegerCoder.java | 2 +- .../beam/sdk/coders/BigEndianLongCoder.java | 4 +- .../beam/sdk/coders/BigIntegerCoder.java | 4 +- .../apache/beam/sdk/coders/BitSetCoder.java | 4 +- .../beam/sdk/coders/ByteArrayCoder.java | 6 +-- .../org/apache/beam/sdk/coders/ByteCoder.java | 2 +- .../beam/sdk/coders/CoderFactories.java | 9 +++-- .../apache/beam/sdk/coders/DoubleCoder.java | 2 +- .../apache/beam/sdk/coders/DurationCoder.java | 2 +- .../apache/beam/sdk/coders/InstantCoder.java | 2 +- .../org/apache/beam/sdk/coders/KvCoder.java | 4 +- .../org/apache/beam/sdk/coders/ListCoder.java | 3 +- .../org/apache/beam/sdk/coders/MapCoder.java | 2 +- .../apache/beam/sdk/coders/NullableCoder.java | 6 +-- .../beam/sdk/coders/StringUtf8Coder.java | 2 +- .../beam/sdk/coders/TextualIntegerCoder.java | 2 +- .../apache/beam/sdk/coders/VarIntCoder.java | 2 +- .../org/apache/beam/sdk/coders/VoidCoder.java | 4 +- .../org/apache/beam/sdk/io/FileBasedSink.java | 4 +- .../sdk/transforms/ApproximateQuantiles.java | 26 +++++++++--- .../apache/beam/sdk/transforms/Combine.java | 12 ++++-- .../beam/sdk/transforms/CombineFns.java | 4 +- .../org/apache/beam/sdk/transforms/Count.java | 4 +- .../org/apache/beam/sdk/transforms/Mean.java | 7 ++-- .../org/apache/beam/sdk/transforms/Top.java | 23 ++++++++++- .../beam/sdk/transforms/join/CoGbkResult.java | 2 +- .../beam/sdk/transforms/join/UnionCoder.java | 7 ++-- .../sdk/transforms/windowing/PaneInfo.java | 10 ++++- .../org/apache/beam/sdk/util/BitSetCoder.java | 9 +++-- .../apache/beam/sdk/util/WindowedValue.java | 7 ++-- .../beam/sdk/values/TimestampedValue.java | 13 ++++-- .../beam/sdk/values/ValueInSingleWindow.java | 4 +- .../beam/sdk/values/ValueWithRecordId.java | 4 +- .../beam/sdk/coders/CoderRegistryTest.java | 4 +- .../beam/sdk/coders/DelegateCoderTest.java | 25 ------------ .../beam/sdk/coders/NullableCoderTest.java | 2 +- .../beam/sdk/testing/CoderPropertiesTest.java | 37 +++++++++++++++-- .../apache/beam/sdk/testing/PAssertTest.java | 4 +- .../sdk/testing/SerializableMatchersTest.java | 4 +- .../beam/sdk/testing/WindowSupplierTest.java | 6 +-- .../beam/sdk/transforms/CombineFnsTest.java | 4 +- .../beam/sdk/transforms/CombineTest.java | 6 +-- .../beam/sdk/transforms/CreateTest.java | 6 +-- .../beam/sdk/transforms/GroupByKeyTest.java | 4 +- .../apache/beam/sdk/transforms/ParDoTest.java | 6 +-- .../apache/beam/sdk/transforms/ViewTest.java | 4 +- .../transforms/reflect/DoFnInvokersTest.java | 6 +-- .../apache/beam/sdk/util/CoderUtilsTest.java | 4 +- .../beam/sdk/util/SerializableUtilsTest.java | 4 +- .../extensions/protobuf/ByteStringCoder.java | 4 +- .../sdk/io/gcp/bigquery/ShardedKeyCoder.java | 10 ++--- .../gcp/bigquery/TableDestinationCoder.java | 4 +- .../io/gcp/bigquery/TableRowInfoCoder.java | 4 +- .../io/gcp/bigquery/TableRowJsonCoder.java | 4 +- .../io/gcp/bigquery/WriteBundlesToFiles.java | 11 ++++- .../io/gcp/pubsub/PubsubUnboundedSink.java | 4 +- .../io/gcp/pubsub/PubsubUnboundedSource.java | 12 ++++-- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 4 +- .../beam/sdk/io/hadoop/WritableCoder.java | 18 +++++++++ .../beam/sdk/io/hbase/HBaseMutationCoder.java | 4 +- .../beam/sdk/io/hbase/HBaseResultCoder.java | 4 +- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 6 +-- .../beam/sdk/io/kafka/KafkaRecordCoder.java | 4 +- .../sdk/io/kinesis/KinesisRecordCoder.java | 4 +- .../org/apache/beam/sdk/io/xml/JAXBCoder.java | 18 +++++++++ .../apache/beam/sdk/io/xml/JAXBCoderTest.java | 4 +- 82 files changed, 369 insertions(+), 223 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java index 4ce351b882f3..4aa6ee82017e 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java @@ -31,7 +31,7 @@ import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StructuredCoder; /** * The common interface for all objects transmitted through streams. @@ -149,7 +149,7 @@ public String toString() { /** * Coder for {@link ApexStreamTuple}. */ - class ApexStreamTupleCoder extends CustomCoder> { + class ApexStreamTupleCoder extends StructuredCoder> { private static final long serialVersionUID = 1L; final Coder valueCoder; @@ -194,6 +194,7 @@ public List> getCoderArguments() { @Override public void verifyDeterministic() throws NonDeterministicException { verifyDeterministic( + this, this.getClass().getSimpleName() + " requires a deterministic valueCoder", valueCoder); } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java index 0ea13b8235d4..1424b8be2518 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java @@ -35,10 +35,10 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.Read; @@ -203,7 +203,7 @@ public void finalizeCheckpoint() {} } @VisibleForTesting - static class CheckpointCoder extends CustomCoder> { + static class CheckpointCoder extends StructuredCoder> { // The coder for a list of residual elements and their timestamps private final Coder>> elemsCoder; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java index 32a78faec70e..765723c9bb3d 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java @@ -30,11 +30,11 @@ import java.io.Serializable; import java.util.HashSet; import java.util.Set; +import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; @@ -149,7 +149,7 @@ public void toAndFromProto() throws Exception { static class Record implements Serializable {} - private static class RecordCoder extends CustomCoder { + private static class RecordCoder extends AtomicCoder { @Override public void encode(Record value, OutputStream outStream, Context context) throws CoderException, IOException {} @@ -159,16 +159,6 @@ public Record decode(InputStream inStream, Context context) throws CoderException, IOException { return new Record(); } - - @Override - public boolean equals(Object other) { - return other != null && getClass().equals(other.getClass()); - } - - @Override - public int hashCode() { - return getClass().hashCode(); - } } } } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java index c177c58cb6d6..2c45cbdb94b3 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java @@ -29,6 +29,7 @@ import java.util.Collection; import java.util.Collections; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CustomCoder; @@ -158,7 +159,7 @@ public boolean isCompatible(WindowFn other) { @Override public Coder windowCoder() { - return new CustomCoder() { + return new AtomicCoder() { @Override public void verifyDeterministic() {} @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java index 64c1e14156c0..83c4e6281d71 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java @@ -17,18 +17,20 @@ */ package org.apache.beam.runners.core; +import com.google.common.collect.ImmutableList; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StructuredCoder; /** A {@link Coder} for {@link ElementAndRestriction}. */ @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) public class ElementAndRestrictionCoder - extends CustomCoder> { + extends StructuredCoder> { private final Coder elementCoder; private final Coder restrictionCoder; @@ -65,6 +67,17 @@ public ElementAndRestriction decode(InputStream inStream return ElementAndRestriction.of(key, value); } + @Override + public List> getCoderArguments() { + return ImmutableList.of(elementCoder, restrictionCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + elementCoder.verifyDeterministic(); + restrictionCoder.verifyDeterministic(); + } + public Coder getElementCoder() { return elementCoder; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java index fddf7fa478e1..e1872b542260 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java @@ -26,8 +26,8 @@ import org.apache.beam.runners.core.TimerInternals.TimerDataCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; @@ -35,7 +35,7 @@ /** * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}. */ -public class KeyedWorkItemCoder extends CustomCoder> { +public class KeyedWorkItemCoder extends StructuredCoder> { /** * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window * coder. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index 21fe4301ea57..888c11f97313 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -27,9 +27,9 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -224,7 +224,7 @@ public int compareTo(TimerData that) { /** * A {@link Coder} for {@link TimerData}. */ - class TimerDataCoder extends CustomCoder { + class TimerDataCoder extends StructuredCoder { private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of(); private static final InstantCoder INSTANT_CODER = InstantCoder.of(); private final Coder windowCoder; @@ -266,7 +266,7 @@ public List> getCoderArguments() { @Override public void verifyDeterministic() throws NonDeterministicException { - verifyDeterministic("window coder must be deterministic", windowCoder); + verifyDeterministic(this, "window coder must be deterministic", windowCoder); } } } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java index 7d037d124fd7..33d171e50449 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java @@ -31,8 +31,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -173,7 +173,7 @@ public void keyedBundleDecodeFailsAddFails() { } static class Record {} - static class RecordNoEncodeCoder extends CustomCoder { + static class RecordNoEncodeCoder extends AtomicCoder { @Override public void encode( @@ -192,7 +192,7 @@ public Record decode( } } - static class RecordNoDecodeCoder extends CustomCoder { + static class RecordNoDecodeCoder extends AtomicCoder { @Override public void encode( Record value, @@ -208,7 +208,7 @@ public Record decode( } } - private static class RecordStructuralValueCoder extends CustomCoder { + private static class RecordStructuralValueCoder extends AtomicCoder { @Override public void encode( Record value, @@ -240,7 +240,7 @@ public Object structuralValue(Record value) { } private static class RecordNotConsistentWithEqualsStructuralValueCoder - extends CustomCoder { + extends AtomicCoder { @Override public void encode( Record value, diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 428c6fc43d71..0fe958515465 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -44,9 +44,9 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; @@ -523,8 +523,7 @@ public void testUnencodableOutputFromUnboundedRead() { p.run(); } - private static class LongNoDecodeCoder extends CustomCoder { - + private static class LongNoDecodeCoder extends AtomicCoder { @Override public void encode( Long value, OutputStream outStream, Context context) throws IOException { diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index ceb078be3058..b9ba7f49df79 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -46,10 +46,10 @@ import javax.annotation.Nullable; import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator; import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard; +import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; @@ -586,7 +586,7 @@ boolean isFinalized() { return finalized; } - public static class Coder extends CustomCoder { + public static class Coder extends AtomicCoder { @Override public void encode( TestCheckpointMark value, diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java index c73700f9dc23..f21869341a01 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java @@ -26,7 +26,7 @@ import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -34,7 +34,7 @@ * Singleton keyed work item coder. */ public class SingletonKeyedWorkItemCoder - extends CustomCoder> { + extends StructuredCoder> { /** * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window * coder. diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index ef2bfed39e7c..ecd0365ed88c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -50,11 +50,11 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -1335,7 +1335,7 @@ protected Map delegate() { * A {@link Coder} for {@link TransformedMap}s. */ static class TransformedMapCoder - extends CustomCoder> { + extends StructuredCoder> { private final Coder> transformCoder; private final Coder> originalMapCoder; @@ -1373,8 +1373,8 @@ public List> getCoderArguments() { @Override public void verifyDeterministic() throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { - verifyDeterministic("Expected transform coder to be deterministic.", transformCoder); - verifyDeterministic("Expected map coder to be deterministic.", originalMapCoder); + verifyDeterministic(this, "Expected transform coder to be deterministic.", transformCoder); + verifyDeterministic(this, "Expected map coder to be deterministic.", originalMapCoder); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java index fbfe49aaa5ad..aed514a08c7c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java @@ -32,14 +32,17 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.util.RandomAccessData; +import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.util.VarInt; @@ -356,8 +359,9 @@ public List> getCoderArguments() { @Override public void verifyDeterministic() throws Coder.NonDeterministicException { - verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders); - verifyDeterministic("Value coder expected to be deterministic.", valueCoder); + verifyDeterministic( + this, "Key component coders expected to be deterministic.", keyComponentCoders); + verifyDeterministic(this, "Value coder expected to be deterministic.", valueCoder); } @Override @@ -393,6 +397,28 @@ public Object structuralValue(IsmRecord record) { } return super.structuralValue(record); } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (!(other instanceof IsmRecordCoder)) { + return false; + } + IsmRecordCoder that = (IsmRecordCoder) other; + return Objects.equals(this.numberOfShardKeyCoders, that.numberOfShardKeyCoders) + && Objects.equals( + this.numberOfMetadataShardKeyCoders, that.numberOfMetadataShardKeyCoders) + && Objects.equals(this.keyComponentCoders, that.keyComponentCoders) + && Objects.equals(this.valueCoder, that.valueCoder); + } + + @Override + public int hashCode() { + return Objects.hash( + numberOfShardKeyCoders, numberOfMetadataShardKeyCoders, keyComponentCoders, valueCoder); + } } /** @@ -450,7 +476,7 @@ public static Object getMetadataKey() { * A coder for metadata key component. Can be used to wrap key component coder allowing for * the metadata key component to be used as a place holder instead of an actual key. */ - public static class MetadataKeyCoder extends CustomCoder { + public static class MetadataKeyCoder extends StructuredCoder { public static MetadataKeyCoder of(Coder keyCoder) { checkNotNull(keyCoder); return new MetadataKeyCoder<>(keyCoder); @@ -497,7 +523,7 @@ public List> getCoderArguments() { @Override public void verifyDeterministic() throws NonDeterministicException { - verifyDeterministic("Expected key coder to be deterministic", keyCoder); + verifyDeterministic(this, "Expected key coder to be deterministic", keyCoder); } } @@ -584,7 +610,7 @@ public IsmShard withIndexOffset(long indexOffset) { *

  • indexOffset (variable length long encoding)
  • *
*/ - public static class IsmShardCoder extends CustomCoder { + public static class IsmShardCoder extends AtomicCoder { private static final IsmShardCoder INSTANCE = new IsmShardCoder(); /** Returns an IsmShardCoder. */ @@ -649,7 +675,7 @@ public static KeyPrefix of(int sharedKeySize, int unsharedKeySize) { } /** A {@link Coder} for {@link KeyPrefix}. */ - public static final class KeyPrefixCoder extends CustomCoder { + public static final class KeyPrefixCoder extends AtomicCoder { private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder(); public static KeyPrefixCoder of() { @@ -721,7 +747,7 @@ public static Footer of(long indexPosition, long bloomFilterPosition, long numbe } /** A {@link Coder} for {@link Footer}. */ - public static final class FooterCoder extends CustomCoder