In [None]:
import tensorflow as tf
import numpy as np
import os

# 小说语料预处理

In [None]:
# The corpus is already cleaned; this helper only reads it from disk.
def readtxt(txtspath):
  """Read every file directly under ``txtspath`` into one list of lines.

  Args:
    txtspath: Directory containing the UTF-8 encoded corpus files.

  Returns:
    A list of lines (trailing newline kept) taken from all files, with
    lines that consist of a single newline removed.  Files are visited in
    sorted name order so the result is reproducible across runs.

  Raises:
    OSError: If ``txtspath`` cannot be listed or a file cannot be opened.
    UnicodeDecodeError: If a file is not valid UTF-8.
  """
  txtsList = []
  # os.listdir() returns entries in arbitrary order; sort for determinism.
  for file in sorted(os.listdir(txtspath)):
    position = os.path.join(txtspath, file)
    # Skip sub-directories (e.g. .ipynb_checkpoints) instead of crashing in open().
    if not os.path.isfile(position):
      continue
    with open(position, "r", encoding='UTF-8') as f:
      # Iterate the file lazily and drop blank separator lines.
      txtsList.extend(line for line in f if line != '\n')
  return txtsList

In [None]:
# Convert the corpus into integer indices.
def tokenized(txtsList):
  """Fit a character-level Keras Tokenizer on the corpus and index the text.

  Args:
    txtsList: List of text fragments (as returned by ``readtxt``); they are
      concatenated in order, newlines included.

  Returns:
    A tuple ``(num_chars, tokenizer, text_to_int, dicts_ic, dicts_ci)``:
      * num_chars: vocabulary size to use for the model, i.e. the number of
        tokenizer entries plus one, because the Tokenizer starts numbering
        at 1 and never assigns index 0.
      * tokenizer: the fitted Tokenizer.
      * text_to_int: the whole corpus as a list of integer indices.
      * dicts_ic: ``{index: char}`` mapping (``tokenizer.index_word``).
      * dicts_ci: ``{char: index}`` mapping, the inverse of ``dicts_ic``.
  """
  text = "".join(txtsList)  # single join instead of repeated string concatenation
  total_num = len(text)
  unique_num = len(set(text))
  print(f'全文共 {total_num} 个汉字')     # total number of characters
  print(f'其中不重复出现汉字为 {unique_num} 个')  # number of distinct characters
  # Character-level tokenizer.  num_words is left as None on purpose: the
  # Tokenizer only emits indices strictly below num_words, so capping it at
  # the number of distinct characters silently dropped the rarest one.
  tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=None,char_level=True,filters='')
  tokenizer.fit_on_texts([text])  # one document, not one document per character
  text_to_int = tokenizer.texts_to_sequences([text])[0]
  # Two-way lookup tables.
  dicts_ic = tokenizer.index_word  # {index: char}
  dicts_ci = {char: index for index, char in dicts_ic.items()}
  # Indices run from 1 to len(dicts_ic); +1 accounts for the unused index 0 so
  # that every index is a valid row of an embedding of size num_chars.
  num_chars = len(dicts_ic) + 1
  return num_chars,tokenizer,text_to_int,dicts_ic,dicts_ci

In [None]:
# Build the dataset of (input, target) pairs.
def get_pairs(text_to_int,seq_length,batch_size):
  """Turn the indexed corpus into shuffled batches of (input, target) pairs.

  The index sequence is cut into consecutive chunks of ``seq_length + 1``
  items; each chunk yields an input (all but the last item) and a target
  (all but the first item), i.e. the target is the input shifted by one.

  Args:
    text_to_int: Sequence of integer character indices.
    seq_length: Length of each input/target sequence.
    batch_size: Number of pairs per batch; incomplete batches are dropped.

  Returns:
    A ``tf.data.Dataset`` of ``(input, target)`` batches in a shuffled order
    that stays the same on every iteration.

  Raises:
    ValueError: If the corpus is shorter than ``seq_length + 1`` items, so
      that not even one pair can be built.
  """
  pairs_num = len(text_to_int) // (seq_length+1)  # number of pairs in the corpus
  if pairs_num == 0:
    # shuffle() needs a positive buffer size; fail early with a clear message.
    raise ValueError(
        f'corpus has {len(text_to_int)} items, need at least {seq_length+1} to build one pair')
  ids_dataset = tf.data.Dataset.from_tensor_slices(text_to_int)
  sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)

  def split_input_target(sequence):
    """Return the (input, target) pair for one chunk."""
    input_text = sequence[:-1]
    target_text = sequence[1:]
    return input_text, target_text

  # reshuffle_each_iteration=False keeps the order fixed: the dataset is later
  # partitioned with take()/skip(), and a fresh shuffle on every pass would
  # move samples between the train, validation and test partitions.
  datasets = (
      sequences.map(split_input_target)
      .shuffle(pairs_num, reshuffle_each_iteration=False)
      .batch(batch_size, drop_remainder=True)
  )
  return datasets

In [None]:
# Build the train / validation / test splits.
def split_datasets(datasets):
  """Partition ``datasets`` into consecutive train, validation and test parts.

  The first ``int(0.8 * n)`` elements form the training part, the next
  ``int(0.1 * n)`` the validation part and everything left over the test
  part, where ``n = len(datasets)``.

  Args:
    datasets: Object supporting ``len()``, ``take(k)`` and ``skip(k)``.

  Returns:
    ``(train_ds, valid_ds, test_ds)``.
  """
  n_total = len(datasets)
  n_train = int(0.8 * n_total)
  n_valid = int(0.1 * n_total)
  train_ds = datasets.take(n_train)
  # Everything after the training part is shared by validation and test.
  holdout = datasets.skip(n_train)
  return train_ds, holdout.take(n_valid), holdout.skip(n_valid)

# 模型创建

In [None]:
# Model definition.
class MyModel_gui(tf.keras.Model):
  """Character-level language model: Embedding -> LSTM -> Dense.

  The Dense layer has ``vocab_size`` units and no activation, so the model
  outputs one logit per vocabulary entry for every time step.
  """

  def __init__(self, vocab_size, embedding_dim, rnn_units, batch_size):
    """Create the layers.

    Args:
      vocab_size: Size of the embedding table and of the output layer.
      embedding_dim: Dimension of the character embeddings.
      rnn_units: Number of LSTM units.
      batch_size: Forwarded to the embedding layer as the batch dimension of
        ``batch_input_shape``.
    """
    # No positional arguments: Model.__init__ does not expect the instance itself.
    super().__init__()
    self.embedding = tf.keras.layers.Embedding(input_dim=vocab_size,output_dim=embedding_dim,batch_input_shape=[batch_size,None])
    # return_state=True so call() can hand back the states produced by this step.
    self.LSTM = tf.keras.layers.LSTM(units=rnn_units,return_sequences=True,return_state=True,stateful=True,recurrent_initializer='glorot_uniform')
    self.dense = tf.keras.layers.Dense(vocab_size)

  def call(self, inputs, states=None, return_state=False, training=False):
    """Run the model.

    Args:
      inputs: Batch of integer index sequences.
      states: Optional initial LSTM states; when None the layer's
        ``get_initial_state`` result is used.
      return_state: When True, also return the LSTM states after this call.
      training: Forwarded to every layer.

    Returns:
      The logits, or ``(logits, [state_h, state_c])`` when ``return_state``
      is True.
    """
    x = self.embedding(inputs, training=training)
    if states is None:
      states = self.LSTM.get_initial_state(x)
    x, state_h, state_c = self.LSTM(x, initial_state=states, training=training)
    x = self.dense(x, training=training)

    if return_state:
      # Return the updated states, not the ones that were passed in.
      return x, [state_h, state_c]
    return x

# 模型训练

In [None]:
# Loss function: measures the gap between predictions and ground truth.
def loss(y_true, y_pred):
  """Sparse categorical cross-entropy computed on logits.

  Args:
    y_true: Integer class indices.
    y_pred: Unnormalised scores (logits) for every class.

  Returns:
    Whatever ``tf.keras.losses.sparse_categorical_crossentropy`` returns for
    these arguments with ``from_logits=True``.
  """
  crossentropy = tf.keras.losses.sparse_categorical_crossentropy
  return crossentropy(y_true, y_pred, from_logits=True)

In [None]:
# Build the model and train it.
def train(vocab_size,embedding_dim,rnn_units,batch_size,train_ds,valid_ds,epochs,learning_rate):
  """Create a ``MyModel_gui``, compile it with Adam and fit it.

  Args:
    vocab_size, embedding_dim, rnn_units, batch_size: Forwarded to
      ``MyModel_gui``.
    train_ds: Training data passed to ``fit``.
    valid_ds: Validation data, evaluated after every epoch.
    epochs: Number of training epochs.
    learning_rate: Learning rate of the Adam optimizer.

  Returns:
    ``(model, history)`` where ``history`` is the value returned by ``fit``.
  """
  model = MyModel_gui(
      vocab_size=vocab_size,
      embedding_dim=embedding_dim,
      rnn_units=rnn_units,
      batch_size=batch_size,
  )
  adam = tf.keras.optimizers.Adam(learning_rate=learning_rate)
  model.compile(optimizer=adam, loss=loss)
  history = model.fit(train_ds, epochs=epochs, validation_data=valid_ds, validation_freq=1)
  return model, history

# 执行训练

In [None]:
def preprocessed(txtspath,seq_length,batch_size):
  """Read the corpus, index it, persist the vocabulary and build the datasets.

  Side effects:
    Writes three ``.npy`` files (index->char dict, char->index dict and the
    vocabulary size) under ``/content/drive/MyDrive/LSTM/char-LSTM/npy``.

  Args:
    txtspath: Directory with the corpus files, forwarded to ``readtxt``.
    seq_length: Sequence length, forwarded to ``get_pairs``.
    batch_size: Batch size, forwarded to ``get_pairs``.

  Returns:
    ``(num_chars, train_ds, valid_ds, test_ds, datasets)``.
  """
  txtsList = readtxt(txtspath)
  num_chars,tokenizer,text_to_int,dicts_ic,dicts_ci = tokenized(txtsList)
  # Persist the lookup tables and the vocabulary size for the generation step.
  np.save('/content/drive/MyDrive/LSTM/char-LSTM/npy/gui_dict_ic.npy', dicts_ic)
  # File name must be 'gui_dict_ci.npy': that is the name generate() loads.
  np.save('/content/drive/MyDrive/LSTM/char-LSTM/npy/gui_dict_ci.npy', dicts_ci)
  np.save('/content/drive/MyDrive/LSTM/char-LSTM/npy/gui_charsnum.npy', num_chars)
  # Full dataset of (input, target) batches.
  datasets = get_pairs(text_to_int,seq_length,batch_size)
  # Train / validation / test partitions.
  train_ds,valid_ds,test_ds= split_datasets(datasets)
  return num_chars,train_ds,valid_ds,test_ds,datasets

In [None]:
# Run preprocessing, training and evaluation end to end.
def main(txtspath,seq_length,batch_size,embedding_dim,rnn_units,learning_rate,epochs):
  """Train the model on the corpus, save its weights and evaluate it.

  Side effects:
    Writes the trained weights to
    ``/content/drive/MyDrive/LSTM/char-LSTM/model_save/gui_best_weights.h5``.

  Returns:
    ``(history, test_result)`` where ``history`` comes from training and
    ``test_result`` is the value of ``model.evaluate`` on the test split.
  """
  splits = preprocessed(txtspath,seq_length,batch_size)
  vocab_size, train_ds, valid_ds, test_ds = splits[0], splits[1], splits[2], splits[3]
  model, history = train(
      vocab_size, embedding_dim, rnn_units, batch_size,
      train_ds, valid_ds, epochs, learning_rate,
  )
  model.save_weights('/content/drive/MyDrive/LSTM/char-LSTM/model_save/gui_best_weights.h5')
  test_result = model.evaluate(test_ds)
  return history, test_result

In [None]:
# Training configuration.

# Data: corpus directory and how it is cut into sequences and batches.
txtspath = '/content/drive/MyDrive/LSTM/dataG'
seq_length = 50
batch_size = 64

# Model size.
embedding_dim = 512
rnn_units = 1024

# Optimisation.
epochs = 10
learning_rate = 0.00125

In [None]:
# Run the full pipeline with the configuration defined above.
# NOTE(review): the global name `eval` shadows Python's builtin; it is kept
# unchanged here because other cells may read it.
history,eval = main(
    txtspath=txtspath,
    seq_length=seq_length,
    batch_size=batch_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units,
    learning_rate=learning_rate,
    epochs=epochs,
)

# 小说生成

In [None]:
# Convert a sentence (string) into a list of indices.
def index_form_text(text,dicts_ci):
  """Map every character of ``text`` to its index.

  Args:
    text: String to convert.
    dicts_ci: ``{char: index}`` mapping.

  Returns:
    List with one index per character, in order.

  Raises:
    KeyError: If a character of ``text`` is missing from ``dicts_ci``.
  """
  return [dicts_ci[ch] for ch in text]

In [None]:
# Convert indices back to characters and join them into a string.
def text_from_index(novel_ids,dicts_ic):
  """Decode a sequence of indices into text.

  Indices that have no entry in ``dicts_ic`` are skipped.  This matters for
  index 0: the Tokenizer's ``index_word`` starts at 1, yet a model with a
  ``vocab_size``-wide output layer can still sample 0, which previously
  aborted decoding with a KeyError.

  Args:
    novel_ids: Iterable of integer indices.
    dicts_ic: ``{index: char}`` mapping.

  Returns:
    The decoded string.
  """
  # str.join avoids the quadratic cost of repeated string concatenation.
  return "".join(dicts_ic.get(idx, "") for idx in novel_ids)

In [None]:
# Model used only for text generation.
def generate_Model(embedding_dim,vocab_size,rnn_units,batch_size,ckpt_path):
  """Build an Embedding -> stateful LSTM -> Dense model and load its weights.

  Args:
    embedding_dim: Dimension of the character embeddings.
    vocab_size: Size of the embedding table and of the output layer.
    rnn_units: Number of LSTM units.
    batch_size: Fixed batch size of the model.
    ckpt_path: Path of the weights file written during training.

  Returns:
    The model with the stored weights loaded.
  """
  gen_model = tf.keras.models.Sequential([
    # Embedding layer
    tf.keras.layers.Embedding(input_dim=vocab_size,output_dim=embedding_dim,
                 batch_input_shape=[batch_size, None]),
    # LSTM layer; stateful so the state carries over between successive calls
    tf.keras.layers.LSTM(units=rnn_units,return_sequences=True,stateful=True),
    # Output layer: one logit per vocabulary entry
    tf.keras.layers.Dense(vocab_size)
  ])
  # Build with the requested batch size (it was hard-coded to 1) and do so
  # before the weights are loaded, so the variables exist when they are set.
  gen_model.build(tf.TensorShape([batch_size, None]))
  gen_model.load_weights(ckpt_path)      # weights stored by the training run

  return gen_model

In [None]:
def load_para(path1,path2,path3,path4):
  """Load the saved lookup tables and vocabulary size.

  Args:
    path1: ``.npy`` file holding the ``{index: char}`` dict.
    path2: ``.npy`` file holding the ``{char: index}`` dict.
    path3: ``.npy`` file holding the vocabulary size.
    path4: Path of the weights file; returned unchanged.

  Returns:
    ``(dicts_ic, dicts_ci, vocab_size, ckpt_path)``.
  """
  # NOTE(review): allow_pickle=True unpickles the file content, which can run
  # arbitrary code; only load files that this notebook wrote itself.
  dicts_ic, dicts_ci, vocab_size = (
      np.load(npy_path, allow_pickle=True).item()
      for npy_path in (path1, path2, path3)
  )
  return dicts_ic, dicts_ci, vocab_size, path4

In [None]:
def gen_novel(model,start_text,words_num,dicts_ci,temperature):
  """Generate a sequence of character indices, seeded with ``start_text``.

  The model is fed one index per step.  While the prompt is being consumed
  the model output is discarded; from the last prompt character on, each
  sampled index is appended and used as the next input.

  Args:
    model: Callable mapping a ``[1, 1]`` index tensor to logits whose first
      axis has size 1 (it is squeezed away before sampling).
    start_text: Prompt string; every character must be a key of ``dicts_ci``.
    words_num: Number of model steps to run.
    dicts_ci: ``{char: index}`` mapping.
    temperature: Positive divisor applied to the logits before sampling;
      values below 1 sharpen the distribution, values above 1 flatten it.

  Returns:
    List of indices.  When ``words_num`` is at least the prompt length it
    holds ``words_num + 1`` items (prompt plus samples); otherwise it is the
    first ``words_num`` prompt indices.

  Raises:
    ValueError: If ``temperature`` is not positive, or if the prompt is
      empty while ``words_num`` is positive (there is nothing to feed the
      model with).
    KeyError: If ``start_text`` contains a character missing from ``dicts_ci``.
  """
  if temperature <= 0:
    raise ValueError(f'temperature must be positive, got {temperature}')
  start_index = index_form_text(start_text,dicts_ci)
  prompt_len = len(start_index)
  if words_num > 0 and prompt_len == 0:
    raise ValueError('start_text must contain at least one character')

  # A stateful model keeps its recurrent state between calls; clear it so a
  # previous generation cannot leak into this one.  Guarded because not every
  # callable passed as `model` offers reset_states().
  reset_states = getattr(model, 'reset_states', None)
  if callable(reset_states):
    reset_states()

  generateText = []
  for i in range(words_num):
    if i < prompt_len:
      generateText.append(start_index[i])
    # Shape [1, 1]: a batch holding one sequence of one index.
    model_input = tf.expand_dims([generateText[i]], axis=0)
    predictions = model(model_input)

    # Drop the batch axis, then scale the logits by the temperature.
    predictions = tf.squeeze(predictions, 0) / temperature

    # Draw one sample from the categorical distribution given by the logits.
    sampled_indices = tf.random.categorical(predictions, num_samples=1)

    if i >= prompt_len - 1:
      # Store a plain Python int so the list has one consistent element type.
      generateText.append(int(sampled_indices.numpy()[0][0]))
  return generateText

In [None]:
def generate(start_words,temperature,gen_num):
  """Load the saved vocabulary and weights, generate text and print it.

  Every call loads the files, builds a fresh generation model with batch
  size 1, runs ``gen_novel`` and prints the decoded result.

  Args:
    start_words: Prompt string that seeds the generation.
    temperature: Sampling temperature forwarded to ``gen_novel``.
    gen_num: Number of generation steps forwarded to ``gen_novel``.
  """
  dicts_ic, dicts_ci, vocab_size, ckpt_path = load_para(
      path1='/content/drive/MyDrive/LSTM/char-LSTM/npy/gui_dict_ic.npy',
      path2='/content/drive/MyDrive/LSTM/char-LSTM/npy/gui_dict_ci.npy',
      path3='/content/drive/MyDrive/LSTM/char-LSTM/npy/gui_charsnum.npy',
      path4='/content/drive/MyDrive/LSTM/char-LSTM/model_save/gui_best_weights.h5',
  )
  # NOTE(review): presumably these sizes must equal the ones used for
  # training, otherwise the stored weights will not fit - confirm.
  model_gen = generate_Model(
      embedding_dim=512,
      vocab_size=vocab_size,
      rnn_units=1024,
      batch_size=1,
      ckpt_path=ckpt_path,
  )
  novel_ids = gen_novel(model_gen, start_words, gen_num, dicts_ci, temperature)
  print(text_from_index(novel_ids, dicts_ic))

In [None]:
# Generation settings.
gen_num = 800        # number of generation steps per sample
temperature = 0.8    # divisor applied to the logits before sampling
# Prompt that seeds every generated sample.
start_words = "我的话刚说了一半，便听一声巨响，顶门的木椅突然被撞成了数断，"

In [None]:
# Print five samples for the same prompt, each preceded by a separator line.
# generate() reloads the vocabulary and weights and rebuilds the model on
# every call.
for i in range(5):
  print('*'*30)
  generate(start_words,temperature,gen_num)