Skip to content

Commit

Permalink
Combine headers + trailers into ConnectError.metadata (#239)
Browse files Browse the repository at this point in the history
As described by @jhump:

> Currently, users may need to check both headers and trailers when
looking for metadata in an error response for an operation with a unary
reply (unary and client-stream RPCs). The reason they may have to look
in two places is because where the metadata shows up isn't
straight-forward - it depends on if the protocol being used and, if gRPC
or gRPC-Web, whether the server used a trailers-only response or not. In
a trailers-only response, all metadata would show up in the same bag of
metadata (which are technically HTTP response headers, but may be
classified as "trailers" since they include the status and there will be
no subsequent HTTP response trailers in the reply).

This PR updates `ConnectError`'s initialization to combine both headers
+ trailers into its `metadata` so that it's easier for consumers to
query for the keys they need. This also matches connect-go and
connect-es.
  • Loading branch information
rebello95 committed Feb 1, 2024
1 parent 7350355 commit 1701d3d
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 56 deletions.
Expand Up @@ -176,7 +176,8 @@ extension ConnectInterceptor: StreamInterceptor {
code: code,
error: ConnectError.from(
code: code,
headers: self.streamResponseHeaders.value ?? [:],
headers: self.streamResponseHeaders.value,
trailers: trailers,
source: nil
),
trailers: trailers
Expand Down
27 changes: 16 additions & 11 deletions Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift
Expand Up @@ -57,13 +57,16 @@ extension GRPCWebInterceptor: UnaryInterceptor {
}

guard let responseData = response.message, !responseData.isEmpty else {
let code = response.headers.grpcStatus() ?? response.code
let (grpcCode, connectError) = ConnectError.parseGRPCHeaders(
response.headers,
trailers: response.trailers
)
proceed(HTTPResponse(
code: code,
code: grpcCode,
headers: response.headers,
message: response.message,
trailers: response.trailers,
error: ConnectError.fromGRPCTrailers(response.headers, code: code),
error: connectError,
tracingInfo: response.tracingInfo
))
return
Expand Down Expand Up @@ -147,7 +150,7 @@ extension GRPCWebInterceptor: StreamInterceptor {
// Headers-only response.
proceed(.complete(
code: grpcCode,
error: ConnectError.fromGRPCTrailers(headers, code: grpcCode),
error: ConnectError.parseGRPCHeaders(nil, trailers: headers).error,
trailers: headers
))
} else {
Expand All @@ -166,13 +169,15 @@ extension GRPCWebInterceptor: StreamInterceptor {
let isTrailers = 0b10000000 & headerByte != 0
if isTrailers {
let trailers = try Trailers.fromGRPCHeadersBlock(unpackedData)
let grpcCode = trailers.grpcStatus() ?? .unknown
let (grpcCode, error) = ConnectError.parseGRPCHeaders(
self.streamResponseHeaders.value, trailers: trailers
)
if grpcCode == .ok {
proceed(.complete(code: .ok, error: nil, trailers: trailers))
} else {
proceed(.complete(
code: grpcCode,
error: ConnectError.fromGRPCTrailers(trailers, code: grpcCode),
error: error,
trailers: trailers
))
}
Expand Down Expand Up @@ -222,10 +227,10 @@ private extension Trailers {

private extension HTTPResponse {
func withHandledGRPCWebTrailers(_ trailers: Trailers, message: Data?) -> Self {
let grpcStatus = trailers.grpcStatus() ?? .unknown
if grpcStatus == .ok {
let (grpcCode, error) = ConnectError.parseGRPCHeaders(self.headers, trailers: trailers)
if grpcCode == .ok {
return HTTPResponse(
code: grpcStatus,
code: grpcCode,
headers: self.headers,
message: message,
trailers: trailers,
Expand All @@ -234,11 +239,11 @@ private extension HTTPResponse {
)
} else {
return HTTPResponse(
code: grpcStatus,
code: grpcCode,
headers: self.headers,
message: message,
trailers: trailers,
error: ConnectError.fromGRPCTrailers(trailers, code: grpcStatus),
error: error,
tracingInfo: self.tracingInfo
)
}
Expand Down
41 changes: 28 additions & 13 deletions Libraries/Connect/PackageInternal/ConnectError+GRPC.swift
Expand Up @@ -18,24 +18,39 @@ extension ConnectError {
/// This should not be considered part of Connect's public/stable interface, and is subject
/// to change. When the compiler supports it, this should be package-internal.
///
/// Creates an error using gRPC trailers.
/// Parses gRPC headers and/or trailers to obtain the status and any potential error.
///
/// - parameter trailers: The trailers (or headers, for gRPC-Web) from which to parse the error.
/// - parameter code: The status code received from the server.
/// - parameter headers: Headers received from the server.
/// - parameter trailers: Trailers received from the server. Note that this could be trailers
/// passed in the headers block for gRPC-Web.
///
/// - returns: An error, if the status indicated an error.
public static func fromGRPCTrailers(_ trailers: Trailers, code: Code) -> Self? {
if code == .ok {
return nil
/// - returns: A tuple containing the gRPC status code and an optional error.
public static func parseGRPCHeaders(
_ headers: Headers?, trailers: Trailers?
) -> (grpcCode: Code, error: ConnectError?) {
// "Trailers-only" responses can be sent in the headers or trailers block.
// Check for a valid gRPC status in the headers first, then in the trailers.
guard let grpcCode = headers?.grpcStatus() ?? trailers?.grpcStatus() else {
return (.unknown, ConnectError(
code: .unknown, message: "RPC response missing status", exception: nil,
details: [], metadata: [:]
))
}

return .init(
code: code,
message: trailers.grpcMessage(),
if grpcCode == .ok {
return (.ok, nil)
}

// Combine headers + trailers into metadata to make error parsing easier for consumers,
// since gRPC can include error information in either headers or trailers.
let metadata = (headers ?? [:]).merging(trailers ?? [:]) { $1 }
return (grpcCode, .init(
code: grpcCode,
message: metadata.grpcMessage(),
exception: nil,
details: trailers.connectErrorDetailsFromGRPC(),
metadata: trailers
)
details: metadata.connectErrorDetailsFromGRPC(),
metadata: metadata
))
}
}

Expand Down
Expand Up @@ -428,6 +428,7 @@ private extension ResponseMessage where Output: ProtobufMessage {
?? ConnectError.from(
code: response.code,
headers: response.headers,
trailers: response.trailers,
source: response.message
)
self.init(
Expand Down
22 changes: 15 additions & 7 deletions Libraries/Connect/Public/Interfaces/ConnectError.swift
Expand Up @@ -110,26 +110,34 @@ extension ConnectError: Swift.Decodable {
}

extension ConnectError {
public static func from(code: Code, headers: Headers, source: Data?) -> Self {
let headers = headers.reduce(into: Headers(), { headers, current in
headers[current.key.lowercased()] = current.value
})
public static func from(
code: Code, headers: Headers?, trailers: Trailers?, source: Data?
) -> Self {
// Combine headers + trailers into metadata to make error parsing easier for consumers,
// since gRPC can include error information in either headers or trailers.
var metadata = Headers()
for (headerName, headerValue) in headers ?? [:] {
metadata[headerName.lowercased()] = headerValue
}
for (trailerName, trailerValue) in trailers ?? [:] {
metadata[trailerName.lowercased()] = trailerValue
}

guard let source = source else {
return .init(
code: code, message: "empty error message from source", exception: nil,
details: [], metadata: headers
details: [], metadata: metadata
)
}

do {
var connectError = try Foundation.JSONDecoder().decode(ConnectError.self, from: source)
connectError.metadata = headers
connectError.metadata = metadata
return connectError
} catch let error {
return .init(
code: code, message: String(data: source, encoding: .utf8),
exception: error, details: [], metadata: headers
exception: error, details: [], metadata: metadata
)
}
}
Expand Down
24 changes: 6 additions & 18 deletions Libraries/ConnectNIO/Internal/GRPCInterceptor.swift
Expand Up @@ -59,8 +59,9 @@ extension GRPCInterceptor: UnaryInterceptor {
return
}

let (grpcCode, connectError) = self.grpcResult(
fromHeaders: response.headers, trailers: response.trailers
let (grpcCode, connectError) = ConnectError.parseGRPCHeaders(
response.headers,
trailers: response.trailers
)
guard grpcCode == .ok, let rawData = response.message, !rawData.isEmpty else {
proceed(HTTPResponse(
Expand Down Expand Up @@ -154,8 +155,9 @@ extension GRPCInterceptor: StreamInterceptor {
return
}

let (grpcCode, connectError) = self.grpcResult(
fromHeaders: self.streamResponseHeaders.value, trailers: trailers
let (grpcCode, connectError) = ConnectError.parseGRPCHeaders(
self.streamResponseHeaders.value,
trailers: trailers
)
if grpcCode == .ok {
proceed(.complete(
Expand All @@ -172,20 +174,6 @@ extension GRPCInterceptor: StreamInterceptor {
}
}
}

private func grpcResult(
fromHeaders headers: Headers?, trailers: Trailers?
) -> (code: Code, error: ConnectError?) {
// "Trailers-only" responses can be sent in the headers or trailers block.
// Check for a valid gRPC status in the headers first, then in the trailers.
if let headers = headers, let grpcCode = headers.grpcStatus() {
return (grpcCode, .fromGRPCTrailers(headers, code: grpcCode))
} else if let trailers = trailers, let grpcCode = trailers.grpcStatus() {
return (grpcCode, .fromGRPCTrailers(trailers, code: grpcCode))
} else {
return (.unknown, nil)
}
}
}

private final class Locked<T>: @unchecked Sendable {
Expand Down
Expand Up @@ -54,6 +54,7 @@ final class ConnectErrorTests: XCTestCase {
XCTAssertEqual(error.unpackedDetails(), [expectedDetails1, expectedDetails2])
XCTAssertTrue(error.metadata.isEmpty)
}

func testDeserializingErrorUsingHelperFunctionLowercasesHeaderKeys() throws {
let expectedDetails = Connectrpc_Conformance_V1_RawHTTPRequest.with { $0.uri = "a/b/c" }
let errorData = try self.errorData(expectedDetails: [expectedDetails])
Expand All @@ -63,6 +64,7 @@ final class ConnectErrorTests: XCTestCase {
"sOmEkEy": ["foo"],
"otherKey1": ["BAR", "bAz"],
],
trailers: nil,
source: errorData
)
XCTAssertEqual(error.code, .unavailable) // Respects the code from the error body
Expand All @@ -73,6 +75,33 @@ final class ConnectErrorTests: XCTestCase {
XCTAssertEqual(error.metadata, ["somekey": ["foo"], "otherkey1": ["BAR", "bAz"]])
}

func testDeserializingErrorUsingHelperFunctionCombinesHeadersAndTrailers() throws {
let expectedDetails = Connectrpc_Conformance_V1_RawHTTPRequest.with { $0.uri = "a/b/c" }
let errorData = try self.errorData(expectedDetails: [expectedDetails])
let error = ConnectError.from(
code: .aborted,
headers: [
"duPlIcaTedKey": ["headers"],
"otherKey1": ["BAR", "bAz"],
],
trailers: [
"duPlIcaTedKey": ["trailers"],
"anOthErKey": ["foo"],
],
source: errorData
)
XCTAssertEqual(error.code, .unavailable) // Respects the code from the error body
XCTAssertEqual(error.message, "overloaded: back off and retry")
XCTAssertNil(error.exception)
XCTAssertEqual(error.details.count, 1)
XCTAssertEqual(error.unpackedDetails(), [expectedDetails])
XCTAssertEqual(error.metadata, [
"duplicatedkey": ["trailers"],
"otherkey1": ["BAR", "bAz"],
"anotherkey": ["foo"],
])
}

func testDeserializingSimpleError() throws {
let errorDictionary = [
"code": "unavailable",
Expand Down
Expand Up @@ -104,7 +104,9 @@ final class InterceptorChainIterationTests: XCTestCase {
chain.executeInterceptorsAndStopOnFailure(
[
{ _, proceed in
proceed(.failure(.from(code: .unknown, headers: Headers(), source: nil)))
proceed(.failure(.from(
code: .unknown, headers: nil, trailers: nil, source: nil
)))
},
{ value, proceed in proceed(.success(value + "b")) },
],
Expand Down Expand Up @@ -141,7 +143,9 @@ final class InterceptorChainIterationTests: XCTestCase {
chain.executeLinkedInterceptorsAndStopOnFailure(
[
{ _, proceed in
proceed(.failure(.from(code: .unknown, headers: Headers(), source: nil)))
proceed(.failure(.from(
code: .unknown, headers: nil, trailers: nil, source: nil
)))
},
{ value, proceed in proceed(.success(value + "2")) },
],
Expand All @@ -168,7 +172,9 @@ final class InterceptorChainIterationTests: XCTestCase {
firstInFirstOut: true,
initial: "",
transform: { _, proceed in
proceed(.failure(.from(code: .unknown, headers: Headers(), source: nil)))
proceed(.failure(.from(
code: .unknown, headers: nil, trailers: nil, source: nil
)))
},
then: [
{ value, proceed in proceed(.success(value + 1)) },
Expand All @@ -192,7 +198,9 @@ final class InterceptorChainIterationTests: XCTestCase {
transform: { value1, proceed in proceed(.success(Int(value1)!)) },
then: [
{ _, proceed in
proceed(.failure(.from(code: .unknown, headers: Headers(), source: nil)))
proceed(.failure(.from(
code: .unknown, headers: nil, trailers: nil, source: nil
)))
},
{ value, proceed in proceed(.success(value + 3)) },
],
Expand Down
Expand Up @@ -374,7 +374,7 @@ extension StepTrackingInterceptor: UnaryInterceptor {
) {
self.trackStep(.unaryRequest(id: self.id))
if self.failOutboundRequests {
proceed(.failure(.from(code: .aborted, headers: Headers(), source: nil)))
proceed(.failure(.from(code: .aborted, headers: nil, trailers: nil, source: nil)))
} else if self.requestDelay != .never {
DispatchQueue.global().asyncAfter(deadline: .now() + self.requestDelay) {
proceed(.success(request))
Expand Down Expand Up @@ -420,7 +420,7 @@ extension StepTrackingInterceptor: StreamInterceptor {
) {
self.trackStep(.streamStart(id: self.id))
if self.failOutboundRequests {
proceed(.failure(.from(code: .aborted, headers: Headers(), source: nil)))
proceed(.failure(.from(code: .aborted, headers: nil, trailers: nil, source: nil)))
} else if self.requestDelay != .never {
DispatchQueue.global().asyncAfter(deadline: .now() + self.requestDelay) {
proceed(.success(request))
Expand Down

0 comments on commit 1701d3d

Please sign in to comment.