From 19f407c9497c911ca3cb61d989aa5a78c84896cf Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 7 Apr 2017 17:00:39 -0700 Subject: [PATCH 1/3] Clarifies doc of ProcessElement re: HasDefaultTracker --- .../src/main/java/org/apache/beam/sdk/transforms/DoFn.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index de33612710fd..5139290dd497 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -548,10 +549,11 @@ public interface OutputReceiver { * * *

A non-splittable {@link DoFn} must not define any of these methods. @@ -692,61 +698,9 @@ public interface OutputReceiver { @Experimental(Kind.SPLITTABLE_DO_FN) public @interface UnboundedPerElement {} - // This can't be put into ProcessContinuation itself due to the following problem: - // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html - private static final ProcessContinuation PROCESS_CONTINUATION_STOP = - new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO, null); - - /** - * When used as a return value of {@link ProcessElement}, indicates whether there is more work to - * be done for the current element. - */ - @Experimental(Kind.SPLITTABLE_DO_FN) - @AutoValue - public abstract static class ProcessContinuation { - /** Indicates that there is no more work to be done for the current element. */ - public static ProcessContinuation stop() { - return PROCESS_CONTINUATION_STOP; - } - - /** Indicates that there is more work to be done for the current element. */ - public static ProcessContinuation resume() { - return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO, null); - } - - /** - * If false, the {@link DoFn} promises that there is no more work remaining for the current - * element, so the runner should not resume the {@link ProcessElement} call. - */ - public abstract boolean shouldResume(); - - /** - * A minimum duration that should elapse between the end of this {@link ProcessElement} call and - * the {@link ProcessElement} call continuing processing of the same element. By default, zero. - */ - public abstract Duration resumeDelay(); - - /** - * A lower bound provided by the {@link DoFn} on timestamps of the output that will be emitted - * by future {@link ProcessElement} calls continuing processing of the current element. - * - *

A runner should treat an absent value as equivalent to the timestamp of the input element. - */ - @Nullable - public abstract Instant getWatermark(); - - /** Builder method to set the value of {@link #resumeDelay()}. */ - public ProcessContinuation withResumeDelay(Duration resumeDelay) { - return new AutoValue_DoFn_ProcessContinuation( - shouldResume(), resumeDelay, getWatermark()); - } - - /** Builder method to set the value of {@link #getWatermark()}. */ - public ProcessContinuation withWatermark(Instant watermark) { - return new AutoValue_DoFn_ProcessContinuation( - shouldResume(), resumeDelay(), watermark); - } - } + /** Do not use. See https://issues.apache.org/jira/browse/BEAM-1904 */ + @Deprecated + public class ProcessContinuation {} /** * Returns an {@link Aggregator} with aggregation logic specified by the {@link CombineFn} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 01f0291e527b..88f40356bcbe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -671,6 +671,11 @@ public PaneInfo pane() { return element.getPane(); } + @Override + public void updateWatermark(Instant watermark) { + throw new UnsupportedOperationException(); + } + @Override public PipelineOptions getPipelineOptions() { return context.getPipelineOptions(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 6746d3a81e13..4b0cbf74cf65 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -50,6 +50,7 @@ import net.bytebuddy.implementation.bytecode.assign.Assigner; import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing; import net.bytebuddy.implementation.bytecode.assign.TypeCasting; +import net.bytebuddy.implementation.bytecode.constant.NullConstant; import net.bytebuddy.implementation.bytecode.constant.TextConstant; import net.bytebuddy.implementation.bytecode.member.FieldAccess; import net.bytebuddy.implementation.bytecode.member.MethodInvocation; @@ -625,17 +626,6 @@ public StackManipulation dispatch(TimerParameter p) { * {@link ProcessElement} method. */ private static final class ProcessElementDelegation extends DoFnMethodDelegation { - private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD; - - static { - try { - PROCESS_CONTINUATION_STOP_METHOD = - new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop")); - } catch (NoSuchMethodException e) { - throw new RuntimeException("Failed to locate ProcessContinuation.stop()"); - } - } - private final DoFnSignature.ProcessElementMethod signature; /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */ @@ -672,12 +662,8 @@ protected StackManipulation beforeDelegation(MethodDescription instrumentedMetho @Override protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) { - if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) { - return new StackManipulation.Compound( - MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE); - } else { - return MethodReturn.of(targetMethod.getReturnType().asErasure()); - } + return new StackManipulation.Compound( + NullConstant.INSTANCE, MethodReturn.REFERENCE); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 85831a7c45f9..cc06e70d070c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -53,8 +53,8 @@ public interface DoFnInvoker { * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}. * * @param extra Factory for producing extra parameter objects (such as window), if necessary. - * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link - * DoFn.ProcessContinuation#stop()} if it returns {@code void}. + * @return {@code null} - see JIRA + * tracking the complete removal of {@link DoFn.ProcessContinuation}. */ DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider extra); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index 007d8be6b00f..1be741f2db39 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -28,7 +28,6 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; @@ -397,21 +396,16 @@ public abstract static class ProcessElementMethod implements MethodWithExtraPara @Nullable public abstract TypeDescriptor windowT(); - /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */ - public abstract boolean hasReturnValue(); - static ProcessElementMethod create( Method targetMethod, List extraParameters, TypeDescriptor trackerT, - @Nullable TypeDescriptor windowT, - boolean hasReturnValue) { + @Nullable TypeDescriptor windowT) { return new AutoValue_DoFnSignature_ProcessElementMethod( targetMethod, Collections.unmodifiableList(extraParameters), trackerT, - windowT, - hasReturnValue); + windowT); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 006d012cbb00..80dbe1062cbb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.transforms.reflect; -import static com.google.common.base.Preconditions.checkState; - import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicates; @@ -428,8 +426,6 @@ private static DoFnSignature parseSignature(Class> fnClass) *

  • If the {@link DoFn} (or any of its supertypes) is annotated as {@link * DoFn.BoundedPerElement} or {@link DoFn.UnboundedPerElement}, use that. Only one of * these must be specified. - *
  • If {@link DoFn.ProcessElement} returns {@link DoFn.ProcessContinuation}, assume it is - * unbounded. Otherwise (if it returns {@code void}), assume it is bounded. *
  • If {@link DoFn.ProcessElement} returns {@code void}, but the {@link DoFn} is annotated * {@link DoFn.UnboundedPerElement}, this is an error. * @@ -455,10 +451,7 @@ private static PCollection.IsBounded inferBoundedness( } if (processElement.isSplittable()) { if (isBounded == null) { - isBounded = - processElement.hasReturnValue() - ? PCollection.IsBounded.UNBOUNDED - : PCollection.IsBounded.BOUNDED; + isBounded = PCollection.IsBounded.BOUNDED; } } else { errors.checkArgument( @@ -467,7 +460,6 @@ private static PCollection.IsBounded inferBoundedness( + ((isBounded == PCollection.IsBounded.BOUNDED) ? DoFn.BoundedPerElement.class.getSimpleName() : DoFn.UnboundedPerElement.class.getSimpleName())); - checkState(!processElement.hasReturnValue(), "Should have been inferred splittable"); isBounded = PCollection.IsBounded.BOUNDED; } return isBounded; @@ -691,10 +683,8 @@ static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod( TypeDescriptor outputT, FnAnalysisContext fnContext) { errors.checkArgument( - void.class.equals(m.getReturnType()) - || DoFn.ProcessContinuation.class.equals(m.getReturnType()), - "Must return void or %s", - DoFn.ProcessContinuation.class.getSimpleName()); + void.class.equals(m.getReturnType()), + "Must return void"); MethodAnalysisContext methodContext = MethodAnalysisContext.create(); @@ -734,11 +724,7 @@ static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod( } return DoFnSignature.ProcessElementMethod.create( - m, - methodContext.getExtraParameters(), - trackerT, - windowT, - DoFn.ProcessContinuation.class.equals(m.getReturnType())); + m, methodContext.getExtraParameters(), trackerT, windowT); } private static void checkParameterOneOf( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java index 87c7bfdb2b3c..0271a0d1f3f1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java @@ -19,6 +19,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.transforms.DoFn; /** * A {@link RestrictionTracker} for claiming offsets in an {@link OffsetRange} in a monotonically @@ -27,6 +30,7 @@ public class OffsetRangeTracker implements RestrictionTracker { private OffsetRange range; private Long lastClaimedOffset = null; + private Long lastAttemptedOffset = null; public OffsetRangeTracker(OffsetRange range) { this.range = checkNotNull(range); @@ -59,12 +63,13 @@ public synchronized OffsetRange checkpoint() { */ public synchronized boolean tryClaim(long i) { checkArgument( - lastClaimedOffset == null || i > lastClaimedOffset, - "Trying to claim offset %s while last claimed was %s", + lastAttemptedOffset == null || i > lastAttemptedOffset, + "Trying to claim offset %s while last attempted was %s", i, - lastClaimedOffset); + lastAttemptedOffset); checkArgument( i >= range.getFrom(), "Trying to claim offset %s before start of the range %s", i, range); + lastAttemptedOffset = i; // No respective checkArgument for i < range.to() - it's ok to try claiming offsets beyond it. if (i >= range.getTo()) { return false; @@ -72,4 +77,26 @@ public synchronized boolean tryClaim(long i) { lastClaimedOffset = i; return true; } + + /** + * Marks that there are no more offsets to be claimed in the range. + * + *

    E.g., a {@link DoFn} reading a file and claiming the offset of each record in the file might + * call this if it hits EOF - even though the last attempted claim was before the end of the + * range, there are no more offsets to claim. + */ + public synchronized void markDone() { + lastAttemptedOffset = Long.MAX_VALUE; + } + + @Override + public synchronized void checkDone() throws IllegalStateException { + checkState( + lastAttemptedOffset >= range.getTo() - 1, + "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted", + lastAttemptedOffset, + range, + lastAttemptedOffset + 1, + range.getTo()); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java index e9b718e26959..27ef68f4a980 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java @@ -38,5 +38,13 @@ public interface RestrictionTracker { */ RestrictionT checkpoint(); + /** + * Called by the runner after {@link DoFn.ProcessElement} returns. + * + *

    Must throw an exception with an informative error message, if there is still any unclaimed + * work remaining in the restriction. + */ + void checkDone() throws IllegalStateException; + // TODO: Add the more general splitRemainderAfterFraction() and other methods. } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index 154a088347b0..a122f673c5a5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -18,8 +18,6 @@ package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkState; -import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume; -import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -67,14 +65,10 @@ public class SplittableDoFnTest { static class PairStringWithIndexToLength extends DoFn> { @ProcessElement - public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) { + public void process(ProcessContext c, OffsetRangeTracker tracker) { for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { c.output(KV.of(c.element(), (int) i)); - if (i % 3 == 0) { - return resume(); - } } - return stop(); } @GetInitialRestriction @@ -196,19 +190,14 @@ private static int snapToNextBlock(int index, int[] blockStarts) { } @ProcessElement - public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) { + public void processElement(ProcessContext c, OffsetRangeTracker tracker) { int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX}; int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts); - int trueEnd = snapToNextBlock((int) tracker.currentRestriction().getTo(), blockStarts); - for (int i = trueStart; i < trueEnd; ++i) { - if (!tracker.tryClaim(blockStarts[i])) { - return resume(); - } + for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) { for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) { c.output(index); } } - return stop(); } @GetInitialRestriction diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 425f45363953..8b4df4c3e890 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -39,7 +39,6 @@ import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; @@ -83,8 +82,8 @@ public void setUp() { when(mockArgumentProvider.processContext(Matchers.any())).thenReturn(mockProcessContext); } - private ProcessContinuation invokeProcessElement(DoFn fn) { - return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider); + private void invokeProcessElement(DoFn fn) { + DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider); } private void invokeOnTimer(String timerId, DoFn fn) { @@ -113,7 +112,7 @@ class MockFn extends DoFn { public void processElement(ProcessContext c) throws Exception {} } MockFn mockFn = mock(MockFn.class); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(mockFn)); + invokeProcessElement(mockFn); verify(mockFn).processElement(mockProcessContext); } @@ -134,7 +133,7 @@ public void processElement(DoFn.ProcessContext c) {} public void testDoFnWithProcessElementInterface() throws Exception { IdentityUsingInterfaceWithProcessElement fn = mock(IdentityUsingInterfaceWithProcessElement.class); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); verify(fn).processElement(mockProcessContext); } @@ -155,14 +154,14 @@ public void process(DoFn.ProcessContext c) { @Test public void testDoFnWithMethodInSuperclass() throws Exception { IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); verify(fn).process(mockProcessContext); } @Test public void testDoFnWithMethodInSubclass() throws Exception { IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); verify(fn).process(mockProcessContext); } @@ -173,7 +172,7 @@ class MockFn extends DoFn { public void processElement(ProcessContext c, IntervalWindow w) throws Exception {} } MockFn fn = mock(MockFn.class); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); verify(fn).processElement(mockProcessContext, mockWindow); } @@ -197,7 +196,7 @@ public void processElement(ProcessContext c, @StateId(stateId) ValueState { - @DoFn.ProcessElement - public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) - throws Exception { - return null; - } - - @GetInitialRestriction - public SomeRestriction getInitialRestriction(String element) { - return null; - } - - @NewTracker - public SomeRestrictionTracker newTracker(SomeRestriction restriction) { - return null; - } - } - MockFn fn = mock(MockFn.class); - when(fn.processElement(mockProcessContext, null)).thenReturn(ProcessContinuation.resume()); - assertEquals(ProcessContinuation.resume(), invokeProcessElement(fn)); - } - @Test public void testDoFnWithStartBundleSetupTeardown() throws Exception { class MockFn extends DoFn { @@ -306,9 +281,7 @@ public SomeRestriction decode(InputStream inStream, Context context) { /** Public so Mockito can do "delegatesTo()" in the test below. */ public static class MockFn extends DoFn { @ProcessElement - public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) { - return null; - } + public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {} @GetInitialRestriction public SomeRestriction getInitialRestriction(String element) { @@ -360,7 +333,7 @@ public void splitRestriction( .splitRestriction( eq("blah"), same(restriction), Mockito.>any()); when(fn.newTracker(restriction)).thenReturn(tracker); - when(fn.processElement(mockProcessContext, tracker)).thenReturn(ProcessContinuation.resume()); + fn.processElement(mockProcessContext, tracker); assertEquals(coder, invoker.invokeGetRestrictionCoder(new CoderRegistry())); assertEquals(restriction, invoker.invokeGetInitialRestriction("blah")); @@ -376,8 +349,6 @@ public void output(SomeRestriction output) { }); assertEquals(Arrays.asList(part1, part2, part3), outputs); assertEquals(tracker, invoker.invokeNewTracker(restriction)); - assertEquals( - ProcessContinuation.resume(), invoker.invokeProcessElement( new FakeArgumentProvider() { @Override @@ -389,7 +360,7 @@ public DoFn.ProcessContext processContext(DoFn f public RestrictionTracker restrictionTracker() { return tracker; } - })); + }); } private static class RestrictionWithDefaultTracker @@ -410,6 +381,9 @@ public RestrictionWithDefaultTracker currentRestriction() { public RestrictionWithDefaultTracker checkpoint() { throw new UnsupportedOperationException(); } + + @Override + public void checkDone() throws IllegalStateException {} } private static class CoderForDefaultTracker extends CustomCoder { @@ -459,8 +433,7 @@ public void output(String output) { assertEquals("foo", output); } }); - assertEquals( - ProcessContinuation.stop(), invoker.invokeProcessElement(mockArgumentProvider)); + invoker.invokeProcessElement(mockArgumentProvider); assertThat( invoker.invokeNewTracker(new RestrictionWithDefaultTracker()), instanceOf(DefaultTracker.class)); @@ -550,14 +523,14 @@ public void processThis(ProcessContext c) {} @Test public void testLocalPrivateDoFnClass() throws Exception { PrivateDoFnClass fn = mock(PrivateDoFnClass.class); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); verify(fn).processThis(mockProcessContext); } @Test public void testStaticPackagePrivateDoFnClass() throws Exception { DoFn fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass()); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockProcessContext); } @@ -565,28 +538,28 @@ public void testStaticPackagePrivateDoFnClass() throws Exception { public void testInnerPackagePrivateDoFnClass() throws Exception { DoFn fn = mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass()); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockProcessContext); } @Test public void testStaticPrivateDoFnClass() throws Exception { DoFn fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass()); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockProcessContext); } @Test public void testInnerPrivateDoFnClass() throws Exception { DoFn fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass()); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockProcessContext); } @Test public void testAnonymousInnerDoFn() throws Exception { DoFn fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass()); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockProcessContext); } @@ -594,7 +567,7 @@ public void testAnonymousInnerDoFn() throws Exception { public void testStaticAnonymousDoFnInOtherPackage() throws Exception { // Can't use mockito for this one - the anonymous class is final and can't be mocked. DoFn fn = DoFnInvokersTestHelper.newStaticAnonymousDoFn(); - assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); DoFnInvokersTestHelper.verifyStaticAnonymousDoFnInvoked(fn, mockProcessContext); } @@ -622,32 +595,6 @@ public DoFn.ProcessContext processContext(DoFn() { - @ProcessElement - public ProcessContinuation processElement( - @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) { - throw new IllegalArgumentException("bogus"); - } - - @GetInitialRestriction - public SomeRestriction getInitialRestriction(Integer element) { - return null; - } - - @NewTracker - public SomeRestrictionTracker newTracker(SomeRestriction restriction) { - return null; - } - }) - .invokeProcessElement(new FakeArgumentProvider()); - } - @Test public void testStartBundleException() throws Exception { DoFnInvoker invoker = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java index 44ae5c4f2425..d321f54d68bd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java @@ -50,7 +50,7 @@ private void method(DoFn.ProcessContext c, Integer n) {} @Test public void testBadReturnType() throws Exception { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Must return void or ProcessContinuation"); + thrown.expectMessage("Must return void"); analyzeProcessElementMethod( new AnonymousMethod() { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java index 052feb812b45..b937e84c5c96 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -59,20 +59,6 @@ private abstract static class SomeRestrictionTracker private abstract static class SomeRestrictionCoder implements Coder {} - @Test - public void testReturnsProcessContinuation() throws Exception { - DoFnSignature.ProcessElementMethod signature = - analyzeProcessElementMethod( - new AnonymousMethod() { - private DoFn.ProcessContinuation method( - DoFn.ProcessContext context) { - return null; - } - }); - - assertTrue(signature.hasReturnValue()); - } - @Test public void testHasRestrictionTracker() throws Exception { DoFnSignature.ProcessElementMethod signature = @@ -157,54 +143,6 @@ public void process(ProcessContext context) {} .isBoundedPerElement()); } - private static class BaseFnWithContinuation extends DoFn { - @ProcessElement - public ProcessContinuation processElement( - ProcessContext context, SomeRestrictionTracker tracker) { - return null; - } - - @GetInitialRestriction - public SomeRestriction getInitialRestriction(Integer element) { - return null; - } - - @NewTracker - public SomeRestrictionTracker newTracker(SomeRestriction restriction) { - return null; - } - } - - @Test - public void testSplittableIsBoundedByDefault() throws Exception { - assertEquals( - PCollection.IsBounded.UNBOUNDED, - DoFnSignatures - .getSignature(BaseFnWithContinuation.class) - .isBoundedPerElement()); - } - - @Test - public void testSplittableRespectsBoundednessAnnotation() throws Exception { - @BoundedPerElement - class BoundedFnWithContinuation extends BaseFnWithContinuation {} - - assertEquals( - PCollection.IsBounded.BOUNDED, - DoFnSignatures - .getSignature(BoundedFnWithContinuation.class) - .isBoundedPerElement()); - - @UnboundedPerElement - class UnboundedFnWithContinuation extends BaseFnWithContinuation {} - - assertEquals( - PCollection.IsBounded.UNBOUNDED, - DoFnSignatures - .getSignature(UnboundedFnWithContinuation.class) - .isBoundedPerElement()); - } - @Test public void testUnsplittableButDeclaresBounded() throws Exception { @BoundedPerElement @@ -234,10 +172,8 @@ public void process(ProcessContext context) {} public void testSplittableWithAllFunctions() throws Exception { class GoodSplittableDoFn extends DoFn { @ProcessElement - public ProcessContinuation processElement( - ProcessContext context, SomeRestrictionTracker tracker) { - return null; - } + public void processElement( + ProcessContext context, SomeRestrictionTracker tracker) {} @GetInitialRestriction public SomeRestriction getInitialRestriction(Integer element) { @@ -262,7 +198,6 @@ public SomeRestrictionCoder getRestrictionCoder() { DoFnSignature signature = DoFnSignatures.getSignature(GoodSplittableDoFn.class); assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType()); assertTrue(signature.processElement().isSplittable()); - assertTrue(signature.processElement().hasReturnValue()); assertEquals( SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType()); assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType()); @@ -279,9 +214,7 @@ public SomeRestrictionCoder getRestrictionCoder() { public void testSplittableWithAllFunctionsGeneric() throws Exception { class GoodGenericSplittableDoFn extends DoFn { @ProcessElement - public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) { - return null; - } + public void processElement(ProcessContext context, TrackerT tracker) {} @GetInitialRestriction public RestrictionT getInitialRestriction(Integer element) { @@ -309,7 +242,6 @@ public CoderT getRestrictionCoder() { SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass()); assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType()); assertTrue(signature.processElement().isSplittable()); - assertTrue(signature.processElement().hasReturnValue()); assertEquals( SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType()); assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java index c8a530c655f0..831894ca9692 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java @@ -95,7 +95,7 @@ public void testCheckpointAfterFailedClaim() throws Exception { @Test public void testNonMonotonicClaim() throws Exception { - expected.expectMessage("Trying to claim offset 103 while last claimed was 110"); + expected.expectMessage("Trying to claim offset 103 while last attempted was 110"); OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); assertTrue(tracker.tryClaim(105)); assertTrue(tracker.tryClaim(110)); @@ -108,4 +108,51 @@ public void testClaimBeforeStartOfRange() throws Exception { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); tracker.tryClaim(90); } + + @Test + public void testCheckDoneAfterTryClaimPastEndOfRange() { + OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); + assertTrue(tracker.tryClaim(150)); + assertTrue(tracker.tryClaim(175)); + assertFalse(tracker.tryClaim(220)); + tracker.checkDone(); + } + + @Test + public void testCheckDoneAfterTryClaimAtEndOfRange() { + OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); + assertTrue(tracker.tryClaim(150)); + assertTrue(tracker.tryClaim(175)); + assertFalse(tracker.tryClaim(200)); + tracker.checkDone(); + } + + @Test + public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() { + OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); + assertTrue(tracker.tryClaim(150)); + assertTrue(tracker.tryClaim(175)); + assertTrue(tracker.tryClaim(199)); + tracker.checkDone(); + } + + @Test + public void testCheckDoneWhenNotDone() { + OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); + assertTrue(tracker.tryClaim(150)); + assertTrue(tracker.tryClaim(175)); + expected.expectMessage( + "Last attempted offset was 175 in range [100, 200), " + + "claiming work in [176, 200) was not attempted"); + tracker.checkDone(); + } + + @Test + public void testCheckDoneWhenExplicitlyMarkedDone() { + OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); + assertTrue(tracker.tryClaim(150)); + assertTrue(tracker.tryClaim(175)); + tracker.markDone(); + tracker.checkDone(); + } } From 29c280211c2431f29c5552c35bd3435c65e4975b Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 7 Apr 2017 14:00:05 -0700 Subject: [PATCH 3/3] Adds tests for the watermark hold (previously untested) --- .../runners/core/SplittableParDoTest.java | 56 ++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index f8d60959beb2..d30111326363 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -36,6 +37,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -208,7 +210,7 @@ private static class ProcessFnTester< private Instant currentProcessingTime; private InMemoryTimerInternals timerInternals; - private InMemoryStateInternals stateInternals; + private TestInMemoryStateInternals stateInternals; ProcessFnTester( Instant currentProcessingTime, @@ -223,7 +225,7 @@ private static class ProcessFnTester< fn, inputCoder, restrictionCoder, IntervalWindow.getCoder()); this.tester = DoFnTester.of(processFn); this.timerInternals = new InMemoryTimerInternals(); - this.stateInternals = InMemoryStateInternals.forKey("dummy"); + this.stateInternals = new TestInMemoryStateInternals<>("dummy"); processFn.setStateInternalsFactory( new StateInternalsFactory() { @Override @@ -335,6 +337,9 @@ List takeOutputElements() { return tester.takeOutputElements(); } + public Instant getWatermarkHold() { + return stateInternals.earliestWatermarkHold(); + } } private static class OutputWindowedValueToDoFnTester @@ -425,6 +430,53 @@ public void testTrivialProcessFnPropagatesOutputsWindowsAndTimestamp() throws Ex } } + private static class WatermarkUpdateFn extends DoFn { + @ProcessElement + public void process(ProcessContext c, OffsetRangeTracker tracker) { + for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { + c.updateWatermark(c.element().plus(Duration.standardSeconds(i))); + c.output(String.valueOf(i)); + } + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction(Instant elem) { + throw new IllegalStateException("Expected to be supplied explicitly in this test"); + } + + @NewTracker + public OffsetRangeTracker newTracker(OffsetRange range) { + return new OffsetRangeTracker(range); + } + } + + @Test + public void testUpdatesWatermark() throws Exception { + DoFn fn = new WatermarkUpdateFn(); + Instant base = Instant.now(); + + ProcessFnTester tester = + new ProcessFnTester<>( + base, + fn, + InstantCoder.of(), + SerializableCoder.of(OffsetRange.class), + 3, + MAX_BUNDLE_DURATION); + + tester.startElement(base, new OffsetRange(0, 8)); + assertThat(tester.takeOutputElements(), hasItems("0", "1", "2")); + assertEquals(base.plus(Duration.standardSeconds(2)), tester.getWatermarkHold()); + + assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); + assertThat(tester.takeOutputElements(), hasItems("3", "4", "5")); + assertEquals(base.plus(Duration.standardSeconds(5)), tester.getWatermarkHold()); + + assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1))); + assertThat(tester.takeOutputElements(), hasItems("6", "7")); + assertEquals(null, tester.getWatermarkHold()); + } + /** * A splittable {@link DoFn} that generates the sequence [init, init + total). */