Skip to content

Commit

Permalink
[BEAM-9344] Add support for bundle finalization to the Beam Java SDK.
Browse files Browse the repository at this point in the history
This change adds support for passing in the BundleFinalizer as an input parameter to start/process/finish and wires it through within the SDK harness.

The only supported combination that can execute this is Python ULR and Dataflow using UW which is why all the validates runner test configurations have added the test category to the exclusions list.
  • Loading branch information
lukecwik committed Feb 21, 2020
1 parent c6f812f commit fa5f428
Show file tree
Hide file tree
Showing 50 changed files with 1,543 additions and 525 deletions.
1 change: 1 addition & 0 deletions runners/apex/build.gradle
Expand Up @@ -100,6 +100,7 @@ task validatesRunnerBatch(type: Test) {
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
// TODO[BEAM-8304]: Support multiple side inputs with different coders.
excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputsWithDifferentCoders'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}

// apex runner is run in embedded mode. Increase default HeapSize
Expand Down
Expand Up @@ -288,6 +288,25 @@ public boolean isRequiresTimeSortedInput() {
return signature.processElement().requiresTimeSortedInput();
}

@Override
public boolean requestsFinalization() {
return (signature.startBundle() != null
&& signature
.startBundle()
.extraParameters()
.contains(Parameter.bundleFinalizer()))
|| (signature.processElement() != null
&& signature
.processElement()
.extraParameters()
.contains(Parameter.bundleFinalizer()))
|| (signature.finishBundle() != null
&& signature
.finishBundle()
.extraParameters()
.contains(Parameter.bundleFinalizer()));
}

@Override
public String translateRestrictionCoderId(SdkComponents newComponents) {
return restrictionCoderId;
Expand Down Expand Up @@ -763,6 +782,8 @@ Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)

boolean isRequiresTimeSortedInput();

boolean requestsFinalization();

String translateRestrictionCoderId(SdkComponents newComponents);
}

Expand All @@ -779,6 +800,7 @@ public static ParDoPayload payloadForParDoLike(ParDoLike parDo, SdkComponents co
.setSplittable(parDo.isSplittable())
.setRequiresTimeSortedInput(parDo.isRequiresTimeSortedInput())
.setRestrictionCoderId(parDo.translateRestrictionCoderId(components))
.setRequestsFinalization(parDo.requestsFinalization())
.build();
}
}
Expand Up @@ -416,6 +416,25 @@ public boolean isRequiresTimeSortedInput() {
return false;
}

@Override
public boolean requestsFinalization() {
return (signature.startBundle() != null
&& signature
.startBundle()
.extraParameters()
.contains(DoFnSignature.Parameter.bundleFinalizer()))
|| (signature.processElement() != null
&& signature
.processElement()
.extraParameters()
.contains(DoFnSignature.Parameter.bundleFinalizer()))
|| (signature.finishBundle() != null
&& signature
.finishBundle()
.extraParameters()
.contains(DoFnSignature.Parameter.bundleFinalizer()));
}

@Override
public String translateRestrictionCoderId(SdkComponents newComponents) {
return restrictionCoderId;
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.StartBundleContext;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
Expand Down Expand Up @@ -132,10 +133,21 @@ public void setup() {
@StartBundle
public void startBundle(StartBundleContext c) {
invoker.invokeStartBundle(
new DoFn<InputT, OutputT>.StartBundleContext() {
new BaseArgumentProvider<InputT, OutputT>() {
@Override
public PipelineOptions getPipelineOptions() {
return c.getPipelineOptions();
public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
DoFn<InputT, OutputT> doFn) {
return new DoFn<InputT, OutputT>.StartBundleContext() {
@Override
public PipelineOptions getPipelineOptions() {
return c.getPipelineOptions();
}
};
}

@Override
public String getErrorContext() {
return "SplittableParDoNaiveBounded/StartBundle";
}
});
}
Expand Down Expand Up @@ -174,23 +186,35 @@ public String getErrorContext() {
@FinishBundle
public void finishBundle(FinishBundleContext c) {
invoker.invokeFinishBundle(
new DoFn<InputT, OutputT>.FinishBundleContext() {
@Override
public PipelineOptions getPipelineOptions() {
return c.getPipelineOptions();
}

new BaseArgumentProvider<InputT, OutputT>() {
@Override
public void output(@Nullable OutputT output, Instant timestamp, BoundedWindow window) {
throw new UnsupportedOperationException(
"Output from FinishBundle for SDF is not supported");
public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
DoFn<InputT, OutputT> doFn) {
return new DoFn<InputT, OutputT>.FinishBundleContext() {
@Override
public PipelineOptions getPipelineOptions() {
return c.getPipelineOptions();
}

@Override
public void output(
@Nullable OutputT output, Instant timestamp, BoundedWindow window) {
throw new UnsupportedOperationException(
"Output from FinishBundle for SDF is not supported");
}

@Override
public <T> void output(
TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
throw new UnsupportedOperationException(
"Output from FinishBundle for SDF is not supported");
}
};
}

@Override
public <T> void output(
TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
throw new UnsupportedOperationException(
"Output from FinishBundle for SDF is not supported");
public String getErrorContext() {
return "SplittableParDoNaiveBounded/StartBundle";
}
});
}
Expand Down Expand Up @@ -317,6 +341,11 @@ public <T> OutputReceiver<Row> getRowReceiver(TupleTag<T> tag) {
};
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException();
}

@Override
public Object restriction() {
return tracker.currentRestriction();
Expand Down
Expand Up @@ -20,7 +20,9 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -47,6 +49,8 @@
import org.apache.beam.sdk.transforms.Combine.BinaryCombineLongFn;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
Expand All @@ -64,12 +68,15 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

/** Tests for {@link ParDoTranslation}. */
@RunWith(Enclosed.class)
public class ParDoTranslationTest {

/** Tests for translating various {@link ParDo} transforms to/from {@link ParDoPayload} protos. */
Expand Down Expand Up @@ -125,6 +132,7 @@ public void testToProto() throws Exception {
for (PCollectionView<?> view : parDo.getSideInputs().values()) {
payload.getSideInputsOrThrow(view.getTagInternal().getId());
}
assertFalse(payload.getRequestsFinalization());
}

@Test
Expand Down Expand Up @@ -338,4 +346,73 @@ public int hashCode() {
return StateTimerDropElementsFn.class.hashCode();
}
}

@RunWith(JUnit4.class)
public static class BundleFinalizerTranslation {
private static class StartBundleDoFn extends DoFn<String, String> {
@StartBundle
public void startBundle(BundleFinalizer bundleFinalizer) {}

@ProcessElement
public void processElement() {}
}

private static class ProcessContextDoFn extends DoFn<String, String> {
@ProcessElement
public void processElement(BundleFinalizer finalizer) {}
}

private static class FinishBundleDoFn extends DoFn<String, String> {
@FinishBundle
public void finishBundle(BundleFinalizer bundleFinalizer) {}

@ProcessElement
public void processElement(BundleFinalizer finalizer) {}
}

@Test
public void testStartBundle() throws Exception {
SdkComponents sdkComponents = SdkComponents.create();
sdkComponents.registerEnvironment(Environments.createDockerEnvironment("java"));
ParDoPayload payload =
ParDoTranslation.translateParDo(
ParDo.of(new StartBundleDoFn())
.withOutputTags(new TupleTag<>(), TupleTagList.empty()),
DoFnSchemaInformation.create(),
TestPipeline.create(),
sdkComponents);

assertTrue(payload.getRequestsFinalization());
}

@Test
public void testProcessContext() throws Exception {
SdkComponents sdkComponents = SdkComponents.create();
sdkComponents.registerEnvironment(Environments.createDockerEnvironment("java"));
ParDoPayload payload =
ParDoTranslation.translateParDo(
ParDo.of(new ProcessContextDoFn())
.withOutputTags(new TupleTag<>(), TupleTagList.empty()),
DoFnSchemaInformation.create(),
TestPipeline.create(),
sdkComponents);

assertTrue(payload.getRequestsFinalization());
}

@Test
public void testFinishBundle() throws Exception {
SdkComponents sdkComponents = SdkComponents.create();
sdkComponents.registerEnvironment(Environments.createDockerEnvironment("java"));
ParDoPayload payload =
ParDoTranslation.translateParDo(
ParDo.of(new FinishBundleDoFn())
.withOutputTags(new TupleTag<>(), TupleTagList.empty()),
DoFnSchemaInformation.create(),
TestPipeline.create(),
sdkComponents);

assertTrue(payload.getRequestsFinalization());
}
}
}
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
Expand Down Expand Up @@ -169,6 +170,12 @@ public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.windowedMultiReceiver(processContext, null);
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
"Not supported in non-portable SplittableDoFn");
}

@Override
public RestrictionTracker<?, ?> restrictionTracker() {
return processContext.tracker;
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFnOutputReceivers;
Expand Down Expand Up @@ -391,6 +392,12 @@ public TimerMap timerFamily(String tagId) {
throw new UnsupportedOperationException(
"Cannot access timer family outside of @ProcessElement and @OnTimer methods");
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
"Bundle finalization is not supported in non-portable pipelines.");
}
}

/** B A concrete implementation of {@link DoFn.FinishBundleContext}. */
Expand Down Expand Up @@ -538,6 +545,12 @@ public void output(OutputT output, Instant timestamp, BoundedWindow window) {
public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
"Bundle finalization is not supported in non-portable pipelines.");
}
}

/**
Expand Down Expand Up @@ -791,6 +804,12 @@ public TimerMap timerFamily(String timerFamilyId) {
throw new RuntimeException(e);
}
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
"Bundle finalization is not supported in non-portable pipelines.");
}
}

/**
Expand Down Expand Up @@ -1014,6 +1033,12 @@ public <T> void output(TupleTag<T> tag, T output) {
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
"Bundle finalization is not supported in non-portable pipelines.");
}
}

private class TimerInternalsTimer implements Timer {
Expand Down

0 comments on commit fa5f428

Please sign in to comment.