-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix problems in streams that were identified by conformance tests #210
Conversation
…rface; conformance adapters use exceptions instead of Result<T> so need try/catch
// transform the error in a network interceptor to prevent okhttp from | ||
// retrying it. | ||
if (resp.code == 408 && isConnectUnary(chain.request())) { | ||
val contentType = resp.headers["Content-Type"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val contentType = resp.headers["Content-Type"] | |
val mediaType = resp.headers["Content-Type"]?.toMediaTypeOrNull() |
OkHttp provides a useful class to parse content types - then you can use type/subtype to check for JSON and not worry about parsing parameters.
if (resp.code == 408 && isConnectUnary(chain.request())) { | ||
val contentType = resp.headers["Content-Type"] | ||
if (contentType == "application/json" || contentType == "application/json; charset=utf-8") { | ||
return resp.newBuilder().code(499).build() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess if you want to restore the 408 status code at the end you could have an interceptor (not network interceptor) that restores it if it was set here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead, I've added a way to get the original code and now store that in connect's HTTPResponse
, and also use it when computing a Connect RPC error code from an HTTP status code. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great!
private fun isConnectUnary(req: Request): Boolean { | ||
return when (req.method) { | ||
"POST" -> req.headers[CONNECT_PROTOCOL_VERSION_KEY].orEmpty() == CONNECT_PROTOCOL_VERSION_VALUE && | ||
req.headers["Content-Type"].orEmpty().startsWith("application/") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another place which could use MediaType.type if you wanted.
// For non-duplex request bodies, okhttp3 | ||
// expects this method to return only when | ||
// the request body is complete. | ||
closed.await() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this necessary? I don't see this logic in any of the other implementations of RequestBody or documented in the contract for writeTo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's because other implementations of RequestBody
aren't pipes, where they are awaiting data from another writer thread.
It is in fact documented: https://github.com/square/okhttp/blob/ee9ebba20a39e6e4cdd78cb74f488728e49de942/okhttp/src/main/kotlin/okhttp3/RequestBody.kt#L71-L78
Basically, writeTo
is expected to write the entire request body to the given sink. But what was here before, just pipe.fold()
does not do that. It only copies any data to the sink that has already been written and buffered in the pipe and then returns. So to make sure that the entire request body is written, we must wait for the writer to signal the end via closing.
Duplex bodies are the exception: in that case, this method is free to return early and continue writing to the given sink from another thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sadly, I never saw that doc. So figuring this out was a bear.
I first added tracing to the conformance test runner, so I could see the full HTTP request and response on failures. That's how I saw that sometimes the server saw an empty body with zero requests (and less often, a non-empty body with too few requests).
That's when I started tracking down the calls to send
to figure out what was failing. That resulted in the first commit in this PR (correctly propagating failures in a Result<Unit>
up to the caller and then checking the result and throwing on failure in the conformance client). That allowed me to see that calls to stream.send
were failing with a "closed" exception.
So then I had to track down where the heck it was being closed. (You'll see a bunch of tracing added in #212, including ability to log stack traces, which helped me find it.) And it was being called from here.
That's when I realized the crux of the issue -- the body had to be completely written here before returning.
Another formulation I considered here was something like so:
if (duplex) {
pipe.fold(sink)
return
}
sink.writeAll(pipe.source)
But that ends up buffering in the pipe and needing a little more overhead on each write, to go through the pipe's sink-to-source exchange, from the writer thread to this one. Whereas pipe.fold
allows subsequent writes to go directly to the new destination sink. So it seemed better to keep using pipe.fold
. 🤷
…ode available to application code
okhttp/src/main/kotlin/com/connectrpc/okhttp/ConnectOkHttpClient.kt
Outdated
Show resolved
Hide resolved
Co-authored-by: Philip K. Warren <pkwarren@users.noreply.github.com>
This includes a fix to the recently observed flakiness in client streams, which was introduced (in #196).
The problems are described below. The fixes are each in their own commit, so reviewing commit-by-commit might make this PR easier to read.
There was an issue in the use of
Result<Unit>
as the return type forsend
operations. Since the return type (Unit
) is really a void/no-return type, calling code was never checking the result. That means that when the operation failed, it was never noticed. I'm a little surprised that we don't have linters or warnings for calls to functions that returnResult
where that return value is ignored.It turns out that this was not just a problem in my calling code, failing to check the return value for failure, but in the framework itself: the stream wrapper in
ProtocolClient
(wrapping underlying stream returned byConnectOkHttpClient
) was using anonSend
callback that called the underlying stream'ssend
. But theonSend
callback simply returnedUnit
instead ofResult<Unit>
, and the method that propagated the result wasn't checking the result and throwing.I think this is the biggest commit here, and it's because I did some overhauling of
Stream
. For one, I changed it to an interface -- mainly so that we could apply a decorator pattern to it andHTTPClientInterface
(more on that in a later PR). This makes the wrapper inProtocolClient
simpler -- instead of it being a full implementation, with its own atomic booleans to guard/track the close operations, it just delegates to the underlying implementation.The Connect unary protocol can return a 408 status code for "canceled" and "deadline exceeded" RPC errors. But okhttp auto-retries this status code, even though the requests are not idempotent (i.e. even for POST calls). This isn't an issue with the stream conformance tests, but was noticed later after I added an extra check to the reference server so that it catches cases where a client sends an RPC for the same test case more than once. This commit adds a network interceptor to the
OkHttpClient
that will identify 408 responses that look like Connect unary responses and change their status code to 499. That is the only way I could find to prevent the retries.The recently introduced flakiness in client streams is actually a rather severe issue. It was mainly observed in the new conformance suite with server streams when gzip was used, because it was all due to race conditions and the gzip operations would slow down the producer thread just enough to tickle the issue. The problem is that the
RequestBody.writeTo
function should not return before the request body is finished when the request body is not duplex. But it was callingpipe.fold
and then immediately returning. Thefold
method swaps in a new sink in place of the read-side of the pipe and then returns, without waiting for the pipe's write side to complete. So now we use aCountDownLatch
to wait until the writer is complete (which is signaled via a call toclose
).The last issue I encountered was much less frequent, and also turned out to be a race condition. It was caused by a concurrency bug in
okio.Pipe
(Pipe.fold: destination sink can remain open even after pipe closed square/okio#1412). Basically, some duplex operations (i.e. bidi RPCs) would infrequently timeout because, even though the stream writer had closed the pipe, the HTTP request body incorrectly remained open. I've opened a PR with a fix in theokio
library, but I've also added a work-around for now in the code here, by using extra synchronization between the calls towrite
,close
, andfold
.