-
Notifications
You must be signed in to change notification settings - Fork 18
/
pending.jl
114 lines (85 loc) · 3.63 KB
/
pending.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
export PendingSubject, PendingSubjectFactory
import Base: show, similar
"""
PendingSubject(::Type{D}) where D
PendingSubject(::Type{D}, factory::F) where { D, F <: AbstractFactory }
PendingSubject(::Type{D}, subject::S) where { D, S }
A variant of Subject that emits its last value on completion.
Reemits last value on further subscriptions and then completes.
See also: [`PendingSubjectFactory`](@ref), [`Subject`](@ref), [`SubjectFactory`](@ref)
"""
function PendingSubject end
"""
PendingSubjectFactory(factory::F) where { F <: AbstractSubjectFactory }
PendingSubjectFactory(; scheduler::H = AsapScheduler()) where { H <: AbstractScheduler }
A variant of SubjectFactory that creates an instance of PendingSubject.
See also: [`SubjectFactory`](@ref), [`AbstractSubjectFactory`](@ref), [`PendingSubject`](@ref), [`Subject`](@ref)
"""
function PendingSubjectFactory end
##
mutable struct PendingSubjectInstance{D, S} <: AbstractSubject{D}
subject :: S
last :: Union{Nothing, D}
end
Base.show(io::IO, ::PendingSubjectInstance{D, S}) where { D, S } = print(io, "PendingSubject($D, $S)")
Base.similar(subject::PendingSubjectInstance{D, S}) where { D, S } = PendingSubject(D, similar(subject.subject))
function PendingSubject(::Type{D}) where D
return PendingSubject(D, SubjectFactory(AsapScheduler()))
end
function PendingSubject(::Type{D}, factory::F) where { D, F <: AbstractSubjectFactory }
return PendingSubject(D, create_subject(D, factory))
end
function PendingSubject(::Type{D}, subject::S) where { D, S }
return as_pending_subject(D, as_subject(S), subject)
end
as_pending_subject(::Type{D}, ::InvalidSubjectTrait, subject) where D = throw(InvalidSubjectTraitUsageError(subject))
as_pending_subject(::Type{D1}, ::ValidSubjectTrait{D2}, subject) where { D1, D2 } = throw(InconsistentSubjectDataTypesError{D1, D2}(subject))
as_pending_subject(::Type{D}, ::ValidSubjectTrait{D}, subject::S) where { D, S } = PendingSubjectInstance{D, S}(subject, nothing)
##
getlast(subject::PendingSubjectInstance) = subject.last
setlast!(subject::PendingSubjectInstance, value) = subject.last = value
##
function on_next!(subject::PendingSubjectInstance{D}, data::D) where D
if isactive(subject.subject)
setlast!(subject, data)
end
end
function on_error!(subject::PendingSubjectInstance, err)
if isactive(subject.subject)
error!(subject.subject, err)
end
end
function on_complete!(subject::PendingSubjectInstance)
if isactive(subject.subject)
last = getlast(subject)
if last !== nothing
next!(subject.subject, last)
end
complete!(subject.subject)
end
end
##
function on_subscribe!(subject::PendingSubjectInstance, actor)
if iscompleted(subject.subject)
last = getlast(subject)
if last !== nothing
next!(actor, last)
end
complete!(actor)
return voidTeardown
else
return subscribe!(subject.subject, actor)
end
end
##
struct PendingSubjectFactoryInstance{ F <: AbstractSubjectFactory } <: AbstractSubjectFactory
factory :: F
end
Base.show(io::IO, subject::PendingSubjectFactoryInstance{F}) where F = print(io, "PendingSubjectFactoryInstance($F)")
create_subject(::Type{L}, factory::PendingSubjectFactoryInstance) where L = PendingSubject(L, factory.factory)
function PendingSubjectFactory(factory::F) where { F <: AbstractSubjectFactory }
return PendingSubjectFactoryInstance(factory)
end
function PendingSubjectFactory(; scheduler::H = AsapScheduler()) where { H <: AbstractScheduler }
return PendingSubjectFactoryInstance(SubjectFactory{H}(scheduler))
end