-
Notifications
You must be signed in to change notification settings - Fork 19
/
delay.jl
134 lines (102 loc) · 4.39 KB
/
delay.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
export delay
import Base: close
import Base: show
"""
    delay(delay::Int)

Creates a delay operator, which postpones the emission of every item from the source
Observable by the given duration.

# Arguments
- `delay::Int`: the delay duration, in milliseconds, by which the emission of each source item is postponed.

# Producing

Stream of type `<: Subscribable{L}` where `L` refers to the type of the source stream

See also: [`AbstractOperator`](@ref), [`InferableOperator`](@ref), [`ProxyObservable`](@ref)
"""
function delay(delay::Int)
    return DelayOperator(delay)
end
# Operator object returned by `delay`. It only stores the delay duration, which `on_call!`
# later hands over to a `DelayProxy`.
struct DelayOperator <: InferableOperator
    delay::Int
end
# Applies the operator to `source`: the source is wrapped via `proxy` with a `DelayProxy`
# carrying the operator's delay. Input and output element types are both `L`.
function on_call!(::Type{L}, ::Type{L}, operator::DelayOperator, source) where {L}
    delay_proxy = DelayProxy(operator.delay)
    return proxy(L, source, delay_proxy)
end
# The delay operator leaves the element type untouched: the output type is the input `L`.
function operator_right(operator::DelayOperator, ::Type{L}) where {L}
    return L
end
# Proxy that stores the delay duration; `actor_proxy!` and `source_proxy!` use it to build
# the `DelayActor` and the `DelayObservable` respectively.
struct DelayProxy <: ActorSourceProxy
    delay::Int
end
# Wraps the downstream `actor` into a `DelayActor` configured with the proxy's delay.
function actor_proxy!(::Type{L}, proxy::DelayProxy, actor::A) where {L, A}
    return DelayActor{L, A}(proxy.delay, actor)
end
# Wraps the upstream `source` into a `DelayObservable`; the proxy itself is not consulted.
function source_proxy!(::Type{L}, proxy::DelayProxy, source::S) where {L, S}
    return DelayObservable{L, S}(source)
end
# Queue message that carries one data item of type `L` emitted by the source.
struct DelayDataMessage{L}
    data::L
end
# Queue message that carries an error; the error value may be of any type.
struct DelayErrorMessage
    err::Any
end
# Field-less queue message that marks completion of the source.
struct DelayCompleteMessage
end
# Any message that can be placed on the delay queue for element type `L`.
const DelayMessage{L} = Union{
    DelayDataMessage{L},
    DelayErrorMessage,
    DelayCompleteMessage,
}
# Entry of the delay queue: a message together with the moment it was enqueued.
struct DelayQueueItem{L}
    # The data / error / completion message to replay later.
    message::DelayMessage{L}
    # Value of `time()` taken when the item was created by the `on_*!` handlers.
    # (The spelling of the field name is kept as-is because other code reads it.)
    emmited_at::Float64
end
# Exception used to close the delay queue channel; the worker task of `DelayActor`
# treats it as a normal shutdown rather than as an error.
struct DelayCompletionException <: Exception
end
# Actor that buffers every incoming event in an unbounded channel and replays it to the
# wrapped `actor` from a background task once `delay` milliseconds have elapsed since the
# event was enqueued.
#
# Fields:
# - `is_cancelled`: set on unsubscription; stops the worker loop and suppresses delivery.
# - `delay`: delay duration (divided by `MILLISECONDS_IN_SECOND` before sleeping).
# - `actor`: the downstream actor that receives the delayed events.
# - `channel`: queue of pending `DelayQueueItem`s.
mutable struct DelayActor{L, A} <: Actor{L}
    is_cancelled :: Bool
    delay        :: Int
    actor        :: A
    channel      :: Channel{DelayQueueItem{L}}

    function DelayActor{L, A}(delay::Int, actor::A) where {L, A}
        queue    = Channel{DelayQueueItem{L}}(Inf)
        instance = new(false, delay, actor, queue)

        worker = @async begin
            try
                while !instance.is_cancelled
                    queued = take!(queue)::DelayQueueItem{L}

                    # Moment at which this item becomes due, and how far away that is.
                    due_at = queued.emmited_at +
                        convert(Float64, instance.delay / MILLISECONDS_IN_SECOND)
                    remaining = due_at - time()

                    # Wait out the rest of the delay; if the item is already due, only
                    # give other tasks a chance to run.
                    remaining > 0.0 ? sleep(remaining) : yield()

                    # The actor may have been cancelled while this task was waiting.
                    instance.is_cancelled ||
                        __process_delayed_message(instance, queued.message)
                end
            catch err
                # A `DelayCompletionException` signals that the channel was closed on
                # purpose; anything else is forwarded downstream as an error.
                err isa DelayCompletionException ||
                    __process_delayed_message(instance, DelayErrorMessage(err))
            end
        end

        # Tie the channel's lifetime to the worker task.
        bind(queue, worker)
        return instance
    end
end
# Delivers a delayed message to the wrapped actor. Data is forwarded with `next!`; an
# error or a completion is forwarded and then the delay actor is closed.
function __process_delayed_message(actor::DelayActor{L}, message::DelayDataMessage{L}) where {L}
    return next!(actor.actor, message.data)
end

function __process_delayed_message(actor::DelayActor, message::DelayErrorMessage)
    error!(actor.actor, message.err)
    return close(actor)
end

function __process_delayed_message(actor::DelayActor, message::DelayCompleteMessage)
    complete!(actor.actor)
    return close(actor)
end
# Enqueues a data item, stamped with the current `time()`, for delayed delivery.
function on_next!(actor::DelayActor{L}, data::L) where {L}
    entry = DelayQueueItem{L}(DelayDataMessage{L}(data), time())
    return put!(actor.channel, entry)
end
# Enqueues an error, stamped with the current `time()`, so that it is delayed like data.
function on_error!(actor::DelayActor{L}, err) where {L}
    entry = DelayQueueItem{L}(DelayErrorMessage(err), time())
    return put!(actor.channel, entry)
end
# Enqueues a completion marker, stamped with the current `time()`, for delayed delivery.
function on_complete!(actor::DelayActor{L}) where {L}
    entry = DelayQueueItem{L}(DelayCompleteMessage(), time())
    return put!(actor.channel, entry)
end
# Closes the actor's queue channel, using `DelayCompletionException` as the close reason.
function Base.close(actor::DelayActor)
    return close(actor.channel, DelayCompletionException())
end
# Observable of element type `L` that wraps the original `source`; subscribing to it
# subscribes a `DelayActor` to that source (see `on_subscribe!`).
@subscribable struct DelayObservable{L, S} <: Subscribable{L}
    source::S
end
# Subscribes the delay actor to the wrapped source and returns a `DelaySubscription`
# that remembers both the actor and the source subscription.
function on_subscribe!(observable::DelayObservable, actor::DelayActor)
    source_subscription = subscribe!(observable.source, actor)
    return DelaySubscription(actor, source_subscription)
end
# Subscription handle returned by `on_subscribe!` for a `DelayObservable`. It pairs the
# `DelayActor` that buffers events with the subscription obtained from the source.
#
# The field types are type parameters rather than untyped (`Any`) fields, so that code
# reading `subscription.actor` and `subscription.subscription` can be specialized on the
# concrete types instead of dispatching dynamically. Construction via
# `DelaySubscription(actor, subscription)` works exactly as before.
struct DelaySubscription{A, S} <: Teardown
    actor        :: A
    subscription :: S
end
# Declares `UnsubscribableTeardownLogic` as the teardown logic of `DelaySubscription`.
function as_teardown(::Type{<:DelaySubscription})
    return UnsubscribableTeardownLogic()
end
# Cancels the delayed stream: closes the actor's queue (only if it was not cancelled
# before), marks the actor as cancelled and unsubscribes from the source.
function on_unsubscribe!(subscription::DelaySubscription)
    delayed = subscription.actor
    # Close the queue on the first unsubscription only.
    delayed.is_cancelled || close(delayed)
    delayed.is_cancelled = true
    unsubscribe!(subscription.subscription)
    return nothing
end
# Compact textual representations. The parametric types print only their element type
# `L`; the delay value is not shown.
function Base.show(io::IO, ::DelayOperator)
    return print(io, "DelayOperator()")
end

function Base.show(io::IO, ::DelayProxy)
    return print(io, "DelayProxy()")
end

function Base.show(io::IO, ::DelayActor{L}) where {L}
    return print(io, "DelayActor($L)")
end

function Base.show(io::IO, ::DelayObservable{L}) where {L}
    return print(io, "DelayObservable($L)")
end

function Base.show(io::IO, ::DelaySubscription)
    return print(io, "DelaySubscription()")
end