New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java线程池ThreadPoolExecutor实现原理剖析 #28

Open
aCoder2013 opened this Issue Sep 4, 2018 · 2 comments

Comments

Projects
None yet
2 participants
@aCoder2013
Owner

aCoder2013 commented Sep 4, 2018

引言

在Java中,使用线程池来异步执行一些耗时任务是非常常见的操作。最初我们一般都是直接使用new Thread().start的方式,但我们知道,线程的创建和销毁都会耗费大量的资源,关于线程可以参考之前的一片博客Java线程那点事儿, 因此我们需要重用线程资源。

当然也有其他待解决方案,比如说coroutine, 目前Kotlin已经支持了,JDK也已经有了相关的提案:Project Loom, 目前的实现方式和Kotlin有点类似,都是基于ForkJoinPool,当然目前还有很多限制,以及问题没解决,比如synchronized还是锁住当前线程等。

继承结构

image
继承结构看起来很清晰,最顶层的Executor只提供了一个最简单的void execute(Runnable command)方法,然后是ExecutorService,ExecutorService提供了一些管理相关的方法,例如关闭、判断当前线程池的状态等,另外不同于Executor#execute,ExecutorService提供了一系列方法,可以将任务包装成一个Future,从而使得任务提交方可以跟踪任务的状态。而父类AbstractExecutorService则提供了一些默认的实现。

构造器

ThreadPoolExecutor的构造器提供了非常多的参数,每一个参数都非常的重要,一不小心就容易踩坑,因此设置的时候,你必须要知道自己在干什么。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  1. corePoolSize、 maximumPoolSize。线程池会自动根据corePoolSize和maximumPoolSize去调整当前线程池的大小。当你通过submit或者execute方法提交任务的时候,如果当前线程池的线程数小于corePoolSize,那么线程池就会创建一个新的线程处理任务, 即使其他的core线程是空闲的。如果当前线程数大于corePoolSize并且小于maximumPoolSize,那么只有在队列"满"的时候才会创建新的线程。因此这里会有很多的坑,比如你的core和max线程数设置的不一样,希望请求积压在队列的时候能够实时的扩容,但如果制定了一个无界队列,那么就不会扩容了,因为队列不存在满的概念。

  2. keepAliveTime。如果当前线程池中的线程数超过了corePoolSize,那么如果在keepAliveTime时间内都没有新的任务需要处理,那么超过corePoolSize的这部分线程就会被销毁。默认情况下是不会回收core线程的,可以通过设置allowCoreThreadTimeOut改变这一行为。

  3. workQueue。即实际用于存储任务的队列,这个可以说是最核心的一个参数了,直接决定了线程池的行为,比如说传入一个有界队列,那么队列满的时候,线程池就会根据core和max参数的设置情况决定是否需要扩容,如果传入了一个SynchronousQueue,这个队列只有在另一个线程在同步remove的时候才可以put成功,对应到线程池中,简单来说就是如果有线程池任务处理完了,调用poll或者take方法获取新的任务的时候,新提交的任务才会put成功,否则如果当前的线程都在忙着处理任务,那么就会put失败,也就会走扩容的逻辑,如果传入了一个DelayedWorkQueue,顾名思义,任务就会根据过期时间来决定什么时候弹出,即为ScheduledThreadPoolExecutor的机制。

  4. threadFactory。创建线程都是通过ThreadFactory来实现的,如果没指定的话,默认会使用Executors.defaultThreadFactory() ,一般来说,我们会在这里对线程设置名称、异常处理器等。

  5. handler。即当任务提交失败的时候,会调用这个处理器,ThreadPoolExecutor内置了多个实现,比如抛异常、直接抛弃等。这里也需要根据业务场景进行设置,比如说当队列积压的时候,针对性的对线程池扩容或者发送告警等策略。

看完这几个参数的含义,我们看一下Executors提供的一些工具方法,只要是为了方便使用,但是我建议最好少用这个类,而是直接用ThreadPoolExecutor的构造函数,多了解一下这几个参数到底是什么意思,自己的业务场景是什么样的,比如线程池需不需要扩容、用不用回收空闲的线程等。

public class Executors {
    
    /*
    * 提供一个固定大小的线程池,并且线程不会回收,由于传入的是一个无界队列,相当于队列永远不会满
    * 也就不会扩容,因此需要特别注意任务积压在队列中导致内存爆掉的问题
    */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }


    /*
    *  这个线程池会一直扩容,由于SynchronousQueue的特性,如果当前所有的线程都在处理任务,那么
    *  新的请求过来,就会导致创建一个新的线程处理任务。如果线程一分钟没有新任务处理,就会被回 
    *  收掉。特别注意,如果每一个任务都比较耗时,并发又比较高,那么可能每次任务过来都会创建一个线 
    *  程
    */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
}

源码分析

既然是个线程池,那就必然有其生命周期:运行中、关闭、停止等。ThreadPoolExecutor是用一个AtomicInteger去的前三位表示这个状态的,另外又重用了低29位用于表示线程数,可以支持最大大概5亿多,绝逼够用了,如果以后硬件真的发展到能够启动这么多线程,改成AtomicLong就可以了。
状态这里主要分为下面几种:

  1. RUNNING: 表示当前线程池正在运行中,可以接受新任务以及处理队列中的任务
  2. SHUTDOWN: 不再接受新的任务,但会继续处理队列中的任务
  3. STOP: 不再接受新的任务,也不处理队列中的任务了,并且会中断正在进行中的任务
  4. TIDYING: 所有任务都已经处理完毕,线程数为0,转为为TIDYING状态之后,会调用terminated()回调
  5. TERMINATED: terminated()已经执行完毕

同时我们可以看到所有的状态都是用二进制位表示的,并且依次递增,从而方便进行比较,比如想获取当前状态是否至少为SHUTDOWN等,同时状态之前有几种转换:

  1. RUNNING -> SHUTDOWN。调用了shutdown()之后,或者执行了finalize()
  2. (RUNNING 或者 SHUTDOWN) -> STOP。调用了shutdownNow()之后会转换这个状态
  3. SHUTDOWN -> TIDYING。当线程池和队列都为空的时候
  4. STOP -> TIDYING。当线程池为空的时候
  5. IDYING -> TERMINATED。执行完terminated()回调之后会转换为这个状态
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    //由于前三位表示状态,因此将CAPACITY取反,和进行与操作即可
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    
    //高三位+第三位进行或操作即可
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    
    //下面三个方法,通过CAS修改worker的数目
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    
    //只尝试一次,失败了则返回,是否重试由调用方决定
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    
    //跟上一个不一样,会一直重试
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

下面是比较核心的字段,这里workers采用的是非线程安全的HashSet, 而不是线程安全的版本,主要是因为这里有些复合的操作,比如说将worker添加到workers后,我们还需要判断是否需要更新largestPoolSize等,workers只在获取到mainLock的情况下才会进行读写,另外这里的mainLock也用于在中断线程的时候串行执行,否则如果不加锁的话,可能会造成并发去中断线程,引起不必要的中断风暴。

private final ReentrantLock mainLock = new ReentrantLock();

private final HashSet<Worker> workers = new HashSet<Worker>();

private final Condition termination = mainLock.newCondition();

private int largestPoolSize;

private long completedTaskCount;

核心方法

拿到一个线程池之后,我们就可以开始提交任务,让它去执行了,那么我们看一下submit方法是如何实现的。

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

这两个方法都很简单,首先将提交过来的任务(有两种形式:Callable、Runnable )都包装成统一的 RunnableFuture,然后调用execute方法,execute可以说是线程池最核心的一个方法。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        /*
            获取当前worker的数目,如果小于corePoolSize那么就扩容,
            这里不会判断是否已经有core线程,而是只要小于corePoolSize就会直接增加worker
         */
        if (workerCountOf(c) < corePoolSize) {
            /*
                调用addWorker(Runnable firstTask, boolean core)方法扩容
                firstTask表示为该worker启动之后要执行的第一个任务,core表示要增加的为core线程
             */
            if (addWorker(command, true))
                return;
            //如果增加失败了那么重新获取ctl的快照,比如可能线程池在这期间关闭了
            c = ctl.get();
        }
        /*
             如果当前线程池正在运行中,并且将任务丢到队列中成功了,
             那么就会进行一次double check,看下在这期间线程池是否关闭了,
             如果关闭了,比如处于SHUTDOWN状态,如上文所讲的,SHUTDOWN状态的时候,
             不再接受新任务,remove成功后调用拒绝处理器。而如果仍然处于运行中的状态,
             那么这里就double check下当前的worker数,如果为0,有可能在上述逻辑的执行
             过程中,有worker销毁了,比如说任务抛出了未捕获异常等,那么就会进行一次扩容,
             但不同于扩容core线程,这里由于任务已经丢到队列中去了,因此就不需要再传递firstTask了,
             同时要注意,这里扩容的是非core线程
         */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            /*
                如果在上一步中,将任务丢到队列中失败了,那么就进行一次扩容,
                这里会将任务传递到firstTask参数中,并且扩容的是非core线程,
                如果扩容失败了,那么就执行拒绝策略。
             */
            reject(command);
    }

这里要特别注意下防止队列失败的逻辑,不同的队列丢任务的逻辑也不一样,例如说无界队列,那么就永远不会put失败,也就是说扩容也永远不会执行,如果是有界队列,那么当队列满的时候,会扩容非core线程,如果是SynchronousQueue,这个队列比较特殊,当有另外一个线程正在同步获取任务的时候,你才能put成功,因此如果当前线程池中所有的worker都忙着处理任务的时候,那么后续的每次新任务都会导致扩容, 当然如果worker没有任务处理了,阻塞在获取任务这一步的时候,新任务的提交就会直接丢到队列中去,而不会扩容。
上文中多次提到了扩容,那么我们下面看一下线程池具体是如何进行扩容的:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            //获取当前线程池的状态
            int rs = runStateOf(c);

            /*
                如果状态为大于SHUTDOWN, 比如说STOP,STOP上文说过队列中的任务不处理了,也不接受新任务,
                因此可以直接返回false不扩容了,如果状态为SHUTDOWN并且firstTask为null,同时队列非空,
                那么就可以扩容
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                    firstTask == null &&
                    ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                /*
                    若worker的数目大于CAPACITY则直接返回,
                    然后根据要扩容的是core线程还是非core线程,进行判断worker数目
                    是否超过设置的值,超过则返回
                 */
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                /*
                    通过CAS的方式自增worker的数目,成功了则直接跳出循环
                 */
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //重新读取状态变量,如果状态改变了,比如线程池关闭了,那么就跳到最外层的for循环,
                //注意这里跳出的是retry。
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    /*
                        获取锁,并判断线程池是否已经关闭
                     */
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 若线程已经启动了,比如说已经调用了start()方法,那么就抛异常,
                            throw new IllegalThreadStateException();
                        //添加到workers中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize) //更新largestPoolSize
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //若Worker创建成功,则启动线程,这么时候worker就会开始执行任务了
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                //添加失败
                addWorkerFailed(w);
        }
        return workerStarted;
    } 

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            //每次减少worker或者从队列中移除任务的时候都需要调用这个方法
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

这里有个貌似不太起眼的方法tryTerminate,这个方法会在所有可能导致线程池终结的地方调用,比如说减少worker的数目等,如果满足条件的话,那么将线程池转换为TERMINATED状态。另外这个方法没有用private修饰,因为ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,而ScheduledThreadPoolExecutor也会调用这个方法。

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /*
                如果当前线程处于运行中、TIDYING、TERMINATED状态则直接返回,运行中的没
                什么好说的,后面两种状态可以说线程池已经正在终结了,另外如果处于SHUTDOWN状态,
                并且workQueue非空,表明还有任务需要处理,也直接返回
             */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //可以退出,但是线程数非0,那么就中断一个线程,从而使得关闭的信号能够传递下去,
            //中断worker后,worker捕获异常后,会尝试退出,并在这里继续执行tryTerminate()方法,
            //从而使得信号传递下去
            if (workerCountOf(c) != 0) {
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //尝试转换成TIDYING状态,执行完terminated回调之后
                //会转换为TERMINATED状态,这个时候线程池已经完整关闭了,
                //通过signalAll方法,唤醒所有阻塞在awaitTermination上的线程
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

    /**
     * 中断空闲的线程
     * @param onlyOne
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                //遍历所有worker,若之前没有被中断过,
                //并且获取锁成功,那么就尝试中断。
                //锁能够获取成功,那么表明当前worker没有在执行任务,而是在
                //获取任务,因此也就达到了只中断空闲线程的目的。
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

image

Worker

下面看一下Worker类,也就是这个类实际负责执行任务,Worker类继承自AbstractQueuedSynchronizer,AQS可以理解为一个同步框架,提供了一些通用的机制,利用模板方法模式,让你能够原子的管理同步状态、blocking和unblocking线程、以及队列,具体的内容之后有时间会再写,还是比较复杂的。这里Worker对AQS的使用相对比较简单,使用了状态变量state表示是否获得锁,0表示解锁、1表示已获得锁,同时通过exclusiveOwnerThread存储当前持有锁的线程。另外再简单提一下,比如说CountDownLatch, 也是基于AQS框架实现的,countdown方法递减state,await阻塞等待state为0。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
      
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;

        /** Initial task to run.  Possibly null. */
        Runnable firstTask;

        /** Per-thread task counter */
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
       protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

注意这里Worker初始化的时候,会通过setState(-1)将state设置为-1,并在runWorker()方法中置为0,上文说过Worker是利用state这个变量来表示锁的状态,那么加锁的操作就是通过CAS将state从0改成1,那么初始化的时候改成-1,也就是表示在Worker启动之前,都不允许加锁操作,我们再看interruptIfStarted()以及interruptIdleWorkers()方法,这两个方法在尝试中断Worker之前,都会先加锁或者判断state是否大于0,因此这里的将state设置为-1,就是为了禁止中断操作,并在runWorker中置为0,也就是说只能在Worker启动之后才能够中断Worker。
另外线程启动之后,其实就是调用了runWorker方法,下面我们看一下具体是如何实现的。

   final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 调用unlock()方法,将state置为0,表示其他操作可以获得锁或者中断worker
        boolean completedAbruptly = true;
        try {
            /*
                首先尝试执行firstTask,若没有的话,则调用getTask()从队列中获取任务
             */
            while (task != null || (task = getTask()) != null) {
                w.lock();
                /*
                    如果线程池正在关闭,那么中断线程。
                 */
                if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                        runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //执行beforeExecute回调
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //实际开始执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //执行afterExecute回调
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //这里加了锁,因此没有线程安全的问题,volatile修饰保证其他线程的可见性
                    w.completedTasks++;
                    w.unlock();//解锁
                }
            }
            completedAbruptly = false;
        } finally {
            //抛异常了,或者当前队列中已没有任务需要处理等
            processWorkerExit(w, completedAbruptly);
        }
    }

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //如果是异常终止的,那么减少worker的数目
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //将当前worker中workers中删除掉,并累加当前worker已执行的任务到completedTaskCount中
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        //上文说过,减少worker的操作都需要调用这个方法
        tryTerminate();

        /*
            如果当前线程池仍然是运行中的状态,那么就看一下是否需要新增另外一个worker替换此worker
         */
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            /*
                如果是异常结束的则直接扩容,否则的话则为正常退出,比如当前队列中已经没有任务需要处理,
                如果允许core线程超时的话,那么看一下当前队列是否为空,空的话则不用扩容。否则话看一下
                是否少于corePoolSize个worker在运行。
             */
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

     private Runnable getTask() {
        boolean timedOut = false; // 上一次poll()是否超时了

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 若线程池关闭了(状态大于STOP)
            // 或者线程池处于SHUTDOWN状态,但是队列为空,那么返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            /*
                如果允许core线程超时 或者 不允许core线程超时但当前worker的数目大于core线程数,
                那么下面的poll()则超时调用
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            /*
                获取任务超时了并且(当前线程池中还有不止一个worker 或者 队列中已经没有任务了),那么就尝试
                减少worker的数目,若失败了则重试
             */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //从队列中抓取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //走到这里表明,poll调用超时了
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

关闭线程池

关闭线程池一般有两种形式,shutdown()和shutdownNow()

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //通过CAS将状态更改为SHUTDOWN,这个时候线程池不接受新任务,但会继续处理队列中的任务
            advanceRunState(SHUTDOWN);
            //中断所有空闲的worker,也就是说除了正在处理任务的worker,其他阻塞在getTask()上的worker
            //都会被中断
            interruptIdleWorkers();
            //执行回调
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        //这个方法不会等待所有的任务处理完成才返回
    }
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            /*
                不同于shutdown(),会转换为STOP状态,不再处理新任务,队列中的任务也不处理,
                而且会中断所有的worker,而不只是空闲的worker
             */
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();//将所有的任务从队列中弹出
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        /*
            将队列中所有的任务remove掉,并添加到taskList中,
            但是有些队列比较特殊,比如说DelayQueue,如果第一个任务还没到过期时间,则不会弹出,
            因此这里通过调用toArray方法,然后再一个一个的remove掉
         */
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

从上文中可以看到,调用了shutdown()方法后,不会等待所有的任务处理完毕才返回,因此需要调用awaitTermination()来实现

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                //线程池若已经终结了,那么就返回
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                //若超时了,也返回掉
                if (nanos <= 0)
                    return false;
                //阻塞在信号量上,等待线程池终结,但是要注意这个方法可能会因为一些未知原因随时唤醒当前线程,
                //因此需要重试,在tryTerminate()方法中,执行完terminated()回调后,表明线程池已经终结了,
                //然后会通过termination.signalAll()唤醒当前线程
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

一些统计相关的方法

    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //若线程已终结则直接返回0,否则计算works中的数目
           //想一下为什么不用workerCount呢?
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

   public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                if (w.isLocked())//上锁的表明worker当前正在处理任务,也就是活跃的worker
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }


    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    //获取任务的总数,这个方法慎用,若是个无解队列,或者队列挤压比较严重,会很蛋疼
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;//比如有些worker被销毁后,其处理完成的任务就会叠加到这里
            for (Worker w : workers) {
                n += w.completedTasks;//叠加历史处理完成的任务
                if (w.isLocked())//上锁表明正在处理任务,也算一个
                    ++n;
            }
            return n + workQueue.size();//获取队列中的数目
        } finally {
            mainLock.unlock();
        }
    }


    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

总结

这篇博客基本上覆盖了线程池的方方面面,但仍然有非常多的细节可以深究,比如说异常的处理,可以参照之前的一篇博客:深度解析Java线程池的异常处理机制 ,另外还有AQS、unsafe等可以之后再单独总结。

Flag Counter

@DockOne

This comment has been minimized.

DockOne commented Sep 7, 2018

你好,我是DockOne.io社区负责人李颖杰,想转载您的这篇文章,请问可以么?我的微信号是:liyingjiesd,如果方便可以加微信具体聊下,谢谢!

@aCoder2013

This comment has been minimized.

Owner

aCoder2013 commented Sep 7, 2018

@DockOne 嗯,可以,注明原文出处和作者即可

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment