From f8fd07ba11e216047de60444887627ad67878b81 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 22 Mar 2023 23:48:57 +0000 Subject: [PATCH] feat: Add sample about writer permanently failed --- .../bigquerystorage/WriteToDefaultStream.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 8bd384c325..2801a93ba0 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -39,6 +39,7 @@ import io.grpc.Status.Code; import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.Phaser; import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; @@ -123,6 +124,7 @@ private static class AppendContext { private static class DataWriter { private static final int MAX_RETRY_COUNT = 3; + private static final int MAX_RECREATE_COUNT = 3; private static final ImmutableList RETRIABLE_ERROR_CODES = ImmutableList.of( Code.INTERNAL, @@ -140,6 +142,8 @@ private static class DataWriter { @GuardedBy("lock") private RuntimeException error = null; + private AtomicInteger recreateCount = new AtomicInteger(0); + public void initialize(TableName parentTable) throws DescriptorValidationException, IOException, InterruptedException { // Use the JSON stream writer to send records in JSON format. Specify the table name to write @@ -153,6 +157,16 @@ public void initialize(TableName parentTable) public void append(AppendContext appendContext) throws DescriptorValidationException, IOException { synchronized (this.lock) { + if (streamWriter.isUserClosed()) { + throw new RuntimeException("Writer is closed by user."); + } + // If stream writer is premenantly failed, try recreate a stream writer. + if (streamWriter.isClosed() && recreateCount.recreateCount() <= MAX_RECREATE_COUNT) { + streamWriter = + JsonStreamWriter.newBuilder(parentTable.toString(), + BigQueryWriteClient.create()).build(); + this.error = null; + } // If earlier appends have failed, we need to reset before continuing. if (this.error != null) { throw this.error; @@ -194,6 +208,7 @@ public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) { public void onSuccess(AppendRowsResponse response) { System.out.format("Append success\n"); + this.parent.recreateCount.set(0); done(); }