From 228dedb5614080126408f16b7b19f5518257222e Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Sun, 30 Jul 2017 21:42:59 -0700 Subject: [PATCH 1/9] Add support for TimePartitioning in streaming. --- .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 8 ++++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 46 +++++++++++++++++++ .../sdk/io/gcp/bigquery/CreateTables.java | 13 ++++-- .../bigquery/DynamicDestinationsHelpers.java | 20 ++++++++ .../sdk/io/gcp/bigquery/TableDestination.java | 33 +++++++++++-- .../gcp/bigquery/TableDestinationCoder.java | 6 ++- 6 files changed, 117 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 78dcdde097ad..7f9e27ac4d95 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.Hashing; @@ -291,6 +292,13 @@ public TableReference apply(String from) { } } + static class TimePartitioningToJson implements SerializableFunction { + @Override + public String apply(TimePartitioning partitioning) { + return toJsonString(partitioning); + } + } + static String createJobIdToken(String jobName, String stepUuid) { return String.format("beam_job_%s_%s", stepUuid, jobName.replaceAll("-", "")); } diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index feb085db6b52..1258d1e513f7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -31,6 +31,7 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicates; @@ -60,9 +61,11 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TimePartitioningToJson; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations; +import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioninDestinations; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations; import org.apache.beam.sdk.options.PipelineOptions; @@ -824,6 +827,7 @@ public enum Method { @Nullable abstract DynamicDestinations getDynamicDestinations(); @Nullable abstract PCollectionView> getSchemaFromView(); @Nullable abstract ValueProvider 
getJsonSchema(); + @Nullable abstract ValueProvider getJsonTimePartitioning(); abstract CreateDisposition getCreateDisposition(); abstract WriteDisposition getWriteDisposition(); /** Table description. Default is empty. */ @@ -854,6 +858,7 @@ abstract Builder setTableFunction( abstract Builder setDynamicDestinations(DynamicDestinations dynamicDestinations); abstract Builder setSchemaFromView(PCollectionView> view); abstract Builder setJsonSchema(ValueProvider jsonSchema); + abstract Builder setJsonTimePartitioning(ValueProvider jsonTimePartitioning); abstract Builder setCreateDisposition(CreateDisposition createDisposition); abstract Builder setWriteDisposition(WriteDisposition writeDisposition); abstract Builder setTableDescription(String tableDescription); @@ -1022,6 +1027,32 @@ public Write withSchemaFromView(PCollectionView> view) { return toBuilder().setSchemaFromView(view).build(); } + /** + * Allows newly created tables to include a {@link TimePartitioning} class. Can only be used + * when writing to a single table. If {@link #to(SerializableFunction)} or + * @link #to(DynamicDestinations)} is used to write dynamic tables, time partitioning can be + * directly in the returned {@link TableDestination}. + */ + public Write withTimePartitioning(TimePartitioning partitioning) { + return withTimePartitioning(StaticValueProvider.of(partitioning)); + } + + /** + * Like {@link #withTimePartitioning(TimePartitioning)} but using a deferred + * {@link ValueProvider}. + */ + public Write withTimePartitioning(ValueProvider partition) { + return withJsonTimePartitioning(NestedValueProvider.of( + partition, new TimePartitioningToJson())); + } + + /** + * The same as {@link #withTimePartitioning}, but takes a JSON-serialized object. + */ + public Write withJsonTimePartitioning(ValueProvider partition) { + return toBuilder().setJsonTimePartitioning(partition).build(); + } + /** Specifies whether the table should be created if it does not exist. 
*/ public Write withCreateDisposition(CreateDisposition createDisposition) { return toBuilder().setCreateDisposition(createDisposition).build(); @@ -1183,6 +1214,15 @@ public WriteResult expand(PCollection input) { input.isBounded(), method); } + if (getJsonTimePartitioning() != null) { + checkArgument(getDynamicDestinations() == null, + "The supplied DynamicDestinations object can directly set TimePartitiong." + + " There is no need to call BigQueryIO.Write.withTimePartitioning."); + checkArgument(getTableFunction() == null, + "The supplied getTableFunction object can directly set TimePartitiong." + + " There is no need to call BigQueryIO.Write.withTimePartitioning."); + } + DynamicDestinations dynamicDestinations = getDynamicDestinations(); if (dynamicDestinations == null) { if (getJsonTableRef() != null) { @@ -1205,6 +1245,12 @@ public WriteResult expand(PCollection input) { (DynamicDestinations) dynamicDestinations, getSchemaFromView()); } + + // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning. 
+ if (getJsonTimePartitioning() != null) { + dynamicDestinations = new ConstantTimePartitioninDestinations( + dynamicDestinations, getJsonTimePartitioning()); + } } return expandTyped(input, dynamicDestinations); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java index 3dc10b08a024..3e83f60edc39 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java @@ -124,11 +124,14 @@ private void possibleCreateTable( DatasetService datasetService = bqServices.getDatasetService(options); if (!createdTables.contains(tableSpec)) { if (datasetService.getTable(tableReference) == null) { - datasetService.createTable( - new Table() - .setTableReference(tableReference) - .setSchema(tableSchema) - .setDescription(tableDescription)); + Table table = new Table() + .setTableReference(tableReference) + .setSchema(tableSchema) + .setDescription(tableDescription); + if (tableDestination.getTimePartitioning() != null) { + table.setTimePartitioning(tableDestination.getTimePartitioning()); + } + datasetService.createTable(table); } createdTables.add(tableSpec); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index 530e2b6feb6d..d96ec979974d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -164,6 +164,26 @@ public TableSchema getSchema(TableDestination 
destination) { } } + static class ConstantTimePartitioninDestinations + extends DelegatingDynamicDestinations { + + @Nullable + private final ValueProvider jsonTimePartitioning; + + ConstantTimePartitioninDestinations(DynamicDestinations inner, + ValueProvider jsonTimePartitioning) { + super(inner); + this.jsonTimePartitioning = jsonTimePartitioning; + } + + @Override + public TableDestination getDestination(ValueInSingleWindow element) { + TableDestination destination = super.getDestination(element); + return new TableDestination(destination.getTableSpec(), destination.getTableDescription(), + jsonTimePartitioning.get()); + } + } + /** * Takes in a side input mapping tablespec to json table schema, and always returns the * matching schema from the side input. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java index ecf35d8d80e7..881d0d75aeac 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TimePartitioning; import java.io.Serializable; import java.util.Objects; import javax.annotation.Nullable; @@ -31,18 +32,32 @@ public class TableDestination implements Serializable { private final String tableSpec; @Nullable private final String tableDescription; + @Nullable + private final String jsonTimePartitioning; public TableDestination(String tableSpec, @Nullable String tableDescription) { - this.tableSpec = tableSpec; - this.tableDescription = tableDescription; + this(tableSpec, tableDescription, (String) null); } public 
TableDestination(TableReference tableReference, @Nullable String tableDescription) { - this.tableSpec = BigQueryHelpers.toTableSpec(tableReference); + this(BigQueryHelpers.toTableSpec(tableReference), tableDescription, (String) null); + } + + public TableDestination(String tableSpec, @Nullable String tableDescription, + TimePartitioning timePartitioning) { + this(tableSpec, tableDescription, + timePartitioning != null ? BigQueryHelpers.toJsonString(timePartitioning) : null); + } + + public TableDestination(String tableSpec, @Nullable String tableDescription, + String jsonTimePartitioning) { + this.tableSpec = tableSpec; this.tableDescription = tableDescription; + this.jsonTimePartitioning = jsonTimePartitioning; } + public String getTableSpec() { return tableSpec; } @@ -51,6 +66,18 @@ public TableReference getTableReference() { return BigQueryHelpers.parseTableSpec(tableSpec); } + public String getJsonTimePartitioning() { + return jsonTimePartitioning; + } + + public TimePartitioning getTimePartitioning() { + if (jsonTimePartitioning == null) { + return null; + } else { + return BigQueryHelpers.fromJsonString(jsonTimePartitioning, TimePartitioning.class); + } + } + @Nullable public String getTableDescription() { return tableDescription; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java index f034a030da60..05f6cd1a52e9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; +import com.google.api.services.bigquery.model.TimePartitioning; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -32,6 +33,7 
@@ public class TableDestinationCoder extends AtomicCoder { private static final TableDestinationCoder INSTANCE = new TableDestinationCoder(); private static final Coder tableSpecCoder = StringUtf8Coder.of(); private static final Coder tableDescriptionCoder = NullableCoder.of(StringUtf8Coder.of()); + private static final Coder timePartitioningCoder = NullableCoder.of(StringUtf8Coder.of()); public static TableDestinationCoder of() { return INSTANCE; @@ -45,13 +47,15 @@ public void encode(TableDestination value, OutputStream outStream) } tableSpecCoder.encode(value.getTableSpec(), outStream); tableDescriptionCoder.encode(value.getTableDescription(), outStream); + timePartitioningCoder.encode(value.getJsonTimePartitioning(), outStream); } @Override public TableDestination decode(InputStream inStream) throws IOException { String tableSpec = tableSpecCoder.decode(inStream); String tableDescription = tableDescriptionCoder.decode(inStream); - return new TableDestination(tableSpec, tableDescription); + String jsonTimePartitioning = timePartitioningCoder.decode(inStream); + return new TableDestination(tableSpec, tableDescription, jsonTimePartitioning); } @Override From d8e4ea657c1972c6eb8c5b0da252e10d9a7f92da Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Thu, 3 Aug 2017 11:14:00 -0700 Subject: [PATCH 2/9] Make Coder backwards compatible. 
--- .../apache/beam/sdk/io/gcp/bigquery/TableDestination.java | 2 +- .../beam/sdk/io/gcp/bigquery/TableDestinationCoder.java | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java index 881d0d75aeac..22df2541fc04 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -51,7 +51,7 @@ public TableDestination(String tableSpec, @Nullable String tableDescription, } public TableDestination(String tableSpec, @Nullable String tableDescription, - String jsonTimePartitioning) { + @Nullable String jsonTimePartitioning) { this.tableSpec = tableSpec; this.tableDescription = tableDescription; this.jsonTimePartitioning = jsonTimePartitioning; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java index 05f6cd1a52e9..139518f7a0c9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java @@ -54,7 +54,12 @@ public void encode(TableDestination value, OutputStream outStream) public TableDestination decode(InputStream inStream) throws IOException { String tableSpec = tableSpecCoder.decode(inStream); String tableDescription = tableDescriptionCoder.decode(inStream); - String jsonTimePartitioning = timePartitioningCoder.decode(inStream); + String jsonTimePartitioning = null; + try { + jsonTimePartitioning = 
timePartitioningCoder.decode(inStream); + } catch (IOException e) { + // This implies we're decoding old state that did not contain TimePartitioning. Continue. + } return new TableDestination(tableSpec, tableDescription, jsonTimePartitioning); } From d26eaaaee31422facc7df4befa8e7768ee7e606a Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Thu, 3 Aug 2017 11:14:28 -0700 Subject: [PATCH 3/9] Remove unused import. --- .../apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java index 139518f7a0c9..2b0911455caa 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.gcp.bigquery; -import com.google.api.services.bigquery.model.TimePartitioning; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; From e3379e05fa0fe4c38d289216445a20534bcf1260 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Thu, 3 Aug 2017 11:15:18 -0700 Subject: [PATCH 4/9] Fix typos. 
--- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 8 ++++---- .../sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 1258d1e513f7..0e3180670ead 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -65,7 +65,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations; -import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioninDestinations; +import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningDestinations; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations; import org.apache.beam.sdk.options.PipelineOptions; @@ -1216,10 +1216,10 @@ public WriteResult expand(PCollection input) { } if (getJsonTimePartitioning() != null) { checkArgument(getDynamicDestinations() == null, - "The supplied DynamicDestinations object can directly set TimePartitiong." + "The supplied DynamicDestinations object can directly set TimePartitioning." + " There is no need to call BigQueryIO.Write.withTimePartitioning."); checkArgument(getTableFunction() == null, - "The supplied getTableFunction object can directly set TimePartitiong." + "The supplied getTableFunction object can directly set TimePartitioning." 
+ " There is no need to call BigQueryIO.Write.withTimePartitioning."); } @@ -1248,7 +1248,7 @@ public WriteResult expand(PCollection input) { // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning. if (getJsonTimePartitioning() != null) { - dynamicDestinations = new ConstantTimePartitioninDestinations( + dynamicDestinations = new ConstantTimePartitioningDestinations( dynamicDestinations, getJsonTimePartitioning()); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index d96ec979974d..4442306555a5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -163,14 +163,14 @@ public TableSchema getSchema(TableDestination destination) { return BigQueryHelpers.fromJsonString(jsonSchema.get(), TableSchema.class); } } - - static class ConstantTimePartitioninDestinations + + static class ConstantTimePartitioningDestinations extends DelegatingDynamicDestinations { @Nullable private final ValueProvider jsonTimePartitioning; - ConstantTimePartitioninDestinations(DynamicDestinations inner, + ConstantTimePartitioningDestinations(DynamicDestinations inner, ValueProvider jsonTimePartitioning) { super(inner); this.jsonTimePartitioning = jsonTimePartitioning; From 6b4ff570b1f2ddd39004675d66aa6de830cfff13 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Sun, 20 Aug 2017 12:55:15 -0700 Subject: [PATCH 5/9] Add test coverage. 
--- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 5 ++- .../bigquery/DynamicDestinationsHelpers.java | 2 +- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 1 - .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 41 +++++++++++++++++++ .../sdk/io/gcp/bigquery/TableContainer.java | 2 + 5 files changed, 48 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 0e3180670ead..62bfe8a19d30 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1034,7 +1034,8 @@ public Write withSchemaFromView(PCollectionView> view) { * directly in the returned {@link TableDestination}. */ public Write withTimePartitioning(TimePartitioning partitioning) { - return withTimePartitioning(StaticValueProvider.of(partitioning)); + return withJsonTimePartitioning( + StaticValueProvider.of(BigQueryHelpers.toJsonString(partitioning))); } /** @@ -1221,6 +1222,8 @@ public WriteResult expand(PCollection input) { checkArgument(getTableFunction() == null, "The supplied getTableFunction object can directly set TimePartitioning." 
+ " There is no need to call BigQueryIO.Write.withTimePartitioning."); + checkArgument(method != Method.FILE_LOADS, + "TimePartitioning is not yet implemented for BigQuery load jobs."); } DynamicDestinations dynamicDestinations = getDynamicDestinations(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index 4442306555a5..7b813db81474 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -163,7 +163,7 @@ public TableSchema getSchema(TableDestination destination) { return BigQueryHelpers.fromJsonString(jsonSchema.get(), TableSchema.class); } } - + static class ConstantTimePartitioningDestinations extends DelegatingDynamicDestinations { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index c8fab75f7573..c7f1404c5a3f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -164,7 +164,6 @@ private void load( .setWriteDisposition(writeDisposition.name()) .setCreateDisposition(createDisposition.name()) .setSourceFormat("NEWLINE_DELIMITED_JSON"); - String projectId = ref.getProjectId(); Job lastFailedLoadJob = null; for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 0ece3ee0ba1d..7c143141abed 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -47,6 +47,7 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; @@ -637,6 +638,46 @@ private void verifySideInputs() { } } + @Test + public void testTimePartitioning() throws Exception { + BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("project-id"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + + FakeDatasetService datasetService = new FakeDatasetService(); + FakeBigQueryServices fakeBqServices = + new FakeBigQueryServices() + .withJobService(new FakeJobService()) + .withDatasetService(datasetService); + datasetService.createDataset("project-id", "dataset-id", "", ""); + + Pipeline p = TestPipeline.create(bqOptions); + TableRow row1 = new TableRow().set("name", "a").set("number", "1"); + TableRow row2 = new TableRow().set("name", "b").set("number", "2"); + + TimePartitioning timePartitioning = new TimePartitioning() + .setType("DAY") + .setExpirationMs(1000L); + TableSchema schema = new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("number").setType("INTEGER"))); + p.apply(Create.of(row1, row1)) + .apply( + BigQueryIO.writeTableRows() + .to("project-id:dataset-id.table-id") + .withTestServices(fakeBqServices) + 
.withMethod(Method.STREAMING_INSERTS) + .withSchema(schema) + .withTimePartitioning(timePartitioning) + .withoutValidation()); + p.run(); + Table table = datasetService.getTable( + BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id")); + assertEquals(schema, table.getSchema()); + assertEquals(timePartitioning, table.getTimePartitioning()); + } + @Test @Category({ValidatesRunner.class, UsesTestStream.class}) public void testTriggeredFileLoads() throws Exception { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java index 89150698114c..e016c98c8222 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java @@ -32,6 +32,7 @@ class TableContainer { Long sizeBytes; TableContainer(Table table) { this.table = table; + this.rows = new ArrayList<>(); this.ids = new ArrayList<>(); this.sizeBytes = 0L; @@ -54,6 +55,7 @@ Table getTable() { return table; } + List getRows() { return rows; } From 12e600801951c6bc772dea4715e57287f6ef3633 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Sun, 20 Aug 2017 13:41:27 -0700 Subject: [PATCH 6/9] Fix javadoc. 
--- .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 62bfe8a19d30..30daa8e535d1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1030,7 +1030,7 @@ public Write withSchemaFromView(PCollectionView> view) { /** * Allows newly created tables to include a {@link TimePartitioning} class. Can only be used * when writing to a single table. If {@link #to(SerializableFunction)} or - * @link #to(DynamicDestinations)} is used to write dynamic tables, time partitioning can be + * {@link #to(DynamicDestinations)} is used to write dynamic tables, time partitioning can be * directly in the returned {@link TableDestination}. */ public Write withTimePartitioning(TimePartitioning partitioning) { From 4b92f7455e8814fa4295b72bca581c45baa4dcfa Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 22 Aug 2017 23:14:08 -0700 Subject: [PATCH 7/9] Add a constructor that takes a TableReference. 
--- .../apache/beam/sdk/io/gcp/bigquery/TableDestination.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java index 22df2541fc04..79f1b22e8392 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -41,7 +41,13 @@ public TableDestination(String tableSpec, @Nullable String tableDescription) { } public TableDestination(TableReference tableReference, @Nullable String tableDescription) { - this(BigQueryHelpers.toTableSpec(tableReference), tableDescription, (String) null); + this(tableReference, tableDescription, null); + } + + public TableDestination(TableReference tableReference, @Nullable String tableDescription, + TimePartitioning timePartitioning) { + this(BigQueryHelpers.toTableSpec(tableReference), tableDescription, + timePartitioning != null ? BigQueryHelpers.toJsonString(timePartitioning) : null); } public TableDestination(String tableSpec, @Nullable String tableDescription, From 76ff7381437c54c0fd39b2f1974f7c32826f4c0a Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 23 Aug 2017 16:15:17 -0700 Subject: [PATCH 8/9] Support TimePartitioning in batch as well. 
--- pom.xml | 2 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 -- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 6 ++++ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 13 ++++++-- .../sdk/io/gcp/bigquery/FakeJobService.java | 32 ++++++++++++++++--- 5 files changed, 46 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index 1bdaa97f5259..ecca8a347498 100644 --- a/pom.xml +++ b/pom.xml @@ -107,7 +107,7 @@ 2.24.0 1.0.0-rc2 1.8.2 - v2-rev295-1.22.0 + v2-rev355-1.22.0 0.9.7.1 v1-rev6-1.22.0 0.1.0 diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 30daa8e535d1..29828e439b11 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1222,8 +1222,6 @@ public WriteResult expand(PCollection input) { checkArgument(getTableFunction() == null, "The supplied getTableFunction object can directly set TimePartitioning." 
+ " There is no need to call BigQueryIO.Write.withTimePartitioning."); - checkArgument(method != Method.FILE_LOADS, - "TimePartitioning is not yet implemented for BigQuery load jobs."); } DynamicDestinations dynamicDestinations = getDynamicDestinations(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index c7f1404c5a3f..a646f17513c2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -23,6 +23,7 @@ import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -135,6 +136,7 @@ public void processElement(ProcessContext c) throws Exception { bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdPrefix, tableReference, + tableDestination.getTimePartitioning(), tableSchema, partitionFiles, writeDisposition, @@ -150,6 +152,7 @@ private void load( DatasetService datasetService, String jobIdPrefix, TableReference ref, + TimePartitioning timePartitioning, @Nullable TableSchema schema, List gcsUris, WriteDisposition writeDisposition, @@ -164,6 +167,9 @@ private void load( .setWriteDisposition(writeDisposition.name()) .setCreateDisposition(createDisposition.name()) .setSourceFormat("NEWLINE_DELIMITED_JSON"); + if (timePartitioning != null) { + loadConfig.setTimePartitioning(timePartitioning); + } String projectId = ref.getProjectId(); Job lastFailedLoadJob = null; for (int i = 0; i < 
BatchLoads.MAX_RETRY_JOBS; ++i) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 7c143141abed..18547cd24848 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -639,7 +639,16 @@ private void verifySideInputs() { } @Test - public void testTimePartitioning() throws Exception { + public void testTimePartitioningStreamingInserts() throws Exception { + testTimePartitioning(Method.STREAMING_INSERTS); + } + + @Test + public void testTimePartitioningBatchLoads() throws Exception { + testTimePartitioning(Method.FILE_LOADS); + } + + public void testTimePartitioning(BigQueryIO.Write.Method insertMethod) throws Exception { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("project-id"); bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); @@ -667,7 +676,7 @@ public void testTimePartitioning() throws Exception { BigQueryIO.writeTableRows() .to("project-id:dataset-id.table-id") .withTestServices(fakeBqServices) - .withMethod(Method.STREAMING_INSERTS) + .withMethod(insertMethod) .withSchema(schema) .withTimePartitioning(timePartitioning) .withoutValidation()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index 7d5101d782f9..cc600d1a5134 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.client.json.JsonFactory; import com.google.api.client.util.BackOff; @@ -39,6 +40,7 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -310,8 +312,13 @@ private JobStatus runLoadJob(JobReference jobRef, JobConfigurationLoad load) if (!validateDispositions(existingTable, createDisposition, writeDisposition)) { return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); } - - datasetService.createTable(new Table().setTableReference(destination).setSchema(schema)); + if (existingTable == null) { + existingTable = new Table().setTableReference(destination).setSchema(schema); + if (load.getTimePartitioning() != null) { + existingTable = existingTable.setTimePartitioning(load.getTimePartitioning()); + } + datasetService.createTable(existingTable); + } List rows = Lists.newArrayList(); for (ResourceId filename : sourceFiles) { @@ -331,13 +338,30 @@ private JobStatus runCopyJob(JobConfigurationTableCopy copy) if (!validateDispositions(existingTable, createDisposition, writeDisposition)) { return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); } - + TimePartitioning partitioning = null; + TableSchema schema = null; + boolean first = true; List allRows = Lists.newArrayList(); for (TableReference source : sources) { + Table table = checkNotNull(datasetService.getTable(source)); + if (!first) { + 
if (!java.util.Objects.equals(partitioning, table.getTimePartitioning())) { + return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); + } + if (!java.util.Objects.equals(schema, table.getSchema())) { + return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); + } + } + partitioning = table.getTimePartitioning(); + schema = table.getSchema(); + first = false; allRows.addAll(datasetService.getAllRows( source.getProjectId(), source.getDatasetId(), source.getTableId())); } - datasetService.createTable(new Table().setTableReference(destination)); + datasetService.createTable(new Table() + .setTableReference(destination) + .setSchema(schema) + .setTimePartitioning(partitioning)); datasetService.insertAll(destination, allRows, null); return new JobStatus().setState("DONE"); } From 8030fdd2388733746ad0c5ef223f20572b5120c0 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 29 Aug 2017 13:51:09 -0700 Subject: [PATCH 9/9] Fix broken update in DataflowRunner. Add a new Coder instead of extending the old one. 
--- ...ltCoderCloudObjectTranslatorRegistrar.java | 2 + .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 4 +- .../sdk/io/gcp/bigquery/CreateTables.java | 2 +- .../bigquery/DynamicDestinationsHelpers.java | 7 ++- .../gcp/bigquery/TableDestinationCoder.java | 10 +--- .../gcp/bigquery/TableDestinationCoderV2.java | 59 +++++++++++++++++++ 6 files changed, 71 insertions(+), 13 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java index 5d42a5fedde0..ff89933dadef 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder; +import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoderV2; import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder; /** @@ -97,6 +98,7 @@ public class DefaultCoderCloudObjectTranslatorRegistrar RandomAccessDataCoder.class, StringUtf8Coder.class, TableDestinationCoder.class, + TableDestinationCoderV2.class, TableRowJsonCoder.class, TextualIntegerCoder.class, VarIntCoder.class, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 0a1306dfc115..76cf7e84d591 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -266,7 +266,7 @@ private WriteResult expandTriggered(PCollection> inpu .apply(WithKeys.>of((Void) null)) .setCoder( KvCoder.of( - VoidCoder.of(), KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of()))) + VoidCoder.of(), KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of()))) .apply(GroupByKey.>create()) .apply(Values.>>create()) .apply( @@ -323,7 +323,7 @@ public WriteResult expandUntriggered(PCollection> inp tempTables .apply("ReifyRenameInput", new ReifyAsIterable>()) - .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of()))) + .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of()))) .apply( "WriteRenameUntriggered", ParDo.of( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java index 3e83f60edc39..7f83b83b4026 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java @@ -73,7 +73,7 @@ private CreateTables( } CreateTables withTestServices(BigQueryServices bqServices) { - return new CreateTables(createDisposition, bqServices, dynamicDestinations); + return new CreateTables<>(createDisposition, bqServices, dynamicDestinations); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index 7b813db81474..818ea34253f0 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -108,7 +108,7 @@ public TableDestination getTable(TableDestination destination) { @Override public Coder getDestinationCoder() { - return TableDestinationCoder.of(); + return TableDestinationCoderV2.of(); } } @@ -182,6 +182,11 @@ public TableDestination getDestination(ValueInSingleWindow element) { return new TableDestination(destination.getTableSpec(), destination.getTableDescription(), jsonTimePartitioning.get()); } + + @Override + public Coder getDestinationCoder() { + return TableDestinationCoderV2.of(); + } } /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java index 2b0911455caa..f034a030da60 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java @@ -32,7 +32,6 @@ public class TableDestinationCoder extends AtomicCoder { private static final TableDestinationCoder INSTANCE = new TableDestinationCoder(); private static final Coder tableSpecCoder = StringUtf8Coder.of(); private static final Coder tableDescriptionCoder = NullableCoder.of(StringUtf8Coder.of()); - private static final Coder timePartitioningCoder = NullableCoder.of(StringUtf8Coder.of()); public static TableDestinationCoder of() { return INSTANCE; @@ -46,20 +45,13 @@ public void encode(TableDestination value, OutputStream outStream) } tableSpecCoder.encode(value.getTableSpec(), outStream); tableDescriptionCoder.encode(value.getTableDescription(), outStream); - 
timePartitioningCoder.encode(value.getJsonTimePartitioning(), outStream); } @Override public TableDestination decode(InputStream inStream) throws IOException { String tableSpec = tableSpecCoder.decode(inStream); String tableDescription = tableDescriptionCoder.decode(inStream); - String jsonTimePartitioning = null; - try { - jsonTimePartitioning = timePartitioningCoder.decode(inStream); - } catch (IOException e) { - // This implies we're decoding old state that did not contain TimePartitioning. Continue. - } - return new TableDestination(tableSpec, tableDescription, jsonTimePartitioning); + return new TableDestination(tableSpec, tableDescription); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java new file mode 100644 index 000000000000..5bdab0d909b5 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.bigquery; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; + +/** + * A {@link Coder} for {@link TableDestination} that includes time partitioning information. This + * is a new coder (instead of extending the old {@link TableDestinationCoder}) for compatibility + * reasons. The old coder is kept around for the same compatibility reasons. + */ +public class TableDestinationCoderV2 extends AtomicCoder { + private static final TableDestinationCoderV2 INSTANCE = new TableDestinationCoderV2(); + private static final Coder timePartitioningCoder = NullableCoder.of(StringUtf8Coder.of()); + + public static TableDestinationCoderV2 of() { + return INSTANCE; + } + + @Override + public void encode(TableDestination value, OutputStream outStream) throws IOException { + TableDestinationCoder.of().encode(value, outStream); + timePartitioningCoder.encode(value.getJsonTimePartitioning(), outStream); + } + + @Override + public TableDestination decode(InputStream inStream) throws IOException { + TableDestination destination = TableDestinationCoder.of().decode(inStream); + String jsonTimePartitioning = timePartitioningCoder.decode(inStream); + return new TableDestination( + destination.getTableSpec(), destination.getTableDescription(), jsonTimePartitioning); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException {} +}