-
Notifications
You must be signed in to change notification settings - Fork 19
/
discontinue.jl
90 lines (66 loc) · 2.73 KB
/
discontinue.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
export discontinue
import Base: show
"""
discontinue()
Creates an operator, which prevents an emitting of self-depending messages and breaks a possible infinite loop.
Does nothing if observable scheduled asynchronously.
# Producing
Stream of type `<: Subscribable{L}` where `L` refers to type of source stream
# Examples
```jldoctest
using Rocket
s = BehaviorSubject(0)
subscription1 = subscribe!(s, logger())
subscription2 = subscribe!(s |> map(Int, d -> d + 1) |> discontinue(), s)
;
# output
[LogActor] Data: 0
[LogActor] Data: 1
```
See also: [`AbstractOperator`](@ref), [`RightTypedOperator`](@ref), [`ProxyObservable`](@ref), [`logger`](@ref)
"""
discontinue() = DiscontinueOperator()
# Field-less operator type; `discontinue()` returns an instance of it.
struct DiscontinueOperator <: InferableOperator
end
# Applies the operator to `source`: the result is `proxy(L, source, DiscontinueProxy())`.
# Both type arguments are the same `L`, so the method only matches when they agree.
on_call!(::Type{L}, ::Type{L}, operator::DiscontinueOperator, source) where {L} =
    proxy(L, source, DiscontinueProxy())
# The operator's right-hand type is the type `L` it is given, unchanged.
function operator_right(::DiscontinueOperator, ::Type{L}) where {L}
    return L
end
# Field-less actor proxy; `actor_proxy!` uses it to wrap an actor in a `DiscontinueActor`.
struct DiscontinueProxy <: ActorProxy
end
# Wraps `actor` in a `DiscontinueActor{L, A}`, where `A` is the concrete type of `actor`.
function actor_proxy!(::Type{L}, proxy::DiscontinueProxy, actor::A) where {L, A}
    return DiscontinueActor{L, A}(actor)
end
# Actor wrapper that remembers, separately for next / error / complete events,
# whether a forwarding call to the wrapped actor is currently in progress.
mutable struct DiscontinueActor{L, A} <: Actor{L}
    actor                :: A     # wrapped actor that events are forwarded to
    isnextpropagated     :: Bool  # set while a `next!` is being forwarded
    iserrorpropagated    :: Bool  # set while an `error!` is being forwarded
    iscompletepropagated :: Bool  # set while a `complete!` is being forwarded

    # All three flags start out cleared.
    function DiscontinueActor{L, A}(actor::A) where {L, A}
        return new(actor, false, false, false)
    end
end
# Readers for the three "forwarding in progress" flags of a `DiscontinueActor`.
function isnextpropagated(actor::DiscontinueActor)
    return actor.isnextpropagated
end

function iserrorpropagated(actor::DiscontinueActor)
    return actor.iserrorpropagated
end

function iscompletepropagated(actor::DiscontinueActor)
    return actor.iscompletepropagated
end

# Writers for the same flags; each one returns the value it has just stored.
function setnextpropagated!(actor::DiscontinueActor, value::Bool)
    actor.isnextpropagated = value
    return value
end

function seterrorpropagated!(actor::DiscontinueActor, value::Bool)
    actor.iserrorpropagated = value
    return value
end

function setcompletepropagated!(actor::DiscontinueActor, value::Bool)
    actor.iscompletepropagated = value
    return value
end
# Forwards `data` to the wrapped actor unless a `next!` forwarded through this
# actor is still in progress; in that case `data` is dropped.
#
# The flag is raised for the duration of the downstream `next!` call, so a value
# that re-enters this actor from inside that call is ignored.
# The flag is lowered in a `finally` clause: an exception thrown downstream still
# propagates to the caller, but it no longer leaves the flag stuck at `true`
# (which would silently discard every later value).
#
# Always returns `nothing`.
function on_next!(actor::DiscontinueActor{L}, data::L) where L
    if !isnextpropagated(actor)
        setnextpropagated!(actor, true)
        try
            next!(actor.actor, data)
        finally
            setnextpropagated!(actor, false)
        end
    end
    return nothing
end
# Forwards `err` to the wrapped actor unless an `error!` forwarded through this
# actor is still in progress; in that case `err` is dropped.
#
# The flag is lowered in a `finally` clause: an exception thrown by the downstream
# `error!` still propagates to the caller, but it no longer leaves the flag stuck
# at `true` (which would silently discard every later error).
#
# Always returns `nothing`.
function on_error!(actor::DiscontinueActor, err)
    if !iserrorpropagated(actor)
        seterrorpropagated!(actor, true)
        try
            error!(actor.actor, err)
        finally
            seterrorpropagated!(actor, false)
        end
    end
    return nothing
end
# Forwards the completion event to the wrapped actor unless a `complete!`
# forwarded through this actor is still in progress; in that case it is dropped.
#
# The flag is lowered in a `finally` clause: an exception thrown by the downstream
# `complete!` still propagates to the caller, but it no longer leaves the flag
# stuck at `true` (which would silently discard every later completion).
#
# Always returns `nothing`.
function on_complete!(actor::DiscontinueActor)
    if !iscompletepropagated(actor)
        setcompletepropagated!(actor, true)
        try
            complete!(actor.actor)
        finally
            setcompletepropagated!(actor, false)
        end
    end
    return nothing
end
# Textual representations. The operator and the proxy print as a constructor call;
# the actor prints its data type parameter `L`.
function Base.show(io::IO, ::DiscontinueOperator)
    return print(io, "DiscontinueOperator()")
end

function Base.show(io::IO, ::DiscontinueProxy)
    return print(io, "DiscontinueProxy()")
end

function Base.show(io::IO, ::DiscontinueActor{L}) where {L}
    return print(io, "DiscontinueActor($L)")
end