diff --git a/Makefile b/Makefile index ad2e66ee..eb62f19c 100644 --- a/Makefile +++ b/Makefile @@ -10,11 +10,12 @@ BIN := .tmp/bin CACHE := .tmp/cache LICENSE_HEADER_YEAR_RANGE := 2022-2023 LICENSE_HEADER_VERSION := v1.30.0 -CONFORMANCE_VERSION := v1.0.0-rc3 +CONFORMANCE_VERSION := v1.0.0-rc4 PROTOC_VERSION ?= 26.1 GRADLE_ARGS ?= PROTOC := $(BIN)/protoc CONNECT_CONFORMANCE := $(BIN)/connectconformance +CONNECT_CONFORMANCE_ARGS ?= -v --mode client --trace UNAME_OS := $(shell uname -s) UNAME_ARCH := $(shell uname -m) @@ -42,34 +43,34 @@ clean: ## Cleans the underlying build. .PHONY: runconformance runconformance: generate $(CONNECT_CONFORMANCE) ## Run the new conformance test suite. ./gradlew $(GRADLE_ARGS) conformance:client:google-java:installDist conformance:client:google-javalite:installDist - $(CONNECT_CONFORMANCE) -v --mode client --conf conformance/client/lite-unary-config.yaml \ + $(CONNECT_CONFORMANCE) $(CONNECT_CONFORMANCE_ARGS) --conf conformance/client/lite-unary-config.yaml \ --known-failing @conformance/client/known-failing-unary-cases.txt -- \ conformance/client/google-javalite/build/install/google-javalite/bin/google-javalite \ --style suspend - $(CONNECT_CONFORMANCE) -v --mode client --conf conformance/client/lite-unary-config.yaml \ + $(CONNECT_CONFORMANCE) $(CONNECT_CONFORMANCE_ARGS) --conf conformance/client/lite-unary-config.yaml \ --known-failing @conformance/client/known-failing-unary-cases.txt -- \ conformance/client/google-javalite/build/install/google-javalite/bin/google-javalite \ --style callback - $(CONNECT_CONFORMANCE) -v --mode client --conf conformance/client/lite-unary-config.yaml \ + $(CONNECT_CONFORMANCE) $(CONNECT_CONFORMANCE_ARGS) --conf conformance/client/lite-unary-config.yaml \ --known-failing @conformance/client/known-failing-unary-cases.txt -- \ conformance/client/google-javalite/build/install/google-javalite/bin/google-javalite \ --style blocking - $(CONNECT_CONFORMANCE) -v --mode client --conf conformance/client/standard-unary-config.yaml \ + $(CONNECT_CONFORMANCE) $(CONNECT_CONFORMANCE_ARGS) --conf conformance/client/standard-unary-config.yaml \ --known-failing @conformance/client/known-failing-unary-cases.txt -- \ conformance/client/google-java/build/install/google-java/bin/google-java \ --style suspend - $(CONNECT_CONFORMANCE) -v --mode client --conf conformance/client/standard-unary-config.yaml \ + $(CONNECT_CONFORMANCE) $(CONNECT_CONFORMANCE_ARGS) --conf conformance/client/standard-unary-config.yaml \ --known-failing @conformance/client/known-failing-unary-cases.txt -- \ conformance/client/google-java/build/install/google-java/bin/google-java \ --style callback - $(CONNECT_CONFORMANCE) -v --mode client --conf conformance/client/standard-unary-config.yaml \ + $(CONNECT_CONFORMANCE) $(CONNECT_CONFORMANCE_ARGS) --conf conformance/client/standard-unary-config.yaml \ --known-failing @conformance/client/known-failing-unary-cases.txt -- \ conformance/client/google-java/build/install/google-java/bin/google-java \ --style blocking - $(CONNECT_CONFORMANCE) -v --mode client --conf conformance/client/lite-stream-config.yaml \ + $(CONNECT_CONFORMANCE) $(CONNECT_CONFORMANCE_ARGS) --conf conformance/client/lite-stream-config.yaml \ --known-failing @conformance/client/known-failing-stream-cases.txt -- \ conformance/client/google-javalite/build/install/google-javalite/bin/google-javalite - $(CONNECT_CONFORMANCE) -v --mode client --conf conformance/client/standard-stream-config.yaml \ + $(CONNECT_CONFORMANCE) $(CONNECT_CONFORMANCE_ARGS) --conf conformance/client/standard-stream-config.yaml \ --known-failing @conformance/client/known-failing-stream-cases.txt -- \ conformance/client/google-java/build/install/google-java/bin/google-java diff --git a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaHelpers.kt b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaHelpers.kt index c934013d..55b88b00 100644 --- a/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaHelpers.kt +++ b/conformance/client/google-java/src/main/kotlin/com/connectrpc/conformance/client/java/JavaHelpers.kt @@ -251,7 +251,7 @@ class JavaHelpers { } private class TlsCredsImpl( - private val msg: com.connectrpc.conformance.v1.ClientCompatRequest.TLSCreds, + private val msg: com.connectrpc.conformance.v1.TLSCreds, ) : TlsCreds { override val cert: ByteString get() = msg.cert diff --git a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteHelpers.kt b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteHelpers.kt index 42d0c300..bac3fdc6 100644 --- a/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteHelpers.kt +++ b/conformance/client/google-javalite/src/main/kotlin/com/connectrpc/conformance/client/javalite/JavaLiteHelpers.kt @@ -233,7 +233,7 @@ class JavaLiteHelpers { } private class TlsCredsImpl( - private val msg: com.connectrpc.conformance.v1.ClientCompatRequest.TLSCreds, + private val msg: com.connectrpc.conformance.v1.TLSCreds, ) : TlsCreds { override val cert: ByteString get() = msg.cert diff --git a/conformance/client/known-failing-unary-cases.txt b/conformance/client/known-failing-unary-cases.txt index e0b823d5..bcdd5e3d 100644 --- a/conformance/client/known-failing-unary-cases.txt +++ b/conformance/client/known-failing-unary-cases.txt @@ -1,32 +1,13 @@ -# If error body is JSON null, it is interpreted as an unknown error -# instead of falling back to basing code on the HTTP status. -Connect Error and End-Stream/**/error/null - # Deadline headers are not currently set. Deadline Propagation/** -# This response doesn't look like a normal Connect response, so it -# goes through okhttp's default 408 code handling, which retries. -# The retry triggers an error: -# client sent another request (#2) for the same test case -HTTP to Connect Code Mapping/**/request-timeout - # Bug: response content-type is not correctly checked -Unexpected Responses/**/unexpected-content-type +**/unexpected-content-type # Bug: "trailers-only" responses are not correctly identified. # If headers contain "grpc-status", this client assumes it is a # trailers-only response. However, a trailers-only response should # instead be identified by lack of body or HTTP trailers. gRPC Unexpected Responses/**/trailers-only/* +gRPC-Web Unexpected Responses/**/trailers-only/ignore-header-if-body-present -# Bug: if gRPC unary response contains zero messages or -# more than one message, client does not complain if -# status trailers says "ok" -gRPC Unexpected Responses/**/unary-multiple-responses -gRPC Unexpected Responses/**/unary-ok-but-no-response -gRPC-Web Unexpected Responses/**/unary-ok-but-no-response - -# Bug: incorrect code attribution for these failures (INTERNAL instead of UNKNOWN) -gRPC-Web Unexpected Responses/**/missing-status -gRPC-Web Unexpected Responses/**/trailers-in-body/unary-multiple-responses diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt index af6ff03e..7e4bb746 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/Client.kt @@ -385,10 +385,15 @@ class Client( ) } is ResponseMessage.Failure -> { + // TODO: report result.headers and result.trailers independently + // once reference server can report send them independently. + // https://github.com/connectrpc/conformance/pull/840. + // Until then, always report trailers as exception metadata (which + // may include headers). ClientResponseResult( headers = result.headers, error = result.cause, - trailers = result.trailers, + trailers = result.cause.metadata, numUnsentRequests = numUnsent, ) } diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt index 65918acb..6c651185 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/adapt/ClientStreamClient.kt @@ -103,7 +103,7 @@ abstract class ClientStreamClient( return ResponseMessage.Failure( cause = connEx, headers = underlying.responseHeaders().await(), - trailers = connEx.metadata, + trailers = underlying.responseTrailers().await(), ) } } diff --git a/library/src/main/kotlin/com/connectrpc/Code.kt b/library/src/main/kotlin/com/connectrpc/Code.kt index 5a5bcc34..3b540ec6 100644 --- a/library/src/main/kotlin/com/connectrpc/Code.kt +++ b/library/src/main/kotlin/com/connectrpc/Code.kt @@ -47,32 +47,24 @@ enum class Code(val codeName: String, val value: Int) { fun fromHTTPStatus(status: Int?): Code { return when (status) { null -> UNKNOWN - 400 -> INVALID_ARGUMENT + 400 -> INTERNAL_ERROR 401 -> UNAUTHENTICATED 403 -> PERMISSION_DENIED 404 -> UNIMPLEMENTED - 408 -> DEADLINE_EXCEEDED - 409 -> ABORTED - 412 -> FAILED_PRECONDITION - 413 -> RESOURCE_EXHAUSTED - 415 -> INTERNAL_ERROR - 429 -> UNAVAILABLE - 431 -> RESOURCE_EXHAUSTED - 499 -> CANCELED - 502, 503, 504 -> UNAVAILABLE + 429, 502, 503, 504 -> UNAVAILABLE else -> UNKNOWN } } - fun fromName(name: String?): Code { + fun fromName(name: String?, ifNotKnown: Code = UNKNOWN): Code { if (name == null) { - return UNKNOWN + return ifNotKnown } for (value in values()) { if (value.codeName == name) { return value } } - return UNKNOWN + return ifNotKnown } fun fromValue(value: Int?): Code? { if (value == null) { diff --git a/library/src/main/kotlin/com/connectrpc/ConnectErrorDetail.kt b/library/src/main/kotlin/com/connectrpc/ConnectErrorDetail.kt index 3ca288cd..c5da1f13 100644 --- a/library/src/main/kotlin/com/connectrpc/ConnectErrorDetail.kt +++ b/library/src/main/kotlin/com/connectrpc/ConnectErrorDetail.kt @@ -16,14 +16,16 @@ package com.connectrpc import okio.ByteString -// An ErrorDetail is a self-describing Protobuf message attached to an [*Error]. +// A ConnectErrorDetail is a self-describing Protobuf message attached to a +// [ConnectException]. +// // Error details are sent over the network to clients, which can then work with // strongly-typed data rather than trying to parse a complex error message. For // example, you might use details to send a localized error message or retry // parameters to the client. // -// The [google.golang.org/genproto/googleapis/rpc/errdetails] package contains a -// variety of Protobuf messages commonly used as error details. +// The [com.google.rpc](https://googleapis.github.io/googleapis/java/all/latest/apidocs/com/google/rpc/package-summary.html) +// package contains a variety of Protobuf messages commonly used as error details. data class ConnectErrorDetail( val type: String, val payload: ByteString, diff --git a/library/src/main/kotlin/com/connectrpc/ConnectException.kt b/library/src/main/kotlin/com/connectrpc/ConnectException.kt index 8eb4ef97..9d847daa 100644 --- a/library/src/main/kotlin/com/connectrpc/ConnectException.kt +++ b/library/src/main/kotlin/com/connectrpc/ConnectException.kt @@ -20,19 +20,30 @@ import kotlin.reflect.KClass * Typed error provided by Connect RPCs that may optionally wrap additional typed custom errors * using [details]. */ -data class ConnectException( +class ConnectException private constructor( // The resulting status code. val code: Code, - private val errorDetailParser: ErrorDetailParser? = null, // User-readable error message. - override val message: String? = null, + override val message: String?, // Client-side exception that occurred, resulting in the error. - val exception: Throwable? = null, - // List of typed errors that were provided by the server. - val details: List = emptyList(), + val exception: Throwable?, // Additional key-values that were provided by the server. - val metadata: Headers = emptyMap(), + val metadata: Headers, + // Optional parser for messages in details. Will be non-null if + // details is non-empty. + private val errorDetailParser: ErrorDetailParser?, + // List of typed errors that were provided by the server. + val details: List, ) : Exception(message, exception) { + /** + * Constructs a new ConnectException. + */ + constructor ( + code: Code, + message: String? = null, + exception: Throwable? = null, + metadata: Headers = emptyMap(), + ) : this(code, message, exception, metadata, null, emptyList()) /** * Unpacks values from [details] and returns the first matching error, if any. @@ -51,16 +62,31 @@ data class ConnectException( } /** - * Creates a new [ConnectException] with the specified [ErrorDetailParser]. + * Creates a new [ConnectException] with the specified error details and + * accompanying (non-null) detail parser. */ - fun setErrorParser(errorParser: ErrorDetailParser): ConnectException { + fun withErrorDetails(errorParser: ErrorDetailParser, details: List): ConnectException { return ConnectException( code, - errorParser, message, exception, + metadata, + errorParser, details, + ) + } + + /** + * Creates a new [ConnectException] with the specified metadata. + */ + fun withMetadata(metadata: Headers): ConnectException { + return ConnectException( + code, + message, + exception, metadata, + errorDetailParser, + details, ) } } diff --git a/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt b/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt index 0edbf07e..86e339f6 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt @@ -19,7 +19,9 @@ import com.connectrpc.ClientOnlyStreamInterface import com.connectrpc.Code import com.connectrpc.ConnectException import com.connectrpc.Headers +import com.connectrpc.asConnectException import kotlinx.coroutines.Deferred +import kotlinx.coroutines.channels.ClosedReceiveChannelException /** * Concrete implementation of [ClientOnlyStreamInterface]. @@ -35,12 +37,23 @@ internal class ClientOnlyStream( val resultChannel = messageStream.responseChannel() try { messageStream.sendClose() - val message = resultChannel.receive() + val message = resultChannel.receiveCatching() + if (message.isFailure) { + val ex = message.exceptionOrNull() + if (ex == null || ex is ClosedReceiveChannelException) { + throw ConnectException( + code = Code.UNIMPLEMENTED, + message = "unary stream has no messages", + exception = ex, + ) + } + throw asConnectException(ex) + } val additionalMessage = resultChannel.receiveCatching() if (additionalMessage.isSuccess) { - throw ConnectException(code = Code.UNKNOWN, message = "unary stream has multiple messages") + throw ConnectException(code = Code.UNIMPLEMENTED, message = "unary stream has multiple messages") } - return message + return message.getOrThrow() } finally { resultChannel.cancel() } diff --git a/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt b/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt index d85cd12a..b846b754 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt @@ -109,11 +109,10 @@ class ProtocolClient( ) return@httpClientUnary } - val exception = finalResponse.cause?.setErrorParser(serializationStrategy.errorDetailParser()) - if (exception != null) { + if (finalResponse.cause != null) { onResult( ResponseMessage.Failure( - exception, + finalResponse.cause, finalResponse.headers, finalResponse.trailers, ), diff --git a/library/src/main/kotlin/com/connectrpc/protocols/ConnectInterceptor.kt b/library/src/main/kotlin/com/connectrpc/protocols/ConnectInterceptor.kt index 95586b36..9af725fb 100644 --- a/library/src/main/kotlin/com/connectrpc/protocols/ConnectInterceptor.kt +++ b/library/src/main/kotlin/com/connectrpc/protocols/ConnectInterceptor.kt @@ -106,7 +106,6 @@ internal class ConnectInterceptor( trailers = trailers, cause = ConnectException( code = Code.INTERNAL_ERROR, - errorDetailParser = serializationStrategy.errorDetailParser(), message = e.message, exception = e, ), @@ -234,10 +233,11 @@ internal class ConnectInterceptor( StreamResult.Complete( cause = ConnectException( code = code, - errorDetailParser = serializationStrategy.errorDetailParser(), message = endStreamResponseJSON.error.message, - details = parseErrorDetails(endStreamResponseJSON.error), metadata = metadata.orEmpty(), + ).withErrorDetails( + serializationStrategy.errorDetailParser(), + parseErrorDetails(endStreamResponseJSON.error), ), ) } @@ -246,7 +246,7 @@ internal class ConnectInterceptor( private fun parseConnectUnaryException(httpStatus: Int?, headers: Headers, source: Buffer?): ConnectException { val code = Code.fromHTTPStatus(httpStatus) if (source == null) { - return ConnectException(code, serializationStrategy.errorDetailParser(), "unexpected status code: $httpStatus") + return ConnectException(code, "unexpected status code: $httpStatus") } return source.use { bufferedSource -> val adapter = moshi.adapter(ErrorPayloadJSON::class.java).nonNull() @@ -254,15 +254,16 @@ internal class ConnectInterceptor( val errorPayloadJSON = try { adapter.fromJson(errorJSON) } catch (e: Exception) { - return ConnectException(code, serializationStrategy.errorDetailParser(), errorJSON, e) + return ConnectException(code, errorJSON, e) } val errorDetails = parseErrorDetails(errorPayloadJSON!!) ConnectException( - code = Code.fromName(errorPayloadJSON.code), - errorDetailParser = serializationStrategy.errorDetailParser(), + code = Code.fromName(errorPayloadJSON.code, code), message = errorPayloadJSON.message, - details = errorDetails, metadata = headers, + ).withErrorDetails( + serializationStrategy.errorDetailParser(), + errorDetails, ) } } diff --git a/library/src/main/kotlin/com/connectrpc/protocols/Envelope.kt b/library/src/main/kotlin/com/connectrpc/protocols/Envelope.kt index a940413a..68601b83 100644 --- a/library/src/main/kotlin/com/connectrpc/protocols/Envelope.kt +++ b/library/src/main/kotlin/com/connectrpc/protocols/Envelope.kt @@ -67,11 +67,9 @@ class Envelope { } val headerByte = source.readByte().toInt() val length = source.readInt().toLong() - val message = if (source.size > length) { + val message = if (source.size >= length) { // extract relevant subset for this message Buffer().write(source as Source, length) - } else if (source.size == length) { - source } else { throw ConnectException( code = Code.INTERNAL_ERROR, diff --git a/library/src/main/kotlin/com/connectrpc/protocols/GRPCCompletion.kt b/library/src/main/kotlin/com/connectrpc/protocols/GRPCCompletion.kt index bce29bab..d0632b3c 100644 --- a/library/src/main/kotlin/com/connectrpc/protocols/GRPCCompletion.kt +++ b/library/src/main/kotlin/com/connectrpc/protocols/GRPCCompletion.kt @@ -46,10 +46,11 @@ internal data class GRPCCompletion( } else { ConnectException( code = code, - errorDetailParser = serializationStrategy.errorDetailParser(), message = message, - details = errorDetails, metadata = metadata, + ).withErrorDetails( + serializationStrategy.errorDetailParser(), + errorDetails, ) } } diff --git a/library/src/main/kotlin/com/connectrpc/protocols/GRPCCompletionParser.kt b/library/src/main/kotlin/com/connectrpc/protocols/GRPCCompletionParser.kt index 837a57b3..efe23a19 100644 --- a/library/src/main/kotlin/com/connectrpc/protocols/GRPCCompletionParser.kt +++ b/library/src/main/kotlin/com/connectrpc/protocols/GRPCCompletionParser.kt @@ -34,27 +34,32 @@ internal class GRPCCompletionParser( * Returns an "absent" completion if unable to be parsed. */ internal fun parse(headers: Headers, trailers: Trailers): GRPCCompletion { - val status: Int - val metadata: Map> + val statusCode: Int + val statusMetadata: Map> val statusFromHeaders = parseStatus(headers) if (statusFromHeaders == null) { - status = parseStatus(trailers) + statusCode = parseStatus(trailers) ?: return GRPCCompletion( present = false, code = Code.INTERNAL_ERROR, message = "protocol error: status is missing from trailers", metadata = trailers, ) - metadata = trailers + statusMetadata = trailers } else { - status = statusFromHeaders - metadata = headers + statusCode = statusFromHeaders + statusMetadata = headers } + // Note: we report combined headers and trailers as exception meta, so + // caller doesn't have to check both, which is particularly important + // since server could actually serialize them together in a single bucket + // for a gRPC "trailers only" response. + val exceptionMeta = headers.plus(trailers) return GRPCCompletion( - code = Code.fromValue(status), - message = parseMessage(metadata).utf8(), - errorDetails = connectErrorDetails(metadata), - metadata = metadata, + code = Code.fromValue(statusCode), + message = parseMessage(statusMetadata).utf8(), + errorDetails = connectErrorDetails(statusMetadata), + metadata = exceptionMeta, ) } diff --git a/library/src/main/kotlin/com/connectrpc/protocols/GRPCInterceptor.kt b/library/src/main/kotlin/com/connectrpc/protocols/GRPCInterceptor.kt index 7cd417b0..23e1341a 100644 --- a/library/src/main/kotlin/com/connectrpc/protocols/GRPCInterceptor.kt +++ b/library/src/main/kotlin/com/connectrpc/protocols/GRPCInterceptor.kt @@ -36,6 +36,7 @@ internal class GRPCInterceptor( private val serializationStrategy = clientConfig.serializationStrategy private val completionParser = GRPCCompletionParser(serializationStrategy.errorDetailParser()) private var responseCompressionPool: CompressionPool? = null + private var responseHeaders: Headers = emptyMap() override fun unaryFunction(): UnaryFunction { return UnaryFunction( @@ -79,15 +80,34 @@ internal class GRPCInterceptor( val exception = completionParser .parse(headers, trailers) .toConnectExceptionOrNull(serializationStrategy) - val (_, message) = if (exception == null) { + val message = if (exception == null) { + if (response.message.buffer.exhausted()) { + return@UnaryFunction response.clone( + message = Buffer(), + cause = ConnectException( + code = Code.UNIMPLEMENTED, + message = "unary stream has no messages", + ), + ) + } val compressionPool = clientConfig.compressionPool(headers[GRPC_ENCODING]?.first()) - Envelope.unpackWithHeaderByte( + val (_, buffer) = Envelope.unpackWithHeaderByte( response.message.buffer, compressionPool, ) + if (!response.message.buffer.exhausted()) { + return@UnaryFunction response.clone( + message = Buffer(), + cause = ConnectException( + code = Code.UNIMPLEMENTED, + message = "unary stream has multiple messages", + ), + ) + } + buffer } else { - 0 to Buffer() + Buffer() } response.clone( message = message, @@ -121,6 +141,7 @@ internal class GRPCInterceptor( trailers = headers, ) } else { + responseHeaders = headers responseCompressionPool = clientConfig .compressionPool(headers[GRPC_ENCODING]?.first()) StreamResult.Headers(headers) @@ -139,7 +160,7 @@ internal class GRPCInterceptor( } val trailers = result.trailers val exception = completionParser - .parse(emptyMap(), trailers) + .parse(responseHeaders, trailers) .toConnectExceptionOrNull(serializationStrategy) StreamResult.Complete( cause = exception, diff --git a/library/src/main/kotlin/com/connectrpc/protocols/GRPCWebInterceptor.kt b/library/src/main/kotlin/com/connectrpc/protocols/GRPCWebInterceptor.kt index ad063bdc..d332c964 100644 --- a/library/src/main/kotlin/com/connectrpc/protocols/GRPCWebInterceptor.kt +++ b/library/src/main/kotlin/com/connectrpc/protocols/GRPCWebInterceptor.kt @@ -41,6 +41,7 @@ internal class GRPCWebInterceptor( private val serializationStrategy = clientConfig.serializationStrategy private val completionParser = GRPCCompletionParser(serializationStrategy.errorDetailParser()) private var responseCompressionPool: CompressionPool? = null + private var responseHeaders: Headers = emptyMap() override fun unaryFunction(): UnaryFunction { return UnaryFunction( @@ -91,9 +92,16 @@ internal class GRPCWebInterceptor( // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md if (response.message.exhausted()) { // There was no response body. Read status within the headers. - val exception = completionParser + var exception = completionParser .parse(headers, emptyMap()) .toConnectExceptionOrNull(serializationStrategy) + if (exception == null) { + // No response data and no error code? + exception = ConnectException( + code = Code.UNIMPLEMENTED, + message = "unary stream has no messages", + ) + } response.clone( message = Buffer(), cause = exception, @@ -101,28 +109,62 @@ internal class GRPCWebInterceptor( } else { // Unpack the current message and trailers. val responseBuffer = response.message.buffer - // currentMessage will contain remaining bytes unread by unpackWithHeaderByte. val (headerByte, unpacked) = Envelope.unpackWithHeaderByte( responseBuffer, compressionPool, ) // Check if the current message contains only trailers. - val trailerBuffer = if (headerByte.and(TRAILERS_BIT) == TRAILERS_BIT) { - unpacked + val (currentMessage, trailerBuffer) = if (headerByte.and(TRAILERS_BIT) == TRAILERS_BIT) { + null to unpacked + } else if (response.message.exhausted()) { + return@UnaryFunction response.clone( + message = Buffer(), + cause = ConnectException( + code = Code.INTERNAL_ERROR, + message = "response did not include an end of stream message", + ), + ) } else { // The previous chunk is the message which means this is the trailers. - val (_, trailerBuffer) = Envelope.unpackWithHeaderByte( + val (trailerHeaderByte, trailerBuffer) = Envelope.unpackWithHeaderByte( responseBuffer, compressionPool, ) - trailerBuffer + if (trailerHeaderByte.and(TRAILERS_BIT) != TRAILERS_BIT) { + // Another message instead of trailers? + return@UnaryFunction response.clone( + message = Buffer(), + cause = ConnectException( + code = Code.UNIMPLEMENTED, + message = "unary stream has multiple messages", + ), + ) + } + unpacked to trailerBuffer + } + if (!responseBuffer.exhausted()) { + // More after the trailers message? + return@UnaryFunction response.clone( + message = Buffer(), + cause = ConnectException( + code = Code.INTERNAL_ERROR, + message = "response stream contains data after end-of-stream message", + ), + ) } val finalTrailers = parseGrpcWebTrailer(trailerBuffer) - val exception = completionParser - .parse(emptyMap(), finalTrailers) + var exception = completionParser + .parse(headers, finalTrailers) .toConnectExceptionOrNull(serializationStrategy) + if (exception == null && currentMessage == null) { + // No response message, and trailers indicated no error? + exception = ConnectException( + code = Code.UNIMPLEMENTED, + message = "unary stream has multiple messages", + ) + } response.clone( - message = unpacked, + message = currentMessage ?: Buffer(), trailers = finalTrailers, cause = exception, ) @@ -155,6 +197,7 @@ internal class GRPCWebInterceptor( trailers = headers, ) } else { + responseHeaders = headers responseCompressionPool = clientConfig .compressionPool(headers[GRPC_ENCODING]?.first()) StreamResult.Headers(headers) @@ -168,7 +211,7 @@ internal class GRPCWebInterceptor( if (headerByte.and(TRAILERS_BIT) == TRAILERS_BIT) { val streamTrailers = parseGrpcWebTrailer(unpackedMessage) val exception = completionParser - .parse(emptyMap(), streamTrailers) + .parse(responseHeaders, streamTrailers) .toConnectExceptionOrNull(serializationStrategy) StreamResult.Complete( cause = exception, diff --git a/library/src/test/kotlin/com/connectrpc/ConnectExceptionTest.kt b/library/src/test/kotlin/com/connectrpc/ConnectExceptionTest.kt index bbdb9cb5..16dd83e3 100644 --- a/library/src/test/kotlin/com/connectrpc/ConnectExceptionTest.kt +++ b/library/src/test/kotlin/com/connectrpc/ConnectExceptionTest.kt @@ -32,30 +32,14 @@ class ConnectExceptionTest { whenever(errorDetailParser.unpack(errorDetail.pb, String::class)).thenReturn("unpacked_value") val connectException = ConnectException( code = Code.UNKNOWN, + ).withErrorDetails( + errorParser = errorDetailParser, details = listOf( errorDetail, errorDetail, ), - errorDetailParser = errorDetailParser, ) val parsedResult = connectException.unpackedDetails(String::class) assertThat(parsedResult).contains("unpacked_value", "unpacked_value") } - - @Test - fun completionParsingUnset() { - val errorDetail = ConnectErrorDetail( - "type", - "value".encodeUtf8(), - ) - whenever(errorDetailParser.unpack(errorDetail.pb, String::class)).thenReturn("unpacked_value") - val connectException = ConnectException( - code = Code.UNKNOWN, - details = listOf( - errorDetail, - ), - ) - val parsedResult = connectException.unpackedDetails(String::class) - assertThat(parsedResult).isEmpty() - } } diff --git a/library/src/test/kotlin/com/connectrpc/protocols/GRPCWebInterceptorTest.kt b/library/src/test/kotlin/com/connectrpc/protocols/GRPCWebInterceptorTest.kt index f508f830..ecb8df0b 100644 --- a/library/src/test/kotlin/com/connectrpc/protocols/GRPCWebInterceptorTest.kt +++ b/library/src/test/kotlin/com/connectrpc/protocols/GRPCWebInterceptorTest.kt @@ -174,12 +174,17 @@ class GRPCWebInterceptorTest { val grpcWebInterceptor = GRPCWebInterceptor(config) val unaryFunction = grpcWebInterceptor.unaryFunction() - val envelopedMessage = Envelope.pack(Buffer().write("message".encodeUtf8()), GzipCompressionPool, 0) + val responseBody = Envelope.pack(Buffer().write("message".encodeUtf8()), GzipCompressionPool, 0) + // And add end-stream message w/ trailers, too + val endStreamMessageContents = "grpc-status: 0\r\n".encodeUtf8() + responseBody.writeByte(GRPCWebInterceptor.TRAILERS_BIT) + responseBody.writeInt(endStreamMessageContents.size) + responseBody.write(endStreamMessageContents) val response = unaryFunction.responseFunction( HTTPResponse( status = 200, - headers = mapOf("grpc-encoding" to listOf("gzip")), - message = envelopedMessage, + headers = mapOf(GRPC_ENCODING to listOf("gzip")), + message = responseBody, trailers = emptyMap(), ), ) @@ -196,12 +201,17 @@ class GRPCWebInterceptorTest { val grpcWebInterceptor = GRPCWebInterceptor(config) val unaryFunction = grpcWebInterceptor.unaryFunction() - val envelopedMessage = Envelope.pack(Buffer().write("message".encodeUtf8()), GzipCompressionPool, 1) + val responseBody = Envelope.pack(Buffer().write("message".encodeUtf8()), GzipCompressionPool, 1) + // And add end-stream message w/ trailers, too + val endStreamMessageContents = "grpc-status: 0\r\n".encodeUtf8() + responseBody.writeByte(GRPCWebInterceptor.TRAILERS_BIT) + responseBody.writeInt(endStreamMessageContents.size) + responseBody.write(endStreamMessageContents) val response = unaryFunction.responseFunction( HTTPResponse( status = 200, headers = mapOf(GRPC_ENCODING to listOf(GzipCompressionPool.name())), - message = envelopedMessage, + message = responseBody, trailers = emptyMap(), ), )