diff --git a/Sources/AsyncAlgorithms/AsyncInclusiveReductionsSequence.swift b/Sources/AsyncAlgorithms/AsyncInclusiveReductionsSequence.swift new file mode 100644 index 00000000..5cdbc0ed --- /dev/null +++ b/Sources/AsyncAlgorithms/AsyncInclusiveReductionsSequence.swift @@ -0,0 +1,83 @@ +extension AsyncSequence { + /// Returns an asynchronous sequence containing the accumulated results of combining the + /// elements of the asynchronous sequence using the given closure. + /// + /// This can be seen as applying the reduce function to each element and + /// providing the initial value followed by these results as an asynchronous sequence. + /// + /// ``` + /// let runningTotal = [1, 2, 3, 4].async.reductions(+) + /// print(await Array(runningTotal)) + /// + /// // prints [1, 3, 6, 10] + /// ``` + /// + /// - Parameter transform: A closure that combines the previously reduced + /// result and the next element in the receiving sequence. + /// - Returns: An asynchronous sequence of the reduced elements. + @inlinable + public func reductions( + _ transform: @Sendable @escaping (Element, Element) async -> Element + ) -> AsyncInclusiveReductionsSequence { + AsyncInclusiveReductionsSequence(self, transform: transform) + } +} + +@frozen +public struct AsyncInclusiveReductionsSequence { + @usableFromInline + let base: Base + + @usableFromInline + let transform: @Sendable (Base.Element, Base.Element) async -> Base.Element + + @inlinable + init(_ base: Base, transform: @Sendable @escaping (Base.Element, Base.Element) async -> Base.Element) { + self.base = base + self.transform = transform + } +} + +extension AsyncInclusiveReductionsSequence: AsyncSequence { + public typealias Element = Base.Element + + @frozen + public struct Iterator: AsyncIteratorProtocol { + @usableFromInline + internal var iterator: Base.AsyncIterator + + @usableFromInline + internal var element: Base.Element? + + @usableFromInline + internal let transform: @Sendable (Base.Element, Base.Element) async -> Base.Element + + @inlinable + internal init( + _ iterator: Base.AsyncIterator, + transform: @Sendable @escaping (Base.Element, Base.Element) async -> Base.Element + ) { + self.iterator = iterator + self.transform = transform + } + + @inlinable + public mutating func next() async rethrows -> Base.Element? { + guard let previous = element else { + element = try await iterator.next() + return element + } + guard let next = try await iterator.next() else { return nil } + element = await transform(previous, next) + return element + } + } + + @inlinable + public func makeAsyncIterator() -> Iterator { + Iterator(base.makeAsyncIterator(), transform: transform) + } +} + +extension AsyncInclusiveReductionsSequence: Sendable where Base: Sendable { } +extension AsyncInclusiveReductionsSequence.Iterator: Sendable where Base.AsyncIterator: Sendable, Base.Element: Sendable { } diff --git a/Sources/AsyncAlgorithms/AsyncThrowingInclusiveReductionsSequence.swift b/Sources/AsyncAlgorithms/AsyncThrowingInclusiveReductionsSequence.swift new file mode 100644 index 00000000..f41d874b --- /dev/null +++ b/Sources/AsyncAlgorithms/AsyncThrowingInclusiveReductionsSequence.swift @@ -0,0 +1,88 @@ +extension AsyncSequence { + /// Returns an asynchronous sequence containing the accumulated results of combining the + /// elements of the asynchronous sequence using the given closure. + /// + /// This can be seen as applying the reduce function to each element and + /// providing the initial value followed by these results as an asynchronous sequence. + /// + /// ``` + /// let runningTotal = [1, 2, 3, 4].async.reductions(+) + /// print(await Array(runningTotal)) + /// + /// // prints [1, 3, 6, 10] + /// ``` + /// + /// - Parameter transform: A closure that combines the previously reduced + /// result and the next element in the receiving sequence. + /// - Returns: An asynchronous sequence of the reduced elements. + @inlinable + public func reductions( + _ transform: @Sendable @escaping (Element, Element) async throws -> Element + ) -> AsyncThrowingInclusiveReductionsSequence { + AsyncThrowingInclusiveReductionsSequence(self, transform: transform) + } +} + +@frozen +public struct AsyncThrowingInclusiveReductionsSequence { + @usableFromInline + let base: Base + + @usableFromInline + let transform: @Sendable (Base.Element, Base.Element) async throws -> Base.Element + + @inlinable + init(_ base: Base, transform: @Sendable @escaping (Base.Element, Base.Element) async throws -> Base.Element) { + self.base = base + self.transform = transform + } +} + +extension AsyncThrowingInclusiveReductionsSequence: AsyncSequence { + public typealias Element = Base.Element + + @frozen + public struct Iterator: AsyncIteratorProtocol { + @usableFromInline + internal var iterator: Base.AsyncIterator? + + @usableFromInline + internal var element: Base.Element? + + @usableFromInline + internal let transform: @Sendable (Base.Element, Base.Element) async throws -> Base.Element + + @inlinable + internal init( + _ iterator: Base.AsyncIterator, + transform: @Sendable @escaping (Base.Element, Base.Element) async throws -> Base.Element + ) { + self.iterator = iterator + self.transform = transform + } + + @inlinable + public mutating func next() async throws -> Base.Element? { + guard let previous = element else { + element = try await iterator?.next() + return element + } + guard let next = try await iterator?.next() else { return nil } + do { + element = try await transform(previous, next) + } catch { + iterator = nil + throw error + } + return element + } + } + + @inlinable + public func makeAsyncIterator() -> Iterator { + Iterator(base.makeAsyncIterator(), transform: transform) + } +} + +extension AsyncThrowingInclusiveReductionsSequence: Sendable where Base: Sendable { } +extension AsyncThrowingInclusiveReductionsSequence.Iterator: Sendable where Base.AsyncIterator: Sendable, Base.Element: Sendable { } diff --git a/Tests/AsyncAlgorithmsTests/TestReductions.swift b/Tests/AsyncAlgorithmsTests/TestReductions.swift index 4aef2658..82a0132b 100644 --- a/Tests/AsyncAlgorithmsTests/TestReductions.swift +++ b/Tests/AsyncAlgorithmsTests/TestReductions.swift @@ -27,6 +27,18 @@ final class TestReductions: XCTestCase { XCTAssertNil(pastEnd) } + func test_inclusive_reductions() async { + let sequence = [1, 2, 3, 4].async.reductions { $0 + $1 } + var iterator = sequence.makeAsyncIterator() + var collected = [Int]() + while let item = await iterator.next() { + collected.append(item) + } + XCTAssertEqual(collected, [1, 3, 6, 10]) + let pastEnd = await iterator.next() + XCTAssertNil(pastEnd) + } + func test_throw_upstream_reductions() async throws { let sequence = [1, 2, 3, 4].async .map { try throwOn(3, $0) } @@ -48,6 +60,25 @@ final class TestReductions: XCTestCase { XCTAssertNil(pastEnd) } + func test_throw_upstream_inclusive_reductions() async throws { + let sequence = [1, 2, 3, 4].async + .map { try throwOn(3, $0) } + .reductions { $0 + $1 } + var iterator = sequence.makeAsyncIterator() + var collected = [Int]() + do { + while let item = try await iterator.next() { + collected.append(item) + } + XCTFail() + } catch { + XCTAssertEqual(error as? Failure, Failure()) + } + XCTAssertEqual(collected, [1, 3]) + let pastEnd = try await iterator.next() + XCTAssertNil(pastEnd) + } + func test_throwing_reductions() async throws { let sequence = [1, 2, 3, 4].async.reductions("") { (partial, value) throws -> String in partial + "\(value)" @@ -62,6 +93,20 @@ final class TestReductions: XCTestCase { XCTAssertNil(pastEnd) } + func test_throwing_inclusive_reductions() async throws { + let sequence = [1, 2, 3, 4].async.reductions { (lhs, rhs) throws -> Int in + lhs + rhs + } + var iterator = sequence.makeAsyncIterator() + var collected = [Int]() + while let item = try await iterator.next() { + collected.append(item) + } + XCTAssertEqual(collected, [1, 3, 6, 10]) + let pastEnd = try await iterator.next() + XCTAssertNil(pastEnd) + } + func test_throw_upstream_reductions_throws() async throws { let sequence = [1, 2, 3, 4].async .map { try throwOn(3, $0) } @@ -83,6 +128,27 @@ final class TestReductions: XCTestCase { XCTAssertNil(pastEnd) } + func test_throw_upstream_inclusive_reductions_throws() async throws { + let sequence = [1, 2, 3, 4].async + .map { try throwOn(3, $0) } + .reductions { (lhs, rhs) throws -> Int in + lhs + rhs + } + var iterator = sequence.makeAsyncIterator() + var collected = [Int]() + do { + while let item = try await iterator.next() { + collected.append(item) + } + XCTFail() + } catch { + XCTAssertEqual(error as? Failure, Failure()) + } + XCTAssertEqual(collected, [1, 3]) + let pastEnd = try await iterator.next() + XCTAssertNil(pastEnd) + } + func test_reductions_into() async { let sequence = [1, 2, 3, 4].async.reductions(into: "") { partial, value in partial.append("\(value)")