From 73ddd7b4fd44dce4be434726df57ecd84e6e3e6a Mon Sep 17 00:00:00 2001 From: gnanda Date: Mon, 2 May 2022 11:20:01 -0700 Subject: [PATCH] docs(sample): update WriteToDefaultStream sample to match best practices (#1631) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Update WriteToDefaultStream sample to match best practices * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot --- .../bigquerystorage/WriteToDefaultStream.java | 185 +++++++++++++++--- 1 file changed, 157 insertions(+), 28 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 630e06c62e..5cd39ed4d1 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -17,18 +17,28 @@ package com.example.bigquerystorage; // [START bigquerystorage_jsonstreamwriter_default] + import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; import 
com.google.protobuf.Descriptors.DescriptorValidationException; +import io.grpc.Status; +import io.grpc.Status.Code; import java.io.IOException; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.Phaser; +import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; import org.json.JSONObject; @@ -45,36 +55,155 @@ public static void runWriteToDefaultStream() public static void writeToDefaultStream(String projectId, String datasetName, String tableName) throws DescriptorValidationException, InterruptedException, IOException { - BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); - Table table = bigquery.getTable(datasetName, tableName); TableName parentTable = TableName.of(projectId, datasetName, tableName); - Schema schema = table.getDefinition().getSchema(); - TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema); - - // Use the JSON stream writer to send records in JSON format. - // For more information about JsonStreamWriter, see: - // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html - try (JsonStreamWriter writer = - JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build()) { - // Write two batches to the stream, each with 10 JSON records. A writer should be used for as - // much writes as possible. Creating a writer for just one write is an antipattern. - for (int i = 0; i < 2; i++) { - // Create a JSON object that is compatible with the table schema. - JSONArray jsonArr = new JSONArray(); - for (int j = 0; j < 10; j++) { - JSONObject record = new JSONObject(); - record.put("test_string", String.format("record %03d-%03d", i, j)); - jsonArr.put(record); + + DataWriter writer = new DataWriter(); + // One time initialization for the worker. + writer.initialize(parentTable); + + // Write two batches of fake data to the stream, each with 10 JSON records. 
Data may be + // batched up to the maximum request size: + // https://cloud.google.com/bigquery/quotas#write-api-limits + for (int i = 0; i < 2; i++) { + // Create a JSON object that is compatible with the table schema. + JSONArray jsonArr = new JSONArray(); + for (int j = 0; j < 10; j++) { + JSONObject record = new JSONObject(); + record.put("test_string", String.format("record %03d-%03d", i, j)); + jsonArr.put(record); + } + + writer.append(new AppendContext(jsonArr, 0)); + } + + // Final cleanup for the stream during worker teardown. + writer.cleanup(); + System.out.println("Appended records successfully."); + } + + private static class AppendContext { + + JSONArray data; + int retryCount = 0; + + AppendContext(JSONArray data, int retryCount) { + this.data = data; + this.retryCount = retryCount; + } + } + + private static class DataWriter { + + private static final int MAX_RETRY_COUNT = 2; + private static final ImmutableList RETRIABLE_ERROR_CODES = + ImmutableList.of(Code.INTERNAL, Code.ABORTED, Code.CANCELLED); + + // Track the number of in-flight requests to wait for all responses before shutting down. + private final Phaser inflightRequestCount = new Phaser(1); + private final Object lock = new Object(); + private JsonStreamWriter streamWriter; + + @GuardedBy("lock") + private RuntimeException error = null; + + public void initialize(TableName parentTable) + throws DescriptorValidationException, IOException, InterruptedException { + // Retrieve table schema information. + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + Table table = bigquery.getTable(parentTable.getDataset(), parentTable.getTable()); + Schema schema = table.getDefinition().getSchema(); + TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema); + + // Use the JSON stream writer to send records in JSON format. Specify the table name to write + // to the default stream. 
+ // For more information about JsonStreamWriter, see: + // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html + streamWriter = JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build(); + } + + public void append(AppendContext appendContext) + throws DescriptorValidationException, IOException { + synchronized (this.lock) { + // If earlier appends have failed, we need to reset before continuing. + if (this.error != null) { + throw this.error; } - ApiFuture future = writer.append(jsonArr); - AppendRowsResponse response = future.get(); } - System.out.println("Appended records successfully."); - } catch (ExecutionException e) { - // If the wrapped exception is a StatusRuntimeException, check the state of the operation. - // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see: - // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html - System.out.println("Failed to append records. \n" + e.toString()); + // Append asynchronously for increased throughput. + ApiFuture future = streamWriter.append(appendContext.data); + ApiFutures.addCallback( + future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor()); + + // Increase the count of in-flight requests. + inflightRequestCount.register(); + } + + public void cleanup() { + // Wait for all in-flight requests to complete. + inflightRequestCount.arriveAndAwaitAdvance(); + + // Close the connection to the server. + streamWriter.close(); + + // Verify that no error occurred in the stream. 
+ synchronized (this.lock) { + if (this.error != null) { + throw this.error; + } + } + } + + static class AppendCompleteCallback implements ApiFutureCallback { + + private final DataWriter parent; + private final AppendContext appendContext; + + public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) { + this.parent = parent; + this.appendContext = appendContext; + } + + public void onSuccess(AppendRowsResponse response) { + System.out.format("Append success\n"); + done(); + } + + public void onFailure(Throwable throwable) { + // If the wrapped exception is a StatusRuntimeException, check the state of the operation. + // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, + // see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html + Status status = Status.fromThrowable(throwable); + if (appendContext.retryCount < MAX_RETRY_COUNT + && RETRIABLE_ERROR_CODES.contains(status.getCode())) { + appendContext.retryCount++; + try { + // Since default stream appends are not ordered, we can simply retry the appends. + // Retrying with exclusive streams requires more careful consideration. + this.parent.append(appendContext); + // Mark the existing attempt as done since it's being retried. + done(); + return; + } catch (Exception e) { + // Fall through to return error. + System.out.format("Failed to retry append: %s\n", e); + } + } + + synchronized (this.parent.lock) { + if (this.parent.error == null) { + StorageException storageException = Exceptions.toStorageException(throwable); + this.parent.error = + (storageException != null) ? storageException : new RuntimeException(throwable); + } + } + System.out.format("Error: %s\n", throwable); + done(); + } + + private void done() { + // Reduce the count of in-flight requests. + this.parent.inflightRequestCount.arriveAndDeregister(); + } } } }