From fc7ea7c2a60b7edc50c2a9cd9d959f4ab77d156e Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Wed, 20 Jul 2016 15:56:21 -0700 Subject: [PATCH 1/7] Modified BigQueryIO to write based on number of files and file sizes --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 559 +++++++++++++----- .../sdk/io/gcp/bigquery/BigQueryServices.java | 7 + .../io/gcp/bigquery/BigQueryServicesImpl.java | 19 + .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 10 +- 4 files changed, 435 insertions(+), 160 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 76f70799d0e3..e6fedbe36dc4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -33,7 +34,6 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.AvroSource; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; @@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -52,7 +53,13 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; +import org.apache.beam.sdk.util.FileIOChannelFactory; +import org.apache.beam.sdk.util.GcsIOChannelFactory; +import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; @@ -80,6 +87,7 @@ import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; @@ -93,6 +101,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.io.CountingOutputStream; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -110,6 +119,8 @@ import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -1681,7 +1692,7 @@ public PDone apply(PCollection input) { if (Strings.isNullOrEmpty(table.getProjectId())) { table.setProjectId(options.getProject()); } - String jobIdToken = randomUUIDString(); + final String jobIdToken = "beam_job_" + randomUUIDString(); String tempLocation = options.getTempLocation(); String tempFilePrefix; try { @@ -1695,16 +1706,14 @@ public PDone apply(PCollection input) { e); } - return input.apply("Write", org.apache.beam.sdk.io.Write.to( - new BigQuerySink( - jobIdToken, - table, - jsonSchema, - getWriteDisposition(), - getCreateDisposition(), - tempFilePrefix, - input.getCoder(), - bqServices))); + return input.apply("Write", new BigQueryWrite( + bqServices, + jobIdToken, + tempFilePrefix, + toJsonString(table), + jsonSchema, + getWriteDisposition(), + getCreateDisposition())); } @Override @@ -1789,187 +1798,419 @@ private BigQueryServices getBigQueryServices() { private Write() {} } - /** - * {@link BigQuerySink} is implemented as a {@link FileBasedSink}. - * - *

It uses BigQuery load job to import files into BigQuery. - */ - static class BigQuerySink extends FileBasedSink { - private final String jobIdToken; - @Nullable private final String jsonTable; - @Nullable private final String jsonSchema; - private final WriteDisposition writeDisposition; - private final CreateDisposition createDisposition; - private final Coder coder; - private final BigQueryServices bqServices; - - public BigQuerySink( + public static class BigQueryWrite + extends PTransform, PDone> { + private static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-"; + private BigQueryServices bqServices; + private String jobIdToken; + private String tempFilePrefix; + private String jsonTable; + private String jsonSchema; + private WriteDisposition writeDisposition; + private CreateDisposition createDisposition; + + public BigQueryWrite( + BigQueryServices bqServices, String jobIdToken, - @Nullable TableReference table, - @Nullable String jsonSchema, + String tempFilePrefix, + String jsonTable, + String jsonSchema, WriteDisposition writeDisposition, - CreateDisposition createDisposition, - String tempFile, - Coder coder, - BigQueryServices bqServices) { - super(tempFile, ".json"); - this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken"); - if (table == null) { - this.jsonTable = null; - } else { - checkArgument(!Strings.isNullOrEmpty(table.getProjectId()), - "Table %s should have a project specified", table); - this.jsonTable = toJsonString(table); - } + CreateDisposition createDisposition) { + this.bqServices = bqServices; + this.jobIdToken = jobIdToken; + this.tempFilePrefix = tempFilePrefix + TEMPORARY_FILENAME_SEPARATOR; + this.jsonTable = jsonTable; this.jsonSchema = jsonSchema; - this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); - this.createDisposition = checkNotNull(createDisposition, "createDisposition"); - this.coder = checkNotNull(coder, "coder"); - this.bqServices = checkNotNull(bqServices, "bqServices"); - } - - @Override - public FileBasedSink.FileBasedWriteOperation createWriteOperation( - PipelineOptions options) { - return new BigQueryWriteOperation(this); + this.writeDisposition = writeDisposition; + this.createDisposition = createDisposition; } @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); + public PDone apply(PCollection input) { + Pipeline p = input.getPipeline(); + + PCollection inputInGlobalWindow = + input.apply( + Window.into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()); + + PCollection> results = inputInGlobalWindow + .apply("WriteBundles", + ParDo.of(new WriteBundles(input.getCoder()))); + + PCollectionView>> resultsView + = results.apply("ResultsView", View.>asIterable()); + PCollection>> partitions = p.apply("PartitionsCreate", Create.of(1)) + .apply(ParDo.of(new WritePartition(resultsView)).withSideInputs(resultsView)); + + PCollection tempTables = partitions + .apply("PartitionsGroupByKey", GroupByKey.>create()) + .apply(ParDo.of(new WriteTempTables( + bqServices, + jobIdToken, + tempFilePrefix, + jsonTable, + jsonSchema))); + + PCollectionView> tempTablesView + = tempTables.apply("TempTablesView", View.asIterable()); + p.apply("RenameCreate", Create.of(1)) + .apply(ParDo + .of(new WriteRename( + bqServices, + jobIdToken, + jsonTable, + writeDisposition, + createDisposition, + tempTablesView)) + .withSideInputs(tempTablesView)); - builder - .addIfNotNull(DisplayData.item("schema", jsonSchema) - .withLabel("Table Schema")) - .addIfNotNull(DisplayData.item("tableSpec", jsonTable) - .withLabel("Table Specification")); + return PDone.in(input.getPipeline()); } - private static class BigQueryWriteOperation extends FileBasedWriteOperation { - // The maximum number of retry load jobs. - private static final int MAX_RETRY_LOAD_JOBS = 3; - - // The maximum number of retries to poll the status of a load job. - // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. - private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; - - private final BigQuerySink bigQuerySink; + private class WriteBundles extends DoFn> { + private TableRowWriter writer = null; + private Coder coder; - private BigQueryWriteOperation(BigQuerySink sink) { - super(checkNotNull(sink, "sink")); - this.bigQuerySink = sink; - } - - @Override - public FileBasedWriter createWriter(PipelineOptions options) throws Exception { - return new TableRowWriter(this, bigQuerySink.coder); + WriteBundles(Coder coder) { + this.coder = coder; } @Override - public void finalize(Iterable writerResults, PipelineOptions options) - throws IOException, InterruptedException { + public void processElement(ProcessContext c) throws Exception { + if (writer == null) { + writer = new TableRowWriter(tempFilePrefix, coder); + writer.open(UUID.randomUUID().toString()); + LOG.debug("Done opening writer {}", writer); + } try { - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - List tempFiles = Lists.newArrayList(); - for (FileResult result : writerResults) { - tempFiles.add(result.getFilename()); - } - if (!tempFiles.isEmpty()) { - load( - bigQuerySink.bqServices.getJobService(bqOptions), - bigQuerySink.jobIdToken, - fromJsonString(bigQuerySink.jsonTable, TableReference.class), - tempFiles, - fromJsonString(bigQuerySink.jsonSchema, TableSchema.class), - bigQuerySink.writeDisposition, - bigQuerySink.createDisposition); + writer.write(c.element()); + } catch (Exception e) { + // Discard write result and close the write. + try { + writer.close(); + // The writer does not need to be reset, as this DoFn cannot be reused. + } catch (Exception closeException) { + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); } - } finally { - removeTemporaryFiles(options); + throw e; } } - /** - * Import files into BigQuery with load jobs. - * - *

Returns if files are successfully loaded into BigQuery. - * Throws a RuntimeException if: - * 1. The status of one load job is UNKNOWN. This is to avoid duplicating data. - * 2. It exceeds {@code MAX_RETRY_LOAD_JOBS}. - * - *

If a load job failed, it will try another load job with a different job id. - */ - private void load( - JobService jobService, - String jobIdPrefix, - TableReference ref, - List gcsUris, - @Nullable TableSchema schema, - WriteDisposition writeDisposition, - CreateDisposition createDisposition) throws InterruptedException, IOException { - JobConfigurationLoad loadConfig = new JobConfigurationLoad() - .setSourceUris(gcsUris) - .setDestinationTable(ref) - .setSchema(schema) - .setWriteDisposition(writeDisposition.name()) - .setCreateDisposition(createDisposition.name()) - .setSourceFormat("NEWLINE_DELIMITED_JSON"); - - boolean retrying = false; - String projectId = ref.getProjectId(); - for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) { - String jobId = jobIdPrefix + "-" + i; - if (retrying) { - LOG.info("Previous load jobs failed, retrying."); - } - LOG.info("Starting BigQuery load job: {}", jobId); - JobReference jobRef = new JobReference() - .setProjectId(projectId) - .setJobId(jobId); - jobService.startLoadJob(jobRef, loadConfig); - Status jobStatus = - parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES)); - switch (jobStatus) { - case SUCCEEDED: - return; - case UNKNOWN: - throw new RuntimeException("Failed to poll the load job status."); - case FAILED: - LOG.info("BigQuery load job failed: {}", jobId); - retrying = true; - continue; - default: - throw new IllegalStateException("Unexpected job status: " + jobStatus); - } + @Override + public void finishBundle(Context c) throws Exception { + if (writer != null) { + KV result = writer.close(); + c.output(result); + writer = null; } - throw new RuntimeException( - "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS); } } - private static class TableRowWriter extends FileBasedWriter { + static class TableRowWriter { private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); + private String id; + private String tempFilePrefix; + private String fileName; + private WritableByteChannel channel; + protected String mimeType = MimeTypes.TEXT; private final Coder coder; - private OutputStream out; + private CountingOutputStream out; - public TableRowWriter( - FileBasedWriteOperation writeOperation, Coder coder) { - super(writeOperation); - this.mimeType = MimeTypes.TEXT; + TableRowWriter(String basename, Coder coder) { + this.tempFilePrefix = basename; this.coder = coder; } - @Override - protected void prepareWrite(WritableByteChannel channel) throws Exception { - out = Channels.newOutputStream(channel); + public final void open(String uId) throws Exception { + id = uId; + fileName = tempFilePrefix + id; + LOG.debug("Opening {}.", fileName); + channel = IOChannelUtils.create(fileName, mimeType); + try { + out = new CountingOutputStream(Channels.newOutputStream(channel)); + LOG.debug("Writing header to {}.", fileName); + } catch (Exception e) { + try { + LOG.error("Writing header to {} failed, closing channel.", fileName); + channel.close(); + } catch (IOException closeException) { + LOG.error("Closing channel for {} failed: {}", fileName, closeException.getMessage()); + } + throw e; + } + LOG.debug("Starting write of bundle {} to {}.", this.id, fileName); } - @Override public void write(TableRow value) throws Exception { - // Use Context.OUTER to encode and NEWLINE as the delimeter. coder.encode(value, out, Context.OUTER); out.write(NEWLINE); } + + public final KV close() throws IOException { + channel.close(); + KV result = KV.of(fileName, out.getCount()); + LOG.info("Result for bundle {}: {}, {}", this.id, fileName, result.getValue()); + return result; + } + } + } + + /** + * Partitions temporary files based on number of files and file sizes. + */ + static class WritePartition extends DoFn>> { + private static final int MAX_NUM_FILES = 1; + private static final long MAX_SIZE_BYTES = 3 * (1L << 40); + private final PCollectionView>> resultsView; + + public WritePartition(PCollectionView>> resultsView) { + this.resultsView = resultsView; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + List> results = Lists.newArrayList(c.sideInput(resultsView)); + long partitionId = 0; + int currNumFiles = 0; + long currSizeBytes = 0; + List currResults = Lists.newArrayList(); + for (int i = 0; i < results.size(); ++i) { + KV fileResult = results.get(i); + ++currNumFiles; + currSizeBytes += fileResult.getValue(); + currResults.add(fileResult.getKey()); + if (currNumFiles >= MAX_NUM_FILES + || currSizeBytes >= MAX_SIZE_BYTES + || i == results.size() - 1) { + c.output(KV.of(++partitionId, currResults)); + currResults = Lists.newArrayList(); + currNumFiles = 0; + currSizeBytes = 0; + } + } + } + } + + /** + * Writes partitions to separate temporary tables + */ + static class WriteTempTables extends DoFn>>, String> { + private static final int MAX_RETRY_LOAD_JOBS = 3; + private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; + private BigQueryServices bqServices; + private String jobIdToken; + private String tempFilePrefix; + private String jsonTable; + private String jsonSchema; + + public WriteTempTables( + BigQueryServices bqServices, + String jobIdToken, + String tempFilePrefix, + String jsonTable, + String jsonSchema) { + this.bqServices = bqServices; + this.jobIdToken = jobIdToken; + this.tempFilePrefix = tempFilePrefix; + this.jsonTable = jsonTable; + this.jsonSchema = jsonSchema; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + List partition = Lists.newArrayList(c.element().getValue()).get(0); + String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey()); + TableReference ref = fromJsonString(jsonTable, TableReference.class) + .setTableId(jobIdPrefix); + load( + bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + ref, + fromJsonString(jsonSchema, TableSchema.class), + partition, + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED); + c.output(toJsonString(ref)); + removeTemporaryFiles(c.getPipelineOptions(), partition); + } + + private void load( + JobService jobService, + String jobIdPrefix, + TableReference ref, + @Nullable TableSchema schema, + List gcsUris, + WriteDisposition writeDisposition, + CreateDisposition createDisposition) throws InterruptedException, IOException { + JobConfigurationLoad loadConfig = new JobConfigurationLoad() + .setDestinationTable(ref) + .setSchema(schema) + .setSourceUris(gcsUris) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()) + .setSourceFormat("NEWLINE_DELIMITED_JSON"); + + boolean retrying = false; + String projectId = ref.getProjectId(); + for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) { + String jobId = jobIdPrefix + "-" + i; + if (retrying) { + LOG.info("Previous load jobs failed, retrying."); + } + LOG.info("Starting BigQuery load job: {}", jobId); + JobReference jobRef = new JobReference() + .setProjectId(projectId) + .setJobId(jobId); + jobService.startLoadJob(jobRef, loadConfig); + Status jobStatus = + parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES)); + switch (jobStatus) { + case SUCCEEDED: + return; + case UNKNOWN: + throw new RuntimeException("Failed to poll the load job status."); + case FAILED: + LOG.info("BigQuery load job failed: {}", jobId); + retrying = true; + continue; + default: + throw new IllegalStateException("Unexpected job status: " + jobStatus); + } + } + throw new RuntimeException( + "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS); + } + + private void removeTemporaryFiles(PipelineOptions options, Collection matches) + throws IOException { + String pattern = tempFilePrefix + "*"; + LOG.debug("Finding temporary bundle output files matching {}.", pattern); + IOChannelFactory factory = IOChannelUtils.getFactory(pattern); + if (factory instanceof GcsIOChannelFactory) { + GcsUtil gcsUtil = new GcsUtil.GcsUtilFactory().create(options); + gcsUtil.remove(matches); + } else if (factory instanceof FileIOChannelFactory) { + for (String filename : matches) { + LOG.debug("Removing file {}", filename); + boolean exists = Files.deleteIfExists(Paths.get(filename)); + if (!exists) { + LOG.debug("{} does not exist.", filename); + } + } + } else { + throw new IOException("Unrecognized file system."); + } + } + } + + /** + * Copies temporary tables to destination table + */ + static class WriteRename extends DoFn { + private static final int MAX_RETRY_COPY_JOBS = 3; + private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; + private BigQueryServices bqServices; + private String jobIdToken; + private String jsonTable; + private WriteDisposition writeDisposition; + private CreateDisposition createDisposition; + private List tempTables; + private final PCollectionView> tempTablesView; + + public WriteRename( + BigQueryServices bqServices, + String jobIdToken, + String jsonTable, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + PCollectionView> tempTablesView) { + this.bqServices = bqServices; + this.jobIdToken = jobIdToken; + this.jsonTable = jsonTable; + this.writeDisposition = writeDisposition; + this.createDisposition = createDisposition; + this.tempTablesView = tempTablesView; + tempTables = Lists.newArrayList(); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + List tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView)); + for (String table : tempTablesJson) { + tempTables.add(fromJsonString(table, TableReference.class)); + } + copy( + bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + jobIdToken, + fromJsonString(jsonTable, TableReference.class), + tempTables, + writeDisposition, + createDisposition); + } + + @Override + public void finishBundle(Context c) throws Exception { + DatasetService tableService = + bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); + removeTemporaryTables(tableService, tempTables); + } + + private void copy( + JobService jobService, + String jobIdPrefix, + TableReference ref, + List tempTables, + WriteDisposition writeDisposition, + CreateDisposition createDisposition) throws InterruptedException, IOException { + JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy() + .setSourceTables(tempTables) + .setDestinationTable(ref) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()); + + boolean retrying = false; + String projectId = ref.getProjectId(); + for (int i = 0; i < MAX_RETRY_COPY_JOBS; ++i) { + String jobId = jobIdPrefix + "-" + i; + if (retrying) { + LOG.info("Previous copy jobs failed, retrying."); + } + LOG.info("Starting BigQuery copy job: {}", jobId); + JobReference jobRef = new JobReference() + .setProjectId(projectId) + .setJobId(jobId); + jobService.startCopyJob(jobRef, copyConfig); + Status jobStatus = + parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES)); + switch (jobStatus) { + case SUCCEEDED: + return; + case UNKNOWN: + throw new RuntimeException("Failed to poll the copy job status."); + case FAILED: + LOG.info("BigQuery copy job failed: {}", jobId); + retrying = true; + continue; + default: + throw new IllegalStateException("Unexpected job status: " + jobStatus); + } + } + throw new RuntimeException( + "Failed to create the copy job, reached max retries: " + MAX_RETRY_COPY_JOBS); + } + + private void removeTemporaryTables(DatasetService tableService, + List tempTables) throws Exception { + for (TableReference tableRef : tempTables) { + tableService.deleteTable( + tableRef.getProjectId(), + tableRef.getDatasetId(), + tableRef.getTableId()); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 29a335dd5e9e..0af6df8a3317 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.Table; @@ -82,6 +83,12 @@ void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) void startQueryJob(JobReference jobRef, JobConfigurationQuery query) throws IOException, InterruptedException; + /** + * Start a BigQuery copy job. + */ + void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) + throws IOException, InterruptedException; + /** * Waits for the job is Done, and returns the job. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index ef17e0fa14d6..5fa0f91f8943 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -39,6 +39,7 @@ import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; @@ -175,6 +176,24 @@ public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig startJob(job, errorExtractor, client); } + /** + * {@inheritDoc} + * + *

the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds max RPC . + */ + @Override + public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) + throws IOException, InterruptedException { + Job job = new Job() + .setJobReference(jobRef) + .setConfiguration( + new JobConfiguration().setCopy(copyConfig)); + + startJob(job, errorExtractor, client); + } + private static void startJob(Job job, ApiErrorExtractor errorExtractor, Bigquery client) throws IOException, InterruptedException { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 00e789166615..99670d858c85 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -76,6 +76,7 @@ import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobConfigurationTableCopy; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatistics2; @@ -274,6 +275,12 @@ public void startQueryJob(JobReference jobRef, JobConfigurationQuery query) startJob(jobRef); } + @Override + public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) + throws IOException, InterruptedException { + startJob(jobRef); + } + @Override public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException { @@ -565,7 +572,8 @@ public void testCustomSink() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService() .startJobReturns("done", "done", "done") - .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED)); + .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED)) + .withDatasetService(mockDatasetService); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( From ab35590d8d865dc24d7130613525ee5e20f99270 Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Thu, 21 Jul 2016 17:04:25 -0700 Subject: [PATCH 2/7] Added unit tests for DoFns used in BigQueryWrite --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 34 ++-- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 158 ++++++++++++++++++ 2 files changed, 176 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index e6fedbe36dc4..bacc5968a898 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1798,8 +1798,16 @@ private BigQueryServices getBigQueryServices() { private Write() {} } + /** + * A {@link PTransform} called by BigQueryIO.Write that performs the write of {@link PCollection} + * containing {@link TableRow TableRows} to a BigQuery table. + */ public static class BigQueryWrite extends PTransform, PDone> { + static final int MAX_NUM_FILES = 10000; + static final long MAX_SIZE_BYTES = 3 * (1L << 40); + static final int MAX_RETRY_JOBS = 3; + static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; private static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-"; private BigQueryServices bqServices; private String jobIdToken; @@ -1963,8 +1971,6 @@ public final KV close() throws IOException { * Partitions temporary files based on number of files and file sizes. */ static class WritePartition extends DoFn>> { - private static final int MAX_NUM_FILES = 1; - private static final long MAX_SIZE_BYTES = 3 * (1L << 40); private final PCollectionView>> resultsView; public WritePartition(PCollectionView>> resultsView) { @@ -1983,8 +1989,8 @@ public void processElement(ProcessContext c) throws Exception { ++currNumFiles; currSizeBytes += fileResult.getValue(); currResults.add(fileResult.getKey()); - if (currNumFiles >= MAX_NUM_FILES - || currSizeBytes >= MAX_SIZE_BYTES + if (currNumFiles >= BigQueryWrite.MAX_NUM_FILES + || currSizeBytes >= BigQueryWrite.MAX_SIZE_BYTES || i == results.size() - 1) { c.output(KV.of(++partitionId, currResults)); currResults = Lists.newArrayList(); @@ -1996,11 +2002,9 @@ public void processElement(ProcessContext c) throws Exception { } /** - * Writes partitions to separate temporary tables + * Writes partitions to separate temporary tables. */ static class WriteTempTables extends DoFn>>, String> { - private static final int MAX_RETRY_LOAD_JOBS = 3; - private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; private BigQueryServices bqServices; private String jobIdToken; private String tempFilePrefix; @@ -2056,7 +2060,7 @@ private void load( boolean retrying = false; String projectId = ref.getProjectId(); - for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) { + for (int i = 0; i < BigQueryWrite.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; if (retrying) { LOG.info("Previous load jobs failed, retrying."); @@ -2067,7 +2071,7 @@ private void load( .setJobId(jobId); jobService.startLoadJob(jobRef, loadConfig); Status jobStatus = - parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES)); + parseStatus(jobService.pollJob(jobRef, BigQueryWrite.LOAD_JOB_POLL_MAX_RETRIES)); switch (jobStatus) { case SUCCEEDED: return; @@ -2082,7 +2086,7 @@ private void load( } } throw new RuntimeException( - "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS); + "Failed to create the load job, reached max retries: " + BigQueryWrite.MAX_RETRY_JOBS); } private void removeTemporaryFiles(PipelineOptions options, Collection matches) @@ -2108,11 +2112,9 @@ private void removeTemporaryFiles(PipelineOptions options, Collection ma } /** - * Copies temporary tables to destination table + * Copies temporary tables to destination table. */ static class WriteRename extends DoFn { - private static final int MAX_RETRY_COPY_JOBS = 3; - private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; private BigQueryServices bqServices; private String jobIdToken; private String jsonTable; @@ -2174,7 +2176,7 @@ private void copy( boolean retrying = false; String projectId = ref.getProjectId(); - for (int i = 0; i < MAX_RETRY_COPY_JOBS; ++i) { + for (int i = 0; i < BigQueryWrite.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; if (retrying) { LOG.info("Previous copy jobs failed, retrying."); @@ -2185,7 +2187,7 @@ private void copy( .setJobId(jobId); jobService.startCopyJob(jobRef, copyConfig); Status jobStatus = - parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES)); + parseStatus(jobService.pollJob(jobRef, BigQueryWrite.LOAD_JOB_POLL_MAX_RETRIES)); switch (jobStatus) { case SUCCEEDED: return; @@ -2200,7 +2202,7 @@ private void copy( } } throw new RuntimeException( - "Failed to create the copy job, reached max retries: " + MAX_RETRY_COPY_JOBS); + "Failed to create the copy job, reached max retries: " + BigQueryWrite.MAX_RETRY_JOBS); } private void removeTemporaryTables(DatasetService tableService, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 99670d858c85..9346e3b343ae 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -32,18 +32,24 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TableRowJsonCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryTableSource; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryWrite; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup.CleanupOperation; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TransformingSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.WritePartition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.WriteRename; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.WriteTempTables; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.BigQueryOptions; @@ -59,15 +65,21 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.PCollectionViews; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import com.google.api.client.util.Data; import com.google.api.client.util.Strings; @@ -111,6 +123,7 @@ import java.io.FileFilter; import java.io.IOException; import java.io.Serializable; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -592,7 +605,9 @@ public void testCustomSink() throws Exception { p.run(); logged.verifyInfo("Starting BigQuery load job"); + logged.verifyInfo("Starting BigQuery copy job"); logged.verifyInfo("Previous load jobs failed, retrying."); + logged.verifyInfo("Previous copy jobs failed, retrying."); File tempDir = new File(bqOptions.getTempLocation()); assertEquals(0, tempDir.listFiles(new FileFilter() { @Override @@ -1236,4 +1251,147 @@ void cleanup(PipelineOptions options) throws Exception { p.run(); } + + @Test + public void testWritePartitionManyFiles() throws Exception { + final long numFiles = BigQueryWrite.MAX_NUM_FILES * 3; + final long fileSize = 1; + + // One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files + final long expectedNumPartitions = 3; + testWritePartition(numFiles, fileSize, expectedNumPartitions); + } + + @Test + public void testWritePartitionLargeFileSize() throws Exception { + final long numFiles = 10; + final long fileSize = BigQueryWrite.MAX_SIZE_BYTES / 3; + + // One partition is needed for each group of three files + final long expectedNumPartitions = 4; + testWritePartition(numFiles, fileSize, expectedNumPartitions); + } + + private void testWritePartition(long numFiles, long fileSize, long expectedNumPartitions) + throws Exception { + final List expectedPartitionIds = Lists.newArrayList(); + for (long i = 1; i <= expectedNumPartitions; ++i) { + expectedPartitionIds.add(i); + } + + final List> files = Lists.newArrayList(); + final List fileNames = Lists.newArrayList(); + for (int i = 0; i < numFiles; ++i) { + String fileName = String.format("files%05d", i); + fileNames.add(fileName); + files.add(KV.of(fileName, fileSize)); + } + + final PCollectionView>> filesView = PCollectionViews.iterableView( + TestPipeline.create(), + WindowingStrategy.globalDefault(), + KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())); + + WritePartition writePartition = new WritePartition(filesView); + + DoFnTester>> tester = DoFnTester.of(writePartition); + tester.setSideInput(filesView, GlobalWindow.INSTANCE, files); + tester.processElement(1); + + List>> partitions = tester.takeOutputElements(); + List partitionIds = Lists.newArrayList(); + List partitionFileNames = Lists.newArrayList(); + for (KV> partition : partitions) { + partitionIds.add(partition.getKey()); + for (String name : partition.getValue()) { + partitionFileNames.add(name); + } + } + + assertEquals(expectedPartitionIds, partitionIds); + assertEquals(fileNames, partitionFileNames); + } + + @Test + public void testWriteTempTables() throws Exception { + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService() + .startJobReturns("done", "done", "done", "done") + .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED)) + .withDatasetService(mockDatasetService); + + final long numPartitions = 3; + final long numFilesPerPartition = 10; + final String jobIdToken = "jobIdToken"; + final String tempFilePrefix = "tempFilePrefix"; + final String jsonTable = "{}"; + final String jsonSchema = "{}"; + final List expectedTempTables = Lists.newArrayList(); + + final List>>> partitions = Lists.newArrayList(); + for (long i = 0; i < numPartitions; ++i) { + List filesPerPartition = Lists.newArrayList(); + for (int j = 0; j < numFilesPerPartition; ++j) { + filesPerPartition.add(String.format("files%05d", j)); + } + partitions.add(KV.of(i, (Iterable>) Collections.singleton(filesPerPartition))); + expectedTempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i)); + } + + WriteTempTables writeTempTables = new WriteTempTables( + fakeBqServices, + jobIdToken, + tempFilePrefix, + jsonTable, + jsonSchema); + + DoFnTester>>, String> tester = DoFnTester.of(writeTempTables); + for (KV>> partition : partitions) { + tester.processElement(partition); + } + + List tempTables = tester.takeOutputElements(); + + logged.verifyInfo("Starting BigQuery load job"); + logged.verifyInfo("Previous load jobs failed, retrying."); + + assertEquals(expectedTempTables, tempTables); + } + + @Test + public void testWriteRename() throws Exception { + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService() + .startJobReturns("done", "done", "done", "done") + .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED)) + .withDatasetService(mockDatasetService); + + final long numTempTables = 3; + final String jobIdToken = "jobIdToken"; + final String jsonTable = "{}"; + final List tempTables = Lists.newArrayList(); + for (long i = 0; i < numTempTables; ++i) { + tempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i)); + } + + final PCollectionView> tempTablesView = PCollectionViews.iterableView( + TestPipeline.create(), + WindowingStrategy.globalDefault(), + StringUtf8Coder.of()); + + WriteRename writeRename = new WriteRename( + fakeBqServices, + jobIdToken, + jsonTable, + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED, + tempTablesView); + + DoFnTester tester = DoFnTester.of(writeRename); + tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables); + tester.processElement(1); + + logged.verifyInfo("Starting BigQuery copy job"); + logged.verifyInfo("Previous copy jobs failed, retrying."); + } } From 0940a57d4a8da634aff3132d037d125289ac734c Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Mon, 25 Jul 2016 11:59:55 -0700 Subject: [PATCH 3/7] Modified Javadoc --- .../io/gcp/bigquery/BigQueryServicesImpl.java | 36 +++++++++---------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 4 +-- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 5fa0f91f8943..c23e0a525b4f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -125,9 +125,9 @@ private JobServiceImpl(BigQueryOptions options) { /** * {@inheritDoc} * - *

the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public void startLoadJob( @@ -143,9 +143,9 @@ public void startLoadJob( /** * {@inheritDoc} * - *

the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) @@ -161,9 +161,9 @@ public void startExtractJob(JobReference jobRef, JobConfigurationExtract extract /** * {@inheritDoc} * - *

the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig) @@ -179,9 +179,9 @@ public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig /** * {@inheritDoc} * - *

the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) @@ -339,9 +339,9 @@ private DatasetServiceImpl(BigQueryOptions bqOptions) { /** * {@inheritDoc} * - *

the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public Table getTable(String projectId, String datasetId, String tableId) @@ -360,9 +360,9 @@ public Table getTable(String projectId, String datasetId, String tableId) /** * {@inheritDoc} * - *

the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public void deleteTable(String projectId, String datasetId, String tableId) @@ -396,9 +396,9 @@ public boolean isTableEmpty(String projectId, String datasetId, String tableId) /** * {@inheritDoc} * - *

the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public Dataset getDataset(String projectId, String datasetId) @@ -417,9 +417,9 @@ public Dataset getDataset(String projectId, String datasetId) /** * {@inheritDoc} * - *

the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public void createDataset( @@ -475,9 +475,9 @@ void createDataset( /** * {@inheritDoc} * - *

the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC . + * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @Override public void deleteDataset(String projectId, String datasetId) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 9346e3b343ae..8eafef2114eb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1362,8 +1362,8 @@ public void testWriteTempTables() throws Exception { public void testWriteRename() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService() - .startJobReturns("done", "done", "done", "done") - .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED)) + .startJobReturns("done", "done") + .pollJobReturns(Status.FAILED, Status.SUCCEEDED)) .withDatasetService(mockDatasetService); final long numTempTables = 3; From 08ff102c28f23631b846f27478f3e16854bd9878 Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Tue, 26 Jul 2016 10:36:32 -0700 Subject: [PATCH 4/7] Deleted LOG.info --- .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index bacc5968a898..e8d7c2c0601d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1960,9 +1960,7 @@ public void write(TableRow value) throws Exception { public final KV close() throws IOException { channel.close(); - KV result = KV.of(fileName, out.getCount()); - LOG.info("Result for bundle {}: {}, {}", this.id, fileName, result.getValue()); - return result; + return KV.of(fileName, out.getCount()); } } } From a9eec9820d4b5522d9372a81a13ec9c1ead76b92 Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Tue, 26 Jul 2016 16:26:53 -0700 Subject: [PATCH 5/7] Refactored to remove BigQueryWrite --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 796 +++++++++--------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 19 +- 2 files changed, 413 insertions(+), 402 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index e8d7c2c0601d..32d4747bbf7b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1424,6 +1424,21 @@ public static Bound withoutValidation() { * {@link PCollection} of {@link TableRow TableRows} to a BigQuery table. */ public static class Bound extends PTransform, PDone> { + // Maximum number of files in a single partition. + static final int MAX_NUM_FILES = 10000; + + // Maximum number of bytes in a single partition. + static final long MAX_SIZE_BYTES = 3 * (1L << 40); + + // The maximum number of retry jobs. + static final int MAX_RETRY_JOBS = 3; + + // The maximum number of retries to poll the status of a job. + // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. + static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; + + private static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-"; + @Nullable final String jsonTableRef; @Nullable final SerializableFunction tableRefFunction; @@ -1678,7 +1693,8 @@ public void validate(PCollection input) { @Override public PDone apply(PCollection input) { - BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); + Pipeline p = input.getPipeline(); + BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); BigQueryServices bqServices = getBigQueryServices(); // In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup @@ -1698,7 +1714,7 @@ public PDone apply(PCollection input) { try { IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); tempFilePrefix = factory.resolve( - factory.resolve(tempLocation, "BigQuerySinkTemp"), + factory.resolve(tempLocation, "BigQueryWriteTemp"), jobIdToken); } catch (IOException e) { throw new RuntimeException( @@ -1706,14 +1722,142 @@ public PDone apply(PCollection input) { e); } - return input.apply("Write", new BigQueryWrite( - bqServices, - jobIdToken, - tempFilePrefix, - toJsonString(table), - jsonSchema, - getWriteDisposition(), - getCreateDisposition())); + PCollection singleton = p.apply("Create", Create.of((Void) null)); + + PCollection inputInGlobalWindow = + input.apply( + Window.into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()); + + PCollection> results = inputInGlobalWindow + .apply("WriteBundles", + ParDo.of(new WriteBundles(input.getCoder(), tempFilePrefix))); + + PCollectionView>> resultsView + = results.apply("ResultsView", View.>asIterable()); + PCollection>> partitions = singleton + .apply(ParDo.of(new WritePartition(resultsView)).withSideInputs(resultsView)); + + PCollection tempTables = partitions + .apply("PartitionsGroupByKey", GroupByKey.>create()) + .apply(ParDo.of(new WriteTempTables( + bqServices, + jobIdToken, + tempFilePrefix, + toJsonString(table), + jsonSchema))); + + PCollectionView> tempTablesView + = tempTables.apply("TempTablesView", View.asIterable()); + singleton.apply(ParDo + .of(new WriteRename( + bqServices, + jobIdToken, + toJsonString(table), + writeDisposition, + createDisposition, + tempTablesView)) + .withSideInputs(tempTablesView)); + + return PDone.in(input.getPipeline()); + } + + private class WriteBundles extends DoFn> { + private TableRowWriter writer = null; + private Coder coder; + private String tempFilePrefix; + + WriteBundles(Coder coder, String tempFilePrefix) { + this.coder = coder; + this.tempFilePrefix = tempFilePrefix; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + if (writer == null) { + writer = new TableRowWriter(tempFilePrefix, coder); + writer.open(UUID.randomUUID().toString()); + LOG.debug("Done opening writer {}", writer); + } + try { + writer.write(c.element()); + } catch (Exception e) { + // Discard write result and close the write. + try { + writer.close(); + // The writer does not need to be reset, as this DoFn cannot be reused. + } catch (Exception closeException) { + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); + } + throw e; + } + } + + @Override + public void finishBundle(Context c) throws Exception { + if (writer != null) { + KV result = writer.close(); + c.output(result); + writer = null; + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder + .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) + .withLabel("Temporary File Prefix")); + } + } + + static class TableRowWriter { + private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); + private String id; + private String tempFilePrefix; + private String fileName; + private WritableByteChannel channel; + protected String mimeType = MimeTypes.TEXT; + private final Coder coder; + private CountingOutputStream out; + + TableRowWriter(String basename, Coder coder) { + this.tempFilePrefix = basename; + this.coder = coder; + } + + public final void open(String uId) throws Exception { + id = uId; + fileName = tempFilePrefix + id; + LOG.debug("Opening {}.", fileName); + channel = IOChannelUtils.create(fileName, mimeType); + try { + out = new CountingOutputStream(Channels.newOutputStream(channel)); + LOG.debug("Writing header to {}.", fileName); + } catch (Exception e) { + try { + LOG.error("Writing header to {} failed, closing channel.", fileName); + channel.close(); + } catch (IOException closeException) { + LOG.error("Closing channel for {} failed: {}", fileName, closeException.getMessage()); + } + throw e; + } + LOG.debug("Starting write of bundle {} to {}.", this.id, fileName); + } + + public void write(TableRow value) throws Exception { + coder.encode(value, out, Context.OUTER); + out.write(NEWLINE); + } + + public final KV close() throws IOException { + channel.close(); + return KV.of(fileName, out.getCount()); + } } @Override @@ -1794,424 +1938,292 @@ private BigQueryServices getBigQueryServices() { } } - /** Disallow construction of utility class. */ - private Write() {} - } - - /** - * A {@link PTransform} called by BigQueryIO.Write that performs the write of {@link PCollection} - * containing {@link TableRow TableRows} to a BigQuery table. - */ - public static class BigQueryWrite - extends PTransform, PDone> { - static final int MAX_NUM_FILES = 10000; - static final long MAX_SIZE_BYTES = 3 * (1L << 40); - static final int MAX_RETRY_JOBS = 3; - static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; - private static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-"; - private BigQueryServices bqServices; - private String jobIdToken; - private String tempFilePrefix; - private String jsonTable; - private String jsonSchema; - private WriteDisposition writeDisposition; - private CreateDisposition createDisposition; - - public BigQueryWrite( - BigQueryServices bqServices, - String jobIdToken, - String tempFilePrefix, - String jsonTable, - String jsonSchema, - WriteDisposition writeDisposition, - CreateDisposition createDisposition) { - this.bqServices = bqServices; - this.jobIdToken = jobIdToken; - this.tempFilePrefix = tempFilePrefix + TEMPORARY_FILENAME_SEPARATOR; - this.jsonTable = jsonTable; - this.jsonSchema = jsonSchema; - this.writeDisposition = writeDisposition; - this.createDisposition = createDisposition; - } - - @Override - public PDone apply(PCollection input) { - Pipeline p = input.getPipeline(); - - PCollection inputInGlobalWindow = - input.apply( - Window.into(new GlobalWindows()) - .triggering(DefaultTrigger.of()) - .discardingFiredPanes()); - - PCollection> results = inputInGlobalWindow - .apply("WriteBundles", - ParDo.of(new WriteBundles(input.getCoder()))); - - PCollectionView>> resultsView - = results.apply("ResultsView", View.>asIterable()); - PCollection>> partitions = p.apply("PartitionsCreate", Create.of(1)) - .apply(ParDo.of(new WritePartition(resultsView)).withSideInputs(resultsView)); - - PCollection tempTables = partitions - .apply("PartitionsGroupByKey", GroupByKey.>create()) - .apply(ParDo.of(new WriteTempTables( - bqServices, - jobIdToken, - tempFilePrefix, - jsonTable, - jsonSchema))); - - PCollectionView> tempTablesView - = tempTables.apply("TempTablesView", View.asIterable()); - p.apply("RenameCreate", Create.of(1)) - .apply(ParDo - .of(new WriteRename( - bqServices, - jobIdToken, - jsonTable, - writeDisposition, - createDisposition, - tempTablesView)) - .withSideInputs(tempTablesView)); - - return PDone.in(input.getPipeline()); - } - - private class WriteBundles extends DoFn> { - private TableRowWriter writer = null; - private Coder coder; + /** + * Partitions temporary files based on number of files and file sizes. + */ + static class WritePartition extends DoFn>> { + private final PCollectionView>> resultsView; - WriteBundles(Coder coder) { - this.coder = coder; + public WritePartition(PCollectionView>> resultsView) { + this.resultsView = resultsView; } @Override public void processElement(ProcessContext c) throws Exception { - if (writer == null) { - writer = new TableRowWriter(tempFilePrefix, coder); - writer.open(UUID.randomUUID().toString()); - LOG.debug("Done opening writer {}", writer); - } - try { - writer.write(c.element()); - } catch (Exception e) { - // Discard write result and close the write. - try { - writer.close(); - // The writer does not need to be reset, as this DoFn cannot be reused. - } catch (Exception closeException) { - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); + List> results = Lists.newArrayList(c.sideInput(resultsView)); + long partitionId = 0; + int currNumFiles = 0; + long currSizeBytes = 0; + List currResults = Lists.newArrayList(); + for (int i = 0; i < results.size(); ++i) { + KV fileResult = results.get(i); + ++currNumFiles; + currSizeBytes += fileResult.getValue(); + currResults.add(fileResult.getKey()); + if (currNumFiles >= Bound.MAX_NUM_FILES + || currSizeBytes >= Bound.MAX_SIZE_BYTES + || i == results.size() - 1) { + c.output(KV.of(++partitionId, currResults)); + currResults = Lists.newArrayList(); + currNumFiles = 0; + currSizeBytes = 0; } - throw e; } } @Override - public void finishBundle(Context c) throws Exception { - if (writer != null) { - KV result = writer.close(); - c.output(result); - writer = null; - } + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); } } - static class TableRowWriter { - private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); - private String id; + /** + * Writes partitions to separate temporary tables. + */ + static class WriteTempTables extends DoFn>>, String> { + private BigQueryServices bqServices; + private String jobIdToken; private String tempFilePrefix; - private String fileName; - private WritableByteChannel channel; - protected String mimeType = MimeTypes.TEXT; - private final Coder coder; - private CountingOutputStream out; - - TableRowWriter(String basename, Coder coder) { - this.tempFilePrefix = basename; - this.coder = coder; - } - - public final void open(String uId) throws Exception { - id = uId; - fileName = tempFilePrefix + id; - LOG.debug("Opening {}.", fileName); - channel = IOChannelUtils.create(fileName, mimeType); - try { - out = new CountingOutputStream(Channels.newOutputStream(channel)); - LOG.debug("Writing header to {}.", fileName); - } catch (Exception e) { - try { - LOG.error("Writing header to {} failed, closing channel.", fileName); - channel.close(); - } catch (IOException closeException) { - LOG.error("Closing channel for {} failed: {}", fileName, closeException.getMessage()); + private String jsonTableRef; + private String jsonSchema; + + public WriteTempTables( + BigQueryServices bqServices, + String jobIdToken, + String tempFilePrefix, + String jsonTableRef, + String jsonSchema) { + this.bqServices = bqServices; + this.jobIdToken = jobIdToken; + this.tempFilePrefix = tempFilePrefix; + this.jsonTableRef = jsonTableRef; + this.jsonSchema = jsonSchema; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + List partition = Lists.newArrayList(c.element().getValue()).get(0); + String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey()); + TableReference ref = fromJsonString(jsonTableRef, TableReference.class) + .setTableId(jobIdPrefix); + load( + bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + ref, + fromJsonString(jsonSchema, TableSchema.class), + partition, + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED); + c.output(toJsonString(ref)); + removeTemporaryFiles(c.getPipelineOptions(), partition); + } + + private void load( + JobService jobService, + String jobIdPrefix, + TableReference ref, + @Nullable TableSchema schema, + List gcsUris, + WriteDisposition writeDisposition, + CreateDisposition createDisposition) throws InterruptedException, IOException { + JobConfigurationLoad loadConfig = new JobConfigurationLoad() + .setDestinationTable(ref) + .setSchema(schema) + .setSourceUris(gcsUris) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()) + .setSourceFormat("NEWLINE_DELIMITED_JSON"); + + boolean retrying = false; + String projectId = ref.getProjectId(); + for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { + String jobId = jobIdPrefix + "-" + i; + if (retrying) { + LOG.info("Previous load jobs failed, retrying."); + } + LOG.info("Starting BigQuery load job: {}", jobId); + JobReference jobRef = new JobReference() + .setProjectId(projectId) + .setJobId(jobId); + jobService.startLoadJob(jobRef, loadConfig); + Status jobStatus = + parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES)); + switch (jobStatus) { + case SUCCEEDED: + return; + case UNKNOWN: + throw new RuntimeException("Failed to poll the load job status."); + case FAILED: + LOG.info("BigQuery load job failed: {}", jobId); + retrying = true; + continue; + default: + throw new IllegalStateException("Unexpected job status: " + jobStatus); } - throw e; } - LOG.debug("Starting write of bundle {} to {}.", this.id, fileName); + throw new RuntimeException( + "Failed to create the load job, reached max retries: " + Bound.MAX_RETRY_JOBS); + } + + private void removeTemporaryFiles(PipelineOptions options, Collection matches) + throws IOException { + String pattern = tempFilePrefix + "*"; + LOG.debug("Finding temporary bundle output files matching {}.", pattern); + IOChannelFactory factory = IOChannelUtils.getFactory(pattern); + if (factory instanceof GcsIOChannelFactory) { + GcsUtil gcsUtil = new GcsUtil.GcsUtilFactory().create(options); + gcsUtil.remove(matches); + } else if (factory instanceof FileIOChannelFactory) { + for (String filename : matches) { + LOG.debug("Removing file {}", filename); + boolean exists = Files.deleteIfExists(Paths.get(filename)); + if (!exists) { + LOG.debug("{} does not exist.", filename); + } + } + } else { + throw new IOException("Unrecognized file system."); + } } - public void write(TableRow value) throws Exception { - coder.encode(value, out, Context.OUTER); - out.write(NEWLINE); - } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); - public final KV close() throws IOException { - channel.close(); - return KV.of(fileName, out.getCount()); + builder + .addIfNotNull(DisplayData.item("jobIdToken", jobIdToken) + .withLabel("Job ID Token")) + .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix) + .withLabel("Temporary File Prefix")) + .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef) + .withLabel("Table Reference")) + .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema) + .withLabel("Table Schema")); } } - } - - /** - * Partitions temporary files based on number of files and file sizes. - */ - static class WritePartition extends DoFn>> { - private final PCollectionView>> resultsView; - - public WritePartition(PCollectionView>> resultsView) { - this.resultsView = resultsView; - } - @Override - public void processElement(ProcessContext c) throws Exception { - List> results = Lists.newArrayList(c.sideInput(resultsView)); - long partitionId = 0; - int currNumFiles = 0; - long currSizeBytes = 0; - List currResults = Lists.newArrayList(); - for (int i = 0; i < results.size(); ++i) { - KV fileResult = results.get(i); - ++currNumFiles; - currSizeBytes += fileResult.getValue(); - currResults.add(fileResult.getKey()); - if (currNumFiles >= BigQueryWrite.MAX_NUM_FILES - || currSizeBytes >= BigQueryWrite.MAX_SIZE_BYTES - || i == results.size() - 1) { - c.output(KV.of(++partitionId, currResults)); - currResults = Lists.newArrayList(); - currNumFiles = 0; - currSizeBytes = 0; - } + /** + * Copies temporary tables to destination table. + */ + static class WriteRename extends DoFn { + private BigQueryServices bqServices; + private String jobIdToken; + private String jsonTableRef; + private WriteDisposition writeDisposition; + private CreateDisposition createDisposition; + private List tempTables; + private final PCollectionView> tempTablesView; + + public WriteRename( + BigQueryServices bqServices, + String jobIdToken, + String jsonTableRef, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + PCollectionView> tempTablesView) { + this.bqServices = bqServices; + this.jobIdToken = jobIdToken; + this.jsonTableRef = jsonTableRef; + this.writeDisposition = writeDisposition; + this.createDisposition = createDisposition; + this.tempTablesView = tempTablesView; + tempTables = Lists.newArrayList(); } - } - } - - /** - * Writes partitions to separate temporary tables. - */ - static class WriteTempTables extends DoFn>>, String> { - private BigQueryServices bqServices; - private String jobIdToken; - private String tempFilePrefix; - private String jsonTable; - private String jsonSchema; - - public WriteTempTables( - BigQueryServices bqServices, - String jobIdToken, - String tempFilePrefix, - String jsonTable, - String jsonSchema) { - this.bqServices = bqServices; - this.jobIdToken = jobIdToken; - this.tempFilePrefix = tempFilePrefix; - this.jsonTable = jsonTable; - this.jsonSchema = jsonSchema; - } - @Override - public void processElement(ProcessContext c) throws Exception { - List partition = Lists.newArrayList(c.element().getValue()).get(0); - String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey()); - TableReference ref = fromJsonString(jsonTable, TableReference.class) - .setTableId(jobIdPrefix); - load( - bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - jobIdPrefix, - ref, - fromJsonString(jsonSchema, TableSchema.class), - partition, - WriteDisposition.WRITE_EMPTY, - CreateDisposition.CREATE_IF_NEEDED); - c.output(toJsonString(ref)); - removeTemporaryFiles(c.getPipelineOptions(), partition); - } - - private void load( - JobService jobService, - String jobIdPrefix, - TableReference ref, - @Nullable TableSchema schema, - List gcsUris, - WriteDisposition writeDisposition, - CreateDisposition createDisposition) throws InterruptedException, IOException { - JobConfigurationLoad loadConfig = new JobConfigurationLoad() - .setDestinationTable(ref) - .setSchema(schema) - .setSourceUris(gcsUris) - .setWriteDisposition(writeDisposition.name()) - .setCreateDisposition(createDisposition.name()) - .setSourceFormat("NEWLINE_DELIMITED_JSON"); - - boolean retrying = false; - String projectId = ref.getProjectId(); - for (int i = 0; i < BigQueryWrite.MAX_RETRY_JOBS; ++i) { - String jobId = jobIdPrefix + "-" + i; - if (retrying) { - LOG.info("Previous load jobs failed, retrying."); - } - LOG.info("Starting BigQuery load job: {}", jobId); - JobReference jobRef = new JobReference() - .setProjectId(projectId) - .setJobId(jobId); - jobService.startLoadJob(jobRef, loadConfig); - Status jobStatus = - parseStatus(jobService.pollJob(jobRef, BigQueryWrite.LOAD_JOB_POLL_MAX_RETRIES)); - switch (jobStatus) { - case SUCCEEDED: - return; - case UNKNOWN: - throw new RuntimeException("Failed to poll the load job status."); - case FAILED: - LOG.info("BigQuery load job failed: {}", jobId); - retrying = true; - continue; - default: - throw new IllegalStateException("Unexpected job status: " + jobStatus); + @Override + public void processElement(ProcessContext c) throws Exception { + List tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView)); + for (String table : tempTablesJson) { + tempTables.add(fromJsonString(table, TableReference.class)); } + copy( + bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + jobIdToken, + fromJsonString(jsonTableRef, TableReference.class), + tempTables, + writeDisposition, + createDisposition); } - throw new RuntimeException( - "Failed to create the load job, reached max retries: " + BigQueryWrite.MAX_RETRY_JOBS); - } - private void removeTemporaryFiles(PipelineOptions options, Collection matches) - throws IOException { - String pattern = tempFilePrefix + "*"; - LOG.debug("Finding temporary bundle output files matching {}.", pattern); - IOChannelFactory factory = IOChannelUtils.getFactory(pattern); - if (factory instanceof GcsIOChannelFactory) { - GcsUtil gcsUtil = new GcsUtil.GcsUtilFactory().create(options); - gcsUtil.remove(matches); - } else if (factory instanceof FileIOChannelFactory) { - for (String filename : matches) { - LOG.debug("Removing file {}", filename); - boolean exists = Files.deleteIfExists(Paths.get(filename)); - if (!exists) { - LOG.debug("{} does not exist.", filename); + @Override + public void finishBundle(Context c) throws Exception { + DatasetService tableService = + bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); + removeTemporaryTables(tableService, tempTables); + } + + private void copy( + JobService jobService, + String jobIdPrefix, + TableReference ref, + List tempTables, + WriteDisposition writeDisposition, + CreateDisposition createDisposition) throws InterruptedException, IOException { + JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy() + .setSourceTables(tempTables) + .setDestinationTable(ref) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()); + + boolean retrying = false; + String projectId = ref.getProjectId(); + for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { + String jobId = jobIdPrefix + "-" + i; + if (retrying) { + LOG.info("Previous copy jobs failed, retrying."); + } + LOG.info("Starting BigQuery copy job: {}", jobId); + JobReference jobRef = new JobReference() + .setProjectId(projectId) + .setJobId(jobId); + jobService.startCopyJob(jobRef, copyConfig); + Status jobStatus = + parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES)); + switch (jobStatus) { + case SUCCEEDED: + return; + case UNKNOWN: + throw new RuntimeException("Failed to poll the copy job status."); + case FAILED: + LOG.info("BigQuery copy job failed: {}", jobId); + retrying = true; + continue; + default: + throw new IllegalStateException("Unexpected job status: " + jobStatus); } } - } else { - throw new IOException("Unrecognized file system."); - } - } - } - - /** - * Copies temporary tables to destination table. - */ - static class WriteRename extends DoFn { - private BigQueryServices bqServices; - private String jobIdToken; - private String jsonTable; - private WriteDisposition writeDisposition; - private CreateDisposition createDisposition; - private List tempTables; - private final PCollectionView> tempTablesView; - - public WriteRename( - BigQueryServices bqServices, - String jobIdToken, - String jsonTable, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - PCollectionView> tempTablesView) { - this.bqServices = bqServices; - this.jobIdToken = jobIdToken; - this.jsonTable = jsonTable; - this.writeDisposition = writeDisposition; - this.createDisposition = createDisposition; - this.tempTablesView = tempTablesView; - tempTables = Lists.newArrayList(); - } - - @Override - public void processElement(ProcessContext c) throws Exception { - List tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView)); - for (String table : tempTablesJson) { - tempTables.add(fromJsonString(table, TableReference.class)); + throw new RuntimeException( + "Failed to create the copy job, reached max retries: " + Bound.MAX_RETRY_JOBS); } - copy( - bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - jobIdToken, - fromJsonString(jsonTable, TableReference.class), - tempTables, - writeDisposition, - createDisposition); - } - @Override - public void finishBundle(Context c) throws Exception { - DatasetService tableService = - bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); - removeTemporaryTables(tableService, tempTables); - } - - private void copy( - JobService jobService, - String jobIdPrefix, - TableReference ref, - List tempTables, - WriteDisposition writeDisposition, - CreateDisposition createDisposition) throws InterruptedException, IOException { - JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy() - .setSourceTables(tempTables) - .setDestinationTable(ref) - .setWriteDisposition(writeDisposition.name()) - .setCreateDisposition(createDisposition.name()); - - boolean retrying = false; - String projectId = ref.getProjectId(); - for (int i = 0; i < BigQueryWrite.MAX_RETRY_JOBS; ++i) { - String jobId = jobIdPrefix + "-" + i; - if (retrying) { - LOG.info("Previous copy jobs failed, retrying."); - } - LOG.info("Starting BigQuery copy job: {}", jobId); - JobReference jobRef = new JobReference() - .setProjectId(projectId) - .setJobId(jobId); - jobService.startCopyJob(jobRef, copyConfig); - Status jobStatus = - parseStatus(jobService.pollJob(jobRef, BigQueryWrite.LOAD_JOB_POLL_MAX_RETRIES)); - switch (jobStatus) { - case SUCCEEDED: - return; - case UNKNOWN: - throw new RuntimeException("Failed to poll the copy job status."); - case FAILED: - LOG.info("BigQuery copy job failed: {}", jobId); - retrying = true; - continue; - default: - throw new IllegalStateException("Unexpected job status: " + jobStatus); + private void removeTemporaryTables(DatasetService tableService, + List tempTables) throws Exception { + for (TableReference tableRef : tempTables) { + tableService.deleteTable( + tableRef.getProjectId(), + tableRef.getDatasetId(), + tableRef.getTableId()); } } - throw new RuntimeException( - "Failed to create the copy job, reached max retries: " + BigQueryWrite.MAX_RETRY_JOBS); - } - private void removeTemporaryTables(DatasetService tableService, - List tempTables) throws Exception { - for (TableReference tableRef : tempTables) { - tableService.deleteTable( - tableRef.getProjectId(), - tableRef.getDatasetId(), - tableRef.getTableId()); + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder + .addIfNotNull(DisplayData.item("jobIdToken", jobIdToken) + .withLabel("Job ID Token")) + .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef) + .withLabel("Table Reference")) + .add(DisplayData.item("writeDisposition", writeDisposition.toString()) + .withLabel("Write Disposition")) + .add(DisplayData.item("createDisposition", createDisposition.toString()) + .withLabel("Create Disposition")); } } + + /** Disallow construction of utility class. */ + private Write() {} } private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 8eafef2114eb..09af3fc0f0c2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -40,16 +40,15 @@ import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryTableSource; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryWrite; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup.CleanupOperation; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TransformingSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.WritePartition; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.WriteRename; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.WriteTempTables; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WritePartition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteRename; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteTempTables; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.BigQueryOptions; @@ -1254,7 +1253,7 @@ void cleanup(PipelineOptions options) throws Exception { @Test public void testWritePartitionManyFiles() throws Exception { - final long numFiles = BigQueryWrite.MAX_NUM_FILES * 3; + final long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES * 3; final long fileSize = 1; // One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files @@ -1265,7 +1264,7 @@ public void testWritePartitionManyFiles() throws Exception { @Test public void testWritePartitionLargeFileSize() throws Exception { final long numFiles = 10; - final long fileSize = BigQueryWrite.MAX_SIZE_BYTES / 3; + final long fileSize = BigQueryIO.Write.Bound.MAX_SIZE_BYTES / 3; // One partition is needed for each group of three files final long expectedNumPartitions = 4; @@ -1294,9 +1293,9 @@ private void testWritePartition(long numFiles, long fileSize, long expectedNumPa WritePartition writePartition = new WritePartition(filesView); - DoFnTester>> tester = DoFnTester.of(writePartition); + DoFnTester>> tester = DoFnTester.of(writePartition); tester.setSideInput(filesView, GlobalWindow.INSTANCE, files); - tester.processElement(1); + tester.processElement(null); List>> partitions = tester.takeOutputElements(); List partitionIds = Lists.newArrayList(); @@ -1387,9 +1386,9 @@ public void testWriteRename() throws Exception { CreateDisposition.CREATE_IF_NEEDED, tempTablesView); - DoFnTester tester = DoFnTester.of(writeRename); + DoFnTester tester = DoFnTester.of(writeRename); tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables); - tester.processElement(1); + tester.processElement(null); logged.verifyInfo("Starting BigQuery copy job"); logged.verifyInfo("Previous copy jobs failed, retrying."); From 81085082184773268546093c0a46d1b14d3f03b8 Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Tue, 2 Aug 2016 11:21:13 -0700 Subject: [PATCH 6/7] Style and logger changes --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 93 ++++++++----------- .../io/gcp/bigquery/BigQueryServicesImpl.java | 18 ++-- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 7 +- 3 files changed, 50 insertions(+), 68 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 32d4747bbf7b..ee21cd118ade 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1437,8 +1437,6 @@ public static class Bound extends PTransform, PDone> { // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes. static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; - private static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-"; - @Nullable final String jsonTableRef; @Nullable final SerializableFunction tableRefFunction; @@ -1459,6 +1457,8 @@ public static class Bound extends PTransform, PDone> { @Nullable private BigQueryServices bigQueryServices; + private static Coder coder; + private static class TranslateTableSpecFunction implements SerializableFunction { private SerializableFunction tableSpecFunction; @@ -1696,6 +1696,7 @@ public PDone apply(PCollection input) { Pipeline p = input.getPipeline(); BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); BigQueryServices bqServices = getBigQueryServices(); + coder = input.getCoder(); // In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup // and BigQuery's streaming import API. @@ -1708,7 +1709,7 @@ public PDone apply(PCollection input) { if (Strings.isNullOrEmpty(table.getProjectId())) { table.setProjectId(options.getProject()); } - final String jobIdToken = "beam_job_" + randomUUIDString(); + String jobIdToken = "beam_job_" + randomUUIDString(); String tempLocation = options.getTempLocation(); String tempFilePrefix; try { @@ -1732,10 +1733,10 @@ public PDone apply(PCollection input) { PCollection> results = inputInGlobalWindow .apply("WriteBundles", - ParDo.of(new WriteBundles(input.getCoder(), tempFilePrefix))); + ParDo.of(new WriteBundles(tempFilePrefix))); - PCollectionView>> resultsView - = results.apply("ResultsView", View.>asIterable()); + PCollectionView>> resultsView = results + .apply("ResultsView", View.>asIterable()); PCollection>> partitions = singleton .apply(ParDo.of(new WritePartition(resultsView)).withSideInputs(resultsView)); @@ -1748,8 +1749,8 @@ public PDone apply(PCollection input) { toJsonString(table), jsonSchema))); - PCollectionView> tempTablesView - = tempTables.apply("TempTablesView", View.asIterable()); + PCollectionView> tempTablesView = tempTables + .apply("TempTablesView", View.asIterable()); singleton.apply(ParDo .of(new WriteRename( bqServices, @@ -1765,18 +1766,16 @@ public PDone apply(PCollection input) { private class WriteBundles extends DoFn> { private TableRowWriter writer = null; - private Coder coder; - private String tempFilePrefix; + private final String tempFilePrefix; - WriteBundles(Coder coder, String tempFilePrefix) { - this.coder = coder; + WriteBundles(String tempFilePrefix) { this.tempFilePrefix = tempFilePrefix; } @Override public void processElement(ProcessContext c) throws Exception { if (writer == null) { - writer = new TableRowWriter(tempFilePrefix, coder); + writer = new TableRowWriter(tempFilePrefix); writer.open(UUID.randomUUID().toString()); LOG.debug("Done opening writer {}", writer); } @@ -1816,17 +1815,15 @@ public void populateDisplayData(DisplayData.Builder builder) { static class TableRowWriter { private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); + private final String tempFilePrefix; private String id; - private String tempFilePrefix; private String fileName; private WritableByteChannel channel; protected String mimeType = MimeTypes.TEXT; - private final Coder coder; private CountingOutputStream out; - TableRowWriter(String basename, Coder coder) { + TableRowWriter(String basename) { this.tempFilePrefix = basename; - this.coder = coder; } public final void open(String uId) throws Exception { @@ -1842,7 +1839,7 @@ public final void open(String uId) throws Exception { LOG.error("Writing header to {} failed, closing channel.", fileName); channel.close(); } catch (IOException closeException) { - LOG.error("Closing channel for {} failed: {}", fileName, closeException.getMessage()); + LOG.error("Closing channel for {} failed", fileName); } throw e; } @@ -1981,11 +1978,11 @@ public void populateDisplayData(DisplayData.Builder builder) { * Writes partitions to separate temporary tables. */ static class WriteTempTables extends DoFn>>, String> { - private BigQueryServices bqServices; - private String jobIdToken; - private String tempFilePrefix; - private String jsonTableRef; - private String jsonSchema; + private final BigQueryServices bqServices; + private final String jobIdToken; + private final String tempFilePrefix; + private final String jsonTableRef; + private final String jsonSchema; public WriteTempTables( BigQueryServices bqServices, @@ -2034,14 +2031,10 @@ private void load( .setCreateDisposition(createDisposition.name()) .setSourceFormat("NEWLINE_DELIMITED_JSON"); - boolean retrying = false; String projectId = ref.getProjectId(); for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; - if (retrying) { - LOG.info("Previous load jobs failed, retrying."); - } - LOG.info("Starting BigQuery load job: {}", jobId); + LOG.info("Starting BigQuery load job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS); JobReference jobRef = new JobReference() .setProjectId(projectId) .setJobId(jobId); @@ -2052,23 +2045,23 @@ private void load( case SUCCEEDED: return; case UNKNOWN: - throw new RuntimeException("Failed to poll the load job status."); + throw new RuntimeException("Failed to poll the load job status of job " + jobId); case FAILED: LOG.info("BigQuery load job failed: {}", jobId); - retrying = true; continue; default: - throw new IllegalStateException("Unexpected job status: " + jobStatus); + throw new IllegalStateException(String.format("Unexpected job status: %s of job %s", + jobStatus, jobId)); } } - throw new RuntimeException( - "Failed to create the load job, reached max retries: " + Bound.MAX_RETRY_JOBS); + throw new RuntimeException(String.format("Failed to create the load job %s, reached max " + + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS)); } private void removeTemporaryFiles(PipelineOptions options, Collection matches) throws IOException { String pattern = tempFilePrefix + "*"; - LOG.debug("Finding temporary bundle output files matching {}.", pattern); + LOG.debug("Finding temporary files matching {}.", pattern); IOChannelFactory factory = IOChannelUtils.getFactory(pattern); if (factory instanceof GcsIOChannelFactory) { GcsUtil gcsUtil = new GcsUtil.GcsUtilFactory().create(options); @@ -2106,12 +2099,11 @@ public void populateDisplayData(DisplayData.Builder builder) { * Copies temporary tables to destination table. */ static class WriteRename extends DoFn { - private BigQueryServices bqServices; - private String jobIdToken; - private String jsonTableRef; - private WriteDisposition writeDisposition; - private CreateDisposition createDisposition; - private List tempTables; + private final BigQueryServices bqServices; + private final String jobIdToken; + private final String jsonTableRef; + private final WriteDisposition writeDisposition; + private final CreateDisposition createDisposition; private final PCollectionView> tempTablesView; public WriteRename( @@ -2127,12 +2119,12 @@ public WriteRename( this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; this.tempTablesView = tempTablesView; - tempTables = Lists.newArrayList(); } @Override public void processElement(ProcessContext c) throws Exception { List tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView)); + List tempTables = Lists.newArrayList(); for (String table : tempTablesJson) { tempTables.add(fromJsonString(table, TableReference.class)); } @@ -2143,10 +2135,7 @@ public void processElement(ProcessContext c) throws Exception { tempTables, writeDisposition, createDisposition); - } - @Override - public void finishBundle(Context c) throws Exception { DatasetService tableService = bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); removeTemporaryTables(tableService, tempTables); @@ -2165,14 +2154,10 @@ private void copy( .setWriteDisposition(writeDisposition.name()) .setCreateDisposition(createDisposition.name()); - boolean retrying = false; String projectId = ref.getProjectId(); for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; - if (retrying) { - LOG.info("Previous copy jobs failed, retrying."); - } - LOG.info("Starting BigQuery copy job: {}", jobId); + LOG.info("Starting BigQuery copy job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS); JobReference jobRef = new JobReference() .setProjectId(projectId) .setJobId(jobId); @@ -2183,17 +2168,17 @@ private void copy( case SUCCEEDED: return; case UNKNOWN: - throw new RuntimeException("Failed to poll the copy job status."); + throw new RuntimeException("Failed to poll the copy job status of job " + jobId); case FAILED: LOG.info("BigQuery copy job failed: {}", jobId); - retrying = true; continue; default: - throw new IllegalStateException("Unexpected job status: " + jobStatus); + throw new IllegalStateException(String.format("Unexpected job status: %s of job %s", + jobStatus, jobId)); } } - throw new RuntimeException( - "Failed to create the copy job, reached max retries: " + Bound.MAX_RETRY_JOBS); + throw new RuntimeException(String.format("Failed to create the copy job %s, reached max " + + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS)); } private void removeTemporaryTables(DatasetService tableService, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index c23e0a525b4f..bd1097f54c0c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -125,7 +125,7 @@ private JobServiceImpl(BigQueryOptions options) { /** * {@inheritDoc} * - *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @@ -143,7 +143,7 @@ public void startLoadJob( /** * {@inheritDoc} * - *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @@ -161,7 +161,7 @@ public void startExtractJob(JobReference jobRef, JobConfigurationExtract extract /** * {@inheritDoc} * - *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @@ -179,7 +179,7 @@ public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig /** * {@inheritDoc} * - *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @@ -339,7 +339,7 @@ private DatasetServiceImpl(BigQueryOptions bqOptions) { /** * {@inheritDoc} * - *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @@ -360,7 +360,7 @@ public Table getTable(String projectId, String datasetId, String tableId) /** * {@inheritDoc} * - *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @@ -396,7 +396,7 @@ public boolean isTableEmpty(String projectId, String datasetId, String tableId) /** * {@inheritDoc} * - *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @@ -417,7 +417,7 @@ public Dataset getDataset(String projectId, String datasetId) /** * {@inheritDoc} * - *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ @@ -475,7 +475,7 @@ void createDataset( /** * {@inheritDoc} * - *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. */ diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 09af3fc0f0c2..f3a0dce8f6f2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Bound.MAX_RETRY_JOBS; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.fromJsonString; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; @@ -605,8 +606,6 @@ public void testCustomSink() throws Exception { logged.verifyInfo("Starting BigQuery load job"); logged.verifyInfo("Starting BigQuery copy job"); - logged.verifyInfo("Previous load jobs failed, retrying."); - logged.verifyInfo("Previous copy jobs failed, retrying."); File tempDir = new File(bqOptions.getTempLocation()); assertEquals(0, tempDir.listFiles(new FileFilter() { @Override @@ -635,7 +634,7 @@ public void testCustomSinkUnknown() throws Exception { .withoutValidation()); thrown.expect(RuntimeException.class); - thrown.expectMessage("Failed to poll the load job status."); + thrown.expectMessage("Failed to poll the load job status"); p.run(); File tempDir = new File(bqOptions.getTempLocation()); @@ -1352,7 +1351,6 @@ public void testWriteTempTables() throws Exception { List tempTables = tester.takeOutputElements(); logged.verifyInfo("Starting BigQuery load job"); - logged.verifyInfo("Previous load jobs failed, retrying."); assertEquals(expectedTempTables, tempTables); } @@ -1391,6 +1389,5 @@ public void testWriteRename() throws Exception { tester.processElement(null); logged.verifyInfo("Starting BigQuery copy job"); - logged.verifyInfo("Previous copy jobs failed, retrying."); } } From 81bacd76bbd156b4d7e961d3676adade16f7e4eb Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Tue, 2 Aug 2016 16:05:37 -0700 Subject: [PATCH 7/7] Added TupleTag logic for single partitions --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 219 +++++++++++------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 75 ++++-- 2 files changed, 199 insertions(+), 95 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index ee21cd118ade..c943dcab6033 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -34,8 +34,6 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.AvroSource; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.BigQueryOptions; @@ -207,7 +205,8 @@ *

See {@link BigQueryIO.Write} for details on how to specify if a write should * append to an existing table, replace the table, or verify that the table is * empty. Note that the dataset being written to must already exist. Unbounded PCollections can only - * be written using {@link WriteDisposition#WRITE_EMPTY} or {@link WriteDisposition#WRITE_APPEND}. + * be written using {@link Write.WriteDisposition#WRITE_EMPTY} or + * {@link Write.WriteDisposition#WRITE_APPEND}. * *

Sharding BigQuery output tables

*

A common use case is to dynamically generate BigQuery table names based on @@ -1457,8 +1456,6 @@ public static class Bound extends PTransform, PDone> { @Nullable private BigQueryServices bigQueryServices; - private static Coder coder; - private static class TranslateTableSpecFunction implements SerializableFunction { private SerializableFunction tableSpecFunction; @@ -1696,7 +1693,6 @@ public PDone apply(PCollection input) { Pipeline p = input.getPipeline(); BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); BigQueryServices bqServices = getBigQueryServices(); - coder = input.getCoder(); // In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup // and BigQuery's streaming import API. @@ -1723,7 +1719,7 @@ public PDone apply(PCollection input) { e); } - PCollection singleton = p.apply("Create", Create.of((Void) null)); + PCollection singleton = p.apply("Create", Create.of(tempFilePrefix)); PCollection inputInGlobalWindow = input.apply( @@ -1735,19 +1731,33 @@ public PDone apply(PCollection input) { .apply("WriteBundles", ParDo.of(new WriteBundles(tempFilePrefix))); + TupleTag>> multiPartitionsTag = + new TupleTag>>("multiPartitionsTag") {}; + TupleTag>> singlePartitionTag = + new TupleTag>>("singlePartitionTag") {}; + PCollectionView>> resultsView = results .apply("ResultsView", View.>asIterable()); - PCollection>> partitions = singleton - .apply(ParDo.of(new WritePartition(resultsView)).withSideInputs(resultsView)); - - PCollection tempTables = partitions - .apply("PartitionsGroupByKey", GroupByKey.>create()) - .apply(ParDo.of(new WriteTempTables( + PCollectionTuple partitions = singleton.apply(ParDo + .of(new WritePartition( + resultsView, + multiPartitionsTag, + singlePartitionTag)) + .withSideInputs(resultsView) + .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); + + // Write multiple partitions to separate temporary tables + PCollection tempTables = partitions.get(multiPartitionsTag) + .apply("MultiPartitionsGroupByKey", GroupByKey.>create()) + .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables( + false, bqServices, jobIdToken, tempFilePrefix, toJsonString(table), - jsonSchema))); + jsonSchema, + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED))); PCollectionView> tempTablesView = tempTables .apply("TempTablesView", View.asIterable()); @@ -1761,6 +1771,19 @@ public PDone apply(PCollection input) { tempTablesView)) .withSideInputs(tempTablesView)); + // Write single partition to final table + partitions.get(singlePartitionTag) + .apply("SinglePartitionGroupByKey", GroupByKey.>create()) + .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables( + true, + bqServices, + jobIdToken, + tempFilePrefix, + toJsonString(table), + jsonSchema, + writeDisposition, + createDisposition))); + return PDone.in(input.getPipeline()); } @@ -1797,8 +1820,7 @@ public void processElement(ProcessContext c) throws Exception { @Override public void finishBundle(Context c) throws Exception { if (writer != null) { - KV result = writer.close(); - c.output(result); + c.output(writer.close()); writer = null; } } @@ -1813,50 +1835,6 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - static class TableRowWriter { - private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); - private final String tempFilePrefix; - private String id; - private String fileName; - private WritableByteChannel channel; - protected String mimeType = MimeTypes.TEXT; - private CountingOutputStream out; - - TableRowWriter(String basename) { - this.tempFilePrefix = basename; - } - - public final void open(String uId) throws Exception { - id = uId; - fileName = tempFilePrefix + id; - LOG.debug("Opening {}.", fileName); - channel = IOChannelUtils.create(fileName, mimeType); - try { - out = new CountingOutputStream(Channels.newOutputStream(channel)); - LOG.debug("Writing header to {}.", fileName); - } catch (Exception e) { - try { - LOG.error("Writing header to {} failed, closing channel.", fileName); - channel.close(); - } catch (IOException closeException) { - LOG.error("Closing channel for {} failed", fileName); - } - throw e; - } - LOG.debug("Starting write of bundle {} to {}.", this.id, fileName); - } - - public void write(TableRow value) throws Exception { - coder.encode(value, out, Context.OUTER); - out.write(NEWLINE); - } - - public final KV close() throws IOException { - channel.close(); - return KV.of(fileName, out.getCount()); - } - } - @Override protected Coder getDefaultOutputCoder() { return VoidCoder.of(); @@ -1935,36 +1913,98 @@ private BigQueryServices getBigQueryServices() { } } + static class TableRowWriter { + private static final Coder CODER = TableRowJsonCoder.of(); + private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); + private final String tempFilePrefix; + private String id; + private String fileName; + private WritableByteChannel channel; + protected String mimeType = MimeTypes.TEXT; + private CountingOutputStream out; + + TableRowWriter(String basename) { + this.tempFilePrefix = basename; + } + + public final void open(String uId) throws Exception { + id = uId; + fileName = tempFilePrefix + id; + LOG.debug("Opening {}.", fileName); + channel = IOChannelUtils.create(fileName, mimeType); + try { + out = new CountingOutputStream(Channels.newOutputStream(channel)); + LOG.debug("Writing header to {}.", fileName); + } catch (Exception e) { + try { + LOG.error("Writing header to {} failed, closing channel.", fileName); + channel.close(); + } catch (IOException closeException) { + LOG.error("Closing channel for {} failed", fileName); + } + throw e; + } + LOG.debug("Starting write of bundle {} to {}.", this.id, fileName); + } + + public void write(TableRow value) throws Exception { + CODER.encode(value, out, Context.OUTER); + out.write(NEWLINE); + } + + public final KV close() throws IOException { + channel.close(); + return KV.of(fileName, out.getCount()); + } + } + /** * Partitions temporary files based on number of files and file sizes. */ - static class WritePartition extends DoFn>> { + static class WritePartition extends DoFn>> { private final PCollectionView>> resultsView; + private TupleTag>> multiPartitionsTag; + private TupleTag>> singlePartitionTag; - public WritePartition(PCollectionView>> resultsView) { + public WritePartition( + PCollectionView>> resultsView, + TupleTag>> multiPartitionsTag, + TupleTag>> singlePartitionTag) { this.resultsView = resultsView; + this.multiPartitionsTag = multiPartitionsTag; + this.singlePartitionTag = singlePartitionTag; } @Override public void processElement(ProcessContext c) throws Exception { List> results = Lists.newArrayList(c.sideInput(resultsView)); + if (results.isEmpty()) { + TableRowWriter writer = new TableRowWriter(c.element()); + writer.open(UUID.randomUUID().toString()); + results.add(writer.close()); + } + long partitionId = 0; int currNumFiles = 0; long currSizeBytes = 0; List currResults = Lists.newArrayList(); for (int i = 0; i < results.size(); ++i) { KV fileResult = results.get(i); - ++currNumFiles; - currSizeBytes += fileResult.getValue(); - currResults.add(fileResult.getKey()); - if (currNumFiles >= Bound.MAX_NUM_FILES - || currSizeBytes >= Bound.MAX_SIZE_BYTES - || i == results.size() - 1) { - c.output(KV.of(++partitionId, currResults)); + if (currNumFiles + 1 > Bound.MAX_NUM_FILES + || currSizeBytes + fileResult.getValue() > Bound.MAX_SIZE_BYTES) { + c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults)); currResults = Lists.newArrayList(); currNumFiles = 0; currSizeBytes = 0; } + ++currNumFiles; + currSizeBytes += fileResult.getValue(); + currResults.add(fileResult.getKey()); + } + if (partitionId == 0) { + c.sideOutput(singlePartitionTag, KV.of(++partitionId, currResults)); + } else { + c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults)); } } @@ -1975,43 +2015,56 @@ public void populateDisplayData(DisplayData.Builder builder) { } /** - * Writes partitions to separate temporary tables. + * Writes partitions to BigQuery tables. */ - static class WriteTempTables extends DoFn>>, String> { + static class WriteTables extends DoFn>>, String> { + private final boolean singlePartition; private final BigQueryServices bqServices; private final String jobIdToken; private final String tempFilePrefix; private final String jsonTableRef; private final String jsonSchema; + private final WriteDisposition writeDisposition; + private final CreateDisposition createDisposition; - public WriteTempTables( + public WriteTables( + boolean singlePartition, BigQueryServices bqServices, String jobIdToken, String tempFilePrefix, String jsonTableRef, - String jsonSchema) { + String jsonSchema, + WriteDisposition writeDisposition, + CreateDisposition createDisposition) { + this.singlePartition = singlePartition; this.bqServices = bqServices; this.jobIdToken = jobIdToken; this.tempFilePrefix = tempFilePrefix; this.jsonTableRef = jsonTableRef; this.jsonSchema = jsonSchema; + this.writeDisposition = writeDisposition; + this.createDisposition = createDisposition; } @Override public void processElement(ProcessContext c) throws Exception { List partition = Lists.newArrayList(c.element().getValue()).get(0); String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey()); - TableReference ref = fromJsonString(jsonTableRef, TableReference.class) - .setTableId(jobIdPrefix); + TableReference ref = fromJsonString(jsonTableRef, TableReference.class); + if (!singlePartition) { + ref.setTableId(jobIdPrefix); + } + load( bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdPrefix, ref, fromJsonString(jsonSchema, TableSchema.class), partition, - WriteDisposition.WRITE_EMPTY, - CreateDisposition.CREATE_IF_NEEDED); + writeDisposition, + createDisposition); c.output(toJsonString(ref)); + removeTemporaryFiles(c.getPipelineOptions(), partition); } @@ -2098,7 +2151,7 @@ public void populateDisplayData(DisplayData.Builder builder) { /** * Copies temporary tables to destination table. */ - static class WriteRename extends DoFn { + static class WriteRename extends DoFn { private final BigQueryServices bqServices; private final String jobIdToken; private final String jsonTableRef; @@ -2124,6 +2177,12 @@ public WriteRename( @Override public void processElement(ProcessContext c) throws Exception { List tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView)); + + // Do not copy if not temp tables are provided + if (tempTablesJson.size() == 0) { + return; + } + List tempTables = Lists.newArrayList(); for (String table : tempTablesJson) { tempTables.add(fromJsonString(table, TableReference.class)); @@ -2332,8 +2391,8 @@ public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class); Bigquery client = Transport.newBigQueryClient(options).build(); BigQueryTableInserter inserter = new BigQueryTableInserter(client, options); - inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND, - CreateDisposition.CREATE_IF_NEEDED, tableSchema); + inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND, + Write.CreateDisposition.CREATE_IF_NEEDED, tableSchema); createdTables.add(tableSpec); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index f3a0dce8f6f2..a53c45484ad7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Bound.MAX_RETRY_JOBS; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.fromJsonString; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; @@ -27,6 +26,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; @@ -49,7 +49,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WritePartition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteRename; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteTempTables; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteTables; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.BigQueryOptions; @@ -80,6 +80,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; import com.google.api.client.util.Data; import com.google.api.client.util.Strings; @@ -123,6 +124,8 @@ import java.io.FileFilter; import java.io.IOException; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Collections; import java.util.List; import java.util.Map; @@ -137,6 +140,8 @@ @RunWith(JUnit4.class) public class BigQueryIOTest implements Serializable { + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + // Status.UNKNOWN maps to null private static final Map JOB_STATUS_MAP = ImmutableMap.of( Status.SUCCEEDED, new Job().setStatus(new JobStatus()), @@ -605,7 +610,6 @@ public void testCustomSink() throws Exception { p.run(); logged.verifyInfo("Starting BigQuery load job"); - logged.verifyInfo("Starting BigQuery copy job"); File tempDir = new File(bqOptions.getTempLocation()); assertEquals(0, tempDir.listFiles(new FileFilter() { @Override @@ -1250,12 +1254,32 @@ void cleanup(PipelineOptions options) throws Exception { p.run(); } + @Test + public void testWritePartitionEmptyData() throws Exception { + final long numFiles = 0; + final long fileSize = 0; + + // An empty file is created for no input data. One partition is needed. + final long expectedNumPartitions = 1; + testWritePartition(numFiles, fileSize, expectedNumPartitions); + } + + @Test + public void testWritePartitionSinglePartition() throws Exception { + final long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES; + final long fileSize = 1; + + // One partition is needed. + final long expectedNumPartitions = 1; + testWritePartition(numFiles, fileSize, expectedNumPartitions); + } + @Test public void testWritePartitionManyFiles() throws Exception { final long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES * 3; final long fileSize = 1; - // One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files + // One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files. final long expectedNumPartitions = 3; testWritePartition(numFiles, fileSize, expectedNumPartitions); } @@ -1265,7 +1289,7 @@ public void testWritePartitionLargeFileSize() throws Exception { final long numFiles = 10; final long fileSize = BigQueryIO.Write.Bound.MAX_SIZE_BYTES / 3; - // One partition is needed for each group of three files + // One partition is needed for each group of three files. final long expectedNumPartitions = 4; testWritePartition(numFiles, fileSize, expectedNumPartitions); } @@ -1285,18 +1309,29 @@ private void testWritePartition(long numFiles, long fileSize, long expectedNumPa files.add(KV.of(fileName, fileSize)); } + TupleTag>> multiPartitionsTag = + new TupleTag>>("multiPartitionsTag") {}; + TupleTag>> singlePartitionTag = + new TupleTag>>("singlePartitionTag") {}; + final PCollectionView>> filesView = PCollectionViews.iterableView( TestPipeline.create(), WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())); - WritePartition writePartition = new WritePartition(filesView); + WritePartition writePartition = + new WritePartition(filesView, multiPartitionsTag, singlePartitionTag); - DoFnTester>> tester = DoFnTester.of(writePartition); + DoFnTester>> tester = DoFnTester.of(writePartition); tester.setSideInput(filesView, GlobalWindow.INSTANCE, files); - tester.processElement(null); + tester.processElement(tmpFolder.getRoot().getAbsolutePath()); - List>> partitions = tester.takeOutputElements(); + List>> partitions; + if (expectedNumPartitions > 1) { + partitions = tester.takeSideOutputElements(multiPartitionsTag); + } else { + partitions = tester.takeSideOutputElements(singlePartitionTag); + } List partitionIds = Lists.newArrayList(); List partitionFileNames = Lists.newArrayList(); for (KV> partition : partitions) { @@ -1307,11 +1342,18 @@ private void testWritePartition(long numFiles, long fileSize, long expectedNumPa } assertEquals(expectedPartitionIds, partitionIds); - assertEquals(fileNames, partitionFileNames); + if (numFiles == 0) { + assertThat(partitionFileNames, Matchers.hasSize(1)); + assertTrue(Files.exists(Paths.get(partitionFileNames.get(0)))); + assertThat(Files.readAllBytes(Paths.get(partitionFileNames.get(0))).length, + Matchers.equalTo(0)); + } else { + assertEquals(fileNames, partitionFileNames); + } } @Test - public void testWriteTempTables() throws Exception { + public void testWriteTables() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService() .startJobReturns("done", "done", "done", "done") @@ -1336,14 +1378,17 @@ public void testWriteTempTables() throws Exception { expectedTempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i)); } - WriteTempTables writeTempTables = new WriteTempTables( + WriteTables writeTables = new WriteTables( + false, fakeBqServices, jobIdToken, tempFilePrefix, jsonTable, - jsonSchema); + jsonSchema, + WriteDisposition.WRITE_EMPTY, + CreateDisposition.CREATE_IF_NEEDED); - DoFnTester>>, String> tester = DoFnTester.of(writeTempTables); + DoFnTester>>, String> tester = DoFnTester.of(writeTables); for (KV>> partition : partitions) { tester.processElement(partition); } @@ -1384,7 +1429,7 @@ public void testWriteRename() throws Exception { CreateDisposition.CREATE_IF_NEEDED, tempTablesView); - DoFnTester tester = DoFnTester.of(writeRename); + DoFnTester tester = DoFnTester.of(writeRename); tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables); tester.processElement(null);