Skip to content

Commit

Permalink
[BEAM-9344] Add support for bundle finalization to the Beam Java SDK.
Browse files Browse the repository at this point in the history
This change adds support for passing in the BundleFinalizer as an input parameter to start/process/finish and wires it through within the SDK harness.

The only supported combination that can execute this is Python ULR and Dataflow using UW which is why all the validates runner test configurations have added the test category to the exclusions list.
  • Loading branch information
lukecwik committed Feb 20, 2020
1 parent b483ddb commit a5bf78f
Show file tree
Hide file tree
Showing 46 changed files with 1,404 additions and 524 deletions.
1 change: 1 addition & 0 deletions runners/apex/build.gradle
Expand Up @@ -100,6 +100,7 @@ task validatesRunnerBatch(type: Test) {
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
// TODO[BEAM-8304]: Support multiple side inputs with different coders.
excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputsWithDifferentCoders'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}

// apex runner is run in embedded mode. Increase default HeapSize
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.StartBundleContext;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
Expand Down Expand Up @@ -132,10 +133,21 @@ public void setup() {
@StartBundle
public void startBundle(StartBundleContext c) {
invoker.invokeStartBundle(
new DoFn<InputT, OutputT>.StartBundleContext() {
new BaseArgumentProvider<InputT, OutputT>() {
@Override
public PipelineOptions getPipelineOptions() {
return c.getPipelineOptions();
public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
DoFn<InputT, OutputT> doFn) {
return new DoFn<InputT, OutputT>.StartBundleContext() {
@Override
public PipelineOptions getPipelineOptions() {
return c.getPipelineOptions();
}
};
}

@Override
public String getErrorContext() {
return "SplittableParDoNaiveBounded/StartBundle";
}
});
}
Expand Down Expand Up @@ -174,23 +186,35 @@ public String getErrorContext() {
@FinishBundle
public void finishBundle(FinishBundleContext c) {
invoker.invokeFinishBundle(
new DoFn<InputT, OutputT>.FinishBundleContext() {
@Override
public PipelineOptions getPipelineOptions() {
return c.getPipelineOptions();
}

new BaseArgumentProvider<InputT, OutputT>() {
@Override
public void output(@Nullable OutputT output, Instant timestamp, BoundedWindow window) {
throw new UnsupportedOperationException(
"Output from FinishBundle for SDF is not supported");
public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
DoFn<InputT, OutputT> doFn) {
return new DoFn<InputT, OutputT>.FinishBundleContext() {
@Override
public PipelineOptions getPipelineOptions() {
return c.getPipelineOptions();
}

@Override
public void output(
@Nullable OutputT output, Instant timestamp, BoundedWindow window) {
throw new UnsupportedOperationException(
"Output from FinishBundle for SDF is not supported");
}

@Override
public <T> void output(
TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
throw new UnsupportedOperationException(
"Output from FinishBundle for SDF is not supported");
}
};
}

@Override
public <T> void output(
TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
throw new UnsupportedOperationException(
"Output from FinishBundle for SDF is not supported");
public String getErrorContext() {
return "SplittableParDoNaiveBounded/StartBundle";
}
});
}
Expand Down Expand Up @@ -317,6 +341,11 @@ public <T> OutputReceiver<Row> getRowReceiver(TupleTag<T> tag) {
};
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException();
}

@Override
public Object restriction() {
return tracker.currentRestriction();
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
Expand Down Expand Up @@ -169,6 +170,12 @@ public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.windowedMultiReceiver(processContext, null);
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
"Not supported in non-portable SplittableDoFn");
}

@Override
public RestrictionTracker<?, ?> restrictionTracker() {
return processContext.tracker;
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFnOutputReceivers;
Expand Down Expand Up @@ -391,6 +392,12 @@ public TimerMap timerFamily(String tagId) {
throw new UnsupportedOperationException(
"Cannot access timer family outside of @ProcessElement and @OnTimer methods");
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
"Bundle finalization is not supported in non-portable pipelines.");
}
}

/** B A concrete implementation of {@link DoFn.FinishBundleContext}. */
Expand Down Expand Up @@ -538,6 +545,12 @@ public void output(OutputT output, Instant timestamp, BoundedWindow window) {
public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
"Bundle finalization is not supported in non-portable pipelines.");
}
}

/**
Expand Down Expand Up @@ -791,6 +804,12 @@ public TimerMap timerFamily(String timerFamilyId) {
throw new RuntimeException(e);
}
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
"Bundle finalization is not supported in non-portable pipelines.");
}
}

/**
Expand Down Expand Up @@ -1014,6 +1033,12 @@ public <T> void output(TupleTag<T> tag, T output) {
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
"Bundle finalization is not supported in non-portable pipelines.");
}
}

private class TimerInternalsTimer implements Timer {
Expand Down
Expand Up @@ -36,6 +36,9 @@
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.DoFn.StartBundleContext;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
Expand Down Expand Up @@ -414,39 +417,62 @@ public String getErrorContext() {
TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME));
}

private DoFn<InputT, OutputT>.StartBundleContext wrapContextAsStartBundle(
private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapContextAsStartBundle(
final StartBundleContext baseContext) {
return fn.new StartBundleContext() {
return new BaseArgumentProvider<InputT, OutputT>() {
@Override
public PipelineOptions getPipelineOptions() {
return baseContext.getPipelineOptions();
public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
DoFn<InputT, OutputT> doFn) {
return fn.new StartBundleContext() {
@Override
public PipelineOptions getPipelineOptions() {
return baseContext.getPipelineOptions();
}
};
}
};
}

private DoFn<InputT, OutputT>.FinishBundleContext wrapContextAsFinishBundle(
final FinishBundleContext baseContext) {
return fn.new FinishBundleContext() {
@Override
public void output(OutputT output, Instant timestamp, BoundedWindow window) {
throwUnsupportedOutput();
public String getErrorContext() {
return "SplittableParDoViaKeyedWorkItems/StartBundle";
}
};
}

private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapContextAsFinishBundle(
final FinishBundleContext baseContext) {
return new BaseArgumentProvider<InputT, OutputT>() {
@Override
public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
throwUnsupportedOutput();
public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
DoFn<InputT, OutputT> doFn) {
return fn.new FinishBundleContext() {
@Override
public void output(OutputT output, Instant timestamp, BoundedWindow window) {
throwUnsupportedOutput();
}

@Override
public <T> void output(
TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
throwUnsupportedOutput();
}

@Override
public PipelineOptions getPipelineOptions() {
return baseContext.getPipelineOptions();
}

private void throwUnsupportedOutput() {
throw new UnsupportedOperationException(
String.format(
"Splittable DoFn can only output from @%s",
ProcessElement.class.getSimpleName()));
}
};
}

@Override
public PipelineOptions getPipelineOptions() {
return baseContext.getPipelineOptions();
}

private void throwUnsupportedOutput() {
throw new UnsupportedOperationException(
String.format(
"Splittable DoFn can only output from @%s",
ProcessElement.class.getSimpleName()));
public String getErrorContext() {
return "SplittableParDoViaKeyedWorkItems/FinishBundle";
}
};
}
Expand Down
2 changes: 2 additions & 0 deletions runners/direct-java/build.gradle
Expand Up @@ -110,6 +110,7 @@ task needsRunnerTests(type: Test) {
// MetricsPusher isn't implemented in direct runner
excludeCategories "org.apache.beam.sdk.testing.UsesMetricsPusher"
excludeCategories "org.apache.beam.sdk.testing.UsesCrossLanguageTransforms"
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
}

Expand All @@ -134,6 +135,7 @@ task validatesRunner(type: Test) {
excludeCategories "org.apache.beam.sdk.testing.LargeKeys\$Above100MB"
excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
excludeCategories "org.apache.beam.sdk.testing.UsesCrossLanguageTransforms"
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
}

Expand Down
1 change: 1 addition & 0 deletions runners/flink/flink_runner.gradle
Expand Up @@ -202,6 +202,7 @@ def createValidatesRunnerTask(Map m) {
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
if (config.streaming) {
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
Expand Down
1 change: 1 addition & 0 deletions runners/flink/job-server/flink_job_server.gradle
Expand Up @@ -149,6 +149,7 @@ def portableValidatesRunnerTask(String name, Boolean streaming) {
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
if (streaming) {
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
Expand Down
1 change: 1 addition & 0 deletions runners/gearpump/build.gradle
Expand Up @@ -85,6 +85,7 @@ task validatesRunnerStreaming(type: Test) {
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
}

Expand Down
1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Expand Up @@ -142,6 +142,7 @@ def commonExcludeCategories = [
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesParDoLifecycle',
'org.apache.beam.sdk.testing.UsesMetricsPusher',
'org.apache.beam.sdk.testing.UsesBundleFinalizer'
]

def fnApiWorkerExcludeCategories = [
Expand Down
2 changes: 2 additions & 0 deletions runners/jet/build.gradle
Expand Up @@ -74,6 +74,7 @@ task validatesRunnerBatch(type: Test) {
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse' //impulse doesn't cooperate properly with Jet when multiple cluster members are used
exclude '**/SplittableDoFnTest.class' //Splittable DoFn functionality not yet in the runner
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}

maxHeapSize = '4g'
Expand All @@ -97,6 +98,7 @@ task needsRunnerTests(type: Test) {
useJUnit {
includeCategories "org.apache.beam.sdk.testing.NeedsRunner"
excludeCategories "org.apache.beam.sdk.testing.LargeKeys\$Above100MB"
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
}

Expand Down
5 changes: 0 additions & 5 deletions runners/portability/java/build.gradle
Expand Up @@ -23,11 +23,6 @@ description = "Apache Beam :: Runners :: Portability :: Java"
ext.summary = """A Java implementation of the Beam Model which utilizes the portability
framework to execute user-definied functions."""


configurations {
validatesRunner
}

dependencies {
compile library.java.vendored_guava_26_0_jre
compile library.java.hamcrest_library
Expand Down
1 change: 1 addition & 0 deletions runners/samza/build.gradle
Expand Up @@ -87,6 +87,7 @@ task validatesRunner(type: Test) {
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
}

Expand Down
2 changes: 2 additions & 0 deletions runners/spark/build.gradle
Expand Up @@ -148,6 +148,7 @@ task validatesRunnerBatch(type: Test) {
// Portability
excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
jvmArgs '-Xmx3g'
}
Expand Down Expand Up @@ -214,6 +215,7 @@ task validatesStructuredStreamingRunnerBatch(type: Test) {
excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
filter {
// Combine with context not implemented
Expand Down
1 change: 1 addition & 0 deletions runners/spark/job-server/build.gradle
Expand Up @@ -113,6 +113,7 @@ def portableValidatesRunnerTask(String name) {
excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
},
)
}
Expand Down

0 comments on commit a5bf78f

Please sign in to comment.