Commit

Merge 5fb9936 into a1d82c2 (commit 20f3914)
vikkyrk committed May 3, 2017
2 parents: a1d82c2 + 5fb9936
Showing 8 changed files with 95 additions and 84 deletions.
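Taken together, the hunks below move resolution of BigQuery's temporary paths (the write-side "BigQueryWriteTemp" file prefix and the read-side extract destination directory) from pipeline construction time into DoFns that run at execution time. The transforms now carry only a stepUuid and re-derive the full path from --tempLocation on the worker, apparently so that a temp location supplied at run time (for example by a templated pipeline) takes effect.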
@@ -36,6 +36,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -106,26 +107,30 @@ public void validate(PipelineOptions options) {
@Override
public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
Pipeline p = input.getPipeline();
BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);

validate(p.getOptions());

final String stepUuid = BigQueryHelpers.randomUUIDString();

String tempLocation = options.getTempLocation();
String tempFilePrefix;
try {
IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
tempFilePrefix =
factory.resolve(factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e);
}

// Create a singleton job ID token at execution time. This will be used as the base for all
// load jobs issued from this instance of the transfomr.
PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
// load jobs issued from this instance of the transform.
PCollection<String> singleton = p
.apply("Create", Create.of((Void) null))
.apply("GetTempFilePrefix", ParDo.of(new DoFn<Void, String>() {
@ProcessElement
public void getTempFilePrefix(ProcessContext c) {
String tempLocation = c.getPipelineOptions().getTempLocation();
String tempFilePrefix;
try {
IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
tempFilePrefix =
factory.resolve(
factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e);
}
c.output(tempFilePrefix);
}
}));
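The shape of the new code above, seeding with a single (Void) null and computing the value inside a DoFn, is the general idiom for materializing a pipeline option at execution time instead of baking it into the graph. A minimal self-contained sketch of just that idiom against the same era's SDK surface (the class and step names are illustrative, not from this commit):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class DeferredOptionSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Seed with one dummy element, then read the options on the worker:
    // whatever --tempLocation is in effect when the pipeline runs is emitted,
    // not whatever was set when the graph was built.
    PCollection<String> tempLocation = p
        .apply("Seed", Create.of((Void) null))
        .apply("ResolveAtRunTime", ParDo.of(new DoFn<Void, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(c.getPipelineOptions().getTempLocation());
          }
        }));

    p.run();
  }
}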

PCollectionView<String> jobIdTokenView =
p.apply("TriggerIdCreation", Create.of("ignored"))
.apply(
@@ -152,7 +157,7 @@ public String apply(String input) {
PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
inputInGlobalWindow
.apply("WriteBundlesToFiles", ParDo.of(
new WriteBundlesToFiles<DestinationT>(tempFilePrefix)))
new WriteBundlesToFiles<DestinationT>(stepUuid)))
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));

TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
@@ -209,7 +214,7 @@ public String apply(String input) {
bigQueryServices,
jobIdTokenView,
schemasView,
tempFilePrefix,
stepUuid,
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
dynamicDestinations))
@@ -247,7 +252,7 @@ public String apply(String input) {
bigQueryServices,
jobIdTokenView,
schemasView,
tempFilePrefix,
stepUuid,
writeDisposition,
createDisposition,
dynamicDestinations))
@@ -482,17 +482,7 @@ public void validate(PipelineOptions options) {
@Override
public PCollection<TableRow> expand(PBegin input) {
final String stepUuid = BigQueryHelpers.randomUUIDString();
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
BoundedSource<TableRow> source;
final String extractDestinationDir;
String tempLocation = bqOptions.getTempLocation();
try {
IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
extractDestinationDir = factory.resolve(tempLocation, stepUuid);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to resolve extract destination directory in %s", tempLocation));
}

if (getQuery() != null
&& (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) {
@@ -502,21 +492,29 @@ public PCollection<TableRow> expand(PBegin input) {
getQuery(),
getFlattenResults(),
getUseLegacySql(),
extractDestinationDir,
getBigQueryServices());
} else {
source =
BigQueryTableSource.create(
stepUuid,
getTableProvider(),
extractDestinationDir,
getBigQueryServices());
}
PassThroughThenCleanup.CleanupOperation cleanupOperation =
new PassThroughThenCleanup.CleanupOperation() {
@Override
void cleanup(PipelineOptions options) throws Exception {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
final String extractDestinationDir;
String tempLocation = bqOptions.getTempLocation();
try {
IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
extractDestinationDir = factory.resolve(tempLocation, stepUuid);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to resolve extract destination directory in %s",
tempLocation));
}

JobReference jobRef =
new JobReference()
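The read path gets the same treatment in two places: BigQuerySourceBase.split() (further down) resolves the extract directory when the source is split, and the CleanupOperation above re-derives the identical tempLocation/stepUuid path from the options handed to cleanup(), so both ends agree without the directory ever being serialized into the graph. One nit visible in the new code: both read-side RuntimeExceptions drop the caught IOException as their cause, while the write-side equivalents pass it along.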
@@ -53,14 +53,12 @@ static BigQueryQuerySource create(
ValueProvider<String> query,
Boolean flattenResults,
Boolean useLegacySql,
String extractDestinationDir,
BigQueryServices bqServices) {
return new BigQueryQuerySource(
stepUuid,
query,
flattenResults,
useLegacySql,
extractDestinationDir,
bqServices);
}

@@ -74,9 +72,8 @@ private BigQueryQuerySource(
ValueProvider<String> query,
Boolean flattenResults,
Boolean useLegacySql,
String extractDestinationDir,
BigQueryServices bqServices) {
super(stepUuid, extractDestinationDir, bqServices);
super(stepUuid, bqServices);
this.query = checkNotNull(query, "query");
this.flattenResults = checkNotNull(flattenResults, "flattenResults");
this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
@@ -41,6 +41,8 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -64,14 +66,12 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;

protected final String stepUuid;
protected final String extractDestinationDir;
protected final BigQueryServices bqServices;

private transient List<BoundedSource<TableRow>> cachedSplitResult;

BigQuerySourceBase(String stepUuid, String extractDestinationDir, BigQueryServices bqServices) {
BigQuerySourceBase(String stepUuid, BigQueryServices bqServices) {
this.stepUuid = checkNotNull(stepUuid, "stepUuid");
this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
this.bqServices = checkNotNull(bqServices, "bqServices");
}

@@ -86,9 +86,20 @@ public List<BoundedSource<TableRow>> split(
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
TableReference tableToExtract = getTableToExtract(bqOptions);
JobService jobService = bqServices.getJobService(bqOptions);

final String extractDestinationDir;
String tempLocation = bqOptions.getTempLocation();
try {
IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
extractDestinationDir = factory.resolve(tempLocation, stepUuid);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to resolve extract destination directory in %s", tempLocation));
}

String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
List<String> tempFiles = executeExtract(
extractJobId, tableToExtract, jobService, bqOptions.getProject());
extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir);

TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
.getTable(tableToExtract).getSchema();
@@ -114,7 +125,8 @@ public Coder<TableRow> getDefaultOutputCoder() {
}

private List<String> executeExtract(
String jobId, TableReference table, JobService jobService, String executingProject)
String jobId, TableReference table, JobService jobService, String executingProject,
String extractDestinationDir)
throws InterruptedException, IOException {
JobReference jobRef = new JobReference()
.setProjectId(executingProject)
@@ -45,9 +45,8 @@ class BigQueryTableSource extends BigQuerySourceBase {
static BigQueryTableSource create(
String stepUuid,
ValueProvider<TableReference> table,
String extractDestinationDir,
BigQueryServices bqServices) {
return new BigQueryTableSource(stepUuid, table, extractDestinationDir, bqServices);
return new BigQueryTableSource(stepUuid, table, bqServices);
}

private final ValueProvider<String> jsonTable;
@@ -56,9 +55,8 @@ static BigQueryTableSource create(
private BigQueryTableSource(
String stepUuid,
ValueProvider<TableReference> table,
String extractDestinationDir,
BigQueryServices bqServices) {
super(stepUuid, extractDestinationDir, bqServices);
super(stepUuid, bqServices);
this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
this.tableSizeBytes = new AtomicReference<>();
}
@@ -32,7 +32,8 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ class WriteBundlesToFiles<DestinationT>

// Map from tablespec to a writer for that table.
private transient Map<DestinationT, TableRowWriter> writers;
private final String tempFilePrefix;
private final String stepUuid;

/**
* The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
@@ -104,8 +105,8 @@ public Result<DestinationT> decode(InputStream inStream, Context context) throws
public void verifyDeterministic() {}
}

WriteBundlesToFiles(String tempFilePrefix) {
this.tempFilePrefix = tempFilePrefix;
WriteBundlesToFiles(String stepUuid) {
this.stepUuid = stepUuid;
}

@StartBundle
@@ -117,6 +118,17 @@ public void startBundle(Context c) {

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String tempLocation = c.getPipelineOptions().getTempLocation();
String tempFilePrefix;
try {
IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
tempFilePrefix =
factory.resolve(
factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e);
}
TableRowWriter writer = writers.get(c.element().getKey());
if (writer == null) {
writer = new TableRowWriter(tempFilePrefix);
@@ -147,12 +159,4 @@ public void finishBundle(Context c) throws Exception {
}
writers.clear();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder.addIfNotNull(
DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix"));
}
}
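Note that WriteBundlesToFiles now runs the getFactory/resolve sequence once per element rather than once at construction; resolve() is plain path manipulation, so this is presumably cheap, but it could equally have been hoisted into startBundle(). The change also drops populateDisplayData, since there is no longer a construction-time tempFilePrefix to report.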
@@ -41,7 +41,6 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.FileIOChannelFactory;
import org.apache.beam.sdk.util.GcsIOChannelFactory;
import org.apache.beam.sdk.util.GcsUtil;
@@ -73,7 +72,7 @@ class WriteTables<DestinationT>
private final BigQueryServices bqServices;
private final PCollectionView<String> jobIdToken;
private final PCollectionView<Map<DestinationT, String>> schemasView;
private final String tempFilePrefix;
private final String stepUuid;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
@@ -83,15 +82,15 @@ public WriteTables(
BigQueryServices bqServices,
PCollectionView<String> jobIdToken,
PCollectionView<Map<DestinationT, String>> schemasView,
String tempFilePrefix,
String stepUuid,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
DynamicDestinations<?, DestinationT> dynamicDestinations) {
this.singlePartition = singlePartition;
this.bqServices = bqServices;
this.jobIdToken = jobIdToken;
this.schemasView = schemasView;
this.tempFilePrefix = tempFilePrefix;
this.stepUuid = stepUuid;
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
this.dynamicDestinations = dynamicDestinations;
@@ -113,6 +112,18 @@ public void processElement(ProcessContext c) throws Exception {
tableReference, tableDestination.getTableDescription());
}

String tempLocation = c.getPipelineOptions().getTempLocation();
String tempFilePrefix;
try {
IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
tempFilePrefix =
factory.resolve(
factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e);
}

Integer partition = c.element().getKey().getShardNumber();
List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
String jobIdPrefix =
@@ -213,12 +224,4 @@ static void removeTemporaryFiles(
throw new IOException("Unrecognized file system.");
}
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder.addIfNotNull(
DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix"));
}
}
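
With this commit the same getFactory/resolve sequence appears in five DoFns across the read and write paths. A sketch of a shared helper that could host it (the method and its name are ours; nothing like it exists in this commit):

// Illustrative only; not part of this commit. Centralizes the repeated
// execution-time resolution of a temp directory under --tempLocation.
// Pass subDir = "BigQueryWriteTemp" for the write path, null for the read path.
private static String resolveTempDir(
    PipelineOptions options, String subDir, String stepUuid) {
  String tempLocation = options.getTempLocation();
  try {
    IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
    String base =
        (subDir == null) ? tempLocation : factory.resolve(tempLocation, subDir);
    return factory.resolve(base, stepUuid);
  } catch (IOException e) {
    throw new RuntimeException(
        String.format("Failed to resolve temp directory in %s", tempLocation), e);
  }
}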
