diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java index d74cd56735b6..8727cb55676f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java @@ -25,9 +25,9 @@ import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.io.Write.Bound; import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Values; @@ -102,7 +102,7 @@ public PDone apply(PCollection input) { } @VisibleForTesting - static class KeyBasedOnCountFn extends OldDoFn> { + static class KeyBasedOnCountFn extends DoFn> { @VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3; @@ -116,7 +116,7 @@ static class KeyBasedOnCountFn extends OldDoFn> { this.randomExtraShards = extraShards; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { if (maxShards == 0) { maxShards = calculateShards(c.sideInput(numRecords)); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java index 1c9b5a6da6d6..e8f2a7e7891a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java @@ -26,8 +26,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -60,9 +60,9 @@ public void getViewsReturnsViews() { p.apply("listCreate", Create.of("foo", "bar")) .apply( ParDo.of( - new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) + new DoFn() { + @ProcessElement + public void processElement(DoFn.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -107,9 +107,9 @@ public void getValueToConsumersSucceeds() { PCollection transformed = created.apply( ParDo.of( - new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) + new DoFn() { + @ProcessElement + public void processElement(DoFn.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -138,9 +138,9 @@ public void getUnfinalizedPValuesContainsDanglingOutputs() { PCollection transformed = created.apply( ParDo.of( - new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) + new DoFn() { + @ProcessElement + public void processElement(DoFn.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -155,9 +155,9 @@ public void getUnfinalizedPValuesEmpty() { p.apply(Create.of("1", "2", "3")) .apply( ParDo.of( - new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) + new DoFn() { + @ProcessElement + public void processElement(DoFn.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -180,9 +180,9 @@ public void getStepNamesContainsAllTransforms() { PCollection transformed = created.apply( ParDo.of( - new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) + new DoFn() { + @ProcessElement + public void processElement(DoFn.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 4027d259f79d..34a5469fac38 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -59,7 +59,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -221,8 +220,8 @@ public KV apply(KV input) { @Test public void transformDisplayDataExceptionShouldFail() { - OldDoFn brokenDoFn = new OldDoFn() { - @Override + DoFn brokenDoFn = new DoFn() { + @ProcessElement public void processElement(ProcessContext c) throws Exception {} @Override @@ -242,7 +241,7 @@ public void populateDisplayData(DisplayData.Builder builder) { } /** - * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the + * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the * {@link DirectRunner}. */ @Test @@ -251,8 +250,9 @@ public void testMutatingOutputThenOutputDoFnError() throws Exception { pipeline .apply(Create.of(42)) - .apply(ParDo.of(new OldDoFn>() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c) { List outputList = Arrays.asList(1, 2, 3, 4); c.output(outputList); outputList.set(0, 37); @@ -267,7 +267,7 @@ public void testMutatingOutputThenOutputDoFnError() throws Exception { } /** - * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the + * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the * {@link DirectRunner}. */ @Test @@ -276,8 +276,9 @@ public void testMutatingOutputThenTerminateDoFnError() throws Exception { pipeline .apply(Create.of(42)) - .apply(ParDo.of(new OldDoFn>() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c) { List outputList = Arrays.asList(1, 2, 3, 4); c.output(outputList); outputList.set(0, 37); @@ -291,7 +292,7 @@ public void testMutatingOutputThenTerminateDoFnError() throws Exception { } /** - * Tests that a {@link OldDoFn} that mutates an output with a bad equals() still fails + * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails * in the {@link DirectRunner}. */ @Test @@ -300,8 +301,9 @@ public void testMutatingOutputCoderDoFnError() throws Exception { pipeline .apply(Create.of(42)) - .apply(ParDo.of(new OldDoFn() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { byte[] outputArray = new byte[]{0x1, 0x2, 0x3}; c.output(outputArray); outputArray[0] = 0xa; @@ -316,7 +318,7 @@ public void testMutatingOutputCoderDoFnError() throws Exception { } /** - * Tests that a {@link OldDoFn} that mutates its input with a good equals() fails in the + * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the * {@link DirectRunner}. */ @Test @@ -326,8 +328,9 @@ public void testMutatingInputDoFnError() throws Exception { pipeline .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)) .withCoder(ListCoder.of(VarIntCoder.of()))) - .apply(ParDo.of(new OldDoFn, Integer>() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) { List inputList = c.element(); inputList.set(0, 37); c.output(12); @@ -341,7 +344,7 @@ public void testMutatingInputDoFnError() throws Exception { } /** - * Tests that a {@link OldDoFn} that mutates an input with a bad equals() still fails + * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails * in the {@link DirectRunner}. */ @Test @@ -350,8 +353,9 @@ public void testMutatingInputCoderDoFnError() throws Exception { pipeline .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6})) - .apply(ParDo.of(new OldDoFn() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { byte[] inputArray = c.element(); inputArray[0] = 0xa; c.output(13); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index d445944fd6d2..ea441256d560 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -179,9 +179,9 @@ public void mutationAfterAddCreateBundleThrows() { intermediate.commit(Instant.now()); } - private static class IdentityDoFn extends OldDoFn { - @Override - public void processElement(OldDoFn.ProcessContext c) throws Exception { + private static class IdentityDoFn extends DoFn { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java index 812d7d562070..a7277fef90a7 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.WindowedValue; @@ -57,9 +57,9 @@ public void setup() { p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes())) .apply( ParDo.of( - new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.element()[0] = 'b'; } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java index ee6b2b4ed2e8..cf65936f77f6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java @@ -31,9 +31,9 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Keys; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -177,9 +177,9 @@ public PCollection apply(PCollection input) { } } - private static class IdentityFn extends OldDoFn { - @Override - public void processElement(OldDoFn.ProcessContext c) throws Exception { + private static class IdentityFn extends DoFn { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 1a742f0a006b..6d00aa19de03 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -37,7 +37,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -168,7 +168,7 @@ private ParDoEvaluator createEvaluator( ImmutableMap., PCollection>of(mainOutputTag, output)); } - private static class RecorderFn extends OldDoFn { + private static class RecorderFn extends DoFn { private Collection processed; private final PCollectionView view; @@ -177,8 +177,8 @@ public RecorderFn(PCollectionView view) { this.view = view; } - @Override - public void processElement(OldDoFn.ProcessContext c) throws Exception { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { processed.add(c.element()); c.output(c.element() + c.sideInput(view)); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java index 8b0070b5a0fd..cc83323488d0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java @@ -32,7 +32,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -41,11 +41,16 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.WatermarkHoldState; @@ -81,8 +86,8 @@ public void testParDoMultiInMemoryTransformEvaluator() throws Exception { BoundMulti> pardo = ParDo.of( - new OldDoFn>() { - @Override + new DoFn>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of(c.element(), c.element().length())); c.sideOutput(elementTag, c.element()); @@ -170,8 +175,8 @@ public void testParDoMultiUndeclaredSideOutput() throws Exception { BoundMulti> pardo = ParDo.of( - new OldDoFn>() { - @Override + new DoFn>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of(c.element(), c.element().length())); c.sideOutput(elementTag, c.element()); @@ -258,20 +263,17 @@ public void finishBundleWithStatePutsStateInResult() throws Exception { StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); BoundMulti> pardo = ParDo.of( - new OldDoFn>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals() - .stateInternals() - .state(StateNamespaces.global(), watermarkTag) - .add(new Instant(20202L + c.element().length())); - c.windowingInternals() - .stateInternals() - .state( - StateNamespaces.window( - GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE), - bagTag) - .add(c.element()); + new DoFn>() { + private static final String STATE_ID = "my-state-id"; + + @StateId(STATE_ID) + private final StateSpec> bagSpec = + StateSpecs.bag(StringUtf8Coder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(STATE_ID) BagState bagState) { + bagState.add(c.element()); } }) .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); @@ -362,34 +364,25 @@ public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception BoundMulti> pardo = ParDo.of( - new OldDoFn>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals().stateInternals(); - c.windowingInternals() - .timerInternals() - .setTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME)); - c.windowingInternals() - .timerInternals() - .deleteTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0), - new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); + new DoFn>() { + private static final String EVENT_TIME_TIMER = "event-time-timer"; + private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer"; + + @TimerId(EVENT_TIME_TIMER) + TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @TimerId(SYNC_PROC_TIME_TIMER) + TimerSpec syncProcTimerSpec = + TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + + @ProcessElement + public void processElement( + ProcessContext c, + @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer, + @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) { + + eventTimeTimer.setForNowPlus(Duration.standardMinutes(5)); + syncProcTimeTimer.cancel(); } }) .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java index e562b2867290..d22643a06d04 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java @@ -32,22 +32,25 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; @@ -74,8 +77,8 @@ public void testParDoInMemoryTransformEvaluator() throws Exception { PCollection collection = input.apply( ParDo.of( - new OldDoFn() { - @Override + new DoFn() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().length()); } @@ -128,8 +131,8 @@ public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception { PCollection collection = input.apply( ParDo.of( - new OldDoFn() { - @Override + new DoFn() { + @ProcessElement public void processElement(ProcessContext c) { c.sideOutput(sideOutputTag, c.element().length()); } @@ -178,26 +181,22 @@ public void finishBundleWithStatePutsStateInResult() throws Exception { PCollection input = p.apply(Create.of("foo", "bara", "bazam")); - final StateTag> watermarkTag = - StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEarliestInputTimestamp()); final StateTag> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); final StateNamespace windowNs = StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); ParDo.Bound> pardo = ParDo.of( - new OldDoFn>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals() - .stateInternals() - .state(StateNamespaces.global(), watermarkTag) - .add(new Instant(124443L - c.element().length())); - c.windowingInternals() - .stateInternals() - .state( - StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE), - bagTag) - .add(c.element()); + new DoFn>() { + private static final String STATE_ID = "my-state-id"; + + @StateId(STATE_ID) + private final StateSpec> bagSpec = + StateSpecs.bag(StringUtf8Coder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(STATE_ID) BagState bagState) { + bagState.add(c.element()); } }); PCollection> mainOutput = input.apply(pardo); @@ -236,9 +235,6 @@ public void processElement(ProcessContext c) { TransformResult result = evaluator.finishBundle(); assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L))); assertThat(result.getState(), not(nullValue())); - assertThat( - result.getState().state(StateNamespaces.global(), watermarkTag).read(), - equalTo(new Instant(124438L))); assertThat( result.getState().state(windowNs, bagTag).read(), containsInAnyOrder("foo", "bara", "bazam")); @@ -255,6 +251,8 @@ public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception PCollection input = p.apply(Create.of("foo", "bara", "bazam")); + // TODO: this timer data is absolute, but the new API only support relative settings. + // It will require adjustments when @Ignore is removed final TimerData addedTimer = TimerData.of( StateNamespaces.window( @@ -276,34 +274,24 @@ public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception ParDo.Bound> pardo = ParDo.of( - new OldDoFn>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals().stateInternals(); - c.windowingInternals() - .timerInternals() - .setTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME)); - c.windowingInternals() - .timerInternals() - .deleteTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0), - new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); + new DoFn>() { + private static final String EVENT_TIME_TIMER = "event-time-timer"; + private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer"; + + @TimerId(EVENT_TIME_TIMER) + TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @TimerId(SYNC_PROC_TIME_TIMER) + TimerSpec syncProcTimerSpec = + TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + + @ProcessElement + public void processElement( + ProcessContext c, + @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer, + @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) { + eventTimeTimer.setForNowPlus(Duration.standardMinutes(5)); + syncProcTimeTimer.cancel(); } }); PCollection> mainOutput = input.apply(pardo); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index 042abab24f3e..19540054936a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -47,9 +47,9 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -101,9 +101,9 @@ public void setup() { createdInts = p.apply("createdInts", Create.of(1, 2, 3)); filtered = createdInts.apply("filtered", Filter.greaterThan(1)); - filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) throws Exception { + filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.output(c.element() * 2); } })); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java index d6b5fe3b4b38..abca59879b37 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; /** * Immutable struct containing a value as well as a unique id identifying the value. @@ -136,9 +136,9 @@ public Coder getValueCoder() { ByteArrayCoder idCoder; } - /** {@link OldDoFn} to turn a {@code ValueWithRecordId} back to the value {@code T}. */ - public static class StripIdsDoFn extends OldDoFn, T> { - @Override + /** {@link DoFn} to turn a {@code ValueWithRecordId} back to the value {@code T}. */ + public static class StripIdsDoFn extends DoFn, T> { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().getValue()); }