Skip to content

Commit

Permalink
fix: Fix a possible NULL PTR after introduced timeout on waitForDone (#…
Browse files Browse the repository at this point in the history
…1638)

* fix: Fix a possible nullptr error when requestCallback is called after we back out with a timeout on waitForDone

* .

* update the structure

* .

* .

* .

* .

* .
  • Loading branch information
yirutang committed May 4, 2022
1 parent 3baa84e commit e1c6ded
Showing 1 changed file with 26 additions and 3 deletions.
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;

Expand Down Expand Up @@ -105,6 +106,12 @@ public class StreamWriter implements AutoCloseable {
@GuardedBy("lock")
private boolean streamConnectionIsConnected = false;

/*
* A boolean to track if we cleaned up inflight queue.
*/
@GuardedBy("lock")
private boolean inflightCleanuped = false;

/*
* Retry threshold, limits how often the connection is retried before processing halts.
*/
Expand Down Expand Up @@ -376,7 +383,8 @@ public void close() {
if (this.ownsBigQueryWriteClient) {
this.client.close();
try {
this.client.awaitTermination(1, TimeUnit.MINUTES);
// Backend request has a 2 minute timeout, so wait a little longer than that.
this.client.awaitTermination(150, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
}
}
Expand Down Expand Up @@ -465,7 +473,7 @@ private void appendLoop() {
// We can close the stream connection and handle the remaining inflight requests.
if (streamConnection != null) {
this.streamConnection.close();
waitForDoneCallback(1, TimeUnit.MINUTES);
waitForDoneCallback(2, TimeUnit.MINUTES);
}

// At this point, there cannot be more callback. It is safe to clean up all inflight requests.
Expand Down Expand Up @@ -550,6 +558,7 @@ private void cleanupInflightRequests() {
while (!this.inflightRequestQueue.isEmpty()) {
localQueue.addLast(pollInflightRequestQueue());
}
this.inflightCleanuped = true;
} finally {
this.lock.unlock();
}
Expand All @@ -572,7 +581,21 @@ private void requestCallback(AppendRowsResponse response) {
if (conectionRetryCountWithoutCallback != 0) {
conectionRetryCountWithoutCallback = 0;
}
requestWrapper = pollInflightRequestQueue();
if (!this.inflightRequestQueue.isEmpty()) {
requestWrapper = pollInflightRequestQueue();
} else if (inflightCleanuped) {
// It is possible when requestCallback is called, the inflight queue is already drained
// because we timed out waiting for done.
return;
} else {
// This is something not expected, we shouldn't have an empty inflight queue otherwise.
log.log(Level.WARNING, "Unexpected: request callback called on an empty inflight queue.");
connectionFinalStatus =
new StatusRuntimeException(
Status.fromCode(Code.FAILED_PRECONDITION)
.withDescription("Request callback called on an empty inflight queue."));
return;
}
} finally {
this.lock.unlock();
}
Expand Down

0 comments on commit e1c6ded

Please sign in to comment.