Fully general dynamic tables (including schemas) in BigQueryIO.
reuvenlax authored and jkff committed May 3, 2017
1 parent 17f0843 commit 35db745
Showing 15 changed files with 1,241 additions and 576 deletions. Only the diff of the BatchLoads class is reproduced below.
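For context: this change re-parameterizes BatchLoads from a fixed TableDestination to an arbitrary, user-defined destination type DestinationT, resolved through a DynamicDestinations object that can also supply a per-destination schema. As a hedged illustration only, here is roughly how the feature surfaces to users in the DynamicDestinations API as it later stabilized in Beam (the Event type, field names, and table naming are assumptions, not taken from this commit; imports elided):

// Sketch: route each element to a per-user table, each with its own schema.
// Assumes the DynamicDestinations API shape from later Beam releases.
events.apply(BigQueryIO.<Event>write()
    .to(new DynamicDestinations<Event, String>() {
      @Override
      public String getDestination(ValueInSingleWindow<Event> element) {
        return element.getValue().getUserId();  // destination key
      }
      @Override
      public TableDestination getTable(String userId) {
        return new TableDestination(
            "my-project:my_dataset.events_" + userId, "events for user " + userId);
      }
      @Override
      public TableSchema getSchema(String userId) {
        return new TableSchema().setFields(ImmutableList.of(
            new TableFieldSchema().setName("ts").setType("TIMESTAMP")));
      }
    })
    .withFormatFunction(event -> new TableRow().set("ts", event.getTimestamp())));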
@@ -21,26 +21,24 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
@@ -58,27 +56,31 @@
import org.apache.beam.sdk.values.TupleTagList;

/** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
BigQueryIO.Write<?> write;
class BatchLoads<DestinationT>
extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> {
private BigQueryServices bigQueryServices;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
// Indicates that we are writing to a constant single table. If this is the case, we will create
// the table, even if there is no data in it.
private final boolean singletonTable;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
private final Coder<DestinationT> destinationCoder;

private static class ConstantSchemaFunction
implements SerializableFunction<TableDestination, TableSchema> {
private final @Nullable ValueProvider<String> jsonSchema;

ConstantSchemaFunction(ValueProvider<String> jsonSchema) {
this.jsonSchema = jsonSchema;
}

@Override
@Nullable
public TableSchema apply(TableDestination table) {
return BigQueryHelpers.fromJsonString(
jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
}
BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition,
boolean singletonTable,
DynamicDestinations<?, DestinationT> dynamicDestinations,
Coder<DestinationT> destinationCoder) {
bigQueryServices = new BigQueryServicesImpl();
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
this.singletonTable = singletonTable;
this.dynamicDestinations = dynamicDestinations;
this.destinationCoder = destinationCoder;
}

BatchLoads(BigQueryIO.Write<?> write) {
this.write = write;
void setTestServices(BigQueryServices bigQueryServices) {
this.bigQueryServices = bigQueryServices;
}
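The old wrapper around a whole BigQueryIO.Write<?> is gone; callers now pass the dispositions, the singleton-table flag, the DynamicDestinations object, and a coder for the destination type explicitly, and tests inject services through setTestServices. A hypothetical construction sketch (the String destination type is an assumption for illustration):

// Hypothetical wiring: destinations are plain strings, so StringUtf8Coder works
// as the destination coder.
BatchLoads<String> batchLoads = new BatchLoads<>(
    WriteDisposition.WRITE_APPEND,
    CreateDisposition.CREATE_IF_NEEDED,
    false /* singletonTable: not writing to one constant table */,
    myDynamicDestinations,  // some DynamicDestinations<?, String>, defined elsewhere
    StringUtf8Coder.of());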

@Override
@@ -88,7 +90,7 @@ public void validate(PipelineOptions options) {
checkArgument(
!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Write needs a GCS temp location to store temp files.");
if (write.getBigQueryServices() == null) {
if (bigQueryServices == null) {
try {
GcsPath.fromUri(tempLocation);
} catch (IllegalArgumentException e) {
@@ -102,7 +104,7 @@ public void validate(PipelineOptions options) {
}

@Override
public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
Pipeline p = input.getPipeline();
BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
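Early in expand(), the next hunk materializes a per-destination schema map as a side input (schemasView). Conceptually, CalculateSchemas deduplicates the destinations seen in the input and asks the DynamicDestinations object for each one's schema as a JSON string; a rough sketch of that idea (not the transform's actual code; the getSchema and toJsonString calls are assumptions about the surrounding API):

// Conceptual sketch of the schema side input: distinct destinations -> JSON schema.
PCollectionView<Map<DestinationT, String>> schemasView =
    input
        .apply("ExtractDestinations", Keys.<DestinationT>create())
        .apply("DistinctDestinations", Distinct.<DestinationT>create())
        .apply("LookupSchemas",
            ParDo.of(new DoFn<DestinationT, KV<DestinationT, String>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                TableSchema schema = dynamicDestinations.getSchema(c.element());
                c.output(KV.of(c.element(), BigQueryHelpers.toJsonString(schema)));
              }
            }))
        .apply("SchemasAsMap", View.<DestinationT, String>asMap());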

@@ -137,53 +139,56 @@ public String apply(String input) {
}))
.apply(View.<String>asSingleton());

PCollection<KV<TableDestination, TableRow>> inputInGlobalWindow =
PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
input.apply(
"rewindowIntoGlobal",
Window.<KV<TableDestination, TableRow>>into(new GlobalWindows())
Window.<KV<DestinationT, TableRow>>into(new GlobalWindows())
.triggering(DefaultTrigger.of())
.discardingFiredPanes());
PCollectionView<Map<DestinationT, String>> schemasView =
inputInGlobalWindow.apply(new CalculateSchemas<>(dynamicDestinations));

// PCollection of filename, file byte size, and table destination.
PCollection<WriteBundlesToFiles.Result> results =
PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
inputInGlobalWindow
.apply("WriteBundlesToFiles", ParDo.of(new WriteBundlesToFiles(tempFilePrefix)))
.setCoder(WriteBundlesToFiles.ResultCoder.of());
.apply("WriteBundlesToFiles", ParDo.of(
new WriteBundlesToFiles<DestinationT>(tempFilePrefix)))
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));

TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag =
new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {};
TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};

// Turn the list of files and record counts into a PCollectionView that can be used as a
// side input.
PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView =
results.apply("ResultsView", View.<WriteBundlesToFiles.Result>asIterable());
PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView =
results.apply("ResultsView",
View.<WriteBundlesToFiles.Result<DestinationT>>asIterable());
// This transform will look at the set of files written for each table, and if any table has
// too many files or bytes, will partition that table's files into multiple partitions for
// loading.
PCollectionTuple partitions =
singleton.apply(
"WritePartition",
ParDo.of(
new WritePartition(
write.getJsonTableRef(),
write.getTableDescription(),
new WritePartition<>(
singletonTable,
resultsView,
multiPartitionsTag,
singlePartitionTag))
.withSideInputs(resultsView)
.withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));

// Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant
// schema function here. If no schema is specified, this function will return null.
// TODO: Turn this into a side-input instead.
SerializableFunction<TableDestination, TableSchema> schemaFunction =
new ConstantSchemaFunction(write.getJsonSchema());
List<PCollectionView<?>> writeTablesSideInputs =
Lists.newArrayList(jobIdTokenView, schemasView);
writeTablesSideInputs.addAll(dynamicDestinations.getSideInputs());

Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
KvCoder.of(
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
ListCoder.of(StringUtf8Coder.of()));

Coder<KV<ShardedKey<TableDestination>, List<String>>> partitionsCoder =
KvCoder.of(
ShardedKeyCoder.of(TableDestinationCoder.of()), ListCoder.of(StringUtf8Coder.of()));
// If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
// the import needs to be split into multiple partitions, and those partitions will be
// specified in multiPartitionsTag.
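WritePartition, applied above, examines the files produced for each destination: destinations whose files fit in one load job are emitted on singlePartitionTag, while destinations whose files exceed the per-job file-count or byte-size limits are split into several partitions on multiPartitionsTag. A toy illustration of that splitting rule (the limit parameters are placeholders; the real constants live elsewhere in the Beam source):

// Greedy split of one destination's files into partitions that each respect
// both a file-count cap and a byte-size cap.
static List<List<String>> split(
    List<String> files, List<Long> sizes, int maxFilesPerJob, long maxBytesPerJob) {
  List<List<String>> partitions = new ArrayList<>();
  List<String> current = new ArrayList<>();
  long currentBytes = 0;
  for (int i = 0; i < files.size(); i++) {
    if (!current.isEmpty()
        && (current.size() >= maxFilesPerJob
            || currentBytes + sizes.get(i) > maxBytesPerJob)) {
      partitions.add(current);  // close the full partition and start a new one
      current = new ArrayList<>();
      currentBytes = 0;
    }
    current.add(files.get(i));
    currentBytes += sizes.get(i);
  }
  if (!current.isEmpty()) {
    partitions.add(current);
  }
  return partitions;
}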
@@ -195,19 +200,20 @@ public String apply(String input) {
// reexecution of the WritePartitions step once WriteTables has begun.
.apply(
"MultiPartitionsReshuffle",
Reshuffle.<ShardedKey<TableDestination>, List<String>>of())
Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
.apply(
"MultiPartitionsWriteTables",
ParDo.of(
new WriteTables(
new WriteTables<>(
false,
write.getBigQueryServices(),
bigQueryServices,
jobIdTokenView,
schemasView,
tempFilePrefix,
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
schemaFunction))
.withSideInputs(jobIdTokenView));
dynamicDestinations))
.withSideInputs(writeTablesSideInputs));

// This view maps each final table destination to the set of temporary partitioned tables
// the PCollection was loaded into.
@@ -218,10 +224,10 @@ public String apply(String input) {
"WriteRename",
ParDo.of(
new WriteRename(
write.getBigQueryServices(),
bigQueryServices,
jobIdTokenView,
write.getWriteDisposition(),
write.getCreateDisposition(),
writeDisposition,
createDisposition,
tempTablesView))
.withSideInputs(tempTablesView, jobIdTokenView));

@@ -232,19 +238,20 @@ public String apply(String input) {
// Reshuffle will distribute this among multiple workers, and also guard against
// reexecution of the WritePartitions step once WriteTables has begun.
.apply(
"SinglePartitionsReshuffle", Reshuffle.<ShardedKey<TableDestination>, List<String>>of())
"SinglePartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
.apply(
"SinglePartitionWriteTables",
ParDo.of(
new WriteTables(
new WriteTables<>(
true,
write.getBigQueryServices(),
bigQueryServices,
jobIdTokenView,
schemasView,
tempFilePrefix,
write.getWriteDisposition(),
write.getCreateDisposition(),
schemaFunction))
.withSideInputs(jobIdTokenView));
writeDisposition,
createDisposition,
dynamicDestinations))
.withSideInputs(writeTablesSideInputs));

return WriteResult.in(input.getPipeline());
}
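Finally, the setTestServices hook introduced in this change lets unit tests replace the production BigQueryServicesImpl with a fake, so the transform can be exercised without touching real BigQuery. A hypothetical test-side usage (FakeBigQueryServices and constantDestinations are assumed test doubles):

// Hypothetical test wiring: no real BigQuery calls are made.
BatchLoads<TableDestination> batchLoads = new BatchLoads<>(
    WriteDisposition.WRITE_EMPTY,
    CreateDisposition.CREATE_IF_NEEDED,
    true /* singletonTable */,
    constantDestinations,        // a DynamicDestinations<?, TableDestination>
    TableDestinationCoder.of());
batchLoads.setTestServices(new FakeBigQueryServices());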
