# 📦 NanoChat 数据处理指南

> **写给小白的话**：这个 Notebook 会手把手教你如何准备训练数据，不需要任何专业背景，跟着运行每个单元格就行！

---

## 📚 目录

1. [核心概念：3 分钟快速理解](#核心概念)
2. [第一阶段：预训练数据](#预训练数据)
3. [第二阶段：中期训练数据](#中期训练数据)
4. [第三阶段：微调数据](#微调数据)
5. [实战：准备中文数据](#准备中文数据)
6. [数据质量检查工具](#数据质量检查)
7. [数据量计算器](#数据量计算器)
8. [完整流程检查清单](#检查清单)

---

## <a id="核心概念"></a>1. 核心概念：3 分钟快速理解

### 训练 AI 需要什么数据？

想象一下教小孩学说话的过程：

```
👶 第一阶段：听大量日常对话 → 学会基本语言能力
👧 第二阶段：学习问答方式 → 懂得对话结构  
👨 第三阶段：学习回答问题 → 能按要求回答
```

训练 AI 模型也是一样的 **三个阶段**：

In [None]:
import pandas as pd

# 创建训练阶段对比表
training_stages = pd.DataFrame({
    '阶段': ['1️⃣', '2️⃣', '3️⃣'],
    '名称': ['预训练 (Pretraining)', '中期训练 (Midtraining)', '微调 (Fine-tuning)'],
    '数据类型': ['海量网页文本', '对话记录', '指令对话对'],
    '学什么': ['语言的基本规律、语法、词汇、常识', '对话的格式、一问一答的结构', '理解和执行指令、做个好助手'],
    '数据量': ['超级大 (几十 GB)', '中等 (几百 MB)', '较小 (几十 MB)']
})

print("\n🎯 AI 训练的三个阶段\n")
display(training_stages)
print("\n" + "="*80)

### 💡 为什么要分三个阶段？

**类比：就像学英语**

- **预训练** = 大量阅读英文书籍（学语法和词汇）
- **中期训练** = 学习英语对话（学怎么交流）
- **微调** = 学习回答面试问题（学特定任务）

如果直接让 AI 学习回答问题而不先学语言，就像让完全不懂英语的人直接参加英语面试，肯定学不好！

---

## <a id="预训练数据"></a>2. 第一阶段：预训练数据

### 用什么数据？

项目默认使用 **FineWeb-Edu** 数据集：

- 📖 来源：**Datawhale/fineweb-edu-100b-shuffle**（ModelScope 平台）
- 🔗 访问地址：[https://modelscope.cn/datasets/Datawhale/fineweb-edu-100b-shuffle](https://modelscope.cn/datasets/Datawhale/fineweb-edu-100b-shuffle)
- 📊 规模：约 1000 亿个单词（是的，1000 亿！）
- ✨ 质量：高质量网页内容，已经过混洗处理
- 🎁 免费：完全开源，直接下载
- 🚀 **国内优势**：从 ModelScope 下载，国内访问速度更快更稳定

### 📊 我需要下载多少数据？

取决于你要训练多大的模型：

In [None]:
# 不同模型规模的数据需求对比表
data_requirements = pd.DataFrame({
    '模型规模': ['d10 (迷你)', 'd12 (小)', 'd20 (默认)', 'd26 (大)', 'd32 (超大)'],
    '参数量': ['42M', '123M', '561M', '1.2B', '2.1B'],
    '需要下载': ['16 个分片', '48 个分片', '215 个分片', '460 个分片', '806 个分片'],
    '磁盘空间': ['~2GB', '~5GB', '~21GB', '~45GB', '~79GB'],
    '训练时间': ['30 分钟', '1-2 小时', '4 小时', '12 小时', '24 小时']
})

print("\n📊 模型规模与数据需求对照表\n")
display(data_requirements)
print("\n💡 新手建议：先用 d10 或 d12 练手，熟悉流程后再训练大模型！")
print("="*80)

### 🚀 如何下载？

**一条命令搞定！** 运行下面的代码单元格：

In [None]:
# 下载 8 个分片用于训练分词器（约 800MB）
# 这是最小下载量，适合快速测试

!python -m nanochat.dataset -n 8

In [None]:
# 如果要训练 d20 模型，需要下载更多数据
# ⚠️ 警告：这会下载约 21GB 数据，需要较长时间！
# 如果不需要，请不要运行这个单元格

# !python -m nanochat.dataset -n 215

### 📁 数据下载到哪了？

所有数据自动保存到 `~/.cache/nanochat/base_data/`

让我们检查一下：

In [None]:
import os
from pathlib import Path

# 获取数据目录
data_dir = Path.home() / ".cache" / "nanochat" / "base_data"

print(f"📁 数据目录: {data_dir}\n")

if data_dir.exists():
    # 统计已下载的文件
    parquet_files = list(data_dir.glob("*.parquet"))
    
    if parquet_files:
        print(f"✅ 找到 {len(parquet_files)} 个数据文件")
        
        # 计算总大小
        total_size = sum(f.stat().st_size for f in parquet_files)
        print(f"💽 总大小: {total_size / (1024**3):.2f} GB")
        
        # 显示前 5 个文件
        print("\n前 5 个文件:")
        for f in sorted(parquet_files)[:5]:
            size_mb = f.stat().st_size / (1024**2)
            print(f"  📄 {f.name:25s} ({size_mb:.1f} MB)")
    else:
        print("⚠️ 数据目录存在，但没有找到 .parquet 文件")
        print("   请先运行上面的下载命令！")
else:
    print("⚠️ 数据目录不存在，请先下载数据！")
    print(f"   运行: python -m nanochat.dataset -n 8")

### 🔍 查看数据内容

让我们打开一个文件看看里面是什么：

In [None]:
import pyarrow.parquet as pq

# 读取第一个分片
data_dir = Path.home() / ".cache" / "nanochat" / "base_data"
parquet_files = list(data_dir.glob("*.parquet")) if data_dir.exists() else []

if parquet_files:
    first_file = sorted(parquet_files)[0]
    print(f"📖 正在读取: {first_file.name}\n")
    
    # 读取 Parquet 文件
    table = pq.read_table(first_file)
    
    print(f"📊 文件信息:")
    print(f"   行数: {len(table):,}")
    print(f"   列名: {table.column_names}")
    
    # 显示前 3 条数据
    print("\n📝 前 3 条数据示例:\n")
    print("=" * 80)
    
    for i in range(min(3, len(table))):
        text = table['text'][i].as_py()
        # 只显示前 200 个字符
        preview = text[:200] + "..." if len(text) > 200 else text
        print(f"\n第 {i+1} 条 (长度: {len(text)} 字符)")
        print("-" * 80)
        print(preview)
    
    print("\n" + "=" * 80)
else:
    print("⚠️ 找不到数据文件，请先下载数据！")

### 💻 数据下载代码详解

数据下载功能由 `nanochat/dataset.py` 实现，让我们看看关键代码：

**核心功能：**
1. **多进程并行下载**：默认使用 4 个进程同时下载
2. **自动重试机制**：下载失败时自动重试，最多 5 次
3. **断点续传**：已下载的文件会自动跳过
4. **临时文件保护**：先下载到临时文件，完成后才重命名，避免中断导致文件损坏

**数据源配置：**
- 默认使用 ModelScope：`Datawhale/fineweb-edu-100b-shuffle`
- 国内访问速度快，无需特殊配置
- 总共 1822 个分片，每个约 100MB


In [None]:
# 查看 dataset.py 的关键代码
# 完整代码在: nanochat/dataset.py

print("📄 数据下载模块核心代码：\n")
print("=" * 80)

code_example = '''
# 数据源配置（已优化为国内源）
BASE_URL = "https://modelscope.cn/api/v1/datasets/Datawhale/fineweb-edu-100b-shuffle/repo?Revision=master&FilePath="
MAX_SHARD = 1822
index_to_filename = lambda index: f"shard_{index:05d}.parquet"

def download_single_file(index):
    """下载单个文件，带重试机制"""
    filename = index_to_filename(index)
    filepath = os.path.join(DATA_DIR, filename)
    
    if os.path.exists(filepath):
        return True  # 已存在，跳过
    
    url = f"{BASE_URL}/{filename}"
    
    # 最多重试 5 次
    for attempt in range(1, 6):
        try:
            response = requests.get(url, stream=True, timeout=30)
            temp_path = filepath + ".tmp"
            with open(temp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1024*1024):
                    f.write(chunk)
            os.rename(temp_path, filepath)  # 原子操作
            return True
        except Exception as e:
            if attempt < 5:
                time.sleep(2 ** attempt)  # 指数退避
            else:
                return False

# 使用多进程并行下载
with Pool(processes=4) as pool:
    results = pool.map(download_single_file, range(num_shards))
'''

print(code_example)
print("\n" + "=" * 80)
print("\n💡 完整代码请查看: nanochat/dataset.py")
print("   关键特性：多进程、自动重试、断点续传、临时文件保护")


---

## <a id="中期训练数据"></a>3. 第二阶段：中期训练数据

### 用什么数据？

项目默认使用 **SmolTalk** 对话数据集：

#### 🌐 数据源信息

- 📖 数据集：`HuggingFaceTB/smoltalk`
- 🏢 平台：HuggingFace
- 🗣️ 内容：真实的人类对话记录
- 📝 格式：一问一答的对话形式
- 🎯 目的：让模型学会对话的格式
- 📥 下载方式：训练脚本自动下载

#### 🇨🇳 国内访问优化

如果下载速度慢，可以设置 HuggingFace 镜像加速：

```bash
export HF_ENDPOINT=https://hf-mirror.com
```

### 数据格式示例

In [None]:
import json

# 对话数据格式示例
dialogue_example = {
    "messages": [
        {
            "role": "user",
            "content": "你好！请介绍一下自己"
        },
        {
            "role": "assistant",
            "content": "你好！我是一个 AI 助手，可以回答问题、提供建议..."
        },
        {
            "role": "user",
            "content": "你会说中文吗？"
        },
        {
            "role": "assistant",
            "content": "是的，我可以使用中文交流。"
        }
    ]
}

print("📝 对话数据格式示例：\n")
print(json.dumps(dialogue_example, ensure_ascii=False, indent=2))

print("\n💡 重要字段说明：")
print("   • role: 说话的角色，'user'(用户) 或 'assistant'(助手)")
print("   • content: 说话的内容")

print("\n✅ 好消息：训练脚本会自动下载 SmolTalk 数据集，无需手动操作！")

---

## <a id="微调数据"></a>4. 第三阶段：微调数据

### 用什么数据？

微调阶段混合使用多个任务数据集：

#### 🌐 数据集列表

In [None]:
# 微调数据集概览
sft_datasets = pd.DataFrame({
    '数据集': ['ARC-Easy', 'ARC-Challenge', 'GSM8K', 'SmolTalk'],
    '内容': ['简单选择题', '困难选择题', '小学数学题', '日常对话'],
    '数量': ['2,300 条', '1,100 条', '8,000 条', '10,000 条'],
    '学什么能力': ['常识推理', '深度推理', '数学计算', '闲聊能力']
})

print("\n🎯 微调阶段的数据集\n")
display(sft_datasets)
print("\n📊 总计：约 21,400 条训练样本")
print("="*80)

#### 🇨🇳 国内访问优化

所有微调数据集来自 HuggingFace，会在训练时自动下载。国内用户建议设置镜像：

```bash
export HF_ENDPOINT=https://hf-mirror.com
```

或在 Python 代码中设置：

```python
import os
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
```

### 数据格式示例

In [None]:
# 数学题示例 (GSM8K)
math_example = {
    "messages": [
        {
            "role": "user",
            "content": "小明有8个苹果，吃掉了3个，还剩几个？"
        },
        {
            "role": "assistant",
            "content": "让我来算一下：\n8 - 3 = 5\n所以小明还剩5个苹果。"
        }
    ]
}

# 选择题示例 (ARC)
arc_example = {
    "messages": [
        {
            "role": "user",
            "content": "哪个物体会浮在水面上？\nA. 石头\nB. 铁钉\nC. 木头\nD. 玻璃球"
        },
        {
            "role": "assistant",
            "content": "答案是C. 木头。因为木头的密度比水小，所以会浮在水面上。"
        }
    ]
}

print("📝 数学题示例 (GSM8K)：\n")
print(json.dumps(math_example, ensure_ascii=False, indent=2))

print("\n" + "="*80 + "\n")

print("📝 选择题示例 (ARC)：\n")
print(json.dumps(arc_example, ensure_ascii=False, indent=2))

print("\n✅ 这些数据集会在运行微调脚本时自动下载！")

---

## <a id="准备中文数据"></a>5. 实战：准备中文数据

> 如果你想训练中文模型，需要准备中文数据。下面是一个完整的示例！

### 方法一：使用 HuggingFace 中文数据集

In [None]:
# 设置镜像（可选）
import os

# 项目已默认使用 ModelScope 下载预训练数据，无需额外设置
# 以下镜像设置仅用于其他 HuggingFace 数据集（如 SmolTalk）
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'

print("✅ 已设置 HuggingFace 镜像：https://hf-mirror.com")
print("   这会加速其他 HuggingFace 数据集的下载速度")
print("\n💡 预训练数据默认从 ModelScope 下载，国内访问速度已优化")

In [None]:
# 下载中文维基百科数据（示例）
# ⚠️ 警告：这会下载较大的数据集，需要时间！
# 如果不需要，请不要运行这个单元格

from datasets import load_dataset

print("📥 正在下载中文维基百科（前 1000 条用于演示）...\n")

try:
    # 只下载前 1000 条用于演示
    wiki = load_dataset(
        "wikipedia",
        "20220301.zh",  # 中文版本
        split="train[:1000]",  # 只取前 1000 条
        trust_remote_code=True
    )
    
    print(f"✅ 成功下载：{len(wiki):,} 条数据\n")
    
    # 显示第一条
    print("📝 第一条数据示例：")
    print("="*80)
    first_text = wiki[0]['text'][:300] + "..."
    print(first_text)
    print("="*80)
    
except Exception as e:
    print(f"❌ 下载失败：{e}")
    print("   可能需要检查网络连接或尝试使用镜像")

### 方法二：转换自己的文本数据

如果你有自己收集的中文文本，可以使用项目提供的转换工具：

In [None]:
# 使用内置工具转换自定义数据
# 详细说明请查看 data_check/convert_custom_data.py

print("🛠️ 转换自定义文本数据的步骤：\n")
print("1. 准备你的文本数据（.txt 文件）")
print("2. 运行转换命令：")
print("   python -m data_check.convert_custom_data")
print("\n支持的输入格式：")
print("   • 单个文本文件（每行一条数据）")
print("   • 目录（包含多个 .txt 文件）")
print("\n详细代码请查看：data_check/convert_custom_data.py")

---

## <a id="数据质量检查"></a>6. 数据质量检查工具

项目提供了完整的数据检查工具集：

In [None]:
# 数据检查工具概览
tools = pd.DataFrame({
    '工具': [
        'check_data.py',
        'check_length_distribution.py',
        'check_content_quality.py',
        'check_char_distribution.py',
        'convert_custom_data.py'
    ],
    '用途': [
        '验证数据文件完整性',
        '检查文本长度分布',
        '抽样检查内容质量',
        '检查字符分布统计',
        '转换自定义文本数据'
    ],
    '命令': [
        'python -m data_check.check_data',
        'python -m data_check.check_length_distribution',
        'python -m data_check.check_content_quality',
        'python -m data_check.check_char_distribution',
        'python -m data_check.convert_custom_data'
    ]
})

print("\n🛠️ 数据检查工具总览\n")
display(tools)
print("\n💡 所有工具的详细代码都在 data_check/ 目录下")
print("="*80)

### 快速检查数据完整性

In [None]:
# 运行数据完整性检查
!python -m data_check.check_data

### 检查文本长度分布

In [None]:
# 分析数据的长度分布
# 这有助于了解数据质量

!python -m data_check.check_length_distribution

---

## <a id="数据量计算器"></a>7. 数据量计算器

### Chinchilla 定律

**数据 token 数 = 模型参数量 × 20**

让我们计算不同模型需要多少数据：

In [None]:
# 可视化数据量对比
import matplotlib.pyplot as plt
import numpy as np

# 设置中文字体（如果有的话）
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# 准备数据
models_list = [r['模型'] for r in results]
params_list = [float(r['参数量'].replace('M', '')) for r in results]
tokens_list = [float(r['Token数'].replace('B', '')) for r in results]
disk_list = [float(r['磁盘'].replace('GB', '')) for r in results]
shards_list = [r['分片数'] for r in results]

# 创建多子图
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('📊 不同模型规模的数据需求对比', fontsize=16, fontweight='bold')

# 1. 参数量 vs Token数
ax1 = axes[0, 0]
ax1.plot(params_list, tokens_list, 'o-', linewidth=2, markersize=8, color='#4CAF50')
ax1.set_xlabel('模型参数量 (M)', fontsize=12)
ax1.set_ylabel('需要的 Token 数 (B)', fontsize=12)
ax1.set_title('参数量 vs Token数（Chinchilla 定律）', fontsize=13, fontweight='bold')
ax1.grid(True, alpha=0.3)
for i, model in enumerate(models_list):
    ax1.annotate(model, (params_list[i], tokens_list[i]), 
                textcoords="offset points", xytext=(0,10), ha='center', fontsize=10)

# 2. Token数 vs 磁盘空间
ax2 = axes[0, 1]
ax2.plot(tokens_list, disk_list, 's-', linewidth=2, markersize=8, color='#2196F3')
ax2.set_xlabel('Token 数 (B)', fontsize=12)
ax2.set_ylabel('磁盘空间 (GB)', fontsize=12)
ax2.set_title('Token数 vs 磁盘空间', fontsize=13, fontweight='bold')
ax2.grid(True, alpha=0.3)
for i, model in enumerate(models_list):
    ax2.annotate(model, (tokens_list[i], disk_list[i]), 
                textcoords="offset points", xytext=(0,10), ha='center', fontsize=10)

# 3. 分片数对比（柱状图）
ax3 = axes[1, 0]
bars = ax3.bar(models_list, shards_list, color=['#FF9800', '#F44336', '#9C27B0', '#00BCD4', '#4CAF50'], alpha=0.7)
ax3.set_xlabel('模型规模', fontsize=12)
ax3.set_ylabel('分片数', fontsize=12)
ax3.set_title('不同模型需要的分片数', fontsize=13, fontweight='bold')
ax3.grid(True, alpha=0.3, axis='y')
# 在柱状图上添加数值标签
for bar, shard in zip(bars, shards_list):
    height = bar.get_height()
    ax3.text(bar.get_x() + bar.get_width()/2., height,
             f'{shard}',
             ha='center', va='bottom', fontsize=10, fontweight='bold')

# 4. 磁盘空间对比（柱状图）
ax4 = axes[1, 1]
bars2 = ax4.bar(models_list, disk_list, color=['#FF9800', '#F44336', '#9C27B0', '#00BCD4', '#4CAF50'], alpha=0.7)
ax4.set_xlabel('模型规模', fontsize=12)
ax4.set_ylabel('磁盘空间 (GB)', fontsize=12)
ax4.set_title('不同模型需要的磁盘空间', fontsize=13, fontweight='bold')
ax4.grid(True, alpha=0.3, axis='y')
# 在柱状图上添加数值标签
for bar, disk in zip(bars2, disk_list):
    height = bar.get_height()
    ax4.text(bar.get_x() + bar.get_width()/2., height,
             f'{disk:.1f}GB',
             ha='center', va='bottom', fontsize=10, fontweight='bold')

plt.tight_layout()
plt.show()

print("\n✅ 可视化图表已生成！")


In [None]:
def calculate_data_requirement(model_params_million):
    """
    计算训练所需的数据量
    
    参数:
        model_params_million: 模型参数量(百万)，如123表示123M参数
    
    返回:
        字典，包含各种数据量信息
    """
    
    # 1. 需要的 token 数（参数量 × 20）
    tokens_billion = model_params_million / 1000 * 20
    
    # 2. 需要的字符数（1 token ≈ 4.8 字符）
    chars_billion = tokens_billion * 4.8
    
    # 3. 需要的分片数（每个分片 250M 字符）
    num_shards = int(chars_billion * 1000 / 250)
    
    # 4. 磁盘空间（每个分片约 100MB）
    disk_gb = num_shards * 100 / 1024
    
    return {
        'model_params': f"{model_params_million}M",
        'tokens': f"{tokens_billion:.1f}B",
        'chars': f"{chars_billion:.0f}B",
        'shards': num_shards,
        'disk': f"{disk_gb:.1f}GB"
    }

# 不同规模模型
models = {
    'd10': 42,
    'd12': 123,
    'd20': 561,
    'd26': 1200,
    'd32': 2100
}

results = []
for name, params in models.items():
    req = calculate_data_requirement(params)
    results.append({
        '模型': name,
        '参数量': req['model_params'],
        'Token数': req['tokens'],
        '字符数': req['chars'],
        '分片数': req['shards'],
        '磁盘': req['disk']
    })

df_results = pd.DataFrame(results)

print("\n📊 模型数据需求计算表\n")
display(df_results)
print("\n💡 提示：数据量基于 Chinchilla 定律计算（参数量 × 20）")

### 自定义计算

输入你的模型参数量，计算需要多少数据：

In [None]:
# 分词器训练代码 (scripts/tok_train.py)
# 完整代码请查看: scripts/tok_train.py

print("📄 分词器训练核心代码：\n")
print("=" * 80)

code_tok_train = '''
"""
训练 BPE 分词器
"""
import argparse
from nanochat.tokenizer import RustBPETokenizer
from nanochat.dataset import parquets_iter_batched

# 解析参数
parser = argparse.ArgumentParser(description='Train a BPE tokenizer')
parser.add_argument('--max_chars', type=int, default=2_000_000_000, 
                    help='最大训练字符数（默认20亿）')
parser.add_argument('--vocab_size', type=int, default=65536, 
                    help='词汇表大小（默认65536=2^16）')
args = parser.parse_args()

# 文本迭代器：从数据中读取文本
def text_iterator():
    """从训练数据中迭代读取文本"""
    nchars = 0
    for batch in parquets_iter_batched(split="train"):
        for doc in batch:
            # 限制每个文档的最大长度
            doc_text = doc[:10000] if len(doc) > 10000 else doc
            nchars += len(doc_text)
            yield doc_text
            if nchars > args.max_chars:
                return

# 训练分词器
text_iter = text_iterator()
tokenizer = RustBPETokenizer.train_from_iterator(text_iter, args.vocab_size)

# 保存分词器
tokenizer_dir = os.path.join(base_dir, "tokenizer")
tokenizer.save(tokenizer_dir)

print(f"✅ 分词器训练完成！")
print(f"   保存位置: {tokenizer_dir}")
print(f"   词汇表大小: {tokenizer.get_vocab_size():,}")
'''

print(code_tok_train)
print("\n" + "=" * 80)
print("\n💡 运行命令: python -m scripts.tok_train --max_chars=2000000000")
print("   这会使用前 20 亿字符训练一个 65536 词汇的 BPE 分词器")


### 2. 数据加载器代码 (`nanochat/dataloader.py`)

数据加载器负责将文本转换为 token 序列，并支持分布式训练：


In [None]:
# 数据加载器核心逻辑（简化版）
# 完整代码请查看: nanochat/dataloader.py

print("📄 数据加载器核心逻辑：\n")
print("=" * 80)

code_dataloader = '''
def tokenizing_distributed_data_loader(batch_size, seq_len, split):
    """
    即时分词的数据加载器（支持分布式）
    
    参数:
        batch_size: 每个 GPU 的批次大小
        seq_len: 序列长度（上下文窗口）
        split: "train" 或 "val"
    """
    tokenizer = get_tokenizer()
    
    while True:
        # 1. 从磁盘读取一个 parquet 分片
        for batch in parquets_iter_batched(split=split):
            # 2. 遍历分片中的文档
            for doc in batch:
                text = doc['text']
                
                # 3. 使用分词器转换文本为 token IDs
                tokens = tokenizer.encode(text)
                
                # 4. 切分成固定长度的序列
                for i in range(0, len(tokens) - seq_len, seq_len):
                    inputs = tokens[i:i+seq_len]
                    targets = tokens[i+1:i+seq_len+1]  # 目标是输入向右偏移1位
                    
                    # 5. 组装成批次
                    batch_inputs.append(inputs)
                    batch_targets.append(targets)
                    
                    if len(batch_inputs) == batch_size:
                        yield (torch.tensor(batch_inputs), 
                               torch.tensor(batch_targets))
                        batch_inputs = []
                        batch_targets = []
'''

print(code_dataloader)
print("\n" + "=" * 80)
print("\n💡 关键特性：")
print("   • 即时分词：不需要预先分词，节省磁盘空间")
print("   • 流式加载：只加载当前需要的数据，节省内存")
print("   • 分布式支持：每个 GPU 自动划分数据")
print("   • 序列打包：连续拼接文档，最大化 GPU 利用率")


### 3. 数据检查工具代码

项目提供了完整的数据检查工具集，位于 `data_check/` 目录：


In [None]:
# 数据检查工具代码示例
# 完整代码请查看: data_check/*.py

print("📄 数据检查工具代码示例：\n")
print("=" * 80)

code_check_data = '''
# 1. 检查数据完整性 (data_check/check_data.py)
import pyarrow.parquet as pq
import glob

def check_data_integrity(data_dir):
    """检查所有 Parquet 文件的完整性"""
    files = sorted(glob.glob(f"{data_dir}/*.parquet"))
    
    broken = []
    total_rows = 0
    
    for filepath in files:
        try:
            table = pq.read_table(filepath)
            rows = len(table)
            total_rows += rows
            
            if rows == 0:
                broken.append((filepath, "空文件"))
            else:
                print(f"✅ {os.path.basename(filepath)}: {rows:,} 条")
        except Exception as e:
            print(f"❌ {os.path.basename(filepath)}: 损坏")
            broken.append((filepath, str(e)))
    
    return len(broken) == 0

# 2. 检查长度分布 (data_check/check_length_distribution.py)
def check_length_distribution(data_path):
    """分析文本长度分布"""
    table = pq.read_table(data_path)
    texts = table['text'].to_pylist()
    
    lengths = [len(text) for text in texts]
    avg_length = sum(lengths) / len(lengths)
    
    print(f"平均长度: {avg_length:.0f} 字符")
    print(f"最短: {min(lengths)} 字符")
    print(f"最长: {max(lengths)} 字符")
    
    # 分桶统计
    buckets = {"< 50": 0, "50-100": 0, "100-500": 0, "500-1000": 0, "> 1000": 0}
    for length in lengths:
        if length < 50:
            buckets["< 50"] += 1
        elif length < 100:
            buckets["50-100"] += 1
        # ... 其他分桶
    
    return buckets

# 3. 转换自定义数据 (data_check/convert_custom_data.py)
def convert_to_parquet(texts, output_dir, shard_size=100):
    """将文本列表转换为 Parquet 格式"""
    import pyarrow as pa
    
    num_shards = (len(texts) + shard_size - 1) // shard_size
    
    for i in range(num_shards):
        start = i * shard_size
        end = min(start + shard_size, len(texts))
        shard_texts = texts[start:end]
        
        table = pa.Table.from_pydict({'text': shard_texts})
        output_path = f"{output_dir}/shard_{i:05d}.parquet"
        
        pq.write_table(
            table,
            output_path,
            row_group_size=1024,
            compression='zstd',
            compression_level=3
        )
'''

print(code_check_data)
print("\n" + "=" * 80)
print("\n💡 使用方法：")
print("   • python -m data_check.check_data              # 检查数据完整性")
print("   • python -m data_check.check_length_distribution  # 检查长度分布")
print("   • python -m data_check.convert_custom_data      # 转换自定义数据")


### 4. 训练脚本关键参数 (`scripts/base_train.py`)

预训练脚本的关键配置和代码逻辑：


In [None]:
# 训练脚本关键配置（scripts/base_train.py）
# 完整代码请查看: scripts/base_train.py

print("📄 预训练脚本关键配置：\n")
print("=" * 80)

code_base_train = '''
# 用户配置
depth = 20                    # Transformer 深度
max_seq_len = 2048            # 最大上下文长度
device_batch_size = 32        # 每个 GPU 的批次大小
total_batch_size = 524288     # 总批次大小（token 数）

# 训练长度（三选一）
num_iterations = -1          # 明确的迭代次数（-1 = 禁用）
target_flops = -1.0           # 目标 FLOPs（-1 = 禁用）
target_param_data_ratio = 20 # Chinchilla 定律：数据token数 = 参数量 × 20

# 优化器配置
embedding_lr = 0.2            # 嵌入层学习率（Adam）
unembedding_lr = 0.004        # 输出层学习率（Adam）
matrix_lr = 0.02              # 矩阵参数学习率（Muon）
grad_clip = 1.0               # 梯度裁剪

# 评估配置
eval_every = 250              # 每 250 步评估一次验证集 loss
core_metric_every = 2000      # 每 2000 步评估一次 CORE 指标
sample_every = 2000           # 每 2000 步采样一次

# 训练循环核心逻辑
for step in range(num_iterations):
    # 1. 前向传播
    loss = model(x, y)
    
    # 2. 反向传播
    loss.backward()
    
    # 3. 梯度裁剪
    if grad_clip > 0.0:
        torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
    
    # 4. 更新学习率
    lrm = get_lr_multiplier(step)
    for opt in optimizers:
        for group in opt.param_groups:
            group["lr"] = group["initial_lr"] * lrm
    
    # 5. 优化器步进
    for opt in optimizers:
        opt.step()
    
    # 6. 评估（按需）
    if step % eval_every == 0:
        val_bpb = evaluate_bpb(model, val_loader, eval_steps)
        print(f"Step {step} | Validation bpb: {val_bpb:.4f}")
    
    # 7. 保存检查点（按需）
    if step % checkpoint_every == 0:
        save_checkpoint(checkpoint_dir, step, model.state_dict())
'''

print(code_base_train)
print("\n" + "=" * 80)
print("\n💡 运行命令: torchrun --standalone --nproc_per_node=8 -m scripts.base_train --depth=20")
print("   关键参数说明：")
print("   • depth: 模型深度，决定参数量")
print("   • target_param_data_ratio: Chinchilla 定律比例（默认 20）")
print("   • device_batch_size: 每个 GPU 的批次大小")
print("   • total_batch_size: 所有 GPU 的总批次大小（token 数）")


### 5. 分词器评估代码 (`scripts/tok_eval.py`)

评估分词器的压缩率，并与 GPT-2/GPT-4 分词器对比：


In [None]:
# 分词器评估代码（scripts/tok_eval.py）
# 完整代码请查看: scripts/tok_eval.py

print("📄 分词器评估代码示例：\n")
print("=" * 80)

code_tok_eval = '''
# 评估分词器的压缩率
from nanochat.tokenizer import get_tokenizer, RustBPETokenizer

# 测试文本（新闻、代码、数学等不同类型）
test_texts = {
    "news": "Yesterday, Mexico's National Service reported...",
    "code": "def train(self, text, vocab_size):\\n    ...",
    "math": "\\sum_{k=1}^{n} k^{3} = \\left(\\frac{n(n+1)}{2}\\right)^{2}",
}

# 对比不同分词器
tokenizers = {
    "GPT-2": RustBPETokenizer.from_pretrained("gpt2"),
    "GPT-4": RustBPETokenizer.from_pretrained("cl100k_base"),
    "Ours": get_tokenizer(),
}

results = {}

for name, tokenizer in tokenizers.items():
    results[name] = {}
    for text_type, text in test_texts.items():
        # 编码文本
        encoded = tokenizer.encode(text)
        
        # 计算压缩率（字节数 / token数）
        encoded_bytes = len(text.encode('utf-8'))
        ratio = encoded_bytes / len(encoded)
        
        results[name][text_type] = {
            'bytes': encoded_bytes,
            'tokens': len(encoded),
            'ratio': ratio
        }

# 打印对比结果
print("分词器对比结果：")
print("=" * 80)
for text_type in test_texts.keys():
    print(f"\\n{text_type}:")
    for name in tokenizers.keys():
        data = results[name][text_type]
        print(f"  {name:8s}: {data['tokens']:4d} tokens, "
              f"ratio: {data['ratio']:.2f}")
'''

print(code_tok_eval)
print("\n" + "=" * 80)
print("\n💡 运行命令: python -m scripts.tok_eval")
print("   这会评估分词器在不同类型文本上的压缩率")
print("   并与 GPT-2、GPT-4 的分词器进行对比")


In [None]:
# 自定义模型参数量（单位：百万）
my_model_params = 100  # 修改这里！

result = calculate_data_requirement(my_model_params)

print(f"\n🎯 您的模型（{my_model_params}M 参数）需要：\n")
print(f"   Token 数量：{result['tokens']}")
print(f"   字符数量：{result['chars']}")
print(f"   数据分片：{result['shards']} 个")
print(f"   磁盘空间：{result['disk']}")
print("\n下载命令：")
print(f"   python -m nanochat.dataset -n {result['shards']}")

---

## <a id="检查清单"></a>8. 完整流程检查清单

准备好数据了吗？对照这个清单检查：

In [None]:
import shutil

def check_data_readiness():
    """检查数据准备情况"""
    
    print("\n🔍 数据准备状态检查\n")
    print("="*80)
    
    checks = []
    
    # 1. 检查预训练数据
    base_data_dir = Path.home() / ".cache" / "nanochat" / "base_data"
    if base_data_dir.exists():
        parquet_files = list(base_data_dir.glob("*.parquet"))
        if len(parquet_files) >= 8:
            checks.append(("✅", f"预训练数据：找到 {len(parquet_files)} 个分片"))
        else:
            checks.append(("⚠️", f"预训练数据：只有 {len(parquet_files)} 个分片（建议至少 8 个）"))
    else:
        checks.append(("❌", "预训练数据：未下载"))
    
    # 2. 检查分词器
    tokenizer_dir = Path.home() / ".cache" / "nanochat" / "tokenizer"
    if tokenizer_dir.exists() and list(tokenizer_dir.glob("*.model")):
        checks.append(("✅", "分词器：已训练"))
    else:
        checks.append(("⚠️", "分词器：未训练（需要运行 tok_train）"))
    
    # 3. 检查磁盘空间
    cache_dir = Path.home() / ".cache"
    if cache_dir.exists():
        try:
            stat = shutil.disk_usage(cache_dir)
            free_gb = stat.free / (1024**3)
            if free_gb > 30:
                checks.append(("✅", f"磁盘空间：剩余 {free_gb:.1f} GB"))
            else:
                checks.append(("⚠️", f"磁盘空间：剩余 {free_gb:.1f} GB（建议至少 30GB）"))
        except:
            checks.append(("ℹ️", "磁盘空间：无法检测"))
    
    # 4. 检查环境变量
    if 'HF_ENDPOINT' in os.environ:
        checks.append(("✅", f"HuggingFace 镜像：{os.environ['HF_ENDPOINT']}"))
    else:
        checks.append(("ℹ️", "HuggingFace 镜像：未设置（国内用户建议设置）"))
    
    # 显示结果
    for status, msg in checks:
        print(f"{status} {msg}")
    
    print("="*80)
    
    # 总结
    ready_count = sum(1 for s, _ in checks if s == "✅")
    total_count = len(checks)
    
    print(f"\n📊 就绪状态：{ready_count}/{total_count}")
    
    if ready_count >= 2:  # 至少有数据和空间就算基本就绪
        print("\n🎉 数据基本准备完成，可以开始训练了！")
    else:
        print("\n💡 还有一些准备工作需要完成，请查看上面的提示")

# 运行检查
check_data_readiness()

---

## 🚀 下一步

数据准备好了！接下来：

### 1. 训练分词器

In [None]:
# 训练分词器
# ⚠️ 警告：这可能需要较长时间！

# !python -m scripts.tok_train --max_chars=2000000000

### 2. 开始预训练

In [None]:
# 开始预训练（需要 GPU）
# ⚠️ 警告：这需要大量时间和计算资源！

# !torchrun --standalone --nproc_per_node=8 -m scripts.base_train --depth=20