diff --git a/Sources/NIO/EventLoopFuture.swift b/Sources/NIO/EventLoopFuture.swift index 123e946697b..a23267d9b7e 100644 --- a/Sources/NIO/EventLoopFuture.swift +++ b/Sources/NIO/EventLoopFuture.swift @@ -815,6 +815,37 @@ extension EventLoopFuture { } } +extension EventLoopFuture { + /// Returns a new `EventLoopFuture` that fires only when this `EventLoopFuture` and + /// all the provided `futures` complete. It then provides the result of folding the value of this + /// `EventLoopFuture` with the values of all the provided `futures`. + /// + /// This function is suited when you have APIs that already know how to return `EventLoopFuture`s. + /// + /// The returned `EventLoopFuture` will fail as soon as the a failure is encountered in any of the + /// `futures` (or in this one). However, the failure will not occur until all preceding + /// `EventLoopFutures` have completed. At the point the failure is encountered, all subsequent + /// `EventLoopFuture` objects will no longer be waited for. This function therefore fails fast: once + /// a failure is encountered, it will immediately fail the overall EventLoopFuture. + /// + /// - parameters: + /// - futures: An array of `EventLoopFuture` to wait for. + /// - with: A function that will be used to fold the values of two `EventLoopFuture`s and return a new value wrapped in an `EventLoopFuture`. + /// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`. + public func fold(_ futures: [EventLoopFuture], with combiningFunction: @escaping (T, U) -> EventLoopFuture) -> EventLoopFuture { + let body = futures.reduce(self) { (f1: EventLoopFuture, f2: EventLoopFuture) -> EventLoopFuture in + let newFuture = f1.and(f2).then { (args: (T, U)) -> EventLoopFuture in + let (f1Value, f2Value) = args + assert(self.eventLoop.inEventLoop) + return combiningFunction(f1Value, f2Value) + } + assert(newFuture.eventLoop === self.eventLoop) + return newFuture + } + return body + } +} + extension EventLoopFuture { /// Returns a new `EventLoopFuture` that fires only when all the provided futures complete. /// diff --git a/Tests/NIOTests/EventLoopFutureTest+XCTest.swift b/Tests/NIOTests/EventLoopFutureTest+XCTest.swift index 1e84afe1e0b..66f3dd7a983 100644 --- a/Tests/NIOTests/EventLoopFutureTest+XCTest.swift +++ b/Tests/NIOTests/EventLoopFutureTest+XCTest.swift @@ -28,6 +28,14 @@ extension EventLoopFutureTest { return [ ("testFutureFulfilledIfHasResult", testFutureFulfilledIfHasResult), ("testFutureFulfilledIfHasError", testFutureFulfilledIfHasError), + ("testFoldWithMultipleEventLoops", testFoldWithMultipleEventLoops), + ("testFoldWithSuccessAndAllSuccesses", testFoldWithSuccessAndAllSuccesses), + ("testFoldWithSuccessAndOneFailure", testFoldWithSuccessAndOneFailure), + ("testFoldWithSuccessAndEmptyFutureList", testFoldWithSuccessAndEmptyFutureList), + ("testFoldWithFailureAndEmptyFutureList", testFoldWithFailureAndEmptyFutureList), + ("testFoldWithFailureAndAllSuccesses", testFoldWithFailureAndAllSuccesses), + ("testFoldWithFailureAndAllUnfulfilled", testFoldWithFailureAndAllUnfulfilled), + ("testFoldWithFailureAndAllFailures", testFoldWithFailureAndAllFailures), ("testAndAllWithAllSuccesses", testAndAllWithAllSuccesses), ("testAndAllWithAllFailures", testAndAllWithAllFailures), ("testAndAllWithOneFailure", testAndAllWithOneFailure), diff --git a/Tests/NIOTests/EventLoopFutureTest.swift b/Tests/NIOTests/EventLoopFutureTest.swift index b31ca291f09..31a6a12f688 100644 --- a/Tests/NIOTests/EventLoopFutureTest.swift +++ b/Tests/NIOTests/EventLoopFutureTest.swift @@ -32,6 +32,201 @@ class EventLoopFutureTest : XCTestCase { let f = EventLoopFuture(eventLoop: eventLoop, error: EventLoopFutureTestError.example, file: #file, line: #line) XCTAssertTrue(f.isFulfilled) } + + func testFoldWithMultipleEventLoops() throws { + let nThreads = 3 + let eventLoopGroup = MultiThreadedEventLoopGroup(numThreads: nThreads) + defer { + XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) + } + + let eventLoop0 = eventLoopGroup.next() + let eventLoop1 = eventLoopGroup.next() + let eventLoop2 = eventLoopGroup.next() + + XCTAssert(eventLoop0 !== eventLoop1) + XCTAssert(eventLoop1 !== eventLoop2) + XCTAssert(eventLoop0 !== eventLoop2) + + let f0: EventLoopFuture<[Int]> = eventLoop0.submit { [0] } + let f1s: [EventLoopFuture] = (1...4).map { id in eventLoop1.submit { id } } + let f2s: [EventLoopFuture] = (5...8).map { id in eventLoop2.submit { id } } + + var fN = f0.fold(f1s) { (f1Value: [Int], f2Value: Int) -> EventLoopFuture<[Int]> in + XCTAssert(eventLoop0.inEventLoop) + return eventLoop1.newSucceededFuture(result: f1Value + [f2Value]) + } + + fN = fN.fold(f2s) { (f1Value: [Int], f2Value: Int) -> EventLoopFuture<[Int]> in + XCTAssert(eventLoop0.inEventLoop) + return eventLoop2.newSucceededFuture(result: f1Value + [f2Value]) + } + + let allValues = try fN.wait() + XCTAssert(fN.eventLoop === f0.eventLoop) + XCTAssert(fN.isFulfilled) + XCTAssertEqual(allValues, [0, 1, 2, 3, 4, 5, 6, 7, 8]) + } + + func testFoldWithSuccessAndAllSuccesses() throws { + let eventLoop = EmbeddedEventLoop() + let secondEventLoop = EmbeddedEventLoop() + let f0 = eventLoop.newSucceededFuture(result: [0]) + + let futures: [EventLoopFuture] = (1...5).map { (id: Int) in secondEventLoop.newSucceededFuture(result: id) } + + let fN = f0.fold(futures) { (f1Value: [Int], f2Value: Int) -> EventLoopFuture<[Int]> in + XCTAssert(eventLoop.inEventLoop) + return secondEventLoop.newSucceededFuture(result: f1Value + [f2Value]) + } + + let allValues = try fN.wait() + XCTAssert(fN.eventLoop === f0.eventLoop) + XCTAssert(fN.isFulfilled) + XCTAssertEqual(allValues, [0, 1, 2, 3, 4, 5]) + } + + func testFoldWithSuccessAndOneFailure() throws { + struct E: Error {} + let eventLoop = EmbeddedEventLoop() + let secondEventLoop = EmbeddedEventLoop() + let f0: EventLoopFuture = eventLoop.newSucceededFuture(result: 0) + + let promises: [EventLoopPromise] = (0..<100).map { (_: Int) in secondEventLoop.newPromise() } + var futures = promises.map { $0.futureResult } + let failedFuture: EventLoopFuture = secondEventLoop.newFailedFuture(error: E()) + futures.insert(failedFuture, at: futures.startIndex) + + let fN = f0.fold(futures) { (f1Value: Int, f2Value: Int) -> EventLoopFuture in + XCTAssert(eventLoop.inEventLoop) + return secondEventLoop.newSucceededFuture(result: f1Value + f2Value) + } + + _ = promises.map { $0.succeed(result: 0) } + XCTAssert(fN.isFulfilled) + do { + _ = try fN.wait() + XCTFail("should've thrown an error") + } catch _ as E { + /* good */ + } catch let e { + XCTFail("error of wrong type \(e)") + } + } + + func testFoldWithSuccessAndEmptyFutureList() throws { + let eventLoop = EmbeddedEventLoop() + let f0 = eventLoop.newSucceededFuture(result: 0) + + let futures: [EventLoopFuture] = [] + + let fN = f0.fold(futures) { (f1Value: Int, f2Value: Int) -> EventLoopFuture in + XCTAssert(eventLoop.inEventLoop) + return eventLoop.newSucceededFuture(result: f1Value + f2Value) + } + + let summationResult = try fN.wait() + XCTAssert(fN.isFulfilled) + XCTAssertEqual(summationResult, 0) + } + + func testFoldWithFailureAndEmptyFutureList() throws { + struct E: Error {} + let eventLoop = EmbeddedEventLoop() + let f0: EventLoopFuture = eventLoop.newFailedFuture(error: E()) + + let futures: [EventLoopFuture] = [] + + let fN = f0.fold(futures) { (f1Value: Int, f2Value: Int) -> EventLoopFuture in + XCTAssert(eventLoop.inEventLoop) + return eventLoop.newSucceededFuture(result: f1Value + f2Value) + } + + XCTAssert(fN.isFulfilled) + do { + _ = try fN.wait() + XCTFail("should've thrown an error") + } catch _ as E { + /* good */ + } catch let e { + XCTFail("error of wrong type \(e)") + } + } + + func testFoldWithFailureAndAllSuccesses() throws { + struct E: Error {} + let eventLoop = EmbeddedEventLoop() + let secondEventLoop = EmbeddedEventLoop() + let f0: EventLoopFuture = eventLoop.newFailedFuture(error: E()) + + let promises: [EventLoopPromise] = (0..<100).map { (_: Int) in secondEventLoop.newPromise() } + let futures = promises.map { $0.futureResult } + + let fN = f0.fold(futures) { (f1Value: Int, f2Value: Int) -> EventLoopFuture in + XCTAssert(eventLoop.inEventLoop) + return secondEventLoop.newSucceededFuture(result: f1Value + f2Value) + } + + _ = promises.map { $0.succeed(result: 1) } + XCTAssert(fN.isFulfilled) + do { + _ = try fN.wait() + XCTFail("should've thrown an error") + } catch _ as E { + /* good */ + } catch let e { + XCTFail("error of wrong type \(e)") + } + } + + func testFoldWithFailureAndAllUnfulfilled() throws { + struct E: Error {} + let eventLoop = EmbeddedEventLoop() + let secondEventLoop = EmbeddedEventLoop() + let f0: EventLoopFuture = eventLoop.newFailedFuture(error: E()) + + let promises: [EventLoopPromise] = (0..<100).map { (_: Int) in secondEventLoop.newPromise() } + let futures = promises.map { $0.futureResult } + + let fN = f0.fold(futures) { (f1Value: Int, f2Value: Int) -> EventLoopFuture in + XCTAssert(eventLoop.inEventLoop) + return secondEventLoop.newSucceededFuture(result: f1Value + f2Value) + } + + XCTAssert(fN.isFulfilled) + do { + _ = try fN.wait() + XCTFail("should've thrown an error") + } catch _ as E { + /* good */ + } catch let e { + XCTFail("error of wrong type \(e)") + } + } + + func testFoldWithFailureAndAllFailures() throws { + struct E: Error {} + let eventLoop = EmbeddedEventLoop() + let secondEventLoop = EmbeddedEventLoop() + let f0: EventLoopFuture = eventLoop.newFailedFuture(error: E()) + + let futures: [EventLoopFuture] = (0..<100).map { (_: Int) in secondEventLoop.newFailedFuture(error: E()) } + + let fN = f0.fold(futures) { (f1Value: Int, f2Value: Int) -> EventLoopFuture in + XCTAssert(eventLoop.inEventLoop) + return secondEventLoop.newSucceededFuture(result: f1Value + f2Value) + } + + XCTAssert(fN.isFulfilled) + do { + _ = try fN.wait() + XCTFail("should've thrown an error") + } catch _ as E { + /* good */ + } catch let e { + XCTFail("error of wrong type \(e)") + } + } func testAndAllWithAllSuccesses() throws { let eventLoop = EmbeddedEventLoop()