From c7577fa253d35c60bbae6bce6ff1cf06f5d60ed0 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 20 Nov 2025 17:26:08 +0000 Subject: [PATCH 1/3] Fix a number of OTel tracing issues Motivation: The OTel tracing interceptors had a number of issues. The most important was that spans were ended too early in their lifecycle. Others included not always setting the span status or grpc status code attribute. Modifications: - Start and end spans manually - Replace the hooked async sequence with custom sequnces for server request messages and client response messages. This allows the atomic to be dropped in favour of a simple integer stored on the iterator. - Replace the hooked async writer with a more targeted tracing writer Result: Tracing interceptors are more reliable --- .../HookedAsyncSequence.swift | 81 ------ .../HookedWriter.swift | 41 --- .../ClientOTelTracingInterceptor.swift | 238 ++++++++++-------- .../ServerOTelTracingInterceptor.swift | 197 +++++++++------ .../Tracing/Tracing+RPC.swift | 96 +++++++ .../GRPCOTelTracingInterceptorsTests.swift | 36 ++- .../OTelTracingIntegrationTests.swift | 9 +- 7 files changed, 370 insertions(+), 328 deletions(-) delete mode 100644 Sources/GRPCOTelTracingInterceptors/HookedAsyncSequence.swift delete mode 100644 Sources/GRPCOTelTracingInterceptors/HookedWriter.swift create mode 100644 Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift diff --git a/Sources/GRPCOTelTracingInterceptors/HookedAsyncSequence.swift b/Sources/GRPCOTelTracingInterceptors/HookedAsyncSequence.swift deleted file mode 100644 index ed7ae43..0000000 --- a/Sources/GRPCOTelTracingInterceptors/HookedAsyncSequence.swift +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2025, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -@available(gRPCSwiftExtras 2.0, *) -internal struct HookedRPCAsyncSequence: AsyncSequence, Sendable -where Wrapped.Element: Sendable { - private let wrapped: Wrapped - - private let forEachElement: @Sendable (Wrapped.Element) -> Void - private let onFinish: @Sendable ((any Error)?) -> Void - - init( - wrapping sequence: Wrapped, - forEachElement: @escaping @Sendable (Wrapped.Element) -> Void, - onFinish: @escaping @Sendable ((any Error)?) -> Void - ) { - self.wrapped = sequence - self.forEachElement = forEachElement - self.onFinish = onFinish - } - - func makeAsyncIterator() -> HookedAsyncIterator { - HookedAsyncIterator( - self.wrapped, - forEachElement: self.forEachElement, - onFinish: self.onFinish - ) - } - - struct HookedAsyncIterator: AsyncIteratorProtocol { - typealias Element = Wrapped.Element - - private var wrapped: Wrapped.AsyncIterator - private let forEachElement: @Sendable (Wrapped.Element) -> Void - private let onFinish: @Sendable ((any Error)?) -> Void - - init( - _ sequence: Wrapped, - forEachElement: @escaping @Sendable (Wrapped.Element) -> Void, - onFinish: @escaping @Sendable ((any Error)?) -> Void - ) { - self.wrapped = sequence.makeAsyncIterator() - self.forEachElement = forEachElement - self.onFinish = onFinish - } - - mutating func next( - isolation actor: isolated (any Actor)? - ) async throws(Wrapped.Failure) -> Wrapped.Element? { - do { - if let element = try await self.wrapped.next(isolation: actor) { - self.forEachElement(element) - return element - } else { - self.onFinish(nil) - return nil - } - } catch { - self.onFinish(error) - throw error - } - } - - mutating func next() async throws -> Wrapped.Element? { - try await self.next(isolation: nil) - } - } -} diff --git a/Sources/GRPCOTelTracingInterceptors/HookedWriter.swift b/Sources/GRPCOTelTracingInterceptors/HookedWriter.swift deleted file mode 100644 index 63dd80c..0000000 --- a/Sources/GRPCOTelTracingInterceptors/HookedWriter.swift +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2024, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -internal import GRPCCore -internal import Tracing - -@available(gRPCSwiftExtras 2.0, *) -struct HookedWriter: RPCWriterProtocol { - private let writer: any RPCWriterProtocol - private let afterEachWrite: @Sendable () -> Void - - init( - wrapping other: some RPCWriterProtocol, - afterEachWrite: @Sendable @escaping () -> Void - ) { - self.writer = other - self.afterEachWrite = afterEachWrite - } - - func write(_ element: Element) async throws { - try await self.writer.write(element) - self.afterEachWrite() - } - - func write(contentsOf elements: some Sequence) async throws { - try await self.writer.write(contentsOf: elements) - self.afterEachWrite() - } -} diff --git a/Sources/GRPCOTelTracingInterceptors/Tracing/ClientOTelTracingInterceptor.swift b/Sources/GRPCOTelTracingInterceptors/Tracing/ClientOTelTracingInterceptor.swift index ef6b546..b6f1c48 100644 --- a/Sources/GRPCOTelTracingInterceptors/Tracing/ClientOTelTracingInterceptor.swift +++ b/Sources/GRPCOTelTracingInterceptors/Tracing/ClientOTelTracingInterceptor.swift @@ -146,117 +146,59 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { ) async throws -> StreamingClientResponse ) async throws -> StreamingClientResponse where Input: Sendable, Output: Sendable { var request = request - let serviceContext = ServiceContext.current ?? .topLevel - - return try await tracer.withSpan( - context.descriptor.fullyQualifiedMethod, - context: serviceContext, - ofKind: .client - ) { span in - span.setOTelClientSpanGRPCAttributes( - context: context, - serverHostname: self.serverHostname, - networkTransportMethod: self.networkTransportMethod - ) + let span = tracer.startSpan(context.descriptor.fullyQualifiedMethod, ofKind: .client) - if self.includeRequestMetadata { - span.setMetadataStringAttributesAsRequestSpanAttributes(request.metadata) - } + span.setOTelClientSpanGRPCAttributes( + context: context, + serverHostname: self.serverHostname, + networkTransportMethod: self.networkTransportMethod + ) - tracer.inject( - serviceContext, - into: &request.metadata, - using: self.injector - ) + if self.includeRequestMetadata { + span.setMetadataStringAttributesAsRequestSpanAttributes(request.metadata) + } - if self.traceEachMessage { - let wrappedProducer = request.producer - request.producer = { writer in - let messageSentCounter = Atomic(1) - let eventEmittingWriter = HookedWriter( - wrapping: writer, - afterEachWrite: { - var event = SpanEvent(name: "rpc.message") - event.attributes[GRPCTracingKeys.rpcMessageType] = "SENT" - event.attributes[GRPCTracingKeys.rpcMessageID] = - messageSentCounter - .wrappingAdd(1, ordering: .sequentiallyConsistent) - .oldValue - span.addEvent(event) - } - ) - try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter)) - } + tracer.inject(span.context, into: &request.metadata, using: self.injector) + + if self.traceEachMessage { + let originalProducer = request.producer + request.producer = { writer in + let tracingWriter = TracedMessageWriter(wrapping: writer, span: span) + return try await originalProducer(RPCWriter(wrapping: tracingWriter)) } + } - var response = try await next(request, context) + var response: StreamingClientResponse - if self.includeResponseMetadata { - span.setMetadataStringAttributesAsResponseSpanAttributes(response.metadata) + do { + response = try await ServiceContext.$current.withValue(span.context) { + try await next(request, context) } + } catch { + span.endRPC(withError: error) + throw error + } - switch response.accepted { - case .success(var success): - let hookedSequence: - HookedRPCAsyncSequence< - RPCAsyncSequence.Contents.BodyPart, any Error> - > - if self.traceEachMessage { - let messageReceivedCounter = Atomic(1) - hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { part in - switch part { - case .message: - var event = SpanEvent(name: "rpc.message") - event.attributes[GRPCTracingKeys.rpcMessageType] = "RECEIVED" - event.attributes[GRPCTracingKeys.rpcMessageID] = - messageReceivedCounter - .wrappingAdd(1, ordering: .sequentiallyConsistent) - .oldValue - span.addEvent(event) - - case .trailingMetadata(let trailingMetadata): - if self.includeResponseMetadata { - span.setMetadataStringAttributesAsResponseSpanAttributes(trailingMetadata) - } - } - } onFinish: { error in - if let error { - if let errorCode = error.grpcErrorCode { - span.attributes[GRPCTracingKeys.grpcStatusCode] = errorCode.rawValue - } - span.setStatus(SpanStatus(code: .error)) - span.recordError(error) - } else { - span.attributes[GRPCTracingKeys.grpcStatusCode] = 0 - } - } - } else { - hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in - // Nothing to do if traceEachMessage is false - } onFinish: { error in - if let error { - if let errorCode = error.grpcErrorCode { - span.attributes[GRPCTracingKeys.grpcStatusCode] = errorCode.rawValue - } - span.setStatus(SpanStatus(code: .error)) - span.recordError(error) - } else { - span.attributes[GRPCTracingKeys.grpcStatusCode] = 0 - } - } - } - - success.bodyParts = RPCAsyncSequence(wrapping: hookedSequence) - response.accepted = .success(success) + if self.includeResponseMetadata { + span.setMetadataStringAttributesAsResponseSpanAttributes(response.metadata) + } - case .failure(let error): - span.attributes[GRPCTracingKeys.grpcStatusCode] = error.code.rawValue - span.setStatus(SpanStatus(code: .error)) - span.recordError(error) - } + switch response.accepted { + case .success(var success): + let tracedResponse = TracedClientResponseBodyParts( + wrapping: success.bodyParts, + span: span, + eventPerMessage: self.traceEachMessage, + includeMetadata: self.includeResponseMetadata + ) + success.bodyParts = RPCAsyncSequence(wrapping: tracedResponse) + response.accepted = .success(success) - return response + case .failure(let error): + span.endRPC(withError: error) } + + return response } } @@ -272,14 +214,94 @@ struct ClientRequestInjector: Instrumentation.Injector { } @available(gRPCSwiftExtras 2.0, *) -extension Error { - var grpcErrorCode: RPCError.Code? { - if let rpcError = self as? RPCError { - return rpcError.code - } else if let rpcError = self as? any RPCErrorConvertible { - return rpcError.rpcErrorCode - } else { - return nil +internal struct TracedClientResponseBodyParts: AsyncSequence, Sendable +where Output: Sendable { + typealias Base = RPCAsyncSequence.Contents.BodyPart, any Error> + typealias Element = Base.Element + + private let base: Base + private var span: any Span + private var eventPerMessage: Bool + private var includeMetadata: Bool + + init( + wrapping base: Base, + span: any Span, + eventPerMessage: Bool, + includeMetadata: Bool + ) { + self.base = base + self.span = span + self.eventPerMessage = eventPerMessage + self.includeMetadata = includeMetadata + } + + func makeAsyncIterator() -> AsyncIterator { + AsyncIterator( + wrapping: self.base.makeAsyncIterator(), + span: self.span, + eventPerMessage: self.eventPerMessage, + includeMetadata: self.includeMetadata + ) + } + + struct AsyncIterator: AsyncIteratorProtocol { + typealias Element = Base.Element + + private var wrapped: Base.AsyncIterator + private var span: any Span + private var messageID: Int + private var eventPerMessage: Bool + private var includeMetadata: Bool + + init( + wrapping iterator: Base.AsyncIterator, + span: any Span, + eventPerMessage: Bool, + includeMetadata: Bool + ) { + self.wrapped = iterator + self.span = span + self.eventPerMessage = eventPerMessage + self.includeMetadata = includeMetadata + self.messageID = 1 + } + + private mutating func nextMessageID() -> Int { + defer { self.messageID += 1 } + return self.messageID + } + + mutating func next( + isolation actor: isolated (any Actor)? + ) async throws(any Error) -> Element? { + do { + if let element = try await self.wrapped.next(isolation: actor) { + if self.eventPerMessage { + switch element { + case .message: + self.span.addEvent(.messageReceived(id: self.nextMessageID())) + + case .trailingMetadata(let metadata): + if self.includeMetadata { + self.span.setMetadataStringAttributesAsResponseSpanAttributes(metadata) + } + } + } + + return element + } else { + self.span.endRPC() + return nil + } + } catch { + self.span.endRPC(withError: error) + throw error + } + } + + mutating func next() async throws -> Element? { + try await self.next(isolation: nil) } } } diff --git a/Sources/GRPCOTelTracingInterceptors/Tracing/ServerOTelTracingInterceptor.swift b/Sources/GRPCOTelTracingInterceptors/Tracing/ServerOTelTracingInterceptor.swift index 7f0880f..098665f 100644 --- a/Sources/GRPCOTelTracingInterceptors/Tracing/ServerOTelTracingInterceptor.swift +++ b/Sources/GRPCOTelTracingInterceptors/Tracing/ServerOTelTracingInterceptor.swift @@ -144,99 +144,78 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor { @Sendable (StreamingServerRequest, ServerContext) async throws -> StreamingServerResponse ) async throws -> StreamingServerResponse where Input: Sendable, Output: Sendable { - var serviceContext = ServiceContext.topLevel + var serviceContext = ServiceContext.current ?? .topLevel + tracer.extract(request.metadata, into: &serviceContext, using: self.extractor) + let span = tracer.startSpan( + context.descriptor.fullyQualifiedMethod, + context: serviceContext, + ofKind: .server + ) - tracer.extract( - request.metadata, - into: &serviceContext, - using: self.extractor + span.setOTelServerSpanGRPCAttributes( + context: context, + serverHostname: self.serverHostname, + networkTransportMethod: self.networkTransportMethod ) - // FIXME: use 'ServiceContext.withValue(serviceContext)' - // - // This is blocked on: https://github.com/apple/swift-service-context/pull/46 - return try await ServiceContext.$current.withValue(serviceContext) { - try await tracer.withSpan( - context.descriptor.fullyQualifiedMethod, - context: serviceContext, - ofKind: .server - ) { span in - span.setOTelServerSpanGRPCAttributes( - context: context, - serverHostname: self.serverHostname, - networkTransportMethod: self.networkTransportMethod - ) - - if self.includeRequestMetadata { - span.setMetadataStringAttributesAsRequestSpanAttributes(request.metadata) - } + if self.includeRequestMetadata { + span.setMetadataStringAttributesAsRequestSpanAttributes(request.metadata) + } - var request = request - if self.traceEachMessage { - let messageReceivedCounter = Atomic(1) - request.messages = RPCAsyncSequence( - wrapping: request.messages.map { element in - var event = SpanEvent(name: "rpc.message") - event.attributes[GRPCTracingKeys.rpcMessageType] = "RECEIVED" - event.attributes[GRPCTracingKeys.rpcMessageID] = - messageReceivedCounter - .wrappingAdd(1, ordering: .sequentiallyConsistent) - .oldValue - span.addEvent(event) - return element - } - ) - } + var request = request + if self.traceEachMessage { + let traced = TracedServerRequestMessages(wrapping: request.messages, span: span) + request.messages = RPCAsyncSequence(wrapping: traced) + } + + var response: StreamingServerResponse + do { + response = try await ServiceContext.$current.withValue(span.context) { + try await next(request, context) + } + } catch { + span.endRPC(withError: error) + throw error + } - var response = try await next(request, context) + if self.includeResponseMetadata { + span.setMetadataStringAttributesAsResponseSpanAttributes(response.metadata) + } + + switch response.accepted { + case .success(var success): + let producer = success.producer + success.producer = { writer in + let effectiveWriter: RPCWriter - if self.includeResponseMetadata { - span.setMetadataStringAttributesAsResponseSpanAttributes(response.metadata) + if self.traceEachMessage { + effectiveWriter = RPCWriter(wrapping: TracedMessageWriter(wrapping: writer, span: span)) + } else { + effectiveWriter = writer } - switch response.accepted { - case .success(var success): - let wrappedProducer = success.producer - - if self.traceEachMessage { - success.producer = { writer in - let messageSentCounter = Atomic(1) - let eventEmittingWriter = HookedWriter( - wrapping: writer, - afterEachWrite: { - var event = SpanEvent(name: "rpc.message") - event.attributes[GRPCTracingKeys.rpcMessageType] = "SENT" - event.attributes[GRPCTracingKeys.rpcMessageID] = - messageSentCounter - .wrappingAdd(1, ordering: .sequentiallyConsistent) - .oldValue - span.addEvent(event) - } - ) - - let trailingMetadata = try await wrappedProducer( - RPCWriter(wrapping: eventEmittingWriter) - ) - - if self.includeResponseMetadata { - span.setMetadataStringAttributesAsResponseSpanAttributes(trailingMetadata) - } - - return trailingMetadata - } - } + do { + let metadata = try await producer(effectiveWriter) - response = .init(accepted: .success(success)) + if self.includeResponseMetadata { + span.setMetadataStringAttributesAsResponseSpanAttributes(metadata) + } - case .failure(let error): - span.attributes[GRPCTracingKeys.grpcStatusCode] = error.code.rawValue - span.setStatus(SpanStatus(code: .error)) - span.recordError(error) + span.endRPC() + return metadata + } catch { + span.endRPC(withError: error) + throw error } - - return response } + + response = .init(accepted: .success(success)) + + case .failure(let error): + span.endRPC(withError: error) } + + return response } } @@ -251,3 +230,61 @@ struct ServerRequestExtractor: Instrumentation.Extractor { return values.next() } } + +@available(gRPCSwiftExtras 2.0, *) +internal struct TracedServerRequestMessages: AsyncSequence, Sendable where Input: Sendable { + typealias Base = RPCAsyncSequence + typealias Element = Base.Element + + private let base: Base + private var span: any Span + + init( + wrapping base: Base, + span: any Span + ) { + self.base = base + self.span = span + } + + func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(wrapping: self.base.makeAsyncIterator(), span: self.span) + } + + struct AsyncIterator: AsyncIteratorProtocol { + typealias Element = Base.Element + + private var wrapped: Base.AsyncIterator + private var span: any Span + private var messageID: Int + + init( + wrapping iterator: Base.AsyncIterator, + span: any Span + ) { + self.wrapped = iterator + self.span = span + self.messageID = 1 + } + + private mutating func nextMessageID() -> Int { + defer { self.messageID += 1 } + return self.messageID + } + + mutating func next( + isolation actor: isolated (any Actor)? + ) async throws(any Error) -> Element? { + if let element = try await self.wrapped.next(isolation: actor) { + self.span.addEvent(.messageReceived(id: self.nextMessageID())) + return element + } else { + return nil + } + } + + mutating func next() async throws -> Element? { + try await self.next(isolation: nil) + } + } +} diff --git a/Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift b/Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift new file mode 100644 index 0000000..c2b48c9 --- /dev/null +++ b/Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift @@ -0,0 +1,96 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +internal import GRPCCore +internal import Synchronization +internal import Tracing + +extension Span { + @available(gRPCSwiftExtras 2.0, *) + func endRPC() { + // No error, status code zero. + self.attributes[GRPCTracingKeys.grpcStatusCode] = 0 + self.setStatus(SpanStatus(code: .ok)) + self.end() + } + + @available(gRPCSwiftExtras 2.0, *) + func endRPC(withError error: RPCError) { + self.attributes[GRPCTracingKeys.grpcStatusCode] = error.code.rawValue + self.setStatus(SpanStatus(code: .error)) + self.recordError(error) + self.end() + } + + @available(gRPCSwiftExtras 2.0, *) + func endRPC(withError error: any Error) { + if let error = error as? RPCError { + self.endRPC(withError: error) + } else if let convertible = error as? any RPCErrorConvertible { + self.endRPC(withError: RPCError(convertible)) + } else { + self.attributes[GRPCTracingKeys.grpcStatusCode] = RPCError.Code.unknown.rawValue + self.setStatus(SpanStatus(code: .error)) + self.recordError(error) + self.end() + } + } +} + +extension SpanEvent { + private static func rpcMessage(type: String, id: Int) -> Self { + var event = SpanEvent(name: "rpc.message") + event.attributes[GRPCTracingKeys.rpcMessageType] = type + event.attributes[GRPCTracingKeys.rpcMessageID] = id + return event + } + + static func messageReceived(id: Int) -> Self { + Self.rpcMessage(type: "RECEIVED", id: id) + } + + static func messageSent(id: Int) -> Self { + Self.rpcMessage(type: "SENT", id: id) + } +} + +@available(gRPCSwiftExtras 2.0, *) +final class TracedMessageWriter: RPCWriterProtocol { + private let writer: any RPCWriterProtocol + private let span: any Span + private let messageID: Atomic + + init(wrapping writer: any RPCWriterProtocol, span: any Span) { + self.writer = writer + self.span = span + self.messageID = Atomic(1) + } + + private func nextMessageID() -> Int { + self.messageID.wrappingAdd(1, ordering: .sequentiallyConsistent).oldValue + } + + func write(_ element: Element) async throws { + try await self.writer.write(element) + self.span.addEvent(.messageSent(id: self.nextMessageID())) + } + + func write(contentsOf elements: some Sequence) async throws { + for element in elements { + try await self.write(element) + } + } +} diff --git a/Tests/GRPCOTelTracingInterceptorsTests/GRPCOTelTracingInterceptorsTests.swift b/Tests/GRPCOTelTracingInterceptorsTests/GRPCOTelTracingInterceptorsTests.swift index b3c540f..390b8e9 100644 --- a/Tests/GRPCOTelTracingInterceptorsTests/GRPCOTelTracingInterceptorsTests.swift +++ b/Tests/GRPCOTelTracingInterceptorsTests/GRPCOTelTracingInterceptorsTests.swift @@ -92,7 +92,7 @@ struct OTelTracingClientInterceptorTests { } assertAttributes: { attributes in #expect(attributes == testValues.expectedSpanAttributes) } assertStatus: { status in - #expect(status == nil) + #expect(status == SpanStatus(code: .ok)) } assertErrors: { errors in #expect(errors == []) } @@ -171,7 +171,7 @@ struct OTelTracingClientInterceptorTests { } assertAttributes: { attributes in #expect(attributes == testValues.expectedSpanAttributes) } assertStatus: { status in - #expect(status == nil) + #expect(status == SpanStatus(code: .ok)) } assertErrors: { errors in #expect(errors == []) } @@ -287,7 +287,7 @@ struct OTelTracingClientInterceptorTests { ] ) } assertStatus: { status in - #expect(status == nil) + #expect(status == SpanStatus(code: .ok)) } assertErrors: { errors in #expect(errors == []) } @@ -403,7 +403,7 @@ struct OTelTracingClientInterceptorTests { ] ) } assertStatus: { status in - #expect(status == nil) + #expect(status == SpanStatus(code: .ok)) } assertErrors: { errors in #expect(errors == []) } @@ -458,6 +458,7 @@ struct OTelTracingClientInterceptorTests { "rpc.system": "grpc", "rpc.method": .string(methodDescriptor.method), "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int64(Int64(RPCError.Code.unknown.rawValue)), "server.address": "someserver.com", "server.port": 567, "network.peer.address": "10.1.2.80", @@ -467,7 +468,7 @@ struct OTelTracingClientInterceptorTests { ] ) } assertStatus: { status in - #expect(status == nil) + #expect(status == SpanStatus(code: .error)) } assertErrors: { errors in #expect(errors == [.testError]) } @@ -776,7 +777,7 @@ struct OTelTracingServerInterceptorTests { } assertAttributes: { attributes in #expect(attributes == testValues.expectedSpanAttributes) } assertStatus: { status in - #expect(status == nil) + #expect(status == SpanStatus(code: .ok)) } assertErrors: { errors in #expect(errors.isEmpty) } @@ -854,7 +855,7 @@ struct OTelTracingServerInterceptorTests { } assertAttributes: { attributes in #expect(attributes == testValues.expectedSpanAttributes) } assertStatus: { status in - #expect(status == nil) + #expect(status == SpanStatus(code: .ok)) } assertErrors: { errors in #expect(errors.isEmpty) } @@ -949,6 +950,7 @@ struct OTelTracingServerInterceptorTests { "rpc.system": "grpc", "rpc.method": .string(methodDescriptor.method), "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": 0, "server.address": "someserver.com", "server.port": 123, "network.peer.address": "10.1.2.90", @@ -964,7 +966,7 @@ struct OTelTracingServerInterceptorTests { ] ) } assertStatus: { status in - #expect(status == nil) + #expect(status == SpanStatus(code: .ok)) } assertErrors: { errors in #expect(errors.isEmpty) } @@ -1059,6 +1061,7 @@ struct OTelTracingServerInterceptorTests { "rpc.system": "grpc", "rpc.method": .string(methodDescriptor.method), "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": 0, "server.address": "someserver.com", "server.port": 123, "network.peer.address": "10.1.2.90", @@ -1074,7 +1077,7 @@ struct OTelTracingServerInterceptorTests { ] ) } assertStatus: { status in - #expect(status == nil) + #expect(status == SpanStatus(code: .ok)) } assertErrors: { errors in #expect(errors.isEmpty) } @@ -1095,7 +1098,11 @@ struct OTelTracingServerInterceptorTests { ) let traceIDString = UUID().uuidString let request = ServerRequest(metadata: ["trace-id": .string(traceIDString)], message: [UInt8]()) - let testValues = getTestValues(addressType: .ipv4, methodDescriptor: methodDescriptor) + let testValues = getTestValues( + addressType: .ipv4, + methodDescriptor: methodDescriptor, + statusCode: RPCError.Code.unknown.rawValue + ) do { let _: StreamingServerResponse = try await interceptor.intercept( tracer: tracer, @@ -1117,10 +1124,9 @@ struct OTelTracingServerInterceptorTests { assertTestSpanComponents(forMethod: methodDescriptor, tracer: tracer) { events in #expect(events.isEmpty) } assertAttributes: { attributes in - // The attributes should not contain a grpc status code, as the request was never even sent. #expect(attributes == testValues.expectedSpanAttributes) } assertStatus: { status in - #expect(status == nil) + #expect(status == SpanStatus(code: .error)) } assertErrors: { errors in #expect(errors == [.testError]) } @@ -1194,7 +1200,8 @@ struct OTelTracingServerInterceptorTests { @available(gRPCSwiftExtras 2.0, *) private func getTestValues( addressType: OTelTracingInterceptorTestAddressType, - methodDescriptor: MethodDescriptor + methodDescriptor: MethodDescriptor, + statusCode: Int = 0, ) -> OTelTracingInterceptorTestCaseValues { switch addressType { case .ipv4: @@ -1205,6 +1212,7 @@ struct OTelTracingServerInterceptorTests { "rpc.system": "grpc", "rpc.method": .string(methodDescriptor.method), "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int64(Int64(statusCode)), "server.address": "someserver.com", "server.port": 123, "network.peer.address": "10.1.2.90", @@ -1224,6 +1232,7 @@ struct OTelTracingServerInterceptorTests { "rpc.system": "grpc", "rpc.method": .string(methodDescriptor.method), "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int64(Int64(statusCode)), "server.address": "someserver.com", "server.port": 5678, "network.peer.address": "ff06:0:0:0:0:0:0:c3", @@ -1243,6 +1252,7 @@ struct OTelTracingServerInterceptorTests { "rpc.system": "grpc", "rpc.method": .string(methodDescriptor.method), "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int64(Int64(statusCode)), "server.address": "someserver.com", "network.peer.address": "some-path", "network.transport": "tcp", diff --git a/Tests/GRPCOTelTracingInterceptorsTests/OTelTracingIntegrationTests.swift b/Tests/GRPCOTelTracingInterceptorsTests/OTelTracingIntegrationTests.swift index 90edb84..387bbaf 100644 --- a/Tests/GRPCOTelTracingInterceptorsTests/OTelTracingIntegrationTests.swift +++ b/Tests/GRPCOTelTracingInterceptorsTests/OTelTracingIntegrationTests.swift @@ -87,7 +87,7 @@ struct OTelTracingIntegrationTests { } @available(gRPCSwiftExtras 2.0, *) - @Test(.disabled("known issues with tracing interceptors"), arguments: TracingLevel.allCases) + @Test(arguments: TracingLevel.allCases) func unary(level: TracingLevel) async throws { let tracer = try await withEchoService(clientTracing: level, serverTracing: level) { echo in let reply = try await echo.get(.with { $0.text = "Hello!" }) @@ -119,7 +119,7 @@ struct OTelTracingIntegrationTests { } @available(gRPCSwiftExtras 2.0, *) - @Test(.disabled("known issues with tracing interceptors"), arguments: TracingLevel.allCases) + @Test(arguments: TracingLevel.allCases) func serverStreaming(level: TracingLevel) async throws { let tracer = try await withEchoService(clientTracing: level, serverTracing: level) { echo in try await echo.expand(.with { $0.text = "Foo Bar Baz" }) { response in @@ -153,7 +153,7 @@ struct OTelTracingIntegrationTests { } @available(gRPCSwiftExtras 2.0, *) - @Test(.disabled("known issues with tracing interceptors"), arguments: TracingLevel.allCases) + @Test(arguments: TracingLevel.allCases) func clientStreaming(level: TracingLevel) async throws { let tracer = try await withEchoService(clientTracing: level, serverTracing: level) { echo in let reply = try await echo.collect { writer in @@ -189,7 +189,7 @@ struct OTelTracingIntegrationTests { } @available(gRPCSwiftExtras 2.0, *) - @Test(.disabled("known issues with tracing interceptors"), arguments: TracingLevel.allCases) + @Test(arguments: TracingLevel.allCases) func bidirectionalStreaming(level: TracingLevel) async throws { let tracer = try await withEchoService(clientTracing: level, serverTracing: level) { echo in try await echo.update { writer in @@ -233,7 +233,6 @@ struct OTelTracingIntegrationTests { @available(gRPCSwiftExtras 2.0, *) @Test( - .disabled("known issues with tracing interceptors"), arguments: [ (.immediately(RPCError(code: .aborted, message: "")), .aborted), (.immediately(ConvertibleError(.aborted)), .aborted), From 57482d65fd56b97cee51a8606e094f5ff961e95b Mon Sep 17 00:00:00 2001 From: George Barnett Date: Mon, 24 Nov 2025 17:17:09 +0000 Subject: [PATCH 2/3] add constraint --- Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift b/Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift index c2b48c9..dee934a 100644 --- a/Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift +++ b/Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift @@ -68,7 +68,7 @@ extension SpanEvent { } @available(gRPCSwiftExtras 2.0, *) -final class TracedMessageWriter: RPCWriterProtocol { +final class TracedMessageWriter: RPCWriterProtocol where Element: Sendable { private let writer: any RPCWriterProtocol private let span: any Span private let messageID: Atomic From 6bf6b6123c251b4fa354dc0b496eaef102f833bb Mon Sep 17 00:00:00 2001 From: George Barnett Date: Mon, 24 Nov 2025 17:21:00 +0000 Subject: [PATCH 3/3] Appease the 6.0 compiler gods --- .../GRPCOTelTracingInterceptorsTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/GRPCOTelTracingInterceptorsTests/GRPCOTelTracingInterceptorsTests.swift b/Tests/GRPCOTelTracingInterceptorsTests/GRPCOTelTracingInterceptorsTests.swift index 390b8e9..024fd15 100644 --- a/Tests/GRPCOTelTracingInterceptorsTests/GRPCOTelTracingInterceptorsTests.swift +++ b/Tests/GRPCOTelTracingInterceptorsTests/GRPCOTelTracingInterceptorsTests.swift @@ -1201,7 +1201,7 @@ struct OTelTracingServerInterceptorTests { private func getTestValues( addressType: OTelTracingInterceptorTestAddressType, methodDescriptor: MethodDescriptor, - statusCode: Int = 0, + statusCode: Int = 0 ) -> OTelTracingInterceptorTestCaseValues { switch addressType { case .ipv4: