From f9be62f224000f0777375a15548c02e96344f83f Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 3 May 2017 14:56:37 -0700 Subject: [PATCH 1/3] Split Coder's encode/decode methods into two methods depending on context. This allows the outer context to be marked deprecated. A follow-up PR will remove the old method once all consumers have been updated. --- .../org/apache/beam/sdk/coders/Coder.java | 46 +++++++++++++++++++ .../beam/sdk/coders/StructuredCoder.java | 31 +++++++++++++ 2 files changed, 77 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java index 8ba8ad3e6c23..467c81f153bc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java @@ -59,6 +59,7 @@ */ public interface Coder extends Serializable { /** The context in which encoding or decoding is being done. */ + @Deprecated class Context { /** * The outer context: the value being encoded or decoded takes @@ -110,6 +111,28 @@ public String toString() { } } + /** + * Encodes the given value of type {@code T} onto the given output stream. + * + * @throws IOException if writing to the {@code OutputStream} fails + * for some reason + * @throws CoderException if the value could not be encoded for some reason + */ + void encode(T value, OutputStream outStream) + throws CoderException, IOException; + + /** + * Encodes the given value of type {@code T} onto the given output stream + * in the outer context. + * + * @throws IOException if writing to the {@code OutputStream} fails + * for some reason + * @throws CoderException if the value could not be encoded for some reason + */ + @Deprecated + void encodeOuter(T value, OutputStream outStream) + throws CoderException, IOException; + /** * Encodes the given value of type {@code T} onto the given output stream * in the given context. @@ -118,6 +141,7 @@ public String toString() { * for some reason * @throws CoderException if the value could not be encoded for some reason */ + @Deprecated void encode(T value, OutputStream outStream, Context context) throws CoderException, IOException; @@ -129,6 +153,28 @@ void encode(T value, OutputStream outStream, Context context) * for some reason * @throws CoderException if the value could not be decoded for some reason */ + T decode(InputStream inStream) throws CoderException, IOException; + + /** + * Decodes a value of type {@code T} from the given input stream in + * the outer context. Returns the decoded value. + * + * @throws IOException if reading from the {@code InputStream} fails + * for some reason + * @throws CoderException if the value could not be decoded for some reason + */ + @Deprecated + T decodeOuter(InputStream inStream) throws CoderException, IOException; + + /** + * Decodes a value of type {@code T} from the given input stream in + * the given context. Returns the decoded value. + * + * @throws IOException if reading from the {@code InputStream} fails + * for some reason + * @throws CoderException if the value could not be decoded for some reason + */ + @Deprecated T decode(InputStream inStream, Context context) throws CoderException, IOException; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java index 0cd53b0573c8..d8542fa24468 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java @@ -20,6 +20,9 @@ import com.google.common.io.ByteStreams; import com.google.common.io.CountingOutputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -98,6 +101,34 @@ public String toString() { return builder.toString(); } + public void encode(T value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Coder.Context.NESTED); + } + + @Deprecated + public void encodeOuter(T value, OutputStream outStream) + throws CoderException, IOException { + encode(value, outStream, Coder.Context.OUTER); + } + + public T decode(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.NESTED); + } + + /** + * Decodes a value of type {@code T} from the given input stream in + * the outer context. Returns the decoded value. + * + * @throws IOException if reading from the {@code InputStream} fails + * for some reason + * @throws CoderException if the value could not be decoded for some reason + */ + @Deprecated + public T decodeOuter(InputStream inStream) throws CoderException, IOException { + return decode(inStream, Coder.Context.OUTER); + } + /** * {@inheritDoc} * From 6dd6447b64aae3e562383734ac65bb2129574f46 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 3 May 2017 16:35:46 -0700 Subject: [PATCH 2/3] add default decode() and encode() methods --- .../beam/sdk/coders/StructuredCoder.java | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java index d8542fa24468..36dba90aad21 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java @@ -112,23 +112,35 @@ public void encodeOuter(T value, OutputStream outStream) encode(value, outStream, Coder.Context.OUTER); } + @Deprecated + public void encode(T value, OutputStream outStream, Coder.Context context) + throws CoderException, IOException { + if (context == Coder.Context.NESTED) { + encode(value, outStream); + } else { + encodeOuter(value, outStream); + } + } + public T decode(InputStream inStream) throws CoderException, IOException { return decode(inStream, Coder.Context.NESTED); } - /** - * Decodes a value of type {@code T} from the given input stream in - * the outer context. Returns the decoded value. - * - * @throws IOException if reading from the {@code InputStream} fails - * for some reason - * @throws CoderException if the value could not be decoded for some reason - */ @Deprecated public T decodeOuter(InputStream inStream) throws CoderException, IOException { return decode(inStream, Coder.Context.OUTER); } + @Deprecated + public T decode(InputStream inStream, Coder.Context context) + throws CoderException, IOException { + if (context == Coder.Context.NESTED) { + return decode(inStream); + } else { + return decodeOuter(inStream); + } + } + /** * {@inheritDoc} * From 2dc6c86464396e3f32c2ca84a565b7db198c4bf5 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 3 May 2017 17:28:56 -0700 Subject: [PATCH 3/3] Add non-context versions of size estimation utilities. --- .../org/apache/beam/sdk/coders/Coder.java | 26 +++++++++++++++++++ .../beam/sdk/coders/StructuredCoder.java | 18 +++++++++++++ 2 files changed, 44 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java index 467c81f153bc..c923719eb025 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java @@ -246,6 +246,19 @@ T decode(InputStream inStream, Context context) * {@link org.apache.beam.sdk.runners.PipelineRunner} * implementations. */ + boolean isRegisterByteSizeObserverCheap(T value); + + /** + * Returns whether {@link #registerByteSizeObserver} cheap enough to + * call for every element, that is, if this {@code Coder} can + * calculate the byte size of the element to be coded in roughly + * constant time (or lazily). + * + *

Not intended to be called by user code, but instead by + * {@link org.apache.beam.sdk.runners.PipelineRunner} + * implementations. + */ + @Deprecated boolean isRegisterByteSizeObserverCheap(T value, Context context); /** @@ -256,6 +269,19 @@ T decode(InputStream inStream, Context context) * {@link org.apache.beam.sdk.runners.PipelineRunner} * implementations. */ + void registerByteSizeObserver( + T value, ElementByteSizeObserver observer) + throws Exception; + + /** + * Notifies the {@code ElementByteSizeObserver} about the byte size + * of the encoded value using this {@code Coder}. + * + *

Not intended to be called by user code, but instead by + * {@link org.apache.beam.sdk.runners.PipelineRunner} + * implementations. + */ + @Deprecated void registerByteSizeObserver( T value, ElementByteSizeObserver observer, Context context) throws Exception; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java index 36dba90aad21..cc39429a8e20 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java @@ -141,6 +141,18 @@ public T decode(InputStream inStream, Coder.Context context) } } + /** + * {@inheritDoc} + * + * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver} + * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element + * unless it is overridden. This is considered expensive. + */ + @Override + public boolean isRegisterByteSizeObserverCheap(T value) { + return isRegisterByteSizeObserverCheap(value, Context.NESTED); + } + /** * {@inheritDoc} * @@ -167,6 +179,12 @@ protected long getEncodedElementByteSize(T value, Context context) } } + @Override + public void registerByteSizeObserver(T value, ElementByteSizeObserver observer) + throws Exception { + registerByteSizeObserver(value, observer, Context.NESTED); + } + /** * {@inheritDoc} *