Skip to content

Commit

Permalink
Revert ": [BEAM-6352] Revert PR#6467 to fix Watch transform from sweg…
Browse files Browse the repository at this point in the history
…ner/revert_pr6467"

This reverts commit 2bf5ead, reversing
changes made to 6a59667.
  • Loading branch information
lukecwik committed Mar 26, 2019
1 parent 5521415 commit 8faffb2
Show file tree
Hide file tree
Showing 33 changed files with 511 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
Expand Down Expand Up @@ -130,13 +129,12 @@ public void translate(ParDo.MultiOutput<InputT, OutputT> transform, TranslationC
}
}

static class SplittableProcessElementsTranslator<
InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
implements TransformTranslator<ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> {
static class SplittableProcessElementsTranslator<InputT, OutputT, RestrictionT, PositionT>
implements TransformTranslator<ProcessElements<InputT, OutputT, RestrictionT, PositionT>> {

@Override
public void translate(
ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform,
ProcessElements<InputT, OutputT, RestrictionT, PositionT> transform,
TranslationContext context) {

Map<TupleTag<?>, PValue> outputs = context.getOutputs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT, RestrictionT>>>
}
}

static class NaiveProcessFn<
InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
static class NaiveProcessFn<InputT, OutputT, RestrictionT, PositionT>
extends DoFn<KV<InputT, RestrictionT>, OutputT> {
private final DoFn<InputT, OutputT> fn;

Expand Down Expand Up @@ -144,7 +143,7 @@ public void process(ProcessContext c, BoundedWindow w) {
InputT element = c.element().getKey();
RestrictionT restriction = c.element().getValue();
while (true) {
TrackerT tracker = invoker.invokeNewTracker(restriction);
RestrictionTracker<RestrictionT, PositionT> tracker = invoker.invokeNewTracker(restriction);
ProcessContinuation continuation =
invoker.invokeProcessElement(new NestedProcessContext<>(fn, c, element, w, tracker));
if (continuation.shouldResume()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ private abstract static class SomeTracker extends RestrictionTracker<Void, Void>
private DoFn<KV<String, Integer>, Integer> splittableDoFn =
new DoFn<KV<String, Integer>, Integer>() {
@ProcessElement
public void processElement(ProcessContext context, SomeTracker tracker) {}
public void processElement(
ProcessContext context, RestrictionTracker<Void, Void> tracker) {}

@GetInitialRestriction
public Void getInitialRestriction(KV<String, Integer> element) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public SomeRestrictionTracker(SomeRestriction someRestriction) {
}

@Override
protected boolean tryClaimImpl(Void position) {
public boolean tryClaim(Void position) {
return false;
}

Expand All @@ -78,7 +78,8 @@ public void checkDone() {}

private static class BoundedFakeFn extends DoFn<Integer, String> {
@ProcessElement
public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
public void processElement(
ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {}

@GetInitialRestriction
public SomeRestriction getInitialRestriction(Integer element) {
Expand All @@ -89,7 +90,7 @@ public SomeRestriction getInitialRestriction(Integer element) {
private static class UnboundedFakeFn extends DoFn<Integer, String> {
@ProcessElement
public ProcessContinuation processElement(
ProcessContext context, SomeRestrictionTracker tracker) {
ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {
return stop();
}

Expand Down
1 change: 1 addition & 0 deletions runners/core-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies {
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
shadow project(path: ":beam-model-fn-execution", configuration: "shadow")
shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow")
shadow project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
shadow library.java.vendored_guava_20_0
shadow library.java.joda_time
shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.TimeDomain;
Expand Down Expand Up @@ -55,12 +56,8 @@
* outputs), or runs for the given duration.
*/
public class OutputAndTimeBoundedSplittableProcessElementInvoker<
InputT,
OutputT,
RestrictionT,
PositionT,
TrackerT extends RestrictionTracker<RestrictionT, PositionT>>
extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> {
InputT, OutputT, RestrictionT, PositionT>
extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> {
private final DoFn<InputT, OutputT> fn;
private final PipelineOptions pipelineOptions;
private final OutputWindowedValue<OutputT> output;
Expand Down Expand Up @@ -106,9 +103,9 @@ public OutputAndTimeBoundedSplittableProcessElementInvoker(
public Result invokeProcessElement(
DoFnInvoker<InputT, OutputT> invoker,
final WindowedValue<InputT> element,
final TrackerT tracker) {
final RestrictionTracker<RestrictionT, PositionT> tracker) {
final ProcessContext processContext = new ProcessContext(element, tracker);
tracker.setClaimObserver(processContext);

DoFn.ProcessContinuation cont =
invoker.invokeProcessElement(
new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
Expand Down Expand Up @@ -156,7 +153,7 @@ public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {

@Override
public RestrictionTracker<?, ?> restrictionTracker() {
return tracker;
return processContext.tracker;
}

// Unsupported methods below.
Expand Down Expand Up @@ -226,7 +223,7 @@ public Timer timer(String timerId) {
// restriction that describes exactly the work that wasn't done in the current call.
if (processContext.numClaimedBlocks > 0) {
residual = checkNotNull(processContext.takeCheckpointNow());
tracker.checkDone();
processContext.tracker.checkDone();
} else {
// The call returned resume() without trying to claim any blocks, i.e. it is unaware
// of any work to be done at the moment, but more might emerge later. This is a valid
Expand Down Expand Up @@ -254,14 +251,14 @@ public Timer timer(String timerId) {
// ProcessElement call.
// In other words, if we took a checkpoint *after* ProcessElement completed (like in the
// branch above), it would have been equivalent to this one.
tracker.checkDone();
processContext.tracker.checkDone();
}
} else {
// The ProcessElement call returned stop() - that means the tracker's current restriction
// has been fully processed by the call. A checkpoint may or may not have been taken in
// "residual"; if it was, then we'll need to process it; if no, then we don't - nothing
// special needs to be done.
tracker.checkDone();
processContext.tracker.checkDone();
}
if (residual == null) {
// Can only be true if cont.shouldResume() is false and no checkpoint was taken.
Expand All @@ -273,9 +270,9 @@ public Timer timer(String timerId) {
}

private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext
implements RestrictionTracker.ClaimObserver<PositionT> {
implements RestrictionTrackers.ClaimObserver<PositionT> {
private final WindowedValue<InputT> element;
private final TrackerT tracker;
private final RestrictionTracker<RestrictionT, PositionT> tracker;
private int numClaimedBlocks;
private boolean hasClaimFailed;

Expand All @@ -293,10 +290,11 @@ private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext
private @Nullable Future<?> scheduledCheckpoint;
private @Nullable Instant lastReportedWatermark;

public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) {
public ProcessContext(
WindowedValue<InputT> element, RestrictionTracker<RestrictionT, PositionT> tracker) {
fn.super();
this.element = element;
this.tracker = tracker;
this.tracker = RestrictionTrackers.observe(tracker, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT, RestrictionT>>>
}

/** A primitive transform wrapping around {@link ProcessFn}. */
public static class ProcessElements<
InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
public static class ProcessElements<InputT, OutputT, RestrictionT, PositionT>
extends PTransform<
PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> {
private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original;
Expand All @@ -165,7 +164,7 @@ public ProcessElements(ProcessKeyedElements<InputT, OutputT, RestrictionT> origi
this.original = original;
}

public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn(
public ProcessFn<InputT, OutputT, RestrictionT, PositionT> newProcessFn(
DoFn<InputT, OutputT> fn) {
return new ProcessFn<>(
fn,
Expand Down Expand Up @@ -216,8 +215,7 @@ public PCollectionTuple expand(
* <p>See also: https://issues.apache.org/jira/browse/BEAM-1983
*/
@VisibleForTesting
public static class ProcessFn<
InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
public static class ProcessFn<InputT, OutputT, RestrictionT, PositionT>
extends DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
/**
* The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is
Expand Down Expand Up @@ -254,7 +252,7 @@ public static class ProcessFn<
private transient @Nullable StateInternalsFactory<byte[]> stateInternalsFactory;
private transient @Nullable TimerInternalsFactory<byte[]> timerInternalsFactory;
private transient @Nullable SplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, TrackerT>
InputT, OutputT, RestrictionT, PositionT>
processElementInvoker;

private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;
Expand Down Expand Up @@ -285,7 +283,7 @@ public void setTimerInternalsFactory(TimerInternalsFactory<byte[]> timerInternal
}

public void setProcessElementInvoker(
SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> invoker) {
SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> invoker) {
this.processElementInvoker = invoker;
}

Expand Down Expand Up @@ -370,8 +368,9 @@ public void processElement(final ProcessContext c) {
elementAndRestriction = KV.of(elementState.read(), restrictionState.read());
}

final TrackerT tracker = invoker.invokeNewTracker(elementAndRestriction.getValue());
SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>.Result result =
final RestrictionTracker<RestrictionT, PositionT> tracker =
invoker.invokeNewTracker(elementAndRestriction.getValue());
SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT>.Result result =
processElementInvoker.invokeProcessElement(
invoker, elementAndRestriction.getKey(), tracker);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
* A runner-specific hook for invoking a {@link DoFn.ProcessElement} method for a splittable {@link
* DoFn}, in particular, allowing the runner to access the {@link RestrictionTracker}.
*/
public abstract class SplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> {
public abstract class SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> {
/** Specifies how to resume a splittable {@link DoFn.ProcessElement} call. */
public class Result {
@Nullable private final RestrictionT residualRestriction;
Expand Down Expand Up @@ -84,5 +83,7 @@ public DoFn.ProcessContinuation getContinuation() {
* DoFn.ProcessContinuation}, and a future output watermark.
*/
public abstract Result invokeProcessElement(
DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, TrackerT tracker);
DoFnInvoker<InputT, OutputT> invoker,
WindowedValue<InputT> element,
RestrictionTracker<RestrictionT, PositionT> tracker);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
Expand Down Expand Up @@ -66,7 +67,8 @@ private SomeFn(
}

@ProcessElement
public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tracker) {
public ProcessContinuation process(
ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) {
Uninterruptibles.sleepUninterruptibly(
sleepBeforeFirstClaim.getMillis(), TimeUnit.MILLISECONDS);
for (long i = tracker.currentRestriction().getFrom(), numIterations = 1;
Expand All @@ -88,20 +90,19 @@ public OffsetRange getInitialRestriction(Void element) {
}
}

private SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result
runTest(
int totalNumOutputs,
Duration sleepBeforeFirstClaim,
int numOutputsPerProcessCall,
Duration sleepBeforeEachOutput) {
private SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result runTest(
int totalNumOutputs,
Duration sleepBeforeFirstClaim,
int numOutputsPerProcessCall,
Duration sleepBeforeEachOutput) {
SomeFn fn = new SomeFn(sleepBeforeFirstClaim, numOutputsPerProcessCall, sleepBeforeEachOutput);
OffsetRange initialRestriction = new OffsetRange(0, totalNumOutputs);
return runTest(fn, initialRestriction);
}

private SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result
runTest(DoFn<Void, String> fn, OffsetRange initialRestriction) {
SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker> invoker =
private SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result runTest(
DoFn<Void, String> fn, OffsetRange initialRestriction) {
SplittableProcessElementInvoker<Void, String, OffsetRange, Long> invoker =
new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
fn,
PipelineOptionsFactory.create(),
Expand Down Expand Up @@ -134,7 +135,7 @@ public <AdditionalOutputT> void outputWindowedValue(

@Test
public void testInvokeProcessElementOutputBounded() throws Exception {
SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res =
SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res =
runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.ZERO);
assertFalse(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
Expand All @@ -145,7 +146,7 @@ public void testInvokeProcessElementOutputBounded() throws Exception {

@Test
public void testInvokeProcessElementTimeBounded() throws Exception {
SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res =
SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res =
runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100));
assertFalse(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
Expand All @@ -158,7 +159,7 @@ public void testInvokeProcessElementTimeBounded() throws Exception {

@Test
public void testInvokeProcessElementTimeBoundedWithStartupDelay() throws Exception {
SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res =
SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res =
runTest(10000, Duration.standardSeconds(3), Integer.MAX_VALUE, Duration.millis(100));
assertFalse(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
Expand All @@ -170,15 +171,15 @@ public void testInvokeProcessElementTimeBoundedWithStartupDelay() throws Excepti

@Test
public void testInvokeProcessElementVoluntaryReturnStop() throws Exception {
SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res =
SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res =
runTest(5, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100));
assertFalse(res.getContinuation().shouldResume());
assertNull(res.getResidualRestriction());
}

@Test
public void testInvokeProcessElementVoluntaryReturnResume() throws Exception {
SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res =
SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res =
runTest(10, Duration.ZERO, 5, Duration.millis(100));
assertTrue(res.getContinuation().shouldResume());
assertEquals(new OffsetRange(5, 10), res.getResidualRestriction());
Expand All @@ -189,7 +190,7 @@ public void testInvokeProcessElementOutputDisallowedBeforeTryClaim() throws Exce
DoFn<Void, String> brokenFn =
new DoFn<Void, String>() {
@ProcessElement
public void process(ProcessContext c, OffsetRangeTracker tracker) {
public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
c.output("foo");
}

Expand All @@ -207,7 +208,7 @@ public void testInvokeProcessElementOutputDisallowedAfterFailedTryClaim() throws
DoFn<Void, String> brokenFn =
new DoFn<Void, String>() {
@ProcessElement
public void process(ProcessContext c, OffsetRangeTracker tracker) {
public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
assertFalse(tracker.tryClaim(6L));
c.output("foo");
}
Expand Down
Loading

0 comments on commit 8faffb2

Please sign in to comment.