Skip to content

Commit

Permalink
Merge pull request #3 from slashmo/tracing
Browse files Browse the repository at this point in the history
Add distributed tracing
  • Loading branch information
ktoso committed Nov 2, 2021
2 parents d1e99a0 + 1583e6c commit b18a0b5
Show file tree
Hide file tree
Showing 12 changed files with 276 additions and 74 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -8,3 +8,4 @@ DerivedData/
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
SampleApp/.build
FishyTransport/.build
.idea/
4 changes: 3 additions & 1 deletion FishyTransport/Package.swift
Expand Up @@ -52,7 +52,8 @@ let package = Package(
.package(url: "https://github.com/apple/swift-nio.git", from: "2.0.0"),
.package(url: "https://github.com/swift-server/async-http-client.git", from: "1.5.0"),
.package(url: "https://github.com/apple/swift-argument-parser", from: "0.4.0"),
.package(url: "https://github.com/apple/swift-syntax.git", revision: "b8e4a69237f9dfa362268dddaef8793bc694dc6f") // TODO: we currently must depend on specific versions here
.package(url: "https://github.com/apple/swift-syntax.git", revision: "b8e4a69237f9dfa362268dddaef8793bc694dc6f"), // TODO: we currently must depend on specific versions here
.package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "0.2.0"),
// ==== END OF DEPENDENCIES OF TRANSPORT ==== //
],
targets: [
Expand All @@ -63,6 +64,7 @@ let package = Package(
.product(name: "_NIOConcurrency", package: "swift-nio"),
.product(name: "Logging", package: "swift-log"),
.product(name: "AsyncHTTPClient", package: "async-http-client"),
.product(name: "Tracing", package: "swift-distributed-tracing"),
],
swiftSettings: [
.unsafeFlags(experimentalFlags)
Expand Down
Expand Up @@ -17,6 +17,7 @@ import _Distributed
import NIO
import NIOHTTP1
import Foundation
import Tracing
import _NIOConcurrency

private final class HTTPHandler: @unchecked Sendable, ChannelInboundHandler, RemovableChannelHandler {
Expand Down Expand Up @@ -144,12 +145,12 @@ private final class HTTPHandler: @unchecked Sendable, ChannelInboundHandler, Rem

final class FishyServer {

var group: MultiThreadedEventLoopGroup
var group: EventLoopGroup
let transport: FishyTransport

var channel: Channel! = nil

init(group: MultiThreadedEventLoopGroup, transport: FishyTransport) {
init(group: EventLoopGroup, transport: FishyTransport) {
self.group = group
self.transport = transport
}
Expand All @@ -176,4 +177,4 @@ final class FishyServer {
channel = try bootstrap.bind(host: host, port: port).wait()
assert(channel.localAddress != nil, "localAddress was nil!")
}
}
}
163 changes: 101 additions & 62 deletions FishyTransport/Sources/FishyActorTransport/FishyTransport.swift
Expand Up @@ -19,6 +19,7 @@ import NIOHTTP1
import _NIOConcurrency
import AsyncHTTPClient
import Logging
import Tracing

import Foundation // because JSONEncoder and co
import struct Foundation.UUID
Expand All @@ -30,6 +31,10 @@ public protocol MessageRecipient {
) async throws -> Encoder.Output where Encoder: TopLevelEncoder, Decoder: TopLevelDecoder
}

public protocol FishyMessage: Codable {
var functionIdentifier: String { get }
}

private struct AnyMessageRecipient: MessageRecipient {
weak var actor: AnyObject? // Note: store as AnyActor once supported?

Expand Down Expand Up @@ -64,11 +69,11 @@ public final class FishyTransport: ActorTransport, @unchecked Sendable, CustomSt
private var managed: [AnyActorIdentity: AnyMessageRecipient]

// networking infra
private let group: MultiThreadedEventLoopGroup
private let group: EventLoopGroup
private var server: FishyServer!
private let client: HTTPClient

public init(host: String, port: Int, logLevel: Logger.Level? = nil) throws {
public init(host: String, port: Int, group: EventLoopGroup, logLevel: Logger.Level? = nil) throws {
self.host = host
self.port = port

Expand All @@ -82,7 +87,6 @@ public final class FishyTransport: ActorTransport, @unchecked Sendable, CustomSt
}
}

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
self.group = group
self.lock = Lock()
self.managed = [:]
Expand Down Expand Up @@ -140,14 +144,14 @@ public final class FishyTransport: ActorTransport, @unchecked Sendable, CustomSt
}
}

public func send<Message: Sendable & Codable>(
public func send<Message: Sendable & FishyMessage>(
_ message: Message, to recipient: AnyActorIdentity,
expecting responseType: Void.Type
) async throws -> Void {
_ = try await self.send(message, to: recipient, expecting: NoResponse.self)
}

public func send<Message: Sendable & Codable, Response: Sendable & Codable>(
public func send<Message: Sendable & FishyMessage, Response: Sendable & Codable>(
_ message: Message, to recipient: AnyActorIdentity,
expecting responseType: Response.Type = Response.self
) async throws -> Response {
Expand Down Expand Up @@ -184,78 +188,96 @@ public final class FishyTransport: ActorTransport, @unchecked Sendable, CustomSt
}
}

private func sendEnvelopeRequest<Message: Sendable & Codable>(
private func sendEnvelopeRequest<Message: Sendable & FishyActorTransport.FishyMessage>(
_ message: Message, to recipient: AnyActorIdentity,
encoder: JSONEncoder
) async throws -> HTTPClient.Response {
// Prepare the message envelope
var envelope = try Envelope(recipient: recipient, message: message)
try await InstrumentationSystem.tracer.withSpan(message.functionIdentifier) { span in
// Prepare the message envelope
var envelope = try Envelope(recipient: recipient, message: message)

// TODO: pick up any task locals that we want to propagage, e.g. tracing metadata and put into Envelope
// i.e. this is where we'd invoke swift-distributed-tracing's Instruments
envelope.metadata = [:]
// inject metadata values to propagate for distributed tracing
if let baggage = Baggage.current {
InstrumentationSystem.instrument.inject(baggage, into: &envelope.metadata, using: MessageEnvelopeMetadataInjector())
}

var recipientURI: String.SubSequence = "\(recipient.underlying)"
var recipientURI: String.SubSequence = "\(recipient.underlying)"

let requestData = try encoder.encode(envelope)
log.debug("Send envelope request", metadata: [
"envelope": "\(String(data: requestData, encoding: .utf8)!)",
"recipient": "\(recipientURI)"
])
let requestData = try encoder.encode(envelope)
log.debug("Send envelope request", metadata: [
"envelope": "\(String(data: requestData, encoding: .utf8)!)",
"recipient": "\(recipientURI)"
])

recipientURI = recipientURI.dropFirst("fishy://".count) // our transport is super silly, and abuses http for its messaging
let requestURI = String("http://" + recipientURI)
recipientURI = recipientURI.dropFirst("fishy://".count) // our transport is super silly, and abuses http for its messaging
let requestURI = String("http://" + recipientURI)

let response = try await sendHTTPRequest(requestURI: requestURI, requestData: requestData)
log.debug("Received response \(response)", metadata: [
"response/payload": "\(response.body?.getString(at: 0, length: response.body?.readableBytes ?? 0) ?? "")"
])
let response = try await sendHTTPRequest(requestURI: requestURI, requestData: requestData)
log.debug("Received response \(response)", metadata: [
"response/payload": "\(response.body?.getString(at: 0, length: response.body?.readableBytes ?? 0) ?? "")"
])

return response
return response
}
}

private func sendHTTPRequest(requestURI: String, requestData: Data) async throws -> HTTPClient.Response {
let request = try HTTPClient.Request(
url: requestURI,
method: .POST,
headers: [
"Content-Type": "application/json"
],
body: .data(requestData))

let future = client.execute(
request: request,
deadline: .now() + .seconds(3)) // A real implementation would allow configuring these (i.e. pick up a task-local deadline)

return try await future.get()
try await InstrumentationSystem.tracer.withSpan("HTTP POST", ofKind: .client) { span in
let request = try HTTPClient.Request(
url: requestURI,
method: .POST,
headers: [
"Content-Type": "application/json"
],
body: .data(requestData))
span.attributes["http.method"] = "POST"
span.attributes["http.url"] = requestURI

let future = client.execute(
request: request,
deadline: .now() + .seconds(3)) // A real implementation would allow configuring these (i.e. pick up a task-local deadline)

let response = try await future.get()
span.attributes["http.status_code"] = Int(response.status.code)
return response
}
}

/// Actually deliver the message to the local recipient
func deliver(envelope: Envelope) async throws -> Data {
log.debug("Deliver to \(envelope.recipient)")

guard let known = resolveRecipient(of: envelope) else {
throw handleDeadLetter(envelope)
}

log.debug("Delivering to local instance: \(known)", metadata: [
"envelope": "\(envelope)",
"recipient": "\(known)",
])

// In a real implementation coders would often be configurable on transport level.
//
// The transport must ensure to store itself in the user info offered to receive
// as it may need to attempt to decode actor references.
let encoder = JSONEncoder()
let decoder = JSONDecoder()
encoder.userInfo[.actorTransportKey] = self
decoder.userInfo[.actorTransportKey] = self
var baggage = Baggage.current ?? .topLevel
InstrumentationSystem.instrument.extract(
envelope.metadata,
into: &baggage,
using: MessageEnvelopeMetadataExtractor()
)

return try await Baggage.$current.withValue(baggage) {
log.debug("Deliver to \(envelope.recipient)")

guard let known = resolveRecipient(of: envelope) else {
throw handleDeadLetter(envelope)
}

do {
return try await known._receiveAny(envelope: envelope, encoder: encoder, decoder: decoder)
} catch {
fatalError("Failed to deliver: \(error)")
log.debug("Delivering to local instance: \(known)", metadata: [
"envelope": "\(envelope)",
"recipient": "\(known)",
])

// In a real implementation coders would often be configurable on transport level.
//
// The transport must ensure to store itself in the user info offered to receive
// as it may need to attempt to decode actor references.
let encoder = JSONEncoder()
let decoder = JSONDecoder()
encoder.userInfo[.actorTransportKey] = self
decoder.userInfo[.actorTransportKey] = self

do {
return try await known._receiveAny(envelope: envelope, encoder: encoder, decoder: decoder)
} catch {
fatalError("Failed to deliver: \(error)")
}
}
}

Expand Down Expand Up @@ -357,8 +379,10 @@ public struct Envelope: Sendable, Codable {

/// Represents a `void` return type of a distributed call.
/// Pass this to `send` to avoid decoding any value from the response.
public enum NoResponse: Codable {
public enum NoResponse: Codable, FishyActorTransport.FishyMessage {
case _instance

public var functionIdentifier: String { "noResponse" }
}

extension DistributedActor {
Expand Down Expand Up @@ -392,4 +416,19 @@ public struct UnknownRecipientError: ActorTransportError {

public struct RecipientReleasedError: ActorTransportError {
let recipient: AnyActorIdentity
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Instrumentation

struct MessageEnvelopeMetadataInjector: Injector {
func inject(_ value: String, forKey key: String, into metadata: inout [String: String]) {
metadata[key] = value
}
}

struct MessageEnvelopeMetadataExtractor: Extractor {
func extract(key: String, from metadata: [String: String]) -> String? {
metadata[key]
}
}
44 changes: 43 additions & 1 deletion FishyTransport/Sources/FishyActorsGenerator/SourceGen.swift
Expand Up @@ -71,7 +71,7 @@ final class SourceGen {
// -- emit a `case` that represents the function
//
sourceText += """
enum _Message: Sendable, Codable {
enum _Message: Sendable, FishyActorTransport.FishyMessage {
"""

Expand All @@ -98,6 +98,13 @@ final class SourceGen {
sourceText += ")\n"
}

sourceText += "\n var functionIdentifier: String {\n switch self {\n"
for fun in decl.funcs {
sourceText += " case .\(fun.name):\n return \"\(fun.name)(\(fun.renderFuncParams(forFuncIdentifier: true)))\"\n"
}
sourceText += " }\n"
sourceText += " }\n"

sourceText += " }\n \n"
// ==== Generate the "receive"-side, we must decode the incoming Envelope
// into a _Message and apply it to our local actor.
Expand Down Expand Up @@ -243,3 +250,38 @@ extension FuncDecl {
}.joined(separator: ", ") + ")"
}
}

extension FuncDecl {
fileprivate func renderFuncParams(forFuncIdentifier: Bool = false) -> String {
var result = params.map { first, second, type in
// FIXME: super naive... replace with something more proper
if let name = first {
if name == second || forFuncIdentifier {
// no need to write `name name: String`
var ret = "\(name)"
if (!forFuncIdentifier) {
ret += ": \(type)"
}
return ret
} else {
var ret = "\(name) \(second):"
if (!forFuncIdentifier) {
ret += " \(type)"
}
return ret
}
} else {
if (forFuncIdentifier) {
return "_"
} else {
return "\(second): \(type)"
}
}
}.joined(separator: forFuncIdentifier ? ":" : ", ")

if forFuncIdentifier && !self.params.isEmpty {
result += ":"
}
return result
}
}
11 changes: 11 additions & 0 deletions README.md
Expand Up @@ -70,6 +70,17 @@ Notice that the simplified ID printout contains the port number of the node the
This sample is a distributed application created from just a single process, but all the "nodes" communicate through networking with eachother.
The same application could be launched on different physical hosts (and then would have different IP addresses), this is what location transparency of distributed actors enables us to do.

### Distributed Tracing

Additionally, the calls can also be traced using "Swift Distributed Tracing". To use tracing for the sample app, start the services in [`docker-compose.yaml`](SampleApp/docker-compose.yaml), then run the sample again.

```sh
docker compose -p fishy-transport-sample up -d
```

Afterwards, open Jaeger [http://localhost:16686](http://localhost:16686) and you'll see traces similar to this:

![Jaeger Example Trace](images/jaeger-trace.png)

### Experimental flags

Expand Down

0 comments on commit b18a0b5

Please sign in to comment.