Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions Sources/Event.swift
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,53 @@ extension Signal.Event {
}
}

internal static func lazyMap<U>(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> Transformation<U, Error> {
return { action, lifetime in
let box = Atomic<Value?>(nil)
let completionDisposable = SerialDisposable()
let valueDisposable = SerialDisposable()

lifetime += valueDisposable
lifetime += completionDisposable

lifetime.observeEnded {
scheduler.schedule {
action(.interrupted)
}
}

return { event in
switch event {
case let .value(value):
// Schedule only when there is no prior outstanding value.
if box.swap(value) == nil {
valueDisposable.inner = scheduler.schedule {
if let value = box.swap(nil) {
action(.value(transform(value)))
}
}
}

case .completed, .failed:
// Completion and failure should not discard the outstanding
// value.
completionDisposable.inner = scheduler.schedule {
action(event.map(transform))
}

case .interrupted:
// `interrupted` overrides any outstanding value and any
// scheduled completion/failure.
valueDisposable.dispose()
completionDisposable.dispose()
scheduler.schedule {
action(.interrupted)
}
}
}
}
}

internal static func delay(_ interval: TimeInterval, on scheduler: DateScheduler) -> Transformation<Value, Error> {
precondition(interval >= 0)

Expand Down
5 changes: 1 addition & 4 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -595,10 +595,7 @@ extension Signal {
/// - returns: A signal that sends values obtained using `transform` as this
/// signal sends values.
public func lazyMap<U>(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> Signal<U, Error> {
return flatMap(.latest) { value in
return SignalProducer({ transform(value) })
.start(on: scheduler)
}
return flatMapEvent(Signal.Event.lazyMap(on: scheduler, transform: transform))
}

/// Preserve only values which pass the given closure.
Expand Down
2 changes: 1 addition & 1 deletion Sources/SignalProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ extension SignalProducer {
/// - returns: A producer that, when started, sends values obtained using
/// `transform` as this producer sends values.
public func lazyMap<U>(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> SignalProducer<U, Error> {
return lift { $0.lazyMap(on: scheduler, transform: transform) }
return core.flatMapEvent(Signal.Event.lazyMap(on: scheduler, transform: transform))
}

/// Preserve only values which pass the given closure.
Expand Down
61 changes: 55 additions & 6 deletions Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,11 @@ class SignalProducerLiftingSpec: QuickSpec {
}

it("should interrupt ASAP and discard outstanding events") {
testAsyncASAPInterruption { $0.lazyMap(on: $1) { $0 } }
testAsyncASAPInterruption(op: "lazyMap") { $0.lazyMap(on: $1) { $0 } }
}

it("should interrupt on the given scheduler") {
testAsyncInterruptionScheduler(op: "lazyMap") { $0.lazyMap(on: $1) { $0 } }
}
}

Expand Down Expand Up @@ -1133,7 +1137,11 @@ class SignalProducerLiftingSpec: QuickSpec {
}

it("should interrupt ASAP and discard outstanding events") {
testAsyncASAPInterruption { $0.observe(on: $1) }
testAsyncASAPInterruption(op: "observe(on:)") { $0.observe(on: $1) }
}

it("should interrupt on the given scheduler") {
testAsyncInterruptionScheduler(op: "observe(on:)") { $0.observe(on: $1) }
}
}

Expand Down Expand Up @@ -1200,7 +1208,11 @@ class SignalProducerLiftingSpec: QuickSpec {
}

it("should interrupt ASAP and discard outstanding events") {
testAsyncASAPInterruption { $0.delay(10.0, on: $1) }
testAsyncASAPInterruption(op: "delay") { $0.delay(10.0, on: $1) }
}

it("should interrupt on the given scheduler") {
testAsyncInterruptionScheduler(op: "delay") { $0.delay(10.0, on: $1) }
}
}

Expand Down Expand Up @@ -1298,13 +1310,21 @@ class SignalProducerLiftingSpec: QuickSpec {
}

it("should interrupt ASAP and discard outstanding events") {
testAsyncASAPInterruption { $0.throttle(10.0, on: $1) }
testAsyncASAPInterruption(op: "throttle") { $0.throttle(10.0, on: $1) }
}

it("should interrupt on the given scheduler") {
testAsyncInterruptionScheduler(op: "throttle") { $0.throttle(10.0, on: $1) }
}
}

describe("debounce") {
it("should interrupt ASAP and discard outstanding events") {
testAsyncASAPInterruption { $0.delay(10.0, on: $1) }
testAsyncASAPInterruption(op: "debounce") { $0.debounce(10.0, on: $1) }
}

it("should interrupt on the given scheduler") {
testAsyncInterruptionScheduler(op: "debounce") { $0.debounce(10.0, on: $1) }
}
}

Expand Down Expand Up @@ -2036,7 +2056,36 @@ class SignalProducerLiftingSpec: QuickSpec {
}
}

private func testAsyncInterruptionScheduler(
op: String,
file: FileString = #file,
line: UInt = #line,
transform: (SignalProducer<Int, NoError>, TestScheduler) -> SignalProducer<Int, NoError>
) {
var isInterrupted = false

let scheduler = TestScheduler()
let producer = transform(SignalProducer(0 ..< 128), scheduler)

let failedExpectations = gatherFailingExpectations {
let disposable = producer.startWithInterrupted { isInterrupted = true }
expect(isInterrupted) == false

disposable.dispose()
expect(isInterrupted) == false

scheduler.run()
expect(isInterrupted) == true
}

if !failedExpectations.isEmpty {
fail("The async operator `\(op)` does not interrupt on the appropriate scheduler.",
location: SourceLocation(file: file, line: line))
}
}

private func testAsyncASAPInterruption(
op: String,
file: FileString = #file,
line: UInt = #line,
transform: (SignalProducer<Int, NoError>, TestScheduler) -> SignalProducer<Int, NoError>
Expand Down Expand Up @@ -2073,7 +2122,7 @@ private func testAsyncASAPInterruption(
}

if !failedExpectations.isEmpty {
fail("The ASAP interruption test of the async operator has failed.",
fail("The ASAP interruption test of the async operator `\(op)` has failed.",
location: SourceLocation(file: file, line: line))
}
}