trainer_reason.ipynb
---------------------------------
MicroCortex 语言模型思维链训练脚本（带详细中文注释）。
本脚本演示了如何使用 PyTorch + Transformers 来思维链微调一个自定义的
MicroCortexCausalLanguageModel，并支持分布式数据并行（DDP）训练、
梯度累积、自动混合精度 (AMP)、学习率余弦退火以及按间隔保存检查点。

一、导入相关包

In [None]:
import os
import sys

__package__ = "trainer"
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

import argparse
import time
import math
import warnings
import torch
import torch.distributed as dist
from contextlib import nullcontext
from torch import optim, nn
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, DistributedSampler
from transformers import AutoTokenizer, AutoModelForCausalLM
from model.model_minimind import MiniMindConfig, MiniMindForCausalLM
from dataset.lm_dataset import SFTDataset

warnings.filterwarnings('ignore')

二、相关工具函数，和train_pretrain相同

In [None]:
#日志打印函数，和train_pretrain相同，只有主进程打印
def Logger(content):
    if not ddp or dist.get_rank() == 0:
        print(content)

#余弦退火学习率，和train_pretrain相同
def get_lr(current_step, total_steps, lr):
    return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))

三、单个epoch的训练逻辑

模型与tokenizer的初始化，这里我们加载之前强化学习模型的参数

In [None]:
def init_model(lm_config):
    # 初始化tokenizer
    tokenizer = AutoTokenizer.from_pretrained('../model')
    #初始化模型
    model = MicroCortexForCausalLM(lm_config)

    moe_path = '_moe' if lm_config.use_moe else ''
    # 模型名
    ckp = f'{args.save_dir}/rlhf_{lm_config.hidden_size}{moe_path}.pth'
    # 加载模型参数
    state_dict = torch.load(ckp, map_location=args.device)
    # 加载模型参数到模型
    model.load_state_dict(state_dict, strict=False)

    Logger(f'LLM可训练总参数量：{sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} 百万')
    model = model.to(args.device)
    return model, tokenizer

分布式初始化，和train_pretrain一样

In [None]:
def init_distributed_mode():
    if not ddp: return
    global ddp_local_rank, DEVICE

    dist.init_process_group(backend="nccl")
    ddp_rank = int(os.environ["RANK"])
    ddp_local_rank = int(os.environ["LOCAL_RANK"])
    ddp_world_size = int(os.environ["WORLD_SIZE"])
    DEVICE = f"cuda:{ddp_local_rank}"
    torch.cuda.set_device(DEVICE)

单个epoch的训练，这里需要注意，因为训练数据中有思考标签，需要将思考标签的loss放大10倍

In [None]:
def train_epoch(epoch, wandb):
    # 思考标签token_id
    start_of_think_ids = tokenizer('<think>').input_ids
    end_of_think_ids = tokenizer('</think>').input_ids
    start_of_answer_ids = tokenizer('<answer>').input_ids
    end_of_answer_ids = tokenizer('</answer>').input_ids
    # 使用 token‑level 交叉熵，和train_pretrain相同
    loss_fct = nn.CrossEntropyLoss(reduction='none')
    start_time = time.time()
    for step, (X, Y, loss_mask) in enumerate(train_loader):
        # 将 batch 数据搬到指定设备
        X = X.to(args.device)
        Y = Y.to(args.device)
        loss_mask = loss_mask.to(args.device)
        # 计算并设置学习率（step 级别）
        lr = get_lr(epoch * iter_per_epoch + step, args.epochs * iter_per_epoch, args.learning_rate)
        '''
        optimizer.param_groups 是一个包含了所有模型参数组及其优化超参数（如学习率、动量等）的列表。
        [
          {
            'params': [...],         # 这是模型的一部分参数（可以是一个列表，也可以是单个参数）
            'lr': 0.001,             # 学习率
            'weight_decay': 0.0005,  # 权重衰减
            'momentum': 0.9,         # 如果是 SGD 可能会有
            ...                      # 还有其他优化器相关的超参数
          },
          {
            'params': [...],         # 这是模型的一部分参数（可以是一个列表，也可以是单个参数）
            'lr': 0.001,             # 学习率
            'weight_decay': 0.0005,  # 权重衰减
            'momentum': 0.9,         # 如果是 SGD 可能会有
            ...                      # 还有其他优化器相关的超参数
           },
           ...
        ]
        '''
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

        # 自动混合精度上下文(AMP Autocast)（在 CPU 时为 nullcontext 空操作）
        # 自动混合精度训练会把反向传播前的loss乘以放大因子s，来避免(FP16/BP16)下的下溢
        with ctx:
            res = model(X)# 前向传播
            # 交叉熵按 token 计算，随后与 mask 相乘，只统计非 padding 区域
            # res.logits [batch, token, vocab_size] - >[batch*token, vocab_size]
            # Y [batch, token] -> [batch*token]
            loss = loss_fct(
                res.logits.view(-1, res.logits.size(-1)),
                Y.view(-1)
            ).view(Y.size())# 再reshape回[batch, token]

            # sp_ids bool类型的[batch*token]，对应Y.view(-1)，对应位置为True，表示Y中对应位置是思维标签
            sp_ids = torch.isin(Y.view(-1),#[batch*token]
                                torch.tensor(start_of_think_ids + end_of_think_ids
                                             + start_of_answer_ids + end_of_answer_ids
                                             ).to(args.device))#思维token_id列表
            #！这里和train_pretrain不同：在 sp_ids 对应的位置增加额外的惩罚，loss_mask原本是0/1，
            loss_mask = loss_mask.view(-1)#[batch, token] -> [batch*token]
            # 有效 token 的个数
            loss_mask_sum = loss_mask.sum()
            #！这里和train_pretrain不同：思维标签位置权重放大10倍
            loss_mask[sp_ids] = 10
            # mask + 交叉熵求平均
            loss_mask = loss_mask.view(Y.size())
            loss = (loss * loss_mask).sum() / loss_mask_sum
            # 加上模型可能返回的额外正则项（如 MoE loss）
            loss += res.aux_loss
            # 梯度累积：先除以累计步数
            loss = loss / args.accumulation_steps

        # 和train_pretrain相同
        scaler.scale(loss).backward()

        if (step + 1) % args.accumulation_steps == 0:
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)

            scaler.step(optimizer)
            scaler.update()

            optimizer.zero_grad(set_to_none=True)

        if step % args.log_interval == 0:
            spend_time = time.time() - start_time
            Logger(
                'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
                    epoch + 1,
                    args.epochs,
                    step,
                    iter_per_epoch,
                    loss.item() * args.accumulation_steps,
                    optimizer.param_groups[-1]['lr'],
                    spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))

            if (wandb is not None) and (not ddp or dist.get_rank() == 0):
                wandb.log({"loss": loss * args.accumulation_steps,
                           "lr": optimizer.param_groups[-1]['lr'],
                           "epoch_Time": spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60})

        if (step + 1) % args.save_interval == 0 and (not ddp or dist.get_rank() == 0):
            model.eval()
            moe_path = '_moe' if lm_config.use_moe else ''
            ckp = f'{args.save_dir}/reason_{lm_config.hidden_size}{moe_path}.pth'

            if isinstance(model, torch.nn.parallel.DistributedDataParallel):
                state_dict = model.module.state_dict()
            else:
                state_dict = model.state_dict()

            state_dict = {k: v.half() for k, v in state_dict.items()}  # 半精度保存
            torch.save(state_dict, ckp)
            model.train()

四、主函数入口
启动示例（双卡）：
torchrun –nporc_per_node 2 1-pretrain.py，和train_pretrain一样

In [None]:
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="MicroCortex Distill Reasoning")
    parser.add_argument("--out_dir", type=str, default="../out")
    parser.add_argument("--epochs", type=int, default=6)
    parser.add_argument("--batch_size", type=int, default=32)
    parser.add_argument("--learning_rate", type=float, default=5e-7)
    parser.add_argument("--device", type=str, default="cuda:0" if torch.cuda.is_available() else "cpu")
    parser.add_argument("--dtype", type=str, default="bfloat16")
    parser.add_argument("--use_wandb", action="store_true")
    parser.add_argument("--wandb_project", type=str, default="MicroCortex-Full-SFT")
    parser.add_argument("--num_workers", type=int, default=1)
    parser.add_argument("--ddp", action="store_true")
    parser.add_argument("--accumulation_steps", type=int, default=1)
    parser.add_argument("--grad_clip", type=float, default=1.0)
    parser.add_argument("--warmup_iters", type=int, default=0)
    parser.add_argument("--log_interval", type=int, default=100)
    parser.add_argument("--save_interval", type=int, default=100)
    parser.add_argument('--local_rank', type=int, default=-1)
    parser.add_argument('--hidden_size', default=512, type=int)
    parser.add_argument('--num_hidden_layers', default=8, type=int)
    parser.add_argument('--max_seq_len', default=1024, type=int)
    parser.add_argument('--use_moe', default=True, type=bool)
    parser.add_argument("--data_path", type=str, default="../dataset/r1_mix_1024.jsonl")
    args = parser.parse_args()

    ####################
    #模型配置
    ####################
    lm_config = MicroCortexConfig(hidden_size=args.hidden_size, num_hidden_layers=args.num_hidden_layers,use_moe=args.use_moe)
    # 创建输出目录
    args.save_dir = os.path.join(args.out_dir)
    os.makedirs(args.save_dir, exist_ok=True)
    os.makedirs(args.out_dir, exist_ok=True)
    tokens_per_iter = args.batch_size * args.max_seq_len
    device_type = "cuda" if "cuda" in args.device else "cpu"

    # wandb run 名称 = 超参组合
    args.wandb_run_name = f"MiniMind-Full-SFT-Epoch-{args.epochs}-BatchSize-{args.batch_size}-LearningRate-{args.learning_rate}"

    # AMP 上下文：CPU 下为 no‑op
    ctx = nullcontext() if device_type == "cpu" else torch.cuda.amp.autocast()

    # 判断是否为 ddp 进程（RANK 环境变量由 torchrun 注入）
    ddp = int(os.environ.get("RANK", -1)) != -1  # is this a ddp run?
    ddp_local_rank, DEVICE = 0, "cuda:0"

    ####################
    #随机种子
    ####################
    base_seed = 1337
    torch.manual_seed(base_seed)
    torch.cuda.manual_seed(base_seed)

    # 若为 DDP，需要调用初始化并根据 rank 调整种子/设备
    if ddp:
        init_distributed_mode()
        args.device = torch.device(DEVICE)
        rank = dist.get_rank()
        torch.manual_seed(base_seed + rank)
        # 同时设置 CUDA 的随机种子
        torch.cuda.manual_seed(base_seed + rank)

    ####################
    #wandb初始化
    ####################
    if args.use_wandb and (not ddp or ddp_local_rank == 0):
        import wandb

        # 初始化wandb的项目和训练名
        wandb.init(project=args.wandb_project, name=args.wandb_run_name)
    else:
        wandb = None

    ####################
    #模型、数据集初始化
    ####################
    model, tokenizer = init_model(lm_config)

    train_ds = SFTDataset(args.data_path, tokenizer, max_length=args.max_seq_len)
    # 若为分布式，使用 DistributedSampler 保证各进程拿到不同切片
    train_sampler = DistributedSampler(train_ds) if ddp else None
    train_loader = DataLoader(
        train_ds,
        batch_size=args.batch_size,
        pin_memory=True,
        drop_last=False,
        shuffle=False,
        num_workers=args.num_workers,
        sampler=train_sampler
    )

    # AMP 梯度缩放器
    scaler = torch.cuda.amp.GradScaler(enabled=(args.dtype in ['float16', 'bfloat16']))
    # AdamW 优化器
    optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)

    # 若启用 DDP，包裹模型；排除不需要同步的 pos_cis（静态预计算矩阵），但是其实原作者这里有错误，它的旋转矩阵变量注册名并不是pos_cis
    if ddp:
        model._ddp_params_and_buffers_to_ignore = {"pos_cis"}
        model = DistributedDataParallel(model, device_ids=[ddp_local_rank])
    # 每 epoch 的 step 数
    iter_per_epoch = len(train_loader)

    ####################
    #开始训练
    ####################
    for epoch in range(args.epochs):
        train_epoch(epoch, wandb)