-
Notifications
You must be signed in to change notification settings - Fork 1
/
SlowSink.swift
110 lines (94 loc) · 3.54 KB
/
SlowSink.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
//
// SlowSink.swift
// NameListTests
//
// Created by Paul Wood on 7/31/19.
// Copyright © 2019 Paul Wood. All rights reserved.
//
import Foundation
import Combine
/// A simple subscriber that requests an unlimited number of values upon subscription.
/// creates a delayeach time an element is received using `usleep()`
/// Thanks for OpenCombine for providing the boilerplate needed for a Subscriber
public final class SlowSink<Input, Failure: Error>
: Subscriber,
Cancellable,
CustomStringConvertible,
CustomReflectable,
CustomPlaygroundDisplayConvertible
{
/// The closure to execute on receipt of a value.
public let receiveValue: (Input) -> Void
/// The closure to execute on completion.
public let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
public let slow: UInt32
private var _upstreamSubscription: Subscription?
public var description: String { return "SlowSink" }
public var customMirror: Mirror {
return Mirror(self, children: EmptyCollection())
}
public var playgroundDescription: Any { return description }
/// Initializes a sink with the provided closures.
///
/// - Parameters:
/// - slowBy: The time in milliseconds to delay the receiveValue to create artificial backpressure
/// - receiveValue: The closure to execute on receipt of a value. If `nil`,
/// the sink uses an empty closure.
/// - receiveCompletion: The closure to execute on completion. If `nil`,
/// the sink uses an empty closure.
public init(slowBy slow: UInt32 = 1,
receiveCompletion: ((Subscribers.Completion<Failure>) -> Void)? = nil,
receiveValue: @escaping ((Input) -> Void)) {
self.receiveCompletion = receiveCompletion ?? { _ in }
self.receiveValue = receiveValue
self.slow = slow
}
public func receive(subscription: Subscription) {
if _upstreamSubscription == nil {
_upstreamSubscription = subscription
subscription.request(.unlimited) // What happens when we return `.none` or `.max(3)`?
} else {
subscription.cancel()
}
}
public func receive(_ value: Input) -> Subscribers.Demand {
// What happens if this defer is added? How does that affect the order of work upstream?
// defer {
sleep(slow)
// }
receiveValue(value)
return Subscribers.Demand.max(1)
}
public func receive(completion: Subscribers.Completion<Failure>) {
receiveCompletion(completion)
}
public func cancel() {
_upstreamSubscription?.cancel()
_upstreamSubscription = nil
}
}
extension Publisher {
/// Attaches a subscriber with closure-based behavior.
///
/// This method creates the subscriber and immediately requests
/// an unlimited number of values, prior to returning the subscriber.
/// - parameter receiveValue: The closure to execute on receipt of a value.
/// If `nil`, the sink uses an empty closure.
/// - parameter receiveComplete: The closure to execute on completion.
/// If `nil`, the sink uses an empty closure.
/// - Returns: A cancellable instance; used when you end assignment
/// of the received value. Deallocation of the result will tear down
/// the subscription stream.
public func slowSink(
slowBy: UInt32,
receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
receiveValue: @escaping ((Output) -> Void)
) -> AnyCancellable {
let subscriber = SlowSink<Output, Failure>(
receiveCompletion: receiveCompletion,
receiveValue: receiveValue
)
subscribe(subscriber)
return AnyCancellable(subscriber)
}
}