From 1c4264142247cfcc8c532463f5a1f2da6d6e5270 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 7 Dec 2016 14:28:39 -0800 Subject: [PATCH] Move CopyOnAccessStateInternals to runners/direct --- .../CopyOnAccessInMemoryStateInternals.java | 46 ++++++++++++------- .../direct/DirectExecutionContext.java | 1 - .../runners/direct/EvaluationContext.java | 1 - .../GroupAlsoByWindowEvaluatorFactory.java | 1 - .../beam/runners/direct/ParDoEvaluator.java | 1 - .../runners/direct/StepTransformResult.java | 1 - .../beam/runners/direct/TransformResult.java | 1 - ...opyOnAccessInMemoryStateInternalsTest.java | 12 ++++- .../runners/direct/EvaluationContextTest.java | 1 - .../StatefulParDoEvaluatorFactoryTest.java | 1 - .../util/state/InMemoryStateInternals.java | 33 +++++++++---- 11 files changed, 66 insertions(+), 33 deletions(-) rename {sdks/java/core/src/main/java/org/apache/beam/sdk/util/state => runners/direct-java/src/main/java/org/apache/beam/runners/direct}/CopyOnAccessInMemoryStateInternals.java (90%) rename {sdks/java/core/src/test/java/org/apache/beam/sdk/util/state => runners/direct-java/src/test/java/org/apache/beam/runners/direct}/CopyOnAccessInMemoryStateInternalsTest.java (97%) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java similarity index 90% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternals.java rename to runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java index 3ca00a910a96..e486a754a4ff 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.util.state; +package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; @@ -32,8 +32,24 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineFnUtil; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryBag; +import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryCombiningValue; import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryState; +import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryStateBinder; +import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryValue; +import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryWatermarkHold; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateContext; +import org.apache.beam.sdk.util.state.StateContexts; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateTable; +import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTag.StateBinder; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.joda.time.Instant; /** @@ -262,11 +278,11 @@ public WatermarkHoldState bindWatermark( if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") InMemoryState> existingState = - (InMemoryStateInternals.InMemoryState>) + (InMemoryState>) underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryStateInternals.InMemoryWatermarkHold<>( + return new InMemoryWatermarkHold<>( outputTimeFn); } } @@ -277,11 +293,11 @@ public ValueState bindValue( if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") InMemoryState> existingState = - (InMemoryStateInternals.InMemoryState>) + (InMemoryState>) underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryStateInternals.InMemoryValue<>(); + return new InMemoryValue<>(); } } @@ -294,12 +310,11 @@ public ValueState bindValue( @SuppressWarnings("unchecked") InMemoryState> existingState = ( - InMemoryStateInternals - .InMemoryState>) underlying.get().get(namespace, address, c); + InMemoryState>) underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryStateInternals.InMemoryCombiningValue<>( + return new InMemoryCombiningValue<>( key, combineFn.asKeyedFn()); } } @@ -310,11 +325,11 @@ public BagState bindBag( if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") InMemoryState> existingState = - (InMemoryStateInternals.InMemoryState>) + (InMemoryState>) underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryStateInternals.InMemoryBag<>(); + return new InMemoryBag<>(); } } @@ -328,12 +343,11 @@ public BagState bindBag( @SuppressWarnings("unchecked") InMemoryState> existingState = ( - InMemoryStateInternals - .InMemoryState>) underlying.get().get(namespace, address, c); + InMemoryState>) underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryStateInternals.InMemoryCombiningValue<>(key, combineFn); + return new InMemoryCombiningValue<>(key, combineFn); } } @@ -446,7 +460,7 @@ public InMemoryStateBinderFactory(K key) { @Override public StateBinder forNamespace(StateNamespace namespace, StateContext c) { - return new InMemoryStateInternals.InMemoryStateBinder<>(key, c); + return new InMemoryStateBinder<>(key, c); } } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 8cec8f7417c8..c6051f08dfb1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -23,7 +23,6 @@ import org.apache.beam.sdk.util.BaseExecutionContext; import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; /** * Execution Context for the {@link DirectRunner}. diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index b5a23d7cbc97..230d91b52dff 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -48,7 +48,6 @@ import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 87cbbcde54f9..bb11923fd1cd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -50,7 +50,6 @@ import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 504ddc462aab..a915cf0bc990 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java index d58b027dd8a1..01b2a7261f07 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.joda.time.Instant; /** diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java index b4797b01ce15..8bb5f9355860 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.joda.time.Instant; /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java similarity index 97% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java rename to runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java index ad70bcafe753..deefc68489dc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.util.state; +package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; @@ -39,6 +39,16 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaceForTest; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index a2bb15ecb037..f11c370ac9d2 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -58,7 +58,6 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index 06c85ef06914..7c086a1b629e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -51,7 +51,6 @@ import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java index efb270cfa9ec..66118374d808 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java @@ -56,7 +56,11 @@ public K getKey() { return key; } - interface InMemoryState> { + /** + * Interface common to all in-memory state cells. Includes ability to see whether a cell has been + * cleared and the ability to create a clone of the contents. + */ + public interface InMemoryState> { boolean isCleared(); T copy(); } @@ -94,11 +98,11 @@ public T state( /** * A {@link StateBinder} that returns In Memory {@link State} objects. */ - static class InMemoryStateBinder implements StateBinder { + public static class InMemoryStateBinder implements StateBinder { private final K key; private final StateContext c; - InMemoryStateBinder(K key, StateContext c) { + public InMemoryStateBinder(K key, StateContext c) { this.key = key; this.c = c; } @@ -150,7 +154,11 @@ public WatermarkHoldState bindWatermark( } } - static final class InMemoryValue implements ValueState, InMemoryState> { + /** + * An {@link InMemoryState} implementation of {@link ValueState}. + */ + public static final class InMemoryValue + implements ValueState, InMemoryState> { private boolean isCleared = true; private T value = null; @@ -194,7 +202,10 @@ public boolean isCleared() { } } - static final class InMemoryWatermarkHold + /** + * An {@link InMemoryState} implementation of {@link WatermarkHoldState}. + */ + public static final class InMemoryWatermarkHold implements WatermarkHoldState, InMemoryState> { private final OutputTimeFn outputTimeFn; @@ -267,7 +278,10 @@ public InMemoryWatermarkHold copy() { } } - static final class InMemoryCombiningValue + /** + * An {@link InMemoryState} implementation of {@link AccumulatorCombiningState}. + */ + public static final class InMemoryCombiningValue implements AccumulatorCombiningState, InMemoryState> { private final K key; @@ -275,7 +289,7 @@ static final class InMemoryCombiningValue private final KeyedCombineFn combineFn; private AccumT accum; - InMemoryCombiningValue( + public InMemoryCombiningValue( K key, KeyedCombineFn combineFn) { this.key = key; this.combineFn = combineFn; @@ -353,7 +367,10 @@ public InMemoryCombiningValue copy() { } } - static final class InMemoryBag implements BagState, InMemoryState> { + /** + * An {@link InMemoryState} implementation of {@link BagState}. + */ + public static final class InMemoryBag implements BagState, InMemoryState> { private List contents = new ArrayList<>(); @Override