You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */publicvoidexecute(Runnablecommand) {
if (command == null)
thrownewNullPointerException();
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */intc = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//如果线程池中线程数没有达到corePoolSize,则新增线程(worker)if (addWorker(command, true))
return;
//更新c值。c = ctl.get();
}
//线程池处于RUNNING状态,并且阻塞队列未满//workQueue.offer(command)是非阻塞方法,当队列满时直接返回false(例如,SynchronousQueue如果没有线程在阻塞take,则返回false)if (isRunning(c) && workQueue.offer(command)) {
intrecheck = ctl.get();
//再次检查状态,如果发现不是RUNNING状态,则remove掉刚才offer的任务。if (! isRunning(recheck) && remove(command))
reject(command);
//如果有效线程数==0,添加一个线程,而不去启动它。??//怎么会==0?elseif (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果不是RUNNING状态,或者阻塞队列已满,则添加线程//如果不能添加,则reject。//false 表示添加的线程属于maximumPoolSize,如果线程数已经达到maximumPoolSize,则rejectelseif (!addWorker(command, false))
reject(command);
}
当线程池中线程已满,并且都处于忙碌状态。此时semaphore的值==线程池线程数,addThread被semaphore.acquire()阻塞,禁止submit新任务。当线程池中一个线程t1执行了runWorker(Worker w)中的task.run(),main线程就可以执行Future take = executorCompletionService.take()获取结果并semaphore.release()释放信号量。
目录
ThreadPoolExecutor源码
线程池状态标志
ctl 保存了线程池的运行状态(runState)和线程池内有效线程数量(workerCount)。
用 ctl 的高3位来表示线程池的运行状态, 用低29位来表示线程池内有效线程的数量。ctlOf() 方法用于计算出ctl的值。runStateOf()和workerCountOf()方法分别通过CAPACITY来计算得到其runState和workerCount,CAPACITY=29个1。
线程池的运行状态:
execute(Runnable command)
addWorker(Runnable firstTask, boolean core)
runWorker(Worker w)
运行worker,该线程不断的getTask()从队列中获取任务,然后 task.run();运行。只要队列中有值则不断循环。
getTask()
ThreadPoolExecutor阻塞添加任务
使用semaphore限流ThreadPoolExecutor(失效及原因)
考虑到当线程池满时(任务数 > maximumPoolSize + Queue.size()),会执行饱和策略。默认AbortPolicy ,抛出RejectedExecutionException。
怎么能避免线程池拒绝提交的任务呢?首先想到通过信号量Semaphore来控制任务的添加。代码如下:
注意:该代码是无效的。
只是在submit之前添加semaphore.acquire(); 在获取future后,添加semaphore.release();。
但这样依然会产生RejectedExecutionException。
通过源码分析原因,
当线程池中线程已满,并且都处于忙碌状态。此时semaphore的值==线程池线程数,addThread被semaphore.acquire()阻塞,禁止submit新任务。当线程池中一个线程t1执行了runWorker(Worker w)中的task.run(),main线程就可以执行Future take = executorCompletionService.take()获取结果并semaphore.release()释放信号量。
释放信号量semaphore后,addThread线程可以submit新任务,假设此时t1线程还没有执行到getTask() 中的poll()和take()方法。此时workQueue队列依然是满的。
而addThread已经执行到execute()的
当workQueue已满,offer() 直接返回false(正确的顺序应该是等t1线程执行到workQueue.take()后addThread再开始执行workQueue.offer(command)。)。执行execute() 如下逻辑
addWork()中,wc = maximumPoolSize 返回false。
执行reject(),抛出RejectedExecutionException。
使用自定义队列(不建议)
其思想就是替换BlockingQueue中的offer()方法为put()方法,这样execute() 中的workQueue.offer(command),就变成put(),阻塞添加任务,不会存在workQueue.offer() 返回false的情况。
但这样的问题是下面的else if (!addWorker(command, false)) 代码逻辑将无法执行,导致的结果就是,只针对corePoolSize==maxPoolSize 时有效。不建议这么做。
自定义RejectedExecutionHandler
通过自定义RejectedExecutionHandler,在reject时调用Queue的put()方法,阻塞式添加任务。
使用CallerRunsPolicy
其实忙活一圈,发现最简单的方式就是使用ThreadPoolExecutor.CallerRunsPolicy。
CallerRunsPolicy被拒绝的任务,谁submit的谁执行。想想之前的各种阻塞也对,负责添加任务的线程因为线程池满了就阻塞在那里,还不如帮着执行一些任务..
Reference
The text was updated successfully, but these errors were encountered: