From e36ca8ef4d47851c018a22eca5b89fd7e2fc0cc2 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 28 Nov 2016 10:09:39 -0800 Subject: [PATCH 1/2] Update Dataflow worker to beam-master-20161129 --- .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index f1d41f235247..b629d65ed3ad 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -212,9 +212,9 @@ public class DataflowRunner extends PipelineRunner { // Default Docker container images that execute Dataflow worker harness, residing in Google // Container Registry, separately for Batch and Streaming. public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE = - "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161031"; + "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161129"; public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE = - "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161031"; + "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161129"; // The limit of CreateJob request size. private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024; From 460d2ecb45f9336357cb799e31e1e160dac45535 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 15 Nov 2016 22:27:35 -0800 Subject: [PATCH 2/2] Transmit new DoFn, not OldDoFn, in Dataflow translator --- .../beam/runners/dataflow/DataflowPipelineTranslator.java | 8 ++++---- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 8 +++----- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 9acf0713afd4..12aa2c9fa729 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -69,9 +69,9 @@ import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -964,7 +964,7 @@ private void translateMultiHelper( BiMap> outputMap = translateOutputs(context.getOutput(transform), context); translateFn( - transform.getFn(), + transform.getNewFn(), context.getInput(transform).getWindowingStrategy(), transform.getSideInputs(), context.getInput(transform).getCoder(), @@ -991,7 +991,7 @@ private void translateSingleHelper( translateInputs(context.getInput(transform), transform.getSideInputs(), context); long mainOutput = context.addOutput(context.getOutput(transform)); translateFn( - transform.getFn(), + transform.getNewFn(), context.getInput(transform).getWindowingStrategy(), transform.getSideInputs(), context.getInput(transform).getCoder(), @@ -1057,7 +1057,7 @@ private static void translateSideInputs( } private static void translateFn( - OldDoFn fn, + DoFn fn, WindowingStrategy windowingStrategy, Iterable> sideInputs, Coder inputCoder, diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index b629d65ed3ad..ca3f0ede8aad 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -128,7 +128,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -2364,8 +2363,7 @@ public final PCollection apply(PBegin input) { * {@link PCollectionView} backend implementation. */ @Deprecated - public static class StreamingPCollectionViewWriterFn - extends OldDoFn, T> implements OldDoFn.RequiresWindowAccess { + public static class StreamingPCollectionViewWriterFn extends DoFn, T> { private final PCollectionView view; private final Coder dataCoder; @@ -2387,8 +2385,8 @@ public Coder getDataCoder() { return dataCoder; } - @Override - public void processElement(ProcessContext c) throws Exception { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow w) throws Exception { throw new UnsupportedOperationException( String.format( "%s is a marker class only and should never be executed.", getClass().getName()));