Skip to content

Commit

Permalink
Lifetime-based producer resource management.
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio committed May 13, 2017
1 parent aa6f6a7 commit 3614fc9
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 114 deletions.
6 changes: 3 additions & 3 deletions Sources/Action.swift
Expand Up @@ -162,7 +162,7 @@ public final class Action<Input, Output, Error: Swift.Error> {
/// - returns: A producer that forwards events generated by its started unit of work,
/// or emits `ActionError.disabled` if the execution attempt is failed.
public func apply(_ input: Input) -> SignalProducer<Output, ActionError<Error>> {
return SignalProducer { observer, disposable in
return SignalProducer { observer, lifetime in
let startingState = self.state.modify { state -> Any? in
if state.isEnabled {
state.isExecuting = true
Expand All @@ -179,15 +179,15 @@ public final class Action<Input, Output, Error: Swift.Error> {
}

self.executeClosure(state, input).startWithSignal { signal, signalDisposable in
disposable += signalDisposable
lifetime.observeEnded(signalDisposable.dispose)

signal.observe { event in
observer.action(event.mapError(ActionError.producerFailed))
self.eventsObserver.send(value: event)
}
}

disposable += {
lifetime.observeEnded {
self.state.modify {
$0.isExecuting = false
}
Expand Down
21 changes: 21 additions & 0 deletions Sources/Deprecations+Removals.swift
Expand Up @@ -3,6 +3,27 @@ import Dispatch
import enum Result.NoError

// MARK: Unavailable methods in ReactiveSwift 2.0.
extension Lifetime {
@available(*, unavailable, renamed:"hasEnded")
public var isDisposed: Bool { fatalError() }

@discardableResult
@available(*, deprecated, message:"Use `observeEnded(_:)` with a method reference to `dispose()` instead. This method is subject to removal in a future release.")
public func add(_ d: Disposable?) -> Disposable? {
return d.flatMap { observeEnded($0.dispose) }
}

@discardableResult
@available(*, deprecated, message:"Use `observeEnded(_:)` with a method reference to `dispose()` instead. This operator overload is subject to removal in a future release.")
public static func += (left: Lifetime, right: Disposable?) -> Disposable? {
return right.flatMap { left.observeEnded($0.dispose) }
}

@discardableResult
@available(*, unavailable, message:"Use `observeEnded(_:)` instead.")
public static func += (left: Lifetime, right: () -> Void) -> Disposable? { fatalError() }
}

extension PropertyProtocol {
@available(*, unavailable, renamed:"flatMap(_:_:)")
public func flatMap<P: PropertyProtocol>(_ strategy: FlattenStrategy, transform: @escaping (Value) -> P) -> Property<P.Value> { fatalError() }
Expand Down
34 changes: 21 additions & 13 deletions Sources/Flatten.swift
Expand Up @@ -509,11 +509,13 @@ extension SignalProducer where Value: SignalProducerProtocol, Error == Value.Err
fileprivate func concurrent(limit: UInt) -> SignalProducer<Value.Value, Error> {
precondition(limit > 0, "The concurrent limit must be greater than zero.")

return SignalProducer<Value.Value, Error> { relayObserver, disposable in
return SignalProducer<Value.Value, Error> { relayObserver, lifetime in
self.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
let disposables = CompositeDisposable()
lifetime.observeEnded(signalDisposable.dispose)
lifetime.observeEnded(disposables.dispose)

_ = signal.observeConcurrent(relayObserver, limit, disposable)
_ = signal.observeConcurrent(relayObserver, limit, disposables)
}
}
}
Expand Down Expand Up @@ -767,13 +769,16 @@ extension SignalProducer where Value: SignalProducerProtocol, Error == Value.Err
/// - returns: A signal that forwards values from the latest signal sent on
/// `signal`, ignoring values sent on previous inner signal.
fileprivate func switchToLatest() -> SignalProducer<Value.Value, Error> {
return SignalProducer<Value.Value, Error> { observer, disposable in
return SignalProducer<Value.Value, Error> { observer, lifetime in
let latestInnerDisposable = SerialDisposable()
disposable += latestInnerDisposable
lifetime.observeEnded(latestInnerDisposable.dispose)

self.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
disposable += signal.observeSwitchToLatest(observer, latestInnerDisposable)
lifetime.observeEnded(signalDisposable.dispose)

if let disposable = signal.observeSwitchToLatest(observer, latestInnerDisposable) {
lifetime.observeEnded(disposable.dispose)
}
}
}
}
Expand Down Expand Up @@ -890,13 +895,16 @@ extension SignalProducer where Value: SignalProducerProtocol, Error == Value.Err
///
/// The returned producer completes when `self` and the winning inner producer have both completed.
fileprivate func race() -> SignalProducer<Value.Value, Error> {
return SignalProducer<Value.Value, Error> { observer, disposable in
return SignalProducer<Value.Value, Error> { observer, lifetime in
let relayDisposable = CompositeDisposable()
disposable += relayDisposable
lifetime.observeEnded(relayDisposable.dispose)

self.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
disposable += signal.observeRace(observer, relayDisposable)
lifetime.observeEnded(signalDisposable.dispose)

if let disposable = signal.observeRace(observer, relayDisposable) {
lifetime.observeEnded(disposable.dispose)
}
}
}
}
Expand Down Expand Up @@ -1216,9 +1224,9 @@ extension SignalProducer {
/// - transform: A closure that accepts emitted error and returns a signal
/// producer with a different type of error.
public func flatMapError<F>(_ transform: @escaping (Error) -> SignalProducer<Value, F>) -> SignalProducer<Value, F> {
return SignalProducer<Value, F> { observer, disposable in
return SignalProducer<Value, F> { observer, lifetime in
let serialDisposable = SerialDisposable()
disposable += serialDisposable
lifetime.observeEnded(serialDisposable.dispose)

self.startWithSignal { signal, signalDisposable in
serialDisposable.inner = signalDisposable
Expand Down
6 changes: 2 additions & 4 deletions Sources/FoundationExtensions.swift
Expand Up @@ -63,7 +63,7 @@ extension Reactive where Base: URLSession {
/// side error (i.e. when a response with status code other than
/// 200...299 is received).
public func data(with request: URLRequest) -> SignalProducer<(Data, URLResponse), AnyError> {
return SignalProducer { [base = self.base] observer, disposable in
return SignalProducer { [base = self.base] observer, lifetime in
let task = base.dataTask(with: request) { data, response, error in
if let data = data, let response = response {
observer.send(value: (data, response))
Expand All @@ -73,9 +73,7 @@ extension Reactive where Base: URLSession {
}
}

disposable += {
task.cancel()
}
lifetime.observeEnded(task.cancel)
task.resume()
}
}
Expand Down
13 changes: 8 additions & 5 deletions Sources/Property.swift
Expand Up @@ -518,9 +518,10 @@ public final class Property<Value>: PropertyProtocol {
/// - values: A producer that will start immediately and send values to
/// the property.
public convenience init(initial: Value, then values: SignalProducer<Value, NoError>) {
self.init(unsafeProducer: SignalProducer { observer, disposables in
self.init(unsafeProducer: SignalProducer { observer, lifetime in
observer.send(value: initial)
disposables += values.start(Observer(mappingInterruptedToCompleted: observer))
let disposable = values.start(Observer(mappingInterruptedToCompleted: observer))
lifetime.observeEnded(disposable.dispose)
})
}

Expand Down Expand Up @@ -630,10 +631,12 @@ public final class MutableProperty<Value>: ComposableMutablePropertyProtocol {
/// followed by all changes over time, then complete when the property has
/// deinitialized.
public var producer: SignalProducer<Value, NoError> {
return SignalProducer { [atomic, signal] producerObserver, producerDisposable in
return SignalProducer { [atomic, signal] observer, lifetime in
atomic.withValue { value in
producerObserver.send(value: value)
producerDisposable += signal.observe(Observer(mappingInterruptedToCompleted: producerObserver))
observer.send(value: value)
if let disposable = signal.observe(Observer(mappingInterruptedToCompleted: observer)) {
lifetime.observeEnded(disposable.dispose)
}
}
}
}
Expand Down

0 comments on commit 3614fc9

Please sign in to comment.