Skip to content

Commit

Permalink
fix: StreamWriter hang when we reach the inflight limit control and i…
Browse files Browse the repository at this point in the history
…s doing a retry (#799)

* fix:a hang in StreamWriter when inflight request reached a limit and we try to resend a message

* .

* .

* .

* .

* .
  • Loading branch information
yirutang committed Jan 17, 2021
1 parent 8c4cec8 commit f8f9770
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,14 @@ private StreamWriter(Builder builder)
this.onSchemaUpdateRunnable.setStreamWriter(this);
}

refreshAppend();
bidiStreamingCallable = stub.appendRowsCallable();
clientStream = bidiStreamingCallable.splitCall(responseObserver);
try {
while (!clientStream.isSendReady()) {
Thread.sleep(10);
}
} catch (InterruptedException e) {
}
}

/** Stream name we are writing to. */
Expand Down Expand Up @@ -296,9 +303,9 @@ public void flushAll(long timeoutMillis) throws Exception {
/**
* Re-establishes a stream connection.
*
* @throws IOException
* @throws InterruptedException
*/
public void refreshAppend() throws IOException, InterruptedException {
public void refreshAppend() throws InterruptedException {
appendAndRefreshAppendLock.lock();
if (shutdown.get()) {
LOG.warning("Cannot refresh on a already shutdown writer.");
Expand All @@ -313,11 +320,8 @@ public void refreshAppend() throws IOException, InterruptedException {
messagesBatch.resetAttachSchema();
bidiStreamingCallable = stub.appendRowsCallable();
clientStream = bidiStreamingCallable.splitCall(responseObserver);
try {
while (!clientStream.isSendReady()) {
Thread.sleep(10);
}
} catch (InterruptedException expected) {
while (!clientStream.isSendReady()) {
Thread.sleep(10);
}
Thread.sleep(this.retrySettings.getInitialRetryDelay().toMillis());
// Can only unlock here since need to sleep the full 7 seconds before stream can allow appends.
Expand Down Expand Up @@ -922,41 +926,43 @@ public void onError(Throwable t) {
}
inflightBatch = this.inflightBatches.poll();
}
try {
if (isRecoverableError(t)) {
try {
if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
&& !streamWriter.shutdown.get()) {
streamWriter.refreshAppend();
LOG.info("Resending requests on transient error:" + streamWriter.currentRetries);
streamWriter.writeBatch(inflightBatch);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries++;
}
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
if (isRecoverableError(t)) {
try {
if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
&& !streamWriter.shutdown.get()) {
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries++;
}
} catch (IOException | InterruptedException e) {
LOG.info("Got exception while retrying.");
inflightBatch.onFailure(e);
abortInflightRequests(e);
LOG.info(
"Try to reestablish connection due to transient error: "
+ t.toString()
+ " retry times: "
+ streamWriter.currentRetries);
streamWriter.refreshAppend();
LOG.info("Resending requests on after connection established");
streamWriter.writeBatch(inflightBatch);
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
} catch (InterruptedException e) {
LOG.info("Got exception while retrying: " + e.toString());
inflightBatch.onFailure(new StatusRuntimeException(Status.ABORTED));
abortInflightRequests(new StatusRuntimeException(Status.ABORTED));
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
} finally {
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,6 @@ public void testFlushAll() throws Exception {
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});

assertFalse(appendFuture3.isDone());
writer.flushAll(100000);

assertTrue(appendFuture3.isDone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,21 +519,30 @@ public void testStreamReconnectionTransient() throws Exception {
.toBuilder()
.setDelayThreshold(Duration.ofSeconds(100000))
.setElementCountThreshold(1L)
.setFlowControlSettings(
StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
.toBuilder()
.setMaxOutstandingElementCount(1L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build())
.build())
.build();

StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());
testBigQueryWrite.addException(new StatusRuntimeException(Status.UNAVAILABLE));
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
.build());
ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"m1"});
assertEquals(false, future1.isDone());
// Retry is scheduled to be 7 seconds later.
ApiFuture<AppendRowsResponse> future2 = sendTestMessage(writer, new String[] {"m1"});
assertEquals(0L, future1.get().getAppendResult().getOffset().getValue());
future1.get();
assertEquals(1L, future2.get().getAppendResult().getOffset().getValue());
writer.close();
}

Expand Down

0 comments on commit f8f9770

Please sign in to comment.