Skip to content

Commit

Permalink
JAVA源码解析 增加线程池源码
Browse files Browse the repository at this point in the history
  • Loading branch information
0xcaffebabe committed Feb 27, 2020
1 parent 4a8ecf3 commit c73bcd4
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 0 deletions.
1 change: 1 addition & 0 deletions SUMMARY.md
Expand Up @@ -255,6 +255,7 @@
- [队列](./编程语言/JAVA/JAVA源码解析/队列.md)
- [线程](./编程语言/JAVA/JAVA源码解析/线程.md)
- [](./编程语言/JAVA/JAVA源码解析/锁.md)
- [线程池](./编程语言/JAVA/JAVA源码解析/线程池.md)
- [JVM](./编程语言/JAVA/JVM/JVM.md)
- [运行参数](./编程语言/JAVA/JVM/运行参数.md)
- [内存结构](./编程语言/JAVA/JVM/内存结构.md)
Expand Down
Binary file added assets/2020227152611.jfif
Binary file not shown.
Binary file added assets/2020227153022.jfif
Binary file not shown.
275 changes: 275 additions & 0 deletions 编程语言/JAVA/JAVA源码解析/线程池.md
@@ -0,0 +1,275 @@
# ThreadPoolExecutor 线程池

解决了两个问题:

1:通过减少任务间的调度开销 (主要是通过线程池中的线程被重复使用的方式),来提高大量任务时的执行性能;
2:提供了一种方式来管理线程和消费,维护基本数据统计等工作

## 整体架构

![2020227152611](/assets/2020227152611.jfif)

## 运行状态图

![2020227153022](/assets/2020227153022.jfif)

## Wroker

在线程池中,最小的执行单位就是 Worker

```java
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{


// 运行任务的线程
final Thread thread;
// 任务代码块
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

Worker(Runnable firstTask) {
// 把自己作为一个代码块穿给线程
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 线程是通过线程工程创建的
this.thread = getThreadFactory().newThread(this);
}

public void run() {
// 这里就将任务的执行交给线程池了
runWorker(this);
}
...
}
```

## 任务提交

```java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果工作线程数小于coreSize
if (workerCountOf(c) < corePoolSize) {
// 则直接创建worker执行
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池状态正常,并且工作队列还能入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 线程池状态异常,尝试从队列移除任务
if (! isRunning(recheck) && remove(command))
// 移除成功就拒绝任务
reject(command);
// 工作线程为0
else if (workerCountOf(recheck) == 0)
// 直接创建worker执行
addWorker(null, false);
}
// 队列满了,如果线程数超过maxSize,拒绝任务
else if (!addWorker(command, false))
reject(command);
}
```

addWorker 方法首先是执行了一堆校验,然后使用 new Worker (firstTask) 新建了 Worker,最后使用 t.start () 执行 Worker,所以 t.start () 会执行到 Worker 的 run 方法上,到runWorker 方法里

```java
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 校验各种状态
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 把任务交给worker,此时要执行的任务就已经传入给worker里面的thread了
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
// 检查线程池状态
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程,执行worker
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
```

```java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 所以说在这里,在不断地取任务执行
// 如果要执行的task为空,则会去取一个task,取不到就阻塞
while (task != null || (task = getTask()) != null) {
// 锁住worker,防止一个任务多个线程执行
w.lock();
// 线程池stop了,让线程中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行前钩子函数
beforeExecute(wt, task);
try {
// 执行真正的任务
// 而执行这个任务的线程就是worker里面的thread
task.run();
// 执行后钩子函数
afterExecute(task, null);
} catch (Throwable ex) {
// 异常钩子函数
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
```

```java
private Runnable getTask() {
// 如果设置了线程超时时间,超过一定时间没有任务,超出coreSize部分的线程会被回收
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 检查线程池状态
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果线程数大于maxSize但是存活时间还没超过keepalive,则跳过后面取任务的部分
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 超过keepAliveTime时间取不到数据就返回,此时线程不再运行,结束了,JVM会回收掉
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
```

## 应用场景

### coreSize == maxSize

让线程一下子增加到 maxSize,并且不要回收线程,防止线程回收,避免不断增加回收的损耗

### maxSize 无界 + SynchronousQueue

当任务被消费时,才会返回,这样请求就能够知道当前请求是已经在被消费了,如果是其他的队列的话,我们只知道任务已经被提交成功了,但无法知道当前任务是在被消费中,还是正在队列中堆积

比较消耗资源,大量请求到来时,我们会新建大量的线程来处理请求

### maxSize 有界 + Queue 无界

对实时性要求不大,但流量忽高忽低的场景下,可以使用这种方式

当流量高峰时,大量的请求被阻塞在队列中,对于请求的实时性难以保证

### maxSize 有界 + Queue 有界

把队列从无界修改成有界,只要排队的任务在要求的时间内,能够完成任务即可

### keepAliveTime 设置无穷大

想要空闲的线程不被回收,我们可以设置 keepAliveTime 为无穷大值

### 线程池的公用和独立

查询和写入不公用线程池,如果公用的话,当查询量很大时,写入的请求可能会到队列中去排队,无法及时被处理

原则上来说,每个写入业务场景都独自使用自己的线程池,绝不共用,这样在业务治理、限流、熔断方面都比较容易
多个查询业务场景是可以公用线程池的

## 问题

### 对线程池的理解

线程池结合了锁、线程、队列等元素,在请求量较大的环境下,可以多线程的处理请求,充分的利用了系统的资源,提高了处理请求的速度

### 队列在线程池中起的作用

请求数大于 coreSize 时,可以让任务在队列中排队,让线程池中的线程慢慢的消费请求
当线程消费完所有的线程后,会阻塞的从队列中拿数据,通过队列阻塞的功能,使线程不消亡

0 comments on commit c73bcd4

Please sign in to comment.