Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove @preconcurrency from internal EventLoopFuture methods #2135

Merged
merged 10 commits into from
Jun 7, 2022
129 changes: 37 additions & 92 deletions Sources/NIOCore/EventLoopFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -920,22 +920,13 @@ extension EventLoopFuture {
}

#if swift(>=5.7)
@inlinable
@preconcurrency // TODO(davidnadoba): remove @preconcurrency and fix our internal use sites
internal func _addCallback(_ callback: @escaping AddCallbackCallback) -> CallbackList {
self.__addCallback(callback)
}
@usableFromInline typealias AddCallbackCallback = @Sendable () -> CallbackList
#else
@inlinable
internal func _addCallback(_ callback: @escaping AddCallbackCallback) -> CallbackList {
self.__addCallback(callback)
}
@usableFromInline typealias AddCallbackCallback = () -> CallbackList
#endif
/// Add a callback. If there's already a value, invoke it and return the resulting list of new callback functions.
@inlinable
internal func __addCallback(_ callback: @escaping AddCallbackCallback) -> CallbackList {
internal func _addCallback(_ callback: @escaping AddCallbackCallback) -> CallbackList {
self.eventLoop.assertInEventLoop()
if self._value == nil {
self._callbacks.append(callback)
Expand All @@ -945,24 +936,14 @@ extension EventLoopFuture {
}

#if swift(>=5.7)
/// Add a callback. If there's already a value, run as much of the chain as we can.
@inlinable
@preconcurrency // TODO(davidnadoba): remove @preconcurrency and fix our internal use sites
internal func _whenComplete(_ callback: @escaping InternalWhenCompleteCallback) {
self._internalWhenComplete(callback)
}
@usableFromInline typealias InternalWhenCompleteCallback = @Sendable () -> CallbackList
#else
/// Add a callback. If there's already a value, run as much of the chain as we can.
@inlinable
internal func _whenComplete(_ callback: @escaping InternalWhenCompleteCallback) {
self._internalWhenComplete(callback)
}
@usableFromInline typealias InternalWhenCompleteCallback = () -> CallbackList
#endif

/// Add a callback. If there's already a value, run as much of the chain as we can.
@inlinable
internal func _internalWhenComplete(_ callback: @escaping InternalWhenCompleteCallback) {
internal func _whenComplete(_ callback: @escaping InternalWhenCompleteCallback) {
if self.eventLoop.inEventLoop {
self._addCallback(callback)._run()
} else {
Expand Down Expand Up @@ -1118,19 +1099,18 @@ extension EventLoopFuture {
@inlinable
public func and<OtherValue>(_ other: EventLoopFuture<OtherValue>) -> EventLoopFuture<(Value, OtherValue)> {
let promise = EventLoopPromise<(Value, OtherValue)>.makeUnleakablePromise(eventLoop: self.eventLoop)
var tvalue: Value?
var uvalue: OtherValue?
let box: UnsafeMutableTransferBox<(t:Value?, u: OtherValue?)> = .init((nil, nil))

assert(self.eventLoop === promise.futureResult.eventLoop)
self._whenComplete { () -> CallbackList in
switch self._value! {
case .failure(let error):
return promise._setValue(value: .failure(error))
case .success(let t):
if let u = uvalue {
if let u = box.wrappedValue.u {
return promise._setValue(value: .success((t, u)))
} else {
tvalue = t
box.wrappedValue.t = t
}
}
return CallbackList()
Expand All @@ -1143,10 +1123,10 @@ extension EventLoopFuture {
case .failure(let error):
return promise._setValue(value: .failure(error))
case .success(let u):
if let t = tvalue {
if let t = box.wrappedValue.t {
return promise._setValue(value: .success((t, u)))
} else {
uvalue = u
box.wrappedValue.u = u
}
}
return CallbackList()
Expand Down Expand Up @@ -1247,18 +1227,18 @@ extension EventLoopFuture {
public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value {
self.eventLoop._preconditionSafeToWait(file: file, line: line)

var v: Result<Value, Error>? = nil
let v: UnsafeMutableTransferBox<Result<Value, Error>?> = .init(nil)
let lock = ConditionLock(value: 0)
self._whenComplete { () -> CallbackList in
lock.lock()
v = self._value
v.wrappedValue = self._value
lock.unlock(withValue: 1)
return CallbackList()
}
lock.lock(whenValue: 1)
lock.unlock()

switch(v!) {
switch(v.wrappedValue!) {
case .success(let result):
return result
case .failure(let error):
Expand Down Expand Up @@ -1591,10 +1571,16 @@ extension EventLoopFuture {
let eventLoop = promise.futureResult.eventLoop
let reduced = eventLoop.makePromise(of: Void.self)

var results: [Value?] = .init(repeating: nil, count: futures.count)
let results: UnsafeMutableTransferBox<[Value?]> = .init(.init(repeating: nil, count: futures.count))
#if swift(>=5.7)
let callback = { @Sendable (index: Int, result: Value) in
results.wrappedValue[index] = result
}
#else
let callback = { (index: Int, result: Value) in
results[index] = result
results.wrappedValue[index] = result
}
#endif

if eventLoop.inEventLoop {
self._reduceSuccesses0(reduced, futures, eventLoop, onValue: callback)
Expand All @@ -1608,50 +1594,26 @@ extension EventLoopFuture {
switch result {
case .success:
// verify that all operations have been completed
assert(!results.contains(where: { $0 == nil }))
promise.succeed(results.map { $0! })
assert(!results.wrappedValue.contains(where: { $0 == nil }))
promise.succeed(results.wrappedValue.map { $0! })
case .failure(let error):
promise.fail(error)
}
}
}

#if swift(>=5.7)
/// Loops through the futures array and attaches callbacks to execute `onValue` on the provided `EventLoop` when
/// they succeed. The `onValue` will receive the index of the future that fulfilled the provided `Result`.
///
/// Once all the futures have succeed, the provided promise will succeed.
/// Once any future fails, the provided promise will fail.
@inlinable
@preconcurrency // TODO(davidnadoba): remove @preconcurrency and fix our internal use sites
internal static func _reduceSuccesses0<InputValue>(
_ promise: EventLoopPromise<Void>,
_ futures: [EventLoopFuture<InputValue>],
_ eventLoop: EventLoop,
onValue: @escaping @Sendable (Int, InputValue) -> Void
) {
Self.__reduceSuccesses0(promise, futures, eventLoop, onValue: onValue)
}
@usableFromInline typealias ReduceSuccessCallback<InputValue> = @Sendable (Int, InputValue) -> Void
#else
@usableFromInline typealias ReduceSuccessCallback<InputValue> = (Int, InputValue) -> Void
#endif
/// Loops through the futures array and attaches callbacks to execute `onValue` on the provided `EventLoop` when
/// they succeed. The `onValue` will receive the index of the future that fulfilled the provided `Result`.
///
/// Once all the futures have succeed, the provided promise will succeed.
/// Once any future fails, the provided promise will fail.
@inlinable
internal static func _reduceSuccesses0<InputValue>(
_ promise: EventLoopPromise<Void>,
_ futures: [EventLoopFuture<InputValue>],
_ eventLoop: EventLoop,
onValue: @escaping (Int, InputValue) -> Void
) {
Self.__reduceSuccesses0(promise, futures, eventLoop, onValue: onValue)
}
@usableFromInline typealias ReduceSuccessCallback<InputValue> = (Int, InputValue) -> Void
#endif
@inlinable
internal static func __reduceSuccesses0<InputValue>(
_ promise: EventLoopPromise<Void>,
_ futures: [EventLoopFuture<InputValue>],
_ eventLoop: EventLoop,
Expand Down Expand Up @@ -1772,11 +1734,17 @@ extension EventLoopFuture {
promise: EventLoopPromise<[Result<Value, Error>]>) {
let eventLoop = promise.futureResult.eventLoop
let reduced = eventLoop.makePromise(of: Void.self)

var results: [Result<Value, Error>] = .init(repeating: .failure(OperationPlaceholderError()), count: futures.count)

let results: UnsafeMutableTransferBox<[Result<Value, Error>]> = .init(.init(repeating: .failure(OperationPlaceholderError()), count: futures.count))
#if swift(>=5.7)
let callback = { @Sendable (index: Int, result: Result<Value, Error>) in
results.wrappedValue[index] = result
}
#else
let callback = { (index: Int, result: Result<Value, Error>) in
results[index] = result
results.wrappedValue[index] = result
}
#endif

if eventLoop.inEventLoop {
self._reduceCompletions0(reduced, futures, eventLoop, onResult: callback)
Expand All @@ -1790,11 +1758,11 @@ extension EventLoopFuture {
switch result {
case .success:
// verify that all operations have been completed
assert(!results.contains(where: {
assert(!results.wrappedValue.contains(where: {
guard case let .failure(error) = $0 else { return false }
return error is OperationPlaceholderError
}))
promise.succeed(results)
promise.succeed(results.wrappedValue)

case .failure(let error):
promise.fail(error)
Expand All @@ -1803,40 +1771,17 @@ extension EventLoopFuture {
}

#if swift(>=5.7)
/// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when
/// they complete. The `onResult` will receive the index of the future that fulfilled the provided `Result`.
///
/// Once all the futures have completed, the provided promise will succeed.
@inlinable
@preconcurrency // TODO(davidnadoba): remove @preconcurrency and fix our internal use sites
internal static func _reduceCompletions0<InputValue>(
_ promise: EventLoopPromise<Void>,
_ futures: [EventLoopFuture<InputValue>],
_ eventLoop: EventLoop,
onResult: @escaping @Sendable (Int, Result<InputValue, Error>) -> Void
) {
Self.__reduceCompletions0(promise, futures, eventLoop, onResult: onResult)
}
@usableFromInline typealias ReduceCompletions<InputValue> = @Sendable (Int, Result<InputValue, Error>) -> Void
#else
@usableFromInline typealias ReduceCompletions<InputValue> = (Int, Result<InputValue, Error>) -> Void
#endif

/// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when
/// they complete. The `onResult` will receive the index of the future that fulfilled the provided `Result`.
///
/// Once all the futures have completed, the provided promise will succeed.
@inlinable
internal static func _reduceCompletions0<InputValue>(
_ promise: EventLoopPromise<Void>,
_ futures: [EventLoopFuture<InputValue>],
_ eventLoop: EventLoop,
onResult: @escaping (Int, Result<InputValue, Error>) -> Void
) {
Self.__reduceCompletions0(promise, futures, eventLoop, onResult: onResult)
}
@usableFromInline typealias ReduceCompletions<InputValue> = (Int, Result<InputValue, Error>) -> Void
#endif

@inlinable
internal static func __reduceCompletions0<InputValue>(
_ promise: EventLoopPromise<Void>,
_ futures: [EventLoopFuture<InputValue>],
_ eventLoop: EventLoop,
Expand Down
30 changes: 30 additions & 0 deletions Sources/NIOCore/NIOSendable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,33 @@ struct UnsafeTransfer<Wrapped>: @unchecked Sendable {
extension UnsafeTransfer: Equatable where Wrapped: Equatable {}
extension UnsafeTransfer: Hashable where Wrapped: Hashable {}
#endif

#if swift(>=5.5) && canImport(_Concurrency)
/// ``UnsafeMutableTransferBox`` can be used to make non-`Sendable` values `Sendable` and mutable.
/// It can be used to capture local mutable values in a `@Sendable` closure and mutate them from within the closure.
/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler and does not add any synchronisation.
@usableFromInline
final class UnsafeMutableTransferBox<Wrapped>: @unchecked Sendable {
@usableFromInline
var wrappedValue: Wrapped

@inlinable
init(_ wrappedValue: Wrapped) {
self.wrappedValue = wrappedValue
}
}
#else
/// ``UnsafeMutableTransferBox`` can be used to make non-`Sendable` values `Sendable` and mutable.
/// It can be used to capture local mutable values in a `@Sendable` closure and mutate them from within the closure.
/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler and does not add any synchronisation.
@usableFromInline
final class UnsafeMutableTransferBox<Wrapped> {
@usableFromInline
var wrappedValue: Wrapped

@inlinable
init(_ wrappedValue: Wrapped) {
self.wrappedValue = wrappedValue
}
}
#endif
2 changes: 1 addition & 1 deletion docker/docker-compose.1804.54.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ services:
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=3050
- MAX_ALLOCS_ALLOWED_execute_hop_10000_tasks=0
- MAX_ALLOCS_ALLOWED_future_erase_result=4050
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=60050
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=56050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form=700050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form_trimming_whitespace=700050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form_trimming_whitespace_from_long_string=700050
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.2004.55.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ services:
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=3050
- MAX_ALLOCS_ALLOWED_execute_hop_10000_tasks=0
- MAX_ALLOCS_ALLOWED_future_erase_result=4050
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=60050
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=56050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form=700050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form_trimming_whitespace=700050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form_trimming_whitespace_from_long_string=700050
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.2004.56.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ services:
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=3050
- MAX_ALLOCS_ALLOWED_execute_hop_10000_tasks=0
- MAX_ALLOCS_ALLOWED_future_erase_result=4050
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=59050
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=55050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form=700050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form_trimming_whitespace=700050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form_trimming_whitespace_from_long_string=700050
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.2004.57.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ services:
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=3050
- MAX_ALLOCS_ALLOWED_execute_hop_10000_tasks=0
- MAX_ALLOCS_ALLOWED_future_erase_result=4050
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=59050
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=55050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form=700050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form_trimming_whitespace=700050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form_trimming_whitespace_from_long_string=700050
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.2004.main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ services:
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=3050
- MAX_ALLOCS_ALLOWED_execute_hop_10000_tasks=0
- MAX_ALLOCS_ALLOWED_future_erase_result=4050
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=59050
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=55050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form=700050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form_trimming_whitespace=700050
- MAX_ALLOCS_ALLOWED_get_100000_headers_canonical_form_trimming_whitespace_from_long_string=700050
Expand Down