# Python Generator（生成器）实践指南

Generator 是 Python 的内存高效迭代工具，使用 `yield` 关键字**惰性生成**数据，而非一次性加载到内存。

## 1. 基础对比：列表 vs 生成器

In [1]:
# 列表：一次性生成所有数据，占用内存
def squares_list(n):
    return [i**2 for i in range(n)]

# 生成器：按需生成，内存占用 O(1)
def squares_gen(n):
    for i in range(n):
        yield i**2

# 比较内存占用
import sys
n = 1_000_000
print(f"列表占用: {sys.getsizeof(squares_list(n)) / 1024:.1f} KB")
print(f"生成器占用: {sys.getsizeof(squares_gen(n)) / 1024:.1f} KB")

列表占用: 8250.7 KB
生成器占用: 0.2 KB


## 2. 生成器表达式（Generator Expression）

In [2]:
# 列表推导式
squares_list = [x**2 for x in range(5)]
print(f"列表: {squares_list}")  # [0, 1, 4, 9, 16]

# 生成器表达式（用圆括号）
squares_gen = (x**2 for x in range(5))
print(f"生成器对象: {squares_gen}")  # <generator object>
print(f"逐个生成: {list(squares_gen)}")  # [0, 1, 4, 9, 16]

列表: [0, 1, 4, 9, 16]
生成器对象: <generator object <genexpr> at 0x1065a1e50>
逐个生成: [0, 1, 4, 9, 16]


## 3. 实践案例：批量数据处理

In [3]:
def read_large_file(filepath):
    """逐行读取大文件（避免一次性加载）"""
    with open(filepath) as f:
        for line in f:
            yield line.strip()

def batch_generator(iterable, batch_size):
    """将数据流分批（类似 DataLoader）"""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # 处理最后不满一批的数据
        yield batch

# 模拟大数据集处理
data_stream = (x for x in range(25))  # 模拟 25 条数据
for batch in batch_generator(data_stream, batch_size=8):
    print(f"处理批次: {batch}")

处理批次: [0, 1, 2, 3, 4, 5, 6, 7]
处理批次: [8, 9, 10, 11, 12, 13, 14, 15]
处理批次: [16, 17, 18, 19, 20, 21, 22, 23]
处理批次: [24]


## 4. 高级技巧：双向通信（send & yield）

In [4]:
def running_average():
    """生成器计算滑动平均（接收外部输入）"""
    total = 0
    count = 0
    avg = None
    while True:
        value = yield avg  # 返回当前平均值，接收新值
        total += value
        count += 1
        avg = total / count

# 使用示例
avg_gen = running_average()
next(avg_gen)  # 启动生成器
print(f"输入 10: {avg_gen.send(10)}")  # 10.0
print(f"输入 20: {avg_gen.send(20)}")  # 15.0
print(f"输入 30: {avg_gen.send(30)}")  # 20.0

输入 10: 10.0
输入 20: 15.0
输入 30: 20.0


## 5. 核心要点总结

| 特性 | 列表 | 生成器 |
|------|------|--------|
| **内存占用** | O(n) | O(1) |
| **计算时机** | 立即全部计算 | 按需惰性计算 |
| **可重复迭代** | ✅ 是 | ❌ 否（一次性消耗）|
| **适用场景** | 小数据集、需多次访问 | 大数据流、管道处理 |

**关键语法**：
- `yield` — 暂停函数并返回值，下次从暂停处继续
- `next(gen)` — 手动获取下一个值
- `gen.send(value)` — 向生成器发送数据（双向通信）
- `(expr for x in iterable)` — 生成器表达式（类似列表推导式）

# PyTorch 多进程并行（torch.multiprocessing）实践指南

`torch.multiprocessing` 是 PyTorch 对 Python `multiprocessing` 的封装，提供了**跨进程共享 Tensor** 的能力。在数据加载（DataLoader）和分布式训练中广泛使用。

**核心特性**：
- 与 Python `multiprocessing` API 兼容
- 支持 CUDA Tensor 跨进程共享（通过共享内存）
- 提供 `spawn` 启动方式（避免 CUDA 上下文 fork 问题）

## Task 1: 进程池基础 — 并行打印与进程标识

**注意**：Jupyter notebook 无法直接运行 `multiprocessing`（序列化限制）。我们使用 `%%writefile` 创建 .py 脚本运行。

In [7]:
%%writefile task1_multiprocessing.py
import torch.multiprocessing as mp
import os
import time

def worker_print(worker_id):
    """每个 worker 打印 1-5，并显示进程 PID"""
    pid = os.getpid()
    for i in range(1, 6):
        print(f"[Worker {worker_id}] PID={pid} | 计数={i}")
        time.sleep(0.1)  # 模拟耗时操作

if __name__ == '__main__':
    # 创建并行度为 3 的进程池
    num_workers = 3
    processes = []
    
    print(f"主进程 PID={os.getpid()}\n")
    
    # 启动 3 个子进程
    for i in range(num_workers):
        p = mp.Process(target=worker_print, args=(i,))
        p.start()
        processes.append(p)
    
    # 等待所有进程完成
    for p in processes:
        p.join()
    
    print("\n所有 worker 执行完毕！")

Writing task1_multiprocessing.py


In [8]:
# 运行 Task 1
!python task1_multiprocessing.py

  import pynvml  # type: ignore[import]
主进程 PID=23520

  import pynvml  # type: ignore[import]
  import pynvml  # type: ignore[import]
  import pynvml  # type: ignore[import]
[Worker 1] PID=23552 | 计数=1
[Worker 2] PID=23553 | 计数=1
[Worker 0] PID=23551 | 计数=1
[Worker 0] PID=23551 | 计数=2[Worker 1] PID=23552 | 计数=2[Worker 2] PID=23553 | 计数=2


[Worker 0] PID=23551 | 计数=3
[Worker 1] PID=23552 | 计数=3
[Worker 2] PID=23553 | 计数=3
[Worker 0] PID=23551 | 计数=4
[Worker 1] PID=23552 | 计数=4
[Worker 2] PID=23553 | 计数=4
[Worker 1] PID=23552 | 计数=5
[Worker 0] PID=23551 | 计数=5
[Worker 2] PID=23553 | 计数=5

所有 worker 执行完毕！


**关键要点**：
- `mp.Process(target=func, args=())` — 创建子进程
- `p.start()` — 启动进程（异步执行）
- `p.join()` — 主进程等待子进程结束
- `os.getpid()` — 获取当前进程 PID

**Jupyter 限制说明**：
- `multiprocessing` 使用 `pickle` 序列化函数传递给子进程
- Jupyter notebook 中定义的函数无法被序列化（`__main__` 模块问题）
- 解决方案：使用 `%%writefile` 创建独立 .py 文件，然后用 `!python` 运行

## Task 2: 进程池通信 — 使用 Queue 传递计算结果

同样使用 `%%writefile` 创建脚本，演示主进程与 worker 之间的双向通信。

In [10]:
%%writefile task2_multiprocessing.py
import torch.multiprocessing as mp
import os
import time

def worker_compute(worker_id, task_queue, result_queue):
    """从任务队列获取数据，计算后将结果放入结果队列"""
    pid = os.getpid()
    print(f"[Worker {worker_id}] PID={pid} 启动")
    
    while True:
        try:
            # 从任务队列获取任务（超时 1 秒）
            task = task_queue.get(timeout=1)
            if task is None:  # 收到结束信号
                print(f"[Worker {worker_id}] 收到结束信号，退出")
                break
            
            # 执行计算任务
            x = task
            result = x ** 2
            time.sleep(0.2)  # 模拟耗时计算
            
            # 将结果放入结果队列
            result_queue.put({
                'worker_id': worker_id,
                'pid': pid,
                'input': x,
                'output': result
            })
            print(f"[Worker {worker_id}] 完成任务: {x}^2 = {result}")
            
        except:
            break  # 队列为空，退出

if __name__ == '__main__':
    # 创建任务队列和结果队列
    task_queue = mp.Queue()
    result_queue = mp.Queue()
    
    # 准备 10 个任务
    tasks = list(range(1, 11))
    for task in tasks:
        task_queue.put(task)
    
    # 启动 3 个 worker 进程
    num_workers = 3
    processes = []
    
    print(f"主进程 PID={os.getpid()}\n")
    
    for i in range(num_workers):
        p = mp.Process(target=worker_compute, args=(i, task_queue, result_queue))
        p.start()
        processes.append(p)
    
    # 发送结束信号（每个 worker 一个 None）
    for _ in range(num_workers):
        task_queue.put(None)
    
    # 等待所有进程完成
    for p in processes:
        p.join()
    
    # 从结果队列收集所有结果
    print("\n主进程收集结果：")
    results = []
    while not result_queue.empty():
        result = result_queue.get()
        results.append(result)
        print(f"  Worker {result['worker_id']} (PID={result['pid']}): "
              f"{result['input']}^2 = {result['output']}")
    
    print(f"\n共完成 {len(results)} 个任务")

Overwriting task2_multiprocessing.py


In [None]:
# 运行 Task 2
!python task2_multiprocessing.py

## Task 3: 流水线并行 — 专用收集 Worker 实现异步处理

**改进点**：Task 2 中主进程需要等待所有计算完成才能收集结果。Task 3 创建专用的收集 worker，实现**计算与收集的流水线并行**。

In [12]:
%%writefile task3_pipeline_multiprocessing.py
import torch.multiprocessing as mp
import os
import time

def worker_compute(worker_id, task_queue, result_queue):
    """计算 worker：从任务队列获取任务，计算后发送到结果队列"""
    pid = os.getpid()
    print(f"[计算Worker {worker_id}] PID={pid} 启动")
    
    while True:
        try:
            task = task_queue.get(timeout=1)
            if task is None:  # 收到结束信号
                print(f"[计算Worker {worker_id}] 收到结束信号，退出")
                break
            
            # 执行耗时计算
            x = task
            result = x ** 2
            time.sleep(0.3)  # 模拟耗时计算
            
            # 立即将结果发送到结果队列（无需等待其他任务）
            result_queue.put({
                'worker_id': worker_id,
                'pid': pid,
                'input': x,
                'output': result
            })
            print(f"[计算Worker {worker_id}] 完成 {x}^2 = {result}，已发送到结果队列")
            
        except:
            break

def worker_collect(result_queue, num_tasks):
    """
    收集 worker：专门负责从结果队列收集结果并处理
    
    Args:
        result_queue: 结果队列
        num_tasks: 预期要收集的任务总数
    """
    pid = os.getpid()
    print(f"[收集Worker] PID={pid} 启动，预期收集 {num_tasks} 个结果\n")
    
    collected = []
    for i in range(num_tasks):
        # 从结果队列获取结果（阻塞等待）
        result = result_queue.get()
        collected.append(result)
        
        # 实时处理结果（例如：保存、统计、可视化等）
        print(f"[收集Worker] 收到第 {i+1}/{num_tasks} 个结果: "
              f"Worker{result['worker_id']} 计算 {result['input']}^2 = {result['output']}")
        time.sleep(0.1)  # 模拟结果处理耗时
    
    print(f"\n[收集Worker] 所有 {len(collected)} 个结果收集完毕！")
    print("=" * 50)
    print("汇总结果：")
    for r in collected:
        print(f"  {r['input']}^2 = {r['output']} (来自 Worker{r['worker_id']}, PID={r['pid']})")

if __name__ == '__main__':
    # 创建队列
    task_queue = mp.Queue()
    result_queue = mp.Queue()
    
    # 准备 10 个任务
    tasks = list(range(1, 11))
    for task in tasks:
        task_queue.put(task)
    
    num_compute_workers = 3
    num_tasks = len(tasks)
    
    print(f"主进程 PID={os.getpid()}")
    print(f"启动 {num_compute_workers} 个计算 worker + 1 个收集 worker\n")
    print("=" * 50)
    
    # 1. 启动收集 worker（与计算 worker 并行运行）
    collector = mp.Process(target=worker_collect, args=(result_queue, num_tasks))
    collector.start()
    
    # 2. 启动计算 workers
    compute_processes = []
    for i in range(num_compute_workers):
        p = mp.Process(target=worker_compute, args=(i, task_queue, result_queue))
        p.start()
        compute_processes.append(p)
    
    # 3. 发送结束信号给计算 workers
    for _ in range(num_compute_workers):
        task_queue.put(None)
    
    # 4. 等待所有进程完成
    for p in compute_processes:
        p.join()
    print("\n所有计算 worker 已退出")
    
    collector.join()
    print("收集 worker 已退出")
    
    print("\n主进程完成！")

Writing task3_pipeline_multiprocessing.py


In [13]:
# 运行 Task 3
!python task3_pipeline_multiprocessing.py

  import pynvml  # type: ignore[import]
主进程 PID=25553
启动 3 个计算 worker + 1 个收集 worker

  import pynvml  # type: ignore[import]
  import pynvml  # type: ignore[import]
  import pynvml  # type: ignore[import]
  import pynvml  # type: ignore[import]
[计算Worker 0] PID=25561 启动
[收集Worker] PID=25560 启动，预期收集 10 个结果

[计算Worker 2] PID=25563 启动
[计算Worker 1] PID=25562 启动
[计算Worker 2] 完成 2^2 = 4，已发送到结果队列[计算Worker 0] 完成 1^2 = 1，已发送到结果队列

[收集Worker] 收到第 1/10 个结果: Worker2 计算 2^2 = 4
[计算Worker 1] 完成 3^2 = 9，已发送到结果队列
[收集Worker] 收到第 2/10 个结果: Worker0 计算 1^2 = 1
[收集Worker] 收到第 3/10 个结果: Worker1 计算 3^2 = 9
[计算Worker 2] 完成 5^2 = 25，已发送到结果队列
[计算Worker 1] 完成 6^2 = 36，已发送到结果队列
[计算Worker 0] 完成 4^2 = 16，已发送到结果队列
[收集Worker] 收到第 4/10 个结果: Worker2 计算 5^2 = 25
[收集Worker] 收到第 5/10 个结果: Worker1 计算 6^2 = 36
[收集Worker] 收到第 6/10 个结果: Worker0 计算 4^2 = 16
[计算Worker 2] 完成 7^2 = 49，已发送到结果队列
[计算Worker 1] 完成 8^2 = 64，已发送到结果队列[计算Worker 0] 完成 9^2 = 81，已发送到结果队列

[计算Worker 1] 收到结束信号，退出
[计算Worker 0] 收到结束信号，退出
[收集Worker] 收到第 7/10 个结果

**Task 3 核心改进**：

### 架构对比

| 维度 | Task 2（主进程收集） | Task 3（专用 Worker 收集） |
|------|---------------------|--------------------------|
| **收集时机** | 等待所有计算完成后收集 | 边计算边收集（流水线） |
| **并行度** | 计算阶段并行，收集串行 | 计算和收集完全并行 |
| **响应速度** | 最后批量显示结果 | 实时显示每个结果 |
| **适用场景** | 小任务量，快速计算 | 大任务量，长时间运行 |

### 流水线优势

```
Task 2 时间线（串行收集）:
[计算] ████████████ (所有任务完成)
[收集]             ████ (批量收集)
总耗时: 计算时间 + 收集时间

Task 3 时间线（并行收集）:
[计算] ████████████
[收集] ████████████ (同时进行)
总耗时: max(计算时间, 收集时间)
```

### 实际应用场景
- **数据处理管道**：多进程解析文件 → 专用进程写入数据库
- **深度学习训练**：DataLoader 加载数据 → 训练进程消费数据（PyTorch 内部机制）
- **日志收集**：多服务产生日志 → 专用进程聚合写入文件