Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions Sources/Flatten.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,32 @@
import enum Result.NoError

/// Describes how multiple producers should be joined together.
public enum FlattenStrategy: Equatable {
public struct FlattenStrategy {
fileprivate enum Kind {
case concurrent(limit: UInt)
case latest
case race
}

fileprivate let kind: Kind

private init(kind: Kind) {
self.kind = kind
}

/// The producers should be merged, so that any value received on any of the
/// input producers will be forwarded immediately to the output producer.
///
/// The resulting producer will complete only when all inputs have
/// completed.
public static let merge = FlattenStrategy.concurrent(limit: .max)
public static let merge = FlattenStrategy(kind: .concurrent(limit: .max))

/// The producers should be concatenated, so that their values are sent in
/// the order of the producers themselves.
///
/// The resulting producer will complete only when all inputs have
/// completed.
public static let concat = FlattenStrategy.concurrent(limit: 1)
public static let concat = FlattenStrategy(kind: .concurrent(limit: 1))

/// The producers should be merged, but only up to the given limit at any
/// point of time, so that any value received on any of the input producers
Expand All @@ -35,15 +47,17 @@ public enum FlattenStrategy: Equatable {
/// completed.
///
/// - precondition: `limit > 0`.
case concurrent(limit: UInt)
public static func concurrent(limit: UInt) -> FlattenStrategy {
return FlattenStrategy(kind: .concurrent(limit: limit))
}

/// Only the events from the latest input producer should be considered for
/// the output. Any producers received before that point will be disposed
/// of.
///
/// The resulting producer will complete only when the producer-of-producers
/// and the latest producer has completed.
case latest
public static let latest = FlattenStrategy(kind: .latest)

/// Only the events from the "first input producer to send an event" (winning producer)
/// should be considered for the output.
Expand All @@ -53,20 +67,7 @@ public enum FlattenStrategy: Equatable {
/// The resulting producer will complete when:
/// 1. The producer-of-producers and the first "alive" producer has completed.
/// 2. The producer-of-producers has completed without inner producer being "alive".
case race

public static func ==(left: FlattenStrategy, right: FlattenStrategy) -> Bool {
switch (left, right) {
case (.latest, .latest):
return true

case (.concurrent(let leftLimit), .concurrent(let rightLimit)):
return leftLimit == rightLimit

default:
return false
}
}
public static let race = FlattenStrategy(kind: .race)
}

extension Signal where Value: SignalProducerConvertible, Error == Value.Error {
Expand All @@ -82,7 +83,7 @@ extension Signal where Value: SignalProducerConvertible, Error == Value.Error {
/// - parameters:
/// - strategy: Strategy used when flattening signals.
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
switch strategy {
switch strategy.kind {
case .concurrent(let limit):
return self.concurrent(limit: limit)

Expand Down Expand Up @@ -124,7 +125,7 @@ extension Signal where Value: SignalProducerConvertible, Error == NoError, Value
/// - parameters:
/// - strategy: Strategy used when flattening signals.
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
switch strategy {
switch strategy.kind {
case .concurrent(let limit):
return self.concurrent(limit: limit)

Expand Down Expand Up @@ -167,7 +168,7 @@ extension SignalProducer where Value: SignalProducerConvertible, Error == Value.
/// - parameters:
/// - strategy: Strategy used when flattening signals.
public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
switch strategy {
switch strategy.kind {
case .concurrent(let limit):
return self.concurrent(limit: limit)

Expand Down Expand Up @@ -209,7 +210,7 @@ extension SignalProducer where Value: SignalProducerConvertible, Error == NoErro
/// - parameters:
/// - strategy: Strategy used when flattening signals.
public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
switch strategy {
switch strategy.kind {
case .concurrent(let limit):
return self.concurrent(limit: limit)

Expand Down