Skip to content

Commit

Permalink
Implement Collect. Use ReduceProducer.
Browse files Browse the repository at this point in the history
  • Loading branch information
broadwaylamb committed Oct 9, 2019
1 parent 777d91e commit 22545f0
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 34 deletions.
34 changes: 0 additions & 34 deletions RemainingCombineInterface.swift
Original file line number Diff line number Diff line change
Expand Up @@ -543,31 +543,6 @@ extension Publishers {
public func receive<S>(subscriber: S) where S : Subscriber, Upstream.Failure == S.Failure, S.Input == [Upstream.Output]
}

/// A publisher that buffers items.
public struct Collect<Upstream> : Publisher where Upstream : Publisher {

/// The kind of values published by this publisher.
public typealias Output = [Upstream.Output]

/// The kind of errors this publisher might publish.
///
/// Use `Never` if this `Publisher` does not publish errors.
public typealias Failure = Upstream.Failure

/// The publisher that this publisher receives elements from.
public let upstream: Upstream

public init(upstream: Upstream)

/// This function is called to attach the specified `Subscriber` to this `Publisher` by `subscribe(_:)`
///
/// - SeeAlso: `subscribe(_:)`
/// - Parameters:
/// - subscriber: The subscriber to attach to this `Publisher`.
/// once attached it can begin to receive values.
public func receive<S>(subscriber: S) where S : Subscriber, Upstream.Failure == S.Failure, S.Input == [Upstream.Output]
}

/// A publisher that buffers a maximum number of items.
public struct CollectByCount<Upstream> : Publisher where Upstream : Publisher {

Expand Down Expand Up @@ -599,15 +574,6 @@ extension Publishers {

extension Publisher {

/// Collects all received elements, and emits a single array of the collection when the upstream publisher finishes.
///
/// If the upstream publisher fails with an error, this publisher forwards the error to the downstream receiver instead of sending its output.
/// This publisher requests an unlimited number of elements from the upstream publisher. It only sends the collected array to its downstream after a request whose demand is greater than 0 items.
/// Note: This publisher uses an unbounded amount of memory to store the received values.
///
/// - Returns: A publisher that collects all received items and returns them as an array upon completion.
public func collect() -> Publishers.Collect<Self>

/// Collects up to the specified number of elements, and then emits a single array of the collection.
///
/// If the upstream publisher finishes before filling the buffer, this publisher sends an array of all the items it has received. This may be fewer than `count` elements.
Expand Down
78 changes: 78 additions & 0 deletions Sources/OpenCombine/Publishers/Publishers.Collect.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//
// Publishers.Collect.swift
//
//
// Created by Sergej Jaskiewicz on 09.10.2019.
//

extension Publisher {

/// Collects all received elements, and emits a single array of the collection when
/// the upstream publisher finishes.
///
/// If the upstream publisher fails with an error, this publisher forwards the error
/// to the downstream receiver instead of sending its output.
/// This publisher requests an unlimited number of elements from the upstream
/// publisher. It only sends the collected array to its downstream after a request
/// whose demand is greater than 0 items.
/// Note: This publisher uses an unbounded amount of memory to store the received
/// values.
///
/// - Returns: A publisher that collects all received items and returns them as
/// an array upon completion.
public func collect() -> Publishers.Collect<Self> {
return .init(upstream: self)
}
}

extension Publishers {

/// A publisher that buffers items.
public struct Collect<Upstream: Publisher>: Publisher {

public typealias Output = [Upstream.Output]

public typealias Failure = Upstream.Failure

/// The publisher that this publisher receives elements from.
public let upstream: Upstream

public init(upstream: Upstream) {
self.upstream = upstream
}

public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Upstream.Failure == Downstream.Failure,
Downstream.Input == [Upstream.Output]
{
upstream.subscribe(Inner(downstream: subscriber))
}
}
}

extension Publishers.Collect {
private class Inner<Downstream: Subscriber>
: ReduceProducer<Downstream,
Upstream.Output,
[Upstream.Output],
Upstream.Failure,
Void>
where Downstream.Input == [Upstream.Output],
Downstream.Failure == Upstream.Failure
{
fileprivate init(downstream: Downstream) {
super.init(downstream: downstream, initial: [], reduce: ())
}

override func receive(
newValue: Upstream.Output
) -> PartialCompletion<Void, Downstream.Failure> {
result!.append(newValue)
return .continue
}

override var description: String {
return "Collect"
}
}
}

0 comments on commit 22545f0

Please sign in to comment.