From f04537ccbc2897ea4337941d5ca8121432daef43 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 21 Dec 2016 14:34:27 -0800 Subject: [PATCH 1/3] Add explicit translation builder for a Step to in Dataflow translator Previously, there was always a "current" step that was the most recent step created. This makes it cumbersome or impossible to do things like translate one primitive transform into a small subgraph of steps. Thus we added hacks like CreatePCollectionView which are not actually part of the model at all - in fact, we should be able to add the needed CollectionToSingleton steps simply by looking at the side inputs of a ParDo node. --- .../dataflow/DataflowPipelineTranslator.java | 313 +++++++++--------- .../beam/runners/dataflow/DataflowRunner.java | 60 ++-- .../dataflow/internal/ReadTranslator.java | 9 +- .../runners/dataflow/DataflowRunnerTest.java | 5 +- 4 files changed, 196 insertions(+), 191 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 8d2b0763be10b..2385fa1e9f075 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -213,14 +213,12 @@ TransformTranslator getTransformTranslator(Class transfo } /** - * A {@link TransformTranslator} knows how to translate - * a particular subclass of {@link PTransform} for the - * Cloud Dataflow service. It does so by - * mutating the {@link TranslationContext}. + * A {@link TransformTranslator} knows how to translate a particular subclass of {@link + * PTransform} for the Cloud Dataflow service. It does so by mutating the {@link + * TranslationContext}. */ public interface TransformTranslator { - void translate(TransformT transform, - TranslationContext context); + void translate(TransformT transform, TranslationContext context); } /** @@ -252,10 +250,8 @@ public interface TranslationContext { /** * Adds a step to the Dataflow workflow for the given transform, with * the given Dataflow step type. - * This step becomes "current" for the purpose of {@link #addInput} and - * {@link #addOutput}. */ - void addStep(PTransform transform, String type); + StepTranslationContext addStep(PTransform transform, String type); /** * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be @@ -264,8 +260,14 @@ public interface TranslationContext { *

This is a low-level operation, when using this method it is up to * the caller to ensure that names do not collide. */ - void addStep(PTransform transform, Step step); + Step addStep(PTransform transform, Step step); + /** + * Encode a PValue reference as an output reference. + */ + OutputReference asOutputReference(PValue value); + } + public interface StepTranslationContext { /** * Sets the encoding for the current Dataflow step. */ @@ -330,12 +332,7 @@ public interface TranslationContext { * output encoding. Returns a pipeline level unique id. */ long addCollectionToSingletonOutput(PValue inputValue, - PValue outputValue); - - /** - * Encode a PValue reference as an output reference. - */ - OutputReference asOutputReference(PValue value); + PValue outputValue); } @@ -343,6 +340,8 @@ long addCollectionToSingletonOutput(PValue inputValue, /** * Translates a Pipeline into the Dataflow representation. + * + *

For internal use only. */ class Translator extends PipelineVisitor.Defaults implements TranslationContext { /** @@ -367,11 +366,6 @@ public Long get() { /** The Cloud Dataflow Job representation. */ private final Job job = new Job(); - /** - * Translator is stateful, as addProperty calls refer to the current step. - */ - private Step currentStep; - /** * A Map from AppliedPTransform to their unique Dataflow step names. */ @@ -546,7 +540,7 @@ public void visitValue(PValue value, TransformHierarchy.Node producer) { } @Override - public void addStep(PTransform transform, String type) { + public StepTranslator addStep(PTransform transform, String type) { String stepName = genStepName(); if (stepNames.put(getCurrentTransform(transform), stepName) != null) { throw new IllegalArgumentException( @@ -559,16 +553,19 @@ public void addStep(PTransform transform, String type) { job.setSteps(steps); } - currentStep = new Step(); - currentStep.setName(stepName); - currentStep.setKind(type); - steps.add(currentStep); - addInput(PropertyNames.USER_NAME, getFullName(transform)); - addDisplayData(stepName, transform); + Step step = new Step(); + step.setName(stepName); + step.setKind(type); + steps.add(step); + + StepTranslator stepContext = new StepTranslator(this, step); + stepContext.addInput(PropertyNames.USER_NAME, getFullName(transform)); + stepContext.addDisplayData(step, stepName, transform); + return stepContext; } @Override - public void addStep(PTransform transform, Step original) { + public Step addStep(PTransform transform, Step original) { Step step = original.clone(); String stepName = step.getName(); if (stepNames.put(getCurrentTransform(transform), stepName) != null) { @@ -605,8 +602,59 @@ public void addStep(PTransform transform, Step original) { steps = new LinkedList<>(); job.setSteps(steps); } - currentStep = step; steps.add(step); + return step; + } + + @Override + public OutputReference asOutputReference(PValue value) { + AppliedPTransform transform = + value.getProducingTransformInternal(); + String stepName = stepNames.get(transform); + if (stepName == null) { + throw new IllegalArgumentException(transform + " doesn't have a name specified"); + } + + String outputName = outputNames.get(value); + if (outputName == null) { + throw new IllegalArgumentException( + "output " + value + " doesn't have a name specified"); + } + + return new OutputReference(stepName, outputName); + } + + /** + * Returns a fresh Dataflow step name. + */ + private String genStepName() { + return "s" + (stepNames.size() + 1); + } + + /** + * Records the name of the given output PValue, + * within its producing transform. + */ + private void registerOutputName(POutput value, String name) { + if (outputNames.put(value, name) != null) { + throw new IllegalArgumentException( + "output " + value + " already has a name specified"); + } + } + } + + static class StepTranslator implements StepTranslationContext { + + private final Translator translator; + private final Step step; + + private StepTranslator(Translator translator, Step step) { + this.translator = translator; + this.step = step; + } + + private Map getProperties() { + return DataflowPipelineTranslator.getProperties(step); } @Override @@ -643,7 +691,7 @@ public void addInput(String name, List> elements) @Override public void addInput(String name, PInput value) { if (value instanceof PValue) { - addInput(name, asOutputReference((PValue) value)); + addInput(name, translator.asOutputReference((PValue) value)); } else { throw new IllegalStateException("Input must be a PValue"); } @@ -685,10 +733,10 @@ public long addValueOnlyOutput(PValue value) { } @Override - public long addCollectionToSingletonOutput(PValue inputValue, - PValue outputValue) { + public long addCollectionToSingletonOutput( + PValue inputValue, PValue outputValue) { Coder inputValueCoder = - checkNotNull(outputCoders.get(inputValue)); + checkNotNull(translator.outputCoders.get(inputValue)); // The inputValueCoder for the input PCollection should be some // WindowedValueCoder of the input PCollection's element // coder. @@ -707,8 +755,8 @@ public long addCollectionToSingletonOutput(PValue inputValue, * with the given {@code Coder} (if not {@code null}). */ private long addOutput(PValue value, Coder valueCoder) { - long id = idGenerator.get(); - registerOutputName(value, Long.toString(id)); + long id = translator.idGenerator.get(); + translator.registerOutputName(value, Long.toString(id)); Map properties = getProperties(); @Nullable List> outputInfoList = null; @@ -728,7 +776,7 @@ private long addOutput(PValue value, Coder valueCoder) { addString(outputInfo, PropertyNames.OUTPUT_NAME, Long.toString(id)); addString(outputInfo, PropertyNames.USER_NAME, value.getName()); if (value instanceof PCollection - && runner.doesPCollectionRequireIndexedFormat((PCollection) value)) { + && translator.runner.doesPCollectionRequireIndexedFormat((PCollection) value)) { addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true); } if (valueCoder != null) { @@ -736,63 +784,19 @@ private long addOutput(PValue value, Coder valueCoder) { // failures as early as possible. CloudObject encoding = SerializableUtils.ensureSerializable(valueCoder); addObject(outputInfo, PropertyNames.ENCODING, encoding); - outputCoders.put(value, valueCoder); + translator.outputCoders.put(value, valueCoder); } outputInfoList.add(outputInfo); return id; } - private void addDisplayData(String stepName, HasDisplayData hasDisplayData) { + private void addDisplayData(Step step, String stepName, HasDisplayData hasDisplayData) { DisplayData displayData = DisplayData.from(hasDisplayData); List> list = MAPPER.convertValue(displayData, List.class); addList(getProperties(), PropertyNames.DISPLAY_DATA, list); } - @Override - public OutputReference asOutputReference(PValue value) { - AppliedPTransform transform = - value.getProducingTransformInternal(); - String stepName = stepNames.get(transform); - if (stepName == null) { - throw new IllegalArgumentException(transform + " doesn't have a name specified"); - } - - String outputName = outputNames.get(value); - if (outputName == null) { - throw new IllegalArgumentException( - "output " + value + " doesn't have a name specified"); - } - - return new OutputReference(stepName, outputName); - } - - private Map getProperties() { - Map properties = currentStep.getProperties(); - if (properties == null) { - properties = new HashMap<>(); - currentStep.setProperties(properties); - } - return properties; - } - - /** - * Returns a fresh Dataflow step name. - */ - private String genStepName() { - return "s" + (stepNames.size() + 1); - } - - /** - * Records the name of the given output PValue, - * within its producing transform. - */ - private void registerOutputName(POutput value, String name) { - if (outputNames.put(value, name) != null) { - throw new IllegalArgumentException( - "output " + value + " already has a name specified"); - } - } } ///////////////////////////////////////////////////////////////////////////// @@ -802,6 +806,14 @@ public String toString() { return "DataflowPipelineTranslator#" + hashCode(); } + private static Map getProperties(Step step) { + Map properties = step.getProperties(); + if (properties == null) { + properties = new HashMap<>(); + step.setProperties(properties); + } + return properties; + } /////////////////////////////////////////////////////////////////////////// @@ -810,20 +822,17 @@ public String toString() { View.CreatePCollectionView.class, new TransformTranslator() { @Override - public void translate( - View.CreatePCollectionView transform, - TranslationContext context) { + public void translate(View.CreatePCollectionView transform, TranslationContext context) { translateTyped(transform, context); } private void translateTyped( - View.CreatePCollectionView transform, - TranslationContext context) { - context.addStep(transform, "CollectionToSingleton"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addCollectionToSingletonOutput( - context.getInput(transform), - context.getOutput(transform)); + View.CreatePCollectionView transform, TranslationContext context) { + StepTranslationContext stepContext = + context.addStep(transform, "CollectionToSingleton"); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + stepContext.addCollectionToSingletonOutput( + context.getInput(transform), context.getOutput(transform)); } }); @@ -839,21 +848,21 @@ public void translate( private void translateHelper( final Combine.GroupedValues transform, - DataflowPipelineTranslator.TranslationContext context) { - context.addStep(transform, "CombineValues"); - translateInputs(context.getInput(transform), transform.getSideInputs(), context); + TranslationContext context) { + StepTranslationContext stepContext = context.addStep(transform, "CombineValues"); + translateInputs( + stepContext, context.getInput(transform), transform.getSideInputs(), context); AppliedCombineFn fn = transform.getAppliedFn( context.getInput(transform).getPipeline().getCoderRegistry(), - context.getInput(transform).getCoder(), - context.getInput(transform).getWindowingStrategy()); + context.getInput(transform).getCoder(), + context.getInput(transform).getWindowingStrategy()); - context.addEncodingInput(fn.getAccumulatorCoder()); - context.addInput( - PropertyNames.SERIALIZED_FN, - byteArrayToJsonString(serializeToByteArray(fn))); - context.addOutput(context.getOutput(transform)); + stepContext.addEncodingInput(fn.getAccumulatorCoder()); + stepContext.addInput( + PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(fn))); + stepContext.addOutput(context.getOutput(transform)); } }); @@ -870,14 +879,14 @@ public void translate( private void flattenHelper( Flatten.FlattenPCollectionList transform, TranslationContext context) { - context.addStep(transform, "Flatten"); + StepTranslationContext stepContext = context.addStep(transform, "Flatten"); List inputs = new LinkedList<>(); for (PCollection input : context.getInput(transform).getAll()) { inputs.add(context.asOutputReference(input)); } - context.addInput(PropertyNames.INPUTS, inputs); - context.addOutput(context.getOutput(transform)); + stepContext.addInput(PropertyNames.INPUTS, inputs); + stepContext.addOutput(context.getOutput(transform)); } }); @@ -885,23 +894,19 @@ private void flattenHelper( GroupByKeyAndSortValuesOnly.class, new TransformTranslator() { @Override - public void translate( - GroupByKeyAndSortValuesOnly transform, - TranslationContext context) { + public void translate(GroupByKeyAndSortValuesOnly transform, TranslationContext context) { groupByKeyAndSortValuesHelper(transform, context); } private void groupByKeyAndSortValuesHelper( - GroupByKeyAndSortValuesOnly transform, - TranslationContext context) { - context.addStep(transform, "GroupByKey"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addOutput(context.getOutput(transform)); - context.addInput(PropertyNames.SORT_VALUES, true); + GroupByKeyAndSortValuesOnly transform, TranslationContext context) { + StepTranslationContext stepContext = context.addStep(transform, "GroupByKey"); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + stepContext.addOutput(context.getOutput(transform)); + stepContext.addInput(PropertyNames.SORT_VALUES, true); // TODO: Add support for combiner lifting once the need arises. - context.addInput( - PropertyNames.DISALLOW_COMBINER_LIFTING, true); + stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true); } }); @@ -918,9 +923,9 @@ public void translate( private void groupByKeyHelper( GroupByKey transform, TranslationContext context) { - context.addStep(transform, "GroupByKey"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addOutput(context.getOutput(transform)); + StepTranslationContext stepContext = context.addStep(transform, "GroupByKey"); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + stepContext.addOutput(context.getOutput(transform)); WindowingStrategy windowingStrategy = context.getInput(transform).getWindowingStrategy(); @@ -931,12 +936,12 @@ private void groupByKeyHelper( || (isStreaming && !transform.fewKeys()) // TODO: Allow combiner lifting on the non-default trigger, as appropriate. || !(windowingStrategy.getTrigger() instanceof DefaultTrigger); - context.addInput( + stepContext.addInput( PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting); - context.addInput( + stepContext.addInput( PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(windowingStrategy))); - context.addInput( + stepContext.addInput( PropertyNames.IS_MERGING_WINDOW_FN, !windowingStrategy.getWindowFn().isNonMerging()); } @@ -946,22 +951,21 @@ private void groupByKeyHelper( ParDo.BoundMulti.class, new TransformTranslator() { @Override - public void translate( - ParDo.BoundMulti transform, - TranslationContext context) { + public void translate(ParDo.BoundMulti transform, TranslationContext context) { translateMultiHelper(transform, context); } private void translateMultiHelper( - ParDo.BoundMulti transform, - TranslationContext context) { - rejectStatefulDoFn(transform.getFn()); + ParDo.BoundMulti transform, TranslationContext context) { + DataflowPipelineTranslator.rejectStatefulDoFn(transform.getFn()); - context.addStep(transform, "ParallelDo"); - translateInputs(context.getInput(transform), transform.getSideInputs(), context); + StepTranslationContext stepContext = context.addStep(transform, "ParallelDo"); + translateInputs( + stepContext, context.getInput(transform), transform.getSideInputs(), context); BiMap> outputMap = - translateOutputs(context.getOutput(transform), context); + translateOutputs(context.getOutput(transform), stepContext); translateFn( + stepContext, transform.getFn(), context.getInput(transform).getWindowingStrategy(), transform.getSideInputs(), @@ -976,30 +980,28 @@ private void translateMultiHelper( ParDo.Bound.class, new TransformTranslator() { @Override - public void translate( - ParDo.Bound transform, - TranslationContext context) { + public void translate(ParDo.Bound transform, TranslationContext context) { translateSingleHelper(transform, context); } private void translateSingleHelper( - ParDo.Bound transform, - TranslationContext context) { + ParDo.Bound transform, TranslationContext context) { rejectStatefulDoFn(transform.getFn()); - context.addStep(transform, "ParallelDo"); - translateInputs(context.getInput(transform), transform.getSideInputs(), context); - long mainOutput = context.addOutput(context.getOutput(transform)); + StepTranslationContext stepContext = context.addStep(transform, "ParallelDo"); + translateInputs( + stepContext, context.getInput(transform), transform.getSideInputs(), context); + long mainOutput = stepContext.addOutput(context.getOutput(transform)); translateFn( + stepContext, transform.getFn(), context.getInput(transform).getWindowingStrategy(), transform.getSideInputs(), context.getInput(transform).getCoder(), context, mainOutput, - ImmutableMap.>of(mainOutput, - new TupleTag<>(PropertyNames.OUTPUT))); - + ImmutableMap.>of( + mainOutput, new TupleTag<>(PropertyNames.OUTPUT))); } }); @@ -1014,16 +1016,16 @@ public void translate( private void translateHelper( Window.Bound transform, TranslationContext context) { - context.addStep(transform, "Bucket"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addOutput(context.getOutput(transform)); + StepTranslationContext stepContext = context.addStep(transform, "Bucket"); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + stepContext.addOutput(context.getOutput(transform)); WindowingStrategy strategy = context.getOutput(transform).getWindowingStrategy(); byte[] serializedBytes = serializeToByteArray(strategy); String serializedJson = byteArrayToJsonString(serializedBytes); assert Arrays.equals(serializedBytes, jsonStringToByteArray(serializedJson)); - context.addInput(PropertyNames.SERIALIZED_FN, serializedJson); + stepContext.addInput(PropertyNames.SERIALIZED_FN, serializedJson); } }); @@ -1046,15 +1048,17 @@ private static void rejectStatefulDoFn(DoFn doFn) { } private static void translateInputs( + StepTranslationContext stepContext, PCollection input, List> sideInputs, TranslationContext context) { - context.addInput(PropertyNames.PARALLEL_INPUT, input); - translateSideInputs(sideInputs, context); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, input); + translateSideInputs(stepContext, sideInputs, context); } // Used for ParDo private static void translateSideInputs( + StepTranslationContext stepContext, List> sideInputs, TranslationContext context) { Map nonParInputs = new HashMap<>(); @@ -1065,10 +1069,11 @@ private static void translateSideInputs( context.asOutputReference(view)); } - context.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs); + stepContext.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs); } private static void translateFn( + StepTranslationContext stepContext, DoFn fn, WindowingStrategy windowingStrategy, Iterable> sideInputs, @@ -1076,8 +1081,8 @@ private static void translateFn( TranslationContext context, long mainOutput, Map> outputMap) { - context.addInput(PropertyNames.USER_FN, fn.getClass().getName()); - context.addInput( + stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName()); + stepContext.addInput( PropertyNames.SERIALIZED_FN, byteArrayToJsonString( serializeToByteArray( @@ -1087,13 +1092,13 @@ private static void translateFn( private static BiMap> translateOutputs( PCollectionTuple outputs, - TranslationContext context) { + StepTranslationContext stepContext) { ImmutableBiMap.Builder> mapBuilder = ImmutableBiMap.builder(); for (Map.Entry, PCollection> entry : outputs.getAll().entrySet()) { TupleTag tag = entry.getKey(); PCollection output = entry.getValue(); - mapBuilder.put(context.addOutput(output), tag); + mapBuilder.put(stepContext.addOutput(output), tag); } return mapBuilder.build(); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 03e5dfcbe9d17..d2c1e668759d3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -33,6 +33,7 @@ import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; +import com.google.api.services.dataflow.model.Step; import com.google.api.services.dataflow.model.WorkerPool; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; @@ -72,6 +73,7 @@ import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; +import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; import org.apache.beam.runners.dataflow.internal.AssignWindows; @@ -2116,50 +2118,46 @@ protected String getKindString() { } } - /** - * Rewrite {@link StreamingPubsubIORead} to the appropriate internal node. - */ - private static class StreamingPubsubIOReadTranslator implements - TransformTranslator> { + /** Rewrite {@link StreamingPubsubIORead} to the appropriate internal node. */ + private static class StreamingPubsubIOReadTranslator + implements TransformTranslator> { @Override - public void translate( - StreamingPubsubIORead transform, - TranslationContext context) { - checkArgument(context.getPipelineOptions().isStreaming(), - "StreamingPubsubIORead is only for streaming pipelines."); + public void translate(StreamingPubsubIORead transform, TranslationContext context) { + checkArgument( + context.getPipelineOptions().isStreaming(), + "StreamingPubsubIORead is only for streaming pipelines."); PubsubUnboundedSource overriddenTransform = transform.getOverriddenTransform(); - context.addStep(transform, "ParallelRead"); - context.addInput(PropertyNames.FORMAT, "pubsub"); + StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); + stepContext.addInput(PropertyNames.FORMAT, "pubsub"); if (overriddenTransform.getTopicProvider() != null) { if (overriddenTransform.getTopicProvider().isAccessible()) { - context.addInput( + stepContext.addInput( PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path()); } else { - context.addInput( + stepContext.addInput( PropertyNames.PUBSUB_TOPIC_OVERRIDE, ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName()); } } if (overriddenTransform.getSubscriptionProvider() != null) { if (overriddenTransform.getSubscriptionProvider().isAccessible()) { - context.addInput( + stepContext.addInput( PropertyNames.PUBSUB_SUBSCRIPTION, overriddenTransform.getSubscription().getV1Beta1Path()); } else { - context.addInput( + stepContext.addInput( PropertyNames.PUBSUB_SUBSCRIPTION_OVERRIDE, - ((NestedValueProvider) overriddenTransform.getSubscriptionProvider()) - .propertyName()); + ((NestedValueProvider) overriddenTransform.getSubscriptionProvider()).propertyName()); } } if (overriddenTransform.getTimestampLabel() != null) { - context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, - overriddenTransform.getTimestampLabel()); + stepContext.addInput( + PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel()); } if (overriddenTransform.getIdLabel() != null) { - context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel()); + stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel()); } - context.addValueOnlyOutput(context.getOutput(transform)); + stepContext.addValueOnlyOutput(context.getOutput(transform)); } } @@ -2211,26 +2209,26 @@ public void translate( checkArgument(context.getPipelineOptions().isStreaming(), "StreamingPubsubIOWrite is only for streaming pipelines."); PubsubUnboundedSink overriddenTransform = transform.getOverriddenTransform(); - context.addStep(transform, "ParallelWrite"); - context.addInput(PropertyNames.FORMAT, "pubsub"); + StepTranslationContext stepContext = context.addStep(transform, "ParallelWrite"); + stepContext.addInput(PropertyNames.FORMAT, "pubsub"); if (overriddenTransform.getTopicProvider().isAccessible()) { - context.addInput( + stepContext.addInput( PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path()); } else { - context.addInput( + stepContext.addInput( PropertyNames.PUBSUB_TOPIC_OVERRIDE, ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName()); } if (overriddenTransform.getTimestampLabel() != null) { - context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, - overriddenTransform.getTimestampLabel()); + stepContext.addInput( + PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel()); } if (overriddenTransform.getIdLabel() != null) { - context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel()); + stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel()); } - context.addEncodingInput( + stepContext.addEncodingInput( WindowedValue.getValueOnlyCoder(overriddenTransform.getElementCoder())); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java index 84950f7aeba08..1a5a9a5bccabd 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator; +import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; import org.apache.beam.sdk.io.FileBasedSource; @@ -60,13 +61,13 @@ public static void translateReadHelper(Source source, } } - context.addStep(transform, "ParallelRead"); - context.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT); - context.addInput( + StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); + stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT); + stepContext.addInput( PropertyNames.SOURCE_STEP_INPUT, cloudSourceToDictionary( CustomSources.serializeToCloudSource(source, context.getPipelineOptions()))); - context.addValueOnlyOutput(context.getOutput(transform)); + stepContext.addValueOnlyOutput(context.getOutput(transform)); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 21d575aeb7986..a19fd8ca1480f 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -61,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList; import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap; import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap; @@ -998,8 +999,8 @@ public void translate( // Note: This is about the minimum needed to fake out a // translation. This obviously isn't a real translation. - context.addStep(transform, "TestTranslate"); - context.addOutput(context.getOutput(transform)); + StepTranslationContext stepContext = context.addStep(transform, "TestTranslate"); + stepContext.addOutput(context.getOutput(transform)); } }); From 5d2cb3e2310dbf7046785e9e8f6403b854b2dd03 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 5 Jan 2017 16:51:23 -0800 Subject: [PATCH 2/3] Move some pieces of Dataflow translator to top level --- .../dataflow/DataflowPipelineTranslator.java | 134 +----------------- .../beam/runners/dataflow/DataflowRunner.java | 8 +- .../runners/dataflow/TransformTranslator.java | 123 ++++++++++++++++ .../dataflow/internal/ReadTranslator.java | 7 +- .../DataflowPipelineTranslatorTest.java | 3 +- .../runners/dataflow/DataflowRunnerTest.java | 5 +- 6 files changed, 137 insertions(+), 143 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 2385fa1e9f075..e9cf6f4aa523f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -56,6 +56,8 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly; +import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext; +import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext; import org.apache.beam.runners.dataflow.internal.ReadTranslator; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.DoFnInfo; @@ -69,6 +71,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.GroupedValues; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; @@ -80,6 +83,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.Window.Bound; import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.PropertyNames; @@ -212,130 +216,6 @@ TransformTranslator getTransformTranslator(Class transfo return transformTranslators.get(transformClass); } - /** - * A {@link TransformTranslator} knows how to translate a particular subclass of {@link - * PTransform} for the Cloud Dataflow service. It does so by mutating the {@link - * TranslationContext}. - */ - public interface TransformTranslator { - void translate(TransformT transform, TranslationContext context); - } - - /** - * The interface provided to registered callbacks for interacting - * with the {@link DataflowRunner}, including reading and writing the - * values of {@link PCollection}s and side inputs ({@link PCollectionView}s). - */ - public interface TranslationContext { - /** - * Returns the configured pipeline options. - */ - DataflowPipelineOptions getPipelineOptions(); - - /** - * Returns the input of the currently being translated transform. - */ - InputT getInput(PTransform transform); - - /** - * Returns the output of the currently being translated transform. - */ - OutputT getOutput(PTransform transform); - - /** - * Returns the full name of the currently being translated transform. - */ - String getFullName(PTransform transform); - - /** - * Adds a step to the Dataflow workflow for the given transform, with - * the given Dataflow step type. - */ - StepTranslationContext addStep(PTransform transform, String type); - - /** - * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be - * consistent with the Step, in terms of input, output and coder types. - * - *

This is a low-level operation, when using this method it is up to - * the caller to ensure that names do not collide. - */ - Step addStep(PTransform transform, Step step); - /** - * Encode a PValue reference as an output reference. - */ - OutputReference asOutputReference(PValue value); - } - - public interface StepTranslationContext { - /** - * Sets the encoding for the current Dataflow step. - */ - void addEncodingInput(Coder value); - - /** - * Adds an input with the given name and value to the current - * Dataflow step. - */ - void addInput(String name, Boolean value); - - /** - * Adds an input with the given name and value to the current - * Dataflow step. - */ - void addInput(String name, String value); - - /** - * Adds an input with the given name and value to the current - * Dataflow step. - */ - void addInput(String name, Long value); - - /** - * Adds an input with the given name to the previously added Dataflow - * step, coming from the specified input PValue. - */ - void addInput(String name, PInput value); - - /** - * Adds an input that is a dictionary of strings to objects. - */ - void addInput(String name, Map elements); - - /** - * Adds an input that is a list of objects. - */ - void addInput(String name, List> elements); - - /** - * Adds an output to the previously added Dataflow step, - * producing the specified output {@code PValue}, - * including its {@code Coder} if a {@code TypedPValue}. If the - * {@code PValue} is a {@code PCollection}, wraps its coder inside - * a {@code WindowedValueCoder}. Returns a pipeline level unique id. - */ - long addOutput(PValue value); - - /** - * Adds an output to the previously added Dataflow step, - * producing the specified output {@code PValue}, - * including its {@code Coder} if a {@code TypedPValue}. If the - * {@code PValue} is a {@code PCollection}, wraps its coder inside - * a {@code ValueOnlyCoder}. Returns a pipeline level unique id. - */ - long addValueOnlyOutput(PValue value); - - /** - * Adds an output to the previously added CollectionToSingleton Dataflow step, - * consuming the specified input {@code PValue} and producing the specified output - * {@code PValue}. This step requires special treatment for its - * output encoding. Returns a pipeline level unique id. - */ - long addCollectionToSingletonOutput(PValue inputValue, - PValue outputValue); - } - - ///////////////////////////////////////////////////////////////////////////// /** @@ -838,11 +718,11 @@ private void translateTyped( DataflowPipelineTranslator.registerTransformTranslator( Combine.GroupedValues.class, - new DataflowPipelineTranslator.TransformTranslator() { + new TransformTranslator() { @Override public void translate( Combine.GroupedValues transform, - DataflowPipelineTranslator.TranslationContext context) { + TranslationContext context) { translateHelper(transform, context); } @@ -1007,7 +887,7 @@ private void translateSingleHelper( registerTransformTranslator( Window.Bound.class, - new DataflowPipelineTranslator.TransformTranslator() { + new TransformTranslator() { @Override public void translate( Window.Bound transform, TranslationContext context) { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index d2c1e668759d3..9da7d2400674f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -33,7 +33,6 @@ import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; -import com.google.api.services.dataflow.model.Step; import com.google.api.services.dataflow.model.WorkerPool; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; @@ -73,9 +72,6 @@ import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; import org.apache.beam.runners.dataflow.internal.AssignWindows; import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource; @@ -2315,10 +2311,10 @@ public String getKindString() { } private static class ReadWithIdsTranslator - implements DataflowPipelineTranslator.TransformTranslator> { + implements TransformTranslator> { @Override public void translate(ReadWithIds transform, - DataflowPipelineTranslator.TranslationContext context) { + TranslationContext context) { ReadTranslator.translateReadHelper(transform.getSource(), transform, context); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java new file mode 100644 index 0000000000000..2aa83275a24a8 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import com.google.api.services.dataflow.model.Step; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.util.OutputReference; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; + +/** + * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform} + * for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}. + */ +public interface TransformTranslator { + void translate(TransformT transform, TranslationContext context); + + /** + * The interface provided to registered callbacks for interacting with the {@link DataflowRunner}, + * including reading and writing the values of {@link PCollection}s and side inputs. + */ + interface TranslationContext { + /** Returns the configured pipeline options. */ + DataflowPipelineOptions getPipelineOptions(); + + /** Returns the input of the currently being translated transform. */ + InputT getInput(PTransform transform); + + /** Returns the output of the currently being translated transform. */ + OutputT getOutput(PTransform transform); + + /** Returns the full name of the currently being translated transform. */ + String getFullName(PTransform transform); + + /** + * Adds a step to the Dataflow workflow for the given transform, with the given Dataflow step + * type. + */ + StepTranslationContext addStep(PTransform transform, String type); + + /** + * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be consistent + * with the Step, in terms of input, output and coder types. + * + *

This is a low-level operation, when using this method it is up to the caller to ensure + * that names do not collide. + */ + Step addStep(PTransform transform, Step step); + /** Encode a PValue reference as an output reference. */ + OutputReference asOutputReference(PValue value); + } + + /** The interface for a {@link TransformTranslator} to build a Dataflow step. */ + interface StepTranslationContext { + /** Sets the encoding for this Dataflow step. */ + void addEncodingInput(Coder value); + + /** Adds an input with the given name and value to this Dataflow step. */ + void addInput(String name, Boolean value); + + /** Adds an input with the given name and value to this Dataflow step. */ + void addInput(String name, String value); + + /** Adds an input with the given name and value to this Dataflow step. */ + void addInput(String name, Long value); + + /** + * Adds an input with the given name to this Dataflow step, coming from the specified input + * PValue. + */ + void addInput(String name, PInput value); + + /** Adds an input that is a dictionary of strings to objects. */ + void addInput(String name, Map elements); + + /** Adds an input that is a list of objects. */ + void addInput(String name, List> elements); + + /** + * Adds an output to this Dataflow step, producing the specified output {@code PValue}, + * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code + * PCollection}, wraps its coder inside a {@code WindowedValueCoder}. Returns a pipeline level + * unique id. + */ + long addOutput(PValue value); + + /** + * Adds an output to this Dataflow step, producing the specified output {@code PValue}, + * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code + * PCollection}, wraps its coder inside a {@code ValueOnlyCoder}. Returns a pipeline level + * unique id. + */ + long addValueOnlyOutput(PValue value); + + /** + * Adds an output to this {@code CollectionToSingleton} Dataflow step, consuming the specified + * input {@code PValue} and producing the specified output {@code PValue}. This step requires + * special treatment for its output encoding. Returns a pipeline level unique id. + */ + long addCollectionToSingletonOutput(PValue inputValue, PValue outputValue); + } +} diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java index 1a5a9a5bccabd..a15a2a3dc242c 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java @@ -24,10 +24,7 @@ import com.google.api.services.dataflow.model.SourceMetadata; import java.util.HashMap; import java.util.Map; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; +import org.apache.beam.runners.dataflow.TransformTranslator; import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Source; @@ -47,7 +44,7 @@ public void translate(Read.Bounded transform, TranslationContext context) { public static void translateReadHelper(Source source, PTransform transform, - DataflowPipelineTranslator.TranslationContext context) { + TranslationContext context) { try { // TODO: Move this validation out of translation once IOChannelUtils is portable // and can be reconstructed on the worker. diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index ab82941f02a9f..84b585ad3586f 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -56,7 +56,6 @@ import java.util.Map; import java.util.Set; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; import org.apache.beam.runners.dataflow.util.OutputReference; @@ -566,7 +565,7 @@ protected Coder getDefaultOutputCoder() { * {@link TranslationContext#addStep} and remaps the input port reference. */ private static class EmbeddedTranslator - implements DataflowPipelineTranslator.TransformTranslator { + implements TransformTranslator { @Override public void translate(EmbeddedTransform transform, TranslationContext context) { addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT, context.asOutputReference(context.getInput(transform))); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index a19fd8ca1480f..4fff1c6b3bdb8 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -61,7 +61,6 @@ import java.util.List; import java.util.Map; import java.util.regex.Pattern; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList; import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap; import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap; @@ -989,12 +988,12 @@ public void testTransformTranslator() throws IOException { DataflowPipelineTranslator.registerTransformTranslator( TestTransform.class, - new DataflowPipelineTranslator.TransformTranslator() { + new TransformTranslator() { @SuppressWarnings("unchecked") @Override public void translate( TestTransform transform, - DataflowPipelineTranslator.TranslationContext context) { + TranslationContext context) { transform.translated = true; // Note: This is about the minimum needed to fake out a From 33907f8908238199b166070bc1e12796af32829a Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 5 Jan 2017 17:15:52 -0800 Subject: [PATCH 3/3] Reduce visibility of many Dataflow runner internals --- .../beam/runners/dataflow/{internal => }/AssignWindows.java | 4 ++-- .../{internal => }/DataflowAggregatorTransforms.java | 4 ++-- .../{internal => }/DataflowMetricUpdateExtractor.java | 4 ++-- .../apache/beam/runners/dataflow/DataflowPipelineJob.java | 2 -- .../beam/runners/dataflow/DataflowPipelineTranslator.java | 3 +-- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 4 ---- .../DataflowUnboundedReadFromBoundedSource.java | 4 ++-- .../runners/dataflow/{internal => }/ReadTranslator.java | 6 +++--- .../apache/beam/runners/dataflow/TransformTranslator.java | 2 +- .../beam/runners/dataflow/DataflowPipelineJobTest.java | 1 - .../DataflowUnboundedReadFromBoundedSourceTest.java | 2 +- 11 files changed, 14 insertions(+), 22 deletions(-) rename runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/{internal => }/AssignWindows.java (95%) rename runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/{internal => }/DataflowAggregatorTransforms.java (97%) rename runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/{internal => }/DataflowMetricUpdateExtractor.java (97%) rename runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/{internal => }/DataflowUnboundedReadFromBoundedSource.java (99%) rename runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/{internal => }/ReadTranslator.java (95%) rename runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/{internal => }/DataflowUnboundedReadFromBoundedSourceTest.java (98%) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java similarity index 95% rename from runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java rename to runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java index 27fe13d76b1df..880cd26ef3377 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.internal; +package org.apache.beam.runners.dataflow; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; @@ -41,7 +41,7 @@ * * @param the type of input element */ -public class AssignWindows extends PTransform, PCollection> { +class AssignWindows extends PTransform, PCollection> { private final Window.Bound transform; /** diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java similarity index 97% rename from runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java rename to runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java index fb7897361fa5b..0198ccac70b09 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.internal; +package org.apache.beam.runners.dataflow; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; @@ -32,7 +32,7 @@ /** * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used. */ -public class DataflowAggregatorTransforms { +class DataflowAggregatorTransforms { private final Map, Collection>> aggregatorTransforms; private final Multimap, AppliedPTransform> transformAppliedTransforms; private final BiMap, String> appliedStepNames; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java similarity index 97% rename from runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java rename to runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java index d715437ae68d0..f725c46f25f8f 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.internal; +package org.apache.beam.runners.dataflow; import com.google.api.services.dataflow.model.MetricStructuredName; import com.google.api.services.dataflow.model.MetricUpdate; @@ -32,7 +32,7 @@ * Methods for extracting the values of an {@link Aggregator} from a collection of {@link * MetricUpdate MetricUpdates}. */ -public final class DataflowMetricUpdateExtractor { +final class DataflowMetricUpdateExtractor { private static final String STEP_NAME_CONTEXT_KEY = "step"; private static final String IS_TENTATIVE_KEY = "tentative"; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 00c88f994958e..0da7137742d2e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -35,8 +35,6 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; -import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; import org.apache.beam.sdk.AggregatorRetrievalException; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index e9cf6f4aa523f..8e5901e643be6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -58,7 +58,6 @@ import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly; import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext; -import org.apache.beam.runners.dataflow.internal.ReadTranslator; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.runners.dataflow.util.OutputReference; @@ -106,7 +105,7 @@ * into Cloud Dataflow Service API {@link Job}s. */ @SuppressWarnings({"rawtypes", "unchecked"}) -public class DataflowPipelineTranslator { +class DataflowPipelineTranslator { // Must be kept in sync with their internal counterparts. private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class); private static final ObjectMapper MAPPER = new ObjectMapper(); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 9da7d2400674f..9ff856aceff7c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -72,14 +72,10 @@ import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; -import org.apache.beam.runners.dataflow.internal.AssignWindows; -import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; -import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource; import org.apache.beam.runners.dataflow.internal.IsmFormat; import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord; import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder; import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder; -import org.apache.beam.runners.dataflow.internal.ReadTranslator; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java similarity index 99% rename from runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java rename to runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java index a2ae799e2e337..cfb5ebce60b41 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.internal; +package org.apache.beam.runners.dataflow; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -78,7 +78,7 @@ * time dependency. It should be replaced in the dataflow worker as an execution time dependency. */ @Deprecated -public class DataflowUnboundedReadFromBoundedSource extends PTransform> { +class DataflowUnboundedReadFromBoundedSource extends PTransform> { private static final Logger LOG = LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java similarity index 95% rename from runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java rename to runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java index a15a2a3dc242c..ed03b53e39c07 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.internal; +package org.apache.beam.runners.dataflow; import static org.apache.beam.sdk.util.Structs.addBoolean; import static org.apache.beam.sdk.util.Structs.addDictionary; @@ -24,7 +24,7 @@ import com.google.api.services.dataflow.model.SourceMetadata; import java.util.HashMap; import java.util.Map; -import org.apache.beam.runners.dataflow.TransformTranslator; +import org.apache.beam.runners.dataflow.internal.CustomSources; import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Source; @@ -36,7 +36,7 @@ /** * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end. */ -public class ReadTranslator implements TransformTranslator> { +class ReadTranslator implements TransformTranslator> { @Override public void translate(Read.Bounded transform, TranslationContext context) { translateReadHelper(transform.getSource(), transform, context); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java index 2aa83275a24a8..fb883a7566e70 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java @@ -33,7 +33,7 @@ * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform} * for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}. */ -public interface TransformTranslator { +interface TransformTranslator { void translate(TransformT transform, TranslationContext context); /** diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 6999e03e1484d..d5d7aa989c475 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -54,7 +54,6 @@ import java.net.SocketTimeoutException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; import org.apache.beam.sdk.AggregatorRetrievalException; diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java similarity index 98% rename from runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java rename to runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java index d38428b80ff8a..c479332a84128 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.runners.dataflow.internal; +package org.apache.beam.runners.dataflow; import static org.junit.Assert.assertEquals;