diff --git a/Sources/Event.swift b/Sources/Event.swift index 19879912b..989711b29 100644 --- a/Sources/Event.swift +++ b/Sources/Event.swift @@ -699,6 +699,53 @@ extension Signal.Event { } } + internal static func lazyMap(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> Transformation { + return { action, lifetime in + let box = Atomic(nil) + let completionDisposable = SerialDisposable() + let valueDisposable = SerialDisposable() + + lifetime += valueDisposable + lifetime += completionDisposable + + lifetime.observeEnded { + scheduler.schedule { + action(.interrupted) + } + } + + return { event in + switch event { + case let .value(value): + // Schedule only when there is no prior outstanding value. + if box.swap(value) == nil { + valueDisposable.inner = scheduler.schedule { + if let value = box.swap(nil) { + action(.value(transform(value))) + } + } + } + + case .completed, .failed: + // Completion and failure should not discard the outstanding + // value. + completionDisposable.inner = scheduler.schedule { + action(event.map(transform)) + } + + case .interrupted: + // `interrupted` overrides any outstanding value and any + // scheduled completion/failure. + valueDisposable.dispose() + completionDisposable.dispose() + scheduler.schedule { + action(.interrupted) + } + } + } + } + } + internal static func delay(_ interval: TimeInterval, on scheduler: DateScheduler) -> Transformation { precondition(interval >= 0) diff --git a/Sources/Signal.swift b/Sources/Signal.swift index 339d1cf03..8a496e549 100644 --- a/Sources/Signal.swift +++ b/Sources/Signal.swift @@ -595,10 +595,7 @@ extension Signal { /// - returns: A signal that sends values obtained using `transform` as this /// signal sends values. public func lazyMap(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> Signal { - return flatMap(.latest) { value in - return SignalProducer({ transform(value) }) - .start(on: scheduler) - } + return flatMapEvent(Signal.Event.lazyMap(on: scheduler, transform: transform)) } /// Preserve only values which pass the given closure. diff --git a/Sources/SignalProducer.swift b/Sources/SignalProducer.swift index 241db4250..857383df2 100644 --- a/Sources/SignalProducer.swift +++ b/Sources/SignalProducer.swift @@ -895,7 +895,7 @@ extension SignalProducer { /// - returns: A producer that, when started, sends values obtained using /// `transform` as this producer sends values. public func lazyMap(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> SignalProducer { - return lift { $0.lazyMap(on: scheduler, transform: transform) } + return core.flatMapEvent(Signal.Event.lazyMap(on: scheduler, transform: transform)) } /// Preserve only values which pass the given closure. diff --git a/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift b/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift index a60faced4..5d4728f64 100644 --- a/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift +++ b/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift @@ -313,7 +313,11 @@ class SignalProducerLiftingSpec: QuickSpec { } it("should interrupt ASAP and discard outstanding events") { - testAsyncASAPInterruption { $0.lazyMap(on: $1) { $0 } } + testAsyncASAPInterruption(op: "lazyMap") { $0.lazyMap(on: $1) { $0 } } + } + + it("should interrupt on the given scheduler") { + testAsyncInterruptionScheduler(op: "lazyMap") { $0.lazyMap(on: $1) { $0 } } } } @@ -1133,7 +1137,11 @@ class SignalProducerLiftingSpec: QuickSpec { } it("should interrupt ASAP and discard outstanding events") { - testAsyncASAPInterruption { $0.observe(on: $1) } + testAsyncASAPInterruption(op: "observe(on:)") { $0.observe(on: $1) } + } + + it("should interrupt on the given scheduler") { + testAsyncInterruptionScheduler(op: "observe(on:)") { $0.observe(on: $1) } } } @@ -1200,7 +1208,11 @@ class SignalProducerLiftingSpec: QuickSpec { } it("should interrupt ASAP and discard outstanding events") { - testAsyncASAPInterruption { $0.delay(10.0, on: $1) } + testAsyncASAPInterruption(op: "delay") { $0.delay(10.0, on: $1) } + } + + it("should interrupt on the given scheduler") { + testAsyncInterruptionScheduler(op: "delay") { $0.delay(10.0, on: $1) } } } @@ -1298,13 +1310,21 @@ class SignalProducerLiftingSpec: QuickSpec { } it("should interrupt ASAP and discard outstanding events") { - testAsyncASAPInterruption { $0.throttle(10.0, on: $1) } + testAsyncASAPInterruption(op: "throttle") { $0.throttle(10.0, on: $1) } + } + + it("should interrupt on the given scheduler") { + testAsyncInterruptionScheduler(op: "throttle") { $0.throttle(10.0, on: $1) } } } describe("debounce") { it("should interrupt ASAP and discard outstanding events") { - testAsyncASAPInterruption { $0.delay(10.0, on: $1) } + testAsyncASAPInterruption(op: "debounce") { $0.debounce(10.0, on: $1) } + } + + it("should interrupt on the given scheduler") { + testAsyncInterruptionScheduler(op: "debounce") { $0.debounce(10.0, on: $1) } } } @@ -2036,7 +2056,36 @@ class SignalProducerLiftingSpec: QuickSpec { } } +private func testAsyncInterruptionScheduler( + op: String, + file: FileString = #file, + line: UInt = #line, + transform: (SignalProducer, TestScheduler) -> SignalProducer +) { + var isInterrupted = false + + let scheduler = TestScheduler() + let producer = transform(SignalProducer(0 ..< 128), scheduler) + + let failedExpectations = gatherFailingExpectations { + let disposable = producer.startWithInterrupted { isInterrupted = true } + expect(isInterrupted) == false + + disposable.dispose() + expect(isInterrupted) == false + + scheduler.run() + expect(isInterrupted) == true + } + + if !failedExpectations.isEmpty { + fail("The async operator `\(op)` does not interrupt on the appropriate scheduler.", + location: SourceLocation(file: file, line: line)) + } +} + private func testAsyncASAPInterruption( + op: String, file: FileString = #file, line: UInt = #line, transform: (SignalProducer, TestScheduler) -> SignalProducer @@ -2073,7 +2122,7 @@ private func testAsyncASAPInterruption( } if !failedExpectations.isEmpty { - fail("The ASAP interruption test of the async operator has failed.", + fail("The ASAP interruption test of the async operator `\(op)` has failed.", location: SourceLocation(file: file, line: line)) } }