From 67811a4d10083b895945659121234df65b231192 Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Fri, 22 Dec 2017 22:01:29 +0800 Subject: [PATCH 1/2] Implement `lazyMap` as an event transformation. --- Sources/Event.swift | 47 ++++++++++++++++++++++++++++++++++++ Sources/Signal.swift | 5 +--- Sources/SignalProducer.swift | 2 +- 3 files changed, 49 insertions(+), 5 deletions(-) diff --git a/Sources/Event.swift b/Sources/Event.swift index 19879912b..989711b29 100644 --- a/Sources/Event.swift +++ b/Sources/Event.swift @@ -699,6 +699,53 @@ extension Signal.Event { } } + internal static func lazyMap(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> Transformation { + return { action, lifetime in + let box = Atomic(nil) + let completionDisposable = SerialDisposable() + let valueDisposable = SerialDisposable() + + lifetime += valueDisposable + lifetime += completionDisposable + + lifetime.observeEnded { + scheduler.schedule { + action(.interrupted) + } + } + + return { event in + switch event { + case let .value(value): + // Schedule only when there is no prior outstanding value. + if box.swap(value) == nil { + valueDisposable.inner = scheduler.schedule { + if let value = box.swap(nil) { + action(.value(transform(value))) + } + } + } + + case .completed, .failed: + // Completion and failure should not discard the outstanding + // value. + completionDisposable.inner = scheduler.schedule { + action(event.map(transform)) + } + + case .interrupted: + // `interrupted` overrides any outstanding value and any + // scheduled completion/failure. + valueDisposable.dispose() + completionDisposable.dispose() + scheduler.schedule { + action(.interrupted) + } + } + } + } + } + internal static func delay(_ interval: TimeInterval, on scheduler: DateScheduler) -> Transformation { precondition(interval >= 0) diff --git a/Sources/Signal.swift b/Sources/Signal.swift index 339d1cf03..8a496e549 100644 --- a/Sources/Signal.swift +++ b/Sources/Signal.swift @@ -595,10 +595,7 @@ extension Signal { /// - returns: A signal that sends values obtained using `transform` as this /// signal sends values. public func lazyMap(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> Signal { - return flatMap(.latest) { value in - return SignalProducer({ transform(value) }) - .start(on: scheduler) - } + return flatMapEvent(Signal.Event.lazyMap(on: scheduler, transform: transform)) } /// Preserve only values which pass the given closure. diff --git a/Sources/SignalProducer.swift b/Sources/SignalProducer.swift index 241db4250..857383df2 100644 --- a/Sources/SignalProducer.swift +++ b/Sources/SignalProducer.swift @@ -895,7 +895,7 @@ extension SignalProducer { /// - returns: A producer that, when started, sends values obtained using /// `transform` as this producer sends values. public func lazyMap(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> SignalProducer { - return lift { $0.lazyMap(on: scheduler, transform: transform) } + return core.flatMapEvent(Signal.Event.lazyMap(on: scheduler, transform: transform)) } /// Preserve only values which pass the given closure. From c8e3a8cecdaf958f1f70f0d85fce80f17060bd76 Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Sat, 23 Dec 2017 00:32:36 +0800 Subject: [PATCH 2/2] Add interruption scheduler test cases for the five async operators. --- .../SignalProducerLiftingSpec.swift | 61 +++++++++++++++++-- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift b/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift index a60faced4..5d4728f64 100644 --- a/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift +++ b/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift @@ -313,7 +313,11 @@ class SignalProducerLiftingSpec: QuickSpec { } it("should interrupt ASAP and discard outstanding events") { - testAsyncASAPInterruption { $0.lazyMap(on: $1) { $0 } } + testAsyncASAPInterruption(op: "lazyMap") { $0.lazyMap(on: $1) { $0 } } + } + + it("should interrupt on the given scheduler") { + testAsyncInterruptionScheduler(op: "lazyMap") { $0.lazyMap(on: $1) { $0 } } } } @@ -1133,7 +1137,11 @@ class SignalProducerLiftingSpec: QuickSpec { } it("should interrupt ASAP and discard outstanding events") { - testAsyncASAPInterruption { $0.observe(on: $1) } + testAsyncASAPInterruption(op: "observe(on:)") { $0.observe(on: $1) } + } + + it("should interrupt on the given scheduler") { + testAsyncInterruptionScheduler(op: "observe(on:)") { $0.observe(on: $1) } } } @@ -1200,7 +1208,11 @@ class SignalProducerLiftingSpec: QuickSpec { } it("should interrupt ASAP and discard outstanding events") { - testAsyncASAPInterruption { $0.delay(10.0, on: $1) } + testAsyncASAPInterruption(op: "delay") { $0.delay(10.0, on: $1) } + } + + it("should interrupt on the given scheduler") { + testAsyncInterruptionScheduler(op: "delay") { $0.delay(10.0, on: $1) } } } @@ -1298,13 +1310,21 @@ class SignalProducerLiftingSpec: QuickSpec { } it("should interrupt ASAP and discard outstanding events") { - testAsyncASAPInterruption { $0.throttle(10.0, on: $1) } + testAsyncASAPInterruption(op: "throttle") { $0.throttle(10.0, on: $1) } + } + + it("should interrupt on the given scheduler") { + testAsyncInterruptionScheduler(op: "throttle") { $0.throttle(10.0, on: $1) } } } describe("debounce") { it("should interrupt ASAP and discard outstanding events") { - testAsyncASAPInterruption { $0.delay(10.0, on: $1) } + testAsyncASAPInterruption(op: "debounce") { $0.debounce(10.0, on: $1) } + } + + it("should interrupt on the given scheduler") { + testAsyncInterruptionScheduler(op: "debounce") { $0.debounce(10.0, on: $1) } } } @@ -2036,7 +2056,36 @@ class SignalProducerLiftingSpec: QuickSpec { } } +private func testAsyncInterruptionScheduler( + op: String, + file: FileString = #file, + line: UInt = #line, + transform: (SignalProducer, TestScheduler) -> SignalProducer +) { + var isInterrupted = false + + let scheduler = TestScheduler() + let producer = transform(SignalProducer(0 ..< 128), scheduler) + + let failedExpectations = gatherFailingExpectations { + let disposable = producer.startWithInterrupted { isInterrupted = true } + expect(isInterrupted) == false + + disposable.dispose() + expect(isInterrupted) == false + + scheduler.run() + expect(isInterrupted) == true + } + + if !failedExpectations.isEmpty { + fail("The async operator `\(op)` does not interrupt on the appropriate scheduler.", + location: SourceLocation(file: file, line: line)) + } +} + private func testAsyncASAPInterruption( + op: String, file: FileString = #file, line: UInt = #line, transform: (SignalProducer, TestScheduler) -> SignalProducer @@ -2073,7 +2122,7 @@ private func testAsyncASAPInterruption( } if !failedExpectations.isEmpty { - fail("The ASAP interruption test of the async operator has failed.", + fail("The ASAP interruption test of the async operator `\(op)` has failed.", location: SourceLocation(file: file, line: line)) } }