From 033ecf7bfee6c2665475e99b27c232be0c727173 Mon Sep 17 00:00:00 2001 From: Scott Marchant Date: Fri, 14 Nov 2025 14:21:18 -0700 Subject: [PATCH] feat: Implement AsynceEventLoop and MultiThreadedEventLoopGroup using Swift Concurrency and the AsyncEventLoopExecutor. Forms a key foundation for several vapor repositories to use NIO without using NIOPosix. This enables a large amount of wasm compilation for packages that currently consume NIOPosix. --- Sources/NIOAsyncRuntime/AsyncEventLoop.swift | 355 ++++++++++++++++++ .../MultiThreadedEventLoopGroup.swift | 81 ++++ 2 files changed, 436 insertions(+) create mode 100644 Sources/NIOAsyncRuntime/AsyncEventLoop.swift create mode 100644 Sources/NIOAsyncRuntime/MultiThreadedEventLoopGroup.swift diff --git a/Sources/NIOAsyncRuntime/AsyncEventLoop.swift b/Sources/NIOAsyncRuntime/AsyncEventLoop.swift new file mode 100644 index 0000000..8de9a5c --- /dev/null +++ b/Sources/NIOAsyncRuntime/AsyncEventLoop.swift @@ -0,0 +1,355 @@ +//===----------------------------------------------------------------------===// +// +// Copyright (c) 2025 PassiveLogic, Inc. +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Atomics +import NIOCore + +import struct Foundation.UUID + +#if canImport(Dispatch) + import Dispatch +#endif + +// MARK: - AsyncEventLoop - + +/// A single‑threaded `EventLoop` implemented solely with Swift Concurrency. +@available(macOS 13, *) +public final class AsyncEventLoop: EventLoop, @unchecked Sendable { + public enum AsynceEventLoopError: Error { + case cancellationFailure + } + + private let _id = UUID() // unique identifier + private let executor: AsyncEventLoopExecutor + private var cachedSucceededVoidFuture: EventLoopFuture? + private enum ShutdownState: UInt8 { + case running = 0 + case closing = 1 + case closed = 2 + } + private let shutdownState = ManagedAtomic(ShutdownState.running.rawValue) + + public init(manualTimeModeForTesting: Bool = false) { + self.executor = AsyncEventLoopExecutor(loopID: _id, manualTimeMode: manualTimeModeForTesting) + } + + // MARK: - EventLoop basics - + + public var inEventLoop: Bool { + _CurrentEventLoopKey.id == _id + } + + private func isAcceptingNewTasks() -> Bool { + shutdownState.load(ordering: .acquiring) == ShutdownState.running.rawValue + } + + private func isFullyShutdown() -> Bool { + shutdownState.load(ordering: .acquiring) == ShutdownState.closed.rawValue + } + + @_disfavoredOverload + public func execute(_ task: @escaping @Sendable () -> Void) { + guard self.isAcceptingNewTasks() || self._canAcceptExecuteDuringShutdown else { return } + executor.enqueue(task) + } + + private var _canAcceptExecuteDuringShutdown: Bool { + self.inEventLoop + || MultiThreadedEventLoopGroup._GroupContextKey.isFromMultiThreadedEventLoopGroup + } + + // MARK: - Promises / Futures - + + public func makeSucceededFuture(_ value: T) -> EventLoopFuture { + if T.self == Void.self { + return self.makeSucceededVoidFuture() as! EventLoopFuture + } + let p = makePromise(of: T.self) + p.succeed(value) + return p.futureResult + } + + public func makeFailedFuture(_ error: Error) -> EventLoopFuture { + let p = makePromise(of: T.self) + p.fail(error) + return p.futureResult + } + + public func makeSucceededVoidFuture() -> EventLoopFuture { + if self.inEventLoop { + if let cached = self.cachedSucceededVoidFuture { + return cached + } + let future = self.makeSucceededVoidFutureUncached() + self.cachedSucceededVoidFuture = future + return future + } else { + return self.makeSucceededVoidFutureUncached() + } + } + + private func makeSucceededVoidFutureUncached() -> EventLoopFuture { + let promise = self.makePromise(of: Void.self) + promise.succeed(()) + return promise.futureResult + } + + // MARK: - Submitting work - + @preconcurrency + public func submit(_ task: @escaping @Sendable () throws -> T) -> EventLoopFuture { + self.submit { () throws -> _UncheckedSendable in + _UncheckedSendable(try task()) + }.map { $0.value } + } + + public func submit(_ task: @escaping @Sendable () throws -> T) -> EventLoopFuture + { + guard self.isAcceptingNewTasks() else { + return self.makeFailedFuture(EventLoopError.shutdown) + } + let promise = makePromise(of: T.self) + executor.enqueue { + do { + let value = try task() + promise.succeed(value) + } catch { promise.fail(error) } + } + return promise.futureResult + } + + public func flatSubmit(_ task: @escaping @Sendable () -> EventLoopFuture) + -> EventLoopFuture + { + guard self.isAcceptingNewTasks() else { + return self.makeFailedFuture(EventLoopError.shutdown) + } + let promise = makePromise(of: T.self) + executor.enqueue { + let future = task() + future.cascade(to: promise) + } + return promise.futureResult + } + + // MARK: - Scheduling - + + /// NOTE: + /// + /// Timing for execute vs submit vs schedule: + /// + /// Tasks scheduled via `execute` or `submit` are appended to the back of the event loop's task queue + /// and are executed serially in FIFO order. Scheduled tasks (e.g., via `schedule(deadline:)`) are + /// placed in a timing wheel and, when their deadline arrives, are enqueued at the back of the main + /// queue after any already-pending work. This means that if the event loop is backed up, a scheduled + /// task may execute slightly after its scheduled time, as it must wait for previously enqueued tasks + /// to finish. Scheduled tasks never preempt or jump ahead of already-queued immediate work. + @preconcurrency + public func scheduleTask( + deadline: NIODeadline, + _ task: @escaping @Sendable () throws -> T + ) -> Scheduled { + let scheduled: Scheduled<_UncheckedSendable> = self._scheduleTask( + deadline: deadline, + task: { try _UncheckedSendable(task()) } + ) + return self._unsafelyRewrapScheduled(scheduled) + } + + public func scheduleTask( + deadline: NIODeadline, + _ task: @escaping @Sendable () throws -> T + ) -> Scheduled { + self._scheduleTask(deadline: deadline, task: task) + } + + @preconcurrency + public func scheduleTask( + in delay: TimeAmount, + _ task: @escaping @Sendable () throws -> T + ) -> Scheduled { + let scheduled: Scheduled<_UncheckedSendable> = self._scheduleTask( + in: delay, + task: { try _UncheckedSendable(task()) } + ) + return self._unsafelyRewrapScheduled(scheduled) + } + + public func scheduleTask( + in delay: TimeAmount, + _ task: @escaping @Sendable () throws -> T + ) -> Scheduled { + self._scheduleTask(in: delay, task: task) + } + + private func _scheduleTask( + deadline: NIODeadline, + task: @escaping @Sendable () throws -> T + ) -> Scheduled { + let promise = makePromise(of: T.self) + guard self.isAcceptingNewTasks() else { + promise.fail(EventLoopError._shutdown) + return Scheduled(promise: promise) {} + } + + let jobID = executor.schedule( + at: deadline, + job: { + do { + promise.succeed(try task()) + } catch { + promise.fail(error) + } + }, + failFn: { error in + promise.fail(error) + } + ) + + return Scheduled(promise: promise) { [weak self] in + // NOTE: Documented cancellation procedure indicates + // cancellation is not guaranteed. As such, and to match existing Promise API's, + // using a Task here to avoid pushing async up the software stack. + self?.executor.cancelScheduledJob(withID: jobID) + + // NOTE: NIO Core already fails the promise before calling the cancellation closure, + // so we do NOT try to fail the promise. Also cancellation is not guaranteed, so we + // allow cancellation to silently fail rather than re-negotiating to a throwing API. + } + } + + private func _scheduleTask( + in delay: TimeAmount, + task: @escaping @Sendable () throws -> T + ) -> Scheduled { + // NOTE: This is very similar to the `scheduleTask(deadline:)` implementation. However + // due to the nonisolated context here, we keep the implementations separate until they + // reach isolating mechanisms within the executor. + + let promise = makePromise(of: T.self) + guard self.isAcceptingNewTasks() else { + promise.fail(EventLoopError._shutdown) + return Scheduled(promise: promise) {} + } + + let jobID = executor.schedule( + after: delay, + job: { + do { + promise.succeed(try task()) + } catch { + promise.fail(error) + } + }, + failFn: { error in + promise.fail(error) + } + ) + + return Scheduled(promise: promise) { [weak self] in + // NOTE: Documented cancellation procedure indicates + // cancellation is not guaranteed. As such, and to match existing Promise API's, + // using a Task here to avoid pushing async up the software stack. + self?.executor.cancelScheduledJob(withID: jobID) + + // NOTE: NIO Core already fails the promise before calling the cancellation closure, + // so we do NOT try to fail the promise. Also cancellation is not guaranteed, so we + // allow cancellation to silently fail rather than re-negotiating to a throwing API. + } + } + + func closeGracefully() async { + let previous = shutdownState.exchange(ShutdownState.closing.rawValue, ordering: .acquiring) + guard ShutdownState(rawValue: previous) != .closed else { return } + self.cachedSucceededVoidFuture = nil + await executor.clearQueue() + shutdownState.store(ShutdownState.closed.rawValue, ordering: .releasing) + } + + public func next() -> EventLoop { + self + } + public func any() -> EventLoop { + self + } + + /// Moves time forward by specified increment, and runs event loop, causing + /// all pending events either from enqueing or scheduling requirements to run. + func advanceTime(by increment: TimeAmount) async throws { + try await executor.advanceTime(by: increment) + } + + func advanceTime(to deadline: NIODeadline) async throws { + try await executor.advanceTime(to: deadline) + } + + func run() async { + await executor.run() + } + + #if canImport(Dispatch) + public func shutdownGracefully( + queue: DispatchQueue, _ callback: @escaping @Sendable (Error?) -> Void + ) { + if MultiThreadedEventLoopGroup._GroupContextKey.isFromMultiThreadedEventLoopGroup { + Task { + await closeGracefully() + queue.async { callback(nil) } + } + } else { + // Bypassing the group shutdown and calling an event loop + // shutdown directly is considered api-misuse + callback(EventLoopError.unsupportedOperation) + } + } + #endif + + public func syncShutdownGracefully() throws { + // The test AsyncEventLoopTests.testIllegalCloseOfEventLoopFails requires + // this implementation to throw an error, because uses should call shutdown on + // MultiThreadedEventLoopGroup instead of calling it directly on the loop. + throw EventLoopError.unsupportedOperation + } + + public func shutdownGracefully() async throws { + await self.closeGracefully() + } + + #if !canImport(Dispatch) + public func _preconditionSafeToSyncShutdown(file: StaticString, line: UInt) { + assertionFailure("Synchronous shutdown API's are not currently supported by AsyncEventLoop") + } + #endif + + @preconcurrency + private func _unsafelyRewrapScheduled( + _ scheduled: Scheduled<_UncheckedSendable> + ) -> Scheduled { + let promise = self.makePromise(of: T.self) + scheduled.futureResult.whenComplete { result in + switch result { + case .success(let boxed): + promise.assumeIsolatedUnsafeUnchecked().succeed(boxed.value) + case .failure(let error): + promise.fail(error) + } + } + return Scheduled(promise: promise) { + scheduled.cancel() + } + } + + /// This is a shim used to support older protocol-required API's without compiler warnings, and provide more modern + /// concurrency-ready overloads. + private struct _UncheckedSendable: @unchecked Sendable { + let value: T + init(_ value: T) { self.value = value } + } +} diff --git a/Sources/NIOAsyncRuntime/MultiThreadedEventLoopGroup.swift b/Sources/NIOAsyncRuntime/MultiThreadedEventLoopGroup.swift new file mode 100644 index 0000000..d632806 --- /dev/null +++ b/Sources/NIOAsyncRuntime/MultiThreadedEventLoopGroup.swift @@ -0,0 +1,81 @@ +//===----------------------------------------------------------------------===// +// +// Copyright (c) 2025 PassiveLogic, Inc. +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import class Atomics.ManagedAtomic +import protocol NIOCore.EventLoop +import protocol NIOCore.EventLoopGroup +import struct NIOCore.EventLoopIterator +import enum NIOCore.System + +#if canImport(Dispatch) + import Dispatch +#endif + +/// An `EventLoopGroup` which will create multiple `EventLoop`s, each tied to its own task pool. +/// +/// This implementation relies on SwiftConcurrency and does not directly instantiate any actual threads. +/// This reduces risk and fallout if the event loop group is not shutdown gracefully, compared to the NIOPosix +/// `MultiThreadedEventLoopGroup` implementation. +@available(macOS 13, *) +public final class MultiThreadedEventLoopGroup: EventLoopGroup, @unchecked Sendable { + /// Task‑local key that stores a boolean that helps AsyncEventLoop know + /// if shutdown calls are being made from this event loop group, or external + /// + /// Safety mechanisms prevent calling shutdown direclty on a loop. + @available(macOS 13, *) + enum _GroupContextKey { @TaskLocal static var isFromMultiThreadedEventLoopGroup: Bool = false } + + private let loops: [AsyncEventLoop] + private let counter = ManagedAtomic(0) + + public init(numberOfThreads: Int = System.coreCount) { + precondition(numberOfThreads > 0, "thread count must be positive") + self.loops = (0.. EventLoop { + loops[counter.loadThenWrappingIncrement(ordering: .sequentiallyConsistent) % loops.count] + } + + public func any() -> EventLoop { loops[0] } + + public func makeIterator() -> NIOCore.EventLoopIterator { + .init(self.loops.map { $0 as EventLoop }) + } + + #if canImport(Dispatch) + public func shutdownGracefully( + queue: DispatchQueue, _ onCompletion: @escaping @Sendable (Error?) -> Void + ) { + Task { + await _GroupContextKey.$isFromMultiThreadedEventLoopGroup.withValue(true) { + for loop in loops { await loop.closeGracefully() } + + queue.async { + onCompletion(nil) + } + } + } + } + #endif // canImport(Dispatch) + + public static let singleton = MultiThreadedEventLoopGroup() + + #if !canImport(Dispatch) + public func _preconditionSafeToSyncShutdown(file: StaticString, line: UInt) { + assertionFailure( + "Synchronous shutdown API's are not currently supported by MultiThreadedEventLoopGroup") + } + #endif +}