Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 19 additions & 25 deletions Sources/Property.swift
Original file line number Diff line number Diff line change
Expand Up @@ -522,10 +522,7 @@ public final class Property<Value>: PropertyProtocol {
///
/// - parameters:
/// - unsafeProducer: The composed producer for creating the property.
fileprivate init(
unsafeProducer: SignalProducer<Value, NoError>,
transform: ((Signal<Value, NoError>.Observer) -> Signal<Value, NoError>.Observer)? = nil
) {
fileprivate init(unsafeProducer: SignalProducer<Value, NoError>) {
// The ownership graph:
//
// ------------ weak ----------- strong ------------------
Expand All @@ -537,33 +534,30 @@ public final class Property<Value>: PropertyProtocol {
// strong ------------ ----------- strong ------------

let box = PropertyBox<Value?>(nil)
var relay: Signal<Value, NoError>!

unsafeProducer.startWithSignal { upstream, interruptHandle in
// A composed property tracks its active consumers through its relay signal, and
// interrupts `unsafeProducer` if the relay signal terminates.
let (signal, _observer) = Signal<Value, NoError>.pipe(disposable: interruptHandle)
let observer = transform?(_observer) ?? _observer
relay = signal
// A composed property tracks its active consumers through its relay signal, and
// interrupts `unsafeProducer` if the relay signal terminates.
let disposable = SerialDisposable()
let (relay, observer) = Signal<Value, NoError>.pipe(disposable: disposable)

disposable.inner = unsafeProducer.start { [weak box] event in
// `observer` receives `interrupted` only as a result of the termination of
// `signal`, and would not be delivered anyway. So transforming
// `interrupted` to `completed` is unnecessary here.
upstream.observe { [weak box] event in
guard let box = box else {
// Just forward the event, since no one owns the box or IOW no demand
// for a cached latest value.
return observer.send(event)
}

box.begin { storage in
storage.modify { value in
if let newValue = event.value {
value = newValue
}
guard let box = box else {
// Just forward the event, since no one owns the box or IOW no demand
// for a cached latest value.
return observer.send(event)
}

box.begin { storage in
storage.modify { value in
if let newValue = event.value {
value = newValue
}
observer.send(event)
}
observer.send(event)
}
}

Expand All @@ -576,10 +570,10 @@ public final class Property<Value>: PropertyProtocol {
_value = { box.value! }
signal = relay

producer = SignalProducer { [box, signal = relay!] observer, lifetime in
producer = SignalProducer { [box, relay] observer, lifetime in
box.withValue { value in
observer.send(value: value!)
lifetime += signal.observe(Signal.Observer(mappingInterruptedToCompleted: observer))
lifetime += relay.observe(Signal.Observer(mappingInterruptedToCompleted: observer))
}
}
}
Expand Down