Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions Sources/NIO/EventLoopFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,37 @@ extension EventLoopFuture {
}
}

extension EventLoopFuture {
/// Returns a new `EventLoopFuture` that fires only when this `EventLoopFuture` and
/// all the provided `futures` complete. It then provides the result of folding the value of this
/// `EventLoopFuture` with the values of all the provided `futures`.
///
/// This function is suited when you have APIs that already know how to return `EventLoopFuture`s.
///
/// The returned `EventLoopFuture` will fail as soon as the a failure is encountered in any of the
/// `futures` (or in this one). However, the failure will not occur until all preceding
/// `EventLoopFutures` have completed. At the point the failure is encountered, all subsequent
/// `EventLoopFuture` objects will no longer be waited for. This function therefore fails fast: once
/// a failure is encountered, it will immediately fail the overall EventLoopFuture.
///
/// - parameters:
/// - futures: An array of `EventLoopFuture<U>` to wait for.
/// - with: A function that will be used to fold the values of two `EventLoopFuture`s and return a new value wrapped in an `EventLoopFuture`.
/// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`.
public func fold<U>(_ futures: [EventLoopFuture<U>], with combiningFunction: @escaping (T, U) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
let body = futures.reduce(self) { (f1: EventLoopFuture<T>, f2: EventLoopFuture<U>) -> EventLoopFuture<T> in
let newFuture = f1.and(f2).then { (args: (T, U)) -> EventLoopFuture<T> in
let (f1Value, f2Value) = args
assert(self.eventLoop.inEventLoop)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but what if we're dealing with different event loops?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@weissi It doesn't matter. and's then will execute on f1's event loop, and the first f1 is self, and then the future returned from combiningFunction will be moved over to the f1 loop (because that's how then works.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yes, that makes sense. Thank you!

return combiningFunction(f1Value, f2Value)
}
assert(newFuture.eventLoop === self.eventLoop)
return newFuture
}
return body
}
}

extension EventLoopFuture {
/// Returns a new `EventLoopFuture` that fires only when all the provided futures complete.
///
Expand Down
8 changes: 8 additions & 0 deletions Tests/NIOTests/EventLoopFutureTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ extension EventLoopFutureTest {
return [
("testFutureFulfilledIfHasResult", testFutureFulfilledIfHasResult),
("testFutureFulfilledIfHasError", testFutureFulfilledIfHasError),
("testFoldWithMultipleEventLoops", testFoldWithMultipleEventLoops),
("testFoldWithSuccessAndAllSuccesses", testFoldWithSuccessAndAllSuccesses),
("testFoldWithSuccessAndOneFailure", testFoldWithSuccessAndOneFailure),
("testFoldWithSuccessAndEmptyFutureList", testFoldWithSuccessAndEmptyFutureList),
("testFoldWithFailureAndEmptyFutureList", testFoldWithFailureAndEmptyFutureList),
("testFoldWithFailureAndAllSuccesses", testFoldWithFailureAndAllSuccesses),
("testFoldWithFailureAndAllUnfulfilled", testFoldWithFailureAndAllUnfulfilled),
("testFoldWithFailureAndAllFailures", testFoldWithFailureAndAllFailures),
("testAndAllWithAllSuccesses", testAndAllWithAllSuccesses),
("testAndAllWithAllFailures", testAndAllWithAllFailures),
("testAndAllWithOneFailure", testAndAllWithOneFailure),
Expand Down
195 changes: 195 additions & 0 deletions Tests/NIOTests/EventLoopFutureTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,201 @@ class EventLoopFutureTest : XCTestCase {
let f = EventLoopFuture<Void>(eventLoop: eventLoop, error: EventLoopFutureTestError.example, file: #file, line: #line)
XCTAssertTrue(f.isFulfilled)
}

func testFoldWithMultipleEventLoops() throws {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@weissi Does this test suffice for multiple eventLoops?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, thanks!

let nThreads = 3
let eventLoopGroup = MultiThreadedEventLoopGroup(numThreads: nThreads)
defer {
XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully())
}

let eventLoop0 = eventLoopGroup.next()
let eventLoop1 = eventLoopGroup.next()
let eventLoop2 = eventLoopGroup.next()

XCTAssert(eventLoop0 !== eventLoop1)
XCTAssert(eventLoop1 !== eventLoop2)
XCTAssert(eventLoop0 !== eventLoop2)

let f0: EventLoopFuture<[Int]> = eventLoop0.submit { [0] }
let f1s: [EventLoopFuture<Int>] = (1...4).map { id in eventLoop1.submit { id } }
let f2s: [EventLoopFuture<Int>] = (5...8).map { id in eventLoop2.submit { id } }

var fN = f0.fold(f1s) { (f1Value: [Int], f2Value: Int) -> EventLoopFuture<[Int]> in
XCTAssert(eventLoop0.inEventLoop)
return eventLoop1.newSucceededFuture(result: f1Value + [f2Value])
}

fN = fN.fold(f2s) { (f1Value: [Int], f2Value: Int) -> EventLoopFuture<[Int]> in
XCTAssert(eventLoop0.inEventLoop)
return eventLoop2.newSucceededFuture(result: f1Value + [f2Value])
}

let allValues = try fN.wait()
XCTAssert(fN.eventLoop === f0.eventLoop)
XCTAssert(fN.isFulfilled)
XCTAssertEqual(allValues, [0, 1, 2, 3, 4, 5, 6, 7, 8])
}

func testFoldWithSuccessAndAllSuccesses() throws {
let eventLoop = EmbeddedEventLoop()
let secondEventLoop = EmbeddedEventLoop()
let f0 = eventLoop.newSucceededFuture(result: [0])

let futures: [EventLoopFuture<Int>] = (1...5).map { (id: Int) in secondEventLoop.newSucceededFuture(result: id) }

let fN = f0.fold(futures) { (f1Value: [Int], f2Value: Int) -> EventLoopFuture<[Int]> in
XCTAssert(eventLoop.inEventLoop)
return secondEventLoop.newSucceededFuture(result: f1Value + [f2Value])
}

let allValues = try fN.wait()
XCTAssert(fN.eventLoop === f0.eventLoop)
XCTAssert(fN.isFulfilled)
XCTAssertEqual(allValues, [0, 1, 2, 3, 4, 5])
}

func testFoldWithSuccessAndOneFailure() throws {
struct E: Error {}
let eventLoop = EmbeddedEventLoop()
let secondEventLoop = EmbeddedEventLoop()
let f0: EventLoopFuture<Int> = eventLoop.newSucceededFuture(result: 0)

let promises: [EventLoopPromise<Int>] = (0..<100).map { (_: Int) in secondEventLoop.newPromise() }
var futures = promises.map { $0.futureResult }
let failedFuture: EventLoopFuture<Int> = secondEventLoop.newFailedFuture(error: E())
futures.insert(failedFuture, at: futures.startIndex)

let fN = f0.fold(futures) { (f1Value: Int, f2Value: Int) -> EventLoopFuture<Int> in
XCTAssert(eventLoop.inEventLoop)
return secondEventLoop.newSucceededFuture(result: f1Value + f2Value)
}

_ = promises.map { $0.succeed(result: 0) }
XCTAssert(fN.isFulfilled)
do {
_ = try fN.wait()
XCTFail("should've thrown an error")
} catch _ as E {
/* good */
} catch let e {
XCTFail("error of wrong type \(e)")
}
}

func testFoldWithSuccessAndEmptyFutureList() throws {
let eventLoop = EmbeddedEventLoop()
let f0 = eventLoop.newSucceededFuture(result: 0)

let futures: [EventLoopFuture<Int>] = []

let fN = f0.fold(futures) { (f1Value: Int, f2Value: Int) -> EventLoopFuture<Int> in
XCTAssert(eventLoop.inEventLoop)
return eventLoop.newSucceededFuture(result: f1Value + f2Value)
}

let summationResult = try fN.wait()
XCTAssert(fN.isFulfilled)
XCTAssertEqual(summationResult, 0)
}

func testFoldWithFailureAndEmptyFutureList() throws {
struct E: Error {}
let eventLoop = EmbeddedEventLoop()
let f0: EventLoopFuture<Int> = eventLoop.newFailedFuture(error: E())

let futures: [EventLoopFuture<Int>] = []

let fN = f0.fold(futures) { (f1Value: Int, f2Value: Int) -> EventLoopFuture<Int> in
XCTAssert(eventLoop.inEventLoop)
return eventLoop.newSucceededFuture(result: f1Value + f2Value)
}

XCTAssert(fN.isFulfilled)
do {
_ = try fN.wait()
XCTFail("should've thrown an error")
} catch _ as E {
/* good */
} catch let e {
XCTFail("error of wrong type \(e)")
}
}

func testFoldWithFailureAndAllSuccesses() throws {
struct E: Error {}
let eventLoop = EmbeddedEventLoop()
let secondEventLoop = EmbeddedEventLoop()
let f0: EventLoopFuture<Int> = eventLoop.newFailedFuture(error: E())

let promises: [EventLoopPromise<Int>] = (0..<100).map { (_: Int) in secondEventLoop.newPromise() }
let futures = promises.map { $0.futureResult }

let fN = f0.fold(futures) { (f1Value: Int, f2Value: Int) -> EventLoopFuture<Int> in
XCTAssert(eventLoop.inEventLoop)
return secondEventLoop.newSucceededFuture(result: f1Value + f2Value)
}

_ = promises.map { $0.succeed(result: 1) }
XCTAssert(fN.isFulfilled)
do {
_ = try fN.wait()
XCTFail("should've thrown an error")
} catch _ as E {
/* good */
} catch let e {
XCTFail("error of wrong type \(e)")
}
}

func testFoldWithFailureAndAllUnfulfilled() throws {
struct E: Error {}
let eventLoop = EmbeddedEventLoop()
let secondEventLoop = EmbeddedEventLoop()
let f0: EventLoopFuture<Int> = eventLoop.newFailedFuture(error: E())

let promises: [EventLoopPromise<Int>] = (0..<100).map { (_: Int) in secondEventLoop.newPromise() }
let futures = promises.map { $0.futureResult }

let fN = f0.fold(futures) { (f1Value: Int, f2Value: Int) -> EventLoopFuture<Int> in
XCTAssert(eventLoop.inEventLoop)
return secondEventLoop.newSucceededFuture(result: f1Value + f2Value)
}

XCTAssert(fN.isFulfilled)
do {
_ = try fN.wait()
XCTFail("should've thrown an error")
} catch _ as E {
/* good */
} catch let e {
XCTFail("error of wrong type \(e)")
}
}

func testFoldWithFailureAndAllFailures() throws {
struct E: Error {}
let eventLoop = EmbeddedEventLoop()
let secondEventLoop = EmbeddedEventLoop()
let f0: EventLoopFuture<Int> = eventLoop.newFailedFuture(error: E())

let futures: [EventLoopFuture<Int>] = (0..<100).map { (_: Int) in secondEventLoop.newFailedFuture(error: E()) }

let fN = f0.fold(futures) { (f1Value: Int, f2Value: Int) -> EventLoopFuture<Int> in
XCTAssert(eventLoop.inEventLoop)
return secondEventLoop.newSucceededFuture(result: f1Value + f2Value)
}

XCTAssert(fN.isFulfilled)
do {
_ = try fN.wait()
XCTFail("should've thrown an error")
} catch _ as E {
/* good */
} catch let e {
XCTFail("error of wrong type \(e)")
}
}

func testAndAllWithAllSuccesses() throws {
let eventLoop = EmbeddedEventLoop()
Expand Down