Skip to content
Permalink
Browse files
fix: Remove flushAll method (#850)
* .

* .

* .
  • Loading branch information
yirutang committed Feb 19, 2021
1 parent 6021920 commit 33a450286b999c41459f92dd0177239f2a1b1f9a
@@ -23,4 +23,9 @@
<from>com.google.protobuf.DynamicMessage convertJsonToProtoMessage(com.google.protobuf.Descriptors$Descriptor, org.json.JSONObject, boolean)</from>
<to>com.google.protobuf.DynamicMessage convertJsonToProtoMessage(com.google.protobuf.Descriptors$Descriptor, org.json.JSONObject)</to>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/StreamWriter</className>
<differenceType>7002</differenceType>
<method>void flushAll(long)</method>
</difference>
</differences>
@@ -272,37 +272,9 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
messagesBatchLock.unlock();
appendAndRefreshAppendLock.unlock();
}

return outstandingAppend.appendResult;
}

/**
* This is the general flush method for asynchronise append operation. When you have outstanding
* append requests, calling flush will make sure all outstanding append requests completed and
* successful. Otherwise there will be an exception thrown.
*
* @throws Exception
*/
public void flushAll(long timeoutMillis) throws Exception {
appendAndRefreshAppendLock.lock();
try {
writeAllOutstanding();
synchronized (messagesWaiter) {
messagesWaiter.waitComplete(timeoutMillis);
}
} finally {
appendAndRefreshAppendLock.unlock();
}
exceptionLock.lock();
try {
if (streamException != null) {
throw new Exception(streamException);
}
} finally {
exceptionLock.unlock();
}
}

/**
* Re-establishes a stream connection.
*
@@ -863,44 +863,6 @@ public void testExistingClient() throws Exception {
client.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
public void testFlushAll() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(100000))
.build())
.build();

testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build())
.build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
assertFalse(appendFuture3.isDone());
writer.flushAll(100000);
assertTrue(appendFuture3.isDone());

writer.close();
}

@Test
public void testDatasetTraceId() throws Exception {
StreamWriter writer =

0 comments on commit 33a4502

Please sign in to comment.