-
Notifications
You must be signed in to change notification settings - Fork 18
/
to_array.jl
62 lines (41 loc) · 1.63 KB
/
to_array.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
export to_array
import Base: show
"""
to_array()
Creates a `to_array` operator, which reduces all values into a single array and returns this result when the source completes.
# Producing
Stream of type `<: Subscribable{Vector{L}}` where `L` refers to type of source stream
# Examples
```jldoctest
using Rocket
source = from([ 1, 2, 3 ])
subscribe!(source |> to_array(), logger())
;
# output
[LogActor] Data: [1, 2, 3]
[LogActor] Completed
```
See also: [`AbstractOperator`](@ref), [`InferableOperator`](@ref), [`ProxyObservable`](@ref), [`logger`](@ref)
"""
to_array() = ToArrayOperator()
# Operator object returned by `to_array()`. It carries no fields; the element types are
# supplied through the `on_call!` and `operator_right` methods defined for it.
struct ToArrayOperator <: InferableOperator
end
# Apply the operator to `source`, whose element type is `L`: build a proxy with element type
# `Vector{L}` around the source, using a fresh `ToArrayProxy`. The second positional argument
# restricts this method to the `Vector{L}` output type.
on_call!(::Type{L}, ::Type{Vector{L}}, operator::ToArrayOperator, source) where L =
    proxy(Vector{L}, source, ToArrayProxy())
# Output element type of the operator for an input element type `L`: a vector of `L`.
function operator_right(operator::ToArrayOperator, ::Type{L}) where L
    return Vector{L}
end
# Field-less proxy object; `actor_proxy!` uses it to wrap an actor into a `ToArrayActor`.
struct ToArrayProxy <: ActorProxy
end
# Wrap `actor` (which receives `Vector{L}` values) into a `ToArrayActor{L, A}` that accepts
# individual values and buffers them.
function actor_proxy!(::Type{Vector{L}}, proxy::ToArrayProxy, actor::A) where { L, A }
    return ToArrayActor{L, A}(actor)
end
# Actor that buffers incoming values and hands the whole buffer to the wrapped actor on
# completion (see `on_next!` / `on_complete!` for `ToArrayActor`).
struct ToArrayActor{L, A} <: Actor{L}
    values :: Vector{L}  # values collected so far, in arrival order
    actor  :: A          # wrapped actor that receives the collected vector

    # Start with an empty buffer around the given actor.
    function ToArrayActor{L, A}(actor::A) where { L, A }
        return new{L, A}(Vector{L}(), actor)
    end
end
# Append the incoming value to the buffer; nothing is forwarded to the wrapped actor here.
function on_next!(actor::ToArrayActor, data::L) where L
    return push!(actor.values, data)
end
# Pass the error straight to the wrapped actor; the buffered values are not emitted.
function on_error!(actor::ToArrayActor, err)
    return error!(actor.actor, err)
end
# On completion, send the buffered values to the wrapped actor as one `next!` event and
# then complete it. The buffer itself is passed, not a copy.
function on_complete!(actor::ToArrayActor)
    downstream = actor.actor
    next!(downstream, actor.values)
    return complete!(downstream)
end
# Compact textual representations for the operator, the proxy and the actor.
function Base.show(io::IO, ::ToArrayOperator)
    return print(io, "ToArrayOperator()")
end

function Base.show(io::IO, ::ToArrayProxy)
    return print(io, "ToArrayProxy()")
end

# Only the element type `L` is shown for the actor; the wrapped actor's type is omitted.
function Base.show(io::IO, ::ToArrayActor{L}) where L
    return print(io, "ToArrayActor($L)")
end