In [0]:
# 挂载Google Drive（仅在Colab环境需要）
# 本地运行时可以跳过这部分
from google.colab import drive
drive.mount('/content/drive')

# 安装必要的Python包
# keras-transformer提供了Transformer模型的实现
!pip install keras-transformer

# 打印当前工作目录和中间数据目录内容
!pwd
!ls "/content/drive/My Drive/Colab Notebooks/middle_data/"

In [1]:
# 导入必要的库
import numpy as np
import pickle
import operator
from keras_transformer import get_model, decode  # Transformer模型实现

# 设置数据路径
# Google Colab路径（已注释）
# main_path = '/content/drive/My Drive/Colab Notebooks/'
# path = main_path + 'middle_data/'

# 本地运行路径
path = 'middle_data/'

# 加载预处理好的数据
# encode_input: 编码后的源语言(中文)序列
with open(path + 'encode_input.pkl', 'rb') as f:
    encode_input = pickle.load(f)
    
# decode_input: 编码后的目标语言(英文)序列(decoder输入)
with open(path + 'decode_input.pkl', 'rb') as f:
    decode_input = pickle.load(f)
    
# decode_output: 编码后的目标语言(英文)序列(decoder目标输出)
with open(path + 'decode_output.pkl', 'rb') as f:
    decode_output = pickle.load(f)
    
# source_token_dict: 源语言(中文)词表
with open(path + 'source_token_dict.pkl', 'rb') as f:
    source_token_dict = pickle.load(f)
    
# target_token_dict: 目标语言(英文)词表
with open(path + 'target_token_dict.pkl', 'rb') as f:
    target_token_dict = pickle.load(f)
    
# source_tokens: 源语言token列表
with open(path + 'source_tokens.pkl', 'rb') as f:
    source_tokens = pickle.load(f)
    
print('数据加载完成')

In [2]:
# 打印词表大小和数据量信息
print('源语言词表大小:', len(source_token_dict))
print('目标语言词表大小:', len(target_token_dict))
print('训练样本数量:', len(encode_input))

# 构建Transformer模型
model = get_model(
    token_num=max(len(source_token_dict), len(target_token_dict)),  # 词表大小
    embed_dim=64,         # 词嵌入维度
    encoder_num=2,        # Encoder层数
    decoder_num=2,        # Decoder层数
    head_num=4,           # 注意力头数
    hidden_dim=256,       # 前馈网络隐藏层维度
    dropout_rate=0.05,    # Dropout率
    use_same_embed=False, # 中英文使用不同的词嵌入矩阵
)

# 编译模型
# 使用Adam优化器和稀疏分类交叉熵损失函数
model.compile('adam', 'sparse_categorical_crossentropy')

# 可以取消注释查看模型结构
# model.summary()

print('模型构建完成')

In [None]:
# 训练模型
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau

# 模型保存路径配置
# 保存验证损失最低的模型
filepath = path + "models/W-" + "-{epoch:3d}-{loss:.4f}-.h5"
checkpoint = ModelCheckpoint(
    filepath,
    monitor='loss',       # 监控损失值
    verbose=1,           # 打印日志
    save_best_only=True, # 只保存最好的模型
    mode='min',          # 监控指标越小越好
    save_freq='epoch',   # 每个epoch保存一次
)

# 学习率调度器
# 当损失停止下降时降低学习率
reduce_lr = ReduceLROnPlateau(
    monitor='loss',      # 监控损失值
    factor=0.2,          # 学习率衰减因子
    patience=2,          # 等待epoch数
    verbose=1,           # 打印日志
    mode='min',          # 监控指标越小越好
    min_delta=0.0001,    # 最小变化量
    cooldown=0,          # 学习率减少后的等待epoch数
    min_lr=0             # 学习率下限
)

# 回调函数列表
callbacks_list = [checkpoint, reduce_lr]

# 开始训练
model.fit(
    x=[np.array(encode_input[:20000]), np.array(decode_input[:20000])],  # 输入数据
    y=np.array(decode_output[:20000]),  # 目标数据
    epochs=100,        # 训练轮数
    batch_size=64,     # 批量大小
    verbose=1,         # 显示进度条
    callbacks=callbacks_list,  # 回调函数
    # 其他可选参数:
    # class_weight=None,       # 类别权重
    # max_queue_size=5,        # 生成器队列最大尺寸
    # workers=1,               # 使用的线程数
    # use_multiprocessing=False, # 是否使用多进程
    # shuffle=False,           # 是否打乱数据
    # initial_epoch=0          # 起始epoch
)

In [3]:
# 加载训练好的模型
model.load_weights('model\\W-- 99-0.0070-.h5')

# 创建目标语言词表的反向映射(从索引到单词)
target_token_dict_inv = {v: k for k, v in target_token_dict.items()}

print('模型加载完成')

In [4]:
# 导入必要的库
from keras.preprocessing import sequence
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
import jieba  # 中文分词
import requests

def get_input(seq):
    """
    预处理输入的中文句子
    1. 使用jieba进行分词
    2. 添加开始和结束标记
    3. 填充到固定长度
    4. 转换为词表索引
    """
    # 中文分词
    seq = ' '.join(jieba.lcut(seq, cut_all=False))
    seq = seq.split(' ')
    print('分词结果:', seq)
    
    # 添加开始和结束标记
    seq = ['<START>'] + seq + ['<END>']
    
    # 填充到固定长度(34)
    seq = seq + ['<PAD>'] * (34 - len(seq))
    print('填充后序列:', seq)
    
    # 检查所有token是否在词表中
    flag = True
    for x in seq:
        if x not in source_token_dict:
            flag = False
            break
            
    # 转换为词表索引
    if flag:
        seq = [source_token_dict[x] for x in seq]
        
    return flag, seq

def get_ans(seq):
    """
    使用训练好的模型进行翻译
    1. 使用beam search解码
    2. 将索引转换回单词
    """
    # 使用beam search解码
    decoded = decode(
        model,
        [seq],  # 输入序列
        start_token=target_token_dict['<START>'],  # 开始标记
        end_token=target_token_dict['<END>'],     # 结束标记
        pad_token=target_token_dict['<PAD>'],     # 填充标记
        # top_k=10,                               # beam size
        # temperature=1.0,                        # 采样温度
    )
    
    # 将索引转换为单词并打印结果
    print('翻译结果:', ' '.join(map(lambda x: target_token_dict_inv[x], decoded[0][1:-1])))

# 交互式翻译循环
print('输入x退出')
while True:
    try:
        seq = input('请输入中文: ')
        if seq == 'x':
            break
            
        # 预处理输入
        flag, seq = get_input(seq)
        
        # 如果所有token都在词表中，进行翻译
        if flag:
            get_ans(seq)
        else:
            print('错误: 包含未知词汇')
            
    except KeyboardInterrupt:
        break