Skip to content

Commit

Permalink
Some fixes noticed when testing against 'main' of conformance (#252)
Browse files Browse the repository at this point in the history
We are hoping to cut a _final_ v1.0.0 of the conformance suite this
week. It has a couple of minor changes from rc4. I wanted to make sure
they wouldn't cause issues (particularly, [this pending
PR](connectrpc/conformance#840), which could
possibly impact some of the logic I just added in #248 related to
reporting trailers and merging headers+trailers in error metadata).

And... this ended up exposing some issues.

For one, I had not updated the Connect stream protocol in the same way
as the gRPC-Web and gRPC protocols regarding merging of headers+trailers
in metadata. I didn't notice in the previous PR because it was actually
the "trailers only" responses that tickled test failures related to this
(the connect-go reference server will only use "trailers only" responses
for gRPC-Web, not for other protocols). For two, this code path was not
correctly setting the trailers of the `StreamResult.Completed` object.
Oops 🤦.

After I fixed that, I just happened to observe a conformance test flake
that caused the client to hang. It turned out to be due to uncaught
exception while trying to drain the response body in a unary RPC. So
that's what warranted the other changes in here, adding `try/catch` to
the unary flow of `ConnectOkHttpClient` and extracting the
`safeTrailers` helper so it can also be used from that unary flow.

While making the above change, I greatly simplified the `safeTrailers`
helper because it seemed to be incorrect: it was almost always computing
empty trailers in these unary flows (which causes issues for gRPC calls,
which rely on trailers to convey the status code). After re-reading the
code, it's not clear why it was wrong, and I didn't have the patience to
try to put it in a debugger to step through it and understand it. I just
simplified it, removing the questionable conditional, and that fixed it.
  • Loading branch information
jhump committed Apr 2, 2024
1 parent 8805990 commit 16a8b1d
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ internal class ConnectInterceptor(
private val moshi = Moshi.Builder().build()
private val serializationStrategy = clientConfig.serializationStrategy
private var responseCompressionPool: CompressionPool? = null
private var responseHeaders: Headers = emptyMap()

override fun unaryFunction(): UnaryFunction {
return UnaryFunction(
Expand Down Expand Up @@ -159,8 +160,7 @@ internal class ConnectInterceptor(
streamResultFunction = { res ->
val streamResult: StreamResult<Buffer> = res.fold(
onHeaders = { result ->
val responseHeaders =
result.headers.filter { entry -> !entry.key.startsWith("trailer-") }
responseHeaders = result.headers
responseCompressionPool =
clientConfig.compressionPool(responseHeaders[CONNECT_STREAMING_CONTENT_ENCODING]?.first())
StreamResult.Headers(responseHeaders)
Expand All @@ -171,7 +171,7 @@ internal class ConnectInterceptor(
responseCompressionPool,
)
if (headerByte.and(TRAILERS_BIT) == TRAILERS_BIT) {
parseConnectEndStream(unpackedMessage)
parseConnectEndStream(responseHeaders, unpackedMessage)
} else {
StreamResult.Message(unpackedMessage)
}
Expand Down Expand Up @@ -211,7 +211,7 @@ internal class ConnectInterceptor(
)
}

private fun parseConnectEndStream(source: Buffer): StreamResult.Complete<Buffer> {
private fun parseConnectEndStream(headers: Headers, source: Buffer): StreamResult.Complete<Buffer> {
val adapter = moshi.adapter(EndStreamResponseJSON::class.java).nonNull()
return source.use { bufferedSource ->
val errorJSON = bufferedSource.readUtf8()
Expand All @@ -234,11 +234,12 @@ internal class ConnectInterceptor(
cause = ConnectException(
code = code,
message = endStreamResponseJSON.error.message,
metadata = metadata.orEmpty(),
metadata = headers.plus(metadata.orEmpty()),
).withErrorDetails(
serializationStrategy.errorDetailParser(),
parseErrorDetails(endStreamResponseJSON.error),
),
trailers = metadata.orEmpty(),
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,12 @@ class ConnectInterceptorTest {

assertThat(result).isInstanceOf(StreamResult.Headers::class.java)
val headerResult = result as StreamResult.Headers
assertThat(headerResult.headers).isEqualTo(mapOf(CONNECT_STREAMING_CONTENT_ENCODING to listOf("gzip")))
assertThat(headerResult.headers).isEqualTo(
mapOf(
"trailer-x-some-key" to listOf("some_value"),
CONNECT_STREAMING_CONTENT_ENCODING to listOf("gzip"),
),
)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.connectrpc.okhttp
import com.connectrpc.Code
import com.connectrpc.ConnectException
import com.connectrpc.StreamResult
import com.connectrpc.asConnectException
import com.connectrpc.http.Cancelable
import com.connectrpc.http.HTTPClientInterface
import com.connectrpc.http.HTTPRequest
Expand Down Expand Up @@ -147,17 +148,24 @@ class ConnectOkHttpClient @JvmOverloads constructor(

override fun onResponse(call: Call, response: Response) {
// Unary requests will need to read the entire body to access trailers.
val responseBuffer = response.body?.source()?.use { bufferedSource ->
val buffer = Buffer()
buffer.writeAll(bufferedSource)
buffer
var responseBuffer: Buffer? = null
var connEx: ConnectException? = null
try {
responseBuffer = response.body?.source()?.use { bufferedSource ->
val buffer = Buffer()
buffer.writeAll(bufferedSource)
buffer
}
} catch (ex: Throwable) {
connEx = asConnectException(ex, codeFromException(call.isCanceled(), ex))
}
onResult(
HTTPResponse(
status = response.originalCode(),
headers = response.headers.toLowerCaseKeysMultiMap(),
message = responseBuffer ?: Buffer(),
trailers = response.trailers().toLowerCaseKeysMultiMap(),
trailers = response.safeTrailers(),
cause = connEx,
),
)
}
Expand Down Expand Up @@ -197,7 +205,7 @@ internal fun Headers.toLowerCaseKeysMultiMap(): Map<String, List<String>> {
)
}

internal fun codeFromException(callCanceled: Boolean, e: Exception): Code {
internal fun codeFromException(callCanceled: Boolean, e: Throwable): Code {
return if ((e is InterruptedIOException && e.message == "timeout") ||
e is SocketTimeoutException
) {
Expand Down Expand Up @@ -232,3 +240,12 @@ fun Response.originalMessage(): String {
message
}
}

internal fun Response.safeTrailers(): Map<String, List<String>> {
return try {
trailers().toLowerCaseKeysMultiMap()
} catch (_: Throwable) {
// Trailers not available or something else went wrong...
emptyMap()
}
}
35 changes: 6 additions & 29 deletions okhttp/src/main/kotlin/com/connectrpc/okhttp/OkHttpStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private class ResponseCallback(
if (httpStatus != 200) {
// TODO: This is not quite exercised yet. Validate if this is exercised in another test case.
val finalResult = StreamResult.Complete<Buffer>(
trailers = response.safeTrailers() ?: emptyMap(),
trailers = response.safeTrailers(),
cause = ConnectException(
code = Code.fromHTTPStatus(httpStatus),
message = "unexpected HTTP status: $httpStatus ${response.originalMessage()}",
Expand All @@ -119,7 +119,7 @@ private class ResponseCallback(
}
response.use { resp ->
resp.body!!.source().use { sourceBuffer ->
var exception: Exception? = null
var connEx: ConnectException? = null
try {
while (!sourceBuffer.exhausted()) {
val buffer = readStreamElement(sourceBuffer)
Expand All @@ -128,18 +128,14 @@ private class ResponseCallback(
)
onResult(streamResult)
}
} catch (e: Exception) {
exception = e
} catch (ex: Exception) {
connEx = asConnectException(ex, codeFromException(call.isCanceled(), ex))
} finally {
// If trailers are not yet communicated.
// This is the final chance to notify trailers to the consumer.
val connectEx = when (exception) {
null -> null
else -> asConnectException(exception, codeFromException(call.isCanceled(), exception))
}
val finalResult = StreamResult.Complete<Buffer>(
trailers = response.safeTrailers() ?: emptyMap(),
cause = connectEx,
trailers = response.safeTrailers(),
cause = connEx,
)
onResult(finalResult)
}
Expand All @@ -148,25 +144,6 @@ private class ResponseCallback(
}
}

private fun Response.safeTrailers(): Map<String, List<String>>? {
try {
if (body?.source()?.exhausted() == false) {
// Assuming this means that trailers are not available.
// Returning null to signal trailers are "missing".
return null
}
} catch (e: Exception) {
return null
}

return try {
trailers().toLowerCaseKeysMultiMap()
} catch (_: Throwable) {
// Something went terribly wrong.
emptyMap()
}
}

/**
* Helps with reading and framing OkHttp responses into Buffers.
*
Expand Down

0 comments on commit 16a8b1d

Please sign in to comment.