Commit

Merge 5954e22 into 15bd3a3
reuvenlax committed May 11, 2017
2 parents 15bd3a3 + 5954e22 commit 3250eef
Showing 2 changed files with 21 additions and 16 deletions.
org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java

@@ -59,10 +59,14 @@
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
 class BatchLoads<DestinationT>
     extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> {
+  static final Logger LOG = LoggerFactory.getLogger(BatchLoads.class);
+
   // The maximum number of file writers to keep open in a single bundle at a time, since file
   // writers default to 64mb buffers. This comes into play when writing dynamic table destinations.
   // The first 20 tables from a single BatchLoads transform will write files inline in the
@@ -160,7 +164,6 @@ public void validate(PipelineOptions options) {
   @Override
   public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
     Pipeline p = input.getPipeline();
-    final String stepUuid = BigQueryHelpers.randomUUIDString();
 
     PCollectionView<String> tempFilePrefix =
         p.apply("Create", Create.of((Void) null))
@@ -170,11 +173,12 @@ public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
                 new DoFn<Void, String>() {
                   @ProcessElement
                   public void getTempFilePrefix(ProcessContext c) {
-                    c.output(
-                        resolveTempLocation(
-                            c.getPipelineOptions().getTempLocation(),
-                            "BigQueryWriteTemp",
-                            stepUuid));
+                    String tempLocation = resolveTempLocation(
+                        c.getPipelineOptions().getTempLocation(),
+                        "BigQueryWriteTemp",
+                        BigQueryHelpers.randomUUIDString());
+                    LOG.info("BigQuery temporary file location {}", tempLocation);
+                    c.output(tempLocation);
                   }
                 }))
             .apply("TempFilePrefixView", View.<String>asSingleton());
@@ -189,7 +193,9 @@ public void getTempFilePrefix(ProcessContext c) {
                 new SimpleFunction<String, String>() {
                   @Override
                   public String apply(String input) {
-                    return stepUuid;
+                    String jobId = BigQueryHelpers.randomUUIDString();
+                    LOG.info("BigQuery export job id {}", jobId);
+                    return jobId;
                   }
                 }))
             .apply(View.<String>asSingleton());
@@ -209,9 +215,10 @@ public String apply(String input) {
         new TupleTag<KV<ShardedKey<DestinationT>, TableRow>>("unwrittenRecords") {};
     PCollectionTuple writeBundlesTuple = inputInGlobalWindow
         .apply("WriteBundlesToFiles",
-            ParDo.of(new WriteBundlesToFiles<>(stepUuid, unwrittedRecordsTag,
+            ParDo.of(new WriteBundlesToFiles<>(tempFilePrefix, unwrittedRecordsTag,
                 maxNumWritersPerBundle, maxFileSize))
-                .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
+                .withSideInputs(tempFilePrefix)
+                .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
     PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
         writeBundlesTuple.get(writtenFilesTag)
             .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
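Taken together, the BatchLoads hunks replace a stepUuid captured when the pipeline graph is built with a prefix that is computed once at execution time, logged, and broadcast to every worker through a singleton side input. Below is a minimal, self-contained sketch of that compute-once/broadcast pattern; it is not Beam's actual BatchLoads code, and the SideInputPrefixExample class, step names, and prefix format are illustrative assumptions (it also assumes --tempLocation is set):

import java.util.UUID;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputPrefixExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Compute the prefix exactly once, at execution time rather than at
    // graph-construction time, so rerunning the same graph yields a fresh UUID.
    final PCollectionView<String> prefixView =
        p.apply("Seed", Create.of("seed"))
            .apply("MakePrefix",
                ParDo.of(new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    // Hypothetical prefix layout, loosely modeled on the diff.
                    c.output(c.getPipelineOptions().getTempLocation()
                        + "/BigQueryWriteTemp/" + UUID.randomUUID());
                  }
                }))
            .apply("PrefixView", View.<String>asSingleton());

    // Every worker reads the same prefix through the side input.
    PCollection<String> prefixed =
        p.apply("Records", Create.of("a", "b"))
            .apply("UsePrefix",
                ParDo.of(new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(c.sideInput(prefixView) + "/" + c.element());
                  }
                }).withSideInputs(prefixView));

    p.run().waitUntilFinish();
  }
}

Because the prefix is produced by a DoFn instead of being baked into the serialized graph, the logged value is exactly the one workers consume.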
org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java

@@ -18,8 +18,6 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
-
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -40,6 +38,7 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,7 +62,7 @@ class WriteBundlesToFiles<DestinationT>
   // Map from tablespec to a writer for that table.
   private transient Map<DestinationT, TableRowWriter> writers;
   private transient Map<DestinationT, BoundedWindow> writerWindows;
-  private final String stepUuid;
+  private final PCollectionView<String> tempFilePrefixView;
   private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag;
   private int maxNumWritersPerBundle;
   private long maxFileSize;
@@ -129,11 +128,11 @@ public void verifyDeterministic() {}
   }
 
   WriteBundlesToFiles(
-      String stepUuid,
+      PCollectionView<String> tempFilePrefixView,
       TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag,
      int maxNumWritersPerBundle,
      long maxFileSize) {
-    this.stepUuid = stepUuid;
+    this.tempFilePrefixView = tempFilePrefixView;
    this.unwrittedRecordsTag = unwrittedRecordsTag;
    this.maxNumWritersPerBundle = maxNumWritersPerBundle;
    this.maxFileSize = maxFileSize;
@@ -157,8 +156,7 @@ TableRowWriter createAndInsertWriter(DestinationT destination, String tempFilePr
 
   @ProcessElement
   public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-    String tempFilePrefix = resolveTempLocation(
-        c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
+    String tempFilePrefix = c.sideInput(tempFilePrefixView);
     DestinationT destination = c.element().getKey();
 
     TableRowWriter writer;
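On the consuming side, WriteBundlesToFiles now stores the PCollectionView itself and resolves it per element with c.sideInput(...) instead of re-deriving the prefix from a stepUuid. A stripped-down sketch of that DoFn shape, using a hypothetical PrefixConsumingFn rather than the real class:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;

class PrefixConsumingFn<DestinationT> extends DoFn<KV<DestinationT, String>, String> {
  // The view is serialized with the DoFn; the actual value is fetched at runtime.
  private final PCollectionView<String> tempFilePrefixView;

  PrefixConsumingFn(PCollectionView<String> tempFilePrefixView) {
    this.tempFilePrefixView = tempFilePrefixView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Only available because the enclosing ParDo was built with
    // .withSideInputs(tempFilePrefixView), as in the BatchLoads change above.
    String tempFilePrefix = c.sideInput(tempFilePrefixView);
    c.output(tempFilePrefix + "/" + c.element().getKey());
  }
}

As the diff shows, the enclosing ParDo must declare the view via .withSideInputs(...); calling c.sideInput(...) on an undeclared view fails at runtime.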
