From 7202328ef0e10895b7c80bb5d1ec39244b09c6b8 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 19 Sep 2017 10:10:18 -0700 Subject: [PATCH 1/6] Move file deletion into subsequent ParDo. --- .../apache/beam/sdk/io/LocalFileSystem.java | 9 +- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 16 +- .../sdk/io/gcp/bigquery/TableDestination.java | 5 +- .../io/gcp/bigquery/WriteBundlesToFiles.java | 25 ++- .../bigquery/WriteGroupedRecordsToFiles.java | 7 +- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 190 ++++++++++++------ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 131 ++++++++---- .../sdk/io/gcp/bigquery/FakeJobService.java | 7 +- 8 files changed, 275 insertions(+), 115 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java index 5fe894d86902..3891b91f15c7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java @@ -34,6 +34,7 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.PathMatcher; import java.nio.file.Paths; @@ -181,8 +182,12 @@ protected void rename( @Override protected void delete(Collection resourceIds) throws IOException { for (LocalResourceId resourceId : resourceIds) { - LOG.debug("Deleting file {}", resourceId); - Files.delete(resourceId.getPath()); + try { + Files.delete(resourceId.getPath()); + } catch (NoSuchFileException e) { + LOG.info("Ignoring failed deletion of file {} which already does not exist: {}", resourceId, + e); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 
76cf7e84d591..4099e4db8ad6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -216,7 +217,6 @@ private WriteResult expandTriggered(PCollection> inpu .discardingFiredPanes()); PCollection> results = writeShardedFiles(inputInGlobalWindow, tempFilePrefixView); - // Apply the user's trigger before we start generating BigQuery load jobs. results = results.apply( @@ -480,15 +480,14 @@ private PCollection> writeTempTables( .apply("MultiPartitionsReshuffle", Reshuffle., List>of()) .apply( "MultiPartitionsWriteTables", - ParDo.of( - new WriteTables<>( + new WriteTables<>( false, bigQueryServices, jobIdTokenView, WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, - dynamicDestinations)) - .withSideInputs(sideInputs)); + sideInputs, + dynamicDestinations)); } // In the case where the files fit into a single load job, there's no need to write temporary @@ -510,15 +509,14 @@ void writeSinglePartition( .apply("SinglePartitionsReshuffle", Reshuffle., List>of()) .apply( "SinglePartitionWriteTables", - ParDo.of( - new WriteTables<>( + new WriteTables<>( true, bigQueryServices, jobIdTokenView, writeDisposition, createDisposition, - dynamicDestinations)) - .withSideInputs(sideInputs)); + sideInputs, + dynamicDestinations)); } private WriteResult writeResult(Pipeline p) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java index ecc34d30c72c..ce2e7c76196e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -104,11 +104,12 @@ public boolean equals(Object o) { } TableDestination other = (TableDestination) o; return Objects.equals(this.tableSpec, other.tableSpec) - && Objects.equals(this.tableDescription, other.tableDescription); + && Objects.equals(this.tableDescription, other.tableDescription) + && Objects.equals(this.jsonTimePartitioning, other.jsonTimePartitioning); } @Override public int hashCode() { - return Objects.hash(tableSpec, tableDescription); + return Objects.hash(tableSpec, tableDescription, jsonTimePartitioning); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index e337f94aab9b..5fbca205586f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -75,7 +76,7 @@ class WriteBundlesToFiles * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file, * and encapsulates the table it is destined to as well as the file byte size. 
*/ - public static final class Result implements Serializable { + static final class Result implements Serializable { private static final long serialVersionUID = 1L; public final String filename; public final Long fileByteSize; @@ -87,6 +88,28 @@ public Result(String filename, Long fileByteSize, DestinationT destination) { this.fileByteSize = fileByteSize; this.destination = destination; } + + @Override + public String toString() { + return String.format("filename: %s fileByteSize: %d destination: %s", + filename, fileByteSize, destination); + } + + @Override + public boolean equals(Object other) { + if (other instanceof Result) { + Result o = (Result) other; + return Objects.equals(this.filename, o.filename) + && Objects.equals(this.fileByteSize, o.fileByteSize) + && Objects.equals(this.destination, o.destination); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(filename, fileByteSize, destination); + } } /** a coder for the {@link Result} class. */ diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java index 887cb9377442..e82b29d3d09b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java @@ -48,18 +48,21 @@ class WriteGroupedRecordsToFiles public void processElement(ProcessContext c) throws Exception { String tempFilePrefix = c.sideInput(this.tempFilePrefix); TableRowWriter writer = new TableRowWriter(tempFilePrefix); - try (TableRowWriter ignored = writer) { + try { for (TableRow tableRow : c.element().getValue()) { if (writer.getByteSize() > maxFileSize) { writer.close();
TableRowWriter.Result result = writer.getResult(); c.output(new WriteBundlesToFiles.Result<>( result.resourceId.toString(), result.byteSize, c.element().getKey().getKey())); writer = new TableRowWriter(tempFilePrefix); } writer.write(tableRow); } + } finally { + writer.close(); } + TableRowWriter.Result result = writer.getResult(); c.output( new WriteBundlesToFiles.Result<>( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index a646f17513c2..aa186add1ee3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -33,7 +33,11 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; @@ -42,9 +46,23 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import
org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.ShardedKey; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +79,8 @@ * {@link KV} maps the final table to itself. */ class WriteTables - extends DoFn, List>, KV> { + extends PTransform, List>>, + PCollection>> { private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class); private final boolean singlePartition; @@ -70,7 +89,84 @@ class WriteTables private final WriteDisposition firstPaneWriteDisposition; private final CreateDisposition firstPaneCreateDisposition; private final DynamicDestinations dynamicDestinations; - private Map jsonSchemas = Maps.newHashMap(); + private final List> sideInputs; + private final TupleTag> mainOutputTag; + private final TupleTag temporaryFilesTag; + + + private class WriteTablesDoFn + extends DoFn, List>, KV> { + private Map jsonSchemas = Maps.newHashMap(); + + @StartBundle + public void startBundle(StartBundleContext c) { + // Clear the map on each bundle so we can notice side-input updates. + // (alternative is to use a cache with a TTL). 
+ jsonSchemas.clear(); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + dynamicDestinations.setSideInputAccessorFromProcessContext(c); + DestinationT destination = c.element().getKey().getKey(); + TableSchema tableSchema; + String jsonSchema = jsonSchemas.get(destination); + if (jsonSchema != null) { + tableSchema = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class); + } else { + tableSchema = dynamicDestinations.getSchema(destination); + if (tableSchema != null) { + jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema)); + } + } + + TableDestination tableDestination = dynamicDestinations.getTable(destination); + TableReference tableReference = tableDestination.getTableReference(); + if (Strings.isNullOrEmpty(tableReference.getProjectId())) { + tableReference.setProjectId( + c.getPipelineOptions().as(BigQueryOptions.class).getProject()); + tableDestination = new TableDestination( + tableReference, tableDestination.getTableDescription()); + } + + Integer partition = c.element().getKey().getShardNumber(); + List partitionFiles = Lists.newArrayList(c.element().getValue()); + String jobIdPrefix = BigQueryHelpers.createJobId( + c.sideInput(jobIdToken), tableDestination, partition, c.pane().getIndex()); + + if (!singlePartition) { + tableReference.setTableId(jobIdPrefix); + } + + WriteDisposition writeDisposition = + (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND; + CreateDisposition createDisposition = + (c.pane().getIndex() == 0) ? 
firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER; + load( + bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), + bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), + jobIdPrefix, + tableReference, + tableDestination.getTimePartitioning(), + tableSchema, + partitionFiles, + writeDisposition, + createDisposition, + tableDestination.getTableDescription()); + c.output( + mainOutputTag, KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference))); + for (String file : partitionFiles) { + c.output(temporaryFilesTag, file); + } + } + } + + private class GarbageCollectTemporaryFiles extends DoFn, Void> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + removeTemporaryFiles(c.element()); + } + } public WriteTables( boolean singlePartition, @@ -78,74 +174,48 @@ public WriteTables( PCollectionView jobIdToken, WriteDisposition writeDisposition, CreateDisposition createDisposition, + List> sideInputs, DynamicDestinations dynamicDestinations) { this.singlePartition = singlePartition; this.bqServices = bqServices; this.jobIdToken = jobIdToken; this.firstPaneWriteDisposition = writeDisposition; this.firstPaneCreateDisposition = createDisposition; + this.sideInputs = sideInputs; this.dynamicDestinations = dynamicDestinations; + this.mainOutputTag = new TupleTag<>("WriteTablesMainOutput"); + this.temporaryFilesTag = new TupleTag<>("TemporaryFiles"); } - @StartBundle - public void startBundle(StartBundleContext c) { - // Clear the map on each bundle so we can notice side-input updates. - // (alternative is to use a cache with a TTL). 
- jsonSchemas.clear(); - } + @Override + public PCollection> expand( + PCollection, List>> input) { + PCollectionTuple writeTablesOutputs = input.apply(ParDo.of(new WriteTablesDoFn()) + .withSideInputs(sideInputs) + .withOutputTags(mainOutputTag, TupleTagList.of(temporaryFilesTag))); - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - dynamicDestinations.setSideInputAccessorFromProcessContext(c); - DestinationT destination = c.element().getKey().getKey(); - TableSchema tableSchema; - String jsonSchema = jsonSchemas.get(destination); - if (jsonSchema != null) { - tableSchema = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class); - } else { - tableSchema = dynamicDestinations.getSchema(destination); - if (tableSchema != null) { - jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema)); - } - } - - TableDestination tableDestination = dynamicDestinations.getTable(destination); - TableReference tableReference = tableDestination.getTableReference(); - if (Strings.isNullOrEmpty(tableReference.getProjectId())) { - tableReference.setProjectId( - c.getPipelineOptions().as(BigQueryOptions.class).getProject()); - tableDestination = new TableDestination( - tableReference, tableDestination.getTableDescription()); - } + // Garbage collect temporary files. + // We mustn't start garbage collecting files until we are assured that the WriteTablesDoFn has + // succeeded in loading those files and won't be retried. Otherwise, we might fail part of the + // way through deleting temporary files, and retry WriteTablesDoFn. This will then fail due + // to missing files, causing either the entire workflow to fail or get stuck (depending on how + // the runner handles persistent failures). 
+ writeTablesOutputs + .get(temporaryFilesTag) + .setCoder(StringUtf8Coder.of()) + .apply(WithKeys.of((Void) null)) + .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of())) + .apply(Window.>into(new GlobalWindows()) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .discardingFiredPanes()) + .apply(GroupByKey.create()) + .apply(Values.>create()) + .apply(ParDo.of(new GarbageCollectTemporaryFiles())); - Integer partition = c.element().getKey().getShardNumber(); - List partitionFiles = Lists.newArrayList(c.element().getValue()); - String jobIdPrefix = BigQueryHelpers.createJobId( - c.sideInput(jobIdToken), tableDestination, partition, c.pane().getIndex()); + return writeTablesOutputs.get(mainOutputTag); + } - if (!singlePartition) { - tableReference.setTableId(jobIdPrefix); - } - WriteDisposition writeDisposition = - (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND; - CreateDisposition createDisposition = - (c.pane().getIndex() == 0) ? 
firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER; - load( - bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), - bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), - jobIdPrefix, - tableReference, - tableDestination.getTimePartitioning(), - tableSchema, - partitionFiles, - writeDisposition, - createDisposition, - tableDestination.getTableDescription()); - c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference))); - - removeTemporaryFiles(partitionFiles); - } private void load( JobService jobService, @@ -208,11 +278,11 @@ private void load( BigQueryHelpers.jobToPrettyString(lastFailedLoadJob))); } - static void removeTemporaryFiles(Collection files) throws IOException { + static void removeTemporaryFiles(Iterable files) throws IOException { ImmutableList.Builder fileResources = ImmutableList.builder(); - for (String file: files) { + for (String file : files) { fileResources.add(FileSystems.matchNewResource(file, false/* isDirectory */)); } - FileSystems.delete(fileResources.build(), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + FileSystems.delete(fileResources.build()); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index ad4cbaa82300..41b4a4471261 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -488,6 +488,38 @@ public void processElement(ProcessContext c) throws Exception { abstract static class StringIntegerDestinations extends DynamicDestinations { } + @Test + public void testWriteEmptyPCollection() throws Exception { + BigQueryOptions bqOptions = 
TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("project-id"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + + FakeDatasetService datasetService = new FakeDatasetService(); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService()) + .withDatasetService(datasetService); + + datasetService.createDataset("project-id", "dataset-id", "", ""); + + Pipeline p = TestPipeline.create(bqOptions); + + TableSchema schema = new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("number").setType("INTEGER"))); + + p.apply(Create.empty(TableRowJsonCoder.of())) + .apply(BigQueryIO.writeTableRows() + .to("project-id:dataset-id.table-id") + .withTestServices(fakeBqServices) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withSchema(schema) + .withoutValidation()); + p.run(); + testNumFiles(new File(bqOptions.getTempLocation()), 0); + } + @Test public void testWriteDynamicDestinationsBatch() throws Exception { writeDynamicDestinations(false); @@ -635,6 +667,7 @@ private void verifySideInputs() { assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-" + entry.getKey()), containsInAnyOrder(Iterables.toArray(entry.getValue(), TableRow.class))); } + testNumFiles(new File(bqOptions.getTempLocation()), 0); } @Test @@ -684,6 +717,7 @@ public void testTimePartitioning(BigQueryIO.Write.Method insertMethod) throws Ex BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id")); assertEquals(schema, table.getSchema()); assertEquals(timePartitioning, table.getTimePartitioning()); + testNumFiles(new File(bqOptions.getTempLocation()), 0); } @Test @@ -737,6 +771,7 @@ public void testTriggeredFileLoads() throws Exception { assertThat( datasetService.getAllRows("project-id", "dataset-id", "table-id"), 
containsInAnyOrder(Iterables.toArray(elements, TableRow.class))); + testNumFiles(new File(bqOptions.getTempLocation()), 0); } @Test @@ -836,6 +871,7 @@ public void testRetryPolicy() throws Exception { // Only row1 and row3 were successfully inserted. assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id"), containsInAnyOrder(row1, row3)); + testNumFiles(new File(bqOptions.getTempLocation()), 0); } @Test @@ -908,6 +944,7 @@ public void testStreamingWrite() throws Exception { new TableRow().set("name", "b").set("number", 2), new TableRow().set("name", "c").set("number", 3), new TableRow().set("name", "d").set("number", 4))); + testNumFiles(new File(bqOptions.getTempLocation()), 0); } /** @@ -1128,6 +1165,7 @@ public TableRow apply(Integer i) { new TableRow().set("name", String.format("number%d", i)).set("number", i), new TableRow().set("name", String.format("number%d", i + 5)).set("number", i + 5))); } + testNumFiles(new File(bqOptions.getTempLocation()), 0); } @Test @@ -1142,12 +1180,20 @@ public void testWriteUnknown() throws Exception { .withDatasetService(datasetService); datasetService.createDataset("project-id", "dataset-id", "", ""); Pipeline p = TestPipeline.create(bqOptions); + + TableSchema schema = new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER"))); + p.apply(Create.of( new TableRow().set("name", "a").set("number", 1), new TableRow().set("name", "b").set("number", 2), new TableRow().set("name", "c").set("number", 3)) .withCoder(TableRowJsonCoder.of())) .apply(BigQueryIO.writeTableRows().to("project-id:dataset-id.table-id") + .withSchema(schema) .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withTestServices(fakeBqServices) .withoutValidation()); @@ -1160,6 +1206,7 @@ public void testWriteUnknown() throws Exception { File tempDir = new File(bqOptions.getTempLocation()); testNumFiles(tempDir, 0); } + 
testNumFiles(new File(bqOptions.getTempLocation()), 0); } @Test @@ -1173,6 +1220,12 @@ public void testWriteFailedJobs() throws Exception { .withJobService(new FakeJobService()) .withDatasetService(datasetService); + TableSchema schema = new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER"))); + Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( new TableRow().set("name", "a").set("number", 1), @@ -1180,6 +1233,7 @@ public void testWriteFailedJobs() throws Exception { new TableRow().set("name", "c").set("number", 3)) .withCoder(TableRowJsonCoder.of())) .apply(BigQueryIO.writeTableRows().to("dataset-id.table-id") + .withSchema(schema) .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withTestServices(fakeBqServices) .withoutValidation()); @@ -2023,25 +2077,31 @@ public TableDestination getTable(String destination) { @Override public TableSchema getSchema(String destination) { - return null; + return new TableSchema().setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"))); } } @Test public void testWriteTables() throws Exception { - p.enableAbandonedNodeEnforcement(false); + BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("project-id"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); FakeDatasetService datasetService = new FakeDatasetService(); FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService()) .withDatasetService(datasetService); datasetService.createDataset("project-id", "dataset-id", "", ""); + + Pipeline p = TestPipeline.create(bqOptions); + long numTables = 3; long numPartitions = 3; long numFilesPerPartition = 10; - String jobIdToken = "jobIdToken"; - String stepUuid = "stepUuid"; - Map> expectedTempTables = Maps.newHashMap(); + 
String jobIdToken = "jobId"; + final Multimap expectedTempTables = ArrayListMultimap.create(); Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables"); @@ -2055,35 +2115,29 @@ public void testWriteTables() throws Exception { for (int k = 0; k < numFilesPerPartition; ++k) { String filename = Paths.get(baseDir.toString(), String.format("files0x%08x_%05d", tempTableId.hashCode(), k)).toString(); - ResourceId fileResource = - FileSystems.matchNewResource(filename, false /* isDirectory */); - try (WritableByteChannel channel = FileSystems.create(fileResource, MimeTypes.TEXT)) { - try (OutputStream output = Channels.newOutputStream(channel)) { - TableRow tableRow = new TableRow().set("name", tableName); - TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER); - output.write("\n".getBytes(StandardCharsets.UTF_8)); - } + TableRowWriter writer = new TableRowWriter(filename); + try (TableRowWriter ignored = writer) { + TableRow tableRow = new TableRow().set("name", tableName); + writer.write(tableRow); } - filesPerPartition.add(filename); + filesPerPartition.add(writer.getResult().resourceId.toString()); } partitions.add(KV.of(ShardedKey.of(tableDestination.getTableSpec(), j), filesPerPartition)); - List expectedTables = expectedTempTables.get(tableDestination); - if (expectedTables == null) { - expectedTables = Lists.newArrayList(); - expectedTempTables.put(tableDestination, expectedTables); - } String json = String.format( "{\"datasetId\":\"dataset-id\",\"projectId\":\"project-id\",\"tableId\":\"%s\"}", tempTableId); - expectedTables.add(json); + expectedTempTables.put(tableDestination, json); } } + PCollection, List>> writeTablesInput = + p.apply(Create.of(partitions)); PCollectionView jobIdTokenView = p .apply("CreateJobId", Create.of("jobId")) .apply(View.asSingleton()); + List> sideInputs = ImmutableList.>of(jobIdTokenView); WriteTables writeTables = new WriteTables<>( @@ -2092,26 +2146,29 @@ public void testWriteTables() throws Exception { 
jobIdTokenView, WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, + sideInputs, new IdentityDynamicTables()); - DoFnTester, List>, - KV> tester = DoFnTester.of(writeTables); - tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken); - tester.getPipelineOptions().setTempLocation("tempLocation"); - for (KV, List> partition : partitions) { - tester.processElement(partition); - } + PCollection> writeTablesOutput = + writeTablesInput.apply(writeTables); - Map> tempTablesResult = Maps.newHashMap(); - for (KV element : tester.takeOutputElements()) { - List tables = tempTablesResult.get(element.getKey()); - if (tables == null) { - tables = Lists.newArrayList(); - tempTablesResult.put(element.getKey(), tables); - } - tables.add(element.getValue()); - } - assertEquals(expectedTempTables, tempTablesResult); + PAssert.thatMultimap(writeTablesOutput) + .satisfies( + new SerializableFunction>, Void>() { + @Override + public Void apply(Map> input) { + assertEquals(input.keySet(), expectedTempTables.keySet()); + for (Map.Entry> entry : input.entrySet()) { + @SuppressWarnings("unchecked") + String[] expectedValues = Iterables.toArray( + expectedTempTables.get(entry.getKey()), String.class); + assertThat(entry.getValue(), containsInAnyOrder(expectedValues)); + } + return null; + } + }); + p.run(); + testNumFiles(baseDir.toFile(), 0); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index cc600d1a5134..02a2a71d1531 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -112,6 +112,9 @@ public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) throws InterruptedException, 
IOException { synchronized (allJobs) { verifyUniqueJobId(jobRef.getJobId()); + if (loadConfig.getSchema() == null) { + throw new IOException("No schema specified on job or table: " + jobRef.getJobId()); + } Job job = new Job(); job.setJobReference(jobRef); job.setConfiguration(new JobConfiguration().setLoad(loadConfig)); @@ -129,8 +132,7 @@ public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) filename + ThreadLocalRandom.current().nextInt(), false /* isDirectory */)); } - FileSystems.copy(sourceFiles.build(), loadFiles.build(), - MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + FileSystems.copy(sourceFiles.build(), loadFiles.build()); filesForLoadJobs.put(jobRef.getProjectId(), jobRef.getJobId(), loadFiles.build()); } @@ -325,6 +327,7 @@ private JobStatus runLoadJob(JobReference jobRef, JobConfigurationLoad load) rows.addAll(readRows(filename.toString())); } datasetService.insertAll(destination, rows, null); + FileSystems.delete(sourceFiles); return new JobStatus().setState("DONE"); } From b097094f2668f7801c75a6853418952701f21af5 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Fri, 22 Sep 2017 17:27:31 -0700 Subject: [PATCH 2/6] Remove unused imports. 
--- .../org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 1 - .../org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java | 4 ---- .../apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 7 ------- .../apache/beam/sdk/io/gcp/bigquery/FakeJobService.java | 1 - 4 files changed, 13 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 4099e4db8ad6..6d832e47347f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -41,7 +41,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index aa186add1ee3..f8ed7965aa9a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -29,7 +29,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.io.IOException; -import java.util.Collection; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -37,8 +36,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import 
org.apache.beam.sdk.io.FileSystems; -import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; @@ -52,7 +49,6 @@ import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 41b4a4471261..b14bd789631b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -63,9 +63,6 @@ import java.io.OutputStream; import java.io.Serializable; import java.math.BigDecimal; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -81,16 +78,13 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import 
org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.GenerateSequence; -import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method; @@ -130,7 +124,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index 02a2a71d1531..7278c40b7bd5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -63,7 +63,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.io.FileSystems; -import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; From ebbb9a65831ab9506f259072932bb60795ff1638 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Fri, 22 Sep 2017 17:43:52 -0700 Subject: [PATCH 3/6] Fix test. 
--- .../org/apache/beam/sdk/io/FileSystemsTest.java | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java index a75c54dd7fd1..3e393bf3d4be 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java @@ -82,18 +82,6 @@ public void testVerifySchemesAreUnique() throws Exception { new LocalFileSystemRegistrar())); } - @Test - public void testDeleteThrowsNoSuchFileException() throws Exception { - Path existingPath = temporaryFolder.newFile().toPath(); - Path nonExistentPath = existingPath.resolveSibling("non-existent"); - - createFileWithContent(existingPath, "content1"); - - thrown.expect(NoSuchFileException.class); - FileSystems.delete( - toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */)); - } - @Test public void testDeleteIgnoreMissingFiles() throws Exception { Path existingPath = temporaryFolder.newFile().toPath(); @@ -102,8 +90,7 @@ public void testDeleteIgnoreMissingFiles() throws Exception { createFileWithContent(existingPath, "content1"); FileSystems.delete( - toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */), - MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + toResourceIds(ImmutableList.of(existingPath, nonExistentPath), false /* isDirectory */)); } @Test From 1c38b5ecde0957c67e156211116d9094b4eacfc4 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Sat, 23 Sep 2017 10:08:32 -0700 Subject: [PATCH 4/6] Address comments. 
--- .../io/gcp/bigquery/WriteBundlesToFiles.java | 15 +++++++------ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 21 ++++--------------- .../sdk/io/gcp/bigquery/FakeJobService.java | 3 --- 3 files changed, 13 insertions(+), 26 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 5fbca205586f..977ae9e7cc16 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -89,12 +89,6 @@ public Result(String filename, Long fileByteSize, DestinationT destination) { this.destination = destination; } - @Override - public String toString() { - return String.format("filename: %s fileByeSize: %d destination: %s", - filename, fileByteSize, destination); - } - @Override public boolean equals(Object other) { if (other instanceof Result) { @@ -110,6 +104,15 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(filename, fileByteSize, destination); } + + @Override + public String toString() { + return "Result{" + + "filename='" + filename + '\'' + + ", fileByteSize=" + fileByteSize + + ", destination=" + destination + + '}'; + } } /** a coder for the {@link Result} class. 
*/ diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index b14bd789631b..dbf9c46730c8 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -50,6 +50,7 @@ import com.google.api.services.bigquery.model.TimePartitioning; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; +import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -510,6 +511,8 @@ public void testWriteEmptyPCollection() throws Exception { .withSchema(schema) .withoutValidation()); p.run(); + checkNotNull(datasetService.getTable( + BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id"))); testNumFiles(new File(bqOptions.getTempLocation()), 0); } @@ -1174,19 +1177,12 @@ public void testWriteUnknown() throws Exception { datasetService.createDataset("project-id", "dataset-id", "", ""); Pipeline p = TestPipeline.create(bqOptions); - TableSchema schema = new TableSchema() - .setFields( - ImmutableList.of( - new TableFieldSchema().setName("name").setType("STRING"), - new TableFieldSchema().setName("number").setType("INTEGER"))); - p.apply(Create.of( new TableRow().set("name", "a").set("number", 1), new TableRow().set("name", "b").set("number", 2), new TableRow().set("name", "c").set("number", 3)) .withCoder(TableRowJsonCoder.of())) .apply(BigQueryIO.writeTableRows().to("project-id:dataset-id.table-id") - .withSchema(schema) .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withTestServices(fakeBqServices) .withoutValidation()); @@ -1213,12 +1209,6 @@ public void 
testWriteFailedJobs() throws Exception { .withJobService(new FakeJobService()) .withDatasetService(datasetService); - TableSchema schema = new TableSchema() - .setFields( - ImmutableList.of( - new TableFieldSchema().setName("name").setType("STRING"), - new TableFieldSchema().setName("number").setType("INTEGER"))); - Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( new TableRow().set("name", "a").set("number", 1), @@ -1226,7 +1216,6 @@ public void testWriteFailedJobs() throws Exception { new TableRow().set("name", "c").set("number", 3)) .withCoder(TableRowJsonCoder.of())) .apply(BigQueryIO.writeTableRows().to("dataset-id.table-id") - .withSchema(schema) .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withTestServices(fakeBqServices) .withoutValidation()); @@ -2070,9 +2059,7 @@ public TableDestination getTable(String destination) { @Override public TableSchema getSchema(String destination) { - return new TableSchema().setFields( - ImmutableList.of( - new TableFieldSchema().setName("name").setType("STRING"))); + return null; } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index 7278c40b7bd5..f13a7ab7180b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -111,9 +111,6 @@ public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) throws InterruptedException, IOException { synchronized (allJobs) { verifyUniqueJobId(jobRef.getJobId()); - if (loadConfig.getSchema() == null) { - throw new IOException("No schema specified on job or table: " + jobRef.getJobId()); - } Job job = new Job(); job.setJobReference(jobRef); job.setConfiguration(new 
JobConfiguration().setLoad(loadConfig)); From ea7fcc4c7239456bb792eb7f91c56453c340344e Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Sun, 24 Sep 2017 23:01:18 -0400 Subject: [PATCH 5/6] Fix style issues. --- .../beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java | 10 +++++----- .../beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 1 - 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 977ae9e7cc16..017d5c15ceab 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -107,11 +107,11 @@ public int hashCode() { @Override public String toString() { - return "Result{" + - "filename='" + filename + '\'' + - ", fileByteSize=" + fileByteSize + - ", destination=" + destination + - '}'; + return "Result{" + + "filename='" + filename + '\'' + + ", fileByteSize=" + fileByteSize + + ", destination=" + destination + + '}'; } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index dbf9c46730c8..5500b12d5736 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -50,7 +50,6 @@ import com.google.api.services.bigquery.model.TimePartitioning; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; -import com.google.common.collect.HashMultimap; import 
com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; From 274f188f4e768f7388c9285059c1c7dafb90f91c Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 26 Sep 2017 10:38:40 -0400 Subject: [PATCH 6/6] Address comments. --- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 54 ++++++++++--------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index f8ed7965aa9a..ed72971b3e91 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -89,6 +89,31 @@ class WriteTables private final TupleTag> mainOutputTag; private final TupleTag temporaryFilesTag; + public WriteTables( + boolean singlePartition, + BigQueryServices bqServices, + PCollectionView jobIdToken, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + List> sideInputs, + DynamicDestinations dynamicDestinations) { + this.singlePartition = singlePartition; + this.bqServices = bqServices; + this.jobIdToken = jobIdToken; + this.firstPaneWriteDisposition = writeDisposition; + this.firstPaneCreateDisposition = createDisposition; + this.sideInputs = sideInputs; + this.dynamicDestinations = dynamicDestinations; + this.mainOutputTag = new TupleTag<>("WriteTablesMainOutput"); + this.temporaryFilesTag = new TupleTag<>("TemporaryFiles"); + } + + private class GarbageCollectTemporaryFiles extends DoFn, Void> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + removeTemporaryFiles(c.element()); + } + } private class WriteTablesDoFn extends DoFn, List>, KV> { @@ -157,32 +182,6 @@ public void 
processElement(ProcessContext c) throws Exception { } } - private class GarbageCollectTemporaryFiles extends DoFn, Void> { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - removeTemporaryFiles(c.element()); - } - } - - public WriteTables( - boolean singlePartition, - BigQueryServices bqServices, - PCollectionView jobIdToken, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - List> sideInputs, - DynamicDestinations dynamicDestinations) { - this.singlePartition = singlePartition; - this.bqServices = bqServices; - this.jobIdToken = jobIdToken; - this.firstPaneWriteDisposition = writeDisposition; - this.firstPaneCreateDisposition = createDisposition; - this.sideInputs = sideInputs; - this.dynamicDestinations = dynamicDestinations; - this.mainOutputTag = new TupleTag<>("WriteTablesMainOutput"); - this.temporaryFilesTag = new TupleTag<>("TemporaryFiles"); - } - @Override public PCollection> expand( PCollection, List>> input) { @@ -199,8 +198,11 @@ public PCollection> expand( writeTablesOutputs .get(temporaryFilesTag) .setCoder(StringUtf8Coder.of()) + // Add as key for the GroupByKey. .apply(WithKeys.of((Void) null)) .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of())) + // Delete files fairly quickly. In practice elementCountAtLeast(1) will trigger full bundles + // so we will get batching in deletes. .apply(Window.>into(new GlobalWindows()) .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) .discardingFiredPanes())