From 9ee103b25b232cbd53da05510d45c2cec2a8676a Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Tue, 27 Jun 2017 15:41:56 -0700 Subject: [PATCH 1/6] BEAM-2524 Update Google Cloud Dataflow FE URLs from the Dataflow Runners to regionalized paths. --- .../beam/runners/dataflow/DataflowPipelineJob.java | 14 ++++++++++++-- .../beam/runners/dataflow/DataflowRunner.java | 3 ++- .../beam/runners/dataflow/util/MonitoringUtil.java | 9 ++++----- .../dataflow/BatchStatefulParDoOverridesTest.java | 1 + .../dataflow/DataflowPipelineTranslatorTest.java | 1 + .../runners/dataflow/internal/apiclient.py | 7 +++++-- .../runners/dataflow/test_dataflow_runner.py | 5 +++-- 7 files changed, 28 insertions(+), 12 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 2d2398314cb8..864a4575e975 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -164,6 +164,13 @@ public String getProjectId() { return dataflowOptions.getProject(); } + /** + * Get the region this job exists in. + */ + public String getRegion() { + return dataflowOptions.getRegion(); + } + /** * Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable. * @@ -340,7 +347,9 @@ State waitUntilFinish( getJobId(), getReplacedByJob().getJobId(), MonitoringUtil.getJobMonitoringPageURL( - getReplacedByJob().getProjectId(), getReplacedByJob().getJobId())); + getReplacedByJob().getProjectId(), + getRegion(), + getReplacedByJob().getJobId())); break; default: LOG.info("Job {} failed with status {}.", getJobId(), state); @@ -418,7 +427,8 @@ public State call() throws Exception { "Failed to cancel job in state %s, " + "please go to the Developers Console to cancel it manually: %s", state, - MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId())); + MonitoringUtil.getJobMonitoringPageURL( + getProjectId(), getRegion(), getJobId())); LOG.warn(errorMsg); throw new IOException(errorMsg, e); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5d9f0f32aca4..ca892e51e045 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -679,7 +679,8 @@ public DataflowPipelineJob run(Pipeline pipeline) { } LOG.info("To access the Dataflow monitoring console, please navigate to {}", - MonitoringUtil.getJobMonitoringPageURL(options.getProject(), jobResult.getId())); + MonitoringUtil.getJobMonitoringPageURL( + options.getProject(), options.getRegion(), jobResult.getId())); System.out.println("Submitted job: " + jobResult.getId()); LOG.info("To cancel the job using the 'gcloud' tool, run:\n> {}", diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java index 759387c8e5b2..cb073dda7bc5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java @@ -180,14 +180,13 @@ public List getJobMessages( return allMessages; } - public static String getJobMonitoringPageURL(String projectName, String jobId) { + public static String getJobMonitoringPageURL(String projectName, String regionId, String jobId) { try { // Project name is allowed in place of the project id: the user will be redirected to a URL // that has the project name replaced with project id. - return String.format( - "https://console.developers.google.com/project/%s/dataflow/job/%s", - URLEncoder.encode(projectName, "UTF-8"), - URLEncoder.encode(jobId, "UTF-8")); + return String.format("https://console.cloud.google.com/dataflow/jobsDetail/locations/%s/jobs/%s?project=%s", + URLEncoder.encode(regionId, "UTF-8"), URLEncoder.encode(jobId, "UTF-8"), + URLEncoder.encode(projectName, "UTF-8")); } catch (UnsupportedEncodingException e) { // Should never happen. throw new AssertionError("UTF-8 encoding is not supported by the environment", e); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java index d2ab3579f75a..e62a8b8ccdc0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java @@ -161,6 +161,7 @@ public List answer(InvocationOnMock invocation) throws Throwable { options.setGcpCredential(new TestCredential()); options.setJobName("some-job-name"); options.setProject("some-project"); + options.setRegion("some-region"); options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString()); options.setFilesToStage(new LinkedList()); options.setGcsUtil(mockGcsUtil); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 948af1cf606a..7e76091873a2 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -200,6 +200,7 @@ public List answer(InvocationOnMock invocation) throws Throwable { options.setGcpCredential(new TestCredential()); options.setJobName("some-job-name"); options.setProject("some-project"); + options.setRegion("some-region"); options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString()); options.setFilesToStage(new LinkedList()); options.setDataflowClient(buildMockDataflow(new IsValidCreateRequest())); diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index df1a3f22d9cd..f587ff8ed9da 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -489,8 +489,11 @@ def submit_job_description(self, job): logging.info('Created job with id: [%s]', response.id) logging.info( 'To access the Dataflow monitoring console, please navigate to ' - 'https://console.developers.google.com/project/%s/dataflow/job/%s', - self.google_cloud_options.project, response.id) + 'https://console.cloud.google.com/dataflow/jobsDetail' + '/locations/%s/jobs/%s?project=%s', + self.google_cloud_options.region, + response.id, + self.google_cloud_options.project) return response diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index b339882de0f5..96e6a66caab4 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -38,12 +38,13 @@ def run(self, pipeline): self.result = super(TestDataflowRunner, self).run(pipeline) if self.result.has_job: project = pipeline._options.view_as(GoogleCloudOptions).project + region_id = pipeline._options.view_as(GoogleCloudOptions).region job_id = self.result.job_id() # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs # in some cases. print ( - 'Found: https://console.cloud.google.com/dataflow/job/%s?project=%s' % - (job_id, project)) + 'Found: https://console.cloud.google.com/dataflow/jobsDetail' + '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) self.result.wait_until_finish() if on_success_matcher: From d38ab5edc8346854dae4092681b0193a0897767d Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Wed, 28 Jun 2017 09:12:32 -0700 Subject: [PATCH 2/6] Update too long line in MonitoringUtil.java --- .../apache/beam/runners/dataflow/util/MonitoringUtil.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java index cb073dda7bc5..ab04915cb6cc 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java @@ -184,8 +184,10 @@ public static String getJobMonitoringPageURL(String projectName, String regionId try { // Project name is allowed in place of the project id: the user will be redirected to a URL // that has the project name replaced with project id. - return String.format("https://console.cloud.google.com/dataflow/jobsDetail/locations/%s/jobs/%s?project=%s", - URLEncoder.encode(regionId, "UTF-8"), URLEncoder.encode(jobId, "UTF-8"), + return String.format( + "https://console.cloud.google.com/dataflow/jobsDetail/locations/%s/jobs/%s?project=%s", + URLEncoder.encode(regionId, "UTF-8"), + URLEncoder.encode(jobId, "UTF-8"), URLEncoder.encode(projectName, "UTF-8")); } catch (UnsupportedEncodingException e) { // Should never happen. From ac63ddaa60d300575749d1e854598b1ae45b7e82 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Wed, 28 Jun 2017 09:19:35 -0700 Subject: [PATCH 3/6] Fix indentation. --- .../org/apache/beam/runners/dataflow/DataflowPipelineJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 864a4575e975..ca6db8c1f30b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -428,7 +428,7 @@ public State call() throws Exception { + "please go to the Developers Console to cancel it manually: %s", state, MonitoringUtil.getJobMonitoringPageURL( - getProjectId(), getRegion(), getJobId())); + getProjectId(), getRegion(), getJobId())); LOG.warn(errorMsg); throw new IOException(errorMsg, e); } From 31cbd83af77713f6d3cebdfc3818fb11d969cd29 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Mon, 17 Jul 2017 20:27:10 -0700 Subject: [PATCH 4/6] Use old getJobMonitoringPageURL as overload. Use old getJobMonitoringPageURL signature as overload. Fill in "us-central1" as the default when it's used. --- .../org/apache/beam/runners/dataflow/util/MonitoringUtil.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java index ab04915cb6cc..ba4f68e3f326 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java @@ -180,6 +180,10 @@ public List getJobMessages( return allMessages; } + public static String getJobMonitoringPageURL(String projectName, String jobId) { + return getJobMonitoringPageURL(projectName, "us-central1", jobId); + } + public static String getJobMonitoringPageURL(String projectName, String regionId, String jobId) { try { // Project name is allowed in place of the project id: the user will be redirected to a URL From 167115a55212b5811f33014f367a1385546677d4 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Tue, 18 Jul 2017 11:11:43 -0700 Subject: [PATCH 5/6] Deprecate two parameter getJobMonitoringPageURL() --- .../apache/beam/runners/dataflow/util/MonitoringUtil.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java index ba4f68e3f326..6ce185751e93 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java @@ -180,6 +180,11 @@ public List getJobMessages( return allMessages; } + /** + * @deprecated this method defaults the region to "us-central1". Prefer using the overload with an explicit regionId + * parameter. + */ + @Deprecated public static String getJobMonitoringPageURL(String projectName, String jobId) { return getJobMonitoringPageURL(projectName, "us-central1", jobId); } From a3a0c29dffa82f70af15afea7624edc3c29044b8 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Tue, 18 Jul 2017 12:52:39 -0700 Subject: [PATCH 6/6] delint - line length --- .../org/apache/beam/runners/dataflow/util/MonitoringUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java index 6ce185751e93..780a97925210 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java @@ -181,8 +181,8 @@ public List getJobMessages( } /** - * @deprecated this method defaults the region to "us-central1". Prefer using the overload with an explicit regionId - * parameter. + * @deprecated this method defaults the region to "us-central1". Prefer using the overload with + * an explicit regionId parameter. */ @Deprecated public static String getJobMonitoringPageURL(String projectName, String jobId) {