diff --git a/Sources/HummingbirdCore/Server/EventLoopExecutor.swift b/Sources/HummingbirdCore/Server/EventLoopExecutor.swift new file mode 100644 index 000000000..8db501365 --- /dev/null +++ b/Sources/HummingbirdCore/Server/EventLoopExecutor.swift @@ -0,0 +1,63 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +#if compiler(>=6.0) +final class EventLoopExecutor: TaskExecutor, SerialExecutor { + @usableFromInline let eventLoop: EventLoop + + init(eventLoop: EventLoop) { + self.eventLoop = eventLoop + } + + func asUnownedTaskExecutor() -> UnownedTaskExecutor { + UnownedTaskExecutor(ordinary: self) + } + + @inlinable + func enqueue(_ job: consuming ExecutorJob) { + let job = UnownedJob(job) + self.eventLoop.execute { + job.runSynchronously(on: self.asUnownedTaskExecutor()) + } + } + + @inlinable + func asUnownedSerialExecutor() -> UnownedSerialExecutor { + UnownedSerialExecutor(complexEquality: self) + } + + @inlinable + func isSameExclusiveExecutionContext(other: EventLoopExecutor) -> Bool { + self.eventLoop === other.eventLoop + } +} + +struct EventLoopExecutorMap { + init(eventLoopGroup: EventLoopGroup) { + var executors: [ObjectIdentifier: EventLoopExecutor] = [:] + for eventLoop in eventLoopGroup.makeIterator() { + executors[ObjectIdentifier(eventLoop)] = EventLoopExecutor(eventLoop: eventLoop) + } + self.executors = executors + } + + subscript(eventLoop: EventLoop) -> EventLoopExecutor? { + return self.executors[ObjectIdentifier(eventLoop)] + } + + let executors: [ObjectIdentifier: EventLoopExecutor] +} +#endif // swift(>=6.0) diff --git a/Sources/HummingbirdCore/Server/HTTP/HTTP1Channel.swift b/Sources/HummingbirdCore/Server/HTTP/HTTP1Channel.swift index 6c2d6e224..9a8efe112 100644 --- a/Sources/HummingbirdCore/Server/HTTP/HTTP1Channel.swift +++ b/Sources/HummingbirdCore/Server/HTTP/HTTP1Channel.swift @@ -18,6 +18,10 @@ import NIOCore import NIOHTTPTypes import NIOHTTPTypesHTTP1 +extension NIOAsyncChannel: ChildChannelValue { + public var eventLoop: EventLoop { self.channel.eventLoop } +} + /// Child channel for processing HTTP1 public struct HTTP1Channel: ServerChildChannel, HTTPChannelHandler { public typealias Value = NIOAsyncChannel @@ -41,9 +45,9 @@ public struct HTTP1Channel: ServerChildChannel, HTTPChannelHandler { /// - Returns: Object to process input/output on child channel public func setup(channel: Channel, logger: Logger) -> EventLoopFuture { let childChannelHandlers: [any ChannelHandler] = - [HTTP1ToHTTPServerCodec(secure: false)] + - self.additionalChannelHandlers() + - [HTTPUserEventHandler(logger: logger)] + [HTTP1ToHTTPServerCodec(secure: false)] + self.additionalChannelHandlers() + [ + HTTPUserEventHandler(logger: logger), + ] return channel.eventLoop.makeCompletedFuture { try channel.pipeline.syncOperations.configureHTTPServerPipeline( withPipeliningAssistance: false, // HTTP is pipelined by NIOAsyncChannel @@ -62,7 +66,10 @@ public struct HTTP1Channel: ServerChildChannel, HTTPChannelHandler { /// - Parameters: /// - value: Object to process input/output on child channel /// - logger: Logger to use while processing messages - public func handle(value asyncChannel: NIOCore.NIOAsyncChannel, logger: Logging.Logger) async { + public func handle( + value asyncChannel: NIOCore.NIOAsyncChannel, + logger: Logging.Logger + ) async { await handleHTTP(asyncChannel: asyncChannel, logger: logger) } diff --git a/Sources/HummingbirdCore/Server/Server.swift b/Sources/HummingbirdCore/Server/Server.swift index 5d9a44ec2..443bad83b 100644 --- a/Sources/HummingbirdCore/Server/Server.swift +++ b/Sources/HummingbirdCore/Server/Server.swift @@ -16,11 +16,12 @@ import Logging import NIOCore import NIOExtras import NIOPosix +import ServiceLifecycle + #if canImport(Network) import Network import NIOTransportServices #endif -import ServiceLifecycle /// HTTP server class public actor Server: Service { @@ -113,14 +114,32 @@ public actor Server: Service { await onServerRunning?(asyncChannel.channel) let logger = self.logger + #if compiler(>=6.0) + let eventLoopExecutorMap = EventLoopExecutorMap( + eventLoopGroup: self.eventLoopGroup) + #endif // We can now start to handle our work. await withDiscardingTaskGroup { group in do { try await asyncChannel.executeThenClose { inbound in for try await childChannel in inbound { + #if compiler(>=6.0) + group.addTask( + executorPreference: eventLoopExecutorMap[ + childChannel.eventLoop + ] + ) { + await childChannelSetup.handle( + value: childChannel, logger: logger + ) + } + #else group.addTask { - await childChannelSetup.handle(value: childChannel, logger: logger) + await childChannelSetup.handle( + value: childChannel, logger: logger + ) } + #endif } } } catch { @@ -179,17 +198,23 @@ public actor Server: Service { /// Start server /// - Parameter responder: Object that provides responses to requests sent to the server /// - Returns: EventLoopFuture that is fulfilled when server has started - nonisolated func makeServer(childChannelSetup: ChildChannel, configuration: ServerConfiguration) async throws -> AsyncServerChannel { + nonisolated func makeServer(childChannelSetup: ChildChannel, configuration: ServerConfiguration) + async throws -> AsyncServerChannel + { let bootstrap: ServerBootstrapProtocol #if canImport(Network) if let tsBootstrap = self.createTSBootstrap(configuration: configuration) { bootstrap = tsBootstrap } else { #if os(iOS) || os(tvOS) - self.logger.warning("Running BSD sockets on iOS or tvOS is not recommended. Please use NIOTSEventLoopGroup, to run with the Network framework") + self.logger.warning( + "Running BSD sockets on iOS or tvOS is not recommended. Please use NIOTSEventLoopGroup, to run with the Network framework" + ) #endif if configuration.tlsOptions.options != nil { - self.logger.warning("tlsOptions set in Configuration will not be applied to a BSD sockets server. Please use NIOTSEventLoopGroup, to run with the Network framework") + self.logger.warning( + "tlsOptions set in Configuration will not be applied to a BSD sockets server. Please use NIOTSEventLoopGroup, to run with the Network framework" + ) } bootstrap = self.createSocketsBootstrap(configuration: configuration) } @@ -212,7 +237,9 @@ public actor Server: Service { logger: self.logger ) } - self.logger.info("Server started and listening on \(host):\(asyncChannel.channel.localAddress?.port ?? port)") + self.logger.info( + "Server started and listening on \(host):\(asyncChannel.channel.localAddress?.port ?? port)" + ) return asyncChannel case .unixDomainSocket(let path): @@ -242,8 +269,14 @@ public actor Server: Service { return ServerBootstrap(group: self.eventLoopGroup) // Specify backlog and enable SO_REUSEADDR for the server itself .serverChannelOption(ChannelOptions.backlog, value: numericCast(configuration.backlog)) - .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0) - .childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0) + .serverChannelOption( + ChannelOptions.socketOption(.so_reuseaddr), + value: configuration.reuseAddress ? 1 : 0 + ) + .childChannelOption( + ChannelOptions.socketOption(.so_reuseaddr), + value: configuration.reuseAddress ? 1 : 0 + ) .childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1) .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true) } @@ -254,10 +287,17 @@ public actor Server: Service { private nonisolated func createTSBootstrap( configuration: ServerConfiguration ) -> NIOTSListenerBootstrap? { - guard let bootstrap = NIOTSListenerBootstrap(validatingGroup: self.eventLoopGroup)? - .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0) + guard + let bootstrap = NIOTSListenerBootstrap(validatingGroup: self.eventLoopGroup)? + .serverChannelOption( + ChannelOptions.socketOption(.so_reuseaddr), + value: configuration.reuseAddress ? 1 : 0 + ) // Set the handlers that are applied to the accepted Channels - .childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: configuration.reuseAddress ? 1 : 0) + .childChannelOption( + ChannelOptions.socketOption(.so_reuseaddr), + value: configuration.reuseAddress ? 1 : 0 + ) .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true) else { return nil @@ -276,14 +316,16 @@ protocol ServerBootstrapProtocol { func bind( host: String, port: Int, - serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?, + serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies + .HighLowWatermark?, childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture ) async throws -> NIOAsyncChannel func bind( unixDomainSocketPath: String, cleanupExistingSocketFile: Bool, - serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?, + serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies + .HighLowWatermark?, childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture ) async throws -> NIOAsyncChannel } @@ -299,7 +341,8 @@ extension NIOTSListenerBootstrap: ServerBootstrapProtocol { func bind( unixDomainSocketPath: String, cleanupExistingSocketFile: Bool, - serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?, + serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies + .HighLowWatermark?, childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture ) async throws -> NIOAsyncChannel { preconditionFailure("Binding to a unixDomainSocketPath is currently not available") diff --git a/Sources/HummingbirdCore/Server/ServerChildChannel.swift b/Sources/HummingbirdCore/Server/ServerChildChannel.swift index 3654731f4..c93f9ed1c 100644 --- a/Sources/HummingbirdCore/Server/ServerChildChannel.swift +++ b/Sources/HummingbirdCore/Server/ServerChildChannel.swift @@ -16,9 +16,13 @@ import Logging import NIOCore import ServiceLifecycle +public protocol ChildChannelValue: Sendable { + var eventLoop: EventLoop { get } +} + /// HTTPServer child channel setup protocol public protocol ServerChildChannel: Sendable { - associatedtype Value: Sendable + associatedtype Value: ChildChannelValue /// Setup child channel /// - Parameters: diff --git a/Sources/HummingbirdHTTP2/HTTP2Channel.swift b/Sources/HummingbirdHTTP2/HTTP2Channel.swift index 355d007e6..c5429c470 100644 --- a/Sources/HummingbirdHTTP2/HTTP2Channel.swift +++ b/Sources/HummingbirdHTTP2/HTTP2Channel.swift @@ -25,7 +25,19 @@ import NIOSSL /// Child channel for processing HTTP1 with the option of upgrading to HTTP2 public struct HTTP2UpgradeChannel: HTTPChannelHandler { - public typealias Value = EventLoopFuture, NIOHTTP2Handler.AsyncStreamMultiplexer)>> + public struct Value: ChildChannelValue { + let negotiatedResult: + EventLoopFuture< + NIONegotiatedHTTPVersion< + HTTP1Channel.Value, + ( + NIOAsyncChannel, + NIOHTTP2Handler.AsyncStreamMultiplexer + ) + > + > + public let eventLoop: EventLoop + } private let sslContext: NIOSSLContext private let http1: HTTP1Channel @@ -40,13 +52,17 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler { public init( tlsConfiguration: TLSConfiguration, additionalChannelHandlers: @escaping @Sendable () -> [any RemovableChannelHandler] = { [] }, - responder: @escaping @Sendable (Request, Channel) async throws -> Response = { _, _ in throw HTTPError(.notImplemented) } + responder: @escaping @Sendable (Request, Channel) async throws -> Response = { _, _ in + throw HTTPError(.notImplemented) + } ) throws { var tlsConfiguration = tlsConfiguration tlsConfiguration.applicationProtocols = NIOHTTP2SupportedALPNProtocols self.sslContext = try NIOSSLContext(configuration: tlsConfiguration) self.additionalChannelHandlers = additionalChannelHandlers - self.http1 = HTTP1Channel(responder: responder, additionalChannelHandlers: additionalChannelHandlers) + self.http1 = HTTP1Channel( + responder: responder, additionalChannelHandlers: additionalChannelHandlers + ) } /// Setup child channel for HTTP1 with HTTP2 upgrade @@ -56,42 +72,47 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler { /// - Returns: Object to process input/output on child channel public func setup(channel: Channel, logger: Logger) -> EventLoopFuture { do { - try channel.pipeline.syncOperations.addHandler(NIOSSLServerHandler(context: self.sslContext)) + try channel.pipeline.syncOperations.addHandler( + NIOSSLServerHandler(context: self.sslContext)) } catch { return channel.eventLoop.makeFailedFuture(error) } - return channel.configureAsyncHTTPServerPipeline { http1Channel -> EventLoopFuture in - let childChannelHandlers: [ChannelHandler] = - [HTTP1ToHTTPServerCodec(secure: false)] + - self.additionalChannelHandlers() + - [HTTPUserEventHandler(logger: logger)] + return + channel.configureAsyncHTTPServerPipeline { + http1Channel -> EventLoopFuture in + let childChannelHandlers: [ChannelHandler] = + [HTTP1ToHTTPServerCodec(secure: false)] + self.additionalChannelHandlers() + [ + HTTPUserEventHandler(logger: logger), + ] - return http1Channel - .pipeline - .addHandlers(childChannelHandlers) - .flatMapThrowing { - try HTTP1Channel.Value(wrappingChannelSynchronously: http1Channel) + return http1Channel + .pipeline + .addHandlers(childChannelHandlers) + .flatMapThrowing { + try HTTP1Channel.Value(wrappingChannelSynchronously: http1Channel) + } + } http2ConnectionInitializer: { + http2Channel -> EventLoopFuture> in + http2Channel.eventLoop.makeCompletedFuture { + try NIOAsyncChannel( + wrappingChannelSynchronously: http2Channel) } - } http2ConnectionInitializer: { http2Channel -> EventLoopFuture> in - http2Channel.eventLoop.makeCompletedFuture { - try NIOAsyncChannel(wrappingChannelSynchronously: http2Channel) - } - } http2StreamInitializer: { http2ChildChannel -> EventLoopFuture in - let childChannelHandlers: [ChannelHandler] = - self.additionalChannelHandlers() + [ - HTTPUserEventHandler(logger: logger), - ] + } http2StreamInitializer: { http2ChildChannel -> EventLoopFuture in + let childChannelHandlers: [ChannelHandler] = + self.additionalChannelHandlers() + [ + HTTPUserEventHandler(logger: logger), + ] - return http2ChildChannel - .pipeline - .addHandler(HTTP2FramePayloadToHTTPServerCodec()) - .flatMap { - http2ChildChannel.pipeline.addHandlers(childChannelHandlers) - }.flatMapThrowing { - try HTTP1Channel.Value(wrappingChannelSynchronously: http2ChildChannel) - } - } + return http2ChildChannel + .pipeline + .addHandler(HTTP2FramePayloadToHTTPServerCodec()) + .flatMap { + http2ChildChannel.pipeline.addHandlers(childChannelHandlers) + }.flatMapThrowing { + try HTTP1Channel.Value(wrappingChannelSynchronously: http2ChildChannel) + } + }.map { Value(negotiatedResult: $0, eventLoop: channel.eventLoop) } } /// handle messages being passed down the channel pipeline @@ -100,7 +121,7 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler { /// - logger: Logger to use while processing messages public func handle(value: Value, logger: Logger) async { do { - let channel = try await value.get() + let channel = try await value.negotiatedResult.get() switch channel { case .http1_1(let http1): await handleHTTP(asyncChannel: http1, logger: logger)