# 合并数据流


## 合并类操作符

`RxJS`提供了一系列可以完成`Observable`组合操作的操作符，在一类操作符成为**合并类操作符**。

> 绝大部分合并类操作符都同时具有*静态操作符*和*实例操作符*

### concat操作符
`concat`将多个Observable中数据内容依次合并。

`concat`会依次订阅Observable实例，直到Observable实例complete才会订阅下一个实例。

In [6]:
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/concat';

var source1$ = Observable.of(1, 2, 3);
var source2$ = Observable.of(4, 5, 6);
var ofConcatSource$ = Observable.concat(source1$, source2$);

ofConcatSource$.subscribe(console.log)

1
2
3
4
5
6


Subscriber {
  closed: true,
  _parentOrParents: null,
  _subscriptions: null,
  syncErrorValue: null,
  syncErrorThrown: false,
  syncErrorThrowable: true,
  isStopped: true,
  destination: SafeSubscriber {
    closed: true,
    _parentOrParents: null,
    _subscriptions: null,
    syncErrorValue: null,
    syncErrorThrown: false,
    syncErrorThrowable: false,
    isStopped: true,
    destination: {
      closed: true,
      next: [Function: next],
      error: [Function: error],
      complete: [Function: complete]
    },
    _parentSubscriber: null,
    _context: null,
    _next: [Function: bound log],
    _error: undefined,
    _complete: undefined
  }
}

### merge操作符
merge会第一时间订阅所有的上游Observable，然后对上游的数据采用”先到先得“策略。

任何一个Observable只要有数据推下来，就立刻转给下游Observable对象。

merge可选参数concurrent，用于指定可以同时合并的Observable对象个数

In [4]:
$$.async();
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/map';
import 'rxjs/add/observable/merge';

var source1$ = Observable.timer(0, 1000).map(x => `${x}A`);
var source2$ = Observable.timer(500, 1000).map(x => `${x}B`);

var concatSource$ = Observable.merge(source1$, source2$);
var subscription = concatSource$.subscribe(console.log);
(function($$){
    setTimeout(() => {
        subscription.unsubscribe();
        $$.done();
    }, 3000);
})($$);

0A
0B
1A
1B
2A
2B


### zip操作符

合并操作符

In [16]:
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/zip';

var source1$ = Observable.of(1, 2, 3, 4);
var source2$ = Observable.of(5, 6, 7, 8);
var project = (a, b) => {
    return `${a}---${b}`
};
var zipSource$ = Observable.zip(source1$, source2$, project);
var zipSubscription = zipSource$.subscribe(console.log)

1---5
2---6
3---7
4---8


{}

### combineLatest操作符

combineLatest合并数据流的方式是当任何一个上游`Observable`产生数据时，从所有输入`Observable`对象中拿最后一次产生的数据（最新数据）。

与`zip`操作符差异：zip对上游数据只使用一次，而combineLatest可能会反复使用上游产生的最新数据。

In [18]:
$$.async();
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/observable/combineLatest';
import 'rxjs/add/operator/map';

var source1$ = Observable.timer(0, 1000).map(x => `${x}A`);
var source2$ = Observable.timer(500, 1000).map(x => `${x}B`);
var combineLatestSource$ = Observable.combineLatest(source1$, source2$);
var combineLatestDescription = combineLatestSource$.subscribe(console.log);

($$ => {
    setTimeout(() => {
        combineLatestDescription.unsubscribe();
        $$.done();
    }, 10 * 1000)
})($$);

[ '0A', '0B' ]
[ '1A', '0B' ]
[ '1A', '1B' ]
[ '2A', '1B' ]
[ '2A', '2B' ]
[ '3A', '2B' ]
[ '3A', '3B' ]
[ '4A', '3B' ]
[ '4A', '4B' ]
[ '5A', '4B' ]
[ '5A', '5B' ]
[ '6A', '5B' ]
[ '6A', '6B' ]
[ '7A', '6B' ]
[ '7A', '7B' ]
[ '8A', '7B' ]
[ '8A', '8B' ]
[ '9A', '8B' ]
[ '9A', '9B' ]


#### 定制下游数据

`combineLatest`最后一个参数可以是一个函数（project函数），作用：combineLatest把所有上游的“最新数据”扔给下游做一个组合处理。

#### 多重依赖问题

`withLatestFrom`功能类似于`combineLatest`，但是给下游推送数据只能由一个上游的`Observable`对象驱动。

In [2]:
$$.async();
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/observable/combineLatest';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/withLatestFrom';

// source1和source2同时依赖于origin
var origin$ = Observable.timer(0, 1000)
var source1$ = origin$.map(x => `${x}A`);
var source2$ = origin$.map(x => `${x}B`);
// var combineLatestSource$ = Observable.combineLatest(source1$, source2$);
// var combineLatestDescription = combineLatestSource$
//                                 .subscribe(value => {
//                                     console.log('==========');
//                                     console.log(value);
//                                 });

// withLatestFrom
var withLatestFromSource$ = source2$.withLatestFrom(source1$);
var withLatestFromDescription = withLatestFromSource$.subscribe(console.log);

($$ => {
    setTimeout(() => {
//         combineLatestDescription.unsubscribe();
        withLatestFromDescription.unsubscribe();
        $$.done();
    }, 5 * 1000)
})($$);

[ '0B', '0A' ]
[ '1B', '1A' ]
[ '2B', '2A' ]
[ '3B', '3A' ]
[ '4B', '4A' ]


### race操作符

`race`操作符是多个Observable对象同时订阅，先产生数据的Observable对象`race`会继续使用该Observable对象，退订其他Observable对象。

### startWith操作符

`startWith`操作符是**实例操作符**。功能为：当一个Observable对象在被订阅的时候，总是先吐出指定的若干数据。

`startWith`是以同步的方式吐出数据的，支持多个参数。

In [17]:
$$.async()
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/fromPromise';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/concat';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/startWith';

import { get } from 'axios';


var source$ = Observable.of(1, 2, 3).startWith('dahu');
source$.subscribe(
    console.log,
    null, 
    () => console.log('complete')
);

// source$ = Observable.fromPromise(get('http://poetry.apiopen.top/getTime'))
//     .map(res => res.data).startWith('dahu');
// source$.subscribe(console.log, null, () => $$.done());
source$ = Observable.interval(1000).take(4);
source$ = Observable.of(-1).concat(source$);
source$.subseribe(
    console.log,
    nu
);

dahu
1
2
3
complete
dahu
{
  code: 200,
  message: '成功!',
  result: {
    date: '2021-03-10',
    time: '18:43:34',
    weekday: '星期三',
    dateTime: '2021-03-10 18:43:34'
  }
}


### forkJoin操作符

`forkJoin`操作符只有**静态操作符**。可以接受多个Observable对象作为参数，其功能类似于`Promise.all`。

`forkJoin`会等到所有的Observable对象最后一个数据，确定不会有新的数据产生的时候，`forkJoin`就会把所有输入Observable对象产生的最后一个数据合并成给下游唯一的数据。

In [13]:
$$.async();
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/fromPromise';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/operator/map';
import { get } from 'axios';

// 模拟慢请求
var promise1 = new Promise((reject, resolve) => {
    setTimeout(() => {
        get('http://poetry.apiopen.top/getTime')
            .then(reject)
            .catch(resolve);
    }, 1000);
});
var promise2 = get('http://poetry.apiopen.top/getTime')

var forkJoinSource$ = Observable.forkJoin(
    Observable.fromPromise(promise1).map(res => res.data).startWith(1), 
    Observable.fromPromise(promise2).map(res => res.data).startWith(2)
);
var forkJoinDescription = forkJoinSource$.subscribe(res => {
    console.log(res);
//     forkJoinDescription.unsubscribe();
}, null, () => $$.done());

[
  {
    code: 200,
    message: '成功!',
    result: {
      date: '2021-03-10',
      time: '18:38:33',
      weekday: '星期三',
      dateTime: '2021-03-10 18:38:33'
    }
  },
  {
    code: 200,
    message: '成功!',
    result: {
      date: '2021-03-10',
      time: '18:38:32',
      weekday: '星期三',
      dateTime: '2021-03-10 18:38:32'
    }
  }
]
