-
Notifications
You must be signed in to change notification settings - Fork 19
/
tap_on_complete.jl
67 lines (47 loc) · 1.97 KB
/
tap_on_complete.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
export tap_on_complete

import Base: show

"""
    tap_on_complete(tapFn::F) where { F <: Function }

Creates a tap operator that performs a side effect only when the source Observable
completes, and returns an Observable that is otherwise identical to the source.
`tapFn` is invoked after the completion event has been forwarded to the downstream actor.

# Arguments
- `tapFn::Function`: side-effect tap function with `() -> Nothing` signature

# Producing

Stream of type `<: Subscribable{L}` where `L` refers to type of source stream

# Examples
```jldoctest
using Rocket

source = from([ 1, 2, 3 ])
subscribe!(source |> tap_on_complete(() -> println("Complete event received")), logger())
;

# output

[LogActor] Data: 1
[LogActor] Data: 2
[LogActor] Data: 3
[LogActor] Completed
Complete event received
```

See also: [`tap_on_subscribe`](@ref), [`tap`](@ref), [`logger`](@ref)
"""
function tap_on_complete(tapFn::F) where {F<:Function}
    return TapOnCompleteOperator{F}(tapFn)
end
# Operator object produced by `tap_on_complete`; it only stores the user callback,
# parameterised on the callback's concrete type `F`.
struct TapOnCompleteOperator{F} <: InferableOperator
    tapFn::F
end
# Applies the operator to `source`: builds a `TapOnCompleteProxy` around the stored
# callback and hands it to `proxy`. Both type arguments are the same `L`, i.e. the
# element type is left unchanged.
function on_call!(
    ::Type{L}, ::Type{L}, operator::TapOnCompleteOperator{F}, source
) where {L,F}
    tap_proxy = TapOnCompleteProxy{F}(operator.tapFn)
    return proxy(L, source, tap_proxy)
end
# The output element type equals the input element type `L`.
function operator_right(operator::TapOnCompleteOperator, ::Type{L}) where {L}
    return L
end
# Actor proxy that carries the callback until `actor_proxy!` wraps a concrete actor.
struct TapOnCompleteProxy{F} <: ActorProxy
    tapFn::F
end
# Wraps `actor` in a `TapOnCompleteActor` that shares the proxy's callback.
function actor_proxy!(::Type{L}, proxy::TapOnCompleteProxy{F}, actor::A) where {L,A,F}
    return TapOnCompleteActor{L,A,F}(proxy.tapFn, actor)
end
# Actor wrapper for element type `L`.
#   tapFn - zero-argument callback run when the completion event arrives
#   actor - wrapped actor that receives every forwarded event
struct TapOnCompleteActor{L,A,F} <: Actor{L}
    tapFn::F
    actor::A
end
# Data events are passed to the wrapped actor unchanged; the callback is not involved.
function on_next!(actor::TapOnCompleteActor{L}, data::L) where {L}
    return next!(actor.actor, data)
end
# Error events are passed to the wrapped actor unchanged; the callback is not involved.
function on_error!(actor::TapOnCompleteActor, err)
    return error!(actor.actor, err)
end
# Completion is forwarded to the wrapped actor first, then the callback runs.
# The callback's result is the value of this method.
function on_complete!(actor::TapOnCompleteActor)
    complete!(actor.actor)
    return actor.tapFn()
end
# Compact display for the operator; the stored callback is deliberately not printed.
function Base.show(io::IO, ::TapOnCompleteOperator)
    return print(io, "TapOnCompleteOperator()")
end
# Compact display for the proxy; the stored callback is deliberately not printed.
function Base.show(io::IO, ::TapOnCompleteProxy)
    return print(io, "TapOnCompleteProxy()")
end
# Compact display for the actor, showing only its element type `L`.
function Base.show(io::IO, ::TapOnCompleteActor{L}) where {L}
    return print(io, "TapOnCompleteActor($L)")
end