Skip to content

Commit

Permalink
Debounce, Throttle, CollectEvery
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio committed Jan 1, 2021
1 parent 96143cd commit 970cf3d
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 155 deletions.
30 changes: 30 additions & 0 deletions ReactiveSwift.xcodeproj/project.pbxproj
Expand Up @@ -85,6 +85,18 @@
9A2D5D3B259F985B005682ED /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D39259F985B005682ED /* Delay.swift */; };
9A2D5D3C259F985B005682ED /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D39259F985B005682ED /* Delay.swift */; };
9A2D5D3D259F985B005682ED /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D39259F985B005682ED /* Delay.swift */; };
9A2D5D53259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; };
9A2D5D54259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; };
9A2D5D55259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; };
9A2D5D56259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; };
9A2D5D5D259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; };
9A2D5D5E259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; };
9A2D5D5F259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; };
9A2D5D60259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; };
9A2D5D67259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; };
9A2D5D68259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; };
9A2D5D69259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; };
9A2D5D6A259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; };
9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
Expand Down Expand Up @@ -284,6 +296,9 @@
9A2D5D25259F9373005682ED /* ObserveOn.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ObserveOn.swift; sourceTree = "<group>"; };
9A2D5D2F259F942B005682ED /* LazyMap.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LazyMap.swift; sourceTree = "<group>"; };
9A2D5D39259F985B005682ED /* Delay.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Delay.swift; sourceTree = "<group>"; };
9A2D5D52259FA000005682ED /* Throttle.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Throttle.swift; sourceTree = "<group>"; };
9A2D5D5C259FA0DD005682ED /* Debounce.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Debounce.swift; sourceTree = "<group>"; };
9A2D5D66259FA59E005682ED /* CollectEvery.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CollectEvery.swift; sourceTree = "<group>"; };
9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UninhabitedTypeGuards.swift; sourceTree = "<group>"; };
9A681A9D1E5A241B00B097CF /* DeprecationSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DeprecationSpec.swift; sourceTree = "<group>"; };
9A9100DE1E0E6E620093E346 /* ValidatingProperty.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingProperty.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -441,6 +456,9 @@
9A2D5D25259F9373005682ED /* ObserveOn.swift */,
9A2D5D2F259F942B005682ED /* LazyMap.swift */,
9A2D5D39259F985B005682ED /* Delay.swift */,
9A2D5D52259FA000005682ED /* Throttle.swift */,
9A2D5D5C259FA0DD005682ED /* Debounce.swift */,
9A2D5D66259FA59E005682ED /* CollectEvery.swift */,
);
path = Observers;
sourceTree = "<group>";
Expand Down Expand Up @@ -934,6 +952,7 @@
57A4D1B11BA13D7A00F7D4B1 /* Optional.swift in Sources */,
57A4D1B41BA13D7A00F7D4B1 /* Disposable.swift in Sources */,
57A4D1B61BA13D7A00F7D4B1 /* Event.swift in Sources */,
9A2D5D6A259FA59E005682ED /* CollectEvery.swift in Sources */,
57A4D1B81BA13D7A00F7D4B1 /* Scheduler.swift in Sources */,
9A9100E21E0E6E680093E346 /* ValidatingProperty.swift in Sources */,
57A4D1B91BA13D7A00F7D4B1 /* Action.swift in Sources */,
Expand All @@ -943,7 +962,9 @@
9A2D5D3D259F985B005682ED /* Delay.swift in Sources */,
57A4D1BB1BA13D7A00F7D4B1 /* Signal.swift in Sources */,
9AFA492824E9B15C003D263C /* Operators.swift in Sources */,
9A2D5D56259FA000005682ED /* Throttle.swift in Sources */,
9A67963E1F6059440058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
9A2D5D60259FA0DD005682ED /* Debounce.swift in Sources */,
9AFA491424E9A196003D263C /* Map.swift in Sources */,
9A2D5D33259F942B005682ED /* LazyMap.swift in Sources */,
9AFA491E24E9A925003D263C /* Filter.swift in Sources */,
Expand Down Expand Up @@ -999,6 +1020,7 @@
A9F793341B60D0140026BCBA /* Optional.swift in Sources */,
A9B315BC1B3940810001CB9C /* Disposable.swift in Sources */,
A9B315BE1B3940810001CB9C /* Event.swift in Sources */,
9A2D5D69259FA59E005682ED /* CollectEvery.swift in Sources */,
A9B315C01B3940810001CB9C /* Scheduler.swift in Sources */,
9A9100E11E0E6E680093E346 /* ValidatingProperty.swift in Sources */,
A9B315C11B3940810001CB9C /* Action.swift in Sources */,
Expand All @@ -1008,7 +1030,9 @@
9A2D5D3C259F985B005682ED /* Delay.swift in Sources */,
A9B315C31B3940810001CB9C /* Signal.swift in Sources */,
9AFA492724E9B15C003D263C /* Operators.swift in Sources */,
9A2D5D55259FA000005682ED /* Throttle.swift in Sources */,
9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
9A2D5D5F259FA0DD005682ED /* Debounce.swift in Sources */,
9AFA491324E9A196003D263C /* Map.swift in Sources */,
9A2D5D32259F942B005682ED /* LazyMap.swift in Sources */,
9AFA491D24E9A925003D263C /* Filter.swift in Sources */,
Expand Down Expand Up @@ -1036,6 +1060,7 @@
D871D69F1B3B29A40070F16C /* Optional.swift in Sources */,
D08C54B61A69A3DB00AD8286 /* Event.swift in Sources */,
D0C312D319EF2A5800984962 /* Disposable.swift in Sources */,
9A2D5D67259FA59E005682ED /* CollectEvery.swift in Sources */,
9A9100DF1E0E6E620093E346 /* ValidatingProperty.swift in Sources */,
EBCC7DBC1BBF010C00A2AE92 /* Signal.Observer.swift in Sources */,
D03B4A3D19F4C39A009E02AC /* FoundationExtensions.swift in Sources */,
Expand All @@ -1045,7 +1070,9 @@
9A2D5D3A259F985B005682ED /* Delay.swift in Sources */,
D85C652A1C0D84C7005A77AD /* Flatten.swift in Sources */,
9AFA492524E9B15C003D263C /* Operators.swift in Sources */,
9A2D5D53259FA000005682ED /* Throttle.swift in Sources */,
9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
9A2D5D5D259FA0DD005682ED /* Debounce.swift in Sources */,
9AFA491124E9A196003D263C /* Map.swift in Sources */,
9A2D5D30259F942B005682ED /* LazyMap.swift in Sources */,
9AFA491B24E9A925003D263C /* Filter.swift in Sources */,
Expand Down Expand Up @@ -1101,6 +1128,7 @@
D08C54B41A69A2AF00AD8286 /* Signal.swift in Sources */,
D8E84A671B3B32FB00C3E831 /* Optional.swift in Sources */,
D0C312D419EF2A5800984962 /* Disposable.swift in Sources */,
9A2D5D68259FA59E005682ED /* CollectEvery.swift in Sources */,
D08C54B91A69A9D100AD8286 /* SignalProducer.swift in Sources */,
9A9100E01E0E6E670093E346 /* ValidatingProperty.swift in Sources */,
9ABCB1861D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */,
Expand All @@ -1110,7 +1138,9 @@
9A2D5D3B259F985B005682ED /* Delay.swift in Sources */,
D85C652B1C0E70E3005A77AD /* Flatten.swift in Sources */,
9AFA492624E9B15C003D263C /* Operators.swift in Sources */,
9A2D5D54259FA000005682ED /* Throttle.swift in Sources */,
9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
9A2D5D5E259FA0DD005682ED /* Debounce.swift in Sources */,
9AFA491224E9A196003D263C /* Map.swift in Sources */,
9A2D5D31259F942B005682ED /* LazyMap.swift in Sources */,
9AFA491C24E9A925003D263C /* Filter.swift in Sources */,
Expand Down
171 changes: 19 additions & 152 deletions Sources/Event.swift
Expand Up @@ -726,170 +726,37 @@ extension Signal.Event {
}

internal static func throttle(_ interval: TimeInterval, on scheduler: DateScheduler) -> Transformation<Value, Error> {
precondition(interval >= 0)

return { action, lifetime in
let state: Atomic<ThrottleState<Value>> = Atomic(ThrottleState())
let schedulerDisposable = SerialDisposable()

lifetime.observeEnded {
schedulerDisposable.dispose()
scheduler.schedule { action(.interrupted) }
}

return Signal.Observer { event in
guard let value = event.value else {
schedulerDisposable.inner = scheduler.schedule {
action(event)
}
return
}

let scheduleDate: Date = state.modify { state in
state.pendingValue = value

let proposedScheduleDate: Date
if let previousDate = state.previousDate, previousDate <= scheduler.currentDate {
proposedScheduleDate = previousDate.addingTimeInterval(interval)
} else {
proposedScheduleDate = scheduler.currentDate
}

return proposedScheduleDate < scheduler.currentDate ? scheduler.currentDate : proposedScheduleDate
}

schedulerDisposable.inner = scheduler.schedule(after: scheduleDate) {
if let pendingValue = state.modify({ $0.retrieveValue(date: scheduleDate) }) {
action(.value(pendingValue))
}
}
}
return { downstream, lifetime in
Operators.Throttle(downstream: downstream, downstreamLifetime: lifetime, target: scheduler, interval: interval)
}
}

internal static func debounce(_ interval: TimeInterval, on scheduler: DateScheduler, discardWhenCompleted: Bool) -> Transformation<Value, Error> {
precondition(interval >= 0)

return { action, lifetime in
let state: Atomic<ThrottleState<Value>> = Atomic(ThrottleState(previousDate: scheduler.currentDate, pendingValue: nil))
let d = SerialDisposable()

lifetime.observeEnded {
d.dispose()
scheduler.schedule { action(.interrupted) }
}

return Signal.Observer { event in
switch event {
case let .value(value):
state.modify { state in
state.pendingValue = value
}
let date = scheduler.currentDate.addingTimeInterval(interval)
d.inner = scheduler.schedule(after: date) {
if let pendingValue = state.modify({ $0.retrieveValue(date: date) }) {
action(.value(pendingValue))
}
}

case .completed:
d.inner = scheduler.schedule {
let pending: (value: Value, previousDate: Date)? = state.modify { state in
defer { state.pendingValue = nil }
guard let pendingValue = state.pendingValue, let previousDate = state.previousDate else { return nil }
return (pendingValue, previousDate)
}
if !discardWhenCompleted, let (pendingValue, previousDate) = pending {
scheduler.schedule(after: previousDate.addingTimeInterval(interval)) {
action(.value(pendingValue))
action(.completed)
}
} else {
action(.completed)
}
}

case .failed, .interrupted:
d.inner = scheduler.schedule {
action(event)
}
}
}
return { downstream, lifetime in
Operators.Debounce(
downstream: downstream,
downstreamLifetime: lifetime,
target: scheduler,
interval: interval,
discardWhenCompleted: discardWhenCompleted
)
}
}

internal static func collect(every interval: DispatchTimeInterval, on scheduler: DateScheduler, skipEmpty: Bool, discardWhenCompleted: Bool) -> Transformation<[Value], Error> {
return { action, lifetime in
let state = Atomic<CollectEveryState<Value>>(.init(skipEmpty: skipEmpty))
let d = SerialDisposable()

d.inner = scheduler.schedule(after: scheduler.currentDate.addingTimeInterval(interval), interval: interval, leeway: interval * 0.1) {
let (currentValues, isCompleted) = state.modify { ($0.collect(), $0.isCompleted) }
if let currentValues = currentValues {
action(.value(currentValues))
}
if isCompleted {
action(.completed)
}
}

lifetime.observeEnded {
d.dispose()
scheduler.schedule { action(.interrupted) }
}

return Signal.Observer { event in
switch event {
case let .value(value):
state.modify { $0.values.append(value) }
case let .failed(error):
d.inner = scheduler.schedule { action(.failed(error)) }
case .completed where !discardWhenCompleted:
state.modify { $0.isCompleted = true }
case .completed:
d.inner = scheduler.schedule { action(.completed) }
case .interrupted:
d.inner = scheduler.schedule { action(.interrupted) }
}
}
return { downstream, lifetime in
Operators.CollectEvery(
downstream: downstream,
downstreamLifetime: lifetime,
target: scheduler,
interval: interval,
skipEmpty: skipEmpty,
discardWhenCompleted: discardWhenCompleted
)
}
}
}

private struct CollectEveryState<Value> {
let skipEmpty: Bool
var values: [Value] = []
var isCompleted: Bool = false

init(skipEmpty: Bool) {
self.skipEmpty = skipEmpty
}

var hasValues: Bool {
return !values.isEmpty || !skipEmpty
}

mutating func collect() -> [Value]? {
guard hasValues else { return nil }
defer { values.removeAll() }
return values
}
}

private struct ThrottleState<Value> {
var previousDate: Date?
var pendingValue: Value?

mutating func retrieveValue(date: Date) -> Value? {
defer {
if pendingValue != nil {
pendingValue = nil
previousDate = date
}
}
return pendingValue
}
}

extension Signal.Event where Error == Never {
internal static func promoteError<F>(_: F.Type) -> Transformation<Value, F> {
Expand Down
76 changes: 76 additions & 0 deletions Sources/Observers/CollectEvery.swift
@@ -0,0 +1,76 @@
extension Operators {
internal final class CollectEvery<Value, Error: Swift.Error>: UnaryAsyncOperator<Value, [Value], Error> {
let interval: DispatchTimeInterval
let discardWhenCompleted: Bool
let targetWithClock: DateScheduler

private let state: Atomic<CollectEveryState<Value>>
private let timerDisposable = SerialDisposable()

init(
downstream: Observer<[Value], Error>,
downstreamLifetime: Lifetime,
target: DateScheduler,
interval: DispatchTimeInterval,
skipEmpty: Bool,
discardWhenCompleted: Bool
) {
self.interval = interval
self.discardWhenCompleted = discardWhenCompleted
self.targetWithClock = target
self.state = Atomic(CollectEveryState(skipEmpty: skipEmpty))

super.init(downstream: downstream, downstreamLifetime: downstreamLifetime, target: target)

downstreamLifetime += timerDisposable

let initialDate = targetWithClock.currentDate.addingTimeInterval(interval)
timerDisposable.inner = targetWithClock.schedule(after: initialDate, interval: interval, leeway: interval * 0.1) {
let (currentValues, isCompleted) = self.state.modify { ($0.collect(), $0.isCompleted) }

if let currentValues = currentValues {
self.unscheduledSend(currentValues)
}

if isCompleted {
self.unscheduledTerminate(.completed)
}
}
}

override func receive(_ value: Value) {
state.modify { $0.values.append(value) }
}

override func terminate(_ termination: Termination<Error>) {
guard isActive else { return }

if case .completed = termination, !discardWhenCompleted {
state.modify { $0.isCompleted = true }
} else {
timerDisposable.dispose()
super.terminate(termination)
}
}
}
}

private struct CollectEveryState<Value> {
let skipEmpty: Bool
var values: [Value] = []
var isCompleted: Bool = false

init(skipEmpty: Bool) {
self.skipEmpty = skipEmpty
}

var hasValues: Bool {
return !values.isEmpty || !skipEmpty
}

mutating func collect() -> [Value]? {
guard hasValues else { return nil }
defer { values.removeAll() }
return values
}
}

0 comments on commit 970cf3d

Please sign in to comment.