# Python 并发编程笔记

本笔记系统整理 Python 并发相关核心概念与实践：**进程 (multiprocessing)、线程 (threading)、协程 (asyncio)**，以及 **同步 vs 异步、阻塞 vs 非阻塞**、GIL、适用场景、常见原语、性能与陷阱、调试与优化策略。全部示例使用标准库，便于直接运行。

> 快速结论：
> - CPU 密集：优先多进程 (绕过 GIL) 或 C 扩展
> - IO 密集：线程池 或 asyncio (大量并发连接/等待)
> - 简单并行 map：concurrent.futures (统一接口)
> - 大量高并发、少阻塞：asyncio；需要生态 (HTTP/DB) 时选支持异步的库
> - 混合场景：主结构 asyncio，CPU 子任务 run_in_executor


## 1. 并发 vs 并行 vs 异步 概念速览
- 并发 (Concurrency)：逻辑上交错执行 (调度切换)
- 并行 (Parallelism)：物理上同时执行 (多核)
- 同步：调用发起后等待完成再继续
- 异步：调用发起立即返回，通过回调/未来对象/事件循环获取结果
- 阻塞：当前执行流无法继续 (等待 IO/锁)
- 非阻塞：立即返回 (可能只返回状态/需轮询)

Python 解释器层面：GIL (Global Interpreter Lock) 在 **CPython** 中保证同一时刻只有一个线程执行字节码，影响多线程 CPU 绑定任务，但 IO 密集不显著。


## 2. GIL 是什么 & 影响
- 目的：简化内存管理 (对象引用计数)
- 后果：多线程无法在单进程内真正利用多核进行 Python 计算 (CPU 密集)
- IO 密集：线程等待 IO 释放 GIL，其他线程可继续
- 规避：多进程、C 扩展、NumPy (内部释放 GIL)、PyPy (部分差异)、或用异步 IO
- 误区：
  - "多线程一定更快" (CPU 密集反而慢)
  - "协程能提高 CPU 计算速度" (协程主要减少 IO 等待切换开销)


## 3. threading 基础示例
适合：IO 密集，少量并发 (<1000)，代码简单。
风险：共享状态竞争、死锁。使用 Lock/RLock。

In [1]:
import threading, time
counter = 0
lock = threading.Lock()

def unsafe_inc(n):
    global counter
    for _ in range(n):
        # 演示竞争：不加锁可能丢失更新
        tmp = counter
        tmp += 1
        counter = tmp

def safe_inc(n):
    global counter
    for _ in range(n):
        with lock:
            counter += 1

def run(func, thread_count=10, each=10000):
    global counter
    counter = 0
    threads = [ threading.Thread(target=func, args=(each,)) for _ in range(thread_count) ]
    start = time.perf_counter()
    for t in threads: t.start()
    for t in threads: t.join()
    elapsed = time.perf_counter() - start
    print(func.__name__, 'counter=', counter, 'elapsed=', round(elapsed, 4))

run(unsafe_inc)  # 理论应=100000
run(safe_inc)    # 正确值=100000

unsafe_inc counter= 100000 elapsed= 0.0093
safe_inc counter= 100000 elapsed= 0.0209


### 3.1 常用同步原语
- Lock / RLock：互斥 (RLock 可重入)
- Semaphore：限制并发数量 (资源池)
- Event：线程间信号触发
- Condition：等待特定条件，配合锁
- Queue：线程安全队列 (推荐传递工作)


In [2]:
import threading, queue, time
q = queue.Queue()
STOP = object()

def producer():
    for i in range(5):
        q.put(i)
    q.put(STOP)

def consumer():
    while True:
        item = q.get()
        if item is STOP:
            break
        print('consume', item)
        time.sleep(0.1)

th1 = threading.Thread(target=producer)
th2 = threading.Thread(target=consumer)
th1.start(); th2.start(); th1.join(); th2.join()


consume 0
consume 1
consume 2
consume 3
consume 4


## 4. multiprocessing 基础示例
适合：CPU 密集或需要真正多核。
注意：Windows 必须放在 `if __name__ == '__main__':` 下，否则无法安全 spawn 进程。
通信：Queue / Pipe / Manager / shared memory。开销：进程启动与序列化 (pickle)。


In [None]:
from multiprocessing import Process, Queue, cpu_count
import math, time

def heavy(n):
    # 计算密集：近似求 pi 的一部分
    s = 0.0
    for k in range(n):
        s += 4.0 * (-1)**k / (2*k + 1)
    return s

def worker(n, q):
    q.put(heavy(n))

def run_mp():
    q = Queue()
    tasks = [2000000] * cpu_count()
    ps = [Process(target=worker, args=(t, q)) for t in tasks]
    start = time.perf_counter()
    [p.start() for p in ps]
    results = [q.get() for _ in ps]
    [p.join() for p in ps]
    print('multi-process took', round(time.perf_counter() - start, 2), 'results sample', results[0])

if __name__ == '__main__':
    run_mp()


### 4.1 concurrent.futures 统一接口
提供 ThreadPoolExecutor / ProcessPoolExecutor：submit / map / as_completed 简化使用。

In [None]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time

def io_task(x):
    time.sleep(0.2)
    return x * 2

def cpu_task(x):
    s = 0
    for i in range(200000):
        s += i * x
    return s

def demo_futures():
    with ThreadPoolExecutor(max_workers=5) as tp:
        futures = [tp.submit(io_task, i) for i in range(10)]
        for f in as_completed(futures):
            print('thread result', f.result())
    with ProcessPoolExecutor() as pp:
        for res in pp.map(cpu_task, range(5)):
            print('process result sample', res)

demo_futures()


## 5. asyncio 协程与事件循环
场景：海量并发 IO (网络、磁盘、sleep)；减少线程上下文切换和内存。核心：事件循环 + Task + Future。
协程：`async def` 定义；切换点：`await`。
调度：单线程内可挂起与恢复。
网络库：aiohttp、httpx(异步)、asyncpg、aioredis 等。

In [None]:
import asyncio, time

async def fake_io(idx):
    await asyncio.sleep(0.2)
    return idx * 10

async def main():
    start = time.perf_counter()
    tasks = [asyncio.create_task(fake_io(i)) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print('async results', results)
    print('elapsed', round(time.perf_counter() - start, 3))

asyncio.run(main())


### 5.1 使用 Semaphore 控制并发数量


In [None]:
import asyncio, random
sem = asyncio.Semaphore(3)  # 限流

async def limited_task(i):
    async with sem:
        await asyncio.sleep(random.uniform(0.1,0.3))
        print('done', i)

async def run_all():
    await asyncio.gather(*[limited_task(i) for i in range(10)])

asyncio.run(run_all())


### 5.2 在 asyncio 中调用阻塞 CPU 任务
使用 `loop.run_in_executor` 或 `asyncio.to_thread` (3.9+) 避免阻塞事件循环。

In [None]:
import asyncio, math
def cpu_blocking(n):
    s = 0
    for i in range(n):
        s += math.sqrt(i)
    return s

async def wrapper():
    # 将阻塞函数移动到线程池 (默认)
    result = await asyncio.to_thread(cpu_blocking, 100000)
    print('cpu result', round(result, 2))

asyncio.run(wrapper())


## 6. 同步原语对比 (线程 vs 协程)
| 原语 | threading | asyncio | 说明 |
|------|----------|---------|------|
| Lock | Lock/RLock | `asyncio.Lock` | 协程锁需 `async with` |
| Semaphore | Semaphore | `asyncio.Semaphore` | 控制并发 |
| Event | Event | `asyncio.Event` | 协程间信号 |
| Queue | queue.Queue | `asyncio.Queue` | 非阻塞 await |
| Condition | Condition | `asyncio.Condition` | 复杂等待 |


## 7. CPU vs IO 场景对比简易基准 (示意)
避免实际执行时间过长，下面缩小规模示例：

In [None]:
import time, threading, asyncio

def fake_io_block(n):
    time.sleep(0.1)
    return n

def thread_test():
    start = time.perf_counter()
    res = []
    def run(n): res.append(fake_io_block(n))
    ts = [threading.Thread(target=run, args=(i,)) for i in range(20)]
    [t.start() for t in ts]; [t.join() for t in ts]
    print('threads elapsed', round(time.perf_counter() - start, 3))

async def async_test():
    start = time.perf_counter()
    async def aio(n):
        await asyncio.sleep(0.1)
        return n
    await asyncio.gather(*[aio(i) for i in range(20)])
    print('async elapsed', round(time.perf_counter() - start, 3))

thread_test()
asyncio.run(async_test())


## 8. 常见陷阱
- 死锁：锁嵌套顺序不一致 -> 避免循环等待
- 竞争条件：共享变量未保护 -> 使用锁或队列
- 资源泄漏：线程/进程不 join -> 主进程退出混乱
- 过度线程：几千阻塞线程 -> 内存与调度开销高
- 协程中调用阻塞 IO (如 requests)：阻塞事件循环 -> 换异步库或 run_in_executor
- multiprocessing 在 Windows 未加   护 -> 无限递归创建进程
- 大对象频繁跨进程传递 -> 序列化瓶颈
- asyncio 忘记 `await` -> 协程对象未执行
- 使用 time.sleep() 代替 asyncio.sleep() -> 阻塞 loop


## 9. 调试与诊断
- threading.enumerate() 查看活跃线程
- faulthandler / logging 跟踪死锁位置
- asyncio.run() 外层只调用一次，内部使用 `create_task` 管理
- 使用 `asyncio.Task.get_coro()` 查看任务来源 (3.10+)
- profile：cProfile + snakeviz；IO 分析用系统工具 (netstat, lsof)
- 超时：`asyncio.wait_for` / futures.result(timeout=) 防止永久挂起


## 10. 第三方生态简述 (不深入示例)
| 类别 | 库 | 特点 |
|------|----|------|
| 异步 HTTP | aiohttp / httpx | 客户端/服务端支持 |
| 异步 DB | asyncpg / aiomysql / sqlalchemy(async) | 高并发数据库访问 |
| 协程框架 | trio / curio | 不同调度模型，结构化并发 |
| 协程补丁 | gevent / eventlet | 通过 monkey patch 让阻塞 IO 变协作式 |
| 分布式任务 | Celery / RQ / Dramatiq | 进程池 + 队列异步执行 |


## 11. 选择策略速查
| 场景 | 推荐 | 原因 |
|------|------|------|
| CPU 密集 (计算/压缩) | multiprocessing / C 扩展 | 多核 / 释放 GIL |
| IO 密集 少量并发 | ThreadPoolExecutor | 简单、API 友好 |
| IO 密集 海量并发 | asyncio | 单线程调度 + 低开销 |
| 简单批量 map | concurrent.futures.map | 最少样板代码 |
| 混合 (网络 + CPU) | asyncio + run_in_executor | 分离 IO/CPU |
| 多进程通信 | multiprocessing.Queue/Manager | 共享/传递数据 |
| 需要取消任务 | asyncio (Task.cancel) | 协程取消语义完备 |


## 12. 总结
- 明确瓶颈类型：先 profile 再选工具
- 线程适度使用，避免过量阻塞堆积
- 协程不是魔法：只加速大量等待；要使用异步库栈
- CPU 重任务不要强行线程/协程，选进程或本地优化 (NumPy/向量化)
- 正确使用同步原语，优先消息传递 (Queue) 替代共享变量
- 设计可取消、可超时的并发任务，避免僵尸运行
- 渐进迁移：先抽象接口，再替换执行模型 (同步->异步)
