Skip to content

Commit

Permalink
Fix all conformance failures other than timeouts/deadlines (#274)
Browse files Browse the repository at this point in the history
* The first fix is for how trailers-only responses are classified.
This was previously just looking for a "grpc-status" key in the headers.
If it was present, it was treating it as a trailers-only response, even
if there was a body and/or trailers.
* The second fix is so that the client reports errors in the face of
unexpected response content types. With a little code reorganization,
we can improve this logic in the future, to increase code sharing
(especially between gRPC and gRPC-Web).
  • Loading branch information
jhump committed May 17, 2024
1 parent 15627cd commit 235028c
Show file tree
Hide file tree
Showing 20 changed files with 263 additions and 84 deletions.
3 changes: 0 additions & 3 deletions conformance/client/known-failing-stream-cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,3 @@ Timeouts/HTTPVersion:2/**/bidi-stream/**

# Deadline headers are not currently set.
Deadline Propagation/**

# Bug: incorrect code attribution for these failures (UNKNOWN instead of INTERNAL)
Connect Unexpected Responses/**/unexpected-stream-codec
11 changes: 0 additions & 11 deletions conformance/client/known-failing-unary-cases.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,2 @@
# Deadline headers are not currently set.
Deadline Propagation/**

# Bug: response content-type is not correctly checked
**/unexpected-content-type

# Bug: "trailers-only" responses are not correctly identified.
# If headers contain "grpc-status", this client assumes it is a
# trailers-only response. However, a trailers-only response should
# instead be identified by lack of body or HTTP trailers.
gRPC Unexpected Responses/**/trailers-only/*
gRPC-Web Unexpected Responses/**/trailers-only/ignore-header-if-body-present

3 changes: 2 additions & 1 deletion library/src/main/kotlin/com/connectrpc/AnyError.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ package com.connectrpc
import okio.ByteString

/**
* This is an Any adapter for various base data types.
* This is a protobuf-runtime-agnostic representation of google.protobuf.Any
* messages, which are used to represent error details in gRPC.
*/
class AnyError(
val typeUrl: String,
Expand Down
2 changes: 1 addition & 1 deletion library/src/main/kotlin/com/connectrpc/Code.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ enum class Code(val codeName: String, val value: Int) {
ABORTED("aborted", 10),
OUT_OF_RANGE("out_of_range", 11),
UNIMPLEMENTED("unimplemented", 12),
INTERNAL_ERROR("internal", 13),
INTERNAL_ERROR("internal", 13), // TODO: rename enum value to INTERNAL
UNAVAILABLE("unavailable", 14),
DATA_LOSS("data_loss", 15),
UNAUTHENTICATED("unauthenticated", 16),
Expand Down
1 change: 1 addition & 0 deletions library/src/main/kotlin/com/connectrpc/Codec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const val codecNameJSON = CODEC_NAME_JSON
* Defines a type that is capable of encoding and decoding messages using a specific format.
*/
interface Codec<E> {
// TODO: remove this method or unify somehow with SerializationStrategy.serializationName?
/**
* @return The name of the codec's format (e.g., "json", "proto"). Usually consumed
* in the form of adding the `content-type` header via "application/{name}".
Expand Down
6 changes: 4 additions & 2 deletions library/src/main/kotlin/com/connectrpc/ErrorDetailParser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import kotlin.reflect.KClass
*/
interface ErrorDetailParser {
/**
* Unpack the underlying payload into the input class type.
* Unpack the given Any payload into the input class type.
*/
fun <E : Any> unpack(any: AnyError, clazz: KClass<E>): E?

/**
* Parse payload for a list of error details.
* Parse the given bytes for a list of error details. The given
* bytes will be the serialized form of a google.rpc.Status
* Protobuf message.
*/
fun parseDetails(bytes: ByteArray): List<ConnectErrorDetail>
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kotlin.reflect.KClass
interface SerializationStrategy {

/**
* The name of the serialization. Used in the content-encoding
* The name of the serialization. Used in the content-type
* header.
*/
fun serializationName(): String
Expand Down
3 changes: 3 additions & 0 deletions library/src/main/kotlin/com/connectrpc/StreamResult.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ package com.connectrpc
sealed class StreamResult<Output> {
// Headers have been received over the stream.
class Headers<Output>(val headers: com.connectrpc.Headers) : StreamResult<Output>() {
// TODO: This should include an HTTP status code, too. Computing an RPC code
// from the HTTP status code should be part of the protocol impl, not
// pushed down to the HTTPClientInterface impl.
override fun toString(): String {
return "Headers{headers=$headers}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package com.connectrpc.protocols

const val ACCEPT_ENCODING = "accept-encoding"
const val CONTENT_ENCODING = "content-encoding"
const val CONTENT_TYPE = "content-type"
const val CONNECT_STREAMING_CONTENT_ENCODING = "connect-content-encoding"
const val CONNECT_STREAMING_ACCEPT_ENCODING = "connect-accept-encoding"
const val CONNECT_PROTOCOL_VERSION_KEY = "connect-protocol-version"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,37 +94,53 @@ internal class ConnectInterceptor(
}
val trailers = mutableMapOf<String, List<String>>()
trailers.putAll(response.headers.toTrailers())
trailers.putAll(response.trailers)
val responseHeaders =
val headers =
response.headers.filter { entry -> !entry.key.startsWith("trailer-") }
val compressionPool = clientConfig.compressionPool(responseHeaders[CONTENT_ENCODING]?.first())
val compressionPool = clientConfig.compressionPool(headers[CONTENT_ENCODING]?.first())
val responseBody = try {
compressionPool?.decompress(response.message.buffer) ?: response.message.buffer
} catch (e: Exception) {
return@UnaryFunction response.clone(
message = Buffer(),
headers = responseHeaders,
headers = headers,
trailers = trailers,
cause = ConnectException(
code = Code.INTERNAL_ERROR,
message = e.message,
exception = e,
metadata = headers.plus(trailers),
),
)
}
val contentType = headers[CONTENT_TYPE]?.first() ?: ""
val exception: ConnectException?
val message: Buffer
if (response.status != 200) {
exception = parseConnectUnaryException(response.status, responseHeaders.plus(trailers), responseBody)
exception = parseConnectUnaryException(response.status, contentType, headers.plus(trailers), responseBody)
// We've already read the response body to parse an error - don't read again.
message = Buffer()
} else {
exception = null
message = responseBody
val isValidContentType =
(serializationStrategy.serializationName() == "json" && contentTypeIsJSON(contentType)) ||
contentType == "application/" + serializationStrategy.serializationName()
if (isValidContentType) {
exception = null
} else {
// If content-type looks like it could be an RPC server's response, consider
// this an internal error. Otherwise, we infer a code from the HTTP status,
// which means a code of UNKNOWN since HTTP status is 200.
val code = if (contentType.startsWith("application/")) Code.INTERNAL_ERROR else Code.UNKNOWN
exception = ConnectException(
code = code,
message = "unexpected content-type: $contentType",
metadata = headers.plus(trailers),
)
}
}
response.clone(
message = message,
headers = responseHeaders,
headers = headers,
trailers = trailers,
cause = exception,
)
Expand Down Expand Up @@ -161,9 +177,25 @@ internal class ConnectInterceptor(
val streamResult: StreamResult<Buffer> = res.fold(
onHeaders = { result ->
responseHeaders = result.headers
responseCompressionPool =
clientConfig.compressionPool(responseHeaders[CONNECT_STREAMING_CONTENT_ENCODING]?.first())
StreamResult.Headers(responseHeaders)
val contentType = responseHeaders[CONTENT_TYPE]?.first() ?: ""
val isValidContentType = contentType == "application/connect+" + serializationStrategy.serializationName()
if (!isValidContentType) {
// If content-type looks like it could be an RPC server's response, consider
// this an internal error. Otherwise, we infer a code from the HTTP status,
// which means a code of UNKNOWN since HTTP status is 200.
val code = if (contentType.startsWith("application/connect+")) Code.INTERNAL_ERROR else Code.UNKNOWN
StreamResult.Complete(
ConnectException(
code = code,
message = "unexpected content-type: $contentType",
metadata = responseHeaders,
),
)
} else {
responseCompressionPool =
clientConfig.compressionPool(responseHeaders[CONNECT_STREAMING_CONTENT_ENCODING]?.first())
StreamResult.Headers(responseHeaders)
}
},
onMessage = { result ->
val (headerByte, unpackedMessage) = Envelope.unpackWithHeaderByte(
Expand Down Expand Up @@ -196,15 +228,15 @@ internal class ConnectInterceptor(
): UnaryHTTPRequest {
val serializationStrategy = clientConfig.serializationStrategy
val requestCodec = serializationStrategy.codec(request.methodSpec.requestClass)
val url = getUrlFromMethodSpec(
val url = constructURLForGETRequest(
request,
requestCodec,
finalRequestBody,
requestCompression,
)
return request.clone(
url = url,
contentType = "application/${requestCodec.encodingName()}",
contentType = "",
headers = request.headers,
methodSpec = request.methodSpec,
httpMethod = HTTPMethod.GET,
Expand Down Expand Up @@ -244,9 +276,9 @@ internal class ConnectInterceptor(
}
}

private fun parseConnectUnaryException(httpStatus: Int?, metadata: Headers, source: Buffer?): ConnectException {
private fun parseConnectUnaryException(httpStatus: Int?, contentType: String, metadata: Headers, source: Buffer?): ConnectException {
val code = Code.fromHTTPStatus(httpStatus)
if (source == null) {
if (source == null || !contentTypeIsJSON(contentType)) {
return ConnectException(code, "unexpected status code: $httpStatus")
}
return source.use { bufferedSource ->
Expand Down Expand Up @@ -298,7 +330,7 @@ private fun Headers.toTrailers(): Trailers {
return trailers
}

private fun getUrlFromMethodSpec(
private fun constructURLForGETRequest(
httpRequest: HTTPRequest,
codec: Codec<*>,
payload: Buffer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,11 @@ internal class EndStreamResponseJSON(
@Json(name = "error") val error: ErrorPayloadJSON?,
@Json(name = "metadata") val metadata: Headers?,
)

internal fun contentTypeIsJSON(contentType: String): Boolean {
// TODO: This could be more robust, like actually parsing the content-type.
// There exists a good helper for that, but it's in okhttp, which we intentionally
// don't have as a dep for this module, which aims to be agnostic of the actual
// HTTP client implementation to use.
return contentType == "application/json" || contentType == "application/json; charset=utf-8"
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,24 @@ internal class GRPCCompletionParser(
*
* Returns an "absent" completion if unable to be parsed.
*/
internal fun parse(headers: Headers, trailers: Trailers): GRPCCompletion {
internal fun parse(headers: Headers, hasBody: Boolean, trailers: Trailers): GRPCCompletion {
val statusCode: Int
val statusMetadata: Map<String, List<String>>
val statusFromHeaders = parseStatus(headers)
val trailersOnly: Boolean
if (statusFromHeaders == null) {
statusCode = parseStatus(trailers)
?: return GRPCCompletion(
present = false,
code = Code.INTERNAL_ERROR,
message = "protocol error: status is missing from trailers",
metadata = trailers,
)
statusMetadata = trailers
trailersOnly = false
} else {
statusCode = statusFromHeaders
if (!hasBody && trailers.isEmpty()) {
statusMetadata = headers
trailersOnly = true
} else {
statusMetadata = trailers
trailersOnly = false
}
statusCode = parseStatus(statusMetadata)
?: return GRPCCompletion(
present = false,
code = Code.UNKNOWN,
message = "protocol error: status is missing from trailers",
metadata = statusMetadata,
)
// Note: we report combined headers and trailers as exception meta, so
// caller doesn't have to check both, which is particularly important
// since server could actually serialize them together in a single bucket
Expand Down
61 changes: 48 additions & 13 deletions library/src/main/kotlin/com/connectrpc/protocols/GRPCInterceptor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ internal class GRPCInterceptor(
private val completionParser = GRPCCompletionParser(serializationStrategy.errorDetailParser())
private var responseCompressionPool: CompressionPool? = null
private var responseHeaders: Headers = emptyMap()
private var streamEmpty: Boolean = true

override fun unaryFunction(): UnaryFunction {
return UnaryFunction(
Expand Down Expand Up @@ -66,30 +67,47 @@ internal class GRPCInterceptor(
if (response.cause != null) {
return@UnaryFunction response.clone(message = Buffer())
}
val headers = response.headers
if (response.status != 200) {
return@UnaryFunction response.clone(
message = Buffer(),
cause = ConnectException(
code = Code.fromHTTPStatus(response.status),
message = "unexpected status code: ${response.status}",
metadata = headers,
),
)
}
val contentType = headers[CONTENT_TYPE]?.first() ?: ""
if (!contentTypeIsExpectedGRPC(contentType, serializationStrategy.serializationName())) {
// If content-type looks like it could be a gRPC server's response, consider
// this an internal error. Otherwise, we infer a code from the HTTP status,
// which means a code of UNKNOWN since HTTP status is 200.
val code = if (contentTypeIsGRPC(contentType)) Code.INTERNAL_ERROR else Code.UNKNOWN
return@UnaryFunction response.clone(
message = Buffer(),
cause = ConnectException(
code = code,
message = "unexpected content-type: $contentType",
metadata = headers,
),
)
}
val headers = response.headers
var trailers = response.trailers
val completion = completionParser
.parse(headers, trailers)
val hasBody = !response.message.buffer.exhausted()
val completion = completionParser.parse(headers, hasBody, trailers)
if (completion.trailersOnly) {
trailers = headers // report the headers also as trailers
}
val exception = completion.toConnectExceptionOrNull(serializationStrategy)
val message = if (exception == null) {
if (response.message.buffer.exhausted()) {
if (!hasBody) {
return@UnaryFunction response.clone(
message = Buffer(),
cause = ConnectException(
code = Code.UNIMPLEMENTED,
message = "unary stream has no messages",
metadata = headers.plus(trailers),
),
)
}
Expand All @@ -105,6 +123,7 @@ internal class GRPCInterceptor(
cause = ConnectException(
code = Code.UNIMPLEMENTED,
message = "unary stream has multiple messages",
metadata = headers.plus(trailers),
),
)
}
Expand Down Expand Up @@ -137,21 +156,28 @@ internal class GRPCInterceptor(
streamResultFunction = { res ->
res.fold(
onHeaders = { result ->
val headers = result.headers
val completion = completionParser.parse(headers, emptyMap())
if (completion.present) {
responseHeaders = result.headers
val contentType = responseHeaders[CONTENT_TYPE]?.first() ?: ""
if (!contentTypeIsExpectedGRPC(contentType, serializationStrategy.serializationName())) {
// If content-type looks like it could be a gRPC server's response, consider
// this an internal error. Otherwise, we infer a code from the HTTP status,
// which means a code of UNKNOWN since HTTP status is 200.
val code = if (contentTypeIsGRPC(contentType)) Code.INTERNAL_ERROR else Code.UNKNOWN
StreamResult.Complete(
cause = completion.toConnectExceptionOrNull(serializationStrategy),
trailers = headers,
cause = ConnectException(
code = code,
message = "unexpected content-type: $contentType",
metadata = responseHeaders,
),
)
} else {
responseHeaders = headers
responseCompressionPool = clientConfig
.compressionPool(headers[GRPC_ENCODING]?.first())
StreamResult.Headers(headers)
.compressionPool(responseHeaders[GRPC_ENCODING]?.first())
StreamResult.Headers(responseHeaders)
}
},
onMessage = { result ->
streamEmpty = false
val (_, unpackedMessage) = Envelope.unpackWithHeaderByte(
result.message,
responseCompressionPool,
Expand All @@ -164,7 +190,7 @@ internal class GRPCInterceptor(
}
val trailers = result.trailers
val exception = completionParser
.parse(responseHeaders, trailers)
.parse(responseHeaders, !streamEmpty, trailers)
.toConnectExceptionOrNull(serializationStrategy)
StreamResult.Complete(
cause = exception,
Expand All @@ -190,3 +216,12 @@ internal class GRPCInterceptor(
return headers
}
}

internal fun contentTypeIsGRPC(contentType: String): Boolean {
return contentType == "application/grpc" || contentType.startsWith("application/grpc+")
}

internal fun contentTypeIsExpectedGRPC(contentType: String, expectCodec: String): Boolean {
return (expectCodec == "proto" && contentType == "application/grpc") ||
contentType == "application/grpc+$expectCodec"
}

0 comments on commit 235028c

Please sign in to comment.