/
error_if_empty.jl
68 lines (51 loc) · 1.74 KB
/
error_if_empty.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
export error_if_empty
import Base: show
"""
error_if_empty(err)
Creates a `error_if_empty` operator, which emits a given error if the source Observable completes
without emitting any next value, otherwise mirrors the source Observable.
```jldoctest
using Rocket
source = completed(Int) |> error_if_empty("Empty")
subscribe!(source, logger())
;
# output
[LogActor] Error: Empty
```
See also: [`AbstractOperator`](@ref), [`InferableOperator`](@ref), [`logger`](@ref), [`map`](@ref)
"""
error_if_empty(err) = ErrorIfEmptyOperator(err)
# Operator object returned by `error_if_empty`.
#
# `err` is the value forwarded downstream as an error when the source completes without
# having emitted anything. The field type is a parameter of the struct (instead of an
# implicit `Any`) so that code reading `operator.err` sees a concrete type.
# `ErrorIfEmptyOperator(err)` and dispatch on `::ErrorIfEmptyOperator` keep working unchanged.
struct ErrorIfEmptyOperator{E} <: InferableOperator
    err :: E
end
# Returns `L` unchanged: this operator mirrors the source, so the element type it is given
# is the element type it reports.
function operator_right(operator::ErrorIfEmptyOperator, ::Type{L}) where {L}
    return L
end
# Applies the operator to `source`: wraps it via `proxy` with an `ErrorIfEmptyProxy`
# that carries the operator's error value. Both type arguments must be the same `L`.
function on_call!(::Type{L}, ::Type{L}, operator::ErrorIfEmptyOperator, source) where {L}
    empty_guard = ErrorIfEmptyProxy(operator.err)
    return proxy(L, source, empty_guard)
end
# Actor proxy for the `error_if_empty` operator.
#
# Carries the error value (`err`) from the operator to the actor created in `actor_proxy!`.
# The field type is a parameter of the struct (instead of an implicit `Any`) so that
# `proxy.err` has a concrete type. `ErrorIfEmptyProxy(err)` and dispatch on
# `::ErrorIfEmptyProxy` keep working unchanged.
struct ErrorIfEmptyProxy{E} <: ActorProxy
    err :: E
end
# Actor that sits between the source and the downstream `actor`.
#
# Fields:
#   actor      - the wrapped downstream actor that events are forwarded to
#   is_emitted - set to `true` once a next value or an error has been forwarded
#   err        - value sent downstream as an error if completion arrives while
#                `is_emitted` is still `false`
#
# `err` is typed through the `E` parameter (instead of an implicit `Any`) so that the
# `error!(actor.actor, actor.err)` call in `on_complete!` sees a concrete argument type.
mutable struct ErrorIfEmptyActor{L, A, E} <: Actor{L}
    actor      :: A
    is_emitted :: Bool
    err        :: E
end

# Wraps the downstream `actor` into an `ErrorIfEmptyActor`, starting with
# `is_emitted = false` and the error value carried by `proxy`.
function actor_proxy!(::Type{L}, proxy::ErrorIfEmptyProxy, actor::A) where {L, A}
    err = proxy.err
    return ErrorIfEmptyActor{L, A, typeof(err)}(actor, false, err)
end
# Records that the source produced a value, then forwards `data` to the wrapped actor.
function on_next!(actor::ErrorIfEmptyActor{L}, data::L) where {L}
    actor.is_emitted = true
    return next!(actor.actor, data)
end
# Forwards an error from the source to the wrapped actor unchanged.
# The flag is set before forwarding, mirroring `on_next!`.
function on_error!(actor::ErrorIfEmptyActor, err)
    actor.is_emitted = true
    return error!(actor.actor, err)
end
# Handles completion of the source.
# If something was forwarded earlier, completion is passed through as-is; otherwise
# the stored `err` is sent to the wrapped actor as an error instead of a completion.
function on_complete!(actor::ErrorIfEmptyActor)
    if actor.is_emitted
        return complete!(actor.actor)
    end
    return error!(actor.actor, actor.err)
end
# Compact textual representation of the operator; the stored error value is not printed.
function Base.show(io::IO, ::ErrorIfEmptyOperator)
    return print(io, "ErrorIfEmptyOperator()")
end
# Compact textual representation of the proxy; the stored error value is not printed.
function Base.show(io::IO, ::ErrorIfEmptyProxy)
    return print(io, "ErrorIfEmptyProxy()")
end
# Compact textual representation of the actor, showing only its element type `L`.
function Base.show(io::IO, ::ErrorIfEmptyActor{L}) where {L}
    return print(io, "ErrorIfEmptyActor($L)")
end