# [5、深入剖析PyTorch DataLoader源码](https://www.bilibili.com/video/BV1kq4y1G75V?spm_id_from=333.788.player.switch&vd_source=cdd897fffb54b70b076681c3c4e4d45d)

In [None]:
import torch
import torchvision as tv
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt

## Dataloader

loader用于把单个样本组织成一个个批次，用于模型训练

文档：
`DataLoader` 结合了数据集和采样器，并提供对给定数据集的可迭代访问。

`~torch.utils.data.DataLoader` 支持 map-style 和 iterable-style 的数据集，可进行单进程或多进程加载，自定义加载顺序以及可选的自动批处理（collation）和内存固定（memory pinning）。

更多详情请参见 :py:mod:`torch.utils.data` 文档页面。

**参数:**

* **dataset** (`Dataset`)：从中加载数据的数据集。
* **batch_size** (`int`, 可选)：每批加载多少个样本（默认值：`1`）。
* **shuffle** (`bool`, 可选)：设置为 `True` 时，会在每个 epoch 重新打乱数据（默认值：`False`）。
* **sampler** (`Sampler` or `Iterable`, 可选)：定义从数据集中抽取样本的策略。可以是任何实现了 `__len__` 的 `Iterable` 对象。如果指定了此参数，则 `shuffle` 不能被指定。
* **batch_sampler** (`Sampler` or `Iterable`, 可选)：与 `sampler` 类似，但一次返回一批索引。此参数与 `batch_size`、`shuffle`、`sampler` 和 `drop_last` 互斥。
* **num_workers** (`int`, 可选)：用于数据加载的子进程数量。`0` 表示在主进程中加载数据。（默认值：`0`）
* **collate_fn** (`Callable`, 可选)：将样本列表合并成一个 mini-batch 的张量（Tensor）。当从 map-style 数据集进行批处理加载时使用。
* **pin_memory** (`bool`, 可选)：如果为 `True`，数据加载器在返回张量之前会将其复制到设备/CUDA 的固定内存中。如果您的数据元素是自定义类型，或者您的 `collate_fn` 返回的是一个自定义类型的批次，请参阅下面的示例。
* **drop_last** (`bool`, 可选)：如果数据集大小不能被批大小整除，则设置为 `True` 以删除最后一个不完整的批次。如果为 `False` 并且数据集大小不能被批大小整除，则最后一个批次将较小。（默认值：`False`）
* **timeout** (`numeric`, 可选)：如果为正数，则表示从工作进程（worker）收集一批数据的超时值。应始终为非负数。（默认值：`0`）
* **worker_init_fn** (`Callable`, 可选)：如果不是 `None`，这个函数将在每个工作子进程中被调用，以工作进程 id（一个在 `[0, num_workers - 1]` 范围内的整数）作为输入，在设定种子之后和数据加载之前执行。（默认值：`None`）
* **multiprocessing_context** (`str` or `multiprocessing.context.BaseContext`, 可选)：如果为 `None`，将使用操作系统的默认 `multiprocessing context`_。（默认值：`None`）
* **generator** (`torch.Generator`, 可选)：如果不是 `None`，这个随机数生成器（RNG）将被 RandomSampler 用于生成随机索引，并被 multiprocessing 用于为工作进程生成 `base_seed`。（默认值：`None`）
* **prefetch_factor** (`int`, 可选, 仅限关键字参数)：每个工作进程提前加载的批次数。`2` 表示所有工作进程总共将预取 `2 * num_workers` 个批次。（默认值取决于 `num_workers` 的设置。如果 `num_workers=0`，默认值为 `None`。否则，如果 `num_workers > 0`，默认值为 `2`）。
* **persistent_workers** (`bool`, 可选)：如果为 `True`，数据加载器在数据集被消耗一次后不会关闭工作进程。这允许保持工作进程中的 `Dataset` 实例处于活动状态。（默认值：`False`）
* **pin_memory_device** (`str`, 可选)：当 `pin_memory` 为 `True` 时，指定要固定内存的设备。如果未指定，将默认为当前的 `accelerator<accelerators>`。不推荐使用此参数，并计划在未来弃用。
* **in_order** (`bool`, 可选)：如果为 `False`，数据加载器将不强制批次按先进先出的顺序返回。仅在 `num_workers > 0` 时适用。（默认值：`True`）


如果使用 `spawn` 启动方法，`worker_init_fn` 不能是不可 pickle 的对象，例如 lambda 函数。有关 PyTorch 中多进程处理的更多详细信息，请参阅 `multiprocessing-best-practices`。

`len(dataloader)` 的长度推算是基于所使用的采样器长度。当 `dataset` 是一个 `~torch.utils.data.IterableDataset` 时，它会返回一个基于 `len(dataset) / batch_size` 的估计值，并根据 `drop_last` 进行适当的四舍五入，而不管多进程加载的配置如何。这代表了 PyTorch 能做出的最佳猜测，因为 PyTorch 相信用户的 `dataset` 代码能正确处理多进程加载以避免数据重复。

> 然而，如果数据分片（sharding）导致多个工作进程有不完整的最后一个批次，这个估计值仍然可能不准确，因为 (1) 一个原本完整的批次可能被分成多个批次，以及 (2) 当设置 `drop_last` 为 True 时，可能会丢弃超过一个批次大小的样本。不幸的是，PyTorch 通常无法检测到这种情况。

> 有关这两种数据集类型以及 `~torch.utils.data.IterableDataset` 如何与 `多进程数据加载`_ 交互的更多详细信息，请参阅 `数据集类型`_。

有关随机种子相关的问题，请参阅 `reproducibility`、`dataloader-workers-random-seed` 和 `data-loading-randomness` 的说明。

将 `in_order` 设置为 `False` 可能会损害可复现性，并且在数据不平衡的情况下，可能导致馈送给训练器的数据分布出现偏差。

_multiprocessing context:
    [https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods)

In [None]:
training_data = datasets.FashionMNIST(
    root="minist_dataset", train=True, download=True, transform=ToTensor()
)

test_data = datasets.FashionMNIST(
    root="minist_dataset", train=False, download=True, transform=ToTensor()
)

In [None]:
from torch.utils.data import DataLoader

train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=False)

可以自定义 sample 用于采样，但是和 shuffle 冲突
```py
if sampler is not None and shuffle:
    raise ValueError("sampler option is mutually exclusive with shuffle")
```

默认 sampler 是 `RandomSampler` 和 `SequentialSampler`
```py
if sampler is None:  # give default samplers
    if self._dataset_kind == _DatasetKind.Iterable:
        # See NOTE [ Custom Samplers and IterableDataset ]
        sampler = _InfiniteConstantSampler()
    else:  # map-style
        if shuffle:
            sampler = RandomSampler(dataset, generator=generator)  # type: ignore[arg-type]
        else:
            sampler = SequentialSampler(dataset)  # type: ignore[arg-type]
```


关于 `batch_sampler`，其实就是一个个地取数据，满了就返回一个batch的index：

```py
def __iter__(self) -> Iterator[list[int]]:
    # Implemented based on the benchmarking in https://github.com/pytorch/pytorch/pull/76951
    sampler_iter = iter(self.sampler)
    if self.drop_last:
        # Create multiple references to the same iterator
        args = [sampler_iter] * self.batch_size
        for batch_droplast in zip(*args):
            yield [*batch_droplast]
    else:
        batch = [*itertools.islice(sampler_iter, self.batch_size)]
        while batch:
            yield batch
            batch = [*itertools.islice(sampler_iter, self.batch_size)]
```


## collate_fn

```py
if collate_fn is None:
    if self._auto_collation:
        collate_fn = _utils.collate.default_collate
    else:
        collate_fn = _utils.collate.default_convert
```

默认使用`default_collate`，只做了点数据转换，其他的什么事也不干TAT