From b31be38a8a2f839fb37af4963512c50424ac9f13 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 14 Jun 2016 17:51:48 -0700 Subject: [PATCH 1/2] Remove DoFnRunner from GroupAlsoByWindowsProperties DoFnRunner is a runner implementation detail, and core SDK code should instead use DoFnTester. --- .../beam/sdk/transforms/DoFnTester.java | 62 +++- .../util/GroupAlsoByWindowsProperties.java | 293 +++++++----------- 2 files changed, 173 insertions(+), 182 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 3ec749ecc496..fb9e4b6db820 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; @@ -25,8 +26,11 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.PTuple; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.state.InMemoryStateInternals; +import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; @@ -40,8 +44,10 @@ import org.joda.time.Instant; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -564,9 +570,54 @@ public PaneInfo pane() { @Override public WindowingInternals windowingInternals() { - throw new UnsupportedOperationException( - "WindowingInternals is an internal implementation detail of the Beam SDK, " - + "and should not be used by user code"); + return new WindowingInternals() { + StateInternals stateInternals = InMemoryStateInternals.forKey(new Object()); + + @Override + public StateInternals stateInternals() { + return stateInternals; + } + + @Override + public void outputWindowedValue( + OutT output, + Instant timestamp, + Collection windows, + PaneInfo pane) { + context.noteOutput(mainOutputTag, WindowedValue.of(output, timestamp, windows, pane)); + } + + @Override + public TimerInternals timerInternals() { + throw + new UnsupportedOperationException("Timer Internals are not supported in DoFnTester"); + } + + @Override + public Collection windows() { + return element.getWindows(); + } + + @Override + public PaneInfo pane() { + return element.getPane(); + } + + @Override + public void writePCollectionViewData( + TupleTag tag, Iterable> data, Coder elemCoder) + throws IOException { + throw new UnsupportedOperationException( + "WritePCollectionViewData is not supported in in the context of DoFnTester"); + } + + @Override + public T sideInput( + PCollectionView view, BoundedWindow mainInputWindow) { + throw new UnsupportedOperationException( + "SideInput from WindowingInternals is not supported in in the context of DoFnTester"); + } + }; } @Override @@ -614,11 +665,6 @@ enum State { FINISHED } - /** The name of the step of a DoFnTester. */ - static final String STEP_NAME = "stepName"; - /** The name of the enclosing DoFn PTransform for a DoFnTester. */ - static final String TRANSFORM_NAME = "transformName"; - final PipelineOptions options = PipelineOptionsFactory.create(); /** The original DoFn under test. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java index c4f3c8b16b60..f653f497cc89 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java @@ -20,12 +20,11 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertThat; -import org.apache.beam.sdk.TestUtils.KvMatcher; -import org.apache.beam.sdk.WindowMatchers; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -33,8 +32,8 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.util.common.CounterSet; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import com.google.common.collect.ImmutableList; @@ -43,18 +42,16 @@ import org.joda.time.Duration; import org.joda.time.Instant; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.List; /** * Properties of {@link GroupAlsoByWindowsDoFn}. * *

Some properties may not hold of some implementations, due to restrictions on the context - * in which the implementation is applicable. For example, - * {@link GroupAlsoByWindowsViaIteratorsDoFn} does not support merging window functions. + * in which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not + * support merging windows. */ public class GroupAlsoByWindowsProperties { @@ -80,13 +77,13 @@ public static void emptyInputEmptyOutput( WindowingStrategy windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); - List result = runGABW( + DoFnTester>>, KV> result = runGABW( gabwFactory, windowingStrategy, (K) null, // key should never be used Collections.>emptyList()); - assertThat(result.size(), equalTo(0)); + assertThat(result.peekOutputElements(), hasSize(0)); } /** @@ -100,7 +97,7 @@ public static void groupsElementsIntoFixedWindows( WindowingStrategy windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); - List>>> result = + DoFnTester>>, KV>> result = runGABW(gabwFactory, windowingStrategy, "key", WindowedValue.of( "v1", @@ -118,18 +115,17 @@ public static void groupsElementsIntoFixedWindows( Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING)); - assertThat(result.size(), equalTo(2)); + assertThat(result.peekOutputElements(), hasSize(2)); - WindowedValue>> item0 = result.get(0); + TimestampedValue>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10))); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp())); - assertThat(item0.getWindows(), contains(window(0, 10))); - WindowedValue>> item1 = result.get(1); + TimestampedValue>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20))); assertThat(item1.getValue().getValue(), contains("v3")); assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp())); - assertThat(item1.getWindows(), - contains(window(10, 20))); } /** @@ -146,7 +142,7 @@ public static void groupsElementsIntoSlidingWindowsWithMinTimestamp( SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); - List>>> result = + DoFnTester>>, KV>> result = runGABW(gabwFactory, windowingStrategy, "key", WindowedValue.of( "v1", @@ -159,25 +155,22 @@ public static void groupsElementsIntoSlidingWindowsWithMinTimestamp( Arrays.asList(window(0, 20), window(10, 30)), PaneInfo.NO_FIRING)); - assertThat(result.size(), equalTo(3)); + assertThat(result.peekOutputElements(), hasSize(3)); - WindowedValue>> item0 = result.get(0); + TimestampedValue>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10))); assertThat(item0.getValue().getValue(), contains("v1")); assertThat(item0.getTimestamp(), equalTo(new Instant(5))); - assertThat(item0.getWindows(), - contains(window(-10, 10))); - WindowedValue>> item1 = result.get(1); + TimestampedValue>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20))); assertThat(item1.getValue().getValue(), containsInAnyOrder("v1", "v2")); assertThat(item1.getTimestamp(), equalTo(new Instant(10))); - assertThat(item1.getWindows(), - contains(window(0, 20))); - WindowedValue>> item2 = result.get(2); + TimestampedValue>> item2 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30))); assertThat(item2.getValue().getValue(), contains("v2")); assertThat(item2.getTimestamp(), equalTo(new Instant(20))); - assertThat(item2.getWindows(), - contains(window(10, 30))); } /** @@ -195,7 +188,7 @@ public static void combinesElementsInSlidingWindows( WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); - List>> result = + DoFnTester>>, KV> result = runGABW(gabwFactory, windowingStrategy, "k", WindowedValue.of( 1L, @@ -213,30 +206,25 @@ public static void combinesElementsInSlidingWindows( Arrays.asList(window(0, 20), window(10, 30)), PaneInfo.NO_FIRING)); - assertThat(result.size(), equalTo(3)); - - assertThat(result, contains( - WindowMatchers.isSingleWindowedValue( - KvMatcher.isKv( - equalTo("k"), - equalTo(combineFn.apply(ImmutableList.of(1L)))), - 5, // aggregate timestamp - -10, // window start - 10), // window end - WindowMatchers.isSingleWindowedValue( - KvMatcher.isKv( - equalTo("k"), - equalTo(combineFn.apply(ImmutableList.of(1L, 2L, 4L)))), - 10, // aggregate timestamp - 0, // window start - 20), // window end - WindowMatchers.isSingleWindowedValue( - KvMatcher.isKv( - equalTo("k"), - equalTo(combineFn.apply(ImmutableList.of(2L, 4L)))), - 20, // aggregate timestamp - 10, // window start - 30))); // window end + assertThat(result.peekOutputElements(), hasSize(3)); + + TimestampedValue> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10))); + assertThat(item0.getValue().getKey(), equalTo("k")); + assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L)))); + assertThat(item0.getTimestamp(), equalTo(new Instant(5L))); + + TimestampedValue> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20))); + assertThat(item1.getValue().getKey(), equalTo("k")); + assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L, 4L)))); + assertThat(item1.getTimestamp(), equalTo(new Instant(5L))); + + TimestampedValue> item2 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30))); + assertThat(item2.getValue().getKey(), equalTo("k")); + assertThat(item2.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(2L, 4L)))); + assertThat(item2.getTimestamp(), equalTo(new Instant(15L))); } /** @@ -250,7 +238,7 @@ public static void groupsIntoOverlappingNonmergingWindows( WindowingStrategy windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); - List>>> result = + DoFnTester>>, KV>> result = runGABW(gabwFactory, windowingStrategy, "key", WindowedValue.of( "v1", @@ -268,19 +256,17 @@ public static void groupsIntoOverlappingNonmergingWindows( Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING)); - assertThat(result.size(), equalTo(2)); + assertThat(result.peekOutputElements(), hasSize(2)); - WindowedValue>> item0 = result.get(0); + TimestampedValue>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 5))); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3")); assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp())); - assertThat(item0.getWindows(), - contains(window(0, 5))); - WindowedValue>> item1 = result.get(1); + TimestampedValue>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(1, 5))); assertThat(item1.getValue().getValue(), contains("v2")); assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp())); - assertThat(item1.getWindows(), - contains(window(1, 5))); } /** @@ -293,7 +279,7 @@ public static void groupsElementsInMergedSessions( WindowingStrategy windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))); - List>>> result = + DoFnTester>>, KV>> result = runGABW(gabwFactory, windowingStrategy, "key", WindowedValue.of( "v1", @@ -311,19 +297,17 @@ public static void groupsElementsInMergedSessions( Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING)); - assertThat(result.size(), equalTo(2)); + assertThat(result.peekOutputElements(), hasSize(2)); - WindowedValue>> item0 = result.get(0); + TimestampedValue>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15))); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); - assertThat(item0.getWindows(), - contains(window(0, 15))); - WindowedValue>> item1 = result.get(1); + TimestampedValue>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25))); assertThat(item1.getValue().getValue(), contains("v3")); assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); - assertThat(item1.getWindows(), - contains(window(15, 25))); } /** @@ -338,7 +322,7 @@ public static void combinesElementsPerSession( WindowingStrategy windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))); - List>> result = + DoFnTester>>, KV> result = runGABW(gabwFactory, windowingStrategy, "k", WindowedValue.of( 1L, @@ -356,21 +340,19 @@ public static void combinesElementsPerSession( Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING)); - assertThat(result, contains( - WindowMatchers.isSingleWindowedValue( - KvMatcher.isKv( - equalTo("k"), - equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))), - window(0, 15).maxTimestamp().getMillis(), // aggregate timestamp - 0, // window start - 15), // window end - WindowMatchers.isSingleWindowedValue( - KvMatcher.isKv( - equalTo("k"), - equalTo(combineFn.apply(ImmutableList.of(4L)))), - window(15, 25).maxTimestamp().getMillis(), // aggregate timestamp - 15, // window start - 25))); // window end + assertThat(result.peekOutputElements(), hasSize(2)); + + TimestampedValue> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15))); + assertThat(item0.getValue().getKey(), equalTo("k")); + assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))); + assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); + + TimestampedValue> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25))); + assertThat(item1.getValue().getKey(), equalTo("k")); + assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L)))); + assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); } /** @@ -386,7 +368,7 @@ public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp( WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); - List>>> result = + DoFnTester>>, KV>> result = runGABW(gabwFactory, windowingStrategy, "key", WindowedValue.of( "v1", @@ -404,19 +386,17 @@ public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp( Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING)); - assertThat(result.size(), equalTo(2)); + assertThat(result.peekOutputElements(), hasSize(2)); - WindowedValue>> item0 = result.get(0); + TimestampedValue>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10))); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp())); - assertThat(item0.getTimestamp(), - equalTo(Iterables.getOnlyElement(item0.getWindows()).maxTimestamp())); - WindowedValue>> item1 = result.get(1); + TimestampedValue>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20))); assertThat(item1.getValue().getValue(), contains("v3")); assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp())); - assertThat(item1.getTimestamp(), - equalTo(Iterables.getOnlyElement(item1.getWindows()).maxTimestamp())); } /** @@ -432,7 +412,7 @@ public static void groupsElementsIntoFixedWindowsWithLatestTimestamp( WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); - List>>> result = + DoFnTester>>, KV>> result = runGABW(gabwFactory, windowingStrategy, "k", WindowedValue.of( "v1", @@ -450,16 +430,16 @@ public static void groupsElementsIntoFixedWindowsWithLatestTimestamp( Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING)); - assertThat(result.size(), equalTo(2)); + assertThat(result.peekOutputElements(), hasSize(2)); - WindowedValue>> item0 = result.get(0); + TimestampedValue>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10))); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getWindows(), contains(window(0, 10))); assertThat(item0.getTimestamp(), equalTo(new Instant(2))); - WindowedValue>> item1 = result.get(1); + TimestampedValue>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20))); assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getWindows(), contains(window(10, 20))); assertThat(item1.getTimestamp(), equalTo(new Instant(13))); } @@ -475,7 +455,7 @@ public static void groupsElementsInMergedSessionsWithEndOfWindowTimestamp( WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); - List>>> result = + DoFnTester>>, KV>> result = runGABW(gabwFactory, windowingStrategy, "k", WindowedValue.of( "v1", @@ -493,19 +473,17 @@ public static void groupsElementsInMergedSessionsWithEndOfWindowTimestamp( Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING)); - assertThat(result.size(), equalTo(2)); + assertThat(result.peekOutputElements(), hasSize(2)); - WindowedValue>> item0 = result.get(0); + TimestampedValue>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15))); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getWindows(), contains(window(0, 15))); - assertThat(item0.getTimestamp(), - equalTo(Iterables.getOnlyElement(item0.getWindows()).maxTimestamp())); + assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); - WindowedValue>> item1 = result.get(1); + TimestampedValue>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25))); assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getWindows(), contains(window(15, 25))); - assertThat(item1.getTimestamp(), - equalTo(Iterables.getOnlyElement(item1.getWindows()).maxTimestamp())); + assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); } /** @@ -520,7 +498,8 @@ public static void groupsElementsInMergedSessionsWithLatestTimestamp( WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); - List>>> result = + BoundedWindow unmergedWindow = window(15, 25); + DoFnTester>>, KV>> result = runGABW(gabwFactory, windowingStrategy, "k", WindowedValue.of( "v1", @@ -535,19 +514,20 @@ public static void groupsElementsInMergedSessionsWithLatestTimestamp( WindowedValue.of( "v3", new Instant(15), - Arrays.asList(window(15, 25)), + Arrays.asList(unmergedWindow), PaneInfo.NO_FIRING)); - assertThat(result.size(), equalTo(2)); + assertThat(result.peekOutputElements(), hasSize(2)); - WindowedValue>> item0 = result.get(0); + BoundedWindow mergedWindow = window(0, 15); + TimestampedValue>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(mergedWindow)); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getWindows(), contains(window(0, 15))); assertThat(item0.getTimestamp(), equalTo(new Instant(5))); - WindowedValue>> item1 = result.get(1); + TimestampedValue>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(unmergedWindow)); assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getWindows(), contains(window(15, 25))); assertThat(item1.getTimestamp(), equalTo(new Instant(15))); } @@ -564,8 +544,8 @@ public static void combinesElementsPerSessionWithEndOfWindowTimestamp( WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); - - List>> result = + BoundedWindow secondWindow = window(15, 25); + DoFnTester> result = runGABW(gabwFactory, windowingStrategy, "k", WindowedValue.of( 1L, @@ -580,91 +560,56 @@ public static void combinesElementsPerSessionWithEndOfWindowTimestamp( WindowedValue.of( 4L, new Instant(15), - Arrays.asList(window(15, 25)), + Arrays.asList(secondWindow), PaneInfo.NO_FIRING)); - assertThat(result.size(), equalTo(2)); + assertThat(result.peekOutputElements(), hasSize(2)); - WindowedValue> item0 = result.get(0); + BoundedWindow firstResultWindow = window(0, 15); + TimestampedValue> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(firstResultWindow)); assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))); - assertThat(item0.getWindows(), contains(window(0, 15))); - assertThat(item0.getTimestamp(), - equalTo(Iterables.getOnlyElement(item0.getWindows()).maxTimestamp())); + assertThat(item0.getTimestamp(), equalTo(firstResultWindow.maxTimestamp())); - WindowedValue> item1 = result.get(1); + TimestampedValue> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(secondWindow)); assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L)))); - assertThat(item1.getWindows(), contains(window(15, 25))); assertThat(item1.getTimestamp(), - equalTo(Iterables.getOnlyElement(item1.getWindows()).maxTimestamp())); + equalTo(secondWindow.maxTimestamp())); } @SafeVarargs private static - List>> runGABW( + DoFnTester>>, KV> runGABW( GroupAlsoByWindowsDoFnFactory gabwFactory, WindowingStrategy windowingStrategy, K key, - WindowedValue... values) { + WindowedValue... values) throws Exception { return runGABW(gabwFactory, windowingStrategy, key, Arrays.asList(values)); } private static - List>> runGABW( + DoFnTester>>, KV> runGABW( GroupAlsoByWindowsDoFnFactory gabwFactory, WindowingStrategy windowingStrategy, K key, - Collection> values) { + Collection> values) throws Exception { TupleTag> outputTag = new TupleTag<>(); DoFnRunnerBase.ListOutputManager outputManager = new DoFnRunnerBase.ListOutputManager(); - DoFnRunner>>, KV> runner = - makeRunner( - gabwFactory.forStrategy(windowingStrategy), - windowingStrategy, - outputTag, - outputManager); - - runner.startBundle(); - - if (values.size() > 0) { - runner.processElement(WindowedValue.valueInEmptyWindows( - KV.of(key, (Iterable>) values))); - } - - runner.finishBundle(); - - List>> result = outputManager.getOutput(outputTag); + DoFnTester>>, KV> tester = + DoFnTester.of(gabwFactory.forStrategy(windowingStrategy)); + tester.startBundle(); + tester.processElement(KV.>>of(key, values)); + tester.finishBundle(); // Sanity check for corruption - for (WindowedValue> elem : result) { - assertThat(elem.getValue().getKey(), equalTo(key)); + for (KV elem : tester.peekOutputElements()) { + assertThat(elem.getKey(), equalTo(key)); } - return result; - } - - private static - DoFnRunner>>, KV> - makeRunner( - GroupAlsoByWindowsDoFn fn, - WindowingStrategy windowingStrategy, - TupleTag> outputTag, - DoFnRunners.OutputManager outputManager) { - - ExecutionContext executionContext = DirectModeExecutionContext.create(); - CounterSet counters = new CounterSet(); - - return DoFnRunners.simpleRunner( - PipelineOptionsFactory.create(), - fn, - NullSideInputReader.empty(), - outputManager, - outputTag, - new ArrayList>(), - executionContext.getOrCreateStepContext("GABWStep", "GABWTransform"), - counters.getAddCounterMutator(), - windowingStrategy); + return tester; } private static BoundedWindow window(long start, long end) { From bf476e12beedd4598c388fae02b662b3d4794aaa Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 14 Jun 2016 17:52:49 -0700 Subject: [PATCH 2/2] Remove the DirectPipelineRunner from the Core SDK --- .../examples/common/DataflowExampleUtils.java | 11 +- runners/spark/pom.xml | 6 + .../translation/TransformTranslatorTest.java | 4 +- .../java/org/apache/beam/sdk/io/PubsubIO.java | 1 - .../java/org/apache/beam/sdk/io/Read.java | 44 - .../java/org/apache/beam/sdk/io/TextIO.java | 1 - .../sdk/options/DirectPipelineOptions.java | 1 - .../sdk/runners/DirectPipelineRegistrar.java | 55 - .../sdk/runners/DirectPipelineRunner.java | 1298 ----------------- .../apache/beam/sdk/transforms/Flatten.java | 32 - .../org/apache/beam/sdk/transforms/ParDo.java | 302 +--- .../org/apache/beam/sdk/transforms/View.java | 24 - .../sdk/util/DirectModeExecutionContext.java | 130 -- .../apache/beam/sdk/util/DoFnRunnerBase.java | 1 - .../org/apache/beam/sdk/PipelineTest.java | 4 +- .../BoundedReadFromUnboundedSourceTest.java | 6 - .../runners/DirectPipelineRegistrarTest.java | 71 - .../sdk/runners/DirectPipelineRunnerTest.java | 222 --- .../beam/sdk/runners/PipelineRunnerTest.java | 9 +- .../beam/sdk/transforms/CombineTest.java | 21 - .../beam/sdk/transforms/GroupByKeyTest.java | 13 +- .../apache/beam/sdk/transforms/ViewTest.java | 29 +- .../java/common/DataflowExampleUtils.java | 13 +- testing/travis/test_wordcount.sh | 4 +- 24 files changed, 40 insertions(+), 2262 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java delete mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRegistrarTest.java delete mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java index fb4f3bfb413c..a0b7319db81c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java @@ -17,6 +17,7 @@ */ package org.apache.beam.examples.common; +import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowPipelineRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -25,7 +26,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.runners.DirectPipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.IntraBundleParallelization; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; @@ -315,11 +316,13 @@ public void startInjectorIfNeeded(String inputFile) { /** * Do some runner setup: check that the DirectPipelineRunner is not used in conjunction with - * streaming, and if streaming is specified, use the DataflowPipelineRunner. Return the streaming - * flag value. + * streaming, and if streaming is specified, use the DataflowPipelineRunner. */ public void setupRunner() { - if (options.isStreaming() && options.getRunner() != DirectPipelineRunner.class) { + Class> runner = options.getRunner(); + if (options.isStreaming() + && (runner.equals(DataflowPipelineRunner.class) + || runner.equals(BlockingDataflowPipelineRunner.class))) { // In order to cancel the pipelines automatically, // {@literal DataflowPipelineRunner} is forced to be used. options.setRunner(DataflowPipelineRunner.class); diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 4110689c33ef..e7d08342f1dc 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -118,6 +118,12 @@ hamcrest-all test + + org.apache.beam + beam-runners-direct-java + 0.2.0-incubating-SNAPSHOT + test + diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java index 4ef26d308715..01f3070f73ab 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java @@ -21,12 +21,12 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; +import org.apache.beam.runners.direct.InProcessPipelineRunner; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.values.PCollection; @@ -58,7 +58,7 @@ public class TransformTranslatorTest { */ @Test public void testTextIOReadAndWriteTransforms() throws IOException { - String directOut = runPipeline(DirectPipelineRunner.class); + String directOut = runPipeline(InProcessPipelineRunner.class); String sparkOut = runPipeline(SparkPipelineRunner.class); List directOutput = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 7e24253821b8..2a5698cc15c5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -23,7 +23,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PubsubOptions; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index fb4006383011..c0440f260104 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -20,11 +20,9 @@ import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -32,9 +30,6 @@ import org.joda.time.Duration; -import java.util.ArrayList; -import java.util.List; - import javax.annotation.Nullable; /** @@ -153,45 +148,6 @@ public void populateDisplayData(DisplayData.Builder builder) { .withLabel("Read Source")) .include(source); } - - static { - registerDefaultTransformEvaluator(); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - private static void registerDefaultTransformEvaluator() { - DirectPipelineRunner.registerDefaultTransformEvaluator( - Bounded.class, - new DirectPipelineRunner.TransformEvaluator() { - @Override - public void evaluate( - Bounded transform, DirectPipelineRunner.EvaluationContext context) { - evaluateReadHelper(transform, context); - } - - private void evaluateReadHelper( - Read.Bounded transform, DirectPipelineRunner.EvaluationContext context) { - try { - List> output = new ArrayList<>(); - BoundedSource source = transform.getSource(); - try (BoundedSource.BoundedReader reader = - source.createReader(context.getPipelineOptions())) { - for (boolean available = reader.start(); - available; - available = reader.advance()) { - output.add( - DirectPipelineRunner.ValueWithMetadata.of( - WindowedValue.timestampedValueInGlobalWindow( - reader.getCurrent(), reader.getCurrentTimestamp()))); - } - } - context.setPCollectionValuesWithMetadata(context.getOutput(transform), output); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); - } } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 13cb45e2099a..bbef07291636 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.IOChannelUtils; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java index 4cdc0cae2040..c2095e3bb5fd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.options; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.values.PCollection; import com.fasterxml.jackson.annotation.JsonIgnore; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java deleted file mode 100644 index 7dd0fdd62965..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; - -/** - * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for - * the {@link DirectPipeline}. - */ -public class DirectPipelineRegistrar { - private DirectPipelineRegistrar() { } - - /** - * Register the {@link DirectPipelineRunner}. - */ - @AutoService(PipelineRunnerRegistrar.class) - public static class Runner implements PipelineRunnerRegistrar { - @Override - public Iterable>> getPipelineRunners() { - return ImmutableList.>>of(DirectPipelineRunner.class); - } - } - - /** - * Register the {@link DirectPipelineOptions}. - */ - @AutoService(PipelineOptionsRegistrar.class) - public static class Options implements PipelineOptionsRegistrar { - @Override - public Iterable> getPipelineOptions() { - return ImmutableList.>of(DirectPipelineOptions.class); - } - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java deleted file mode 100644 index 1eb25c5608fb..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java +++ /dev/null @@ -1,1298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.FileBasedSink; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Partition; -import org.apache.beam.sdk.transforms.Partition.PartitionFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.AppliedCombineFn; -import org.apache.beam.sdk.util.AssignWindows; -import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly; -import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; -import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.util.MapAggregatorValues; -import org.apache.beam.sdk.util.PerKeyCombineFnRunner; -import org.apache.beam.sdk.util.PerKeyCombineFnRunners; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.common.Counter; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TypedPValue; - -import com.google.common.base.Function; -import com.google.common.collect.Lists; - -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; - -/** - * Executes the operations in the pipeline directly, in this process, without - * any optimization. Useful for small local execution and tests. - * - *

Throws an exception from {@link #run} if execution fails. - * - *

Permissions

- * When reading from a Dataflow source or writing to a Dataflow sink using - * {@code DirectPipelineRunner}, the Cloud Platform account that you configured with the - * gcloud executable will need access to the - * corresponding source/sink. - * - *

Please see Google Cloud - * Dataflow Security and Permissions for more details. - */ -@SuppressWarnings({"rawtypes", "unchecked"}) -public class DirectPipelineRunner - extends PipelineRunner { - private static final Logger LOG = LoggerFactory.getLogger(DirectPipelineRunner.class); - - /** - * A source of random data, which can be seeded if determinism is desired. - */ - private Random rand; - - /** - * A map from PTransform class to the corresponding - * TransformEvaluator to use to evaluate that transform. - * - *

A static map that contains system-wide defaults. - */ - private static Map defaultTransformEvaluators = - new HashMap<>(); - - /** - * A map from PTransform class to the corresponding - * TransformEvaluator to use to evaluate that transform. - * - *

An instance map that contains bindings for this DirectPipelineRunner. - * Bindings in this map override those in the default map. - */ - private Map localTransformEvaluators = - new HashMap<>(); - - /** - * Records that instances of the specified PTransform class - * should be evaluated by default by the corresponding - * TransformEvaluator. - */ - public static > - void registerDefaultTransformEvaluator( - Class transformClass, - TransformEvaluator transformEvaluator) { - if (defaultTransformEvaluators.put(transformClass, transformEvaluator) - != null) { - throw new IllegalArgumentException( - "defining multiple evaluators for " + transformClass); - } - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Records that instances of the specified PTransform class - * should be evaluated by the corresponding TransformEvaluator. - * Overrides any bindings specified by - * {@link #registerDefaultTransformEvaluator}. - */ - public > - void registerTransformEvaluator( - Class transformClass, - TransformEvaluator transformEvaluator) { - if (localTransformEvaluators.put(transformClass, transformEvaluator) - != null) { - throw new IllegalArgumentException( - "defining multiple evaluators for " + transformClass); - } - } - - /** - * Returns the TransformEvaluator to use for instances of the - * specified PTransform class, or null if none registered. - */ - public > - TransformEvaluator getTransformEvaluator(Class transformClass) { - TransformEvaluator transformEvaluator = - localTransformEvaluators.get(transformClass); - if (transformEvaluator == null) { - transformEvaluator = defaultTransformEvaluators.get(transformClass); - } - return transformEvaluator; - } - - /** - * Constructs a DirectPipelineRunner from the given options. - */ - public static DirectPipelineRunner fromOptions(PipelineOptions options) { - DirectPipelineOptions directOptions = - PipelineOptionsValidator.validate(DirectPipelineOptions.class, options); - LOG.debug("Creating DirectPipelineRunner"); - return new DirectPipelineRunner(directOptions); - } - - /** - * Enable runtime testing to verify that all functions and {@link Coder} - * instances can be serialized. - * - *

Enabled by default. - * - *

This method modifies the {@code DirectPipelineRunner} instance and - * returns itself. - */ - public DirectPipelineRunner withSerializabilityTesting(boolean enable) { - this.testSerializability = enable; - return this; - } - - /** - * Enable runtime testing to verify that all values can be encoded. - * - *

Enabled by default. - * - *

This method modifies the {@code DirectPipelineRunner} instance and - * returns itself. - */ - public DirectPipelineRunner withEncodabilityTesting(boolean enable) { - this.testEncodability = enable; - return this; - } - - /** - * Enable runtime testing to verify that functions do not depend on order - * of the elements. - * - *

This is accomplished by randomizing the order of elements. - * - *

Enabled by default. - * - *

This method modifies the {@code DirectPipelineRunner} instance and - * returns itself. - */ - public DirectPipelineRunner withUnorderednessTesting(boolean enable) { - this.testUnorderedness = enable; - return this; - } - - @Override - public OutputT apply( - PTransform transform, InputT input) { - if (transform instanceof Combine.GroupedValues) { - return (OutputT) applyTestCombine((Combine.GroupedValues) transform, (PCollection) input); - } else if (transform instanceof TextIO.Write.Bound) { - return (OutputT) applyTextIOWrite((TextIO.Write.Bound) transform, (PCollection) input); - } else if (transform instanceof AvroIO.Write.Bound) { - return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, (PCollection) input); - } else if (transform instanceof GroupByKey) { - return (OutputT) - ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform)); - } else if (transform instanceof Window.Bound) { - return (OutputT) - ((PCollection) input).apply(new AssignWindowsAndSetStrategy((Window.Bound) transform)); - } else { - return super.apply(transform, input); - } - } - - private PCollection> applyTestCombine( - Combine.GroupedValues transform, - PCollection>> input) { - - PCollection> output = input - .apply(ParDo.of(TestCombineDoFn.create(transform, input, testSerializability, rand)) - .withSideInputs(transform.getSideInputs())); - - try { - output.setCoder(transform.getDefaultOutputCoder(input)); - } catch (CannotProvideCoderException exc) { - // let coder inference occur later, if it can - } - return output; - } - - private static class ElementProcessingOrderPartitionFn implements PartitionFn { - private int elementNumber; - @Override - public int partitionFor(T elem, int numPartitions) { - return elementNumber++ % numPartitions; - } - } - - /** - * Applies TextIO.Write honoring user requested sharding controls (i.e. withNumShards) - * by applying a partition function based upon the number of shards the user requested. - */ - private static class DirectTextIOWrite extends PTransform, PDone> { - private final TextIO.Write.Bound transform; - - private DirectTextIOWrite(TextIO.Write.Bound transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection input) { - checkState(transform.getNumShards() > 1, - "DirectTextIOWrite is expected to only be used when sharding controls are required."); - - // Evenly distribute all the elements across the partitions. - PCollectionList partitionedElements = - input.apply(Partition.of(transform.getNumShards(), - new ElementProcessingOrderPartitionFn())); - - // For each input PCollection partition, create a write transform that represents - // one of the specific shards. - for (int i = 0; i < transform.getNumShards(); ++i) { - /* - * This logic mirrors the file naming strategy within - * {@link FileBasedSink#generateDestinationFilenames()} - */ - String outputFilename = IOChannelUtils.constructName( - transform.getFilenamePrefix(), - transform.getShardNameTemplate(), - getFileExtension(transform.getFilenameSuffix()), - i, - transform.getNumShards()); - - String transformName = String.format("%s(Shard:%s)", transform.getName(), i); - partitionedElements.get(i).apply(transformName, - transform.withNumShards(1).withShardNameTemplate("").withSuffix("").to(outputFilename)); - } - return PDone.in(input.getPipeline()); - } - } - - /** - * Returns the file extension to be used. If the user did not request a file - * extension then this method returns the empty string. Otherwise this method - * adds a {@code "."} to the beginning of the users extension if one is not present. - * - *

This is copied from {@link FileBasedSink} to not expose it. - */ - private static String getFileExtension(String usersExtension) { - if (usersExtension == null || usersExtension.isEmpty()) { - return ""; - } - if (usersExtension.startsWith(".")) { - return usersExtension; - } - return "." + usersExtension; - } - - /** - * Apply the override for TextIO.Write.Bound if the user requested sharding controls - * greater than one. - */ - private PDone applyTextIOWrite(TextIO.Write.Bound transform, PCollection input) { - if (transform.getNumShards() <= 1) { - // By default, the DirectPipelineRunner outputs to only 1 shard. Since the user never - // requested sharding controls greater than 1, we default to outputting to 1 file. - return super.apply(transform.withNumShards(1), input); - } - return input.apply(new DirectTextIOWrite<>(transform)); - } - - /** - * Applies AvroIO.Write honoring user requested sharding controls (i.e. withNumShards) - * by applying a partition function based upon the number of shards the user requested. - */ - private static class DirectAvroIOWrite extends PTransform, PDone> { - private final AvroIO.Write.Bound transform; - - private DirectAvroIOWrite(AvroIO.Write.Bound transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection input) { - checkState(transform.getNumShards() > 1, - "DirectAvroIOWrite is expected to only be used when sharding controls are required."); - - // Evenly distribute all the elements across the partitions. - PCollectionList partitionedElements = - input.apply(Partition.of(transform.getNumShards(), - new ElementProcessingOrderPartitionFn())); - - // For each input PCollection partition, create a write transform that represents - // one of the specific shards. - for (int i = 0; i < transform.getNumShards(); ++i) { - /* - * This logic mirrors the file naming strategy within - * {@link FileBasedSink#generateDestinationFilenames()} - */ - String outputFilename = IOChannelUtils.constructName( - transform.getFilenamePrefix(), - transform.getShardNameTemplate(), - getFileExtension(transform.getFilenameSuffix()), - i, - transform.getNumShards()); - - String transformName = String.format("%s(Shard:%s)", transform.getName(), i); - partitionedElements.get(i).apply(transformName, - transform.withNumShards(1).withShardNameTemplate("").withSuffix("").to(outputFilename)); - } - return PDone.in(input.getPipeline()); - } - } - - private static class AssignWindowsAndSetStrategy - extends PTransform, PCollection> { - - private final Window.Bound wrapped; - - public AssignWindowsAndSetStrategy(Window.Bound wrapped) { - this.wrapped = wrapped; - } - - @Override - public PCollection apply(PCollection input) { - WindowingStrategy outputStrategy = - wrapped.getOutputStrategyInternal(input.getWindowingStrategy()); - - WindowFn windowFn = - (WindowFn) outputStrategy.getWindowFn(); - - // If the Window.Bound transform only changed parts other than the WindowFn, then - // we skip AssignWindows even though it should be harmless in a perfect world. - // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly - // crash if another GBK is performed without explicitly setting the WindowFn. So we skip - // AssignWindows in this case. - if (wrapped.getWindowFn() == null) { - return input.apply("Identity", ParDo.of(new IdentityFn())) - .setWindowingStrategyInternal(outputStrategy); - } else { - return input - .apply("AssignWindows", new AssignWindows(windowFn)) - .setWindowingStrategyInternal(outputStrategy); - } - } - } - - private static class IdentityFn extends DoFn { - @Override - public void processElement(ProcessContext c) { - c.output(c.element()); - } - } - - /** - * Apply the override for AvroIO.Write.Bound if the user requested sharding controls - * greater than one. - */ - private PDone applyAvroIOWrite(AvroIO.Write.Bound transform, PCollection input) { - if (transform.getNumShards() <= 1) { - // By default, the DirectPipelineRunner outputs to only 1 shard. Since the user never - // requested sharding controls greater than 1, we default to outputting to 1 file. - return super.apply(transform.withNumShards(1), input); - } - return input.apply(new DirectAvroIOWrite<>(transform)); - } - - /** - * The implementation may split the {@link KeyedCombineFn} into ADD, MERGE and EXTRACT phases ( - * see {@code org.apache.beam.sdk.runners.worker.CombineValuesFn}). In order to emulate - * this for the {@link DirectPipelineRunner} and provide an experience closer to the service, go - * through heavy serializability checks for the equivalent of the results of the ADD phase, but - * after the {@link org.apache.beam.sdk.transforms.GroupByKey} shuffle, and the MERGE - * phase. Doing these checks ensure that not only is the accumulator coder serializable, but - * the accumulator coder can actually serialize the data in question. - */ - public static class TestCombineDoFn - extends DoFn>, KV> { - private final PerKeyCombineFnRunner fnRunner; - private final Coder accumCoder; - private final boolean testSerializability; - private final Random rand; - - public static TestCombineDoFn create( - Combine.GroupedValues transform, - PCollection>> input, - boolean testSerializability, - Random rand) { - - AppliedCombineFn fn = transform.getAppliedFn( - input.getPipeline().getCoderRegistry(), input.getCoder(), input.getWindowingStrategy()); - - return new TestCombineDoFn( - PerKeyCombineFnRunners.create(fn.getFn()), - fn.getAccumulatorCoder(), - testSerializability, - rand); - } - - public TestCombineDoFn( - PerKeyCombineFnRunner fnRunner, - Coder accumCoder, - boolean testSerializability, - Random rand) { - this.fnRunner = fnRunner; - this.accumCoder = accumCoder; - this.testSerializability = testSerializability; - this.rand = rand; - - // Check that this does not crash, specifically to catch anonymous CustomCoder subclasses. - this.accumCoder.getEncodingId(); - } - - @Override - public void processElement(ProcessContext c) throws Exception { - K key = c.element().getKey(); - Iterable values = c.element().getValue(); - List groupedPostShuffle = - ensureSerializableByCoder(ListCoder.of(accumCoder), - addInputsRandomly(fnRunner, key, values, rand, c), - "After addInputs of KeyedCombineFn " + fnRunner.fn().toString()); - AccumT merged = - ensureSerializableByCoder(accumCoder, - fnRunner.mergeAccumulators(key, groupedPostShuffle, c), - "After mergeAccumulators of KeyedCombineFn " + fnRunner.fn().toString()); - // Note: The serializability of KV is ensured by the - // runner itself, since it's a transform output. - c.output(KV.of(key, fnRunner.extractOutput(key, merged, c))); - } - - /** - * Create a random list of accumulators from the given list of values. - * - *

Visible for testing purposes only. - */ - public static List addInputsRandomly( - PerKeyCombineFnRunner fnRunner, - K key, - Iterable values, - Random random, - DoFn.ProcessContext c) { - List out = new ArrayList(); - int i = 0; - AccumT accumulator = fnRunner.createAccumulator(key, c); - boolean hasInput = false; - - for (InputT value : values) { - accumulator = fnRunner.addInput(key, accumulator, value, c); - hasInput = true; - - // For each index i, flip a 1/2^i weighted coin for whether to - // create a new accumulator after index i is added, i.e. [0] - // is guaranteed, [1] is an even 1/2, [2] is 1/4, etc. The - // goal is to partition the inputs into accumulators, and make - // the accumulators potentially lumpy. Also compact about half - // of the accumulators. - if (i == 0 || random.nextInt(1 << Math.min(i, 30)) == 0) { - if (i % 2 == 0) { - accumulator = fnRunner.compact(key, accumulator, c); - } - out.add(accumulator); - accumulator = fnRunner.createAccumulator(key, c); - hasInput = false; - } - i++; - } - if (hasInput) { - out.add(accumulator); - } - - Collections.shuffle(out, random); - return out; - } - - public T ensureSerializableByCoder( - Coder coder, T value, String errorContext) { - if (testSerializability) { - return SerializableUtils.ensureSerializableByCoder( - coder, value, errorContext); - } - return value; - } - } - - @Override - public EvaluationResults run(Pipeline pipeline) { - LOG.info("Executing pipeline using the DirectPipelineRunner."); - - Evaluator evaluator = new Evaluator(rand); - evaluator.run(pipeline); - - // Log all counter values for debugging purposes. - for (Counter counter : evaluator.getCounters()) { - LOG.info("Final aggregator value: {}", counter); - } - - LOG.info("Pipeline execution complete."); - - return evaluator; - } - - /** - * An evaluator of a PTransform. - */ - public interface TransformEvaluator { - public void evaluate(TransformT transform, - EvaluationContext context); - } - - /** - * The interface provided to registered callbacks for interacting - * with the {@code DirectPipelineRunner}, including reading and writing the - * values of {@link PCollection}s and {@link PCollectionView}s. - */ - public interface EvaluationResults extends PipelineResult { - /** - * Retrieves the value of the given PCollection. - * Throws an exception if the PCollection's value hasn't already been set. - */ - List getPCollection(PCollection pc); - - /** - * Retrieves the windowed value of the given PCollection. - * Throws an exception if the PCollection's value hasn't already been set. - */ - List> getPCollectionWindowedValues(PCollection pc); - - /** - * Retrieves the values of each PCollection in the given - * PCollectionList. Throws an exception if the PCollectionList's - * value hasn't already been set. - */ - List> getPCollectionList(PCollectionList pcs); - - /** - * Retrieves the values indicated by the given {@link PCollectionView}. - * Note that within the {@link org.apache.beam.sdk.transforms.DoFn.Context} - * implementation a {@link PCollectionView} should convert from this representation to a - * suitable side input value. - */ - Iterable> getPCollectionView(PCollectionView view); - } - - /** - * An immutable (value, timestamp) pair, along with other metadata necessary - * for the implementation of {@code DirectPipelineRunner}. - */ - public static class ValueWithMetadata { - /** - * Returns a new {@code ValueWithMetadata} with the {@code WindowedValue}. - * Key is null. - */ - public static ValueWithMetadata of(WindowedValue windowedValue) { - return new ValueWithMetadata<>(windowedValue, null); - } - - /** - * Returns a new {@code ValueWithMetadata} with the implicit key associated - * with this value set. The key is the last key grouped by in the chain of - * productions that produced this element. - * These keys are used internally by {@link DirectPipelineRunner} for keeping - * persisted state separate across keys. - */ - public ValueWithMetadata withKey(Object key) { - return new ValueWithMetadata<>(windowedValue, key); - } - - /** - * Returns a new {@code ValueWithMetadata} that is a copy of this one, but with - * a different value. - */ - public ValueWithMetadata withValue(T value) { - return new ValueWithMetadata(windowedValue.withValue(value), getKey()); - } - - /** - * Returns the {@code WindowedValue} associated with this element. - */ - public WindowedValue getWindowedValue() { - return windowedValue; - } - - /** - * Returns the value associated with this element. - * - * @see #withValue - */ - public V getValue() { - return windowedValue.getValue(); - } - - /** - * Returns the timestamp associated with this element. - */ - public Instant getTimestamp() { - return windowedValue.getTimestamp(); - } - - /** - * Returns the collection of windows this element has been placed into. May - * be null if the {@code PCollection} this element is in has not yet been - * windowed. - * - * @see #getWindows() - */ - public Collection getWindows() { - return windowedValue.getWindows(); - } - - - /** - * Returns the key associated with this element. May be null if the - * {@code PCollection} this element is in is not keyed. - * - * @see #withKey - */ - public Object getKey() { - return key; - } - - //////////////////////////////////////////////////////////////////////////// - - private final Object key; - private final WindowedValue windowedValue; - - private ValueWithMetadata(WindowedValue windowedValue, - Object key) { - this.windowedValue = windowedValue; - this.key = key; - } - } - - /** - * The interface provided to registered callbacks for interacting - * with the {@code DirectPipelineRunner}, including reading and writing the - * values of {@link PCollection}s and {@link PCollectionView}s. - */ - public interface EvaluationContext extends EvaluationResults { - /** - * Returns the configured pipeline options. - */ - DirectPipelineOptions getPipelineOptions(); - - /** - * Returns the input of the currently being processed transform. - */ - InputT getInput(PTransform transform); - - /** - * Returns the output of the currently being processed transform. - */ - OutputT getOutput(PTransform transform); - - /** - * Sets the value of the given PCollection, where each element also has a timestamp - * and collection of windows. - * Throws an exception if the PCollection's value has already been set. - */ - void setPCollectionValuesWithMetadata( - PCollection pc, List> elements); - - /** - * Sets the value of the given PCollection, where each element also has a timestamp - * and collection of windows. - * Throws an exception if the PCollection's value has already been set. - */ - void setPCollectionWindowedValue(PCollection pc, List> elements); - - /** - * Shorthand for setting the value of a PCollection where the elements do not have - * timestamps or windows. - * Throws an exception if the PCollection's value has already been set. - */ - void setPCollection(PCollection pc, List elements); - - /** - * Retrieves the value of the given PCollection, along with element metadata - * such as timestamps and windows. - * Throws an exception if the PCollection's value hasn't already been set. - */ - List> getPCollectionValuesWithMetadata(PCollection pc); - - /** - * Sets the value associated with the given {@link PCollectionView}. - * Throws an exception if the {@link PCollectionView}'s value has already been set. - */ - void setPCollectionView( - PCollectionView pc, - Iterable> value); - - /** - * Ensures that the element is encodable and decodable using the - * TypePValue's coder, by encoding it and decoding it, and - * returning the result. - */ - T ensureElementEncodable(TypedPValue pvalue, T element); - - /** - * If the evaluation context is testing unorderedness, - * randomly permutes the order of the elements, in a - * copy if !inPlaceAllowed, and returns the permuted list, - * otherwise returns the argument unchanged. - */ - List randomizeIfUnordered(List elements, - boolean inPlaceAllowed); - - /** - * If the evaluation context is testing serializability, ensures - * that the argument function is serializable and deserializable - * by encoding it and then decoding it, and returning the result. - * Otherwise returns the argument unchanged. - */ - FunctionT ensureSerializable(FunctionT fn); - - /** - * If the evaluation context is testing serializability, ensures - * that the argument Coder is serializable and deserializable - * by encoding it and then decoding it, and returning the result. - * Otherwise returns the argument unchanged. - */ - Coder ensureCoderSerializable(Coder coder); - - /** - * If the evaluation context is testing serializability, ensures - * that the given data is serializable and deserializable with the - * given Coder by encoding it and then decoding it, and returning - * the result. Otherwise returns the argument unchanged. - * - *

Error context is prefixed to any thrown exceptions. - */ - T ensureSerializableByCoder(Coder coder, - T data, String errorContext); - - /** - * Returns a mutator, which can be used to add additional counters to - * this EvaluationContext. - */ - CounterSet.AddCounterMutator getAddCounterMutator(); - - /** - * Gets the step name for this transform. - */ - public String getStepName(PTransform transform); - } - - - ///////////////////////////////////////////////////////////////////////////// - - class Evaluator extends PipelineVisitor.Defaults implements EvaluationContext { - /** - * A map from PTransform to the step name of that transform. This is the internal name for the - * transform (e.g. "s2"). - */ - private final Map, String> stepNames = new HashMap<>(); - private final Map store = new HashMap<>(); - private final CounterSet counters = new CounterSet(); - private AppliedPTransform currentTransform; - - private Map, Collection>> aggregatorSteps = null; - - /** - * A map from PTransform to the full name of that transform. This is the user name of the - * transform (e.g. "RemoveDuplicates/Combine/GroupByKey"). - */ - private final Map, String> fullNames = new HashMap<>(); - - private Random rand; - - public Evaluator() { - this(new Random()); - } - - public Evaluator(Random rand) { - this.rand = rand; - } - - public void run(Pipeline pipeline) { - pipeline.traverseTopologically(this); - aggregatorSteps = new AggregatorPipelineExtractor(pipeline).getAggregatorSteps(); - } - - @Override - public DirectPipelineOptions getPipelineOptions() { - return options; - } - - @Override - public InputT getInput(PTransform transform) { - checkArgument(currentTransform != null && currentTransform.getTransform() == transform, - "can only be called with current transform"); - return (InputT) currentTransform.getInput(); - } - - @Override - public OutputT getOutput(PTransform transform) { - checkArgument(currentTransform != null && currentTransform.getTransform() == transform, - "can only be called with current transform"); - return (OutputT) currentTransform.getOutput(); - } - - @Override - public void visitPrimitiveTransform(TransformTreeNode node) { - PTransform transform = node.getTransform(); - fullNames.put(transform, node.getFullName()); - TransformEvaluator evaluator = - getTransformEvaluator(transform.getClass()); - if (evaluator == null) { - throw new IllegalStateException( - "no evaluator registered for " + transform); - } - LOG.debug("Evaluating {}", transform); - currentTransform = AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform); - evaluator.evaluate(transform, this); - currentTransform = null; - } - - @Override - public void visitValue(PValue value, TransformTreeNode producer) { - LOG.debug("Checking evaluation of {}", value); - if (value.getProducingTransformInternal() == null) { - throw new RuntimeException( - "internal error: expecting a PValue to have a producingTransform"); - } - if (!producer.isCompositeNode()) { - // Verify that primitive transform outputs are already computed. - getPValue(value); - } - } - - /** - * Sets the value of the given PValue. - * Throws an exception if the PValue's value has already been set. - */ - void setPValue(PValue pvalue, Object contents) { - if (store.containsKey(pvalue)) { - throw new IllegalStateException( - "internal error: setting the value of " + pvalue - + " more than once"); - } - store.put(pvalue, contents); - } - - /** - * Retrieves the value of the given PValue. - * Throws an exception if the PValue's value hasn't already been set. - */ - Object getPValue(PValue pvalue) { - if (!store.containsKey(pvalue)) { - throw new IllegalStateException( - "internal error: getting the value of " + pvalue - + " before it has been computed"); - } - return store.get(pvalue); - } - - /** - * Convert a list of T to a list of {@code ValueWithMetadata}, with a timestamp of 0 - * and null windows. - */ - List> toValueWithMetadata(List values) { - List> result = new ArrayList<>(values.size()); - for (T value : values) { - result.add(ValueWithMetadata.of(WindowedValue.valueInGlobalWindow(value))); - } - return result; - } - - /** - * Convert a list of {@code WindowedValue} to a list of {@code ValueWithMetadata}. - */ - List> toValueWithMetadataFromWindowedValue( - List> values) { - List> result = new ArrayList<>(values.size()); - for (WindowedValue value : values) { - result.add(ValueWithMetadata.of(value)); - } - return result; - } - - @Override - public void setPCollection(PCollection pc, List elements) { - setPCollectionValuesWithMetadata(pc, toValueWithMetadata(elements)); - } - - @Override - public void setPCollectionWindowedValue( - PCollection pc, List> elements) { - setPCollectionValuesWithMetadata(pc, toValueWithMetadataFromWindowedValue(elements)); - } - - @Override - public void setPCollectionValuesWithMetadata( - PCollection pc, List> elements) { - LOG.debug("Setting {} = {}", pc, elements); - ensurePCollectionEncodable(pc, elements); - setPValue(pc, elements); - } - - @Override - public void setPCollectionView( - PCollectionView view, - Iterable> value) { - LOG.debug("Setting {} = {}", view, value); - setPValue(view, value); - } - - /** - * Retrieves the value of the given {@link PCollection}. - * Throws an exception if the {@link PCollection}'s value hasn't already been set. - */ - @Override - public List getPCollection(PCollection pc) { - List result = new ArrayList<>(); - for (ValueWithMetadata elem : getPCollectionValuesWithMetadata(pc)) { - result.add(elem.getValue()); - } - return result; - } - - @Override - public List> getPCollectionWindowedValues(PCollection pc) { - return Lists.transform( - getPCollectionValuesWithMetadata(pc), - new Function, WindowedValue>() { - @Override - public WindowedValue apply(ValueWithMetadata input) { - return input.getWindowedValue(); - }}); - } - - @Override - public List> getPCollectionValuesWithMetadata(PCollection pc) { - List> elements = (List>) getPValue(pc); - elements = randomizeIfUnordered(elements, false /* not inPlaceAllowed */); - LOG.debug("Getting {} = {}", pc, elements); - return elements; - } - - @Override - public List> getPCollectionList(PCollectionList pcs) { - List> elementsList = new ArrayList<>(); - for (PCollection pc : pcs.getAll()) { - elementsList.add(getPCollection(pc)); - } - return elementsList; - } - - /** - * Retrieves the value indicated by the given {@link PCollectionView}. - * Note that within the {@link DoFnContext} a {@link PCollectionView} - * converts from this representation to a suitable side input value. - */ - @Override - public Iterable> getPCollectionView(PCollectionView view) { - Iterable> value = (Iterable>) getPValue(view); - LOG.debug("Getting {} = {}", view, value); - return value; - } - - /** - * If {@code testEncodability}, ensures that the {@link PCollection}'s coder and elements are - * encodable and decodable by encoding them and decoding them, and returning the result. - * Otherwise returns the argument elements. - */ - List> ensurePCollectionEncodable( - PCollection pc, List> elements) { - ensureCoderSerializable(pc.getCoder()); - if (!testEncodability) { - return elements; - } - List> elementsCopy = new ArrayList<>(elements.size()); - for (ValueWithMetadata element : elements) { - elementsCopy.add( - element.withValue(ensureElementEncodable(pc, element.getValue()))); - } - return elementsCopy; - } - - @Override - public T ensureElementEncodable(TypedPValue pvalue, T element) { - return ensureSerializableByCoder( - pvalue.getCoder(), element, "Within " + pvalue.toString()); - } - - @Override - public List randomizeIfUnordered(List elements, - boolean inPlaceAllowed) { - if (!testUnorderedness) { - return elements; - } - List elementsCopy = new ArrayList<>(elements); - Collections.shuffle(elementsCopy, rand); - return elementsCopy; - } - - @Override - public FunctionT ensureSerializable(FunctionT fn) { - if (!testSerializability) { - return fn; - } - return SerializableUtils.ensureSerializable(fn); - } - - @Override - public Coder ensureCoderSerializable(Coder coder) { - if (testSerializability) { - SerializableUtils.ensureSerializable(coder); - } - return coder; - } - - @Override - public T ensureSerializableByCoder( - Coder coder, T value, String errorContext) { - if (testSerializability) { - return SerializableUtils.ensureSerializableByCoder( - coder, value, errorContext); - } - return value; - } - - @Override - public CounterSet.AddCounterMutator getAddCounterMutator() { - return counters.getAddCounterMutator(); - } - - @Override - public String getStepName(PTransform transform) { - String stepName = stepNames.get(transform); - if (stepName == null) { - stepName = "s" + (stepNames.size() + 1); - stepNames.put(transform, stepName); - } - return stepName; - } - - /** - * Returns the CounterSet generated during evaluation, which includes - * user-defined Aggregators and may include system-defined counters. - */ - public CounterSet getCounters() { - return counters; - } - - /** - * Returns JobState.DONE in all situations. The Evaluator is not returned - * until the pipeline has been traversed, so it will either be returned - * after a successful run or the run call will terminate abnormally. - */ - @Override - public State getState() { - return State.DONE; - } - - @Override - public AggregatorValues getAggregatorValues(Aggregator aggregator) { - Map stepValues = new HashMap<>(); - for (PTransform step : aggregatorSteps.get(aggregator)) { - String stepName = String.format("user-%s-%s", stepNames.get(step), aggregator.getName()); - String fullName = fullNames.get(step); - Counter counter = counters.getExistingCounter(stepName); - if (counter == null) { - throw new IllegalArgumentException( - "Aggregator " + aggregator + " is not used in this pipeline"); - } - stepValues.put(fullName, (T) counter.getAggregate()); - } - return new MapAggregatorValues<>(stepValues); - } - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * The key by which GBK groups inputs - elements are grouped by the encoded form of the key, - * but the original key may be accessed as well. - */ - private static class GroupingKey { - private K key; - private byte[] encodedKey; - - public GroupingKey(K key, byte[] encodedKey) { - this.key = key; - this.encodedKey = encodedKey; - } - - public K getKey() { - return key; - } - - @Override - public boolean equals(Object o) { - if (o instanceof GroupingKey) { - GroupingKey that = (GroupingKey) o; - return Arrays.equals(this.encodedKey, that.encodedKey); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Arrays.hashCode(encodedKey); - } - } - - private final DirectPipelineOptions options; - private boolean testSerializability; - private boolean testEncodability; - private boolean testUnorderedness; - - /** Returns a new DirectPipelineRunner. */ - private DirectPipelineRunner(DirectPipelineOptions options) { - this.options = options; - // (Re-)register standard IO factories. Clobbers any prior credentials. - IOChannelUtils.registerStandardIOFactories(options); - long randomSeed; - if (options.getDirectPipelineRunnerRandomSeed() != null) { - randomSeed = options.getDirectPipelineRunnerRandomSeed(); - } else { - randomSeed = new Random().nextLong(); - } - - LOG.debug("DirectPipelineRunner using random seed {}.", randomSeed); - rand = new Random(randomSeed); - - testSerializability = options.isTestSerializability(); - testEncodability = options.isTestEncodability(); - testUnorderedness = options.isTestUnorderedness(); - } - - /** - * Get the options used in this {@link Pipeline}. - */ - public DirectPipelineOptions getPipelineOptions() { - return options; - } - - @Override - public String toString() { - return "DirectPipelineRunner#" + hashCode(); - } - - public static void evaluateGroupByKeyOnly( - GroupByKeyOnly transform, - EvaluationContext context) { - PCollection> input = context.getInput(transform); - - List>> inputElems = - context.getPCollectionValuesWithMetadata(input); - - Coder keyCoder = GroupByKey.getKeyCoder(input.getCoder()); - - Map, List> groupingMap = new HashMap<>(); - - for (ValueWithMetadata> elem : inputElems) { - K key = elem.getValue().getKey(); - V value = elem.getValue().getValue(); - byte[] encodedKey; - try { - encodedKey = encodeToByteArray(keyCoder, key); - } catch (CoderException exn) { - // TODO: Put in better element printing: - // truncate if too long. - throw new IllegalArgumentException( - "unable to encode key " + key + " of input to " + transform - + " using " + keyCoder, - exn); - } - GroupingKey groupingKey = - new GroupingKey<>(key, encodedKey); - List values = groupingMap.get(groupingKey); - if (values == null) { - values = new ArrayList(); - groupingMap.put(groupingKey, values); - } - values.add(value); - } - - List>>> outputElems = - new ArrayList<>(); - for (Map.Entry, List> entry : groupingMap.entrySet()) { - GroupingKey groupingKey = entry.getKey(); - K key = groupingKey.getKey(); - List values = entry.getValue(); - values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */); - outputElems.add(ValueWithMetadata - .of(WindowedValue.valueInEmptyWindows(KV.>of(key, values))) - .withKey(key)); - } - - context.setPCollectionValuesWithMetadata(context.getOutput(transform), - outputElems); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - public - static void registerGroupByKeyOnly() { - registerDefaultTransformEvaluator( - GroupByKeyOnly.class, - new TransformEvaluator() { - @Override - public void evaluate( - GroupByKeyOnly transform, - EvaluationContext context) { - evaluateGroupByKeyOnly(transform, context); - } - }); - } - - static { - registerGroupByKeyOnly(); - } - -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java index 7c6fed3a22e3..93917f3dbc88 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java @@ -20,16 +20,12 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableLikeCoder; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionList; -import java.util.ArrayList; -import java.util.List; - /** * {@code Flatten} takes multiple {@code PCollection}s bundled * into a {@code PCollectionList} and returns a single @@ -189,32 +185,4 @@ public void processElement(ProcessContext c) { .setCoder(elemCoder); } } - - ///////////////////////////////////////////////////////////////////////////// - - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - FlattenPCollectionList.class, - new DirectPipelineRunner.TransformEvaluator() { - @Override - public void evaluate( - FlattenPCollectionList transform, - DirectPipelineRunner.EvaluationContext context) { - evaluateHelper(transform, context); - } - }); - } - - private static void evaluateHelper( - FlattenPCollectionList transform, - DirectPipelineRunner.EvaluationContext context) { - List> outputElems = new ArrayList<>(); - PCollectionList inputs = context.getInput(transform); - - for (PCollection input : inputs.getAll()) { - outputElems.addAll(context.getPCollectionValuesWithMetadata(input)); - } - - context.setPCollectionValuesWithMetadata(context.getOutput(transform), outputElems); - } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 511f0d8efa75..cb7d372e7de1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -21,27 +21,12 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.DirectModeExecutionContext; -import org.apache.beam.sdk.util.DirectSideInputReader; -import org.apache.beam.sdk.util.DoFnRunner; -import org.apache.beam.sdk.util.DoFnRunnerBase; -import org.apache.beam.sdk.util.DoFnRunners; -import org.apache.beam.sdk.util.IllegalMutationException; -import org.apache.beam.sdk.util.MutationDetector; -import org.apache.beam.sdk.util.MutationDetectors; -import org.apache.beam.sdk.util.PTuple; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.StringUtils; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; @@ -50,16 +35,10 @@ import org.apache.beam.sdk.values.TypedPValue; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; import java.io.Serializable; import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - -import javax.annotation.Nullable; /** * {@link ParDo} is the core element-wise transform in Google Cloud @@ -84,7 +63,7 @@ *

Conceptually, when a {@link ParDo} transform is executed, the * elements of the input {@link PCollection} are first divided up * into some number of "bundles". These are farmed off to distributed - * worker machines (or run locally, if using the {@link DirectPipelineRunner}). + * worker machines (or run locally, if using the {@code DirectRunner}). * For each bundle of input elements processing proceeds as follows: * *

    @@ -1072,288 +1051,11 @@ public List> getSideInputs() { } } - ///////////////////////////////////////////////////////////////////////////// - - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - Bound.class, - new DirectPipelineRunner.TransformEvaluator() { - @Override - public void evaluate( - Bound transform, - DirectPipelineRunner.EvaluationContext context) { - evaluateSingleHelper(transform, context); - } - }); - } - - private static void evaluateSingleHelper( - Bound transform, - DirectPipelineRunner.EvaluationContext context) { - TupleTag mainOutputTag = new TupleTag<>("out"); - - DirectModeExecutionContext executionContext = DirectModeExecutionContext.create(); - - PCollectionTuple outputs = PCollectionTuple.of(mainOutputTag, context.getOutput(transform)); - - evaluateHelper( - transform.fn, - context.getStepName(transform), - context.getInput(transform), - transform.sideInputs, - mainOutputTag, - Collections.>emptyList(), - outputs, - context, - executionContext); - - context.setPCollectionValuesWithMetadata( - context.getOutput(transform), - executionContext.getOutput(mainOutputTag)); - } - - ///////////////////////////////////////////////////////////////////////////// - - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - BoundMulti.class, - new DirectPipelineRunner.TransformEvaluator() { - @Override - public void evaluate( - BoundMulti transform, - DirectPipelineRunner.EvaluationContext context) { - evaluateMultiHelper(transform, context); - } - }); - } - - private static void evaluateMultiHelper( - BoundMulti transform, - DirectPipelineRunner.EvaluationContext context) { - - DirectModeExecutionContext executionContext = DirectModeExecutionContext.create(); - - evaluateHelper( - transform.fn, - context.getStepName(transform), - context.getInput(transform), - transform.sideInputs, - transform.mainOutputTag, - transform.sideOutputTags.getAll(), - context.getOutput(transform), - context, - executionContext); - - for (Map.Entry, PCollection> entry - : context.getOutput(transform).getAll().entrySet()) { - @SuppressWarnings("unchecked") - TupleTag tag = (TupleTag) entry.getKey(); - @SuppressWarnings("unchecked") - PCollection pc = (PCollection) entry.getValue(); - - context.setPCollectionValuesWithMetadata( - pc, - (tag == transform.mainOutputTag - ? executionContext.getOutput(tag) - : executionContext.getSideOutput(tag))); - } - } - - /** - * Evaluates a single-output or multi-output {@link ParDo} directly. - * - *

    This evaluation method is intended for use in testing scenarios; it is designed for clarity - * and correctness-checking, not speed. - * - *

    Of particular note, this performs best-effort checking that inputs and outputs are not - * mutated in violation of the requirements upon a {@link DoFn}. - */ - private static void evaluateHelper( - DoFn doFn, - String stepName, - PCollection input, - List> sideInputs, - TupleTag mainOutputTag, - List> sideOutputTags, - PCollectionTuple outputs, - DirectPipelineRunner.EvaluationContext context, - DirectModeExecutionContext executionContext) { - // TODO: Run multiple shards? - DoFn fn = context.ensureSerializable(doFn); - - SideInputReader sideInputReader = makeSideInputReader(context, sideInputs); - - // When evaluating via the DirectPipelineRunner, this output manager checks each output for - // illegal mutations when the next output comes along. We then verify again after finishBundle() - // The common case we expect this to catch is a user mutating an input in order to repeatedly - // emit "variations". - ImmutabilityCheckingOutputManager outputManager = - new ImmutabilityCheckingOutputManager<>( - fn.getClass().getSimpleName(), - new DoFnRunnerBase.ListOutputManager(), - outputs); - - DoFnRunner fnRunner = - DoFnRunners.createDefault( - context.getPipelineOptions(), - fn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - executionContext.getOrCreateStepContext(stepName, stepName), - context.getAddCounterMutator(), - input.getWindowingStrategy()); - - fnRunner.startBundle(); - - for (DirectPipelineRunner.ValueWithMetadata elem - : context.getPCollectionValuesWithMetadata(input)) { - if (elem.getValue() instanceof KV) { - // In case the DoFn needs keyed state, set the implicit keys to the keys - // in the input elements. - @SuppressWarnings("unchecked") - KV kvElem = (KV) elem.getValue(); - executionContext.setKey(kvElem.getKey()); - } else { - executionContext.setKey(elem.getKey()); - } - - // We check the input for mutations only through the call span of processElement. - // This will miss some cases, but the check is ad hoc and best effort. The common case - // is that the input is mutated to be used for output. - try { - MutationDetector inputMutationDetector = MutationDetectors.forValueWithCoder( - elem.getWindowedValue().getValue(), input.getCoder()); - @SuppressWarnings("unchecked") - WindowedValue windowedElem = ((WindowedValue) elem.getWindowedValue()); - fnRunner.processElement(windowedElem); - inputMutationDetector.verifyUnmodified(); - } catch (CoderException e) { - throw UserCodeException.wrap(e); - } catch (IllegalMutationException exn) { - throw new IllegalMutationException( - String.format("DoFn %s mutated input value %s of class %s (new value was %s)." - + " Input values must not be mutated in any way.", - fn.getClass().getSimpleName(), - exn.getSavedValue(), exn.getSavedValue().getClass(), exn.getNewValue()), - exn.getSavedValue(), - exn.getNewValue(), - exn); - } - } - - // Note that the input could have been retained and mutated prior to this final output, - // but for now it degrades readability too much to be worth trying to catch that particular - // corner case. - fnRunner.finishBundle(); - outputManager.verifyLatestOutputsUnmodified(); - } - - private static SideInputReader makeSideInputReader( - DirectPipelineRunner.EvaluationContext context, List> sideInputs) { - PTuple sideInputValues = PTuple.empty(); - for (PCollectionView view : sideInputs) { - sideInputValues = sideInputValues.and( - view.getTagInternal(), - context.getPCollectionView(view)); - } - return DirectSideInputReader.of(sideInputValues); - } - private static void populateDisplayData( DisplayData.Builder builder, DoFn fn, Class fnClass) { builder .include(fn) .add(DisplayData.item("fn", fnClass) - .withLabel("Transform Function")); - } - - /** - * A {@code DoFnRunner.OutputManager} that provides facilities for checking output values for - * illegal mutations. - * - *

    When used via the try-with-resources pattern, it is guaranteed that every value passed - * to {@link #output} will have been checked for illegal mutation. - */ - private static class ImmutabilityCheckingOutputManager - implements DoFnRunners.OutputManager, AutoCloseable { - - private final DoFnRunners.OutputManager underlyingOutputManager; - private final ConcurrentMap, MutationDetector> mutationDetectorForTag; - private final PCollectionTuple outputs; - private String doFnName; - - public ImmutabilityCheckingOutputManager( - String doFnName, - DoFnRunners.OutputManager underlyingOutputManager, - PCollectionTuple outputs) { - this.doFnName = doFnName; - this.underlyingOutputManager = underlyingOutputManager; - this.outputs = outputs; - this.mutationDetectorForTag = Maps.newConcurrentMap(); - } - - @Override - public void output(TupleTag tag, WindowedValue output) { - - // Skip verifying undeclared outputs, since we don't have coders for them. - if (outputs.has(tag)) { - try { - MutationDetector newDetector = - MutationDetectors.forValueWithCoder( - output.getValue(), outputs.get(tag).getCoder()); - MutationDetector priorDetector = mutationDetectorForTag.put(tag, newDetector); - verifyOutputUnmodified(priorDetector); - } catch (CoderException e) { - throw UserCodeException.wrap(e); - } - } - - // Actually perform the output. - underlyingOutputManager.output(tag, output); - } - - /** - * Throws {@link IllegalMutationException} if the prior output for any tag has been mutated - * since being output. - */ - public void verifyLatestOutputsUnmodified() { - for (MutationDetector detector : mutationDetectorForTag.values()) { - verifyOutputUnmodified(detector); - } - } - - /** - * Adapts the error message from the provided {@code detector}. - * - *

    The {@code detector} may be null, in which case no check is performed. This is merely - * to consolidate null checking to this method. - */ - private void verifyOutputUnmodified(@Nullable MutationDetector detector) { - if (detector == null) { - return; - } - - try { - detector.verifyUnmodified(); - } catch (IllegalMutationException exn) { - throw new IllegalMutationException(String.format( - "DoFn %s mutated value %s after it was output (new value was %s)." - + " Values must not be mutated in any way after being output.", - doFnName, exn.getSavedValue(), exn.getNewValue()), - exn.getSavedValue(), exn.getNewValue(), - exn); - } - } - - /** - * When used in a {@code try}-with-resources block, verifies all of the latest outputs upon - * {@link #close()}. - */ - @Override - public void close() { - verifyLatestOutputsUnmodified(); - } + .withLabel("Transform Function")); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index 3df915b7f37c..7a97c13d336c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -17,10 +17,8 @@ */ package org.apache.beam.sdk.transforms; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.util.PCollectionViews; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -445,27 +443,5 @@ public PCollectionView getView() { public PCollectionView apply(PCollection input) { return view; } - - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - CreatePCollectionView.class, - new DirectPipelineRunner.TransformEvaluator() { - @SuppressWarnings("rawtypes") - @Override - public void evaluate( - CreatePCollectionView transform, - DirectPipelineRunner.EvaluationContext context) { - evaluateTyped(transform, context); - } - - private void evaluateTyped( - CreatePCollectionView transform, - DirectPipelineRunner.EvaluationContext context) { - List> elems = - context.getPCollectionWindowedValues(context.getInput(transform)); - context.setPCollectionView(context.getOutput(transform), elems); - } - }); - } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java deleted file mode 100644 index 85e36dd6d14b..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.beam.sdk.runners.DirectPipelineRunner.ValueWithMetadata; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.values.TupleTag; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import java.util.List; -import java.util.Map; - -/** - * {@link ExecutionContext} for use in direct mode. - */ -public class DirectModeExecutionContext - extends BaseExecutionContext { - - private Object key; - private List> output = Lists.newArrayList(); - private Map, List>> sideOutputs = Maps.newHashMap(); - - protected DirectModeExecutionContext() {} - - public static DirectModeExecutionContext create() { - return new DirectModeExecutionContext(); - } - - @Override - protected StepContext createStepContext(String stepName, String transformName) { - return new StepContext(this, stepName, transformName); - } - - public Object getKey() { - return key; - } - - public void setKey(Object newKey) { - // The direct mode runner may reorder elements, so we need to keep - // around the state used for each key. - for (ExecutionContext.StepContext stepContext : getAllStepContexts()) { - ((StepContext) stepContext).switchKey(newKey); - } - key = newKey; - } - - @Override - public void noteOutput(WindowedValue outputElem) { - output.add(ValueWithMetadata.of(outputElem).withKey(getKey())); - } - - @Override - public void noteSideOutput(TupleTag tag, WindowedValue outputElem) { - List> output = sideOutputs.get(tag); - if (output == null) { - output = Lists.newArrayList(); - sideOutputs.put(tag, output); - } - output.add(ValueWithMetadata.of(outputElem).withKey(getKey())); - } - - public List> getOutput(@SuppressWarnings("unused") TupleTag tag) { - @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes - List> typedOutput = (List) output; - return typedOutput; - } - - public List> getSideOutput(TupleTag tag) { - if (sideOutputs.containsKey(tag)) { - @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes - List> typedOutput = (List) sideOutputs.get(tag); - return typedOutput; - } else { - return Lists.newArrayList(); - } - } - - /** - * {@link ExecutionContext.StepContext} used in direct mode. - */ - public static class StepContext extends BaseExecutionContext.StepContext { - - /** A map from each key to the state associated with it. */ - private final Map> stateInternals = Maps.newHashMap(); - private InMemoryStateInternals currentStateInternals = null; - - private StepContext(ExecutionContext executionContext, String stepName, String transformName) { - super(executionContext, stepName, transformName); - switchKey(null); - } - - public void switchKey(Object newKey) { - currentStateInternals = stateInternals.get(newKey); - if (currentStateInternals == null) { - currentStateInternals = InMemoryStateInternals.forKey(newKey); - stateInternals.put(newKey, currentStateInternals); - } - } - - @Override - public StateInternals stateInternals() { - return checkNotNull(currentStateInternals); - } - - @Override - public TimerInternals timerInternals() { - throw new UnsupportedOperationException("Direct mode cannot return timerInternals"); - } - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java index 75861fe399fc..58b10a747b55 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java @@ -20,7 +20,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java index ea708e560725..8abfb05c6725 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java @@ -28,8 +28,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -157,7 +157,7 @@ public void processElement(DoFn.ProcessContext c) { @Test public void testToString() { PipelineOptions options = PipelineOptionsFactory.as(PipelineOptions.class); - options.setRunner(DirectPipelineRunner.class); + options.setRunner(CrashingRunner.class); Pipeline pipeline = Pipeline.create(options); assertEquals("Pipeline#" + pipeline.hashCode(), pipeline.toString()); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java index 774968f85ef9..cabfc215773b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java @@ -18,14 +18,12 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; - import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.dataflow.TestCountingSource; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -123,10 +121,6 @@ public Void apply(Iterable> input) { private void test(boolean dedup, boolean timeBound) throws Exception { Pipeline p = TestPipeline.create(); - if (p.getOptions().getRunner() == DirectPipelineRunner.class) { - finalizeTracker = new ArrayList<>(); - TestCountingSource.setFinalizeTracker(finalizeTracker); - } TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting(); if (dedup) { source = source.withDedup(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRegistrarTest.java deleted file mode 100644 index 92c48351ab3d..000000000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRegistrarTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.ServiceLoader; - -/** Tests for {@link DirectPipelineRegistrar}. */ -@RunWith(JUnit4.class) -public class DirectPipelineRegistrarTest { - @Test - public void testCorrectOptionsAreReturned() { - assertEquals(ImmutableList.of(DirectPipelineOptions.class), - new DirectPipelineRegistrar.Options().getPipelineOptions()); - } - - @Test - public void testCorrectRunnersAreReturned() { - assertEquals(ImmutableList.of(DirectPipelineRunner.class), - new DirectPipelineRegistrar.Runner().getPipelineRunners()); - } - - @Test - public void testServiceLoaderForOptions() { - for (PipelineOptionsRegistrar registrar : - Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) { - if (registrar instanceof DirectPipelineRegistrar.Options) { - return; - } - } - fail("Expected to find " + DirectPipelineRegistrar.Options.class); - } - - @Test - public void testServiceLoaderForRunner() { - for (PipelineRunnerRegistrar registrar : - Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) { - if (registrar instanceof DirectPipelineRegistrar.Runner) { - return; - } - } - fail("Expected to find " + DirectPipelineRegistrar.Runner.class); - } -} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java deleted file mode 100644 index edf699672d9b..000000000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.isA; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.ShardNameTemplate; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.IOChannelUtils; - -import com.google.common.collect.Iterables; -import com.google.common.io.Files; - -import org.apache.avro.file.DataFileReader; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.File; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; - -/** Tests for {@link DirectPipelineRunner}. */ -@RunWith(JUnit4.class) -public class DirectPipelineRunnerTest implements Serializable { - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule public ExpectedException expectedException = ExpectedException.none(); - - @Test - public void testToString() { - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(DirectPipelineRunner.class); - DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options); - assertEquals("DirectPipelineRunner#" + runner.hashCode(), - runner.toString()); - } - - /** A {@link Coder} that fails during decoding. */ - private static class CrashingCoder extends AtomicCoder { - @Override - public void encode(T value, OutputStream stream, Context context) throws CoderException { - throw new CoderException("Called CrashingCoder.encode"); - } - - @Override - public T decode( - InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException { - throw new CoderException("Called CrashingCoder.decode"); - } - } - - /** A {@link DoFn} that outputs {@code 'hello'}. */ - private static class HelloDoFn extends DoFn { - @Override - public void processElement(DoFn.ProcessContext c) throws Exception { - c.output("hello"); - } - } - - @Test - public void testCoderException() { - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(DirectPipelineRunner.class); - Pipeline p = Pipeline.create(options); - - p.apply("CreateTestData", Create.of(42)) - .apply("CrashDuringCoding", ParDo.of(new HelloDoFn())) - .setCoder(new CrashingCoder()); - - expectedException.expect(RuntimeException.class); - expectedException.expectCause(isA(CoderException.class)); - p.run(); - } - - @Test - public void testDirectPipelineOptions() { - DirectPipelineOptions options = PipelineOptionsFactory.create().as(DirectPipelineOptions.class); - assertNull(options.getDirectPipelineRunnerRandomSeed()); - } - - @Test - public void testTextIOWriteWithDefaultShardingStrategy() throws Exception { - String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "output"); - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(DirectPipelineRunner.class); - Pipeline p = Pipeline.create(options); - String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" }; - p.apply(Create.of(expectedElements)) - .apply(TextIO.Write.to(prefix).withSuffix("txt")); - p.run(); - - String filename = - IOChannelUtils.constructName(prefix, ShardNameTemplate.INDEX_OF_MAX, ".txt", 0, 1); - List fileContents = - Files.readLines(new File(filename), StandardCharsets.UTF_8); - // Ensure that each file got at least one record - assertFalse(fileContents.isEmpty()); - - assertThat(fileContents, containsInAnyOrder(expectedElements)); - } - - @Test - public void testTextIOWriteWithLimitedNumberOfShards() throws Exception { - final int numShards = 3; - String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "shardedOutput"); - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(DirectPipelineRunner.class); - Pipeline p = Pipeline.create(options); - String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" }; - p.apply(Create.of(expectedElements)) - .apply(TextIO.Write.to(prefix).withNumShards(numShards).withSuffix("txt")); - p.run(); - - List allContents = new ArrayList<>(); - for (int i = 0; i < numShards; ++i) { - String shardFileName = - IOChannelUtils.constructName(prefix, ShardNameTemplate.INDEX_OF_MAX, ".txt", i, 3); - List shardFileContents = - Files.readLines(new File(shardFileName), StandardCharsets.UTF_8); - - // Ensure that each file got at least one record - assertFalse(shardFileContents.isEmpty()); - - allContents.addAll(shardFileContents); - } - - assertThat(allContents, containsInAnyOrder(expectedElements)); - } - - @Test - public void testAvroIOWriteWithDefaultShardingStrategy() throws Exception { - String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "output"); - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(DirectPipelineRunner.class); - Pipeline p = Pipeline.create(options); - String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" }; - p.apply(Create.of(expectedElements)) - .apply(AvroIO.Write.withSchema(String.class).to(prefix).withSuffix(".avro")); - p.run(); - - String filename = - IOChannelUtils.constructName(prefix, ShardNameTemplate.INDEX_OF_MAX, ".avro", 0, 1); - List fileContents = new ArrayList<>(); - Iterables.addAll(fileContents, DataFileReader.openReader( - new File(filename), AvroCoder.of(String.class).createDatumReader())); - - // Ensure that each file got at least one record - assertFalse(fileContents.isEmpty()); - - assertThat(fileContents, containsInAnyOrder(expectedElements)); - } - - @Test - public void testAvroIOWriteWithLimitedNumberOfShards() throws Exception { - final int numShards = 3; - String prefix = IOChannelUtils.resolve(Files.createTempDir().toString(), "shardedOutput"); - PipelineOptions options = PipelineOptionsFactory.create(); - options.setRunner(DirectPipelineRunner.class); - Pipeline p = Pipeline.create(options); - String[] expectedElements = new String[]{ "a", "b", "c", "d", "e", "f", "g", "h", "i" }; - p.apply(Create.of(expectedElements)) - .apply(AvroIO.Write.withSchema(String.class).to(prefix) - .withNumShards(numShards).withSuffix(".avro")); - p.run(); - - List allContents = new ArrayList<>(); - for (int i = 0; i < numShards; ++i) { - String shardFileName = - IOChannelUtils.constructName(prefix, ShardNameTemplate.INDEX_OF_MAX, ".avro", i, 3); - List shardFileContents = new ArrayList<>(); - Iterables.addAll(shardFileContents, DataFileReader.openReader( - new File(shardFileName), AvroCoder.of(String.class).createDatumReader())); - - // Ensure that each file got at least one record - assertFalse(shardFileContents.isEmpty()); - - allContents.addAll(shardFileContents); - } - - assertThat(allContents, containsInAnyOrder(expectedElements)); - } -} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java index 9313439d26f5..5d2e69d796bf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.DirectPipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.TestCredential; @@ -53,10 +54,10 @@ public void testLongName() { options.setAppName("test"); options.setProject("test"); options.setGcsUtil(mockGcsUtil); - options.setRunner(DirectPipelineRunner.class); + options.setRunner(CrashingRunner.class); options.setGcpCredential(new TestCredential()); PipelineRunner runner = PipelineRunner.fromOptions(options); - assertTrue(runner instanceof DirectPipelineRunner); + assertTrue(runner instanceof CrashingRunner); } @Test @@ -66,10 +67,10 @@ public void testShortName() { options.setAppName("test"); options.setProject("test"); options.setGcsUtil(mockGcsUtil); - options.setRunner(DirectPipelineRunner.class); + options.setRunner(CrashingRunner.class); options.setGcpCredential(new TestCredential()); PipelineRunner runner = PipelineRunner.fromOptions(options); - assertTrue(runner instanceof DirectPipelineRunner); + assertTrue(runner instanceof CrashingRunner); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index a0b508c0a749..b0ca70b9fd96 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -39,7 +39,6 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -56,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; -import org.apache.beam.sdk.util.PerKeyCombineFnRunners; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.KV; @@ -87,7 +85,6 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Random; import java.util.Set; /** @@ -516,24 +513,6 @@ public void testCombinerNames() { assertThat(sum.getName(), Matchers.startsWith("Sum")); } - @Test - public void testAddInputsRandomly() { - TestCounter counter = new TestCounter(); - Combine.KeyedCombineFn< - String, Integer, TestCounter.Counter, Iterable> fn = - counter.asKeyedFn(); - - List accums = DirectPipelineRunner.TestCombineDoFn.addInputsRandomly( - PerKeyCombineFnRunners.create(fn), "bob", Arrays.asList(NUMBERS), new Random(42), - processContext); - - assertThat(accums, Matchers.contains( - counter.new Counter(3, 2, 0, 0), - counter.new Counter(131, 5, 0, 0), - counter.new Counter(8, 2, 0, 0), - counter.new Counter(1, 1, 0, 0))); - } - private static final SerializableFunction hotKeyFanout = new SerializableFunction() { @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 4ce025d909a7..299def752e9f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -30,9 +30,6 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -244,15 +241,9 @@ public void testWindowFnInvalidation() { Duration.standardMinutes(1))))); } - private Pipeline createTestDirectRunner() { - DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class); - options.setRunner(DirectPipelineRunner.class); - return Pipeline.create(options); - } - @Test public void testInvalidWindowsDirect() { - Pipeline p = createTestDirectRunner(); + Pipeline p = TestPipeline.create(); List> ungroupedPairs = Arrays.asList(); @@ -297,7 +288,7 @@ public void testRemerge() { @Test public void testGroupByKeyDirectUnbounded() { - Pipeline p = createTestDirectRunner(); + Pipeline p = TestPipeline.create(); PCollection> input = p.apply( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index 18d39d7a9fea..5e6e6a3d1b14 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -33,9 +33,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.options.DirectPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -1335,12 +1332,6 @@ public void testViewGetName() { assertEquals("View.AsMultimap", View.asMultimap().getName()); } - private Pipeline createTestDirectRunner() { - DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class); - options.setRunner(DirectPipelineRunner.class); - return Pipeline.create(options); - } - private void testViewUnbounded( Pipeline pipeline, PTransform>, ? extends PCollectionView> view) { @@ -1378,51 +1369,51 @@ private void testViewNonmerging( @Test public void testViewUnboundedAsSingletonDirect() { - testViewUnbounded(createTestDirectRunner(), View.>asSingleton()); + testViewUnbounded(TestPipeline.create(), View.>asSingleton()); } @Test public void testViewUnboundedAsIterableDirect() { - testViewUnbounded(createTestDirectRunner(), View.>asIterable()); + testViewUnbounded(TestPipeline.create(), View.>asIterable()); } @Test public void testViewUnboundedAsListDirect() { - testViewUnbounded(createTestDirectRunner(), View.>asList()); + testViewUnbounded(TestPipeline.create(), View.>asList()); } @Test public void testViewUnboundedAsMapDirect() { - testViewUnbounded(createTestDirectRunner(), View.asMap()); + testViewUnbounded(TestPipeline.create(), View.asMap()); } @Test public void testViewUnboundedAsMultimapDirect() { - testViewUnbounded(createTestDirectRunner(), View.asMultimap()); + testViewUnbounded(TestPipeline.create(), View.asMultimap()); } @Test public void testViewNonmergingAsSingletonDirect() { - testViewNonmerging(createTestDirectRunner(), View.>asSingleton()); + testViewNonmerging(TestPipeline.create(), View.>asSingleton()); } @Test public void testViewNonmergingAsIterableDirect() { - testViewNonmerging(createTestDirectRunner(), View.>asIterable()); + testViewNonmerging(TestPipeline.create(), View.>asIterable()); } @Test public void testViewNonmergingAsListDirect() { - testViewNonmerging(createTestDirectRunner(), View.>asList()); + testViewNonmerging(TestPipeline.create(), View.>asList()); } @Test public void testViewNonmergingAsMapDirect() { - testViewNonmerging(createTestDirectRunner(), View.asMap()); + testViewNonmerging(TestPipeline.create(), View.asMap()); } @Test public void testViewNonmergingAsMultimapDirect() { - testViewNonmerging(createTestDirectRunner(), View.asMultimap()); + testViewNonmerging(TestPipeline.create(), View.asMultimap()); } } diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java index 4914d4c0a47e..76df4d49f200 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java @@ -17,6 +17,7 @@ */ package ${package}.common; +import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowPipelineRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -39,7 +40,6 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.IntraBundleParallelization; import org.apache.beam.sdk.util.Transport; import com.google.common.collect.Lists; @@ -250,17 +250,8 @@ public void startInjectorIfNeeded(String inputFile) { } } - /** - * Do some runner setup: check that the DirectPipelineRunner is not used in conjunction with - * streaming, and if streaming is specified, use the DataflowPipelineRunner. Return the streaming - * flag value. - */ public void setupRunner() { - if (options.isStreaming()) { - if (options.getRunner() == DirectPipelineRunner.class) { - throw new IllegalArgumentException( - "Processing of unbounded input sources is not supported with the DirectPipelineRunner."); - } + if (options.isStreaming() && options.getRunner().equals(BlockingDataflowPipelineRunner.class)) { // In order to cancel the pipelines automatically, // {@literal DataflowPipelineRunner} is forced to be used. options.setRunner(DataflowPipelineRunner.class); diff --git a/testing/travis/test_wordcount.sh b/testing/travis/test_wordcount.sh index 40e272418696..b00b0d642434 100755 --- a/testing/travis/test_wordcount.sh +++ b/testing/travis/test_wordcount.sh @@ -70,7 +70,7 @@ function run_via_mvn { local outfile_prefix="$(get_outfile_prefix "$name")" || exit 2 local cmd='mvn exec:java -f pom.xml -pl examples/java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ - -Dexec.args="--runner=DirectPipelineRunner --inputFile='"$input"' --output='"$outfile_prefix"'"' + -Dexec.args="--runner=InProcessPipelineRunner --inputFile='"$input"' --output='"$outfile_prefix"'"' echo "$name: Running $cmd" >&2 sh -c "$cmd" check_result_hash "$name" "$outfile_prefix" "$expected_hash" @@ -84,7 +84,7 @@ function run_bundled { local outfile_prefix="$(get_outfile_prefix "$name")" || exit 2 local cmd='java -cp '"$JAR_FILE"' \ org.apache.beam.examples.WordCount \ - --runner=DirectPipelineRunner \ + --runner=InProcessPipelineRunner \ --inputFile='"'$input'"' \ --output='"$outfile_prefix" echo "$name: Running $cmd" >&2