Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 35ef0ed

Browse files
authored
fix: Avoid setting error on response future twice (#261)
* fix:Avoid setting error to future twice modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java * test fix
1 parent 5b99ddd commit 35ef0ed

File tree

2 files changed

+40
-15
lines changed

2 files changed

+40
-15
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -250,10 +250,15 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
250250
* @throws IOException
251251
*/
252252
public void refreshAppend() throws IOException, InterruptedException {
253+
LOG.info("Establish a write connection.");
253254
synchronized (this) {
254-
Preconditions.checkState(!shutdown.get(), "Cannot shut down on a shut-down writer.");
255+
if (shutdown.get()) {
256+
LOG.warning("Cannot refresh on a already shutdown writer.");
257+
return;
258+
}
255259
// There could be a moment, stub is not yet initialized.
256260
if (clientStream != null) {
261+
LOG.info("Closing the stream");
257262
clientStream.closeSend();
258263
}
259264
messagesBatch.resetAttachSchema();
@@ -348,13 +353,15 @@ private static final class InflightBatch {
348353
final List<AppendRequestAndFutureResponse> inflightRequests;
349354
// A list tracks expected offset for each AppendRequest. Used to reconstruct the Response
350355
// future.
351-
final ArrayList<Long> offsetList;
352-
final long creationTime;
353-
int attempt;
354-
long batchSizeBytes;
355-
long expectedOffset;
356-
Boolean attachSchema;
357-
String streamName;
356+
private final ArrayList<Long> offsetList;
357+
private final long creationTime;
358+
private int attempt;
359+
private long batchSizeBytes;
360+
private long expectedOffset;
361+
private Boolean attachSchema;
362+
private String streamName;
363+
364+
private final AtomicBoolean failed;
358365

359366
InflightBatch(
360367
List<AppendRequestAndFutureResponse> inflightRequests,
@@ -376,6 +383,7 @@ private static final class InflightBatch {
376383
this.batchSizeBytes = batchSizeBytes;
377384
this.attachSchema = attachSchema;
378385
this.streamName = streamName;
386+
this.failed = new AtomicBoolean(false);
379387
}
380388

381389
int count() {
@@ -417,6 +425,13 @@ private AppendRowsRequest getMergedRequest() throws IllegalStateException {
417425
}
418426

419427
private void onFailure(Throwable t) {
428+
if (failed.getAndSet(true)) {
429+
// Error has been set already.
430+
LOG.warning("Ignore " + t.toString() + " since error has already been set");
431+
return;
432+
} else {
433+
LOG.fine("Setting " + t.toString() + " on response");
434+
}
420435
for (AppendRequestAndFutureResponse request : inflightRequests) {
421436
request.appendResult.setException(t);
422437
}
@@ -838,8 +853,10 @@ public void onError(Throwable t) {
838853
}
839854
inflightBatch.onFailure(t);
840855
try {
841-
// Establish a new connection.
842-
streamWriter.refreshAppend();
856+
if (!streamWriter.shutdown.get()) {
857+
// Establish a new connection.
858+
streamWriter.refreshAppend();
859+
}
843860
} catch (IOException | InterruptedException e) {
844861
LOG.info("Failed to establish a new connection");
845862
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -424,18 +424,25 @@ public void testFlowControlBehaviorException() throws Exception {
424424
.build())
425425
.build())
426426
.build()) {
427+
assertEquals(
428+
1L,
429+
writer
430+
.getBatchingSettings()
431+
.getFlowControlSettings()
432+
.getMaxOutstandingElementCount()
433+
.longValue());
427434

428435
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build());
436+
testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));
429437
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
430438
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
439+
// Wait is necessary for response to be scheduled before timer is advanced.
440+
Thread.sleep(5000L);
441+
fakeExecutor.advanceTime(Duration.ofSeconds(10));
431442
try {
432443
appendFuture2.get();
433444
Assert.fail("This should fail");
434445
} catch (Exception e) {
435-
if (!e.getMessage().equals("The maximum number of batch elements: 1 have been reached.")) {
436-
LOG.info("More error info:");
437-
e.printStackTrace();
438-
}
439446
assertEquals(
440447
"java.util.concurrent.ExecutionException: The maximum number of batch elements: 1 have been reached.",
441448
e.toString());
@@ -505,6 +512,7 @@ public void testStreamReconnectionExceedRetry() throws Exception {
505512
.setMaxAttempts(1)
506513
.build())
507514
.build();
515+
assertEquals(1, writer.getRetrySettings().getMaxAttempts());
508516
StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
509517
testBigQueryWrite.addException(transientError);
510518
testBigQueryWrite.addException(transientError);
@@ -818,7 +826,7 @@ public void testAwaitTermination() throws Exception {
818826
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
819827
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
820828
writer.shutdown();
821-
assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
829+
assertTrue(writer.awaitTermination(2, TimeUnit.MINUTES));
822830
}
823831

824832
@Test

0 commit comments

Comments
 (0)