From 98a75551064c742d108d8c5ec8fc0783db7761d2 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 15:28:44 -0700 Subject: [PATCH 01/15] Move StepContext to top level --- .../translation/utils/NoOpStepContext.java | 6 +- .../runners/core/BaseExecutionContext.java | 8 +-- .../apache/beam/runners/core/DoFnRunners.java | 1 - .../beam/runners/core/ExecutionContext.java | 47 ------------- .../beam/runners/core/SimpleDoFnRunner.java | 1 - .../apache/beam/runners/core/StepContext.java | 70 +++++++++++++++++++ .../functions/FlinkNoOpStepContext.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 7 +- .../translation/SparkProcessContext.java | 2 +- .../beam/fn/harness/fake/FakeStepContext.java | 2 +- 10 files changed, 83 insertions(+), 63 deletions(-) create mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java index 721eecd3d626..241a9856736d 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -19,8 +19,8 @@ import java.io.IOException; import java.io.Serializable; -import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -28,9 +28,9 @@ import org.apache.beam.sdk.values.TupleTag; /** - * Serializable {@link ExecutionContext.StepContext} that does nothing. + * Serializable {@link StepContext} that does nothing. */ -public class NoOpStepContext implements ExecutionContext.StepContext, Serializable { +public class NoOpStepContext implements StepContext, Serializable { private static final long serialVersionUID = 1L; @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java index 23d61f89b1db..ed3714372b47 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java @@ -49,7 +49,7 @@ * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()} * will be appropriately specialized. */ -public abstract class BaseExecutionContext +public abstract class BaseExecutionContext implements ExecutionContext { private Map cachedStepContexts = new LinkedHashMap<>(); @@ -81,7 +81,7 @@ public T create() { * Factory method interface to create an execution context if none exists during * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}. */ - protected interface CreateStepContextFunction { + protected interface CreateStepContextFunction { T create(); } @@ -111,12 +111,12 @@ public void noteOutput(WindowedValue output) {} public void noteOutput(TupleTag tag, WindowedValue output) {} /** - * Base class for implementations of {@link ExecutionContext.StepContext}. + * Base class for implementations of {@link org.apache.beam.runners.core.StepContext}. * *

To complete a concrete subclass, implement {@link #timerInternals} and * {@link #stateInternals}. */ - public abstract static class StepContext implements ExecutionContext.StepContext { + public abstract static class StepContext implements org.apache.beam.runners.core.StepContext { private final ExecutionContext executionContext; private final String stepName; private final String transformName; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index 71dfd11a496d..9d3e25dbb310 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -19,7 +19,6 @@ import java.util.Collection; import java.util.List; -import org.apache.beam.runners.core.ExecutionContext.StepContext; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn; import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer; import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java index d2fdaac50982..f431c92859ec 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java @@ -17,11 +17,8 @@ */ package org.apache.beam.runners.core; -import java.io.IOException; import java.util.Collection; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn.WindowedContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; @@ -52,48 +49,4 @@ public interface ExecutionContext { */ void noteOutput(TupleTag tag, WindowedValue output); - /** - * Per-step, per-key context used for retrieving state. - */ - public interface StepContext { - - /** - * The name of the step. - */ - String getStepName(); - - /** - * The name of the transform for the step. - */ - String getTransformName(); - - /** - * Hook for subclasses to implement that will be called whenever - * {@link WindowedContext#output} - * is called. - */ - void noteOutput(WindowedValue output); - - /** - * Hook for subclasses to implement that will be called whenever - * {@link WindowedContext#output} - * is called. - */ - void noteOutput(TupleTag tag, WindowedValue output); - - /** - * Writes the given {@code PCollectionView} data to a globally accessible location. - */ - void writePCollectionViewData( - TupleTag tag, - Iterable> data, - Coder>> dataCoder, - W window, - Coder windowCoder) - throws IOException; - - StateInternals stateInternals(); - - TimerInternals timerInternals(); - } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 65384da9f8f7..adbe62e4cc43 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -29,7 +29,6 @@ import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.runners.core.ExecutionContext.StepContext; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java new file mode 100644 index 000000000000..a414830bae6d --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.io.IOException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn.WindowedContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Per-step, per-key context used for retrieving state. + */ +public interface StepContext { + + /** + * The name of the step. + */ + String getStepName(); + + /** + * The name of the transform for the step. + */ + String getTransformName(); + + /** + * Hook for subclasses to implement that will be called whenever + * {@link WindowedContext#output} + * is called. + */ + void noteOutput(WindowedValue output); + + /** + * Hook for subclasses to implement that will be called whenever + * {@link WindowedContext#output} + * is called. + */ + void noteOutput(TupleTag tag, WindowedValue output); + + /** + * Writes the given {@code PCollectionView} data to a globally accessible location. + */ + void writePCollectionViewData( + TupleTag tag, + Iterable> data, + Coder>> dataCoder, + W window, + Coder windowCoder) + throws IOException; + + StateInternals stateInternals(); + + TimerInternals timerInternals(); +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java index 86408016f19b..c394ebdf6f28 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.flink.translation.functions; import java.io.IOException; -import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index f35ba7a0b086..c9f106a9079b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -32,7 +32,6 @@ import javax.annotation.Nullable; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; @@ -184,7 +183,7 @@ public DoFnOperator( TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder()); } - private ExecutionContext.StepContext createStepContext() { + private org.apache.beam.runners.core.StepContext createStepContext() { return new StepContext(); } @@ -250,7 +249,7 @@ public void open() throws Exception { doFnInvoker.invokeSetup(); - ExecutionContext.StepContext stepContext = createStepContext(); + org.apache.beam.runners.core.StepContext stepContext = createStepContext(); doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), @@ -676,7 +675,7 @@ public void output(TupleTag tag, WindowedValue value) { * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow * accessing state or timer internals. */ - protected class StepContext implements ExecutionContext.StepContext { + protected class StepContext implements org.apache.beam.runners.core.StepContext { @Override public String getStepName() { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index ffe343bc2d8b..91474223a7cb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -24,7 +24,7 @@ import java.util.Iterator; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java index 9b79d110bf37..b206bc7b8f85 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java @@ -19,7 +19,7 @@ package org.apache.beam.fn.harness.fake; import java.io.IOException; -import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; From bed1c53fb47e2d623d6671ce69b82579992df642 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 15:30:33 -0700 Subject: [PATCH 02/15] Remove StepContext.noteOutput --- .../apex/translation/utils/NoOpStepContext.java | 8 -------- .../beam/runners/core/BaseExecutionContext.java | 16 ---------------- .../beam/runners/core/ExecutionContext.java | 16 ---------------- .../beam/runners/core/SimpleDoFnRunner.java | 6 ------ .../apache/beam/runners/core/StepContext.java | 15 --------------- .../functions/FlinkNoOpStepContext.java | 10 ---------- .../wrappers/streaming/DoFnOperator.java | 6 ------ .../spark/translation/SparkProcessContext.java | 6 ------ .../beam/fn/harness/fake/FakeStepContext.java | 8 -------- 9 files changed, 91 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java index 241a9856736d..51e843b56652 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -43,14 +43,6 @@ public String getTransformName() { return null; } - @Override - public void noteOutput(WindowedValue output) { - } - - @Override - public void noteOutput(TupleTag tag, WindowedValue output) { - } - @Override public void writePCollectionViewData(TupleTag tag, Iterable> data, diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java index ed3714372b47..a00699998bcd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java @@ -104,12 +104,6 @@ public Collection getAllStepContexts() { return Collections.unmodifiableCollection(cachedStepContexts.values()); } - @Override - public void noteOutput(WindowedValue output) {} - - @Override - public void noteOutput(TupleTag tag, WindowedValue output) {} - /** * Base class for implementations of {@link org.apache.beam.runners.core.StepContext}. * @@ -137,16 +131,6 @@ public String getTransformName() { return transformName; } - @Override - public void noteOutput(WindowedValue output) { - executionContext.noteOutput(output); - } - - @Override - public void noteOutput(TupleTag tag, WindowedValue output) { - executionContext.noteOutput(tag, output); - } - @Override public void writePCollectionViewData( TupleTag tag, diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java index f431c92859ec..eac3599d9757 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java @@ -18,9 +18,6 @@ package org.apache.beam.runners.core; import java.util.Collection; -import org.apache.beam.sdk.transforms.DoFn.WindowedContext; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; /** * Context for the current execution. This is guaranteed to exist during processing, @@ -36,17 +33,4 @@ public interface ExecutionContext { * Returns a collection view of all of the {@link StepContext}s. */ Collection getAllStepContexts(); - - /** - * Hook for subclasses to implement that will be called whenever - * {@link WindowedContext#output(TupleTag, Object)} is called. - */ - void noteOutput(WindowedValue output); - - /** - * Hook for subclasses to implement that will be called whenever - * {@link WindowedContext#output(TupleTag, Object)} is called. - */ - void noteOutput(TupleTag tag, WindowedValue output); - } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index adbe62e4cc43..97b0b3307740 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -352,9 +352,6 @@ void outputWindowedValue( void outputWindowedValue(WindowedValue windowedElem) { outputManager.output(mainOutputTag, windowedElem); - if (stepContext != null) { - stepContext.noteOutput(windowedElem); - } } private void outputWindowedValue( @@ -380,9 +377,6 @@ private void outputWindowedValue(TupleTag tag, WindowedValue windowedE } outputManager.output(tag, windowedElem); - if (stepContext != null) { - stepContext.noteOutput(tag, windowedElem); - } } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java index a414830bae6d..fd2575d5f74b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java @@ -19,7 +19,6 @@ import java.io.IOException; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn.WindowedContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; @@ -39,20 +38,6 @@ public interface StepContext { */ String getTransformName(); - /** - * Hook for subclasses to implement that will be called whenever - * {@link WindowedContext#output} - * is called. - */ - void noteOutput(WindowedValue output); - - /** - * Hook for subclasses to implement that will be called whenever - * {@link WindowedContext#output} - * is called. - */ - void noteOutput(TupleTag tag, WindowedValue output); - /** * Writes the given {@code PCollectionView} data to a globally accessible location. */ diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java index c394ebdf6f28..d999494eee79 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java @@ -41,16 +41,6 @@ public String getTransformName() { return null; } - @Override - public void noteOutput(WindowedValue output) { - - } - - @Override - public void noteOutput(TupleTag tag, WindowedValue output) { - - } - @Override public void writePCollectionViewData( TupleTag tag, diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index c9f106a9079b..2bb9c2003898 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -687,12 +687,6 @@ public String getTransformName() { return null; } - @Override - public void noteOutput(WindowedValue output) {} - - @Override - public void noteOutput(TupleTag tag, WindowedValue output) {} - @Override public void writePCollectionViewData( TupleTag tag, diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 91474223a7cb..31e616cbf0f4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -109,12 +109,6 @@ public String getTransformName() { return null; } - @Override - public void noteOutput(WindowedValue output) { } - - @Override - public void noteOutput(TupleTag tag, WindowedValue output) { } - @Override public void writePCollectionViewData( TupleTag tag, diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java index b206bc7b8f85..750c167d8993 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java @@ -41,14 +41,6 @@ public String getTransformName() { return "TODO"; } - @Override - public void noteOutput(WindowedValue output) { - } - - @Override - public void noteOutput(TupleTag tag, WindowedValue output) { - } - @Override public void writePCollectionViewData( TupleTag tag, From 8b7a1f6dfe0ac33814a0b0c67f37f47ab449ec4b Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 15:34:37 -0700 Subject: [PATCH 03/15] Rename BaseExecutionContext.StepContext to BaseStepContext --- .../beam/runners/core/BaseExecutionContext.java | 14 +++++++------- .../beam/runners/core/SimpleDoFnRunnerTest.java | 5 +++-- .../beam/runners/core/StatefulDoFnRunnerTest.java | 5 +++-- .../runners/direct/DirectExecutionContext.java | 2 +- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java index a00699998bcd..5667250b3288 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java @@ -31,11 +31,11 @@ * Base class for implementations of {@link ExecutionContext}. * *

A concrete subclass should implement {@link #createStepContext} to create the appropriate - * {@link StepContext} implementation. Any {@code StepContext} created will + * {@link BaseStepContext} implementation. Any {@code StepContext} created will * be cached for the lifetime of this {@link ExecutionContext}. * *

BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass - * of {@link StepContext} from {@link #getOrCreateStepContext(String, String)} and + * of {@link BaseStepContext} from {@link #getOrCreateStepContext(String, String)} and * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g. *

{@code
  * {@literal @}Override
@@ -56,12 +56,12 @@ public abstract class BaseExecutionContext
 
   /**
    * Implementations should override this to create the specific type
-   * of {@link StepContext} they need.
+   * of {@link BaseStepContext} they need.
    */
   protected abstract T createStepContext(String stepName, String transformName);
 
   /**
-   * Returns the {@link StepContext} associated with the given step.
+   * Returns the {@link BaseStepContext} associated with the given step.
    */
   @Override
   public T getOrCreateStepContext(String stepName, String transformName) {
@@ -97,7 +97,7 @@ protected final T getOrCreateStepContext(String stepName,
   }
 
   /**
-   * Returns a collection view of all of the {@link StepContext}s.
+   * Returns a collection view of all of the {@link BaseStepContext}s.
    */
   @Override
   public Collection getAllStepContexts() {
@@ -110,12 +110,12 @@ public Collection getAllStepContexts() {
    * 

To complete a concrete subclass, implement {@link #timerInternals} and * {@link #stateInternals}. */ - public abstract static class StepContext implements org.apache.beam.runners.core.StepContext { + public abstract static class BaseStepContext implements org.apache.beam.runners.core.StepContext { private final ExecutionContext executionContext; private final String stepName; private final String transformName; - public StepContext(ExecutionContext executionContext, String stepName, String transformName) { + public BaseStepContext(ExecutionContext executionContext, String stepName, String transformName) { this.executionContext = executionContext; this.stepName = stepName; this.transformName = transformName; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index abefd1c488b7..3750e6c9f0f7 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -29,7 +29,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.beam.runners.core.BaseExecutionContext.StepContext; +import org.apache.beam.runners.core.BaseExecutionContext.BaseStepContext; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.coders.Coder; @@ -63,7 +63,8 @@ public class SimpleDoFnRunnerTest { @Rule public ExpectedException thrown = ExpectedException.none(); - @Mock StepContext mockStepContext; + @Mock + BaseStepContext mockStepContext; @Mock TimerInternals mockTimerInternals; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java index 5172f433135a..a335c3a5daa4 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -24,7 +24,7 @@ import com.google.common.base.MoreObjects; import java.util.Collections; -import org.apache.beam.runners.core.BaseExecutionContext.StepContext; +import org.apache.beam.runners.core.BaseExecutionContext.BaseStepContext; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -69,7 +69,8 @@ public class StatefulDoFnRunnerTest { private static final IntervalWindow WINDOW_2 = new IntervalWindow(new Instant(10), new Instant(20)); - @Mock StepContext mockStepContext; + @Mock + BaseStepContext mockStepContext; private InMemoryStateInternals stateInternals; private InMemoryTimerInternals timerInternals; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 107f39ad4f74..6d2d02ab4fb2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -57,7 +57,7 @@ protected DirectStepContext createStepContext(String stepName, String transformN * Step Context for the {@link DirectRunner}. */ public class DirectStepContext - extends BaseExecutionContext.StepContext { + extends BaseStepContext { private CopyOnAccessInMemoryStateInternals stateInternals; private DirectTimerInternals timerInternals; From 59322d51e80e7480710a296f51a4cb65303f5e06 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 15:35:46 -0700 Subject: [PATCH 04/15] Move BaseStepContext to the top level --- .../runners/core/BaseExecutionContext.java | 46 ------------- .../beam/runners/core/BaseStepContext.java | 66 +++++++++++++++++++ .../runners/core/SimpleDoFnRunnerTest.java | 1 - .../runners/core/StatefulDoFnRunnerTest.java | 1 - .../direct/DirectExecutionContext.java | 1 + 5 files changed, 67 insertions(+), 48 deletions(-) create mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java index 5667250b3288..877fa0a4f105 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java @@ -17,15 +17,10 @@ */ package org.apache.beam.runners.core; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; /** * Base class for implementations of {@link ExecutionContext}. @@ -104,45 +99,4 @@ public Collection getAllStepContexts() { return Collections.unmodifiableCollection(cachedStepContexts.values()); } - /** - * Base class for implementations of {@link org.apache.beam.runners.core.StepContext}. - * - *

To complete a concrete subclass, implement {@link #timerInternals} and - * {@link #stateInternals}. - */ - public abstract static class BaseStepContext implements org.apache.beam.runners.core.StepContext { - private final ExecutionContext executionContext; - private final String stepName; - private final String transformName; - - public BaseStepContext(ExecutionContext executionContext, String stepName, String transformName) { - this.executionContext = executionContext; - this.stepName = stepName; - this.transformName = transformName; - } - - @Override - public String getStepName() { - return stepName; - } - - @Override - public String getTransformName() { - return transformName; - } - - @Override - public void writePCollectionViewData( - TupleTag tag, - Iterable> data, Coder>> dataCoder, - W window, Coder windowCoder) throws IOException { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public abstract StateInternals stateInternals(); - - @Override - public abstract TimerInternals timerInternals(); - } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java new file mode 100644 index 000000000000..f0436acefdc5 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.io.IOException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Base class for implementations of {@link StepContext}. + * + *

To complete a concrete subclass, implement {@link #timerInternals} and + * {@link #stateInternals}. + */ +public abstract class BaseStepContext implements StepContext { + private final ExecutionContext executionContext; + private final String stepName; + private final String transformName; + + public BaseStepContext(ExecutionContext executionContext, String stepName, String transformName) { + this.executionContext = executionContext; + this.stepName = stepName; + this.transformName = transformName; + } + + @Override + public String getStepName() { + return stepName; + } + + @Override + public String getTransformName() { + return transformName; + } + + @Override + public void writePCollectionViewData( + TupleTag tag, + Iterable> data, Coder>> dataCoder, + W window, Coder windowCoder) throws IOException { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public abstract StateInternals stateInternals(); + + @Override + public abstract TimerInternals timerInternals(); +} diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index 3750e6c9f0f7..59e5857c3aa1 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -29,7 +29,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.beam.runners.core.BaseExecutionContext.BaseStepContext; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.coders.Coder; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java index a335c3a5daa4..62a657882cc6 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -24,7 +24,6 @@ import com.google.common.base.MoreObjects; import java.util.Collections; -import org.apache.beam.runners.core.BaseExecutionContext.BaseStepContext; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 6d2d02ab4fb2..e5b88e5ac8ee 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.direct; import org.apache.beam.runners.core.BaseExecutionContext; +import org.apache.beam.runners.core.BaseStepContext; import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; From 248c808a6603dc2c28a0b55296e0d596b8903a08 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 15:36:41 -0700 Subject: [PATCH 05/15] Remove extraneous ExecutionContext parameter to BaseStepContext --- .../java/org/apache/beam/runners/core/BaseStepContext.java | 4 +--- .../apache/beam/runners/direct/DirectExecutionContext.java | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java index f0436acefdc5..014fe0def371 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java @@ -30,12 +30,10 @@ * {@link #stateInternals}. */ public abstract class BaseStepContext implements StepContext { - private final ExecutionContext executionContext; private final String stepName; private final String transformName; - public BaseStepContext(ExecutionContext executionContext, String stepName, String transformName) { - this.executionContext = executionContext; + public BaseStepContext(String stepName, String transformName) { this.stepName = stepName; this.transformName = transformName; } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index e5b88e5ac8ee..d676f247298c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -64,7 +64,7 @@ public class DirectStepContext public DirectStepContext( ExecutionContext executionContext, String stepName, String transformName) { - super(executionContext, stepName, transformName); + super(stepName, transformName); } @Override From 5ac24e0a89b95feafccbe381bdde9c11fdf82a88 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 15:44:17 -0700 Subject: [PATCH 06/15] Implement StepContext directly in the DirectRunner --- .../direct/DirectExecutionContext.java | 33 ++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index d676f247298c..2a75ef51a4ba 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -17,13 +17,18 @@ */ package org.apache.beam.runners.direct; +import java.io.IOException; import org.apache.beam.runners.core.BaseExecutionContext; -import org.apache.beam.runners.core.BaseStepContext; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; /** * Execution Context for the {@link DirectRunner}. @@ -57,14 +62,16 @@ protected DirectStepContext createStepContext(String stepName, String transformN /** * Step Context for the {@link DirectRunner}. */ - public class DirectStepContext - extends BaseStepContext { + public class DirectStepContext implements StepContext { private CopyOnAccessInMemoryStateInternals stateInternals; private DirectTimerInternals timerInternals; + private final String stepName; + private final String transformName; public DirectStepContext( ExecutionContext executionContext, String stepName, String transformName) { - super(stepName, transformName); + this.stepName = stepName; + this.transformName = transformName; } @Override @@ -95,6 +102,24 @@ public CopyOnAccessInMemoryStateInternals commitState() { return null; } + @Override + public String getStepName() { + return stepName; + } + + @Override + public String getTransformName() { + return transformName; + } + + @Override + public void writePCollectionViewData( + TupleTag tag, + Iterable> data, Coder>> dataCoder, + W window, Coder windowCoder) throws IOException { + throw new UnsupportedOperationException("Not implemented."); + } + /** * Gets the timer update of the {@link TimerInternals} of this {@link DirectStepContext}, * which is empty if the {@link TimerInternals} were never accessed. From 32c6cb160f42e401f3e170cc8ed18d76c627d3e4 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 16:26:00 -0700 Subject: [PATCH 07/15] Remove writePCollectionViewData from the Beam codebase --- .../apex/translation/utils/NoOpStepContext.java | 13 ------------- .../beam/runners/core/BaseStepContext.java | 14 -------------- .../apache/beam/runners/core/StepContext.java | 17 ----------------- .../runners/direct/DirectExecutionContext.java | 13 ------------- .../functions/FlinkNoOpStepContext.java | 16 +--------------- .../wrappers/streaming/DoFnOperator.java | 11 ----------- .../spark/translation/SparkProcessContext.java | 14 +------------- .../beam/fn/harness/fake/FakeStepContext.java | 16 +--------------- 8 files changed, 3 insertions(+), 111 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java index 51e843b56652..820b1897b5db 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -17,15 +17,10 @@ */ package org.apache.beam.runners.apex.translation.utils; -import java.io.IOException; import java.io.Serializable; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; /** * Serializable {@link StepContext} that does nothing. @@ -43,14 +38,6 @@ public String getTransformName() { return null; } - @Override - public void writePCollectionViewData(TupleTag tag, - Iterable> data, - Coder>> dataCoder, W window, Coder windowCoder) throws - IOException { - - } - @Override public StateInternals stateInternals() { return null; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java index 014fe0def371..e639c46c9f62 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java @@ -17,12 +17,6 @@ */ package org.apache.beam.runners.core; -import java.io.IOException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; - /** * Base class for implementations of {@link StepContext}. * @@ -48,14 +42,6 @@ public String getTransformName() { return transformName; } - @Override - public void writePCollectionViewData( - TupleTag tag, - Iterable> data, Coder>> dataCoder, - W window, Coder windowCoder) throws IOException { - throw new UnsupportedOperationException("Not implemented."); - } - @Override public abstract StateInternals stateInternals(); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java index fd2575d5f74b..62a81f154eca 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java @@ -17,12 +17,6 @@ */ package org.apache.beam.runners.core; -import java.io.IOException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; - /** * Per-step, per-key context used for retrieving state. */ @@ -38,17 +32,6 @@ public interface StepContext { */ String getTransformName(); - /** - * Writes the given {@code PCollectionView} data to a globally accessible location. - */ - void writePCollectionViewData( - TupleTag tag, - Iterable> data, - Coder>> dataCoder, - W window, - Coder windowCoder) - throws IOException; - StateInternals stateInternals(); TimerInternals timerInternals(); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 2a75ef51a4ba..39174d630954 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import java.io.IOException; import org.apache.beam.runners.core.BaseExecutionContext; import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.StepContext; @@ -25,10 +24,6 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; /** * Execution Context for the {@link DirectRunner}. @@ -112,14 +107,6 @@ public String getTransformName() { return transformName; } - @Override - public void writePCollectionViewData( - TupleTag tag, - Iterable> data, Coder>> dataCoder, - W window, Coder windowCoder) throws IOException { - throw new UnsupportedOperationException("Not implemented."); - } - /** * Gets the timer update of the {@link TimerInternals} of this {@link DirectStepContext}, * which is empty if the {@link TimerInternals} were never accessed. diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java index d999494eee79..1ff322ee0ba5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java @@ -17,14 +17,9 @@ */ package org.apache.beam.runners.flink.translation.functions; -import java.io.IOException; -import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; /** * A {@link StepContext} for Flink Batch Runner execution. @@ -41,15 +36,6 @@ public String getTransformName() { return null; } - @Override - public void writePCollectionViewData( - TupleTag tag, - Iterable> data, - Coder>> dataCoder, - W window, - Coder windowCoder) throws IOException { - } - @Override public StateInternals stateInternals() { return null; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 2bb9c2003898..4f8998e9df8c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -23,7 +23,6 @@ import com.google.common.collect.Iterables; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -687,16 +686,6 @@ public String getTransformName() { return null; } - @Override - public void writePCollectionViewData( - TupleTag tag, - Iterable> data, - Coder>> dataCoder, - W window, - Coder windowCoder) throws IOException { - throw new UnsupportedOperationException("Writing side-input data is not supported."); - } - @Override public StateInternals stateInternals() { return stateInternals; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 31e616cbf0f4..e693143ade16 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -20,19 +20,15 @@ import com.google.common.collect.AbstractIterator; import com.google.common.collect.Lists; -import java.io.IOException; import java.util.Iterator; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; /** @@ -109,14 +105,6 @@ public String getTransformName() { return null; } - @Override - public void writePCollectionViewData( - TupleTag tag, - Iterable> data, - Coder>> dataCoder, - W window, - Coder windowCoder) throws IOException { } - @Override public StateInternals stateInternals() { return null; diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java index 750c167d8993..3f6a2daed61c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java @@ -18,14 +18,9 @@ package org.apache.beam.fn.harness.fake; -import java.io.IOException; -import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; /** * A fake {@link StepContext} factory that performs no-ops. @@ -41,15 +36,6 @@ public String getTransformName() { return "TODO"; } - @Override - public void writePCollectionViewData( - TupleTag tag, - Iterable> data, - Coder>> dataCoder, - W window, - Coder windowCoder) throws IOException { - } - @Override public StateInternals stateInternals() { throw new UnsupportedOperationException(); From 0be3cf3462c19f0b007b2329c95ea4865d22cad5 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 16:50:41 -0700 Subject: [PATCH 08/15] Inline and delete BaseExecutionContext --- .../runners/core/BaseExecutionContext.java | 102 ------------------ .../direct/DirectExecutionContext.java | 39 +++++-- 2 files changed, 32 insertions(+), 109 deletions(-) delete mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java deleted file mode 100644 index 877fa0a4f105..000000000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; - -/** - * Base class for implementations of {@link ExecutionContext}. - * - *

A concrete subclass should implement {@link #createStepContext} to create the appropriate - * {@link BaseStepContext} implementation. Any {@code StepContext} created will - * be cached for the lifetime of this {@link ExecutionContext}. - * - *

BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass - * of {@link BaseStepContext} from {@link #getOrCreateStepContext(String, String)} and - * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g. - *

{@code
- * {@literal @}Override
- * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) {
- *   return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...);
- * }
- * }
- * - *

When a subclass of {@code BaseExecutionContext} has been downcast, the return types of - * {@link #createStepContext(String, String)}, - * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()} - * will be appropriately specialized. - */ -public abstract class BaseExecutionContext - implements ExecutionContext { - - private Map cachedStepContexts = new LinkedHashMap<>(); - - /** - * Implementations should override this to create the specific type - * of {@link BaseStepContext} they need. - */ - protected abstract T createStepContext(String stepName, String transformName); - - /** - * Returns the {@link BaseStepContext} associated with the given step. - */ - @Override - public T getOrCreateStepContext(String stepName, String transformName) { - final String finalStepName = stepName; - final String finalTransformName = transformName; - return getOrCreateStepContext( - stepName, - new CreateStepContextFunction() { - @Override - public T create() { - return createStepContext(finalStepName, finalTransformName); - } - }); - } - - /** - * Factory method interface to create an execution context if none exists during - * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}. - */ - protected interface CreateStepContextFunction { - T create(); - } - - protected final T getOrCreateStepContext(String stepName, - CreateStepContextFunction createContextFunc) { - T context = cachedStepContexts.get(stepName); - if (context == null) { - context = createContextFunc.create(); - cachedStepContexts.put(stepName, context); - } - - return context; - } - - /** - * Returns a collection view of all of the {@link BaseStepContext}s. - */ - @Override - public Collection getAllStepContexts() { - return Collections.unmodifiableCollection(cachedStepContexts.values()); - } - -} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 39174d630954..9b6866216a61 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -17,11 +17,14 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.core.BaseExecutionContext; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.beam.runners.core.BaseStepContext; import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; @@ -31,12 +34,12 @@ *

This implementation is not thread safe. A new {@link DirectExecutionContext} must be created * for each thread that requires it. */ -class DirectExecutionContext - extends BaseExecutionContext { +class DirectExecutionContext implements ExecutionContext { private final Clock clock; private final StructuralKey key; private final CopyOnAccessInMemoryStateInternals existingState; private final TransformWatermarks watermarks; + private Map cachedStepContexts = new LinkedHashMap<>(); public DirectExecutionContext( Clock clock, @@ -49,9 +52,31 @@ public DirectExecutionContext( this.watermarks = watermarks; } + private DirectStepContext createStepContext(String stepName, String transformName) { + return new DirectStepContext(stepName, transformName); + } + + /** + * Returns the {@link BaseStepContext} associated with the given step. + */ + @Override + public DirectStepContext getOrCreateStepContext(String stepName, String transformName) { + final String finalStepName = stepName; + final String finalTransformName = transformName; + DirectStepContext context = cachedStepContexts.get(stepName); + if (context == null) { + context = createStepContext(finalStepName, finalTransformName); + cachedStepContexts.put(stepName, context); + } + return context; + } + + /** + * Returns a collection view of all of the {@link BaseStepContext}s. + */ @Override - protected DirectStepContext createStepContext(String stepName, String transformName) { - return new DirectStepContext(this, stepName, transformName); + public Collection getAllStepContexts() { + return Collections.unmodifiableCollection(cachedStepContexts.values()); } /** @@ -64,7 +89,7 @@ public class DirectStepContext implements StepContext { private final String transformName; public DirectStepContext( - ExecutionContext executionContext, String stepName, String transformName) { + String stepName, String transformName) { this.stepName = stepName; this.transformName = transformName; } From 97c230af62151fdbe06ac622282d69c74db30b2f Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 17:33:38 -0700 Subject: [PATCH 09/15] Delete unused ExecutionContext --- .../beam/runners/core/ExecutionContext.java | 36 ------------------- .../direct/DirectExecutionContext.java | 3 +- .../runners/direct/EvaluationContext.java | 29 +++++++-------- 3 files changed, 14 insertions(+), 54 deletions(-) delete mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java deleted file mode 100644 index eac3599d9757..000000000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import java.util.Collection; - -/** - * Context for the current execution. This is guaranteed to exist during processing, - * but does not necessarily persist between different batches of work. - */ -public interface ExecutionContext { - /** - * Returns the {@link StepContext} associated with the given step. - */ - StepContext getOrCreateStepContext(String stepName, String transformName); - - /** - * Returns a collection view of all of the {@link StepContext}s. - */ - Collection getAllStepContexts(); -} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 9b6866216a61..05dbebc053c7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -22,7 +22,6 @@ import java.util.LinkedHashMap; import java.util.Map; import org.apache.beam.runners.core.BaseStepContext; -import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; @@ -34,7 +33,7 @@ *

This implementation is not thread safe. A new {@link DirectExecutionContext} must be created * for each thread that requires it. */ -class DirectExecutionContext implements ExecutionContext { +class DirectExecutionContext { private final Clock clock; private final StructuralKey key; private final CopyOnAccessInMemoryStateInternals existingState; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index c62711996648..88ce85a7cd22 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import javax.annotation.Nullable; -import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.ReadyCheckingSideInputReader; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.TimerInternals.TimerData; @@ -52,22 +51,20 @@ import org.joda.time.Instant; /** - * The evaluation context for a specific pipeline being executed by the - * {@link DirectRunner}. Contains state shared within the execution across all - * transforms. + * The evaluation context for a specific pipeline being executed by the {@link DirectRunner}. + * Contains state shared within the execution across all transforms. * - *

{@link EvaluationContext} contains shared state for an execution of the - * {@link DirectRunner} that can be used while evaluating a {@link PTransform}. This - * consists of views into underlying state and watermark implementations, access to read and write - * {@link PCollectionView PCollectionViews}, and managing the - * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when - * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and - * known to be empty). + *

{@link EvaluationContext} contains shared state for an execution of the {@link DirectRunner} + * that can be used while evaluating a {@link PTransform}. This consists of views into underlying + * state and watermark implementations, access to read and write {@link PCollectionView + * PCollectionViews}, and managing the {@link DirectExecutionContext ExecutionContexts}. This + * includes executing callbacks asynchronously when state changes to the appropriate point (e.g. + * when a {@link PCollectionView} is requested and known to be empty). * - *

{@link EvaluationContext} also handles results by committing finalizing bundles based - * on the current global state and updating the global state appropriately. This includes updating - * the per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that - * can be executed. + *

{@link EvaluationContext} also handles results by committing finalizing bundles based on the + * current global state and updating the global state appropriately. This includes updating the + * per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that can be + * executed. */ class EvaluationContext { /** @@ -312,7 +309,7 @@ public DirectOptions getPipelineOptions() { } /** - * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key. + * Get a {@link DirectExecutionContext} for the provided {@link AppliedPTransform} and key. */ public DirectExecutionContext getExecutionContext( AppliedPTransform application, StructuralKey key) { From acce24ce1388b7953fbb9d87da5bb2271286c58a Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 17:42:10 -0700 Subject: [PATCH 10/15] Remove unused StepContext name methods --- .../apex/translation/utils/NoOpStepContext.java | 10 ---------- .../org/apache/beam/runners/core/BaseStepContext.java | 10 ---------- .../java/org/apache/beam/runners/core/StepContext.java | 10 ---------- .../beam/runners/direct/DirectExecutionContext.java | 10 ---------- .../translation/functions/FlinkNoOpStepContext.java | 10 ---------- .../translation/wrappers/streaming/DoFnOperator.java | 10 ---------- .../runners/spark/translation/SparkProcessContext.java | 9 --------- .../apache/beam/fn/harness/fake/FakeStepContext.java | 9 --------- 8 files changed, 78 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java index 820b1897b5db..b49e4da27bcd 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -28,16 +28,6 @@ public class NoOpStepContext implements StepContext, Serializable { private static final long serialVersionUID = 1L; - @Override - public String getStepName() { - return null; - } - - @Override - public String getTransformName() { - return null; - } - @Override public StateInternals stateInternals() { return null; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java index e639c46c9f62..4abd4d228160 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java @@ -32,16 +32,6 @@ public BaseStepContext(String stepName, String transformName) { this.transformName = transformName; } - @Override - public String getStepName() { - return stepName; - } - - @Override - public String getTransformName() { - return transformName; - } - @Override public abstract StateInternals stateInternals(); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java index 62a81f154eca..60fc40291452 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java @@ -22,16 +22,6 @@ */ public interface StepContext { - /** - * The name of the step. - */ - String getStepName(); - - /** - * The name of the transform for the step. - */ - String getTransformName(); - StateInternals stateInternals(); TimerInternals timerInternals(); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 05dbebc053c7..651af8f4f818 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -121,16 +121,6 @@ public CopyOnAccessInMemoryStateInternals commitState() { return null; } - @Override - public String getStepName() { - return stepName; - } - - @Override - public String getTransformName() { - return transformName; - } - /** * Gets the timer update of the {@link TimerInternals} of this {@link DirectStepContext}, * which is empty if the {@link TimerInternals} were never accessed. diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java index 1ff322ee0ba5..9c7b63677ba3 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java @@ -26,16 +26,6 @@ */ public class FlinkNoOpStepContext implements StepContext { - @Override - public String getStepName() { - return null; - } - - @Override - public String getTransformName() { - return null; - } - @Override public StateInternals stateInternals() { return null; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 4f8998e9df8c..d2ab7e1cabbd 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -676,16 +676,6 @@ public void output(TupleTag tag, WindowedValue value) { */ protected class StepContext implements org.apache.beam.runners.core.StepContext { - @Override - public String getStepName() { - return null; - } - - @Override - public String getTransformName() { - return null; - } - @Override public StateInternals stateInternals() { return stateInternals; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index e693143ade16..f4ab7d9b02cb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -95,15 +95,6 @@ interface SparkOutputManager extends OutputManager, Iterable { } static class NoOpStepContext implements StepContext { - @Override - public String getStepName() { - return null; - } - - @Override - public String getTransformName() { - return null; - } @Override public StateInternals stateInternals() { diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java index 3f6a2daed61c..bdf138b31523 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java @@ -26,15 +26,6 @@ * A fake {@link StepContext} factory that performs no-ops. */ public class FakeStepContext implements StepContext { - @Override - public String getStepName() { - return "TODO"; - } - - @Override - public String getTransformName() { - return "TODO"; - } @Override public StateInternals stateInternals() { From dc585510e6b4def3a0442114d77e96f2b5d4880f Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 17:46:12 -0700 Subject: [PATCH 11/15] Delete unused remnants in DirectExecutionContext --- .../beam/runners/direct/DirectExecutionContext.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 651af8f4f818..845256539eb9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -56,9 +56,8 @@ private DirectStepContext createStepContext(String stepName, String transformNam } /** - * Returns the {@link BaseStepContext} associated with the given step. + * Returns the {@link StepContext} associated with the given step. */ - @Override public DirectStepContext getOrCreateStepContext(String stepName, String transformName) { final String finalStepName = stepName; final String finalTransformName = transformName; @@ -70,14 +69,6 @@ public DirectStepContext getOrCreateStepContext(String stepName, String transfor return context; } - /** - * Returns a collection view of all of the {@link BaseStepContext}s. - */ - @Override - public Collection getAllStepContexts() { - return Collections.unmodifiableCollection(cachedStepContexts.values()); - } - /** * Step Context for the {@link DirectRunner}. */ From 62115b29a7f27a1a74b7c870d4277655adb3dfbf Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 17:46:58 -0700 Subject: [PATCH 12/15] Delete unused BaseStepContext --- .../beam/runners/core/BaseStepContext.java | 40 ------------------- .../runners/core/SimpleDoFnRunnerTest.java | 2 +- .../runners/core/StatefulDoFnRunnerTest.java | 2 +- .../direct/DirectExecutionContext.java | 3 -- 4 files changed, 2 insertions(+), 45 deletions(-) delete mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java deleted file mode 100644 index 4abd4d228160..000000000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseStepContext.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -/** - * Base class for implementations of {@link StepContext}. - * - *

To complete a concrete subclass, implement {@link #timerInternals} and - * {@link #stateInternals}. - */ -public abstract class BaseStepContext implements StepContext { - private final String stepName; - private final String transformName; - - public BaseStepContext(String stepName, String transformName) { - this.stepName = stepName; - this.transformName = transformName; - } - - @Override - public abstract StateInternals stateInternals(); - - @Override - public abstract TimerInternals timerInternals(); -} diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index 59e5857c3aa1..f331b65abcb8 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -63,7 +63,7 @@ public class SimpleDoFnRunnerTest { @Rule public ExpectedException thrown = ExpectedException.none(); @Mock - BaseStepContext mockStepContext; + StepContext mockStepContext; @Mock TimerInternals mockTimerInternals; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java index 62a657882cc6..4f155dca4c3a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -69,7 +69,7 @@ public class StatefulDoFnRunnerTest { new IntervalWindow(new Instant(10), new Instant(20)); @Mock - BaseStepContext mockStepContext; + StepContext mockStepContext; private InMemoryStateInternals stateInternals; private InMemoryTimerInternals timerInternals; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 845256539eb9..cca57193fbd2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -17,11 +17,8 @@ */ package org.apache.beam.runners.direct; -import java.util.Collection; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; -import org.apache.beam.runners.core.BaseStepContext; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; From 0dc0334a0c1350c1693019f104dac911a618c9c8 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 22 May 2017 17:49:01 -0700 Subject: [PATCH 13/15] Shorten excessive name in DirectExecutionContext --- .../runners/direct/DirectExecutionContext.java | 2 +- .../GroupAlsoByWindowEvaluatorFactory.java | 2 +- .../runners/direct/ParDoEvaluatorFactory.java | 2 +- ...plittableProcessElementsEvaluatorFactory.java | 2 +- .../direct/StatefulParDoEvaluatorFactory.java | 2 +- .../runners/direct/EvaluationContextTest.java | 16 ++++++++-------- .../beam/runners/direct/ParDoEvaluatorTest.java | 2 +- .../StatefulParDoEvaluatorFactoryTest.java | 4 ++-- 8 files changed, 16 insertions(+), 16 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index cca57193fbd2..11c1b86a559c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -55,7 +55,7 @@ private DirectStepContext createStepContext(String stepName, String transformNam /** * Returns the {@link StepContext} associated with the given step. */ - public DirectStepContext getOrCreateStepContext(String stepName, String transformName) { + public DirectStepContext getStepContext(String stepName, String transformName) { final String finalStepName = stepName; final String finalTransformName = transformName; DirectStepContext context = cachedStepContexts.get(stepName); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 78ef7fe5d6da..49b75129b45c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -129,7 +129,7 @@ public GroupAlsoByWindowEvaluator( structuralKey = inputBundle.getKey(); stepContext = evaluationContext .getExecutionContext(application, inputBundle.getKey()) - .getOrCreateStepContext( + .getStepContext( evaluationContext.getStepName(application), application.getTransform().getName()); windowingStrategy = (WindowingStrategy) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index 74470bfb8b8d..12c6751f8e7e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -112,7 +112,7 @@ DoFnLifecycleManagerRemovingTransformEvaluator createEvaluator( DirectStepContext stepContext = evaluationContext .getExecutionContext(application, inputBundleKey) - .getOrCreateStepContext(stepName, stepName); + .getStepContext(stepName, stepName); DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index dc85d87bc93f..13d9345094fe 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -109,7 +109,7 @@ public void cleanup() throws Exception { final DirectExecutionContext.DirectStepContext stepContext = evaluationContext .getExecutionContext(application, inputBundle.getKey()) - .getOrCreateStepContext(stepName, stepName); + .getStepContext(stepName, stepName); final ParDoEvaluator>> parDoEvaluator = diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index 985c3be4e9e9..70d0cf5bf8f1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -163,7 +163,7 @@ public Runnable load( evaluationContext .getExecutionContext( transformOutputWindow.getTransform(), transformOutputWindow.getKey()) - .getOrCreateStepContext(stepName, stepName); + .getStepContext(stepName, stepName); final StateNamespace namespace = StateNamespaces.window( diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 72b1bbcba188..0e2be8d4d5b7 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -160,7 +160,7 @@ public void getExecutionContextSameStepSameKeyState() { StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); - DirectStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1"); + DirectStepContext stepContext = fooContext.getStepContext("s1", "s1"); stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); context.handleResult( @@ -177,7 +177,7 @@ public void getExecutionContextSameStepSameKeyState() { StructuralKey.of("foo", StringUtf8Coder.of())); assertThat( secondFooContext - .getOrCreateStepContext("s1", "s1") + .getStepContext("s1", "s1") .stateInternals() .state(StateNamespaces.global(), intBag) .read(), @@ -194,7 +194,7 @@ public void getExecutionContextDifferentKeysIndependentState() { StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); fooContext - .getOrCreateStepContext("s1", "s1") + .getStepContext("s1", "s1") .stateInternals() .state(StateNamespaces.global(), intBag) .add(1); @@ -205,7 +205,7 @@ public void getExecutionContextDifferentKeysIndependentState() { assertThat(barContext, not(equalTo(fooContext))); assertThat( barContext - .getOrCreateStepContext("s1", "s1") + .getStepContext("s1", "s1") .stateInternals() .state(StateNamespaces.global(), intBag) .read(), @@ -221,7 +221,7 @@ public void getExecutionContextDifferentStepsIndependentState() { StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); fooContext - .getOrCreateStepContext("s1", "s1") + .getStepContext("s1", "s1") .stateInternals() .state(StateNamespaces.global(), intBag) .add(1); @@ -230,7 +230,7 @@ public void getExecutionContextDifferentStepsIndependentState() { context.getExecutionContext(downstreamProducer, myKey); assertThat( barContext - .getOrCreateStepContext("s1", "s1") + .getStepContext("s1", "s1") .stateInternals() .state(StateNamespaces.global(), intBag) .read(), @@ -246,7 +246,7 @@ public void handleResultStoresState() { StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); CopyOnAccessInMemoryStateInternals state = - fooContext.getOrCreateStepContext("s1", "s1").stateInternals(); + fooContext.getStepContext("s1", "s1").stateInternals(); BagState bag = state.state(StateNamespaces.global(), intBag); bag.add(1); bag.add(2); @@ -266,7 +266,7 @@ public void handleResultStoresState() { context.getExecutionContext(downstreamProducer, myKey); CopyOnAccessInMemoryStateInternals afterResultState = - afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals(); + afterResultContext.getStepContext("s1", "s1").stateInternals(); assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4)); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 286e44d1be04..22b3b7e49096 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -140,7 +140,7 @@ private ParDoEvaluator createEvaluator( DirectExecutionContext executionContext = mock(DirectExecutionContext.class); DirectStepContext stepContext = mock(DirectStepContext.class); when( - executionContext.getOrCreateStepContext( + executionContext.getStepContext( Mockito.any(String.class), Mockito.any(String.class))) .thenReturn(stepContext); when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty()); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index eb54d5c818dc..b233c1bfe868 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -153,7 +153,7 @@ public void process(ProcessContext c) {} when(mockEvaluationContext.getExecutionContext( eq(producingTransform), Mockito.any())) .thenReturn(mockExecutionContext); - when(mockExecutionContext.getOrCreateStepContext(anyString(), anyString())) + when(mockExecutionContext.getStepContext(anyString(), anyString())) .thenReturn(mockStepContext); IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9)); @@ -269,7 +269,7 @@ public void process(ProcessContext c) {} when(mockEvaluationContext.getExecutionContext( eq(producingTransform), Mockito.any())) .thenReturn(mockExecutionContext); - when(mockExecutionContext.getOrCreateStepContext(anyString(), anyString())) + when(mockExecutionContext.getStepContext(anyString(), anyString())) .thenReturn(mockStepContext); when(mockEvaluationContext.createBundle(Matchers.>any())) .thenReturn(mockUncommittedBundle); From d425b2792f754ed6150f7b47eddf743286a45401 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 23 May 2017 11:10:42 -0700 Subject: [PATCH 14/15] Revise StepContext javadoc --- .../java/org/apache/beam/runners/core/StepContext.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java index 60fc40291452..4d66d668c7ff 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java @@ -18,7 +18,12 @@ package org.apache.beam.runners.core; /** - * Per-step, per-key context used for retrieving state. + * The context in which a specific step is executing, including access to state and timers. + * + *

This interface exists as the API between a runner and the support code, but is not user + * facing. + * + *

These will often be scoped to a particular step and key, though it is not required. */ public interface StepContext { From b32a1c350398a91b1b1552d5257dab6ab7d1da3a Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 23 May 2017 11:13:19 -0700 Subject: [PATCH 15/15] Remove unused pieces of DirectStepContext --- .../runners/direct/DirectExecutionContext.java | 18 +++++------------- .../GroupAlsoByWindowEvaluatorFactory.java | 2 +- .../runners/direct/ParDoEvaluatorFactory.java | 2 +- ...ittableProcessElementsEvaluatorFactory.java | 2 +- .../direct/StatefulParDoEvaluatorFactory.java | 2 +- .../runners/direct/EvaluationContextTest.java | 16 ++++++++-------- .../runners/direct/ParDoEvaluatorTest.java | 2 +- .../StatefulParDoEvaluatorFactoryTest.java | 4 ++-- 8 files changed, 20 insertions(+), 28 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index 11c1b86a559c..e8ad8d75f351 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -48,19 +48,17 @@ public DirectExecutionContext( this.watermarks = watermarks; } - private DirectStepContext createStepContext(String stepName, String transformName) { - return new DirectStepContext(stepName, transformName); + private DirectStepContext createStepContext() { + return new DirectStepContext(); } /** * Returns the {@link StepContext} associated with the given step. */ - public DirectStepContext getStepContext(String stepName, String transformName) { - final String finalStepName = stepName; - final String finalTransformName = transformName; + public DirectStepContext getStepContext(String stepName) { DirectStepContext context = cachedStepContexts.get(stepName); if (context == null) { - context = createStepContext(finalStepName, finalTransformName); + context = createStepContext(); cachedStepContexts.put(stepName, context); } return context; @@ -72,14 +70,8 @@ public DirectStepContext getStepContext(String stepName, String transformName) { public class DirectStepContext implements StepContext { private CopyOnAccessInMemoryStateInternals stateInternals; private DirectTimerInternals timerInternals; - private final String stepName; - private final String transformName; - public DirectStepContext( - String stepName, String transformName) { - this.stepName = stepName; - this.transformName = transformName; - } + public DirectStepContext() { } @Override public CopyOnAccessInMemoryStateInternals stateInternals() { diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 49b75129b45c..1a588eedf895 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -130,7 +130,7 @@ public GroupAlsoByWindowEvaluator( stepContext = evaluationContext .getExecutionContext(application, inputBundle.getKey()) .getStepContext( - evaluationContext.getStepName(application), application.getTransform().getName()); + evaluationContext.getStepName(application)); windowingStrategy = (WindowingStrategy) application.getTransform().getInputWindowingStrategy(); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index 12c6751f8e7e..8aa75cf1445e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -112,7 +112,7 @@ DoFnLifecycleManagerRemovingTransformEvaluator createEvaluator( DirectStepContext stepContext = evaluationContext .getExecutionContext(application, inputBundleKey) - .getStepContext(stepName, stepName); + .getStepContext(stepName); DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 13d9345094fe..b85f481c1489 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -109,7 +109,7 @@ public void cleanup() throws Exception { final DirectExecutionContext.DirectStepContext stepContext = evaluationContext .getExecutionContext(application, inputBundle.getKey()) - .getStepContext(stepName, stepName); + .getStepContext(stepName); final ParDoEvaluator>> parDoEvaluator = diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index 70d0cf5bf8f1..506c84cec639 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -163,7 +163,7 @@ public Runnable load( evaluationContext .getExecutionContext( transformOutputWindow.getTransform(), transformOutputWindow.getKey()) - .getStepContext(stepName, stepName); + .getStepContext(stepName); final StateNamespace namespace = StateNamespaces.window( diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 0e2be8d4d5b7..80b04f80a06a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -160,7 +160,7 @@ public void getExecutionContextSameStepSameKeyState() { StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); - DirectStepContext stepContext = fooContext.getStepContext("s1", "s1"); + DirectStepContext stepContext = fooContext.getStepContext("s1"); stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); context.handleResult( @@ -177,7 +177,7 @@ public void getExecutionContextSameStepSameKeyState() { StructuralKey.of("foo", StringUtf8Coder.of())); assertThat( secondFooContext - .getStepContext("s1", "s1") + .getStepContext("s1") .stateInternals() .state(StateNamespaces.global(), intBag) .read(), @@ -194,7 +194,7 @@ public void getExecutionContextDifferentKeysIndependentState() { StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); fooContext - .getStepContext("s1", "s1") + .getStepContext("s1") .stateInternals() .state(StateNamespaces.global(), intBag) .add(1); @@ -205,7 +205,7 @@ public void getExecutionContextDifferentKeysIndependentState() { assertThat(barContext, not(equalTo(fooContext))); assertThat( barContext - .getStepContext("s1", "s1") + .getStepContext("s1") .stateInternals() .state(StateNamespaces.global(), intBag) .read(), @@ -221,7 +221,7 @@ public void getExecutionContextDifferentStepsIndependentState() { StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); fooContext - .getStepContext("s1", "s1") + .getStepContext("s1") .stateInternals() .state(StateNamespaces.global(), intBag) .add(1); @@ -230,7 +230,7 @@ public void getExecutionContextDifferentStepsIndependentState() { context.getExecutionContext(downstreamProducer, myKey); assertThat( barContext - .getStepContext("s1", "s1") + .getStepContext("s1") .stateInternals() .state(StateNamespaces.global(), intBag) .read(), @@ -246,7 +246,7 @@ public void handleResultStoresState() { StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); CopyOnAccessInMemoryStateInternals state = - fooContext.getStepContext("s1", "s1").stateInternals(); + fooContext.getStepContext("s1").stateInternals(); BagState bag = state.state(StateNamespaces.global(), intBag); bag.add(1); bag.add(2); @@ -266,7 +266,7 @@ public void handleResultStoresState() { context.getExecutionContext(downstreamProducer, myKey); CopyOnAccessInMemoryStateInternals afterResultState = - afterResultContext.getStepContext("s1", "s1").stateInternals(); + afterResultContext.getStepContext("s1").stateInternals(); assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4)); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 22b3b7e49096..09a21ac524a5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -141,7 +141,7 @@ private ParDoEvaluator createEvaluator( DirectStepContext stepContext = mock(DirectStepContext.class); when( executionContext.getStepContext( - Mockito.any(String.class), Mockito.any(String.class))) + Mockito.any(String.class))) .thenReturn(stepContext); when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty()); when( diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index b233c1bfe868..9366b7c9ff8c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -153,7 +153,7 @@ public void process(ProcessContext c) {} when(mockEvaluationContext.getExecutionContext( eq(producingTransform), Mockito.any())) .thenReturn(mockExecutionContext); - when(mockExecutionContext.getStepContext(anyString(), anyString())) + when(mockExecutionContext.getStepContext(anyString())) .thenReturn(mockStepContext); IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9)); @@ -269,7 +269,7 @@ public void process(ProcessContext c) {} when(mockEvaluationContext.getExecutionContext( eq(producingTransform), Mockito.any())) .thenReturn(mockExecutionContext); - when(mockExecutionContext.getStepContext(anyString(), anyString())) + when(mockExecutionContext.getStepContext(anyString())) .thenReturn(mockStepContext); when(mockEvaluationContext.createBundle(Matchers.>any())) .thenReturn(mockUncommittedBundle);