-
Notifications
You must be signed in to change notification settings - Fork 207
/
Publishers.Map.swift
351 lines (297 loc) · 12.3 KB
/
Publishers.Map.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
//
// Publishers.Map.swift
//
//
// Created by Anton Nazarov on 25.06.2019.
//
extension Publisher {
/// Transforms all elements from the upstream publisher with a provided closure.
///
/// OpenCombine’s `map(_:)` operator performs a function similar to that of `map(_:)`
/// in the Swift standard library: it uses a closure to transform each element it
/// receives from the upstream publisher. You use `map(_:)` to transform from one kind
/// of element to another.
///
/// The following example uses an array of numbers as the source for a collection
/// based publisher. A `map(_:)` operator consumes each integer from the publisher and
/// uses a dictionary to transform it from its Arabic numeral to a Roman equivalent,
/// as a `String`.
/// If the `map(_:)`’s closure fails to look up a Roman numeral, it returns the string
/// `(unknown)`.
///
/// let numbers = [5, 4, 3, 2, 1, 0]
/// let romanNumeralDict: [Int : String] =
/// [1:"I", 2:"II", 3:"III", 4:"IV", 5:"V"]
/// cancellable = numbers.publisher
/// .map { romanNumeralDict[$0] ?? "(unknown)" }
/// .sink { print("\($0)", terminator: " ") }
///
/// // Prints: "V IV III II I (unknown)"
///
/// If your closure can throw an error, use OpenCombine’s `tryMap(_:)` operator
/// instead.
///
/// - Parameter transform: A closure that takes one element as its parameter and
/// returns a new element.
/// - Returns: A publisher that uses the provided closure to map elements from
/// the upstream publisher to new elements that it then publishes.
public func map<Result>(
_ transform: @escaping (Output) -> Result
) -> Publishers.Map<Self, Result> {
return Publishers.Map(upstream: self, transform: transform)
}
/// Transforms all elements from the upstream publisher with a provided error-throwing
/// closure.
///
/// OpenCombine’s `tryMap(_:)` operator performs a function similar to that of
/// `map(_:)` in the Swift standard library: it uses a closure to transform each
/// element it receives from the upstream publisher. You use `tryMap(_:)` to transform
/// from one kind of element to another, and to terminate publishing when the map’s
/// closure throws an error.
///
/// The following example uses an array of numbers as the source for a collection
/// based publisher. A `tryMap(_:)` operator consumes each integer from the publisher
/// and uses a dictionary to transform it from its Arabic numeral to a Roman
/// equivalent, as a `String`.
/// If the `tryMap(_:)`’s closure fails to look up a Roman numeral, it throws
/// an error. The `tryMap(_:)` operator catches this error and terminates publishing,
/// sending a `Subscribers.Completion.failure(_:)` that wraps the error.
///
/// struct ParseError: Error {}
/// func romanNumeral(from:Int) throws -> String {
/// let romanNumeralDict: [Int : String] =
/// [1:"I", 2:"II", 3:"III", 4:"IV", 5:"V"]
/// guard let numeral = romanNumeralDict[from] else {
/// throw ParseError()
/// }
/// return numeral
/// }
/// let numbers = [5, 4, 3, 2, 1, 0]
/// cancellable = numbers.publisher
/// .tryMap { try romanNumeral(from: $0) }
/// .sink(
/// receiveCompletion: { print ("completion: \($0)") },
/// receiveValue: { print ("\($0)", terminator: " ") }
/// )
///
/// // Prints: "V IV III II I completion: failure(ParseError())"
///
/// If your closure doesn’t throw, use `map(_:)` instead.
///
/// - Parameter transform: A closure that takes one element as its parameter and
/// returns a new element. If the closure throws an error, the publisher fails with
/// the thrown error.
/// - Returns: A publisher that uses the provided closure to map elements from
/// the upstream publisher to new elements that it then publishes.
public func tryMap<Result>(
_ transform: @escaping (Output) throws -> Result
) -> Publishers.TryMap<Self, Result> {
return Publishers.TryMap(upstream: self, transform: transform)
}
/// Replaces `nil` elements in the stream with the provided element.
///
/// The `replaceNil(with:)` operator enables replacement of `nil` values in a stream
/// with a substitute value. In the example below, a collection publisher contains
/// a `nil` value. The `replaceNil(with:)` operator replaces this with `0.0`.
///
/// let numbers: [Double?] = [1.0, 2.0, nil, 3.0]
/// numbers.publisher
/// .replaceNil(with: 0.0)
/// .sink { print("\($0)", terminator: " ") }
///
/// // Prints: "Optional(1.0) Optional(2.0) Optional(0.0) Optional(3.0)"
///
/// - Parameter output: The element to use when replacing `nil`.
/// - Returns: A publisher that replaces `nil` elements from the upstream publisher
/// with the provided element.
public func replaceNil<ElementOfResult>(
with output: ElementOfResult
) -> Publishers.Map<Self, ElementOfResult>
where Output == ElementOfResult?
{
return Publishers.Map(upstream: self) { $0 ?? output }
}
}
extension Publishers {
/// A publisher that transforms all elements from the upstream publisher with
/// a provided closure.
public struct Map<Upstream: Publisher, Output>: Publisher {
public typealias Failure = Upstream.Failure
/// The publisher from which this publisher receives elements.
public let upstream: Upstream
/// The closure that transforms elements from the upstream publisher.
public let transform: (Upstream.Output) -> Output
public init(upstream: Upstream,
transform: @escaping (Upstream.Output) -> Output) {
self.upstream = upstream
self.transform = transform
}
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Output == Downstream.Input, Downstream.Failure == Upstream.Failure
{
upstream.subscribe(Inner(downstream: subscriber, map: transform))
}
}
/// A publisher that transforms all elements from the upstream publisher
/// with a provided error-throwing closure.
public struct TryMap<Upstream: Publisher, Output>: Publisher {
public typealias Failure = Error
/// The publisher from which this publisher receives elements.
public let upstream: Upstream
/// The error-throwing closure that transforms elements from
/// the upstream publisher.
public let transform: (Upstream.Output) throws -> Output
public init(upstream: Upstream,
transform: @escaping (Upstream.Output) throws -> Output) {
self.upstream = upstream
self.transform = transform
}
}
}
extension Publishers.Map {
public func map<Result>(
_ transform: @escaping (Output) -> Result
) -> Publishers.Map<Upstream, Result> {
return .init(upstream: upstream) { transform(self.transform($0)) }
}
public func tryMap<Result>(
_ transform: @escaping (Output) throws -> Result
) -> Publishers.TryMap<Upstream, Result> {
return .init(upstream: upstream) { try transform(self.transform($0)) }
}
}
extension Publishers.TryMap {
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Output == Downstream.Input, Downstream.Failure == Error
{
upstream.subscribe(Inner(downstream: subscriber, map: transform))
}
public func map<Result>(
_ transform: @escaping (Output) -> Result
) -> Publishers.TryMap<Upstream, Result> {
return .init(upstream: upstream) { try transform(self.transform($0)) }
}
public func tryMap<Result>(
_ transform: @escaping (Output) throws -> Result
) -> Publishers.TryMap<Upstream, Result> {
return .init(upstream: upstream) { try transform(self.transform($0)) }
}
}
extension Publishers.Map {
private struct Inner<Downstream: Subscriber>
: Subscriber,
CustomStringConvertible,
CustomReflectable,
CustomPlaygroundDisplayConvertible
where Downstream.Input == Output, Downstream.Failure == Upstream.Failure
{
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
private let downstream: Downstream
private let map: (Input) -> Output
let combineIdentifier = CombineIdentifier()
fileprivate init(downstream: Downstream, map: @escaping (Input) -> Output) {
self.downstream = downstream
self.map = map
}
func receive(subscription: Subscription) {
downstream.receive(subscription: subscription)
}
func receive(_ input: Input) -> Subscribers.Demand {
return downstream.receive(map(input))
}
func receive(completion: Subscribers.Completion<Failure>) {
downstream.receive(completion: completion)
}
var description: String { return "Map" }
var customMirror: Mirror {
return Mirror(self, children: EmptyCollection())
}
var playgroundDescription: Any { return description }
}
}
extension Publishers.TryMap {
private final class Inner<Downstream: Subscriber>
: Subscriber,
Subscription,
CustomStringConvertible,
CustomReflectable,
CustomPlaygroundDisplayConvertible
where Downstream.Input == Output, Downstream.Failure == Error
{
// NOTE: This class has been audited for thread-safety
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
private let downstream: Downstream
private let map: (Input) throws -> Output
private var status = SubscriptionStatus.awaitingSubscription
private let lock = UnfairLock.allocate()
let combineIdentifier = CombineIdentifier()
fileprivate init(downstream: Downstream,
map: @escaping (Input) throws -> Output) {
self.downstream = downstream
self.map = map
}
deinit {
lock.deallocate()
}
func receive(subscription: Subscription) {
lock.lock()
guard case .awaitingSubscription = status else {
lock.unlock()
subscription.cancel()
return
}
status = .subscribed(subscription)
lock.unlock()
downstream.receive(subscription: self)
}
func receive(_ input: Input) -> Subscribers.Demand {
do {
return try downstream.receive(map(input))
} catch {
lock.lock()
let subscription = status.subscription
status = .terminal
lock.unlock()
subscription?.cancel()
downstream.receive(completion: .failure(error))
return .none
}
}
func receive(completion: Subscribers.Completion<Failure>) {
lock.lock()
guard case .subscribed = status else {
lock.unlock()
return
}
status = .terminal
lock.unlock()
downstream.receive(completion: completion.eraseError())
}
func request(_ demand: Subscribers.Demand) {
lock.lock()
guard case let .subscribed(subscription) = status else {
lock.unlock()
return
}
lock.unlock()
subscription.request(demand)
}
func cancel() {
lock.lock()
guard case let .subscribed(subscription) = status else {
lock.unlock()
return
}
status = .terminal
lock.unlock()
subscription.cancel()
}
var description: String { return "TryMap" }
var customMirror: Mirror {
return Mirror(self, children: EmptyCollection())
}
var playgroundDescription: Any { return description }
}
}