diff --git a/.gitignore b/.gitignore index bdd8498..681d0b0 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ DerivedData/ .swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata SampleApp/.build FishyTransport/.build +.idea/ diff --git a/FishyTransport/Package.swift b/FishyTransport/Package.swift index db433d4..140bbc1 100644 --- a/FishyTransport/Package.swift +++ b/FishyTransport/Package.swift @@ -52,7 +52,8 @@ let package = Package( .package(url: "https://github.com/apple/swift-nio.git", from: "2.0.0"), .package(url: "https://github.com/swift-server/async-http-client.git", from: "1.5.0"), .package(url: "https://github.com/apple/swift-argument-parser", from: "0.4.0"), - .package(url: "https://github.com/apple/swift-syntax.git", revision: "b8e4a69237f9dfa362268dddaef8793bc694dc6f") // TODO: we currently must depend on specific versions here + .package(url: "https://github.com/apple/swift-syntax.git", revision: "b8e4a69237f9dfa362268dddaef8793bc694dc6f"), // TODO: we currently must depend on specific versions here + .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "0.2.0"), // ==== END OF DEPENDENCIES OF TRANSPORT ==== // ], targets: [ @@ -63,6 +64,7 @@ let package = Package( .product(name: "_NIOConcurrency", package: "swift-nio"), .product(name: "Logging", package: "swift-log"), .product(name: "AsyncHTTPClient", package: "async-http-client"), + .product(name: "Tracing", package: "swift-distributed-tracing"), ], swiftSettings: [ .unsafeFlags(experimentalFlags) diff --git a/FishyTransport/Sources/FishyActorTransport/FishyTransport+Server.swift b/FishyTransport/Sources/FishyActorTransport/FishyTransport+Server.swift index 6224949..46fd537 100644 --- a/FishyTransport/Sources/FishyActorTransport/FishyTransport+Server.swift +++ b/FishyTransport/Sources/FishyActorTransport/FishyTransport+Server.swift @@ -17,6 +17,7 @@ import _Distributed import NIO import NIOHTTP1 import Foundation +import Tracing import _NIOConcurrency private final class HTTPHandler: @unchecked Sendable, ChannelInboundHandler, RemovableChannelHandler { @@ -144,12 +145,12 @@ private final class HTTPHandler: @unchecked Sendable, ChannelInboundHandler, Rem final class FishyServer { - var group: MultiThreadedEventLoopGroup + var group: EventLoopGroup let transport: FishyTransport var channel: Channel! = nil - init(group: MultiThreadedEventLoopGroup, transport: FishyTransport) { + init(group: EventLoopGroup, transport: FishyTransport) { self.group = group self.transport = transport } @@ -176,4 +177,4 @@ final class FishyServer { channel = try bootstrap.bind(host: host, port: port).wait() assert(channel.localAddress != nil, "localAddress was nil!") } -} \ No newline at end of file +} diff --git a/FishyTransport/Sources/FishyActorTransport/FishyTransport.swift b/FishyTransport/Sources/FishyActorTransport/FishyTransport.swift index b11a01c..384ca90 100644 --- a/FishyTransport/Sources/FishyActorTransport/FishyTransport.swift +++ b/FishyTransport/Sources/FishyActorTransport/FishyTransport.swift @@ -19,6 +19,7 @@ import NIOHTTP1 import _NIOConcurrency import AsyncHTTPClient import Logging +import Tracing import Foundation // because JSONEncoder and co import struct Foundation.UUID @@ -30,6 +31,10 @@ public protocol MessageRecipient { ) async throws -> Encoder.Output where Encoder: TopLevelEncoder, Decoder: TopLevelDecoder } +public protocol FishyMessage: Codable { + var functionIdentifier: String { get } +} + private struct AnyMessageRecipient: MessageRecipient { weak var actor: AnyObject? // Note: store as AnyActor once supported? @@ -64,11 +69,11 @@ public final class FishyTransport: ActorTransport, @unchecked Sendable, CustomSt private var managed: [AnyActorIdentity: AnyMessageRecipient] // networking infra - private let group: MultiThreadedEventLoopGroup + private let group: EventLoopGroup private var server: FishyServer! private let client: HTTPClient - public init(host: String, port: Int, logLevel: Logger.Level? = nil) throws { + public init(host: String, port: Int, group: EventLoopGroup, logLevel: Logger.Level? = nil) throws { self.host = host self.port = port @@ -82,7 +87,6 @@ public final class FishyTransport: ActorTransport, @unchecked Sendable, CustomSt } } - let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) self.group = group self.lock = Lock() self.managed = [:] @@ -140,14 +144,14 @@ public final class FishyTransport: ActorTransport, @unchecked Sendable, CustomSt } } - public func send( + public func send( _ message: Message, to recipient: AnyActorIdentity, expecting responseType: Void.Type ) async throws -> Void { _ = try await self.send(message, to: recipient, expecting: NoResponse.self) } - public func send( + public func send( _ message: Message, to recipient: AnyActorIdentity, expecting responseType: Response.Type = Response.self ) async throws -> Response { @@ -184,78 +188,96 @@ public final class FishyTransport: ActorTransport, @unchecked Sendable, CustomSt } } - private func sendEnvelopeRequest( + private func sendEnvelopeRequest( _ message: Message, to recipient: AnyActorIdentity, encoder: JSONEncoder ) async throws -> HTTPClient.Response { - // Prepare the message envelope - var envelope = try Envelope(recipient: recipient, message: message) + try await InstrumentationSystem.tracer.withSpan(message.functionIdentifier) { span in + // Prepare the message envelope + var envelope = try Envelope(recipient: recipient, message: message) - // TODO: pick up any task locals that we want to propagage, e.g. tracing metadata and put into Envelope - // i.e. this is where we'd invoke swift-distributed-tracing's Instruments - envelope.metadata = [:] + // inject metadata values to propagate for distributed tracing + if let baggage = Baggage.current { + InstrumentationSystem.instrument.inject(baggage, into: &envelope.metadata, using: MessageEnvelopeMetadataInjector()) + } - var recipientURI: String.SubSequence = "\(recipient.underlying)" + var recipientURI: String.SubSequence = "\(recipient.underlying)" - let requestData = try encoder.encode(envelope) - log.debug("Send envelope request", metadata: [ - "envelope": "\(String(data: requestData, encoding: .utf8)!)", - "recipient": "\(recipientURI)" - ]) + let requestData = try encoder.encode(envelope) + log.debug("Send envelope request", metadata: [ + "envelope": "\(String(data: requestData, encoding: .utf8)!)", + "recipient": "\(recipientURI)" + ]) - recipientURI = recipientURI.dropFirst("fishy://".count) // our transport is super silly, and abuses http for its messaging - let requestURI = String("http://" + recipientURI) + recipientURI = recipientURI.dropFirst("fishy://".count) // our transport is super silly, and abuses http for its messaging + let requestURI = String("http://" + recipientURI) - let response = try await sendHTTPRequest(requestURI: requestURI, requestData: requestData) - log.debug("Received response \(response)", metadata: [ - "response/payload": "\(response.body?.getString(at: 0, length: response.body?.readableBytes ?? 0) ?? "")" - ]) + let response = try await sendHTTPRequest(requestURI: requestURI, requestData: requestData) + log.debug("Received response \(response)", metadata: [ + "response/payload": "\(response.body?.getString(at: 0, length: response.body?.readableBytes ?? 0) ?? "")" + ]) - return response + return response + } } private func sendHTTPRequest(requestURI: String, requestData: Data) async throws -> HTTPClient.Response { - let request = try HTTPClient.Request( - url: requestURI, - method: .POST, - headers: [ - "Content-Type": "application/json" - ], - body: .data(requestData)) - - let future = client.execute( - request: request, - deadline: .now() + .seconds(3)) // A real implementation would allow configuring these (i.e. pick up a task-local deadline) - - return try await future.get() + try await InstrumentationSystem.tracer.withSpan("HTTP POST", ofKind: .client) { span in + let request = try HTTPClient.Request( + url: requestURI, + method: .POST, + headers: [ + "Content-Type": "application/json" + ], + body: .data(requestData)) + span.attributes["http.method"] = "POST" + span.attributes["http.url"] = requestURI + + let future = client.execute( + request: request, + deadline: .now() + .seconds(3)) // A real implementation would allow configuring these (i.e. pick up a task-local deadline) + + let response = try await future.get() + span.attributes["http.status_code"] = Int(response.status.code) + return response + } } /// Actually deliver the message to the local recipient func deliver(envelope: Envelope) async throws -> Data { - log.debug("Deliver to \(envelope.recipient)") - - guard let known = resolveRecipient(of: envelope) else { - throw handleDeadLetter(envelope) - } - - log.debug("Delivering to local instance: \(known)", metadata: [ - "envelope": "\(envelope)", - "recipient": "\(known)", - ]) - - // In a real implementation coders would often be configurable on transport level. - // - // The transport must ensure to store itself in the user info offered to receive - // as it may need to attempt to decode actor references. - let encoder = JSONEncoder() - let decoder = JSONDecoder() - encoder.userInfo[.actorTransportKey] = self - decoder.userInfo[.actorTransportKey] = self + var baggage = Baggage.current ?? .topLevel + InstrumentationSystem.instrument.extract( + envelope.metadata, + into: &baggage, + using: MessageEnvelopeMetadataExtractor() + ) + + return try await Baggage.$current.withValue(baggage) { + log.debug("Deliver to \(envelope.recipient)") + + guard let known = resolveRecipient(of: envelope) else { + throw handleDeadLetter(envelope) + } - do { - return try await known._receiveAny(envelope: envelope, encoder: encoder, decoder: decoder) - } catch { - fatalError("Failed to deliver: \(error)") + log.debug("Delivering to local instance: \(known)", metadata: [ + "envelope": "\(envelope)", + "recipient": "\(known)", + ]) + + // In a real implementation coders would often be configurable on transport level. + // + // The transport must ensure to store itself in the user info offered to receive + // as it may need to attempt to decode actor references. + let encoder = JSONEncoder() + let decoder = JSONDecoder() + encoder.userInfo[.actorTransportKey] = self + decoder.userInfo[.actorTransportKey] = self + + do { + return try await known._receiveAny(envelope: envelope, encoder: encoder, decoder: decoder) + } catch { + fatalError("Failed to deliver: \(error)") + } } } @@ -357,8 +379,10 @@ public struct Envelope: Sendable, Codable { /// Represents a `void` return type of a distributed call. /// Pass this to `send` to avoid decoding any value from the response. -public enum NoResponse: Codable { +public enum NoResponse: Codable, FishyActorTransport.FishyMessage { case _instance + + public var functionIdentifier: String { "noResponse" } } extension DistributedActor { @@ -392,4 +416,19 @@ public struct UnknownRecipientError: ActorTransportError { public struct RecipientReleasedError: ActorTransportError { let recipient: AnyActorIdentity -} \ No newline at end of file +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Instrumentation + +struct MessageEnvelopeMetadataInjector: Injector { + func inject(_ value: String, forKey key: String, into metadata: inout [String: String]) { + metadata[key] = value + } +} + +struct MessageEnvelopeMetadataExtractor: Extractor { + func extract(key: String, from metadata: [String: String]) -> String? { + metadata[key] + } +} diff --git a/FishyTransport/Sources/FishyActorsGenerator/SourceGen.swift b/FishyTransport/Sources/FishyActorsGenerator/SourceGen.swift index 78d97fe..e9648a5 100644 --- a/FishyTransport/Sources/FishyActorsGenerator/SourceGen.swift +++ b/FishyTransport/Sources/FishyActorsGenerator/SourceGen.swift @@ -71,7 +71,7 @@ final class SourceGen { // -- emit a `case` that represents the function // sourceText += """ - enum _Message: Sendable, Codable { + enum _Message: Sendable, FishyActorTransport.FishyMessage { """ @@ -98,6 +98,13 @@ final class SourceGen { sourceText += ")\n" } + sourceText += "\n var functionIdentifier: String {\n switch self {\n" + for fun in decl.funcs { + sourceText += " case .\(fun.name):\n return \"\(fun.name)(\(fun.renderFuncParams(forFuncIdentifier: true)))\"\n" + } + sourceText += " }\n" + sourceText += " }\n" + sourceText += " }\n \n" // ==== Generate the "receive"-side, we must decode the incoming Envelope // into a _Message and apply it to our local actor. @@ -243,3 +250,38 @@ extension FuncDecl { }.joined(separator: ", ") + ")" } } + +extension FuncDecl { + fileprivate func renderFuncParams(forFuncIdentifier: Bool = false) -> String { + var result = params.map { first, second, type in + // FIXME: super naive... replace with something more proper + if let name = first { + if name == second || forFuncIdentifier { + // no need to write `name name: String` + var ret = "\(name)" + if (!forFuncIdentifier) { + ret += ": \(type)" + } + return ret + } else { + var ret = "\(name) \(second):" + if (!forFuncIdentifier) { + ret += " \(type)" + } + return ret + } + } else { + if (forFuncIdentifier) { + return "_" + } else { + return "\(second): \(type)" + } + } + }.joined(separator: forFuncIdentifier ? ":" : ", ") + + if forFuncIdentifier && !self.params.isEmpty { + result += ":" + } + return result + } +} diff --git a/README.md b/README.md index 9ccd118..55a4c4e 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,17 @@ Notice that the simplified ID printout contains the port number of the node the This sample is a distributed application created from just a single process, but all the "nodes" communicate through networking with eachother. The same application could be launched on different physical hosts (and then would have different IP addresses), this is what location transparency of distributed actors enables us to do. +### Distributed Tracing + +Additionally, the calls can also be traced using "Swift Distributed Tracing". To use tracing for the sample app, start the services in [`docker-compose.yaml`](SampleApp/docker-compose.yaml), then run the sample again. + +```sh +docker compose -p fishy-transport-sample up -d +``` + +Afterwards, open Jaeger [http://localhost:16686](http://localhost:16686) and you'll see traces similar to this: + +![Jaeger Example Trace](images/jaeger-trace.png) ### Experimental flags diff --git a/SampleApp/Package.resolved b/SampleApp/Package.resolved index f55c2b6..2baaa71 100644 --- a/SampleApp/Package.resolved +++ b/SampleApp/Package.resolved @@ -5,8 +5,26 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/swift-server/async-http-client.git", "state" : { - "revision" : "d5bd8d6526fa861adf18be7217bc180e6c13ac0d", - "version" : "1.6.2" + "revision" : "1081b0b0541f535ca088acdb56f5ca5598bc6247", + "version" : "1.6.3" + } + }, + { + "identity" : "grpc-swift", + "kind" : "remoteSourceControl", + "location" : "https://github.com/grpc/grpc-swift.git", + "state" : { + "revision" : "580ba36aec6d96c4e93365e0e55197832853d9ea", + "version" : "1.5.0" + } + }, + { + "identity" : "opentelemetry-swift", + "kind" : "remoteSourceControl", + "location" : "https://github.com/slashmo/opentelemetry-swift.git", + "state" : { + "branch" : "automatic-context-propagation", + "revision" : "8083218b15e55e0cf22805c3e9cf62e67ec77068" } }, { @@ -18,6 +36,24 @@ "version" : "0.5.0" } }, + { + "identity" : "swift-distributed-tracing", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-distributed-tracing.git", + "state" : { + "revision" : "7a89c904d80fd2dc7c6071806f38d5d0b2d5a1b5", + "version" : "0.2.0" + } + }, + { + "identity" : "swift-distributed-tracing-baggage", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-distributed-tracing-baggage.git", + "state" : { + "revision" : "4239b6569f8a19b6bdc2b9c51973e2d67216014c", + "version" : "0.2.1" + } + }, { "identity" : "swift-log", "kind" : "remoteSourceControl", @@ -72,6 +108,15 @@ "version" : "1.11.3" } }, + { + "identity" : "swift-protobuf", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-protobuf.git", + "state" : { + "revision" : "7e2c5f3cbbeea68e004915e3a8961e20bd11d824", + "version" : "1.18.0" + } + }, { "identity" : "swift-syntax", "kind" : "remoteSourceControl", diff --git a/SampleApp/Package.swift b/SampleApp/Package.swift index 866abaf..9872c30 100644 --- a/SampleApp/Package.swift +++ b/SampleApp/Package.swift @@ -38,6 +38,8 @@ let package = Package( .package(url: "https://github.com/apple/swift-log.git", from: "1.2.0"), .package(url: "https://github.com/apple/swift-argument-parser", from: "0.4.0"), + .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "0.2.0"), + .package(url: "https://github.com/slashmo/opentelemetry-swift.git", branch: "automatic-context-propagation"), ], targets: [ .executableTarget( @@ -46,6 +48,9 @@ let package = Package( .product(name: "FishyActorTransport", package: "sample-fishy-transport"), .product(name: "Logging", package: "swift-log"), .product(name: "ArgumentParser", package: "swift-argument-parser"), + .product(name: "Tracing", package: "swift-distributed-tracing"), + .product(name: "OpenTelemetry", package: "opentelemetry-swift"), + .product(name: "OtlpGRPCSpanExporting", package: "opentelemetry-swift"), ], swiftSettings: [ .unsafeFlags(experimentalFlags) diff --git a/SampleApp/Sources/FishyActorsDemo/main.swift b/SampleApp/Sources/FishyActorsDemo/main.swift index 554fedb..1511da1 100644 --- a/SampleApp/Sources/FishyActorsDemo/main.swift +++ b/SampleApp/Sources/FishyActorsDemo/main.swift @@ -16,7 +16,11 @@ import _Distributed import FishyActorTransport import ArgumentParser +import NIO import Logging +import Tracing +import OpenTelemetry +import OtlpGRPCSpanExporting import func Foundation.sleep @@ -28,18 +32,29 @@ struct Demo: ParsableCommand { var interactive: Bool = false @Flag(help: "Log level used by (all) ActorTransport instances") - var transportLogLevel: Logger.Level = .warning + var transportLogLevel: Logger.Level = .info mutating func run() throws { LoggingSystem.bootstrap(PrettyDemoLogHandler.init) + let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + let otel = OTel( + serviceName: "chatroom", + eventLoopGroup: group, + processor: OTel.BatchSpanProcessor( + exportingTo: OtlpGRPCSpanExporter(config: OtlpGRPCSpanExporter.Config(eventLoopGroup: group)), + eventLoopGroup: group + ) + ) + try otel.start().wait() + InstrumentationSystem.bootstrap(otel.tracer()) var keepAlive: Set = [] // one node to keep the chat rooms: - let roomNode = try FishyTransport(host: "127.0.0.1", port: 8001, logLevel: transportLogLevel) + let roomNode = try FishyTransport(host: "127.0.0.1", port: 8001, group: group, logLevel: transportLogLevel) // multiple nodes for the regional chatters: - let firstNode = try FishyTransport(host: "127.0.0.1", port: 9002, logLevel: transportLogLevel) - let secondNode = try FishyTransport(host: "127.0.0.1", port: 9003, logLevel: transportLogLevel) + let firstNode = try FishyTransport(host: "127.0.0.1", port: 9002, group: group, logLevel: transportLogLevel) + let secondNode = try FishyTransport(host: "127.0.0.1", port: 9003, group: group, logLevel: transportLogLevel) let room = ChatRoom(topic: "Cute Capybaras", transport: roomNode) @@ -62,6 +77,9 @@ struct Demo: ParsableCommand { // normally transports will ofer `await .park()` functions, but for now just sleep: sleep(1000) _ = keepAlive + + try otel.shutdown().wait() + try group.syncShutdownGracefully() } } @@ -70,4 +88,3 @@ if #available(macOS 12.0, /* Linux */ *) { } else { fatalError("Unsupported platform") } - diff --git a/SampleApp/docker-compose.yaml b/SampleApp/docker-compose.yaml new file mode 100644 index 0000000..5996697 --- /dev/null +++ b/SampleApp/docker-compose.yaml @@ -0,0 +1,21 @@ +version: '3.0' + +services: + otel-collector: + image: otel/opentelemetry-collector-contrib:latest + networks: [observability] + depends_on: [jaeger] + command: ["--config=/etc/config.yaml"] + volumes: + - ./otel-collector-config.yaml:/etc/config.yaml + ports: + - 4317:4317 + + jaeger: + image: jaegertracing/all-in-one:latest + networks: [observability] + ports: + - 16686:16686 + +networks: + observability: diff --git a/SampleApp/otel-collector-config.yaml b/SampleApp/otel-collector-config.yaml new file mode 100644 index 0000000..4cad933 --- /dev/null +++ b/SampleApp/otel-collector-config.yaml @@ -0,0 +1,18 @@ +receivers: + otlp: + protocols: + grpc: + +exporters: + jaeger: + endpoint: jaeger:14250 + tls: + insecure: true + logging: + logLevel: debug + +service: + pipelines: + traces: + receivers: [otlp] + exporters: [jaeger, logging] diff --git a/images/jaeger-trace.png b/images/jaeger-trace.png new file mode 100644 index 0000000..8bcbde0 Binary files /dev/null and b/images/jaeger-trace.png differ