diff --git a/Sources/NIOCore/EventLoopFuture+Deprecated.swift b/Sources/NIOCore/EventLoopFuture+Deprecated.swift index 75cfb07162..be7204dea9 100644 --- a/Sources/NIOCore/EventLoopFuture+Deprecated.swift +++ b/Sources/NIOCore/EventLoopFuture+Deprecated.swift @@ -15,7 +15,7 @@ extension EventLoopFuture { @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMap(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Value) -> EventLoopFuture) -> EventLoopFuture { + public func flatMap(file: StaticString = #fileID, line: UInt = #line, _ callback: @Sendable @escaping (Value) -> EventLoopFuture) -> EventLoopFuture { return self.flatMap(callback) } @@ -23,25 +23,25 @@ extension EventLoopFuture { @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") public func flatMapThrowing(file: StaticString = #fileID, line: UInt = #line, - _ callback: @escaping (Value) throws -> NewValue) -> EventLoopFuture { + _ callback: @Sendable @escaping (Value) throws -> NewValue) -> EventLoopFuture { return self.flatMapThrowing(callback) } @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMapErrorThrowing(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Error) throws -> Value) -> EventLoopFuture { + public func flatMapErrorThrowing(file: StaticString = #fileID, line: UInt = #line, _ callback: @Sendable @escaping (Error) throws -> Value) -> EventLoopFuture where Value: Sendable { return self.flatMapErrorThrowing(callback) } @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func map(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Value) -> (NewValue)) -> EventLoopFuture { + public func map(file: StaticString = #fileID, line: UInt = #line, _ callback: @Sendable @escaping (Value) -> (NewValue)) -> EventLoopFuture { return self.map(callback) } @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMapError(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Error) -> EventLoopFuture) -> EventLoopFuture { + public func flatMapError(file: StaticString = #fileID, line: UInt = #line, _ callback: @Sendable @escaping (Error) -> EventLoopFuture) -> EventLoopFuture where Value: Sendable { return self.flatMapError(callback) } @@ -49,19 +49,19 @@ extension EventLoopFuture { @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") public func flatMapResult(file: StaticString = #fileID, line: UInt = #line, - _ body: @escaping (Value) -> Result) -> EventLoopFuture { + _ body: @Sendable @escaping (Value) -> Result) -> EventLoopFuture { return self.flatMapResult(body) } @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func recover(file: StaticString = #fileID, line: UInt = #line, _ callback: @escaping (Error) -> Value) -> EventLoopFuture { + public func recover(file: StaticString = #fileID, line: UInt = #line, _ callback: @Sendable @escaping (Error) -> Value) -> EventLoopFuture { return self.recover(callback) } @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func and(_ other: EventLoopFuture, + public func and(_ other: EventLoopFuture, file: StaticString = #fileID, line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)> { return self.and(other) @@ -69,7 +69,7 @@ extension EventLoopFuture { @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func and(value: OtherValue, + public func and(value: OtherValue, file: StaticString = #fileID, line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)> { return self.and(value: value) diff --git a/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift b/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift index 4d105fa6e5..f0f5321de7 100644 --- a/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift +++ b/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift @@ -41,25 +41,12 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func flatMapWithEventLoop(_ callback: @escaping @Sendable (Value, EventLoop) -> EventLoopFuture) -> EventLoopFuture { - let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) - self._whenComplete { [eventLoop = self.eventLoop] in - switch self._value! { - case .success(let t): - let futureU = callback(t, eventLoop) - if futureU.eventLoop.inEventLoop { - return futureU._addCallback { - next._setValue(value: futureU._value!) - } - } else { - futureU.cascade(to: next) - return CallbackList() - } - case .failure(let error): - return next._setValue(value: .failure(error)) - } - } - return next.futureResult + public func flatMapWithEventLoop(_ callback: @escaping @Sendable (Value, EventLoop) -> EventLoopFuture) -> EventLoopFuture { + // Is this the same thing and still fast? + let eventLoop = self.eventLoop + return self.flatMap { + callback($0, eventLoop) + }.hop(to: self.eventLoop) } /// When the current `EventLoopFuture` is in an error state, run the provided callback, which @@ -75,20 +62,22 @@ extension EventLoopFuture { /// - returns: A future that will receive the recovered value. @inlinable @preconcurrency - public func flatMapErrorWithEventLoop(_ callback: @escaping @Sendable (Error, EventLoop) -> EventLoopFuture) -> EventLoopFuture { + public func flatMapErrorWithEventLoop(_ callback: @escaping @Sendable (Error, EventLoop) -> EventLoopFuture) -> EventLoopFuture where Value: Sendable { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) + let unsafeSelf = UnsafeTransfer(self) + let unsafeNext = UnsafeTransfer(next) self._whenComplete { [eventLoop = self.eventLoop] in - switch self._value! { + switch unsafeSelf.wrappedValue._value! { case .success(let t): - return next._setValue(value: .success(t)) + return unsafeNext.wrappedValue._setValue(value: .success(t)) case .failure(let e): let t = callback(e, eventLoop) if t.eventLoop.inEventLoop { return t._addCallback { - next._setValue(value: t._value!) + unsafeNext.wrappedValue._setValue(value: t._value!) } } else { - t.cascade(to: next) + t.cascade(to: unsafeNext.wrappedValue) return CallbackList() } } @@ -113,16 +102,15 @@ extension EventLoopFuture { /// - with: A function that will be used to fold the values of two `EventLoopFuture`s and return a new value wrapped in an `EventLoopFuture`. /// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`. @inlinable - @preconcurrency public func foldWithEventLoop( _ futures: [EventLoopFuture], with combiningFunction: @escaping @Sendable (Value, OtherValue, EventLoop) -> EventLoopFuture - ) -> EventLoopFuture { - func fold0(eventLoop: EventLoop) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable, OtherValue: Sendable { // This is a breaking change + let fold0: @Sendable (EventLoop) -> EventLoopFuture = { (eventLoop: EventLoop) in let body = futures.reduce(self) { (f1: EventLoopFuture, f2: EventLoopFuture) -> EventLoopFuture in let newFuture = f1.and(f2).flatMap { (args: (Value, OtherValue)) -> EventLoopFuture in let (f1Value, f2Value) = args - self.eventLoop.assertInEventLoop() + eventLoop.assertInEventLoop() return combiningFunction(f1Value, f2Value, eventLoop) } assert(newFuture.eventLoop === self.eventLoop) @@ -132,11 +120,11 @@ extension EventLoopFuture { } if self.eventLoop.inEventLoop { - return fold0(eventLoop: self.eventLoop) + return fold0(self.eventLoop) } else { let promise = self.eventLoop.makePromise(of: Value.self) self.eventLoop.execute { [eventLoop = self.eventLoop] in - fold0(eventLoop: eventLoop).cascade(to: promise) + fold0(eventLoop).cascade(to: promise) } return promise.futureResult } diff --git a/Sources/NIOCore/EventLoopFuture.swift b/Sources/NIOCore/EventLoopFuture.swift index 6c79a08ef9..a4f3af8921 100644 --- a/Sources/NIOCore/EventLoopFuture.swift +++ b/Sources/NIOCore/EventLoopFuture.swift @@ -206,7 +206,7 @@ public struct EventLoopPromise { /// - future: The future whose value will be used to succeed or fail this promise. /// - seealso: `EventLoopFuture.cascade(to:)` @inlinable - public func completeWith(_ future: EventLoopFuture) { + public func completeWith(_ future: EventLoopFuture) where Value: Sendable { future.cascade(to: self) } @@ -225,7 +225,7 @@ public struct EventLoopPromise { /// - parameters: /// - result: The result which will be used to succeed or fail this promise. @inlinable - public func completeWith(_ result: Result) { + public func completeWith(_ result: Result) where Value: Sendable { self._resolve(value: result) } @@ -242,8 +242,10 @@ public struct EventLoopPromise { if self.futureResult.eventLoop.inEventLoop { self._setValue(value: value)._run() } else { + let value = UnsafeTransfer(value) + let unsafeSelf = UnsafeTransfer(self) self.futureResult.eventLoop.execute { - self._setValue(value: value)._run() + unsafeSelf.wrappedValue._setValue(value: value.wrappedValue)._run() } } } @@ -477,7 +479,7 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func flatMap(_ callback: @escaping @Sendable (Value) -> EventLoopFuture) -> EventLoopFuture { + public func flatMap(_ callback: @escaping @Sendable (Value) -> EventLoopFuture) -> EventLoopFuture { self._flatMap(callback) } @usableFromInline typealias FlatMapCallback = @Sendable (Value) -> EventLoopFuture @@ -517,10 +519,11 @@ extension EventLoopFuture { #endif @inlinable - func _flatMap(_ callback: @escaping FlatMapCallback) -> EventLoopFuture { + func _flatMap(_ callback: @escaping FlatMapCallback) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) + let unsafeSelf = UnsafeTransfer(self) self._whenComplete { - switch self._value! { + switch unsafeSelf.wrappedValue._value! { case .success(let t): let futureU = callback(t) if futureU.eventLoop.inEventLoop { @@ -584,17 +587,19 @@ extension EventLoopFuture { @inlinable func _flatMapThrowing(_ callback: @escaping FlatMapThrowingCallback) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) + let unsafeNext = UnsafeTransfer(next) + let unsafeSelf = UnsafeTransfer(self) self._whenComplete { - switch self._value! { + switch unsafeSelf.wrappedValue._value! { case .success(let t): do { let r = try callback(t) - return next._setValue(value: .success(r)) + return unsafeNext.wrappedValue._setValue(value: .success(r)) } catch { - return next._setValue(value: .failure(error)) + return unsafeNext.wrappedValue._setValue(value: .failure(error)) } case .failure(let e): - return next._setValue(value: .failure(e)) + return unsafeNext.wrappedValue._setValue(value: .failure(e)) } } return next.futureResult @@ -617,7 +622,7 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value or a rethrown error. @inlinable @preconcurrency - public func flatMapErrorThrowing(_ callback: @escaping @Sendable (Error) throws -> Value) -> EventLoopFuture { + public func flatMapErrorThrowing(_ callback: @escaping @Sendable (Error) throws -> Value) -> EventLoopFuture where Value: Sendable { self._flatMapErrorThrowing(callback) } @usableFromInline typealias FlatMapErrorThrowingCallback = @Sendable (Error) throws -> Value @@ -644,7 +649,7 @@ extension EventLoopFuture { #endif @inlinable - func _flatMapErrorThrowing(_ callback: @escaping FlatMapErrorThrowingCallback) -> EventLoopFuture { + func _flatMapErrorThrowing(_ callback: @escaping FlatMapErrorThrowingCallback) -> EventLoopFuture where Value: Sendable { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -740,8 +745,10 @@ extension EventLoopFuture { return self as! EventLoopFuture } else { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) + let unsafeNext = UnsafeTransfer(next) + let unsafeSelf = UnsafeTransfer(self) self._whenComplete { - return next._setValue(value: self._value!.map(callback)) + return unsafeNext.wrappedValue._setValue(value: unsafeSelf.wrappedValue._value!.map(callback)) } return next.futureResult } @@ -761,7 +768,7 @@ extension EventLoopFuture { /// - returns: A future that will receive the recovered value. @inlinable @preconcurrency - public func flatMapError(_ callback: @escaping @Sendable (Error) -> EventLoopFuture) -> EventLoopFuture { + public func flatMapError(_ callback: @escaping @Sendable (Error) -> EventLoopFuture) -> EventLoopFuture where Value: Sendable { self._flatMapError(callback) } @usableFromInline typealias FlatMapErrorCallback = @Sendable (Error) -> EventLoopFuture @@ -785,17 +792,19 @@ extension EventLoopFuture { #endif @inlinable - func _flatMapError(_ callback: @escaping FlatMapErrorCallback) -> EventLoopFuture { + func _flatMapError(_ callback: @escaping FlatMapErrorCallback) -> EventLoopFuture where Value: Sendable { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) + let unsafeNext = UnsafeTransfer(next) + let unsafeSelf = UnsafeTransfer(self) self._whenComplete { - switch self._value! { + switch unsafeSelf.wrappedValue._value! { case .success(let t): - return next._setValue(value: .success(t)) + return unsafeNext.wrappedValue._setValue(value: .success(t)) case .failure(let e): let t = callback(e) if t.eventLoop.inEventLoop { return t._addCallback { - next._setValue(value: t._value!) + unsafeNext.wrappedValue._setValue(value: t._value!) } } else { t.cascade(to: next) @@ -850,17 +859,19 @@ extension EventLoopFuture { @inlinable func _flatMapResult(_ body: @escaping FlatMapResultCallback) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) + let unsafeNext = UnsafeTransfer(next) + let unsafeSelf = UnsafeTransfer(self) self._whenComplete { - switch self._value! { + switch unsafeSelf.wrappedValue._value! { case .success(let value): switch body(value) { case .success(let newValue): - return next._setValue(value: .success(newValue)) + return unsafeNext.wrappedValue._setValue(value: .success(newValue)) case .failure(let error): - return next._setValue(value: .failure(error)) + return unsafeNext.wrappedValue._setValue(value: .failure(error)) } case .failure(let e): - return next._setValue(value: .failure(e)) + return unsafeNext.wrappedValue._setValue(value: .failure(e)) } } return next.futureResult @@ -908,12 +919,14 @@ extension EventLoopFuture { @inlinable func _recover(_ callback: @escaping RecoverCallback) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) + let unsafeNext = UnsafeTransfer(next) + let unsafeSelf = UnsafeTransfer(self) self._whenComplete { - switch self._value! { + switch unsafeSelf.wrappedValue._value! { case .success(let t): - return next._setValue(value: .success(t)) + return unsafeNext.wrappedValue._setValue(value: .success(t)) case .failure(let e): - return next._setValue(value: .success(callback(e))) + return unsafeNext.wrappedValue._setValue(value: .success(callback(e))) } } return next.futureResult @@ -958,8 +971,9 @@ extension EventLoopFuture { if self.eventLoop.inEventLoop { self._addCallback(callback)._run() } else { + let unsafeSelf = UnsafeTransfer(self) self.eventLoop.execute { - self._addCallback(callback)._run() + unsafeSelf.wrappedValue._addCallback(callback)._run() } } } @@ -1001,8 +1015,9 @@ extension EventLoopFuture { @inlinable func _whenSuccess(_ callback: @escaping WhenSuccessCallback) { + let unsafeSelf = UnsafeTransfer(self) self._whenComplete { - if case .success(let t) = self._value! { + if case .success(let t) = unsafeSelf.wrappedValue._value! { callback(t) } return CallbackList() @@ -1046,8 +1061,9 @@ extension EventLoopFuture { @inlinable func _whenFailure(_ callback: @escaping WhenFailureCallback) { + let unsafeSelf = UnsafeTransfer(self) self._whenComplete { - if case .failure(let e) = self._value! { + if case .failure(let e) = unsafeSelf.wrappedValue._value! { callback(e) } return CallbackList() @@ -1080,8 +1096,9 @@ extension EventLoopFuture { #endif @inlinable func _publicWhenComplete(_ callback: @escaping WhenCompleteCallback) { + let unsafeSelf = UnsafeTransfer(self) self._whenComplete { - callback(self._value!) + callback(unsafeSelf.wrappedValue._value!) return CallbackList() } } @@ -1108,18 +1125,20 @@ extension EventLoopFuture { /// of results. If either one fails, the combined `EventLoopFuture` will fail with /// the first error encountered. @inlinable - public func and(_ other: EventLoopFuture) -> EventLoopFuture<(Value, OtherValue)> { + public func and(_ other: EventLoopFuture) -> EventLoopFuture<(Value, OtherValue)> { let promise = EventLoopPromise<(Value, OtherValue)>.makeUnleakablePromise(eventLoop: self.eventLoop) let box: UnsafeMutableTransferBox<(t:Value?, u: OtherValue?)> = .init((nil, nil)) + let unsafeSelf = UnsafeTransfer(self) + let unsafePromise = UnsafeTransfer(promise) assert(self.eventLoop === promise.futureResult.eventLoop) self._whenComplete { () -> CallbackList in - switch self._value! { + switch unsafeSelf.wrappedValue._value! { case .failure(let error): - return promise._setValue(value: .failure(error)) + return unsafePromise.wrappedValue._setValue(value: .failure(error)) case .success(let t): if let u = box.wrappedValue.u { - return promise._setValue(value: .success((t, u))) + return unsafePromise.wrappedValue._setValue(value: .success((t, u))) } else { box.wrappedValue.t = t } @@ -1129,13 +1148,13 @@ extension EventLoopFuture { let hopOver = other.hop(to: self.eventLoop) hopOver._whenComplete { () -> CallbackList in - self.eventLoop.assertInEventLoop() + unsafeSelf.wrappedValue.eventLoop.assertInEventLoop() switch other._value! { case .failure(let error): - return promise._setValue(value: .failure(error)) + return unsafePromise.wrappedValue._setValue(value: .failure(error)) case .success(let u): if let t = box.wrappedValue.t { - return promise._setValue(value: .success((t, u))) + return unsafePromise.wrappedValue._setValue(value: .success((t, u))) } else { box.wrappedValue.u = u } @@ -1149,7 +1168,7 @@ extension EventLoopFuture { /// Return a new EventLoopFuture that contains this "and" another value. /// This is just syntactic sugar for `future.and(loop.makeSucceedFuture(value))`. @inlinable - public func and(value: OtherValue) -> EventLoopFuture<(Value, OtherValue)> { + public func and(value: OtherValue) -> EventLoopFuture<(Value, OtherValue)> { return self.and(EventLoopFuture(eventLoop: self.eventLoop, value: value)) } } @@ -1179,7 +1198,7 @@ extension EventLoopFuture { /// - Parameter to: The `EventLoopPromise` to fulfill with the results of this future. /// - SeeAlso: `EventLoopPromise.completeWith(_:)` @inlinable - public func cascade(to promise: EventLoopPromise?) { + public func cascade(to promise: EventLoopPromise?) where Value: Sendable { guard let promise = promise else { return } self.whenComplete { result in switch result { @@ -1202,7 +1221,7 @@ extension EventLoopFuture { /// /// - Parameter to: The `EventLoopPromise` to fulfill when a successful result is available. @inlinable - public func cascadeSuccess(to promise: EventLoopPromise?) { + public func cascadeSuccess(to promise: EventLoopPromise?) where Value: Sendable { guard let promise = promise else { return } self.whenSuccess { promise.succeed($0) } } @@ -1214,7 +1233,7 @@ extension EventLoopFuture { /// /// - Parameter to: The `EventLoopPromise` that should fail with the error of this `EventLoopFuture`. @inlinable - public func cascadeFailure(to promise: EventLoopPromise?) { + public func cascadeFailure(to promise: EventLoopPromise?) { guard let promise = promise else { return } self.whenFailure { promise.fail($0) } } @@ -1267,10 +1286,11 @@ extension EventLoopFuture { self.eventLoop._preconditionSafeToWait(file: file, line: line) let v: UnsafeMutableTransferBox?> = .init(nil) + let unsafeSelf = UnsafeTransfer(self) let lock = ConditionLock(value: 0) self._whenComplete { () -> CallbackList in lock.lock() - v.wrappedValue = self._value + v.wrappedValue = unsafeSelf.wrappedValue._value lock.unlock(withValue: 1) return CallbackList() } @@ -1308,10 +1328,10 @@ extension EventLoopFuture { /// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`. @inlinable @preconcurrency - public func fold( + public func fold( _ futures: [EventLoopFuture], with combiningFunction: @escaping @Sendable (Value, OtherValue) -> EventLoopFuture - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { self._fold(futures, with: combiningFunction) } @usableFromInline typealias FoldCallback = @Sendable (Value, OtherValue) -> EventLoopFuture @@ -1343,11 +1363,11 @@ extension EventLoopFuture { #endif @inlinable - func _fold( + func _fold( _ futures: [EventLoopFuture], with combiningFunction: @escaping FoldCallback - ) -> EventLoopFuture { - func fold0() -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { + let fold0: @Sendable () -> EventLoopFuture = { let body = futures.reduce(self) { (f1: EventLoopFuture, f2: EventLoopFuture) -> EventLoopFuture in let newFuture = f1.and(f2).flatMap { (args: (Value, OtherValue)) -> EventLoopFuture in let (f1Value, f2Value) = args @@ -1397,12 +1417,12 @@ extension EventLoopFuture { /// - returns: A new `EventLoopFuture` with the reduced value. @preconcurrency @inlinable - public static func reduce( + public static func reduce( _ initialResult: Value, _ futures: [EventLoopFuture], on eventLoop: EventLoop, _ nextPartialResult: @escaping @Sendable (Value, InputValue) -> Value - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { Self._reduce(initialResult, futures, on: eventLoop, nextPartialResult) } @usableFromInline typealias ReduceCallback = @Sendable (Value, InputValue) -> Value @@ -1439,12 +1459,12 @@ extension EventLoopFuture { #endif @inlinable - static func _reduce( + static func _reduce( _ initialResult: Value, _ futures: [EventLoopFuture], on eventLoop: EventLoop, _ nextPartialResult: @escaping ReduceCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { let f0 = eventLoop.makeSucceededFuture(initialResult) let body = f0.fold(futures) { (t: Value, u: InputValue) -> EventLoopFuture in @@ -1474,12 +1494,12 @@ extension EventLoopFuture { /// - returns: A new `EventLoopFuture` with the combined value. @inlinable @preconcurrency - public static func reduce( + public static func reduce( into initialResult: Value, _ futures: [EventLoopFuture], on eventLoop: EventLoop, _ updateAccumulatingResult: @escaping @Sendable (inout Value, InputValue) -> Void - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { Self._reduce(into: initialResult, futures, on: eventLoop, updateAccumulatingResult) } @usableFromInline typealias ReduceIntoCallback = @Sendable (inout Value, InputValue) -> Void @@ -1514,25 +1534,26 @@ extension EventLoopFuture { #endif @inlinable - static func _reduce( + static func _reduce( into initialResult: Value, _ futures: [EventLoopFuture], on eventLoop: EventLoop, _ updateAccumulatingResult: @escaping ReduceIntoCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { let p0 = eventLoop.makePromise(of: Value.self) - var value: Value = initialResult + + let value = UnsafeMutableTransferBox(initialResult) let f0 = eventLoop.makeSucceededFuture(()) let future = f0.fold(futures) { (_: (), newValue: InputValue) -> EventLoopFuture in eventLoop.assertInEventLoop() - updateAccumulatingResult(&value, newValue) + updateAccumulatingResult(&value.wrappedValue, newValue) return eventLoop.makeSucceededFuture(()) } future.whenSuccess { eventLoop.assertInEventLoop() - p0.succeed(value) + p0.succeed(value.wrappedValue) } future.whenFailure { (error) in eventLoop.assertInEventLoop() @@ -1557,7 +1578,7 @@ extension EventLoopFuture { /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on. /// - Returns: A new `EventLoopFuture` that waits for the other futures to succeed. @inlinable - public static func andAllSucceed(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture { + public static func andAllSucceed(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture where Value: Sendable { let promise = eventLoop.makePromise(of: Void.self) EventLoopFuture.andAllSucceed(futures, promise: promise) return promise.futureResult @@ -1572,7 +1593,7 @@ extension EventLoopFuture { /// - futures: An array of homogenous `EventLoopFutures`s to wait for. /// - promise: The `EventLoopPromise` to complete with the result of this call. @inlinable - public static func andAllSucceed(_ futures: [EventLoopFuture], promise: EventLoopPromise) { + public static func andAllSucceed(_ futures: [EventLoopFuture], promise: EventLoopPromise) where Value: Sendable { let eventLoop = promise.futureResult.eventLoop if eventLoop.inEventLoop { @@ -1592,7 +1613,7 @@ extension EventLoopFuture { /// - futures: An array of homogenous `EventLoopFuture`s to wait on for fulfilled values. /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire. /// - Returns: A new `EventLoopFuture` with all of the values fulfilled by the provided futures. - public static func whenAllSucceed(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture<[Value]> { + public static func whenAllSucceed(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture<[Value]> where Value: Sendable { let promise = eventLoop.makePromise(of: [Value].self) EventLoopFuture.whenAllSucceed(futures, promise: promise) return promise.futureResult @@ -1606,7 +1627,7 @@ extension EventLoopFuture { /// - Parameters: /// - futures: An array of homogenous `EventLoopFutures`s to wait for. /// - promise: The `EventLoopPromise` to complete with the result of this call. - public static func whenAllSucceed(_ futures: [EventLoopFuture], promise: EventLoopPromise<[Value]>) { + public static func whenAllSucceed(_ futures: [EventLoopFuture], promise: EventLoopPromise<[Value]>) where Value: Sendable { let eventLoop = promise.futureResult.eventLoop let reduced = eventLoop.makePromise(of: Void.self) @@ -1652,29 +1673,29 @@ extension EventLoopFuture { /// Once all the futures have succeed, the provided promise will succeed. /// Once any future fails, the provided promise will fail. @inlinable - internal static func _reduceSuccesses0( + internal static func _reduceSuccesses0( _ promise: EventLoopPromise, _ futures: [EventLoopFuture], _ eventLoop: EventLoop, onValue: @escaping ReduceSuccessCallback - ) { + ) where Value: Sendable { eventLoop.assertInEventLoop() - var remainingCount = futures.count + let remainingCount = UnsafeMutableTransferBox(futures.count) // This is stupid - if remainingCount == 0 { + if futures.count == 0 { promise.succeed(()) return } // Sends the result to `onValue` in case of success and succeeds/fails the input promise, if appropriate. - func processResult(_ index: Int, _ result: Result) { + let processResult: @Sendable (Int, Result) -> Void = { index, result in switch result { case .success(let result): onValue(index, result) - remainingCount -= 1 + remainingCount.wrappedValue -= 1 - if remainingCount == 0 { + if remainingCount.wrappedValue == 0 { promise.succeed(()) } case .failure(let error): @@ -1732,12 +1753,13 @@ extension EventLoopFuture { @inlinable public static func andAllComplete(_ futures: [EventLoopFuture], promise: EventLoopPromise) { let eventLoop = promise.futureResult.eventLoop + let voidedFutures = futures.map { $0.map { _ in () } } if eventLoop.inEventLoop { - self._reduceCompletions0(promise, futures, eventLoop, onResult: { _, _ in }) + self._reduceCompletions0(promise, voidedFutures, eventLoop, onResult: { _, _ in }) } else { eventLoop.execute { - self._reduceCompletions0(promise, futures, eventLoop, onResult: { _, _ in }) + self._reduceCompletions0(promise, voidedFutures, eventLoop, onResult: { _, _ in }) } } } @@ -1755,7 +1777,7 @@ extension EventLoopFuture { /// - Returns: A new `EventLoopFuture` with all the results of the provided futures. @inlinable public static func whenAllComplete(_ futures: [EventLoopFuture], - on eventLoop: EventLoop) -> EventLoopFuture<[Result]> { + on eventLoop: EventLoop) -> EventLoopFuture<[Result]> where Value: Sendable { let promise = eventLoop.makePromise(of: [Result].self) EventLoopFuture.whenAllComplete(futures, promise: promise) return promise.futureResult @@ -1769,8 +1791,10 @@ extension EventLoopFuture { /// - futures: An array of homogenous `EventLoopFuture`s to gather results from. /// - promise: The `EventLoopPromise` to complete with the result of the futures. @inlinable - public static func whenAllComplete(_ futures: [EventLoopFuture], - promise: EventLoopPromise<[Result]>) { + public static func whenAllComplete( + _ futures: [EventLoopFuture], + promise: EventLoopPromise<[Result]> + ) where Value: Sendable { let eventLoop = promise.futureResult.eventLoop let reduced = eventLoop.makePromise(of: Void.self) @@ -1820,7 +1844,7 @@ extension EventLoopFuture { /// /// Once all the futures have completed, the provided promise will succeed. @inlinable - internal static func _reduceCompletions0( + internal static func _reduceCompletions0( _ promise: EventLoopPromise, _ futures: [EventLoopFuture], _ eventLoop: EventLoop, @@ -1828,19 +1852,19 @@ extension EventLoopFuture { ) { eventLoop.assertInEventLoop() - var remainingCount = futures.count + let remainingCount = UnsafeMutableTransferBox(futures.count) - if remainingCount == 0 { + if remainingCount.wrappedValue == 0 { promise.succeed(()) return } // Sends the result to `onResult` in case of success and succeeds the input promise, if appropriate. - func processResult(_ index: Int, _ result: Result) { + let processResult: @Sendable (Int, Result) -> Void = { index, result in onResult(index, result) - remainingCount -= 1 + remainingCount.wrappedValue -= 1 - if remainingCount == 0 { + if remainingCount.wrappedValue == 0 { promise.succeed(()) } } @@ -1875,7 +1899,7 @@ extension EventLoopFuture { /// - to: The `EventLoop` that the returned `EventLoopFuture` will run on. /// - returns: An `EventLoopFuture` whose callbacks run on `target` instead of the original loop. @inlinable - public func hop(to target: EventLoop) -> EventLoopFuture { + public func hop(to target: EventLoop) -> EventLoopFuture where Value: Sendable { if target === self.eventLoop { // We're already on that event loop, nothing to do here. Save an allocation. return self @@ -1966,7 +1990,7 @@ extension EventLoopFuture { /// - orReplace: the value of the returned `EventLoopFuture` when then resolved future's value is `Optional.some()`. /// - returns: an new `EventLoopFuture` with new type parameter `NewValue` and the value passed in the `orReplace` parameter. @inlinable - public func unwrap(orReplace replacement: NewValue) -> EventLoopFuture where Value == Optional { + public func unwrap(orReplace replacement: NewValue) -> EventLoopFuture where Value == Optional { return self.map { (value) -> NewValue in guard let value = value else { return replacement @@ -2051,10 +2075,10 @@ extension EventLoopFuture { /// a new `EventLoopFuture`. @inlinable @preconcurrency - public func flatMapBlocking( + public func flatMapBlocking( onto queue: DispatchQueue, _ callbackMayBlock: @escaping @Sendable (Value) throws -> NewValue - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { self._flatMapBlocking(onto: queue, callbackMayBlock) } @usableFromInline typealias FlatMapBlockingCallback = @Sendable (Value) throws -> NewValue @@ -2080,10 +2104,10 @@ extension EventLoopFuture { #endif @inlinable - func _flatMapBlocking( + func _flatMapBlocking( onto queue: DispatchQueue, _ callbackMayBlock: @escaping FlatMapBlockingCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { return self.flatMap { result in queue.asyncWithFuture(eventLoop: self.eventLoop) { try callbackMayBlock(result) } } @@ -2101,7 +2125,7 @@ extension EventLoopFuture { /// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled. /// - callbackMayBlock: The callback that is called with the successful result of the `EventLoopFuture`. @inlinable - public func whenSuccessBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping (Value) -> Void) { + public func whenSuccessBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @Sendable @escaping (Value) -> Void) where Value: Sendable { self.whenSuccess { value in queue.async { callbackMayBlock(value) } } @@ -2161,7 +2185,7 @@ extension EventLoopFuture { /// - callbackMayBlock: The callback that is called when the `EventLoopFuture` is fulfilled. @inlinable @preconcurrency - public func whenCompleteBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping @Sendable (Result) -> Void) { + public func whenCompleteBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping @Sendable (Result) -> Void) where Value: Sendable { self._whenCompleteBlocking(onto: queue, callbackMayBlock) } @usableFromInline typealias WhenCompleteBlocking = @Sendable (Result) -> Void @@ -2180,7 +2204,7 @@ extension EventLoopFuture { #endif @inlinable - func _whenCompleteBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping WhenCompleteBlocking) { + func _whenCompleteBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping WhenCompleteBlocking) where Value: Sendable { self.whenComplete { value in queue.async { callbackMayBlock(value) } }