diff --git a/SUMMARY.md b/SUMMARY.md index 45947e79c8..45c85819b7 100644 --- a/SUMMARY.md +++ b/SUMMARY.md @@ -255,6 +255,7 @@ - [队列](./编程语言/JAVA/JAVA源码解析/队列.md) - [线程](./编程语言/JAVA/JAVA源码解析/线程.md) - [锁](./编程语言/JAVA/JAVA源码解析/锁.md) + - [线程池](./编程语言/JAVA/JAVA源码解析/线程池.md) - [JVM](./编程语言/JAVA/JVM/JVM.md) - [运行参数](./编程语言/JAVA/JVM/运行参数.md) - [内存结构](./编程语言/JAVA/JVM/内存结构.md) diff --git a/assets/2020227152611.jfif b/assets/2020227152611.jfif new file mode 100644 index 0000000000..30d490a7ca Binary files /dev/null and b/assets/2020227152611.jfif differ diff --git a/assets/2020227153022.jfif b/assets/2020227153022.jfif new file mode 100644 index 0000000000..bf0baf1185 Binary files /dev/null and b/assets/2020227153022.jfif differ diff --git "a/\347\274\226\347\250\213\350\257\255\350\250\200/JAVA/JAVA\346\272\220\347\240\201\350\247\243\346\236\220/\347\272\277\347\250\213\346\261\240.md" "b/\347\274\226\347\250\213\350\257\255\350\250\200/JAVA/JAVA\346\272\220\347\240\201\350\247\243\346\236\220/\347\272\277\347\250\213\346\261\240.md" new file mode 100644 index 0000000000..b243e37995 --- /dev/null +++ "b/\347\274\226\347\250\213\350\257\255\350\250\200/JAVA/JAVA\346\272\220\347\240\201\350\247\243\346\236\220/\347\272\277\347\250\213\346\261\240.md" @@ -0,0 +1,275 @@ +# ThreadPoolExecutor 线程池 + +解决了两个问题: + +1:通过减少任务间的调度开销 (主要是通过线程池中的线程被重复使用的方式),来提高大量任务时的执行性能; +2:提供了一种方式来管理线程和消费,维护基本数据统计等工作 + +## 整体架构 + +![2020227152611](/assets/2020227152611.jfif) + +## 运行状态图 + +![2020227153022](/assets/2020227153022.jfif) + +## Wroker + +在线程池中,最小的执行单位就是 Worker + +```java +private final class Worker + extends AbstractQueuedSynchronizer + implements Runnable +{ + + + // 运行任务的线程 + final Thread thread; + // 任务代码块 + Runnable firstTask; + /** Per-thread task counter */ + volatile long completedTasks; + + Worker(Runnable firstTask) { + // 把自己作为一个代码块穿给线程 + setState(-1); // inhibit interrupts until runWorker + this.firstTask = firstTask; + // 线程是通过线程工程创建的 + this.thread = getThreadFactory().newThread(this); + } + + public void run() { + // 这里就将任务的执行交给线程池了 + runWorker(this); + } + ... +} +``` + +## 任务提交 + +```java +public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + int c = ctl.get(); + // 如果工作线程数小于coreSize + if (workerCountOf(c) < corePoolSize) { + // 则直接创建worker执行 + if (addWorker(command, true)) + return; + c = ctl.get(); + } + // 如果线程池状态正常,并且工作队列还能入队 + if (isRunning(c) && workQueue.offer(command)) { + int recheck = ctl.get(); + // 线程池状态异常,尝试从队列移除任务 + if (! isRunning(recheck) && remove(command)) + // 移除成功就拒绝任务 + reject(command); + // 工作线程为0 + else if (workerCountOf(recheck) == 0) + // 直接创建worker执行 + addWorker(null, false); + } + // 队列满了,如果线程数超过maxSize,拒绝任务 + else if (!addWorker(command, false)) + reject(command); +} +``` + +addWorker 方法首先是执行了一堆校验,然后使用 new Worker (firstTask) 新建了 Worker,最后使用 t.start () 执行 Worker,所以 t.start () 会执行到 Worker 的 run 方法上,到runWorker 方法里 + +```java +private boolean addWorker(Runnable firstTask, boolean core) { + retry: + // 校验各种状态 + for (int c = ctl.get();;) { + // Check if queue empty only if necessary. + if (runStateAtLeast(c, SHUTDOWN) + && (runStateAtLeast(c, STOP) + || firstTask != null + || workQueue.isEmpty())) + return false; + for (;;) { + if (workerCountOf(c) + >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) + return false; + if (compareAndIncrementWorkerCount(c)) + break retry; + c = ctl.get(); // Re-read ctl + if (runStateAtLeast(c, SHUTDOWN)) + continue retry; + // else CAS failed due to workerCount change; retry inner loop + } + } + boolean workerStarted = false; + boolean workerAdded = false; + Worker w = null; + try { + // 把任务交给worker,此时要执行的任务就已经传入给worker里面的thread了 + w = new Worker(firstTask); + final Thread t = w.thread; + if (t != null) { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + // Recheck while holding lock. + // Back out on ThreadFactory failure or if + // shut down before lock acquired. + int c = ctl.get(); + // 检查线程池状态 + if (isRunning(c) || + (runStateLessThan(c, STOP) && firstTask == null)) { + if (t.getState() != Thread.State.NEW) + throw new IllegalThreadStateException(); + workers.add(w); + workerAdded = true; + int s = workers.size(); + if (s > largestPoolSize) + largestPoolSize = s; + } + } finally { + mainLock.unlock(); + } + if (workerAdded) { + // 启动线程,执行worker + t.start(); + workerStarted = true; + } + } + } finally { + if (! workerStarted) + addWorkerFailed(w); + } + return workerStarted; +} +``` + +```java +final void runWorker(Worker w) { + Thread wt = Thread.currentThread(); + Runnable task = w.firstTask; + w.firstTask = null; + w.unlock(); // allow interrupts + boolean completedAbruptly = true; + try { + // 所以说在这里,在不断地取任务执行 + // 如果要执行的task为空,则会去取一个task,取不到就阻塞 + while (task != null || (task = getTask()) != null) { + // 锁住worker,防止一个任务多个线程执行 + w.lock(); + // 线程池stop了,让线程中断 + if ((runStateAtLeast(ctl.get(), STOP) || + (Thread.interrupted() && + runStateAtLeast(ctl.get(), STOP))) && + !wt.isInterrupted()) + wt.interrupt(); + try { + // 执行前钩子函数 + beforeExecute(wt, task); + try { + // 执行真正的任务 + // 而执行这个任务的线程就是worker里面的thread + task.run(); + // 执行后钩子函数 + afterExecute(task, null); + } catch (Throwable ex) { + // 异常钩子函数 + afterExecute(task, ex); + throw ex; + } + } finally { + task = null; + w.completedTasks++; + w.unlock(); + } + } + completedAbruptly = false; + } finally { + processWorkerExit(w, completedAbruptly); + } +} +``` + +```java +private Runnable getTask() { + // 如果设置了线程超时时间,超过一定时间没有任务,超出coreSize部分的线程会被回收 + boolean timedOut = false; // Did the last poll() time out? + for (;;) { + int c = ctl.get(); + // 检查线程池状态 + if (runStateAtLeast(c, SHUTDOWN) + && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { + decrementWorkerCount(); + return null; + } + int wc = workerCountOf(c); + boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; + // 如果线程数大于maxSize但是存活时间还没超过keepalive,则跳过后面取任务的部分 + if ((wc > maximumPoolSize || (timed && timedOut)) + && (wc > 1 || workQueue.isEmpty())) { + if (compareAndDecrementWorkerCount(c)) + return null; + continue; + } + try { + // 超过keepAliveTime时间取不到数据就返回,此时线程不再运行,结束了,JVM会回收掉 + Runnable r = timed ? + workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : + workQueue.take(); + if (r != null) + return r; + timedOut = true; + } catch (InterruptedException retry) { + timedOut = false; + } + } +} +``` + +## 应用场景 + +### coreSize == maxSize + +让线程一下子增加到 maxSize,并且不要回收线程,防止线程回收,避免不断增加回收的损耗 + +### maxSize 无界 + SynchronousQueue + +当任务被消费时,才会返回,这样请求就能够知道当前请求是已经在被消费了,如果是其他的队列的话,我们只知道任务已经被提交成功了,但无法知道当前任务是在被消费中,还是正在队列中堆积 + +比较消耗资源,大量请求到来时,我们会新建大量的线程来处理请求 + +### maxSize 有界 + Queue 无界 + +对实时性要求不大,但流量忽高忽低的场景下,可以使用这种方式 + +当流量高峰时,大量的请求被阻塞在队列中,对于请求的实时性难以保证 + +### maxSize 有界 + Queue 有界 + +把队列从无界修改成有界,只要排队的任务在要求的时间内,能够完成任务即可 + +### keepAliveTime 设置无穷大 + +想要空闲的线程不被回收,我们可以设置 keepAliveTime 为无穷大值 + +### 线程池的公用和独立 + +查询和写入不公用线程池,如果公用的话,当查询量很大时,写入的请求可能会到队列中去排队,无法及时被处理 + +原则上来说,每个写入业务场景都独自使用自己的线程池,绝不共用,这样在业务治理、限流、熔断方面都比较容易 +多个查询业务场景是可以公用线程池的 + +## 问题 + +### 对线程池的理解 + +线程池结合了锁、线程、队列等元素,在请求量较大的环境下,可以多线程的处理请求,充分的利用了系统的资源,提高了处理请求的速度 + +### 队列在线程池中起的作用 + +请求数大于 coreSize 时,可以让任务在队列中排队,让线程池中的线程慢慢的消费请求 +当线程消费完所有的线程后,会阻塞的从队列中拿数据,通过队列阻塞的功能,使线程不消亡 +