##### Обучена простая рекуррентная нейронная сеть (без GRU/LSTM, без внимания), решающая задачу дешифровки шифра Цезаря.
##### Написан алгоритм шифра Цезаря для генерации выборки (сдвиг каждой буквы на CAESAR_SHIFT позиций) для латиницы.

In [12]:
from io import open
import re
import torch
import time
import numpy as np
import torch
from torch.utils.data import TensorDataset, DataLoader
from torch import nn

import warnings
# NOTE(review): this silences every warning category for the whole session;
# consider narrowing the filter so real problems are not hidden.
warnings.filterwarnings("ignore")
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [13]:
# Default alphabet shift used by encryption() and create_dataset().
CAESAR_SHIFT = 4
# Learning rate passed to the SGD optimizer.
LEARNING_RATE = 0.5
# Number of epochs Training.train() iterates over.
NUM_EPOCHS = 30
# Default batch size of the DataLoaders built by create_dataset().
BATCH_SIZE = 512

##### Читаем текст из файла и делаем препроцессинг: переводим всё в нижний регистр, заменяем все символы, кроме латинских букв и пробела, пробелами и схлопываем подряд идущие пробелы в один.

In [14]:
def reading_and_preprocessing(text_path):
    """Read a UTF-8 text file and normalise it to lowercase Latin letters and spaces.

    Args:
        text_path: Path of the text file to read.

    Returns:
        The file content lower-cased, with every character outside ``a-z``
        and the space replaced by a space, and runs of whitespace collapsed
        into a single space. Leading/trailing spaces are not stripped.
    """
    # The context manager closes the file on exit; no explicit close() needed.
    with open(text_path, encoding='utf-8') as txt_file:
        text = txt_file.read().lower()
    # Raw strings keep the regex escapes from being read as string escapes.
    text = re.sub(r'[^a-z ]', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    return text

##### Шифрует текст с заданным сдвигом.

In [15]:
def encryption(text, shift = CAESAR_SHIFT):
    """Encrypt ``text`` with a Caesar cipher over the lowercase Latin alphabet.

    Args:
        text: String to encrypt.
        shift: Number of positions each letter is moved forward; it is taken
            modulo the alphabet size, so negative and large values wrap.

    Returns:
        A string of the same length in which every ``a-z`` character is
        replaced by the shifted letter; all other characters are unchanged.
    """
    alphabet_lower = 'abcdefghijklmnopqrstuvwxyz'
    offset = shift % len(alphabet_lower)
    rotated = alphabet_lower[offset:] + alphabet_lower[:offset]
    # One translation pass instead of per-character index() and "+=" concatenation.
    return text.translate(str.maketrans(alphabet_lower, rotated))

##### Возвращает отсортированный список уникальных символов текста и словарь «символ → индекс» для этого набора символов.

In [16]:
def indexing_chars(text):
    """Build the character vocabulary of ``text``.

    Returns:
        A tuple ``(symbols, lookup)`` where ``symbols`` is the sorted list of
        distinct characters in ``text`` and ``lookup`` maps each character to
        its position in that list.
    """
    symbols = list(set(text))
    symbols.sort()
    lookup = dict(zip(symbols, range(len(symbols))))
    return symbols, lookup

##### Переводит символы в индексы.

In [17]:
def char_to_index(text, char_to_index_map=None):
    """Convert a string into a 1-D tensor of character indices.

    Args:
        text: String to encode.
        char_to_index_map: Optional mapping from character to index. Pass the
            mapping the model was trained with so the indices agree with the
            model's vocabulary. When omitted, a vocabulary is derived from
            ``text`` itself (the original behaviour), which matches another
            vocabulary only if both contain exactly the same characters.

    Returns:
        An int64 tensor of shape ``(len(text),)``.

    Raises:
        KeyError: If a character of ``text`` is missing from the mapping.
    """
    if char_to_index_map is None:
        _, char_to_index_map = indexing_chars(text)
    # Build the tensor in one call instead of assigning element by element.
    return torch.tensor([char_to_index_map[char] for char in text], dtype=torch.long)

##### Переводит индексы в символы по данному алфавиту.

In [18]:
def index_to_char(indexes, alphabet):
    """Decode character indices back into a string.

    Args:
        indexes: Tensor (or array) of integer indices; it is flattened before
            decoding, so any shape is accepted.
        alphabet: Iterable of characters; its sorted set of distinct elements
            is the lookup table, so index ``i`` maps to the ``i``-th smallest
            character.

    Returns:
        The decoded string.
    """
    # Build the lookup table once rather than re-sorting it for every index.
    symbols = sorted(set(alphabet))
    sentence = "".join(symbols[i] for i in indexes.flatten().tolist())
    return sentence

##### Формируем из сплошного текста датасет.

In [19]:
def create_dataset(text, length = 40, step = 20, shift = CAESAR_SHIFT, BATCH_SIZE = BATCH_SIZE, test_size = 0.2):
    """Cut ``text`` into fixed-length windows and build train/test DataLoaders.

    Each sample pairs an encrypted window (input) with the plain window
    (target), both encoded with the module-level ``CHAR_TO_INDEX`` mapping.

    Args:
        text: Preprocessed corpus.
        length: Number of characters per window.
        step: Distance between the starts of consecutive windows.
        shift: Caesar shift used to encrypt the inputs.
        BATCH_SIZE: Batch size of both loaders.
        test_size: Fraction of the windows (taken from the start of the text)
            placed in the test split.

    Returns:
        A tuple ``(train, test)`` of DataLoaders yielding ``(X, Y)`` int64
        batches of shape ``(batch, length)``.

    NOTE(review): with ``step < length`` neighbouring windows overlap, so the
    windows on either side of the split boundary share characters.
    """
    # "+ 1" keeps the last window that ends exactly at the end of the text.
    starts = range(0, len(text) - length + 1, step)
    sentences = [text[i: i + length] for i in starts]
    encrypted_sentences = [encryption(sent, shift) for sent in sentences]

    def encode(rows):
        # One bulk conversion instead of per-element tensor writes; the
        # reshape keeps the (rows, length) shape even when there are no rows.
        encoded = [[CHAR_TO_INDEX[char] for char in row] for row in rows]
        return torch.tensor(encoded, dtype=torch.long).reshape(len(rows), length)

    X = encode(encrypted_sentences)
    Y = encode(sentences)

    percent = round(len(sentences) * test_size)

    X_train = X[percent:, :]
    X_test = X[:percent, :]
    Y_train = Y[percent:, :]
    Y_test = Y[:percent, :]

    dataset_train = TensorDataset(X_train, Y_train)
    train = DataLoader(dataset_train, BATCH_SIZE, shuffle=True)

    # Evaluation does not benefit from shuffling; keep the order deterministic.
    dataset_test = TensorDataset(X_test, Y_test)
    test = DataLoader(dataset_test, BATCH_SIZE, shuffle=False)

    return train, test

In [20]:
class RNN_Model(torch.nn.Module):
    """Embedding -> single-layer vanilla RNN -> linear classifier over characters.

    Args:
        vocab_size: Number of distinct characters. When omitted it is taken
            from the module-level ``text`` (``len(set(text))``).
        embedding_dim: Size of the character embeddings.
        hidden_size: Size of the RNN hidden state.
    """

    def __init__(self, vocab_size=None, embedding_dim=32, hidden_size=128):
        super().__init__()
        if vocab_size is None:
            # Backward-compatible default: size the model from the global corpus.
            vocab_size = len(set(text))
        self.embed = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, sentence, state=None):
        """Return per-position logits over the vocabulary.

        Args:
            sentence: Integer tensor of character indices.
            state: Optional initial hidden state forwarded to the RNN;
                ``None`` lets the RNN use its default initial state.

        Returns:
            Tensor with the same leading dimensions as ``sentence`` and a
            final dimension equal to the vocabulary size.
        """
        embed = self.embed(sentence)
        # Forward ``state`` so a caller-supplied hidden state is honoured.
        o, _ = self.rnn(embed, state)
        return self.linear(o)

In [21]:
class Training(object):
    """Train a sequence model and report train/test metrics per epoch.

    Args:
        model: Module mapping ``(batch, length)`` index tensors to
            ``(batch, length, classes)`` logits.
        loss_fn: Loss called as ``loss_fn(logits, targets)`` on flattened
            ``(N, classes)`` logits and ``(N,)`` targets.
        optimizer: Optimizer over the model's parameters.
    """

    def __init__(self, model, loss_fn, optimizer):
        self.model = model
        self.loss_fn = loss_fn
        self.optimizer = optimizer

    def _run_epoch(self, loader, is_training):
        """Run one pass over ``loader``.

        Returns ``(summed_loss, accuracy_sum, num_batches)`` where
        ``accuracy_sum`` is the sum of the per-batch accuracies.
        """
        self.model.train(is_training)
        loss_sum, accuracy_sum, num_batches = .0, .0, 0
        # Gradients are only tracked while training; evaluation skips them.
        with torch.set_grad_enabled(is_training):
            for x, y in loader:
                x = x.to(device)
                y = y.reshape(-1).to(device)

                if is_training:
                    self.optimizer.zero_grad()

                # Call the module (not .forward) so registered hooks run.
                out = self.model(x)
                # The class count comes from the logits themselves instead of
                # rescanning the whole corpus on every batch.
                out = out.reshape(-1, out.size(-1))

                loss = self.loss_fn(out, y)
                loss_sum += loss.item()

                batch_accuracy = (out.argmax(dim=1) == y)
                accuracy_sum += batch_accuracy.sum().item() / batch_accuracy.shape[0]

                if is_training:
                    loss.backward()
                    self.optimizer.step()
                num_batches += 1
        return loss_sum, accuracy_sum, num_batches

    def train(self, train, test):
        """Train for ``NUM_EPOCHS`` epochs, evaluating on ``test`` after each.

        Metrics are printed for the first epoch and every fifth epoch. The
        printed loss is the sum of the batch losses; the printed accuracy is
        the mean of the per-batch accuracies.

        Args:
            train: Iterable of ``(x, y)`` training batches.
            test: Iterable of ``(x, y)`` evaluation batches.
        """
        # Moving the model once is enough; it stays on the device afterwards.
        self.model.to(device)
        for epoch in range(1, NUM_EPOCHS + 1):
            report = epoch < 2 or epoch % 5 == 0
            start_epoch_time = time.time()

            train_loss, train_accuracy, iter_num = self._run_epoch(train, True)
            if report:
                print(f"Epoch: {epoch}, loss: {train_loss:.4f}, acc: " f"{train_accuracy / iter_num:.4f}", end=" | ")

            test_loss, test_accuracy, iter_num = self._run_epoch(test, False)
            if report:
                print(f"test loss: {test_loss:.4f}, test acc: {test_accuracy / iter_num:.4f} | " f"{time.time() - start_epoch_time:.2f} sec.")

In [22]:
# Load and normalise the training corpus.
text = reading_and_preprocessing('Fathers_and_Sons.txt')
# Vocabulary of the corpus; create_dataset() reads CHAR_TO_INDEX as a global.
INDEX_TO_CHAR, CHAR_TO_INDEX = indexing_chars(text)
# Train/test DataLoaders built with the default window, step, shift and batch size.
train, test = create_dataset(text)

In [23]:
# With no arguments RNN_Model sizes its layers from the global ``text``.
model = RNN_Model()
loss_fn = nn.CrossEntropyLoss()
# Plain SGD with the learning rate defined alongside the other hyper-parameters.
optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)
training = Training(model, loss_fn, optimizer)
# Runs NUM_EPOCHS epochs and prints metrics for epoch 1 and every fifth epoch.
training.train(train, test)

Epoch: 1, loss: 20.7042, acc: 0.9088 | test loss: 0.8724, test acc: 0.9960 | 9.93 sec.
Epoch: 5, loss: 0.5595, acc: 0.9984 | test loss: 0.1283, test acc: 0.9982 | 0.40 sec.
Epoch: 10, loss: 0.2140, acc: 0.9998 | test loss: 0.0521, test acc: 0.9998 | 0.47 sec.
Epoch: 15, loss: 0.1183, acc: 1.0000 | test loss: 0.0296, test acc: 1.0000 | 0.41 sec.
Epoch: 20, loss: 0.0795, acc: 1.0000 | test loss: 0.0200, test acc: 1.0000 | 0.47 sec.
Epoch: 25, loss: 0.0595, acc: 1.0000 | test loss: 0.0152, test acc: 1.0000 | 0.39 sec.
Epoch: 30, loss: 0.0475, acc: 1.0000 | test loss: 0.0121, test acc: 1.0000 | 0.63 sec.


In [24]:
sentence = "the quick brown fox jumps over the lazy dog"
# Decode with the training vocabulary. A vocabulary derived from the sentence
# itself only lines up with the model's when the sentence happens to contain
# every training character (true for this pangram, false in general).
alphabet = INDEX_TO_CHAR

encrypted_sentence = encryption(sentence, CAESAR_SHIFT)

# Encode both strings with the mapping the model was trained on; a character
# absent from the training corpus raises KeyError here.
sentence_idx = torch.tensor([CHAR_TO_INDEX[char] for char in sentence], dtype=torch.long).to(device)
encrypted_sentence_idx = torch.tensor([CHAR_TO_INDEX[char] for char in encrypted_sentence], dtype=torch.long).to(device)

# Inference only: evaluation mode and no autograd bookkeeping.
model.eval()
with torch.no_grad():
    result = model(encrypted_sentence_idx).argmax(dim=1)

decrypted_sentence = index_to_char(result, alphabet)

matches = (result == sentence_idx)

print(f"Original sentence is : {sentence}")
print("-" * 100)
print(f"Encrypted sentence is : {encrypted_sentence}")
print("-" * 100)
print(f"Decrypted sentence is : {decrypted_sentence}")
print("-" * 100)
print(f"Accuracy is : {matches.sum().item() / matches.shape[0]}")

Original sentence is : the quick brown fox jumps over the lazy dog
----------------------------------------------------------------------------------------------------
Encrypted sentence is : xli uymgo fvsar jsb nyqtw sziv xli pedc hsk
----------------------------------------------------------------------------------------------------
Decrypted sentence is : the quick brown fox jumps over the lazy dog
----------------------------------------------------------------------------------------------------
Accuracy is : 1.0
