From e7dc19e118f00e3706c15e19ece7e0f34de1d72e Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Fri, 2 Jun 2017 18:29:59 -0700 Subject: [PATCH 1/2] Fix crash. --- .../apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 6 +++++- .../beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java | 2 ++ .../beam/sdk/io/gcp/bigquery/WritePartition.java | 8 ++++---- .../beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 11 +++++++---- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 3686f992d882..12a2781a519f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -251,12 +251,16 @@ public String apply(String input) { // loading. PCollection singleton = p.apply("singleton", Create.of((Void) null).withCoder(VoidCoder.of())); + DestinationT singletonTableDestination = null; + if (singletonTable) { + singletonTableDestination = dynamicDestinations.getDestination(null); + } PCollectionTuple partitions = singleton.apply( "WritePartition", ParDo.of( new WritePartition<>( - singletonTable, + singletonTableDestination, tempFilePrefix, resultsView, multiPartitionsTag, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index f0140392f4cb..0b5f54bcedb7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; +import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import com.google.api.services.bigquery.model.TableRow; @@ -79,6 +80,7 @@ public static final class Result implements Serializable { public final DestinationT destination; public Result(String filename, Long fileByteSize, DestinationT destination) { + checkNotNull(destination); this.filename = filename; this.fileByteSize = fileByteSize; this.destination = destination; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index 24693da03236..ff0c56fd8262 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -34,7 +34,7 @@ */ class WritePartition extends DoFn, List>> { - private final boolean singletonTable; + private final DestinationT singletonTable; private final PCollectionView tempFilePrefix; private final PCollectionView>> results; private TupleTag, List>> multiPartitionsTag; @@ -99,7 +99,7 @@ void addPartition(PartitionData partition) { } WritePartition( - boolean singletonTable, + DestinationT singletonTable, PCollectionView tempFilePrefix, PCollectionView>> results, TupleTag, List>> multiPartitionsTag, @@ -118,7 +118,7 @@ public void processElement(ProcessContext c) throws Exception { // If there are no elements to write _and_ the user specified a constant output table, then // generate an empty table of that name. - if (results.isEmpty() && singletonTable) { + if (results.isEmpty() && singletonTable != null) { String tempFilePrefix = c.sideInput(this.tempFilePrefix); TableRowWriter writer = new TableRowWriter(tempFilePrefix); writer.close(); @@ -127,7 +127,7 @@ public void processElement(ProcessContext c) throws Exception { // resolve it to the singleton output table. results.add( new Result( - writerResult.resourceId.toString(), writerResult.byteSize, null)); + writerResult.resourceId.toString(), writerResult.byteSize, singletonTable)); } Map currentResults = Maps.newHashMap(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 04bbac431ae9..25483a529769 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1797,10 +1797,13 @@ private void testWritePartition(long numTables, long numFilesPerTable, long file // function) and there is no input data, WritePartition will generate an empty table. This // code is to test that path. boolean isSingleton = numTables == 1 && numFilesPerTable == 0; - + String singletonDestination = null + if (isSingleton) { + singletonDestination = "SINGLETON"; + } List> expectedPartitions = Lists.newArrayList(); if (isSingleton) { - expectedPartitions.add(ShardedKey.of(null, 1)); + expectedPartitions.add(ShardedKey.of(singletonDestination, 1)); } else { for (int i = 0; i < numTables; ++i) { for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) { @@ -1842,8 +1845,8 @@ private void testWritePartition(long numTables, long numFilesPerTable, long file p.apply(Create.of(tempFilePrefix)).apply(View.asSingleton()); WritePartition writePartition = - new WritePartition<>( - isSingleton, tempFilePrefixView, resultsView, multiPartitionsTag, singlePartitionTag); + new WritePartition<>(singletonDestination, tempFilePrefixView, resultsView, + multiPartitionsTag, singlePartitionTag); DoFnTester, List>> tester = DoFnTester.of(writePartition); From 2c6e4b443110b0fcc1334e9d1fac7493b15a4982 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Sun, 4 Jun 2017 22:11:41 -0700 Subject: [PATCH 2/2] Fix test. --- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 7 +-- .../sdk/io/gcp/bigquery/WritePartition.java | 13 +++-- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 48 +++++++++---------- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 12a2781a519f..c1b202ef0c82 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -251,16 +251,13 @@ public String apply(String input) { // loading. PCollection singleton = p.apply("singleton", Create.of((Void) null).withCoder(VoidCoder.of())); - DestinationT singletonTableDestination = null; - if (singletonTable) { - singletonTableDestination = dynamicDestinations.getDestination(null); - } PCollectionTuple partitions = singleton.apply( "WritePartition", ParDo.of( new WritePartition<>( - singletonTableDestination, + singletonTable, + dynamicDestinations, tempFilePrefix, resultsView, multiPartitionsTag, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index ff0c56fd8262..acd113296c83 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -34,7 +34,8 @@ */ class WritePartition extends DoFn, List>> { - private final DestinationT singletonTable; + private final boolean singletonTable; + private final DynamicDestinations dynamicDestinations; private final PCollectionView tempFilePrefix; private final PCollectionView>> results; private TupleTag, List>> multiPartitionsTag; @@ -99,12 +100,14 @@ void addPartition(PartitionData partition) { } WritePartition( - DestinationT singletonTable, + boolean singletonTable, + DynamicDestinations dynamicDestinations, PCollectionView tempFilePrefix, PCollectionView>> results, TupleTag, List>> multiPartitionsTag, TupleTag, List>> singlePartitionTag) { this.singletonTable = singletonTable; + this.dynamicDestinations = dynamicDestinations; this.results = results; this.tempFilePrefix = tempFilePrefix; this.multiPartitionsTag = multiPartitionsTag; @@ -118,7 +121,7 @@ public void processElement(ProcessContext c) throws Exception { // If there are no elements to write _and_ the user specified a constant output table, then // generate an empty table of that name. - if (results.isEmpty() && singletonTable != null) { + if (results.isEmpty() && singletonTable) { String tempFilePrefix = c.sideInput(this.tempFilePrefix); TableRowWriter writer = new TableRowWriter(tempFilePrefix); writer.close(); @@ -126,8 +129,8 @@ public void processElement(ProcessContext c) throws Exception { // Return a null destination in this case - the constant DynamicDestinations class will // resolve it to the singleton output table. results.add( - new Result( - writerResult.resourceId.toString(), writerResult.byteSize, singletonTable)); + new Result<>(writerResult.resourceId.toString(), writerResult.byteSize, + dynamicDestinations.getDestination(null))); } Map currentResults = Maps.newHashMap(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 25483a529769..bfd260a30879 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1797,23 +1797,23 @@ private void testWritePartition(long numTables, long numFilesPerTable, long file // function) and there is no input data, WritePartition will generate an empty table. This // code is to test that path. boolean isSingleton = numTables == 1 && numFilesPerTable == 0; - String singletonDestination = null + DynamicDestinations dynamicDestinations = + new DynamicDestinationsHelpers.ConstantTableDestinations<>( + StaticValueProvider.of("SINGLETON"), ""); + List> expectedPartitions = Lists.newArrayList(); if (isSingleton) { - singletonDestination = "SINGLETON"; - } - List> expectedPartitions = Lists.newArrayList(); - if (isSingleton) { - expectedPartitions.add(ShardedKey.of(singletonDestination, 1)); + expectedPartitions.add(ShardedKey.of( + new TableDestination("SINGLETON", ""), 1)); } else { for (int i = 0; i < numTables; ++i) { for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) { String tableName = String.format("project-id:dataset-id.tables%05d", i); - expectedPartitions.add(ShardedKey.of(tableName, j)); + expectedPartitions.add(ShardedKey.of(new TableDestination(tableName, ""), j)); } } } - List> files = Lists.newArrayList(); + List> files = Lists.newArrayList(); Map> filenamesPerTable = Maps.newHashMap(); for (int i = 0; i < numTables; ++i) { String tableName = String.format("project-id:dataset-id.tables%05d", i); @@ -1825,36 +1825,36 @@ private void testWritePartition(long numTables, long numFilesPerTable, long file for (int j = 0; j < numFilesPerTable; ++j) { String fileName = String.format("%s_files%05d", tableName, j); filenames.add(fileName); - files.add(new Result<>(fileName, fileSize, tableName)); + files.add(new Result<>(fileName, fileSize, new TableDestination(tableName, ""))); } } - TupleTag, List>> multiPartitionsTag = - new TupleTag, List>>("multiPartitionsTag") {}; - TupleTag, List>> singlePartitionTag = - new TupleTag, List>>("singlePartitionTag") {}; + TupleTag, List>> multiPartitionsTag = + new TupleTag, List>>("multiPartitionsTag") {}; + TupleTag, List>> singlePartitionTag = + new TupleTag, List>>("singlePartitionTag") {}; - PCollectionView>> resultsView = + PCollectionView>> resultsView = p.apply( Create.of(files) - .withCoder(WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of()))) - .apply(View.>asIterable()); + .withCoder(WriteBundlesToFiles.ResultCoder.of(TableDestinationCoder.of()))) + .apply(View.>asIterable()); String tempFilePrefix = testFolder.newFolder("BigQueryIOTest").getAbsolutePath(); PCollectionView tempFilePrefixView = p.apply(Create.of(tempFilePrefix)).apply(View.asSingleton()); - WritePartition writePartition = - new WritePartition<>(singletonDestination, tempFilePrefixView, resultsView, - multiPartitionsTag, singlePartitionTag); + WritePartition writePartition = + new WritePartition<>(isSingleton, dynamicDestinations, tempFilePrefixView, + resultsView, multiPartitionsTag, singlePartitionTag); - DoFnTester, List>> tester = + DoFnTester, List>> tester = DoFnTester.of(writePartition); tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files); tester.setSideInput(tempFilePrefixView, GlobalWindow.INSTANCE, tempFilePrefix); tester.processElement(null); - List, List>> partitions; + List, List>> partitions; if (expectedNumPartitionsPerTable > 1) { partitions = tester.takeOutputElements(multiPartitionsTag); } else { @@ -1862,10 +1862,10 @@ private void testWritePartition(long numTables, long numFilesPerTable, long file } - List> partitionsResult = Lists.newArrayList(); + List> partitionsResult = Lists.newArrayList(); Map> filesPerTableResult = Maps.newHashMap(); - for (KV, List> partition : partitions) { - String table = partition.getKey().getKey(); + for (KV, List> partition : partitions) { + String table = partition.getKey().getKey().getTableSpec(); partitionsResult.add(partition.getKey()); List tableFilesResult = filesPerTableResult.get(table); if (tableFilesResult == null) {