Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eventloop executor #430

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 63 additions & 0 deletions Sources/HummingbirdCore/Server/EventLoopExecutor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore

#if compiler(>=6.0)
final class EventLoopExecutor: TaskExecutor, SerialExecutor {
@usableFromInline let eventLoop: EventLoop

init(eventLoop: EventLoop) {
self.eventLoop = eventLoop
}

func asUnownedTaskExecutor() -> UnownedTaskExecutor {
UnownedTaskExecutor(ordinary: self)
}

@inlinable
func enqueue(_ job: consuming ExecutorJob) {
let job = UnownedJob(job)
self.eventLoop.execute {
job.runSynchronously(on: self.asUnownedTaskExecutor())
}
}

@inlinable
func asUnownedSerialExecutor() -> UnownedSerialExecutor {
UnownedSerialExecutor(complexEquality: self)
}

@inlinable
func isSameExclusiveExecutionContext(other: EventLoopExecutor) -> Bool {
self.eventLoop === other.eventLoop
}
}

struct EventLoopExecutorMap {
init(eventLoopGroup: EventLoopGroup) {
var executors: [ObjectIdentifier: EventLoopExecutor] = [:]
for eventLoop in eventLoopGroup.makeIterator() {
executors[ObjectIdentifier(eventLoop)] = EventLoopExecutor(eventLoop: eventLoop)
}
self.executors = executors
}

subscript(eventLoop: EventLoop) -> EventLoopExecutor? {
return self.executors[ObjectIdentifier(eventLoop)]
}

let executors: [ObjectIdentifier: EventLoopExecutor]
}
#endif // swift(>=6.0)
15 changes: 11 additions & 4 deletions Sources/HummingbirdCore/Server/HTTP/HTTP1Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import NIOCore
import NIOHTTPTypes
import NIOHTTPTypesHTTP1

extension NIOAsyncChannel: ChildChannelValue {
public var eventLoop: EventLoop { self.channel.eventLoop }
}

/// Child channel for processing HTTP1
public struct HTTP1Channel: ServerChildChannel, HTTPChannelHandler {
public typealias Value = NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>
Expand All @@ -41,9 +45,9 @@ public struct HTTP1Channel: ServerChildChannel, HTTPChannelHandler {
/// - Returns: Object to process input/output on child channel
public func setup(channel: Channel, logger: Logger) -> EventLoopFuture<Value> {
let childChannelHandlers: [any ChannelHandler] =
[HTTP1ToHTTPServerCodec(secure: false)] +
self.additionalChannelHandlers() +
[HTTPUserEventHandler(logger: logger)]
[HTTP1ToHTTPServerCodec(secure: false)] + self.additionalChannelHandlers() + [
HTTPUserEventHandler(logger: logger),
]
return channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.configureHTTPServerPipeline(
withPipeliningAssistance: false, // HTTP is pipelined by NIOAsyncChannel
Expand All @@ -62,7 +66,10 @@ public struct HTTP1Channel: ServerChildChannel, HTTPChannelHandler {
/// - Parameters:
/// - value: Object to process input/output on child channel
/// - logger: Logger to use while processing messages
public func handle(value asyncChannel: NIOCore.NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>, logger: Logging.Logger) async {
public func handle(
value asyncChannel: NIOCore.NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>,
logger: Logging.Logger
) async {
await handleHTTP(asyncChannel: asyncChannel, logger: logger)
}

Expand Down
71 changes: 57 additions & 14 deletions Sources/HummingbirdCore/Server/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import Logging
import NIOCore
import NIOExtras
import NIOPosix
import ServiceLifecycle

#if canImport(Network)
import Network
import NIOTransportServices
#endif
import ServiceLifecycle

/// HTTP server class
public actor Server<ChildChannel: ServerChildChannel>: Service {
Expand Down Expand Up @@ -113,14 +114,32 @@ public actor Server<ChildChannel: ServerChildChannel>: Service {
await onServerRunning?(asyncChannel.channel)

let logger = self.logger
#if compiler(>=6.0)
let eventLoopExecutorMap = EventLoopExecutorMap(
eventLoopGroup: self.eventLoopGroup)
#endif
// We can now start to handle our work.
await withDiscardingTaskGroup { group in
do {
try await asyncChannel.executeThenClose { inbound in
for try await childChannel in inbound {
#if compiler(>=6.0)
group.addTask(
executorPreference: eventLoopExecutorMap[
childChannel.eventLoop
]
) {
await childChannelSetup.handle(
value: childChannel, logger: logger
)
}
#else
group.addTask {
await childChannelSetup.handle(value: childChannel, logger: logger)
await childChannelSetup.handle(
value: childChannel, logger: logger
)
}
#endif
}
}
} catch {
Expand Down Expand Up @@ -179,17 +198,23 @@ public actor Server<ChildChannel: ServerChildChannel>: Service {
/// Start server
/// - Parameter responder: Object that provides responses to requests sent to the server
/// - Returns: EventLoopFuture that is fulfilled when server has started
nonisolated func makeServer(childChannelSetup: ChildChannel, configuration: ServerConfiguration) async throws -> AsyncServerChannel {
nonisolated func makeServer(childChannelSetup: ChildChannel, configuration: ServerConfiguration)
async throws -> AsyncServerChannel
{
let bootstrap: ServerBootstrapProtocol
#if canImport(Network)
if let tsBootstrap = self.createTSBootstrap(configuration: configuration) {
bootstrap = tsBootstrap
} else {
#if os(iOS) || os(tvOS)
self.logger.warning("Running BSD sockets on iOS or tvOS is not recommended. Please use NIOTSEventLoopGroup, to run with the Network framework")
self.logger.warning(
"Running BSD sockets on iOS or tvOS is not recommended. Please use NIOTSEventLoopGroup, to run with the Network framework"
)
#endif
if configuration.tlsOptions.options != nil {
self.logger.warning("tlsOptions set in Configuration will not be applied to a BSD sockets server. Please use NIOTSEventLoopGroup, to run with the Network framework")
self.logger.warning(
"tlsOptions set in Configuration will not be applied to a BSD sockets server. Please use NIOTSEventLoopGroup, to run with the Network framework"
)
}
bootstrap = self.createSocketsBootstrap(configuration: configuration)
}
Expand All @@ -212,7 +237,9 @@ public actor Server<ChildChannel: ServerChildChannel>: Service {
logger: self.logger
)
}
self.logger.info("Server started and listening on \(host):\(asyncChannel.channel.localAddress?.port ?? port)")
self.logger.info(
"Server started and listening on \(host):\(asyncChannel.channel.localAddress?.port ?? port)"
)
return asyncChannel

case .unixDomainSocket(let path):
Expand Down Expand Up @@ -242,8 +269,14 @@ public actor Server<ChildChannel: ServerChildChannel>: Service {
return ServerBootstrap(group: self.eventLoopGroup)
// Specify backlog and enable SO_REUSEADDR for the server itself
.serverChannelOption(ChannelOptions.backlog, value: numericCast(configuration.backlog))
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0)
.childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0)
.serverChannelOption(
ChannelOptions.socketOption(.so_reuseaddr),
value: configuration.reuseAddress ? 1 : 0
)
.childChannelOption(
ChannelOptions.socketOption(.so_reuseaddr),
value: configuration.reuseAddress ? 1 : 0
)
.childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1)
.childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
}
Expand All @@ -254,10 +287,17 @@ public actor Server<ChildChannel: ServerChildChannel>: Service {
private nonisolated func createTSBootstrap(
configuration: ServerConfiguration
) -> NIOTSListenerBootstrap? {
guard let bootstrap = NIOTSListenerBootstrap(validatingGroup: self.eventLoopGroup)?
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0)
guard
let bootstrap = NIOTSListenerBootstrap(validatingGroup: self.eventLoopGroup)?
.serverChannelOption(
ChannelOptions.socketOption(.so_reuseaddr),
value: configuration.reuseAddress ? 1 : 0
)
// Set the handlers that are applied to the accepted Channels
.childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0)
.childChannelOption(
ChannelOptions.socketOption(.so_reuseaddr),
value: configuration.reuseAddress ? 1 : 0
)
.childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
else {
return nil
Expand All @@ -276,14 +316,16 @@ protocol ServerBootstrapProtocol {
func bind<Output: Sendable>(
host: String,
port: Int,
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies
.HighLowWatermark?,
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
) async throws -> NIOAsyncChannel<Output, Never>

func bind<Output: Sendable>(
unixDomainSocketPath: String,
cleanupExistingSocketFile: Bool,
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies
.HighLowWatermark?,
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
) async throws -> NIOAsyncChannel<Output, Never>
}
Expand All @@ -299,7 +341,8 @@ extension NIOTSListenerBootstrap: ServerBootstrapProtocol {
func bind<Output: Sendable>(
unixDomainSocketPath: String,
cleanupExistingSocketFile: Bool,
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies
.HighLowWatermark?,
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
) async throws -> NIOAsyncChannel<Output, Never> {
preconditionFailure("Binding to a unixDomainSocketPath is currently not available")
Expand Down
6 changes: 5 additions & 1 deletion Sources/HummingbirdCore/Server/ServerChildChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ import Logging
import NIOCore
import ServiceLifecycle

public protocol ChildChannelValue: Sendable {
var eventLoop: EventLoop { get }
}

/// HTTPServer child channel setup protocol
public protocol ServerChildChannel: Sendable {
associatedtype Value: Sendable
associatedtype Value: ChildChannelValue

/// Setup child channel
/// - Parameters:
Expand Down
87 changes: 54 additions & 33 deletions Sources/HummingbirdHTTP2/HTTP2Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,19 @@ import NIOSSL

/// Child channel for processing HTTP1 with the option of upgrading to HTTP2
public struct HTTP2UpgradeChannel: HTTPChannelHandler {
public typealias Value = EventLoopFuture<NIONegotiatedHTTPVersion<HTTP1Channel.Value, (NIOAsyncChannel<HTTP2Frame, HTTP2Frame>, NIOHTTP2Handler.AsyncStreamMultiplexer<HTTP1Channel.Value>)>>
public struct Value: ChildChannelValue {
let negotiatedResult:
EventLoopFuture<
NIONegotiatedHTTPVersion<
HTTP1Channel.Value,
(
NIOAsyncChannel<HTTP2Frame, HTTP2Frame>,
NIOHTTP2Handler.AsyncStreamMultiplexer<HTTP1Channel.Value>
)
>
>
public let eventLoop: EventLoop
}

private let sslContext: NIOSSLContext
private let http1: HTTP1Channel
Expand All @@ -40,13 +52,17 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler {
public init(
tlsConfiguration: TLSConfiguration,
additionalChannelHandlers: @escaping @Sendable () -> [any RemovableChannelHandler] = { [] },
responder: @escaping @Sendable (Request, Channel) async throws -> Response = { _, _ in throw HTTPError(.notImplemented) }
responder: @escaping @Sendable (Request, Channel) async throws -> Response = { _, _ in
throw HTTPError(.notImplemented)
}
) throws {
var tlsConfiguration = tlsConfiguration
tlsConfiguration.applicationProtocols = NIOHTTP2SupportedALPNProtocols
self.sslContext = try NIOSSLContext(configuration: tlsConfiguration)
self.additionalChannelHandlers = additionalChannelHandlers
self.http1 = HTTP1Channel(responder: responder, additionalChannelHandlers: additionalChannelHandlers)
self.http1 = HTTP1Channel(
responder: responder, additionalChannelHandlers: additionalChannelHandlers
)
}

/// Setup child channel for HTTP1 with HTTP2 upgrade
Expand All @@ -56,42 +72,47 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler {
/// - Returns: Object to process input/output on child channel
public func setup(channel: Channel, logger: Logger) -> EventLoopFuture<Value> {
do {
try channel.pipeline.syncOperations.addHandler(NIOSSLServerHandler(context: self.sslContext))
try channel.pipeline.syncOperations.addHandler(
NIOSSLServerHandler(context: self.sslContext))
} catch {
return channel.eventLoop.makeFailedFuture(error)
}

return channel.configureAsyncHTTPServerPipeline { http1Channel -> EventLoopFuture<HTTP1Channel.Value> in
let childChannelHandlers: [ChannelHandler] =
[HTTP1ToHTTPServerCodec(secure: false)] +
self.additionalChannelHandlers() +
[HTTPUserEventHandler(logger: logger)]
return
channel.configureAsyncHTTPServerPipeline {
http1Channel -> EventLoopFuture<HTTP1Channel.Value> in
let childChannelHandlers: [ChannelHandler] =
[HTTP1ToHTTPServerCodec(secure: false)] + self.additionalChannelHandlers() + [
HTTPUserEventHandler(logger: logger),
]

return http1Channel
.pipeline
.addHandlers(childChannelHandlers)
.flatMapThrowing {
try HTTP1Channel.Value(wrappingChannelSynchronously: http1Channel)
return http1Channel
.pipeline
.addHandlers(childChannelHandlers)
.flatMapThrowing {
try HTTP1Channel.Value(wrappingChannelSynchronously: http1Channel)
}
} http2ConnectionInitializer: {
http2Channel -> EventLoopFuture<NIOAsyncChannel<HTTP2Frame, HTTP2Frame>> in
http2Channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel<HTTP2Frame, HTTP2Frame>(
wrappingChannelSynchronously: http2Channel)
}
} http2ConnectionInitializer: { http2Channel -> EventLoopFuture<NIOAsyncChannel<HTTP2Frame, HTTP2Frame>> in
http2Channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel<HTTP2Frame, HTTP2Frame>(wrappingChannelSynchronously: http2Channel)
}
} http2StreamInitializer: { http2ChildChannel -> EventLoopFuture<HTTP1Channel.Value> in
let childChannelHandlers: [ChannelHandler] =
self.additionalChannelHandlers() + [
HTTPUserEventHandler(logger: logger),
]
} http2StreamInitializer: { http2ChildChannel -> EventLoopFuture<HTTP1Channel.Value> in
let childChannelHandlers: [ChannelHandler] =
self.additionalChannelHandlers() + [
HTTPUserEventHandler(logger: logger),
]

return http2ChildChannel
.pipeline
.addHandler(HTTP2FramePayloadToHTTPServerCodec())
.flatMap {
http2ChildChannel.pipeline.addHandlers(childChannelHandlers)
}.flatMapThrowing {
try HTTP1Channel.Value(wrappingChannelSynchronously: http2ChildChannel)
}
}
return http2ChildChannel
.pipeline
.addHandler(HTTP2FramePayloadToHTTPServerCodec())
.flatMap {
http2ChildChannel.pipeline.addHandlers(childChannelHandlers)
}.flatMapThrowing {
try HTTP1Channel.Value(wrappingChannelSynchronously: http2ChildChannel)
}
}.map { Value(negotiatedResult: $0, eventLoop: channel.eventLoop) }
}

/// handle messages being passed down the channel pipeline
Expand All @@ -100,7 +121,7 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler {
/// - logger: Logger to use while processing messages
public func handle(value: Value, logger: Logger) async {
do {
let channel = try await value.get()
let channel = try await value.negotiatedResult.get()
switch channel {
case .http1_1(let http1):
await handleHTTP(asyncChannel: http1, logger: logger)
Expand Down