This closes #2657: [BEAM-2058] Generate BigQuery load job at runtime
jkff committed Jun 5, 2017
Merge commit ad2c1f1 (2 parents: 6543e56 + 94d8547)
Showing 2 changed files with 32 additions and 30 deletions.
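What changed, in brief: BatchLoads previously generated a random UUID while the pipeline graph was being constructed (final String stepUuid = BigQueryHelpers.randomUUIDString()) and captured it in its DoFns, so a graph built once and executed many times (for example a template) reused the same load-job ID. The commit moves that generation into the pipeline itself, at execution time, and distributes the value to consumers as a singleton side input. The sketch below is not from the commit; it is a minimal, self-contained illustration of that pattern against the Beam Java SDK, with invented names (RuntimeTokenSketch, tokenView, TagWithToken):

import java.util.UUID;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class RuntimeTokenSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // The token is computed when the pipeline *runs*, not when the graph is
    // built, so every execution of a pre-built graph gets a fresh value.
    PCollection<String> token =
        p.apply("TriggerTokenCreation", Create.of("ignored"))
            .apply(
                "CreateToken",
                MapElements.via(
                    new SimpleFunction<String, String>() {
                      @Override
                      public String apply(String input) {
                        return UUID.randomUUID().toString();
                      }
                    }));

    // A one-element PCollection becomes a singleton side input.
    final PCollectionView<String> tokenView = token.apply(View.<String>asSingleton());

    // Consumers declare the view with withSideInputs(...) and read it at
    // execution time via c.sideInput(...).
    p.apply("Data", Create.of("a", "b", "c"))
        .apply(
            "TagWithToken",
            ParDo.of(
                    new DoFn<String, String>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        c.output(c.sideInput(tokenView) + "/" + c.element());
                      }
                    })
                .withSideInputs(tokenView));

    p.run().waitUntilFinish();
  }
}

In the diff below, BatchLoads applies this pattern to the job ID token, derives the temp-file prefix from the same token, and threads the prefix into WriteBundlesToFiles as a side input.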
BatchLoads.java
@@ -60,10 +60,14 @@
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
 class BatchLoads<DestinationT>
     extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> {
+  static final Logger LOG = LoggerFactory.getLogger(BatchLoads.class);
+
   // The maximum number of file writers to keep open in a single bundle at a time, since file
   // writers default to 64mb buffers. This comes into play when writing dynamic table destinations.
   // The first 20 tables from a single BatchLoads transform will write files inline in the
@@ -161,39 +165,38 @@ public void validate(PipelineOptions options) {
   @Override
   public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
     Pipeline p = input.getPipeline();
-    final String stepUuid = BigQueryHelpers.randomUUIDString();
 
-    PCollectionView<String> tempFilePrefix =
-        p.apply("Create", Create.of((Void) null))
-            .apply(
-                "GetTempFilePrefix",
-                ParDo.of(
-                    new DoFn<Void, String>() {
-                      @ProcessElement
-                      public void getTempFilePrefix(ProcessContext c) {
-                        c.output(
-                            resolveTempLocation(
-                                c.getPipelineOptions().getTempLocation(),
-                                "BigQueryWriteTemp",
-                                stepUuid));
-                      }
-                    }))
-            .apply("TempFilePrefixView", View.<String>asSingleton());
-
     // Create a singleton job ID token at execution time. This will be used as the base for all
     // load jobs issued from this instance of the transform.
-    PCollectionView<String> jobIdTokenView =
+    final PCollection<String> jobIdToken =
         p.apply("TriggerIdCreation", Create.of("ignored"))
             .apply(
                 "CreateJobId",
                 MapElements.via(
                     new SimpleFunction<String, String>() {
                       @Override
                       public String apply(String input) {
-                        return stepUuid;
+                        return BigQueryHelpers.randomUUIDString();
                       }
-                    }))
-            .apply(View.<String>asSingleton());
+                    }));
+    final PCollectionView<String> jobIdTokenView = jobIdToken.apply(View.<String>asSingleton());
+
+    PCollectionView<String> tempFilePrefix = jobIdToken
+        .apply(
+            "GetTempFilePrefix",
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void getTempFilePrefix(ProcessContext c) {
+                    String tempLocation = resolveTempLocation(
+                        c.getPipelineOptions().getTempLocation(),
+                        "BigQueryWriteTemp", c.element());
+                    LOG.info("Writing BigQuery temporary files to {} before loading them.",
+                        tempLocation);
+                    c.output(tempLocation);
+                  }
+                }))
+        .apply("TempFilePrefixView", View.<String>asSingleton());
 
     PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
         input.apply(
@@ -210,9 +213,10 @@ public String apply(String input) {
         new TupleTag<KV<ShardedKey<DestinationT>, TableRow>>("unwrittenRecords") {};
     PCollectionTuple writeBundlesTuple = inputInGlobalWindow
         .apply("WriteBundlesToFiles",
-            ParDo.of(new WriteBundlesToFiles<>(stepUuid, unwrittedRecordsTag,
+            ParDo.of(new WriteBundlesToFiles<>(tempFilePrefix, unwrittedRecordsTag,
                 maxNumWritersPerBundle, maxFileSize))
-            .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
+            .withSideInputs(tempFilePrefix)
+            .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
     PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
         writeBundlesTuple.get(writtenFilesTag)
             .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
WriteBundlesToFiles.java
@@ -19,8 +19,6 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
-
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -41,6 +39,7 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +63,7 @@ class WriteBundlesToFiles<DestinationT>
   // Map from tablespec to a writer for that table.
   private transient Map<DestinationT, TableRowWriter> writers;
   private transient Map<DestinationT, BoundedWindow> writerWindows;
-  private final String stepUuid;
+  private final PCollectionView<String> tempFilePrefixView;
   private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag;
   private int maxNumWritersPerBundle;
   private long maxFileSize;
@@ -131,11 +130,11 @@ public void verifyDeterministic() {}
   }
 
   WriteBundlesToFiles(
-      String stepUuid,
+      PCollectionView<String> tempFilePrefixView,
       TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag,
       int maxNumWritersPerBundle,
       long maxFileSize) {
-    this.stepUuid = stepUuid;
+    this.tempFilePrefixView = tempFilePrefixView;
     this.unwrittedRecordsTag = unwrittedRecordsTag;
     this.maxNumWritersPerBundle = maxNumWritersPerBundle;
     this.maxFileSize = maxFileSize;
@@ -159,8 +158,7 @@ TableRowWriter createAndInsertWriter(DestinationT destination, String tempFilePr
 
   @ProcessElement
   public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-    String tempFilePrefix = resolveTempLocation(
-        c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
+    String tempFilePrefix = c.sideInput(tempFilePrefixView);
     DestinationT destination = c.element().getKey();
 
     TableRowWriter writer;
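For a DoFn that lives in its own class, like WriteBundlesToFiles above, the view arrives through the constructor instead of being captured from an enclosing scope. A stripped-down sketch of that shape (PrefixingFn and its names are invented for illustration, not part of Beam):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;

// Mirrors the patched WriteBundlesToFiles: the PCollectionView is a regular
// serializable field of the DoFn, and the actual value is only readable at
// execution time, inside processElement.
class PrefixingFn extends DoFn<KV<String, String>, String> {
  private final PCollectionView<String> prefixView;

  PrefixingFn(PCollectionView<String> prefixView) {
    this.prefixView = prefixView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    String prefix = c.sideInput(prefixView);
    c.output(prefix + "/" + c.element().getKey() + "/" + c.element().getValue());
  }
}

The caller must still attach the view when applying the DoFn, e.g. ParDo.of(new PrefixingFn(view)).withSideInputs(view), exactly as BatchLoads does with tempFilePrefix in the first file.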
