diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 4a89d6274e..2b86a9194b 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -32,7 +32,9 @@ import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; import com.google.cloud.bigquery.storage.v1.Exceptions; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; +import com.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException; import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; +import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.common.util.concurrent.MoreExecutors; @@ -150,7 +152,7 @@ private static class DataWriter { private AtomicInteger recreateCount = new AtomicInteger(0); - public void initialize(TableName parentTable) + private JsonStreamWriter createStreamWriter(String tableName) throws DescriptorValidationException, IOException, InterruptedException { // Configure in-stream automatic retry settings. // Error codes that are immediately retried: @@ -165,32 +167,35 @@ public void initialize(TableName parentTable) .setMaxRetryDelay(Duration.ofMinutes(1)) .build(); - // Initialize client without settings, internally within stream writer a new client will be - // created with full settings. - client = BigQueryWriteClient.create(); - // Use the JSON stream writer to send records in JSON format. Specify the table name to write // to the default stream. // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html - streamWriter = - JsonStreamWriter.newBuilder(parentTable.toString(), client) - .setExecutorProvider( - FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) - .setChannelProvider( - BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() - .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) - .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1)) - .setKeepAliveWithoutCalls(true) - .setChannelsPerCpu(2) - .build()) - .setEnableConnectionPool(true) - // If value is missing in json and there is a default value configured on bigquery - // column, apply the default value to the missing value field. - .setDefaultMissingValueInterpretation( - AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) - .setRetrySettings(retrySettings) - .build(); + return JsonStreamWriter.newBuilder(tableName, client) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) + .setChannelProvider( + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveWithoutCalls(true) + .setChannelsPerCpu(2) + .build()) + .setEnableConnectionPool(true) + // If value is missing in json and there is a default value configured on bigquery + // column, apply the default value to the missing value field. + .setDefaultMissingValueInterpretation( + AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) + .setRetrySettings(retrySettings) + .build(); + } + + public void initialize(TableName parentTable) + throws DescriptorValidationException, IOException, InterruptedException { + // Initialize client without settings, internally within stream writer a new client will be + // created with full settings. + client = BigQueryWriteClient.create(); + + streamWriter = createStreamWriter(parentTable.toString()); } public void append(AppendContext appendContext) @@ -199,7 +204,7 @@ public void append(AppendContext appendContext) if (!streamWriter.isUserClosed() && streamWriter.isClosed() && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) { - streamWriter = JsonStreamWriter.newBuilder(streamWriter.getStreamName(), client).build(); + streamWriter = createStreamWriter(streamWriter.getStreamName()); this.error = null; } // If earlier appends have failed, we need to reset before continuing. @@ -282,6 +287,30 @@ public void onFailure(Throwable throwable) { } } + boolean resendRequest = false; + if (throwable instanceof MaximumRequestCallbackWaitTimeExceededException) { + resendRequest = true; + } else if (throwable instanceof StreamWriterClosedException) { + if (!parent.streamWriter.isUserClosed()) { + resendRequest = true; + } + } + if (resendRequest) { + // Retry this request. + try { + this.parent.append(new AppendContext(appendContext.data)); + } catch (DescriptorValidationException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // Mark the existing attempt as done since we got a response for it + done(); + return; + } + synchronized (this.parent.lock) { if (this.parent.error == null) { StorageException storageException = Exceptions.toStorageException(throwable);