From d1bf08463e091d6900dd81bcfde4948f9f9a09b9 Mon Sep 17 00:00:00 2001 From: bchambers Date: Wed, 2 Mar 2016 17:38:49 -0800 Subject: [PATCH 1/3] Register debuggee prior to job submission --- pom.xml | 1 + sdk/pom.xml | 14 +++++ .../sdk/options/CloudDebuggerOptions.java | 9 +++- .../sdk/runners/DataflowPipelineRunner.java | 51 +++++++++++++++++-- .../cloud/dataflow/sdk/util/Transport.java | 12 +++++ 5 files changed, 82 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index ba130d25a3d25..dcc31a2b7bdbc 100644 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,7 @@ 1.7.7 v2-rev248-1.21.0 0.2.3 + v2-rev6-1.21.0 v1b3-rev19-1.21.0 0.5.160222 v1beta2-rev1-4.0.0 diff --git a/sdk/pom.xml b/sdk/pom.xml index c49c175709013..9be93db77d5c0 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -502,6 +502,20 @@ + + com.google.apis + google-api-services-clouddebugger + ${clouddebugger.version} + + + + com.google.guava + guava-jdk5 + + + + com.google.apis google-api-services-pubsub diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java index 62be4c9ec2e22..2e1ad9451fa0a 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java @@ -16,8 +16,11 @@ package com.google.cloud.dataflow.sdk.options; +import com.google.api.services.clouddebugger.v2.model.Debuggee; import com.google.cloud.dataflow.sdk.annotations.Experimental; +import javax.annotation.Nullable; + /** * Options for controlling Cloud Debugger. */ @@ -32,5 +35,9 @@ public interface CloudDebuggerOptions { @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.") boolean getEnableCloudDebugger(); void setEnableCloudDebugger(boolean enabled); -} + @Description("The Cloud Debugger debugee to associate with. This should not be set directly.") + @Hidden + @Nullable Debuggee getDebuggee(); + void setDebuggee(Debuggee debuggee); +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index 6eb6c2f7ad968..d2d5aa22de133 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -23,6 +23,10 @@ import static com.google.common.base.Preconditions.checkState; import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.services.clouddebugger.v2.Clouddebugger; +import com.google.api.services.clouddebugger.v2.model.Debuggee; +import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest; +import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse; import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; @@ -168,6 +172,8 @@ import java.util.SortedSet; import java.util.TreeSet; +import javax.annotation.Nullable; + /** * A {@link PipelineRunner} that executes the operations in the * pipeline by first translating them to the Dataflow representation @@ -420,6 +426,37 @@ private PCollection applyWindow( return super.apply(new AssignWindows<>(transform), input); } + @Nullable + private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) { + if (!options.getEnableCloudDebugger() || options.getDebuggee() != null) { + return; + } + + Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build(); + RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest(); + registerReq.setDebuggee(new Debuggee() + .setProject(options.getProject()) + .setUniquifier(uniquifier) + .setDescription(uniquifier) + .setAgentVersion("google.com/cloud-dataflow-java/v1")); + + try { + RegisterDebuggeeResponse registerResponse = + debuggerClient.controller().debuggees().register(registerReq).execute(); + Debuggee debuggee = registerResponse.getDebuggee(); + if (debuggee.getStatus() != null && debuggee.getStatus().getIsError()) { + LOG.error("Error registering with debugger: {}", + debuggee.getStatus().getDescription().getFormat()); + return; + } + + options.setDebuggee(debuggee); + } catch (IOException e) { + System.out.println("Unable to register with debugger: " + e); + LOG.error("Unable to register with debugger:", e); + } + } + @Override public DataflowPipelineJob run(Pipeline pipeline) { logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline); @@ -428,9 +465,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { + "related to Google Compute Engine usage and other Google Cloud Services."); List packages = options.getStager().stageFiles(); - JobSpecification jobSpecification = - translator.translate(pipeline, this, packages); - Job newJob = jobSpecification.getJob(); + // Set a unique client_request_id in the CreateJob request. // This is used to ensure idempotence of job creation across retried @@ -442,6 +477,15 @@ public DataflowPipelineJob run(Pipeline pipeline) { int randomNum = new Random().nextInt(9000) + 1000; String requestId = DateTimeFormat.forPattern("YYYYMMddHHmmssmmm").withZone(DateTimeZone.UTC) .print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum; + + // Try to create a debuggee ID. This must happen before the job is translated since it may + // update the options. + DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + maybeRegisterDebuggee(dataflowOptions, requestId); + + JobSpecification jobSpecification = + translator.translate(pipeline, this, packages); + Job newJob = jobSpecification.getJob(); newJob.setClientRequestId(requestId); String version = DataflowReleaseInfo.getReleaseInfo().getVersion(); @@ -450,7 +494,6 @@ public DataflowPipelineJob run(Pipeline pipeline) { newJob.getEnvironment().setUserAgent(DataflowReleaseInfo.getReleaseInfo()); // The Dataflow Service may write to the temporary directory directly, so // must be verified. - DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); if (!Strings.isNullOrEmpty(options.getTempLocation())) { newJob.getEnvironment().setTempStoragePrefix( dataflowOptions.getPathValidator().verifyPath(options.getTempLocation())); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java index 7735a9e01fcc0..a869782d4cb0d 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java @@ -23,6 +23,7 @@ import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.clouddebugger.v2.Clouddebugger; import com.google.api.services.dataflow.Dataflow; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.storage.Storage; @@ -148,6 +149,17 @@ public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); } + public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) { + return new Clouddebugger.Builder(getTransport(), + getJsonFactory(), + chainHttpRequestInitializer( + options.getGcpCredential(), + // Do not log 404. It clutters the output and is possible even required by the caller. + new RetryHttpRequestInitializer(ImmutableList.of(404)))) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + } + /** * Returns a Dataflow client that does not automatically retry failed * requests. From f6c5e9965fd538612ba372f9df5aefa58e76031c Mon Sep 17 00:00:00 2001 From: bchambers Date: Fri, 4 Mar 2016 10:09:56 -0800 Subject: [PATCH 2/3] Round 1 Review - Fail if debugging is requested and we're unable to register - Don't silently retry on 404 --- .../cloud/dataflow/sdk/runners/DataflowPipelineRunner.java | 6 ++---- .../java/com/google/cloud/dataflow/sdk/util/Transport.java | 5 +---- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index d2d5aa22de133..340602ee8ac1c 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -445,15 +445,13 @@ private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniqu debuggerClient.controller().debuggees().register(registerReq).execute(); Debuggee debuggee = registerResponse.getDebuggee(); if (debuggee.getStatus() != null && debuggee.getStatus().getIsError()) { - LOG.error("Error registering with debugger: {}", + throw new RuntimeException("Unable to register with the debugger: " + debuggee.getStatus().getDescription().getFormat()); - return; } options.setDebuggee(debuggee); } catch (IOException e) { - System.out.println("Unable to register with debugger: " + e); - LOG.error("Unable to register with debugger:", e); + throw new RuntimeException("Unable to register with the debugger: ", e); } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java index a869782d4cb0d..15fe2863395a9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java @@ -152,10 +152,7 @@ public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) { return new Clouddebugger.Builder(getTransport(), getJsonFactory(), - chainHttpRequestInitializer( - options.getGcpCredential(), - // Do not log 404. It clutters the output and is possible even required by the caller. - new RetryHttpRequestInitializer(ImmutableList.of(404)))) + chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer())) .setApplicationName(options.getAppName()) .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); } From 78878a83f611a85ada87e3814a804a77c526f6eb Mon Sep 17 00:00:00 2001 From: bchambers Date: Mon, 7 Mar 2016 09:21:06 -0800 Subject: [PATCH 3/3] Review comments. --- .../sdk/runners/DataflowPipelineRunner.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index 340602ee8ac1c..0612cca4d0fda 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -426,13 +426,21 @@ private PCollection applyWindow( return super.apply(new AssignWindows<>(transform), input); } - @Nullable private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) { - if (!options.getEnableCloudDebugger() || options.getDebuggee() != null) { + if (!options.getEnableCloudDebugger()) { return; } + + if (options.getDebuggee() != null) { + throw new RuntimeException("Should not specify the debuggee"); + } Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build(); + Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier); + options.setDebuggee(debuggee); + } + + private Debuggee registerDebuggee(Clouddebugger debuggerClient, String uniquifier) { RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest(); registerReq.setDebuggee(new Debuggee() .setProject(options.getProject()) @@ -449,7 +457,7 @@ private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniqu debuggee.getStatus().getDescription().getFormat()); } - options.setDebuggee(debuggee); + return debuggee; } catch (IOException e) { throw new RuntimeException("Unable to register with the debugger: ", e); }