# 从入门到 vmap 大神：PyTorch vmap 教程（含手写 mini_vmap）
更新时间：2025-09-20

本笔记目标：
- 直观理解 `vmap` 的语义与常用参数；
- 观察一次 vmap 的执行过程；
- **亲手实现一个可用的 `mini_vmap`**（支持 `in_dims/out_dims/randomness/chunk_size` 与 pytree）。


## 1. 快速上手


In [1]:

import torch
print("torch version:", torch.__version__)

def per_sample_fn(t):
    return (t * t + torch.sin(t)).sum()

batch = torch.randn(5, 4, requires_grad=True)

# 手写 for 循环（基准）
loop_out = torch.stack([per_sample_fn(batch[i]) for i in range(batch.size(0))])

# vmap 版本（无 Python 显式 for）
batched_fn = torch.vmap(per_sample_fn)
vmap_out = batched_fn(batch)

print("一致性:", torch.allclose(loop_out, vmap_out))
print("输出形状: loop", loop_out.shape, "| vmap", vmap_out.shape)


torch version: 2.7.1+cu126
一致性: True
输出形状: loop torch.Size([5]) | vmap torch.Size([5])


## 2. 常用参数（in_dims / out_dims / randomness / chunk_size）


In [None]:
import torch

# in_dims / out_dims
x = torch.randn(2, 5)
f = lambda z: z ** 2
v1 = torch.vmap(f, out_dims=1)
print("out_dims=1 结果形状:", v1(x).shape)  # 期望 [5, 2]

def dot_scale(a, b, scale: float = 1.0):
    return torch.dot(a, b) * scale

A = torch.randn(4, 8)  # [N, D]
b = torch.randn(8)     # [D] 不带批维
v2 = torch.vmap(dot_scale, in_dims=(0, None))
print("多输入 + None in_dim:", v2(A, b, scale=2.0).shape)  # [N]

# randomness：演示 vmap 的随机策略
def add_rand(z):
    return z + torch.rand_like(z)

Z = torch.zeros(3, 4)

# 默认 randomness='error'：在 vmap 内调用随机算子会报错（演示）
try:
    torch.vmap(add_rand)(Z)
except Exception as e:
    print("randomness=error ->", type(e).__name__)

# randomness='same'：每个样本用同一份随机数
same = torch.vmap(add_rand, randomness='same')(Z)
print("same 行相等? ->", torch.allclose(same[0], same[1]) and torch.allclose(same[1], same[2]))

# randomness='different'：每个样本各用一份不同随机数
diff = torch.vmap(add_rand, randomness='different')(Z)
print("different 行相等? ->", torch.allclose(diff[0], diff[1]))

# ------------------------------
# 方案 A：把 heavy 的随机权重移到 vmap 外（推荐）
# ------------------------------

# 原来：在 vmap 内生成随机 W，会触发 randomness='error' 报错
# def heavy(op_in):
#     W = torch.randn(op_in.size(-1), op_in.size(-1))
#     return (op_in @ W).relu()

# 修正：在 vmap 外只生成一次 W，然后当作参数传入
def heavy_with_W(op_in, W_mat):
    return (op_in @ W_mat).relu()

big = torch.randn(64, 128)
W = torch.randn(big.size(-1), big.size(-1))  # 在 vmap 外生成一次

# vmap：第一个参数是批维（dim=0），第二个参数 W 不带批维
full = torch.vmap(heavy_with_W, in_dims=(0, None))(big, W)

# chunk_size 版本：语义应与 full 等价（同一份 W）
chunked = torch.vmap(heavy_with_W, in_dims=(0, None), chunk_size=16)(big, W)

print("chunk 等价? ->", torch.allclose(full, chunked, atol=1e-5, rtol=1e-5))
print("输出形状:", full.shape)


out_dims=1 结果形状: torch.Size([5, 2])
多输入 + None in_dim: torch.Size([4])
randomness=error -> RuntimeError
same 行相等? -> True
different 行相等? -> False
chunk 等价? -> True
输出形状: torch.Size([64, 128])


## 3. 观察一次分发（可选）


In [8]:

from torch.utils._python_dispatch import TorchDispatchMode

def _brief(x):
    if isinstance(x, torch.Tensor):
        return f"Tensor(device={x.device}, dtype={x.dtype}, shape={tuple(x.shape)}, requires_grad={x.requires_grad})"
    return repr(x)

class LoggingMode(TorchDispatchMode):
    def __torch_dispatch__(self, func, types, args=(), kwargs=None):
        kwargs = kwargs or {}
        name_attr = getattr(func, "name", None)
        op_full = name_attr() if callable(name_attr) else str(func)
        arg_desc = ", ".join(_brief(a) for a in args)
        print(f"[DISPATCH] op={op_full} | args=[{arg_desc}]")
        return func(*args, **kwargs)

def per_ex(a, b):
    return (a + b).relu()

A = torch.randn(3, 3, 3)
B = torch.randn(3, 3, 3)

with LoggingMode():
    torch.vmap(per_ex)(A, B)


[DISPATCH] op=aten::add.Tensor | args=[Tensor(device=cpu, dtype=torch.float32, shape=(3, 3, 3), requires_grad=False), Tensor(device=cpu, dtype=torch.float32, shape=(3, 3, 3), requires_grad=False)]
[DISPATCH] op=aten::relu | args=[Tensor(device=cpu, dtype=torch.float32, shape=(3, 3, 3), requires_grad=False)]


# 4.vamp源码阅读

这里会解读vmap.py，帮助读者更好的理解vmap的实现和机制

## 辅助函数部分


### `lazy_load_decompositions` 源码详注

> 背景：在 `torch.func`/functorch 的 vmap/grad 等变换里，很多高阶算子需要“分解（decomposition）”为更基础、可变换/向量化的 ATen 原语。由于 TorchScript / 打包环境（`torch.package`）、以及 Python 3.11 的部分限制，这些分解并不是总能在进程启动时就安全注册，所以这里采用“**按需（lazy）加载**”的策略，只在需要且允许时注册到库中。参考：`torch.library` 官方文档、TorchScript 环境变量 `PYTORCH_JIT`、functorch 源码中对 vmap decomposition 的实现说明。  
> 参阅：torch.library 文档、TorchScript/`PYTORCH_JIT`、functorch eager_transforms 源码、functorch → torch.func 迁移说明。  
> 文档：[[torch.library]](https://docs.pytorch.org/docs/stable/library.html)｜[[TorchScript/ PYTORCH_JIT]](https://docs.pytorch.org/docs/stable/jit.html)｜[[env vars]](https://docs.pytorch.org/docs/stable/torch_environment_variables.html)｜[[functorch 源码片段]](https://docs.pytorch.org/functorch/1.13/_modules/functorch/_src/eager_transforms.html)｜[[functorch→torch.func]](https://docs.pytorch.org/docs/stable/func.migrating.html)




In [None]:
# torch.package, Python 3.11, and torch.jit-less environments are unhappy with
# decompositions. Only load them when needed if possible.
def lazy_load_decompositions():
    global DECOMPOSITIONS_LOADED
    if DECOMPOSITIONS_LOADED:
        return
    # ↑ 全局幂等保护：如果已经装载过分解表（decomposition table），直接返回，避免重复注册。

    with DECOMPOSITIONS_LOCK:
        if DECOMPOSITIONS_LOADED:
            return
        # ↑ 二次检查 + 互斥锁：多线程/并发场景下防止重复装载。

        # 只有在 JIT 打开 且 处于 debug 模式（__debug__ 为 True）时，才尝试注册 Python 侧分解。
        # 否则直接标记为“已加载”并返回（不做任何注册），以规避：
        # - 某些打包/环境下（如 torch.package、Python 3.11、禁用 JIT）对分解注册不友好的情况；
        # - TorchScript 兼容性与类型系统限制（见下）。
        if not (os.environ.get("PYTORCH_JIT", "1") == "1" and __debug__):
            DECOMPOSITIONS_LOADED = True
            return
        # 参考：PYTORCH_JIT 环境变量可禁用 JIT；禁用时一般不应执行基于 TorchScript 的注册路径。
        # 文档：TorchScript / PYTORCH_JIT（官方）.

        # 使用 torch.library 的“替代”注册方式把 Python decomposition 塞到表里。
        # 直接用 _register_jit_decomposition 在某些算子（例如 aten::addr）上会失败：
        #   原因是 TorchScript 生成的 Tensor 类型在一些分支上无法合并（union），导致类型系统报错。
        # 因此这里通过 torch.library.Library("aten", "IMPL", "FuncTorchBatched")
        # 来向现有 aten 库注入 IMPL（实现）以覆盖/提供 decomposition。
        global VMAP_DECOMPOSITIONS_LIB
        VMAP_DECOMPOSITIONS_LIB = torch.library.Library(
            "aten", "IMPL", "FuncTorchBatched"
        )
        # 参考：torch.library 允许对现有算子命名空间（如 "aten"）添加实现/别名/后端覆写等。
        # 文档：torch.library（官方）。

        from torch._decomp import decomposition_table
        # ↑ PyTorch 维护的 Python 侧 decomposition 表（op → Python 实现函数），
        #   常用于导出/编译/变换（如 torch.export.run_decompositions、AOTAutograd）等流程。
        # 文档：export.run_decompositions / Core ATen decomposition（官方）。

        def _register_python_decomposition_vmap(decomp):
            # 如果该 op 在 decomposition_table 里有 Python 实现，则通过 torch.library 注入实现；
            # 否则报错提示该 op 没有找到相应 decomposition。
            if decomp in decomposition_table:
                VMAP_DECOMPOSITIONS_LIB.impl(decomp, decomposition_table[decomp])
            else:
                raise RuntimeError(f"could not find decomposition for {decomp}")

        # 下面把若干“梯度/损失类”算子和一个“线性代数”算子的 Python decomposition 挂进表里，
        # 这样 vmap/grad 之类的变换在遇到它们时可以退化成更基础的 ATen 原语组合来执行。
        _register_python_decomposition_vmap(torch.ops.aten.mse_loss_backward.default)
        _register_python_decomposition_vmap(
            torch.ops.aten.smooth_l1_loss_backward.default
        )
        _register_python_decomposition_vmap(torch.ops.aten.huber_loss_backward.default)
        _register_python_decomposition_vmap(torch.ops.aten.nll_loss_forward.default)
        _register_python_decomposition_vmap(torch.ops.aten.nll_loss2d_forward.default)
        _register_python_decomposition_vmap(torch.ops.aten.nll_loss_backward.default)
        _register_python_decomposition_vmap(torch.ops.aten.nll_loss2d_backward.default)
        _register_python_decomposition_vmap(torch.ops.aten.addr.default)
        # 注：functorch 源码注释里明确提到 “addr 在 _register_jit_decomposition 路径会出问题”，
        #     因而采用 torch.library 的 IMPL 注入作为替代。
        # 参阅：functorch eager_transforms 源码中的相同注释。

        DECOMPOSITIONS_LOADED = True
        # ↑ 成功注册后设置已加载标志，保证后续调用不会重复执行。



代码片断是 “注册 decomposition 到某个 DispatchKey 后端”的操作

1. **为什么要 “lazy” 加载？**  
   - 某些运行环境（如 `torch.package` 打包、Python 3.11、或“无 JIT”模式）对 TorchScript/分解注册不友好；  
   - 过早注册可能触发类型系统/脚本化限制（例如 `aten::addr` 的类型并合问题），导致运行期错误；  
   - 按需注册能减少冷启动开销，并把失败范围限定在真正需要用到分解的场景里。  
   参考：TorchScript/`PYTORCH_JIT` 环境变量（官方），functorch 源码注释。  
   资料：[[TorchScript/ PYTORCH_JIT]](https://docs.pytorch.org/docs/stable/jit.html)｜[[functorch 源码片段]](https://docs.pytorch.org/functorch/1.13/_modules/functorch/_src/eager_transforms.html)

2. **`PYTORCH_JIT` 与条件分支**  
   - 代码里要求 `PYTORCH_JIT == "1"` 且 `__debug__`（非 `-O` 运行）才进行注册；否则直接 “标记已加载并返回”，避免在不支持脚本化/不适合调试的环境里动分解表；  
   - `PYTORCH_JIT=0` 会关闭所有脚本/trace 注解，让模型以 Python 态执行，便于调试，但也意味着很多依赖 TorchScript 的路径应当避免。  
   资料：[[TorchScript 文档-禁用 JIT]](https://docs.pytorch.org/docs/stable/jit.html)｜[[环境变量总览]](https://docs.pytorch.org/docs/stable/torch_environment_variables.html)

3. **为何用 `torch.library.Library("aten", "IMPL", "...")`？**  
   - `torch.library` 提供对算子库的“扩展/覆写”入口（创建自定义算子、为既有算子增加后端实现/别名等）；  
   - 设置 `"aten"` 命名空间 + `"IMPL"` 表示给现有 ATen 算子提供实现层面的注册；  
   - 这里用 `"FuncTorchBatched"` 作为后端/命名标签，承载 vmap/分解语义下的实现。  
   资料：[[torch.library 官方]](https://docs.pytorch.org/docs/stable/library.html)

4. **`decomposition_table` 是什么？**  
   - Python 侧的“算子分解表”（op → Python 实现函数），广泛用于 `torch.export.run_decompositions`、AOTAutograd/`torch.compile` 及相关 passes，把复杂算子**替换**为核心 ATen 原语组合（Core ATen set），便于后端/编译器处理；  
   - “Core ATen” 是一组**不会再继续分解**的基础算子集合；默认的 decompositions 会把高阶 op 分解到这套核心集合里。  
   资料：[[export.run_decompositions]](https://docs.pytorch.org/docs/stable/export.html)｜[[Core ATen 定义]](https://docs.pytorch.org/executorch/stable/ir-ops-set-definition.html)｜（中文）[[export 教程（含 Core ATen 描述）]](https://pytorch-cn.com/tutorials/intermediate/torch_export_tutorial.html)

5. **为什么特别点名 `aten::addr`？**  
   - functorch 源码注释里指出 `_register_jit_decomposition` 在 `addr` 上会失败（TorchScript 类型系统无法对该算子的 Tensor 类型做 union），因此用 `torch.library` 的 IMPL 注入规避；  
   - 这类“绕过 JIT 的 Python 侧注册”思路常见于 vmap/grad 的特殊链路。  
   资料：[[functorch 源码片段]](https://docs.pytorch.org/functorch/1.13/_modules/functorch/_src/eager_transforms.html)

6. **和 `torch.func` / functorch 的关系**  
   - functorch 已经并入 PyTorch 核心库（`torch.func`），vmap/grad 等变换的实现依赖分解/包装（BatchedTensor）等机制；  
   - 迁移指引详见官方说明。  
   资料：[[functorch→torch.func 迁移]](https://docs.pytorch.org/docs/stable/func.migrating.html)

7. **关于decomposition_table**
   - `decomposition_table`(由`torch._decomp`维护) 是 PyTorch 内部用来存储「operator 分解（decomposition）」映射的一个结构，它把算子（operator）映射到一个 Python 实现函数，用于将一个算子拆解成一组更基础／更底层算子的组合。在很多变换／导出／编译流程中，这样的分解机制非常重要。这个表格（通常是一个字典／映射）里的键 (key) 是某些 ATen operator（torch.ops.aten.xxx），值 (value) 是对应一个 Python 函数／callable，这个函数能实现在更基础／更核心算子上的等价操作。

---

#### 小结

- 该函数通过**幂等+加锁**保障只注册一次；  
- 仅在 **JIT 开启且 debug 模式** 下注册 Python decompositions，其他环境直接跳过；  
- 使用 `torch.library` 的 **IMPL** 路径把 `decomposition_table` 中已存在的分解实现“挂到” `aten` 算子上（尤其为了解决 `addr` 等在 TorchScript 路径中的类型限制问题）；  
- 这让 vmap/grad/compile/export 等变换在遇到这些 op 时，能退化为“核心 ATen”组合，获得更好的兼容性与可编译性。  

## 核心impl实现

In [None]:
def vmap_impl(func, in_dims, out_dims, randomness, chunk_size, *args, **kwargs):
    # 1) 惰性加载一些“分解规则”（decompositions），
    #    供后续把复杂算子拆成更基础、可自动向量化的原语用。
    #    好处：避免模块级 import 时就做重活儿，延迟到真正调用 vmap 时再加载。
    lazy_load_decompositions()

    # 2) 预检查 out_dims 的形状/类型是否和 func 的输出结构匹配：
    #    - 允许 int 或“PyTree of int”（比如与多输出结构同构的树）。
    #    - 不匹配会尽早报错，避免跑到一半才发现维度对不齐。
    _check_out_dims_is_int_or_int_pytree(out_dims, func)

    # 3) 解析输入批维并把实参拍平：
    #    - 根据 in_dims（可为 int / None / PyTree of int/None），
    #      找到每个输入张量上“要被 vmap 的那一维”（通常是 batch 维）。
    #    - 统一拍平成 flat_args（方便后续统一处理），
    #      并记录 flat_in_dims 和 args_spec（原 PyTree 结构的规格信息）。
    #    - 同时确定 batch_size（即被映射的维度长度），用于后续切分或校验。
    batch_size, flat_in_dims, flat_args, args_spec = _process_batched_inputs(
        in_dims, args, func
    )

    # 4) 如果指定了 chunk_size，则走“分块 vmap”路径：
    #    背景：当 batch 很大时直接向量化可能占用内存大或不够稳，
    #    把 batch 维按 chunk_size 切成若干小块，逐块做 vmap，再把结果拼起来。
    if chunk_size is not None:
        # 4.1) 依据 flat_in_dims 在 batch 维上对 flat_args 做切片，得到每块的实参列表。
        chunks_flat_args = _get_chunked_inputs(
            flat_args, flat_in_dims, batch_size, chunk_size
        )
        # 4.2) 逐块执行 vmap（内部会把 func 应用到每一块的样本上），
        #      并在 out_dims 指定的位置拼出结果的批维。
        #      注意 randomness 的含义：
        #      - 'error'：块内/块间如遇随机算子直接报错（默认行为）；
        #      - 'same'：各样本共享相同随机性；
        #      - 'different'：各样本使用独立随机性。
        #      该开关只影响 PyTorch 自身的随机算子，不影响 Python/numpy 的随机。参见官方说明。
        #      参见：torch.func.vmap(randomness=...) 文档。
        #      https://docs.pytorch.org/docs/stable/generated/torch.func.vmap.html
        return _chunked_vmap(
            func,
            flat_in_dims,
            chunks_flat_args,
            args_spec,
            out_dims,
            randomness,
            **kwargs,
        )

    # 5) 否则，走“整批 vmap”路径：
    #    - 一次性对 batch 维做自动向量化，不切块。
    #    - _flat_vmap 内部会完成：
    #        a) 把需要做 vmap 的输入包装成带批维的“BatchedTensor”，
    #        b) 调用 func，让其内部的张量运算在 BatchedTensor 语义下执行“批版算子”，
    #        c) 按 out_dims 还原输出结构并在指定位置插入批维。
    #    - 对随机策略 randomness 的处理同上。
    return _flat_vmap(
        func,
        batch_size,
        flat_in_dims,
        flat_args,
        args_spec,
        out_dims,
        randomness,
        **kwargs,
    )


## in_dims / out_dims / randomness / chunk_size 解释

### 什么是 `in_dims`

- `in_dims`：告诉 `vmap` “每个输入的哪个维度是要被批处理/映射（batch-map）过的维度”  
- 类型可以是：
  - `int`：对于所有输入张量，默认在这个维度被批处理（通常是 0）  
  - `None`：表示该输入 **不随 batch 变化**，即这个输入没有被 `vmap` 映射维度  
  - 与输入结构同构（PyTree）的嵌套结构（例如 list/tuple/dict），每个输入位置对应一个 `int` 或 `None`  
- 默认值：`0`

### 什么是 `out_dims`

- `out_dims`：告诉 `vmap` “输出里新增的批维（映射维）应该放到输出张量的哪个维度位置”  
- 类型可以是：
  - `int`：对所有输出张量都在这个维度插入 batch 维  
  - 与输出结构同构的嵌套结构／PyTree，如果函数有多个输出，每个输出对应一个位置 `int`  
- 默认值：`0`，即输出的第一个维度是批次维。

### `randomness` 的策略

- `randomness` 决定 “PyTorch 随机算子”（例如 `torch.rand_like`／`torch.randn` 等）在批处理／vmap 中的行为  
- 三种可选值：

  1. `'error'`  
     如果函数体中使用了 PyTorch 的随机生成功能，会报错。默认是这个模式。

  2. `'same'`  
     批次中的所有样本共享同一次随机性，例如同一个随机种子／同一个随机结果。用在你希望 batch 内部行为是一致随机的时候。

  3. `'different'`  
     为每个样本提供独立的随机性，即每个样本像独立调用随机函数那样，结果不同。用在你希望 batch 内每个元素随机行为不同的时候。

- 注意事项：

  - 这个 `randomness` 设定只影响 **PyTorch 的随机算子**，不影响 `Python random` 模块或 `numpy.random` 等。

### `chunk_size` 的作用

- 当批量（batch）很大时，直接把所有样本一次性向量化可能带来以下问题：

  - 内存开销太高  
  - 某些算子在大 batch 数下可能不稳定／效率低

- `chunk_size` 提供了一种折中的方式：把数据按 batch 维度切成若干块，每块大小为 `chunk_size`，对每块分别 vmap，然后把所有块的结果按 `out_dims` 指定位置 **拼接** 回来。

- 特别地：

  - 如果 `chunk_size = 1`，等价于手写的 `for` 循环逐样本处理（batch size 1 每次）  
  - 如果 `chunk_size = None`（默认），则一次性处理全部 batch 样本，不分块。

---

### 内部函数大致作用

下面是一些内部函数／步骤的作用，助于理解 `vmap_impl` 在 `in_dims / out_dims / randomness / chunk_size` 参数控制下是如何工作的：

| 内部函数 | 功能／做什么 |
|---|---------------|
| `lazy_load_decompositions()` | 延迟加载 “分解规则”（decompositions），供后续把复杂算子拆为基础的可批／可向量化处理方式用。 |
| `_check_out_dims_is_int_or_int_pytree(out_dims, func)` | 校验 `out_dims` 是一个合法的对象：要么是 `int`，要么是一个与 `func` 输出结构同构的 PyTree，其中每个叶子是 `int`。不合法就报错。 |
| `_process_batched_inputs(in_dims, args, func)` | 分析输入参数 `args`：<br> • 根据 `in_dims` 找出每个输入张量在哪个维度有 batch mapping；<br> • 拍平成扁平列表 `flat_args`，与之对应的 `flat_in_dims`；<br> • 保存输入的树型结构规格 `args_spec`（以便输出或结果重建结构）；<br> • 找到 `batch_size`（被映射的维度的大小）。 |
| `_get_chunked_inputs(flat_args, flat_in_dims, batch_size, chunk_size)` | 按 `chunk_size` 把 `flat_args` 在 batch 维上切成块，得到每个块的参数子集，用于后续按块处理。 |
| `_chunked_vmap(func, flat_in_dims, chunks_flat_args, args_spec, out_dims, randomness, **kwargs)` | 对每一块执行类似全量 vmap 的流程，再把每块的输出在 `out_dims` 的位置合并／拼接起来，形成与一次性 vmap 等效的结果（但内存要求可能更小、更加可控）。 |
| `_flat_vmap(func, batch_size, flat_in_dims, flat_args, args_spec, out_dims, randomness, **kwargs)` | 主干路径：一次性把所有样本做批处理（不分块）。<br>• 把输入包装成具有 batch 维（按照 `flat_in_dims`）的 BatchedTensor（或等价机制），使 func 内部操作能在这些含有 batch 的张量上进行；<br>• func 执行；<br>• 输出中插入 batch 维度到 `out_dims` 指定的位置；<br>• 按输出结构重组返回。 |


## 4. 手写 `mini_vmap`（教学版）


In [4]:

from torch.utils._pytree import tree_flatten, tree_unflatten

class MiniError(Exception): 
    pass

def _normalize_in_dims(in_dims, flat_len):
    if isinstance(in_dims, (int, type(None))):
        if flat_len != 1:
            raise MiniError("in_dims 是标量，但有多个输入叶子；请传结构化 in_dims")
        return (in_dims,)
    if isinstance(in_dims, (tuple, list)):
        if len(in_dims) != flat_len:
            raise MiniError(f"in_dims 长度 {len(in_dims)} 与输入叶子数 {flat_len} 不符")
        return tuple(in_dims)
    raise MiniError("in_dims 必须是 int/None 或 tuple/list")

def _infer_batch_size(flat_args, flat_in_dims):
    bs = None
    for a, d in zip(flat_args, flat_in_dims):
        if d is None:
            continue
        if not isinstance(a, torch.Tensor):
            raise MiniError("带 batch 维的输入必须是 Tensor")
        size_d = a.size(d if d >= 0 else a.dim() + d)
        if bs is None:
            bs = size_d
        elif bs != size_d:
            raise MiniError(f"批大小不一致：之前 {bs}，当前 {size_d}")
    if bs is None:
        raise MiniError("无法推断批大小：没有任何输入带 batch 维")
    return bs

def _select_along(x: torch.Tensor, dim: int, i: int):
    if dim < 0: 
        dim = x.dim() + dim
    return x.select(dim, i)

def _stack_outputs(per_item_outs, out_dims):
    flat0, spec = tree_flatten(per_item_outs[0])
    flat_items = []
    for o in per_item_outs:
        f, s = tree_flatten(o)
        if s != spec:
            raise MiniError("输出结构在批内不一致")
        flat_items.append(f)
    leaves_per_slot = list(zip(*flat_items))
    if isinstance(out_dims, int):
        out_dims = (out_dims,) * len(leaves_per_slot)
    elif isinstance(out_dims, (tuple, list)):
        if len(out_dims) != len(leaves_per_slot):
            raise MiniError("out_dims 长度与输出叶子数不匹配")
        out_dims = tuple(out_dims)
    else:
        raise MiniError("out_dims 必须是 int 或 tuple/list[int]")
    stacked_leaves = []
    for leaves, od in zip(leaves_per_slot, out_dims):
        if not all(isinstance(t, torch.Tensor) for t in leaves):
            raise MiniError("演示实现仅支持 Tensor 叶子")
        stacked = torch.stack(leaves, dim=0)
        _od = od if od >= 0 else stacked.dim() + od
        if _od != 0:
            perm = [p for p in range(stacked.dim()) if p != 0]
            perm.insert(_od, 0)
            stacked = stacked.permute(perm)
        stacked_leaves.append(stacked)
    return tree_unflatten(stacked_leaves, spec)

def _apply_randomness(mode, i, base_seed):
    if mode == "error":
        return
    elif mode == "same":
        torch.manual_seed(base_seed)
    elif mode == "different":
        torch.manual_seed(base_seed + i)
    else:
        raise MiniError("randomness 仅支持 'error'|'same'|'different'")

def _chunk_slices(total, chunk_size):
    k = chunk_size
    i = 0
    while i < total:
        j = min(i + k, total)
        yield i, j
        i = j

def mini_vmap(func, in_dims=0, out_dims=0, *, randomness="error", chunk_size=None):
    def wrapped(*args, **kwargs):
        flat_args, spec = tree_flatten(args)
        # 1) 解析 in_dims，推断 batch 大小
        flat_in_dims = _normalize_in_dims(in_dims, len(flat_args))
        B = _infer_batch_size(flat_args, flat_in_dims)
        base_seed = int(torch.seed())
        # 2) 分块执行（可选）
        chunks = [(0, B)] if chunk_size is None else list(_chunk_slices(B, int(chunk_size)))
        outs = []
        for (lo, hi) in chunks:
            for i in range(lo, hi):
                _apply_randomness(randomness, i, base_seed)
                # 3) 沿 in_dims 切片、调用 func
                sliced = []
                for a, d in zip(flat_args, flat_in_dims):
                    if d is None:
                        sliced.append(a)
                    else:
                        if not isinstance(a, torch.Tensor):
                            raise MiniError("非 Tensor 不可带 batch 维")
                        sliced.append(_select_along(a, d, i))
                call_args = tree_unflatten(sliced, spec)
                outs.append(func(*call_args, **kwargs))
        # 4) 聚合到 out_dims
        return _stack_outputs(outs, out_dims)
    return wrapped

# 对比官方行为（若数值差异大则抛错）
g = lambda t: (t*t + torch.sin(t)).sum()
X = torch.randn(7, 4)
ans_official = torch.vmap(g)(X)
ans_mine = mini_vmap(g)(X)
print("mini_vmap vs torch.vmap:", torch.allclose(ans_official, ans_mine))


mini_vmap vs torch.vmap: True


## 5. 小测试与练习


In [5]:

def assert_close(x, y, tol=1e-6):
    if not torch.allclose(x, y, atol=tol, rtol=tol):
        raise AssertionError("Mismatch")

# 多输入 + None in_dim
def ds(a, b): return torch.dot(a, b)
A = torch.randn(8, 3); b = torch.randn(3)
assert_close(torch.vmap(ds, in_dims=(0, None))(A, b), mini_vmap(ds, in_dims=(0, None))(A, b))

# 结构化输出（两路输出）
def two_out(x): return (x.sum(), x.mean())
T = torch.randn(4, 10)
o1 = torch.vmap(two_out)(T)
o2 = mini_vmap(two_out)(T)
assert_close(o1[0], o2[0]); assert_close(o1[1], o2[1])

print("mini_vmap 基本测试通过 ✅")


mini_vmap 基本测试通过 ✅
