-
Notifications
You must be signed in to change notification settings - Fork 9
/
Buffers.swift
executable file
·242 lines (226 loc) · 7.2 KB
/
Buffers.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
//
// Buffers.swift
//
// Created by Daniel Tartaglia on 04 Apr, 2019
// Copyright © 2023 Daniel Tartaglia. MIT License.
//
import Foundation
import RxSwift
extension ObservableType {
func buffer(shouldInclude: @escaping ([Element], Element) -> Bool) -> Observable<[Element]> {
materialize()
.scan(into: (buf: [Element](), emit: [Element]())) { state, event in
switch event {
case let .next(element):
if shouldInclude(state.buf, element) {
state.buf.append(element)
state.emit = []
}
else {
state.emit = state.buf
state.buf = [element]
}
case let .error(error):
throw error
case .completed:
state.emit = state.buf
state.buf = []
}
}
.filter { !$1.isEmpty }
.map { $1 }
}
}
extension ObservableType {
/**
Projects elements from an observable sequence into a buffer that's sent out when its full and then every `skip`
elements.
- seealso: [overlapping buffers in Introduction to Rx](https://introtorx.com/chapters/partitioning#OverlappingBuffers)
- parameter count: Size of the array of elements that will be produced in each event.
- parameter skip: Number of elements that must emit from the source before the buffer emits.
- returns: An observable sequence of buffers.
*/
func buffer(count: Int, skip: Int) -> Observable<[Element]> {
precondition(
skip > 0,
"The `skip` parameter cannot be less than or equal to zero. If you want to use a value of zero (i.e. each buffer contains all values), then consider using the `scan` method instead with an Array<T> as the accumulator."
)
return materialize()
.scan(into: (buf: [Element](), step: count, trigger: false)) { prev, event in
switch event {
case let .next(value):
let newStep = prev.step - 1
prev.buf.append(value)
if prev.buf.count > count {
prev.buf.removeFirst()
}
prev.step = newStep == 0 ? skip : newStep
prev.trigger = newStep == 0
case .completed:
prev.buf = Array(prev.buf.suffix(count - prev.step))
prev.step = 0
prev.trigger = true
case let .error(error):
throw error
}
}
.filter { $0.trigger }
.map { $0.buf }
}
}
extension ObservableType {
/**
Projects elements from an observable sequence into a buffer that's sent out after `timeSpan` and then every
`timeShift` seconds.
- seealso: [overlapping buffers in Introduction to Rx](https://introtorx.com/chapters/partitioning#OverlappingBuffers)
- parameter timeSpan: The amount of time the operator will spend gathering events.
- parameter timeShift: The amount of time that must pass before the buffer emits.
- parameter scheduler: Scheduler to run timers on.
- returns: An observable sequence of buffers.
*/
func buffer(timeSpan: RxTimeInterval, timeShift: RxTimeInterval,
scheduler: SchedulerType) -> Observable<[Element]>
{
precondition(
timeShift.asTimeInterval > 0,
"The `timeShift` parameter cannot be less than or equal to zero. If you want to use a value of zero (i.e. each buffer contains all values), then consider using the `scan` method instead with an Array<T> as the accumulator."
)
return Observable.create { observer in
var buf: [Date: Element] = [:]
var lastEmit: Date?
let lock = NSRecursiveLock()
let bufferDispoable = self.subscribe { event in
lock.lock(); defer { lock.unlock() }
let now = scheduler.now
switch event {
case let .next(element):
buf[now] = element
case .completed:
let span = now.timeIntervalSince(lastEmit ?? .distantPast)
+ timeSpan.asTimeInterval
- timeShift.asTimeInterval
let buffer = buf
.filter { $0.key > now.addingTimeInterval(-span) }
.sorted(by: { $0.key <= $1.key })
.map { $0.value }
observer.onNext(buffer)
observer.onCompleted()
case let .error(error):
observer.onError(error)
}
}
let schedulerDisposable = scheduler.schedulePeriodic(
(),
startAfter: timeSpan,
period: timeShift,
action: { _ in
lock.lock(); defer { lock.unlock() }
let now = scheduler.now
buf = buf.filter { $0.key > now.addingTimeInterval(-timeSpan.asTimeInterval) }
observer.onNext(buf.sorted(by: { $0.key <= $1.key }).map { $0.value })
lastEmit = now
}
)
return Disposables.create([schedulerDisposable, bufferDispoable])
}
}
}
extension ObservableType {
/**
Projects elements from an observable sequence into a buffer that's sent out when the boundary sequence fires. Then it emits the elements as an array and begins collecting again.
- parameter boundary: Triggering event sequence.
- returns: Array of elements observable sequence.
*/
func buffer<O>(boundary: O) -> Observable<[Element]> where O: ObservableConvertibleType {
Observable.create { observer in
var buffer = [Element]()
let lock = NSRecursiveLock()
let source = self.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case let .next(element):
buffer.append(element)
case let .error(error):
observer.onError(error)
case .completed:
observer.onNext(buffer)
observer.onCompleted()
}
}
let trigger = boundary.asObservable()
.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next:
observer.onNext(buffer)
buffer = []
case let .error(error):
observer.onError(error)
case .completed:
break
}
}
return CompositeDisposable(source, trigger)
}
}
}
extension Observable {
/**
Projects elements from an observable sequence into a buffer that's sent out when the isCollecting sequence's latest value was `true`.
- parameter isCollecting: Sequence that switches the buffering behavior.
- returns: Array of elements observable sequence.
*/
func buffer<O>(isCollecting boundary: O) -> Observable<[Element]> where O: ObservableConvertibleType, O.Element == Bool {
Observable<[Element]>.create { observer in
var buffer = [Element]()
var isCollecting = false
let lock = NSRecursiveLock()
let source = self.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case let .next(element):
if isCollecting {
buffer.append(element)
} else {
observer.onNext([element])
}
case let .error(error):
observer.onError(error)
case .completed:
observer.onNext(buffer)
observer.onCompleted()
}
}
let trigger = boundary.asObservable().subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case let .next(element):
if isCollecting && !element {
observer.onNext(buffer)
}
isCollecting = element
case let .error(error):
observer.onError(error)
case .completed:
if isCollecting {
observer.onNext(buffer)
}
observer.onCompleted()
}
}
return CompositeDisposable(source, trigger)
}
}
}
private extension RxTimeInterval {
var asTimeInterval: TimeInterval {
switch self {
case let .nanoseconds(val): return Double(val) / 1_000_000_000.0
case let .microseconds(val): return Double(val) / 1_000_000.0
case let .milliseconds(val): return Double(val) / 1000.0
case let .seconds(val): return Double(val)
case .never: return Double.infinity
default: fatalError()
}
}
}