Skip to content

Commit

Permalink
Fix problems in streams that were identified by conformance tests (#210)
Browse files Browse the repository at this point in the history
This includes a fix to the recently observed flakiness in client
streams, which was introduced (in #196).

The problems are described below. The fixes are each in their own
commit, so reviewing commit-by-commit might make this PR easier to read.

1. There was an issue in the use of `Result<Unit>` as the return type
for `send` operations. Since the return type (`Unit`) is really a
void/no-return type, calling code was never checking the result. That
means that when the operation failed, it was never noticed. I'm a little
surprised that we don't have linters or warnings for calls to functions
that return `Result` where that return value is ignored.

It turns out that this was not just a problem in my calling code,
failing to check the return value for failure, but in the framework
itself: the stream wrapper in `ProtocolClient` (wrapping underlying
stream returned by `ConnectOkHttpClient`) was using an `onSend` callback
that called the underlying stream's `send`. But the `onSend` callback
simply returned `Unit` instead of `Result<Unit>`, and the method that
propagated the result wasn't checking the result and throwing.
    
I think this is the biggest commit here, and it's because I did some
overhauling of `Stream`. For one, I changed it to an interface -- mainly
so that we could apply a decorator pattern to it and
`HTTPClientInterface` (more on that in a later PR). This makes the
wrapper in `ProtocolClient` simpler -- instead of it being a full
implementation, with its own atomic booleans to guard/track the close
operations, it just delegates to the underlying implementation.
    
2. The Connect unary protocol can return a 408 status code for
"canceled" and "deadline exceeded" RPC errors. But okhttp auto-retries
this status code, even though the requests are not idempotent (i.e. even
for POST calls). This isn't an issue with the stream conformance tests,
but was noticed later after I added an extra check to the reference
server so that it catches cases where a client sends an RPC for the same
test case more than once. This commit adds a network interceptor to the
`OkHttpClient` that will identify 408 responses that look like Connect
unary responses and change their status code to 499. That is the only
way I could find to prevent the retries.

3. The recently introduced flakiness in client streams is actually a
rather severe issue. It was mainly observed in the new conformance suite
with server streams when gzip was used, because it was all due to race
conditions and the gzip operations would slow down the producer thread
just enough to tickle the issue. The problem is that the
`RequestBody.writeTo` function should not return before the request body
is finished when the request body is not duplex. But it was calling
`pipe.fold` and then immediately returning. The `fold` method swaps in a
new sink in place of the read-side of the pipe and then returns, without
waiting for the pipe's write side to complete. So now we use a
`CountDownLatch` to wait until the writer is complete (which is signaled
via a call to `close`).

4. The last issue I encountered was much less frequent, and also turned
out to be a race condition. It was caused by a concurrency bug in
`okio.Pipe` (square/okio#1412). Basically,
some duplex operations (i.e. bidi RPCs) would infrequently timeout
because, even though the stream writer had closed the pipe, the HTTP
request body incorrectly remained open. I've opened a PR with a fix in
the `okio` library, but I've also added a work-around for now in the
code here, by using extra synchronization between the calls to `write`,
`close`, and `fold`.
  • Loading branch information
jhump committed Jan 31, 2024
1 parent 5b05d1b commit 868181b
Show file tree
Hide file tree
Showing 12 changed files with 376 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,16 @@ class JavaServerStreamClient(
) {
override suspend fun execute(req: ServerStreamRequest, headers: Headers): ResponseStream<ServerStreamResponse> {
val stream = client.serverStream(headers)
stream.sendAndClose(req)
val sendResult: Result<Unit>
try {
sendResult = stream.sendAndClose(req)
if (sendResult.isFailure) {
throw sendResult.exceptionOrNull()!!
}
} catch (ex: Throwable) {
stream.receiveClose()
throw ex
}
return ResponseStream.new(stream)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,16 @@ class JavaLiteServerStreamClient(
) {
override suspend fun execute(req: ServerStreamRequest, headers: Headers): ResponseStream<ServerStreamResponse> {
val stream = client.serverStream(headers)
stream.sendAndClose(req)
val sendResult: Result<Unit>
try {
sendResult = stream.sendAndClose(req)
if (sendResult.isFailure) {
throw sendResult.exceptionOrNull()!!
}
} catch (ex: Throwable) {
stream.receiveClose()
throw ex
}
return ResponseStream.new(stream)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,35 +162,39 @@ class Client(
throw RuntimeException("client stream calls can only support `BeforeCloseSend` and 'AfterCloseSendMs' cancellation field, instead got ${req.cancel!!::class.simpleName}")
}
val stream = client.execute(req.requestHeaders)
var numUnsent = 0
for (i in req.requestMessages.indices) {
if (req.requestDelayMs > 0) {
delay(req.requestDelayMs.toLong())
}
val msg = fromAny(req.requestMessages[i], client.reqTemplate, CLIENT_STREAM_REQUEST_NAME)
try {
stream.send(msg)
} catch (_: Exception) {
numUnsent = req.requestMessages.size - i
break
}
}
when (val cancel = req.cancel) {
is Cancel.BeforeCloseSend -> {
stream.cancel()
try {
var numUnsent = 0
for (i in req.requestMessages.indices) {
if (req.requestDelayMs > 0) {
delay(req.requestDelayMs.toLong())
}
val msg = fromAny(req.requestMessages[i], client.reqTemplate, CLIENT_STREAM_REQUEST_NAME)
try {
stream.send(msg)
} catch (ex: Exception) {
numUnsent = req.requestMessages.size - i
break
}
}
is Cancel.AfterCloseSendMs -> {
launch {
delay(cancel.millis.toLong())
when (val cancel = req.cancel) {
is Cancel.BeforeCloseSend -> {
stream.cancel()
}
is Cancel.AfterCloseSendMs -> {
launch {
delay(cancel.millis.toLong())
stream.cancel()
}
}
else -> {
// We already validated the case above.
// So this case means no cancellation.
}
}
else -> {
// We already validated the case above.
// So this case means no cancellation.
}
return@coroutineScope unaryResult(numUnsent, stream.closeAndReceive())
} finally {
stream.cancel()
}
return@coroutineScope unaryResult(numUnsent, stream.closeAndReceive())
}

private suspend fun <Req : MessageLite, Resp : MessageLite> handleServer(
Expand All @@ -210,13 +214,38 @@ class Client(
throw RuntimeException("server stream calls can only support `AfterCloseSendMs` and 'AfterNumResponses' cancellation field, instead got ${req.cancel!!::class.simpleName}")
}
val msg = fromAny(req.requestMessages[0], client.reqTemplate, SERVER_STREAM_REQUEST_NAME)
val stream = client.execute(msg, req.requestHeaders)
val cancel = req.cancel
if (cancel is Cancel.AfterCloseSendMs) {
delay(cancel.millis.toLong())
val stream: ResponseStream<Resp>
try {
// TODO: should this throw? Maybe not...
// An alternative would be to have it return a
// stream that throws the relevant exception in
// calls to receive.
stream = client.execute(msg, req.requestHeaders)
} catch (ex: Throwable) {
val connEx = if (ex is ConnectException) {
ex
} else {
ConnectException(
code = Code.UNKNOWN,
message = ex.message,
exception = ex,
)
}
return ClientResponseResult(
error = connEx,
numUnsentRequests = 1,
)
}
try {
val cancel = req.cancel
if (cancel is Cancel.AfterCloseSendMs) {
delay(cancel.millis.toLong())
stream.close()
}
return streamResult(0, stream, cancel)
} finally {
stream.close()
}
return streamResult(0, stream, cancel)
}

private suspend fun <Req : MessageLite, Resp : MessageLite> handleBidi(
Expand All @@ -238,114 +267,120 @@ class Client(
req: ClientCompatRequest,
): ClientResponseResult {
val stream = client.execute(req.requestHeaders)
var numUnsent = 0
for (i in req.requestMessages.indices) {
if (req.requestDelayMs > 0) {
delay(req.requestDelayMs.toLong())
}
val msg = fromAny(req.requestMessages[i], client.reqTemplate, BIDI_STREAM_REQUEST_NAME)
try {
stream.requests.send(msg)
} catch (_: Exception) {
numUnsent = req.requestMessages.size - i
break
}
}
val cancel = req.cancel
when (cancel) {
is Cancel.BeforeCloseSend -> {
stream.responses.close() // cancel
stream.requests.close() // close send
}
is Cancel.AfterCloseSendMs -> {
stream.requests.close() // close send
delay(cancel.millis.toLong())
stream.responses.close() // cancel
try {
var numUnsent = 0
for (i in req.requestMessages.indices) {
if (req.requestDelayMs > 0) {
delay(req.requestDelayMs.toLong())
}
val msg = fromAny(req.requestMessages[i], client.reqTemplate, BIDI_STREAM_REQUEST_NAME)
try {
stream.requests.send(msg)
} catch (ex: Exception) {
numUnsent = req.requestMessages.size - i
break
}
}
else -> {
stream.requests.close() // close send
val cancel = req.cancel
when (cancel) {
is Cancel.BeforeCloseSend -> {
stream.responses.close() // cancel
stream.requests.close() // close send
}
is Cancel.AfterCloseSendMs -> {
stream.requests.close() // close send
delay(cancel.millis.toLong())
stream.responses.close() // cancel
}
else -> {
stream.requests.close() // close send
}
}
return streamResult(numUnsent, stream.responses, cancel)
} finally {
stream.responses.close()
}
return streamResult(numUnsent, stream.responses, cancel)
}

private suspend fun <Req : MessageLite, Resp : MessageLite> handleFullDuplexBidi(
client: BidiStreamClient<Req, Resp>,
req: ClientCompatRequest,
): ClientResponseResult {
val stream = client.execute(req.requestHeaders)
val cancel = req.cancel
val payloads: MutableList<MessageLite> = mutableListOf()
for (i in req.requestMessages.indices) {
if (req.requestDelayMs > 0) {
delay(req.requestDelayMs.toLong())
}
val msg = fromAny(req.requestMessages[i], client.reqTemplate, BIDI_STREAM_REQUEST_NAME)
try {
stream.requests.send(msg)
} catch (_: Exception) {
// Ignore. We should see it again below when we receive the response.
}
try {
val cancel = req.cancel
val payloads: MutableList<MessageLite> = mutableListOf()
for (i in req.requestMessages.indices) {
if (req.requestDelayMs > 0) {
delay(req.requestDelayMs.toLong())
}
val msg = fromAny(req.requestMessages[i], client.reqTemplate, BIDI_STREAM_REQUEST_NAME)
try {
stream.requests.send(msg)
} catch (ex: Exception) {
// Ignore. We should see it again below when we receive the response.
}

// In full-duplex mode, we read the response after writing request,
// to interleave the requests and responses.
if (i == 0 && cancel is Cancel.AfterNumResponses && cancel.num == 0) {
stream.responses.close()
}
try {
val resp = stream.responses.messages.receive()
payloads.add(payloadExtractor(resp))
if (cancel is Cancel.AfterNumResponses && cancel.num == payloads.size) {
// In full-duplex mode, we read the response after writing request,
// to interleave the requests and responses.
if (i == 0 && cancel is Cancel.AfterNumResponses && cancel.num == 0) {
stream.responses.close()
}
} catch (ex: ConnectException) {
return ClientResponseResult(
headers = stream.responses.headers(),
payloads = payloads,
error = ex,
trailers = ex.metadata,
numUnsentRequests = req.requestMessages.size - i,
)
}
}
when (cancel) {
is Cancel.BeforeCloseSend -> {
stream.responses.close() // cancel
stream.requests.close() // close send
}
is Cancel.AfterCloseSendMs -> {
stream.requests.close() // close send
delay(cancel.millis.toLong())
stream.responses.close() // cancel
try {
val resp = stream.responses.messages.receive()
payloads.add(payloadExtractor(resp))
if (cancel is Cancel.AfterNumResponses && cancel.num == payloads.size) {
stream.responses.close()
}
} catch (ex: ConnectException) {
return ClientResponseResult(
headers = stream.responses.headers(),
payloads = payloads,
error = ex,
trailers = ex.metadata,
numUnsentRequests = req.requestMessages.size - i,
)
}
}
else -> {
stream.requests.close() // close send
when (cancel) {
is Cancel.BeforeCloseSend -> {
stream.responses.close() // cancel
stream.requests.close() // close send
}
is Cancel.AfterCloseSendMs -> {
stream.requests.close() // close send
delay(cancel.millis.toLong())
stream.responses.close() // cancel
}
else -> {
stream.requests.close() // close send
}
}
}

// Drain the response, in case there are any other messages.
var connEx: ConnectException? = null
var trailers: Headers
try {
for (resp in stream.responses.messages) {
payloads.add(payloadExtractor(resp))
if (cancel is Cancel.AfterNumResponses && cancel.num == payloads.size) {
stream.responses.close()
// Drain the response, in case there are any other messages.
var connEx: ConnectException? = null
var trailers: Headers
try {
for (resp in stream.responses.messages) {
payloads.add(payloadExtractor(resp))
if (cancel is Cancel.AfterNumResponses && cancel.num == payloads.size) {
stream.responses.close()
}
}
trailers = stream.responses.trailers()
} catch (ex: ConnectException) {
connEx = ex
trailers = ex.metadata
}
trailers = stream.responses.trailers()
} catch (ex: ConnectException) {
connEx = ex
trailers = ex.metadata
return ClientResponseResult(
headers = stream.responses.headers(),
payloads = payloads,
error = connEx,
trailers = trailers,
)
} finally {
stream.responses.close()
}
return ClientResponseResult(
headers = stream.responses.headers(),
payloads = payloads,
error = connEx,
trailers = trailers,
)
}

private fun unaryResult(numUnsent: Int, result: ResponseMessage<out MessageLite>): ClientResponseResult {
Expand Down Expand Up @@ -393,8 +428,6 @@ class Client(
} catch (ex: ConnectException) {
connEx = ex
trailers = ex.metadata
} finally {
stream.close()
}
return ClientResponseResult(
headers = stream.headers(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ abstract class ClientStreamClient<Req : MessageLite, Resp : MessageLite>(
fun <Req : MessageLite, Resp : MessageLite> new(underlying: ClientOnlyStreamInterface<Req, Resp>): ClientStream<Req, Resp> {
return object : ClientStream<Req, Resp> {
override suspend fun send(req: Req) {
underlying.send(req)
val result = underlying.send(req)
if (result.isFailure) {
throw result.exceptionOrNull()!!
}
}

override suspend fun closeAndReceive(): ResponseMessage<Resp> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ interface RequestStream<Req : MessageLite> {
fun <Req : MessageLite, Resp : MessageLite> new(underlying: BidirectionalStreamInterface<Req, Resp>): RequestStream<Req> {
return object : RequestStream<Req> {
override suspend fun send(req: Req) {
underlying.send(req)
val result = underlying.send(req)
if (result.isFailure) {
throw result.exceptionOrNull()!!
}
}

override fun close() {
Expand Down
1 change: 1 addition & 0 deletions library/src/main/kotlin/com/connectrpc/Code.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ enum class Code(val codeName: String, val value: Int) {
415 -> INTERNAL_ERROR
429 -> UNAVAILABLE
431 -> RESOURCE_EXHAUSTED
499 -> CANCELED
502, 503, 504 -> UNAVAILABLE
else -> UNKNOWN
}
Expand Down

0 comments on commit 868181b

Please sign in to comment.