diff --git a/tutorials/JsonWriterDefaultStream/src/main/java/com/example/JsonWriterDefaultStream.java b/tutorials/JsonWriterDefaultStream/src/main/java/com/example/JsonWriterDefaultStream.java index a0a01f82bb..93a6179525 100644 --- a/tutorials/JsonWriterDefaultStream/src/main/java/com/example/JsonWriterDefaultStream.java +++ b/tutorials/JsonWriterDefaultStream/src/main/java/com/example/JsonWriterDefaultStream.java @@ -17,6 +17,8 @@ package com.example.bigquerystorage; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.Field; @@ -31,11 +33,11 @@ import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Descriptors.DescriptorValidationException; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; -import java.util.concurrent.ExecutionException; import org.json.JSONArray; import org.json.JSONObject; @@ -118,20 +120,32 @@ public static void writeToDefaultStream( } } // batch ApiFuture future = writer.append(jsonArr); - AppendRowsResponse response = future.get(); - if (response.hasUpdatedSchema()) { - // The destination table schema has changed. The client library automatically - // reestablishes a connection to the backend using the new schema, so we can continue - // to send data without interruption. - System.out.println("Table schema changed."); - } + // The append method is asynchronous. Rather than waiting for the method to complete, + // which can hurt performance, register a completion callback and continue streaming. + ApiFutures.addCallback( + future, new AppendCompleteCallback(), MoreExecutors.directExecutor()); } - System.out.println("Appended records successfully."); - } catch (ExecutionException e) { - // If the wrapped exception is a StatusRuntimeException, check the state of the operation. - // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see: - // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html - System.out.println("Failed to append records. \n" + e.toString()); } } } + +class AppendCompleteCallback implements ApiFutureCallback { + + private static int batchCount = 0; + private static final Object lock = new Object(); + + public void onSuccess(AppendRowsResponse response) { + synchronized (lock) { + if (response.hasError()) { + System.out.format("Error: %s\n", response.getError().toString()); + } else { + ++batchCount; + System.out.format("Wrote batch %d\n", batchCount); + } + } + } + + public void onFailure(Throwable throwable) { + System.out.format("Error: %s\n", throwable.toString()); + } +} diff --git a/tutorials/JsonWriterDefaultStream/src/test/java/com/example/JsonWriterDefaultStreamIT.java b/tutorials/JsonWriterDefaultStream/src/test/java/com/example/JsonWriterDefaultStreamIT.java index 3fe67b885d..1c5b2ce0e3 100644 --- a/tutorials/JsonWriterDefaultStream/src/test/java/com/example/JsonWriterDefaultStreamIT.java +++ b/tutorials/JsonWriterDefaultStream/src/test/java/com/example/JsonWriterDefaultStreamIT.java @@ -68,7 +68,7 @@ public void testJsonWriterDefaultStream() throws Exception { System.out.println(dataFilePath.toString()); String[] args = {GOOGLE_CLOUD_PROJECT, datasetName, "github", dataFilePath.toString()}; JsonWriterDefaultStream.main(args); - assertThat(bout.toString()).contains("Appended records successfully."); + assertThat(bout.toString()).contains("Wrote batch"); } @After