-
Notifications
You must be signed in to change notification settings - Fork 19
/
min.jl
83 lines (59 loc) · 2.09 KB
/
min.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
export min
import Base: min
import Base: show
"""
min(; from = nothing)
Creates a min operator, which emits a single item: the item with the smallest value.
# Arguments
- `from`: optional initial minimal value, if `nothing` first item from the source will be used as initial instead
# Producing
Stream of type `<: Subscribable{Union{L, Nothing}}` where `L` refers to type of source stream
# Examples
```jldoctest
using Rocket
source = from([ i for i in 1:42 ])
subscribe!(source |> min(), logger())
;
# output
[LogActor] Data: 1
[LogActor] Completed
```
See also: [`AbstractOperator`](@ref), [`InferableOperator`](@ref), [`ProxyObservable`](@ref), [`logger`](@ref)
"""
min(; from::D = nothing) where D = MinOperator{D}(from)
struct MinOperator{D} <: InferableOperator
from :: D
end
function on_call!(::Type{L}, ::Type{Union{L, Nothing}}, operator::MinOperator, source) where L
return proxy(Union{L, Nothing}, source, MinProxy(operator.from !== nothing ? convert(L, operator.from) : nothing))
end
operator_right(operator::MinOperator, ::Type{L}) where L = Union{L, Nothing}
struct MinProxy{D} <: ActorProxy
from :: D
end
actor_proxy!(::Type{Union{L, Nothing}}, proxy::MinProxy, actor::A) where { L, A } = MinActor{L, A}(actor, MinActorProps{L}(proxy.from))
mutable struct MinActorProps{L}
current :: Union{L, Nothing}
end
struct MinActor{L, A} <: Actor{L}
actor :: A
props :: MinActorProps{L}
end
getcurrent(actor::MinActor) = actor.props.current
setcurrent!(actor::MinActor, value) = actor.props.current = value
function on_next!(actor::MinActor{L}, data::L) where L
current = getcurrent(actor)
if current === nothing || data < current
setcurrent!(actor, data)
end
end
function on_error!(actor::MinActor, err)
error!(actor.actor, err)
end
function on_complete!(actor::MinActor)
next!(actor.actor, getcurrent(actor))
complete!(actor.actor)
end
Base.show(io::IO, ::MinOperator) = print(io, "MinOperator()")
Base.show(io::IO, ::MinProxy) = print(io, "MinProxy()")
Base.show(io::IO, ::MinActor{L}) where L = print(io, "MinActor($L)")