In [None]:
import torch
import torch.nn as nn
import random
import numpy as np
import Tokenizer as tk
# Use a CUDA device when PyTorch can see one; otherwise run everything on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Number of distinct ids the model works with.
VOCAB_SIZE = 1201

разбивка текста и создание словаря

In [None]:
import os

# Directory that holds the plain-text training files.
SAMPLES_DIR = "samples"

# Contents of every regular file in SAMPLES_DIR, decoded as UTF-8 and lower-cased.
# os.listdir() returns entries in arbitrary order, so they are sorted to make the
# order of `samples` (and of everything derived from it) reproducible.
samples = []
for file_name in sorted(os.listdir(SAMPLES_DIR)):
    file_path = os.path.join(SAMPLES_DIR, file_name)
    # Skip sub-directories (for example Jupyter's .ipynb_checkpoints): open() fails on them.
    if not os.path.isfile(file_path):
        continue
    with open(file_path, encoding="utf-8") as text_file:
        samples.append(text_file.read().lower())


class QAPair:
    """A question paired with its answer."""

    def __init__(self, question, answer):
        # Both values are stored exactly as passed in; nothing is validated or copied.
        self.question, self.answer = question, answer

# Each sample text is read as alternating lines: a question, then its answer.
# A trailing line that has no partner is ignored.
dataset = []

for sample_text in samples:
    rows = sample_text.splitlines()
    dataset.extend(
        QAPair(rows[i], rows[i + 1])
        for i in range(0, len(rows) - 1, 2)
    )


In [None]:
# Build the token vocabulary from all loaded sample texts, wrap it in a tokenizer,
# and write the vocabulary to tokens.json.
# The generator is asked for VOCAB_SIZE-1 entries — presumably so that one id stays
# free for the end-of-answer marker; TODO confirm against the Tokenizer module.
tdg = tk.TokenDictionaryGenerator(vocabulary_size = VOCAB_SIZE-1)
tokens = tdg.generate_tokens(samples)
tokenizer = tk.Tokenizer(tokens)
tokens.save("tokens.json")

создание модели
предложение -> hidden
последнее слово + hidden -> слово(1)...слово(n)

In [None]:
class RnnTextGen(nn.Module):
    """Token-level text generator: embedding -> stacked LSTM -> dropout -> linear.

    Args:
        input_size: number of distinct token ids the embedding accepts.
        inp_lstm_size: embedding width, which is also the LSTM input width.
        hid_size: width of the LSTM hidden state.
        n_layers: number of stacked LSTM layers.
        out_size: number of scores produced for every sequence position.
        dropout: dropout probability applied to the LSTM output.
    """

    def __init__(self,input_size,inp_lstm_size,hid_size,n_layers,out_size,dropout=0.2) -> None:
        super().__init__()
        self.input_size = input_size
        self.n_layers = n_layers
        self.hidden_size = hid_size
        # Attribute names are kept as-is: they are part of saved checkpoints.
        self.Encoder = nn.Embedding(input_size, inp_lstm_size)
        self.lstm = nn.LSTM(inp_lstm_size, hid_size, n_layers)
        self.dropout = nn.Dropout(dropout)
        self.l1 = nn.Linear(hid_size, out_size)

    def forward(self, x, hidden=None):
        """Score the next token for every position of `x`.

        Args:
            x: integer tensor of token ids. The LSTM is built without
                `batch_first`, so the first dimension is the sequence position.
            hidden: accepted for interface compatibility but currently unused
                (see the note in the body).

        Returns:
            (scores, state): `scores` has one vector of `out_size` values per
            position; `state` is the (h_n, c_n) pair returned by the LSTM.
        """
        x = self.Encoder(x)
        # NOTE(review): `hidden` is not forwarded to the LSTM, so every call starts
        # from a zero state. The callers in this notebook depend on that: they pass
        # the whole token sequence on every call, and the training loop passes a
        # 3-D state together with an unbatched input. Wiring `hidden` in requires
        # updating those callers at the same time.
        x, hidden = self.lstm(x)
        x = self.dropout(x)
        x = self.l1(x)
        return x, hidden

    def init_hidden(self, batch_size=1):
        """Return a zero (h_0, c_0) pair shaped (n_layers, batch_size, hidden_size).

        The tensors are created on the same device and with the same dtype as the
        model's own weights, so the method does not depend on a notebook-level
        `device` variable. A constant zero state needs no gradient, so none is
        requested.
        """
        anchor = self.l1.weight
        shape = (self.n_layers, batch_size, self.hidden_size)
        return (
            torch.zeros(shape, dtype=anchor.dtype, device=anchor.device),
            torch.zeros(shape, dtype=anchor.dtype, device=anchor.device),
        )

In [None]:
# Embedding width 1000, hidden width 500, two LSTM layers; the model both reads and
# predicts VOCAB_SIZE distinct ids.
model = RnnTextGen(
    input_size=VOCAB_SIZE,
    inp_lstm_size=1000,
    hid_size=500,
    n_layers=2,
    out_size=VOCAB_SIZE,
).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-2, amsgrad=True)
# Halve the learning rate when the reported loss stops decreasing (patience of 5 scheduler steps).
# NOTE(review): `verbose` is deprecated in recent PyTorch releases — confirm it is
# still accepted by the installed version.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="min",
    factor=0.5,
    patience=5,
    verbose=True,
)

In [None]:
def evaluate(model:RnnTextGen,text:str,prediction_lim:int=15):
    """Generate a reply to `text` by greedy decoding.

    The whole sequence (prompt plus everything generated so far) is fed to the
    model on every step and the highest-scoring id at the last position is
    appended. Generation stops at an end marker or after `prediction_lim` steps.

    The model is switched to eval mode and autograd is disabled while generating;
    the previous train/eval mode is restored afterwards.

    Args:
        model: network used for prediction.
        text: prompt; `tokenizer.tokenize(text)` is assumed to yield integer ids.
        prediction_lim: maximum number of tokens to generate.

    Returns:
        The decoded generated tokens concatenated into one string ("" when the
        prompt produces no tokens).
    """
    prompt_ids = list(tokenizer.tokenize(text))
    if not prompt_ids:
        # Nothing to condition on; running the model on an empty sequence would fail.
        return ""

    # get_batch() trains with tokens.count() as the end-of-answer id, while this
    # function historically stopped on VOCAB_SIZE-1. Either one ends generation,
    # so both stay consistent even if the two values differ.
    stop_ids = {tokens.count(), VOCAB_SIZE - 1}

    sequence = torch.LongTensor(prompt_ids).to(device)
    pieces = []
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for _ in range(prediction_lim):
                # No hidden state is passed: the full sequence is re-sent each step,
                # so the model must start from its default zero state every time.
                scores, _state = model(sequence.view(-1, 1))
                next_token = scores[-1].argmax().view(-1)
                sequence = torch.cat([sequence, next_token])
                next_id = int(next_token)
                if next_id in stop_ids:
                    break
                pieces.append(tokens.decode(next_id))
    finally:
        model.train(was_training)
    return "".join(pieces)

In [None]:
def get_batch(dataset:list):
    """Yield one `(target, model_input)` tensor pair per question/answer pair.

    For each item:
      * `target` is the answer's token ids followed by the end-of-answer id
        `tokens.count()`;
      * `model_input` is the question's token ids followed by the answer's token
        ids (the target without its final end marker).

    Pairs whose question yields no tokens are skipped: their input would be one
    element shorter than the target, and the loss computed on the last
    `len(target)` outputs would fail on the size mismatch.

    Args:
        dataset: iterable of objects with `question` and `answer` attributes.

    Yields:
        Tuples `(target, model_input)` of LongTensors on `device`.
    """
    end_id = tokens.count()
    for qa in dataset:
        question_ids = list(tokenizer.tokenize(qa.question))
        if not question_ids:
            continue
        answer_ids = list(tokenizer.tokenize(qa.answer))

        target_ids = answer_ids + [end_id]
        input_ids = question_ids + answer_ids

        yield (
            torch.LongTensor(target_ids).to(device),
            torch.LongTensor(input_ids).to(device),
        )

In [None]:
def train(epoches:int,model:RnnTextGen,batch_size:int)->None:
    """Train `model` on the notebook-level `dataset`, one question/answer pair per step.

    After every 50 optimisation steps the mean loss of those steps is printed and
    reported to the LR scheduler, and a reply to a randomly chosen dataset
    question is generated and printed.

    Args:
        epoches: number of full passes over the dataset.
        model: network to optimise in place.
        batch_size: kept for backward compatibility; examples are fed one at a
            time and this value is not used.
    """
    report_every = 50
    recent_losses = []
    model.train()
    for _ in range(epoches):
        # The loop variable is not called `train`, which would shadow this function.
        for target, inputs in get_batch(dataset):
            # The model starts each sequence from a zero LSTM state on its own, so
            # no explicit hidden state is created or passed.
            output, _ = model(inputs)
            # Only the last len(target) positions predict answer tokens.
            loss = criterion(output[-len(target):], target)

            # Clear gradients first so that stale values left by an interrupted
            # earlier run can never leak into this step.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            recent_losses.append(loss.item())
            if len(recent_losses) < report_every:
                continue

            mean_loss = np.mean(recent_losses)
            print(f'Loss: {mean_loss}')
            scheduler.step(mean_loss)

            # Sample without dropout and without building an autograd graph,
            # then switch back to training mode.
            model.eval()
            question = random.choice(dataset).question
            with torch.no_grad():
                answer = evaluate(model, question)
            model.train()
            print(f"Question: {question} \n Answer: {answer}")
            recent_losses = []

обучение модели

In [None]:
# Run 30 passes over the dataset.
train(epoches=30, model=model, batch_size=1)

In [None]:
# Read a question from the user, lower-case it, and display up to 35 generated tokens.
quest = input().lower()
evaluate(model, quest, prediction_lim=35)

In [62]:
# Serialize the whole model object (not just its state_dict) to data.pkl.
torch.save(obj=model, f="data.pkl")

In [None]:
# data.pkl contains a fully pickled model object, not a state_dict, so it has to be
# loaded with weights_only=False; PyTorch versions that default to weights_only=True
# refuse such files otherwise.
# map_location lets a file saved on a GPU machine load on a CPU-only one.
# SECURITY: unpickling can execute arbitrary code — only load files you created yourself.
model = torch.load("data.pkl", map_location=device, weights_only=False).to(device)