In [None]:
import numpy as np

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Seed NumPy and TensorFlow up front so the random steps in the later cells
# (weight initialisation, data shuffling during training) start from the same
# state after every "Restart Kernel -> Run All".
# NOTE(review): some GPU kernels may still be non-deterministic.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

print("TensorFlow 版本:", tf.__version__)


In [None]:
# Toy parallel corpus: every entry is an (English, Chinese) pair. The start/end
# markers are attached to the Chinese side further down in this cell.

pairs = [
    ("hi.", "嗨。"),
    ("hello.", "你好。"),
    ("how are you?", "你好吗？"),
    ("i am fine.", "我很好。"),
    ("thank you.", "谢谢你。"),
    ("good morning.", "早上好。"),
    ("good night.", "晚安。"),
    ("see you tomorrow.", "明天见。"),
    ("i love deep learning.", "我喜欢深度学习。"),
    ("this is a cat.", "这是一只猫。"),
    ("this is a dog.", "这是一只狗。"),
    ("what is your name?", "你叫什么名字？"),
    ("my name is tom.", "我的名字叫汤姆。"),
    ("where do you live?", "你住在哪里？"),
    ("i live in shanghai.", "我住在上海。"),
    ("do you like coffee?", "你喜欢咖啡吗？"),
    ("i like tea.", "我喜欢茶。"),
    ("it is raining today.", "今天在下雨。"),
    ("it is very hot.", "今天很热。"),
    ("i am a student.", "我是学生。"),
]

# English side: lower-cased. Chinese side: wrapped in "\t" (start marker) and
# "\n" (end marker) so decoding knows where to begin and when to stop.
input_texts = [eng.lower() for eng, _ in pairs]
target_texts = ["\t" + zh + "\n" for _, zh in pairs]

num_samples = len(input_texts)
print("样本数:", num_samples)
print("示例输入/输出:")
# repr() shows the tab/newline markers as escapes instead of letting them break
# the printed line; min() keeps the preview from indexing past a short corpus.
for i in range(min(3, num_samples)):
    print(f"{i+1}. {input_texts[i]!r} -> {target_texts[i]!r}")


In [None]:
# Collect the character vocabulary of each language (punctuation, spaces and
# the start/end markers included). Sorting makes the index assignment below
# deterministic.

input_characters = sorted({ch for text in input_texts for ch in text})
target_characters = sorted({ch for text in target_texts for ch in text})

num_encoder_tokens, num_decoder_tokens = len(input_characters), len(target_characters)

# Longest sentence on each side; these become the time dimension of the tensors.
max_encoder_seq_length = max(map(len, input_texts))
max_decoder_seq_length = max(map(len, target_texts))

print("输入字符个数:", num_encoder_tokens)
print("输出字符个数:", num_decoder_tokens)
print("最大输入句长:", max_encoder_seq_length)
print("最大输出句长:", max_decoder_seq_length)

# character -> integer index lookup tables
input_token_index = dict(zip(input_characters, range(num_encoder_tokens)))
target_token_index = dict(zip(target_characters, range(num_decoder_tokens)))


In [None]:
# One-hot tensors indexed as (sample, time step, vocabulary entry). Positions
# past the end of a sentence stay all-zero.
encoder_shape = (num_samples, max_encoder_seq_length, num_encoder_tokens)
decoder_shape = (num_samples, max_decoder_seq_length, num_decoder_tokens)

encoder_input_data = np.zeros(encoder_shape, dtype="float32")
decoder_input_data = np.zeros(decoder_shape, dtype="float32")
decoder_target_data = np.zeros(decoder_shape, dtype="float32")

for row, (source, target) in enumerate(zip(input_texts, target_texts)):
    # Encoder input: one hot entry per source character.
    source_ids = np.array([input_token_index[ch] for ch in source], dtype=np.intp)
    encoder_input_data[row, np.arange(source_ids.size), source_ids] = 1.0

    # Decoder input: the full target string, start marker included.
    target_ids = np.array([target_token_index[ch] for ch in target], dtype=np.intp)
    steps = np.arange(target_ids.size)
    decoder_input_data[row, steps, target_ids] = 1.0

    # Decoder target: the decoder input advanced by one step, i.e. the
    # character at position t is stored at position t - 1, so the start marker
    # never appears as a target.
    decoder_target_data[row, steps[:-1], target_ids[1:]] = 1.0

print("encoder_input_data:", encoder_input_data.shape)
print("decoder_input_data:", decoder_input_data.shape)
print("decoder_target_data:", decoder_target_data.shape)


In [None]:
latent_dim = 256  # number of units in each LSTM (size of the h / c state vectors)

# --- Encoder: reads the one-hot source sequence; only its final states are used.
encoder_inputs = keras.Input(name="encoder_inputs", shape=(None, num_encoder_tokens))
encoder_lstm = layers.LSTM(latent_dim, name="encoder_lstm", return_state=True)
encoder_outputs, state_h, state_c = encoder_lstm(encoder_inputs)
encoder_states = [state_h, state_c]

# --- Decoder: starts from the encoder states and emits one distribution per step.
decoder_inputs = keras.Input(name="decoder_inputs", shape=(None, num_decoder_tokens))
decoder_lstm = layers.LSTM(
    latent_dim,
    name="decoder_lstm",
    return_state=True,
    return_sequences=True,
)
# The states returned here are not needed for training; keep only the sequence.
decoder_hidden_seq = decoder_lstm(decoder_inputs, initial_state=encoder_states)[0]
decoder_dense = layers.Dense(num_decoder_tokens, name="decoder_dense", activation="softmax")
decoder_outputs = decoder_dense(decoder_hidden_seq)

# Training model: [encoder_inputs, decoder_inputs] -> decoder_outputs
model = keras.Model(inputs=[encoder_inputs, decoder_inputs], outputs=decoder_outputs)

model.compile(
    loss="categorical_crossentropy",
    optimizer="rmsprop",
    metrics=["accuracy"],
)
model.summary()


In [None]:
# Training hyper-parameters.
batch_size = 64
epochs = 100

# Fit on (encoder input, decoder input) -> decoder target, holding out 20% of
# the samples for validation.
history = model.fit(
    x=[encoder_input_data, decoder_input_data],
    y=decoder_target_data,
    epochs=epochs,
    batch_size=batch_size,
    validation_split=0.2,
)


In [None]:
# Inference-time encoder: source sequence in, final [h, c] states out.
encoder_model = keras.Model(inputs=encoder_inputs, outputs=encoder_states)

# Inference-time decoder: the LSTM states become explicit inputs and outputs so
# they can be threaded through a step-by-step loop. It calls the same
# `decoder_lstm` / `decoder_dense` layer objects as the training model, so the
# weights are shared.
decoder_state_input_h = keras.Input(name="input_h", shape=(latent_dim,))
decoder_state_input_c = keras.Input(name="input_c", shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

lstm_sequence_inf, state_h_inf, state_c_inf = decoder_lstm(
    decoder_inputs,
    initial_state=decoder_states_inputs,
)
decoder_outputs_inf = decoder_dense(lstm_sequence_inf)
decoder_states_inf = [state_h_inf, state_c_inf]

decoder_model = keras.Model(
    inputs=[decoder_inputs, *decoder_states_inputs],
    outputs=[decoder_outputs_inf, *decoder_states_inf],
)

# index -> character tables, used to turn predicted indices back into text
reverse_input_char_index = dict(zip(input_token_index.values(), input_token_index.keys()))
reverse_target_char_index = dict(zip(target_token_index.values(), target_token_index.keys()))


In [None]:
def decode_sequence(input_seq):
    """Greedily translate one encoded English sentence into Chinese.

    The input is run through ``encoder_model`` to obtain the initial LSTM
    states. ``decoder_model`` is then called one character at a time, feeding
    back its own most probable prediction, until it emits the end marker
    (a newline) or the output reaches ``max_decoder_seq_length`` characters.

    Args:
        input_seq: One-hot encoder input of shape
            ``(1, T, num_encoder_tokens)``.

    Returns:
        The decoded string without start/end markers. It is never longer than
        ``max_decoder_seq_length`` characters.
    """
    # verbose=0 stops Keras from printing a progress bar for every one-step
    # prediction. list(...) keeps the concatenation with [target_seq] below
    # valid even if predict() returns a tuple.
    states_value = list(encoder_model.predict(input_seq, verbose=0))

    # Seed the decoder with the start marker.
    target_seq = np.zeros((1, 1, num_decoder_tokens), dtype="float32")
    target_seq[0, 0, target_token_index["\t"]] = 1.0

    decoded_chars = []
    # Bounding the loop by the current output length caps the result at exactly
    # max_decoder_seq_length characters (a post-append ">" check would allow
    # one extra character through).
    while len(decoded_chars) < max_decoder_seq_length:
        output_tokens, h, c = decoder_model.predict(
            [target_seq] + states_value, verbose=0
        )

        # Greedy choice from the distribution of the last (only) time step.
        sampled_token_index = int(np.argmax(output_tokens[0, -1, :]))
        sampled_char = reverse_target_char_index[sampled_token_index]
        if sampled_char == "\n":
            break
        decoded_chars.append(sampled_char)

        # The predicted character becomes the next decoder input ...
        target_seq = np.zeros((1, 1, num_decoder_tokens), dtype="float32")
        target_seq[0, 0, sampled_token_index] = 1.0
        # ... and the returned states are carried into the next step.
        states_value = [h, c]

    return "".join(decoded_chars)


In [None]:
# Decode the first few sentences of the corpus and print each result next to
# its reference translation.
n_preview = min(10, num_samples)
for seq_index in range(n_preview):
    # Slice rather than index so the leading axis of size 1 is kept.
    input_seq = encoder_input_data[seq_index : seq_index + 1]
    decoded_sentence = decode_sequence(input_seq)
    reference_text = target_texts[seq_index][1:-1]  # drop the start/end markers
    print("-" * 50)
    print("输入英文句子:", input_texts[seq_index])
    print("目标中文翻译:", reference_text)
    print("模型生成翻译:", decoded_sentence)


- **(1) Seq2Seq 基本思想**  
  - 使用 Encoder–Decoder 结构，把“可变长输入序列”映射为“可变长输出序列”；  
  - Encoder 逐步读入整句英文，将其编码为固定长度的隐藏状态向量；  
  - Decoder 在给定初始隐藏状态和起始符的条件下，逐步生成目标语言的每一个字符。

- **(2) 训练阶段**  
  - Decoder 每一步不仅接收上一时刻的隐藏状态，还能看到“上一时刻的真实输出字符”（这种做法称为 Teacher Forcing）；  
  - 这样可以让模型更快收敛，避免一开始就被自己“错误的输出”带偏；  
  - 损失函数为各时间步 softmax 输出与真实字符 one-hot 之间的交叉熵（Keras 的 `categorical_crossentropy` 默认对时间步和样本取平均，而不是求和）。

- **(3) 推理阶段**  
  - 没有真实输出可用，只能把“模型上一步预测的字符”再喂回解码器；  
  - 需要单独构建 `encoder_model` 和 `decoder_model`，循环进行解码；  
  - 本 Notebook 使用最简单的贪婪搜索策略（每一步取概率最大的字符），实际可以改为 Beam Search 等。

- **(4) 实际应用扩展**  
  - 如果把字符级 one-hot 换成词级嵌入（Embedding），用更大的 `latent_dim` 和更深的网络，就可以扩展到真实的翻译任务；  
  - 更换训练数据后，同样的 Seq2Seq 结构也可以用于对话生成、摘要生成、代码补全等任务。
