From e6a3c45baa8081886bd4684b33cf374c4bcf9aff Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Wed, 19 Apr 2017 20:34:01 -0700 Subject: [PATCH 01/13] Provide implementation of BQIO.Read that does not use Source API --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 168 +++++++++++++++--- .../sdk/io/gcp/bigquery/BigQueryOptions.java | 5 + .../io/gcp/bigquery/BigQuerySourceBase.java | 24 ++- .../gcp/bigquery/PassThroughThenCleanup.java | 12 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 9 +- .../io/gcp/bigquery/FakeDatasetService.java | 1 + 6 files changed, 183 insertions(+), 36 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 6a93279f7171..c7541edc65a3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -45,6 +45,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; @@ -54,6 +55,7 @@ import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreateJsonTableRefFromUuid; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema; @@ -67,8 +69,15 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.util.gcsfs.GcsPath; @@ -77,6 +86,10 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -402,6 +415,37 @@ Read withTestServices(BigQueryServices testServices) { return toBuilder().setBigQueryServices(testServices).build(); } + private BigQuerySourceBase applySourceTransform( + Pipeline p, String jobUuid, String extractDestinationDir) { + BigQuerySourceBase source; + BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class); + final BigQueryServices bqServices = getBigQueryServices(); + final String executingProject = bqOptions.getProject(); + if (getQuery() != null + && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) { + source = + BigQueryQuerySource.create( + StaticValueProvider.of(jobUuid), + getQuery(), + NestedValueProvider.of( + StaticValueProvider.of(jobUuid), + new CreateJsonTableRefFromUuid(executingProject)), + getFlattenResults(), + getUseLegacySql(), + extractDestinationDir, + getBigQueryServices()); + } else { + source = + BigQueryTableSource.create( + StaticValueProvider.of(jobUuid), + getTableProvider(), + extractDestinationDir, + getBigQueryServices(), + StaticValueProvider.of(executingProject)); + } + return source; + } + @Override public void validate(PipelineOptions options) { // Even if existence validation is disabled, we need to make sure that the BigQueryIO @@ -483,29 +527,104 @@ public void validate(PipelineOptions options) { @Override public PCollection expand(PBegin input) { - final String stepUuid = BigQueryHelpers.randomUUIDString(); - BoundedSource source; + final Pipeline p = input.getPipeline(); + String jobUuid = BigQueryHelpers.randomUUIDString(); + BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + final String executingProject = bqOptions.getProject(); + final String extractDestinationDir; + String tempLocation = bqOptions.getTempLocation(); + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + extractDestinationDir = factory.resolve(tempLocation, jobUuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve extract destination directory in %s", tempLocation)); + } - if (getQuery() != null - && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) { - source = - BigQueryQuerySource.create( - stepUuid, - getQuery(), - getFlattenResults(), - getUseLegacySql(), - getBigQueryServices()); + final PCollectionView jobIdTokenView; + PCollection jobIdTokenCollection = null; + if (!bqOptions.getUseNewSource()) { + // Create a singleton job ID token at construction time. + jobIdTokenView = p + .apply("TriggerIdCreation", Create.of(jobUuid)) + .apply(View.asSingleton()); } else { - source = - BigQueryTableSource.create( - stepUuid, - getTableProvider(), - getBigQueryServices()); + // Create a singleton job ID token at execution time. + jobIdTokenCollection = p + .apply("TriggerIdCreation", Create.of("ignored")) + .apply("CreateJobId", MapElements.via( + new SimpleFunction() { + @Override + public String apply(String input) { + return BigQueryHelpers.randomUUIDString(); + } + })); + jobIdTokenView = jobIdTokenCollection + .apply(View.asSingleton()); + } + + PCollection rows; + if (!bqOptions.getUseNewSource()) { + // Apply the traditional Source model. + rows = p.apply( + org.apache.beam.sdk.io.Read.from(applySourceTransform( + p, jobUuid, extractDestinationDir))) + .setCoder(getDefaultOutputCoder()); + } else { + // Apply the DoFn version. + final TupleTag filesTag = + new TupleTag(){}; + final TupleTag tableSchemaTag = + new TupleTag(){}; + PCollectionTuple tuple = jobIdTokenCollection.apply("RunCreateJob", ParDo + .of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) + throws Exception { + String jobUuid = c.element(); + BigQuerySourceBase source = applySourceTransform( + p, jobUuid, extractDestinationDir); + TableSchema schema = source.getSchema(p.getOptions()); + c.output(tableSchemaTag, schema); + List files = source.extractFiles(p.getOptions()); + for (String file : files) { + c.output(file); + } + } + }) + .withOutputTags(filesTag, TupleTagList.of(tableSchemaTag))); + final PCollectionView schemaView = tuple.get(tableSchemaTag) + .apply(View.asSingleton()); + rows = tuple.get(filesTag) + //.apply(Reshuffle.of()) + .apply("ReadFiles", ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) + throws Exception { + TableSchema schema = c.sideInput(schemaView); + BigQuerySourceBase source = applySourceTransform(p, null, null); + List> sources = + source.createSources(ImmutableList.of(c.element()), schema); + for (BoundedSource avroSource : sources) { + BoundedSource.BoundedReader reader = + avroSource.createReader(p.getOptions()); + if (reader.start()) { + c.output(reader.getCurrent()); + while (reader.advance()) { + c.output(reader.getCurrent()); + } + } + } + } + }).withSideInputs(schemaView)); } PassThroughThenCleanup.CleanupOperation cleanupOperation = new PassThroughThenCleanup.CleanupOperation() { @Override - void cleanup(PipelineOptions options) throws Exception { + void cleanup(DoFn.ProcessContext c) throws Exception { + PipelineOptions options = c.getPipelineOptions(); BigQueryOptions bqOptions = options.as(BigQueryOptions.class); final String extractDestinationDir = resolveTempLocation( @@ -515,9 +634,8 @@ void cleanup(PipelineOptions options) throws Exception { JobReference jobRef = new JobReference() - .setProjectId(bqOptions.getProject()) - .setJobId( - getExtractJobId(createJobIdToken(bqOptions.getJobName(), stepUuid))); + .setProjectId(executingProject) + .setJobId(getExtractJobId((String) c.sideInput(jobIdTokenView))); Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef); @@ -531,10 +649,8 @@ void cleanup(PipelineOptions options) throws Exception { } } }; - return input.getPipeline() - .apply(org.apache.beam.sdk.io.Read.from(source)) - .setCoder(getDefaultOutputCoder()) - .apply(new PassThroughThenCleanup(cleanupOperation)); + return rows.apply( + new PassThroughThenCleanup(cleanupOperation, jobIdTokenView)); } @Override @@ -575,6 +691,10 @@ public TableReference getTable() { } } + static String getExtractJobId(String jobIdToken) { + return jobIdToken + "-extract"; + } + static String getExtractDestinationUri(String extractDestinationDir) { return String.format("%s/%s", extractDestinationDir, "*.avro"); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index d3116ebaec4d..2bc58634a522 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -36,4 +36,9 @@ public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions, @Default.String("bigquery.googleapis.com/cloud_dataflow") String getTempDatasetId(); void setTempDatasetId(String value); + + @Description("Whether to use the SDF-style (experimental) source.") + @Default.Boolean(false) + Boolean getUseNewSource(); + void setUseNewSource(Boolean value); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index 945c7d40b19b..f4b5ef95a4f0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -75,6 +75,24 @@ abstract class BigQuerySourceBase extends BoundedSource { this.bqServices = checkNotNull(bqServices, "bqServices"); } + protected TableSchema getSchema(PipelineOptions options) throws Exception { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + TableReference tableToExtract = getTableToExtract(bqOptions); + TableSchema tableSchema = bqServices.getDatasetService(bqOptions) + .getTable(tableToExtract).getSchema(); + return tableSchema; + } + + protected List extractFiles(PipelineOptions options) throws Exception { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + TableReference tableToExtract = getTableToExtract(bqOptions); + JobService jobService = bqServices.getJobService(bqOptions); + String extractJobId = BigQueryIO.getExtractJobId(jobIdToken.get()); + List tempFiles = executeExtract(extractJobId, tableToExtract, jobService); + cleanupTempResource(bqOptions); + return tempFiles; + } + @Override public List> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { @@ -98,7 +116,7 @@ public List> split( .getTable(tableToExtract).getSchema(); cleanupTempResource(bqOptions); - cachedSplitResult = checkNotNull(createSources(tempFiles, tableSchema)); + cachedSplitResult = checkNotNull(createSources(extractFiles(options), getSchema(options))); } return cachedSplitResult; } @@ -147,8 +165,8 @@ private List executeExtract( return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob); } - private List> createSources( - List files, TableSchema tableSchema) throws IOException, InterruptedException { + List> createSources( + List files, TableSchema tableSchema) throws IOException, InterruptedException { final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema); SerializableFunction function = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java index f49c4e1954fa..426d90ed7cb3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import java.io.Serializable; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -41,9 +40,12 @@ class PassThroughThenCleanup extends PTransform, PCollection> { private CleanupOperation cleanupOperation; + private PCollectionView sideInput; - PassThroughThenCleanup(CleanupOperation cleanupOperation) { + PassThroughThenCleanup(CleanupOperation cleanupOperation, + PCollectionView sideInput) { this.cleanupOperation = cleanupOperation; + this.sideInput = sideInput; } @Override @@ -64,9 +66,9 @@ public PCollection expand(PCollection input) { @ProcessElement public void processElement(ProcessContext c) throws Exception { - c.element().cleanup(c.getPipelineOptions()); + c.element().cleanup(c); } - }).withSideInputs(cleanupSignalView)); + }).withSideInputs(sideInput, cleanupSignalView)); return outputs.get(mainOutput); } @@ -79,6 +81,6 @@ public void processElement(ProcessContext c) { } abstract static class CleanupOperation implements Serializable { - abstract void cleanup(PipelineOptions options) throws Exception; + abstract void cleanup(DoFn.ProcessContext options) throws Exception; } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index bfd260a30879..f30ebdb49531 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1641,6 +1641,7 @@ public void testBigQueryNoTableQuerySourceInitSplit() throws Exception { StaticValueProvider.of(query), true /* flattenResults */, true /* useLegacySql */, fakeBqServices); + PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(baseDir.toString()); List read = convertBigDecimaslToLong( @@ -1723,9 +1724,9 @@ public void testPassThroughThenCleanup() throws Exception { .apply(Create.of(1, 2, 3)) .apply(new PassThroughThenCleanup(new CleanupOperation() { @Override - void cleanup(PipelineOptions options) throws Exception { + void cleanup(DoFn.ProcessContext c) throws Exception { // no-op - }})); + }}, p.apply("Create1", Create.of("")).apply(View.asSingleton()))); PAssert.that(output).containsInAnyOrder(1, 2, 3); @@ -1738,9 +1739,9 @@ public void testPassThroughThenCleanupExecuted() throws Exception { p.apply(Create.empty(VarIntCoder.of())) .apply(new PassThroughThenCleanup(new CleanupOperation() { @Override - void cleanup(PipelineOptions options) throws Exception { + void cleanup(DoFn.ProcessContext c) throws Exception { throw new RuntimeException("cleanup executed"); - }})); + }}, p.apply("Create1", Create.of("")).apply(View.asSingleton()))); thrown.expect(RuntimeException.class); thrown.expectMessage("cleanup executed"); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java index 6ee53404eb49..30d0749dbf65 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java @@ -273,6 +273,7 @@ Map> getInsertErrors() { void throwNotFound(String format, Object... args) throws IOException { throw new IOException( + String.format(format, args), new GoogleJsonResponseException.Builder(404, String.format(format, args), new HttpHeaders()).build()); } From f821e958c0d49d5ac6098dccad90ea1c519f39db Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Wed, 19 Apr 2017 20:51:08 -0700 Subject: [PATCH 02/13] Make tests pass. --- .../beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index f4b5ef95a4f0..8a16a2b795c7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -104,19 +104,14 @@ public List> split( BigQueryOptions bqOptions = options.as(BigQueryOptions.class); TableReference tableToExtract = getTableToExtract(bqOptions); JobService jobService = bqServices.getJobService(bqOptions); - - final String extractDestinationDir = - resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid); - - String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid)); - List tempFiles = executeExtract( - extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir); + String extractJobId = BigQueryIO.getExtractJobId(jobIdToken.get()); + List tempFiles = executeExtract(extractJobId, tableToExtract, jobService); TableSchema tableSchema = bqServices.getDatasetService(bqOptions) .getTable(tableToExtract).getSchema(); cleanupTempResource(bqOptions); - cachedSplitResult = checkNotNull(createSources(extractFiles(options), getSchema(options))); + cachedSplitResult = checkNotNull(createSources(tempFiles, tableSchema)); } return cachedSplitResult; } From bc08c697a53093e7c60b81c7d61d980749a3bf0c Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Wed, 19 Apr 2017 21:37:46 -0700 Subject: [PATCH 03/13] Add reshuffle --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 15 +++++++++++++-- .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 5 +---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index c7541edc65a3..b102c3c39850 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -77,8 +77,13 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.IOChannelFactory; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; @@ -597,14 +602,20 @@ public void processElement(ProcessContext c) final PCollectionView schemaView = tuple.get(tableSchemaTag) .apply(View.asSingleton()); rows = tuple.get(filesTag) - //.apply(Reshuffle.of()) + .apply(WithKeys.of(new SerializableFunction() { + public String apply(String s) { + return s; + } + })) + .apply(Reshuffle.of()) + .apply(Values.create()) .apply("ReadFiles", ParDo.of( new DoFn() { @ProcessElement public void processElement(ProcessContext c) throws Exception { TableSchema schema = c.sideInput(schemaView); - BigQuerySourceBase source = applySourceTransform(p, null, null); + BigQuerySourceBase source = applySourceTransform(p, "unused", "unused"); List> sources = source.createSources(ImmutableList.of(c.element()), schema); for (BoundedSource avroSource : sources) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index 8a16a2b795c7..dbc78a1729f1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -89,7 +89,6 @@ protected List extractFiles(PipelineOptions options) throws Exception { JobService jobService = bqServices.getJobService(bqOptions); String extractJobId = BigQueryIO.getExtractJobId(jobIdToken.get()); List tempFiles = executeExtract(extractJobId, tableToExtract, jobService); - cleanupTempResource(bqOptions); return tempFiles; } @@ -106,9 +105,7 @@ public List> split( JobService jobService = bqServices.getJobService(bqOptions); String extractJobId = BigQueryIO.getExtractJobId(jobIdToken.get()); List tempFiles = executeExtract(extractJobId, tableToExtract, jobService); - - TableSchema tableSchema = bqServices.getDatasetService(bqOptions) - .getTable(tableToExtract).getSchema(); + TableSchema tableSchema = getSchema(options); cleanupTempResource(bqOptions); cachedSplitResult = checkNotNull(createSources(tempFiles, tableSchema)); From d9925e8e03cd13ca57f93f675582fb75f0e3afe8 Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Wed, 19 Apr 2017 22:00:29 -0700 Subject: [PATCH 04/13] Iterate on testing. --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 24 +++++++++---------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 1 + 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index b102c3c39850..9e747f2b1cbb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -421,10 +421,9 @@ Read withTestServices(BigQueryServices testServices) { } private BigQuerySourceBase applySourceTransform( - Pipeline p, String jobUuid, String extractDestinationDir) { + PipelineOptions options, String jobUuid, String extractDestinationDir) { BigQuerySourceBase source; - BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class); - final BigQueryServices bqServices = getBigQueryServices(); + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); final String executingProject = bqOptions.getProject(); if (getQuery() != null && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) { @@ -532,7 +531,7 @@ public void validate(PipelineOptions options) { @Override public PCollection expand(PBegin input) { - final Pipeline p = input.getPipeline(); + Pipeline p = input.getPipeline(); String jobUuid = BigQueryHelpers.randomUUIDString(); BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); final String executingProject = bqOptions.getProject(); @@ -552,7 +551,7 @@ public PCollection expand(PBegin input) { // Create a singleton job ID token at construction time. jobIdTokenView = p .apply("TriggerIdCreation", Create.of(jobUuid)) - .apply(View.asSingleton()); + .apply("ViewId", View.asSingleton()); } else { // Create a singleton job ID token at execution time. jobIdTokenCollection = p @@ -565,7 +564,7 @@ public String apply(String input) { } })); jobIdTokenView = jobIdTokenCollection - .apply(View.asSingleton()); + .apply("ViewId", View.asSingleton()); } PCollection rows; @@ -573,7 +572,7 @@ public String apply(String input) { // Apply the traditional Source model. rows = p.apply( org.apache.beam.sdk.io.Read.from(applySourceTransform( - p, jobUuid, extractDestinationDir))) + p.getOptions(), jobUuid, extractDestinationDir))) .setCoder(getDefaultOutputCoder()); } else { // Apply the DoFn version. @@ -589,10 +588,10 @@ public void processElement(ProcessContext c) throws Exception { String jobUuid = c.element(); BigQuerySourceBase source = applySourceTransform( - p, jobUuid, extractDestinationDir); - TableSchema schema = source.getSchema(p.getOptions()); + c.getPipelineOptions(), jobUuid, extractDestinationDir); + TableSchema schema = source.getSchema(c.getPipelineOptions()); c.output(tableSchemaTag, schema); - List files = source.extractFiles(p.getOptions()); + List files = source.extractFiles(c.getPipelineOptions()); for (String file : files) { c.output(file); } @@ -615,12 +614,13 @@ public String apply(String s) { public void processElement(ProcessContext c) throws Exception { TableSchema schema = c.sideInput(schemaView); - BigQuerySourceBase source = applySourceTransform(p, "unused", "unused"); + BigQuerySourceBase source = applySourceTransform( + c.getPipelineOptions(), "unused", "unused"); List> sources = source.createSources(ImmutableList.of(c.element()), schema); for (BoundedSource avroSource : sources) { BoundedSource.BoundedReader reader = - avroSource.createReader(p.getOptions()); + avroSource.createReader(c.getPipelineOptions()); if (reader.start()) { c.output(reader.getCurrent()); while (reader.advance()) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index f30ebdb49531..c7ff12a59731 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -403,6 +403,7 @@ public void testBuildSourceWithTableAndSqlDialect() { public void testReadFromTable() throws IOException, InterruptedException { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("defaultproject"); + bqOptions.setUseNewSource(Boolean.TRUE); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); Job job = new Job(); From a4c82e86f33d32d0a3b4fcc12cf9976f901ad13c Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Wed, 19 Apr 2017 22:18:54 -0700 Subject: [PATCH 05/13] Avoid coder issues for now. --- .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 3 +++ .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 9e747f2b1cbb..0543f43ccbc2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -48,6 +48,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringDelegateCoder; +import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.BoundedSource; @@ -599,6 +601,7 @@ public void processElement(ProcessContext c) }) .withOutputTags(filesTag, TupleTagList.of(tableSchemaTag))); final PCollectionView schemaView = tuple.get(tableSchemaTag) + .setCoder(StringDelegateCoder.of(TableSchema.class)) .apply(View.asSingleton()); rows = tuple.get(filesTag) .apply(WithKeys.of(new SerializableFunction() { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index c7ff12a59731..efc2f63319cb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -403,7 +403,7 @@ public void testBuildSourceWithTableAndSqlDialect() { public void testReadFromTable() throws IOException, InterruptedException { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("defaultproject"); - bqOptions.setUseNewSource(Boolean.TRUE); + // bqOptions.setUseNewSource(Boolean.TRUE); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); Job job = new Job(); From d476cb9386bfb61d2be263778f137762e51f0449 Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Thu, 20 Apr 2017 07:41:25 -0700 Subject: [PATCH 06/13] Fix coder issues and improve tests. --- .../beam/sdk/coders/TableSchemaJsonCoder.java | 89 +++++++++++++++++++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 14 ++- 3 files changed, 103 insertions(+), 4 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableSchemaJsonCoder.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableSchemaJsonCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableSchemaJsonCoder.java new file mode 100644 index 000000000000..02fcd26227dc --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableSchemaJsonCoder.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * A {@link Coder} that encodes BigQuery {@link TableSchema} objects in their native JSON format. + */ +public class TableSchemaJsonCoder extends AtomicCoder { + + @JsonCreator + public static TableSchemaJsonCoder of() { + return INSTANCE; + } + + @Override + public void encode(TableSchema value, OutputStream outStream, Context context) + throws IOException { + String strValue = MAPPER.writeValueAsString(value); + StringUtf8Coder.of().encode(strValue, outStream, context); + } + + @Override + public TableSchema decode(InputStream inStream, Context context) + throws IOException { + String strValue = StringUtf8Coder.of().decode(inStream, context); + return MAPPER.readValue(strValue, TableSchema.class); + } + + @Override + protected long getEncodedElementByteSize(TableSchema value, Context context) + throws Exception { + String strValue = MAPPER.writeValueAsString(value); + return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context); + } + + ///////////////////////////////////////////////////////////////////////////// + + // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in + // TableSchema. + private static final ObjectMapper MAPPER = + new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + + private static final TableSchemaJsonCoder INSTANCE = new TableSchemaJsonCoder(); + private static final TypeDescriptor TYPE_DESCRIPTOR = + new TypeDescriptor() {}; + + private TableSchemaJsonCoder() { } + + /** + * {@inheritDoc} + * + * @throws NonDeterministicException always. A {@link TableSchema} can hold arbitrary + * {@link Object} instances, which makes the encoding non-deterministic. + */ + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException(this, + "TableCell can hold arbitrary instances, which may be non-deterministic."); + } + + @Override + public TypeDescriptor getEncodedTypeDescriptor() { + return TYPE_DESCRIPTOR; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 0543f43ccbc2..173613bd19f5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -48,8 +48,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringDelegateCoder; import org.apache.beam.sdk.coders.TableRowJsonCoder; +import org.apache.beam.sdk.coders.TableSchemaJsonCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.BoundedSource; @@ -601,7 +601,7 @@ public void processElement(ProcessContext c) }) .withOutputTags(filesTag, TupleTagList.of(tableSchemaTag))); final PCollectionView schemaView = tuple.get(tableSchemaTag) - .setCoder(StringDelegateCoder.of(TableSchema.class)) + .setCoder(TableSchemaJsonCoder.of()) .apply(View.asSingleton()); rows = tuple.get(filesTag) .apply(WithKeys.of(new SerializableFunction() { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index efc2f63319cb..89d0e220cc5e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -400,10 +400,20 @@ public void testBuildSourceWithTableAndSqlDialect() { } @Test - public void testReadFromTable() throws IOException, InterruptedException { + public void testReadFromTableOldSource() throws IOException, InterruptedException { + testReadFromTable(false); + } + + @Test + public void testReadFromTableNewSource() throws IOException, InterruptedException { + testReadFromTable(true); + } + + private void testReadFromTable(boolean useNewSource) + throws IOException, InterruptedException { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("defaultproject"); - // bqOptions.setUseNewSource(Boolean.TRUE); + bqOptions.setUseNewSource(useNewSource); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); Job job = new Job(); From 3d7b6d66a4726a14e7e0e450027ed844217b9d0e Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Thu, 20 Apr 2017 14:23:16 -0700 Subject: [PATCH 07/13] Pass schema as String instead of adding a coder. --- .../beam/sdk/coders/TableSchemaJsonCoder.java | 89 ------------------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 16 ++-- 2 files changed, 8 insertions(+), 97 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableSchemaJsonCoder.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableSchemaJsonCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableSchemaJsonCoder.java deleted file mode 100644 index 02fcd26227dc..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableSchemaJsonCoder.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.coders; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.google.api.services.bigquery.model.TableSchema; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** - * A {@link Coder} that encodes BigQuery {@link TableSchema} objects in their native JSON format. - */ -public class TableSchemaJsonCoder extends AtomicCoder { - - @JsonCreator - public static TableSchemaJsonCoder of() { - return INSTANCE; - } - - @Override - public void encode(TableSchema value, OutputStream outStream, Context context) - throws IOException { - String strValue = MAPPER.writeValueAsString(value); - StringUtf8Coder.of().encode(strValue, outStream, context); - } - - @Override - public TableSchema decode(InputStream inStream, Context context) - throws IOException { - String strValue = StringUtf8Coder.of().decode(inStream, context); - return MAPPER.readValue(strValue, TableSchema.class); - } - - @Override - protected long getEncodedElementByteSize(TableSchema value, Context context) - throws Exception { - String strValue = MAPPER.writeValueAsString(value); - return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context); - } - - ///////////////////////////////////////////////////////////////////////////// - - // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in - // TableSchema. - private static final ObjectMapper MAPPER = - new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); - - private static final TableSchemaJsonCoder INSTANCE = new TableSchemaJsonCoder(); - private static final TypeDescriptor TYPE_DESCRIPTOR = - new TypeDescriptor() {}; - - private TableSchemaJsonCoder() { } - - /** - * {@inheritDoc} - * - * @throws NonDeterministicException always. A {@link TableSchema} can hold arbitrary - * {@link Object} instances, which makes the encoding non-deterministic. - */ - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "TableCell can hold arbitrary instances, which may be non-deterministic."); - } - - @Override - public TypeDescriptor getEncodedTypeDescriptor() { - return TYPE_DESCRIPTOR; - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 173613bd19f5..24e79f11b762 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -49,7 +49,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.TableRowJsonCoder; -import org.apache.beam.sdk.coders.TableSchemaJsonCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.BoundedSource; @@ -580,8 +579,8 @@ public String apply(String input) { // Apply the DoFn version. final TupleTag filesTag = new TupleTag(){}; - final TupleTag tableSchemaTag = - new TupleTag(){}; + final TupleTag tableSchemaTag = + new TupleTag(){}; PCollectionTuple tuple = jobIdTokenCollection.apply("RunCreateJob", ParDo .of( new DoFn() { @@ -591,7 +590,8 @@ public void processElement(ProcessContext c) String jobUuid = c.element(); BigQuerySourceBase source = applySourceTransform( c.getPipelineOptions(), jobUuid, extractDestinationDir); - TableSchema schema = source.getSchema(c.getPipelineOptions()); + String schema = BigQueryHelpers.toJsonString( + source.getSchema(c.getPipelineOptions())); c.output(tableSchemaTag, schema); List files = source.extractFiles(c.getPipelineOptions()); for (String file : files) { @@ -600,9 +600,8 @@ public void processElement(ProcessContext c) } }) .withOutputTags(filesTag, TupleTagList.of(tableSchemaTag))); - final PCollectionView schemaView = tuple.get(tableSchemaTag) - .setCoder(TableSchemaJsonCoder.of()) - .apply(View.asSingleton()); + final PCollectionView schemaView = tuple.get(tableSchemaTag) + .apply(View.asSingleton()); rows = tuple.get(filesTag) .apply(WithKeys.of(new SerializableFunction() { public String apply(String s) { @@ -616,7 +615,8 @@ public String apply(String s) { @ProcessElement public void processElement(ProcessContext c) throws Exception { - TableSchema schema = c.sideInput(schemaView); + TableSchema schema = BigQueryHelpers.fromJsonString( + c.sideInput(schemaView), TableSchema.class); BigQuerySourceBase source = applySourceTransform( c.getPipelineOptions(), "unused", "unused"); List> sources = From b2f6c43de5b147a79f341c218da632df84f1424a Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Mon, 26 Jun 2017 06:47:02 -0700 Subject: [PATCH 08/13] Iterating toward a workable version --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 71 +++++++------------ .../io/gcp/bigquery/BigQuerySourceBase.java | 21 ++++-- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 +- 3 files changed, 40 insertions(+), 54 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 24e79f11b762..948b764f7ec1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -19,7 +19,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; @@ -43,12 +42,11 @@ import java.util.Map; import java.util.regex.Pattern; import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.BoundedSource; @@ -56,7 +54,6 @@ import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreateJsonTableRefFromUuid; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema; @@ -70,28 +67,24 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -421,32 +414,23 @@ Read withTestServices(BigQueryServices testServices) { return toBuilder().setBigQueryServices(testServices).build(); } - private BigQuerySourceBase applySourceTransform( - PipelineOptions options, String jobUuid, String extractDestinationDir) { + private BigQuerySourceBase applySourceTransform(String jobUuid) { BigQuerySourceBase source; - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - final String executingProject = bqOptions.getProject(); if (getQuery() != null && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) { source = BigQueryQuerySource.create( - StaticValueProvider.of(jobUuid), + jobUuid, getQuery(), - NestedValueProvider.of( - StaticValueProvider.of(jobUuid), - new CreateJsonTableRefFromUuid(executingProject)), getFlattenResults(), getUseLegacySql(), - extractDestinationDir, getBigQueryServices()); } else { source = BigQueryTableSource.create( - StaticValueProvider.of(jobUuid), + jobUuid, getTableProvider(), - extractDestinationDir, - getBigQueryServices(), - StaticValueProvider.of(executingProject)); + getBigQueryServices()); } return source; } @@ -533,22 +517,14 @@ public void validate(PipelineOptions options) { @Override public PCollection expand(PBegin input) { Pipeline p = input.getPipeline(); - String jobUuid = BigQueryHelpers.randomUUIDString(); - BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); - final String executingProject = bqOptions.getProject(); - final String extractDestinationDir; - String tempLocation = bqOptions.getTempLocation(); - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - extractDestinationDir = factory.resolve(tempLocation, jobUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve extract destination directory in %s", tempLocation)); - } + final String jobUuid = BigQueryHelpers.randomUUIDString(); final PCollectionView jobIdTokenView; PCollection jobIdTokenCollection = null; - if (!bqOptions.getUseNewSource()) { + // TODO: Fix this. + // final boolean useNewSource = bqOptions.getUseNewSource(); + final boolean useNewSource = false; + if (!useNewSource) { // Create a singleton job ID token at construction time. jobIdTokenView = p .apply("TriggerIdCreation", Create.of(jobUuid)) @@ -569,11 +545,10 @@ public String apply(String input) { } PCollection rows; - if (!bqOptions.getUseNewSource()) { + if (!useNewSource) { // Apply the traditional Source model. rows = p.apply( - org.apache.beam.sdk.io.Read.from(applySourceTransform( - p.getOptions(), jobUuid, extractDestinationDir))) + org.apache.beam.sdk.io.Read.from(applySourceTransform(jobUuid))) .setCoder(getDefaultOutputCoder()); } else { // Apply the DoFn version. @@ -589,13 +564,13 @@ public void processElement(ProcessContext c) throws Exception { String jobUuid = c.element(); BigQuerySourceBase source = applySourceTransform( - c.getPipelineOptions(), jobUuid, extractDestinationDir); + jobUuid); String schema = BigQueryHelpers.toJsonString( source.getSchema(c.getPipelineOptions())); c.output(tableSchemaTag, schema); - List files = source.extractFiles(c.getPipelineOptions()); - for (String file : files) { - c.output(file); + List files = source.extractFiles(c.getPipelineOptions()); + for (ResourceId file : files) { + c.output(file.toString()); } } }) @@ -617,10 +592,12 @@ public void processElement(ProcessContext c) throws Exception { TableSchema schema = BigQueryHelpers.fromJsonString( c.sideInput(schemaView), TableSchema.class); - BigQuerySourceBase source = applySourceTransform( - c.getPipelineOptions(), "unused", "unused"); + // TODO: Get job UUID from side input. + BigQuerySourceBase source = applySourceTransform(jobUuid); List> sources = - source.createSources(ImmutableList.of(c.element()), schema); + source.createSources(ImmutableList.of( + FileSystems.matchNewResource( + c.element(), false /* is directory */)), schema); for (BoundedSource avroSource : sources) { BoundedSource.BoundedReader reader = avroSource.createReader(c.getPipelineOptions()); @@ -644,8 +621,8 @@ void cleanup(DoFn.ProcessContext c) throws Exception { resolveTempLocation( bqOptions.getTempLocation(), "BigQueryExtractTemp", - stepUuid); - + jobUuid); + final String executingProject = bqOptions.getProject(); JobReference jobRef = new JobReference() .setProjectId(executingProject) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index dbc78a1729f1..f2c135b28e7b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -83,12 +83,15 @@ protected TableSchema getSchema(PipelineOptions options) throws Exception { return tableSchema; } - protected List extractFiles(PipelineOptions options) throws Exception { + protected List extractFiles(PipelineOptions options) throws Exception { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); TableReference tableToExtract = getTableToExtract(bqOptions); JobService jobService = bqServices.getJobService(bqOptions); - String extractJobId = BigQueryIO.getExtractJobId(jobIdToken.get()); - List tempFiles = executeExtract(extractJobId, tableToExtract, jobService); + String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid)); + final String extractDestinationDir = + resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid); + List tempFiles = executeExtract( + extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir); return tempFiles; } @@ -103,8 +106,14 @@ public List> split( BigQueryOptions bqOptions = options.as(BigQueryOptions.class); TableReference tableToExtract = getTableToExtract(bqOptions); JobService jobService = bqServices.getJobService(bqOptions); - String extractJobId = BigQueryIO.getExtractJobId(jobIdToken.get()); - List tempFiles = executeExtract(extractJobId, tableToExtract, jobService); + + final String extractDestinationDir = + resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid); + + String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid)); + List tempFiles = executeExtract( + extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir); + TableSchema tableSchema = getSchema(options); cleanupTempResource(bqOptions); @@ -158,7 +167,7 @@ private List executeExtract( } List> createSources( - List files, TableSchema tableSchema) throws IOException, InterruptedException { + List files, TableSchema tableSchema) throws IOException, InterruptedException { final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema); SerializableFunction function = diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 89d0e220cc5e..55a06a0fb05e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1652,7 +1652,7 @@ public void testBigQueryNoTableQuerySourceInitSplit() throws Exception { StaticValueProvider.of(query), true /* flattenResults */, true /* useLegacySql */, fakeBqServices); - PipelineOptions options = PipelineOptionsFactory.create(); + // PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(baseDir.toString()); List read = convertBigDecimaslToLong( From 7b859d1d8de5f683a7db50df6d02ce1163aa304f Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Mon, 26 Jun 2017 21:43:07 -0700 Subject: [PATCH 09/13] Iterating toward a workable version --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 43 +++++++++++-------- .../sdk/io/gcp/bigquery/BigQueryOptions.java | 5 --- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 12 +++--- 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 948b764f7ec1..dac43ac85e94 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; @@ -296,6 +297,7 @@ public TableRow apply(TableRow input) { public static Read read() { return new AutoValue_BigQueryIO_Read.Builder() .setValidate(true) + .setUseNewSource(false) .setBigQueryServices(new BigQueryServicesImpl()) .build(); } @@ -308,6 +310,7 @@ public abstract static class Read extends PTransformWith new source. TODO: Better comment. + */ + public Read withNewSource() { + return toBuilder().setUseNewSource(true).build(); + } + @VisibleForTesting Read withTestServices(BigQueryServices testServices) { return toBuilder().setBigQueryServices(testServices).build(); @@ -517,17 +530,14 @@ public void validate(PipelineOptions options) { @Override public PCollection expand(PBegin input) { Pipeline p = input.getPipeline(); - final String jobUuid = BigQueryHelpers.randomUUIDString(); + final String staticJobUuid = BigQueryHelpers.randomUUIDString(); final PCollectionView jobIdTokenView; PCollection jobIdTokenCollection = null; - // TODO: Fix this. - // final boolean useNewSource = bqOptions.getUseNewSource(); - final boolean useNewSource = false; - if (!useNewSource) { + if (!getUseNewSource()) { // Create a singleton job ID token at construction time. jobIdTokenView = p - .apply("TriggerIdCreation", Create.of(jobUuid)) + .apply("TriggerIdCreation", Create.of(staticJobUuid)) .apply("ViewId", View.asSingleton()); } else { // Create a singleton job ID token at execution time. @@ -545,10 +555,10 @@ public String apply(String input) { } PCollection rows; - if (!useNewSource) { + if (!getUseNewSource()) { // Apply the traditional Source model. rows = p.apply( - org.apache.beam.sdk.io.Read.from(applySourceTransform(jobUuid))) + org.apache.beam.sdk.io.Read.from(applySourceTransform(staticJobUuid))) .setCoder(getDefaultOutputCoder()); } else { // Apply the DoFn version. @@ -592,7 +602,7 @@ public void processElement(ProcessContext c) throws Exception { TableSchema schema = BigQueryHelpers.fromJsonString( c.sideInput(schemaView), TableSchema.class); - // TODO: Get job UUID from side input. + String jobUuid = (String) c.sideInput(jobIdTokenView); BigQuerySourceBase source = applySourceTransform(jobUuid); List> sources = source.createSources(ImmutableList.of( @@ -609,7 +619,7 @@ public void processElement(ProcessContext c) } } } - }).withSideInputs(schemaView)); + }).withSideInputs(schemaView, jobIdTokenView)); } PassThroughThenCleanup.CleanupOperation cleanupOperation = new PassThroughThenCleanup.CleanupOperation() { @@ -617,16 +627,17 @@ public void processElement(ProcessContext c) void cleanup(DoFn.ProcessContext c) throws Exception { PipelineOptions options = c.getPipelineOptions(); BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + String jobUuid = (String) c.sideInput(jobIdTokenView); final String extractDestinationDir = resolveTempLocation( bqOptions.getTempLocation(), "BigQueryExtractTemp", jobUuid); final String executingProject = bqOptions.getProject(); - JobReference jobRef = - new JobReference() - .setProjectId(executingProject) - .setJobId(getExtractJobId((String) c.sideInput(jobIdTokenView))); + JobReference jobRef = new JobReference() + .setProjectId(executingProject) + .setJobId(getExtractJobId( + createJobIdToken(bqOptions.getJobName(), jobUuid))); Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef); @@ -682,10 +693,6 @@ public TableReference getTable() { } } - static String getExtractJobId(String jobIdToken) { - return jobIdToken + "-extract"; - } - static String getExtractDestinationUri(String extractDestinationDir) { return String.format("%s/%s", extractDestinationDir, "*.avro"); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index 2bc58634a522..d3116ebaec4d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -36,9 +36,4 @@ public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions, @Default.String("bigquery.googleapis.com/cloud_dataflow") String getTempDatasetId(); void setTempDatasetId(String value); - - @Description("Whether to use the SDF-style (experimental) source.") - @Default.Boolean(false) - Boolean getUseNewSource(); - void setUseNewSource(Boolean value); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 55a06a0fb05e..9a64196402e7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -413,7 +413,6 @@ private void testReadFromTable(boolean useNewSource) throws IOException, InterruptedException { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("defaultproject"); - bqOptions.setUseNewSource(useNewSource); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); Job job = new Job(); @@ -453,10 +452,14 @@ private void testReadFromTable(boolean useNewSource) .withDatasetService(fakeDatasetService); Pipeline p = TestPipeline.create(bqOptions); + BigQueryIO.Read read = BigQueryIO.read().from("non-executing-project:somedataset.sometable") + .withTestServices(fakeBqServices) + .withoutValidation(); + if (useNewSource) { + read = read.withNewSource(); + } PCollection> output = p - .apply(BigQueryIO.read().from("non-executing-project:somedataset.sometable") - .withTestServices(fakeBqServices) - .withoutValidation()) + .apply(read) .apply(ParDo.of(new DoFn>() { @ProcessElement public void processElement(ProcessContext c) throws Exception { @@ -1652,7 +1655,6 @@ public void testBigQueryNoTableQuerySourceInitSplit() throws Exception { StaticValueProvider.of(query), true /* flattenResults */, true /* useLegacySql */, fakeBqServices); - // PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(baseDir.toString()); List read = convertBigDecimaslToLong( From 10115455ca2cd9aeea3ee197b1bbd51cfb6b0ddc Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Tue, 27 Jun 2017 07:39:18 -0700 Subject: [PATCH 10/13] Iterating toward a workable version --- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index dac43ac85e94..cf1455611ae5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -45,6 +45,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; @@ -414,10 +415,12 @@ public Read usingStandardSql() { } /** - * Use new SDF-like source. + * Use new SplittableDoFn-compatible source implementation. * - *

With new source. TODO: Better comment. + *

Use new SplittableDoFn-compatible source implementation. This implementation is also + * compatible with repeated template invocations. */ + @Experimental(Experimental.Kind.SOURCE_SINK) public Read withNewSource() { return toBuilder().setUseNewSource(true).build(); } From 34051f46325fc0bb9502f133912d6dceebf5569f Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Tue, 27 Jun 2017 07:48:23 -0700 Subject: [PATCH 11/13] Remove duplicate code --- .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index f2c135b28e7b..67b2072b2189 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -103,19 +103,10 @@ public List> split( // We ignore desiredBundleSizeBytes anyway, however in any case, we should not initiate // another BigQuery extract job for the repeated split() calls. if (cachedSplitResult == null) { - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - TableReference tableToExtract = getTableToExtract(bqOptions); - JobService jobService = bqServices.getJobService(bqOptions); - - final String extractDestinationDir = - resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid); - - String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid)); - List tempFiles = executeExtract( - extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir); - + List tempFiles = extractFiles(options); TableSchema tableSchema = getSchema(options); + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); cleanupTempResource(bqOptions); cachedSplitResult = checkNotNull(createSources(tempFiles, tableSchema)); } From bcc7e0b620c6d19f8a37fdbea3d8cece076f17df Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Wed, 12 Jul 2017 18:39:23 -0700 Subject: [PATCH 12/13] Iterated on comments --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 51 ++++++++----------- .../gcp/bigquery/PassThroughThenCleanup.java | 32 +++++++++--- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 4 +- 3 files changed, 50 insertions(+), 37 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index cf1455611ae5..ae9b1d31db69 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -415,10 +415,11 @@ public Read usingStandardSql() { } /** - * Use new SplittableDoFn-compatible source implementation. + * Use new template-compatible source implementation. * - *

Use new SplittableDoFn-compatible source implementation. This implementation is also - * compatible with repeated template invocations. + *

Use new template-compatible source implementation. This implementation is + * compatible with repeated template invocations. It does not support dynamic work + * rebalancing. */ @Experimental(Experimental.Kind.SOURCE_SINK) public Read withNewSource() { @@ -430,7 +431,7 @@ Read withTestServices(BigQueryServices testServices) { return toBuilder().setBigQueryServices(testServices).build(); } - private BigQuerySourceBase applySourceTransform(String jobUuid) { + private BigQuerySourceBase createSource(String jobUuid) { BigQuerySourceBase source; if (getQuery() != null && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) { @@ -533,15 +534,19 @@ public void validate(PipelineOptions options) { @Override public PCollection expand(PBegin input) { Pipeline p = input.getPipeline(); - final String staticJobUuid = BigQueryHelpers.randomUUIDString(); - final PCollectionView jobIdTokenView; PCollection jobIdTokenCollection = null; + PCollection rows; if (!getUseNewSource()) { // Create a singleton job ID token at construction time. + final String staticJobUuid = BigQueryHelpers.randomUUIDString(); jobIdTokenView = p .apply("TriggerIdCreation", Create.of(staticJobUuid)) .apply("ViewId", View.asSingleton()); + // Apply the traditional Source model. + rows = p.apply( + org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid))) + .setCoder(getDefaultOutputCoder()); } else { // Create a singleton job ID token at execution time. jobIdTokenCollection = p @@ -555,16 +560,7 @@ public String apply(String input) { })); jobIdTokenView = jobIdTokenCollection .apply("ViewId", View.asSingleton()); - } - PCollection rows; - if (!getUseNewSource()) { - // Apply the traditional Source model. - rows = p.apply( - org.apache.beam.sdk.io.Read.from(applySourceTransform(staticJobUuid))) - .setCoder(getDefaultOutputCoder()); - } else { - // Apply the DoFn version. final TupleTag filesTag = new TupleTag(){}; final TupleTag tableSchemaTag = @@ -576,7 +572,7 @@ public String apply(String input) { public void processElement(ProcessContext c) throws Exception { String jobUuid = c.element(); - BigQuerySourceBase source = applySourceTransform( + BigQuerySourceBase source = createSource( jobUuid); String schema = BigQueryHelpers.toJsonString( source.getSchema(c.getPipelineOptions())); @@ -605,21 +601,18 @@ public void processElement(ProcessContext c) throws Exception { TableSchema schema = BigQueryHelpers.fromJsonString( c.sideInput(schemaView), TableSchema.class); - String jobUuid = (String) c.sideInput(jobIdTokenView); - BigQuerySourceBase source = applySourceTransform(jobUuid); + String jobUuid = c.sideInput(jobIdTokenView); + BigQuerySourceBase source = createSource(jobUuid); List> sources = source.createSources(ImmutableList.of( FileSystems.matchNewResource( c.element(), false /* is directory */)), schema); - for (BoundedSource avroSource : sources) { - BoundedSource.BoundedReader reader = - avroSource.createReader(c.getPipelineOptions()); - if (reader.start()) { - c.output(reader.getCurrent()); - while (reader.advance()) { - c.output(reader.getCurrent()); - } - } + checkArgument(sources.size() == 1, "Expected exactly one source."); + BoundedSource avroSource = sources.get(0); + BoundedSource.BoundedReader reader = + avroSource.createReader(c.getPipelineOptions()); + for (boolean more = reader.start(); more; more = reader.advance()) { + c.output(reader.getCurrent()); } } }).withSideInputs(schemaView, jobIdTokenView)); @@ -627,10 +620,10 @@ public void processElement(ProcessContext c) PassThroughThenCleanup.CleanupOperation cleanupOperation = new PassThroughThenCleanup.CleanupOperation() { @Override - void cleanup(DoFn.ProcessContext c) throws Exception { + void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception { PipelineOptions options = c.getPipelineOptions(); BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - String jobUuid = (String) c.sideInput(jobIdTokenView); + String jobUuid = c.getJobId(); final String extractDestinationDir = resolveTempLocation( bqOptions.getTempLocation(), diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java index 426d90ed7cb3..364934cb0aa6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.Serializable; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -40,12 +41,12 @@ class PassThroughThenCleanup extends PTransform, PCollection> { private CleanupOperation cleanupOperation; - private PCollectionView sideInput; + private PCollectionView jobIdSideInput; PassThroughThenCleanup(CleanupOperation cleanupOperation, - PCollectionView sideInput) { + PCollectionView jobIdSideInput) { this.cleanupOperation = cleanupOperation; - this.sideInput = sideInput; + this.jobIdSideInput = jobIdSideInput; } @Override @@ -66,9 +67,9 @@ public PCollection expand(PCollection input) { @ProcessElement public void processElement(ProcessContext c) throws Exception { - c.element().cleanup(c); + c.element().cleanup(new ContextContainer(c, jobIdSideInput)); } - }).withSideInputs(sideInput, cleanupSignalView)); + }).withSideInputs(jobIdSideInput, cleanupSignalView)); return outputs.get(mainOutput); } @@ -81,6 +82,25 @@ public void processElement(ProcessContext c) { } abstract static class CleanupOperation implements Serializable { - abstract void cleanup(DoFn.ProcessContext options) throws Exception; + abstract void cleanup(ContextContainer container) throws Exception; + } + + static class ContextContainer { + private PCollectionView view; + private DoFn.ProcessContext context; + + public ContextContainer( + DoFn.ProcessContext context, PCollectionView view) { + this.view = view; + this.context = context; + } + + public PipelineOptions getPipelineOptions() { + return context.getPipelineOptions(); + } + + public String getJobId() { + return (String) context.sideInput(view); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 9a64196402e7..351a24b0859c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1737,7 +1737,7 @@ public void testPassThroughThenCleanup() throws Exception { .apply(Create.of(1, 2, 3)) .apply(new PassThroughThenCleanup(new CleanupOperation() { @Override - void cleanup(DoFn.ProcessContext c) throws Exception { + void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception { // no-op }}, p.apply("Create1", Create.of("")).apply(View.asSingleton()))); @@ -1752,7 +1752,7 @@ public void testPassThroughThenCleanupExecuted() throws Exception { p.apply(Create.empty(VarIntCoder.of())) .apply(new PassThroughThenCleanup(new CleanupOperation() { @Override - void cleanup(DoFn.ProcessContext c) throws Exception { + void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception { throw new RuntimeException("cleanup executed"); }}, p.apply("Create1", Create.of("")).apply(View.asSingleton()))); From be1bdeacf8d2c29c9302096898dc8d6c07eb5db5 Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Fri, 14 Jul 2017 15:21:15 -0700 Subject: [PATCH 13/13] Rename useNewSource --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 ++++++------ .../beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index ae9b1d31db69..3c320516857b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -298,7 +298,7 @@ public TableRow apply(TableRow input) { public static Read read() { return new AutoValue_BigQueryIO_Read.Builder() .setValidate(true) - .setUseNewSource(false) + .setWithTemplateCompatibility(false) .setBigQueryServices(new BigQueryServicesImpl()) .build(); } @@ -311,7 +311,7 @@ public abstract static class Read extends PTransform expand(PBegin input) { final PCollectionView jobIdTokenView; PCollection jobIdTokenCollection = null; PCollection rows; - if (!getUseNewSource()) { + if (!getWithTemplateCompatibility()) { // Create a singleton job ID token at construction time. final String staticJobUuid = BigQueryHelpers.randomUUIDString(); jobIdTokenView = p diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 351a24b0859c..dc8c9ea72c3d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -405,11 +405,11 @@ public void testReadFromTableOldSource() throws IOException, InterruptedExceptio } @Test - public void testReadFromTableNewSource() throws IOException, InterruptedException { + public void testReadFromTableTemplateCompatibility() throws IOException, InterruptedException { testReadFromTable(true); } - private void testReadFromTable(boolean useNewSource) + private void testReadFromTable(boolean useTemplateCompatibility) throws IOException, InterruptedException { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("defaultproject"); @@ -455,8 +455,8 @@ private void testReadFromTable(boolean useNewSource) BigQueryIO.Read read = BigQueryIO.read().from("non-executing-project:somedataset.sometable") .withTestServices(fakeBqServices) .withoutValidation(); - if (useNewSource) { - read = read.withNewSource(); + if (useTemplateCompatibility) { + read = read.withTemplateCompatibility(); } PCollection> output = p .apply(read)