-
Notifications
You must be signed in to change notification settings - Fork 19
/
async.jl
124 lines (93 loc) · 3.73 KB
/
async.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
export AsyncScheduler
import Base: show, similar
"""
    AsyncScheduler
    AsyncScheduler(size::Int = typemax(Int))

Scheduler that executes scheduled actions asynchronously. Actions are pushed through a
`Channel` and handled by a single asynchronous task, so they are processed in the order
in which they were scheduled.

The type parameter `N` carries the size that is passed to the `Channel` constructor when a
scheduler instance is created for a subscription.
"""
struct AsyncScheduler{N} <: AbstractScheduler end
# Compact textual representation; the channel size parameter is intentionally not printed.
function Base.show(io::IO, ::AsyncScheduler)
    return print(io, "AsyncScheduler()")
end
# Convenience constructor: lifts the requested channel size into the type parameter `N`.
# With no argument the size defaults to `typemax(Int)`.
AsyncScheduler(size::Int = typemax(Int)) = AsyncScheduler{size}()
# A "similar" scheduler is a fresh `AsyncScheduler` with the same channel size parameter.
function Base.similar(::AsyncScheduler{N}) where {N}
    return AsyncScheduler{N}()
end
# Create the per-subscription scheduler instance for data type `D`; the scheduler's type
# parameter `N` is forwarded as the channel size.
function makeinstance(::Type{D}, ::AsyncScheduler{N}) where {D, N}
    return AsyncSchedulerInstance{D}(N)
end
# Type of the instance that `makeinstance` produces for data type `D`.
function instancetype(::Type{D}, ::Type{<:AsyncScheduler}) where {D}
    return AsyncSchedulerInstance{D}
end
"""
    AsyncSchedulerDataMessage{D}(data)

Channel message that carries a single data value of type `D`.
"""
struct AsyncSchedulerDataMessage{D}
    data::D
end
"""
    AsyncSchedulerErrorMessage(err)

Channel message that carries an error. `err` is deliberately untyped, so any value can be
stored in it.
"""
struct AsyncSchedulerErrorMessage
    err::Any
end
"""
    AsyncSchedulerCompleteMessage()

Field-less channel message that signals completion.
"""
struct AsyncSchedulerCompleteMessage
end
# Element type of a scheduler channel for data of type `D`: a data, an error or a
# completion message.
const AsyncSchedulerMessage{D} = Union{
    AsyncSchedulerDataMessage{D},
    AsyncSchedulerErrorMessage,
    AsyncSchedulerCompleteMessage
}
"""
    AsyncSchedulerInstance{D}(size::Int = typemax(Int))

Mutable per-subscription state of an `AsyncScheduler` for data of type `D`.

# Fields
- `channel`: `Channel` of `AsyncSchedulerMessage{D}` created with the given `size`.
- `isunsubscribed`: starts as `false`; set to `true` by `dispose`.
- `subscription`: teardown that `dispose` unsubscribes from; starts as `voidTeardown`
  and is replaced once the actual subscription is available.
"""
mutable struct AsyncSchedulerInstance{D}
    channel::Channel{AsyncSchedulerMessage{D}}
    isunsubscribed::Bool
    subscription::Teardown

    function AsyncSchedulerInstance{D}(size::Int = typemax(Int)) where {D}
        channel = Channel{AsyncSchedulerMessage{D}}(size)
        return new{D}(channel, false, voidTeardown)
    end
end
# Report whether `dispose` has already been run for this instance.
function isunsubscribed(instance::AsyncSchedulerInstance)
    return instance.isunsubscribed
end
# Accessor for the message channel owned by the instance.
function getchannel(instance::AsyncSchedulerInstance)
    return instance.channel
end
# Tear the instance down exactly once: mark it unsubscribed, close its channel and
# unsubscribe from the stored subscription on a separate asynchronous task.
#
# Returns the `Task` created for the unsubscription, or `nothing` when the instance had
# already been disposed.
function dispose(instance::AsyncSchedulerInstance)
    # Repeated calls are no-ops.
    isunsubscribed(instance) && return nothing

    instance.isunsubscribed = true
    close(instance.channel)

    # `instance.subscription` is read when the task runs, not when it is created.
    return @async unsubscribe!(instance.subscription)
end
# Data message: hand the carried value to the actor. The instance is left untouched.
function __process_channeled_message(
    instance::AsyncSchedulerInstance{D}, message::AsyncSchedulerDataMessage{D}, actor
) where {D}
    payload = message.data
    return on_next!(actor, payload)
end
# Error message: report the carried error to the actor, then dispose the instance.
function __process_channeled_message(
    instance::AsyncSchedulerInstance, message::AsyncSchedulerErrorMessage, actor
)
    failure = message.err
    on_error!(actor, failure)
    return dispose(instance)
end
# Completion message: notify the actor, then dispose the instance.
function __process_channeled_message(
    instance::AsyncSchedulerInstance, message::AsyncSchedulerCompleteMessage, actor
)
    on_complete!(actor)
    return dispose(instance)
end
"""
    AsyncSchedulerSubscription(instance)

Teardown object that wraps an `AsyncSchedulerInstance`. Unsubscribing from it disposes the
wrapped instance.
"""
struct AsyncSchedulerSubscription{H <: AsyncSchedulerInstance} <: Teardown
    instance::H
end
# Compact textual representation; the wrapped instance is not printed.
function Base.show(io::IO, ::AsyncSchedulerSubscription)
    return print(io, "AsyncSchedulerSubscription()")
end
# Teardown trait: this subscription type uses the unsubscribable teardown logic.
function as_teardown(::Type{<:AsyncSchedulerSubscription})
    return UnsubscribableTeardownLogic()
end
# Unsubscription hook: dispose the wrapped scheduler instance. Always returns `nothing`,
# whatever `dispose` returned.
function on_unsubscribe!(subscription::AsyncSchedulerSubscription)
    instance = subscription.instance
    dispose(instance)
    return nothing
end
# Subscribe `actor` to `source` through the asynchronous scheduler `instance`.
#
# Two asynchronous tasks are started:
#   * a channeling task that takes messages from the instance's channel one by one and
#     dispatches them to `actor` via `__process_channeled_message`;
#   * a subscription task that performs the actual `on_subscribe!` call and stores the
#     resulting teardown in the instance.
#
# Returns an `AsyncSchedulerSubscription` wrapping `instance` immediately, without
# waiting for either task.
function scheduled_subscription!(source, actor, instance::AsyncSchedulerInstance)
    subscription = AsyncSchedulerSubscription(instance)
    channel = getchannel(instance)

    channeling_task = @async begin
        # Iterating the channel (instead of calling `take!` in a `while` loop) lets the
        # task finish normally when `dispose` closes the channel while it is waiting for
        # the next message. With a bare `take!` the task died with an
        # `InvalidStateException` on every regular unsubscription. Errors thrown while
        # processing a message still fail the task as before.
        for message in channel
            # Messages still buffered at the time of disposal are dropped.
            isunsubscribed(instance) && break
            __process_channeled_message(instance, message, actor)
        end
    end

    @async begin
        if !isunsubscribed(instance)
            tmp = on_subscribe!(source, actor, instance)
            if !isunsubscribed(instance)
                instance.subscription = tmp
            else
                # Disposed while subscribing: nobody else will ever see `tmp`,
                # so release it right away.
                unsubscribe!(tmp)
            end
        end
    end

    # Tie the channel's lifetime to the channeling task: the channel is closed when the
    # task ends, and a task failure is propagated to whoever uses the channel afterwards.
    bind(channel, channeling_task)

    return subscription
end
# Schedule delivery of `value`: wrap it in a data message and put it on the instance's
# channel. `actor` is not used here.
function scheduled_next!(actor, value::D, instance::AsyncSchedulerInstance{D}) where {D}
    message = AsyncSchedulerDataMessage{D}(value)
    return put!(getchannel(instance), message)
end
# Schedule delivery of an error: wrap `err` in an error message and put it on the
# instance's channel. `actor` is not used here.
function scheduled_error!(actor, err, instance::AsyncSchedulerInstance)
    message = AsyncSchedulerErrorMessage(err)
    return put!(getchannel(instance), message)
end
# Schedule delivery of the completion event by putting a completion message on the
# instance's channel. `actor` is not used here.
function scheduled_complete!(actor, instance::AsyncSchedulerInstance)
    message = AsyncSchedulerCompleteMessage()
    return put!(getchannel(instance), message)
end