Skip to content

Commit

Permalink
fix: Add unit test for closing disconnected streamwriter. Also reduce…
Browse files Browse the repository at this point in the history
… wait from 5->3 minutes (#1751)
  • Loading branch information
gnanda committed Aug 18, 2022
1 parent 7db3647 commit 095d7d5
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
Expand Up @@ -506,7 +506,7 @@ private void appendLoop() {
// We can close the stream connection and handle the remaining inflight requests.
if (streamConnection != null) {
this.streamConnection.close();
waitForDoneCallback(5, TimeUnit.MINUTES);
waitForDoneCallback(3, TimeUnit.MINUTES);
}

// At this point, there cannot be more callback. It is safe to clean up all inflight requests.
Expand Down
Expand Up @@ -702,4 +702,22 @@ public void testWriterId()
Assert.assertFalse(writer2.getWriterId().isEmpty());
Assert.assertNotEquals(writer1.getWriterId(), writer2.getWriterId());
}

// Timeout to ensure close() doesn't wait for done callback timeout.
@Test(timeout = 10000)
public void testCloseDisconnectedStream() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM)
.setCredentialsProvider(NoCredentialsProvider.create())
.setChannelProvider(serviceHelper.createChannelProvider())
.setWriterSchema(createProtoSchema())
.build();

testBigQueryWrite.addResponse(createAppendResponse(0));
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
serviceHelper.stop();
// Ensure closing the writer after disconnect succeeds.
writer.close();
}
}

0 comments on commit 095d7d5

Please sign in to comment.