From f4dfbb206382d3ea73881727aa8b0f74eaf98ef4 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 2 May 2017 19:31:22 -0700 Subject: [PATCH 1/7] Annotate internal methods of PCollection --- .../org/apache/beam/sdk/values/PCollection.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java index 034f0de67340..20e5d68946b4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java @@ -22,6 +22,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode; import org.apache.beam.sdk.coders.Coder; @@ -330,30 +331,27 @@ public PCollection setTypeDescriptor(TypeDescriptor typeDescriptor) { } /** - * Sets the {@link WindowingStrategy} of this {@link PCollection}. - * - *

For use by primitive transformations only. + * For internal use only; no backwards-compatibility guarantees. */ + @Internal public PCollection setWindowingStrategyInternal(WindowingStrategy windowingStrategy) { this.windowingStrategy = windowingStrategy; return this; } /** - * Sets the {@link PCollection.IsBounded} of this {@link PCollection}. - * - *

For use by internal transformations only. + * For internal use only; no backwards-compatibility guarantees. */ + @Internal public PCollection setIsBoundedInternal(IsBounded isBounded) { this.isBounded = isBounded; return this; } /** - * Creates and returns a new {@link PCollection} for a primitive output. - * - *

For use by primitive transformations only. + * For internal use only; no backwards-compatibility guarantees. */ + @Internal public static PCollection createPrimitiveOutputInternal( Pipeline pipeline, WindowingStrategy windowingStrategy, From c1b26a1b53c334ab171fad60501ba67593fde5d2 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 2 May 2017 19:48:38 -0700 Subject: [PATCH 2/7] Annotate internal pieces of sdks.transforms --- .../sdk/transforms/AppliedPTransform.java | 7 ++-- .../beam/sdk/transforms/CombineFnBase.java | 22 ++++++++---- .../beam/sdk/transforms/Materialization.java | 6 +++- .../beam/sdk/transforms/Materializations.java | 11 ++++-- .../org/apache/beam/sdk/transforms/View.java | 36 ++++++++++++------- .../apache/beam/sdk/transforms/ViewFn.java | 6 +++- 6 files changed, 62 insertions(+), 26 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java index bdb61b834026..4e049a590d7e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java @@ -20,16 +20,14 @@ import com.google.auto.value.AutoValue; import java.util.Map; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; /** - * Represents the application of a {@link PTransform} to a specific input to produce - * a specific output. - * - *

For internal use. + * For internal use only; no backwards-compatibility guarantees. * *

Inputs and outputs are stored in their expanded forms, as the condensed form of a composite * {@link PInput} or {@link POutput} is a language-specific concept, and {@link AppliedPTransform} @@ -40,6 +38,7 @@ * @param transform output type * @param transform type */ +@Internal @AutoValue public abstract class AppliedPTransform< InputT extends PInput, OutputT extends POutput, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java index a88109968e35..29990cd60b00 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -31,14 +32,19 @@ import org.apache.beam.sdk.values.TypeDescriptor; /** - * This class contains the shared interfaces and abstract classes for different types of combine + * For internal use only; no backwards-compatibility guarantees. + * + *

This class contains the shared interfaces and abstract classes for different types of combine * functions. * *

Users should not implement or extend them directly. */ +@Internal public class CombineFnBase { /** - * A {@code GloballyCombineFn} specifies how to combine a + * For internal use only; no backwards-compatibility guarantees. + * + *

A {@code GloballyCombineFn} specifies how to combine a * collection of input values of type {@code InputT} into a single * output value of type {@code OutputT}. It does this via one or more * intermediate mutable accumulator values of type {@code AccumT}. @@ -50,6 +56,7 @@ public class CombineFnBase { * @param type of mutable accumulator values * @param type of output values */ + @Internal public interface GlobalCombineFn extends Serializable, HasDisplayData { /** @@ -93,16 +100,19 @@ Coder getDefaultOutputCoder(CoderRegistry registry, Coder input } /** - * An abstract {@link GlobalCombineFn} base class shared by - * {@link CombineFn} and {@link CombineFnWithContext}. + * For internal use only; no backwards-compatibility guarantees. * - *

Do not extend this class directly. - * Extends {@link CombineFn} and {@link CombineFnWithContext} instead. + *

An abstract {@link GlobalCombineFn} base class shared by {@link CombineFn} and {@link + * CombineFnWithContext}. + * + *

Do not extend this class directly. Extends {@link CombineFn} and {@link + * CombineFnWithContext} instead. * * @param type of input values * @param type of mutable accumulator values * @param type of output values */ + @Internal abstract static class AbstractGlobalCombineFn implements GlobalCombineFn, Serializable { private static final String INCOMPATIBLE_GLOBAL_WINDOW_ERROR_MESSAGE = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materialization.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materialization.java index 7cd6256d9f00..6fb8c297f547 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materialization.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materialization.java @@ -18,15 +18,19 @@ package org.apache.beam.sdk.transforms; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.runners.PipelineRunner; /** - * How a view should be physically materialized by a {@link PipelineRunner}. + * For internal use only; no backwards-compatibility guarantees. + * + *

How a view should be physically materialized by a {@link PipelineRunner}. * *

A {@link PipelineRunner} will support some set of materializations, and will reject * {@link ViewFn ViewFns} that require materializations it does not support. See * {@link Materializations} for known implementations. */ +@Internal public interface Materialization { /** * Gets the URN describing this {@link Materialization}. This is a stable, SDK-independent URN diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java index 35925fae5f7a..6e4f83d6f6a1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java @@ -20,12 +20,16 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.util.WindowedValue; /** - * Utility methods for constructing known {@link Materialization materializations} for a + * For internal use only; no backwards-compatibility guarantees. + * + *

Utility methods for constructing known {@link Materialization materializations} for a * {@link ViewFn}. */ +@Internal public class Materializations { /** * The URN for a {@link Materialization} where the primitive view type is an iterable of fully @@ -36,9 +40,12 @@ public class Materializations { "urn:beam:sideinput:materialization:iterable:0.1"; /** - * A {@link Materialization} where the primitive view type is an iterable of fully specified + * For internal use only; no backwards-compatibility guarantees. + * + *

A {@link Materialization} where the primitive view type is an iterable of fully specified * windowed values. */ + @Internal public static Materialization>> iterable() { return new IterableMaterialization<>(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index b3b8918d411b..d17d423441a9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.runners.PipelineRunner; @@ -237,11 +238,13 @@ public static AsMultimap asMultimap() { } /** - * Not intended for direct use by pipeline authors; public only so a {@link PipelineRunner} may - * override its behavior. + * For internal use only; no backwards-compatibility guarantees. + * + *

Public only so a {@link PipelineRunner} may override its behavior. * *

See {@link View#asList()}. */ + @Internal public static class AsList extends PTransform, PCollectionView>> { private AsList() { } @@ -259,11 +262,13 @@ public PCollectionView> expand(PCollection input) { } /** - * Not intended for direct use by pipeline authors; public only so a {@link PipelineRunner} may - * override its behavior. + * For internal use only; no backwards-compatibility guarantees. + * + *

Public only so a {@link PipelineRunner} may override its behavior. * *

See {@link View#asIterable()}. */ + @Internal public static class AsIterable extends PTransform, PCollectionView>> { private AsIterable() { } @@ -282,11 +287,13 @@ public PCollectionView> expand(PCollection input) { } /** - * Not intended for direct use by pipeline authors; public only so a {@link PipelineRunner} may - * override its behavior. + * For internal use only; no backwards-compatibility guarantees. + * + *

Public only so a {@link PipelineRunner} may override its behavior. * *

See {@link View#asSingleton()}. */ + @Internal public static class AsSingleton extends PTransform, PCollectionView> { private final T defaultValue; private final boolean hasDefault; @@ -396,11 +403,13 @@ public T identity() { } /** - * Not intended for direct use by pipeline authors; public only so a {@link PipelineRunner} may - * override its behavior. + * For internal use only; no backwards-compatibility guarantees. + * + *

Public only so a {@link PipelineRunner} may override its behavior. * *

See {@link View#asMultimap()}. */ + @Internal public static class AsMultimap extends PTransform>, PCollectionView>>> { private AsMultimap() { } @@ -422,11 +431,13 @@ public PCollectionView>> expand(PCollection> input) } /** - * Not intended for direct use by pipeline authors; public only so a {@link PipelineRunner} may - * override its behavior. + * For internal use only; no backwards-compatibility guarantees. + * + *

Public only so a {@link PipelineRunner} may override its behavior. * *

See {@link View#asMap()}. */ + @Internal public static class AsMap extends PTransform>, PCollectionView>> { private AsMap() { } @@ -459,13 +470,14 @@ public PCollectionView> expand(PCollection> input) { // Internal details below /** - * Creates a primitive {@link PCollectionView}. + * For internal use only; no backwards-compatibility guarantees. * - *

For internal use only by runner implementors. + *

Creates a primitive {@link PCollectionView}. * * @param The type of the elements of the input PCollection * @param The type associated with the {@link PCollectionView} used as a side input */ + @Internal public static class CreatePCollectionView extends PTransform, PCollectionView> { private PCollectionView view; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java index cdfcb88652dc..d51a9171035f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java @@ -18,11 +18,14 @@ package org.apache.beam.sdk.transforms; import java.io.Serializable; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; /** - * A function to adapt a primitive "view" of a {@link PCollection} - some materialization + * For internal use only; no backwards-compatibility guarantees. + * + *

A function to adapt a primitive "view" of a {@link PCollection} - some materialization * specified in the Beam model and implemented by the runner - to a user-facing view type * for side input. * @@ -36,6 +39,7 @@ * @param the type of the underlying primitive view, provided by the runner * {@code } the type of the value(s) accessible via this {@link PCollectionView} */ +@Internal public abstract class ViewFn implements Serializable { /** * Gets the materialization of this {@link ViewFn}. From 49cf433c5c08f3cc91512aa9544a36a5d3e84333 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 2 May 2017 19:59:32 -0700 Subject: [PATCH 3/7] Tighten access control and internal annotations for triggers --- .../sdk/transforms/windowing/AfterAll.java | 4 +- .../sdk/transforms/windowing/AfterEach.java | 2 +- .../sdk/transforms/windowing/AfterFirst.java | 2 +- .../sdk/transforms/windowing/AfterPane.java | 2 +- .../windowing/AfterProcessingTime.java | 2 +- .../transforms/windowing/AfterWatermark.java | 4 +- .../transforms/windowing/DefaultTrigger.java | 2 +- .../beam/sdk/transforms/windowing/Never.java | 2 +- .../windowing/OrFinallyTrigger.java | 2 +- .../sdk/transforms/windowing/Repeatedly.java | 2 +- .../windowing/TimestampTransform.java | 41 +++++++++++++++---- .../sdk/transforms/windowing/Trigger.java | 18 ++++++-- 12 files changed, 62 insertions(+), 21 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java index 2747311dff98..eb0a7acbcac6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.joda.time.Instant; @@ -51,6 +52,7 @@ public static AfterAll of(List triggers) { return new AfterAll(triggers); } + @Internal @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire after the latest of its sub-triggers. @@ -65,7 +67,7 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { } @Override - public OnceTrigger getContinuationTrigger(List continuationTriggers) { + protected OnceTrigger getContinuationTrigger(List continuationTriggers) { return new AfterAll(continuationTriggers); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java index 56a9d14a2487..1fc4fbf52a97 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -72,7 +72,7 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { } @Override - public Trigger getContinuationTrigger(List continuationTriggers) { + protected Trigger getContinuationTrigger(List continuationTriggers) { return Repeatedly.forever(new AfterFirst(continuationTriggers)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java index 79fd6391d60a..f0beb0a6d82b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java @@ -66,7 +66,7 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { } @Override - public OnceTrigger getContinuationTrigger(List continuationTriggers) { + protected OnceTrigger getContinuationTrigger(List continuationTriggers) { return new AfterFirst(continuationTriggers); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java index 25c55936a223..eade95d1350a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java @@ -61,7 +61,7 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { } @Override - public OnceTrigger getContinuationTrigger(List continuationTriggers) { + protected OnceTrigger getContinuationTrigger(List continuationTriggers) { return AfterPane.elementCountAtLeast(1); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java index eda269adf248..cc7ec137b87f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java @@ -40,7 +40,7 @@ public class AfterProcessingTime extends OnceTrigger { private final List timestampTransforms; - public AfterProcessingTime(List timestampTransforms) { + private AfterProcessingTime(List timestampTransforms) { super(null); this.timestampTransforms = timestampTransforms; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index 6825ab08d1fc..14a8c98a7787 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -88,7 +88,7 @@ public OnceTrigger getLateTrigger() { } @SuppressWarnings("unchecked") - public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) { + private AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) { super(lateTrigger == null ? ImmutableList.of(earlyTrigger) : ImmutableList.of(earlyTrigger, lateTrigger)); @@ -178,7 +178,7 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { } @Override - public FromEndOfWindow getContinuationTrigger(List continuationTriggers) { + protected FromEndOfWindow getContinuationTrigger(List continuationTriggers) { return this; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java index a649b4ff1f51..78f373540fd2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java @@ -51,7 +51,7 @@ public boolean isCompatible(Trigger other) { } @Override - public Trigger getContinuationTrigger(List continuationTriggers) { + protected Trigger getContinuationTrigger(List continuationTriggers) { return this; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index 664ae83460e8..6dfeea72067f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -43,7 +43,7 @@ public static NeverTrigger ever() { * The actual trigger class for {@link Never} triggers. */ public static class NeverTrigger extends OnceTrigger { - protected NeverTrigger() { + private NeverTrigger() { super(null); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java index 1ed9b550ca99..ad0de47e0bbb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java @@ -58,7 +58,7 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { } @Override - public Trigger getContinuationTrigger(List continuationTriggers) { + protected Trigger getContinuationTrigger(List continuationTriggers) { // Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL // may not be a OnceTrigger. return Repeatedly.forever( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java index 40591e3f8bd4..78b79c74fbab 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java @@ -66,7 +66,7 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { } @Override - public Trigger getContinuationTrigger(List continuationTriggers) { + protected Trigger getContinuationTrigger(List continuationTriggers) { return new Repeatedly(continuationTriggers.get(REPEATED)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java index 5318592f5141..8bdf6ee0d486 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java @@ -19,37 +19,59 @@ import com.google.auto.value.AutoValue; import java.io.Serializable; +import org.apache.beam.sdk.annotations.Internal; import org.joda.time.Duration; import org.joda.time.Instant; -/** An abstract description of a standardized transformation on timestamps. */ +/** + * For internal use only; no backwards-compatibility guarantees. + * + *

An abstract description of a standardized transformation on timestamps. + */ +@Internal public abstract class TimestampTransform implements Serializable{ - /** Returns a transform that shifts a timestamp later by {@code delay}. */ + TimestampTransform() {} + + /** + * For internal use only; no backwards-compatibility guarantees. + * + *

Returns a transform that shifts a timestamp later by {@code delay}. + */ + @Internal public static TimestampTransform delay(Duration delay) { return new AutoValue_TimestampTransform_Delay(delay); } /** - * Returns a transform that aligns a timestamp to the next boundary of {@code period}, starting + * For internal use only; no backwards-compatibility guarantees. + * + *

Returns a transform that aligns a timestamp to the next boundary of {@code period}, starting * from {@code offset}. */ + @Internal public static TimestampTransform alignTo(Duration period, Instant offset) { return new AutoValue_TimestampTransform_AlignTo(period, offset); } /** - * Returns a transform that aligns a timestamp to the next boundary of {@code period}, starting + * For internal use only; no backwards-compatibility guarantees. + * + *

Returns a transform that aligns a timestamp to the next boundary of {@code period}, starting * from the start of the epoch. */ + @Internal public static TimestampTransform alignTo(Duration period) { return alignTo(period, new Instant(0)); } /** - * Represents the transform that aligns a timestamp to the next boundary of {@link #getPeriod()} - * start at {@link #getOffset()}. + * For internal use only; no backwards-compatibility guarantees. + * + *

Represents the transform that aligns a timestamp to the next boundary of {@link + * #getPeriod()} start at {@link #getOffset()}. */ + @Internal @AutoValue public abstract static class AlignTo extends TimestampTransform { public abstract Duration getPeriod(); @@ -57,7 +79,12 @@ public abstract static class AlignTo extends TimestampTransform { public abstract Instant getOffset(); } - /** Represents the transform that delays a timestamp by {@link #getDelay()}. */ + /** + * For internal use only; no backwards-compatibility guarantees. + * + *

Represents the transform that delays a timestamp by {@link #getDelay()}. + */ + @Internal @AutoValue public abstract static class Delay extends TimestampTransform { public abstract Duration getDelay(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java index 07d30779b079..519ab6789857 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Objects; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.GroupByKey; import org.joda.time.Instant; @@ -117,8 +118,10 @@ public Trigger getContinuationTrigger() { protected abstract Trigger getContinuationTrigger(List continuationTriggers); /** - * Returns a bound in event time by which this trigger would have fired at least once for a given - * window had there been input data. + * For internal use only; no backwards-compatibility guarantees. + * + *

Returns a bound in event time by which this trigger would have fired at least once for a + * given window had there been input data. * *

For triggers that do not fire based on the watermark advancing, returns {@link * BoundedWindow#TIMESTAMP_MAX_VALUE}. @@ -126,9 +129,15 @@ public Trigger getContinuationTrigger() { *

This estimate may be used, for example, to determine that there are no elements in a * side-input window, which causes the default value to be used instead. */ + @Internal public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow window); - /** Returns whether this performs the same triggering as the given {@link Trigger}. */ + /** + * For internal use only; no backwards-compatibility guarantees. + * + *

Returns whether this performs the same triggering as the given {@link Trigger}. + */ + @Internal public boolean isCompatible(Trigger other) { if (!getClass().equals(other.getClass())) { return false; @@ -208,9 +217,12 @@ public OrFinallyTrigger orFinally(OnceTrigger until) { } /** + * For internal use only; no backwards-compatibility guarantees. + * * {@link Trigger Triggers} that are guaranteed to fire at most once should extend {@link * OnceTrigger} rather than the general {@link Trigger} class to indicate that behavior. */ + @Internal public abstract static class OnceTrigger extends Trigger { protected OnceTrigger(List subTriggers) { super(subTriggers); From 9b8a4e5c4b876d4459c64a9bffee613aeae72fb2 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 2 May 2017 20:05:34 -0700 Subject: [PATCH 4/7] The transforms.reflect package is not for users --- .../org/apache/beam/sdk/transforms/reflect/package-info.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java index 4df5209e6b39..fe2f6b1f8cad 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/package-info.java @@ -16,8 +16,11 @@ * limitations under the License. */ /** - * Defines reflection-based utilities for analyzing {@link org.apache.beam.sdk.transforms.DoFn}'s + * For internal use only; no backwards-compatibility guarantees. + * + *

Defines reflection-based utilities for analyzing {@link org.apache.beam.sdk.transforms.DoFn}'s * and creating {@link org.apache.beam.sdk.transforms.reflect.DoFnSignature}'s and * {@link org.apache.beam.sdk.transforms.reflect.DoFnInvoker}'s from them. */ package org.apache.beam.sdk.transforms.reflect; + From fe51cc0d1a8aa14adbee81b220f9ca8a442f26fe Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 2 May 2017 20:05:45 -0700 Subject: [PATCH 5/7] Annotate internal-only bits of Java sdk.runners --- .../java/org/apache/beam/sdk/runners/PTransformMatcher.java | 6 +++++- .../org/apache/beam/sdk/runners/PTransformOverride.java | 6 +++++- .../apache/beam/sdk/runners/PTransformOverrideFactory.java | 6 +++++- .../apache/beam/sdk/runners/PipelineRunnerRegistrar.java | 4 ++++ .../org/apache/beam/sdk/runners/TransformHierarchy.java | 6 +++++- 5 files changed, 24 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java index 30dca6d1dcee..6378ecc82f6e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java @@ -20,12 +20,16 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; /** - * Matches applications of {@link PTransform PTransforms}. + * For internal use only; no backwards-compatibility guarantees. + * + *

Matches applications of {@link PTransform PTransforms}. */ +@Internal @Experimental(Kind.CORE_RUNNERS_ONLY) public interface PTransformMatcher { boolean matches(AppliedPTransform application); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java index 33b9114abd57..2820364d82eb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java @@ -19,12 +19,16 @@ package org.apache.beam.sdk.runners; import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.PTransform; /** - * A {@link PTransformMatcher} and associated {@link PTransformOverrideFactory} to replace all + * For internal use only; no backwards-compatibility guarantees. + * + *

A {@link PTransformMatcher} and associated {@link PTransformOverrideFactory} to replace all * matching {@link PTransform PTransforms}. */ +@Internal @AutoValue public abstract class PTransformOverride { public static PTransformOverride of( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java index 786c61c95aa1..a28f303688d6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; @@ -32,9 +33,12 @@ import org.apache.beam.sdk.values.TupleTag; /** - * Produces {@link PipelineRunner}-specific overrides of {@link PTransform PTransforms}, and + * For internal use only; no backwards-compatibility guarantees. + * + *

Produces {@link PipelineRunner}-specific overrides of {@link PTransform PTransforms}, and * provides mappings between original and replacement outputs. */ +@Internal @Experimental(Kind.CORE_RUNNERS_ONLY) public interface PTransformOverrideFactory< InputT extends PInput, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunnerRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunnerRegistrar.java index be95044bb6c5..41fd6f0029ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunnerRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunnerRegistrar.java @@ -19,8 +19,11 @@ import com.google.auto.service.AutoService; import java.util.ServiceLoader; +import org.apache.beam.sdk.annotations.Internal; /** + * For internal use only; no backwards-compatibility guarantees. + * * {@link PipelineRunner} creators have the ability to automatically have their * {@link PipelineRunner} registered with this SDK by creating a {@link ServiceLoader} entry * and a concrete implementation of this interface. @@ -33,6 +36,7 @@ *

It is optional but recommended to use one of the many build time tools such as * {@link AutoService} to generate the necessary META-INF files automatically. */ +@Internal public interface PipelineRunnerRegistrar { /** * Get the set of {@link PipelineRunner PipelineRunners} to register. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index 18bf2e95e8b9..92361942fc8a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -47,9 +48,12 @@ import org.slf4j.LoggerFactory; /** - * Captures information about a collection of transformations and their + * For internal use only; no backwards-compatibility guarantees. + * + *

Captures information about a collection of transformations and their * associated {@link PValue}s. */ +@Internal public class TransformHierarchy { private static final Logger LOG = LoggerFactory.getLogger(TransformHierarchy.class); From 58298d866fe9d1f4fcaf2ccda3078809f4d55b27 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 3 May 2017 10:10:07 -0700 Subject: [PATCH 6/7] Tighten access in sdk.options --- .../{ValueProviderUtils.java => ValueProviders.java} | 4 ++-- .../org/apache/beam/sdk/options/ValueProviderTest.java | 4 ++-- ...eProviderUtilsTest.java => ValueProvidersTest.java} | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) rename sdks/java/core/src/main/java/org/apache/beam/sdk/options/{ValueProviderUtils.java => ValueProviders.java} (96%) rename sdks/java/core/src/test/java/org/apache/beam/sdk/options/{ValueProviderUtilsTest.java => ValueProvidersTest.java} (90%) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java similarity index 96% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviderUtils.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java index 14a5f2391a66..d034b819f40f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviderUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java @@ -28,8 +28,8 @@ /** * Utilities for working with the {@link ValueProvider} interface. */ -public class ValueProviderUtils { - private ValueProviderUtils() {} +class ValueProviders { + private ValueProviders() {} /** * Given {@code serializedOptions} as a JSON-serialized {@link PipelineOptions}, updates diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java index 383de535299d..9369ae6957c1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java @@ -199,7 +199,7 @@ public void testSerializeDeserializeNoArg() throws Exception { ObjectMapper mapper = new ObjectMapper(); String serializedOptions = mapper.writeValueAsString(submitOptions); - String runnerString = ValueProviderUtils.updateSerializedOptions( + String runnerString = ValueProviders.updateSerializedOptions( serializedOptions, ImmutableMap.of("foo", "quux")); TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class) .as(TestOptions.class); @@ -218,7 +218,7 @@ public void testSerializeDeserializeWithArg() throws Exception { ObjectMapper mapper = new ObjectMapper(); String serializedOptions = mapper.writeValueAsString(submitOptions); - String runnerString = ValueProviderUtils.updateSerializedOptions( + String runnerString = ValueProviders.updateSerializedOptions( serializedOptions, ImmutableMap.of("foo", "quux")); TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class) .as(TestOptions.class); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java similarity index 90% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderUtilsTest.java rename to sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java index e09f4ad3af2e..14f86bc279e0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProvidersTest.java @@ -26,9 +26,9 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Tests for {@link ValueProviderUtils}. */ +/** Tests for {@link ValueProviders}. */ @RunWith(JUnit4.class) -public class ValueProviderUtilsTest { +public class ValueProvidersTest { /** A test interface. */ public interface TestOptions extends PipelineOptions { String getString(); @@ -43,7 +43,7 @@ public void testUpdateSerialize() throws Exception { TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class); ObjectMapper mapper = new ObjectMapper(); String serializedOptions = mapper.writeValueAsString(submitOptions); - String updatedOptions = ValueProviderUtils.updateSerializedOptions( + String updatedOptions = ValueProviders.updateSerializedOptions( serializedOptions, ImmutableMap.of("string", "bar")); TestOptions runtime = mapper.readValue(updatedOptions, PipelineOptions.class) .as(TestOptions.class); @@ -56,7 +56,7 @@ public void testUpdateSerializeExistingValue() throws Exception { "--string=baz", "--otherString=quux").as(TestOptions.class); ObjectMapper mapper = new ObjectMapper(); String serializedOptions = mapper.writeValueAsString(submitOptions); - String updatedOptions = ValueProviderUtils.updateSerializedOptions( + String updatedOptions = ValueProviders.updateSerializedOptions( serializedOptions, ImmutableMap.of("string", "bar")); TestOptions runtime = mapper.readValue(updatedOptions, PipelineOptions.class) .as(TestOptions.class); @@ -69,7 +69,7 @@ public void testUpdateSerializeEmptyUpdate() throws Exception { TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class); ObjectMapper mapper = new ObjectMapper(); String serializedOptions = mapper.writeValueAsString(submitOptions); - String updatedOptions = ValueProviderUtils.updateSerializedOptions( + String updatedOptions = ValueProviders.updateSerializedOptions( serializedOptions, ImmutableMap.of()); TestOptions runtime = mapper.readValue(updatedOptions, PipelineOptions.class) .as(TestOptions.class); From 362d0be79222ad67f1639d54434c1505ef76752b Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 3 May 2017 10:13:15 -0700 Subject: [PATCH 7/7] Annotate internal methods on Pipeline --- .../java/org/apache/beam/sdk/Pipeline.java | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 351e1b8170dd..6b15f0d3f655 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptions; @@ -190,12 +191,15 @@ public OutputT apply( } /** - * Replaces all nodes that match a {@link PTransformOverride} in this pipeline. Overrides are + * For internal use only; no backwards-compatibility guarantees. + * + *

Replaces all nodes that match a {@link PTransformOverride} in this pipeline. Overrides are * applied in the order they are present within the list. * *

After all nodes are replaced, ensures that no nodes in the updated graph match any of the * overrides. */ + @Internal public void replaceAll(List overrides) { for (PTransformOverride override : overrides) { replace(override); @@ -334,10 +338,12 @@ public void setCoderRegistry(CoderRegistry coderRegistry) { } /** - * A {@link PipelineVisitor} can be passed into - * {@link Pipeline#traverseTopologically} to be called for each of the - * transforms and values in the {@link Pipeline}. + * For internal use only; no backwards-compatibility guarantees. + * + *

A {@link PipelineVisitor} can be passed into {@link Pipeline#traverseTopologically} to be + * called for each of the transforms and values in the {@link Pipeline}. */ + @Internal public interface PipelineVisitor { /** * Called for each composite transform after all topological predecessors have been visited @@ -396,7 +402,9 @@ public void visitValue(PValue value, TransformHierarchy.Node producer) { } } /** - * Invokes the {@link PipelineVisitor PipelineVisitor's} + * For internal use only; no backwards-compatibility guarantees. + * + *

Invokes the {@link PipelineVisitor PipelineVisitor's} * {@link PipelineVisitor#visitPrimitiveTransform} and * {@link PipelineVisitor#visitValue} operations on each of this * {@link Pipeline Pipeline's} transform and value nodes, in forward @@ -408,14 +416,18 @@ public void visitValue(PValue value, TransformHierarchy.Node producer) { } * *

Typically invoked by {@link PipelineRunner} subclasses. */ + @Internal public void traverseTopologically(PipelineVisitor visitor) { transforms.visit(visitor); } /** - * Like {@link #applyTransform(String, PInput, PTransform)} but defaulting to the name + * For internal use only; no backwards-compatibility guarantees. + * + *

Like {@link #applyTransform(String, PInput, PTransform)} but defaulting to the name * provided by the {@link PTransform}. */ + @Internal public static OutputT applyTransform(InputT input, PTransform transform) { @@ -423,7 +435,9 @@ OutputT applyTransform(InputT input, } /** - * Applies the given {@code PTransform} to this input {@code InputT} and returns + * For internal use only; no backwards-compatibility guarantees. + * + *

Applies the given {@code PTransform} to this input {@code InputT} and returns * its {@code OutputT}. This uses {@code name} to identify this specific application * of the transform. This name is used in various places, including the monitoring UI, * logging, and to stably identify this application node in the {@link Pipeline} graph during @@ -432,6 +446,7 @@ OutputT applyTransform(InputT input, *

Each {@link PInput} subclass that provides an {@code apply} method should delegate to * this method to ensure proper registration with the {@link PipelineRunner}. */ + @Internal public static OutputT applyTransform(String name, InputT input, PTransform transform) {