diff --git a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java index 79472bb3ba53c..17d01538a679b 100644 --- a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java +++ b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java @@ -25,9 +25,12 @@ import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.TableId; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.UUID; import java.util.function.Function; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.Read; @@ -59,15 +62,15 @@ *

  * <p>Usage:
  *
  * <pre>
- *  ./gradlew integrationTest -p sdks/java/io/gcp/bigquery -DintegrationTestPipelineOptions='[
- *  "--testBigQueryDataset=test-dataset",
- *  "--testBigQueryTable=test-table",
- *  "--metricsBigQueryDataset=metrics-dataset",
- *  "--metricsBigQueryTable=metrics-table",
- *  "--writeMethod=FILE_LOADS",
- *  "--sourceOptions={"numRecords":"1000", "keySize":1, valueSize:"1024"}
- *  }"]'
- *  --tests org.apache.beam.sdk.io.gcp.bigQuery.BigQueryIOIT
+ *  ./gradlew integrationTest -p sdks/java/io/bigquery-io-perf-tests -DintegrationTestPipelineOptions='[ \
+ *    "--testBigQueryDataset=test_dataset", \
+ *    "--testBigQueryTable=test_table", \
+ *    "--metricsBigQueryDataset=metrics_dataset", \
+ *    "--metricsBigQueryTable=metrics_table", \
+ *    "--writeMethod=FILE_LOADS", \
+ *    "--sourceOptions={\"numRecords\":\"1000\", \"keySizeBytes\":\"1\", \"valueSizeBytes\":\"1024\"}" \
+ *    ]' \
+ *  --tests org.apache.beam.sdk.bigqueryioperftests.BigQueryIOIT \
  *  -DintegrationTestRunner=direct
  * </pre>
*/ @@ -78,6 +81,7 @@ public class BigQueryIOIT { private static final String TEST_TIMESTAMP = Timestamp.now().toString(); private static final String READ_TIME_METRIC_NAME = "read_time"; private static final String WRITE_TIME_METRIC_NAME = "write_time"; + private static final String AVRO_WRITE_TIME_METRIC_NAME = "avro_write_time"; private static String metricsBigQueryTable; private static String metricsBigQueryDataset; private static String testBigQueryDataset; @@ -113,11 +117,38 @@ public static void tearDown() { @Test public void testWriteThenRead() { - testWrite(); + testJsonWrite(); + testAvroWrite(); testRead(); } - private void testWrite() { + private void testJsonWrite() { + BigQueryIO.Write writeIO = + BigQueryIO.write() + .withFormatFunction( + input -> { + TableRow tableRow = new TableRow(); + tableRow.set("data", input); + return tableRow; + }); + testWrite(writeIO, WRITE_TIME_METRIC_NAME); + } + + private void testAvroWrite() { + BigQueryIO.Write writeIO = + BigQueryIO.write() + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) + .withAvroFormatFunction( + writeRequest -> { + byte[] data = writeRequest.getElement(); + GenericRecord record = new GenericData.Record(writeRequest.getSchema()); + record.put("data", ByteBuffer.wrap(data)); + return record; + }); + testWrite(writeIO, AVRO_WRITE_TIME_METRIC_NAME); + } + + private void testWrite(BigQueryIO.Write writeIO, String metricName) { Pipeline pipeline = Pipeline.create(options); BigQueryIO.Write.Method method = BigQueryIO.Write.Method.valueOf(options.getWriteMethod()); @@ -127,14 +158,8 @@ private void testWrite() { .apply("Map records", ParDo.of(new MapKVToV())) .apply( "Write to BQ", - BigQueryIO.write() + writeIO .to(tableQualifier) - .withFormatFunction( - input -> { - TableRow tableRow = new TableRow(); - tableRow.set("data", input); - return tableRow; - }) .withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(tempRoot)) .withMethod(method) .withSchema( @@ -145,7 +170,7 @@ private void testWrite() { PipelineResult pipelineResult = pipeline.run(); pipelineResult.waitUntilFinish(); - extractAndPublishTime(pipelineResult, WRITE_TIME_METRIC_NAME); + extractAndPublishTime(pipelineResult, metricName); } private void testRead() { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java new file mode 100644 index 0000000000000..a0509a6ab030b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.MimeTypes; + +class AvroRowWriter extends BigQueryRowWriter { + private final DataFileWriter writer; + private final Schema schema; + private final SerializableFunction, GenericRecord> toAvroRecord; + + AvroRowWriter( + String basename, + Schema schema, + SerializableFunction, GenericRecord> toAvroRecord) + throws Exception { + super(basename, MimeTypes.BINARY); + + this.schema = schema; + this.toAvroRecord = toAvroRecord; + this.writer = + new DataFileWriter(new GenericDatumWriter<>()) + .create(schema, getOutputStream()); + } + + @Override + public void write(T element) throws IOException { + AvroWriteRequest writeRequest = new AvroWriteRequest<>(element, schema); + writer.append(toAvroRecord.apply(writeRequest)); + } + + public Schema getSchema() { + return this.schema; + } + + @Override + public void close() throws IOException { + writer.close(); + super.close(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroWriteRequest.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroWriteRequest.java new file mode 100644 index 0000000000000..bea79c6c76fe3 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroWriteRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import org.apache.avro.Schema; + +public class AvroWriteRequest { + private final T element; + private final Schema schema; + + AvroWriteRequest(T element, Schema schema) { + this.element = element; + this.schema = schema; + } + + public T getElement() { + return element; + } + + public Schema getSchema() { + return schema; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 0616c405ba04f..23c81c5337ab2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Reshuffle; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; @@ -131,7 +130,7 @@ class BatchLoads private ValueProvider customGcsTempLocation; private ValueProvider loadJobProjectId; private final Coder elementCoder; - private final SerializableFunction toRowFunction; + private final RowWriterFactory rowWriterFactory; private String kmsKey; // The maximum number of times to retry failed load or copy jobs. @@ -147,7 +146,7 @@ class BatchLoads @Nullable ValueProvider loadJobProjectId, boolean ignoreUnknownValues, Coder elementCoder, - SerializableFunction toRowFunction, + RowWriterFactory rowWriterFactory, @Nullable String kmsKey) { bigQueryServices = new BigQueryServicesImpl(); this.writeDisposition = writeDisposition; @@ -165,8 +164,8 @@ class BatchLoads this.loadJobProjectId = loadJobProjectId; this.ignoreUnknownValues = ignoreUnknownValues; this.elementCoder = elementCoder; - this.toRowFunction = toRowFunction; this.kmsKey = kmsKey; + this.rowWriterFactory = rowWriterFactory; } void setTestServices(BigQueryServices bigQueryServices) { @@ -305,7 +304,8 @@ private WriteResult expandTriggered(PCollection> inpu maxFilesPerPartition, maxBytesPerPartition, multiPartitionsTag, - singlePartitionTag)) + singlePartitionTag, + rowWriterFactory)) .withSideInputs(tempFilePrefixView) .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); PCollection> tempTables = @@ -375,7 +375,8 @@ public WriteResult expandUntriggered(PCollection> inp maxFilesPerPartition, maxBytesPerPartition, multiPartitionsTag, - singlePartitionTag)) + singlePartitionTag, + rowWriterFactory)) .withSideInputs(tempFilePrefixView) .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); PCollection> tempTables = @@ -466,7 +467,7 @@ PCollection> writeDynamicallyShardedFil unwrittedRecordsTag, maxNumWritersPerBundle, maxFileSize, - toRowFunction)) + rowWriterFactory)) .withSideInputs(tempFilePrefix) .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag))); PCollection> writtenFiles = @@ -535,7 +536,7 @@ private PCollection> writeShardedRecords( "WriteGroupedRecords", ParDo.of( new WriteGroupedRecordsToFiles( - tempFilePrefix, maxFileSize, toRowFunction)) + tempFilePrefix, maxFileSize, rowWriterFactory)) .withSideInputs(tempFilePrefix)) .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); } @@ -585,7 +586,8 @@ private PCollection> 
writeTempTables( loadJobProjectId, maxRetryJobs, ignoreUnknownValues, - kmsKey)); + kmsKey, + rowWriterFactory.getSourceFormat())); } // In the case where the files fit into a single load job, there's no need to write temporary @@ -618,7 +620,8 @@ void writeSinglePartition( loadJobProjectId, maxRetryJobs, ignoreUnknownValues, - kmsKey)); + kmsKey, + rowWriterFactory.getSourceFormat())); } private WriteResult writeResult(Pipeline p) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index d425a964b5d38..382705f119a86 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -360,14 +360,20 @@ static Schema toGenericAvroSchema(String schemaName, List fiel } return Schema.createRecord( schemaName, - "org.apache.beam.sdk.io.gcp.bigquery", "Translated Avro Schema for " + schemaName, + "org.apache.beam.sdk.io.gcp.bigquery", false, avroFields); } private static Field convertField(TableFieldSchema bigQueryField) { - Type avroType = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType()).iterator().next(); + ImmutableCollection avroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType()); + if (avroTypes.isEmpty()) { + throw new IllegalArgumentException( + "Unable to map BigQuery field type " + bigQueryField.getType() + " to avro type."); + } + + Type avroType = avroTypes.iterator().next(); Schema elementSchema; if (avroType == Type.RECORD) { elementSchema = toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index f17c3900e8daa..5987a29d36aec 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -276,9 +276,20 @@ *

  * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation. This consumes a
  * {@link PCollection} of a user-defined type when using {@link BigQueryIO#write()} (recommended),
  * or a {@link PCollection} of {@link TableRow TableRows} as input when using {@link
- * BigQueryIO#writeTableRows()} (not recommended). When using a user-defined type, a function must
- * be provided to turn this type into a {@link TableRow} using {@link
- * BigQueryIO.Write#withFormatFunction(SerializableFunction)}.
+ * BigQueryIO#writeTableRows()} (not recommended). When using a user-defined type, one of the
+ * following must be provided.
+ *
+ * <ul>
+ *   <li>{@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} (recommended) to
+ *       write data using avro records.
+ *   <li>{@link BigQueryIO.Write#withFormatFunction(SerializableFunction)} to write data as json
+ *       encoded {@link TableRow TableRows}.
+ * </ul>
+ *
+ * <p>If {@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} is used, the table
+ * schema MUST be specified using one of the {@link Write#withJsonSchema(String)}, {@link
+ * Write#withJsonSchema(ValueProvider)}, {@link Write#withSchemaFromView(PCollectionView)} methods,
+ * or {@link Write#to(DynamicDestinations)}.
  *
  * <pre>{@code
  * class Quote {
@@ -465,6 +476,15 @@ public class BigQueryIO {
    */
   static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = input -> input;
 
+  private static final SerializableFunction<TableSchema, org.apache.avro.Schema>
+      DEFAULT_AVRO_SCHEMA_FACTORY =
+          new SerializableFunction<TableSchema, org.apache.avro.Schema>() {
+            @Override
+            public org.apache.avro.Schema apply(TableSchema input) {
+              return BigQueryAvroUtils.toGenericAvroSchema("root", input.getFields());
+            }
+          };
+
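To make the new surface concrete, here is a minimal usage sketch of the Avro write path added in this change. It is illustrative only and not part of the diff: the element type, field names, and destination table are hypothetical, and the schema is supplied via withSchema, as the PR's own testWriteAvro does.

// Illustrative sketch only -- not part of this diff. Element type, field names,
// and the destination table are hypothetical.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.Serializable;
import java.util.Arrays;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class AvroWriteSketch {
  // Hypothetical user type.
  static class MyRecord implements Serializable {
    final String name;
    final double value;

    MyRecord(String name, double value) {
      this.name = name;
      this.value = value;
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // BigQuery schema; a schema must be provided when using withAvroFormatFunction.
    TableSchema schema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema().setName("name").setType("STRING"),
                    new TableFieldSchema().setName("value").setType("FLOAT")));

    p.apply(
            Create.of(new MyRecord("a", 1.0), new MyRecord("b", 2.5))
                .withCoder(SerializableCoder.of(MyRecord.class)))
        .apply(
            BigQueryIO.<MyRecord>write()
                .to("my-project:my_dataset.my_table") // hypothetical destination
                .withSchema(schema)
                // By default the TableSchema above is converted to an avro schema
                // (the DEFAULT_AVRO_SCHEMA_FACTORY defined in this hunk);
                // request.getSchema() returns that schema.
                .withAvroFormatFunction(
                    request -> {
                      MyRecord in = request.getElement();
                      GenericRecord rec = new GenericData.Record(request.getSchema());
                      rec.put("name", in.name);
                      rec.put("value", in.value);
                      return rec;
                    })
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    p.run().waitUntilFinish();
  }
}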
   /**
    * @deprecated Use {@link #read(SerializableFunction)} or {@link #readTableRows} instead. {@link
    *     #readTableRows()} does exactly the same as {@link #read}, however {@link
@@ -1662,6 +1682,12 @@ public enum Method {
     @Nullable
     abstract SerializableFunction<T, TableRow> getFormatFunction();
 
+    @Nullable
+    abstract SerializableFunction<AvroWriteRequest<T>, GenericRecord> getAvroFormatFunction();
+
+    @Nullable
+    abstract SerializableFunction<TableSchema, org.apache.avro.Schema> getAvroSchemaFactory();
+
     @Nullable
     abstract DynamicDestinations<T, ?> getDynamicDestinations();
 
@@ -1738,6 +1764,12 @@ abstract Builder setTableFunction(
 
       abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction);
 
+      abstract Builder<T> setAvroFormatFunction(
+          SerializableFunction<AvroWriteRequest<T>, GenericRecord> avroFormatFunction);
+
+      abstract Builder<T> setAvroSchemaFactory(
+          SerializableFunction<TableSchema, org.apache.avro.Schema> avroSchemaFactory);
+
       abstract Builder<T> setDynamicDestinations(DynamicDestinations<T, ?> dynamicDestinations);
 
       abstract Builder<T> setSchemaFromView(PCollectionView<Map<String, String>> view);
@@ -1910,6 +1942,27 @@ public Write withFormatFunction(SerializableFunction formatFunct
       return toBuilder().setFormatFunction(formatFunction).build();
     }
 
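For comparison with the Avro variant added just below, an illustrative fragment of the existing TableRow path; it is not part of this change, and the destination and field name are hypothetical. Only one of the two format functions may be set.

// Illustrative fragment only -- not part of this diff.
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

class TableRowFormatSketch {
  static BigQueryIO.Write<String> jsonWrite() {
    return BigQueryIO.<String>write()
        .to("my-project:my_dataset.my_table") // hypothetical destination
        // Each element becomes one JSON-encoded TableRow in the load files.
        .withFormatFunction(element -> new TableRow().set("data", element));
  }
}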
+    /**
+     * Formats the user's type into a {@link GenericRecord} to be written to BigQuery.
+     *
+     * <p>This is mutually exclusive with {@link #withFormatFunction}, only one may be set.
+     */
+    public Write<T> withAvroFormatFunction(
+        SerializableFunction<AvroWriteRequest<T>, GenericRecord> avroFormatFunction) {
+      return toBuilder().setAvroFormatFunction(avroFormatFunction).setOptimizeWrites(true).build();
+    }
+
+    /**
+     * Uses the specified function to convert a {@link TableSchema} to a {@link
+     * org.apache.avro.Schema}.
+     *

If not specified, the TableSchema will automatically be converted to an avro schema. + */ + public Write withAvroSchemaFactory( + SerializableFunction avroSchemaFactory) { + return toBuilder().setAvroSchemaFactory(avroSchemaFactory).build(); + } + /** * Uses the specified schema for rows to be written. * @@ -2280,6 +2333,16 @@ public WriteResult expand(PCollection input) { input.isBounded(), method); } + + if (method != Method.FILE_LOADS) { + // we only support writing avro for FILE_LOADS + checkArgument( + getAvroFormatFunction() == null, + "Writing avro formatted data is only supported for FILE_LOADS, however " + + "the method was %s", + method); + } + if (getJsonTimePartitioning() != null) { checkArgument( getDynamicDestinations() == null, @@ -2336,12 +2399,26 @@ private WriteResult expandTyped( PCollection input, DynamicDestinations dynamicDestinations) { boolean optimizeWrites = getOptimizeWrites(); SerializableFunction formatFunction = getFormatFunction(); + SerializableFunction, GenericRecord> avroFormatFunction = + getAvroFormatFunction(); + + boolean hasSchema = + getJsonSchema() != null + || getDynamicDestinations() != null + || getSchemaFromView() != null; + if (getUseBeamSchema()) { checkArgument(input.hasSchema()); optimizeWrites = true; + + checkArgument( + avroFormatFunction == null, + "avroFormatFunction is unsupported when using Beam schemas."); + if (formatFunction == null) { // If no format function set, then we will automatically convert the input type to a // TableRow. + // TODO: it would be trivial to convert to avro records here instead. formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction()); } // Infer the TableSchema from the input Beam schema. @@ -2353,19 +2430,10 @@ private WriteResult expandTyped( } else { // Require a schema if creating one or more tables. checkArgument( - getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED - || getJsonSchema() != null - || getDynamicDestinations() != null - || getSchemaFromView() != null, + getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED || hasSchema, "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided."); } - checkArgument( - formatFunction != null, - "A function must be provided to convert type into a TableRow. " - + "use BigQueryIO.Write.withFormatFunction to provide a formatting function." - + "A format function is not required if Beam schemas are used."); - Coder destinationCoder = null; try { destinationCoder = @@ -2377,6 +2445,34 @@ private WriteResult expandTyped( Method method = resolveMethod(input); if (optimizeWrites) { + RowWriterFactory rowWriterFactory; + if (avroFormatFunction != null) { + checkArgument( + formatFunction == null, + "Only one of withFormatFunction or withAvroFormatFunction maybe set, not both."); + + SerializableFunction avroSchemaFactory = + getAvroSchemaFactory(); + if (avroSchemaFactory == null) { + checkArgument( + hasSchema, + "A schema must be provided if an avroFormatFunction " + + "is set but no avroSchemaFactory is defined."); + avroSchemaFactory = DEFAULT_AVRO_SCHEMA_FACTORY; + } + rowWriterFactory = + RowWriterFactory.avroRecords( + avroFormatFunction, avroSchemaFactory, dynamicDestinations); + } else if (formatFunction != null) { + rowWriterFactory = RowWriterFactory.tableRows(formatFunction); + } else { + throw new IllegalArgumentException( + "A function must be provided to convert the input type into a TableRow or " + + "GenericRecord. 
Use BigQueryIO.Write.withFormatFunction or " + + "BigQueryIO.Write.withAvroFormatFunction to provide a formatting function. " + + "A format function is not required if Beam schemas are used."); + } + PCollection> rowsWithDestination = input .apply( @@ -2388,19 +2484,31 @@ private WriteResult expandTyped( input.getCoder(), destinationCoder, dynamicDestinations, - formatFunction, + rowWriterFactory, method); } else { + checkArgument(avroFormatFunction == null); + checkArgument( + formatFunction != null, + "A function must be provided to convert the input type into a TableRow or " + + "GenericRecord. Use BigQueryIO.Write.withFormatFunction or " + + "BigQueryIO.Write.withAvroFormatFunction to provide a formatting function. " + + "A format function is not required if Beam schemas are used."); + PCollection> rowsWithDestination = input .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, formatFunction)) .setCoder(KvCoder.of(destinationCoder, TableRowJsonCoder.of())); + + RowWriterFactory rowWriterFactory = + RowWriterFactory.tableRows(SerializableFunctions.identity()); + return continueExpandTyped( rowsWithDestination, TableRowJsonCoder.of(), destinationCoder, dynamicDestinations, - SerializableFunctions.identity(), + rowWriterFactory, method); } } @@ -2410,7 +2518,7 @@ private WriteResult continueExpandTyped( Coder elementCoder, Coder destinationCoder, DynamicDestinations dynamicDestinations, - SerializableFunction toRowFunction, + RowWriterFactory rowWriterFactory, Method method) { if (method == Method.STREAMING_INSERTS) { checkArgument( @@ -2419,9 +2527,19 @@ private WriteResult continueExpandTyped( InsertRetryPolicy retryPolicy = MoreObjects.firstNonNull(getFailedInsertRetryPolicy(), InsertRetryPolicy.alwaysRetry()); + checkArgument( + rowWriterFactory.getOutputType() == RowWriterFactory.OutputType.JsonTableRow, + "Avro output is not supported when method == STREAMING_INSERTS"); + + RowWriterFactory.TableRowWriterFactory tableRowWriterFactory = + (RowWriterFactory.TableRowWriterFactory) rowWriterFactory; + StreamingInserts streamingInserts = new StreamingInserts<>( - getCreateDisposition(), dynamicDestinations, elementCoder, toRowFunction) + getCreateDisposition(), + dynamicDestinations, + elementCoder, + tableRowWriterFactory.getToRowFn()) .withInsertRetryPolicy(retryPolicy) .withTestServices(getBigQueryServices()) .withExtendedErrorInfo(getExtendedErrorInfo()) @@ -2445,7 +2563,7 @@ private WriteResult continueExpandTyped( getLoadJobProjectId(), getIgnoreUnknownValues(), elementCoder, - toRowFunction, + rowWriterFactory, getKmsKey()); batchLoads.setTestServices(getBigQueryServices()); if (getMaxFilesPerBundle() != null) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryRowWriter.java new file mode 100644 index 0000000000000..f96f05d62f753 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryRowWriter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.UUID; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Writes {@link TableRow} objects out to a file. Used when doing batch load jobs into BigQuery. */ +abstract class BigQueryRowWriter implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(BigQueryRowWriter.class); + + private ResourceId resourceId; + private WritableByteChannel channel; + private CountingOutputStream out; + + private boolean isClosed = false; + + static final class Result { + final ResourceId resourceId; + final long byteSize; + + public Result(ResourceId resourceId, long byteSize) { + this.resourceId = resourceId; + this.byteSize = byteSize; + } + } + + BigQueryRowWriter(String basename, String mimeType) throws Exception { + String uId = UUID.randomUUID().toString(); + resourceId = FileSystems.matchNewResource(basename + uId, false); + LOG.info("Opening {} to {}.", this.getClass().getSimpleName(), resourceId); + channel = FileSystems.create(resourceId, mimeType); + out = new CountingOutputStream(Channels.newOutputStream(channel)); + } + + protected OutputStream getOutputStream() { + return out; + } + + abstract void write(T value) throws Exception; + + long getByteSize() { + return out.getCount(); + } + + @Override + public void close() throws IOException { + checkState(!isClosed, "Already closed"); + isClosed = true; + channel.close(); + } + + Result getResult() { + checkState(isClosed, "Not yet closed"); + return new Result(resourceId, out.getCount()); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 04a92ec0d8ead..e3347612489dd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -388,6 +388,11 @@ public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptio return Row.withSchema(schema).addValues(valuesInOrder).build(); } + public static TableRow convertGenericRecordToTableRow( + GenericRecord record, TableSchema tableSchema) { + return BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema); + } + /** Convert a BigQuery TableRow to a Beam Row. 
*/ public static TableRow toTableRow(Row row) { TableRow output = new TableRow(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java new file mode 100644 index 0000000000000..d8e4ea6b29cf3 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.Serializable; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.transforms.SerializableFunction; + +abstract class RowWriterFactory implements Serializable { + private RowWriterFactory() {} + + enum OutputType { + JsonTableRow, + AvroGenericRecord + } + + abstract OutputType getOutputType(); + + abstract String getSourceFormat(); + + abstract BigQueryRowWriter createRowWriter( + String tempFilePrefix, DestinationT destination) throws Exception; + + static RowWriterFactory tableRows( + SerializableFunction toRow) { + return new TableRowWriterFactory(toRow); + } + + static final class TableRowWriterFactory + extends RowWriterFactory { + + private final SerializableFunction toRow; + + private TableRowWriterFactory(SerializableFunction toRow) { + this.toRow = toRow; + } + + public SerializableFunction getToRowFn() { + return toRow; + } + + @Override + public OutputType getOutputType() { + return OutputType.JsonTableRow; + } + + @Override + public BigQueryRowWriter createRowWriter( + String tempFilePrefix, DestinationT destination) throws Exception { + return new TableRowWriter<>(tempFilePrefix, toRow); + } + + @Override + String getSourceFormat() { + return "NEWLINE_DELIMITED_JSON"; + } + } + + static RowWriterFactory avroRecords( + SerializableFunction, GenericRecord> toAvro, + SerializableFunction schemaFactory, + DynamicDestinations dynamicDestinations) { + return new AvroRowWriterFactory<>(toAvro, schemaFactory, dynamicDestinations); + } + + private static final class AvroRowWriterFactory + extends RowWriterFactory { + + private final SerializableFunction, GenericRecord> toAvro; + private final SerializableFunction schemaFactory; + private final DynamicDestinations dynamicDestinations; + + private AvroRowWriterFactory( + SerializableFunction, GenericRecord> toAvro, + SerializableFunction schemaFactory, + DynamicDestinations dynamicDestinations) { + this.toAvro = toAvro; + this.schemaFactory = schemaFactory; + this.dynamicDestinations = 
dynamicDestinations; + } + + @Override + OutputType getOutputType() { + return OutputType.AvroGenericRecord; + } + + @Override + BigQueryRowWriter createRowWriter(String tempFilePrefix, DestinationT destination) + throws Exception { + TableSchema tableSchema = dynamicDestinations.getSchema(destination); + Schema avroSchema = schemaFactory.apply(tableSchema); + return new AvroRowWriter<>(tempFilePrefix, avroSchema, toAvro); + } + + @Override + String getSourceFormat() { + return "AVRO"; + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java index b02a5ea3f12d7..6cbeb61f624f9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java @@ -17,71 +17,29 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; - import com.google.api.services.bigquery.model.TableRow; -import java.io.IOException; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; -import java.util.UUID; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; -import org.apache.beam.sdk.io.FileSystems; -import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Writes {@link TableRow} objects out to a file. Used when doing batch load jobs into BigQuery. 
*/ -class TableRowWriter implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(TableRowWriter.class); - +class TableRowWriter extends BigQueryRowWriter { private static final Coder CODER = TableRowJsonCoder.of(); private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); - private ResourceId resourceId; - private WritableByteChannel channel; - private CountingOutputStream out; - - private boolean isClosed = false; - static final class Result { - final ResourceId resourceId; - final long byteSize; + private final SerializableFunction toRow; - public Result(ResourceId resourceId, long byteSize) { - this.resourceId = resourceId; - this.byteSize = byteSize; - } - } - - TableRowWriter(String basename) throws Exception { - String uId = UUID.randomUUID().toString(); - resourceId = FileSystems.matchNewResource(basename + uId, false); - LOG.info("Opening TableRowWriter to {}.", resourceId); - channel = FileSystems.create(resourceId, MimeTypes.TEXT); - out = new CountingOutputStream(Channels.newOutputStream(channel)); - } - - void write(TableRow value) throws Exception { - CODER.encode(value, out, Context.OUTER); - out.write(NEWLINE); - } - - long getByteSize() { - return out.getCount(); + TableRowWriter(String basename, SerializableFunction toRow) throws Exception { + super(basename, MimeTypes.TEXT); + this.toRow = toRow; } @Override - public void close() throws IOException { - checkState(!isClosed, "Already closed"); - isClosed = true; - channel.close(); - } - - Result getResult() { - checkState(isClosed, "Not yet closed"); - return new Result(resourceId, out.getCount()); + void write(T value) throws Exception { + TableRow tableRow = toRow.apply(value); + CODER.encode(tableRow, getOutputStream(), Context.OUTER); + getOutputStream().write(NEWLINE); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 0d8393841f600..b6c06d94516da 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; @@ -61,13 +60,13 @@ class WriteBundlesToFiles private static final int SPILLED_RECORD_SHARDING_FACTOR = 10; // Map from tablespec to a writer for that table. 
- private transient Map writers; + private transient Map> writers; private transient Map writerWindows; private final PCollectionView tempFilePrefixView; private final TupleTag, ElementT>> unwrittenRecordsTag; private final int maxNumWritersPerBundle; private final long maxFileSize; - private final SerializableFunction toRowFunction; + private final RowWriterFactory rowWriterFactory; private int spilledShardNumber; /** @@ -164,12 +163,12 @@ public void verifyDeterministic() {} TupleTag, ElementT>> unwrittenRecordsTag, int maxNumWritersPerBundle, long maxFileSize, - SerializableFunction toRowFunction) { + RowWriterFactory rowWriterFactory) { this.tempFilePrefixView = tempFilePrefixView; this.unwrittenRecordsTag = unwrittenRecordsTag; this.maxNumWritersPerBundle = maxNumWritersPerBundle; this.maxFileSize = maxFileSize; - this.toRowFunction = toRowFunction; + this.rowWriterFactory = rowWriterFactory; } @StartBundle @@ -181,9 +180,10 @@ public void startBundle() { this.spilledShardNumber = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR); } - TableRowWriter createAndInsertWriter( + BigQueryRowWriter createAndInsertWriter( DestinationT destination, String tempFilePrefix, BoundedWindow window) throws Exception { - TableRowWriter writer = new TableRowWriter(tempFilePrefix); + BigQueryRowWriter writer = + rowWriterFactory.createRowWriter(tempFilePrefix, destination); writers.put(destination, writer); writerWindows.put(destination, window); return writer; @@ -196,7 +196,7 @@ public void processElement( String tempFilePrefix = c.sideInput(tempFilePrefixView); DestinationT destination = c.element().getKey(); - TableRowWriter writer; + BigQueryRowWriter writer; if (writers.containsKey(destination)) { writer = writers.get(destination); } else { @@ -219,13 +219,13 @@ public void processElement( if (writer.getByteSize() > maxFileSize) { // File is too big. Close it and open a new file. writer.close(); - TableRowWriter.Result result = writer.getResult(); + BigQueryRowWriter.Result result = writer.getResult(); c.output(new Result<>(result.resourceId.toString(), result.byteSize, destination)); writer = createAndInsertWriter(destination, tempFilePrefix, window); } try { - writer.write(toRowFunction.apply(element.getValue())); + writer.write(element.getValue()); } catch (Exception e) { // Discard write result and close the write. 
try { @@ -242,7 +242,7 @@ public void processElement( @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { List exceptionList = Lists.newArrayList(); - for (TableRowWriter writer : writers.values()) { + for (BigQueryRowWriter writer : writers.values()) { try { writer.close(); } catch (Exception e) { @@ -257,11 +257,11 @@ public void finishBundle(FinishBundleContext c) throws Exception { throw e; } - for (Map.Entry entry : writers.entrySet()) { + for (Map.Entry> entry : writers.entrySet()) { try { DestinationT destination = entry.getKey(); - TableRowWriter writer = entry.getValue(); - TableRowWriter.Result result = writer.getResult(); + BigQueryRowWriter writer = entry.getValue(); + BigQueryRowWriter.Result result = writer.getResult(); c.output( new Result<>(result.resourceId.toString(), result.byteSize, destination), writerWindows.get(destination).maxTimestamp(), diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java index 403cb6a36fdb3..6db179bfeb8fb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java @@ -17,9 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import com.google.api.services.bigquery.model.TableRow; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.ShardedKey; @@ -36,15 +34,15 @@ class WriteGroupedRecordsToFiles private final PCollectionView tempFilePrefix; private final long maxFileSize; - private final SerializableFunction toRowFunction; + private final RowWriterFactory rowWriterFactory; WriteGroupedRecordsToFiles( PCollectionView tempFilePrefix, long maxFileSize, - SerializableFunction toRowFunction) { + RowWriterFactory rowWriterFactory) { this.tempFilePrefix = tempFilePrefix; this.maxFileSize = maxFileSize; - this.toRowFunction = toRowFunction; + this.rowWriterFactory = rowWriterFactory; } @ProcessElement @@ -53,25 +51,29 @@ public void processElement( @Element KV, Iterable> element, OutputReceiver> o) throws Exception { + String tempFilePrefix = c.sideInput(this.tempFilePrefix); - TableRowWriter writer = new TableRowWriter(tempFilePrefix); + + BigQueryRowWriter writer = + rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey().getKey()); + try { for (ElementT tableRow : element.getValue()) { if (writer.getByteSize() > maxFileSize) { writer.close(); - writer = new TableRowWriter(tempFilePrefix); - TableRowWriter.Result result = writer.getResult(); + writer = rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey().getKey()); + BigQueryRowWriter.Result result = writer.getResult(); o.output( new WriteBundlesToFiles.Result<>( result.resourceId.toString(), result.byteSize, c.element().getKey().getKey())); } - writer.write(toRowFunction.apply(tableRow)); + writer.write(tableRow); } } finally { writer.close(); } - TableRowWriter.Result result = writer.getResult(); + BigQueryRowWriter.Result result = writer.getResult(); o.output( new WriteBundlesToFiles.Result<>( result.resourceId.toString(), result.byteSize, c.element().getKey().getKey())); diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index 0b4482732b4f2..505af261979f5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -42,6 +42,7 @@ class WritePartition private final PCollectionView tempFilePrefix; private final int maxNumFiles; private final long maxSizeBytes; + private final RowWriterFactory rowWriterFactory; @Nullable private TupleTag, List>> multiPartitionsTag; private TupleTag, List>> singlePartitionTag; @@ -128,7 +129,8 @@ void addPartition(PartitionData partition) { int maxNumFiles, long maxSizeBytes, TupleTag, List>> multiPartitionsTag, - TupleTag, List>> singlePartitionTag) { + TupleTag, List>> singlePartitionTag, + RowWriterFactory rowWriterFactory) { this.singletonTable = singletonTable; this.dynamicDestinations = dynamicDestinations; this.tempFilePrefix = tempFilePrefix; @@ -136,6 +138,7 @@ void addPartition(PartitionData partition) { this.maxSizeBytes = maxSizeBytes; this.multiPartitionsTag = multiPartitionsTag; this.singlePartitionTag = singlePartitionTag; + this.rowWriterFactory = rowWriterFactory; } @ProcessElement @@ -146,16 +149,16 @@ public void processElement(ProcessContext c) throws Exception { // generate an empty table of that name. if (results.isEmpty() && singletonTable) { String tempFilePrefix = c.sideInput(this.tempFilePrefix); - TableRowWriter writer = new TableRowWriter(tempFilePrefix); - writer.close(); - TableRowWriter.Result writerResult = writer.getResult(); // Return a null destination in this case - the constant DynamicDestinations class will // resolve it to the singleton output table. 
+ DestinationT destination = dynamicDestinations.getDestination(null); + + BigQueryRowWriter writer = rowWriterFactory.createRowWriter(tempFilePrefix, destination); + writer.close(); + BigQueryRowWriter.Result writerResult = writer.getResult(); + results.add( - new Result<>( - writerResult.resourceId.toString(), - writerResult.byteSize, - dynamicDestinations.getDestination(null))); + new Result<>(writerResult.resourceId.toString(), writerResult.byteSize, destination)); } Map currentResults = Maps.newHashMap(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index dbe0962350023..10f368fd5cadd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -98,6 +98,7 @@ class WriteTables private final int maxRetryJobs; private final boolean ignoreUnknownValues; @Nullable private final String kmsKey; + private final String sourceFormat; private class WriteTablesDoFn extends DoFn, List>, KV> { @@ -286,7 +287,8 @@ public WriteTables( @Nullable ValueProvider loadJobProjectId, int maxRetryJobs, boolean ignoreUnknownValues, - String kmsKey) { + String kmsKey, + String sourceFormat) { this.tempTable = tempTable; this.bqServices = bqServices; this.loadJobIdPrefixView = loadJobIdPrefixView; @@ -300,6 +302,7 @@ public WriteTables( this.maxRetryJobs = maxRetryJobs; this.ignoreUnknownValues = ignoreUnknownValues; this.kmsKey = kmsKey; + this.sourceFormat = sourceFormat; } @Override @@ -351,7 +354,7 @@ private PendingJob startLoad( .setSourceUris(gcsUris) .setWriteDisposition(writeDisposition.name()) .setCreateDisposition(createDisposition.name()) - .setSourceFormat("NEWLINE_DELIMITED_JSON") + .setSourceFormat(sourceFormat) .setIgnoreUnknownValues(ignoreUnknownValues); if (timePartitioning != null) { loadConfig.setTimePartitioning(timePartitioning); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java index 9729f78562625..f0b4cd7f6a904 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java @@ -43,6 +43,7 @@ import com.google.api.services.bigquery.model.TimePartitioning; import java.io.BufferedReader; import java.io.ByteArrayInputStream; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.nio.channels.Channels; @@ -55,7 +56,10 @@ import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; +import org.apache.avro.file.FileReader; +import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; @@ -351,7 +355,7 @@ private JobStatus runLoadJob(JobReference jobRef, JobConfigurationLoad load) List sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId()); WriteDisposition writeDisposition = 
WriteDisposition.valueOf(load.getWriteDisposition()); CreateDisposition createDisposition = CreateDisposition.valueOf(load.getCreateDisposition()); - checkArgument("NEWLINE_DELIMITED_JSON".equals(load.getSourceFormat())); + Table existingTable = datasetService.getTable(destination); if (!validateDispositions(existingTable, createDisposition, writeDisposition)) { return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); @@ -373,8 +377,13 @@ private JobStatus runLoadJob(JobReference jobRef, JobConfigurationLoad load) List rows = Lists.newArrayList(); for (ResourceId filename : sourceFiles) { - rows.addAll(readRows(filename.toString())); + if (load.getSourceFormat().equals("NEWLINE_DELIMITED_JSON")) { + rows.addAll(readJsonTableRows(filename.toString())); + } else if (load.getSourceFormat().equals("AVRO")) { + rows.addAll(readAvroTableRows(filename.toString(), schema)); + } } + datasetService.insertAll(destination, rows, null); FileSystems.delete(sourceFiles); return new JobStatus().setState("DONE"); @@ -453,7 +462,7 @@ private JobStatus runQueryJob(JobConfigurationQuery query) return new JobStatus().setState("DONE"); } - private List readRows(String filename) throws IOException { + private List readJsonTableRows(String filename) throws IOException { Coder coder = TableRowJsonCoder.of(); List tableRows = Lists.newArrayList(); try (BufferedReader reader = @@ -469,6 +478,19 @@ private List readRows(String filename) throws IOException { return tableRows; } + private List readAvroTableRows(String filename, TableSchema tableSchema) + throws IOException { + List tableRows = Lists.newArrayList(); + FileReader dfr = + DataFileReader.openReader(new File(filename), new GenericDatumReader<>()); + + while (dfr.hasNext()) { + GenericRecord record = dfr.next(null); + tableRows.add(BigQueryUtils.convertGenericRecordToTableRow(record, tableSchema)); + } + return tableRows; + } + private long writeRows( String tableId, List rows, TableSchema schema, String destinationPattern) throws IOException { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java index aeeab068b91a7..506cc10192f2c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java @@ -243,8 +243,8 @@ public void testConvertBigQuerySchemaToAvroSchema() { Schema.create(Type.NULL), Schema.createRecord( "scion", - "org.apache.beam.sdk.io.gcp.bigquery", "Translated Avro Schema for scion", + "org.apache.beam.sdk.io.gcp.bigquery", false, ImmutableList.of( new Field( @@ -259,8 +259,8 @@ public void testConvertBigQuerySchemaToAvroSchema() { Schema.createArray( Schema.createRecord( "associates", - "org.apache.beam.sdk.io.gcp.bigquery", "Translated Avro Schema for associates", + "org.apache.beam.sdk.io.gcp.bigquery", false, ImmutableList.of( new Field( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index cd0312d3e1e1f..aace26cb5ddae 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -43,6 +43,7 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; +import com.google.auto.value.AutoValue; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -58,8 +59,11 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; @@ -82,6 +86,7 @@ import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -671,6 +676,75 @@ public void testWrite() throws Exception { p.run(); } + @AutoValue + abstract static class InputRecord implements Serializable { + + public static InputRecord create( + String strValue, long longVal, double doubleVal, Instant instantVal) { + return new AutoValue_BigQueryIOWriteTest_InputRecord( + strValue, longVal, doubleVal, instantVal); + } + + abstract String strVal(); + + abstract long longVal(); + + abstract double doubleVal(); + + abstract Instant instantVal(); + } + + private static final Coder INPUT_RECORD_CODER = + SerializableCoder.of(InputRecord.class); + + @Test + public void testWriteAvro() throws Exception { + p.apply( + Create.of( + InputRecord.create("test", 1, 1.0, Instant.parse("2019-01-01T00:00:00Z")), + InputRecord.create("test2", 2, 2.0, Instant.parse("2019-02-01T00:00:00Z"))) + .withCoder(INPUT_RECORD_CODER)) + .apply( + BigQueryIO.write() + .to("dataset-id.table-id") + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("strVal").setType("STRING"), + new TableFieldSchema().setName("longVal").setType("INTEGER"), + new TableFieldSchema().setName("doubleVal").setType("FLOAT"), + new TableFieldSchema().setName("instantVal").setType("TIMESTAMP")))) + .withTestServices(fakeBqServices) + .withAvroFormatFunction( + r -> { + GenericRecord rec = new GenericData.Record(r.getSchema()); + InputRecord i = r.getElement(); + rec.put("strVal", i.strVal()); + rec.put("longVal", i.longVal()); + rec.put("doubleVal", i.doubleVal()); + rec.put("instantVal", i.instantVal().getMillis() * 1000); + return rec; + }) + .withoutValidation()); + p.run(); + + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"), + containsInAnyOrder( + new TableRow() + .set("strVal", "test") + .set("longVal", "1") + .set("doubleVal", 1.0D) + .set("instantVal", "2019-01-01 00:00:00 UTC"), + new TableRow() + .set("strVal", "test2") + .set("longVal", "2") + .set("doubleVal", 2.0D) + .set("instantVal", "2019-02-01 00:00:00 UTC"))); + } + @Test public void testStreamingWrite() throws Exception { p.apply( @@ -1213,6 
+1287,69 @@ public void testWriteValidateFailsCreateNoSchema() { .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); } + @Test + public void testWriteValidateFailsNoFormatFunction() { + p.enableAbandonedNodeEnforcement(false); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "A function must be provided to convert the input type into a TableRow or GenericRecord"); + p.apply(Create.empty(INPUT_RECORD_CODER)) + .apply( + BigQueryIO.write() + .to("dataset.table") + .withSchema(new TableSchema()) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); + } + + @Test + public void testWriteValidateFailsBothFormatFunctions() { + p.enableAbandonedNodeEnforcement(false); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Only one of withFormatFunction or withAvroFormatFunction maybe set, not both"); + p.apply(Create.empty(INPUT_RECORD_CODER)) + .apply( + BigQueryIO.write() + .to("dataset.table") + .withSchema(new TableSchema()) + .withFormatFunction(r -> new TableRow()) + .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema())) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); + } + + @Test + public void testWriteValidateFailsWithBeamSchemaAndAvroFormatFunction() { + p.enableAbandonedNodeEnforcement(false); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("avroFormatFunction is unsupported when using Beam schemas"); + p.apply(Create.of(new SchemaPojo("a", 1))) + .apply( + BigQueryIO.write() + .to("dataset.table") + .useBeamSchema() + .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema())) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); + } + + @Test + public void testWriteValidateFailsWithAvroFormatAndStreamingInserts() { + p.enableAbandonedNodeEnforcement(false); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Writing avro formatted data is only supported for FILE_LOADS"); + p.apply(Create.empty(INPUT_RECORD_CODER)) + .apply( + BigQueryIO.write() + .to("dataset.table") + .withSchema(new TableSchema()) + .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema())) + .withMethod(Method.STREAMING_INSERTS) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); + } + @Test public void testWritePartitionEmptyData() throws Exception { long numFiles = 0; @@ -1310,7 +1447,8 @@ private void testWritePartition( BatchLoads.DEFAULT_MAX_FILES_PER_PARTITION, BatchLoads.DEFAULT_MAX_BYTES_PER_PARTITION, multiPartitionsTag, - singlePartitionTag); + singlePartitionTag, + RowWriterFactory.tableRows(SerializableFunctions.identity())); DoFnTester< Iterable>, @@ -1393,7 +1531,8 @@ public void testWriteTables() throws Exception { testFolder.getRoot().getAbsolutePath(), String.format("files0x%08x_%05d", tempTableId.hashCode(), k)) .toString(); - TableRowWriter writer = new TableRowWriter(filename); + TableRowWriter writer = + new TableRowWriter<>(filename, SerializableFunctions.identity()); try (TableRowWriter ignored = writer) { TableRow tableRow = new TableRow().set("name", tableName); writer.write(tableRow); @@ -1429,7 +1568,8 @@ public void testWriteTables() throws Exception { null, 4, false, - null); + null, + "NEWLINE_DELIMITED_JSON"); PCollection> writeTablesOutput = writeTablesInput.apply(writeTables); @@ -1455,7 +1595,8 @@ public void testRemoveTemporaryFiles() throws Exception { List fileNames = Lists.newArrayList(); String tempFilePrefix = 
options.getTempLocation() + "/"; for (int i = 0; i < numFiles; ++i) { - TableRowWriter writer = new TableRowWriter(tempFilePrefix); + TableRowWriter writer = + new TableRowWriter<>(tempFilePrefix, SerializableFunctions.identity()); writer.close(); fileNames.add(writer.getResult().resourceId.toString()); }
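One API added above but not exercised directly by these tests is withAvroSchemaFactory. Below is a minimal sketch of supplying a custom TableSchema-to-avro-Schema conversion. It is illustrative only: the record layout and names are hypothetical, and in practice this fragment would be combined with the to()/withSchema() calls shown earlier.

// Illustrative fragment only -- not part of this diff. Supplies a custom
// TableSchema -> avro Schema conversion instead of the default factory.
import com.google.api.services.bigquery.model.TableSchema;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.SerializableFunction;

class AvroSchemaFactorySketch {
  // Ignores the BigQuery schema and always produces a single nullable BYTES field.
  static final SerializableFunction<TableSchema, Schema> BYTES_ONLY_FACTORY =
      tableSchema -> SchemaBuilder.record("root").fields().optionalBytes("data").endRecord();

  static BigQueryIO.Write<byte[]> avroWrite() {
    return BigQueryIO.<byte[]>write()
        .withAvroSchemaFactory(BYTES_ONLY_FACTORY)
        .withAvroFormatFunction(
            request -> {
              // request.getSchema() is the schema produced by BYTES_ONLY_FACTORY.
              GenericRecord record = new GenericData.Record(request.getSchema());
              record.put("data", ByteBuffer.wrap(request.getElement()));
              return record;
            });
  }
}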