From 4a87d8591a54b4f0783357bf26b61abb656d8a88 Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Thu, 4 May 2017 18:30:35 -0700 Subject: [PATCH 1/3] Fix default temp location for DataflowRunner --- .../runners/dataflow/options/DataflowPipelineOptions.java | 4 ++++ .../apache/beam/sdk/extensions/gcp/options/GcpOptions.java | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 11618af9b1fe..06c151ca49f6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -33,6 +33,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Options that can be used to configure the {@link DataflowRunner}. @@ -117,10 +119,12 @@ public interface DataflowPipelineOptions * Returns a default staging location under {@link GcpOptions#getGcpTempLocation}. */ class StagingLocationFactory implements DefaultValueFactory { + private static final Logger LOG = LoggerFactory.getLogger(StagingLocationFactory.class); @Override public String create(PipelineOptions options) { GcsOptions gcsOptions = options.as(GcsOptions.class); + LOG.info("No staging location provided, falling back to temp location."); String gcpTempLocation; try { gcpTempLocation = gcsOptions.getGcpTempLocation(); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index 126b7952d8d2..77c2fad20a4c 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -278,7 +278,7 @@ static String tryCreateDefaultBucket( } final String bucketName = "dataflow-staging-" + region + "-" + projectNumber; - LOG.info("No staging location provided, attempting to use default bucket: {}", + LOG.info("No temp location provided, attempting to use default bucket: {}", bucketName); Bucket bucket = new Bucket() .setName(bucketName) @@ -306,7 +306,7 @@ static String tryCreateDefaultBucket( throw new RuntimeException( "Unable to determine the owner of the default bucket at gs://" + bucketName, e); } - return "gs://" + bucketName; + return "gs://" + bucketName + "/temp/"; } /** From e8421d8e62c5e292bab73f1265f74b7840e997aa Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Fri, 5 May 2017 09:04:02 -0700 Subject: [PATCH 2/3] Fix tests --- .../apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java index 68b3818762b0..3cf698030a0d 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java @@ -220,7 +220,7 @@ public void testCreateBucket() throws Exception { when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(1L); String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); - assertEquals("gs://dataflow-staging-us-north1-1", bucket); + assertEquals("gs://dataflow-staging-us-north1-1/temp/", bucket); } @Test From 02b50fdf234f19580fdc3d9373361d70fd5f10ad Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Fri, 5 May 2017 10:38:20 -0700 Subject: [PATCH 3/3] Address comments --- .../beam/runners/dataflow/options/DataflowPipelineOptions.java | 2 +- .../org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 06c151ca49f6..4af420b4cae4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -124,7 +124,7 @@ class StagingLocationFactory implements DefaultValueFactory { @Override public String create(PipelineOptions options) { GcsOptions gcsOptions = options.as(GcsOptions.class); - LOG.info("No staging location provided, falling back to temp location."); + LOG.info("No stagingLocation provided, falling back to gcpTempLocation"); String gcpTempLocation; try { gcpTempLocation = gcsOptions.getGcpTempLocation(); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index 77c2fad20a4c..a4128e803505 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -278,7 +278,7 @@ static String tryCreateDefaultBucket( } final String bucketName = "dataflow-staging-" + region + "-" + projectNumber; - LOG.info("No temp location provided, attempting to use default bucket: {}", + LOG.info("No tempLocation specified, attempting to use default bucket: {}", bucketName); Bucket bucket = new Bucket() .setName(bucketName)