Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RxJS入门——基础操作指北 #3

Open
forthealllight opened this issue May 16, 2018 · 0 comments
Open

RxJS入门——基础操作指北 #3

forthealllight opened this issue May 16, 2018 · 0 comments

Comments

@forthealllight
Copy link
Owner

forthealllight commented May 16, 2018

RxJS入门——基础操作指北


本章所基于的RxJS的版本问5.5.9,在本章中介绍RxJS的基础知识,比如Observable、Observer、Subscription、Subject以及Operation等

  • 了解RxJS的设计思想
  • 掌握RxJS的基础用法
  • 梳理RxJS的接口API

一、RxJS的设计思想

简单理解RxJS的设计思想,之前实现了PromiseA+规范下的promise,创建一个Promise后,有一个状态的初始值pending,两个状态的改变值,fullfilled和rejected.在Promise的订阅机制中,一旦状态发生改变,状态的改变就是不可逆的.而RxJS不同,Observable相当于一个消息生成器,给消息生成器设置处理函数,通过消息生成器传递信息给处理函数,处理函数作为订阅者执行相应的逻辑,并返回结果.

因为Observable这个消息生成器是多值的,因此订阅者执行后返回的值也是多值的。

二、RxJS的基本用法

来看RxJS的基本用法,简单的介绍Observable、Observer、Subscription、Subject以及Operation的使用

1.Observable、Observer

(1)首先来看基本的用法,

第一步创建可观察对象:

import Rx from 'rxjs/Rx';
const observable=Rx.Observable.create(
 function  subscribe(observer){
       try{
         observer.next(1);
         observer.next(2);
         observer.next(3);
         observer.complete();
       }catch(e){
         observer.error(e);
       }
})

在每一个可观察对象中会推送消息给订阅者,比如在上述函数中通过在next、complete和error方法中传递参数的形式将消息推送给订阅者。

第二步,创建订阅者

const observer={
  next:x=>console.log(x),
  error:error=>console.log(error),
  complete:()=>console.log('Observer got a complete notification')
}

订阅者有几个函数属性,用于接受可观察对象中的值并执行。

第三步,建立观察订阅关系:

observable.subscribe(observer);

执行返回的结果为:

1
2
3
Observer got a complete notification

(2)Observable异步推送

const observable=Rx.Observable.create(function              subscribe(observer){
       try{
         observer.next(1);
         observer.next(2);
         observer.next(3);
         setTimeout(()=>{
           observer.next(4)
         },1000)
       }catch(e){
         observer.error(e);
       }
});
const observer={
  next:x=>console.log(x),
  error:error=>console.log(error),
  complete:()=>console.log('Observer got a complete notification')
}
console.log('before');
observable.subscribe(observer);
console.log('after');

通过setTimeout异步推送值给next方法,输出结果为:

before
1
2
3
after
4

(3)取消Observable的订阅

var subscription = observable.subscribe(observer);
subscription.unsubscribe();

2. Subject

(1)基础用法

RxJS中的Subject是一种特殊的Observable,通过Subject定义的可观察对象可以被多个Observer订阅.

const subject=new Rx.Subject();
subject.subscribe({
  next:(v)=>console.log('observerA:'+v)
});
subject.subscribe({
  next:(v)=>console.log('observerB:'+v)
});
subject.next(1);
subject.next(2);

输出信息为:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

(2)Subject特殊类型

Subject也有几种特殊的类型,BehaviorSubject、ReplaySubject 和 AsyncSubject。依次来看每一种类型是如何使用的:

  • BehaviorSubject

通过BehaviorSubject构建的观察者,会将最新值发送给订阅者,并且观察者一旦创建就是有初值的:

const subject=new Rx.BehaviorSubject(0);

如上述的代码中创建了一个观察者,默认值为0,也就是上述代码其实默认执行了:

subject.next(0)

因此,如果:

const subject=new Rx.BehaviorSubject(0);
subject.subscribe({
  next:(v)=>console.log('observerA:'+v)
});

虽然我们没有在subject中传入值,但是因为有默认值,因此在控制台输出:

observerA:0

BehaviorSubject方法定义的观察者,会始终使用最新值,也就是将最新值传递给订阅者.完整的例子为:

    const subject=new Rx.BehaviorSubject(0);
    subject.subscribe({
      next:(v)=>console.log('observerA:'+v)
    });
    subject.next(1);
    subject.next(2);
    subject.subscribe({
      next:(v)=>console.log('observerB:'+v)
    });
    subject.next(3);

输出的值为:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

注意创建第二个订阅者B时,因为最新的next函数的参数是2,因此第二个订阅者会输出2

  • ReplaySubject

ReplaySubject可以缓存旧的观察者的值,传递给新的订阅者,在构造的函数中可以制定缓存旧值的个数.直接看例子:

    const subject = new Rx.ReplaySubject(3); // 为新的订阅者缓冲3个值
    
    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    subject.next(4);
    
    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    
    subject.next(5);

在observerB订阅之前,我们在观察者中保留了最新的3个值,此时最新的3个值分别为2,3,4,因此在observerB订阅时,就会输出2,3,4,完成的输出为:

    observerA: 1
    observerA: 2
    observerA: 3
    observerA: 4
    observerB: 2
    observerB: 3
    observerB: 4
    observerA: 5
    observerB: 5

此外,可以设置时间,来缓存多少时间段内的观察者的值。

  • AsyncSubject

AsyncSubject只有在可观察对象complete的时候,才会将最新的值传递给订阅者,举例来说:

    const subject = new Rx.AsyncSubject();

    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    subject.next(4);
    
    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    
    subject.next(5);
    subject.complete();

输出的结果为:

observerA: 5
observerB: 5

小提示:Observer也可以只是一个函数的形式,这种清空下这个函数等同与next属性的函数,也就是说,下面两种方法是等价的:

subject.subscribe({
   next:(v)=>console.log('observerB:'+v)
})

和省略的条件下:

subject.subscribe(v=>console.log('observerB:'+v))

3.Operators(操作符)

(1)操作符的定义

可观察对象上有很多操作符,比如.map(...)、.filter(...)等等,操作符的本身是一个纯函数,接受一个Observable,返回一个新的Observable,并且如果对订阅新生成的Observable,那么同时也会使得旧的观Observable也被订阅.

举例来说:

function multiplyByTen(input) {
  var output = Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
  return output;
}
var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));

在上述的例子中,multiplyByTen就类似于一个操作符函数,该函数接受一个Observable,这里为input,同时返回一个新的Observable,这里为output,最后我们在新创建的Observable上进行订阅:

output.subscribe(x => console.log(x));

因为旧的Observable此时也同时被订阅,因此输出的结果为:

10
20
30
40

(2)实例操作符和静态操作符

实例操作符定义在Observable原型上,在该实例操作符所定义的方法中,通过this取得实例的具体值:

Rx.Observable.prototype.multiplyByTen = function multiplyByTen() {
  var input = this;
  return Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
}

在实例方法中,不需要给该方法传递具体Observable作为参数输入,而是在Observable本身上直接调用.实例调用的方法为:

var observable = Rx.Observable.from([1, 2, 3, 4]).multiplyByTen();
observable.subscribe(x => console.log(x));

静态方法就是纯函数,接受Observable作为参数.比如在我们上述使用的Rx.Observable.create,就是一个常见的静态方法

4.常见操作符举例

Observable中的操作符很多我们来举几个例子。

(1) from 操作符

可以将任何值转化成一个可观察对象Observable,这里的任何值包括数组、类数组对象、promise、迭代对象以及类可观察者对象.首先,比如我们可以将一个数组转化成一个Observable:

const array=[10,20,30];
var result=Rx.Observable.from(array);
result.subscribe(x=>console.log(x));

如果传入的是数组,那么会返回一个next数组值的Observable,最后的输出为:

10
20
30

如果from接受的参数是一个迭代对象,比如generator构造的状态机,那么会有:

var iterator = generateDoubles(3);
var result = Rx.Observable.from(iterator).take(10);
result.subscribe(x => console.log(x));

输入的结果为:

3 6 12 24 48 96 192 384 768 1536

(2) fromEvent 操作符

该操作符与事件有关,将Dom事件,nodejs中通过EventEmitter所出发的事件等转化成一个可观察对象Observer,举例来看DOM事件的例子:

var clicks = Rx.Observable.fromEvent(document, 'click');
clicks.subscribe(x => console.log(x));

这样,点击docment后会在next方法中传入一个鼠标事件对象MouseEvent,因此点击,在订阅者中会输出一个鼠标事件对象.输出的结果为:

MouseEvent {isTrusted: true, screenX: 48, screenY: 123, clientX: 35, clientY: 25, …}

此外,fromEvent还可以接受第三个参数option,默认的事件是遵循在冒泡阶段执行,默认为false,如果将option设置为true,将在冒泡阶段进行.比如:

    var clicksInDocument=Rx.Observable.fromEvent(document, 'click', true); 
    var clicksInDiv = Rx.Observable.fromEvent(someDivInDocument, 'click');
    
    clicksInDocument.subscribe(() => console.log('document'));
    clicksInDiv.subscribe(() => console.log('div'));

如果不设置option,那么应该先输出div,后输出document,但是此时的情况下设置了option为true,那么会先输出document,后输出div,事件在捕获阶段进行.

(3) fromEventPattern

该操作符将添加事件的函数,转化成一个可观察的Observable,举例来说:

function addClickHandler(handler) {
  document.addEventListener('click', handler);
}

function removeClickHandler(handler) {
  document.removeEventListener('click', handler);
}

var clicks = Rx.Observable.fromEventPattern(
  addClickHandler,
  removeClickHandler
);
clicks.subscribe(x => console.log(x));

上述的过程中有两个函数,这两个函数内部是执行的事件添加的过程,通过fromEventPattern可以将函数转化成可观察对象,最后输出的值也是一个MouseEvent.

(4)fromPromise

该操作符将promise转化成一个可观察的Observable,举例来说:

var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
result.subscribe(x => console.log(x), e => console.error(e));

上述代码中fetch返回一个promise,我们将这个promise转化成一个可观察的Observable对象.next方法对应与fullied,同时error方法对应了rejected.

(5)interval

创建一个可观察对象Observable,定时的输出(emit)连续的序列.举例来说:

var numbers = Rx.Observable.interval(1000);
numbers.subscribe(x => console.log(x));

上述的方法会以生序的方式输出序列.
输出结果为1,2,3,...每个数字间隔1000ms

(6)merge

顾名思义,merge操作符就是将几个可观察对象融合,生成一个组合形式的新的可观察对象,举例来看:

var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = Rx.Observable.merge(clicks, timer);
clicksOrTimer.subscribe(x => console.log(x));

通过merge方法将interval和click方法融合,这样之后,新的Observable会有2个可观察的属性,对于订阅者而言,输出信息为,依次输出1,2,3...,当有点击事件发生时,输出MouseEvent

(7)never

该操作符表示生成一个不会emit出任何信息的可观察Observable,举例来说:

function info() {
  console.log('Will not be called');
}
var result = Rx.Observable.never().startWith(7);
result.subscribe(x => console.log(x), info, info);

该方法不会有任何的emit过程,只在初始的时候输出了7

(8)of

该操作符表示将一组数据在一次中完全输出,同时一次性完全输出后,可观察的状态变为complete.举例来说:

var numbers = Rx.Observable.of(10, 20, 30);
var letters = Rx.Observable.of('a', 'b', 'c');
var interval = Rx.Observable.interval(1000);
var result = numbers.concat(letters).concat(interval);
result.subscribe(x => console.log(x));

输出结果为:一次性输出10,20,30,a,b,c
然后定时输出:1,2,3,4...

(9)range

该操作符表示同时输出一段范围内的值,举例来说:

var numbers = Rx.Observable.range(1, 10);
numbers.subscribe(x => console.log(x));

输出的值为1,2,3,4,5,6,7,8,9,10

(10)delay

该操作符表示延迟emit,举例来说:

var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delay(1000); // each click emitted after 1 second
delayedClicks.subscribe(x => console.log(x));

上述的方法表示点击后,延迟1000ms才进行emit.

(11)debounce

顾名思义,该操作符表示的是去抖动,也就是说规定,多次重复的事件中,只执行最近的一次

var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounce(() => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

在上述的例子中,就实现了debounce,此时按钮事件在1秒内只能被执行一次.并且emit的是最近的一次的观察信息。

(12)throttle

节流,限制了可观察信息emit的频率,举例来说:

var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttle(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

上面的代码说明,可观察信息emit的频率最高为1hz(1/1000ms)

三、Scheduler调度器

调度器控制了何时启动订阅以及可观察对象何时emit,普通的Observable通过observeOn方法来指定调度器,

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
})
.observeOn(Rx.Scheduler.async);

console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

上述是一个异步调度的过程,输出的信息为:

just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

先输出同步信息,再输出异步调度信息.

如果指定:

observeOn(Rx.Scheduler.queue);

那就是顺序执行先入先出,输出的信息为:

just before subscribe
got value 1
got value 2
got value 3
done
just after subscribe

此外,所有的操作符默认都是有第三个参数,用于指定调度器.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant