In [1]:
#coding:utf8

import torch
import torch.nn as nn
import numpy as np
import math
import random
import os
import re

"""
基于pytorch的LSTM语言模型
"""


class LanguageModel(nn.Module):
    def __init__(self, input_dim, vocab):
        super(LanguageModel, self).__init__()
        self.embedding = nn.Embedding(len(vocab), input_dim)
        self.layer = nn.LSTM(input_dim, input_dim, num_layers=1, batch_first=True)
        self.classify = nn.Linear(input_dim, len(vocab))
        self.dropout = nn.Dropout(0.1)
        self.loss = nn.functional.cross_entropy

    #当输入真实标签，返回loss值；无真实标签，返回预测值
    def forward(self, x, y=None):
        x = self.embedding(x)       #output shape:(batch_size, sen_len, input_dim)
        x, _ = self.layer(x)        #output shape:(batch_size, sen_len, input_dim)
        y_pred = self.classify(x)   #output shape:(batch_size, vocab_size)
        # print(y_pred.shape)
        # print(y.shape)
        if y is not None:
            return self.loss(y_pred.view(-1, y_pred.shape[-1]), y.view(-1))
        else:
            return torch.softmax(y_pred, dim=-1)

#加载字表
def build_vocab(vocab_path):
    vocab = {"<pad>":0}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            char = line[:-1]       #去掉结尾换行符
            vocab[char] = index + 1 #留出0位给pad token
    return vocab

#加载语料
def load_corpus(path):
    corpus = ""
    with open(path, encoding="gbk") as f:
        for line in f:
            corpus += line.strip()
    return corpus

#随机生成一个样本
#从文本中截取随机窗口，前n个字作为输入，最后一个字作为输出
def build_sample(vocab, window_size, corpus):
    start = random.randint(0, len(corpus) - 1 - window_size)
    end = start + window_size
    window = corpus[start:end]
    target = corpus[start + 1:end + 1]  #输入输出错开一位
    # print(window, target)
    x = [vocab.get(word, vocab["<UNK>"]) for word in window]   #将字转换成序号
    y = [vocab.get(word, vocab["<UNK>"]) for word in target]
    return x, y

#建立数据集
#sample_length 输入需要的样本数量。需要多少生成多少
#vocab 词表
#window_size 样本长度
#corpus 语料字符串
def build_dataset(sample_length, vocab, window_size, corpus):
    dataset_x = []
    dataset_y = []
    for i in range(sample_length):
        x, y = build_sample(vocab, window_size, corpus)
        dataset_x.append(x)
        dataset_y.append(y)
    return torch.LongTensor(dataset_x), torch.LongTensor(dataset_y)

#建立模型
def build_model(vocab, char_dim):
    model = LanguageModel(char_dim, vocab)
    return model

#文本生成测试代码
def generate_sentence(openings, model, vocab, window_size):
    reverse_vocab = dict((y, x) for x, y in vocab.items())
    model.eval()
    with torch.no_grad():
        pred_char = ""
        #生成了换行符，或生成文本超过30字则终止迭代
        while pred_char != "\n" and len(openings) <= 30:
            openings += pred_char
            x = [vocab.get(char, vocab["<UNK>"]) for char in openings[-window_size:]]
            x = torch.LongTensor([x])
            if torch.cuda.is_available():
                x = x.cuda()
            y = model(x)[0][-1]
            index = sampling_strategy(y)
            pred_char = reverse_vocab[index]
    return openings

def sampling_strategy(prob_distribution):
    if random.random() > 0.1:
        strategy = "greedy"
    else:
        strategy = "sampling"
    if strategy == "greedy":
        return int(torch.argmax(prob_distribution))
    elif strategy == "sampling":
        prob_distribution = prob_distribution.cpu().numpy()
        return np.random.choice(list(range(len(prob_distribution))), p=prob_distribution)


#计算文本ppl
def calc_perplexity(sentence, model, vocab, window_size):
    prob = 0
    model.eval()
    with torch.no_grad():
        for i in range(1, len(sentence)):
            start = max(0, i - window_size)
            window = sentence[start:i]
            x = [vocab.get(char, vocab["<UNK>"]) for char in window]
            x = torch.LongTensor([x])
            target = sentence[i]
            target_index = vocab.get(target, vocab["<UNK>"])
            if torch.cuda.is_available():
                x = x.cuda()
            pred_prob_distribute = model(x)[0][-1]
            target_prob = pred_prob_distribute[target_index]
            prob += math.log(target_prob, 10)
    return 2 ** (prob * ( -1 / len(sentence)))


def train(corpus_path, save_weight=False):
    epoch_num = 20        #训练轮数
    batch_size = 32       #每次训练样本个数
    train_sample = 50000   #每轮训练总共训练的样本总数
    char_dim = 256        #每个字的维度
    window_size = 10       #样本文本长度
    vocab = build_vocab("vocab.txt")       #建立字表
    corpus = load_corpus(corpus_path)     #加载语料
    model = build_model(vocab, char_dim)    #建立模型
    # print(model)
    if torch.cuda.is_available():
        model = model.cuda()
    optim = torch.optim.Adam(model.parameters(), lr=0.01)   #建立优化器
    print("文本词表模型加载完毕，开始训练")
    for epoch in range(epoch_num):
        model.train()
        watch_loss = []
        for batch in range(int(train_sample / batch_size)):
            x, y = build_dataset(batch_size, vocab, window_size, corpus) #构建一组训练样本
            # print(x.shape, y.shape)
            if torch.cuda.is_available():
                x, y = x.cuda(), y.cuda()
            optim.zero_grad()    #梯度归零
            loss = model(x, y)   #计算loss
            loss.backward()      #计算梯度
            optim.step()         #更新权重
            watch_loss.append(loss.item())
        print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))
        print(generate_sentence("让他在半年之前，就不能做出", model, vocab, window_size))
        print(generate_sentence("李慕站在山路上，深深的呼吸", model, vocab, window_size))
    if not save_weight:
        return
    else:
        base_name = os.path.basename(corpus_path).replace("txt", "pth")
        model_path = os.path.join("model", base_name)
        torch.save(model.state_dict(), model_path)
        return



if __name__ == "__main__":
    # build_vocab_from_corpus("corpus/all.txt")
    train("corpus.txt", False)


文本词表模型加载完毕，开始训练
第1轮平均loss:4.447406
让他在半年之前，就不能做出来，李慕又问道：“你说的，我们还是我
李慕站在山路上，深深的呼吸走。然后，李慕的身影，成为他的身影，
第2轮平均loss:4.125595
让他在半年之前，就不能做出来说，这么多了，李慕也是一个人，李慕
李慕站在山路上，深深的呼吸收功德念力，不能在漩涡中轻吐一眼，说
第3轮平均loss:4.066303
让他在半年之前，就不能做出来，为了，他们的修为，他们的修为，李
李慕站在山路上，深深的呼吸布置，李慕抬头望向李慕的脑袋，说道：
第4轮平均loss:4.028897
让他在半年之前，就不能做出来的，他同样的，他们也不会有一个人，
李慕站在山路上，深深的呼吸过，幻姬走进来，说道：“你们的，你们
第5轮平均loss:4.017029
让他在半年之前，就不能做出来，也不是他们的自己的，他们的身影道
李慕站在山路上，深深的呼吸急促，他们的身影，说道：“你们不了，
第6轮平均loss:4.012940
让他在半年之前，就不能做出来，说道：“你们的，我们的，我们的长
李慕站在山路上，深深的呼吸一个人，他们的存在，他们的身体，他们
第7轮平均loss:4.013132
让他在半年之前，就不能做出了一个人，他们是不是一个人，他们是不
李慕站在山路上，深深的呼吸收了，将她的身体，他们也不敢再次，他
第8轮平均loss:4.019767
让他在半年之前，就不能做出来。”李慕摇了摇头，说道：“魄力，我
李慕站在山路上，深深的呼吸干，说道：“你们，我们是心魔宗正寺丞
第9轮平均loss:4.013728
让他在半年之前，就不能做出现了一个人，他们的身上，说道：“你们
李慕站在山路上，深深的呼吸干净子，他们的身上的，他们的身上的，
第10轮平均loss:4.026452
让他在半年之前，就不能做出来，便是一个人，他们的身份，他们的身
李慕站在山路上，深深的呼吸引人，他们的身份，他们的身份，他们的
第11轮平均loss:4.034593
让他在半年之前，就不能做出了郡王，他们的时候，他们的时候，才有
李慕站在山路上，深深的呼吸有些，他们的声音，说道：“你们的，你
第12轮平均loss:4.034904
让他在半年之前，就不能做出来，他们的身体，说道：“你们的，你们
李慕站在山路上，深深的呼吸收。”李