Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored & Bugfix: startWithSignal and the hot-to-cold lift. #106

Merged
merged 4 commits into from
Nov 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,19 @@ public final class Signal<Value, Error: Swift.Error> {
/// observer.
///
/// - note: The Signal will remain alive until a terminating event is sent
/// to the observer.
/// to the observer, or until it has no observer if it is not
/// retained.
///
/// - parameters:
/// - disposable: An optional disposable to associate with the signal, and
/// to be disposed of when the signal terminates.
///
/// - returns: A tuple made of signal and observer.
public static func pipe() -> (Signal, Observer) {
public static func pipe(disposable: Disposable? = nil) -> (Signal, Observer) {
var observer: Observer!
let signal = self.init { innerObserver in
observer = innerObserver
return nil
return disposable
}

return (signal, observer)
Expand Down
42 changes: 5 additions & 37 deletions Sources/SignalProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -172,36 +172,23 @@ public struct SignalProducer<Value, Error: Swift.Error> {
/// - parameters:
/// - setUp: A closure that accepts a `signal` and `interrupter`.
public func startWithSignal(_ setup: (_ signal: Signal<Value, Error>, _ interrupter: Disposable) -> Void) {
let (signal, observer) = Signal<Value, Error>.pipe()

// Disposes of the work associated with the SignalProducer and any
// upstream producers.
let producerDisposable = CompositeDisposable()

let (signal, observer) = Signal<Value, Error>.pipe(disposable: producerDisposable)

// Directly disposed of when `start()` or `startWithSignal()` is
// disposed.
let cancelDisposable = ActionDisposable {
observer.sendInterrupted()
producerDisposable.dispose()
}
let cancelDisposable = ActionDisposable(action: observer.sendInterrupted)

setup(signal, cancelDisposable)

if cancelDisposable.isDisposed {
return
}

let wrapperObserver: Signal<Value, Error>.Observer = Observer { event in
observer.action(event)

if event.isTerminating {
// Dispose only after notifying the Signal, so disposal
// logic is consistently the last thing to run.
producerDisposable.dispose()
}
}

startHandler(wrapperObserver, producerDisposable)
startHandler(observer, producerDisposable)
}
}

Expand Down Expand Up @@ -445,7 +432,6 @@ extension SignalProducerProtocol {
}
}
}


/// Lift a binary Signal operator to operate upon a Signal and a
/// SignalProducer instead.
Expand All @@ -462,27 +448,9 @@ extension SignalProducerProtocol {
/// `SignalProducer`.
public func lift<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (Signal<U, F>) -> SignalProducer<V, G> {
return { otherSignal in
return SignalProducer { observer, outerDisposable in
let (wrapperSignal, otherSignalObserver) = Signal<U, F>.pipe()

// Avoid memory leak caused by the direct use of the given
// signal.
//
// See https://github.com/ReactiveCocoa/ReactiveCocoa/pull/2758
// for the details.
outerDisposable += ActionDisposable {
otherSignalObserver.sendInterrupted()
}
outerDisposable += otherSignal.observe(otherSignalObserver)

self.startWithSignal { signal, disposable in
outerDisposable += disposable
outerDisposable += transform(signal)(wrapperSignal).observe(observer)
}
}
return self.liftRight(transform)(SignalProducer(signal: otherSignal))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh this is great

}
}


/// Map each value in the producer to a new value.
///
Expand Down
27 changes: 24 additions & 3 deletions Tests/ReactiveSwiftTests/SignalProducerSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ class SignalProducerSpec: QuickSpec {
return
}

producer.startWithSignal { _, innerDisposable in
producer.startWithSignal { signal, innerDisposable in
signal.observe { _ in }
disposable = innerDisposable
}

Expand Down Expand Up @@ -432,7 +433,7 @@ class SignalProducerSpec: QuickSpec {
observer = incomingObserver
}

producer.startWithSignal { _ in }
producer.start()
expect(addedDisposable.isDisposed) == false

observer.sendCompleted()
Expand All @@ -448,12 +449,31 @@ class SignalProducerSpec: QuickSpec {
observer = incomingObserver
}

producer.startWithSignal { _ in }
producer.start()
expect(addedDisposable.isDisposed) == false

observer.send(error: .default)
expect(addedDisposable.isDisposed) == true
}

it("should dispose of the added disposable if the signal is unretained and unobserved upon exiting the scope") {
let addedDisposable = SimpleDisposable()

let producer = SignalProducer<Int, TestError> { _, disposable in
disposable += addedDisposable
}

var started = false
var disposed = false

producer
.on(started: { started = true }, disposed: { disposed = true })
.startWithSignal { _ in }

expect(started) == true
expect(disposed) == true
expect(addedDisposable.isDisposed) == true
}
}

describe("start") {
Expand Down Expand Up @@ -1970,6 +1990,7 @@ class SignalProducerSpec: QuickSpec {
producer
.observe(on: TestScheduler())
.startWithSignal { signal, innerDisposable in
signal.observe { _ in }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this for?

Copy link
Member Author

@andersio andersio Nov 26, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under the new signal lifetime semantic, a signal is kept alive only if it is retained or has active observers. So the observer is added to extend the signal's lifetime indefinitely for the termination event outside the closure scope, or otherwise the signal would dispose of itself upon exiting the closure.

The fixed bug is kinda related too - a produced signal would dispose of itself when it is not retained and not observed. But then the associated producer disposable would not follow.

downstreamDisposable = innerDisposable
}

Expand Down
10 changes: 10 additions & 0 deletions Tests/ReactiveSwiftTests/SignalSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ class SignalSpec: QuickSpec {
expect(completed) == true
}

it("should dispose the supplied disposable when the signal terminates") {
let disposable = SimpleDisposable()
let (signal, observer) = Signal<(), NoError>.pipe(disposable: disposable)

expect(disposable.isDisposed) == false

observer.sendCompleted()
expect(disposable.isDisposed) == true
}

context("memory") {
it("should not crash allocating memory with a few observers") {
let (signal, _) = Signal<Int, NoError>.pipe()
Expand Down