From 6c1ebdbe142f073df42447339eade1d98641a21a Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 1 Nov 2017 14:01:11 -0700 Subject: [PATCH] Stage the portable pipeline; put URL in pipeline options --- .../apache/beam/runners/dataflow/DataflowRunner.java | 11 ++++------- .../dataflow/options/DataflowPipelineOptions.java | 7 +++++++ .../beam/runners/dataflow/DataflowRunnerTest.java | 10 +++++----- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 334c8e53e7c4..0a20a0f173c4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -192,9 +192,6 @@ public class DataflowRunner extends PipelineRunner { @VisibleForTesting static final String PIPELINE_FILE_NAME = "pipeline.pb"; - @VisibleForTesting - static final String STAGED_PIPELINE_METADATA_PROPERTY = "pipeline_url"; - private final Set> pcollectionsRequiringIndexedFormat; /** @@ -544,6 +541,10 @@ public DataflowPipelineJob run(Pipeline pipeline) { DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); maybeRegisterDebuggee(dataflowOptions, requestId); + // Set the location of the staged pipeline; this must happen before + // translation, because that is where the JSON pipeline options are set up + dataflowOptions.setPipelineUrl(stagedPipeline.getLocation()); + JobSpecification jobSpecification = translator.translate(pipeline, this, packages); Job newJob = jobSpecification.getJob(); @@ -571,10 +572,6 @@ public DataflowPipelineJob run(Pipeline pipeline) { String workerHarnessContainerImage = getContainerImageForJob(options); for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) { workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage); - - // https://issues.apache.org/jira/browse/BEAM-3116 - // workerPool.setMetadata( - // ImmutableMap.of(STAGED_PIPELINE_METADATA_PROPERTY, stagedPipeline.getLocation())); } newJob.getEnvironment().setVersion(getEnvironmentVersion(options)); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 091f89bc22be..ddb98f16d1f4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -123,6 +123,13 @@ public interface DataflowPipelineOptions Map getLabels(); void setLabels(Map labels); + /** + * The URL of the staged portable pipeline. + */ + @Description("The URL of the staged portable pipeline") + String getPipelineUrl(); + void setPipelineUrl(String urlString); + /** * Returns a default staging location under {@link GcpOptions#getGcpTempLocation}. */ diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 66cf11db4676..3467d53df997 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.startsWith; @@ -63,6 +64,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -164,11 +166,9 @@ private static void assertValidJob(Job job) { assertNull(job.getCurrentState()); assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName())); - // https://issues.apache.org/jira/browse/BEAM-3116 - // for (WorkerPool workerPool : job.getEnvironment().getWorkerPools()) { - // assertThat(workerPool.getMetadata(), - // hasKey(DataflowRunner.STAGED_PIPELINE_METADATA_PROPERTY)); - // } + assertThat( + (Map) job.getEnvironment().getSdkPipelineOptions().get("options"), + hasKey("pipelineUrl")); } @Before