Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 8856288

Browse files
authored
fix: make awaitTermination and shutdown protected, since we already have close() method, it is confusing to have 3 shutdown methods (#330)
1 parent f553253 commit 8856288

File tree

3 files changed

+27
-40
lines changed

3 files changed

+27
-40
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
3+
<differences>
4+
<!--TODO: To be removed-->
5+
<difference>
6+
<differenceType>7009</differenceType>
7+
<className>com/google/cloud/bigquery/storage/v1alpha2/StreamWriter</className>
8+
<method>void shutdown()</method>
9+
</difference>
10+
<difference>
11+
<differenceType>7009</differenceType>
12+
<className>com/google/cloud/bigquery/storage/v1alpha2/StreamWriter</className>
13+
<method>boolean awaitTermination(long, java.util.concurrent.TimeUnit) </method>
14+
</difference>
15+
</differences>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -164,14 +164,10 @@ private StreamWriter(Builder builder)
164164
Instant.ofEpochSecond(
165165
stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos());
166166
if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) {
167-
backgroundResources.shutdown();
168-
backgroundResources.awaitTermination(1, TimeUnit.MINUTES);
169167
throw new IllegalStateException(
170168
"Cannot write to a stream that is already committed: " + streamName);
171169
}
172170
if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) {
173-
backgroundResources.shutdown();
174-
backgroundResources.awaitTermination(1, TimeUnit.MINUTES);
175171
throw new IllegalStateException(
176172
"Cannot write to a stream that is already expired: " + streamName);
177173
}
@@ -360,7 +356,7 @@ private void writeBatch(final InflightBatch inflightBatch) {
360356
/** Close the stream writer. Shut down all resources. */
361357
@Override
362358
public void close() {
363-
LOG.info("Closing stream writer");
359+
LOG.info("Closing stream writer:" + streamName);
364360
shutdown();
365361
try {
366362
awaitTermination(1, TimeUnit.MINUTES);
@@ -512,10 +508,12 @@ public RetrySettings getRetrySettings() {
512508
* should be invoked prior to deleting the {@link WriteStream} object in order to ensure that no
513509
* pending messages are lost.
514510
*/
515-
public void shutdown() {
516-
Preconditions.checkState(
517-
!shutdown.getAndSet(true), "Cannot shut down a writer already shut-down.");
518-
LOG.info("Shutdown called on writer");
511+
protected void shutdown() {
512+
if (shutdown.getAndSet(true)) {
513+
LOG.fine("Already shutdown.");
514+
return;
515+
}
516+
LOG.fine("Shutdown called on writer");
519517
if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) {
520518
currentAlarmFuture.cancel(false);
521519
}
@@ -535,7 +533,7 @@ public void shutdown() {
535533
*
536534
* <p>Call this method to make sure all resources are freed properly.
537535
*/
538-
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
536+
protected boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
539537
return backgroundResources.awaitTermination(duration, unit);
540538
}
541539

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -294,8 +294,7 @@ public void testWriteByShutdown() throws Exception {
294294
// Note we are not advancing time or reaching the count threshold but messages should
295295
// still get written by call to shutdown
296296

297-
writer.shutdown();
298-
writer.awaitTermination(10, TimeUnit.SECONDS);
297+
writer.close();
299298

300299
// Verify the appends completed
301300
assertTrue(appendFuture1.isDone());
@@ -407,8 +406,7 @@ public void run() {
407406
// Wait is necessary for response to be scheduled before timer is advanced.
408407
fakeExecutor.advanceTime(Duration.ofSeconds(10));
409408
t.join();
410-
writer.shutdown();
411-
writer.awaitTermination(1, TimeUnit.MINUTES);
409+
writer.close();
412410
}
413411

414412
@Test
@@ -527,8 +525,7 @@ public void testStreamReconnectionExceedRetry() throws Exception {
527525
} catch (ExecutionException e) {
528526
assertEquals(transientError.toString(), e.getCause().getCause().toString());
529527
}
530-
writer.shutdown();
531-
assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
528+
writer.close();
532529
}
533530

534531
@Test
@@ -643,7 +640,7 @@ public void testWriterGetters() throws Exception {
643640
.getFlowControlSettings()
644641
.getMaxOutstandingRequestBytes()
645642
.longValue());
646-
writer.shutdown();
643+
writer.close();
647644
}
648645

649646
@Test
@@ -823,29 +820,6 @@ public void testBuilderInvalidArguments() {
823820
}
824821
}
825822

826-
@Test
827-
public void testAwaitTermination() throws Exception {
828-
StreamWriter writer =
829-
getTestStreamWriterBuilder("projects/p/datasets/d/tables/t/streams/await").build();
830-
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
831-
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"AWAIT"});
832-
writer.shutdown();
833-
assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
834-
}
835-
836-
@Test
837-
public void testClose() throws Exception {
838-
StreamWriter writer = getTestStreamWriterBuilder().build();
839-
writer.close();
840-
try {
841-
writer.shutdown();
842-
fail("Should throw");
843-
} catch (IllegalStateException e) {
844-
LOG.info(e.toString());
845-
assertEquals("Cannot shut down a writer already shut-down.", e.getMessage());
846-
}
847-
}
848-
849823
@Test
850824
public void testExistingClient() throws Exception {
851825
BigQueryWriteSettings settings =

0 commit comments

Comments
 (0)