feat: Add reconnect support to v1 client lib. (#1446)
* fix: update code comment to reflect max size change

* feat: add reconnection support to v1 client

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
yirutang and gcf-owl-bot[bot] authored Dec 20, 2021
1 parent b8f1edb commit a5157fa
Showing 4 changed files with 227 additions and 43 deletions.
StreamWriter.java
@@ -20,6 +20,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.util.Errors;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
@@ -90,6 +91,26 @@ public class StreamWriter implements AutoCloseable {
@GuardedBy("lock")
private long inflightBytes = 0;

/*
* Tracks how often the stream was closed due to a retriable error. Streaming will stop when the
* count hits a threshold. Streaming should only be halted if it isn't possible to establish a
* connection. This tracks the number of reconnections in succession, and it is reset when
* a row is successfully called back.
*/
@GuardedBy("lock")
private long conectionRetryCountWithoutCallback = 0;

/*
* If false, streamConnection needs to be reset.
*/
@GuardedBy("lock")
private boolean streamConnectionIsConnected = false;

/*
* Retry threshold; limits how many times the connection is retried before processing halts.
*/
private static final long RETRY_THRESHOLD = 3;

/*
* Indicates whether the user has called Close() or not.
*/
@@ -173,6 +194,18 @@ private StreamWriter(Builder builder) throws IOException {
this.ownsBigQueryWriteClient = false;
}

this.appendThread =
new Thread(
new Runnable() {
@Override
public void run() {
appendLoop();
}
});
this.appendThread.start();
}

private void resetConnection() {
this.streamConnection =
new StreamConnection(
this.client,
@@ -188,15 +221,6 @@ public void run(Throwable finalStatus) {
doneCallback(finalStatus);
}
});
this.appendThread =
new Thread(
new Runnable() {
@Override
public void run() {
appendLoop();
}
});
this.appendThread.start();
}

/**
@@ -331,12 +355,27 @@ public void close() {
* It takes requests from the waiting queue and sends them to the server.
*/
private void appendLoop() {
boolean isFirstRequestInConnection = true;
Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();
boolean streamNeedsConnecting = false;
// Set firstRequestInConnection to true immediately after connecting the stream;
// it indicates that the next row sent needs the schema and other metadata.
boolean isFirstRequestInConnection = true;
while (!waitingQueueDrained()) {
this.lock.lock();
try {
hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS);
// Copy streamConnectionIsConnected (guarded by lock) into a local variable.
// In addition, only reconnect if the stream has not failed with a terminal error.
streamNeedsConnecting = !streamConnectionIsConnected && connectionFinalStatus == null;
if (streamNeedsConnecting) {
// If the stream connection is broken, any requests on inflightRequestQueue will need
// to be resent, as the new connection has no knowledge of them. Copy the requests
// from inflightRequestQueue and prepend them onto the waitingRequestQueue; they must be
// prepended because they need to be sent before any new requests.
while (!inflightRequestQueue.isEmpty()) {
waitingRequestQueue.addFirst(inflightRequestQueue.pollLast());
}
}
while (!this.waitingRequestQueue.isEmpty()) {
AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
this.inflightRequestQueue.addLast(requestWrapper);
@@ -355,12 +394,34 @@ private void appendLoop() {
if (localQueue.isEmpty()) {
continue;
}

// TODO: Add reconnection here.
if (streamNeedsConnecting) {
// Set streamConnectionIsConnected to true to indicate the stream has been connected. This
// must happen before the call to resetConnection, since it is unknown when the connection
// could be closed and doneCallback invoked, which clears the flag.
lock.lock();
try {
this.streamConnectionIsConnected = true;
} finally {
lock.unlock();
}
resetConnection();
// Set firstRequestInConnection to indicate that the next request sent should include
// metadata.
isFirstRequestInConnection = true;
}
while (!localQueue.isEmpty()) {
AppendRowsRequest preparedRequest =
prepareRequestBasedOnPosition(
localQueue.pollFirst().message, isFirstRequestInConnection);
// Send should only throw an exception if there is a problem with the request. The catch
// block will handle this case and return the exception with the result.
// Otherwise send will return:
//   SUCCESS: Message was sent, wait for the callback.
//   STREAM_CLOSED: Stream was closed, normally or due to an error.
//   NOT_ENOUGH_QUOTA: Message wasn't sent due to insufficient quota.
//   TODO: Handle NOT_ENOUGH_QUOTA.
// In the closed case, the request remains in the inflight queue and will either be returned
// to the user with an error or be resent.
this.streamConnection.send(preparedRequest);
isFirstRequestInConnection = false;
}
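The requeue step in the hunk above leans on Deque ordering: popping from the tail of the in-flight queue while pushing onto the head of the waiting queue preserves the original send order. A minimal standalone sketch of that invariant (names here are illustrative, not the library's):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class RequeueOrderSketch {
  public static void main(String[] args) {
    // Oldest request first, as in inflightRequestQueue.
    Deque<String> inflight = new ArrayDeque<>(List.of("r1", "r2", "r3"));
    // Not-yet-sent requests, as in waitingRequestQueue.
    Deque<String> waiting = new ArrayDeque<>(List.of("r4", "r5"));

    // Walk the in-flight queue backwards (pollLast) while prepending
    // (addFirst): the in-flight requests end up ahead of the new ones,
    // in their original order.
    while (!inflight.isEmpty()) {
      waiting.addFirst(inflight.pollLast());
    }

    System.out.println(waiting); // [r1, r2, r3, r4, r5]
  }
}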
@@ -369,8 +430,10 @@ private void appendLoop() {
log.fine("Cleanup starts. Stream: " + streamName);
// At this point, the waiting queue is drained, so no more requests.
// We can close the stream connection and handle the remaining inflight requests.
this.streamConnection.close();
waitForDoneCallback();
if (streamConnection != null) {
this.streamConnection.close();
waitForDoneCallback();
}

// At this point, there cannot be any more callbacks. It is safe to clean up all inflight requests.
log.fine(
@@ -455,6 +518,12 @@ private void requestCallback(AppendRowsResponse response) {
AppendRequestAndResponse requestWrapper;
this.lock.lock();
try {
// Had a successful connection with at least one result; reset retries.
// conectionRetryCountWithoutCallback is reset so that only multiple retries in a row,
// without any successfully sent records, will cause the stream to fail.
if (conectionRetryCountWithoutCallback != 0) {
conectionRetryCountWithoutCallback = 0;
}
requestWrapper = pollInflightRequestQueue();
} finally {
this.lock.unlock();
@@ -476,6 +545,14 @@ private void requestCallback(AppendRowsResponse response) {
}
}

private boolean isRetriableError(Throwable t) {
Status status = Status.fromThrowable(t);
if (Errors.isRetryableInternalStatus(status)) {
return true;
}
return status.getCode() == Status.Code.ABORTED || status.getCode() == Status.Code.UNAVAILABLE;
}
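isRetriableError treats ABORTED and UNAVAILABLE as retriable, plus whatever Errors.isRetryableInternalStatus accepts for INTERNAL statuses. A self-contained approximation is below; the INTERNAL handling is deliberately omitted, so this is a hedged sketch rather than the library's exact predicate:

import io.grpc.Status;
import io.grpc.StatusRuntimeException;

public class RetriableCheckSketch {
  // Simplified stand-in for StreamWriter.isRetriableError; the real method
  // also consults Errors.isRetryableInternalStatus for INTERNAL statuses.
  static boolean isRetriable(Throwable t) {
    Status.Code code = Status.fromThrowable(t).getCode();
    return code == Status.Code.ABORTED || code == Status.Code.UNAVAILABLE;
  }

  public static void main(String[] args) {
    System.out.println(isRetriable(new StatusRuntimeException(Status.ABORTED)));          // true
    System.out.println(isRetriable(new StatusRuntimeException(Status.UNAVAILABLE)));      // true
    System.out.println(isRetriable(new StatusRuntimeException(Status.INVALID_ARGUMENT))); // false
  }
}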

private void doneCallback(Throwable finalStatus) {
log.fine(
"Received done callback. Stream: "
@@ -484,7 +561,26 @@ private void doneCallback(Throwable finalStatus) {
+ finalStatus.toString());
this.lock.lock();
try {
this.connectionFinalStatus = finalStatus;
this.streamConnectionIsConnected = false;
if (connectionFinalStatus == null) {
// If the error is retriable, don't set the final status here; let the stream retry later.
if (isRetriableError(finalStatus)
&& conectionRetryCountWithoutCallback < RETRY_THRESHOLD
&& !userClosed) {
this.conectionRetryCountWithoutCallback++;
log.fine(
"Retriable error "
+ finalStatus.toString()
+ " received, retry count "
+ conectionRetryCountWithoutCallback
+ " for stream "
+ streamName);
} else {
this.connectionFinalStatus = finalStatus;
log.info(
"Stream finished with error " + finalStatus.toString() + " for stream " + streamName);
}
}
} finally {
this.lock.unlock();
}
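Condensed, the reconnect bookkeeping spread across doneCallback, requestCallback, and appendLoop amounts to the following. This is a minimal sketch under illustrative names, not the library's API:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReconnectBookkeepingSketch {
  private static final long RETRY_THRESHOLD = 3;

  private final Lock lock = new ReentrantLock();
  private long retriesWithoutCallback = 0;
  private boolean connected = false;
  private Throwable finalStatus = null;
  private boolean userClosed = false;

  // Mirrors doneCallback: mark the stream disconnected, then either count a
  // retry (retriable error, under threshold, not user-closed) or latch the
  // terminal error so the append loop stops reconnecting.
  void onConnectionDone(Throwable error, boolean retriable) {
    lock.lock();
    try {
      connected = false;
      if (finalStatus == null) {
        if (retriable && retriesWithoutCallback < RETRY_THRESHOLD && !userClosed) {
          retriesWithoutCallback++;
        } else {
          finalStatus = error;
        }
      }
    } finally {
      lock.unlock();
    }
  }

  // Mirrors requestCallback: one successful response proves the connection
  // works, so the retry budget resets.
  void onResponse() {
    lock.lock();
    try {
      retriesWithoutCallback = 0;
    } finally {
      lock.unlock();
    }
  }

  // Mirrors the appendLoop check: reconnect only while no terminal error is set.
  boolean needsReconnect() {
    lock.lock();
    try {
      return !connected && finalStatus == null;
    } finally {
      lock.unlock();
    }
  }
}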
FakeBigQueryWrite.java
@@ -79,14 +79,22 @@ public void reset() {
serviceImpl.reset();
}

public void setResponseDelay(Duration delay) {
serviceImpl.setResponseDelay(delay);
}

public void setResponseSleep(Duration sleep) {
serviceImpl.setResponseSleep(sleep);
}

public void setCloseEveryNAppends(long closeAfter) {
serviceImpl.setCloseEveryNAppends(closeAfter);
}

public void setTimesToClose(long numberTimesToClose) {
serviceImpl.setTimesToClose(numberTimesToClose);
}

public long getConnectionCount() {
return serviceImpl.getConnectionCount();
}

public void setExecutor(ScheduledExecutorService executor) {
serviceImpl.setExecutor(executor);
}
FakeBigQueryWriteImpl.java
@@ -17,6 +17,7 @@

import com.google.common.base.Optional;
import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
@@ -45,11 +46,16 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
private final AtomicInteger nextMessageId = new AtomicInteger(1);
private boolean autoPublishResponse;
private ScheduledExecutorService executor = null;
private Duration responseDelay = Duration.ZERO;

private Duration responseSleep = Duration.ZERO;
private Semaphore responseSemaphore = new Semaphore(0, true);

private long numberTimesToClose = 0;
private long closeAfter = 0;
private long recordCount = 0;
private long connectionCount = 0;
private boolean firstRecord = false;

/** Class used to save the state of a possible response. */
private static class Response {
Optional<AppendRowsResponse> appendResponse;
@@ -120,38 +126,51 @@ public void waitForResponseScheduled() throws InterruptedException {
responseSemaphore.acquire();
}

/* Returns the number of times the stream was connected. */
public long getConnectionCount() {
return connectionCount;
}

@Override
public StreamObserver<AppendRowsRequest> appendRows(
final StreamObserver<AppendRowsResponse> responseObserver) {
this.connectionCount++;
this.firstRecord = true;
StreamObserver<AppendRowsRequest> requestObserver =
new StreamObserver<AppendRowsRequest>() {
@Override
public void onNext(AppendRowsRequest value) {
LOG.fine("Get request:" + value.toString());
final Response response = responses.remove();
requests.add(value);
recordCount++;
if (responseSleep.compareTo(Duration.ZERO) > 0) {
LOG.info("Sleeping before response for " + responseSleep.toString());
LOG.fine("Sleeping before response for " + responseSleep.toString());
Uninterruptibles.sleepUninterruptibly(
responseSleep.toMillis(), TimeUnit.MILLISECONDS);
}
if (responseDelay == Duration.ZERO) {
sendResponse(response, responseObserver);
if (firstRecord) {
if (!value.getProtoRows().hasWriterSchema() || value.getWriteStream().isEmpty()) {
LOG.info(
String.valueOf(
!value.getProtoRows().hasWriterSchema()
|| value.getWriteStream().isEmpty()));
responseObserver.onError(
Status.INVALID_ARGUMENT
.withDescription("Unexpected first request: " + value.toString())
.asException());
return;
}
}
firstRecord = false;
if (closeAfter > 0
&& recordCount % closeAfter == 0
&& (numberTimesToClose == 0 || connectionCount <= numberTimesToClose)) {
LOG.info("Shutting down connection from test...");
responseObserver.onError(Status.ABORTED.asException());
} else {
final Response responseToSend = response;
// TODO(yirutang): This is very wrong because it messes up response/complete ordering.
LOG.fine("Schedule a response to be sent at delay");
executor.schedule(
new Runnable() {
@Override
public void run() {
sendResponse(responseToSend, responseObserver);
}
},
responseDelay.toMillis(),
TimeUnit.MILLISECONDS);
final Response response = responses.remove();
sendResponse(response, responseObserver);
}
responseSemaphore.release();
}

@Override
@@ -183,12 +202,6 @@ public FakeBigQueryWriteImpl setExecutor(ScheduledExecutorService executor) {
return this;
}

/** Set an amount of time by which to delay publish responses. */
public FakeBigQueryWriteImpl setResponseDelay(Duration responseDelay) {
this.responseDelay = responseDelay;
return this;
}

/** Set an amount of time by which to sleep before publishing responses. */
public FakeBigQueryWriteImpl setResponseSleep(Duration responseSleep) {
this.responseSleep = responseSleep;
@@ -231,4 +244,29 @@ public void reset() {
requests.clear();
responses.clear();
}

/* Abort the stream after N records. The primary use case is to test the retry logic. After N
* records are sent, the stream will be aborted with Code.ABORTED, which is a retriable error.
* The abort calls the onDone callback immediately, potentially losing some messages that have
* already been sent. If the value of closeAfter is too small, the client might not get a chance
* to process any records before a subsequent abort is sent, which means multiple retries in a
* row on the client side. After 3 retries in a row, the write will fail.
* closeAfter should be large enough to give the client some opportunity to receive some of the
* messages.
**/
public void setCloseEveryNAppends(long closeAfter) {
this.closeAfter = closeAfter;
}

/* If setCloseEveryNAppends is greater than 0, the stream will be aborted every N appends.
* setTimesToClose limits the number of times the abort happens; if it is set to 0, the stream
* aborts every N appends.
* The primary use case is to send a couple of records and then abort. But if there are only a
* couple of records, it is possible they are sent and the abort happens before the client has
* processed them, requiring them to be sent again and creating a potential infinite loop.
* Therefore set the times to close to 1: this sends the two records, forces one abort and
* retry, and then reprocesses the records to completion.
**/
public void setTimesToClose(long numberTimesToClose) {
this.numberTimesToClose = numberTimesToClose;
}
}
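Put together, a test can exercise the retry path by aborting the fake stream once and letting the writer reconnect. A hedged setup sketch follows; the FakeBigQueryWrite class name, its no-arg constructor, and its package are assumptions inferred from the wrapper methods in this diff, not verified API:

package com.google.cloud.bigquery.storage.v1;

public class RetryTestSetupSketch {
  public static void main(String[] args) {
    // Assumed test fixture from this repo; only the two setters shown in
    // this diff are relied on here.
    FakeBigQueryWrite fake = new FakeBigQueryWrite();
    // Abort the stream with Code.ABORTED after every 2 appended records...
    fake.setCloseEveryNAppends(2);
    // ...but only once, so retried records can be reprocessed to completion
    // instead of tripping the abort again and looping forever.
    fake.setTimesToClose(1);
    // A StreamWriter pointed at this fake should observe exactly one
    // retriable disconnect, reconnect, resend the in-flight rows, and finish.
  }
}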