Skip to content

Commit

Permalink
feat: add isUserClosed to indicate that user explicitly closed the St…
Browse files Browse the repository at this point in the history
…reamWriter (#1983)

To be used by the sample here:
https://togithub.com/googleapis/java-bigquerystorage/pull/1982/files

Also change the name of isDone to be isClosed, since we use StreamWriterClosedException to indicate a writer is shutdown for writing. Making the terms more consistent with each other.
  • Loading branch information
yirutang committed Feb 8, 2023
1 parent cd1dc60 commit abd6627
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 11 deletions.
11 changes: 11 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Expand Up @@ -147,4 +147,15 @@
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
<method>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/StreamWriter</className>
<method>boolean isDone()</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/JsonStreamWriter</className>
<method>boolean isDone()</method>
</difference>
</differences>

Expand Up @@ -387,8 +387,13 @@ public void close() {
* JsonStreamWriter is explicitly closed or the underlying connection is broken when
* connection pool is not used. Client should recreate JsonStreamWriter in this case.
*/
public boolean isDone() {
return this.streamWriter.isDone();
public boolean isClosed() {
return this.streamWriter.isClosed();
}

/** @return if user explicitly closed the writer. */
public boolean isUserClosed() {
return this.streamWriter.isUserClosed();
}

public static final class Builder {
Expand Down
Expand Up @@ -425,7 +425,7 @@ public String getLocation() {
* StreamWriter is explicitly closed or the underlying connection is broken when connection
* pool is not used. Client should recreate StreamWriter in this case.
*/
public boolean isDone() {
public boolean isClosed() {
if (singleConnectionOrConnectionPool.getKind() == Kind.CONNECTION_WORKER) {
return userClosed.get()
|| singleConnectionOrConnectionPool.connectionWorker().isConnectionInUnrecoverableState();
Expand All @@ -435,6 +435,11 @@ public boolean isDone() {
}
}

/** @return if user explicitly closed the writer. */
public boolean isUserClosed() {
return userClosed.get();
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
Expand Down
Expand Up @@ -1141,10 +1141,11 @@ public void testWriterId()

@Test
public void testIsDone() throws DescriptorValidationException, IOException, InterruptedException {
JsonStreamWriter writer1 = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build();
Assert.assertFalse(writer1.isDone());
writer1.close();
Assert.assertTrue(writer1.isDone());
JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build();
Assert.assertFalse(writer.isClosed());
writer.close();
Assert.assertTrue(writer.isClosed());
Assert.assertTrue(writer.isUserClosed());
}

private AppendRowsResponse createAppendResponse(long offset) {
Expand Down
Expand Up @@ -1237,7 +1237,7 @@ public void testStreamWriterUserCloseMultiplexing() throws Exception {
.build();

writer.close();
assertTrue(writer.isDone());
assertTrue(writer.isClosed());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ExecutionException ex =
assertThrows(
Expand All @@ -1248,6 +1248,7 @@ public void testStreamWriterUserCloseMultiplexing() throws Exception {
assertEquals(
Status.Code.FAILED_PRECONDITION,
((StatusRuntimeException) ex.getCause()).getStatus().getCode());
assertTrue(writer.isUserClosed());
}

@Test(timeout = 10000)
Expand All @@ -1256,7 +1257,7 @@ public void testStreamWriterUserCloseNoMultiplexing() throws Exception {
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(createProtoSchema()).build();

writer.close();
assertTrue(writer.isDone());
assertTrue(writer.isClosed());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ExecutionException ex =
assertThrows(
Expand All @@ -1267,6 +1268,7 @@ public void testStreamWriterUserCloseNoMultiplexing() throws Exception {
assertEquals(
Status.Code.FAILED_PRECONDITION,
((StatusRuntimeException) ex.getCause()).getStatus().getCode());
assertTrue(writer.isUserClosed());
}

@Test(timeout = 10000)
Expand All @@ -1291,7 +1293,8 @@ public void testStreamWriterPermanentErrorMultiplexing() throws Exception {
appendFuture2.get();
});
assertTrue(ex.getCause() instanceof InvalidArgumentException);
assertFalse(writer.isDone());
assertFalse(writer.isClosed());
assertFalse(writer.isUserClosed());
}

@Test(timeout = 10000)
Expand All @@ -1311,7 +1314,8 @@ public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception {
() -> {
appendFuture2.get();
});
assertTrue(writer.isDone());
assertTrue(writer.isClosed());
assertTrue(ex.getCause() instanceof InvalidArgumentException);
assertFalse(writer.isUserClosed());
}
}

0 comments on commit abd6627

Please sign in to comment.