Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
Expand Down Expand Up @@ -126,13 +125,12 @@ public void translate(ParDo.MultiOutput<InputT, OutputT> transform, TranslationC
}
}

static class SplittableProcessElementsTranslator<
InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
implements TransformTranslator<ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> {
static class SplittableProcessElementsTranslator<InputT, OutputT, RestrictionT, PositionT>
implements TransformTranslator<ProcessElements<InputT, OutputT, RestrictionT, PositionT>> {

@Override
public void translate(
ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform,
ProcessElements<InputT, OutputT, RestrictionT, PositionT> transform,
TranslationContext context) {

Map<TupleTag<?>, PValue> outputs = context.getOutputs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT, RestrictionT>>>
}
}

static class NaiveProcessFn<
InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
static class NaiveProcessFn<InputT, OutputT, RestrictionT, PositionT>
extends DoFn<KV<InputT, RestrictionT>, OutputT> {
private final DoFn<InputT, OutputT> fn;

Expand Down Expand Up @@ -142,7 +141,7 @@ public void process(ProcessContext c, BoundedWindow w) {
InputT element = c.element().getKey();
RestrictionT restriction = c.element().getValue();
while (true) {
TrackerT tracker = invoker.invokeNewTracker(restriction);
RestrictionTracker<RestrictionT, PositionT> tracker = invoker.invokeNewTracker(restriction);
ProcessContinuation continuation =
invoker.invokeProcessElement(new NestedProcessContext<>(fn, c, element, w, tracker));
if (continuation.shouldResume()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ private abstract static class SomeTracker extends RestrictionTracker<Void, Void>
private DoFn<KV<String, Integer>, Integer> splittableDoFn =
new DoFn<KV<String, Integer>, Integer>() {
@ProcessElement
public void processElement(ProcessContext context, SomeTracker tracker) {}
public void processElement(
ProcessContext context, RestrictionTracker<Void, Void> tracker) {}

@GetInitialRestriction
public Void getInitialRestriction(KV<String, Integer> element) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public SomeRestrictionTracker(SomeRestriction someRestriction) {
}

@Override
protected boolean tryClaimImpl(Void position) {
public boolean tryClaim(Void position) {
return false;
}

Expand All @@ -78,7 +78,8 @@ public void checkDone() {}

private static class BoundedFakeFn extends DoFn<Integer, String> {
@ProcessElement
public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
public void processElement(
ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {}

@GetInitialRestriction
public SomeRestriction getInitialRestriction(Integer element) {
Expand All @@ -89,7 +90,7 @@ public SomeRestriction getInitialRestriction(Integer element) {
private static class UnboundedFakeFn extends DoFn<Integer, String> {
@ProcessElement
public ProcessContinuation processElement(
ProcessContext context, SomeRestrictionTracker tracker) {
ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {
return stop();
}

Expand Down
1 change: 1 addition & 0 deletions runners/core-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies {
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
shadow project(path: ":beam-model-fn-execution", configuration: "shadow")
shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow")
shadow project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
shadow library.java.joda_time
shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest")
shadowTest library.java.junit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.TimeDomain;
Expand Down Expand Up @@ -55,12 +56,8 @@
* outputs), or runs for the given duration.
*/
public class OutputAndTimeBoundedSplittableProcessElementInvoker<
InputT,
OutputT,
RestrictionT,
PositionT,
TrackerT extends RestrictionTracker<RestrictionT, PositionT>>
extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> {
InputT, OutputT, RestrictionT, PositionT>
extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> {
private final DoFn<InputT, OutputT> fn;
private final PipelineOptions pipelineOptions;
private final OutputWindowedValue<OutputT> output;
Expand Down Expand Up @@ -106,9 +103,9 @@ public OutputAndTimeBoundedSplittableProcessElementInvoker(
public Result invokeProcessElement(
DoFnInvoker<InputT, OutputT> invoker,
final WindowedValue<InputT> element,
final TrackerT tracker) {
final RestrictionTracker<RestrictionT, PositionT> tracker) {
final ProcessContext processContext = new ProcessContext(element, tracker);
tracker.setClaimObserver(processContext);

DoFn.ProcessContinuation cont =
invoker.invokeProcessElement(
new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
Expand Down Expand Up @@ -156,7 +153,7 @@ public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {

@Override
public RestrictionTracker<?, ?> restrictionTracker() {
return tracker;
return processContext.tracker;
}

// Unsupported methods below.
Expand Down Expand Up @@ -226,7 +223,7 @@ public Timer timer(String timerId) {
// restriction that describes exactly the work that wasn't done in the current call.
if (processContext.numClaimedBlocks > 0) {
residual = checkNotNull(processContext.takeCheckpointNow());
tracker.checkDone();
processContext.tracker.checkDone();
} else {
// The call returned resume() without trying to claim any blocks, i.e. it is unaware
// of any work to be done at the moment, but more might emerge later. This is a valid
Expand Down Expand Up @@ -254,14 +251,14 @@ public Timer timer(String timerId) {
// ProcessElement call.
// In other words, if we took a checkpoint *after* ProcessElement completed (like in the
// branch above), it would have been equivalent to this one.
tracker.checkDone();
processContext.tracker.checkDone();
}
} else {
// The ProcessElement call returned stop() - that means the tracker's current restriction
// has been fully processed by the call. A checkpoint may or may not have been taken in
// "residual"; if it was, then we'll need to process it; if no, then we don't - nothing
// special needs to be done.
tracker.checkDone();
processContext.tracker.checkDone();
}
if (residual == null) {
// Can only be true if cont.shouldResume() is false and no checkpoint was taken.
Expand All @@ -273,9 +270,9 @@ public Timer timer(String timerId) {
}

private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext
implements RestrictionTracker.ClaimObserver<PositionT> {
implements RestrictionTrackers.ClaimObserver<PositionT> {
private final WindowedValue<InputT> element;
private final TrackerT tracker;
private final RestrictionTracker<RestrictionT, PositionT> tracker;
private int numClaimedBlocks;
private boolean hasClaimFailed;

Expand All @@ -293,10 +290,11 @@ private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext
private @Nullable Future<?> scheduledCheckpoint;
private @Nullable Instant lastReportedWatermark;

public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) {
public ProcessContext(
WindowedValue<InputT> element, RestrictionTracker<RestrictionT, PositionT> tracker) {
fn.super();
this.element = element;
this.tracker = tracker;
this.tracker = RestrictionTrackers.observe(tracker, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT, RestrictionT>>>
}

/** A primitive transform wrapping around {@link ProcessFn}. */
public static class ProcessElements<
InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
public static class ProcessElements<InputT, OutputT, RestrictionT, PositionT>
extends PTransform<
PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> {
private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original;
Expand All @@ -163,7 +162,7 @@ public ProcessElements(ProcessKeyedElements<InputT, OutputT, RestrictionT> origi
this.original = original;
}

public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn(
public ProcessFn<InputT, OutputT, RestrictionT, PositionT> newProcessFn(
DoFn<InputT, OutputT> fn) {
return new ProcessFn<>(
fn,
Expand Down Expand Up @@ -214,8 +213,7 @@ public PCollectionTuple expand(
* <p>See also: https://issues.apache.org/jira/browse/BEAM-1983
*/
@VisibleForTesting
public static class ProcessFn<
InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
public static class ProcessFn<InputT, OutputT, RestrictionT, PositionT>
extends DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
/**
* The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is
Expand Down Expand Up @@ -252,7 +250,7 @@ public static class ProcessFn<
private transient @Nullable StateInternalsFactory<byte[]> stateInternalsFactory;
private transient @Nullable TimerInternalsFactory<byte[]> timerInternalsFactory;
private transient @Nullable SplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, TrackerT>
InputT, OutputT, RestrictionT, PositionT>
processElementInvoker;

private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;
Expand Down Expand Up @@ -283,7 +281,7 @@ public void setTimerInternalsFactory(TimerInternalsFactory<byte[]> timerInternal
}

public void setProcessElementInvoker(
SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> invoker) {
SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> invoker) {
this.processElementInvoker = invoker;
}

Expand Down Expand Up @@ -368,8 +366,9 @@ public void processElement(final ProcessContext c) {
elementAndRestriction = KV.of(elementState.read(), restrictionState.read());
}

final TrackerT tracker = invoker.invokeNewTracker(elementAndRestriction.getValue());
SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>.Result result =
final RestrictionTracker<RestrictionT, PositionT> tracker =
invoker.invokeNewTracker(elementAndRestriction.getValue());
SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT>.Result result =
processElementInvoker.invokeProcessElement(
invoker, elementAndRestriction.getKey(), tracker);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
* A runner-specific hook for invoking a {@link DoFn.ProcessElement} method for a splittable {@link
* DoFn}, in particular, allowing the runner to access the {@link RestrictionTracker}.
*/
public abstract class SplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> {
public abstract class SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> {
/** Specifies how to resume a splittable {@link DoFn.ProcessElement} call. */
public class Result {
@Nullable private final RestrictionT residualRestriction;
Expand Down Expand Up @@ -77,5 +76,7 @@ public DoFn.ProcessContinuation getContinuation() {
* DoFn.ProcessContinuation}, and a future output watermark.
*/
public abstract Result invokeProcessElement(
DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, TrackerT tracker);
DoFnInvoker<InputT, OutputT> invoker,
WindowedValue<InputT> element,
RestrictionTracker<RestrictionT, PositionT> tracker);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
Expand Down Expand Up @@ -66,7 +67,8 @@ private SomeFn(
}

@ProcessElement
public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tracker) {
public ProcessContinuation process(
ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) {
Uninterruptibles.sleepUninterruptibly(
sleepBeforeFirstClaim.getMillis(), TimeUnit.MILLISECONDS);
for (long i = tracker.currentRestriction().getFrom(), numIterations = 1;
Expand All @@ -88,20 +90,19 @@ public OffsetRange getInitialRestriction(Void element) {
}
}

private SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result
runTest(
int totalNumOutputs,
Duration sleepBeforeFirstClaim,
int numOutputsPerProcessCall,
Duration sleepBeforeEachOutput) {
private SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result runTest(
int totalNumOutputs,
Duration sleepBeforeFirstClaim,
int numOutputsPerProcessCall,
Duration sleepBeforeEachOutput) {
SomeFn fn = new SomeFn(sleepBeforeFirstClaim, numOutputsPerProcessCall, sleepBeforeEachOutput);
OffsetRange initialRestriction = new OffsetRange(0, totalNumOutputs);
return runTest(fn, initialRestriction);
}

private SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result
runTest(DoFn<Void, String> fn, OffsetRange initialRestriction) {
SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker> invoker =
private SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result runTest(
DoFn<Void, String> fn, OffsetRange initialRestriction) {
SplittableProcessElementInvoker<Void, String, OffsetRange, Long> invoker =
new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
fn,
PipelineOptionsFactory.create(),
Expand Down Expand Up @@ -134,7 +135,7 @@ public <AdditionalOutputT> void outputWindowedValue(

@Test
public void testInvokeProcessElementOutputBounded() throws Exception {
SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res =
SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res =
runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.ZERO);
assertFalse(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
Expand All @@ -145,7 +146,7 @@ public void testInvokeProcessElementOutputBounded() throws Exception {

@Test
public void testInvokeProcessElementTimeBounded() throws Exception {
SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res =
SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res =
runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100));
assertFalse(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
Expand All @@ -158,7 +159,7 @@ public void testInvokeProcessElementTimeBounded() throws Exception {

@Test
public void testInvokeProcessElementTimeBoundedWithStartupDelay() throws Exception {
SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res =
SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res =
runTest(10000, Duration.standardSeconds(3), Integer.MAX_VALUE, Duration.millis(100));
assertFalse(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
Expand All @@ -170,15 +171,15 @@ public void testInvokeProcessElementTimeBoundedWithStartupDelay() throws Excepti

@Test
public void testInvokeProcessElementVoluntaryReturnStop() throws Exception {
SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res =
SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res =
runTest(5, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100));
assertFalse(res.getContinuation().shouldResume());
assertNull(res.getResidualRestriction());
}

@Test
public void testInvokeProcessElementVoluntaryReturnResume() throws Exception {
SplittableProcessElementInvoker<Void, String, OffsetRange, OffsetRangeTracker>.Result res =
SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result res =
runTest(10, Duration.ZERO, 5, Duration.millis(100));
assertTrue(res.getContinuation().shouldResume());
assertEquals(new OffsetRange(5, 10), res.getResidualRestriction());
Expand All @@ -189,7 +190,7 @@ public void testInvokeProcessElementOutputDisallowedBeforeTryClaim() throws Exce
DoFn<Void, String> brokenFn =
new DoFn<Void, String>() {
@ProcessElement
public void process(ProcessContext c, OffsetRangeTracker tracker) {
public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
c.output("foo");
}

Expand All @@ -207,7 +208,7 @@ public void testInvokeProcessElementOutputDisallowedAfterFailedTryClaim() throws
DoFn<Void, String> brokenFn =
new DoFn<Void, String>() {
@ProcessElement
public void process(ProcessContext c, OffsetRangeTracker tracker) {
public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
assertFalse(tracker.tryClaim(6L));
c.output("foo");
}
Expand Down
Loading