From 32dcf3dce9c9b4ed89f4a5a96057641665b35bd4 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 5 May 2017 09:20:10 -0700 Subject: [PATCH 1/2] Convert Coder into an Abstract Static Class --- .../org/apache/beam/sdk/coders/Coder.java | 36 +++++++++---------- .../beam/sdk/coders/CoderFactories.java | 35 +++++++++--------- .../beam/sdk/coders/StructuredCoder.java | 2 +- .../DoFnSignaturesSplittableDoFnTest.java | 4 +-- 4 files changed, 39 insertions(+), 38 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java index c923719eb025..169e448623c7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java @@ -57,10 +57,10 @@ * * @param the type of the values being transcoded */ -public interface Coder extends Serializable { +public abstract class Coder implements Serializable { /** The context in which encoding or decoding is being done. */ @Deprecated - class Context { + public static class Context { /** * The outer context: the value being encoded or decoded takes * up the remainder of the record/stream contents. @@ -118,7 +118,7 @@ public String toString() { * for some reason * @throws CoderException if the value could not be encoded for some reason */ - void encode(T value, OutputStream outStream) + public abstract void encode(T value, OutputStream outStream) throws CoderException, IOException; /** @@ -130,7 +130,7 @@ void encode(T value, OutputStream outStream) * @throws CoderException if the value could not be encoded for some reason */ @Deprecated - void encodeOuter(T value, OutputStream outStream) + public abstract void encodeOuter(T value, OutputStream outStream) throws CoderException, IOException; /** @@ -142,7 +142,7 @@ void encodeOuter(T value, OutputStream outStream) * @throws CoderException if the value could not be encoded for some reason */ @Deprecated - void encode(T value, OutputStream outStream, Context context) + public abstract void encode(T value, OutputStream outStream, Context context) throws CoderException, IOException; /** @@ -153,7 +153,7 @@ void encode(T value, OutputStream outStream, Context context) * for some reason * @throws CoderException if the value could not be decoded for some reason */ - T decode(InputStream inStream) throws CoderException, IOException; + public abstract T decode(InputStream inStream) throws CoderException, IOException; /** * Decodes a value of type {@code T} from the given input stream in @@ -164,7 +164,7 @@ void encode(T value, OutputStream outStream, Context context) * @throws CoderException if the value could not be decoded for some reason */ @Deprecated - T decodeOuter(InputStream inStream) throws CoderException, IOException; + public abstract T decodeOuter(InputStream inStream) throws CoderException, IOException; /** * Decodes a value of type {@code T} from the given input stream in @@ -175,7 +175,7 @@ void encode(T value, OutputStream outStream, Context context) * @throws CoderException if the value could not be decoded for some reason */ @Deprecated - T decode(InputStream inStream, Context context) + public abstract T decode(InputStream inStream, Context context) throws CoderException, IOException; /** @@ -184,7 +184,7 @@ T decode(InputStream inStream, Context context) * returns {@code null} if this cannot be done or this is not a * parameterized type. */ - List> getCoderArguments(); + public abstract List> getCoderArguments(); /** * Throw {@link NonDeterministicException} if the coding is not deterministic. @@ -202,7 +202,7 @@ T decode(InputStream inStream, Context context) * * @throws Coder.NonDeterministicException if this coder is not deterministic. */ - void verifyDeterministic() throws Coder.NonDeterministicException; + public abstract void verifyDeterministic() throws Coder.NonDeterministicException; /** * Returns {@code true} if this {@link Coder} is injective with respect to {@link Objects#equals}. @@ -214,7 +214,7 @@ T decode(InputStream inStream, Context context) * whenever {@code equals()} compares object identity, rather than performing a * semantic/structural comparison. */ - boolean consistentWithEquals(); + public abstract boolean consistentWithEquals(); /** * Returns an object with an {@code Object.equals()} method that represents structural equality @@ -234,7 +234,7 @@ T decode(InputStream inStream, Context context) * *

See also {@link #consistentWithEquals()}. */ - Object structuralValue(T value); + public abstract Object structuralValue(T value); /** * Returns whether {@link #registerByteSizeObserver} cheap enough to @@ -246,7 +246,7 @@ T decode(InputStream inStream, Context context) * {@link org.apache.beam.sdk.runners.PipelineRunner} * implementations. */ - boolean isRegisterByteSizeObserverCheap(T value); + public abstract boolean isRegisterByteSizeObserverCheap(T value); /** * Returns whether {@link #registerByteSizeObserver} cheap enough to @@ -259,7 +259,7 @@ T decode(InputStream inStream, Context context) * implementations. */ @Deprecated - boolean isRegisterByteSizeObserverCheap(T value, Context context); + public abstract boolean isRegisterByteSizeObserverCheap(T value, Context context); /** * Notifies the {@code ElementByteSizeObserver} about the byte size @@ -269,7 +269,7 @@ T decode(InputStream inStream, Context context) * {@link org.apache.beam.sdk.runners.PipelineRunner} * implementations. */ - void registerByteSizeObserver( + public abstract void registerByteSizeObserver( T value, ElementByteSizeObserver observer) throws Exception; @@ -282,7 +282,7 @@ void registerByteSizeObserver( * implementations. */ @Deprecated - void registerByteSizeObserver( + public abstract void registerByteSizeObserver( T value, ElementByteSizeObserver observer, Context context) throws Exception; @@ -290,13 +290,13 @@ void registerByteSizeObserver( * Returns the {@link TypeDescriptor} for the type encoded. */ @Experimental(Kind.CODER_TYPE_ENCODING) - TypeDescriptor getEncodedTypeDescriptor(); + public abstract TypeDescriptor getEncodedTypeDescriptor(); /** * Exception thrown by {@link Coder#verifyDeterministic()} if the encoding is * not deterministic, including details of why the encoding is not deterministic. */ - class NonDeterministicException extends Exception { + public static class NonDeterministicException extends Exception { private Coder coder; private List reasons; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java index 0031698c0353..2a1d792b7603 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.coders; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.base.MoreObjects; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -70,7 +72,12 @@ private CoderFactories() { } // Static utility class * will produce a {@code Coder>} for any {@code Coder Coder}. */ public static CoderFactory fromStaticMethods(Class clazz) { - return new CoderFactoryFromStaticMethods(clazz); + checkArgument( + Coder.class.isAssignableFrom(clazz), + "%s is not a subtype of %s", + clazz.getName(), + Coder.class.getSimpleName()); + return new CoderFactoryFromStaticMethods((Class) clazz); } /** @@ -142,7 +149,7 @@ public List getInstanceComponents(Object value) { * Returns a CoderFactory that invokes the given static factory method * to create the Coder. */ - private CoderFactoryFromStaticMethods(Class coderClazz) { + private CoderFactoryFromStaticMethods(Class coderClazz) { this.factoryMethod = getFactoryMethod(coderClazz); this.getComponentsMethod = getInstanceComponentsMethod(coderClazz); } @@ -203,8 +210,8 @@ private Method getFactoryMethod(Class coderClazz) { * each corresponding to an argument of the {@code of} * method. */ - private Method getInstanceComponentsMethod(Class coderClazz) { - TypeDescriptor coderType = TypeDescriptor.of(coderClazz); + private Method getInstanceComponentsMethod(Class coderClazz) { + TypeDescriptor coderType = TypeDescriptor.of(coderClazz); TypeDescriptor argumentType = getCodedType(coderType); // getInstanceComponents may be implemented in a superclass, @@ -235,19 +242,13 @@ private Method getInstanceComponentsMethod(Class coderClazz) { * If {@code coderType} is a subclass of {@link Coder} for a specific * type {@code T}, returns {@code T.class}. Otherwise, raises IllegalArgumentException. */ - private TypeDescriptor getCodedType(TypeDescriptor coderType) { - for (TypeDescriptor ifaceType : coderType.getInterfaces()) { - if (ifaceType.getRawType().equals(Coder.class)) { - ParameterizedType coderIface = (ParameterizedType) ifaceType.getType(); - @SuppressWarnings("unchecked") - TypeDescriptor token = - (TypeDescriptor) TypeDescriptor.of(coderIface.getActualTypeArguments()[0]); - return token; - } - } - throw new IllegalArgumentException( - "cannot build CoderFactory from class " + coderType - + ": does not implement Coder for any T."); + private TypeDescriptor getCodedType(TypeDescriptor coderType) { + TypeDescriptor coderSupertype = coderType.getSupertype(Coder.class); + ParameterizedType coderIface = (ParameterizedType) coderSupertype.getType(); + @SuppressWarnings("unchecked") + TypeDescriptor token = + (TypeDescriptor) TypeDescriptor.of(coderIface.getActualTypeArguments()[0]); + return token; } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java index cc39429a8e20..0c72618c2108 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java @@ -43,7 +43,7 @@ * expensive. * */ -public abstract class StructuredCoder implements Coder { +public abstract class StructuredCoder extends Coder { protected StructuredCoder() {} /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java index b937e84c5c96..07b3348fe101 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -25,8 +25,8 @@ import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import java.util.List; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement; import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; @@ -57,7 +57,7 @@ private static class SomeRestriction {} private abstract static class SomeRestrictionTracker implements RestrictionTracker {} - private abstract static class SomeRestrictionCoder implements Coder {} + private abstract static class SomeRestrictionCoder extends StructuredCoder {} @Test public void testHasRestrictionTracker() throws Exception { From f4fa28342a302ec4c95005819d309977557e930e Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Fri, 5 May 2017 10:22:47 -0700 Subject: [PATCH 2/2] fixup! Convert Coder into an Abstract Static Class --- runners/google-cloud-dataflow-java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index e21b6de0f784..c7142fe024b2 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -33,7 +33,7 @@ jar - beam-master-20170504-2 + beam-master-20170505-wd-2914 1 6