Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions Sources/AsyncAlgorithms/AsyncInclusiveReductionsSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
extension AsyncSequence {
/// Returns an asynchronous sequence containing the accumulated results of combining the
/// elements of the asynchronous sequence using the given closure.
///
/// This can be seen as applying the reduce function to each element and
/// providing the initial value followed by these results as an asynchronous sequence.
///
/// ```
/// let runningTotal = [1, 2, 3, 4].async.reductions(+)
/// print(await Array(runningTotal))
///
/// // prints [1, 3, 6, 10]
/// ```
///
/// - Parameter transform: A closure that combines the previously reduced
/// result and the next element in the receiving sequence.
/// - Returns: An asynchronous sequence of the reduced elements.
@inlinable
public func reductions(
_ transform: @Sendable @escaping (Element, Element) async -> Element
) -> AsyncInclusiveReductionsSequence<Self> {
AsyncInclusiveReductionsSequence(self, transform: transform)
}
}

@frozen
public struct AsyncInclusiveReductionsSequence<Base: AsyncSequence> {
@usableFromInline
let base: Base

@usableFromInline
let transform: @Sendable (Base.Element, Base.Element) async -> Base.Element

@inlinable
init(_ base: Base, transform: @Sendable @escaping (Base.Element, Base.Element) async -> Base.Element) {
self.base = base
self.transform = transform
}
}

extension AsyncInclusiveReductionsSequence: AsyncSequence {
public typealias Element = Base.Element

@frozen
public struct Iterator: AsyncIteratorProtocol {
@usableFromInline
internal var iterator: Base.AsyncIterator

@usableFromInline
internal var element: Base.Element?

@usableFromInline
internal let transform: @Sendable (Base.Element, Base.Element) async -> Base.Element

@inlinable
internal init(
_ iterator: Base.AsyncIterator,
transform: @Sendable @escaping (Base.Element, Base.Element) async -> Base.Element
) {
self.iterator = iterator
self.transform = transform
}

@inlinable
public mutating func next() async rethrows -> Base.Element? {
guard let previous = element else {
element = try await iterator.next()
return element
}
guard let next = try await iterator.next() else { return nil }
element = await transform(previous, next)
return element
}
}

@inlinable
public func makeAsyncIterator() -> Iterator {
Iterator(base.makeAsyncIterator(), transform: transform)
}
}

extension AsyncInclusiveReductionsSequence: Sendable where Base: Sendable { }
extension AsyncInclusiveReductionsSequence.Iterator: Sendable where Base.AsyncIterator: Sendable, Base.Element: Sendable { }
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
extension AsyncSequence {
/// Returns an asynchronous sequence containing the accumulated results of combining the
/// elements of the asynchronous sequence using the given closure.
///
/// This can be seen as applying the reduce function to each element and
/// providing the initial value followed by these results as an asynchronous sequence.
///
/// ```
/// let runningTotal = [1, 2, 3, 4].async.reductions(+)
/// print(await Array(runningTotal))
///
/// // prints [1, 3, 6, 10]
/// ```
///
/// - Parameter transform: A closure that combines the previously reduced
/// result and the next element in the receiving sequence.
/// - Returns: An asynchronous sequence of the reduced elements.
@inlinable
public func reductions(
_ transform: @Sendable @escaping (Element, Element) async throws -> Element
) -> AsyncThrowingInclusiveReductionsSequence<Self> {
AsyncThrowingInclusiveReductionsSequence(self, transform: transform)
}
}

@frozen
public struct AsyncThrowingInclusiveReductionsSequence<Base: AsyncSequence> {
@usableFromInline
let base: Base

@usableFromInline
let transform: @Sendable (Base.Element, Base.Element) async throws -> Base.Element

@inlinable
init(_ base: Base, transform: @Sendable @escaping (Base.Element, Base.Element) async throws -> Base.Element) {
self.base = base
self.transform = transform
}
}

extension AsyncThrowingInclusiveReductionsSequence: AsyncSequence {
public typealias Element = Base.Element

@frozen
public struct Iterator: AsyncIteratorProtocol {
@usableFromInline
internal var iterator: Base.AsyncIterator?

@usableFromInline
internal var element: Base.Element?

@usableFromInline
internal let transform: @Sendable (Base.Element, Base.Element) async throws -> Base.Element

@inlinable
internal init(
_ iterator: Base.AsyncIterator,
transform: @Sendable @escaping (Base.Element, Base.Element) async throws -> Base.Element
) {
self.iterator = iterator
self.transform = transform
}

@inlinable
public mutating func next() async throws -> Base.Element? {
guard let previous = element else {
element = try await iterator?.next()
return element
}
guard let next = try await iterator?.next() else { return nil }
do {
element = try await transform(previous, next)
} catch {
iterator = nil
throw error
}
return element
}
}

@inlinable
public func makeAsyncIterator() -> Iterator {
Iterator(base.makeAsyncIterator(), transform: transform)
}
}

extension AsyncThrowingInclusiveReductionsSequence: Sendable where Base: Sendable { }
extension AsyncThrowingInclusiveReductionsSequence.Iterator: Sendable where Base.AsyncIterator: Sendable, Base.Element: Sendable { }
66 changes: 66 additions & 0 deletions Tests/AsyncAlgorithmsTests/TestReductions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ final class TestReductions: XCTestCase {
XCTAssertNil(pastEnd)
}

func test_inclusive_reductions() async {
let sequence = [1, 2, 3, 4].async.reductions { $0 + $1 }
var iterator = sequence.makeAsyncIterator()
var collected = [Int]()
while let item = await iterator.next() {
collected.append(item)
}
XCTAssertEqual(collected, [1, 3, 6, 10])
let pastEnd = await iterator.next()
XCTAssertNil(pastEnd)
}

func test_throw_upstream_reductions() async throws {
let sequence = [1, 2, 3, 4].async
.map { try throwOn(3, $0) }
Expand All @@ -48,6 +60,25 @@ final class TestReductions: XCTestCase {
XCTAssertNil(pastEnd)
}

func test_throw_upstream_inclusive_reductions() async throws {
let sequence = [1, 2, 3, 4].async
.map { try throwOn(3, $0) }
.reductions { $0 + $1 }
var iterator = sequence.makeAsyncIterator()
var collected = [Int]()
do {
while let item = try await iterator.next() {
collected.append(item)
}
XCTFail()
} catch {
XCTAssertEqual(error as? Failure, Failure())
}
XCTAssertEqual(collected, [1, 3])
let pastEnd = try await iterator.next()
XCTAssertNil(pastEnd)
}

func test_throwing_reductions() async throws {
let sequence = [1, 2, 3, 4].async.reductions("") { (partial, value) throws -> String in
partial + "\(value)"
Expand All @@ -62,6 +93,20 @@ final class TestReductions: XCTestCase {
XCTAssertNil(pastEnd)
}

func test_throwing_inclusive_reductions() async throws {
let sequence = [1, 2, 3, 4].async.reductions { (lhs, rhs) throws -> Int in
lhs + rhs
}
var iterator = sequence.makeAsyncIterator()
var collected = [Int]()
while let item = try await iterator.next() {
collected.append(item)
}
XCTAssertEqual(collected, [1, 3, 6, 10])
let pastEnd = try await iterator.next()
XCTAssertNil(pastEnd)
}

func test_throw_upstream_reductions_throws() async throws {
let sequence = [1, 2, 3, 4].async
.map { try throwOn(3, $0) }
Expand All @@ -83,6 +128,27 @@ final class TestReductions: XCTestCase {
XCTAssertNil(pastEnd)
}

func test_throw_upstream_inclusive_reductions_throws() async throws {
let sequence = [1, 2, 3, 4].async
.map { try throwOn(3, $0) }
.reductions { (lhs, rhs) throws -> Int in
lhs + rhs
}
var iterator = sequence.makeAsyncIterator()
var collected = [Int]()
do {
while let item = try await iterator.next() {
collected.append(item)
}
XCTFail()
} catch {
XCTAssertEqual(error as? Failure, Failure())
}
XCTAssertEqual(collected, [1, 3])
let pastEnd = try await iterator.next()
XCTAssertNil(pastEnd)
}

func test_reductions_into() async {
let sequence = [1, 2, 3, 4].async.reductions(into: "") { partial, value in
partial.append("\(value)")
Expand Down