# 作诗

#### 加载数据集

In [5]:
'''
准备数据
构建模型(构造损失函数和优化器)
训练模型
评估模型
预测
'''

from collections import Counter
import math
import numpy as np
import tensorflow as tf

# 禁用词
disallowed_words = ['（', '）', '(', ')', '__', '《', '》', '【', '】', '[', ']']
# 句子最大长度
max_len = 64
# 最小词频
min_word_frequency = 8
# mini batch 大小
batch_size = 16

# 加载数据集
with open('./poetry.txt', 'r', encoding='utf-8') as f:
    lines = f.readlines()
    # 将冒号统一成相同格式
    lines = [line.replace('：', ':') for line in lines]
# 数据集列表
poetry = []
# 逐行处理读取到的数据
for line in lines:
    # 有且只能有一个冒号用来分割标题
    if line.count(':') != 1:
        continue
    # 后半部分不能包含禁止词
    __, last_part = line.split(':')
#     print(__)
    ignore_flag = False
    for dis_word in disallowed_words:
        if dis_word in last_part:
            ignore_flag = True
            break
    if ignore_flag:
        continue
    # 长度不能超过最大长度
    if len(last_part) > max_len - 2:
        continue
    poetry.append(last_part.replace('\n', ''))

编写一个类——Tokenizer，这是为了方便我们完成字符转编号、编号转字符、字符串转编号序列、编号序列转字符串等操作而编写的一个辅助类。

使用的特殊字符有四个，为’[PAD]’, ‘[UNK]’, ‘[CLS]’, ‘[SEP]’，它们分别代表填充字符、低频词、古诗开始标记、古诗结束标记。

In [6]:
class Tokenizer:
    """
    分词器
    """

    def __init__(self, token_dict):
        # 词->编号的映射
        self.token_dict = token_dict
        # 编号->词的映射
        self.token_dict_rev = {value: key for key, value in self.token_dict.items()}
        # 词汇表大小
        self.vocab_size = len(self.token_dict)

    def id_to_token(self, token_id):
        """
        给定一个编号，查找词汇表中对应的词
        :param token_id: 带查找词的编号
        :return: 编号对应的词
        """
        return self.token_dict_rev[token_id]

    def token_to_id(self, token):
        """
        给定一个词，查找它在词汇表中的编号
        未找到则返回低频词[UNK]的编号
        :param token: 带查找编号的词
        :return: 词的编号
        """
        return self.token_dict.get(token, self.token_dict['[UNK]'])

    def encode(self, tokens):
        """
        给定一个字符串s，在头尾分别加上标记开始和结束的特殊字符，并将它转成对应的编号序列
        :param tokens: 待编码字符串
        :return: 编号序列
        """
        # 加上开始标记
        token_ids = [self.token_to_id('[CLS]'), ]
        # 加入字符串编号序列
        for token in tokens:
            token_ids.append(self.token_to_id(token))
        # 加上结束标记
        token_ids.append(self.token_to_id('[SEP]'))
        return token_ids

    def decode(self, token_ids):
        """
        给定一个编号序列，将它解码成字符串
        :param token_ids: 待解码的编号序列
        :return: 解码出的字符串
        """
        # 起止标记字符特殊处理
        spec_tokens = {'[CLS]', '[SEP]'}
        # 保存解码出的字符的list
        tokens = []
        for token_id in token_ids:
            token = self.id_to_token(token_id)
            if token in spec_tokens:
                continue
            tokens.append(token)
        # 拼接字符串
        return ''.join(tokens)


# 统计词频
counter = Counter()
for line in poetry:
    counter.update(line)
# 过滤掉低频词
_tokens = [(token, count) for token, count in counter.items() if count >= min_word_frequency]
# 按词频排序
_tokens = sorted(_tokens, key=lambda x: -x[1])
# 去掉词频，只保留词列表
_tokens = [token for token, count in _tokens]

# 将特殊词和数据集中的词拼接起来
_tokens = ['[PAD]', '[UNK]', '[CLS]', '[SEP]'] + _tokens
# 创建词典 token->id映射关系
token_id_dict = dict(zip(_tokens, range(len(_tokens))))
# 使用新词典重新建立分词器
tokenizer = Tokenizer(token_id_dict)
# 混洗数据
np.random.shuffle(poetry)

写成生成器的形式，主要出于内存方面的考虑。训练时需要对数据进行填充、转one-hot形式等操作，会占用较多内  存。如果提前对全部数据都进行处理，内存可能会溢出。而以生成器的形式，可以只在要进行训练的时候，处理相应  batch size的数据即可。

In [7]:
class PoetryDataGenerator:
    """
    古诗数据集生成器
    """

    def __init__(self, data, random=False):
        # 数据集
        self.data = data
        # batch size
        self.batch_size = batch_size
        # 每个epoch迭代的步数
        self.steps = int(math.floor(len(self.data) / self.batch_size))
        # 每个epoch开始时是否随机混洗
        self.random = random

    def sequence_padding(self, data, length=None, padding=None):
        """
        将给定数据填充到相同长度
        :param data: 待填充数据
        :param length: 填充后的长度，不传递此参数则使用data中的最大长度
        :param padding: 用于填充的数据，不传递此参数则使用[PAD]的对应编号
        :return: 填充后的数据
        """
        # 计算填充长度
        if length is None:
            length = max(map(len, data))
        # 计算填充数据
        if padding is None:
            padding = tokenizer.token_to_id('[PAD]')
        # 开始填充
        outputs = []
        for line in data:
            padding_length = length - len(line)
            # 不足就进行填充
            if padding_length > 0:
                outputs.append(np.concatenate([line, [padding] * padding_length]))
            # 超过就进行截断
            else:
                outputs.append(line[:length])
        return np.array(outputs)

    def __len__(self):
        return self.steps

    def __iter__(self):
        total = len(self.data)
        # 是否随机混洗
        if self.random:
            np.random.shuffle(self.data)
        # 迭代一个epoch，每次yield一个batch
        for start in range(0, total, self.batch_size):
            end = min(start + self.batch_size, total)
            batch_data = []
            # 逐一对古诗进行编码
            for single_data in self.data[start:end]:
                batch_data.append(tokenizer.encode(single_data))
            # 填充为相同长度
            batch_data = self.sequence_padding(batch_data)
            # yield x,y
            #前面部分是数据x,后面部分是标签y。将诗的内容错开一位分别作为数据和标签。
            #标签部分使用了one-hot进行处理，而数据部分没有使用。原因在于，数据部分准备输入词嵌入层，而词嵌入层的输入不需要进行one-hot；
            #而标签部分，需要和模型的输出计算交叉熵，输出层的激活函数是softmax，所以标签部分也要转成相应的shape，故使用one-hot形式。
            yield batch_data[:, :-1], tf.one_hot(batch_data[:, 1:], tokenizer.vocab_size)
            del batch_data

    def for_fit(self):
        """
        创建一个生成器，用于训练
        """
        # 死循环，当数据训练一个epoch之后，重新迭代数据
        while True:
            # 委托生成器
            yield from self.__iter__()


构建模型

In [8]:
# 构建模型
model = tf.keras.Sequential([
    # 不定长度的输入
    tf.keras.layers.Input((None,)),
    # 词嵌入层
    tf.keras.layers.Embedding(input_dim=tokenizer.vocab_size, output_dim=128),
    # 第一个LSTM层，返回序列作为下一层的输入
    tf.keras.layers.LSTM(128, dropout=0.5, return_sequences=True),
    # 第二个LSTM层，返回序列作为下一层的输入
    tf.keras.layers.LSTM(128, dropout=0.5, return_sequences=True),
    # 对每一个时间点的输出都做softmax，预测下一个词的概率
    tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(tokenizer.vocab_size, activation='softmax')),
])

# 查看模型结构
model.summary()
# 配置优化器和损失函数
model.compile(optimizer=tf.keras.optimizers.Adam(), loss=tf.keras.losses.categorical_crossentropy)


W0106 06:41:50.551586 140464297342784 deprecation.py:506] From /opt/conda/lib/python3.6/site-packages/tensorflow_core/python/keras/initializers.py:119: calling RandomUniform.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
W0106 06:41:50.581644 140464297342784 deprecation.py:506] From /opt/conda/lib/python3.6/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        (None, None, 128)         439552    
_________________________________________________________________
lstm (LSTM)                  (None, None, 128)         131584    
_________________________________________________________________
lstm_1 (LSTM)                (None, None, 128)         131584    
_________________________________________________________________
time_distributed (TimeDistri (None, None, 3434)        442986    
Total params: 1,145,706
Trainable params: 1,145,706
Non-trainable params: 0
_________________________________________________________________


In [14]:
def generate_random_poetry(tokenizer, model, s=''):
    """
    随机生成一首诗
    :param tokenizer: 分词器
    :param model: 用于生成古诗的模型
    :param s: 用于生成古诗的起始字符串，默认为空串
    :return: 一个字符串，表示一首古诗
    """
    # 将初始字符串转成token
    token_ids = tokenizer.encode(s)
    # 去掉结束标记[SEP]
    token_ids = token_ids[:-1]
    while len(token_ids) < 64:
        # 进行预测，只保留第一个样例（我们输入的样例数只有1）的、最后一个token的预测的、不包含[PAD][UNK][CLS]的概率分布
        _probas = model.predict([token_ids, ])[0, -1, 3:]
#         print(_probas)
        # 按照出现概率，对所有token倒序排列
        p_args = _probas.argsort()[::-1][:100]
#         print(p_args)
        # 排列后的概率顺序
        p = _probas[p_args]
#         print(p)
        # 先对概率归一
        p = p / sum(p)
#         print(p)
#         print(len(p))
        # 再按照预测出的概率，随机选择一个词作为预测结果
        target_index = np.random.choice(len(p), p=p)
        target = p_args[target_index] + 3
        # 保存
        token_ids.append(target)
        if target == 3:
            break
    return tokenizer.decode(token_ids)


def generate_acrostic(tokenizer, model, head):
    """
    随机生成一首藏头诗
    :param tokenizer: 分词器
    :param model: 用于生成古诗的模型
    :param head: 藏头诗的头
    :return: 一个字符串，表示一首古诗
    """
    # 使用空串初始化token_ids，加入[CLS]
    token_ids = tokenizer.encode('')
    token_ids = token_ids[:-1]
    # 标点符号，这里简单的只把逗号和句号作为标点
    punctuations = ['，', '。']
    punctuation_ids = {tokenizer.token_to_id(token) for token in punctuations}
    # 缓存生成的诗的list
    poetry = []
    # 对于藏头诗中的每一个字，都生成一个短句
    for ch in head:
        # 先记录下这个字
        poetry.append(ch)
        # 将藏头诗的字符转成token id
        token_id = tokenizer.token_to_id(ch)
        # 加入到列表中去
        token_ids.append(token_id)
        # 开始生成一个短句
        token_ids = tokenizer.encode('')
        token_ids = token_ids[:-1]
#         while len(token_ids) < 5:
        while True:
            # 进行预测，只保留第一个样例（我们输入的样例数只有1）的，最后一个token的预测的、不包含[PAD][UNK][CLS]的概率分布
            _probas = model.predict([token_ids, ])[0, -1, 3:]
            # 按照出现概率，对所有token倒序排列，只取前100
            p_args = _probas.argsort()[::-1][:100]
            # 排列后的概率顺序
            p = _probas[p_args]
            # 先对概率归一
            p = p / sum(p)
            # 再按照预测出的概率，随机选择一个词作为预测结果
            target_index = np.random.choice(len(p), p=p)  # len(p)=100, p为概率
            target = p_args[target_index] + 3
#             print(target)
            # 保存
            token_ids.append(target)
            # 只有不是特殊字符时，才保存到poetry里面去
            if target > 3:
                poetry.append(tokenizer.id_to_token(target))
            if target in punctuation_ids:  # !!!!预测是下一个是标点符号才退出这次循环
                break
            if len(token_ids) % 5 ==0 :
                poetry.append(' ')
                break
                
    return ''.join(poetry)

In [10]:
#在训练时，我们只用随机生成古体诗的方法观察效果。在Keras里，可以通过回调（callback）执行测试方法。
#训练的时间很长，大家看看即可，我们已经训练好了模型存储在best_model.h5
'''
class Evaluate(tf.keras.callbacks.Callback):
    """
    在每个epoch训练完成后，保留最优权重，并随机生成settings.SHOW_NUM首古诗展示
    """

    def __init__(self):
        super().__init__()
        # 给loss赋一个较大的初始值
        self.lowest = 1e10

    def on_epoch_end(self, epoch, logs=None):
        # 在每个epoch训练完成后调用
        # 如果当前loss更低，就保存当前模型参数
        if logs['loss'] <= self.lowest:
            self.lowest = logs['loss']
            model.save('./best_model.h5')
        # 随机生成几首古体诗测试，查看训练效果
        print()
        for i in range(5):
            print(generate_random_poetry(tokenizer, model))


# 创建数据集
data_generator = PoetryDataGenerator(poetry, random=True)
# 开始训练
model.fit_generator(data_generator.for_fit(), steps_per_epoch=data_generator.steps, epochs=20,
                    callbacks=[Evaluate()])
'''

'\nclass Evaluate(tf.keras.callbacks.Callback):\n    """\n    在每个epoch训练完成后，保留最优权重，并随机生成settings.SHOW_NUM首古诗展示\n    """\n\n    def __init__(self):\n        super().__init__()\n        # 给loss赋一个较大的初始值\n        self.lowest = 1e10\n\n    def on_epoch_end(self, epoch, logs=None):\n        # 在每个epoch训练完成后调用\n        # 如果当前loss更低，就保存当前模型参数\n        if logs[\'loss\'] <= self.lowest:\n            self.lowest = logs[\'loss\']\n            model.save(\'./best_model.h5\')\n        # 随机生成几首古体诗测试，查看训练效果\n        print()\n        for i in range(5):\n            print(generate_random_poetry(tokenizer, model))\n\n\n# 创建数据集\ndata_generator = PoetryDataGenerator(poetry, random=True)\n# 开始训练\nmodel.fit_generator(data_generator.for_fit(), steps_per_epoch=data_generator.steps, epochs=20,\n                    callbacks=[Evaluate()])\n'

In [11]:
import warnings
warnings.filterwarnings("ignore")

调用训练好的模型

In [12]:
modeltest = tf.keras.models.load_model('./best_model.h5')

W0106 06:41:53.935435 140464297342784 deprecation.py:323] From /opt/conda/lib/python3.6/site-packages/tensorflow_core/python/ops/math_grad.py:1424: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


In [15]:
# 随机生成一首诗
print(generate_random_poetry(tokenizer, modeltest))
# 给出部分信息的情况下，随机生成剩余部分
print(generate_random_poetry(tokenizer, modeltest, s='床前明月光，'))
# 生成藏头诗
print(generate_acrostic(tokenizer, modeltest, head='海阔天空'))

print(generate_acrostic(tokenizer, modeltest, head='教育技术'))

三雨春一今云白长柳欲南五仙见落独斋天西十海江楚三古南远独我衣一客白去朝玉人白西相几长沙新落楚相日紫楚衣晓二山春日出衣别一斋千南
床前明月光，出行年旧千小今南欲君落玉旧高世天雨故西云去黄月君长闻一石野人行衣柳楚南三云金黄夜一古春草晓高秋碧一客为落二西君五红
海南草君昔 阔平旧何欲 天旧花西野 空岁秋日江 
教出云见一 育闲春野旧 技西雨旧仙 术君石夜秋 
