-
Notifications
You must be signed in to change notification settings - Fork 18
/
single.jl
62 lines (43 loc) · 1.6 KB
/
single.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
export SingleObservable, of
import Base: ==
import Base: show
"""
SingleObservable{D, H}(value::D, scheduler::H)
SingleObservable wraps single value of type `D` into a observable.
# Constructor arguments
- `value`: a single value to emit
- `scheduler`: scheduler-like object
# See also: [`of`](@ref), [`Subscribable`](@ref)
"""
@subscribable struct SingleObservable{D, H} <: ScheduledSubscribable{D}
value :: D
scheduler :: H
end
getrecent(observable::SingleObservable) = observable.value
getscheduler(observable::SingleObservable) = observable.scheduler
function on_subscribe!(observable::SingleObservable, actor, scheduler)
next!(actor, observable.value, scheduler)
complete!(actor, scheduler)
return voidTeardown
end
"""
of(x; scheduler::H = AsapScheduler()) where { H <: AbstractScheduler }
Creation operator for the `SingleObservable` that emits a single value x and then completes.
# Arguments
- `x`: value to be emmited before completion
# Examples
```jldoctest
using Rocket
source = of(1)
subscribe!(source, logger())
;
# output
[LogActor] Data: 1
[LogActor] Completed
```
See also: [`SingleObservable`](@ref), [`subscribe!`](@ref), [`logger`](@ref)
"""
of(x::T; scheduler::H = AsapScheduler()) where { T, H <: AbstractScheduler } = SingleObservable{T, H}(x, scheduler)
Base.:(==)(left::SingleObservable{D, H}, right::SingleObservable{D, H}) where { D, H } = left.value == right.value
Base.:(==)(left::SingleObservable, right::SingleObservable) = false
Base.show(io::IO, ::SingleObservable{D, H}) where { D, H } = print(io, "SingleObservable($D, $H)")