Skip to content

Commit

Permalink
The Once FlattenStrategy.
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio committed Sep 11, 2017
1 parent 7b23858 commit c322e17
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 0 deletions.
26 changes: 26 additions & 0 deletions Sources/Flatten.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public struct FlattenStrategy {
case concurrent(limit: UInt)
case latest
case race
case once
}

fileprivate let kind: Kind
Expand Down Expand Up @@ -98,6 +99,19 @@ public struct FlattenStrategy {
/// Any failure from the inner streams is propagated immediately to the flattened
/// stream of values.
public static let race = FlattenStrategy(kind: .race)

/// Forward only events from the first inner stream received. The stream of streams
/// would be interrupted upon the said stream being started.
///
/// The flattened stream of values completes when the stream of streams completes
/// without emitting any inner stream, or when the sole inner stream completes.
///
/// Any interruption of inner streams is propagated immediately to the flattened
/// stream of values.
///
/// Any failure from the inner streams is propagated immediately to the flattened
/// stream of values.
public static let once = FlattenStrategy(kind: .once)
}

extension Signal where Value: SignalProducerConvertible, Error == Value.Error {
Expand All @@ -122,6 +136,9 @@ extension Signal where Value: SignalProducerConvertible, Error == Value.Error {

case .race:
return self.race()

case .once:
return self.take(first: 1).race()
}
}
}
Expand Down Expand Up @@ -164,6 +181,9 @@ extension Signal where Value: SignalProducerConvertible, Error == NoError, Value

case .race:
return self.race()

case .once:
return self.take(first: 1).race()
}
}
}
Expand Down Expand Up @@ -207,6 +227,9 @@ extension SignalProducer where Value: SignalProducerConvertible, Error == Value.

case .race:
return self.race()

case .once:
return self.take(first: 1).race()
}
}
}
Expand Down Expand Up @@ -249,6 +272,9 @@ extension SignalProducer where Value: SignalProducerConvertible, Error == NoErro

case .race:
return self.race()

case .once:
return self.take(first: 1).race()
}
}
}
Expand Down
146 changes: 146 additions & 0 deletions Tests/ReactiveSwiftTests/FlattenSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class FlattenSpec: QuickSpec {
describeSignalFlattenDisposal(.concat, name: "concat")
describeSignalFlattenDisposal(.concurrent(limit: 1024), name: "concurrent(limit: 1024)")
describeSignalFlattenDisposal(.race, name: "race")

// The `once` strategy have a different slightly behavior that does not match
// the above test cases.
}

func describeSignalProducerFlattenDisposal(_ flattenStrategy: FlattenStrategy, name: String) {
Expand All @@ -95,6 +98,7 @@ class FlattenSpec: QuickSpec {
describeSignalProducerFlattenDisposal(.concat, name: "concat")
describeSignalProducerFlattenDisposal(.concurrent(limit: 1024), name: "concurrent(limit: 1024)")
describeSignalProducerFlattenDisposal(.race, name: "race")
describeSignalProducerFlattenDisposal(.once, name: "once")
}

describe("Signal.flatten()") {
Expand Down Expand Up @@ -1061,5 +1065,147 @@ class FlattenSpec: QuickSpec {
run { $0.start(on: scheduler) }
}
}

describe("FlattenStrategy.once") {
it("should complete immediately if the producer of streams completes without emitting any stream") {
let (signal, observer) = Signal<SignalProducer<(), NoError>, NoError>.pipe()
var isCompleted = false

signal
.flatten(.once)
.observeCompleted { isCompleted = true }

expect(isCompleted) == false

observer.sendCompleted()
expect(isCompleted) == true
}

it("should complete immediately if the producer of streams completes without emitting any stream") {
let (signal, observer) = Signal<SignalProducer<(), NoError>, NoError>.pipe()
let producer = SignalProducer<SignalProducer<(), NoError>, NoError>(signal)
var isCompleted = false

producer
.flatten(.once)
.startWithCompleted { isCompleted = true }

expect(isCompleted) == false

observer.sendCompleted()
expect(isCompleted) == true
}

it("should interrupt immediately if the producer of streams interrupts without emitting any stream") {
let (signal, observer) = Signal<SignalProducer<(), NoError>, NoError>.pipe()
var isInterrupted = false

signal
.flatten(.once)
.observeInterrupted { isInterrupted = true }

expect(isInterrupted) == false

observer.sendInterrupted()
expect(isInterrupted) == true
}

it("should interrupt immediately if the producer of streams interrupts without emitting any stream") {
let (signal, observer) = Signal<SignalProducer<(), NoError>, NoError>.pipe()
let producer = SignalProducer<SignalProducer<(), NoError>, NoError>(signal)
var isInterrupted = false

producer
.flatten(.once)
.startWithInterrupted { isInterrupted = true }

expect(isInterrupted) == false

observer.sendInterrupted()
expect(isInterrupted) == true
}

it("should fail immediately if the producer of streams fails without emitting any stream") {
let (signal, observer) = Signal<SignalProducer<(), NoError>, TestError>.pipe()
var error: TestError?

signal
.flatten(.once)
.observeFailed { error = $0 }

expect(error).to(beNil())

observer.send(error: .default)
expect(error) == .default
}

it("should fail immediately if the producer of streams fails without emitting any stream") {
let (signal, observer) = Signal<SignalProducer<(), NoError>, TestError>.pipe()
let producer = SignalProducer<SignalProducer<(), NoError>, TestError>(signal)
var error: TestError?

producer
.flatten(.once)
.startWithFailed { error = $0 }

expect(error).to(beNil())

observer.send(error: .default)
expect(error) == .default
}

it("should dispose of the signal of streams immediately when an inner stream is received") {
let disposable = AnyDisposable()
var observer: Signal<SignalProducer<(), NoError>, NoError>.Observer!

func make() -> Signal<SignalProducer<(), NoError>, NoError> {
let (signal, innerObserver) = Signal<SignalProducer<(), NoError>, NoError>.pipe(disposable: disposable)
observer = innerObserver
return signal
}

make()
.flatten(.once)
.observe(Signal.Observer())

expect(disposable.isDisposed) == false

observer.send(value: SignalProducer(value: ()))
expect(disposable.isDisposed) == true
}

it("should interrupt the producer of streams immediately when an inner stream is received") {
let (signal, observer) = Signal<SignalProducer<(), NoError>, NoError>.pipe()
let producer = SignalProducer<SignalProducer<(), NoError>, NoError>(signal)

var isProducerOfStreamsInterrupted = false

producer
.on(interrupted: { isProducerOfStreamsInterrupted = true })
.flatten(.once)
.start()

expect(isProducerOfStreamsInterrupted) == false

observer.send(value: SignalProducer(value: ()))
expect(isProducerOfStreamsInterrupted) == true
}

it("should forward the values from the received inner producer") {
let (signal, observer) = Signal<SignalProducer<Int, NoError>, NoError>.pipe()
let producer = SignalProducer<SignalProducer<Int, NoError>, NoError>(signal)

var values: [Int] = []

producer
.flatten(.once)
.startWithValues { values.append($0) }

expect(values) == []

observer.send(value: SignalProducer(CountableRange(0 ..< 10)))
expect(values) == Array(CountableRange(0 ..< 10))
}
}
}
}

0 comments on commit c322e17

Please sign in to comment.