diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/TemplatingDataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/TemplatingDataflowPipelineRunner.java index d01507be92..f7c3274bd9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/TemplatingDataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/TemplatingDataflowPipelineRunner.java @@ -31,6 +31,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; /** @@ -67,6 +69,13 @@ public boolean failOnJobFileWriteFailure() { public static TemplatingDataflowPipelineRunner fromOptions(PipelineOptions options) { DataflowPipelineDebugOptions dataflowOptions = PipelineOptionsValidator.validate(DataflowPipelineDebugOptions.class, options); + List experiments = dataflowOptions.getExperiments(); + if (experiments == null) { + experiments = new ArrayList<>(); + dataflowOptions.setExperiments(experiments); + } + experiments.add("enable_custom_bigquery_source"); + experiments.add("enable_custom_bigquery_sink"); DataflowPipelineRunner dataflowPipelineRunner = DataflowPipelineRunner.fromOptions(dataflowOptions); checkArgument(!Strings.isNullOrEmpty(dataflowOptions.getDataflowJobFile()),