From d1ca82faf585fb27a54a14520f3d95bd20dd8403 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 23 Jun 2017 14:31:58 -0700 Subject: [PATCH 1/4] [BEAM-1347] Rename DoFnRunnerFactory to FnApiDoFnRunner. --- .../core/{DoFnRunnerFactory.java => FnApiDoFnRunner.java} | 2 +- ...oFnRunnerFactoryTest.java => FnApiDoFnRunnerTest.java} | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) rename sdks/java/harness/src/main/java/org/apache/beam/runners/core/{DoFnRunnerFactory.java => FnApiDoFnRunner.java} (99%) rename sdks/java/harness/src/test/java/org/apache/beam/runners/core/{DoFnRunnerFactoryTest.java => FnApiDoFnRunnerTest.java} (97%) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/DoFnRunnerFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java similarity index 99% rename from sdks/java/harness/src/main/java/org/apache/beam/runners/core/DoFnRunnerFactory.java rename to sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java index 3c0b6ebcb408..adf735ada1dd 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/DoFnRunnerFactory.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java @@ -54,7 +54,7 @@ *

TODO: Move DoFnRunners into SDK harness and merge the methods below into it removing this * class. */ -public class DoFnRunnerFactory { +public class FnApiDoFnRunner { private static final String URN = "urn:org.apache.beam:dofn:java:0.1"; diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/DoFnRunnerFactoryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java similarity index 97% rename from sdks/java/harness/src/test/java/org/apache/beam/runners/core/DoFnRunnerFactoryTest.java rename to sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java index 62646ffa9710..ae5cbacbec16 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/DoFnRunnerFactoryTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java @@ -62,9 +62,9 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Tests for {@link DoFnRunnerFactory}. */ +/** Tests for {@link FnApiDoFnRunner}. */ @RunWith(JUnit4.class) -public class DoFnRunnerFactoryTest { +public class FnApiDoFnRunnerTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final Coder> STRING_CODER = @@ -155,7 +155,7 @@ public void testCreatingAndProcessingDoFn() throws Exception { List startFunctions = new ArrayList<>(); List finishFunctions = new ArrayList<>(); - new DoFnRunnerFactory.Factory<>().createRunnerForPTransform( + new FnApiDoFnRunner.Factory<>().createRunnerForPTransform( PipelineOptionsFactory.create(), null /* beamFnDataClient */, pTransformId, @@ -199,7 +199,7 @@ public void testCreatingAndProcessingDoFn() throws Exception { public void testRegistration() { for (Registrar registrar : ServiceLoader.load(Registrar.class)) { - if (registrar instanceof DoFnRunnerFactory.Registrar) { + if (registrar instanceof FnApiDoFnRunner.Registrar) { assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN)); return; } From f0c13f5323c24b94fa7eaab7721592073314f503 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 23 Jun 2017 14:34:36 -0700 Subject: [PATCH 2/4] [BEAM-1347] Add DoFnRunner specific to Fn Api. --- sdks/java/harness/pom.xml | 5 + .../beam/runners/core/FnApiDoFnRunner.java | 468 +++++++++++++++--- .../runners/core/FnApiDoFnRunnerTest.java | 7 +- 3 files changed, 418 insertions(+), 62 deletions(-) diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml index 9cfadc215eda..198b592c122f 100644 --- a/sdks/java/harness/pom.xml +++ b/sdks/java/harness/pom.xml @@ -149,6 +149,11 @@ linux-x86_64 + + joda-time + joda-time + + org.slf4j slf4j-api diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java index adf735ada1dd..2b7b3528e126 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java @@ -27,49 +27,59 @@ import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; -import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.function.Consumer; import java.util.function.Supplier; import org.apache.beam.fn.harness.data.BeamFnDataClient; -import org.apache.beam.fn.harness.fake.FakeStepContext; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +import org.joda.time.Instant; /** - * Classes associated with converting {@link RunnerApi.PTransform}s to {@link DoFnRunner}s. - * - *

TODO: Move DoFnRunners into SDK harness and merge the methods below into it removing this - * class. + * A {@link DoFnRunner} specific to integrating with the Fn Api. This is to remove the layers + * of abstraction caused by StateInternals/TimerInternals since they model state and timer + * concepts differently. */ -public class FnApiDoFnRunner { - - private static final String URN = "urn:org.apache.beam:dofn:java:0.1"; - - /** A registrar which provides a factory to handle Java {@link DoFn}s. */ +public class FnApiDoFnRunner implements DoFnRunner { + /** + * A registrar which provides a factory to handle Java {@link DoFn}s. + */ @AutoService(PTransformRunnerFactory.Registrar.class) public static class Registrar implements PTransformRunnerFactory.Registrar { @Override public Map getPTransformRunnerFactories() { - return ImmutableMap.of(URN, new Factory()); + return ImmutableMap.of(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN, new Factory()); } } - /** A factory for {@link DoFnRunner}s. */ + /** A factory for {@link FnApiDoFnRunner}. */ static class Factory implements PTransformRunnerFactory> { @@ -105,9 +115,9 @@ public DoFnRunner createRunnerForPTransform( throw new IllegalArgumentException( String.format("Unable to unwrap DoFn %s", pTransform.getSpec()), e); } - DoFnInfo doFnInfo = - (DoFnInfo) - SerializableUtils.deserializeFromByteArray(serializedFn.toByteArray(), "DoFnInfo"); + @SuppressWarnings({"unchecked", "rawtypes"}) + DoFnInfo doFnInfo = (DoFnInfo) SerializableUtils.deserializeFromByteArray( + serializedFn.toByteArray(), "DoFnInfo"); // Verify that the DoFnInfo tag to output map matches the output map on the PTransform. checkArgument( @@ -119,54 +129,26 @@ public DoFnRunner createRunnerForPTransform( doFnInfo.getOutputMap()); ImmutableMultimap.Builder, - ThrowingConsumer>> tagToOutput = + ThrowingConsumer>> tagToOutputMapBuilder = ImmutableMultimap.builder(); for (Map.Entry> entry : doFnInfo.getOutputMap().entrySet()) { @SuppressWarnings({"unchecked", "rawtypes"}) - Collection>> consumers = - (Collection) outputMap.get(Long.toString(entry.getKey())); - tagToOutput.putAll(entry.getValue(), consumers); + Collection>> consumers = + outputMap.get(Long.toString(entry.getKey())); + tagToOutputMapBuilder.putAll(entry.getValue(), consumers); } + ImmutableMultimap, ThrowingConsumer>> tagToOutputMap = + tagToOutputMapBuilder.build(); + @SuppressWarnings({"unchecked", "rawtypes"}) - Map, Collection>>> tagBasedOutputMap = - (Map) tagToOutput.build().asMap(); - - OutputManager outputManager = - new OutputManager() { - Map, Collection>>> tupleTagToOutput = - tagBasedOutputMap; - - @Override - public void output(TupleTag tag, WindowedValue output) { - try { - Collection>> consumers = - tupleTagToOutput.get(tag); - if (consumers == null) { - /* This is a normal case, e.g., if a DoFn has output but that output is not - * consumed. Drop the output. */ - return; - } - for (ThrowingConsumer> consumer : consumers) { - consumer.accept(output); - } - } catch (Throwable t) { - throw new RuntimeException(t); - } - } - }; - - @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) - DoFnRunner runner = - DoFnRunners.simpleRunner( - pipelineOptions, - (DoFn) doFnInfo.getDoFn(), - NullSideInputReader.empty(), /* TODO */ - outputManager, - (TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()), - new ArrayList<>(doFnInfo.getOutputMap().values()), - new FakeStepContext(), - (WindowingStrategy) doFnInfo.getWindowingStrategy()); + DoFnRunner runner = new FnApiDoFnRunner<>( + pipelineOptions, + doFnInfo.getDoFn(), + (Collection>>) (Collection) + tagToOutputMap.get(doFnInfo.getOutputMap().get(doFnInfo.getMainOutput())), + tagToOutputMap, + doFnInfo.getWindowingStrategy()); // Register the appropriate handlers. addStartFunction.accept(runner::startBundle); @@ -179,4 +161,372 @@ public void output(TupleTag tag, WindowedValue output) { return runner; } } + + ////////////////////////////////////////////////////////////////////////////////////////////////// + + private final PipelineOptions pipelineOptions; + private final DoFn doFn; + private final Collection>> mainOutputConsumers; + private final Multimap, ThrowingConsumer>> outputMap; + private final DoFnInvoker doFnInvoker; + private final StartBundleContext startBundleContext; + private final ProcessBundleContext processBundleContext; + private final FinishBundleContext finishBundleContext; + + /** + * The lifetime of this member is only valid during {@link #processElement(WindowedValue)}. + */ + private WindowedValue currentElement; + + /** + * The lifetime of this member is only valid during {@link #processElement(WindowedValue)}. + */ + private BoundedWindow currentWindow; + + FnApiDoFnRunner( + PipelineOptions pipelineOptions, + DoFn doFn, + Collection>> mainOutputConsumers, + Multimap, ThrowingConsumer>> outputMap, + WindowingStrategy windowingStrategy) { + this.pipelineOptions = pipelineOptions; + this.doFn = doFn; + this.mainOutputConsumers = mainOutputConsumers; + this.outputMap = outputMap; + this.doFnInvoker = DoFnInvokers.invokerFor(doFn); + this.startBundleContext = new StartBundleContext(); + this.processBundleContext = new ProcessBundleContext(); + this.finishBundleContext = new FinishBundleContext(); + } + + @Override + public void startBundle() { + doFnInvoker.invokeStartBundle(startBundleContext); + } + + @Override + public void processElement(WindowedValue elem) { + currentElement = elem; + try { + Iterator windowIterator = + (Iterator) elem.getWindows().iterator(); + while (windowIterator.hasNext()) { + currentWindow = windowIterator.next(); + doFnInvoker.invokeProcessElement(processBundleContext); + } + } finally { + currentElement = null; + currentWindow = null; + } + } + + @Override + public void onTimer( + String timerId, + BoundedWindow window, + Instant timestamp, + TimeDomain timeDomain) { + throw new UnsupportedOperationException("TODO: Add support for timers"); + } + + @Override + public void finishBundle() { + doFnInvoker.invokeFinishBundle(finishBundleContext); + } + + /** + * Outputs the given element to the specified set of consumers wrapping any exceptions. + */ + private void outputTo( + Collection>> consumers, + WindowedValue output) { + Iterator>> consumerIterator; + try { + for (ThrowingConsumer> consumer : consumers) { + consumer.accept(output); + } + } catch (Throwable t) { + throw UserCodeException.wrap(t); + } + } + + /** + * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.StartBundle @StartBundle}. + */ + private class StartBundleContext + extends DoFn.StartBundleContext + implements DoFnInvoker.ArgumentProvider { + + private StartBundleContext() { + doFn.super(); + } + + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException( + "Cannot access window outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public DoFn.StartBundleContext startBundleContext( + DoFn doFn) { + return this; + } + + @Override + public DoFn.FinishBundleContext finishBundleContext( + DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access FinishBundleContext outside of @FinishBundle method."); + } + + @Override + public DoFn.ProcessContext processContext(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access ProcessContext outside of @ProcessElement method."); + } + + @Override + public DoFn.OnTimerContext onTimerContext(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access OnTimerContext outside of @OnTimer methods."); + } + + @Override + public RestrictionTracker restrictionTracker() { + throw new UnsupportedOperationException( + "Cannot access RestrictionTracker outside of @ProcessElement method."); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException( + "Cannot access state outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException( + "Cannot access timers outside of @ProcessElement and @OnTimer methods."); + } + } + + /** + * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.ProcessElement @ProcessElement}. + */ + private class ProcessBundleContext + extends DoFn.ProcessContext + implements DoFnInvoker.ArgumentProvider { + + private ProcessBundleContext() { + doFn.super(); + } + + @Override + public BoundedWindow window() { + return currentWindow; + } + + @Override + public DoFn.StartBundleContext startBundleContext(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access StartBundleContext outside of @StartBundle method."); + } + + @Override + public DoFn.FinishBundleContext finishBundleContext(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access FinishBundleContext outside of @FinishBundle method."); + } + + @Override + public ProcessContext processContext(DoFn doFn) { + return this; + } + + @Override + public OnTimerContext onTimerContext(DoFn doFn) { + throw new UnsupportedOperationException("TODO: Add support for timers"); + } + + @Override + public RestrictionTracker restrictionTracker() { + throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn"); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException("TODO: Add support for state"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("TODO: Add support for timers"); + } + + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + @Override + public void output(OutputT output) { + outputTo(mainOutputConsumers, + WindowedValue.of( + output, + currentElement.getTimestamp(), + currentWindow, + currentElement.getPane())); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + outputTo(mainOutputConsumers, + WindowedValue.of( + output, + timestamp, + currentWindow, + currentElement.getPane())); + } + + @Override + public void output(TupleTag tag, T output) { + Collection>> consumers = (Collection) outputMap.get(tag); + if (consumers == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumers, + WindowedValue.of( + output, + currentElement.getTimestamp(), + currentWindow, + currentElement.getPane())); + } + + @Override + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + Collection>> consumers = (Collection) outputMap.get(tag); + if (consumers == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumers, + WindowedValue.of( + output, + timestamp, + currentWindow, + currentElement.getPane())); + } + + @Override + public InputT element() { + return currentElement.getValue(); + } + + @Override + public T sideInput(PCollectionView view) { + throw new UnsupportedOperationException("TODO: Support side inputs"); + } + + @Override + public Instant timestamp() { + return currentElement.getTimestamp(); + } + + @Override + public PaneInfo pane() { + return currentElement.getPane(); + } + + @Override + public void updateWatermark(Instant watermark) { + throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn"); + } + } + + /** + * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.FinishBundle @FinishBundle}. + */ + private class FinishBundleContext + extends DoFn.FinishBundleContext + implements DoFnInvoker.ArgumentProvider { + + private FinishBundleContext() { + doFn.super(); + } + + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException( + "Cannot access window outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public DoFn.StartBundleContext startBundleContext( + DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access StartBundleContext outside of @StartBundle method."); + } + + @Override + public DoFn.FinishBundleContext finishBundleContext( + DoFn doFn) { + return this; + } + + @Override + public DoFn.ProcessContext processContext(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access ProcessContext outside of @ProcessElement method."); + } + + @Override + public DoFn.OnTimerContext onTimerContext(DoFn doFn) { + throw new UnsupportedOperationException( + "Cannot access OnTimerContext outside of @OnTimer methods."); + } + + @Override + public RestrictionTracker restrictionTracker() { + throw new UnsupportedOperationException( + "Cannot access RestrictionTracker outside of @ProcessElement method."); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException( + "Cannot access state outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException( + "Cannot access timers outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public void output(OutputT output, Instant timestamp, BoundedWindow window) { + outputTo(mainOutputConsumers, + WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); + } + + @Override + public void output(TupleTag tag, T output, Instant timestamp, BoundedWindow window) { + Collection>> consumers = (Collection) outputMap.get(tag); + if (consumers == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumers, + WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); + } + } } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java index ae5cbacbec16..c4df77af8b29 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java @@ -44,6 +44,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar; +import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.sdk.coders.Coder; @@ -71,7 +72,6 @@ public class FnApiDoFnRunnerTest { WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); private static final String STRING_CODER_SPEC_ID = "999L"; private static final RunnerApi.Coder STRING_CODER_SPEC; - private static final String URN = "urn:org.apache.beam:dofn:java:0.1"; static { try { @@ -132,7 +132,7 @@ public void testCreatingAndProcessingDoFn() throws Exception { Long.parseLong(mainOutputId), TestDoFn.mainOutput, Long.parseLong(additionalOutputId), TestDoFn.additionalOutput)); RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder() - .setUrn("urn:org.apache.beam:dofn:java:0.1") + .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN) .setParameter(Any.pack(BytesValue.newBuilder() .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo))) .build())) @@ -200,7 +200,8 @@ public void testRegistration() { for (Registrar registrar : ServiceLoader.load(Registrar.class)) { if (registrar instanceof FnApiDoFnRunner.Registrar) { - assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN)); + assertThat(registrar.getPTransformRunnerFactories(), + IsMapContaining.hasKey(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)); return; } } From 2ea2c2bae8c8fe9fbae83905b3365e9cca30a39d Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 7 Jul 2017 09:48:57 -0700 Subject: [PATCH 3/4] fixup! Fix issue due to ArgumentProvider requiring additional method. --- .../apache/beam/runners/core/FnApiDoFnRunner.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java index 2b7b3528e126..b3cf3a76de98 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java @@ -266,6 +266,11 @@ public PipelineOptions getPipelineOptions() { return pipelineOptions; } + @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + @Override public BoundedWindow window() { throw new UnsupportedOperationException( @@ -374,6 +379,11 @@ public PipelineOptions getPipelineOptions() { return pipelineOptions; } + @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + @Override public void output(OutputT output) { outputTo(mainOutputConsumers, @@ -464,6 +474,11 @@ public PipelineOptions getPipelineOptions() { return pipelineOptions; } + @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + @Override public BoundedWindow window() { throw new UnsupportedOperationException( From fca51dc38b8b810c0e46018eb16f77b282fdaed7 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 7 Jul 2017 10:10:50 -0700 Subject: [PATCH 4/4] fixup! Add missing dep --- sdks/java/harness/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml index 198b592c122f..fe5c2f1c0c06 100644 --- a/sdks/java/harness/pom.xml +++ b/sdks/java/harness/pom.xml @@ -81,6 +81,11 @@ beam-runners-core-java + + org.apache.beam + beam-runners-core-construction-java + + org.apache.beam beam-runners-google-cloud-dataflow-java