In [1]:
import re
import math
import numpy as np
import random
import torch
from sklearn.model_selection import train_test_split
from torchinfo import summary
# from torchsummary import summary

In [2]:
def normalize_and_split_sentences(file_path, file_path2):
    # Чтение содержимого файла
    with open(file_path, 'r') as file:
        content = file.read()

    # Приведение текста к нижнему регистру
    content = content.lower()

    # Удаление нежелательных символов и знаков препинания
    content = re.sub(r'[^а-яА-Я0-9\s\.?!]', '', content)

    # Удаление лишних пробелов и пробельных символов
    content = re.sub(r'\s+', ' ', content)
    
    # Разделение текста на предложения
    sentences = re.split(r'[.!?]', content)

    # Удаление пустых предложений
    sentences = [sentence.strip() for sentence in sentences if sentence.strip()]

    # Запись нормализованных предложений в файл
    with open(file_path2, 'w') as file:
        file.write('\n'.join(sentences))


In [3]:
file_path = 'text_raw.txt'
normalize_and_split_sentences(file_path, 'text_new.txt')

In [4]:
CHARS = 'абвгдежзийклмнопрстуфхцчшщьыъэюя1234567890 '
INDEX_TO_CHAR =  [w for w in CHARS] + ['none']
CHARS_NUM = len(INDEX_TO_CHAR)
CHAR_TO_INDEX = {w: i for i, w in enumerate(INDEX_TO_CHAR)}
MAX_LENGTH = 40

In [5]:
def Ceasar(line, shift):
    index_line = ['none']*MAX_LENGTH
    for i in range(min(len(line), MAX_LENGTH)):
        index_line[i] = line[i]
    index_line = list(map(lambda x: CHAR_TO_INDEX.get(x, CHAR_TO_INDEX['none']), index_line))    
    for i, ch in enumerate(index_line):
        if ch not in [CHARS_NUM-1, CHARS_NUM-2]:
            index_line[i] = (ch + shift) % (len(INDEX_TO_CHAR)-2)
    return torch.tensor(index_line, dtype=torch.long)


In [6]:
res = Ceasar(' абвг эюя 890', 3)
print(res.shape)
print(list(map(lambda x: INDEX_TO_CHAR[x], res)))
print(res.dtype)

torch.Size([40])
[' ', 'г', 'д', 'е', 'ж', ' ', '1', '2', '3', ' ', 'а', 'б', 'в', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none', 'none']
torch.int64


In [7]:
Data_raw = []
with open('text_new.txt', 'r') as fp:
    for line in fp:
        K = random.randint(1, CHARS_NUM-3)
        Data_raw.append(tuple([Ceasar(line, K), Ceasar(line, 0)]))



In [8]:
Data_train, Data_test = train_test_split(Data_raw, test_size=0.2)

In [9]:
Data_train[12]

(tensor([20, 15, 17, 25, 21, 42, 20, 12, 42, 14, 20,  7, 12, 25, 42, 22, 23, 21,
         42,  8, 26, 20, 29, 18,  7, 26, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43,
         43, 43, 43, 43]),
 tensor([13,  8, 10, 18, 14, 42, 13,  5, 42,  7, 13,  0,  5, 18, 42, 15, 16, 14,
         42,  1, 19, 13, 22, 11,  0, 19, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43,
         43, 43, 43, 43]))

In [10]:
Train = torch.utils.data.DataLoader(Data_train, batch_size=10, shuffle=True)
Test = torch.utils.data.DataLoader(Data_test, batch_size=10)

In [11]:
check = 'здесь мы проверяем качество работы'
print(len(check))
def test_sentence(test_sentence):
    # K = random.randint(1, CHARS_NUM-3)
    K = random.randint(1, 15)
    return Ceasar(test_sentence, K).unsqueeze(0)

34


In [12]:
print(test_sentence(check).shape)
print(test_sentence(check).dtype)
print(test_sentence(check))

torch.Size([1, 40])
torch.int64
tensor([[13, 10, 11, 23, 32, 42, 18, 33, 42, 21, 22, 20,  8, 11, 22, 37, 11, 18,
         42, 16,  6, 29, 11, 23, 24,  8, 20, 42, 22,  6,  7, 20, 24, 33, 43, 43,
         43, 43, 43, 43]])


In [13]:
res = test_sentence(check)
print(res.shape)
print(*list(map(lambda x: INDEX_TO_CHAR[x], res.squeeze())))

torch.Size([1, 40])
у р с э 7   ш 8   ы ъ ь о с ъ б с ш   ц м 4 с э ю о ь   ъ м н ь ю 8 none none none none none none


In [14]:
class Network(torch.nn.Module):
    def __init__(self):
        super(Network, self).__init__()
        self.embed = torch.nn.Embedding(CHARS_NUM, 400)
        self.rnn = torch.nn.RNN(400, 1000, batch_first=True, nonlinearity='relu')
        self.linear = torch.nn.Linear(1000, CHARS_NUM)
        self.softmax = torch.nn.Softmax(dim=1)
        
    def forward(self, sentences, state=None):
        embed = self.embed(sentences)
        o, s = self.rnn(embed)
        fc = self.linear(o)
        out = self.softmax(fc)
        return out

In [15]:
model = Network()
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [16]:
summary(model, test_sentence(check).shape, dtypes=['torch.IntTensor'], device='cpu')

Layer (type:depth-idx)                   Output Shape              Param #
Network                                  [1, 40, 44]               --
├─Embedding: 1-1                         [1, 40, 400]              17,600
├─RNN: 1-2                               [1, 40, 1000]             1,402,000
├─Linear: 1-3                            [1, 40, 44]               44,044
├─Softmax: 1-4                           [1, 40, 44]               --
Total params: 1,463,644
Trainable params: 1,463,644
Non-trainable params: 0
Total mult-adds (M): 56.14
Input size (MB): 0.00
Forward/backward pass size (MB): 0.46
Params size (MB): 5.85
Estimated Total Size (MB): 6.32

In [17]:
def unwrap(batch):
    new_batch = torch.zeros(list(batch.shape) + [CHARS_NUM])
    for i, sentence in enumerate(batch):
        for j, ch in enumerate(sentence):
            new_batch[i][j][int(ch)] = 1
    return new_batch


In [18]:
for X, Y in Train:
    # print(model(X).shape)
    # print(unwrap(Y).shape)
    print(criterion(model(X),unwrap(Y)))

tensor(3.3539, grad_fn=<DivBackward1>)
tensor(3.3541, grad_fn=<DivBackward1>)
tensor(3.3537, grad_fn=<DivBackward1>)
tensor(3.3536, grad_fn=<DivBackward1>)
tensor(3.3541, grad_fn=<DivBackward1>)
tensor(3.3536, grad_fn=<DivBackward1>)
tensor(3.3540, grad_fn=<DivBackward1>)
tensor(3.3540, grad_fn=<DivBackward1>)
tensor(3.3539, grad_fn=<DivBackward1>)
tensor(3.3542, grad_fn=<DivBackward1>)
tensor(3.3538, grad_fn=<DivBackward1>)
tensor(3.3541, grad_fn=<DivBackward1>)
tensor(3.3537, grad_fn=<DivBackward1>)
tensor(3.3540, grad_fn=<DivBackward1>)
tensor(3.3537, grad_fn=<DivBackward1>)
tensor(3.3544, grad_fn=<DivBackward1>)
tensor(3.3537, grad_fn=<DivBackward1>)
tensor(3.3540, grad_fn=<DivBackward1>)
tensor(3.3538, grad_fn=<DivBackward1>)
tensor(3.3537, grad_fn=<DivBackward1>)
tensor(3.3538, grad_fn=<DivBackward1>)
tensor(3.3538, grad_fn=<DivBackward1>)
tensor(3.3538, grad_fn=<DivBackward1>)
tensor(3.3539, grad_fn=<DivBackward1>)
tensor(3.3537, grad_fn=<DivBackward1>)
tensor(3.3540, grad_fn=<D

In [None]:
epoch_num = 15
hist_train = np.array([])
hist_test = np.array([])
acc_train = np.array([])
acc_test = np.array([])

for X, Y in Train:
    ep_hist_train, ep_acc_train = 0, 0
    y_pred = model(X)
    l = criterion(y_pred, unwrap(Y))
    ep_hist_train += l.item()
    ep_acc_train += (y_pred.argmax(dim=2) == Y).sum().item()
    
for X, Y in Test:
    ep_hist_test, ep_acc_test = 0, 0
    y_pred = model(X)
    l = criterion(y_pred, unwrap(Y))
    ep_hist_test += l.item()
    ep_acc_test += (y_pred.argmax(dim=2) == Y).sum().item()
    
hist_train = np.append(hist_train, ep_hist_train)
hist_test = np.append(hist_test, ep_hist_test)
acc_train = np.append(acc_train, ep_acc_train/len(Data_train)/MAX_LENGTH*100)
acc_test = np.append(acc_test, ep_acc_test/len(Data_test)/MAX_LENGTH*100)

print('Initial Loss = ', hist_train[-1], ', ', hist_test[-1])
print('Initial Accuracy = ', acc_train[-1], '%, ', acc_test[-1], '%')

check_data = test_sentence(check)

for epoch in range(1, epoch_num+1):
    
    ep_hist_train, ep_hist_test = 0, 0
    ep_acc_train, ep_acc_test = 0, 0
    
    model.train()
    for X, Y in Train:
        optimizer.zero_grad()
        y_pred = model(X)
        l = criterion(y_pred, unwrap(Y))
        l.backward()
        optimizer.step()
        
        ep_hist_train += l.item()
        ep_acc_train += (y_pred.argmax(dim=2) == Y).sum().item()
    
    model.eval()
    for X, Y in Test:
        y_pred = model(X)
        l = criterion(y_pred, unwrap(Y))
        
        ep_hist_test += l.item()
        ep_acc_test += (y_pred.argmax(dim=2) == Y).sum().item()
    
    hist_train = np.append(hist_train, ep_hist_train/len(Data_train))
    hist_test = np.append(hist_test, ep_hist_test/len(Data_test))
    acc_train = np.append(acc_train, ep_acc_train/len(Data_train)/MAX_LENGTH*100)
    acc_test = np.append(acc_test, ep_acc_test/len(Data_test)/MAX_LENGTH*100)
    

    print('Done ', epoch, ' of ', epoch_num, ' which is ', 
            math.ceil(epoch/epoch_num*100), '%')
    print('Loss = ', hist_train[-1], ', ', hist_test[-1])
    print('Accuracy = ', acc_train[-1], ', ', acc_test[-1])
    check_result = model(check_data).argmax(dim=2)
    print('Тестовое предложение на эпохе:')
    print(print(*list(map(lambda x: INDEX_TO_CHAR[x], check_result.squeeze()))))
    

print('Training Done')

Initial Loss =  3.3536601066589355 ,  3.3537771701812744
Initial Accuracy =  0.0009900205924283226 %,  0.0039594551789673745 %
Done  1  of  15  which is  7 %
Loss =  0.33049261282953013 ,  0.32960900476472
Accuracy =  32.52257246950737 ,  39.57079505859994
Тестовое предложение на эпохе:
я 2 р ц и   ф ы   ч о г ъ 0 л 5 8 и   б е 3 8 ш ь й г   д е 3 т ь и none none none none none none
None
Done  2  of  15  which is  14 %
Loss =  0.32695602082836067 ,  0.32460440628733256
Accuracy =  37.92709488357358 ,  43.26259106746912
Тестовое предложение на эпохе:
я 2 6 ъ ч   1 щ   м о р г б л ы ю и   ъ д ц 0 0 н й 8   с е 3 0 ш ь none none none none none none
None
Done  3  of  15  which is  20 %
Loss =  0.32157798818376465 ,  0.3193069900709653
Accuracy =  46.21752732456835 ,  49.275419702248975
Тестовое предложение на эпохе:
1 9 6 ъ 4   ф 0   х т о а ж л ю й и   к 2 ч 5 с р а л   н а б т т ь none none none none none none
None
Done  4  of  15  which is  27 %
Loss =  0.31666789696142184 ,  0.31535791