Skip to content

Commit

Permalink
Merge pull request #5 from apple/wip-update-to-shipped-feture
Browse files Browse the repository at this point in the history
  • Loading branch information
ktoso committed Dec 5, 2022
2 parents b18a0b5 + f55e8ea commit e37e67b
Show file tree
Hide file tree
Showing 25 changed files with 1,030 additions and 1,909 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ xcuserdata/
DerivedData/
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
SampleApp/.build
FishyTransport/.build
DistributedSampleHTTP/.build
.idea/
56 changes: 56 additions & 0 deletions DistributedSampleHTTP/Package.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// swift-tools-version:5.7
// The swift-tools-version declares the minimum version of Swift required to build this package.

import PackageDescription

/******************************************************************************/
/************************************ CAVEAT **********************************/
/******************************************************************************/
// This package is a pretty "silly" example of an actor transport implementation.
// The general outline of components, where resolves and decode/encodes happen
// is approximately the same as in a real implementation, however several shortcuts
// and simplifications were taken to keep the example simple and easier to follow.
//
// The connection management and general HTTP server/client use in this transport
// is not optimal - far from it - and please take care to not copy this implementation
// directly, but rather use it as an inspiration for what COULD be done using this
// language feature.
let package = Package(
name: "distributed-sample-http",
platforms: [
.macOS(.v13), // because of the 'distributed actor' feature
],
products: [
.library(
name: "DistributedSampleHTTP",
targets: [
"DistributedSampleHTTP"
]
),
],
dependencies: [
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.1"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.0.0"),
.package(url: "https://github.com/swift-server/async-http-client.git", branch: "1.13.1"),
],
targets: [
.target(
name: "DistributedSampleHTTP",
dependencies: [
.product(name: "NIO", package: "swift-nio"),
.product(name: "_NIOConcurrency", package: "swift-nio"),
.product(name: "Logging", package: "swift-log"),
.product(name: "AsyncHTTPClient", package: "async-http-client"),
]
),

// ==== Tests -----

.testTarget(
name: "DistributedSampleHTTPTests",
dependencies: [
"DistributedSampleHTTP",
]
),
]
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the swift-sample-distributed-actors-transport open source project
//
// Copyright (c) 2021 Apple Inc. and the swift-sample-distributed-actors-transport project authors
// Copyright (c) 2018-2022 Apple Inc. and the swift-sample-distributed-actors-transport project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -12,19 +12,16 @@
//
//===----------------------------------------------------------------------===//

import _Distributed

import Distributed
import NIO
import NIOHTTP1
import Foundation
import Tracing
import _NIOConcurrency

private final class HTTPHandler: @unchecked Sendable, ChannelInboundHandler, RemovableChannelHandler {
typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart

private let transport: FishyTransport
private let system: HTTPActorSystem

private var messageBytes: ByteBuffer = ByteBuffer()
private var messageRecipientURI: String = ""
Expand All @@ -50,8 +47,8 @@ private final class HTTPHandler: @unchecked Sendable, ChannelInboundHandler, Rem
}
}

init(transport: FishyTransport) {
self.transport = transport
init(system: HTTPActorSystem) {
self.system = system
}

func handlerAdded(context: ChannelHandlerContext) {
Expand All @@ -63,11 +60,18 @@ private final class HTTPHandler: @unchecked Sendable, ChannelInboundHandler, Rem
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
switch unwrapInboundIn(data) {
case .head(let head):
if self.isHealthCheck(head) {
self.respondHealthCheck(context: context)
return
}

guard case .POST = head.method else {
self.respond405(context: context)
return
}

system.log.info("Received [\(head.method) \(head.uri)]")

messageRecipientURI = head.uri
state.requestReceived()

Expand All @@ -86,18 +90,21 @@ private final class HTTPHandler: @unchecked Sendable, ChannelInboundHandler, Rem

func onMessageComplete(context: ChannelHandlerContext, messageBytes: ByteBuffer) {
let decoder = JSONDecoder()
decoder.userInfo[.actorTransportKey] = transport
decoder.userInfo[.actorSystemKey] = system

let envelope: Envelope
let envelope: RemoteCallEnvelope
do {
envelope = try decoder.decode(Envelope.self, from: messageBytes)
envelope = try decoder.decode(RemoteCallEnvelope.self, from: messageBytes)
} catch {
// TODO: log the error
self.system.log.error("Failed to decode \(RemoteCallEnvelope.self)", metadata: [
"error": "\(error)",
])
self.respond500(context: context, error: error)
return
}
let promise = context.eventLoop.makePromise(of: Data.self)
promise.completeWithTask {
try await self.transport.deliver(envelope: envelope)
Task {
await self.system.deliver(envelope: envelope, promise: promise)
}
promise.futureResult.whenComplete { result in
var headers = HTTPHeaders()
Expand All @@ -106,16 +113,32 @@ private final class HTTPHandler: @unchecked Sendable, ChannelInboundHandler, Rem
let responseHead: HTTPResponseHead
let responseBody: ByteBuffer
switch result {
case .failure(let error):
responseHead = HTTPResponseHead(version: .init(major: 1, minor: 1),
status: .internalServerError,
headers: headers)
responseBody = ByteBuffer(string: "Error: \(error)")
case .success(let data):
responseHead = HTTPResponseHead(version: .init(major: 1, minor: 1),
status: .ok,
headers: headers)
responseBody = ByteBuffer(data: data)
case .failure(let error):
let status: HTTPResponseStatus
let errorString: String
if let error = error as? HTTPActorSystemError {
switch error {
case .actorNotFound:
status = .notFound
errorString = "{\"error\"=\"not-found\"}"
default:
status = .internalServerError
errorString = "{\"error\"=\"\(type(of: error))\"}"
}
} else {
status = .internalServerError
errorString = "{\"error\"=\"\(type(of: error))\"}"
}

responseHead = HTTPResponseHead(version: .init(major: 1, minor: 1),
status: status,
headers: headers)
responseBody = ByteBuffer(string: errorString)
case .success(let data):
responseHead = HTTPResponseHead(version: .init(major: 1, minor: 1),
status: .ok,
headers: headers)
responseBody = ByteBuffer(data: data)
}
headers.add(name: "Content-Length", value: String(responseBody.readableBytes))
headers.add(name: "Connection", value: "close")
Expand All @@ -128,13 +151,41 @@ private final class HTTPHandler: @unchecked Sendable, ChannelInboundHandler, Rem
}
}

private func isHealthCheck(_ head: HTTPRequestHead) -> Bool {
head.method == .GET && head.uri == "/__health"
}

private func respondHealthCheck(context: ChannelHandlerContext) {
let headers = HTTPHeaders()
let head = HTTPResponseHead(version: .http1_1,
status: .ok,
headers: headers)
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
context.flush()
}

private func respond405(context: ChannelHandlerContext) {
var headers = HTTPHeaders()
headers.add(name: "Connection", value: "close")
headers.add(name: "Content-Length", value: "0")
let head = HTTPResponseHead(version: .http1_1,
status: .methodNotAllowed,
headers: headers)
status: .methodNotAllowed,
headers: headers)
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
context.write(self.wrapOutboundOut(.end(nil))).whenComplete { (_: Result<Void, Error>) in
context.close(promise: nil)
}
context.flush()
}

private func respond500(context: ChannelHandlerContext, error: Error) {
var headers = HTTPHeaders()
headers.add(name: "Connection", value: "close")
headers.add(name: "Content-Length", value: "0")
let head = HTTPResponseHead(version: .http1_1,
status: .internalServerError,
headers: headers)
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
context.write(self.wrapOutboundOut(.end(nil))).whenComplete { (_: Result<Void, Error>) in
context.close(promise: nil)
Expand All @@ -143,36 +194,37 @@ private final class HTTPHandler: @unchecked Sendable, ChannelInboundHandler, Rem
}
}

final class FishyServer {
/// The HTTP server acting as a front to the actor system, it handles incoming health checks and http requests which are forwared as http messages to the actor system.
final class HTTPActorSystemServer {

var group: EventLoopGroup
let transport: FishyTransport
let system: HTTPActorSystem

var channel: Channel! = nil

init(group: EventLoopGroup, transport: FishyTransport) {
init(group: EventLoopGroup, system: HTTPActorSystem) {
self.group = group
self.transport = transport
self.system = system
}

func bootstrap(host: String, port: Int) throws {
assert(channel == nil)

let bootstrap = ServerBootstrap(group: group)
// Specify backlog and enable SO_REUSEADDR for the server itself
.serverChannelOption(ChannelOptions.backlog, value: 256)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)

// Set the handlers that are applied to the accepted Channels
.childChannelInitializer { channel in
let httpHandler = HTTPHandler(transport: self.transport)
return channel.pipeline.configureHTTPServerPipeline().flatMap {
channel.pipeline.addHandler(httpHandler)
}
}

// Enable SO_REUSEADDR for the accepted Channels
.childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
// Specify backlog and enable SO_REUSEADDR for the server itself
.serverChannelOption(ChannelOptions.backlog, value: 256)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)

// Set the handlers that are applied to the accepted Channels
.childChannelInitializer { channel in
let httpHandler = HTTPHandler(system: self.system)
return channel.pipeline.configureHTTPServerPipeline().flatMap {
channel.pipeline.addHandler(httpHandler)
}
}

// Enable SO_REUSEADDR for the accepted Channels
.childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)

channel = try bootstrap.bind(host: host, port: port).wait()
assert(channel.localAddress != nil, "localAddress was nil!")
Expand Down
Loading

0 comments on commit e37e67b

Please sign in to comment.