# TorchData 节点式数据管线速查笔记

> 记录 2024 年 6 月 TorchData 路线更新、`torchdata.nodes` Beta 能力与迁移建议，帮助在 `torch.utils.data` 基础上升级多线程/多进程并行、流式管线与中途断点恢复。若环境尚未安装 `torchdata`，可先阅读文本理解设计理念。

## 状态更新：DataPipes 与 DataLoader V2 退场
- **2024-06 公告**：官方停更 `DataPipes` 与 `DataLoaderV2`，TorchData 仓库回归逐步增强 `torch.utils.data.DataLoader` 的路线。
- **弃用计划**：`torchdata==0.8.0`（2024-07）开始标记弃用，`0.10.0`（2024 年底）彻底删除；需使用旧实现请锁定 `torchdata<=0.9.0`。
- **迁移建议**：提前评估新 API，尤其是 `torchdata.nodes` 与 `torchdata.stateful_dataloader`，避免后期被动升级。

## torchdata.nodes (Beta) 快速概览
- **定位**：提供一组可组合的 *迭代器*（Iterator），将数据加载拆为节点式流水线，兼容流式、Map-style、Sampler 等模式。
- **核心特性**：
  - 同时支持多线程与多进程，并可按节点粒度选择并行方式。
  - 通过 `state_dict` / `load_state_dict` 支持中途 checkpoint。
  - 强制迭代器范式（实现 `next`, `reset`, `get_state`），简化状态管理。
- **安装**：`pip install torchdata>=0.10.0`。

In [None]:
# 检查 torchdata 可用性并输出版本
try:
    import torchdata
    print('torchdata 版本:', torchdata.__version__)
    from torchdata import nodes as tn
    print('torchdata.nodes 可用：', hasattr(torchdata, 'nodes'))
except Exception as err:
    print('未检测到 torchdata，或导入失败 ->', err)
    print('请先安装: pip install torchdata>=0.10.0')

## 为什么需要 torchdata.nodes？
- **多进程痛点**：传统 DataLoader 需复制 Dataset 内存、IPC 队列慢、必须在 worker 端做批处理。
- **多线程重回舞台**：配合 GIL 释放函数与 Free-Threaded Python，多线程不再受限，`nodes` 可自由切换线程/进程。
- **流式数据模型**：原 Map-style 难以扩展至大规模和多数据集场景，`nodes` 采用迭代器链路，天然支持多源融合。
- **多数据集策略**：现有 Sampler 仅面向单数据集，难以实现加权采样、Round-Robin 等策略；`nodes` 通过组合节点解决。
- **IterableDataset 分片**：传统方案需手动调用 `get_worker_info`，`nodes` 则让分片逻辑内聚在节点中。

## 架构要点与设计选择
- **BaseNode**：所有节点需继承 `torchdata.nodes.BaseNode` 并实现 `next()`, `reset(initial_state)`, `get_state()`；禁止使用生成器以确保状态显式可控。
- **Loader**：将 `BaseNode` 包装成熟悉的 Iterable，统一处理 `reset()`、`state_dict()`。
- **迭代器优先**：所有组件处理的是 Iterator，而非 Iterable，避免“多活迭代器”状态难题。
- **状态管理**：通过 `reset(initial_state)` 复现任意时刻状态，支持 mid-epoch checkpoint。

### 常用节点速览
- `IterableWrapper`：将任意 Iterable 包装为 BaseNode。
- `SamplerWrapper`：包装 `torch.utils.data.Sampler` 并保持 `set_epoch` 钩子。
- `Batcher` / `Unbatcher`：批处理与扁平化。
- `Mapper` / `ParallelMapper`：串行或并行映射函数，可选线程/进程、是否保持顺序。
- `Prefetcher` / `PinMemory`：预取与页锁内存。
- `MultiNodeWeightedSampler`：权重调度多数据源。

## 入门示例
以下示例在未安装 torchdata 时不会执行，可在 Notebook 中观察或待安装后运行。

In [None]:
try:
    from torchdata.nodes import IterableWrapper, ParallelMapper, Loader
    node = IterableWrapper(range(10))
    node = ParallelMapper(node, map_fn=lambda x: x ** 2, num_workers=3, method="thread")
    loader = Loader(node)
    print(list(loader))
except ImportError as err:
    print('缺少 torchdata，跳过示例 ->', err)

In [None]:
try:
    import torch.utils.data
    from torch.utils.data import RandomSampler
    from torchdata.nodes import SamplerWrapper, ParallelMapper, Loader

    class SquaredDataset(torch.utils.data.Dataset):
        def __getitem__(self, i: int) -> int:
            return i ** 2
        def __len__(self):
            return 10

    dataset = SquaredDataset()
    sampler = RandomSampler(dataset)
    node = SamplerWrapper(sampler)
    node = ParallelMapper(node, map_fn=dataset.__getitem__, num_workers=3, method="thread")
    loader = Loader(node)
    print(list(loader))
except ImportError as err:
    print('缺少 torchdata，跳过示例 ->', err)

## 迁移指南：从 DataLoader 到 Nodes 管线
- 构建顺序：`SamplerWrapper` → `Batcher` → `Mapper/ParallelMapper` → 可选 `PinMemory`/`Prefetcher` → `Loader`。
- `Loader` 负责在 epoch 之间调用 `reset()` 并暴露 `state_dict()` 接口。
- `map_fn` 可封装 `Dataset.__getitem__` + `collate_fn`，与原 DataLoader 行为匹配。

In [None]:
from typing import List, Callable

try:
    import torchdata.nodes as tn
    from torch.utils.data import RandomSampler, SequentialSampler, default_collate, Dataset

    class MapAndCollate:
        def __init__(self, dataset: Dataset, collate_fn: Callable):
            self.dataset = dataset
            self.collate_fn = collate_fn
        def __call__(self, batch_indices: List[int]):
            batch = [self.dataset[i] for i in batch_indices]
            return self.collate_fn(batch)

    def nodes_dataloader(
        dataset: Dataset,
        batch_size: int,
        shuffle: bool,
        num_workers: int,
        collate_fn: Callable | None,
        pin_memory: bool,
        drop_last: bool,
    ):
        sampler = RandomSampler(dataset) if shuffle else SequentialSampler(dataset)
        node = tn.SamplerWrapper(sampler)
        node = tn.Batcher(node, batch_size=batch_size, drop_last=drop_last)
        map_fn = MapAndCollate(dataset, collate_fn or default_collate)
        node = tn.ParallelMapper(node, map_fn=map_fn, num_workers=num_workers, method="process", in_order=True)
        if pin_memory:
            node = tn.PinMemory(node)
        node = tn.Prefetcher(node, prefetch_factor=max(1, num_workers * 2))
        return tn.Loader(node)
except ImportError as err:
    print('缺少 torchdata，示例仅供参考 ->', err)

### 示例：状态恢复
- `Loader.state_dict()` 返回上一迭代器的状态，可在中途保存。
- `Loader.load_state_dict(sd)` 恢复后续迭代位置，便于断点续训。

In [None]:
try:
    import torchdata.nodes as tn
    import torch
    from torch.utils.data import Dataset

    class SquaredDataset(Dataset):
        def __len__(self):
            return 14
        def __getitem__(self, idx: int):
            return idx ** 2

    loader = nodes_dataloader(
        dataset=SquaredDataset(),
        batch_size=3,
        shuffle=False,
        num_workers=2,
        collate_fn=None,
        pin_memory=False,
        drop_last=False,
    )

    cached = []
    state = None
    for i, batch in enumerate(loader):
        cached.append(batch)
        if i == 2:
            state = loader.state_dict()
            break
    if state is not None:
        loader.load_state_dict(state)
        resumed = list(loader)
        print('断点后批次数:', len(resumed))
        print('前后批次一致:', cached[3:] == resumed)
except Exception as err:
    print('运行示例失败 ->', err)

## 性能要点
- 多线程与多进程可按节点粒度选择；在 Free-Threaded Python (3.13t) 中多线程可饱和内存带宽。
- 可将繁重预处理放在 `ParallelMapper`，GPU 预处理也可在多线程模式下进行。
- 早期基准表明视频解码场景中 `nodes` 与原 DataLoader 持平或略优；详见官方 PyTorch Conf 2024 分享。

## 设计决策回顾
- 禁止生成器：生成器隐式存储栈状态，难以落地通用的 `state_dict`。
- 显式 `reset(initial_state)`：集中处理初始化逻辑，并确保状态恢复可控。
- Loader 统一管理 epoch 生命周期，用户无需显式调用 `reset()`。
- 警惕 `StopIteration`：节点在流式管线中需明确何时结束，Loader 可配置 `restart_on_stop_iteration`。

## StatefulDataLoader：DataLoader 版断点续训
- `torchdata.stateful_dataloader.StatefulDataLoader` 为现有 DataLoader 增强版，提供 `state_dict`/`load_state_dict`。
- 支持聚合 Sampler/Dataset 状态，并在多进程间转发。
- `snapshot_every_n_steps` 可调节同步频率，权衡 checkpoint 精度与开销。

In [None]:
try:
    from torchdata.stateful_dataloader import StatefulDataLoader
    import torch
    from torch.utils.data import Dataset

    class NoisyRange(Dataset):
        def __init__(self, high: int, mean: float = 0.0, std: float = 1.0):
            self.high, self.mean, self.std = high, mean, std
        def __len__(self):
            return self.high
        def __getitem__(self, idx: int):
            noise = torch.randn(1).item() * self.std + self.mean
            return idx + noise

    dl = StatefulDataLoader(NoisyRange(8), batch_size=2, num_workers=0)
    state = None
    for i, batch in enumerate(dl):
        print('batch', i, batch)
        if i == 1:
            state = dl.state_dict()
            break
    if state:
        dl.load_state_dict(state)
        print('恢复后继续:', list(dl))
except ImportError as err:
    print('缺少 torchdata，跳过 StatefulDataLoader 示例 ->', err)
except Exception as err:
    print('StatefulDataLoader 示例运行失败 ->', err)

## 迁移与版本策略
- **短期策略**：依赖 DataPipes / DataLoaderV2 的项目需锁定 `torchdata<=0.9.0`，并规划迁移到 `nodes` 或 `StatefulDataLoader`。
- **测试验证**：引入 `nodes` 后需重点验证并行方式、状态恢复、性能指标。
- **反馈渠道**：官方建议通过 GitHub issue 提供建议，帮助完善未来路线。

## 参考资料
- 官方公告：`torchdata` June 2024 Status Update
- 文档：Getting Started With torchdata.nodes (beta)
- 教程：Migrating to torchdata.nodes from torch.utils.data
- API：`torchdata.nodes`、`torchdata.stateful_dataloader`