RxJava in Detail: How Thread Scheduling Works (Part 6)

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onCompleted();
        Log.i("@@@", "call" + Thread.currentThread().getName());
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                Log.i("@@@", "onNext : " + Thread.currentThread().getName());
            }
        });

Output:

07-26 17:16:49.284 7266-7309/? I/@@@: callRxIoScheduler-2
07-26 17:16:49.368 7266-7266/? I/@@@: onNext : main

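subscribeOn() specifies the thread on which the observable's event source (the body of OnSubscribe.call()) runs. observeOn() specifies the thread on which the observer handles events.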
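The two hops can be imitated with plain JDK classes to make the roles concrete. The sketch below is only an analogy and uses no RxJava types: the executors named "sketch-io" and "sketch-main" are hypothetical stand-ins for Schedulers.io() and AndroidSchedulers.mainThread(), and ThreadHopSketch is a made-up class name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy only: no RxJava types are used here.
class ThreadHopSketch {

    /** Returns {producerThreadName, consumerThreadName}. */
    static String[] run() {
        // Stand-in for Schedulers.io(): where the event source runs.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        // Stand-in for AndroidSchedulers.mainThread(): where the observer is called back.
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-main"));
        try {
            String[] names = new String[2];
            CompletableFuture
                    // "subscribeOn": produce the value on the io thread.
                    .supplyAsync(() -> {
                        names[0] = Thread.currentThread().getName();
                        return "Hello";
                    }, io)
                    // "observeOn": deliver the value on the main thread.
                    .thenAcceptAsync(value -> names[1] = Thread.currentThread().getName(), main)
                    .join();
            return names;
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = run();
        // prints "call on sketch-io, onNext on sketch-main"
        System.out.println("call on " + names[0] + ", onNext on " + names[1]);
    }
}
```

As in the log output above, the producing code and the consuming callback report different thread names, even though both are wired up from a single calling thread.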

subscribeOn source analysis:

public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}

/**
 * A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this
 * class in {@link Schedulers}.
 * Thread scheduler
 */
public abstract class Scheduler {
    ...
}

Internally it also creates an Observable through create(); the only difference is that the OnSubscribe passed in is an OperatorSubscribeOn object:

/**
 * Subscribes Observers on the specified {@code Scheduler}.
 */
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }
}

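Its internal principle is the same as that of map. Starting from the call() method, first look at the Worker class it creates, which is nested in Scheduler: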

/**
 * A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this
 * class in {@link Schedulers}.
 */
public abstract class Scheduler {
    /**
     * Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
     * <p>
     * When work is completed it should be unsubscribed using {@link Scheduler.Worker#unsubscribe()}.
     * <p>
     * Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
     *
     * @return a Worker representing a serial queue of actions to be executed
     */
    public abstract Worker createWorker();

    /**
     * Sequential Scheduler for executing actions on a single thread or event loop.
     * <p>
     * Unsubscribing the {@link Worker} cancels all outstanding work and allows resources cleanup.
     */
    public abstract static class Worker implements Subscription {

        /**
         * Schedules an Action for execution.
         */
        public abstract Subscription schedule(Action0 action);

        /**
         * Schedules an Action for execution at some point in the future.
         */
        public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
    }
}    

The scheduler used above was obtained through Schedulers.io(). Here is its source:

public final class Schedulers {

    private final Scheduler computationScheduler;
    private final Scheduler ioScheduler;
    private final Scheduler newThreadScheduler;

    private static final AtomicReference<Schedulers> INSTANCE = new AtomicReference<Schedulers>();

    private static Schedulers getInstance() {
        for (;;) {
            Schedulers current = INSTANCE.get();
            if (current != null) {
                return current;
            }
            current = new Schedulers();
            if (INSTANCE.compareAndSet(null, current)) {
                return current;
            } else {
                current.shutdownInstance();
            }
        }
    }

    public static Scheduler io() {
        return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
    }

    private Schedulers() {
        @SuppressWarnings("deprecation")
        RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();

        Scheduler c = hook.getComputationScheduler();
        if (c != null) {
            computationScheduler = c;
        } else {
            computationScheduler = RxJavaSchedulersHook.createComputationScheduler();
        }

        // This is where ioScheduler is created
        Scheduler io = hook.getIOScheduler();
        if (io != null) {
            ioScheduler = io;
        } else {
            ioScheduler = RxJavaSchedulersHook.createIoScheduler();
        }

        Scheduler nt = hook.getNewThreadScheduler();
        if (nt != null) {
            newThreadScheduler = nt;
        } else {
            newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
        }
    }
    ...
}    

Next, the source of onIOScheduler:

/**
 * Similar to create(): for this walkthrough, treat it as simply returning its argument.
 * Hook to call when the Schedulers.io() is called.
 * @param scheduler the default io scheduler
 * @return the default of alternative scheduler
 */
public static Scheduler onIOScheduler(Scheduler scheduler) {
    Func1<Scheduler, Scheduler> f = onIOScheduler;
    if (f != null) {
        return f.call(scheduler);
    }
    return scheduler;
}

The value of getInstance().ioScheduler above ultimately comes from RxJavaSchedulersHook.createIoScheduler() (when the hook does not supply its own scheduler):

@Experimental
public static Scheduler createIoScheduler() {
    return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}

public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory == null");
    }
    // Note: the Scheduler ultimately returned here is a CachedThreadScheduler
    return new CachedThreadScheduler(threadFactory);
}
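The "RxIoScheduler-" prefix passed to the thread factory is what shows up as RxIoScheduler-2 in the log output of the example. A minimal sketch of a prefix-based ThreadFactory follows. PrefixThreadFactory is a hypothetical stand-in, and the counter-based suffix and the daemon flag are assumptions for illustration rather than a copy of RxThreadFactory.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for RxThreadFactory; the numbering scheme is an assumption.
class PrefixThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicLong counter = new AtomicLong();

    PrefixThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Each new thread gets the prefix plus an increasing number.
        Thread t = new Thread(r, prefix + counter.incrementAndGet());
        t.setDaemon(true); // assumption: idle workers should not keep the JVM alive
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new PrefixThreadFactory("RxIoScheduler-");
        System.out.println(factory.newThread(() -> { }).getName()); // RxIoScheduler-1
        System.out.println(factory.newThread(() -> { }).getName()); // RxIoScheduler-2
    }
}
```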

And in the CachedThreadScheduler class:

public final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle {

    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    ...
}    

So from the source of Schedulers.io() we can see that the concrete implementation is CachedThreadScheduler, and the corresponding Worker implementation is EventLoopWorker:

   static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
        private final CompositeSubscription innerSubscription = new CompositeSubscription();
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;
        final AtomicBoolean once;

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.once = new AtomicBoolean();
            this.threadWorker = pool.get();
        }

        @Override
        public void unsubscribe() {
            if (once.compareAndSet(false, true)) {
                // unsubscribe should be idempotent, so only do this once

                // Release the worker _after_ the previous action (if any) has completed
                threadWorker.schedule(this);
            }
            innerSubscription.unsubscribe();
        }

        @Override
        public void call() {
            pool.release(threadWorker);
        }

        @Override
        public boolean isUnsubscribed() {
            return innerSubscription.isUnsubscribed();
        }

        @Override
        public Subscription schedule(Action0 action) {
            return schedule(action, 0, null);
        }

        @Override
        public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
            if (innerSubscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();
            }

            ScheduledAction s = threadWorker.scheduleActual(new Action0() {
                @Override
                public void call() {
                    if (isUnsubscribed()) {
                        return;
                    }
                    // Invokes call() on the Action0 passed in from outside
                    action.call();
                }
            }, delayTime, unit);
            innerSubscription.add(s);
            s.addParent(innerSubscription);
            return s;
        }
    }

From here execution continues into threadWorker.scheduleActual (ThreadWorker extends NewThreadWorker):

public class NewThreadWorker extends Scheduler.Worker implements Subscription {
    private final ScheduledExecutorService executor;

    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }
...
}

The source of ScheduledAction:

/**
 * A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the
 * {@code Subscriber} in respect of an {@code Observer}.
 */
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
    /** */
    private static final long serialVersionUID = -3962399486978279857L;
    final SubscriptionList cancel;
    final Action0 action;

    public ScheduledAction(Action0 action) {
        this.action = action;
        this.cancel = new SubscriptionList();
    }
    ...
}    

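ScheduledAction implements Runnable, and submitting it to the executor thread pool is what finally performs the thread switch. Because the Action0 scheduled by OperatorSubscribeOn calls source.unsafeSubscribe(s), the upstream OnSubscribe.call() runs on that pool thread. That is the whole process by which subscribeOn(Schedulers.io()) switches threads.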
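The core of that path, handing a Runnable to a ScheduledExecutorService, can be sketched with the JDK alone. MiniWorker below is a hypothetical miniature that only mirrors the submit/schedule branch of scheduleActual(); it leaves out hooks and subscription tracking, and the thread name "mini-worker" is made up.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of NewThreadWorker.scheduleActual(); not RxJava code.
class MiniWorker {
    private final ScheduledExecutorService executor =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "mini-worker");
                t.setDaemon(true);
                return t;
            });

    /** Runs the action on the worker thread; the returned Future can cancel it. */
    Future<?> schedule(Runnable action, long delayTime, TimeUnit unit) {
        // Same branch as scheduleActual(): no delay -> submit, otherwise schedule.
        if (delayTime <= 0) {
            return executor.submit(action);
        }
        return executor.schedule(action, delayTime, unit);
    }

    void unsubscribe() {
        executor.shutdownNow();
    }

    /** Schedules one action and reports the name of the thread that ran it. */
    static String threadThatRuns() {
        MiniWorker worker = new MiniWorker();
        String[] name = new String[1];
        try {
            worker.schedule(() -> name[0] = Thread.currentThread().getName(), 0, null)
                  .get(5, TimeUnit.SECONDS);
            return name[0];
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.unsubscribe();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("action: " + threadThatRuns()); // action: mini-worker
    }
}
```

Whatever thread calls schedule(), the action itself always executes on the executor's thread, which is the whole effect subscribeOn relies on.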

observeOn source analysis

Straight to the source:

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, RxRingBuffer.SIZE);
}

public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
    return observeOn(scheduler, false, bufferSize);
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

observeOn is implemented internally through lift. Here is the source of lift:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

In the end it still uses create(), so this is much the same as what we analyzed before. Go straight to OnSubscribeLift:

/**
 * Transforms the downstream Subscriber into a Subscriber via an operator
 * callback and calls the parent OnSubscribe.call() method with it.
 */
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}
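Stripped of hooks and error handling, the shape of OnSubscribeLift.call() is "wrap the downstream subscriber, then subscribe the wrapper to the parent". A minimal sketch under simplifying assumptions: the nested OnSubscribe and Operator interfaces here are hypothetical miniatures, a Consumer stands in for Subscriber, and only onNext is modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature types; they only mirror the shape of OnSubscribe/Operator.
class LiftSketch {
    interface OnSubscribe<T> { void call(Consumer<T> subscriber); }
    interface Operator<R, T> { Consumer<T> call(Consumer<R> child); }

    /** Mirrors OnSubscribeLift.call(): wrap the child, then subscribe the wrapper upstream. */
    static <T, R> OnSubscribe<R> lift(OnSubscribe<T> parent, Operator<R, T> operator) {
        return child -> {
            Consumer<T> wrapped = operator.call(child); // operator decorates the downstream subscriber
            parent.call(wrapped);                       // upstream emits into the decorated subscriber
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        OnSubscribe<Integer> source = s -> { s.accept(1); s.accept(2); };
        // An operator that converts each Integer to a String before passing it downstream.
        Operator<String, Integer> toText = child -> value -> child.accept("item " + value);
        lift(source, toText).call(log::add);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [item 1, item 2]
    }
}
```

An operator such as OperatorObserveOn fits the same slot: instead of transforming values, its wrapper re-delivers them on another thread.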

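OnSubscribeLift also implements OnSubscribe. From the earlier analysis we know that once subscribe() binds the observer to the observable, the call() method of the observable's OnSubscribe is triggered, so here OnSubscribeLift.call() is triggered. Inside call(), it invokes OperatorObserveOn.call(), which returns a new observer, Subscriber st, and then calls the previous-level Observable's OnSubscribe.call(st). So we need to look at the OperatorObserveOn class and its call() method: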

public final class OperatorObserveOn<T> implements Operator<T, T> {
    private final Scheduler scheduler;
    private final boolean delayError;
    private final int bufferSize;

    public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
        this(scheduler, delayError, RxRingBuffer.SIZE);
    }

    public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }
    ...
}

This reaches the branch that creates an ObserveOnSubscriber and then calls its init() method:

static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
    final Subscriber<? super T> child;
    final Scheduler.Worker recursiveScheduler;
    final boolean delayError;
    final Queue<Object> queue;
    /** The emission threshold that should trigger a replenishing request. */
    final int limit;

    // the status of the current stream
    volatile boolean finished;

    final AtomicLong requested = new AtomicLong();

    final AtomicLong counter = new AtomicLong();

    /**
     * The single exception if not null, should be written before setting finished (release) and read after
     * reading finished (acquire).
     */
    Throwable error;

    /** Remembers how many elements have been emitted before the requests run out. */
    long emitted;

    // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
    // not prevent anything downstream from consuming, which will happen if the Subscription is chained
    public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
        this.child = child;
        // Create the Worker from the Scheduler passed in as a parameter
        this.recursiveScheduler = scheduler.createWorker();
        this.delayError = delayError;
        int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
        // this formula calculates the 75% of the bufferSize, rounded up to the next integer
        this.limit = calculatedSize - (calculatedSize >> 2);
        if (UnsafeAccess.isUnsafeAvailable()) {
            queue = new SpscArrayQueue<Object>(calculatedSize);
        } else {
            queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
        }
        // signal that this is an async operator capable of receiving this many
        request(calculatedSize);
    }

    void init() {
        // don't want this code in the constructor because `this` can escape through the
        // setProducer call
        Subscriber<? super T> localChild = child;

        localChild.setProducer(new Producer() {

            @Override
            public void request(long n) {
                if (n > 0L) {
                    BackpressureUtils.getAndAddRequest(requested, n);
                    schedule();
                }
            }

        });
        localChild.add(recursiveScheduler);
        localChild.add(this);
    }

    @Override
    public void onNext(final T t) {
        if (isUnsubscribed() || finished) {
            return;
        }
        if (!queue.offer(NotificationLite.next(t))) {
            onError(new MissingBackpressureException());
            return;
        }
        // schedule() is called here
        schedule();
    }

    @Override
    public void onCompleted() {
        if (isUnsubscribed() || finished) {
            return;
        }
        finished = true;
        // schedule() is called here
        schedule();
    }

    @Override
    public void onError(final Throwable e) {
        if (isUnsubscribed() || finished) {
            RxJavaHooks.onError(e);
            return;
        }
        error = e;
        finished = true;
        // schedule() is called here
        schedule();
    }

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            // Hand off to recursiveScheduler, the Worker created from the Scheduler passed to the constructor
            recursiveScheduler.schedule(this);
        }
    }

    // only execute this from schedule()
    // Execution eventually reaches this method
    @Override
    public void call() {
        long missed = 1L;
        long currentEmission = emitted;

        // these are accessed in a tight loop around atomics so
        // loading them into local variables avoids the mandatory re-reading
        // of the constant fields
        final Queue<Object> q = this.queue;
        final Subscriber<? super T> localChild = this.child;

        // requested and counter are not included to avoid JIT issues with register spilling
        // and their access is is amortized because they are part of the outer loop which runs
        // less frequently (usually after each bufferSize elements)

        for (;;) {
            long requestAmount = requested.get();

            while (requestAmount != currentEmission) {
                boolean done = finished;
                Object v = q.poll();
                boolean empty = v == null;

                if (checkTerminated(done, empty, localChild, q)) {
                    return;
                }

                if (empty) {
                    break;
                }
                // Deliver the item to the downstream onNext()
                localChild.onNext(NotificationLite.<T>getValue(v));

                currentEmission++;
                if (currentEmission == limit) {
                    requestAmount = BackpressureUtils.produced(requested, currentEmission);
                    request(currentEmission);
                    currentEmission = 0L;
                }
            }

            if (requestAmount == currentEmission) {
                if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                    return;
                }
            }

            emitted = currentEmission;
            missed = counter.addAndGet(-missed);
            if (missed == 0L) {
                break;
            }
        }
    }
}

We can see that onNext(), onCompleted(), and onError() above all call schedule(), so this method is where the thread switch actually happens:

protected void schedule() {
    if (counter.getAndIncrement() == 0) {
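        // Hand off to recursiveScheduler, the Worker created from the Scheduler passed to the constructor;
        // this moves all event delivery onto the thread that recursiveScheduler represents.
        // `this` is passed in, so the call() method of this class is what eventually runs.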
        recursiveScheduler.schedule(this);
    }
}
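The counter check in schedule() together with the `missed` loop in call() forms a queue-drain hand-off: producers only enqueue and bump a counter, and at most one drain task is active at a time. A minimal sketch of just that pattern, assuming an unbounded queue and ignoring backpressure, errors, and completion. DrainSketch and the "drain-worker" thread name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical miniature of the queue + counter hand-off; no backpressure or termination handling.
class DrainSketch<T> implements Runnable {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong counter = new AtomicLong();
    private final Executor worker;
    private final Consumer<T> child;

    DrainSketch(Executor worker, Consumer<T> child) {
        this.worker = worker;
        this.child = child;
    }

    /** May be called from any producer thread. */
    void onNext(T value) {
        queue.offer(value);
        schedule();
    }

    private void schedule() {
        // Only the caller that moves the counter from 0 to 1 starts a drain task.
        if (counter.getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    @Override
    public void run() {
        long missed = 1L;
        for (;;) {
            T v;
            while ((v = queue.poll()) != null) {
                child.accept(v); // always runs on the worker thread
            }
            // Subtract the signals handled so far; a non-zero rest means more arrived meanwhile.
            missed = counter.addAndGet(-missed);
            if (missed == 0L) {
                break;
            }
        }
    }

    static List<String> demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "drain-worker"));
        List<String> seen = new ArrayList<>();
        DrainSketch<Integer> drain =
                new DrainSketch<>(worker, v -> seen.add(v + "@" + Thread.currentThread().getName()));
        for (int i = 1; i <= 3; i++) {
            drain.onNext(i); // emitted from the calling thread
        }
        worker.shutdown();
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("drain did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1@drain-worker, 2@drain-worker, 3@drain-worker]
    }
}
```

The items are emitted on the calling thread but every downstream callback reports the worker thread, and order is preserved because only one drain loop runs at a time.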

schedule() uses recursiveScheduler, which is initialized in the constructor:

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
    this.child = child;
    // Create the Worker from the Scheduler passed in as a parameter
    this.recursiveScheduler = scheduler.createWorker();
}

In our sample code, the Scheduler used here is AndroidSchedulers.mainThread():

public final class AndroidSchedulers {
    private static final AtomicReference<AndroidSchedulers> INSTANCE = new AtomicReference<>();

    private final Scheduler mainThreadScheduler;

    private static AndroidSchedulers getInstance() {
        for (;;) {
            AndroidSchedulers current = INSTANCE.get();
            if (current != null) {
                return current;
            }
            current = new AndroidSchedulers();
            if (INSTANCE.compareAndSet(null, current)) {
                return current;
            }
        }
    }

    private AndroidSchedulers() {
        RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();

        Scheduler main = hook.getMainThreadScheduler();
        if (main != null) {
            mainThreadScheduler = main;
        } else {
            // The concrete implementation is LooperScheduler
            mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
        }
    }

    /** A {@link Scheduler} which executes actions on the Android UI thread. */
    public static Scheduler mainThread() {
        return getInstance().mainThreadScheduler;
    }
    ...
}    

We can see that the concrete Scheduler implementation here is LooperScheduler:

class LooperScheduler extends Scheduler {
    private final Handler handler;

    LooperScheduler(Looper looper) {
        handler = new Handler(looper);
    }

    LooperScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    static class HandlerWorker extends Worker {
        private final Handler handler;
        private final RxAndroidSchedulersHook hook;
        private volatile boolean unsubscribed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
            this.hook = RxAndroidPlugins.getInstance().getSchedulersHook();
        }

        @Override
        public void unsubscribe() {
            unsubscribed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isUnsubscribed() {
            return unsubscribed;
        }

        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (unsubscribed) {
                return Subscriptions.unsubscribed();
            }

            action = hook.onSchedule(action);

            ScheduledAction scheduledAction = new ScheduledAction(action, handler);

            Message message = Message.obtain(handler, scheduledAction);
            message.obj = this; // Used as token for unsubscription operation.

            handler.sendMessageDelayed(message, unit.toMillis(delayTime));

            if (unsubscribed) {
                handler.removeCallbacks(scheduledAction);
                return Subscriptions.unsubscribed();
            }

            return scheduledAction;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }
    }

    static final class ScheduledAction implements Runnable, Subscription {
        private final Action0 action;
        private final Handler handler;
        private volatile boolean unsubscribed;

        ScheduledAction(Action0 action, Handler handler) {
            this.action = action;
            this.handler = handler;
        }

        @Override public void run() {
            try {
                action.call();
            } catch (Throwable e) {
                // nothing to do but print a System error as this is fatal and there is nowhere else to throw this
                IllegalStateException ie;
                if (e instanceof OnErrorNotImplementedException) {
                    ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
                } else {
                    ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
                }
                RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

        @Override public void unsubscribe() {
            unsubscribed = true;
            handler.removeCallbacks(this);
        }

        @Override public boolean isUnsubscribed() {
            return unsubscribed;
        }
    }
}

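Its Worker is HandlerWorker, which is also implemented with a ScheduledAction that implements Runnable. A Handler is created from the main thread's Looper and used to run ScheduledAction.run(). run() then calls ObserveOnSubscriber.call(), and that is how observeOn switches threads.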
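Outside Android, the Handler/Looper idea can be imitated with a thread that loops over a blocking queue of Runnables. The sketch below is a plain-JDK imitation and does not use android.os.Handler or Looper: LooperSketch, post(), and the "sketch-main" thread name are all hypothetical, and delayed messages and token-based removal are left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical plain-JDK imitation of Looper/Handler; Android classes are not used.
class LooperSketch {
    private static final Runnable QUIT = () -> { };
    private final BlockingQueue<Runnable> messages = new LinkedBlockingQueue<>();
    private final Thread loopThread;

    LooperSketch(String name) {
        loopThread = new Thread(this::loop, name);
        loopThread.start();
    }

    private void loop() {
        try {
            for (;;) {
                Runnable message = messages.take(); // block until something is posted
                if (message == QUIT) {
                    return;
                }
                message.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Loosely like Handler.post(): the action runs later, on the loop thread. */
    void post(Runnable action) {
        messages.add(action);
    }

    /** Lets already posted messages run, then stops the loop thread. */
    void quitAndJoin() {
        messages.add(QUIT);
        try {
            loopThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static String demo() {
        LooperSketch main = new LooperSketch("sketch-main");
        String[] name = new String[1];
        // Posted from the calling thread, executed on "sketch-main".
        main.post(() -> name[0] = Thread.currentThread().getName());
        main.quitAndJoin();
        return name[0];
    }

    public static void main(String[] args) {
        System.out.println("ran on: " + demo()); // ran on: sketch-main
    }
}
```

HandlerWorker.schedule() plays the role of post() here: it wraps the Action0 in a Runnable and sends it to the main thread's message queue.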

For more, see the next article, RxJava in Detail (Part 7).