-
Notifications
You must be signed in to change notification settings - Fork 2
/
switchMap.ts
72 lines (65 loc) · 1.92 KB
/
switchMap.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import { Source, Operator, Observer, Unsubscribe } from '../index';
import { createSource } from '../sources';
import subscribe from '../utils/subscribe';
/**
* Projects each source value to a source which is merged in the output source, emitting values only from the most recently projected source
*
* @param mapper - function which transform item emitted by the source to a new source
* @return callbag operator
*
* @public
*/
function switchMap<I, O>(mapper: (value: I) => Source<O>): Operator<I, O> {
return source => {
return createSource((next, complete, error) => {
let hasCurrentSubscription = false;
let completed = false;
let finished = false;
let unsubscribe: Unsubscribe = () => {};
let unsubscribePrevious: Unsubscribe = () => {};
const mappedObserver: Observer<O> = {
next,
error: (err: any) => {
hasCurrentSubscription = false;
finished = true;
error(err);
unsubscribe();
},
complete: () => {
hasCurrentSubscription = false;
if (completed && !finished) {
finished = true;
complete();
}
},
};
const observer: Observer<I> = {
next: value => {
unsubscribePrevious();
hasCurrentSubscription = true;
const source = mapper(value);
unsubscribePrevious = subscribe(source)(mappedObserver);
},
error: err => {
completed = true;
finished = true;
unsubscribePrevious();
error(err);
},
complete: () => {
completed = true;
if (!hasCurrentSubscription && !finished) {
finished = true;
complete();
}
},
};
unsubscribe = subscribe(source)(observer);
return () => {
unsubscribePrevious();
unsubscribe();
};
});
};
}
export default switchMap;