Skip to content

Latest commit

 

History

History
504 lines (429 loc) · 19.2 KB

4.RxJava详解之执行原理(四).md

File metadata and controls

504 lines (429 loc) · 19.2 KB

RxJava详解之执行原理(四)

前面几篇文章介绍了RxJava的基本使用,也说了RxJava的优缺点。下面我们就从源码的角度去分析一下RxJava的原理,以及如何进行 线程切换和导致内存泄漏的原因。

// 创建被观察者、数据源
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // 可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable
        //被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用三次 onNext() 和一次
        // onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。
        Log.i("@@@", "call");
        subscriber.onNext("Hello ");
        subscriber.onNext("World !");
        subscriber.onCompleted();
    }
});
// 创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i("@@@", "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.i("@@@", "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i("@@@", "onNext : " + s);
    }
};
// 关联或者叫订阅更合适。
observable.subscribe(subscriber);

Observable源码分析

先看一下Observable.create()方法:

public class Observable<T> {

    final OnSubscribe<T> onSubscribe;

    /**
     * Creates an Observable with a Function to execute when it is subscribed to.
     * <p>
     * <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
     * unless you specifically have a need for inheritance.
     *
     * @param f
     *            {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
     */
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

	public static <T> Observable<T> create(OnSubscribe<T> f) {
		// 调用了上面的构造函数创建一个Observable对象,而RxJavaHooks.onCreate(f)是啥呢? 源码我们下面会看,这里我们只要知道`RxjavaHooks.onCreate(f)`的返回值还是f就行了
	    return new Observable<T>(RxJavaHooks.onCreate(f));
	}
	

	static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     	// validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        // 这里会使用到Observable中的成员变量onSubscribe
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }

        // new Subscriber so onStart it
        subscriber.onStart();
        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            // 这里会使用到Observable中的成员变量onSubscribe
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
        	。。。
        }
    }

}

需要一个OnSubscribe参数,OnSbuscribe接口继承Action1接口的它是在Observale.subscribe()方法调用时会执行的操作也就是说当调用observable.subscribe()方法时就会执行到该参数的call方法:

/**
 * Invoked when Observable.subscribe is called.
 * @param <T> the output value type
 */
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    // cover for generics insanity
}

/**
 * A one-argument action.
 * @param <T> the first argument type
 */
public interface Action1<T> extends Action {
    void call(T t);
}

而在上面Observable.create()方法中我们看到其实是调用了Observable类的构造函数,但是在参数那里传入的是RxJavaHooks.onCreate(f):

@Experimental
public final class RxJavaHooks {
	static volatile Func1<Observable.OnSubscribe, Observable.OnSubscribe> onObservableCreate;

    /** Initialize with the default delegation to the original RxJavaPlugins. */
    // 哎~静态代码块
    static {
        init();
    }

    static void init() {
        ...
        initCreate()
    }
	static void initCreate() {
        onObservableCreate = new Func1<Observable.OnSubscribe, Observable.OnSubscribe>() {
            @Override
            public Observable.OnSubscribe call(Observable.OnSubscribe f) {
                return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
            }
        };

        onSingleCreate = new Func1<rx.Single.OnSubscribe, rx.Single.OnSubscribe>() {
            @Override
            public rx.Single.OnSubscribe call(rx.Single.OnSubscribe f) {
                return RxJavaPlugins.getInstance().getSingleExecutionHook().onCreate(f);
            }
        };

        onCompletableCreate = new Func1<Completable.OnSubscribe, Completable.OnSubscribe>() {
            @Override
            public Completable.OnSubscribe call(Completable.OnSubscribe f) {
                return RxJavaPlugins.getInstance().getCompletableExecutionHook().onCreate(f);
            }
        };
    }

    /**
     * Hook to call when an Observable is created.
     * @param <T> the value type
     * @param onSubscribe the original OnSubscribe logic
     * @return the original or replacement OnSubscribe instance
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
    	// 这里onObservableCreate只有在调用initCreate()方法才会初始化,而initCreate()又被init()方法调用,init()在静态代码块中
        Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
        if (f != null) {
            // 所以之列返回的上面onObservableCreate创建的对OnSubscribe对象,其实这里可以简单的理解为直接返回onSubscribe参数
            return f.call(onSubscribe);
        }
        return onSubscribe;
    }

    public static void clear() {
        if (lockdown) {
            return;
        }
        onError = null;

        onObservableCreate = null;
        ...
    }
}    

继续看一下RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);:

public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
    return f;
}

到这里我们总结一下:

Observable.create(OnSubscribe f)方法其实就是内部new了一个被观察者Observable对象,同时将参数中所传过来的new出来的OnSubscribe对象赋值给了该Observable的成员变量onSubscribe。 那一定会想知道把create(OnSubscribe f)的参数赋值给成员变量onSubscribe有什么用呢? 当然是在Observable.subscribe()unSubscribe()中会用到啊。

Subscriber源码分析

我们在上面的例子中只是new了一个Subscriber对象并且实现了对应的方法.
Subscriber源码并不多,一共二百多行,所以....我要删除些无用的,然后都贴上来:

/**
 * Provides a mechanism for receiving push-based notifications from Observables, and permits manual
 * unsubscribing from these Observables.
 */
public abstract class Subscriber<T> implements Observer<T>, Subscription {

    // represents requested not set yet
    private static final long NOT_SET = Long.MIN_VALUE;
    // Subscription that represents a group of Subscriptions that are unsubscribed together.
    // 订阅事件集,所有发送给当前Subscriber的事件都会保存在这里
    private final SubscriptionList subscriptions;
    private final Subscriber<?> subscriber;
    /**
	 * Interface that establishes a request-channel between an Observable and a Subscriber and allows
	 * the Subscriber to request a certain amount of items from the Observable (otherwise known as
	 * backpressure).
	 */
    private Producer producer;

    protected Subscriber() {
        this(null, false);
    }

    protected Subscriber(Subscriber<?> subscriber) {
        this(subscriber, true);
    }

    /**
     * Construct a Subscriber by using another Subscriber for backpressure and
     * optionally for holding the subscription list (if
     * <code>shareSubscriptions</code> is <code>true</code> then when
     * <code>this.add(sub)</code> is called this will in fact call
     * <code>subscriber.add(sub)</code>).
     * <p>
     * To retain the chaining of subscribers when setting
     * <code>shareSubscriptions</code> to <code>false</code>, add the created
     * instance to {@code subscriber} via {@link #add}.
     *
     * @param subscriber
     *            the other Subscriber
     * @param shareSubscriptions
     *            {@code true} to share the subscription list in {@code subscriber} with
     *            this instance
     * @since 1.0.6
     */
    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }

    /**
     * Adds a {@link Subscription} to this Subscriber's list of subscriptions if this list is not marked as
     * unsubscribed. If the list <em>is</em> marked as unsubscribed, {@code add} will indicate this by
     * explicitly unsubscribing the new {@code Subscription} as well.
     *
     * @param s
     *            the {@code Subscription} to add
     */
    public final void add(Subscription s) {
        subscriptions.add(s);
    }

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    /**
     * This method is invoked when the Subscriber and Observable have been connected but the Observable has
     * not yet begun to emit items or send notifications to the Subscriber. Override this method to add any
     * useful initialization to your subscription, for instance to initiate backpressure.
     */
    public void onStart() {
        // do nothing by default
    } 
}

可以看到Subscriber实现了ObserverSubscription接口:

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

observable.subscribe(subscriber)源码分析

好了,关键部分出现了,我们继续看Observable类中的subscribe()方法,上面在将Observable源码的时候我们简单带过这部分,没有细说,这里仔细说一下:

public class Observable<T> {
    final OnSubscribe<T> onSubscribe;
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     	// validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }

        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // new Subscriber so onStart it
        // 1.首先调用Subscriber.onStart()方法,上面我们知道该方法没有实现
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        // 把当前的subscriber转换成SafeSubscriber,SafeSubscriber是一个包装类,他对一些安全性做了校验,保证Subscriber中的onComplete和onError只会有一个被执行且执行一次
        // 而且一旦他们执行了后就不会再继续执行onNext方法
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            // 这里就是核心了
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);

            // 其实上面的这两句代码完全可以简写为:
            // (observable.onSubscribe).call(subscriber);
            // return subscriber



        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }
}    

上面代码的核心是:

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);

RxJavaHooks这个类我们在前面讲Observable的时候也讲到过这个类的onCreate()方法。

我们分别看下他们的源码:

public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
	// 这一块代码是不是似曾相识? 前面讲的RxJavaHooks。onCreate()方法是不是也是这样实现的? 只不过那里是onObservableCreate而这里是onObservableStart,这里可以简单的理解为返回参数onSubscribe
    Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
    if (f != null) {
        return f.call(instance, onSubscribe);
    }
    return onSubscribe;
}

/**
 * Hook to call before the Observable.subscribe() method is about to return a Subscription.
 * @param subscription the original subscription
 * @return the original or alternative subscription that will be returned
 */
public static Subscription onObservableReturn(Subscription subscription) {
    Func1<Subscription, Subscription> f = onObservableReturn;
    if (f != null) {
        return f.call(subscription);
    }
    return subscription;
}

static void init() {
    onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
        @Override
        public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
            return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
        }
    };

    onObservableReturn = new Func1<Subscription, Subscription>() {
        @Override
        public Subscription call(Subscription f) {
            return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(f);
        }
    };
}        

所以这里这两句核心代码是不就可以简写为:

(observable.onSubscribe).call(subscriber);
return subscriber

而这里observable.onSubscribe变量是谁呢? 它就是前面我们说的在Observable.onCreate(OnSubscribe<T> f)方法中将参数赋值给的那个变量,也就是OnSubscribe接口的实现类,而我们在调用onCreate()方法的时候都会实现call()方法。

/**
 * Invoked when Observable.subscribe is called.
 * @param <T> the output value type
 */
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    // cover for generics insanity
}

Over

所以到这里我们是不是就分析完了?

// 创建被观察者、数据源
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello ");
        subscriber.onNext("World !");
        subscriber.onCompleted();
    }
});
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i("@@@", "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.i("@@@", "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i("@@@", "onNext : " + s);
    }
};
observable.subscribe(subscriber);

总结一下:

  • 首先我们调用Observable.create(OnSubscribe f)创建一个观察者,同时创建一个OnSubscribe接口的实现类作为create()方法的参数,并重写该接口的call()方法,在call()方法中我们调用subscriberonNext()onComplete()onError()方法。而这个call()方法中的subscriber对象就是我们后面调用observable.subscribe(subscriber);所传递的subscribe对象。
  • 然后调用new Subscriber()创建一个Subscriber类的实例。
  • 最后通过observable.subscribe(subscriber);ObservableSubscriber对象进行绑定。来订阅我们自己创建的观察者Subscriber对象。 一旦调用subscribe()方法后就会触发执行OnSubscribe.call()。而我们上面实现的OnSubscribe.call()方法会调用Subscriber类的onNext()onComplete()onError()等方法。

更多内容请看下一篇文章RxJava详解(五)