### homework1：练习Seq2Seq数据预处理部分

In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
from tqdm.auto import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F

print(sys.version_info)
for module in mpl, np, pd, sklearn, torch:
    print(module.__name__, module.__version__)

device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(device)

seed = 42
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)

sys.version_info(major=3, minor=12, micro=3, releaselevel='final', serial=0)
matplotlib 3.10.0
numpy 1.26.4
pandas 2.2.3
sklearn 1.6.0
torch 2.5.1+cu124
cuda:0


数据加载

In [2]:
import unicodedata
import re
from sklearn.model_selection import train_test_split

# 定义一个函数，将Unicode字符串转换为ASCII字符串
def unicode_to_ascii(s):
    # 使用unicodedata.normalize方法将字符串s转换为NFD形式。
    # NFD（Normalization Form D）是一种Unicode规范化形式，它将字符分解为基本字符和组合字符。
    # 例如，字符'é'会被分解为'e'和一个组合的重音符号。
    normalized_string = unicodedata.normalize('NFD', s)
    
    # 使用列表推导式遍历规范化后的字符串中的每个字符。
    # unicodedata.category(c)返回字符c的Unicode类别。
    # 'Mn'表示“非间距标记”（例如重音符号），我们通过条件判断去除这些字符。
    ascii_string = ''.join(c for c in normalized_string if unicodedata.category(c) != 'Mn')
    
    # 返回转换后的ASCII字符串
    return ascii_string

# 测试unicode_to_ascii函数
# 定义一个英文句子和一个西班牙语句子，使用u前缀表示Unicode字符串
en_sentence = u"May I borrow this book?"
sp_sentence = u"¿Puedo tomar prestado este libro?"

# 打印转换后的英文句子
print(unicode_to_ascii(en_sentence))
# 输出: May I borrow this book?

# 打印转换后的西班牙语句子
print(unicode_to_ascii(sp_sentence))
# 输出: ¿Puedo tomar prestado este libro?

May I borrow this book?
¿Puedo tomar prestado este libro?


In [3]:
def preprocess_sentence(w):
    # 1. 将字符串转换为小写，并去掉首尾多余的空格
    # 2. 调用 unicode_to_ascii 函数，将 Unicode 字符转换为 ASCII 字符（去除重音符号等）
    w = unicode_to_ascii(w.lower().strip())

    # 在单词与跟在其后的标点符号之间插入一个空格
    # 例如："he is a boy." => "he is a boy . "
    # 使用正则表达式匹配标点符号（?.!,¿），并在其前后添加空格
    w = re.sub(r"([?.!,¿])", r" \1 ", w)

    # 将多个连续的空格替换为一个空格
    # 因为上一步可能会引入多余的空格
    w = re.sub(r'[" "]+', " ", w)

    # 除了字母 (a-z, A-Z) 和标点符号（".", "?", "!", ",", "¿"），将所有其他字符替换为空格
    # 这样可以去除数字、特殊符号等不需要的字符
    w = re.sub(r"[^a-zA-Z?.!,¿]+", " ", w)

    # 去掉字符串首尾的空格，确保最终结果干净
    w = w.rstrip().strip()

    return w

# 测试 preprocess_sentence 函数
en_sentence = u"May I borrow this book?"
sp_sentence = u"¿Puedo tomar prestado este libro?"

# 打印处理后的英文句子
print(preprocess_sentence(en_sentence))
# 输出: may i borrow this book ?

# 打印处理后的西班牙语句子
print(preprocess_sentence(sp_sentence))
# 输出: ¿ puedo tomar prestado este libro ?

# 打印处理后的西班牙语句子的 UTF-8 编码
print(preprocess_sentence(sp_sentence).encode('utf-8'))
# 输出: b' \xc2\xbf puedo tomar prestado este libro ?'

may i borrow this book ?
¿ puedo tomar prestado este libro ?
b'\xc2\xbf puedo tomar prestado este libro ?'


Dataset

In [4]:
#zip例子
a = [[1,2],[4,5],[7,8]]
zipped = list(zip(*a))
print(zipped)

[(1, 4, 7), (2, 5, 8)]


In [5]:
split_index1 = np.random.choice(a=["train", "test"], replace=True, p=[0.9, 0.1], size=100)
split_index1

array(['train', 'test', 'train', 'train', 'train', 'train', 'train',
       'train', 'train', 'train', 'train', 'test', 'train', 'train',
       'train', 'train', 'train', 'train', 'train', 'train', 'train',
       'train', 'train', 'train', 'train', 'train', 'train', 'train',
       'train', 'train', 'train', 'train', 'train', 'test', 'test',
       'train', 'train', 'train', 'train', 'train', 'train', 'train',
       'train', 'test', 'train', 'train', 'train', 'train', 'train',
       'train', 'test', 'train', 'test', 'train', 'train', 'test',
       'train', 'train', 'train', 'train', 'train', 'train', 'train',
       'train', 'train', 'train', 'train', 'train', 'train', 'test',
       'train', 'train', 'train', 'train', 'train', 'train', 'train',
       'train', 'train', 'train', 'train', 'train', 'train', 'train',
       'train', 'train', 'train', 'train', 'train', 'train', 'train',
       'train', 'train', 'train', 'train', 'train', 'train', 'train',
       'train', 'train'], dty

In [7]:
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
import numpy as np

class LangPairDataset(Dataset):
    # 数据文件路径
    fpath = Path(r"./data_spa_en/spa.txt")
    # 缓存文件路径
    cache_path = Path(r"./.cache/lang_pair.npy")
    # 按照 9:1 的比例随机划分训练集和测试集
    split_index = np.random.choice(a=["train", "test"], replace=True, p=[0.9, 0.1], size=118964)

    def __init__(self, mode="train", cache=False):
        # 如果没有缓存，或者缓存文件不存在，则处理数据并保存缓存
        if cache or not self.cache_path.exists():
            # 创建缓存文件夹（如果不存在）
            self.cache_path.parent.mkdir(parents=True, exist_ok=True)
            # 打开数据文件并读取所有行
            with open(self.fpath, "r", encoding="utf8") as file:
                lines = file.readlines()
                # 对每一行数据进行预处理，分割成目标语言和源语言
                lang_pair = [[preprocess_sentence(w) for w in l.split('\t')] for l in lines]
                # 将目标语言和源语言分离
                trg, src = zip(*lang_pair)
                # 转换为 numpy 数组
                trg = np.array(trg)
                src = np.array(src)
                # 将数据保存为 npy 文件，方便下次直接读取
                np.save(self.cache_path, {"trg": trg, "src": src})
        else:
            # 如果缓存文件存在，则直接加载缓存数据
            lang_pair = np.load(self.cache_path, allow_pickle=True).item()
            trg = lang_pair["trg"]
            src = lang_pair["src"]

        # 根据 mode（train/test）从 split_index 中筛选出对应的数据
        self.trg = trg[self.split_index == mode]  # 目标语言（英语）
        self.src = src[self.split_index == mode]  # 源语言（西班牙语）

    def __getitem__(self, index):
        # 返回指定索引的源语言和目标语言对
        return self.src[index], self.trg[index]

    def __len__(self):
        # 返回数据集的长度
        return len(self.src)


# 创建训练集和测试集实例
train_ds = LangPairDataset("train")
test_ds = LangPairDataset("test")

In [8]:
print("source: {}\ntarget: {}".format(*train_ds[-1]))

source: si quieres sonar como un hablante nativo , debes estar dispuesto a practicar diciendo la misma frase una y otra vez de la misma manera en que un musico de banjo practica el mismo fraseo una y otra vez hasta que lo puedan tocar correctamente y en el tiempo esperado .
target: if you want to sound like a native speaker , you must be willing to practice saying the same sentence over and over in the same way that banjo players practice the same phrase over and over until they can play it correctly and at the desired tempo .


Tokenizer

这里有两种处理方式，分别对应着 encoder 和 decoder 的 word embedding 是否共享，这里实现不共享的方案。

In [9]:
from collections import Counter

def get_word_idx(ds, mode="src", threshold=2):
    # 初始化词表，包含特殊 token
    word2idx = {
        "[PAD]": 0,     # 填充 token，用于填充短句子
        "[BOS]": 1,     # 句子开始 token
        "[UNK]": 2,     # 未知 token，用于表示词表中未出现的词
        "[EOS]": 3,     # 句子结束 token
    }
    # 反向词表，用于根据索引查找单词
    idx2word = {value: key for key, value in word2idx.items()}
    # 当前词表的下一个可用索引
    index = len(idx2word)
    # 设置词频阈值，低于此阈值的单词将被忽略
    threshold = 1

    # 将数据集中所有句子拼接成一个长字符串，然后按空格分割成单词列表
    # 如果数据集很大，建议使用 for 循环逐句处理，避免内存不足
    word_list = " ".join([pair[0 if mode == "src" else 1] for pair in ds]).split()
    # 统计单词频率
    counter = Counter(word_list)
    print("word count:", len(counter))

    # 遍历单词频率统计结果
    for token, count in counter.items():
        # 如果单词出现次数大于等于阈值，则加入词表
        if count >= threshold:
            word2idx[token] = index  # 加入正向词表
            idx2word[index] = token  # 加入反向词表
            index += 1  # 更新下一个可用索引

    return word2idx, idx2word

# 生成源语言（西班牙语）和目标语言（英语）的词表
src_word2idx, src_idx2word = get_word_idx(train_ds, "src")  # 源语言词表（西班牙语）
trg_word2idx, trg_idx2word = get_word_idx(train_ds, "trg")  # 目标语言词表（英语）

word count: 23745
word count: 12475


In [10]:
import torch

class Tokenizer:
    def __init__(self, word2idx, idx2word, max_length=500, pad_idx=0, bos_idx=1, eos_idx=3, unk_idx=2):
        # 初始化词表和参数
        self.word2idx = word2idx  # 单词到索引的映射
        self.idx2word = idx2word  # 索引到单词的映射
        self.max_length = max_length  # 最大序列长度
        self.pad_idx = pad_idx  # 填充 token 的索引
        self.bos_idx = bos_idx  # 句子开始 token 的索引
        self.eos_idx = eos_idx  # 句子结束 token 的索引
        self.unk_idx = unk_idx  # 未知 token 的索引

    def encode(self, text_list, padding_first=False, add_bos=True, add_eos=True, return_mask=False):
        """将文本列表编码为索引列表
        - padding_first: 是否在前面填充
        - add_bos: 是否添加句子开始 token
        - add_eos: 是否添加句子结束 token
        - return_mask: 是否返回掩码（用于指示哪些是填充 token）
        """
        # 计算最大长度（不超过 self.max_length）
        max_length = min(self.max_length, add_eos + add_bos + max([len(text) for text in text_list]))
        indices_list = []  # 存储编码后的索引列表

        for text in text_list:
            # 将单词转换为索引，未知单词用 unk_idx 代替
            indices = [self.word2idx.get(word, self.unk_idx) for word in text[:max_length - add_bos - add_eos]]
            # 添加句子开始 token
            if add_bos:
                indices = [self.bos_idx] + indices
            # 添加句子结束 token
            if add_eos:
                indices = indices + [self.eos_idx]
            # 在前面或后面填充
            if padding_first:
                indices = [self.pad_idx] * (max_length - len(indices)) + indices
            else:
                indices = indices + [self.pad_idx] * (max_length - len(indices))
            indices_list.append(indices)

        # 将索引列表转换为 tensor
        input_ids = torch.tensor(indices_list)
        # 生成掩码（1 表示填充 token，0 表示真实 token）
        masks = (input_ids == self.pad_idx).to(dtype=torch.int64)

        # 返回编码结果和掩码（如果 return_mask 为 True）
        return input_ids if not return_mask else (input_ids, masks)

    def decode(self, indices_list, remove_bos=True, remove_eos=True, remove_pad=True, split=False):
        """将索引列表解码为文本列表
        - remove_bos: 是否移除句子开始 token
        - remove_eos: 是否移除句子结束 token
        - remove_pad: 是否移除填充 token
        - split: 是否返回单词列表（而不是拼接的句子）
        """
        text_list = []
        for indices in indices_list:
            text = []
            for index in indices:
                word = self.idx2word.get(index, "[UNK]")  # 将索引转换为单词，未知索引用 "[UNK]" 代替
                # 根据参数决定是否移除特殊 token
                if remove_bos and word == "[BOS]":
                    continue
                if remove_eos and word == "[EOS]":
                    break
                if remove_pad and word == "[PAD]":
                    break
                text.append(word)
            # 将单词列表拼接为句子或直接返回单词列表
            text_list.append(" ".join(text) if not split else text)
        return text_list


# 创建源语言和目标语言的 tokenizer
src_tokenizer = Tokenizer(word2idx=src_word2idx, idx2word=src_idx2word)  # 源语言 tokenizer
trg_tokenizer = Tokenizer(word2idx=trg_word2idx, idx2word=trg_idx2word)  # 目标语言 tokenizer

# 测试编码和解码功能
raw_text = ["hello world".split(), "tokenize text datas with batch".split(), "this is a test".split()]
indices, mask = trg_tokenizer.encode(raw_text, padding_first=False, add_bos=True, add_eos=True, return_mask=True)
decode_text = trg_tokenizer.decode(indices.tolist(), remove_bos=False, remove_eos=False, remove_pad=False)

# 打印结果
print("raw text" + '-' * 10)
for raw in raw_text:
    print(raw)

print("mask" + '-' * 10)
for m in mask:
    print(m)

print("indices" + '-' * 10)
for index in indices:
    print(index)

print("decode text" + '-' * 10)
for decode in decode_text:
    print(decode)

raw text----------
['hello', 'world']
['tokenize', 'text', 'datas', 'with', 'batch']
['this', 'is', 'a', 'test']
mask----------
tensor([0, 0, 0, 0, 1, 1, 1])
tensor([0, 0, 0, 0, 0, 0, 0])
tensor([0, 0, 0, 0, 0, 0, 1])
indices----------
tensor([   1,   17, 3222,    3,    0,    0,    0])
tensor([   1,    2, 3883,    2,  737,    2,    3])
tensor([   1,  119,  228,  106, 1274,    3,    0])
decode text----------
[BOS] hello world [EOS] [PAD] [PAD] [PAD]
[BOS] [UNK] text [UNK] with [UNK] [EOS]
[BOS] this is a test [EOS] [PAD]
