Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,6 @@ public static DataflowRunner fromOptions(PipelineOptions options) {
"Missing required values: " + Joiner.on(',').join(missing));
}

if (dataflowOptions.getRegion() == null) {
dataflowOptions.setRegion("us-central1");
LOG.warn(
"--region not set; will default to us-central1. Future releases of Beam will "
+ "require the user to set the region explicitly. "
+ "https://cloud.google.com/compute/docs/regions-zones/regions-zones");
}

validateWorkerSettings(PipelineOptionsValidator.validate(GcpOptions.class, options));

PathValidator validator = dataflowOptions.getPathValidator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@
*/
package org.apache.beam.runners.dataflow.options;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
Expand Down Expand Up @@ -118,8 +124,6 @@ public interface DataflowPipelineOptions
* The Google Compute Engine <a
* href="https://cloud.google.com/compute/docs/regions-zones/regions-zones">region</a> for
* creating Dataflow jobs.
*
* <p>NOTE: The Cloud Dataflow now also supports the region flag.
*/
@Hidden
@Experimental
Expand All @@ -128,6 +132,7 @@ public interface DataflowPipelineOptions
+ "https://cloud.google.com/compute/docs/regions-zones/regions-zones for a list of valid "
+ "options. Currently defaults to us-central1, but future releases of Beam will "
+ "require the user to set the region explicitly.")
@Default.InstanceFactory(DefaultGcpRegionFactory.class)
String getRegion();

void setRegion(String region);
Expand Down Expand Up @@ -201,4 +206,52 @@ public String create(PipelineOptions options) {
.toString();
}
}

/**
 * Factory for a default value for Google Cloud region according to
 * https://cloud.google.com/compute/docs/gcloud-compute/#default-properties.
 *
 * <p>The region is looked up, in order, from:
 *
 * <ol>
 *   <li>the {@code CLOUDSDK_COMPUTE_REGION} environment variable, if set and non-empty;
 *   <li>the output of {@code gcloud config get-value compute/region}, if the command exits
 *       successfully within the timeout and prints a non-empty value;
 *   <li>the hard-coded fallback "us-central1" (a warning is logged in this case).
 * </ol>
 */
class DefaultGcpRegionFactory implements DefaultValueFactory<String> {
  private static final Logger LOG = LoggerFactory.getLogger(DefaultGcpRegionFactory.class);

  /** Maximum time to wait for the gcloud CLI before giving up on it. */
  private static final long GCLOUD_TIMEOUT_SECONDS = 2;

  /**
   * Returns the default region using the lookup order described on this class.
   *
   * @param options the pipeline options being populated (not consulted by this factory)
   * @return a non-null region name; "us-central1" if no other default can be found
   */
  @Override
  public String create(PipelineOptions options) {
    String environmentRegion = System.getenv("CLOUDSDK_COMPUTE_REGION");
    if (environmentRegion != null && !environmentRegion.isEmpty()) {
      LOG.info("Using default GCP region {} from $CLOUDSDK_COMPUTE_REGION", environmentRegion);
      return environmentRegion;
    }

    String gcloudRegion = getRegionFromGcloudCli();
    if (gcloudRegion != null) {
      LOG.info("Using default GCP region {} from gcloud CLI", gcloudRegion);
      return gcloudRegion;
    }

    LOG.warn(
        "Region will default to us-central1. Future releases of Beam will "
            + "require the user to set the region explicitly. "
            + "https://cloud.google.com/compute/docs/regions-zones/regions-zones");
    return "us-central1";
  }

  /**
   * Asks the gcloud CLI for its configured compute region.
   *
   * <p>This is strictly best-effort: any failure (gcloud not installed, timeout, non-zero exit,
   * I/O error, interruption) is logged at debug level and reported as "no value".
   *
   * @return the configured region, or {@code null} if it could not be determined
   */
  private static String getRegionFromGcloudCli() {
    Process process = null;
    try {
      process =
          new ProcessBuilder(Arrays.asList("gcloud", "config", "get-value", "compute/region"))
              .start();

      // Handle the timeout before touching any stream or exitValue(): reading a stream of a
      // still-running process blocks until it exits (defeating the timeout), and exitValue()
      // throws IllegalThreadStateException for a process that has not terminated.
      if (!process.waitFor(GCLOUD_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
        LOG.debug("gcloud did not exit within {} seconds; ignoring it.", GCLOUD_TIMEOUT_SECONDS);
        return null;
      }

      int exitValue = process.exitValue();
      if (exitValue != 0) {
        try (BufferedReader errorReader =
            new BufferedReader(
                new InputStreamReader(process.getErrorStream(), StandardCharsets.UTF_8))) {
          String stderr = errorReader.lines().collect(Collectors.joining("\n"));
          LOG.debug("gcloud exited with exit value {}. Stderr:\n{}", exitValue, stderr);
        }
        return null;
      }

      try (BufferedReader reader =
          new BufferedReader(
              new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
        // Trim so that stray whitespace in the CLI output never ends up in the region name.
        String gcloudRegion = reader.lines().collect(Collectors.joining()).trim();
        return gcloudRegion.isEmpty() ? null : gcloudRegion;
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up the stack can still observe it.
      Thread.currentThread().interrupt();
      LOG.debug("Interrupted while waiting for gcloud", e);
      return null;
    } catch (Exception e) {
      // Deliberately broad: a missing or broken gcloud must never fail option construction.
      LOG.debug("Unable to get gcloud compute region", e);
      return null;
    } finally {
      // Do not leave a hung gcloud child process behind after a timeout or an interrupt.
      if (process != null && process.isAlive()) {
        process.destroyForcibly();
      }
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,10 @@ public void testDefaultStagingLocationUnset() {
thrown.expectMessage("Error constructing default value for stagingLocation");
options.getStagingLocation();
}

@Test
public void testDefaultGcpRegion() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
assertEquals("us-central1", options.getRegion());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test will fail on any machine where CLOUDSDK_COMPUTE_REGION is defined or the gcloud config value compute/region is set.

You can test your code by instantiating the DefaultGcpRegionFactory directly, provided it is factored so that you can pass the environment map directly to it; the same approach applies to testing the process execution.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this Luke. Filed #10048 to improve this.

}
}