# 诗歌生成

# 数据处理

In [1]:
import numpy as np
import tensorflow as tf
import collections
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import layers, optimizers, datasets

start_token = 'bos'
end_token = 'eos'

def process_dataset(fileName):
    examples = []
    with open(fileName, 'r', encoding='utf-8') as fd:
        for line in fd:
            outs = line.strip().split(':')
            content = ''.join(outs[1:])
            ins = [start_token] + list(content) + [end_token] 
            if len(ins) > 200:
                continue
            examples.append(ins)
            
    counter = collections.Counter()
    for e in examples:
        for w in e:
            counter[w]+=1
    
    sorted_counter = sorted(counter.items(), key=lambda x: -x[1])  # 排序
    words, _ = zip(*sorted_counter)
    words = ('PAD', 'UNK') + words[:len(words)]
    word2id = dict(zip(words, range(len(words))))
    id2word = {word2id[k]:k for k in word2id}
    
    indexed_examples = [[word2id[w] for w in poem]
                        for poem in examples]
    seqlen = [len(e) for e in indexed_examples]
    
    instances = list(zip(indexed_examples, seqlen))
    
    return instances, word2id, id2word

def poem_dataset():
    instances, word2id, id2word = process_dataset('./poems.txt')
    ds = tf.data.Dataset.from_generator(lambda: [ins for ins in instances], 
                                            (tf.int64, tf.int64), 
                                            (tf.TensorShape([None]),tf.TensorShape([])))
    ds = ds.shuffle(buffer_size=10240)
    ds = ds.padded_batch(100, padded_shapes=(tf.TensorShape([None]),tf.TensorShape([])))
    ds = ds.map(lambda x, seqlen: (x[:, :-1], x[:, 1:], seqlen-1))
    return ds, word2id, id2word

# 模型代码， 完成建模代码

In [2]:
class myRNNModel(keras.Model):
    def __init__(self, w2id):
        super(myRNNModel, self).__init__()
        self.v_sz = len(w2id)
        self.embed_layer = tf.keras.layers.Embedding(self.v_sz, 64, 
                                                    batch_input_shape=[None, None])
        
        self.rnncell = tf.keras.layers.SimpleRNNCell(128)
        self.rnn_layer = tf.keras.layers.RNN(self.rnncell, return_sequences=True)
        self.dense = tf.keras.layers.Dense(self.v_sz)
        
    @tf.function
    def call(self, inp_ids):
        '''
        此处完成建模过程，可以参考Learn2Carry
        '''
        x = self.embed_layer(inp_ids)
        rnn_out = self.rnn_layer(x)
        logits = self.dense(rnn_out)
        return logits
    
    @tf.function
    def get_next_token(self, x, state):
        '''
        shape(x) = [b_sz,] 
        '''
    
        inp_emb = self.embed_layer(x) #shape(b_sz, emb_sz)
        h, state = self.rnncell.call(inp_emb, state) # shape(b_sz, h_sz)
        logits = self.dense(h) # shape(b_sz, v_sz)
        out = tf.argmax(logits, axis=-1)
        return out, state

## 一个计算sequence loss的辅助函数，只需了解用途。

In [3]:
def mkMask(input_tensor, maxLen):
    shape_of_input = tf.shape(input_tensor)
    shape_of_output = tf.concat(axis=0, values=[shape_of_input, [maxLen]])

    oneDtensor = tf.reshape(input_tensor, shape=(-1,))
    flat_mask = tf.sequence_mask(oneDtensor, maxlen=maxLen)
    return tf.reshape(flat_mask, shape_of_output)


def reduce_avg(reduce_target, lengths, dim):
    """
    Args:
        reduce_target : shape(d_0, d_1,..,d_dim, .., d_k)
        lengths : shape(d0, .., d_(dim-1))
        dim : which dimension to average, should be a python number
    """
    shape_of_lengths = lengths.get_shape()
    shape_of_target = reduce_target.get_shape()
    if len(shape_of_lengths) != dim:
        raise ValueError(('Second input tensor should be rank %d, ' +
                         'while it got rank %d') % (dim, len(shape_of_lengths)))
    if len(shape_of_target) < dim+1 :
        raise ValueError(('First input tensor should be at least rank %d, ' +
                         'while it got rank %d') % (dim+1, len(shape_of_target)))

    rank_diff = len(shape_of_target) - len(shape_of_lengths) - 1
    mxlen = tf.shape(reduce_target)[dim]
    mask = mkMask(lengths, mxlen)
    if rank_diff!=0:
        len_shape = tf.concat(axis=0, values=[tf.shape(lengths), [1]*rank_diff])
        mask_shape = tf.concat(axis=0, values=[tf.shape(mask), [1]*rank_diff])
    else:
        len_shape = tf.shape(lengths)
        mask_shape = tf.shape(mask)
    lengths_reshape = tf.reshape(lengths, shape=len_shape)
    mask = tf.reshape(mask, shape=mask_shape)

    mask_target = reduce_target * tf.cast(mask, dtype=reduce_target.dtype)

    red_sum = tf.reduce_sum(mask_target, axis=[dim], keepdims=False)
    red_avg = red_sum / (tf.cast(lengths_reshape, dtype=tf.float32) + 1e-30)
    return red_avg

# 定义loss函数，定义训练函数

In [4]:
@tf.function
def compute_loss(logits, labels, seqlen):
    losses = tf.nn.sparse_softmax_cross_entropy_with_logits(
            logits=logits, labels=labels)
    losses = reduce_avg(losses, seqlen, dim=1)
    return tf.reduce_mean(losses)

@tf.function
def train_one_step(model, optimizer, x, y, seqlen):
    '''
    完成一步优化过程，可以参考之前做过的模型
    '''
    with tf.GradientTape() as tape:
        logits = model(x)
        loss = compute_loss(logits, y, seqlen)

    # compute gradient
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

def train(epoch, model, optimizer, ds):
    loss = 0.0
    accuracy = 0.0
    for step, (x, y, seqlen) in enumerate(ds):
        loss = train_one_step(model, optimizer, x, y, seqlen)

        if step % 500 == 0:
            print('epoch', epoch, ': loss', loss.numpy())

    return loss

# 训练优化过程

In [5]:
optimizer = optimizers.Adam(0.0005)
train_ds, word2id, id2word = poem_dataset()
model = myRNNModel(word2id)

for epoch in range(10):
    loss = train(epoch, model, optimizer, train_ds)

epoch 0 : loss 8.82041
epoch 1 : loss 6.530802
epoch 2 : loss 6.179639
epoch 3 : loss 5.9114075
epoch 4 : loss 5.7746215
epoch 5 : loss 5.475292
epoch 6 : loss 5.539743
epoch 7 : loss 5.381737
epoch 8 : loss 5.326028
epoch 9 : loss 5.2763305


# 生成过程

In [6]:
def gen_sentence():
    state = [tf.random.normal(shape=(1, 128), stddev=0.5), tf.random.normal(shape=(1, 128), stddev=0.5)]
    cur_token = tf.constant([word2id['bos']], dtype=tf.int32)
    collect = []
    for _ in range(50):
        cur_token, state = model.get_next_token(cur_token, state)
        collect.append(cur_token.numpy()[0])
    return [id2word[t] for t in collect]
print(''.join(gen_sentence()))

此人不得得，不知何处是何人。eos来不得无人事，不得人间不可知。eos道不知何处处，一枝犹是一枝声。eos来不得


In [31]:
def generate_poem_5x4(begin_word, model, word2id, id2word):
    """
    生成5言绝句（5字×4行），并以指定词汇开头
    
    参数:
        begin_word (str): 开头词汇（如"日"、"红"等）
        model: 诗歌生成模型（需实现get_next_token方法）
        word2id (dict): 词汇到ID的映射
        id2word (dict): ID到词汇的映射
    
    返回:
        str: 格式化的5×4诗歌（每行5字，共4行）
    """
    if begin_word not in word2id:
        raise ValueError(f"开头词 '{begin_word}' 不在词典中")

    # 初始化状态和起始token
    state = [tf.random.normal(shape=(1, 128)), tf.random.normal(shape=(1, 128))]
    cur_token = tf.constant([word2id[begin_word]], dtype=tf.int32)
    poem_ids = [word2id[begin_word]]  # 记录所有生成的token（包含开头词）

    # 生成20个字（5字×4行），遇到结束符提前终止
    while len(poem_ids) < 20:
        cur_token, state = model.get_next_token(cur_token, state)
        token_id = cur_token.numpy()[0]
        if id2word[token_id] in ["eos", "<END>"]:  # 结束符检查
            break
        poem_ids.append(token_id)

    # 转换为文字并格式化为5×4
    poem_chars = [id2word[t] for t in poem_ids[:20]]  # 确保最多20字
    poem_lines = [
        ''.join(poem_chars[i*5 : (i+1)*5])  # 每行5字
        for i in range(4)
    ]
    return '\n'.join(poem_lines)  # 用换行符连接4行

# 示例用法
begin_words = ["日", "红", "山", "夜", "湖", "海", "月"]
for word in begin_words:
    print(f"以'{word}'开头的诗歌：")
    poem = generate_poem_5x4(word, model, word2id, id2word)
    print(poem)
    print()

以'日'开头的诗歌：
日何处在，
风雨落花。



以'红'开头的诗歌：
红嵘。_蓉
彧蓉釦蓉昈
bos箘蓉濆蓉
昈蓉昈蓉滦

以'山'开头的诗歌：
山畔滨，恐
蓉滨蓉疠蓉
洲赗蓉滦蓉
屿屿滦蓉壒

以'夜'开头的诗歌：
夜夕。




以'湖'开头的诗歌：
湖日暮水中
。



以'海'开头的诗歌：
海上人。




以'月'开头的诗歌：
月中，不得
无人。





In [37]:
import re

def is_chinese_char(char):
    """检查字符是否为汉字"""
    return '\u4e00' <= char <= '\u9fff'

def format_poem(poem_lines):
    """修正标点，使其仅出现在行末"""
    formatted_lines = []
    
    for i, line in enumerate(poem_lines):
        # 移除所有非汉字字符（确保之前已经处理过）
        line = ''.join(filter(is_chinese_char, line))
        
        # 重新调整标点：保证句尾有适当标点
        if i % 2 == 0:  # 1、3句为逗号
            line += '，'
        else:  # 2、4句为句号
            line += '。'

        formatted_lines.append(line)
    
    return formatted_lines

def gen_poem(begin_word, max_lines=4, line_length=5):
    """生成以指定汉字开头的完整唐诗，并保证换行格式"""

    # 检查起始字是否为汉字
    if not is_chinese_char(begin_word):
        begin_word = '春'  # 默认值
    
    cur_token = tf.constant([word2id.get(begin_word, word2id['bos'])], dtype=tf.int32)
    state = model.rnncell.get_initial_state(batch_size=1, dtype=tf.float32)

    poem = []
    line_tokens = []

    while len(poem) < max_lines:
        cur_token, state = model.get_next_token(cur_token, state)
        token_id = cur_token.numpy()[0]

        if token_id == word2id['eos']:  # 跳过无意义终止符
            continue
            
        char = id2word.get(token_id, '')
        
        # 只保留汉字字符
        if is_chinese_char(char):
            line_tokens.append(char)

        # 如果一行达到了指定长度且不超过最大行数，添加到诗歌中
        if len(line_tokens) == line_length:
            poem.append(''.join(line_tokens))
            line_tokens = []

        # 如果达到了最大行数，结束
        if len(poem) == max_lines:
            break

    # 确保以指定的字开头
    if poem:
        poem[0] = begin_word + poem[0][1:] if len(poem[0]) > 1 else begin_word

    return '\n'.join(format_poem(poem))  # 修正标点

# 测试生成
begin_words = ["日", "红", "山", "夜", "湖", "海", "月"]

for word in begin_words:
    print(f"以'{word}'开头的诗歌：")
    print(gen_poem(word, max_lines=4))
    print()

以'日'开头的诗歌：
日云声向东，
风不可得一。
枝犹是一枝，
声来不得无。

以'红'开头的诗歌：
红递浯蓉阳，
厂蓉纟蓉滦。
蓉疠蓉滦蓉，
壒蓉屿滦蓉。

以'山'开头的诗歌：
山上山风客，
不知何处处。
一枝犹是一，
枝声来不得。

以'夜'开头的诗歌：
夜风吹落水，
深来无处处。
不得不知君，
客无人事何。

以'湖'开头的诗歌：
湖水中春色，
风风不见春。
风月不知何，
处处一枝犹。

以'海'开头的诗歌：
海山上风吹，
落月深风吹。
落月风雨满，
云风客无人。

以'月'开头的诗歌：
月花满风吹，
一月深云不。
可见山上不，
相逢客无人。

