# 自然语言推断与数据集
## 自然语言推断
## 斯坦福自然语言推断（SNLI）数据集

## 报错
### 数据集下载失败
手动下载数据集，由于斯坦福SNLI数据集有macOS文件，在windows平台无法运行，手动在压缩包文件删除（Icon文件），再解压。然后手动更改data_dir路径

In [1]:
import re
import torch
from torch import nn
from d2l import torch as d2l


#@save
# d2l.DATA_HUB['SNLI'] = (
#     'https://nlp.stanford.edu/projects/snli/snli_1.0.zip',
#     '9fcde07509c7e87ec61c640c1b2753d9041758e4')
# d2l.DATA_HUB['SNLI'] = (
#     'https://mirrors.tuna.tsinghua.edu.cn/snli/snli_1.0.zip',
#     '9fcde07509c7e87ec61c640c1b2753d9041758e4')
# data_dir = d2l.download_extract('SNLI')
# 1. 手动下载并解压

extract_dir = '..\\data\\snli_1.0'

# 3. 直接使用
data_dir = extract_dir

In [2]:
# 清理 data/snli_1.0 目录中可能包含的无效文件（比如来自 macOS 的 ``Icon\r``）
# 这个单元避免后续的文件操作因为非法文件名而失败
import os

# Helper: 清理可能导致 Windows 报错的无效文件名
# 定义为函数以便在多处复用（例如在调用 read_snli/load_data_snli 前）
def _cleanup_snli_dir(data_dir):
    """删除 data_dir 下包含控制字符或以 Icon 开头的文件并返回已删除路径列表"""
    import os
    if not os.path.isdir(data_dir):
        return []
    removed = []
    for root, dirs, files in os.walk(data_dir):
        for name in files:
            if '\r' in name or '\x00' in name or name.startswith('Icon'):
                path = os.path.join(root, name)
                try:
                    os.remove(path)
                    removed.append(path)
                except Exception as e:
                    print(f'无法删除 {path}:', e)
    if removed:
        print('已删除以下无效文件：')
        for p in removed:
            print(' -', p)
    return removed

### [**读取数据集**]


In [3]:
# data_dir：数据集目录路径；is_train：True读取训练集，False读取测试集
# 返回：(premises,hypotheses,labels)三个列表
#@save
def read_snli(data_dir=None, is_train=True):
    """将SNLI数据集解析为前提、假设和标签

    如果未提供 data_dir，会尝试使用 `extract_dir` 全局变量；若不存在则调用 `d2l.download_extract('SNLI')`。
    函数会在打开 TSV 文件前尝试清理目录中的无效文件（如包含 '\r' 的文件名），并在打开失败时重试一次。
    """
    # 1. 文本清洗函数：清理文本中的括号和冗余空格
    def extract_text(s):
        # 删除我们不会使用的信息
        s = re.sub('\\(', '', s) # 删左括号
        s = re.sub('\\)', '', s) # 删右括号
        # 用一个空格替换两个或多个连续的空格
        s = re.sub('\\s{2,}', ' ', s) # 两个以上空格合并为一个
        return s.strip()
    
    '''
    2. 标签映射：将文本标签转为整数
    'entailment'（蕴含）→0
    'contradiction'（矛盾）→1
    'neutral'（中性）→2
    SNLI中有些样本标记为'-'（问题样本），将被过滤掉
    '''
    label_set = {'entailment': 0, 'contradiction': 1, 'neutral': 2}
    '''
    3. 文件读取与解析
    选择文件：根据is_train选择训练集或测试集
    读取TSV：按制表符分割每行，readlines()[1:]跳过CSV表头
    结果：rows是一个二维列表，每行是一个字段列表
    '''
    # 确保 data_dir 可用
    if data_dir is None:
        try:
            data_dir = extract_dir
        except NameError:
            data_dir = d2l.download_extract('SNLI')
    # 尝试清理目录以删除可能导致 Windows 报错的文件
    try:
        _cleanup_snli_dir(data_dir)
    except Exception:
        pass
    file_name = os.path.join(data_dir, 'snli_1.0_train.txt' if is_train else 'snli_1.0_test.txt')
    try:
        with open(file_name, 'r') as f:
            rows = [row.split('\t') for row in f.readlines()[1:]]
    except Exception as e:
        # 若因目录中存在无效文件名导致打开失败，先尝试清理目录后重试
        print(f'打开 {file_name} 失败: {e}. 尝试清理目录并重试。')
        try:
            _cleanup_snli_dir(data_dir)
            with open(file_name, 'r') as f:
                rows = [row.split('\t') for row in f.readlines()[1:]]
        except Exception as e2:
            raise RuntimeError(f'无法打开文件 {file_name}，请检查 SNLI 数据集是否正确解压到 {data_dir}，或手动删除目录中异常文件。 原始错误: {e2}')
    '''
    4. 数据提取与过滤
    row[0]：标签列（如'entailment'），不在label_set中的行被跳过
    row[1]：前提（premise）句子
    row[2]：假设（hypothesis）句子
    列表推导式：只保留标签有效的样本，确保数据质量
    '''
    premises = [extract_text(row[1]) for row in rows if row[0] in label_set]
    hypotheses = [extract_text(row[2]) for row in rows if row[0] \
                in label_set]
    labels = [label_set[row[0]] for row in rows if row[0] in label_set]
    '''
    premises：字符串列表，如['A man inspects...',...]
    hypotheses：字符串列表，如['An activity is being...',...]
    labels：整数列表，如[0,1,2,0,...]
    '''
    return premises, hypotheses, labels

现在让我们[**打印前3对**]前提和假设，以及它们的标签（“0”“1”和“2”分别对应于“蕴涵”“矛盾”和“中性”）。


In [4]:
# rain_data是一个元组(premises,hypotheses,labels)，打乱顺序
train_data = read_snli(data_dir, is_train=True)
'''
train_data[0][:3]：取前3个前提（premises）
train_data[1][:3]：取前3个假设（hypotheses）
train_data[2][:3]：取前3个标签（labels）
zip(...)：将三个列表的对应元素成对打包，生成可迭代的三元组
for x0, x1, y in ...：每次迭代解包一个样本的三个部分
'''
for x0, x1, y in zip(train_data[0][:3], train_data[1][:3], train_data[2][:3]):
    print('前提：', x0)
    print('假设：', x1)
    print('标签：', y)

前提： A person on a horse jumps over a broken down airplane .
假设： A person is training his horse for a competition .
标签： 2
前提： A person on a horse jumps over a broken down airplane .
假设： A person is at a diner , ordering an omelette .
标签： 1
前提： A person on a horse jumps over a broken down airplane .
假设： A person is outdoors , on a horse .
标签： 0


训练集约有550000对，测试集约有10000对。下面显示了训练集和测试集中的三个[**标签“蕴涵”“矛盾”和“中性”是平衡的**]。


In [5]:
# test_data是 (premises,hypotheses,labels)元组，不打乱顺序
test_data = read_snli(data_dir, is_train=False)
# data[2]是标签列表（labels）,依次统计训练集和测试集
for data in [train_data, test_data]:
    # for i in range(3)：i取值0,1,2（对应三种标签）
    # .count(i)：统计标签i出现的次数
    # [data[2].count(0),data[2].count(1),data[2].count(2)]
    print([[row for row in data[2]].count(i) for i in range(3)])

[183416, 183187, 182764]
[3368, 3237, 3219]


### [**定义用于加载数据集的类**]

In [6]:
#@save
class SNLIDataset(torch.utils.data.Dataset):
    """用于加载SNLI数据集的自定义数据集"""
    '''
    dataset：元组 (premises,hypotheses,labels)，来自read_snli()的输出
    num_steps：序列最大长度（填充/截断的目标长度）
    vocab：可选的预构建词汇表（用于共享训练/测试集词表）
    '''
    def __init__(self, dataset, num_steps, vocab=None):
        # 1. 设置序列长度：保存最大序列长度，供后续填充/截断使用
        self.num_steps = num_steps
        '''
        2. 分词处理
        dataset[0]：前提（premise）字符串列表
        dataset[1]：假设（hypothesis）字符串列表
        tokenize：将每个句子字符串拆分为单词列表（如['a','man','is',...]）
        '''
        all_premise_tokens = d2l.tokenize(dataset[0])
        all_hypothesis_tokens = d2l.tokenize(dataset[1])
        '''
        3. 构建或复用词汇表
        首次调用（如训练集）：合并所有前提和假设的词元，构建新词表
            min_freq=5：过滤低频词（出现<5次的词映射为<unk>）
            reserved_tokens=['<pad>']：添加填充符
        后续调用（如测试集）：复用训练集词表，确保索引一致
        '''
        if vocab is None:
            self.vocab = d2l.Vocab(all_premise_tokens + \
                all_hypothesis_tokens, min_freq=5, reserved_tokens=['<pad>'])
        else:
            self.vocab = vocab
        '''
        4. 序列填充/截断
        _pad方法（未显示但可推断）：
            将每个词元列表转换为索引序列（通过vocab）
            长度>num_steps：截断尾部
            长度<num_steps：用<pad>索引填充
        结果：self.premises和self.hypotheses是形状为(样本数,num_steps)的张量列表
        '''
        self.premises = self._pad(all_premise_tokens)
        self.hypotheses = self._pad(all_hypothesis_tokens)
        # 5. 标签张量化：dataset[2]：原始整数标签列表（0/1/2），转换为PyTorch张量，便于后续训练和计算损失
        self.labels = torch.tensor(dataset[2])
        # 6. 打印统计信息
        print('read ' + str(len(self.premises)) + ' examples')
    '''
    1. 输入：lines
    类型：词元列表的列表（nested list）
    示例：[['a','man','is','walking'],['the','dog','barks'],...]；每个子列表代表一个句子的单词分词结果
    2. 词元→索引转换：self.vocab[line]
    调用词汇表将每个词元字符串映射为整数索引；示例：['a','man','is']→[12,85,7]
    OOV处理：不在词表中的词映射为<unk>的索引
    3. 截断与填充：d2l.truncate_pad(...,self.num_steps,self.vocab['<pad>'])
    self.num_steps：目标固定长度（如50）
    self.vocab['<pad>']：填充符的索引（通常是0）
    逻辑：
        序列长度>num_steps：截断尾部，只保留前num_steps个词
        序列长度<num_steps：在末尾补填充符，直到长度为num_steps
    4. 转换为张量：torch.tensor([... for line in lines])
    列表推导式遍历所有句子
    最终返回二维张量，形状：(句子数量,num_steps)
    '''
    def _pad(self, lines):
        return torch.tensor([d2l.truncate_pad(
            self.vocab[line], self.num_steps, self.vocab['<pad>'])
                         for line in lines])
    # 特殊方法：当使用dataset[i]访问元素时自动调用
    # idx：要获取的样本索引（整数，如0,1,2...）
    def __getitem__(self, idx):
        '''
        返回一个嵌套元组：((premise,hypothesis),label)
        self.premises[idx]：第idx个前提句子的张量
            形状：(num_steps,)，如tensor([12,85,7,33,0,0,...])
        self.hypotheses[idx]：第idx个假设句子的张量
            形状：(num_steps,)，与前提格式相同
        self.labels[idx]：第idx个样本的标签
            类型：torch.tensor(0)或tensor(1)或tensor(2)
            对应关系：0=蕴含(entailment),1=矛盾(contradiction),2=中性(neutral)
        '''
        return (self.premises[idx], self.hypotheses[idx]), self.labels[idx]
    # 数据集的总样本数
    def __len__(self):
        return len(self.premises)

### [**整合代码**]

In [7]:
# batch_size：每个训练批次的样本数量
# num_steps：序列最大长度（默认50），用于填充/截断句子

#@save
def load_data_snli(batch_size, num_steps=50):
    """下载SNLI数据集并返回数据迭代器和词表"""
    # 1. 设置多进程加载
    # num_workers = d2l.get_dataloader_workers()
    num_workers = 0
    # 2. 下载数据集
    # data_dir = d2l.download_extract('SNLI')
    data_dir = '..\\data\\snli_1.0'
    # 3. 读取原始数据
    train_data = read_snli(data_dir, True) # 训练集，(premises,hypotheses,labels)元组
    test_data = read_snli(data_dir, False) # 测试集，(premises,hypotheses,labels)元组
    # 4. 创建数据集对象
    train_set = SNLIDataset(train_data, num_steps)
    test_set = SNLIDataset(test_data, num_steps, train_set.vocab)
    '''
    5. 创建数据迭代器
    train_iter：shuffle=True：打乱数据顺序，确保每轮训练顺序不同，用于模型训练
    test_iter：shuffle=False：保持原始顺序，便于复现评估结果，用于模型验证/测试
    '''
    train_iter = torch.utils.data.DataLoader(train_set, batch_size,
                                             shuffle=True,
                                             num_workers=num_workers)
    test_iter = torch.utils.data.DataLoader(test_set, batch_size,
                                            shuffle=False,
                                            num_workers=num_workers)
    '''
    train_iter：训练数据迭代器
    test_iter：测试数据迭代器
    train_set.vocab：词汇表对象（后续可用于词向量加载或解码）
    '''
    return train_iter, test_iter, train_set.vocab

In [8]:
'''
1. 数据加载（耗时操作）
    自动下载：首次执行会从网络下载SNLI数据集（约900MB）
    预处理：读取原始TSV文件，清洗文本，构建词汇表
    批量化：创建两个DataLoader，批量大小为128，序列长度截断/填充为50
2. 返回值
    train_iter：训练集迭代器（约55万样本，4300个批次/轮）
    test_iter：测试集迭代器（约1万样本，77个批次）
    vocab：词汇表对象
'''
train_iter, test_iter, vocab = load_data_snli(128, 50)
len(vocab)

read 549367 examples
read 9824 examples


18678

In [9]:
'''
1. 循环取值：for X, Y in train_iter
    train_iter：DataLoader创建的训练数据迭代器
    每个批次返回：(X, Y)，其中X是特征，Y是标签
2. X的结构
    在SNLI数据集中，X是一个元组，包含两个元素：
    X[0]：前提（premise）张量，形状(batch_size,num_steps)
    X[1]：假设（hypothesis）张量，形状(batch_size,num_steps)
'''
for X, Y in train_iter:
    print(X[0].shape)
    print(X[1].shape)
    print(Y.shape)
    break

torch.Size([128, 50])
torch.Size([128, 50])
torch.Size([128])
