In [39]:
import torch
import re
import random
import time
import warnings
from random import shuffle
warnings.filterwarnings("ignore")

In [40]:
# --- Configuration -----------------------------------------------------------
BATCH_SIZE = 10        # number of text chunks per DataLoader batch
STRING_SIZE = 60       # characters per text chunk (passed to Data as `step`)
NUM_EPOCHS = 20        # passes over the training loader
LEARNING_RATE = 0.05   # SGD step size
SHIFT = 15             # Caesar shift applied to the clear text to build model inputs

# --- Vocabulary --------------------------------------------------------------
CHARS = set('abcdefghijklmnopqrstuvwxyz')
# Iterating a set of strings yields a different order in every Python process
# (string hashing is randomized), which would silently change the
# char <-> index mapping on each kernel restart. Sorting pins the mapping:
# index 0 is the blank, 1..26 are 'a'..'z'.
INDEX_TO_CHAR = [' '] + sorted(CHARS)
CHAR_TO_INDEX = {w: i for i, w in enumerate(INDEX_TO_CHAR)}

In [43]:
class Data:
    """Turn a text file into (cipher text, clear text) index tensors.

    The file is read in chunks of ``step`` characters and lower-cased. Each
    chunk becomes one target row (clear text) and one input row (the same
    text with a Caesar shift of ``SHIFT`` applied to the letters a-z).
    Characters outside the vocabulary are mapped to the blank index.

    Args:
        file_name: path of the UTF-8 text file to read.
        step: number of characters per chunk (one chunk = one sample).
    """

    def __init__(self, file_name, step):
        self.file_name = file_name
        self.step = step
        # text_array: list of chunks; X / Y: encoded rows as Python lists.
        self.text_array, self.X, self.Y = [], [], []
        # Placeholders until create_data_tensors() has run.
        self.X_tensor, self.Y_tensor = torch.empty(1, 1), torch.empty(1, 1)
        # Becomes a DataLoader once use_dataloader() has run.
        self.data = []

    def get_text_array(self):
        """Read the file into lower-cased chunks of exactly ``step`` characters.

        Chunks whose length differs from ``step`` (in practice the trailing
        remainder of the file) are discarded so that all rows can be stacked
        into one rectangular tensor. A file whose length is an exact multiple
        of ``step`` keeps all of its chunks, and an empty file yields ``[]``.
        Calling the method again re-reads the file instead of appending
        duplicates.

        Returns:
            The list of chunks, also stored in ``self.text_array``.
        """
        chunks = []
        with open(self.file_name, encoding='utf-8') as file:
            while True:
                text = file.read(self.step).lower()
                if not text:
                    break
                chunks.append(text)
        self.text_array = [chunk for chunk in chunks if len(chunk) == self.step]
        return self.text_array

    def shuffle_data(self):
        """Shuffle the chunks in place and return them."""
        shuffle(self.text_array)
        return self.text_array

    def create_data_tensors(self):
        """Encode every chunk into an input row (shifted) and a target row.

        Only the letters a-z are shifted. Any other character, including
        non-ASCII letters for which ``str.isalpha()`` is true, is encoded
        unchanged, so input and target agree on the blank index for it.

        Returns:
            ``(X_tensor, Y_tensor)``, two integer tensors with one row per
            chunk. With no chunks, both have shape ``(0, step)``.
        """
        blank = CHAR_TO_INDEX[' ']
        # Start from scratch so a repeated call does not duplicate rows.
        self.X, self.Y = [], []
        for sen in self.text_array:
            x_row, y_row = [], []
            for char in sen:
                y_row.append(CHAR_TO_INDEX.get(char, blank))
                if 'a' <= char <= 'z':
                    # Caesar shift inside the 26-letter alphabet (ord('a') == 97).
                    char = chr((ord(char) - 97 + SHIFT) % 26 + 97)
                x_row.append(CHAR_TO_INDEX.get(char, blank))
            self.X.append(x_row)
            self.Y.append(y_row)
        # Build the tensors once, after the loop, not once per chunk.
        if self.X:
            self.X_tensor = torch.tensor(self.X, dtype=torch.long)
            self.Y_tensor = torch.tensor(self.Y, dtype=torch.long)
        else:
            self.X_tensor = torch.empty((0, self.step), dtype=torch.long)
            self.Y_tensor = torch.empty((0, self.step), dtype=torch.long)
        return self.X_tensor, self.Y_tensor

    def use_dataloader(self):
        """Wrap the tensors in a shuffling DataLoader of ``BATCH_SIZE`` rows."""
        dataset = torch.utils.data.TensorDataset(self.X_tensor, self.Y_tensor)
        self.data = torch.utils.data.DataLoader(dataset, BATCH_SIZE, shuffle=True)
        return self.data

    def preprocess_data_val(self):
        """Read, shuffle and encode the file; return ``(X_tensor, Y_tensor)``."""
        self.get_text_array()
        self.shuffle_data()
        self.create_data_tensors()
        return self.X_tensor, self.Y_tensor

    def preprocess_data(self):
        """Run the full pipeline and return the resulting DataLoader."""
        self.preprocess_data_val()
        self.use_dataloader()
        return self.data


In [44]:
# Training batches: chunk text.txt, shuffle, encode, wrap in a DataLoader.
data_train = Data(
    file_name='text.txt',
    step=STRING_SIZE,
).preprocess_data()

In [45]:
# Test batches: same pipeline as the training data, read from test.txt.
data_test = Data(
    file_name='test.txt',
    step=STRING_SIZE,
).preprocess_data()

In [46]:
class RNNModel(torch.nn.Module):
    """Character-level tagger: embedding -> single-layer RNN -> linear head.

    Args:
        vocab_size: number of distinct character indices. Defaults to
            ``len(CHAR_TO_INDEX)`` when omitted.
        embed_dim: size of the character embedding.
        hidden_dim: size of the RNN hidden state.
    """

    def __init__(self, vocab_size=None, embed_dim=32, hidden_dim=128):
        super().__init__()
        if vocab_size is None:
            vocab_size = len(CHAR_TO_INDEX)
        self.embed = torch.nn.Embedding(vocab_size, embed_dim)
        self.rnn = torch.nn.RNN(embed_dim, hidden_dim, batch_first=True)
        self.linear = torch.nn.Linear(hidden_dim, vocab_size)

    def forward(self, sentence, state=None):
        """Return per-position logits for a batch of index sequences.

        Args:
            sentence: integer tensor of character indices, batch first.
            state: optional initial hidden state forwarded to the RNN;
                ``None`` lets the RNN start from zeros.

        Returns:
            Tensor with one row of ``vocab_size`` logits per input position.
        """
        embed = self.embed(sentence)
        # `state` used to be accepted but ignored; hand it to the RNN.
        o, _ = self.rnn(embed, state)
        return self.linear(o)

In [47]:
# Objective: cross-entropy over the character classes.
loss = torch.nn.CrossEntropyLoss()

# Network and the plain SGD optimizer that updates its parameters.
model = RNNModel()
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=LEARNING_RATE,
)

In [48]:
def _run_epoch(net, batches, criterion, opt=None):
    """Run one pass over `batches` and return (summed loss, mean batch accuracy).

    With an optimizer the pass trains the network; without one it only
    evaluates, in eval mode and with autograd switched off.

    Args:
        net: the model; called as ``net(x)`` on each input batch.
        batches: iterable of ``(inputs, targets)`` tensor pairs.
        criterion: loss function applied to flattened logits and targets.
        opt: optimizer used for the update step, or ``None`` to evaluate.

    Returns:
        ``(loss_sum, mean_acc)``: the loss summed over all batches and the
        accuracy averaged over batches (``nan`` if there were no batches).
    """
    training = opt is not None
    net.train(training)
    loss_sum, acc_sum, n_batches = .0, .0, 0
    with torch.set_grad_enabled(training):
        for x_in, y_in in batches:
            # One target index per character position.
            targets = y_in.reshape(-1)
            if training:
                opt.zero_grad()
            logits = net(x_in)
            logits = logits.reshape(-1, logits.shape[-1])
            batch_loss = criterion(logits, targets)
            loss_sum += batch_loss.item()
            hits = logits.argmax(dim=1) == targets
            acc_sum += hits.sum().item() / hits.shape[0]
            if training:
                batch_loss.backward()
                opt.step()
            n_batches += 1
    mean_acc = acc_sum / n_batches if n_batches else float("nan")
    return loss_sum, mean_acc


for epoch in range(NUM_EPOCHS):
    start_epoch_time = time.time()
    # The reported loss is the sum over batches, so train and test values
    # scale with the number of batches in each loader.
    train_loss, train_acc = _run_epoch(model, data_train, loss, optimizer)
    print(
        f"Epoch: {epoch}, loss: {train_loss:.4f}, acc: "
        f"{train_acc:.4f}",
        end=" | "
    )
    test_loss, test_acc = _run_epoch(model, data_test, loss)
    print(
        f"test loss: {test_loss:.4f}, test acc: {test_acc:.4f} | "
        f"{time.time() - start_epoch_time:.2f} sec."
    )

Epoch: 0, loss: 72.1655, acc: 0.6383 | test loss: 14.9163, test acc: 0.8328 | 1.94 sec.
Epoch: 1, loss: 36.7910, acc: 0.8777 | test loss: 8.5558, test acc: 0.9039 | 1.76 sec.
Epoch: 2, loss: 22.6537, acc: 0.9230 | test loss: 5.5947, test acc: 0.9480 | 1.65 sec.
Epoch: 3, loss: 15.4741, acc: 0.9572 | test loss: 3.9234, test acc: 0.9693 | 1.59 sec.
Epoch: 4, loss: 11.1936, acc: 0.9711 | test loss: 2.8760, test acc: 0.9838 | 1.69 sec.
Epoch: 5, loss: 8.4749, acc: 0.9823 | test loss: 2.2320, test acc: 0.9862 | 1.51 sec.
Epoch: 6, loss: 6.6604, acc: 0.9858 | test loss: 1.7641, test acc: 0.9884 | 1.43 sec.
Epoch: 7, loss: 5.3881, acc: 0.9884 | test loss: 1.4455, test acc: 0.9898 | 1.70 sec.
Epoch: 8, loss: 4.4777, acc: 0.9905 | test loss: 1.2061, test acc: 0.9917 | 1.44 sec.
Epoch: 9, loss: 3.7952, acc: 0.9917 | test loss: 1.0365, test acc: 0.9951 | 1.50 sec.
Epoch: 10, loss: 3.2793, acc: 0.9949 | test loss: 0.8925, test acc: 0.9970 | 1.48 sec.
Epoch: 11, loss: 2.8677, acc: 0.9961 | test los

In [49]:
# Validation tensors: chunked, shuffled and encoded, but not batched.
X_val, Y_val = Data(
    file_name='val.txt',
    step=STRING_SIZE,
).preprocess_data_val()

In [51]:
# Index of the validation chunk that is printed as an example.
idx = 0

# Set eval mode explicitly instead of relying on the state left by the
# training cell, and skip autograd bookkeeping for pure inference.
model.eval()
with torch.no_grad():
    val_results = model(X_val).argmax(dim=2)

# Character-level accuracy over every position of every validation chunk.
val_correct = (val_results == Y_val).flatten()
val_acc = (val_correct.sum() / val_correct.shape[0]).item()

# Decode the predicted and the reference indices of the example chunk.
out_sentence = "".join(INDEX_TO_CHAR[i] for i in val_results[idx].tolist())
true_sentence = "".join(INDEX_TO_CHAR[i] for i in Y_val[idx].tolist())

print(f"Validation accuracy is : {val_acc:.4f}")
print("-" * 20)
print(f"Validation sentence is: \"{out_sentence}\"")
print("-" * 20)
print(f"True sentence is:       \"{true_sentence}\"")

Validation accuracy is : 0.9967
--------------------
Validation sentence is: "to an english lawyer named norton     but she could not love"
--------------------
True sentence is:       "to an english lawyer named norton     but she could not love"


Вывод: перед нами стояла задача дешифровки шифра Цезаря. Для ее решения была использована простая модель рекуррентной нейронной сети. Точность дешифровки на валидационной выборке составила 0,9967.