-
Notifications
You must be signed in to change notification settings - Fork 18
/
concat_map_to.jl
46 lines (35 loc) · 1.44 KB
/
concat_map_to.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
export concat_map_to
"""
switch_map_to(inner_observable)
Creates a `switch_map_to` operator, which returns an observable of values merged together by
joining the passed observable with itself, one after the other, for each value emitted from the source.
Essentially it projects each source value to the same Observable which is merged multiple times in a
serialized fashion on the output Observable.
# Arguments
- `inner_observable`: an Observable to replace each value from the source Observable.
# Producing
Stream of type `<: Subscribable{R}` where R refers to the eltype of `inner_observable`
# Examples
```jldoctest
using Rocket
source = from([ 0, 0, 0 ])
subscribe!(source |> concat_map_to(from([ 1, 2, 3 ])), logger())
;
# output
[LogActor] Data: 1
[LogActor] Data: 2
[LogActor] Data: 3
[LogActor] Data: 1
[LogActor] Data: 2
[LogActor] Data: 3
[LogActor] Data: 1
[LogActor] Data: 2
[LogActor] Data: 3
[LogActor] Completed
```
See also: [`concat_map`](@ref), [`AbstractOperator`](@ref), [`RightTypedOperator`](@ref), [`ProxyObservable`](@ref), [`logger`](@ref)
"""
concat_map_to(source::S) where S = as_concat_map_to(as_subscribable(S), source)
as_concat_map_to(::InvalidSubscribableTrait, source) = throw(InvalidSubscribableTraitUsageError(source))
as_concat_map_to(::SimpleSubscribableTrait{R}, source) where R = concat_map(R, (_) -> source)
as_concat_map_to(::ScheduledSubscribableTrait{R}, source) where R = concat_map(R, (_) -> source)