[BEAM-2879] Support writing data to BigQuery via avro
steve committed Nov 8, 2019
1 parent bb6f9ed commit ad3fdf2
Showing 17 changed files with 731 additions and 144 deletions.
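For context, the withAvroFormatFunction hook added in this change lets callers supply Avro GenericRecords directly for FILE_LOADS writes instead of converting each element to a TableRow. A minimal usage sketch follows (not part of this commit; the destination table, schema, and input data are illustrative assumptions), modeled on the integration test changes below:

// Illustrative sketch only: writes byte[] elements to BigQuery as Avro files
// using the withAvroFormatFunction API introduced in this commit.
// "my-project:my_dataset.my_table" is a placeholder destination.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.nio.ByteBuffer;
import java.util.Collections;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class AvroWriteSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        .apply("Create records", Create.of(new byte[] {1, 2}, new byte[] {3, 4}))
        .apply(
            "Write to BQ via Avro",
            BigQueryIO.<byte[]>write()
                .to("my-project:my_dataset.my_table")
                // Each element is turned into a GenericRecord matching the Avro
                // schema derived from the table schema supplied below.
                .withAvroFormatFunction(
                    request -> {
                      GenericRecord record = new GenericData.Record(request.getSchema());
                      record.put("data", ByteBuffer.wrap(request.getElement()));
                      return record;
                    })
                .withSchema(
                    new TableSchema()
                        .setFields(
                            Collections.singletonList(
                                new TableFieldSchema().setName("data").setType("BYTES"))))
                // Avro writes go through load jobs rather than streaming inserts.
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS));

    pipeline.run().waitUntilFinish();
  }
}

As the BatchLoads changes below show, the same RowWriterFactory abstraction now backs both the JSON (TableRow) and Avro paths, so the load-job plumbing is shared between the two formats.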
@@ -25,9 +25,12 @@
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.UUID;
import java.util.function.Function;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Read;
@@ -59,15 +62,15 @@
* <p>Usage:
*
* <pre>
* ./gradlew integrationTest -p sdks/java/io/gcp/bigquery -DintegrationTestPipelineOptions='[
* "--testBigQueryDataset=test-dataset",
* "--testBigQueryTable=test-table",
* "--metricsBigQueryDataset=metrics-dataset",
* "--metricsBigQueryTable=metrics-table",
* "--writeMethod=FILE_LOADS",
* "--sourceOptions={"numRecords":"1000", "keySize":1, valueSize:"1024"}
* }"]'
* --tests org.apache.beam.sdk.io.gcp.bigQuery.BigQueryIOIT
* ./gradlew integrationTest -p sdks/java/io/bigquery-io-perf-tests -DintegrationTestPipelineOptions='[ \
* "--testBigQueryDataset=test_dataset", \
* "--testBigQueryTable=test_table", \
* "--metricsBigQueryDataset=metrics_dataset", \
* "--metricsBigQueryTable=metrics_table", \
* "--writeMethod=FILE_LOADS", \
* "--sourceOptions={\"numRecords\":\"1000\", \"keySizeBytes\":\"1\", \"valueSizeBytes\":\"1024\"}" \
* ]' \
* --tests org.apache.beam.sdk.bigqueryioperftests.BigQueryIOIT \
* -DintegrationTestRunner=direct
* </pre>
*/
@@ -78,6 +81,7 @@ public class BigQueryIOIT {
private static final String TEST_TIMESTAMP = Timestamp.now().toString();
private static final String READ_TIME_METRIC_NAME = "read_time";
private static final String WRITE_TIME_METRIC_NAME = "write_time";
private static final String AVRO_WRITE_TIME_METRIC_NAME = "avro_write_time";
private static String metricsBigQueryTable;
private static String metricsBigQueryDataset;
private static String testBigQueryDataset;
@@ -113,11 +117,38 @@ public static void tearDown() {

@Test
public void testWriteThenRead() {
testWrite();
testJsonWrite();
testAvroWrite();
testRead();
}

private void testWrite() {
private void testJsonWrite() {
BigQueryIO.Write<byte[]> writeIO =
BigQueryIO.<byte[]>write()
.withFormatFunction(
input -> {
TableRow tableRow = new TableRow();
tableRow.set("data", input);
return tableRow;
});
testWrite(writeIO, WRITE_TIME_METRIC_NAME);
}

private void testAvroWrite() {
BigQueryIO.Write<byte[]> writeIO =
BigQueryIO.<byte[]>write()
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withAvroFormatFunction(
writeRequest -> {
byte[] data = writeRequest.getElement();
GenericRecord record = new GenericData.Record(writeRequest.getSchema());
record.put("data", ByteBuffer.wrap(data));
return record;
});
testWrite(writeIO, AVRO_WRITE_TIME_METRIC_NAME);
}

private void testWrite(BigQueryIO.Write<byte[]> writeIO, String metricName) {
Pipeline pipeline = Pipeline.create(options);

BigQueryIO.Write.Method method = BigQueryIO.Write.Method.valueOf(options.getWriteMethod());
@@ -127,14 +158,8 @@ private void testWrite() {
.apply("Map records", ParDo.of(new MapKVToV()))
.apply(
"Write to BQ",
BigQueryIO.<byte[]>write()
writeIO
.to(tableQualifier)
.withFormatFunction(
input -> {
TableRow tableRow = new TableRow();
tableRow.set("data", input);
return tableRow;
})
.withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(tempRoot))
.withMethod(method)
.withSchema(
@@ -145,7 +170,7 @@

PipelineResult pipelineResult = pipeline.run();
pipelineResult.waitUntilFinish();
extractAndPublishTime(pipelineResult, WRITE_TIME_METRIC_NAME);
extractAndPublishTime(pipelineResult, metricName);
}

private void testRead() {
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.MimeTypes;

class AvroRowWriter<T> extends BigQueryRowWriter<T> {
private final DataFileWriter<GenericRecord> writer;
private final Schema schema;
private final SerializableFunction<AvroWriteRequest<T>, GenericRecord> toAvroRecord;

AvroRowWriter(
String basename,
Schema schema,
SerializableFunction<AvroWriteRequest<T>, GenericRecord> toAvroRecord)
throws Exception {
super(basename, MimeTypes.BINARY);

this.schema = schema;
this.toAvroRecord = toAvroRecord;
this.writer =
new DataFileWriter<GenericRecord>(new GenericDatumWriter<>())
.create(schema, getOutputStream());
}

@Override
public void write(T element) throws IOException {
AvroWriteRequest<T> writeRequest = new AvroWriteRequest<>(element, schema);
writer.append(toAvroRecord.apply(writeRequest));
}

public Schema getSchema() {
return this.schema;
}

@Override
public void close() throws IOException {
writer.close();
super.close();
}
}
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import org.apache.avro.Schema;

public class AvroWriteRequest<T> {
private final T element;
private final Schema schema;

AvroWriteRequest(T element, Schema schema) {
this.element = element;
this.schema = schema;
}

public T getElement() {
return element;
}

public Schema getSchema() {
return schema;
}
}
@@ -47,7 +47,6 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
@@ -131,7 +130,7 @@ class BatchLoads<DestinationT, ElementT>
private ValueProvider<String> customGcsTempLocation;
private ValueProvider<String> loadJobProjectId;
private final Coder<ElementT> elementCoder;
private final SerializableFunction<ElementT, TableRow> toRowFunction;
private final RowWriterFactory<ElementT, DestinationT> rowWriterFactory;
private String kmsKey;

// The maximum number of times to retry failed load or copy jobs.
@@ -147,7 +146,7 @@ class BatchLoads<DestinationT, ElementT>
@Nullable ValueProvider<String> loadJobProjectId,
boolean ignoreUnknownValues,
Coder<ElementT> elementCoder,
SerializableFunction<ElementT, TableRow> toRowFunction,
RowWriterFactory<ElementT, DestinationT> rowWriterFactory,
@Nullable String kmsKey) {
bigQueryServices = new BigQueryServicesImpl();
this.writeDisposition = writeDisposition;
@@ -165,8 +164,8 @@ class BatchLoads<DestinationT, ElementT>
this.loadJobProjectId = loadJobProjectId;
this.ignoreUnknownValues = ignoreUnknownValues;
this.elementCoder = elementCoder;
this.toRowFunction = toRowFunction;
this.kmsKey = kmsKey;
this.rowWriterFactory = rowWriterFactory;
}

void setTestServices(BigQueryServices bigQueryServices) {
@@ -305,7 +304,8 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
maxFilesPerPartition,
maxBytesPerPartition,
multiPartitionsTag,
singlePartitionTag))
singlePartitionTag,
rowWriterFactory))
.withSideInputs(tempFilePrefixView)
.withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
PCollection<KV<TableDestination, String>> tempTables =
@@ -375,7 +375,8 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
maxFilesPerPartition,
maxBytesPerPartition,
multiPartitionsTag,
singlePartitionTag))
singlePartitionTag,
rowWriterFactory))
.withSideInputs(tempFilePrefixView)
.withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
PCollection<KV<TableDestination, String>> tempTables =
@@ -466,7 +467,7 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
unwrittedRecordsTag,
maxNumWritersPerBundle,
maxFileSize,
toRowFunction))
rowWriterFactory))
.withSideInputs(tempFilePrefix)
.withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
@@ -535,7 +536,7 @@ private PCollection<Result<DestinationT>> writeShardedRecords(
"WriteGroupedRecords",
ParDo.of(
new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
tempFilePrefix, maxFileSize, toRowFunction))
tempFilePrefix, maxFileSize, rowWriterFactory))
.withSideInputs(tempFilePrefix))
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
}
@@ -585,7 +586,8 @@ private PCollection<KV<TableDestination, String>> writeTempTables(
loadJobProjectId,
maxRetryJobs,
ignoreUnknownValues,
kmsKey));
kmsKey,
rowWriterFactory.getSourceFormat()));
}

// In the case where the files fit into a single load job, there's no need to write temporary
@@ -618,7 +620,8 @@ void writeSinglePartition(
loadJobProjectId,
maxRetryJobs,
ignoreUnknownValues,
kmsKey));
kmsKey,
rowWriterFactory.getSourceFormat()));
}

private WriteResult writeResult(Pipeline p) {
@@ -360,14 +360,20 @@ static Schema toGenericAvroSchema(String schemaName, List<TableFieldSchema> fiel
}
return Schema.createRecord(
schemaName,
"org.apache.beam.sdk.io.gcp.bigquery",
"Translated Avro Schema for " + schemaName,
"org.apache.beam.sdk.io.gcp.bigquery",
false,
avroFields);
}

private static Field convertField(TableFieldSchema bigQueryField) {
Type avroType = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType()).iterator().next();
ImmutableCollection<Type> avroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType());
if (avroTypes.isEmpty()) {
throw new IllegalArgumentException(
"Unable to map BigQuery field type " + bigQueryField.getType() + " to avro type.");
}

Type avroType = avroTypes.iterator().next();
Schema elementSchema;
if (avroType == Type.RECORD) {
elementSchema = toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields());
