Skip to content

Commit

Permalink
Use DisposableCollector in the producer start handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio committed Apr 19, 2017
1 parent 5f62dd1 commit 5d01764
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Sources/Flatten.swift
Expand Up @@ -438,7 +438,7 @@ extension Signal where Value: SignalProducerProtocol, Error == Value.Error {
}
}

fileprivate func observeConcurrent(_ observer: ReactiveSwift.Observer<Value.Value, Error>, _ limit: UInt, _ disposable: CompositeDisposable) -> Disposable? {
fileprivate func observeConcurrent(_ observer: ReactiveSwift.Observer<Value.Value, Error>, _ limit: UInt, _ disposable: DisposableCollector) -> Disposable? {
let state = Atomic(ConcurrentFlattenState<Value.Value, Error>(limit: limit))

func startNextIfNeeded() {
Expand Down
9 changes: 6 additions & 3 deletions Sources/SignalProducer.swift
Expand Up @@ -19,7 +19,7 @@ import Result
public struct SignalProducer<Value, Error: Swift.Error> {
public typealias ProducedSignal = Signal<Value, Error>

private let startHandler: (Signal<Value, Error>.Observer, CompositeDisposable) -> Void
private let startHandler: (Signal<Value, Error>.Observer, DisposableCollector) -> Void

/// Initializes a `SignalProducer` that will emit the same events as the
/// given signal.
Expand Down Expand Up @@ -48,7 +48,7 @@ public struct SignalProducer<Value, Error: Swift.Error> {
///
/// - parameters:
/// - startHandler: A closure that accepts observer and a disposable.
public init(_ startHandler: @escaping (Signal<Value, Error>.Observer, CompositeDisposable) -> Void) {
public init(_ startHandler: @escaping (Signal<Value, Error>.Observer, DisposableCollector) -> Void) {
self.startHandler = startHandler
}

Expand Down Expand Up @@ -118,7 +118,10 @@ public struct SignalProducer<Value, Error: Swift.Error> {
/// - values: A sequence of values that a `Signal` will send as separate
/// `value` events and then complete.
public init<S: Sequence>(_ values: S) where S.Iterator.Element == Value {
self.init { observer, disposable in
self.init { observer, collector in
let disposable = SimpleDisposable()
collector += disposable

for value in values {
observer.send(value: value)

Expand Down
24 changes: 15 additions & 9 deletions Tests/ReactiveSwiftTests/SignalProducerSpec.swift
Expand Up @@ -31,24 +31,24 @@ class SignalProducerSpec: QuickSpec {
expect(handlerCalledTimes) == 2
}

it("should not release signal observers when given disposable is disposed") {
it("should release signal observers when given disposable is disposed") {
var disposable: Disposable!

let producer = SignalProducer<Int, NoError> { observer, innerDisposable in
disposable = innerDisposable

innerDisposable += {
let producer = SignalProducer<Int, NoError> { observer, collector in
collector += {
// This is necessary to keep the observer long enough to
// even test the memory management.
observer.send(value: 0)
}
}

weak var objectRetainedByObserver: NSObject?
producer.startWithSignal { signal, _ in
producer.startWithSignal { signal, interrupter in
let object = NSObject()
objectRetainedByObserver = object
signal.observeValues { _ in _ = object }

disposable = interrupter
}

expect(objectRetainedByObserver).toNot(beNil())
Expand All @@ -62,7 +62,12 @@ class SignalProducerSpec: QuickSpec {
//
// After #2959, the object is still retained, since the observation
// keeps the signal alive.
expect(objectRetainedByObserver).toNot(beNil())
//
// With the introduction of `DisposableCollector`, the producer disposable
// collector is no longer disposable. Hence, this becomes `nil` as the
// test case now interrupts the producer, causing all observers to be
// released.
expect(objectRetainedByObserver).to(beNil())
}

it("should dispose of added disposables upon completion") {
Expand Down Expand Up @@ -2169,9 +2174,10 @@ class SignalProducerSpec: QuickSpec {

describe("observeOn") {
it("should immediately cancel upstream producer's work when disposed") {
var upstreamDisposable: Disposable!
let upstreamDisposable = SimpleDisposable()

let producer = SignalProducer<(), NoError>{ _, innerDisposable in
upstreamDisposable = innerDisposable
innerDisposable += upstreamDisposable
}

var downstreamDisposable: Disposable!
Expand Down

0 comments on commit 5d01764

Please sign in to comment.