/
generate.jl
59 lines (45 loc) · 1.66 KB
/
generate.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
export generate
import Base: show
"""
generate(initial::D, condition::C, iterator::I; scheduler::H = AsapScheduler()) where { D, C, I, H <: AbstractScheduler }
Generates an observable sequence by running a state-driven loop producing the sequence's elements, using the specified scheduler to send out observer messages.
# Arguments
- `initial`: initial state
- `condition`: condition to terminate generation (upon returning false)
- `iterator`: iteration step function
- `scheduler`: optional, scheduler-like object
# Note
`iterator` object should return objects of the same type as `initial`.
# Examples
```jldoctest
using Rocket
source = generate(1, x -> x < 3, x -> x + 1)
subscribe!(source, logger())
;
# output
[LogActor] Data: 1
[LogActor] Data: 2
[LogActor] Completed
```
See also: [`ScheduledSubscribable`](@ref), [`subscribe!`](@ref)
"""
function generate(initial::D, condition::C, iterator::I; scheduler::H = AsapScheduler()) where { D, C, I, H <: AbstractScheduler }
return GenerateObservable{D, C, I, H}(initial, condition, iterator, scheduler)
end
@subscribable struct GenerateObservable{D, C, I, H} <: ScheduledSubscribable{D}
initial :: D
condition :: C
iterator :: I
scheduler :: H
end
getscheduler(observable::GenerateObservable) = observable.scheduler
function on_subscribe!(observable::GenerateObservable, actor, scheduler)
value = observable.initial
while observable.condition(value)
next!(actor, value, scheduler)
value = observable.iterator(value)
end
complete!(actor, scheduler)
return voidTeardown
end
Base.show(io::IO, observable::GenerateObservable{D}) where D = print(io, "GenerateObservable($D)")