# 诗歌生成

# 数据处理

In [41]:
import numpy as np  # 导入numpy库，用于数值计算
import tensorflow as tf  # 导入TensorFlow库，用于深度学习
import collections  # 导入collections模块，用于计数等操作
from tensorflow import keras  # 导入TensorFlow的Keras模块，用于构建神经网络
from tensorflow.keras import layers  # 导入Keras的layers模块，用于构建网络层
from tensorflow.keras import layers, optimizers, datasets  # 导入Keras的优化器和数据集模块

start_token = 'bos'  # 定义开始标记，用于标识文本的开始
end_token = 'eos'  # 定义结束标记，用于标识文本的结束

def process_dataset(fileName):  # 定义处理数据集的函数
    examples = []  # 初始化一个空列表，用于存储处理后的文本样本
    with open(fileName, 'r', encoding='utf-8') as fd:  # 打开指定文件进行读取
        for line in fd:  # 遍历文件的每一行
            outs = line.strip().split(':')  # 去掉行首尾的空白字符，并按冒号分割
            content = ''.join(outs[1:])  # 将分割后的第二部分及以后的内容拼接起来，忽略第一部分
            ins = [start_token] + list(content) + [end_token]  # 在内容前后分别添加开始标记和结束标记
            if len(ins) > 200:  # 如果文本长度超过200，则跳过该样本
                continue
            examples.append(ins)  # 将处理后的样本添加到列表中
            
    counter = collections.Counter()  # 创建一个计数器
    for e in examples:  # 遍历每个样本
        for w in e:  # 遍历样本中的每个字符
            counter[w]+=1  # 对字符进行计数
    
    sorted_counter = sorted(counter.items(), key=lambda x: -x[1])  # 按字符出现频率降序排序
    words, _ = zip(*sorted_counter)  # 提取排序后的字符
    words = ('PAD', 'UNK') + words[:len(words)]  # 在字符列表前添加PAD和UNK标记
    word2id = dict(zip(words, range(len(words))))  # 创建字符到索引的映射字典
    id2word = {word2id[k]:k for k in word2id}  # 创建索引到字符的映射字典
    
    indexed_examples = [[word2id[w] for w in poem]  # 将样本中的字符转换为索引
                        for poem in examples]
    seqlen = [len(e) for e in indexed_examples]  # 计算每个样本的长度
    
    instances = list(zip(indexed_examples, seqlen))  # 将样本索引和长度打包成实例列表
    
    return instances, word2id, id2word  # 返回处理后的实例、字符到索引的映射和索引到字符的映射

def poem_dataset():  # 定义生成诗歌数据集的函数
    instances, word2id, id2word = process_dataset('chap6_RNN\poems.txt')  # 调用process_dataset函数处理诗歌文件
    ds = tf.data.Dataset.from_generator(lambda: [ins for ins in instances],  # 使用tf.data.Dataset.from_generator创建数据集
                                            (tf.int64, tf.int64),  # 指定数据类型为int64
                                            (tf.TensorShape([None]),tf.TensorShape([])))  # 指定数据形状
    ds = ds.shuffle(buffer_size=10240)  # 打乱数据集，缓冲区大小为10240
    ds = ds.padded_batch(100, padded_shapes=(tf.TensorShape([None]),tf.TensorShape([])))  # 对数据集进行填充并分批，每批100个样本
    ds = ds.map(lambda x, seqlen: (x[:, :-1], x[:, 1:], seqlen-1))  # 将输入序列和目标序列错开一位，并将序列长度减1
    return ds, word2id, id2word  # 返回数据集、字符到索引的映射和索引到字符的映射

# 模型代码， 完成建模代码

In [48]:
class myRNNModel(keras.Model):  # 定义一个继承自keras.Model的自定义RNN模型类
    def __init__(self, w2id):  # 构造函数，接收词到索引的映射字典w2id
        super(myRNNModel, self).__init__()  # 调用父类的构造函数
        self.v_sz = len(w2id)  # 获取词汇表大小，即词到索引映射字典的长度
        self.embed_layer = tf.keras.layers.Embedding(self.v_sz, 64,  # 定义嵌入层，将输入的索引映射到64维的向量空间
                                                    batch_input_shape=[None, None])  # 指定输入形状为动态大小的二维张量
        
        self.rnncell = tf.keras.layers.SimpleRNNCell(128)  # 定义一个简单的RNN单元，隐藏层大小为128
        self.rnn_layer = tf.keras.layers.RNN(self.rnncell, return_sequences=True)  # 定义RNN层，返回整个序列的输出
        self.dense = tf.keras.layers.Dense(self.v_sz)  # 定义全连接层，输出维度为词汇表大小，用于生成最终的预测结果
        
    @tf.function  # 使用tf.function装饰器，将call方法转换为TensorFlow图函数，提高执行效率
    def call(self, inp_ids):  # 定义模型的前向传播逻辑
        '''
        此处完成建模过程，可以参考Learn2Carry
        '''
        # 答：
        # 将输入的索引通过嵌入层，得到嵌入表示
        embedded = self.embed_layer(inp_ids)  # 嵌入层的输出形状为 [batch_size, seq_length, embedding_dim]
        
        # 将嵌入表示通过RNN层，得到RNN的输出
        rnn_output = self.rnn_layer(embedded)  # RNN层的输出形状为 [batch_size, seq_length, rnn_hidden_size]
        
        # 将RNN的输出通过全连接层，得到最终的预测结果
        logits = self.dense(rnn_output)  # 全连接层的输出形状为 [batch_size, seq_length, vocab_size]


        return logits  # 返回模型的输出（logits）
    
    # @tf.function  # 使用tf.function装饰器，将get_next_token方法转换为TensorFlow图函数
    # def get_next_token(self, x, state):  # 定义获取下一个标记的方法
    #     '''
    #     shape(x) = [b_sz,]  # 输入x的形状为[批次大小,]
    #     '''
    
    #     inp_emb = self.embed_layer(x)  # 将输入x通过嵌入层，得到嵌入表示，形状为[批次大小, 嵌入维度]
    #     h, state = self.rnncell.call(inp_emb, state)  # 将嵌入表示和状态输入到RNN单元，得到隐藏状态h和新的状态
    #     logits = self.dense(h)  # 将隐藏状态h通过全连接层，得到logits
    #     out = tf.argmax(logits, axis=-1)  # 使用tf.argmax获取logits中概率最大的索引作为输出
    #     return out, state  # 返回下一个标记和新的状态

    @tf.function
    def get_next_token(self, x, state, temperature=1.0, top_k=5):
        '''
        x: shape [batch_size,] -> 当前 token
        state: RNN 状态
        temperature: 控制随机性（低温趋向贪心，高温趋向随机）
        top_k: 只在前 k 个最有可能的 token 里采样
        '''
        
        inp_emb = self.embed_layer(x)  # 词嵌入层
        h, state = self.rnncell.call(inp_emb, state)  # RNN 计算新状态
        logits = self.dense(h)  # 计算 logits
        
        # 1. 通过 temperature 调整 logits
        logits = logits / temperature
        
        # 2. 仅保留 top_k 最高概率的 token 进行采样
        if top_k > 1:
            top_k_values, top_k_indices = tf.math.top_k(logits, k=top_k)
            top_k_logits = tf.nn.softmax(top_k_values)
            out = tf.random.categorical(tf.math.log(top_k_logits), num_samples=1)
            out = tf.gather(top_k_indices, out, batch_dims=1)  # 选择对应索引
        else:
            # 如果 top_k=1，则直接使用 argmax
            out = tf.argmax(logits, axis=-1)
    
        return tf.squeeze(out, axis=-1), state  # 返回 token 索引和新状态

## 一个计算sequence loss的辅助函数，只需了解用途。

In [49]:
def mkMask(input_tensor, maxLen):  # 定义一个函数，用于生成掩码
    shape_of_input = tf.shape(input_tensor)  # 获取输入张量的形状
    shape_of_output = tf.concat(axis=0, values=[shape_of_input, [maxLen]])  # 将输入形状与最大长度拼接，形成输出形状

    oneDtensor = tf.reshape(input_tensor, shape=(-1,))  # 将输入张量展平为一维张量
    flat_mask = tf.sequence_mask(oneDtensor, maxlen=maxLen)  # 生成掩码，掩码的长度由输入张量的值决定，最大长度为maxLen
    return tf.reshape(flat_mask, shape_of_output)  # 将掩码重塑为与输入张量相同的形状，但增加了一个最大长度的维度


def reduce_avg(reduce_target, lengths, dim):  # 定义一个函数，用于对张量进行加权平均
    """
    Args:
        reduce_target : shape(d_0, d_1,..,d_dim, .., d_k)  # 需要进行平均的张量
        lengths : shape(d0, .., d_(dim-1))  # 每个序列的实际长度
        dim : which dimension to average, should be a python number  # 指定在哪一维上进行平均
    """
    shape_of_lengths = lengths.get_shape()  # 获取lengths的静态形状
    shape_of_target = reduce_target.get_shape()  # 获取reduce_target的静态形状
    if len(shape_of_lengths) != dim:  # 检查lengths的维度是否等于dim
        raise ValueError(('Second input tensor should be rank %d, ' +  # 如果不满足条件，抛出错误
                         'while it got rank %d') % (dim, len(shape_of_lengths)))
    if len(shape_of_target) < dim+1 :  # 检查reduce_target的维度是否至少为dim+1
        raise ValueError(('First input tensor should be at least rank %d, ' +  # 如果不满足条件，抛出错误
                         'while it got rank %d') % (dim+1, len(shape_of_target)))

    rank_diff = len(shape_of_target) - len(shape_of_lengths) - 1  # 计算reduce_target和lengths的维度差
    mxlen = tf.shape(reduce_target)[dim]  # 获取reduce_target在指定维度上的最大长度
    mask = mkMask(lengths, mxlen)  # 调用mkMask函数生成掩码
    if rank_diff!=0:  # 如果reduce_target和lengths的维度差不为0
        len_shape = tf.concat(axis=0, values=[tf.shape(lengths), [1]*rank_diff])  # 为lengths的形状补充维度
        mask_shape = tf.concat(axis=0, values=[tf.shape(mask), [1]*rank_diff])  # 为掩码的形状补充维度
    else:
        len_shape = tf.shape(lengths)  # 如果维度差为0，直接使用lengths的形状
        mask_shape = tf.shape(mask)  # 如果维度差为0，直接使用掩码的形状
    lengths_reshape = tf.reshape(lengths, shape=len_shape)  # 将lengths重塑为新的形状
    mask = tf.reshape(mask, shape=mask_shape)  # 将掩码重塑为新的形状

    mask_target = reduce_target * tf.cast(mask, dtype=reduce_target.dtype)  # 将掩码应用到reduce_target上，忽略掩码为False的部分
    red_sum = tf.reduce_sum(mask_target, axis=[dim], keepdims=False)  # 在指定维度上对掩码后的张量求和
    red_avg = red_sum / (tf.cast(lengths_reshape, dtype=tf.float32) + 1e-30)  # 计算加权平均值，避免除以0
    return red_avg  # 返回加权平均值

# 定义loss函数，定义训练函数

In [50]:
@tf.function  # 使用tf.function装饰器，将compute_loss函数转换为TensorFlow图函数，提高执行效率
def compute_loss(logits, labels, seqlen):  # 定义计算损失的函数
    losses = tf.nn.sparse_softmax_cross_entropy_with_logits(  # 计算每个样本的交叉熵损失
            logits=logits, labels=labels)  # logits是模型的输出，labels是真实标签
    losses = reduce_avg(losses, seqlen, dim=1)  # 使用reduce_avg函数对损失进行加权平均，dim=1表示按序列长度对损失进行平均
    return tf.reduce_mean(losses)  # 返回损失的均值

@tf.function  # 使用tf.function装饰器，将train_one_step函数转换为TensorFlow图函数
def train_one_step(model, optimizer, x, y, seqlen):  # 定义训练一步的函数
    '''
    完成一步优化过程，可以参考之前做过的模型
    '''
    # 答：
    with tf.GradientTape() as tape:  # 创建梯度记录上下文
        logits = model(x, training=True)  # 前向传播，计算模型的输出
        loss = compute_loss(logits, y, seqlen)  # 计算损失
    gradients = tape.gradient(loss, model.trainable_variables)  # 计算损失关于模型可训练变量的梯度
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))  # 使用优化器更新模型参数
    
    return loss  # 返回这一步的损失值（需要在函数内部实现具体的训练逻辑）

def train(epoch, model, optimizer, ds):  # 定义训练函数
    loss = 0.0  # 初始化损失值
    accuracy = 0.0  # 初始化准确率（虽然代码中没有用到）
    for step, (x, y, seqlen) in enumerate(ds):  # 遍历数据集ds，x是输入，y是标签，seqlen是序列长度
        loss = train_one_step(model, optimizer, x, y, seqlen)  # 调用train_one_step函数进行一步训练

        if step % 500 == 0:  # 每500步打印一次损失值
            print('epoch', epoch, ': loss', loss.numpy())  # 打印当前epoch和损失值

    return loss  # 返回最终的损失值

# 训练优化过程

In [51]:
optimizer = optimizers.Adam(0.0005)  # 创建Adam优化器，学习率为0.0005，用于模型的参数更新

train_ds, word2id, id2word = poem_dataset()  # 调用poem_dataset函数，获取诗歌数据集、词到索引的映射字典和索引到词的映射字典

model = myRNNModel(word2id)  # 创建自定义的RNN模型实例，传入词到索引的映射字典

for epoch in range(10):  # 进行10个训练周期（epoch）
    loss = train(epoch, model, optimizer, train_ds)  # 调用train函数进行一个周期的训练，传入当前周期数、模型、优化器和数据集

epoch 0 : loss 8.820603
epoch 1 : loss 6.597644
epoch 2 : loss 6.21198
epoch 3 : loss 5.83355
epoch 4 : loss 5.712871
epoch 5 : loss 5.4954658
epoch 6 : loss 5.535479
epoch 7 : loss 5.318954
epoch 8 : loss 5.3391547
epoch 9 : loss 5.252771


# 生成过程

In [58]:
def gen_sentence():  # 定义生成句子的函数
    state = [tf.random.normal(shape=(1, 128), stddev=0.5), tf.random.normal(shape=(1, 128), stddev=0.5)]  # 初始化RNN单元的状态，随机生成两个形状为(1, 128)的张量作为初始状态
    cur_token = tf.constant([word2id['bos']], dtype=tf.int32)  # 将起始标记（'bos'）的索引转换为张量，作为当前时间步的输入
    collect = []  # 初始化一个空列表，用于收集生成的标记
    for _ in range(50):  # 循环生成最多50个标记
        cur_token, state = model.get_next_token(cur_token, state)  # 调用模型的get_next_token方法，预测下一个标记和更新状态
        collect.append(cur_token.numpy()[0])  # 将预测的标记索引添加到列表中
    return [id2word[t] for t in collect]  # 将标记索引转换为对应的单词，并返回生成的句子

print(''.join(gen_sentence()))  # 调用gen_sentence函数生成句子，并打印结果

不得无人事自同。eos子相来，相过春风。eos说不知何处去，不须无处在君人。eos人不是长相在，何处人间更是君。


In [59]:
def gen_sentence(begin_word, top_k=5, temperature=0.7, max_length=50):
    # 确保起始词在词汇表中，否则使用 'bos'
    cur_token = tf.constant([word2id.get(begin_word, word2id['bos'])], dtype=tf.int32)
    
    # 初始化 RNN 单元的状态
    state = model.rnncell.get_initial_state(batch_size=1, dtype=tf.float32)
    
    # 使用 TensorArray 代替 Python 列表存储生成的标记
    collect = tf.TensorArray(dtype=tf.int32, size=max_length)
    collect = collect.write(0, cur_token[0])
    
    # 生成 Token
    for i in range(1, max_length):
        cur_token, state = model.get_next_token(cur_token, state, top_k=top_k, temperature=temperature)
        
        # 终止条件：遇到 EOS 结束
        if cur_token.numpy()[0] == word2id['eos']:
            break
        
        collect = collect.write(i, cur_token[0])
    
    # 转换成单词并返回
    token_ids = collect.stack().numpy()
    return ''.join(id2word[t] for t in token_ids)

# 定义起始词汇列表
begin_words = ["日", "红", "山", "夜", "湖", "海", "月"]

# 生成并打印诗歌
for word in begin_words:
    print(f"以'{word}'开头的诗歌：")
    poem = gen_sentence(word)
    print(poem)
    print()

以'日'开头的诗歌：
日月中花里，江上月深。PADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPAD

以'红'开头的诗歌：
红蕖上。PADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPAD

以'山'开头的诗歌：
山下一年人。PADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPAD

以'夜'开头的诗歌：
夜落花声断，一片水头。PADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPAD

以'湖'开头的诗歌：
湖上路。PADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPAD

以'海'开头的诗歌：
海阳前别，何人去，何人一处，一声不知何。PADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPAD

以'月'开头的诗歌：
月上山。PADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPADPA

In [61]:
import re
import random

def format_poem(poem_lines):
    """修正标点，使其仅出现在行末"""
    formatted_lines = []
    
    for i, line in enumerate(poem_lines):
        punctuation = re.findall(r'[，。]', line)
        line = re.sub(r'[，。]', '', line)  # 移除原标点

        # 重新调整标点：保证句尾有适当标点
        if i % 2 == 0:  # 1、3句为逗号
            if punctuation:
                line += '，'
            else:
                line += '，' if random.random() > 0.5 else '。'  # 随机补充句号或逗号
        else:  # 2、4句为句号
            if punctuation:
                line += '。'
            else:
                line += '。' if random.random() > 0.5 else '，'

        formatted_lines.append(line)
    
    return formatted_lines

def gen_poem(begin_word, max_lines=4, line_length=7, top_k=5, temperature=0.7):
    """生成以指定汉字开头的完整唐诗，并保证换行格式"""

    cur_token = tf.constant([word2id.get(begin_word, word2id['bos'])], dtype=tf.int32)
    state = model.rnncell.get_initial_state(batch_size=1, dtype=tf.float32)

    poem = []
    line_tokens = []

    while len(poem) < max_lines:
        cur_token, state = model.get_next_token(cur_token, state, top_k=top_k, temperature=temperature)
        token_id = cur_token.numpy()[0]

        if token_id == word2id['eos']:  # 跳过无意义终止符
            continue

        line_tokens.append(token_id)

        # 如果一行达到了指定长度且不超过最大行数，添加到诗歌中
        if len(line_tokens) == line_length:
            poem.append(''.join(id2word[t] for t in line_tokens))
            line_tokens = []

        # 如果达到了最大行数，结束
        if len(poem) == max_lines:
            break

    # 确保以指定的字开头
    poem[0] = begin_word + poem[0][1:]

    return '\n'.join(format_poem(poem))  # 修正标点

# 测试生成
begin_words = ["日", "红", "山", "夜", "湖", "海", "月"]

for word in begin_words:
    print(f"以'{word}'开头的诗歌：")
    print(gen_poem(word, max_lines=4))
    print()

以'日'开头的诗歌：
日前年不知不，
得不道不能不。
可忘今日长安，
不得人有一年。

以'红'开头的诗歌：
红畔上烟来无，
处一枝为此时。
君不可得不，
觉在人间得来。

以'山'开头的诗歌：
山上东风道多，
难在何曾有此。
情人无限处，
无事有君家有。

以'夜'开头的诗歌：
夜云花落有长，
归相思人中。
人不见不是，
何所）有一千人。

以'湖'开头的诗歌：
湖水边春不知，
此人不如何道。
有人未见无情，
不知相见。

以'海'开头的诗歌：
海一年多事不，
知来来未得长。
无事不得无人，
是此来道已知。

以'月'开头的诗歌：
月年不得人君，
不得不得不知。
有三十万年，
不得无时不可。

