From 7d0a5632e51d4bc0bd96cc685a15e4e0393a6221 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 9 May 2017 18:04:01 -0700 Subject: [PATCH 1/2] Improve DirectRunner Javadoc --- .../beam/runners/direct/DirectOptions.java | 2 +- .../beam/runners/direct/DirectRegistrar.java | 10 +++--- .../beam/runners/direct/DirectRunner.java | 33 ++++++++++++------- 3 files changed, 28 insertions(+), 17 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java index 3b66cc661aae..574ab46fb449 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java @@ -49,7 +49,7 @@ public interface DirectOptions extends PipelineOptions, ApplicationNameOptions { @Default.Boolean(true) @Description( "Controls whether the DirectRunner should ensure that all of the elements of every " - + "PCollection are encodable. All elements in a PCollection must be encodable.") + + "PCollection can be encoded and decoded by that PCollection's Coder.") boolean isEnforceEncodability(); void setEnforceEncodability(boolean test); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java index 3e69e2b527d2..0e6fbab88820 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java @@ -26,31 +26,31 @@ /** * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the - * {@link org.apache.beam.runners.direct.DirectRunner}. + * {@link DirectRunner}. 
*/ public class DirectRegistrar { private DirectRegistrar() {} /** - * Registers the {@link org.apache.beam.runners.direct.DirectRunner}. + * Registers the {@link DirectRunner}. */ @AutoService(PipelineRunnerRegistrar.class) public static class Runner implements PipelineRunnerRegistrar { @Override public Iterable>> getPipelineRunners() { return ImmutableList.>>of( - org.apache.beam.runners.direct.DirectRunner.class); + DirectRunner.class); } } /** - * Registers the {@link org.apache.beam.runners.direct.DirectOptions}. + * Registers the {@link DirectOptions}. */ @AutoService(PipelineOptionsRegistrar.class) public static class Options implements PipelineOptionsRegistrar { @Override public Iterable> getPipelineOptions() { return ImmutableList.>of( - org.apache.beam.runners.direct.DirectOptions.class); + DirectOptions.class); } } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index b0ce5eb02845..181896f3b3fc 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -51,8 +51,14 @@ import org.joda.time.Duration; /** - * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded - * {@link PCollection PCollections}. + * A {@link PipelineRunner} that executes a {@link Pipeline} within the process that constructed the + * {@link Pipeline}. + * + *

The {@link DirectRunner} is suitable for running a {@link Pipeline} on small scale, example, + * and test data, and should be used for ensuring that processing logic is correct. It also + * is appropriate for executing unit tests and performs additional work to ensure that behavior + * contained within a {@link Pipeline} does not break assumptions within the Beam model, to improve + * the ability to execute a {@link Pipeline} at scale on a distributed backend. */ public class DirectRunner extends PipelineRunner { @@ -127,6 +133,9 @@ static BundleFactory bundleFactoryFor( private final Set enabledEnforcements; private Supplier clockSupplier = new NanosOffsetClockSupplier(); + /** + * Construct a {@link DirectRunner} from the provided options. + */ public static DirectRunner fromOptions(PipelineOptions options) { return new DirectRunner(options.as(DirectOptions.class)); } @@ -246,8 +255,6 @@ private List defaultTransformOverrides() { /** * The result of running a {@link Pipeline} with the {@link DirectRunner}. - * - *

Throws {@link UnsupportedOperationException} for all methods. */ public static class DirectPipelineResult implements PipelineResult { private final PipelineExecutor executor; @@ -274,14 +281,11 @@ public MetricResults metrics() { } /** - * Blocks until the {@link Pipeline} execution represented by this - * {@link DirectPipelineResult} is complete, returning the terminal state. + * {@inheritDoc}. * - *

If the pipeline terminates abnormally by throwing an exception, this will rethrow the - * exception. Future calls to {@link #getState()} will return - * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}. - * - *

See also {@link PipelineExecutor#waitUntilFinish(Duration)}. + *

If the pipeline terminates abnormally by throwing an {@link Exception}, this will rethrow + * the original {@link Exception}. Future calls to {@link #getState()} will return {@link + * org.apache.beam.sdk.PipelineResult.State#FAILED}. */ @Override public State waitUntilFinish() { @@ -298,6 +302,13 @@ public State cancel() { return executor.getPipelineState(); } + /** + * {@inheritDoc}. + * + *

If the pipeline terminates abnormally by throwing an {@link Exception}, this will rethrow + * the original {@link Exception}. Future calls to {@link #getState()} will return {@link + * org.apache.beam.sdk.PipelineResult.State#FAILED}. + */ @Override public State waitUntilFinish(Duration duration) { State startState = this.state; From 970c58c9e35d7b9879f41527eadb98085a0dcb35 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 9 May 2017 18:09:12 -0700 Subject: [PATCH 2/2] Update Coder Documentation --- .../apache/beam/sdk/coders/AtomicCoder.java | 17 ++++++++++++-- .../org/apache/beam/sdk/coders/Coder.java | 22 +++++++------------ .../apache/beam/sdk/coders/CustomCoder.java | 17 +++++--------- .../beam/sdk/coders/StructuredCoder.java | 10 +++++---- 4 files changed, 34 insertions(+), 32 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java index 7bcd5327c014..b244ed5f0d1e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java @@ -35,14 +35,22 @@ public abstract class AtomicCoder extends StructuredCoder { /** * {@inheritDoc}. * - * @throws NonDeterministicException + *

Unless overridden, does not throw. An {@link AtomicCoder} is presumed to be deterministic. + * + * @throws NonDeterministicException if overridden to indicate that this subclass of + * {@link AtomicCoder} is not deterministic */ @Override public void verifyDeterministic() throws NonDeterministicException {} + /** + * {@inheritDoc}. + * + * @return the empty list + */ @Override public List> getCoderArguments() { - return null; + return Collections.emptyList(); } /** @@ -65,6 +73,11 @@ public final boolean equals(Object other) { return other != null && this.getClass().equals(other.getClass()); } + /** + * {@inheritDoc}. + * + * @return the {@link #hashCode()} of the {@link Class} of this {@link AtomicCoder}. + */ @Override public final int hashCode() { return this.getClass().hashCode(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java index 2ee532dfaa6e..edcc3a81c31d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java @@ -43,23 +43,17 @@ * byte streams. * *

{@link Coder} instances are serialized during job creation and deserialized - * before use, via JSON serialization. See {@link SerializableCoder} for an example of a - * {@link Coder} that adds a custom field to - * the {@link Coder} serialization. It provides a constructor annotated with - * {@link com.fasterxml.jackson.annotation.JsonCreator}, which is a factory method used when - * deserializing a {@link Coder} instance. + * before use. This will generally be performed by serializing the object via Java Serialization. * *

{@link Coder} classes for compound types are often composed from coder classes for types * contains therein. The composition of {@link Coder} instances into a coder for the compound * class is the subject of the {@link CoderProvider} type, which enables automatic generic - * composition of {@link Coder} classes within the {@link CoderRegistry}. With particular - * static methods on a compound {@link Coder} class, a {@link CoderProvider} can be automatically - * inferred. See {@link KvCoder} for an example of a simple compound {@link Coder} that supports - * automatic composition in the {@link CoderRegistry}. + * composition of {@link Coder} classes within the {@link CoderRegistry}. See {@link CoderProvider} + * and {@link CoderRegistry} for more information about how coders are inferred. * *

All methods of a {@link Coder} are required to be thread safe. * - * @param the type of the values being transcoded + * @param the type of values being encoded and decoded */ public abstract class Coder implements Serializable { /** The context in which encoding or decoding is being done. */ @@ -167,10 +161,10 @@ public T decode(InputStream inStream, Context context) } /** - * If this is a {@code Coder} for a parameterized type, returns the - * list of {@code Coder}s being used for each of the parameters, or - * returns {@code null} if this cannot be done or this is not a - * parameterized type. + * If this is a {@link Coder} for a parameterized type, returns the + * list of {@link Coder}s being used for each of the parameters in the same order they appear + * within the parameterized type's type signature. If this cannot be done, or this + * {@link Coder} does not encode/decode a parameterized type, returns the empty list. */ public abstract List> getCoderArguments(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java index c58192373432..e8ce5b1d538f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java @@ -22,19 +22,12 @@ import java.util.List; /** - * An abstract base class for writing a {@link Coder} class that encodes itself via Java - * serialization. + * An abstract base class that implements all methods of {@link Coder} except {@link Coder#encode} + * and {@link Coder#decode}. * - *

To complete an implementation, subclasses must implement {@link Coder#encode} - * and {@link Coder#decode} methods. - * - *

Not to be confused with {@link SerializableCoder} that encodes objects that implement the - * {@link Serializable} interface. - * - * @param the type of elements handled by this coder + * @param the type of values being encoded and decoded */ -public abstract class CustomCoder extends Coder - implements Serializable { +public abstract class CustomCoder extends Coder implements Serializable { /** * {@inheritDoc}. @@ -61,5 +54,5 @@ public void verifyDeterministic() throws NonDeterministicException { // This coder inherits isRegisterByteSizeObserverCheap, // getEncodedElementByteSize and registerByteSizeObserver - // from StructuredCoder. Override if we can do better. + // from Coder. Override if we can do better. } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java index 42c0598aa985..a145215608d0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java @@ -33,11 +33,11 @@ *

To extend {@link StructuredCoder}, override the following methods as appropriate: * *

    - *
  • {@link #getComponents}: the default implementation returns {@link #getCoderArguments}.
  • - *
  • {@link #getEncodedElementByteSize} and - * {@link #isRegisterByteSizeObserverCheap}: the + *
  • {@link #getComponents}: the default implementation returns {@link #getCoderArguments}. + *
  • {@link #getEncodedElementByteSize} and {@link #isRegisterByteSizeObserverCheap}: the * default implementation encodes values to bytes and counts the bytes, which is considered - * expensive.
  • + * expensive. The default element byte size observer uses the value returned by + * {@link #getEncodedElementByteSize}. *
*/ public abstract class StructuredCoder extends Coder { @@ -45,6 +45,8 @@ protected StructuredCoder() {} /** * Returns the list of {@link Coder Coders} that are components of this {@link Coder}. + * + *

The default components will be equal to the value returned by {@link #getCoderArguments()}. */ public List> getComponents() { List> coderArguments = getCoderArguments();