Permalink
Browse files

Rename `ELF.andAll(_:eventLoop:)` to `andAllSucceed(_🔛)` and add `ELF…

….andAllComplete` (#803)

Motiviation:

After adding `ELF.whenAllComplete` the concept of "fail fast" and "fail slow" for reducing an array of future results together was introduced.

This commit adds that concepts with the `andAll* methods that act as simple completion notifications.

Modifications:

Rename `EventLoopFuture.andAll(_:eventLoop:)` to `andAllSucceed(_🔛)` to denote its "fail fast" nature, and to match Swift API guidelines.

Add new `EventLoopFuture.andAllComplete(_🔛)` for a "fail slow" companion.

Shift implementation of `whenAllComplete(_🔛)` to be usable without unnecessary allocations in `andAllComplete`

Result:

EventLoopFuture now has two methods for "flattening" arrays of EventLoopFuture into a single notification ELF
  • Loading branch information...
Mordil authored and Lukasa committed Feb 6, 2019
1 parent c6c137c commit 06fb45d07e87f580daa48128deef9c003a135eb2
@@ -903,8 +903,7 @@ extension ChannelPipeline {
if first {
handlers = handlers.reversed()
}

return EventLoopFuture<Void>.andAll(handlers.map { add(handler: $0, first: first) }, eventLoop: eventLoop)
return .andAllSucceed(handlers.map { add(handler: $0, first: first) }, on: eventLoop)
}

/// Adds the provided channel handlers to the pipeline in the order given, taking account
@@ -880,22 +880,6 @@ extension EventLoopFuture {
}

extension EventLoopFuture {
/// Returns a new `EventLoopFuture` that fires only when all the provided futures complete.
///
/// This extension is only available when you have a collection of `EventLoopFuture`s that do not provide
/// result data: that is, they are completion notifiers. In this case, you can wait for all of them. The
/// returned `EventLoopFuture` will fail as soon as any of the futures fails: otherwise, it will succeed
/// only when all of them do.
///
/// - parameters:
/// - futures: An array of `EventLoopFuture<Void>` to wait for.
/// - eventLoop: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire.
/// - returns: A new `EventLoopFuture`.
public static func andAll(_ futures: [EventLoopFuture<Void>], eventLoop: EventLoop) -> EventLoopFuture<Void> {
let body = EventLoopFuture<Void>.reduce((), futures, eventLoop: eventLoop) { (_: (), _: ()) in }
return body
}

/// Returns a new `EventLoopFuture` that fires only when all the provided futures complete.
/// The new `EventLoopFuture` contains the result of reducing the `initialResult` with the
/// values of the `[EventLoopFuture<NewValue>]`.
@@ -997,9 +981,23 @@ extension EventLoopFuture {
}
}

// MARK: whenAll
// "fail fast" reduce
extension EventLoopFuture {
/// Returns a new `EventLoopFuture` that succeeds only if all of the provided futures succeed.
///
/// This method acts as a successful completion notifier - values fulfilled by each future are discarded.
///
/// The returned `EventLoopFuture` fails as soon as any of the provided futures fail.
///
/// If it is desired to always succeed, regardless of failures, use `andAllComplete` instead.
/// - Parameters:
/// - futures: An array of homogenous `EventLoopFutures`s to wait for.
/// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on.
/// - Returns: A new `EventLoopFuture` that waits for the other futures to succeed.
public static func andAllSucceed(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<Void> {
return .reduce((), futures, eventLoop: eventLoop) { (_: (), _: Value) in }
}

/// Returns a new `EventLoopFuture` that succeeds only if all of the provided futures succeed.
/// The new `EventLoopFuture` will contain all of the values fulfilled by the futures.
///
@@ -1011,6 +1009,33 @@ extension EventLoopFuture {
public static func whenAllSucceed(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<[Value]> {
return .reduce(into: [], futures, eventLoop: eventLoop) { (results, value) in results.append(value) }
}
}

// "fail slow" reduce
extension EventLoopFuture {
/// Returns a new `EventLoopFuture` that succeeds when all of the provided `EventLoopFuture`s complete.
///
/// The returned `EventLoopFuture` always succeeds, acting as a completion notification.
/// Values fulfilled by each future are discarded.
///
/// If the results are needed, use `whenAllComplete` instead.
/// - Parameters:
/// - futures: An array of homogenous `EventLoopFuture`s to wait for.
/// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on.
/// - Returns: A new `EventLoopFuture` that succeeds after all futures complete.
public static func andAllComplete(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<Void> {
let promise = eventLoop.makePromise(of: Void.self)

if eventLoop.inEventLoop {
_reduceCompletions0(promise, futures, eventLoop, onResult: { _, _ in })
} else {
eventLoop.execute {
_reduceCompletions0(promise, futures, eventLoop, onResult: { _, _ in })
}
}

return promise.futureResult
}

/// Returns a new `EventLoopFuture` that succeeds when all of the provided `EventLoopFuture`s complete.
/// The new `EventLoopFuture` will contain an array of results, maintaining ordering for each of the `EventLoopFuture`s.
@@ -1025,44 +1050,55 @@ extension EventLoopFuture {
/// - Returns: A new `EventLoopFuture` with all the results of the provided futures.
public static func whenAllComplete(_ futures: [EventLoopFuture<Value>],
on eventLoop: EventLoop) -> EventLoopFuture<[Result<Value, Error>]> {
let promise = eventLoop.makePromise(of: [Result<Value, Error>].self)
let promise = eventLoop.makePromise(of: Void.self)

var results: [Result<Value, Error>] = .init(repeating: .failure(OperationPlaceholderError()), count: futures.count)
let callback = { (index: Int, result: Result<Value, Error>) in
results[index] = result
}

if eventLoop.inEventLoop {
_whenAllComplete0(promise, futures, eventLoop: eventLoop)
_reduceCompletions0(promise, futures, eventLoop, onResult: callback)
} else {
eventLoop.execute {
_whenAllComplete0(promise, futures, eventLoop: eventLoop)
_reduceCompletions0(promise, futures, eventLoop, onResult: callback)
}
}

return promise.futureResult
return promise.futureResult.map {
// verify that all operations have been completed
assert(!results.contains(where: {
guard case let .failure(error) = $0 else { return false }
return error is OperationPlaceholderError
}))

return results
}
}

private static func _whenAllComplete0<InputValue>(_ promise: EventLoopPromise<[Result<InputValue, Error>]>,
_ futures: [EventLoopFuture<InputValue>],
eventLoop: EventLoop) {
/// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when
/// they complete. The `onResult` will receive the index of the future that fulfilled the provided `Result`.
///
/// Once all the futures have completed, the provided promise will succeed.
private static func _reduceCompletions0<InputValue>(_ promise: EventLoopPromise<Void>,
_ futures: [EventLoopFuture<InputValue>],
_ eventLoop: EventLoop,
onResult: @escaping (Int, Result<InputValue, Error>) -> Void) {
eventLoop.assertInEventLoop()

var remainingCount = futures.count
var results: [Result<InputValue, Error>] = .init(repeating: .failure(OperationPlaceholderError()), count: futures.count)

// loop through the futures to chain callbacks to execute on the initiating event loop and grab their index
// in the "futures" to store their result in the same location
// in the "futures" to pass their result to the caller
for (index, future) in futures.enumerated() {
future.hopTo(eventLoop: eventLoop)
.whenComplete { result in
results[index] = result
onResult(index, result)
remainingCount -= 1

guard remainingCount == 0 else { return }

// verify that all operations have been completed
assert(!results.contains(where: {
guard case let .failure(error) = $0 else { return false }
return error is OperationPlaceholderError
}))

promise.succeed(results)
promise.succeed(())
}
}
}
@@ -696,7 +696,7 @@ internal extension Selector where R == NIORegistration {
return eventLoop.makeSucceededFuture(())
}

return EventLoopFuture<Void>.andAll(futures, eventLoop: eventLoop)
return .andAllSucceed(futures, on: eventLoop)
}
}

@@ -249,7 +249,7 @@ public class HTTPServerUpgradeHandler: ChannelInboundHandler {
return ctx.eventLoop.makeSucceededFuture(())
}

return EventLoopFuture<Void>.andAll(self.extraHTTPHandlers.map { ctx.pipeline.remove(handler: $0).map { (_: Bool) in () }},
eventLoop: ctx.eventLoop)
return .andAllSucceed(self.extraHTTPHandlers.map { ctx.pipeline.remove(handler: $0).map { (_: Bool) in () }},
on: ctx.eventLoop)
}
}
@@ -145,6 +145,11 @@ extension EventLoopFuture {
public func cascadeFailure<NewValue>(promise: EventLoopPromise<NewValue>?) {
self.cascadeFailure(to: promise)
}

@available(*, deprecated, renamed: "andAllSucceed(_:on:)")
public func andAll(_ futures: [EventLoopFuture<Void>], eventLoop: EventLoop) -> EventLoopFuture<Void> {
return .andAllSucceed(futures, on: eventLoop)
}
}

extension EventLoopPromise {
@@ -113,7 +113,7 @@ class HTTPTest: XCTestCase {
bodyData = nil
}
channel.pipeline.flush()
XCTAssertNoThrow(try EventLoopFuture<Void>.andAll(writeFutures, eventLoop: channel.eventLoop).wait())
XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(writeFutures, on: channel.eventLoop).wait())
XCTAssertEqual(2 * expecteds.count, step)

if body != nil {
@@ -149,7 +149,7 @@ class HTTPTest: XCTestCase {
try chan.writeInbound(buf)
})
}
return EventLoopFuture<Void>.andAll(writeFutures, eventLoop: chan.eventLoop)
return EventLoopFuture.andAllSucceed(writeFutures, on: chan.eventLoop)
})

XCTAssertEqual(bd1, bd2)
@@ -85,7 +85,7 @@ private func serverHTTPChannelWithAutoremoval(group: EventLoopGroup,
let upgradeConfig = (upgraders: upgraders, completionHandler: upgradeCompletionHandler)
return channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: pipelining, withServerUpgrade: upgradeConfig).flatMap {
let futureResults = extraHandlers.map { channel.pipeline.add(handler: $0) }
return EventLoopFuture<Void>.andAll(futureResults, eventLoop: channel.eventLoop)
return EventLoopFuture.andAllSucceed(futureResults, on: channel.eventLoop)
}
}.bind(host: "127.0.0.1", port: 0).wait()
return (c, p.futureResult)
@@ -100,9 +100,9 @@ public class ChannelTests: XCTestCase {
XCTAssertNoThrow(try clientChannel.close().wait())

// Wait for the close promises. These fire last.
XCTAssertNoThrow(try EventLoopFuture<Void>.andAll([clientChannel.closeFuture,
XCTAssertNoThrow(try EventLoopFuture.andAllSucceed([clientChannel.closeFuture,
serverAcceptedChannel.closeFuture],
eventLoop: group.next()).map {
on: group.next()).map {
XCTAssertEqual(clientLifecycleHandler.currentState, .unregistered)
XCTAssertEqual(serverLifecycleHandler.currentState, .unregistered)
XCTAssertEqual(clientLifecycleHandler.stateHistory, [.unregistered, .registered, .active, .inactive, .unregistered])
@@ -130,7 +130,7 @@ final class DatagramChannelTests: XCTestCase {
writeFutures.append(self.firstChannel.write(NIOAny(writeData)))
}
self.firstChannel.flush()
XCTAssertNoThrow(try EventLoopFuture<Void>.andAll(writeFutures, eventLoop: self.firstChannel.eventLoop).wait())
XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(writeFutures, on: self.firstChannel.eventLoop).wait())

let reads = try self.secondChannel.waitForDatagrams(count: 5)

@@ -219,7 +219,7 @@ final class DatagramChannelTests: XCTestCase {
buffer.write(string: "a")
let envelope = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer)
self.firstChannel.write(NIOAny(envelope), promise: myPromise)
overall = EventLoopFuture<Void>.andAll([overall, myPromise.futureResult], eventLoop: self.firstChannel.eventLoop)
overall = EventLoopFuture.andAllSucceed([overall, myPromise.futureResult], on: self.firstChannel.eventLoop)
}
self.firstChannel.flush()
XCTAssertNoThrow(try overall.wait())
@@ -248,7 +248,7 @@ final class DatagramChannelTests: XCTestCase {
var written: Int64 = 0
while written <= lotsOfData {
self.firstChannel.write(NIOAny(envelope), promise: myPromise)
overall = EventLoopFuture<Void>.andAll([overall, myPromise.futureResult], eventLoop: self.firstChannel.eventLoop)
overall = EventLoopFuture.andAllSucceed([overall, myPromise.futureResult], on: self.firstChannel.eventLoop)
written += Int64(bufferSize)
datagrams += 1
}
@@ -232,7 +232,7 @@ class EventLoopFutureTest : XCTestCase {
let eventLoop = EmbeddedEventLoop()
let futures: [EventLoopFuture<Void>] = []

let fN: EventLoopFuture<Void> = EventLoopFuture<Void>.andAll(futures, eventLoop: eventLoop)
let fN = EventLoopFuture.andAllSucceed(futures, on: eventLoop)

XCTAssert(fN.isFulfilled)
}
@@ -242,7 +242,7 @@ class EventLoopFutureTest : XCTestCase {
let promises: [EventLoopPromise<Void>] = (0..<100).map { (_: Int) in eventLoop.makePromise() }
let futures = promises.map { $0.futureResult }

let fN: EventLoopFuture<Void> = EventLoopFuture<Void>.andAll(futures, eventLoop: eventLoop)
let fN = EventLoopFuture.andAllSucceed(futures, on: eventLoop)
_ = promises.map { $0.succeed(()) }
() = try fN.wait()
}
@@ -253,7 +253,7 @@ class EventLoopFutureTest : XCTestCase {
let promises: [EventLoopPromise<Void>] = (0..<100).map { (_: Int) in eventLoop.makePromise() }
let futures = promises.map { $0.futureResult }

let fN: EventLoopFuture<Void> = EventLoopFuture<Void>.andAll(futures, eventLoop: eventLoop)
let fN = EventLoopFuture.andAllSucceed(futures, on: eventLoop)
_ = promises.map { $0.fail(E()) }
do {
() = try fN.wait()
@@ -276,7 +276,7 @@ class EventLoopFutureTest : XCTestCase {

let futures = promises.map { $0.futureResult }

let fN: EventLoopFuture<Void> = EventLoopFuture<Void>.andAll(futures, eventLoop: eventLoop)
let fN = EventLoopFuture.andAllSucceed(futures, on: eventLoop)
do {
() = try fN.wait()
XCTFail("should've thrown an error")
@@ -638,7 +638,7 @@ class EventLoopFutureTest : XCTestCase {
let ps = (0..<n).map { (_: Int) -> EventLoopPromise<Void> in
elg.next().makePromise()
}
let allOfEm = EventLoopFuture<Void>.andAll(ps.map { $0.futureResult }, eventLoop: elg.next())
let allOfEm = EventLoopFuture.andAllSucceed(ps.map { $0.futureResult }, on: elg.next())
ps.reversed().forEach { p in
DispatchQueue.global().async {
p.succeed(())
@@ -656,7 +656,7 @@ class EventLoopFutureTest : XCTestCase {
let ps = (0..<n).map { (_: Int) -> EventLoopPromise<Void> in
elg.next().makePromise()
}
let allOfEm = EventLoopFuture<Void>.andAll(ps.map { $0.futureResult }, eventLoop: fireBackEl.next())
let allOfEm = EventLoopFuture.andAllSucceed(ps.map { $0.futureResult }, on: fireBackEl.next())
ps.reversed().enumerated().forEach { idx, p in
DispatchQueue.global().async {
if idx == n / 2 {
@@ -285,8 +285,8 @@ class SelectorTest: XCTestCase {
}

// if all the new re-connected channels have read, then we're happy here.
EventLoopFuture<Void>.andAll(reconnectedChannelsHaveRead,
eventLoop: ctx.eventLoop).cascade(to: self.everythingWasReadPromise)
EventLoopFuture.andAllSucceed(reconnectedChannelsHaveRead, on: ctx.eventLoop)
.cascade(to: self.everythingWasReadPromise)
// let's also remove all the channels so this code will not be triggered again.
self.allChannels.value.removeAll()
}
@@ -42,3 +42,4 @@
- renamed `HTTPProtocolUpgrader` to `HTTPServerProtocolUpgrader`
- `EventLoopFuture.cascade(promise: EventLoopPromise)` had its label changed to `EventLoopFuture.cascade(to: EventLoopPromise)`
- `EventLoopFuture.cascadeFailure(promise: EventLoopPromise)` had its label changed to `EventLoopFuture.cascade(to: EventLoopPromise)`
- renamed `EventLoopFuture.andAll(_:eventLoop:)` to `EventLoopFuture.andAllSucceed(_:on:)`

0 comments on commit 06fb45d

Please sign in to comment.