[[并行程序设计]]

---

[[c++多线程语法-来源菜鸟教程]]

## 简介

### 多线程和多进程

- 多进程编程在多重计算单元 (例如，CPU 的多个核心) 上并行化一个程序，目的是利用冗余的资源，比如不同的 CPU 核心上的寄存器、运算逻辑单元(ALU) 等，以提高计算速度。
- 多线程编程共享硬件资源，比如单个核心或多个核心的缓存和 RAM, 目的是避免空闲的未使用资源。

### **硬件线程（Hardware Thread）**

硬件线程是指**处理器核心内部**能够并行执行的独立指令流。现代 CPU 内部通常包含多个核心，每个核心可以同时处理多个硬件线程。这些硬件线程之间通过共享核心内的资源（如寄存器、执行单元、缓存等）来提高计算效率。

- **区别于软件线程**：软件线程是由操作系统或应用程序管理的，而硬件线程是物理处理器核心内部支持的并行执行能力。
- **调度方式**：硬件线程的调度由 CPU 自行管理，通过在不同线程之间快速切换来保持执行单元的高利用率。

### **超线程（Hyper-Threading）**

超线程（Hyper-Threading, HT）是英特尔公司推出的一种硬件技术，旨在提高处理器核心的并行处理能力。它使每个物理核心能够同时处理多个硬件线程，以提高计算效率和资源利用率。其他厂商也有类似的技术（例如，AMD 的 SMT 技术），但 Hyper-Threading 是最广泛使用的术语。

### 派生和并入线程

**派生线程spawn** 是指在程序执行过程中，创建新的线程来执行特定任务。这个过程通常由主线程（或者父线程）调用线程创建函数完成。新线程在创建时会开始执行一个预定义的任务函数，同时与原线程并行运行。

**并入线程join** 是指等待一个线程完成其任务并将其结果合并到主线程的过程。通常，当一个线程执行完任务后，它会通知创建它的线程自己已经完成工作。主线程会等待所有子线程完成后再继续执行，从而确保所有并行任务的正确执行顺序。

**分派线程detach** 是指将线程分配到处理器核心上执行的过程。操作系统的调度器负责线程分派，它决定了哪个线程在何时被分配到哪个处理器核心上运行。分派线程的机制是多线程系统中性能优化的重要部分。

---

## 处理返回值

### 传统方法

传统的错误处理模型是用函数的返回值保留错误代码。 其他计算得到的量常常通过参数列表中的指针传递，这些参数随后在函数内部的函数体中操作。由线程调用的函数使用类似的方法也是可行的：我们简单传递一个指向结果值的指针，并把计算得到的值写入相关联的地址

传统方法在多线程中的缺陷:

多个线程将有可能在一个已经被其他线程释放的内存上操作，导致段错误。另外，在线程执行期间，我们必须保证不从主线程内修改计算结果的值，避免潜在的竞争条件。结论是，你必须保证线程内操控的对象在线程执行期间仍然存在，并且共享资源上没有数据竞争。

```Mermaid
flowchart TD
    Start(Main函数开始) --> A[定义线程数量num_threads为32]
    A --> B[创建std::vector用于存储线程对象和结果]
    B --> C{循环创建并启动线程}
    C -->|是| D[为每个线程启动fibo计算]
    D --> E[将计算结果存储在results数组中]
    E --> F[继续下一个线程id]
    F --> C
    C -->|否| G{等待所有线程完成}
    G --> H[遍历每个线程，调用join 方法]
    H --> I{打印所有计算结果}
    I --> J[遍历results数组并打印结果]
    J --> End(主函数结束)
```

In [None]:
\#include <iostream>
\#include <cstdint>
\#include <vector>
\#include <thread>

// Fibonacci 函数模板，用于计算 Fibonacci 数
template <typename value_t, typename index_t>
void fibo(value_t n, value_t* result) { // 传递结果的地址
    value_t a_0 = 0;  // Fibonacci(0)
    value_t a_1 = 1;  // Fibonacci(1)
    
    // 循环计算 Fibonacci 数
    for (index_t index = 0; index < n; index++) {
        const value_t tmp = a_0; // 保存当前的 Fibonacci 值
        a_0 = a_1;                // 更新 a_0 为 a_1
        a_1 += tmp;              // 更新 a_1 为前两个数的和
    }
    
    *result = a_0; // 将计算结果写入结果地址
}

// 主函数
int main(int argc, char* argv[]) {
    const uint64_t num_threads = 32; // 线程数量
    std::vector<std::thread> threads; // 存储线程对象的向量

    // 为每个线程分配一个结果值
    std::vector<uint64_t> results(num_threads, 0);
    
    // 创建线程并启动计算
    for (uint64_t id = 0; id < num_threads; id++) {
        threads.emplace_back(
            // 指定模板参数和函数参数
            fibo<uint64_t, uint64_t>, id, &(results[id])
        );
    }
    
    // 等待所有线程完成
    for (auto& thread : threads) {
        thread.join();
    }
    
    // 输出每个线程计算的结果
    for (const auto& result : results) {
        std::cout << result << std::endl; // 打印 Fibonacci 数
    }

    return 0; // 程序正常结束
}

### 使用 promise 和 future的现代方法

C++11 提供了一种专门传递返回值的机制s=(p,f)，旨在适应异步执行的特点。

1. **Promise 和 Future 的绑定关系**：
    - `**promise**` 和 `**future**` 是成对使用的对象。它们之间共享一个状态，在多线程环境中用于数据传递。
    - `**promise**`：可以被视为一个可写的对象，用于设置某个值。它由一个线程来设置这个值，并通知其他线程这个值已经准备好。
    - `**future**`：是与 `**promise**` 绑定的对象，它是状态的一个只读视图。它用于从异步操作中获取值，调用 `**future.get()**` 会阻塞当前线程，直到与其关联的 `**promise**` 设置了值。
2. **状态传递的单次履行**：
    - 一个 `**promise**` 只能完成一次信令，即只能将值设置一次，这就是为什么称它为履行 `**promise**`。
    - 一旦 `**promise**` 被设置为一个特定的值，或者被赋予一个异常，它的状态就被视为履行，而这个状态将无法再次改变。
3. **异步通信机制**：
    - 由于 `**promise**` 和 `**future**` 共享一个状态，并且 `**future**` 是在 `**promise**` 被履行后才能获取到值，所以它们之间存在一种因果依赖关系。
    - 这种依赖关系使得可以在主线程（或者任意一个线程）和它所派生的线程之间进行异步通信。也就是说，一个线程可以在完成计算后，通过设置 `**promise**` 的值来通知其他线程，而其他线程通过 `**future**` 可以在适当的时候获取到这个值。
4. **用作异步机制**：
    - 在多线程编程中，主线程可以启动一个子线程并将一个 `**promise**` 传递给它，然后子线程在执行完毕后，通过 `**promise**` 设置计算结果，主线程则可以通过 `**future**` 获取这个结果。

```Mermaid
graph TD;
    A[Promise组] -->|线程传入promise并使用promise.set_value填充future| B[Future组]
```

In [None]:
\#include <iostream>
\#include <thread>
\#include <future>

// 一个函数，执行一些异步计算
void calculate_square(std::promise<int> p, int value) {
    int result = value * value;
    p.set_value(result);  // 将计算结果设置到 promise 中
}

int main() {
    // 创建一个 promise 和与之关联的 future
    std::promise<int> p;
    std::future<int> f = p.get_future();

    // 启动一个新线程，并将 promise 传递给线程
    std::thread t(calculate_square, std::move(p), 10);

    // 在主线程中等待结果
    int result = f.get();  // 阻塞直到结果准备好
    std::cout << "Square of 10 is: " << result << std::endl;

    t.join();  // 等待线程完成
    return 0;
}

```Mermaid
flowchart TD
    Start(Main函数开始) --> A[定义线程数量num_threads为32]
    A --> B[创建std::vector用于存储线程对象和futures]
    B --> C{循环创建线程并传递promise}
    C -->|是| D[为每个线程创建promise和future]
    D --> E[将promise与线程关联并启动fibo计算]
    E --> F[继续下一个线程id]
    F --> C
    C -->|否| G{从futures中读取结果}
    G --> H[遍历results数组调用future.get]
    H --> I[打印每个线程计算的Fibonacci结果]
    I --> J{等待所有线程完成}
    J --> K[遍历线程数组调用thread.join]
    K --> End(主函数结束)
```

In [None]:
\#include <iostream> // std::cout
\#include <cstdint>  // uint64_t
\#include <vector>   // std::vector
\#include <thread>   // std::thread
\#include <future>   // std::promise/future

template <
    typename value_t,
    typename index_t>
void fibo(
    value_t n,
    std::promise<value_t>&& result) { // 传递 promise
    value_t a_0 = 0; // Fibonacci(0)
    value_t a_1 = 1; // Fibonacci(1)

    // 循环计算 Fibonacci 数
    for (index_t index = 0; index < n; index++) {
        const value_t tmp = a_0; // 保存当前的 Fibonacci 值
        a_0 = a_1;                // 更新 a_0 为 a_1
        a_1 += tmp;              // 更新 a_1 为前两个数的和
    }

    result.set_value(a_0); // 兑现 promise，将结果传递给 future
}

int main(int argc, char* argv[]) {
    const uint64_t num_threads = 32; // 线程数量
    std::vector<std::thread> threads; // 存储线程对象的向量
    std::vector<std::future<uint64_t>> results; // 存储 futures

    // 为每个线程创建 promise 和 associated future
    for (uint64_t id = 0; id < num_threads; id++) {
        std::promise<uint64_t> promise; // 创建 promise
        results.emplace_back(promise.get_future()); // 获取与 promise 关联的 future

        // 启动线程并传递 promise
        threads.emplace_back(
            // 将 promise 移动到新线程中
            fibo<uint64_t, uint64_t>, id, std::move(promise)
        );
    }

    // 从 futures 中读取结果，这里会同步到 promise 被兑现
    for (auto& result : results) {
        std::cout << result.get() << std::endl; // 打印 Fibonacci 数
    }

    // 确保所有线程都已结束
    for (auto& thread : threads) {
        thread.join(); // 等待线程完成
    }

    return 0; // 程序正常结束
}

**通过** `**std::packages_task**`**封装函数(可调用对象)**

`**std::packaged_task**` 是 C++11 中引入的一种封装任务的机制，它允许你将一个可调用对象（如函数、lambda 表达式或函数对象）包装成一个任务，并提供一个 `**std::future**` 对象来获取该任务的返回值。

In [None]:
\#include <future>

// 定义一个 packaged_task
std::packaged_task<ReturnType(Args...)> task(function);

封装函数comp例子

In [None]:
\#include <iostream>
\#include <future>

// 任务函数
bool comp(float value, int64_t threshold) {
    return value < threshold;
}

int main() {
    // 创建一个 packaged_task，封装 comp 函数
    std::packaged_task<bool(float, int64_t)> task(comp);
    
    // 获取与该任务关联的 future
    std::future<bool> future = task.get_future();
    
    // 调用任务
    task(3.14f, 5); // WARNING: this is sequential!
    
    // 访问 future 对象
    std::cout << future.get() << std::endl; // 输出 true 或 false

    return 0;
}

### **模板-任务工厂例子**

创建一个 `**std::packaged_task**`，从而将任意可调用对象（函数、Lambda 表达式等）转换为任务。这个函数还支持任意数量的参数，并且自动推导返回值类型。

In [None]:
\#include <iostream>      // 引入输入输出流库，用于 std::cout 等功能
\#include <future>        // 引入未来（future）和打包任务（packaged_task）相关的库
\#include <functional>    // 引入函数相关的库，用于 std::bind
\#include <utility>       // 引入工具库，用于 std::forward

// 模板函数定义
template <
    typename Func, // 可调用对象的类型（例如，函数、Lambda等）
    typename... Args, // 可变参数模板，表示函数的参数类型，Args... 可以接受任意数量的参数
    // 计算返回值类型，使用 std::result_of 获取 Func(Args...) 的返回类型
    typename Rtrn = typename std::result_of<Func(Args...)>::type 
>
auto make_task( // 创建任务的函数
    Func&& func, // 接收一个可调用对象（函数或 Lambda），使用完美转发（forwarding）
    Args&&... args // 接收可变参数，使用完美转发（forwarding）
) -> std::packaged_task<Rtrn(void)> { // 函数返回类型为 std::packaged_task，参数为 void

    // 使用 std::bind 创建一个辅助函数 aux，该函数无参数并返回 func(args...)
    // std::bind 将 func 和 args 绑定在一起，生成一个新的可调用对象
    auto aux = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
    
    // 创建一个打包的任务，任务执行 aux(void) 即可
    // std::packaged_task<Rtrn(void)> 接收一个无参数函数，并返回 Rtrn 类型的结果
    auto task = std::packaged_task<Rtrn(void)>(aux);
    
    // 返回打包的任务
    return task; // 未来的返回值通过 task.get_future() 访问
}   

In [None]:
\#include <iostream>
\#include <future>
\#include <functional>

bool add(int a, int b) {
    return a + b > 10; // 只是一个示例逻辑
}

int main() {
    // 使用 make_task 创建一个任务
    auto task = make_task(add, 5, 7); // add(5, 7)
    
    // 获取与该任务关联的 future
    auto future = task.get_future();
    
    // 执行任务
    task(); // 调用任务执行 add(5, 7)
    
    // 获取任务的返回值
    std::cout << "Result: " << future.get() << std::endl; // 输出结果

    return 0;
}

### 异步方式 `std::async`

`**std::async**` 是 C++11 中引入的一种用于处理异步操作的函数模板，属于 `**<future>**` 头文件。它的主要功能是将一个可调用对象（如函数、Lambda 表达式等）放入一个异步任务队列中，并返回一个 `**std::future**` 对象，用于在未来某个时刻获取该任务的结果。

> std::async是为了 让用户的少费点脑子的，它让这三个对象默契的工作。大概的工作过程是这样的：std::async先将异步操作用std::packaged_task包 装起来，然后将异步操作的结果放到std::promise中，这个过程就是创造未来的过程。外面再通过future.get/wait来获取这个未来的 结果

In [None]:
std::future<ReturnType> async(std::launch policy, Callable&& f, Args&&... args);

- `std::launch policy`：指定异步任务的执行策略，通常可以是：
    - `**std::launch::async**`：强制在新线程中执行任务。
    - `**std::launch::deferred**`：延迟执行，直到调用 `**get()**` 或 `**wait()**`。

**由future析构函数导致的隐含同步**

当一个 `**std::future**` 对象被销毁时(也就是程序跳出future的定义域了)，如果它对应的任务还没有完成，析构函数将会阻塞，直到任务完成。这意味着，如果我们在一个循环中创建多个 `**std::future**` 对象，但是不保存它们，这些任务在析构时会被同步化，导致它们按顺序执行，而不是并行执行。  
**避免隐含同步**  
  
防止这类行为发生的唯一途径是把 future 存储在 for 循环的循环体外。导致的结果是多占用内存储存future对象，因此不建议在没有返回值的函数中使用 std::async

```Mermaid
flowchart TD
    Start(Main函数开始) --> A[定义线程数量num_threads为32]
    A --> B[创建std::vector存储future对象的结果]
    B --> C{循环启动异步任务}
    C -->|是| D[使用std::async启动任务]
    D --> E[将future对象存储在results向量中]
    E --> F[继续下一个线程id]
    F --> C
    C -->|否| G{遍历results向量}
    G --> H[对每个future调用get]
    H --> I[打印计算结果]
    I --> J[所有任务完成后结束程序]
    J --> End(主函数结束)
```

In [None]:
\#include <iostream>
\#include <cstdint>
\#include <vector>
\#include <future>

// Fibonacci 函数计算第 n 个斐波那契数
uint64_t fibo(uint64_t n) {
    uint64_t a_0 = 0;
    uint64_t a_1 = 1;
    for (uint64_t index = 0; index < n; index++) {
        uint64_t tmp = a_0; 
        a_0 = a_1; 
        a_1 += tmp;
    }
    return a_0;
}

int main(int argc, char *argv[]) {
    const uint64_t num_threads = 32;  // 线程数量
    std::vector<std::future<uint64_t>> results;  // 存储每个异步任务的 future 对象

    // 启动多个异步任务，每个任务计算一个斐波那契数
    for (uint64_t id = 0; id < num_threads; id++) {
        results.emplace_back(
            std::async(
                std::launch::async, fibo, id  // 启动异步任务
            )
        );
    }

    // 同步所有异步任务，并打印它们的结果
    for (auto& result : results) {
        std::cout << result.get() << std::endl;  // 获取并打印每个 future 的值
    }

    return 0;
}

## 基于静态分发的调度机制  

**归约**（Reduction）是高性能计算（HPC）和并行计算中的一个重要概念，指的是将多个数据元素合并为一个单一结果的过程。这个过程通常涉及到某种形式的操作，比如求和、取最大值、取最小值或计算平均值。

### 串行执行矩阵乘DMV

优化方向: 降低alloc时间

- vector会自动初始化内容, 可以用类封装unit64_t让它不使用默认初始化
- 用动态数组代替向量

In [None]:
\#include <iostream>
\#include <cstdint>
\#include <vector>
\#include <thread>

// 假设你有一个自定义计时器在 hpc_helpers.hpp 中
\#include "hpc_helpers.hpp" // custom timers

// 初始化 A 为下三角矩阵，x 为 0, 1, 2, 3, ... 的连续值
template <typename value_t, typename index_t>
void init(std::vector<value_t>& A, std::vector<value_t>& x, index_t m, index_t n) {
    for (index_t row = 0; row < m; row++) {
        for (index_t col = 0; col < n; col++) {
            A[row * n + col] = row >= col ? 1 : 0; // 下三角矩阵
        }
    }
    for (index_t col = 0; col < n; col++) {
        x[col] = col; // 初始化 x 为连续值
    }
}

// 顺序矩阵向量乘法
template <typename value_t, typename index_t>
void sequential_mult(std::vector<value_t>& A, std::vector<value_t>& x, std::vector<value_t>& b, index_t m, index_t n) {
    for (index_t row = 0; row < m; row++) {
        value_t accum = value_t(0);
        for (index_t col = 0; col < n; col++) {
            accum += A[row * n + col] * x[col];
        }
        b[row] = accum;
    }
}

int main(int argc, char* argv[]) {
    const uint64_t n = 1UL << 15; // 32768
    const uint64_t m = 1UL << 15; // 32768

    TIMERSTART(overall);
    TIMERSTART(alloc);
    
    std::vector<uint64_t> A(m * n);
    std::vector<uint64_t> x(n);
    std::vector<uint64_t> b(m);
    
    TIMERSTOP(alloc);
    
    TIMERSTART(init);
    init(A, x, m, n);
    TIMERSTOP(init);
    
    TIMERSTART(mult);
    sequential_mult(A, x, b, m, n);
    TIMERSTOP(mult);
    
    TIMERSTOP(overall);

    // 检查求和是否正确
    for (uint64_t index = 0; index < m; index++) {
        if (b[index] != index * (index + 1) / 2) {
            std::cout << "error at position " << index << std::endl;
        }
    }

    return 0;
}

In [None]:
\#ifndef HPC_HELPERS_HPP
\#define HPC_HELPERS_HPP

\#include <chrono>
\#include <iostream>

// 定义计时器类
class Timer {
public:
    Timer() : start_time_(std::chrono::high_resolution_clock::now()) {}

    void reset() {
        start_time_ = std::chrono::high_resolution_clock::now();
    }

    double elapsed() const {
        return std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - start_time_).count();
    }

private:
    std::chrono::high_resolution_clock::time_point start_time_;
};

// 定义计时器开始和停止的宏
\#define TIMERSTART(name) Timer name#\#_timer; name##_timer.reset();
\#define TIMERSTOP(name) std::cout << \#name " elapsed time: " << name#\#_timer.elapsed() << " seconds" << std::endl;

\#endif // HPC_HELPERS_HPP

### 线程的区块分发

串行执行DMV时,一个线程执行要处理全部数据,因此考虑将这些处理步骤分发给多个线程以增加处理速度.一个更好的方法是一次派生p个线程，其中每个线程处理m/p行。

lambda 表达式支持闭包（capture），可以直接访问外部作用域中的变量（如 `**A**`, `**x**`, `**b**`, `**m**`, `**n**` 以及 `**num_threads**`），无需将它们作为参数传入。这在多线程编程中很方便，尤其是当你只需要某些局部变量时。  
  
[[lambda表达式是什么 2]]

In [None]:
\#include <iostream>
\#include <vector>
\#include <thread>
\#include <algorithm>

\#define SDIV(x, y) ((x + y - 1) / y) // 向上取整的除法

template <typename value_t, typename index_t>
void block_parallel_mult(
		std::vector<value_t>& A, 
		std::vector<value_t>& x, 
		std::vector<value_t>& b, 
		index_t m, 
		index_t n, 
		index_t num_threads = 8) {
		
    auto block = [&] (const index_t& id) -> void {
        const index_t chunk = SDIV(m, num_threads); // 计算每个线程的任务块大小
        const index_t lower = id * chunk;
        const index_t upper = std::min(lower + chunk, m);

        for (index_t row = lower; row < upper; row++) {
            value_t accum = value_t(0);
            for (index_t col = 0; col < n; col++)
                accum += A[row * n + col] * x[col];
            b[row] = accum;
        }
    };

    std::vector<std::thread> threads;
    for (index_t id = 0; id < num_threads; id++)
        threads.emplace_back(block, id);
    for (auto& thread : threads)
        thread.join();
}

int main() {
    // 示例使用
    const int m = 4; // 行数
    const int n = 4; // 列数
    std::vector<int> A = {1, 2, 3, 4,  5, 6, 7, 8,  9, 10, 11, 12,  13, 14, 15, 16}; // 4x4 矩阵
    std::vector<int> x = {1, 2, 3, 4}; // 向量
    std::vector<int> b(m); // 结果向量

    block_parallel_mult(A, x, b, m, n, 2); // 使用 2 个线程

    // 输出结果
    for (const auto& val : b) {
        std::cout << val << " ";
    }
    std::cout << std::endl;

    return 0;
}

### 线程的循环分发

任务按照固定的步长分配给多个线程，每个线程在同一个循环中间隔地处理一组任务。这种方案与静态区块分发的不同之处在于，任务分布方式更均匀，有助于减少线程之间的负载不平衡问题。

**按步长分配任务**：假设有 _C_ 个任务和 _T_ 个线程，每个线程按 **固定步长** _**T**_ 循环处理任务。这样，第一个线程会处理任务 0, _T_, 2_T_ 等；第二个线程会处理任务 1, _T_+1, 2_T_+1 等，以此类推。

**均匀分配**：通过这种方式，即使每个任务所需计算量略有不同，各线程都能大致均匀地分摊计算量，从而减小了负载不平衡的风险。

In [None]:
\#include <iostream>
\#include <vector>
\#include <thread>

template <typename value_t, typename index_t>
void cyclic_parallel_mult(
    const std::vector<value_t>& A,
    const std::vector<value_t>& x,
    std::vector<value_t>& b,
    index_t m,
    index_t n,
    index_t num_threads = 8) {

    // 定义循环分块的匿名函数
    auto cyclic = [&] (const index_t& id) -> void {
        // 以 num_threads 为步长循环处理行
        for (index_t row = id; row < m; row += num_threads) {
            value_t accum = value_t(0);
            for (index_t col = 0; col < n; col++) {
                accum += A[row * n + col] * x[col];
            }
            b[row] = accum;
        }
    };

    // 创建线程池并分配任务
    std::vector<std::thread> threads;
    for (index_t id = 0; id < num_threads; id++) {
        threads.emplace_back(cyclic, id);
    }

    // 等待所有线程完成
    for (auto& thread : threads) {
        thread.join();
    }
}

int main() {
    const int m = 8; // 矩阵行数
    const int n = 8; // 矩阵列数
    const int num_threads = 4; // 使用的线程数

    // 初始化矩阵 A 和向量 x
    std::vector<uint64_t> A(m * n, 1); // 矩阵元素初始化为 1
    std::vector<uint64_t> x(n, 1);     // 向量元素初始化为 1
    std::vector<uint64_t> b(m, 0);     // 结果向量 b

    // 执行并行矩阵向量乘法
    cyclic_parallel_mult(A, x, b, m, n, num_threads);

    // 输出结果
    for (const auto& val : b) {
        std::cout << val << " ";
    }
    std::cout << std::endl;

    return 0;
}

### 虚假共享

**虚假共享（false sharing）** 是一种性能问题，通常发生在多线程程序中。当多个线程试图同时访问位于同一个 CPU 缓存行内的数据，并且至少一个线程在修改其中的数据时，尽管它们实际上没有访问同一变量，依然会产生额外的缓存同步开销(cache一致性策略)，从而导致性能下降。  
简单来说有两个线程分别操作数组   
`**A**` 的不同元素，但它们位于同一个缓存行中,这时就会发生p1,p2虽然没有数据竞争,但是两者都会在对方更新后立刻更新自己的缓存行,导致性能下降.

具体来说,循环分发的例子中, `accum += A[row * n + col] * x[col];`  
中的accum不能省略,如果直接用  
`b[row]`更新,就会导致 `b[row]`在其他cache行中一直被更新,这种更新是无用的.因此务必要用寄存器缓存中间结果,最后再更新结果

### 线程的块循环分发

数据有m行,有p个线程,每个线程分发了一个包含 c 个任务的固定块，这些任务块由每个线程顺序处理。

- c=1时,退化成循环分发
- c比较小时,可能会导致虚假共享
- 理想的c:数据能够覆盖整个缓存行,例如缓存行为64字节,那么对于64位的数据类型,就设置c=8
- c比较大时,可能导致任务负载不均衡
- c=m/p向上取整时,退化成区块分发

In [None]:
\#include <iostream>
\#include <vector>
\#include <thread>
\#include <algorithm>

template <typename value_t, typename index_t>
void block_cyclic_parallel_mult(
    std::vector<value_t>& A,
    std::vector<value_t>& x,
    std::vector<value_t>& b,
    index_t m,
    index_t n,
    index_t num_threads = 8,
    index_t chunk_size = 64 / sizeof(value_t)) {

    // This function is called by the threads
    auto block_cyclic = [&](const index_t& id) -> void {
        // Precompute offset and stride
        const index_t offset = id * chunk_size;
        const index_t stride = num_threads * chunk_size;

        // For each block of size chunk_size in cyclic order
        for (index_t lower = offset; lower < m; lower += stride) {
            // Compute the upper border of the block (exclusive)
            const index_t upper = std::min(lower + chunk_size, m);

            // For each row in the block
            for (index_t row = lower; row < upper; row++) {
                // Accumulate the contributions
                value_t accum = value_t(0);
                for (index_t col = 0; col < n; col++) {
                    accum += A[row * n + col] * x[col];
                }
                b[row] = accum;
            }
        }
    };

    // Create threads and assign each thread to a block
    std::vector<std::thread> threads;
    for (index_t id = 0; id < num_threads; id++) {
        threads.emplace_back(block_cyclic, id);
    }

    // Wait for all threads to complete
    for (auto& thread : threads) {
        thread.join();
    }
}

## 处理负载不均衡

如果处理一个特定任务划分的时间变化很大，可能会有问题。当一部分线程仍在处理它们对应的任务块，而另一部分线程已经完成了计算任务，这种情况就称为**负载不平衡**。

数据挖掘和机器学习中的大量应用依赖于全对距离信息

**全对距离信息**是指在给定数据集中，计算每一对样本之间的距离，从而得到一个完整的距离矩阵。这种距离矩阵可以用于分析样本之间的相似性或差异性，广泛应用于分类、聚类和其他数据分析任务。

[[全对距离信息-背景知识]]

计算全对距离信息将其保存成距离/相似度矩阵,会发现样本ij的距离和样本ji的距离是重复计算的,也就是距离/相似度矩阵是对称矩阵,因此只要计算矩阵的下三角部分即可完成全对距离信息的计算。

In [None]:
\#include <iostream> // for std::cout
\#include <cstdint> // for uint64_t
\#include <vector> // for std::vector
\#include <thread> // for std::thread (not used yet)

// Assuming hpc_helpers.hpp and binary-IO.hpp contain necessary function definitions
\#include "../include/hpc_helpers.hpp" // for custom timers
\#include "../include/binary-IO.hpp" // for loading binary data
\#include "/home/ikun/cppthread/4.4/static_para_all_pair.hpp"

template <
typename index_t,
typename value_t>
void sequential_all_pairs (
    std::vector<value_t>& mnist,
    std::vector<value_t>& all_pair,
    index_t rows,
    index_t cols) {
    
    // Iterate over all entries below the diagonal (i < j)
    for (index_t i = 0; i < rows; i++) {
        for (index_t I = 0; I <= i; I++) { // I 用于访问样本
            // Compute squared Euclidean distance
            value_t accum = value_t(0);
            for (index_t j = 0; j < cols; j++) { // j 用于访问列
                value_t residue = mnist[i * cols + j] - mnist[I * cols + j]; // 使用 j 作为列索引
                accum += residue * residue; // Accumulate the squared differences
            }
            // Write the distances to the all_pair matrix
            all_pair[i * rows + I] = accum; // Fill distance matrix
            all_pair[I * rows + i] = accum; // Since distance is symmetric
        }
    }
}

int main() {
    // Define data types
    typedef no_init_t<float> value_t; // Custom no_init type for floats
    typedef uint64_t index_t; // Use uint64_t for indices

    // Number of images and pixels
    const index_t rows = 60000; // Number of images
    const index_t cols = 28 * 28; // Number of pixels per image

    // Load MNIST data from a binary file
    TIMERSTART(load_data_from_disk);
    std::vector<value_t> mnist(rows * cols); // Vector to hold MNIST data
    load_binary(mnist.data(), rows * cols, "./data/train-images.idx3-ubyte");
    TIMERSTOP(load_data_from_disk);

    // Compute all-pairs distance matrix
    TIMERSTART(compute_distances);
    std::vector<value_t> all_pair(rows * rows); // Matrix to hold distances
    sequential_all_pairs(mnist, all_pair, rows, cols);
    //parallel_all_pairs(mnist, all_pair, rows, cols);
    TIMERSTOP(compute_distances);

    return 0;
}

编译运行:`g++ -O2 -std=c++14 -pthread all_pair.cpp -o all_pair`

优化分析:

- 对矩阵按行采用静态分发策略

### 静态调度

这个计算任务，每行的计算时间 $T(i)$随行数$i$ 增长，通常以 $T(i)=a⋅i^2$表示，其中 $a$是常数。这意味着：

- **计算时间的平方依赖**：chunk 的总计算时间取决于其内部行数的平方。
- **线性和乘积依赖**：chunk 的计算时间还与 chunk 大小 _c_ 和各行执行时间函数 _$T(i)$_的乘积成线性关系。

因此的得到的结论是: 采用大的chunk会导致负载不均衡.采用较小的chunk,这样可以减小负载不均衡

- 具体代码
    
    ```C++
    template <typename index_t, typename value_t>
    void parallel_all_pairs(
        std::vector<value_t>& mnist,
        std::vector<value_t>& all_pair,
        index_t rows,
        index_t cols,
        index_t num_threads = 64,
        index_t chunk_size = 64 / sizeof(value_t)
    ) {
        // 定义 lambda 表达式用于并行执行
        auto block_cyclic = [&] (const index_t& id) -> void {
            // 预计算每个线程的起始位置和步长
            const index_t off = id * chunk_size;
            const index_t str = num_threads * chunk_size;
    
            // 块循环分发，遍历每个线程分配的任务块
            for (index_t lower = off; lower < rows; lower += str) {
                const index_t upper = std::min(lower + chunk_size, rows);
    
                // 计算所有下三角的对
                for (index_t i = lower; i < upper; i++) {
                    for (index_t I = 0; I <= i; I++) {
                        value_t accum = value_t(0);
                        for (index_t j = 0; j < cols; j++) {
                            value_t residue = mnist[i * cols + j] - mnist[I * cols + j];
                            accum += residue * residue;
                        }
                        all_pair[i * rows + I] = all_pair[I * rows + i] = accum;
                    }
                }
            }
        };
    
        // 启动线程并执行并行计算
        std::vector<std::thread> threads;
        for (index_t id = 0; id < num_threads; id++)
            threads.emplace_back(block_cyclic, id);
        for (auto& thread : threads)
            thread.join();
    };
    ```
    
    ```C++
    // binary-IO.hpp
    \#ifndef BINARY_IO_HPP
    \#define BINARY_IO_HPP
    
    \#include <iostream>
    \#include <fstream>
    \#include <string>
    
    template <typename T>
    void load_binary(T* data, size_t size, const std::string& filename) {
        std::ifstream file(filename, std::ios::binary);
        if (!file) {
            std::cerr << "Error opening file: " << filename << std::endl;
            exit(1);
        }
        
        file.read(reinterpret_cast<char*>(data), size);
        
        if (!file) {
            std::cerr << "Error reading file: " << filename << std::endl;
            std::cerr << "Expected to read " << size * sizeof(T) << " bytes, but read "
                      << file.gcount() << " bytes." << std::endl;
            exit(1);
        }
        
        file.close();
    }
    
    \#endif // BINARY_IO_HPP
    ```
    
    ```C++
    // hpc_helpers.hpp
    \#ifndef HPC_HELPERS_HPP
    \#define HPC_HELPERS_HPP
    
    \#include <chrono>
    \#include <iostream>
    
    \#include <type_traits>
    
    template <class T>
    class no_init_t {
    public:
        // Check whether it is a fundamental numeric type
        static_assert(std::is_fundamental<T>::value && std::is_arithmetic<T>::value,
                      "must be a fundamental, numeric type");
    
        // Do nothing
        constexpr no_init_t() noexcept { /* HERE WE DO NOTHING ! */ }
    
        // Convertible from a T
        constexpr no_init_t(T value) noexcept : v_(value) {}
    
        // Act as a T in all conversion contexts
        constexpr operator T() const noexcept { return v_; }
    
        // Negation operators
        constexpr no_init_t& operator-() noexcept {
            v_ = -v_;
            return *this;
        }
    
        // Increment/Decrement operators
        constexpr no_init_t& operator++() noexcept {
            v_++;
            return *this;
        }
        constexpr no_init_t& operator++(int) noexcept {
            v_++;
            return *this;
        }
        constexpr no_init_t& operator--() noexcept {
            v_--;
            return *this;
        }
        constexpr no_init_t& operator--(int) noexcept {
            v_--;
            return *this;
        }
    
        // Assignment operators
        constexpr no_init_t& operator+=(T v) noexcept {
            v_ += v;
            return *this;
        }
        // More assignment operators...
    
    private:
        T v_;
    };
    
    // Timer macros using chrono
    \#define TIMERSTART(label) auto label#\#_start = std::chrono::high_resolution_clock::now();
    \#define TIMERSTOP(label) \
        auto label#\#_end = std::chrono::high_resolution_clock::now(); \
        std::cout << \#label << ": " \
                  << std::chrono::duration_cast<std::chrono::milliseconds>(label#\#_end - label##_start).count() \
                  << " ms" << std::endl;
    
    \#endif // HPC_HELPERS_HPP
    ```
    

### 动态循环分发

在上面静态循环分发时,我们预先知道每个任务明确的时间依赖函数$T(i)$. 具体来说我们提前知道了计算任务相邻行的计算负载差距不大.如果我们不能在程序开始时就正确地估计任务的运行时间，负载不均衡会变得更糟

**分支定界（branch-and-bound）算法**是一种用于解决组合优化问题的算法框架。它通过构建解的搜索树来有效地寻找最优解。以下是一些关键概念和应用：

1. **分支**：将问题分解成更小的子问题，形成一个搜索树。
2. **界限**：计算每个子问题的上界和下界，以确定该子问题是否有可能产生比当前已知最优解更好的解。
3. **回溯**：如果发现某个子问题的界限不可能比当前最优解好，就可以“剪枝”该子问题，避免进一步的计算。
4. **独立任务**：由于某些任务的执行时间可能变化很大，分支定界算法常常会将它们划分为独立的任务，从而实现并行计算，提高效率。

**动态分发如何实现**

1. **全局索引**：
    - 使用一个全局变量 `**global_lower**` 来跟踪已处理的行的起始位置。每个线程在处理时会读取并更新这个变量，从而获取待处理的行。
2. **互斥锁**：
    - 使用 `**std::mutex**` 和 `**std::lock_guard**` 来保护对 `**global_lower**` 的访问，确保在多个线程同时读取和更新该变量时不会出现数据竞争和不一致的问题。
3. **动态分配任务**：
    - 在线程处理任务时，每个线程通过 `**lower**` 获取需要处理的行区间。每个线程在完成一个块的计算后，立即从 `**global_lower**` 获取新的块，继续处理。这种方式允许线程在执行过程中根据实际工作负载动态获取任务，而不是在一开始就静态分配所有任务。

**动态调度的优点**

1. **负载均衡**：
    - 由于任务是动态分配的，工作量可以根据每个线程的执行时间和能力进行调整。较慢的线程可以获取更多的任务，而较快的线程可以继续处理新的任务，从而更好地平衡负载。
2. **提高资源利用率**：
    - 由于不同的数据块可能会花费不同的时间来处理，动态调度可以确保所有线程都被有效利用，减少了空闲时间。
3. **适应性**：
    - 当数据集的大小、计算复杂度或系统负载变化时，动态调度能更好地适应这些变化，提高整体性能。
4. **减少线程间的静态依赖**：
    - 通过动态分配，线程不需要在开始时知道自己将处理多少数据，这减少了对于任务数量的硬编码，从而提高了代码的灵活性和可维护性。

In [None]:
// dynamic_para_all_pair.hpp
\#include <mutex> // for std::mutex, std::lock_guard

template <typename index_t, typename value_t>
void dynamic_all_pairs(
    std::vector<value_t>& mnist,
    std::vector<value_t>& all_pair,
    index_t rows,
    index_t cols,
    index_t num_threads = 64,
    index_t chunk_size = 64 / sizeof(value_t)) {
    
    std::mutex mutex; // Declare mutex
    index_t global_lower = 0; // Global lower index

    auto dynamic_block_cyclic = [&]() -> void {
        index_t lower = 0;

        // While there are still rows to compute
        while (true) {
            {
                std::lock_guard<std::mutex> lock_guard(mutex); // Lock the mutex
                lower = global_lower;
                if (lower >= rows) break; // Exit if done
                global_lower += chunk_size; // Update global lower index
            } // Release the lock automatically

            // Compute the upper border of the block (exclusive)
            const index_t upper = std::min(lower + chunk_size, rows);

            // For all entries below the diagonal (i ≤ j)
            for (index_t i = lower; i < upper; i++) {
                for (index_t j = 0; j <= i; j++) {
                    // Compute squared Euclidean distance
                    value_t accum = value_t(0);
                    for (index_t k = 0; k < cols; k++) {
                        value_t residue = mnist[i * cols + k] - mnist[j * cols + k];
                        accum += residue * residue;
                    }
                    // Write the distances to the all_pair matrix
                    all_pair[i * rows + j] = accum;
                    all_pair[j * rows + i] = accum; // Symmetric
                }
            }
        }
    };

    // Launch threads
    std::vector<std::thread> threads;
    for (index_t id = 0; id < num_threads; id++) {
        threads.emplace_back(dynamic_block_cyclic);
    }

    // Join threads
    for (auto& thread : threads) {
        thread.join();
    }
}


## 条件变量通知线程

### 基本概念与流程

**竞争-睡眠策略**

“竞争-睡眠”方法是一种用于优化计算资源使用的策略，尤其在并行和多线程计算环境中。

1. **竞争**：
    
    在“竞争”阶段，多个线程同时尝试访问共享资源或执行任务。它们争夺CPU时间、内存和其他计算资源。
    
2. **睡眠**：
    
    一旦线程完成其任务或发现没有可用资源可供使用，它们就进入“睡眠”状态。在此状态下，线程不再消耗CPU资源，而是等待其他事件（例如资源变得可用或任务完成）唤醒
    

**条件变量**

**信号与状态**：

条件变量允许一个线程（或多个线程）在某个条件不满足时进行等待，并在另一个线程改变了这个条件后发送信号。当条件满足时，等待的线程被唤醒，开始执行。

**共享状态变量**：

条件变量通常与一个或多个共享状态变量结合使用。这些变量用来指示当前的工作状态或条件。例如，线程在接收到条件变量的信号时，需要检查这些共享变量来决定是否执行相应的任务。

**虚假唤醒**：

线程在等待条件变量时，有可能会经历虚假唤醒，即线程在没有条件满足的情况下被唤醒。这意味着必须在**循环中**检查条件，以确保在被唤醒后线程的状态是有效的。

In [None]:
\#include <iostream> // for std::cout
\#include <thread>   // for std::thread
\#include <mutex>    // for std::mutex
\#include <chrono>   // for std::chrono
\#include <condition_variable> // for std::condition_variable

using namespace std::chrono_literals;

int main() {
    std::mutex mutex;
    std::condition_variable cv;
    bool time_for_breakfast = false; // globally shared state

    // Function to be called by the thread
    auto student = [&]() -> void {
        { // this is the scope of the lock
            std::unique_lock<std::mutex> unique_lock(mutex);
            // Check the globally shared state
            while (!time_for_breakfast) {//防止虚假唤醒
                // Lock is released during wait
                cv.wait(unique_lock);
            } // lock is finally released
        }
        std::cout << "Time to make some coffee!" << std::endl;
    };

    // Create the waiting thread
    std::thread my_thread(student);

    // Wait for 2 seconds
    std::this_thread::sleep_for(2s);

    { // Prepare the alarm clock
        std::lock_guard<std::mutex> lock_guard(mutex);
        time_for_breakfast = true; // Change the state to signal it's time for breakfast
    } // Here the lock is released

    // Ring the alarm clock
    cv.notify_one();

    // Wait until breakfast is finished
    my_thread.join();

    return 0;
}

In [None]:
\#include <iostream> // for std::cout
\#include <thread>   // for std::thread
\#include <mutex>    // for std::mutex
\#include <chrono>   // for std::this_thread::sleep_for
\#include <condition_variable> // for std::condition_variable

using namespace std::chrono_literals;

int main() {
    std::mutex mutex;
    std::condition_variable cv;
    bool is_ping = true; // globally shared state

    // Ping thread function
    auto ping = [&]() -> void {
        while (true) {
            // Wait to be signaled
            std::unique_lock<std::mutex> unique_lock(mutex);
            cv.wait(unique_lock, [&]() { return is_ping; });//wait 会在每次被唤醒时（无论是被通知还是超时）重新检查第二个参数中的条件。
            // Print "ping" to the command line
            std::this_thread::sleep_for(100ms); // Simulate work
            std::cout << "ping" << std::endl;
            // Alter state and notify other thread
            is_ping = false;
            cv.notify_one();
        }
    };

    // Pong thread function
    auto pong = [&]() -> void {
        while (true) {
            // Wait to be signaled
            std::unique_lock<std::mutex> unique_lock(mutex);
            cv.wait(unique_lock, [&]() { return !is_ping; });
            // Print "pong" to the command line
            std::this_thread::sleep_for(100ms); // Simulate work
            std::cout << "pong" << std::endl;
            // Alter state and notify other thread
            is_ping = true;
            cv.notify_one();
        }
    };

    std::thread ping_thread(ping);
    std::thread pong_thread(pong);

    ping_thread.join();
    pong_thread.join();

    return 0;
}

### 使用 future 和 promise 单发同步

回忆4.2中的部分:promise是可写数据类型,future是只读视角,通过填充promise可以让future的阻塞被释放.从而实现单步通知线程的效果.

**一次通知多个线程**

你可以使用 `**std::shared_future**` 来创建一个可以被多个线程共享的未来对象。

In [None]:
\#include <iostream> // for std::cout
\#include <thread> // for std::thread
\#include <future> // for std::future, std::promise
\#include <chrono> // for std::this_thread::sleep_for

using namespace std::chrono_literals;

int main() {
    // Create promise and get corresponding shared future
    std::promise<void> promise;
    auto shared_future = promise.get_future().share();

    // To be called by one or more threads
    auto students = [&]() -> void {
        // Blocks until promise is fulfilled
        shared_future.get();
        std::cout << "Time to make coffee!" << std::endl;
    };

    // Create waiting threads
    std::thread my_thread0(students);
    std::thread my_thread1(students);

    // Simulate a delay of 2 seconds
    std::this_thread::sleep_for(2s);

    // Notify all waiting threads
    promise.set_value();

    // Wait until both threads finish
    my_thread0.join();
    my_thread1.join();

    return 0;
}

## 隐式可数集合上的并行化

### 隐式可数集合

如果任务的个数不是预先已知的，或者任务数非常大，无法一次性将整个图装入内存，那么需要动态地处理这些任务。

处理任务之间的依赖关系通常需要用到**拓扑排序**（Topological Sort），特别是在有向无环图（DAG）中。这种算法将图的节点按依赖关系排序，使得每个节点的依赖都排在它之前。

### **问题背景与挑战**

在 Web 服务器中，每个请求可能需要耗费不同的处理时间，有时请求的处理时间非常长，而有时请求的处理时间较短。如果为每个请求创建一个新的线程，可能会导致以下问题：

- **线程过多**：每个请求都创建一个新的线程，可能导致系统资源的严重浪费，特别是当请求非常频繁时，系统无法处理如此大量的线程。
- **性能下降**：线程创建和销毁本身是有成本的，频繁创建和销毁线程可能导致上下文切换频繁，从而降低性能。
- **拒绝服务攻击（DOS）**：如果恶意用户向服务器发送大量无效请求，服务器的资源可能被耗尽，导致服务不可用。

因此，为了避免这些问题，需要采用**线程池**，它允许服务器在固定数量的线程中调度任务，这样可以有效控制资源的使用，同时确保服务器能够高效地处理大量请求。

### **线程池方案**

线程池是一种预先创建多个线程，并将任务（例如处理 Web 请求）分配给这些线程的技术。通过这种方式，服务器不需要为每个请求都创建一个新的线程，而是重用线程池中的线程来处理多个任务。

> [!important]
> 
> - **线程池的工作流程**：
>     1. 线程池在初始化时创建一定数量的线程。
>     2. 当有新的请求到达时，线程池中的线程会被分配去处理请求。
>     3. 线程完成任务后，回到线程池中，等待下一个任务。
>     4. 如果线程池中的线程都在忙碌，新的请求会被放入任务队列中，直到有线程空闲。
> - **处理 DOS 攻击**： 为了避免恶意的 DOS 攻击，我们可以为每个任务设置一个**时间戳**，表示请求的最大处理时间。如果任务没有在规定的时间内完成，线程池可以丢弃这个任务，防止它阻塞其他任务的处理。

[[具体代码-gpt]]

### 实现一个线程池

In [None]:
\#ifndef THREADPOOL_HPP
\#define THREADPOOL_HPP

\#include <cstdint>
\#include <future>
\#include <vector>
\#include <queue>
\#include <thread>
\#include <functional>
\#include <mutex>
\#include <condition_variable>

// 线程池类定义
class ThreadPool {
private:
    // 存储线程和任务
    std::vector<std::thread> threads; // 工作线程
    std::queue<std::function<void()>> tasks; // 任务队列

    // 用于线程间同步的原语
    std::mutex mutex; // 互斥锁
    std::condition_variable cv; // 条件变量

    // 线程池的状态
    bool stop_pool; // 指示线程池是否停止
    uint32_t active_threads; // 当前活动线程数量
    const uint32_t capacity; // 线程池的容量

In [None]:
template <
    typename Func,
    typename... Args,
    typename Rtrn = typename std::result_of<Func(Args...)>::type>
auto make_task(Func&& func, Args&&... args) -> std::packaged_task<Rtrn(void)> {
    auto aux = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
    return std::packaged_task<Rtrn(void)>(aux);
}

// 任务执行前的钩子
void before_task_hook() {
    active_threads++; 
}

// 任务执行后的钩子
void after_task_hook() {
    active_threads--; 
}

- 代码解释
    
    ### **1.** `**make_task**` **函数模板**
    
    ```C++
    template <
        typename Func,
        typename... Args,
        typename Rtrn = typename std::result_of<Func(Args...)>::type>
    auto make_task(Func&& func, Args&&... args) -> std::packaged_task<Rtrn(void)> {
    ```
    
    - **模板参数**：
        - `**Func**` 是函数类型，表示需要执行的任务。
        - `**Args...**` 是参数包，表示传递给 `**func**` 的参数类型。
        - `**Rtrn**` 是 `**func**` 执行后的返回类型，由 `**std::result_of<Func(Args...)>::type**` 推导得出。
            - `**std::result_of**` 是 C++11 中的模板工具，可以根据函数类型 `**Func**` 和参数类型 `**Args...**` 推导出函数的返回类型。
    - **函数参数**：
        - `**Func&& func**` 是一个右值引用，用于传递可调用对象。
        - `**Args&&... args**` 是参数包右值引用，用于传递可调用对象的参数，支持完美转发。
    - **返回类型**：
        - `**std::packaged_task<Rtrn(void)>**`，表示返回一个封装了 `**func**` 和 `**args...**` 的 `**std::packaged_task**` 对象。`**std::packaged_task**` 是一个封装异步任务的类，允许在执行完任务后获取其结果。
    
    ### **2.** `**aux**` **中间变量**
    
    ```C++
    auto aux = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
    ```
    
    - `**std::bind**` 会将 `**func**` 和传递的 `**args...**` 参数绑定在一起，返回一个可调用对象（`**aux**`）。
    - 使用 `**std::forward**` 进行完美转发，确保参数按照传入的类型进行绑定（左值保持左值，右值保持右值）。
    - `**aux**` 是一个无参的可调用对象，在 `**std::packaged_task**` 中使用时，它会自动调用 `**func(args...)**`。
    
    ### **3. 创建并返回** `**std::packaged_task**`
    
    ```C++
    return std::packaged_task<Rtrn(void)>(aux);
    ```
    
    - 使用 `**aux**` 初始化一个 `**std::packaged_task**` 对象。
    - `**std::packaged_task<Rtrn(void)>**` 表示这个任务没有参数（`**void**`），返回类型为 `**Rtrn**`。
    - 这个 `**std::packaged_task**` 对象封装了任务 `**func(args...)**` 的执行，允许异步获取执行结果。


In [None]:
public:
    ThreadPool(uint64_t capacity)
        : stop_pool(false), active_threads(0), capacity(capacity) 
    {
        // 工作线程的循环函数
        auto wait_loop = [this]() -> void {
            while (true) {
                // 占位符任务
                std::function<void(void)> task;
                
                // 进入锁定区域，等待任务或停止信号
                {
                    std::unique_lock<std::mutex> unique_lock(mutex);
                    auto predicate = [this]() -> bool {
                        // 唤醒条件：线程池停止或任务队列不为空
                        return stop_pool || !tasks.empty();
                    };

                    // 等待唤醒，直到满足唤醒条件
                    cv.wait(unique_lock, predicate);

                    // 如果线程池停止且没有任务可执行，退出线程
                    if (stop_pool && tasks.empty()) return;

                    // 从任务队列中取出一个任务
                    task = std::move(tasks.front());
                    tasks.pop();

                    // 任务计数增加
                    before_task_hook();
                } // 离开锁定区域，允许其他线程访问任务队列

                // 执行任务
                task();

                // 任务完成后，调整活动线程计数
                {
                    std::lock_guard<std::mutex> lock_guard(mutex);
                    after_task_hook();
                }
            }
        };
        // 创建并启动指定数量的线程
        for (uint64_t id = 0; id < capacity; id++) {
            threads.emplace_back(wait_loop);
        }
    }     

In [None]:
~ThreadPool() {
    {
        // 加锁以确保线程池的安全访问
        std::lock_guard<std::mutex> lock_guard(mutex);
        // 修改线程池状态以停止所有线程
        stop_pool = true;
    } // 这里释放锁

    // 通知所有等待的线程停止
    cv.notify_all();

    // 等待所有线程完成工作并退出
    for (auto& thread : threads) {
        if (thread.joinable()) {
            thread.join(); // 确保线程安全退出
        }
    }
}

In [None]:
template <
		typename Func, 
		typename... Args, 
		typename Rtrn = typename std::result_of<Func(Args...)>::type>
auto enqueue(Func&& func, Args&&... args) -> std::future<Rtrn> {
    // 创建任务并获取任务的 future
    auto task = make_task(func,args...);
    auto future = task.get_future();
    
    // 将任务封装到 shared_ptr 中，便于复制到任务队列
    auto task_ptr = std::make_shared<std::packaged_task<Rtrn()>>(std::move(task));

    {
        // 加锁以确保线程安全
        std::lock_guard<std::mutex> lock_guard(mutex);

        // 如果线程池已经停止，禁止添加新任务
        if (stop_pool) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }

        // 将任务包装成 void 函数对象，便于存储到任务队列中
        auto payload = [task_ptr]() -> void {
            (*task_ptr)(); // 调用任务
        };

        // 将任务添加到任务队列
        tasks.emplace(payload);
    }

    // 通知一个线程去处理任务
    cv.notify_one();

    // 返回 future 以便获取任务结果
    return future;
}

};// 类的括号闭合
\#endif

---

[[并行程序设计]]