# 第 12 章 自然语言生成实战

## 12.1 LSTM 写诗

### 12.1.3 实现 LSTM 写诗

In [1]:
import os
import glob
import json
import random
import operator
import collections

from typing import List, Dict

import numpy as np
import pandas as pd
import tensorflow as tf

from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

# 只保留五言绝句
def should_keep(paragraphs: List[str]):
    return all([len(par) == 12 for par in paragraphs])

# 读取数据
def read_all_data(path: str):
    poems = []
    files = glob.glob(os.path.join(path, 'poet.tang.*.json'))
    for file in files:
        file_data = json.load(open(file, 'r'))
        for item in file_data:
            if should_keep(item['paragraphs']):
                poem = ''.join(item['paragraphs'])
                poems.append(poem)
    return poems

poems = read_all_data('data/poetry')
# 为了加快训练，这里只取了 10000 条诗，可以根据自己资源增加或者减少
poems = poems[:10000]
print(poems[:3])

['公子申敬爱，携朋玩物华。人是平阳客，地即石崇家。水文生旧浦，风色满新花。日暮连归骑，长川照晚霞。', '高门引冠盖，下客抱支离。绮席珍羞满，文场翰藻摛。蓂华雕上月，柳色蔼春池。日斜归戚里，连骑勒金羁。', '今夜可怜春，河桥多丽人。宝马金为络，香车玉作轮。连手窥潘掾，分头看洛神。重城自不掩，出向小平津。']


In [2]:
class Processor(object):

    def build_token_dict(self, corpus: List[List[str]]):
        """
        构建 token 字典，这个方法将会遍历分词后的语料，构建一个标记频率字典和标记与索引的映射字典

        Args:
            corpus: 所有分词后的语料
        """
        token2idx = {
            '<PAD>': 0,
            '<UNK>': 1,
            '<BOS>': 2,
            '<EOS>': 3
        }

        token2count = {}
        for sentence in corpus:
            for token in sentence:
                count = token2count.get(token, 0)
                token2count[token] = count + 1
        # 按照词频降序排序
        sorted_token2count = sorted(token2count.items(),
                                    key=operator.itemgetter(1),
                                    reverse=True)
        token2count = collections.OrderedDict(sorted_token2count)

        for token in token2count.keys():
            if token not in token2idx:
                token2idx[token] = len(token2idx)
        return token2idx, token2count

    @staticmethod
    def numerize_sequences(sequence: List[str],
                           token2index: Dict[str, int]) -> List[int]:
        """
        将分词后的标记（token）数组转换成对应的索引数组
        如 ['我', '想', '睡觉'] -> [10, 313, 233]

        Args:
            sequence: 分词后的标记数组
            token2index: 索引词典
        Returns: 输入数据对应的索引数组
        """
        token_result = []
        for token in sequence:
            token_index = token2index.get(token)
            if token_index is None:
                token_index = token2index['<UNK>']
            token_result.append(token_index)
        return token_result

In [3]:
p = Processor()
# 这里我们对所有的诗做了基于字的分词，然后再构建词表
p.token2idx, p.token2count = p.build_token_dict([list(seq) for seq in poems])
# 由于我们这是文本生成，还需要一个索引到词的映射关系
p.idx2token = dict([(v, k) for k,v in p.token2idx.items()])

In [4]:
# 先定义一下两个全局变量，输入序列长度和批次大小
INPUT_LEN = 6
BATCH_SIZE = 500

# 所有的诗整合为一个大字符串，方便后续遍历
corpus = ''.join(poems)

def data_generator():
    t = 0
    while True:
        x_data = []
        y_data = []
        for i in range(BATCH_SIZE):
            # 取出 t 到 t + INPUT_LEN 位置的字符串序列作为输入
            x = corpus[t: t + INPUT_LEN]
            # 取出 t + INPUT_LEN 位置的字符串作为输出
            y = corpus[t + INPUT_LEN]

            # 输入输出转换为数字
            x_data.append(p.numerize_sequences(list(x), p.token2idx))
            y_data.append(p.token2idx[y])

            t += 1
            # 当游标到了最后，从头开始遍历
            if t + 1 >= len(corpus) - INPUT_LEN:
                t = 0

        x_data = np.array(x_data)
        # 将输出序列转换为 one-hot 编码
        y_data = to_categorical(y_data, len(p.token2idx))

        yield x_data, y_data
        
# 初始化数据生成器
# 如果想观察生成器每一步产生的数据，初始化生成器后调用 `next(gen)` 函数观察
gen = data_generator()

In [5]:
L = tf.keras.layers

model = tf.keras.Sequential([
    L.Embedding(input_dim=len(p.token2idx), output_dim=50, input_shape=(6, )),
    L.LSTM(128),
    L.Dropout(0.1),
    L.Dense(len(p.token2idx), activation='softmax')
])

model.compile(optimizer='adam', loss='categorical_crossentropy')
model.summary()


# 每个 epoch 步数等于（整个语料序列长度 - 窗口长度）除以批次大小
steps = (len(corpus) - INPUT_LEN - 1) // BATCH_SIZE
model.fit_generator(gen,
                    steps_per_epoch=steps,
                    epochs=10)

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        (None, 6, 50)             294300    
_________________________________________________________________
lstm (LSTM)                  (None, 128)               91648     
_________________________________________________________________
dropout (Dropout)            (None, 128)               0         
_________________________________________________________________
dense (Dense)                (None, 5886)              759294    
Total params: 1,145,242
Trainable params: 1,145,242
Non-trainable params: 0
_________________________________________________________________
Instructions for updating:
Please use Model.fit, which supports generators.
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x13de92240>

In [6]:
def sample(preds: np.ndarray, temperature: float = 1.0) -> int:
    """
    使用 softmax 温度随机采样
    当 temperature = 1.0 时，模型输出正常
    当 temperature = 0.5 时，模型输出比较open
    当 temperature = 1.5 时，模型输出比较保守

    Args:
        preds: 模型预测结果
        temperature: softmax 温度
    Returns:
        采样结果
    """
    preds = np.asarray(preds).astype('float64')
    exp_preds = np.power(preds, 1. / temperature)
    preds = exp_preds / np.sum(exp_preds)
    pro = np.random.choice(range(len(preds)), 1, p=preds)
    return int(pro.squeeze())

def predict_next_char(input_seq: List[str],
                      temperature: float = 1.0) -> str:
    """
    输入序列，预测下一个字符

    Args:
        input_seq: 输入序列
        temperature: softmax 温度
    Returns:
        下一个字符串
    """
    if len(input_seq) < INPUT_LEN:
        raise ValueError(f'seq length must large than {INPUT_LEN}')

    input_seq = input_seq[-INPUT_LEN:]
    input_tensor = p.numerize_sequences(input_seq, p.token2idx)
    input_tensor = np.array([input_tensor])
    preds = model.predict(input_tensor)[0]
    pred_idx = sample(preds, temperature)
    pred_char = p.idx2token[pred_idx]
    return pred_char

def pred_with_start(input_seq: List[str],
                    temperature: float = 1.0) -> List[str]:
    """
    以给定字符串作为开头写诗

    Args:
        input_seq: 诗开头字符串
        temperature: softmax 温度
    Returns:
        生成的诗歌序列
    """
    result = input_seq
    # 如果长度不足，则随机取一首诗补全
    if len(input_seq) < INPUT_LEN:
        padding_poem = list(random.choice(poems))
    else:
        padding_poem = []

    # 当序列中出现四个 。 或者序列长度超过 100 时候停止
    # 100 这个限制主要是为了避免出现死循环
    should_continue = True
    while should_continue:
        pred_char = predict_next_char(padding_poem + result, temperature)
        result.append(pred_char)
        if result.count('。') == 4 or len(result) > 100:
            should_continue = False
    return result

In [7]:
for temp in [0.3, 0.6, 1.0, 1.2, 1.5]:
    print(f'\nTemperature: {temp}')
    for _ in range(3):
        print(''.join(pred_with_start(['冬', '日'], temp)))


Temperature: 0.3
冬日乘何处，春风落远时。春风吹落日，秋雨带寒风。惆怅南山上，归人独自怜。一年无事意，万里见长安。
冬日长安见，朝风满客游。从君不可见，何必为相思。一生千里里，万里万里中。谁能当夜暮，犹复向云中。
冬日日已长，夜云犹自适。何处归别人，时来不可见。君子不可怜，何人不可忘。何事无所见，此心无所思。

Temperature: 0.6
冬日生岁月，龙花过自流。月晴从月落，山色不归归。独与江浪白，秋来水鸟红。孤舟楚高塞，积露夜烟山。
冬日相送处，日暮东山暮。孤竹静未疏，寒钟雨相引。江日未繁别，云山无任情。尧年西五重，簪悴万里难。
冬日五十年，何事再三扬。身上若无道，我来无所心。从君最不惜，作者我何何。自闻花落水，不及凤凰州。

Temperature: 1.0
冬日临蝉骑，陂川照雪农。气酣明刃管，空质暗珊红。咫里栽名废，旧人饮手募。素功虽百沂，玉富慎犹剖。
冬日忽应领，白云偏可唾。旧家俱用台，百事本元珪。今尚有回好，随天谁处家。静行复有意，大事所相惊。
冬日相无寄，音生始有修。或须人苦卜，息别一泉鸣。时独俱成出，开歌共自归。中云不可见，见赏仍流索。

Temperature: 1.2
冬日云飞雨，荆川白日碑。云华陈月夜，花上犬毖垂。颦若临沙日，放陵满野舟。逢游望何在，非止移更夸。
冬日怜应挂，风阴动无康。早蝉繁石鹊，冬稻接篮笺。隐盎东溪残，沟邻麋夜光。上朝孤掖客，然夏到江边。
冬日剪袍劲，耕毫扫建舆。神黑商川阔，宋泉向老宽。自嫌飞策宴，古合谢繇翔。抱路经朝罢，琴心出客情。

Temperature: 1.5
冬日古来戍，离舟斜寂堂。恐重开骢寿，形滋游哲异。春陌外万物，漾旗仰荥城。憯裘开橤火，于渥凋伥药。
冬日竟相永，滴欢回雒阴。暮寒灯景远，春雨浪鼠豪。暝雪俱即扇，㶉严称凤筝。槭峨楼萼迥，翛恍无人宿。
冬日失知唧，帽与愿要迫。刘怅门学愚，度十剧陀彦。谁苦拍鳃木，野老裁酸兔。闻角涉大处，讴家出山山。


In [8]:
def predict_hide(head_tokens: List[str],
                 temperature: float = 1.0) -> List[str]:
    """
    写藏头诗

    Args:
        head_tokens: 每一句的第一个字组成的数组
        temperature: softmax 温度
    Returns:
        生成的诗歌序列
    """
    padding_poem = list(random.choice(poems))
    result = []

    for i in range(4):
        result.append(head_tokens[i])
        sentence_end = False
        while not sentence_end:
            char = predict_next_char(padding_poem + result, temperature)
            result.append(char)
            if char == '。' or len(result) > 100:
                sentence_end = True
    return result

In [9]:
for temp in [0.3, 0.6, 1.0, 1.2, 1.5]:
    print(f'\nTemperature: {temp}')
    for _ in range(3):
        print(''.join(predict_hide(['机', '器', '学', '习'])))


Temperature: 0.3
机门高草里，兔脉落阳台。器为高名在，思浮此始悲。学之难恋恶，自醉又成衰。习来舟陵上，彻历烟天外。
机占已分酒，深身更似情。器深芳海峻，携手在天前。学迹随节场，归丝对高蠹。习语去乡场，殊人碌轶记。
机岂已相久，回南宁相知。器石于朱墀，寂寞五阳汉。学置六里运，争见一庭幽。习客阳菲路，贻程任此情。

Temperature: 0.6
机眄造道师，手充皆送潘。器财仙出菊，𫌇态桂微铺。学足本将戴，滥哉全者孟。习仁天圣远，及小鼎彝诗。
机劝招台步，文歌发圣荣。器书军偃激，竞败瑶兴白。学季西北建，方令波碧残。习花休镐印，方贺诏清褒。
机绿闾中识，香晴剑上然。器承佐忠大，贻说在冥茫。学就忆时处，逢城无似阑。习手即常制，羞令闲镆崭。

Temperature: 1.0
机兴服灵策，郑义降衰下。器复转联绁，空沾荷酒花。学婴劳暂看，烟壑空远传。习息同止麦，生涯著占之。
机灵重幕浦，愿以胡江柏。器勃宫群籍，龙佳玉压膺。学书徒郁就，锵义可思过。习我刘凤末，遂欲歌山沃。
机调当态远，驰州忘辛歌。器负金粱茎，□辉荆葛峩。学宽刊帝府，更此入皇天。习忠行远札，奄解咏新香。

Temperature: 1.2
机相愁返望，复往别来家。器舅随家府，其官忝一钧。学道耸难盛，弃患动方珍。习宠事王贵，负师奏武兵。
机官起正去，鞍马心乘绝。器金闲布藻，尺马劳焙蠹。学名大肠为，怅何始与昔。习之徒逸伦，酺贱尚可泯。
机心讵为色，书引妾何胸。器禄既一年，反将陶组宦。学旱五两铺，续农八二精。习成歌健愿，寻会曲生情。

Temperature: 1.5
机定共生明，群情宵且闲。器在疑爱羁，可见遶头木。学义且可见，深为则子病。习剑剸新琴，□诗示长肉。
机鸣每寒黛，荆官自向春。器言新大梦，妆乐是登峰。学拜入临草，来行或逐鸥。习时能妄复，犹恐尔无伤。
机从死重寿，卒实祸诸途。器提妄士论，薏别旋陵棱。学位物高省，平生岂一顾。习汉君未回，贼丈我徒同。
