-
Notifications
You must be signed in to change notification settings - Fork 4.2k
/
Concat.swift
131 lines (99 loc) · 4.95 KB
/
Concat.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//
// Concat.swift
// RxSwift
//
// Created by Krunoslav Zaher on 3/21/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
extension ObservableType {
    /**
     Appends `second` to `self`: the elements of `second` are emitted once `self` has terminated successfully.

     The implementation delegates to the static `concat` with a two-element array made of
     `self.asObservable()` followed by `second.asObservable()`.

     - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)

     - parameter second: Observable sequence to continue with after `self`.
     - returns: An observable sequence that contains the elements of `self`, followed by those of the second sequence.
     */
    public func concat<Source: ObservableConvertibleType>(_ second: Source) -> Observable<Element> where Source.Element == Element {
        // Order matters: `self` first, then `second`.
        let chain: [Observable<Element>] = [self.asObservable(), second.asObservable()]
        return Observable<Element>.concat(chain)
    }
}
extension ObservableType {
    /**
     Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully.

     This operator has tail recursive optimizations that will prevent stack overflow.

     Optimizations will be performed in cases equivalent to following:

         [1, [2, [3, .....].concat()].concat()].concat()

     The number of sources is not known for an arbitrary `Sequence`, so `nil` is passed as the count.

     - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)

     - parameter sequence: Sequence of observable sequences to concatenate, in order.
     - returns: An observable sequence that contains the elements of each given sequence, in sequential order.
     */
    public static func concat<Sequence: Swift.Sequence>(_ sequence: Sequence) -> Observable<Element>
        where Sequence.Element == Observable<Element> {
        Concat(sources: sequence, count: nil)
    }

    /**
     Concatenates all observable sequences in the given collection, as long as the previous observable sequence terminated successfully.

     This operator has tail recursive optimizations that will prevent stack overflow.

     Optimizations will be performed in cases equivalent to following:

         [1, [2, [3, .....].concat()].concat()].concat()

     The collection's `count` is passed along as the number of sources.

     - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)

     - parameter collection: Collection of observable sequences to concatenate, in order.
     - returns: An observable sequence that contains the elements of each given sequence, in sequential order.
     */
    public static func concat<Collection: Swift.Collection>(_ collection: Collection) -> Observable<Element>
        where Collection.Element == Observable<Element> {
        let total = Int64(collection.count)
        return Concat(sources: collection, count: total)
    }

    /**
     Concatenates all observable sequences passed as variadic arguments, as long as the previous observable sequence terminated successfully.

     This operator has tail recursive optimizations that will prevent stack overflow.

     Optimizations will be performed in cases equivalent to following:

         [1, [2, [3, .....].concat()].concat()].concat()

     The number of arguments is passed along as the number of sources.

     - seealso: [concat operator on reactivex.io](http://reactivex.io/documentation/operators/concat.html)

     - parameter sources: Observable sequences to concatenate, in order.
     - returns: An observable sequence that contains the elements of each given sequence, in sequential order.
     */
    public static func concat(_ sources: Observable<Element> ...) -> Observable<Element> {
        let total = Int64(sources.count)
        return Concat(sources: sources, count: total)
    }
}
/// Sink used by `Concat`: it observes one inner source at a time and relays its events
/// to the downstream observer.
///
/// Sequencing of the sources is delegated to the `TailRecursiveSink` superclass
/// (declared elsewhere); this class only decides how each event kind is handled.
final private class ConcatSink<Sequence: Swift.Sequence, Observer: ObserverType>
    : TailRecursiveSink<Sequence, Observer>
    , ObserverType where Sequence.Element: ObservableConvertibleType, Sequence.Element.Element == Observer.Element {
    typealias Element = Observer.Element

    /// Passes the observer and cancelable straight through to the superclass initializer.
    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    /// Handles an event coming from the currently subscribed source.
    ///
    /// - `completed` is not forwarded; `schedule(.moveNext)` is called instead
    ///   (presumably advancing to the next source — confirm against `TailRecursiveSink`).
    /// - `next` is forwarded unchanged.
    /// - `error` is forwarded and then the sink is disposed.
    ///
    /// - parameter event: Event emitted by the current source.
    func on(_ event: Event<Element>) {
        if case .completed = event {
            schedule(.moveNext)
            return
        }

        // Only `next` and `error` reach this point; both are relayed downstream.
        forwardOn(event)

        if case .error = event {
            dispose()
        }
    }

    /// Subscribes this sink to `source`.
    ///
    /// - parameter source: Observable to observe next.
    /// - returns: The disposable returned by the subscription.
    override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
        source.subscribe(self)
    }

    /// Unwraps a nested `Concat` built over the same `Sequence` type.
    ///
    /// - parameter observable: Candidate observable to inspect.
    /// - returns: A fresh iterator over the nested sources paired with the nested count
    ///   when `observable` is a `Concat<Sequence>`; `nil` otherwise.
    override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
        guard let nested = observable as? Concat<Sequence> else {
            return nil
        }
        return (nested.sources.makeIterator(), nested.count)
    }
}
/// Producer behind the `concat` operators: it stores the sources to concatenate and,
/// on `run`, hands an iterator over them to a `ConcatSink`.
final private class Concat<Sequence: Swift.Sequence>: Producer<Sequence.Element.Element> where Sequence.Element: ObservableConvertibleType {
    typealias Element = Sequence.Element.Element

    /// Sources to concatenate, in iteration order.
    fileprivate let sources: Sequence

    /// Number of sources as supplied by the caller; `nil` when it was not provided.
    fileprivate let count: IntMax?

    /// Stores the sources and their optional count.
    ///
    /// - parameter sources: Sources to concatenate.
    /// - parameter count: Number of sources, or `nil` if unknown.
    init(sources: Sequence, count: IntMax?) {
        self.count = count
        self.sources = sources
    }

    /// Creates a `ConcatSink` for `observer` and starts it with a fresh iterator over `sources`.
    ///
    /// - parameter observer: Observer that receives the concatenated events.
    /// - parameter cancel: Cancelable handed to the sink.
    /// - returns: The sink together with the disposable returned by the sink's `run`.
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let concatSink = ConcatSink<Sequence, Observer>(observer: observer, cancel: cancel)
        let iterator = self.sources.makeIterator()
        let running = concatSink.run((iterator, self.count))
        return (sink: concatSink, subscription: running)
    }
}