Skip to content

jtsky/RxJava--analyse

Repository files navigation

2015-11-27 本项目的分析参考了 扔物线之前写了一篇文章 《给 Android 开发者的 RxJava 详解》
这里还有一篇关于Rxjava lift()函数的分析
废话不多说 直接开始

RxJava的基本用法:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(2);
            }
        }).map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "===>" + integer;
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, s);
            }
        });

最后将会输出 ===>2

其实上面这段代码中涉及到2个Observable和2个OnSubscribe(具体Observable OnSubscribe Subscribe之间的关系可以去看扔物线的文章)

具体可以看源码

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

//最主要的方法
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    //这段代码下面会仔细分析
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        if (e instanceof OnErrorNotImplementedException) {
                            throw (OnErrorNotImplementedException) e;
                        }
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    o.onError(e);
                }
            }
        });
    }


    public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> lift) {
            return lift;
        }

Observable.create()的时候是不发送事件的,只有当Observable.subscribe()以后事件才开始emit。

下面直接看subscribe()源码(这里是精简的源码)

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
<br />
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        //这里可以做一些emit之前的初始化操作 具体可以重写onStart()
        subscriber.onStart();

        // 对subscriber进行封装 具体是一些错误的处理
        if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }
            //重点方法
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);

        }
    }

public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        return onSubscribe;
    }

onSubscribeStart()方法只是简单的返回了map操作符中创建的OnSubscribe对象,简称为OnSubscribe2,即lift中的

new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    //这段代码下面会仔细分析
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        if (e instanceof OnErrorNotImplementedException) {
                            throw (OnErrorNotImplementedException) e;
                        }
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    o.onError(e);
                }
            }
        }

而OnSubscribe2.call(subscriber)中的subscriber为

new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, s);
            }
        }

接下来进入到lift中继续分析 Subscriber<? super T> st = hook.onLift(operator).call(o) 中的operator 是OperatorMap 由lift(new OperatorMap<T, R>(func))传入 func很简单就是

new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "===>" + integer;
            }
        }

下面具体看下OperatorMap

public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    onError(OnErrorThrowable.addValueAsLastCause(e, t));
                }
            }

        };
    }

}

transformer 即为func 所以

Subscriber<? super T> st = hook.onLift(operator).call(o);
st.onStart();
onSubscribe.call(st);

可以简写成

Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);

operator.call(subscriber)其实就是对

new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, s);
            }
        }

进行了包装 返回了一个新的newSubscriber

@Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    onError(OnErrorThrowable.addValueAsLastCause(e, t));
                }
            }

        };
    }

所以 onSubscribe.call(newSubscriber)即为subscriber.onNext(2); 而

public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    onError(OnErrorThrowable.addValueAsLastCause(e, t));
                }
            }

就可以写成

public void onNext(T t) {
                try {
                    o.onNext(new Func1<Integer, String>() {
                                         @Override
                                         public String call(Integer integer) {
                                             return "===>" + integer;
                                         }
                                     }.call(t));
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    onError(OnErrorThrowable.addValueAsLastCause(e, t));
                }
            }

o即为

new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, s);
            }
        }

所以最终的结果为===>2

About

关于Rxjava的源码分析

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages