# 多语言平行语料预处理脚本
- 主要功能是下载、清洗、分词（BPE或spacy）、构建词汇表，并将处理后的数据保存为便于模型训练的格式（如pickle)

In [10]:
import os
import logging
import dill as pickle
import urllib

from tqdm import tqdm
import sys
import codecs
import spacy
import torch
import tarfile
import torchtext.data
import torchtext.datasets
import argparse
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchtext.datasets import Multi30k


In [11]:
import nbimporter
# from c_transformer_model import PAD_WORD, UNK_WORD, BOS_WORD, EOS_WORD
from d_BPE import learn_bpe, BPE

PAD_WORD = '<blank>'  #填充符号
UNK_WORD = '<unk>'  #未知词
BOS_WORD = '<s>'  #起始符号
EOS_WORD = '</s>'  #结束符号

**机器翻译数据**

每个数据源是一个字典，包含三个关键信息：
- `url`: 数据文件的下载链接
- `trg`: 目标语言(target)文件名
- `src`: 源语言(source)文件名

In [12]:
_TRAIN_DATA_SOURCES = [
    {"url": "http://data.statmt.org/wmt17/translation-task/" \
            "training-parallel-nc-v12.tgz",
     "trg": "news-commentary-v12.de-en.en",
     "src": "news-commentary-v12.de-en.de"},
    # 注释掉的两个数据源...
]

_VAL_DATA_SOURCES = [
    {"url": "http://data.statmt.org/wmt17/translation-task/dev.tgz",
     "trg": "newstest2013.en",
     "src": "newstest2013.de"}]

_TEST_DATA_SOURCES = [
    {"url": "https://storage.googleapis.com/tf-perf-public/" \
            "official_transformer/test_data/newstest2014.tgz",
     "trg": "newstest2014.en",
     "src": "newstest2014.de"}]

In [13]:

class TqdmUpTo(tqdm):
    """
    进度条
    """

    def update_to(self, b=1, bsize=1, tsize=None):
        """
        :param b:已完成块的数量
        :param bsize:每个块的大小
        :param tsize:任务的总大小
        """
        if tsize is not None:
            self.total = tsize
        # 目标进度(b * bsize)减去当前已完成进度(self.n)
        self.update(b * bsize - self.n)


def file_exist(dir_name, file_name):
    """
    遍历dir_name,搜索file_name
    """
    for root, dirs, files in os.walk(dir_name):
        if file_name in files:
            return os.path.join(root, file_name)
    return None


def _download_file(download_dir, url):
    """
    指定 URL 下载文件到本地目录，并支持进度显示和重复下载检查。
    """
    # URL http://data.statmt.org/wmt17/training-parallel-nc-v12.tgz会被分割为 ['http:', '', 'data.statmt.org', 'wmt17', 'training-parallel-nc-v12.tgz']
    filename = url.split("/")[-1]
    if file_exist(download_dir, filename):
        sys.stderr.write(f"Already downloaded:{url} at {filename}\n")
    else:
        sys.stderr.write(f"Downloading {url} to {filename}\n")
        # miniters=1：进度条至少每 1 次迭代更新一次,
        # desc=filename：进度条前缀显示文件名
        with TqdmUpTo(unit="B", unit_scale=True, miniters=1, desc=filename) as t:
            # reporthook=t.update_to指定进度回调函数，通过 TqdmUpTo的 update_to方法实时更新进度条。
            urllib.request.urlretrieve(url, filename, reporthook=t.update_to)
    return filename


def download_and_extract(download_dir, url, src_filename, trg_filename):
    """
    从指定 URL 下载压缩语料库，解压后验证源语言（src）和目标语言（trg）文件的存在性
    """
    # step 1: 检查文件是否存在
    src_path = file_exist(download_dir, src_filename)
    trg_path = file_exist(download_dir, trg_filename)

    # step 2:跳过已经存在文件
    if src_path and trg_path:
        sys.stderr.write(f'Already downloaded and extracted: {src_path} -> {trg_path}\n')
        return src_path, trg_path

    # step 3:下载压缩文件
    compressed_file = _download_file(download_dir, url)

    # step 4: 解压缩文件
    sys.stderr.write(f"Extracting {compressed_file} to {trg_path}\n")
    # tarfile模块以只读模式打开 gzip 压缩的 tar 文件（r:gz）
    with tarfile.open(compressed_file, "r:gz") as tar:
        tar.extractall(path=download_dir)

    # step 5:再次验证解压缩的文件
    src_path = file_exist(download_dir, src_filename)
    trg_path = file_exist(download_dir, trg_filename)

    if src_path and trg_path:
        return src_path, trg_path
    raise OSError(f"Download/extraction failed for url {url} to path {download_dir}")


def get_raw_files(raw_dir, sources):
    """
    :param sources:List{url:http://data.statmt.org/wmt17/training-parallel-nc-v12.tgz.
                        src:news-commentary-v12.de-en.de,
                        trg:news-commentary-v12.de-en.en}
    """
    raw_files = {"src": [], "trg": []}
    for d in sources:
        src_file, trg_file = download_and_extract(raw_dir, d["url"], d["src"], d["trg"])
        raw_files["src"].append(src_file)
        raw_files["trg"].append(trg_file)
    return raw_files


def mkdir_if_needed(dir_name):
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)


def compile_files(
        raw_dir: str,
        raw_files: dict[str, list[str]],
        prefix: str
) -> tuple[str, str]:
    """
    合并多源原始文件为统一的源语言(.src)和目标语言(.trg)文件。

    参数:
        raw_dir (str): 原始文件根目录（如 `./data/raw`）。
        raw_files (dict): 含 "src"（源文件路径列表）和 "trg"（目标文件路径列表）的字典，两列表长度需一致。
        prefix (str): 合并文件前缀（生成如 `raw-train.src` 和 `raw-train.trg`）。

    返回:
        tuple[str, str]: 合并后的源文件路径和目标文件路径（顺序：(src_fpath, trg_fpath)）。
    """
    #  raw_dir=./data/raw，prefix=train -> ./data/raw/raw-train.src
    src_fpath = os.path.join(raw_dir, f"raw-{prefix}.src")
    trg_fpath = os.path.join(raw_dir, f"raw-{prefix}.trg")

    if os.path.exists(src_fpath) and os.path.exists(trg_fpath):
        sys.stderr.write(f"Merged files found, skip the merging process.\n")
        return src_fpath, trg_fpath

    # 合并文件
    sys.stderr.write(f"Merge files into two files: {src_fpath} and {trg_fpath}.\n")
    with open(src_fpath, "w") as src_outf, open(trg_fpath, "w") as trg_outf:
        for src_inf, trg_inf in zip(raw_files["src"], raw_files["trg"]):
            sys.stderr.write(f'  Input files: \n'
                             f'    - SRC: {src_inf}, and\n'
                             f'    - TRG: {trg_inf}.\n'
                             )
            with open(src_inf, newline='\n') as src_inf, open(trg_inf, newline='\n') as trg_inf:
                cntr = 0  # 计数器：记录源文件与目标文件的行数差
                for i, line in enumerate(src_inf):
                    cntr += 1
                    # 清理行内空白字符（如 \r）并添加统一换行符（\n）
                    src_outf.write(line.replace('\r', ' ').strip() + '\n')

                for j, line in enumerate(trg_inf):
                    cntr -= 1
                    trg_outf.write(line.replace('\r', ' ').strip() + '\n')
                    # 验证源文件与目标文件行数是否一致（必须完全匹配)
            assert cntr == 0, f'Number of lines in {src_inf} and {trg_inf} are inconsistent.'
    return src_fpath, trg_fpath


def encode_file(bpe, in_file, out_file):
    """
    原始文本文件（如纯文本语料）通过 BPE 算法转换为分词后的编码文件
    :param bpe :BPE编码
    :param in_file:原始文本
    :param out_file:将编码后的文件写入输出文件
    """
    sys.stderr.write(f"Read raw content from {in_file} and \n" \
                     f"Write encoded content to {out_file}\n")
    with codecs.open(in_file, "r", encoding="utf-8") as inf:
        with codecs.open(out_file, "w", encoding="utf-8") as outf:
            for line in inf:
                outf.write(bpe.process_line(line))


def encode_files(bpe, src_in_file, trg_in_file, data_dir, prefix):
    """
    批量处理源语言（src）和目标语言（trg）原始文件，通过 BPE 算法将其转换为模型可处理的结构化编码文件
    """
    src_out_file = os.path.join(data_dir, f"{prefix}.src")
    trg_out_file = os.path.join(data_dir, f"{prefix}.trg")

    if os.path.isfile(src_in_file) and os.path.isfile(trg_out_file):
        sys.stderr.write(f"Encode files found, skip the encode process.\n")

    encode_file(bpe, src_in_file, src_out_file)
    encode_file(bpe, trg_in_file, trg_out_file)
    return src_out_file, trg_out_file

In [14]:
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-raw_dir", required=True)
    parser.add_argument("-data_dir", required=True)
    parser.add_argument('-codes', required=True)
    parser.add_argument('-save_data', required=True)
    parser.add_argument('-prefix', required=True)
    parser.add_argument('-max_len', type=int, default=100)
    parser.add_argument('--symbols', '-s', type=int, default=320000, help="Vocabulary size")
    # metavar（“元变量名”）用于在命令行帮助信息（-h/--help）中更友好地显示参数的占位符名称。
    # metavar会将帮助信息中的占位符从类型名（如 INT）替换为你指定的名称
    parser.add_argument(
        '--min-frequency', type=int, default=6, metavar='FREQ',
        help="Stop if no symbol pair has frequency >= FREQ (default: %(default)s))"
    )
    # action决定了参数被指定时的行为逻辑:python script.py --dict-input），解析器会为该参数分配一个布尔值 True
    parser.add_argument(
        '--dict-input', action='store_true',
        help="If set, input file is interpreted as a dictionary where each line contains a word-count pair"
    )
    parser.add_argument(
        '--separator', type=str, default='@@', metavar='STR',
        help="Separator between non-final subword units (default: '%(default)s'))"
    )
    parser.add_argument('--total-symbols', '-t', action='store_true')
    opt = parser.parse_args()

    # step 1: 数据准备阶段
    # 确保原始数据目录（raw_dir）和处理后数据目录（data_dir）存在，避免后续文件操作因目录缺失报错
    mkdir_if_needed(opt.raw_dir)
    mkdir_if_needed(opt.data_dir)

    # 下载并解压原始数据
    raw_train = get_raw_files(opt.raw_dir, _TRAIN_DATA_SOURCES)
    raw_val = get_raw_files(opt.raw_dir, _VAL_DATA_SOURCES)
    raw_test = get_raw_files(opt.raw_dir, _TEST_DATA_SOURCES)

    # step 2: 将分散的源文件和目标文件合并为两个大文件（如 train.src和 train.trg）
    #prefix='train'，则生成 ./data/raw/raw-train.src（源文件）和 ./data/raw/raw-train.trg
    train_src, train_trg = compile_files(opt.raw_dir, raw_train, opt.prefix + '-train')
    val_src, val_trg = compile_files(opt.raw_dir, raw_val, opt.prefix + '-val')
    test_src, test_trg = compile_files(opt.raw_dir, raw_test, opt.prefix + '-test')

    # step 3:BPE编码阶段
    # 生成BPE编码文件
    opt.codes = os.path.join(opt.data_dir, opt.codes)
    if not os.path.isfile(opt.codes):
        learn_bpe(raw_train['src'] + raw_train['trg'], opt.codes, opt.symbols, opt.min_frequency, True)

    # 初始BPE化分词器
    sys.stderr.write(f"BPE codes prepared.\n")
    sys.stderr.write(f"Build up the tokenizer.\n")
    with codecs.open(opt.codes, encoding="utf-8") as codes:
        bpe = BPE(codes, separator=opt.separator)

    # 对合并文件进行BPE编码
    sys.stderr.write(f"Encoding ...\n")
    encode_files(bpe, train_src, train_trg, opt.data_dir, opt.prefix + '-train')
    encode_files(bpe, val_src, val_trg, opt.data_dir, opt.prefix + '-val')
    encode_files(bpe, test_src, test_trg, opt.data_dir, opt.prefix + '-test')

    # step 4: 数据集构建与数据保存
    field = torchtext.data.Field(
        tokenize=str.split,  # 初始分词
        lower=True,
        pad_token=PAD_WORD,  # 填充标记
        init_token=BOS_WORD,  # 句首标记
        eos_token=EOS_WORD  #句尾标记
    )

    # 构建翻译数据集
    fields = (field, field)
    MAX_LEN = opt.max_len

    def filter_examples_with_length(x):
        return len(vars(x)['src']) <= MAX_LEN and len(vars(x)['trg']) <= MAX_LEN

    train = Multi30k(
        fields=fields,
        path=os.path.join(opt.data_dir, opt.prefix + '-train'),
        exts=('.src', '.trg'),
        filter_func=filter_examples_with_length,
    )

    # 生成词汇表（基于BPE编码后的数据）
    from itertools import chain
    # chain：将多个可迭代对象（如 train.src和 train.trg）连接成一个连续的迭代器。此处用于合并源语言和目标语言的语料，生成统一的语料集合
    # build_vocab方法：torchtext.data.Field类的方法，用于基于语料统计词频并生成词汇表
    field.build_vocab(chain(train.src, train.trg), min_freq=2)

    #  field： 保存数据字段的处理规则（如分词、填充标记）
    data = {'settings': opt, 'vocab': field, }
    opt.save_data = os.path.join(opt.data_dir, opt.save_data)

    print('[Info] Dumping the processed data to pickle file', opt.save_data)
    pickle.dump(data, open(opt.save_data, 'wb'))

In [43]:
def main_wo_bpe(opt: argparse.Namespace = None):
    """
    支持命令行与传参两种调用方式
    示例:
        python preprocess.py -lang_src de -lang_trg en -save_data multi30k_de_en.pkl -share_vocab
    """
    if opt is None:
        parser = argparse.ArgumentParser()
        parser.add_argument('-lang_src', required=True,
                            choices=['de', 'el', 'en', 'es', 'fr', 'it', 'lt', 'nb', 'nl', 'pt'])
        parser.add_argument('-lang_trg', required=True,
                            choices=['de', 'el', 'en', 'es', 'fr', 'it', 'lt', 'nb', 'nl', 'pt'])
        parser.add_argument('-save_data', required=True)
        parser.add_argument('-max_len', type=int, default=100)
        parser.add_argument('-min_word_count', type=int, default=3)
        parser.add_argument('-keep_case', action='store_true')
        parser.add_argument('-share_vocab', action='store_true')
        opt = parser.parse_args()
    else:
        required_args = ['lang_src', 'lang_trg', 'save_data']
        for arg in required_args:
            if not hasattr(opt, arg):
                raise ValueError(f"Missing required argument: {arg}")

    # 检查自定义数据是否启用
    assert not any([opt.data_src, opt.data_trg]), 'Custom data input is not supported now.'

    # 加载语言模型
    lang_to_spacy_model = {
        'de': 'de_core_news_sm',  # 德语模型
        'en': 'en_core_web_sm',  # 英语模型
    }
    src_lang_model = spacy.load(lang_to_spacy_model[opt.lang_src])
    trg_lang_model = spacy.load(lang_to_spacy_model[opt.lang_trg])

    # 分词函数
    def tokenize_src(text):
        return [tok.text for tok in src_lang_model.tokenizer(text)]

    def tokenize_trg(text):
        return [tok.text for tok in trg_lang_model.tokenizer(text)]

    # 加载 Multi30k 数据集
    train_data = Multi30k(split='train', language_pair=(opt.lang_src, opt.lang_trg))
    val_data = Multi30k(split='valid', language_pair=(opt.lang_src, opt.lang_trg))
    test_data = Multi30k(split="test", language_pair=(opt.lang_src, opt.lang_trg))

    # 过滤过长句子
    # MAX_LEN = opt.max_len
    # train_data = [ex for ex in train_data if len(ex[0]) <= MAX_LEN and len(ex[1]) <= MAX_LEN]
    # val_data = [ex for ex in val_data if len(ex[0]) <= MAX_LEN and len(ex[1]) <= MAX_LEN]
    # test_data = [ex for ex in test_data if len(ex[0]) <= MAX_LEN and len(ex[1]) <= MAX_LEN]

    # 构建词汇表
    MIN_FREQ = opt.min_word_count

    def yield_tokens(data_iter, tokenizer):
        for src, trg in data_iter:
            yield tokenizer(src)
            yield tokenizer(trg)

    # 构建源语言词汇表
    src_tokenizer = get_tokenizer(tokenize_src)
    src_vocab = build_vocab_from_iterator(
        yield_tokens(train_data, src_tokenizer),
        min_freq=MIN_FREQ,
        specials=[PAD_WORD, BOS_WORD, EOS_WORD],
        special_first=True
    )
    # 当遇到未知单词时返回该索引
    src_vocab.set_default_index(src_vocab[PAD_WORD])

    # 构建目标语言词汇表
    trg_tokenizer = get_tokenizer(tokenize_trg)
    trg_vocab = build_vocab_from_iterator(
        yield_tokens(train_data, trg_tokenizer),
        min_freq=MIN_FREQ,
        specials=[PAD_WORD, BOS_WORD, EOS_WORD],
        special_first=True
    )
    trg_vocab.set_default_index(trg_vocab[PAD_WORD])

    # 共享词汇表
    def yield_tokens(data_iter, tokenizer):
        for src, trg in data_iter:
            yield tokenizer(src)
            yield tokenizer(trg)

    # 构建共享词汇表（如果启用）
    if opt.share_vocab:
        print("[Info] Merging source and target vocabularies...")
        shared_vocab = build_vocab_from_iterator(
            yield_tokens(train_data, tokenize_src),
            min_freq=MIN_FREQ,
            specials=[PAD_WORD, BOS_WORD, EOS_WORD],
            special_first=True
        )
        shared_vocab.set_default_index(shared_vocab[PAD_WORD])
        src_vocab = shared_vocab
        trg_vocab = shared_vocab
        print(f"[Info] Merged vocabulary size: {len(shared_vocab)}")

    # 保存数据
    data = {
        'settings': opt,
        'vocab': {'src': src_vocab, 'trg': trg_vocab},
        'train': train_data,
        'valid': val_data,
        'test': test_data
    }

    print(f"[Info] Saving processed data to {opt.save_data}")
    pickle.dump(data, open(opt.save_data, 'wb'))

## 1.2 Download the spacy language model.
- python -m spacy download en
- python -m spacy download de

## 1.3 Preprocess the data with torchtext and spacy.
- python preprocess.py -lang_src de -lang_trg en -share_vocab -save_data m30k_deen_shr.pkl

In [44]:
import argparse

opt = argparse.Namespace(
    lang_src='de',  # 源语言（德语）
    lang_trg='en',  # 目标语言（英语）
    save_data='multi30k_de_en.pkl',  # 保存路径
    data_src=None,
    data_trg=None,
    max_len=100,
    min_word_count=3,
    keep_case=True,
    share_vocab=True,
)

main_wo_bpe(opt)



[Info] Merging source and target vocabularies...
[Info] Merged vocabulary size: 10264
[Info] Saving processed data to multi30k_de_en.pkl
