In [1]:
# 数据分析/处理
import numpy as np
import pandas as pd
import re

# 搭建神经网络
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init
from torch import optim
from torch.utils.data import Dataset,DataLoader

# 数据可视化
import matplotlib.pyplot as plt
import warnings

# word2vec
from gensim.models import Word2Vec


warnings.filterwarnings('ignore')
%matplotlib inline

In [2]:
# 验证cuda是否可用
cuda_available=torch.cuda.is_available()
device = torch.device("cuda" if cuda_available else "cpu")
if cuda_available:
    print("CUDA Device Name:", torch.cuda.get_device_name(0))
    print("CUDA Compute Capability:", torch.cuda.get_device_capability(0))
# 宇宙的答案
np.random.seed(42)
torch.manual_seed(42)

<torch._C.Generator at 0x1d83a665410>

### 读入数据

由于是中文诗词，且需要进行字符级建模，所以需要搭建字典并切分句子。


In [3]:
# 首先将整个语料库读入，并对其进行清洗和重新划分。
poe=open("poetryFromTang.txt",'r')
poem=poe.read()
poe.close()
# 将诗词按句号划分
tmp=poem.replace(poem[0],"")
tmp=tmp.replace(" ","")
tmp=tmp.replace("。","。\n")
# 检查有无乱码
if('a' in tmp):
    print("yes")
# 诗词中有乱码，推测为拼音需要在分词时注意
# 作为新文件保存
newFile=open("NewPoetryFromTang.txt",'w')
newFile.write(tmp)
newFile.close()

yes


In [4]:
corprus=pd.read_csv("NewPoetryFromTang.txt",header=None,delimiter='\t')
max_len=corprus[0].str.len().max()
# print(max_len)
def sen2list(sentence):
    pattern=r'[a-zA-Z0-9]+'
    match=re.findall(pattern,sentence)
    # 把句子中的拼音全换成中文字符“空”
    for m in match:
        sentence=sentence.replace(m,"空")
    l=[w for w in sentence]
    return l
s=sen2list("君问归期未有期，巴山夜雨zhangqiu1秋池。")
print(s)

corprus=corprus[0].apply(sen2list).to_list()

['君', '问', '归', '期', '未', '有', '期', '，', '巴', '山', '夜', '雨', '空', '秋', '池', '。']


In [5]:
# 使用word2vec得到对应的word2vec
# 将向量维度设置为50，同时由于是字符级别的模型，所以对所有的字符都要训练，min_count需要设置为1
wvmodel=Word2Vec(corprus,vector_size=50,sg=1,min_count=1)

In [6]:
# 例子
wvmodel.wv["巴"]

array([-0.02159202, -0.03349001, -0.04736485,  0.09071317, -0.01452516,
       -0.13695037,  0.07575325,  0.14807402, -0.10778531, -0.01217772,
       -0.03407127, -0.04166519, -0.02607638,  0.12653652, -0.12478288,
        0.02732991, -0.01574592,  0.05789427, -0.2224392 , -0.08616893,
        0.053178  ,  0.07778005,  0.18014918, -0.10958737,  0.10932513,
       -0.02118833, -0.01260017, -0.05897688, -0.16173272,  0.02780469,
        0.07224127, -0.04969627, -0.01044901,  0.06253659, -0.15252413,
        0.08228706,  0.12177187, -0.05846209,  0.05054614, -0.08211034,
        0.14143018, -0.01661082, -0.03055685,  0.0646854 ,  0.23934047,
       -0.02147242, -0.00454188, -0.05778379,  0.09433985,  0.10131933],
      dtype=float32)

In [7]:
def sen2word2vec(sentence_list,model,maxlen=max_len):
    vecList=np.zeros((maxlen,model.vector_size)) 
    # vecList=np.zeros((len(sentence_list),model.vector_size)) 
    
    if(sentence_list==[]):
        return vecList 
    # 将列表句子转化为稠密词向量句子
    # vecList=np.array([])
    for i,e in enumerate(sentence_list):
        if(e in model.wv):
            vecList[i]=model.wv[e]
    return vecList

s=['巴','山','夜']
p=sen2word2vec(s,wvmodel)
print(len(p))
print(p[0]==wvmodel.wv['巴'])

40
[ True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True]


In [13]:
# 在训练集将数据整理成模型便于训练的格式
# 由于我们需要预测下一个token，所以我们需要枚举字符串所有的长度大于1的前缀
tmp=[sentence[:i] for sentence in corprus for i in range(2,len(sentence)+1)]
train_size=int(0.9*len(tmp))
train=tmp[:train_size]
test=tmp[train_size:]
print(len(tmp))
print(tmp[:20])
print(train_size)


14550
[['巴', '山'], ['巴', '山', '上'], ['巴', '山', '上', '峡'], ['巴', '山', '上', '峡', '重'], ['巴', '山', '上', '峡', '重', '复'], ['巴', '山', '上', '峡', '重', '复', '重'], ['巴', '山', '上', '峡', '重', '复', '重', '，'], ['巴', '山', '上', '峡', '重', '复', '重', '，', '阳'], ['巴', '山', '上', '峡', '重', '复', '重', '，', '阳', '台'], ['巴', '山', '上', '峡', '重', '复', '重', '，', '阳', '台', '碧'], ['巴', '山', '上', '峡', '重', '复', '重', '，', '阳', '台', '碧', '峭'], ['巴', '山', '上', '峡', '重', '复', '重', '，', '阳', '台', '碧', '峭', '十'], ['巴', '山', '上', '峡', '重', '复', '重', '，', '阳', '台', '碧', '峭', '十', '二'], ['巴', '山', '上', '峡', '重', '复', '重', '，', '阳', '台', '碧', '峭', '十', '二', '峰'], ['巴', '山', '上', '峡', '重', '复', '重', '，', '阳', '台', '碧', '峭', '十', '二', '峰', '。'], ['荆', '王'], ['荆', '王', '猎'], ['荆', '王', '猎', '时'], ['荆', '王', '猎', '时', '逢'], ['荆', '王', '猎', '时', '逢', '暮']]
13095


### 生成式语言模型建模建模

使用LSTM和GRU搭建语言模型。使用预测下一个token的方式生成，这种自回归生成方法是目前最主流的生成方法。

具体可以参考AK的[字符级语言模型教程](https://www.youtube.com/watch?v=PaCmpygFfXo&list=PLAqhIrjkxbuWI23v9cThsA9GvCAUhRvKZ&index=2&ab_channel=AndrejKarpathy)。

In [None]:
import torch.nn.utils.rnn as rnn_utils
# 定义模型
class GRU(nn.Module):
    def __init__(self,d_model,hidden_size,num_layer,output_size,dropout=0.1,bid=False) -> None:
        super().__init__()
        self.d_model=d_model
        self.hidden_size=hidden_size
        self.layers=num_layer
        self.bid=bid
        
        self.rnn=nn.GRU(d_model,hidden_size,num_layer,\
                        batch_first=True,dropout=dropout,bidirectional=bid)

    def forward(self,X):
        if(self.bid==True):
            h0=torch.zeros(X.size(0),2*self.num_layer,self.hidden_size)
        else:
            h0=torch.zeros(X.size(0),self.num_layer,self.hidden_size)
        X,h_n=self.rnn(X,h0)
        return X,h_n

class LSTM(nn.Module):
    def __init__(self,d_model,hidden_size,num_layer,output_size=50,dropout=0.1,bid=False) -> None:
        super().__init__()
        self.d_model=d_model
        self.hidden_size=hidden_size
        self.layer=num_layer
        self.bid=bid
        self.rnn=nn.LSTM(d_model,hidden_size,num_layer,\
                        batch_first=True,dropout=dropout,bidirectional=bid)

    def forward(self,X,l):
        if(self.bid==True):
            h0=torch.zeros(2*self.layer,X.size(0),self.hidden_size).to(device)
            c0=torch.zeros(2*self.layer,X.size(0),self.hidden_size).to(device)
        else:
            h0=torch.zeros(self.layer,X.size(0),self.hidden_size).to(device)
            c0=torch.zeros(self.layer,X.size(0),self.hidden_size).to(device)
        X=rnn_utils.pack_padded_sequence(X,l,batch_first=True,enforce_sorted=False)
        X,(hn,cn)=self.rnn(X,(h0,c0))
        X,_=rnn_utils.pad_packed_sequence(X,batch_first=True)
        return X,hn,cn

### 数据集处理

In [None]:
class MyDataSet(Dataset):
    def __init__(self,data) -> None:
        super().__init__()
        self.data=data
    def __len__(self):
        return len(self.data)
    def __getitem__(self, index):
        
        sentence=self.data[index]
        sentence_tmp=sen2word2vec(self.data[index])
        # sentence_extend=
        mask=torch.tril(torch.ones((len(sentence),len(sentence))))
        l=[i+1 for i in range(len(self.data[index]))] 
        return


In [None]:
batchsize=64
epoch=25
train_dataset=MyDataSet(train)
train_loader=DataLoader(train_dataset,batch_size=batchsize,shuffle=True)
test_dataset=MyDataSet(test)
test_loader=DataLoader(test_dataset,batch_size=batchsize,shuffle=True)


In [None]:
class Poet(nn.Module):
    def __init__(self,d_model,hidden_size,model="GRU") -> None:
        super().__init__()
        if(model=="GRU"):
            self.model=GRU(d_model=d_model,hidden_size=hidden_size,output_size=d_model)
        elif(model=="LSTM"):
            self.model=LSTM(d_model=d_model,hidden_size=hidden_size,output_size=d_model)


    def forward(self,X,l):
        pass

    @torch.inference_mode()
    def generate(self,s):
        '''
        给定句子，然后让模型通过接龙的方式完成接下来的句子。
        由于数据中的每个诗句都是以“。”结束的。所以当模型输出“。”时，我们认为模型输出结束
        '''
        pass

In [None]:
def TrainModel(mnodel,dataloader):
    pass

In [None]:
def TestModel(model,test,config):
    '''
    测试模型，计算困惑度
    '''
    pass

In [None]:
s="山水有情亦无情"
poe=Poet(model="LSTM")
TrainModel(poe)
TestModel(poe)
print(poe.generate(s))