-
Notifications
You must be signed in to change notification settings - Fork 18
/
proxy.jl
181 lines (134 loc) · 10.4 KB
/
proxy.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
export ActorProxy, SourceProxy, ActorSourceProxy
export actor_proxy!, source_proxy!
export ProxyObservable, proxy
import Base: show
"""
ActorProxy
Can be used as a super type for common proxy object. Automatically specifies `ValidActorProxy` trait behavior. Each `ActorProxy` must implement
its own method for `actor_proxy!(::Type, proxy, actor)` function which have to return a valid actor object.
See also: [`proxy`](@ref), [`actor_proxy!`](@ref)
"""
abstract type ActorProxy end
"""
SourceProxy
Can be used as a super type for common proxy object. Automatically specifies `ValidSourceProxy` trait behavior. Each `SourceProxy` must implement
its own method for `source_proxy!(::Type, proxy, source)` function which have to return a valid subscribable object.
See also: [`proxy`](@ref), [`source_proxy!`](@ref)
"""
abstract type SourceProxy end
"""
ActorSourceProxy
Can be used as a super type for common proxy object. Automatically specifies `ValidActorSourceProxy` trait behavior. Each `ActorSourceProxy` must implement
its own method for `source_proxy!(::Type, proxy, source)` function which have to return a valid subscribable object and also for `actor_proxy!(::Type, proxy, actor)` function which have to return a valid actor object..
See also: [`proxy`](@ref), [`actor_proxy!`](@ref), [`source_proxy!`](@ref)
"""
abstract type ActorSourceProxy end
# Common supertype of the trait objects returned by `as_proxy`.
abstract type ProxyTrait end
# Trait selected by `as_proxy` for subtypes of `ActorProxy`.
struct ValidActorProxy <: ProxyTrait end
# Trait selected by `as_proxy` for subtypes of `SourceProxy`.
struct ValidSourceProxy <: ProxyTrait end
# Trait selected by `as_proxy` for subtypes of `ActorSourceProxy`.
struct ValidActorSourceProxy <: ProxyTrait end
# Fallback trait selected by `as_proxy` for any type without a more specific `as_proxy` method.
struct InvalidProxy <: ProxyTrait end
# Trait lookup for proxy objects. The type-level methods map each abstract proxy
# supertype to its trait object; any other type falls back to `InvalidProxy()`.
as_proxy(::Type{<:ActorSourceProxy}) = ValidActorSourceProxy()
as_proxy(::Type{<:SourceProxy}) = ValidSourceProxy()
as_proxy(::Type{<:ActorProxy}) = ValidActorProxy()
as_proxy(::Type) = InvalidProxy()

# Instance-level convenience: forward to the type-level lookup.
function as_proxy(object)
    return as_proxy(typeof(object))
end
# Entry point for actor proxying: resolve the proxy and actor traits first and
# let the five-argument methods decide what to do with the actor.
function call_actor_proxy!(::Type{L}, proxy::P, actor::A) where {L, P, A}
    proxy_trait = as_proxy(P)
    actor_trait = as_actor(A)
    return call_actor_proxy!(L, proxy_trait, actor_trait, proxy, actor)
end

# Entry point for source proxying: resolve the proxy and subscribable traits first and
# let the five-argument methods decide what to do with the source.
function call_source_proxy!(::Type{L}, proxy::P, source::S) where {L, P, S}
    proxy_trait = as_proxy(P)
    source_trait = as_subscribable(S)
    return call_source_proxy!(L, proxy_trait, source_trait, proxy, source)
end
# Invalid trait combinations for actor proxying.
# The first method covers the case where both the proxy and the actor are invalid, so that
# the two more general methods below never compete for the same call; the proxy error wins.
function call_actor_proxy!(::Type, ::InvalidProxy, ::InvalidActorTrait, proxy, actor)
    return error("Type $(typeof(proxy)) is not a valid proxy type. \nConsider extending your type with one of the ActorProxy, SourceProxy or ActorSourceProxy abstract types or implement as_proxy(::Type{<:$(typeof(proxy))}).")
end

# Invalid proxy, any actor trait.
function call_actor_proxy!(::Type, ::InvalidProxy, ::Any, proxy, actor)
    return error("Type $(typeof(proxy)) is not a valid proxy type. \nConsider extending your type with one of the ActorProxy, SourceProxy or ActorSourceProxy abstract types or implement as_proxy(::Type{<:$(typeof(proxy))}).")
end

# Any proxy trait, invalid actor.
function call_actor_proxy!(::Type, ::Any, ::InvalidActorTrait, proxy, actor)
    return error("Type $(typeof(actor)) is not a valid actor type. \nConsider extending your actor with one of the abstract actor types <: (Actor{T}, NextActor{T}, ErrorActor{T}, CompletionActor{T}) or implement as_actor(::Type{<:$(typeof(actor))}).")
end
# Valid proxy and valid actor: decide whether the actor needs to be wrapped.

# Actor-only proxies wrap the actor through `actor_proxy!`.
call_actor_proxy!(::Type{T}, ::ValidActorProxy, ::BaseActorTrait, proxy, actor) where {T} =
    actor_proxy!(T, proxy, actor)
call_actor_proxy!(::Type{T}, ::ValidActorProxy, ::NextActorTrait, proxy, actor) where {T} =
    actor_proxy!(T, proxy, actor)
call_actor_proxy!(::Type{T}, ::ValidActorProxy, ::ErrorActorTrait, proxy, actor) where {T} =
    actor_proxy!(T, proxy, actor)
call_actor_proxy!(::Type{T}, ::ValidActorProxy, ::CompletionActorTrait, proxy, actor) where {T} =
    actor_proxy!(T, proxy, actor)

# Source-only proxies leave the actor untouched.
call_actor_proxy!(::Type, ::ValidSourceProxy, ::BaseActorTrait, proxy, actor) = actor
call_actor_proxy!(::Type, ::ValidSourceProxy, ::NextActorTrait, proxy, actor) = actor
call_actor_proxy!(::Type, ::ValidSourceProxy, ::ErrorActorTrait, proxy, actor) = actor
call_actor_proxy!(::Type, ::ValidSourceProxy, ::CompletionActorTrait, proxy, actor) = actor

# Combined actor/source proxies wrap the actor through `actor_proxy!` as well.
call_actor_proxy!(::Type{T}, ::ValidActorSourceProxy, ::BaseActorTrait, proxy, actor) where {T} =
    actor_proxy!(T, proxy, actor)
call_actor_proxy!(::Type{T}, ::ValidActorSourceProxy, ::NextActorTrait, proxy, actor) where {T} =
    actor_proxy!(T, proxy, actor)
call_actor_proxy!(::Type{T}, ::ValidActorSourceProxy, ::ErrorActorTrait, proxy, actor) where {T} =
    actor_proxy!(T, proxy, actor)
call_actor_proxy!(::Type{T}, ::ValidActorSourceProxy, ::CompletionActorTrait, proxy, actor) where {T} =
    actor_proxy!(T, proxy, actor)
# Invalid trait combinations for source proxying.
# The first method covers the case where both the proxy and the source are invalid, so that
# the two more general methods below never compete for the same call; the proxy error wins.
function call_source_proxy!(::Type, ::InvalidProxy, ::InvalidSubscribableTrait, proxy, source)
    return error("Type $(typeof(proxy)) is not a valid proxy type. \nConsider extending your type with one of the ActorProxy, SourceProxy or ActorSourceProxy abstract types or implement as_proxy(::Type{<:$(typeof(proxy))}).")
end

# Invalid proxy, any subscribable trait.
function call_source_proxy!(::Type, ::InvalidProxy, ::Any, proxy, source)
    return error("Type $(typeof(proxy)) is not a valid proxy type. \nConsider extending your type with one of the ActorProxy, SourceProxy or ActorSourceProxy abstract types or implement as_proxy(::Type{<:$(typeof(proxy))}).")
end

# Any proxy trait, invalid subscribable.
function call_source_proxy!(::Type, ::Any, ::InvalidSubscribableTrait, proxy, source)
    return error("Type $(typeof(source)) is not a valid subscribable type. \nConsider extending your subscribable with Subscribable{T} abstract type or implement as_subscribable(::Type{<:$(typeof(source))}).")
end
# Valid proxy and valid source: decide whether the source needs to be wrapped.

# Actor-only proxies leave the source untouched.
call_source_proxy!(::Type, ::ValidActorProxy, ::SimpleSubscribableTrait, proxy, source) = source
call_source_proxy!(::Type, ::ValidActorProxy, ::ScheduledSubscribableTrait, proxy, source) = source

# Source-only proxies wrap the source through `source_proxy!`.
call_source_proxy!(::Type{T}, ::ValidSourceProxy, ::SimpleSubscribableTrait, proxy, source) where {T} =
    source_proxy!(T, proxy, source)
call_source_proxy!(::Type{T}, ::ValidSourceProxy, ::ScheduledSubscribableTrait, proxy, source) where {T} =
    source_proxy!(T, proxy, source)

# Combined actor/source proxies wrap the source through `source_proxy!` as well.
call_source_proxy!(::Type{T}, ::ValidActorSourceProxy, ::SimpleSubscribableTrait, proxy, source) where {T} =
    source_proxy!(T, proxy, source)
call_source_proxy!(::Type{T}, ::ValidActorSourceProxy, ::ScheduledSubscribableTrait, proxy, source) where {T} =
    source_proxy!(T, proxy, source)
"""
ProxyObservable{L, S, P}(proxied_source::S, proxy::P)
An interface for proxied Observables.
See also: [`proxy`](@ref)
"""
@subscribable struct ProxyObservable{L, S, P} <: Subscribable{L}
proxied_source :: S
proxy :: P
end
# TODO Rocket 2.0: Some operators implement custom `getrecent` logic, but in general this is not possible.
# `getrecent` is a hidden feature that is neither documented nor exported. It is not a part of the public API
# and may (read: will) change in the future. `getrecent` should be reconsidered seriously, because the
# `ReactiveMP.jl` package currently depends on this functionality.
function getrecent(observable::ProxyObservable)
    return getrecent(observable.proxied_source, observable.proxy)
end
# Subscription hook: wrap the incoming actor according to the stored proxy,
# then subscribe the result to the proxied source.
function on_subscribe!(observable::ProxyObservable{L}, actor) where {L}
    proxied_actor = call_actor_proxy!(L, observable.proxy, actor)
    return subscribe!(observable.proxied_source, proxied_actor)
end
"""
actor_proxy!(::Type, proxy, actor)
This is function is used to wrap an actor with its proxied version given a particular proxy object. Must return another actor.
Each valid `ActorProxy` and `ActorSourceProxy` must implement its own method for `actor_proxy!` function. The first argument is the same as
the type of data of the connected proxy observable.
See also: [`proxy`](@ref), [`ActorProxy`](@ref), [`ActorSourceProxy`](@ref)
"""
actor_proxy!(L, proxy, actor) = error("You probably forgot to implement actor_proxy!(::Type, proxy::$(typeof(proxy)), actor::$(typeof(actor)))")
"""
source_proxy!(::Type, proxy, source)
This is function is used to wrap a source with its proxied version given a particular proxy object. Must return another Observable.
Each valid `SourceProxy` and `ActorSourceProxy` must implement its own method for `source_proxy!` function. The first argument is the same as
the type of data of the connected proxy observable.
See also: [`proxy`](@ref), [`SourceProxy`](@ref), [`ActorSourceProxy`](@ref)
"""
source_proxy!(L, proxy, source) = error("You probably forgot to implement source_proxy!(::Type, proxy::$(typeof(proxy)), source::$(typeof(source)))")
# Textual representation: the data type parameter followed by the proxy object.
function Base.show(io::IO, observable::ProxyObservable{L}) where {L}
    description = string("ProxyObservable(", L, ", ", observable.proxy, ")")
    return print(io, description)
end
# -------------------------------- #
# Helpers #
# -------------------------------- #
"""
proxy(::Type{L}, source, proxy) where L
Creation operator for the `ProxyObservable` with a given source and proxy objects.
# Example
```jldoctest
using Rocket
source = from(1:5)
struct MyCustomProxy <: ActorProxy end
struct MyCustomActor{A} <: Actor{Int}
actor :: A
end
Rocket.on_next!(actor::MyCustomActor, data::Int) = next!(actor.actor, data ^ 2)
Rocket.on_error!(actor::MyCustomActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::MyCustomActor) = complete!(actor.actor)
Rocket.actor_proxy!(::Type{Int}, proxy::MyCustomProxy, actor::A) where A = MyCustomActor{A}(actor)
proxied = proxy(Int, source, MyCustomProxy())
subscribe!(proxied, logger())
;
# output
[LogActor] Data: 1
[LogActor] Data: 4
[LogActor] Data: 9
[LogActor] Data: 16
[LogActor] Data: 25
[LogActor] Completed
```
See also: [`ProxyObservable`](@ref), [`ActorProxy`](@ref), [`SourceProxy`](@ref), [`ActorSourceProxy`](@ref)
"""
proxy(::Type{L}, source, proxy) where L = as_proxy_observable(L, call_source_proxy!(L, proxy, source), proxy)
# Resolve the subscribable trait of the proxied source and dispatch on it.
function as_proxy_observable(::Type{L}, proxied_source::S, proxy) where {L, S}
    source_trait = as_subscribable(S)
    return as_proxy_observable(L, source_trait, proxied_source, proxy)
end

# The proxied source is not a valid subscribable: refuse to build an observable.
function as_proxy_observable(::Type, ::InvalidSubscribableTrait, proxied_source, ::Any)
    throw(InvalidSubscribableTraitUsageError(proxied_source))
end

# Valid subscribables (simple or scheduled) are wrapped in a `ProxyObservable`.
function as_proxy_observable(::Type{L}, ::SimpleSubscribableTrait, proxied_source::S, proxy::P) where {L, S, P}
    return ProxyObservable{L, S, P}(proxied_source, proxy)
end

function as_proxy_observable(::Type{L}, ::ScheduledSubscribableTrait, proxied_source::S, proxy::P) where {L, S, P}
    return ProxyObservable{L, S, P}(proxied_source, proxy)
end