# 第 12 章 自然语言生成实战

## 12.1 LSTM 写诗

### 12.1.3 实现 LSTM 写诗

In [11]:
import os
import glob
import json
import random
import operator
import collections

from typing import List, Dict

import numpy as np
import pandas as pd
import tensorflow as tf

from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

# Keep only poems in which every paragraph is a five-character couplet
def should_keep(paragraphs: List[str]):
    """
    Decide whether a poem belongs in the training corpus.

    A poem is kept when it has at least one paragraph and every paragraph is
    exactly 12 characters long (e.g. '秦川雄帝宅，函谷壮皇居。': two
    five-character lines, each followed by one punctuation mark).

    Args:
        paragraphs: the paragraphs of one poem
    Returns:
        True if the poem is non-empty and every paragraph has length 12
    """
    # all() over an empty sequence is True, so an empty poem must be
    # rejected explicitly; otherwise '' would end up in the corpus
    return bool(paragraphs) and all(len(par) == 12 for par in paragraphs)

# Load the poems
def read_all_data(path: str):
    """
    Read every 'poet.tang.*.json' file under `path` and collect the poems
    accepted by `should_keep`.

    Each file is parsed as a JSON list of items; the 'paragraphs' entry of an
    accepted item is joined into a single string.

    Args:
        path: directory that contains the 'poet.tang.*.json' files
    Returns:
        A list with one string per kept poem
    """
    poems = []
    # glob returns files in an OS-dependent order; sorting makes the corpus
    # (and any later truncation of it) reproducible across machines
    files = sorted(glob.glob(os.path.join(path, 'poet.tang.*.json')))
    for file in files:
        # the context manager closes the handle even if parsing fails
        with open(file, 'r', encoding='utf-8') as f:
            file_data = json.load(f)
        poems.extend(''.join(item['paragraphs'])
                     for item in file_data
                     if should_keep(item['paragraphs']))
    return poems

# Load the qualifying poems and cap the corpus at 10000 poems to keep
# training fast; raise or lower the cap to fit your own resources
poems = read_all_data('data/poetry')[:10000]
print(poems[0:3])

['秦川雄帝宅，函谷壮皇居。绮殿千寻起，离宫百雉余。连甍遥接汉，飞观迥凌虚。云日隐层阙，风烟出绮疏。', '岩廊罢机务，崇文聊驻辇。玉匣启龙图，金绳披凤篆。韦编断仍续，缥帙舒还卷。对此乃淹留，欹案观坟典。', '移步出词林，停舆欣武宴。雕弓写明月，骏马疑流电。惊雁落虚弦，啼猿悲急箭。阅赏诚多美，于兹乃忘倦。']


In [12]:
class Processor(object):

    def build_token_dict(self, corpus: List[List[str]]):
        """
        Build the vocabulary from a tokenized corpus.

        Counts how often every token occurs and assigns each distinct token
        an integer index. Indices 0-3 are reserved for the special tokens
        <PAD>, <UNK>, <BOS> and <EOS>; the remaining tokens are numbered in
        order of descending frequency (ties keep first-seen order).

        Args:
            corpus: all tokenized sentences
        Returns:
            A tuple (token2idx, token2count): the token -> index dict and an
            OrderedDict of token -> frequency sorted by descending frequency
        """
        frequencies = collections.Counter(
            token for sentence in corpus for token in sentence)
        # Stable sort: tokens with equal counts stay in first-seen order
        by_frequency = sorted(frequencies.items(),
                              key=lambda pair: pair[1],
                              reverse=True)
        token2count = collections.OrderedDict(by_frequency)

        token2idx = {'<PAD>': 0, '<UNK>': 1, '<BOS>': 2, '<EOS>': 3}
        for token in token2count:
            # setdefault leaves tokens that already have an index untouched
            token2idx.setdefault(token, len(token2idx))
        return token2idx, token2count

    @staticmethod
    def numerize_sequences(sequence: List[str],
                           token2index: Dict[str, int]) -> List[int]:
        """
        Map a list of tokens to the list of their indices,
        e.g. ['我', '想', '睡觉'] -> [10, 313, 233].
        Tokens without an index are mapped to the index of '<UNK>'.

        Args:
            sequence: the tokenized input
            token2index: the token -> index dict
        Returns: the indices corresponding to the input tokens
        """
        def lookup(token):
            index = token2index.get(token)
            return token2index['<UNK>'] if index is None else index

        return [lookup(token) for token in sequence]

In [13]:
p = Processor()
# Split every poem into single characters, then build the vocabulary from them
p.token2idx, p.token2count = p.build_token_dict(list(map(list, poems)))
# Text generation also needs the reverse mapping, index -> token
p.idx2token = {idx: token for token, idx in p.token2idx.items()}

In [14]:
# Two global settings: the input window length and the batch size
INPUT_LEN = 6
BATCH_SIZE = 500

# Concatenate all poems into one long string so it can be scanned with a
# sliding window
corpus = ''.join(poems)

def data_generator():
    """
    Endlessly yield training batches cut from `corpus` with a sliding window.

    Every sample uses INPUT_LEN consecutive characters as input and the
    character right after them as the target. The window advances by one
    character per sample and jumps back to the start of the corpus once
    `cursor + 1 >= len(corpus) - INPUT_LEN`.

    Yields:
        A tuple (x_data, y_data) holding BATCH_SIZE samples: the array of
        input index sequences and the one-hot encoded target indices
    """
    cursor = 0
    while True:
        inputs, targets = [], []
        for _ in range(BATCH_SIZE):
            window_end = cursor + INPUT_LEN
            # Characters [cursor, window_end) are the input ...
            inputs.append(
                p.numerize_sequences(list(corpus[cursor:window_end]),
                                     p.token2idx))
            # ... and the character at window_end is the target
            targets.append(p.token2idx[corpus[window_end]])

            cursor += 1
            # Start over from the beginning once the end is reached
            if cursor + 1 >= len(corpus) - INPUT_LEN:
                cursor = 0

        # Targets are converted to one-hot vectors over the whole vocabulary
        yield np.array(inputs), to_categorical(targets, len(p.token2idx))
        
# Instantiate the data generator
# To inspect the batches it produces, call `next(gen)` after creating it
gen = data_generator()

In [15]:
L = tf.keras.layers

# Embedding -> LSTM -> Dropout -> softmax over the whole vocabulary
model = tf.keras.Sequential([
    # The input length follows INPUT_LEN so the model always matches the
    # window size used by the data generator
    L.Embedding(input_dim=len(p.token2idx), output_dim=50, input_shape=(INPUT_LEN, )),
    L.LSTM(128),
    L.Dropout(0.1),
    L.Dense(len(p.token2idx), activation='softmax')
])

model.compile(optimizer='adam', loss='categorical_crossentropy')
model.summary()


# Steps per epoch = (corpus length - window length - 1) // batch size
steps = (len(corpus) - INPUT_LEN - 1) // BATCH_SIZE
# Model.fit accepts Python generators directly; fit_generator is deprecated
model.fit(gen,
          steps_per_epoch=steps,
          epochs=10)

NotImplementedError: Cannot convert a symbolic Tensor (lstm_2/strided_slice:0) to a numpy array.

In [None]:
def sample(preds: np.ndarray, temperature: float = 1.0) -> int:
    """
    Randomly sample an index from a probability distribution that has been
    rescaled by a softmax temperature.

    Every probability is raised to the power 1 / temperature and the result
    is renormalized, so:
    temperature = 1.0 leaves the distribution unchanged
    temperature < 1.0 (e.g. 0.5) sharpens it, giving more conservative output
    temperature > 1.0 (e.g. 1.5) flattens it, giving more varied output

    Args:
        preds: the probabilities predicted by the model (1-D)
        temperature: softmax temperature, must be positive
    Returns:
        the sampled index
    Raises:
        ValueError: if temperature is not positive
    """
    if temperature <= 0:
        raise ValueError(f'temperature must be positive, got {temperature}')

    preds = np.asarray(preds).astype('float64')
    scaled = np.power(preds, 1. / temperature)
    # Renormalize so the rescaled values form a probability distribution again
    probs = scaled / np.sum(scaled)
    return int(np.random.choice(len(probs), p=probs))

def predict_next_char(input_seq: List[str],
                      temperature: float = 1.0) -> str:
    """
    Predict the character that follows the given sequence.

    Only the last INPUT_LEN tokens of the sequence are fed to the model; the
    next character is sampled from the predicted distribution.

    Args:
        input_seq: the input tokens, at least INPUT_LEN of them
        temperature: softmax temperature
    Returns:
        the predicted next character
    Raises:
        ValueError: if input_seq holds fewer than INPUT_LEN tokens
    """
    if len(input_seq) < INPUT_LEN:
        # A length equal to INPUT_LEN is accepted, so say "at least"
        raise ValueError(f'seq length must be at least {INPUT_LEN}')

    input_seq = input_seq[-INPUT_LEN:]
    input_tensor = p.numerize_sequences(input_seq, p.token2idx)
    # Wrap in an outer list to form a batch containing this single sequence
    input_tensor = np.array([input_tensor])
    preds = model.predict(input_tensor)[0]
    pred_idx = sample(preds, temperature)
    pred_char = p.idx2token[pred_idx]
    return pred_char

def pred_with_start(input_seq: List[str],
                    temperature: float = 1.0) -> List[str]:
    """
    Write a poem that starts with the given tokens.

    Args:
        input_seq: the opening tokens of the poem (left unmodified)
        temperature: softmax temperature
    Returns:
        the generated poem as a new list of tokens, opening included
    """
    # Work on a copy so the caller's list is never mutated
    result = list(input_seq)
    # If the opening is shorter than the model window, prepend a randomly
    # chosen poem as context; it is not part of the returned result
    if len(result) < INPUT_LEN:
        padding_poem = list(random.choice(poems))
    else:
        padding_poem = []

    # Stop once the sequence holds four '。' or grows beyond 100 tokens.
    # The length limit guards against an endless loop; '>=' also stops when
    # the opening already contained more than four '。'.
    while True:
        result.append(predict_next_char(padding_poem + result, temperature))
        if result.count('。') >= 4 or len(result) > 100:
            break
    return result

In [None]:
# For each temperature, generate and print three poems that start with 冬日
for temp in [0.3, 0.6, 1.0, 1.2, 1.5]:
    print(f'\nTemperature: {temp}')
    for _ in range(3):
        print(''.join(pred_with_start(['冬', '日'], temp)))

In [None]:
def predict_hide(head_tokens: List[str],
                 temperature: float = 1.0) -> List[str]:
    """
    Write an acrostic poem.

    Every sentence starts with the next head token and is continued one
    predicted character at a time until a '。' is produced.

    Args:
        head_tokens: the first character of every sentence, in order
        temperature: softmax temperature
    Returns:
        the generated poem as a list of tokens
    """
    # A random poem is prepended as context so the model window is always
    # filled; it is not part of the returned result
    padding_poem = list(random.choice(poems))
    result = []

    # Iterate over the heads actually supplied instead of assuming exactly four
    for head in head_tokens:
        result.append(head)
        while True:
            char = predict_next_char(padding_poem + result, temperature)
            result.append(char)
            # The length limit guards against an endless loop
            if char == '。' or len(result) > 100:
                break
    return result

In [None]:
# For each temperature, generate and print three acrostic poems whose
# sentences start with 机 / 器 / 学 / 习
for temp in [0.3, 0.6, 1.0, 1.2, 1.5]:
    print(f'\nTemperature: {temp}')
    for _ in range(3):
        # Pass temp on; without it every run uses the default temperature 1.0
        print(''.join(predict_hide(['机', '器', '学', '习'], temp)))

In [None]:
import datetime
# Print the current local date and time, formatted as YYYY-MM-DD HH:MM:SS
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))