-
Notifications
You must be signed in to change notification settings - Fork 18
/
limit_subscribers.jl
158 lines (121 loc) · 5.48 KB
/
limit_subscribers.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
export limit_subscribers, LimitSubscribersGuard
import Base: show
import DataStructures: isfull
"""
LimitSubscribersGuard(limit::Int = 1, exclusive = true)
Guard structure used in `limit_subscribers` operator.
# Arguments
- `limit`: number of concurrent subscribers
- `exclusive`: boolean flag, which indicates whenever this guard can be shared with other observables in other `limit_subscribers` operator. If set to `true`, reusing this guard in a different `limit_subscribers` operator for other observable will result in automatic unsubscription of all present actors.
# Note
This structure is useful in Pluto.jl notebooks in particular, allowing for automatic subscription/unsubscription of observables.
# Example
```julia
# Cell 1
guard = LimitSubscribersGuard()
# Cell 2
subscription = subscribe!(some_stream |> limit_subscribers(guard), logger())
```
See also: [`limit_subscribers`](@ref), [`subscribe!`](@ref)
"""
struct LimitSubscribersGuard
limit :: Int
exclusive :: Bool
handlers :: CircularBuffer{Tuple{Teardown, Any}}
end
LimitSubscribersGuard(limit::Int = 1, exclusive::Bool = true) = LimitSubscribersGuard(limit, exclusive, CircularBuffer{Tuple{Teardown, Any}}(limit))
Base.show(io::IO, guard::LimitSubscribersGuard) = print(io, "LimitSubscribersGuard($(getlimit(guard)), $(isexclusive(guard)))")
getlimit(guard::LimitSubscribersGuard) = guard.limit
isexclusive(guard::LimitSubscribersGuard) = guard.exclusive
gethandlers(guard::LimitSubscribersGuard) = guard.handlers
function unsubscribe_last!(guard::LimitSubscribersGuard)
if !isempty(gethandlers(guard))
subscription, actor = popfirst!(gethandlers(guard))
complete!(actor)
unsubscribe!(subscription)
end
return nothing
end
function remove_handler!(guard::LimitSubscribersGuard, subscription)
f = filter(d -> first(d) !== subscription, gethandlers(guard))
if length(f) !== length(gethandlers(guard))
empty!(gethandlers(guard))
append!(gethandlers(guard), f)
end
return nothing
end
function add_subscription!(guard::LimitSubscribersGuard, subscription::Teardown, actor)
if isfull(gethandlers(guard))
unsubscribe_last!(guard)
end
push!(gethandlers(guard), (subscription, actor))
return subscription
end
function release!(guard::LimitSubscribersGuard)
foreach(gethandlers(guard)) do handler
subscription, actor = handler
complete!(actor)
unsubscribe!(subscription)
end
empty!(gethandlers(guard))
return nothing
end
"""
limit_subscribers(limit::Int = 1, exclusive::Bool = true)
limit_subscribers(guard::LimitSubscribersGuard)
Creates an operator that limits number of concurrent actors to the given observable. On new subscription, if limit is exceeded, oldest actor is automatically unsubscribed and receives a completion event.
# Arguments
- `limit`: number of concurrent subscribers
- `exclusive`: boolean flag, which indicates whenever this guard can be shared with other observables in other `limit_subscribers` operator. If set to `true`, reusing this guard in a different `limit_subscribers` operator for other observable will result in automatic unsubscription of all present actors.
# Note
This structure is useful in Pluto.jl notebooks in particular, allowing for automatic subscription/unsubscription of observables.
# Example
```julia
# Cell 1
guard = LimitSubscribersGuard()
# Cell 2
subscription = subscribe!(some_stream |> limit_subscribers(guard), logger())
```
See also: [`LimitSubscribersGuard`](@ref)
"""
limit_subscribers(limit::Int = 1, exclusive::Bool = true) = limit_subscribers(LimitSubscribersGuard(limit, exclusive))
limit_subscribers(guard::LimitSubscribersGuard) = LimitSubscribersOperator(guard)
struct LimitSubscribersOperator <: InferableOperator
guard :: LimitSubscribersGuard
end
operator_right(::LimitSubscribersOperator, ::Type{L}) where L = L
function on_call!(::Type{L}, ::Type{L}, operator::LimitSubscribersOperator, source) where L
if isexclusive(operator.guard)
release!(operator.guard)
end
return proxy(L, source, LimitSubscribersProxy(operator.guard))
end
struct LimitSubscribersProxy <: SourceProxy
guard :: LimitSubscribersGuard
end
source_proxy!(::Type{L}, proxy::LimitSubscribersProxy, source::S) where { L, S } = LimitSubscribersSource{L, S}(source, proxy.guard)
struct LimitSubscribersSubscription{S} <: Teardown
subscription :: S
guard :: LimitSubscribersGuard
end
as_teardown(::Type{ <: LimitSubscribersSubscription }) = UnsubscribableTeardownLogic()
function on_unsubscribe!(subscription::LimitSubscribersSubscription)
remove_handler!(subscription.guard, subscription.subscription)
unsubscribe!(subscription.subscription)
return nothing
end
struct LimitSubscribersSource{L, S} <: Subscribable{L}
source :: S
guard :: LimitSubscribersGuard
end
function on_subscribe!(source::LimitSubscribersSource, actor)
guard = source.guard
if isfull(gethandlers(guard))
unsubscribe_last!(guard)
end
return LimitSubscribersSubscription(add_subscription!(guard, subscribe!(source.source, actor), actor), guard)
end
Base.show(io::IO, ::LimitSubscribersOperator) = print(io, "LimitSubscribersOperator()")
Base.show(io::IO, ::LimitSubscribersProxy) = print(io, "LimitSubscribersProxy()")
Base.show(io::IO, ::LimitSubscribersSource{L}) where L = print(io, "LimitSubscribersSource($L)")
Base.show(io::IO, ::LimitSubscribersSubscription) = print(io, "LimitSubscribersSubscription()")