-
Notifications
You must be signed in to change notification settings - Fork 81
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix:shutdown stuck when there is error on the flush path #831
Conversation
…l to release the requests in the queue, causing shutdown to wait forever
Codecov Report
@@ Coverage Diff @@
## master #831 +/- ##
============================================
+ Coverage 81.10% 81.14% +0.04%
Complexity 996 996
============================================
Files 74 74
Lines 5346 5347 +1
Branches 415 415
============================================
+ Hits 4336 4339 +3
+ Misses 839 838 -1
+ Partials 171 170 -1
Continue to review full report at Codecov.
|
@@ -932,6 +932,7 @@ public void onComplete() { | |||
public void onError(Throwable t) { | |||
LOG.fine("OnError called"); | |||
if (streamWriter.shutdown.get()) { | |||
abortInflightRequests(t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two more issues:
- Even if you abort the inflight request here, the shutdown can still add more requests through writeAllOutstanding();
- Besides, it is also possible that another thread is calling append, and will contribute another stuck request at https://screenshot.googleplex.com/7pmsyb2zyKDEaGF
...igquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java
Outdated
Show resolved
Hide resolved
|
||
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"}); | ||
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"}); | ||
Thread.sleep(5000L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this for?
Thread.sleep(5000L); | ||
fakeExecutor.advanceTime(Duration.ofSeconds(20)); | ||
// Shutdown writer immediately and there will be some error happened when flushing the queue. | ||
writer.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand this. You advance the time before shutdown. So even in the old code path before your fix, you can reduce the only inflight requests in the queue and unblock. Will this unit test block before your fix?
Please think carefully how to write a unit test.
🤖 I have created a release *beep* *boop* --- ### Updating meta-information for bleeding-edge SNAPSHOT release. --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
…l to release the requests in the queue, causing shutdown to wait forever
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
Fixes #<issue_number_goes_here> ☕️