Skip to content

Commit

Permalink
Handle grpc-status in headers rather than only trailers (#174)
Browse files Browse the repository at this point in the history
"Trailers-only" responses can actually be just headers, in which case we
should read `grpc-status` from them.

This PR:
- Updates the logic for both unary and streaming APIs to properly handle
cases where "trailers-only" responses are returned using the headers
block
- Updates the conformance test suite to test against grpc-go in addition
to connect-go in order to catch this case. (grpc-go returns
"trailers-only" responses in headers, and connect-go returns them in
trailers)

Here's the new logic for gRPC:
- If non-HTTP 200, report an invalid response
- If no body data, assume "trailers-only"
  - Look for the gRPC status and error details in the headers
  - Look for the gRPC status and error details in the trailers
  - If the status is an error, don't process the body

Resolves #168.
  • Loading branch information
rebello95 committed Sep 15, 2023
1 parent e7943ca commit 2146cff
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 54 deletions.
47 changes: 28 additions & 19 deletions Libraries/ConnectNIO/Internal/GRPCInterceptor.swift
Expand Up @@ -49,16 +49,16 @@ extension GRPCInterceptor: Connect.Interceptor {
return response
}

guard let responseData = response.message, !responseData.isEmpty else {
let code = response.trailers.grpcStatus() ?? response.code
let (grpcCode, connectError) = self.grpcResult(
fromHeaders: response.headers, trailers: response.trailers
)
guard grpcCode == .ok, let rawData = response.message, !rawData.isEmpty else {
return Connect.HTTPResponse(
code: code,
code: grpcCode,
headers: response.headers,
message: response.message,
trailers: response.trailers,
error: response.error ?? ConnectError.fromGRPCTrailers(
response.trailers, code: code
),
error: connectError ?? response.error,
tracingInfo: response.tracingInfo
)
}
Expand All @@ -69,18 +69,14 @@ extension GRPCInterceptor: Connect.Interceptor {
.flatMap { self.config.responseCompressionPool(forName: $0) }
do {
let messageData = try Connect.Envelope.unpackMessage(
responseData,
compressionPool: compressionPool
rawData, compressionPool: compressionPool
).unpacked
let grpcCode = response.trailers.grpcStatus() ?? .unknown
return Connect.HTTPResponse(
code: grpcCode,
headers: response.headers,
message: messageData,
trailers: response.trailers,
error: grpcCode == .ok
? nil
: ConnectError.fromGRPCTrailers(response.trailers, code: grpcCode),
error: nil,
tracingInfo: response.tracingInfo
)
} catch let error {
Expand Down Expand Up @@ -120,14 +116,13 @@ extension GRPCInterceptor: Connect.Interceptor {
responseHeaders.value = headers
return result

case .message(let data):
case .message(let rawData):
do {
let responseCompressionPool = responseHeaders.value?[
Connect.HeaderConstants.grpcContentEncoding
]?.first.flatMap { self.config.responseCompressionPool(forName: $0) }
return .message(try Connect.Envelope.unpackMessage(
data,
compressionPool: responseCompressionPool
rawData, compressionPool: responseCompressionPool
).unpacked)
} catch let error {
// TODO: Close the stream here?
Expand All @@ -140,7 +135,9 @@ extension GRPCInterceptor: Connect.Interceptor {
return .complete(code: code, error: error, trailers: trailers)
}

let grpcCode = trailers?.grpcStatus() ?? .unknown
let (grpcCode, connectError) = self.grpcResult(
fromHeaders: responseHeaders.value, trailers: trailers
)
if grpcCode == .ok {
return .complete(
code: .ok,
Expand All @@ -150,14 +147,26 @@ extension GRPCInterceptor: Connect.Interceptor {
} else {
return .complete(
code: grpcCode,
error: trailers
.map { ConnectError.fromGRPCTrailers($0, code: grpcCode) }
?? error,
error: connectError ?? error,
trailers: trailers
)
}
}
}
)
}

private func grpcResult(
fromHeaders headers: Headers?, trailers: Trailers?
) -> (code: Code, error: ConnectError?) {
// "Trailers-only" responses can be sent in the headers or trailers block.
// Check for a valid gRPC status in the headers first, then in the trailers.
if let headers = headers, let grpcCode = headers.grpcStatus() {
return (grpcCode, .fromGRPCTrailers(headers, code: grpcCode))
} else if let trailers = trailers, let grpcCode = trailers.grpcStatus() {
return (grpcCode, .fromGRPCTrailers(trailers, code: grpcCode))
} else {
return (.unknown, nil)
}
}
}
Expand Up @@ -274,20 +274,32 @@ final class AsyncAwaitConformance: XCTestCase {
}
}

func testUnimplementedMethod() async {
await self.executeTestWithClients { client in
func testUnimplementedMethod() async throws {
let validErrorMessages = [
// connect-go
"connectrpc.conformance.v1.TestService.UnimplementedCall is not implemented",
// grpc-go
"method UnimplementedCall not implemented",
]
try await self.executeTestWithClients { client in
let response = await client.unimplementedCall(
request: SwiftProtobuf.Google_Protobuf_Empty()
)
XCTAssertEqual(response.code, .unimplemented)
XCTAssertEqual(
response.error?.message,
"connectrpc.conformance.v1.TestService.UnimplementedCall is not implemented"
)
XCTAssertTrue(validErrorMessages.contains(try XCTUnwrap(response.error?.message)))
}
}

func testUnimplementedServerStreamingMethod() async throws {
let validErrorMessages = [
// connect-go
"""
connectrpc.conformance.v1.TestService.UnimplementedStreamingOutputCall is \
not implemented
""",
// grpc-go
"method UnimplementedStreamingOutputCall not implemented",
]
try await self.executeTestWithClients { client in
let expectation = self.expectation(description: "Stream completes")
let stream = client.unimplementedStreamingOutputCall()
Expand All @@ -299,13 +311,9 @@ final class AsyncAwaitConformance: XCTestCase {

case .complete(let code, let error, _):
XCTAssertEqual(code, .unimplemented)
XCTAssertEqual(
(error as? ConnectError)?.message,
"""
connectrpc.conformance.v1.TestService.UnimplementedStreamingOutputCall is \
not implemented
"""
)
XCTAssertTrue(validErrorMessages.contains(
try XCTUnwrap((error as? ConnectError)?.message)
))
expectation.fulfill()
}
}
Expand Down
Expand Up @@ -301,14 +301,17 @@ final class CallbackConformance: XCTestCase {
}

func testUnimplementedMethod() {
let validErrorMessages = [
// connect-go
"connectrpc.conformance.v1.TestService.UnimplementedCall is not implemented",
// grpc-go
"method UnimplementedCall not implemented",
]
self.executeTestWithClients { client in
let expectation = self.expectation(description: "Request completes")
client.unimplementedCall(request: SwiftProtobuf.Google_Protobuf_Empty()) { response in
XCTAssertEqual(response.code, .unimplemented)
XCTAssertEqual(
response.error?.message,
"connectrpc.conformance.v1.TestService.UnimplementedCall is not implemented"
)
XCTAssertTrue(validErrorMessages.contains(response.error?.message ?? ""))
expectation.fulfill()
}

Expand All @@ -317,6 +320,15 @@ final class CallbackConformance: XCTestCase {
}

func testUnimplementedServerStreamingMethod() throws {
let validErrorMessages = [
// connect-go
"""
connectrpc.conformance.v1.TestService.UnimplementedStreamingOutputCall is \
not implemented
""",
// grpc-go
"method UnimplementedStreamingOutputCall not implemented",
]
try self.executeTestWithClients { client in
let expectation = self.expectation(description: "Stream completes")
let stream = client.unimplementedStreamingOutputCall { result in
Expand All @@ -326,13 +338,9 @@ final class CallbackConformance: XCTestCase {

case .complete(let code, let error, _):
XCTAssertEqual(code, .unimplemented)
XCTAssertEqual(
(error as? ConnectError)?.message,
"""
connectrpc.conformance.v1.TestService.UnimplementedStreamingOutputCall is \
not implemented
"""
)
XCTAssertTrue(validErrorMessages.contains(
(error as? ConnectError)?.message ?? ""
))
expectation.fulfill()
}
}
Expand Down
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// swiftlint:disable number_separator

import Connect
import ConnectNIO
import Foundation
Expand All @@ -34,27 +36,39 @@ final class ConformanceConfiguration {
/// - returns: A list of configurations to use for conformance tests.
static func all(timeout: TimeInterval) -> [ConformanceConfiguration] {
let urlSessionClient = ConformanceURLSessionHTTPClient(timeout: timeout)
let nioClient = ConformanceNIOHTTPClient(
// swiftlint:disable:next number_separator
let nioClient8081 = ConformanceNIOHTTPClient(
host: "https://localhost", port: 8081, timeout: timeout
)
let matrix: [(networkProtocol: NetworkProtocol, httpClients: [HTTPClientInterface])] = [
(.connect, [urlSessionClient, nioClient]),
(.grpcWeb, [urlSessionClient, nioClient]),
(.grpc, [nioClient]), // URLSession client does not support gRPC
let nioClient8083 = ConformanceNIOHTTPClient(
host: "https://localhost", port: 8083, timeout: timeout
)
// swiftlint:disable:next large_tuple
let matrix: [(
networkProtocol: NetworkProtocol,
httpClients: [HTTPClientInterface],
codecs: [Codec],
port: Int
)] = [
// Port 8081 is used to test against connect-go
(.connect, [urlSessionClient, nioClient8081], [JSONCodec(), ProtoCodec()], 8081),
(.grpcWeb, [urlSessionClient, nioClient8081], [JSONCodec(), ProtoCodec()], 8081),
// URLSession does not support gRPC
(.grpc, [nioClient8081], [JSONCodec(), ProtoCodec()], 8081),
// gRPC should also be tested against grpc-go, which runs on port 8083
(.grpc, [nioClient8083], [ProtoCodec()], 8083),
]
let codecs: [Codec] = [JSONCodec(), ProtoCodec()]
return matrix.reduce(into: []) { configurations, tuple in
return matrix.reduce(into: [ConformanceConfiguration]()) { configurations, tuple in
for httpClient in tuple.httpClients {
for codec in codecs {
for codec in tuple.codecs {
configurations.append(.init(
description: """
\(tuple.networkProtocol) + \(type(of: codec)) via \(type(of: httpClient))
\(tuple.networkProtocol) + \(type(of: codec)) via \(type(of: httpClient)) \
on port \(tuple.port)
""",
protocolClient: ProtocolClient(
httpClient: httpClient,
config: ProtocolClientConfig(
host: "https://localhost:8081",
host: "https://localhost:\(tuple.port)",
networkProtocol: tuple.networkProtocol,
codec: codec,
requestCompression: .init(minBytes: 10, pool: GzipCompressionPool())
Expand Down

0 comments on commit 2146cff

Please sign in to comment.