From f935979de67805b04a6c26233c310794c8cb35dd Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Wed, 7 Dec 2016 13:07:05 -0800 Subject: [PATCH 1/2] Templated jobs should use custom IO for BigQuery --- .../sdk/runners/TemplatingDataflowPipelineRunner.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/TemplatingDataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/TemplatingDataflowPipelineRunner.java index d01507be92..6c551216a5 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/TemplatingDataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/TemplatingDataflowPipelineRunner.java @@ -31,6 +31,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; /** @@ -67,6 +69,13 @@ public boolean failOnJobFileWriteFailure() { public static TemplatingDataflowPipelineRunner fromOptions(PipelineOptions options) { DataflowPipelineDebugOptions dataflowOptions = PipelineOptionsValidator.validate(DataflowPipelineDebugOptions.class, options); + List experiments = null; + if (dataflowOptions.getExperiments() == null) { + experiments = new ArrayList<>(); + dataflowOptions.setExperiments(experiments); + } + experiments.add("enable_custom_bigquery_source"); + experiments.add("enable_custom_bigquery_sink"); DataflowPipelineRunner dataflowPipelineRunner = DataflowPipelineRunner.fromOptions(dataflowOptions); checkArgument(!Strings.isNullOrEmpty(dataflowOptions.getDataflowJobFile()), From 4a6df8cbb461a74521def86ae5f6659130c727c8 Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Wed, 7 Dec 2016 15:04:12 -0800 Subject: [PATCH 2/2] Fix assignment --- .../sdk/runners/TemplatingDataflowPipelineRunner.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/TemplatingDataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/TemplatingDataflowPipelineRunner.java index 6c551216a5..f7c3274bd9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/TemplatingDataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/TemplatingDataflowPipelineRunner.java @@ -69,8 +69,8 @@ public boolean failOnJobFileWriteFailure() { public static TemplatingDataflowPipelineRunner fromOptions(PipelineOptions options) { DataflowPipelineDebugOptions dataflowOptions = PipelineOptionsValidator.validate(DataflowPipelineDebugOptions.class, options); - List experiments = null; - if (dataflowOptions.getExperiments() == null) { + List experiments = dataflowOptions.getExperiments(); + if (experiments == null) { experiments = new ArrayList<>(); dataflowOptions.setExperiments(experiments); }