In [None]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torchtext.legacy import data,datasets
from torchtext.vocab import Vectors

import random
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import os
import time
from tqdm import tqdm


import unicodedata
import re
import codecs

In [None]:
# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
DEVICE=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
SEED=2022
# Seed every random number generator this notebook imports, not just torch:
# the Python and NumPy generators may be consulted by the data pipeline
# (shuffling, sampling), so leaving them unseeded makes runs irreproducible.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
def load_sentences(path,change_dict):
    """Read a UTF-8 corpus and return one string per blank-line-separated entry.

    Consecutive non-empty lines are concatenated (without their line
    terminators) into a single entry; an empty line closes the current entry.

    Args:
        path: Location of the UTF-8 encoded text file.
        change_dict: Mapping of regular-expression pattern -> replacement.
            Every pair is applied to each line with ``re.sub``.

    Returns:
        List of entries with the first one dropped. The first entry was
        already discarded by the original implementation (presumably a header
        or an empty leading entry in the corpus -- TODO confirm).
    """
    sentences=[]
    pieces=[]
    # `with` closes the handle even if decoding fails part-way through.
    with codecs.open(path, 'r', 'utf-8') as corpus:
        for line in corpus:
            # Strip only real line terminators: handles '\n', '\r\n' and a
            # final line that has no terminator at all (blindly slicing off
            # the last character would eat a real character there).
            line=line.rstrip('\r\n')
            if line=='':
                sentences.append(''.join(pieces))
                pieces=[]
                continue
            # Apply every substitution, not just the first two entries.
            for pattern,replacement in change_dict.items():
                line=re.sub(pattern,replacement,line)
            pieces.append(line)
    # Keep a trailing entry that is not followed by a blank line.
    if pieces:
        sentences.append(''.join(pieces))
    return sentences[1:]

def load_dataset(sentences,valid_rate=0.2,fix_length=10):
    """Cut sentences into (window, window shifted by one character) pairs.

    Args:
        sentences: Iterable of strings.
        valid_rate: Fraction of the pairs held out for validation.
        fix_length: Number of characters in every window.

    Returns:
        ``(train_data, valid_data, dataset)`` -- three DataFrames with the
        columns ``text`` (input window) and ``label`` (the same window moved
        one character to the right). ``train_data`` is a reproducible random
        sample (``random_state=0``) of ``dataset``; ``valid_data`` holds the
        remaining rows.
    """
    phrases=[]
    next_phrases=[]
    for sentence in sentences:
        # A pair starting at `start` consumes characters start..start+fix_length,
        # so the last valid start is len(sentence)-fix_length-1 and range() has
        # to stop at len(sentence)-fix_length. Stopping one earlier silently
        # dropped the pair whose target ends on the sentence's final character.
        for start in range(len(sentence)-fix_length):
            phrases.append(sentence[start:start+fix_length])
            next_phrases.append(sentence[start+1:start+fix_length+1])
    dataset=pd.DataFrame({'text':phrases,'label':next_phrases})
    train_data=dataset.sample(frac=(1-valid_rate),random_state=0,axis=0)
    # Everything that was not sampled for training becomes validation data.
    valid_data=dataset[~dataset.index.isin(train_data.index)]
    return train_data,valid_data,dataset
def laod_dataset(df,textfield,labelfield):
    """Wrap the rows of a text/label DataFrame in a torchtext ``Dataset``.

    Args:
        df: DataFrame with the columns ``text`` and ``label``.
        textfield: Field object registered under the name ``text``.
        labelfield: Field object registered under the name ``label``.

    Returns:
        A ``data.Dataset`` holding one ``data.Example`` per row of ``df``.
    """
    fields = [('text', textfield), ('label', labelfield)]
    # One Example per row, built from the two columns in lock-step.
    examples = [
        data.Example.fromlist([text, label], fields)
        for text, label in zip(df['text'], df['label'])
    ]
    return data.Dataset(examples, fields)


In [None]:
# Hyper-parameters of the data pipeline and the embedding layer.
FIX_LENGTH = 10       # characters per training window
BATCH_SIZE = 32
EMBEDDING_DIM = 300
VALID_RATE = 0.2      # share of the windows held out for validation

CORPUS_PATH = '../data/poetryFromTang.txt'

# The pretrained embeddings have no entries for '，' and '。',
# so both punctuation marks are mapped to replacement symbols.
change_dict = {
    '。': '○',
    '，': ',',
}

sentences = load_sentences(path=CORPUS_PATH, change_dict=change_dict)

def tokenizer(text):
    """Split ``text`` into a list containing each of its characters in order."""
    return [char for char in text]

# A single Field is used for both the input window and the shifted target,
# so inputs and labels share one vocabulary (no separate LABEL field).
TEXT=data.Field(sequential=True,tokenize=tokenizer,use_vocab=True)

train_df,valid_df,df=load_dataset(sentences,valid_rate=VALID_RATE,fix_length=FIX_LENGTH)
train_data=laod_dataset(train_df,textfield=TEXT,labelfield=TEXT)
valid_data=laod_dataset(valid_df,textfield=TEXT,labelfield=TEXT)
dataset=laod_dataset(df,textfield=TEXT,labelfield=TEXT)


# exist_ok makes the cell safely re-runnable and avoids the check-then-create race.
os.makedirs('.vector_cache',exist_ok=True)
vectors = Vectors(name='../data/sgns.sikuquanshu.word')


# Build the vocabulary from the full dataset (train + validation) and attach
# the pretrained vectors to it.
TEXT.build_vocab(dataset,vectors=vectors)

# The examples expose their fields as `.text` / `.label` (the names passed in
# laod_dataset). The previous key `len(x.TEXT)` referenced an attribute that
# does not exist and would raise AttributeError as soon as an iterator sorted.
train_iterator=data.BucketIterator(
    train_data,
    batch_size=BATCH_SIZE,
    device=DEVICE,
    sort_key=lambda x: len(x.text)
)

valid_iterator=data.BucketIterator(
    valid_data,
    batch_size=BATCH_SIZE,
    device=DEVICE,
    sort_key=lambda x: len(x.text)
)


In [None]:
# Show one raw (tokenised) validation example: input window and shifted target.
print(valid_data[1].text)
print(valid_data[1].label)
print(' ')

# Pull a single numericalised batch and decode it back to characters.
batch=next(iter(valid_iterator))
text=batch.text
label=batch.label

# Columns are indexed per example below, so iterate over the real batch width:
# the last batch of an epoch can hold fewer than BATCH_SIZE examples, and
# range(BATCH_SIZE) would then raise IndexError.
for i in range(text.shape[1]):
    print([TEXT.vocab.itos[t] for t in text[:,i]])
    print([TEXT.vocab.itos[t] for t in label[:,i]])
    print(' ')

In [None]:
class LSTM_BASE(nn.Module):
    """Next-token model: embedding -> unidirectional LSTM -> linear layer over the vocabulary."""

    def __init__(self,vocab_size,embedding_dim,hidden_dim):
        """Create the three sub-modules.

        Args:
            vocab_size: Number of embedding rows and of output scores per position.
            embedding_dim: Width of each embedding vector (LSTM input size).
            hidden_dim: Width of the LSTM hidden state.
        """
        super().__init__()
        self.embedding=nn.Embedding(num_embeddings=vocab_size,embedding_dim=embedding_dim)
        self.lstm=nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
            bidirectional=False,
        )
        self.fc=nn.Linear(in_features=hidden_dim,out_features=vocab_size)

    def forward(self,text):
        """Map token indices ``[batch, seq_len]`` to scores ``[batch, seq_len, vocab_size]``."""
        # Only the per-step outputs are needed; the final LSTM state is discarded.
        step_outputs,_final_state=self.lstm(self.embedding(text))
        return self.fc(step_outputs)

class GRU_BASE(nn.Module):
    """Next-token model: embedding -> GRU -> linear layer over the vocabulary."""

    def __init__(self,vocab_size,embedding_dim,hidden_dim):
        """Create the three sub-modules.

        Args:
            vocab_size: Number of embedding rows and of output scores per position.
            embedding_dim: Width of each embedding vector (GRU input size).
            hidden_dim: Width of the GRU hidden state.
        """
        super().__init__()
        self.embedding=nn.Embedding(num_embeddings=vocab_size,embedding_dim=embedding_dim)
        self.gru=nn.GRU(input_size=embedding_dim,hidden_size=hidden_dim,batch_first=True)
        self.fc=nn.Linear(in_features=hidden_dim,out_features=vocab_size)

    def forward(self,text):
        """Map token indices ``[batch, seq_len]`` to scores ``[batch, seq_len, vocab_size]``."""
        # Only the per-step outputs are needed; the final GRU state is discarded.
        step_outputs,_final_state=self.gru(self.embedding(text))
        return self.fc(step_outputs)

INPUT_DIM = len(TEXT.vocab)
HIDDEN_DIM = 300

# The architecture is chosen by name so that the checkpoint and figure names
# derived from MODEL_NAME always describe the network that was really trained
# (previously the class was switched by commenting code in and out, which
# could leave MODEL_NAME out of sync).
MODEL_NAME='GRU'
MODEL_CLASSES={
    'LSTM':LSTM_BASE,
    'GRU':GRU_BASE,
}

model=MODEL_CLASSES[MODEL_NAME](
    vocab_size=INPUT_DIM,
    embedding_dim=EMBEDDING_DIM,
    hidden_dim=HIDDEN_DIM,
     ).to(DEVICE)


# Initialise the embedding layer with the pretrained vectors attached to the vocab.
pretrained_embeddings = TEXT.vocab.vectors
assert tuple(pretrained_embeddings.shape) == (INPUT_DIM, EMBEDDING_DIM), (
    'pretrained vectors have shape {} but the embedding layer expects {}'.format(
        tuple(pretrained_embeddings.shape), (INPUT_DIM, EMBEDDING_DIM)))
model.embedding.weight.data.copy_(pretrained_embeddings)

# The padding row is not zeroed here: every window produced by load_dataset has
# exactly FIX_LENGTH characters, so batches should not need padding.
# The pretrained embeddings stay trainable (fine-tuned together with the model).
model.embedding.weight.requires_grad = True


# Optimizer and loss function.
# Learning rate follows the original author's note ("the paper's best parameter").
optimizer = optim.Adam(model.parameters(),lr=5e-5)
criterion = nn.CrossEntropyLoss().to(DEVICE)

# Evaluation function
def categorical_accuracy(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8

    Args:
        preds: Tensor ``[n, n_classes]`` of per-class scores.
        y: Tensor ``[n]`` of target class indices.

    Returns:
        0-dim float tensor (on the device of ``preds``) holding the fraction
        of rows whose highest-scoring class equals the target.
    """
    predicted = preds.argmax(dim = 1) # index of the highest score in every row
    n_correct = predicted.eq(y).sum()
    # Dividing by a plain int keeps the result on the inputs' device. This
    # replaces re-wrapping tensors with torch.tensor(...) (which emits a
    # copy-construct UserWarning) and the hidden dependency on a global DEVICE.
    return n_correct.float() / y.shape[0]






In [None]:
def train(model, iterator, optimizer, criterion):
    """Run one optimisation pass over ``iterator``.

    Args:
        model: Network called as ``model(text)`` with ``text`` of shape
            ``[batch, seq_len]``; returns scores ``[batch, seq_len, n_classes]``.
        iterator: Sized iterable of batches exposing ``.text`` and ``.label``
            (assumed to be laid out ``[seq_len, batch]`` -- they are transposed
            before use).
        optimizer: Optimizer updating the model parameters.
        criterion: Loss taking ``(scores [n, n_classes], targets [n])``.

    Returns:
        ``(mean loss, mean accuracy)`` averaged over the batches.
    """
    epoch_loss = 0
    epoch_acc = 0
    model.train()
    for batch in tqdm(iterator):

        optimizer.zero_grad()
        text = batch.text.permute(1,0)
        label = batch.label.permute(1,0)
        predictions = model(text)

        # Flatten to one row per token. The sizes come from the tensors
        # themselves rather than from the global FIX_LENGTH, so the function
        # no longer breaks if the window length changes.
        predictions = predictions.reshape(-1, predictions.shape[-1])
        label = label.reshape(-1)

        loss = criterion(predictions, label)
        acc = categorical_accuracy(predictions, label)

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

def evaluate(model, iterator, criterion):
    """Measure loss and accuracy over ``iterator`` without updating the model.

    Args:
        model: Network called as ``model(text)`` with ``text`` of shape
            ``[batch, seq_len]``; returns scores ``[batch, seq_len, n_classes]``.
        iterator: Sized iterable of batches exposing ``.text`` and ``.label``
            (assumed to be laid out ``[seq_len, batch]`` -- they are transposed
            before use).
        criterion: Loss taking ``(scores [n, n_classes], targets [n])``.

    Returns:
        ``(mean loss, mean accuracy)`` averaged over the batches.
    """
    epoch_loss = 0
    epoch_acc = 0

    model.eval()

    # No gradients are needed for evaluation.
    with torch.no_grad():

        for batch in tqdm(iterator):

            text = batch.text.permute(1,0)
            label = batch.label.permute(1,0)
            predictions = model(text)

            # Flatten to one row per token. The sizes come from the tensors
            # themselves rather than from the global FIX_LENGTH, so the
            # function no longer breaks if the window length changes.
            predictions = predictions.reshape(-1, predictions.shape[-1])
            label = label.reshape(-1)

            loss = criterion(predictions, label)
            acc = categorical_accuracy(predictions, label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

def epoch_time(start_time, end_time):
    """Express the span between two timestamps (in seconds) as whole minutes
    plus the remaining whole seconds.

    Returns:
        ``(minutes, seconds)`` -- both truncated to ``int``.
    """
    span = end_time - start_time
    whole_minutes = int(span / 60)
    leftover_seconds = int(span - 60 * whole_minutes)
    return whole_minutes, leftover_seconds

In [None]:

def trainmode(n_epochs=100):
    """Train the global ``model`` and plot loss, accuracy and perplexity curves.

    Relies on the module-level ``model``, ``train_iterator``,
    ``valid_iterator``, ``optimizer``, ``criterion`` and ``MODEL_NAME``.
    After every epoch the weights are saved to
    ``../model/<MODEL_NAME>-model.pt`` if the validation loss improved.
    Three figures are written to ``../pic/``.

    Args:
        n_epochs: Number of passes over the training data.
    """
    checkpoint_path='../model/{}-model.pt'.format(MODEL_NAME)
    root_path='../pic/'
    # Create the output folders up front so a long run cannot fail at save time.
    os.makedirs(os.path.dirname(checkpoint_path),exist_ok=True)
    os.makedirs(root_path,exist_ok=True)

    best_valid_loss = float('inf')

    train_loss_list=[]
    valid_loss_list=[]
    train_acc_list=[]
    valid_acc_list=[]
    train_pp_list=[]
    valid_pp_list=[]

    for epoch in range(n_epochs):

        start_time = time.time()

        train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
        valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)

        # Perplexity is the exponential of the mean cross-entropy loss.
        train_pp=np.exp(train_loss)
        valid_pp=np.exp(valid_loss)

        end_time = time.time()

        epoch_mins, epoch_secs = epoch_time(start_time, end_time)

        # Keep only the weights with the lowest validation loss seen so far.
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), checkpoint_path)

        train_loss_list.append(train_loss)
        valid_loss_list.append(valid_loss)
        train_acc_list.append(train_acc)
        valid_acc_list.append(valid_acc)
        train_pp_list.append(train_pp)
        valid_pp_list.append(valid_pp)

        print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
        print(f'\tTrain Loss: {train_loss:.3f}| Train PP: {train_pp:.2f} | Train Acc: {train_acc*100:.2f}%')
        print(f'\t Val. Loss: {valid_loss:.3f}| Val PP: {valid_pp:.2f} |  Val. Acc: {valid_acc*100:.2f}%')


    def drawpic(train_loss_list=None,test_loss_list=None,epoch_number=10,title='1',root_path='./'):
        """Plot a train and a validation curve over the epochs and save the
        figure as ``<root_path><title>.jpg``."""
        # None instead of mutable [] defaults.
        train_loss_list=[] if train_loss_list is None else train_loss_list
        test_loss_list=[] if test_loss_list is None else test_loss_list
        x = list(range(epoch_number))
        fig, ax = plt.subplots()
        ax.set_title(title)
        ax.set_xlabel('epoch')
        ax.plot(x, train_loss_list, linewidth=2.0, label='train')
        ax.plot(x, test_loss_list, linewidth=2.0, label='valid')
        ax.legend()
        path=root_path+title+'.jpg'
        # Markdown image link, handy for pasting into a report.
        print('![]({})'.format(path))
        fig.savefig(path)
        plt.show()
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)

    # The 'PP' figure now receives the perplexity lists; it previously plotted
    # the accuracy curves a second time.
    curves=(
        ('loss',train_loss_list,valid_loss_list),
        ('acc',train_acc_list,valid_acc_list),
        ('PP',train_pp_list,valid_pp_list),
    )
    for metric,train_curve,valid_curve in curves:
        title='{}-{}-{}'.format(MODEL_NAME,metric,str(n_epochs))
        drawpic(train_loss_list=train_curve,test_loss_list=valid_curve,epoch_number=n_epochs,title=title,root_path=root_path)

trainmode()

In [None]:
# Rebuild the GRU network and restore the best checkpoint written during training.
model_gru=GRU_BASE(
    vocab_size=INPUT_DIM,
    embedding_dim=EMBEDDING_DIM,
    hidden_dim=HIDDEN_DIM,
     ).to(DEVICE)


# map_location lets a checkpoint saved on a GPU be restored on a CPU-only machine.
model_gru.load_state_dict(torch.load('../model/GRU-model.pt',map_location=DEVICE))
model_gru.eval()

batch=next(iter(valid_iterator))
text = batch.text
text=text.permute(1,0)
label = batch.label
label = label.permute(1,0)
# Inference only: no gradient bookkeeping needed.
with torch.no_grad():
    predictions = model_gru(text)


# Highest-scoring vocabulary index at every position.
max_preds = predictions.argmax(dim = 2, keepdim = True)

print(max_preds.shape)
# Print input, target and prediction side by side for every example actually
# present in the batch (the batch may hold fewer than 32 examples).
for i in range(text.shape[0]):
    sen=text[i]
    sen_pre=max_preds[i].reshape(-1)
    lab=label[i]
    sen_pre1=[TEXT.vocab.itos[int(t)] for t in sen_pre]
    sen1=[TEXT.vocab.itos[int(t)] for t in sen]
    lab1=[TEXT.vocab.itos[int(t)] for t in lab]
    print(''.join(sen1),'  ',''.join(lab1),' ',''.join(sen_pre1))

In [None]:
# Seed text for generation; punctuation is mapped to the symbols used in training.
sentence='勇敢牛牛，不怕困难'
sentence=re.sub('，',',',sentence)
sentence=re.sub('。','○',sentence)
sen_list=list(sentence)
sen_len=len(sen_list)
# Bring the seed to exactly FIX_LENGTH characters: keep the tail when it is
# too long, left-pad with '○' when it is too short.
if sen_len>FIX_LENGTH:
    sen_list=sen_list[sen_len-FIX_LENGTH:sen_len]
elif sen_len<FIX_LENGTH:
    sen_list=['○' for i in range(FIX_LENGTH-sen_len)]+sen_list
indexed = [TEXT.vocab.stoi[t] for t in sen_list]

tensor = torch.LongTensor(indexed).to(DEVICE)              # convert to a tensor
tensor = tensor.unsqueeze(0)                               # reshape to [batch, number of tokens]
poem_idx=[]
poem_len=1000
model_gru.eval()
# Greedy generation: feed the window, take the most likely next character at
# the last position, then slide the window one character to the right.
# no_grad avoids building an autograd graph for each of the forward passes.
with torch.no_grad():
    for i in range(poem_len):
        prediction = model_gru(tensor)
        next_idx = prediction[0,-1].argmax()
        poem_idx.append(int(next_idx))
        # next_idx already lives on the model's device; the previous hard-coded
        # .cuda() call crashed on CPU-only machines.
        tensor=torch.cat([tensor[:,1:], next_idx.view(1,1)], 1)



# Decode the indices and restore the original punctuation marks.
sen_predict=[TEXT.vocab.itos[t] for t in poem_idx]
sen_predict=''.join(sen_predict)
sen_predict=re.sub(',','，',sen_predict)
sen_predict=re.sub('○','。',sen_predict)

sentence=sentence+sen_predict

print(sentence)