Server streaming retries take 2#463
Conversation
| } | ||
| } | ||
|
|
||
| public static class WrappedApiException extends RuntimeException { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| } | ||
| } | ||
|
|
||
| public static class WrappedCancellationException extends RuntimeException { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
@garrettjonesgoogle |
|
At a cursory look I have no problem with the approach. |
92c5a37 to
4804885
Compare
Codecov Report
@@ Coverage Diff @@
## master #463 +/- ##
============================================
- Coverage 71.01% 70.34% -0.68%
- Complexity 743 773 +30
============================================
Files 156 162 +6
Lines 3340 3672 +332
Branches 240 275 +35
============================================
+ Hits 2372 2583 +211
- Misses 875 973 +98
- Partials 93 116 +23
Continue to review full report at Codecov.
|
|
This is ready for review |
82d01af to
f4e04d3
Compare
| innerAttemptFuture = new NonCancellableFuture<>(); | ||
| seenSuccessSinceLastError = false; | ||
|
|
||
| // TODO: watchdog |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| * when the retry needs to happen, and the stream resumption strategy composes the request to resume | ||
| * the stream. To turn off retries, set the retryable codes to the empty set. | ||
| * | ||
| * <p>The retry settings have slightly different semantics here: |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
Also, update the PR description to match the current classnames in the PR |
|
done |
b110fe1 to
1dc0207
Compare
There was a problem hiding this comment.
Overall looks good and I don't have any major concerns here. The two relatively big concerns here are:
-
the status of watchdog logic (I believe there will a a subsequent PR for this)
-
Please check that
RetryingFuture#peekAttemptResult()andRetryingFuture#getAttemptResult()are not completelly broken for the streaming case (it seems that the future is not exposed to user, so in that case it should be fine, he/she will just never call them)
| new GrpcExceptionServerStreamingCallable<>( | ||
| callable, streamingCallSettings.getRetryableCodes()); | ||
|
|
||
| if (!streamingCallSettings.getRetryableCodes().isEmpty() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| } | ||
|
|
||
| void cancelPrivately() { | ||
| public void cancelPrivately() { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| * | ||
| * <p>For internal use only - public for technical reasons. | ||
| */ | ||
| @InternalApi("For internal use only") |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
|
||
| StreamingRetryAlgorithm<Void> retryAlgorithm = | ||
| new StreamingRetryAlgorithm<>( | ||
| new ApiResultRetryAlgorithm<Void>(), |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
|
||
| /** | ||
| * A callable that generates Server Streaming RPC calls. At any one time, it is responsible for at | ||
| * most a single RPC. During an attempt, it proxies all incoming message to the outer {@link |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| private boolean seenSuccessSinceLastError; | ||
| private NonCancellableFuture<Void> innerAttemptFuture; | ||
|
|
||
| /** Constructs a new instances. */ |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| outerObserver.onStart( | ||
| new StreamController() { | ||
| @Override | ||
| public void disableAutoInboundFlowControl() { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| } | ||
|
|
||
| @Override | ||
| public void call( |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| private StreamController innerController; | ||
|
|
||
| private boolean seenSuccessSinceLastError; | ||
| private NonCancellableFuture<Void> innerAttemptFuture; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| } | ||
|
|
||
| boolean setPrivately(ResponseT value) { | ||
| public boolean setPrivately(ResponseT value) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
Thanks for reviewing! Watchdog: The logic should remain intact, my only plan is to move the scheduling out of the callable. There are 2 possibilities:
I would like to deal with this question outside of this PR so that I can unblock myself on googleapis/google-cloud-java#2849 RetryingFuture#peekAttemptResult() and RetryingFuture#getAttemptResult(): the retrying future is not exposed to the caller, so that functionality is not needed I think I responded to all feedback, PTAL |
garrettjonesgoogle
left a comment
There was a problem hiding this comment.
LGTM - just a couple minor wording / naming fixes
|
|
||
| // Unwrap | ||
| if (prevThrowable instanceof ServerStreamingAttemptException) { | ||
| ServerStreamingAttemptException wrapper = (ServerStreamingAttemptException) prevThrowable; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| * <p>This class includes settings that are applicable to all server streaming calls, which | ||
| * currently just includes retries. | ||
| * | ||
| * <p>Retry configuration allows for the stream to be restarted and resumed. it is composed of 3 |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| * <p>The retry settings have slightly different semantics when compared to unary RPCs: | ||
| * | ||
| * <ul> | ||
| * <li>retry delays are reset to initial value as soon as a response is received. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| * <li>RPC timeouts apply to the time interval between caller demanding more responses via {@link | ||
| * StreamController#request(int)} and the {@link ResponseObserver} receiving the message. | ||
| * <li>RPC timeouts are best effort and are checked once every {@link #timeoutCheckInterval}. | ||
| * <li>Attempt counts are reset as soon as a response is received. So max attempts is the maximum |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
Done & verified with the in progress bigtable client. Please merge when ready. |
This enables retries that were added to gax in googleapis/gax-java#463. Server streaming api can be treated the same as unary apis before the first response is received. Allowing GAPIC clients to automatically retry RPCs. This PR exposes the retry settings that were configured in gapic yaml file to the java stub settings. This will affect the codegen for 3 clients: - firestore - will enable retries for batchGetDocuments & runQuery. Which will bring its behavior closer to the golang client - bigtable - the handwritten overlay will overwrite these settings - spanner - doesn't currently use gapic client, so will ignore the new settings
This enables retries that were added to gax in googleapis/gax-java#463. Server streaming api can be treated the same as unary apis before the first response is received. Allowing GAPIC clients to automatically retry RPCs. This PR exposes the retry settings that were configured in gapic yaml file to the java stub settings. This will affect the codegen for 3 clients: - firestore - will enable retries for batchGetDocuments & runQuery. Which will bring its behavior closer to the golang client - bigtable - the handwritten overlay will overwrite these settings - spanner - doesn't currently use gapic client, so will ignore the new settings
Add retries and resume support.
This is re-write of #449 and the last part of #433
Conceptually, retries for streams can be split into 2 phases: before the first response is observed and after. Before the first response is observed, retries can be handled automatically similar to unary RPCs. After the first response is observed, manual work needs to done to recalculate the retry request. This work is delegated to a StreamResumptionStrategy, which is notified of incoming responses and can be asked to build a resume request on failure. Gax should offer out of box support for the first kind of retries and allow client developers to manually add support for resumes via handwritten code. I envision the workflow for client developers as: add retry codes & settings to GAPIC yaml to enable simple retries, then implement a StreamTracker and set it on the ServerStreamingCallSettings.
This PR also implements timeouts for RPCs. The existing concepts of RPC & total timeouts are extended: RPC timeout now limits the time interval between a consumer signaling demand for the next response via StreamController#request() and receiving a response in ResponseObserver#onResponse. Total timeout limits the total duration of the stream from the initial call() until the last onComplete/onError of the last retry attempt. Furthermore, the concept of idle timeout is added to streams with manual flow control, this limits the time between last observed consumer activity (the time between finishing processing a response in onResponse to the next call to request). This is meant to address the possibility of the caller forgetting to cancel a partially read stream.
Since GRPC does not implement the concept of RPC timeouts for streams, this PR implements it as a watchdog helper. The helper tracks activity per stream and schedules periodic garbage collection sweeps. So stream RPC timeouts are a lot looser than their unary counterparts. Furthermore, the helper needs to receive more information from its callers than is currently available: it needs to be able to accept both an rpc timeout and a total stream deadline. I couldn't figure out how to generalize this and extend the ApiContext to carry this information. So the watchdog helper steps outside the callable chain pattern and is implemented as a factory for ResponseObserver decorators.