From ea3ba7eff6456413d10f4c1c88bcdb8111eb190b Mon Sep 17 00:00:00 2001 From: bchambers Date: Thu, 31 Mar 2016 15:18:09 -0700 Subject: [PATCH 1/2] DebuggingWordCount now takes filter as an option Previously it was hard-coded as "Flourish|stomach". Now it is a PipelineOption with that as the default. This allows "breaking" the pipeline by mis-specifying the pattern without changing the code. --- .../dataflow/examples/DebuggingWordCount.java | 20 ++++++++++++++++-- .../src/main/java/DebuggingWordCount.java | 21 +++++++++++++++++-- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java b/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java index 8823dbc323..df4cd74efc 100644 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java +++ b/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java @@ -16,9 +16,10 @@ package com.google.cloud.dataflow.examples; -import com.google.cloud.dataflow.examples.WordCount.WordCountOptions; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.transforms.Aggregator; @@ -150,6 +151,21 @@ public void processElement(ProcessContext c) { } } + /** + * Options supported by {@link DebuggingWordCount}. + * + *

<p>Inherits standard configuration options and all options defined in + * {@link WordCount.WordCountOptions}. + */ + public static interface WordCountOptions extends WordCount.WordCountOptions { + + @Description("Regex filter pattern to use in DebuggingWordCount. " + + "Only words matching this pattern will be counted.") + @Default.String("Flourish|stomach") + String getFilterPattern(); + void setFilterPattern(String value); + } + public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); @@ -158,7 +174,7 @@ public static void main(String[] args) { PCollection<KV<String, Long>> filteredWords = p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) .apply(new WordCount.CountWords()) - .apply(ParDo.of(new FilterTextFn("Flourish|stomach"))); + .apply(ParDo.of(new FilterTextFn(options.getFilterPattern()))); /** * Concept #4: DataflowAssert is a set of convenient PTransforms in the style of diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java index 3cf2bc0dff..299e3b7fe2 100644 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java +++ b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java @@ -16,9 +16,11 @@ package ${package}; -import ${package}.WordCount.WordCountOptions; +import ${package}.WordCount; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.transforms.Aggregator; @@ -149,6 +151,21 @@ 
public void processElement(ProcessContext c) { } } } + + /** + * Options supported by {@link DebuggingWordCount}. + * + *

<p>Inherits standard configuration options and all options defined in + * {@link WordCount.WordCountOptions}. + */ + public static interface WordCountOptions extends WordCount.WordCountOptions { + + @Description("Regex filter pattern to use in DebuggingWordCount. " + + "Only words matching this pattern will be counted.") + @Default.String("Flourish|stomach") + String getFilterPattern(); + void setFilterPattern(String value); + } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() @@ -158,7 +175,7 @@ public static void main(String[] args) { PCollection<KV<String, Long>> filteredWords = p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) .apply(new WordCount.CountWords()) - .apply(ParDo.of(new FilterTextFn("Flourish|stomach"))); + .apply(ParDo.of(new FilterTextFn(options.getFilterPattern()))); /** * Concept #4: DataflowAssert is a set of convenient PTransforms in the style of From cdb7d0b84e89e502ab518005a7e0f658d073c43d Mon Sep 17 00:00:00 2001 From: bchambers Date: Fri, 1 Apr 2016 11:28:04 -0700 Subject: [PATCH 2/2] Add the MaxConditionCost option --- .../sdk/options/CloudDebuggerOptions.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java index 2e1ad9451f..16acc9cd25 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java @@ -29,15 +29,23 @@ @Hidden public interface CloudDebuggerOptions { - /** - * Whether to enable the Cloud Debugger snapshot agent for the current job. - */ + /** Whether to enable the Cloud Debugger snapshot agent for the current job. 
*/ @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.") boolean getEnableCloudDebugger(); void setEnableCloudDebugger(boolean enabled); - @Description("The Cloud Debugger debugee to associate with. This should not be set directly.") + /** The Cloud Debugger debuggee to associate with. This should not be set directly. */ + @Description("The Cloud Debugger debuggee to associate with. This should not be set directly.") @Hidden @Nullable Debuggee getDebuggee(); void setDebuggee(Debuggee debuggee); + + /** The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. */ + @Description( + "The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. " + + "Should be a double between 0 and 1. " + + "Snapshots will be cancelled if evaluating conditions takes more than this ratio of time.") + @Default.Double(0.01) + double getMaxConditionCost(); + void setMaxConditionCost(double maxConditionCost); }