From 8093bde948e607ee3e9e50f4df04fe8b0fbd0510 Mon Sep 17 00:00:00 2001 From: sammcveety Date: Thu, 15 Dec 2016 10:56:12 -0800 Subject: [PATCH 1/3] Update BigQueryIOTest.java --- .../cloud/dataflow/sdk/io/BigQueryIOTest.java | 366 ++++++++++++++++++ 1 file changed, 366 insertions(+) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java index b9d79adace..38377aa5ac 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java @@ -397,6 +397,161 @@ public Job getJob(JobReference jobRef) throws InterruptedException { } } + private static class TableContainer { + Table table; + List rows; + List ids; + + TableContainer(Table table) { + this.table = table; + this.rows = new ArrayList<>(); + this.ids = new ArrayList<>(); + } + + TableContainer addRow(TableRow row, String id) { + rows.add(row); + ids.add(id); + return this; + } + + Table getTable() { + return table; + } + + List getRows() { + return rows; + } + } + + // Table information must be static, as each ParDo will get a separate instance of + // FakeDatasetServices, and they must all modify the same storage. + private static com.google.common.collect.Table> + tables = HashBasedTable.create(); + + /** A fake dataset service that can be serialized, for use in testReadFromTable. */ + private static class FakeDatasetService implements DatasetService, Serializable { + + @Override + public Table getTable(String projectId, String datasetId, String tableId) + throws InterruptedException, IOException { + synchronized (tables) { + Map dataset = + checkNotNull( + tables.get(projectId, datasetId), + "Tried to get a dataset %s:%s from %s, but no such dataset was set", + projectId, + datasetId, + tableId, + FakeDatasetService.class.getSimpleName()); + TableContainer tableContainer = dataset.get(tableId); + return tableContainer == null ? 
null : tableContainer.getTable(); + } + } + + public List getAllRows(String projectId, String datasetId, String tableId) + throws InterruptedException, IOException { + synchronized (tables) { + return getTableContainer(projectId, datasetId, tableId).getRows(); + } + } + + private TableContainer getTableContainer(String projectId, String datasetId, String tableId) + throws InterruptedException, IOException { + synchronized (tables) { + Map dataset = + checkNotNull( + tables.get(projectId, datasetId), + "Tried to get a dataset %s:%s from %s, but no such dataset was set", + projectId, + datasetId, + FakeDatasetService.class.getSimpleName()); + return checkNotNull(dataset.get(tableId), + "Tried to get a table %s:%s.%s from %s, but no such table was set", + projectId, + datasetId, + tableId, + FakeDatasetService.class.getSimpleName()); + } + } + + @Override + public void deleteTable(String projectId, String datasetId, String tableId) + throws IOException, InterruptedException { + throw new UnsupportedOperationException("Unsupported"); + } + + + @Override + public void createTable(Table table) throws IOException { + TableReference tableReference = table.getTableReference(); + synchronized (tables) { + Map dataset = + checkNotNull( + tables.get(tableReference.getProjectId(), tableReference.getDatasetId()), + "Tried to get a dataset %s:%s from %s, but no such table was set", + tableReference.getProjectId(), + tableReference.getDatasetId(), + FakeDatasetService.class.getSimpleName()); + TableContainer tableContainer = dataset.get(tableReference.getTableId()); + System.out.println("CREATING TABLE " + tableReference); + if (tableContainer == null) { + tableContainer = new TableContainer(table); + dataset.put(tableReference.getTableId(), tableContainer); + } + } + } + + @Override + public boolean isTableEmpty(String projectId, String datasetId, String tableId) + throws IOException, InterruptedException { + Long numBytes = getTable(projectId, datasetId, tableId).getNumBytes(); + return numBytes == null || numBytes == 0L; + } + + @Override + public Dataset getDataset( + String projectId, String datasetId) throws IOException, InterruptedException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public void createDataset( + String projectId, String datasetId, String location, String description) + throws IOException, InterruptedException { + synchronized (tables) { + Map dataset = tables.get(projectId, datasetId); + if (dataset == null) { + dataset = new HashMap<>(); + tables.put(projectId, datasetId, dataset); + } + } + } + + @Override + public void deleteDataset(String projectId, String datasetId) + throws IOException, InterruptedException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public long insertAll( + TableReference ref, List rowList, @Nullable List insertIdList) + throws IOException, InterruptedException { + synchronized (tables) { + assertEquals(rowList.size(), insertIdList.size()); + + long dataSize = 0; + TableContainer tableContainer = getTableContainer( + ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); + for (int i = 0; i < rowList.size(); ++i) { + tableContainer.addRow(rowList.get(i), insertIdList.get(i)); + dataSize += rowList.get(i).toString().length(); + } + return dataSize; + } + } + } + @Rule public transient ExpectedException thrown = ExpectedException.none(); @Rule public transient ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class); @Rule public transient TemporaryFolder testFolder = new 
TemporaryFolder(); @@ -676,6 +831,217 @@ public void testWrite() throws Exception { testNumFiles(tempDir, 0); } + @Test + @Category(NeedsRunner.class) + public void testStreamingWrite() throws Exception { + BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + + FakeDatasetService datasetService = new FakeDatasetService(); + datasetService.createDataset("project-id", "dataset-id", "", ""); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withDatasetService(datasetService); + + Pipeline p = TestPipeline.create(bqOptions); + p.apply(Create.of( + new TableRow().set("name", "a").set("number", 1), + new TableRow().set("name", "b").set("number", 2), + new TableRow().set("name", "c").set("number", 3), + new TableRow().set("name", "d").set("number", 4)) + .withCoder(TableRowJsonCoder.of())) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) + .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id") + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withSchema(new TableSchema().setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withoutValidation()); + p.run(); + + + assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id"), + containsInAnyOrder( + new TableRow().set("name", "a").set("number", 1), + new TableRow().set("name", "b").set("number", 2), + new TableRow().set("name", "c").set("number", 3), + new TableRow().set("name", "d").set("number", 4))); + } + + /** + * A generic window function that allows partitioning data into windows by a string value. + * + *
<p>
Logically, creates multiple global windows, and the user provides a function that + * decides which global window a value should go into. + */ + public static class PartitionedGlobalWindows extends + + NonMergingWindowFn { + private SerializableFunction extractPartition; + + public PartitionedGlobalWindows(SerializableFunction extractPartition) { + this.extractPartition = extractPartition; + } + + @Override + public Collection assignWindows(AssignContext c) { + return Collections.singletonList(new PartitionedGlobalWindow( + extractPartition.apply(c.element()))); + } + + @Override + public boolean isCompatible(WindowFn o) { + return o instanceof PartitionedGlobalWindows; + } + + @Override + public Coder windowCoder() { + return new PartitionedGlobalWindowCoder(); + } + + @Override + public PartitionedGlobalWindow getSideInputWindow(BoundedWindow window) { + throw new UnsupportedOperationException( + "PartitionedGlobalWindows is not allowed in side inputs"); + } + + @Override + public Instant getOutputTime(Instant inputTimestamp, PartitionedGlobalWindow window) { + return inputTimestamp; + } + } + + /** + * Custom Window object that encodes a String value. + */ + public static class PartitionedGlobalWindow extends BoundedWindow { + String value; + + public PartitionedGlobalWindow(String value) { + this.value = value; + } + + @Override + public Instant maxTimestamp() { + return GlobalWindow.INSTANCE.maxTimestamp(); + } + + // The following methods are only needed due to BEAM-1022. Once this issue is fixed, we will + // no longer need these. + @Override public boolean equals(Object other) { + if (other instanceof PartitionedGlobalWindow) { + return value.equals(((PartitionedGlobalWindow) other).value); + } + return false; + } + + @Override public int hashCode() { + return value.hashCode(); + } + } + + /** + * Coder for @link{PartitionedGlobalWindow}. + */ + public static class PartitionedGlobalWindowCoder extends AtomicCoder { + @Override + public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context) + throws IOException, CoderException { + StringUtf8Coder.of().encode(window.value, outStream, context); + } + + @Override + public PartitionedGlobalWindow decode(InputStream inStream, Context context) + throws IOException, CoderException { + return new PartitionedGlobalWindow(StringUtf8Coder.of().decode(inStream, context)); + } + } + + @Test + @Category(NeedsRunner.class) + public void testStreamingWriteWithWindowFn() throws Exception { + BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + + FakeDatasetService datasetService = new FakeDatasetService(); + datasetService.createDataset("project-id", "dataset-id", "", ""); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withDatasetService(datasetService); + + List inserts = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + inserts.add(new TableRow().set("name", "number" + i).set("number", i)); + } + + // Create a windowing strategy that puts the input into five different windows depending on + // record value. 
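+      // Each row's "number" field modulo 5 selects the window, so for example rows with
+      // number 0 and number 5 share window "0" and are later asserted to land in table-id-0.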
+ WindowFn window = new PartitionedGlobalWindows( + new SerializableFunction() { + @Override + public String apply(Object value) { + try { + if (value instanceof TableRow) { + int intValue = (Integer) ((TableRow) value).get("number") % 5; + return Integer.toString(intValue); + } + } catch (NumberFormatException e) { + // ignored. + } + return value.toString(); + } + } + ); + + SerializableFunction tableFunction = + new SerializableFunction() { + @Override + public String apply(BoundedWindow input) { + return "project-id:dataset-id.table-id-" + ((PartitionedGlobalWindow) input).value; + } + }; + + Pipeline p = TestPipeline.create(bqOptions); + p.apply(Create.of(inserts) + .withCoder(TableRowJsonCoder.of())) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) + .apply(Window.into(window)) + .apply(BigQueryIO.Write + .to(tableFunction) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withSchema(new TableSchema().setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withoutValidation()); + p.run(); + + + assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-0"), + containsInAnyOrder( + new TableRow().set("name", "number0").set("number", 0), + new TableRow().set("name", "number5").set("number", 5))); + assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-1"), + containsInAnyOrder( + new TableRow().set("name", "number1").set("number", 1), + new TableRow().set("name", "number6").set("number", 6))); + assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-2"), + containsInAnyOrder( + new TableRow().set("name", "number2").set("number", 2), + new TableRow().set("name", "number7").set("number", 7))); + assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-3"), + containsInAnyOrder( + new TableRow().set("name", "number3").set("number", 3), + new TableRow().set("name", "number8").set("number", 8))); + assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-4"), + containsInAnyOrder( + new TableRow().set("name", "number4").set("number", 4), + new TableRow().set("name", "number9").set("number", 9))); + } + @Test public void testWriteUnknown() throws Exception { BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); From 1434a2ab3daf3cc2f93be50708de1163e8fea9cc Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Thu, 15 Dec 2016 11:25:29 -0800 Subject: [PATCH 2/3] Fixups --- .../dataflow/sdk/util/BigQueryServices.java | 13 +++++++++++++ .../sdk/util/BigQueryServicesImpl.java | 13 +++++++++++++ .../cloud/dataflow/sdk/io/BigQueryIOTest.java | 18 ++++++++++++++++-- 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java index ec96009494..7eac05f757 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.io.Serializable; +import java.util.List; import java.util.NoSuchElementException; import javax.annotation.Nullable; @@ -141,6 +142,18 @@ void createDataset( */ void deleteDataset(String projectId, String datasetId) throws IOException, InterruptedException; + + void createTable(Table table) throws 
IOException; + + boolean isTableEmpty(String projectId, String datasetId, String tableId) + throws IOException, InterruptedException; + + Dataset getDataset( + String projectId, String datasetId) throws IOException, InterruptedException; + + long insertAll( + TableReference ref, List rowList, @Nullable List insertIdList) + throws IOException, InterruptedException; } /** diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java index 1a37e01375..271ee8313b 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java @@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.List; import java.util.NoSuchElementException; import javax.annotation.Nullable; @@ -510,6 +511,18 @@ public void deleteDataset(String projectId, String datasetId) Sleeper.DEFAULT, backoff); } + + @Override + public void createTable(Table table) throws IOException { + throw new RuntimeException("Not supported"); + } + + @Override + public long insertAll( + TableReference ref, List rowList, @Nullable List insertIdList) + throws IOException, InterruptedException { + throw new RuntimeException("Not supported"); + } } private static class BigQueryJsonReaderImpl implements BigQueryJsonReader { diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java index 38377aa5ac..4a4c40cf3e 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java @@ -20,7 +20,9 @@ import static com.google.cloud.dataflow.sdk.io.BigQueryIO.toJsonString; import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -32,6 +34,7 @@ import static org.mockito.Mockito.when; import com.google.api.client.util.Data; +import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.ErrorProto; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; @@ -49,6 +52,8 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.AtomicCoder; +import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.CoderException; import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; @@ -89,6 +94,10 @@ import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.display.DisplayDataEvaluator; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.NonMergingWindowFn; +import 
com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; import com.google.cloud.dataflow.sdk.util.BigQueryServices; import com.google.cloud.dataflow.sdk.util.BigQueryServices.DatasetService; import com.google.cloud.dataflow.sdk.util.BigQueryServices.JobService; @@ -102,12 +111,14 @@ import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.cloud.dataflow.sdk.values.TupleTag; import com.google.common.base.Strings; +import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; +import org.joda.time.Instant; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -124,11 +135,16 @@ import java.io.File; import java.io.FileFilter; +import java.io.InputStream; import java.io.IOException; +import java.io.OutputStream; import java.io.Serializable; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -832,7 +848,6 @@ public void testWrite() throws Exception { } @Test - @Category(NeedsRunner.class) public void testStreamingWrite() throws Exception { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("defaultProject"); @@ -960,7 +975,6 @@ public PartitionedGlobalWindow decode(InputStream inStream, Context context) } @Test - @Category(NeedsRunner.class) public void testStreamingWriteWithWindowFn() throws Exception { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); bqOptions.setProject("defaultProject"); From fb76dbdf47a7cca50078e1d39d79164e128d0b59 Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Thu, 15 Dec 2016 11:44:38 -0800 Subject: [PATCH 3/3] Fixups --- .../cloud/dataflow/sdk/io/BigQueryIO.java | 34 ++- .../sdk/util/BigQueryServicesImpl.java | 234 +++++++++++++++++- 2 files changed, 250 insertions(+), 18 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index 1db12bb15f..138effacb8 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -31,6 +31,7 @@ import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.QueryRequest; +import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; @@ -2200,7 +2201,8 @@ public TableSchema getSchema() { /** Returns the table reference, or {@code null}. */ @Nullable public ValueProvider getTable() { - return NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef()); + return jsonTableRef == null ? null : + NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef()); } /** Returns {@code true} if table validation is enabled. 
*/ @@ -2622,6 +2624,11 @@ private static void verifyTablePresence(BigQueryOptions options, TableReference } } + @VisibleForTesting + static void clearCreatedTables() { + StreamingWriteFn.clearCreatedTables(); + } + ///////////////////////////////////////////////////////////////////////////// /** @@ -2654,6 +2661,12 @@ private static class StreamingWriteFn NestedValueProvider.of(schema, new TableSchemaToJsonSchema()); } + private static void clearCreatedTables() { + synchronized (createdTables) { + createdTables.clear(); + } + } + /** Prepares a target BigQuery table. */ @Override public void startBundle(Context context) { @@ -2696,20 +2709,25 @@ public void populateDisplayData(DisplayData.Builder builder) { } public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec) - throws IOException { + throws InterruptedException, IOException { TableReference tableReference = parseTableSpec(tableSpec); if (!createdTables.contains(tableSpec)) { synchronized (createdTables) { // Another thread may have succeeded in creating the table in the meanwhile, so // check again. This check isn't needed for correctness, but we add it to prevent // every thread from attempting a create and overwhelming our BigQuery quota. + DatasetService datasetService = bqServices.getDatasetService(options); if (!createdTables.contains(tableSpec)) { - TableSchema tableSchema = JSON_FACTORY.fromString( - jsonTableSchema.get(), TableSchema.class); - Bigquery client = Transport.newBigQueryClient(options).build(); - BigQueryTableInserter inserter = new BigQueryTableInserter(client); - inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND, - Write.CreateDisposition.CREATE_IF_NEEDED, tableSchema); + Table table = datasetService.getTable( + tableReference.getProjectId(), + tableReference.getDatasetId(), + tableReference.getTableId()); + if (table == null) { + TableSchema tableSchema = JSON_FACTORY.fromString( + jsonTableSchema.get(), TableSchema.class); + datasetService.createTable( + new Table().setTableReference(tableReference).setSchema(tableSchema)); + } createdTables.add(tableSpec); } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java index 271ee8313b..babf81bb34 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java @@ -19,6 +19,7 @@ import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.ExponentialBackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Dataset; @@ -33,10 +34,13 @@ import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableDataInsertAllRequest; +import com.google.api.services.bigquery.model.TableDataInsertAllResponse; import com.google.api.services.bigquery.model.TableDataList; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.dataflow.sdk.options.BigQueryOptions; +import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; 
import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; @@ -47,6 +51,8 @@ import java.io.IOException; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -268,7 +274,7 @@ public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryCo "Unable to dry run query: %s, aborting after %d retries.", queryConfig, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff).getStatistics(); + backoff, ALWAYS_RETRY).getStatistics(); } /** @@ -368,7 +374,7 @@ public Table getTable(String projectId, String datasetId, String tableId) "Unable to get table: %s, aborting after %d retries.", tableId, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff); + backoff, DONT_RETRY_NOT_FOUND); } /** @@ -390,7 +396,7 @@ public void deleteTable(String projectId, String datasetId, String tableId) "Unable to delete table: %s, aborting after %d retries.", tableId, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff); + backoff, ALWAYS_RETRY); } public boolean isTableEmpty(String projectId, String datasetId, String tableId) @@ -404,7 +410,7 @@ public boolean isTableEmpty(String projectId, String datasetId, String tableId) "Unable to list table data: %s, aborting after %d retries.", tableId, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff); + backoff, ALWAYS_RETRY); return dataList.getRows() == null || dataList.getRows().isEmpty(); } @@ -426,7 +432,7 @@ public Dataset getDataset(String projectId, String datasetId) "Unable to get dataset: %s, aborting after %d retries.", datasetId, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff); + backoff, DONT_RETRY_NOT_FOUND); } /** @@ -509,19 +515,206 @@ public void deleteDataset(String projectId, String datasetId) "Unable to delete table: %s, aborting after %d retries.", datasetId, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff); + backoff, ALWAYS_RETRY); } + private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = + (int) TimeUnit.MINUTES.toMillis(5); + @Override - public void createTable(Table table) throws IOException { - throw new RuntimeException("Not supported"); + public void createTable(Table table) throws InterruptedException, IOException { + LOG.info("Trying to create BigQuery table: {}", + BigQueryIO.toTableSpec(table.getTableReference())); + BackOff backoff = + new ExponentialBackOff.Builder() + .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS) + .build(); + + tryCreateTable(table, backoff, Sleeper.DEFAULT); + } + + @VisibleForTesting + @Nullable + Table tryCreateTable(Table table, BackOff backoff, Sleeper sleeper) + throws IOException { + boolean retry = false; + while (true) { + try { + return client.tables().insert( + table.getTableReference().getProjectId(), + table.getTableReference().getDatasetId(), + table).execute(); + } catch (IOException e) { + ApiErrorExtractor extractor = new ApiErrorExtractor(); + if (extractor.itemAlreadyExists(e)) { + // The table already exists, nothing to return. + return null; + } else if (extractor.rateLimited(e)) { + // The request failed because we hit a temporary quota. Back off and try again. 
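+          // BackOffUtils.next sleeps for the next backoff interval and returns false once the
+          // backoff's maximum elapsed time (five minutes here) is exhausted, at which point we
+          // fall through and rethrow the original exception.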
+ try { + if (BackOffUtils.next(sleeper, backoff)) { + if (!retry) { + LOG.info( + "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes", + table.getTableReference().getProjectId(), + table.getTableReference().getDatasetId(), + table.getTableReference().getTableId(), + TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0); + retry = true; + } + continue; + } + } catch (InterruptedException e1) { + // Restore interrupted state and throw the last failure. + Thread.currentThread().interrupt(); + throw e; + } + } + throw e; + } + } + } + + @VisibleForTesting + long insertAll(TableReference ref, List rowList, @Nullable List insertIdList, + BackOff backoff, final Sleeper sleeper) throws IOException, InterruptedException { + checkNotNull(ref, "ref"); + if (executor == null) { + this.executor = options.as(GcsOptions.class).getExecutorService(); + } + if (insertIdList != null && rowList.size() != insertIdList.size()) { + throw new AssertionError("If insertIdList is not null it needs to have at least " + + "as many elements as rowList"); + } + + long retTotalDataSize = 0; + List allErrors = new ArrayList<>(); + // These lists contain the rows to publish. Initially the contain the entire list. + // If there are failures, they will contain only the failed rows to be retried. + List rowsToPublish = rowList; + List idsToPublish = insertIdList; + while (true) { + List retryRows = new ArrayList<>(); + List retryIds = (idsToPublish != null) ? new ArrayList() : null; + + int strideIndex = 0; + // Upload in batches. + List rows = new LinkedList<>(); + int dataSize = 0; + + List>> futures = new ArrayList<>(); + List strideIndices = new ArrayList<>(); + + for (int i = 0; i < rowsToPublish.size(); ++i) { + TableRow row = rowsToPublish.get(i); + TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows(); + if (idsToPublish != null) { + out.setInsertId(idsToPublish.get(i)); + } + out.setJson(row.getUnknownKeys()); + rows.add(out); + + dataSize += row.toString().length(); + if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch + || i == rowsToPublish.size() - 1) { + TableDataInsertAllRequest content = new TableDataInsertAllRequest(); + content.setRows(rows); + + final Bigquery.Tabledata.InsertAll insert = client.tabledata() + .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), + content); + + futures.add( + executor.submit(new Callable>() { + @Override + public List call() throws IOException { + BackOff backoff = DEFAULT_BACKOFF_FACTORY.backoff(); + while (true) { + try { + return insert.execute().getInsertErrors(); + } catch (IOException e) { + if (new ApiErrorExtractor().rateLimited(e)) { + LOG.info("BigQuery insertAll exceeded rate limit, retrying"); + try { + sleeper.sleep(backoff.nextBackOffMillis()); + } catch (InterruptedException interrupted) { + throw new IOException( + "Interrupted while waiting before retrying insertAll"); + } + } else { + throw e; + } + } + } + } + })); + strideIndices.add(strideIndex); + + retTotalDataSize += dataSize; + + dataSize = 0; + strideIndex = i + 1; + rows = new LinkedList<>(); + } + } + + try { + for (int i = 0; i < futures.size(); i++) { + List errors = futures.get(i).get(); + if (errors != null) { + for (TableDataInsertAllResponse.InsertErrors error : errors) { + allErrors.add(error); + if (error.getIndex() == null) { + throw new IOException("Insert failed: " + allErrors); + } + + int errorIndex = error.getIndex().intValue() + strideIndices.get(i); + 
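+              // The error index is relative to the batch that was sent; adding the recorded
+              // stride offset maps it back to a position in rowsToPublish, so only the failed
+              // rows (and their matching insert ids) are queued for retry.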
retryRows.add(rowsToPublish.get(errorIndex)); + if (retryIds != null) { + retryIds.add(idsToPublish.get(errorIndex)); + } + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while inserting " + rowsToPublish); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } + + if (allErrors.isEmpty()) { + break; + } + long nextBackoffMillis = backoff.nextBackOffMillis(); + if (nextBackoffMillis == BackOff.STOP) { + break; + } + try { + sleeper.sleep(nextBackoffMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + "Interrupted while waiting before retrying insert of " + retryRows); + } + rowsToPublish = retryRows; + idsToPublish = retryIds; + allErrors.clear(); + LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size()); + } + if (!allErrors.isEmpty()) { + throw new IOException("Insert failed: " + allErrors); + } else { + return retTotalDataSize; + } } @Override public long insertAll( TableReference ref, List rowList, @Nullable List insertIdList) throws IOException, InterruptedException { - throw new RuntimeException("Not supported"); + return insertAll( + ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); } } @@ -583,12 +776,30 @@ public void close() throws IOException { } } + static final SerializableFunction DONT_RETRY_NOT_FOUND = + new SerializableFunction() { + @Override + public Boolean apply(IOException input) { + ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); + return !errorExtractor.itemNotFound(input); + } + }; + + static final SerializableFunction ALWAYS_RETRY = + new SerializableFunction() { + @Override + public Boolean apply(IOException input) { + return true; + } + }; + @VisibleForTesting static T executeWithRetries( AbstractGoogleClientRequest request, String errorMessage, Sleeper sleeper, - BackOff backoff) + BackOff backoff, + SerializableFunction shouldRetry) throws IOException, InterruptedException { Exception lastException = null; do { @@ -597,6 +808,9 @@ static T executeWithRetries( } catch (IOException e) { LOG.warn("Ignore the error and retry the request.", e); lastException = e; + if (!shouldRetry.apply(e)) { + break; + } } } while (nextBackOff(sleeper, backoff)); throw new IOException(