In [1]:
from io import open
import re
import torch
import time
import numpy as np
import torch
from torch.utils.data import TensorDataset, DataLoader
from torch import nn

import warnings
# Silence every Python warning for the rest of the session.
# NOTE(review): this also hides deprecation/numerical warnings from torch.
warnings.filterwarnings("ignore")
# Use the CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
# Default number of digits in each generated sequence.
LENGTH_SEQUENCE = 10
# Learning rate handed to the SGD optimizers.
LEARNING_RATE = 0.05
# Number of epochs Training.train iterates over.
NUM_EPOCHS = 500
# Default mini-batch size of the DataLoaders built by create_dataset.
BATCH_SIZE = 512
# Size of the embedding vocabulary and of the output layer in Model.
INPUT_DIM = 10

In [3]:
def generate_sequences(length = LENGTH_SEQUENCE):
    """Generate one random digit sequence and its "encrypted" counterpart.

    The input ``x`` holds ``length`` integers drawn uniformly from 0..9.
    The target ``y`` keeps the first digit unchanged and replaces every
    later digit ``x[i]`` with ``(x[0] + x[i]) % 10``.

    Args:
        length: Number of digits in the sequence. ``0`` yields empty
            sequences instead of raising.

    Returns:
        Tuple ``(x, y)`` of integer tensors, each of shape ``(1, length)``.
    """
    x = torch.randint(0, 10, (length,))

    # Slice (not index) the first digit: it stays 1-D so it can be
    # concatenated directly and the expression also works for length 0.
    first_digit = x[:1]

    # Vectorised wrap-around. The sums lie in 0..18, so "% 10" is the same
    # as subtracting 10 from every value >= 10, without a Python-level loop
    # that depends on in-place mutation of tensor views.
    y = torch.cat((first_digit, (first_digit + x[1:]) % 10))

    return x.unsqueeze(dim=0), y.unsqueeze(dim=0)

In [4]:
def create_dataset(number_sequences = 10_000, length_sequence = LENGTH_SEQUENCE, test_size = 0.2, val_size = 0.1, BATCH_SIZE = BATCH_SIZE):
    """Build shuffled train/test/validation DataLoaders of generated sequences.

    Args:
        number_sequences: Number of training sequences.
        length_sequence: Number of digits per sequence.
        test_size: Size of the test split as a fraction of ``number_sequences``.
        val_size: Size of the validation split as a fraction of
            ``number_sequences``.
        BATCH_SIZE: Mini-batch size used by all three loaders. The name
            shadows the module-level constant and is kept for compatibility
            with keyword callers.

    Returns:
        Tuple ``(train, test, val)`` of ``DataLoader`` objects, each wrapping a
        ``TensorDataset`` of ``(x, y)`` tensors with ``shuffle=True``.
    """
    # Generate the splits in train -> test -> val order so the random stream
    # is consumed in the same order as before.
    split_sizes = (
        number_sequences,
        round(number_sequences * test_size),
        round(number_sequences * val_size),
    )
    splits = [_generate_split(count, length_sequence) for count in split_sizes]

    train, test, val = (
        DataLoader(TensorDataset(x_split, y_split), batch_size=BATCH_SIZE, shuffle=True)
        for x_split, y_split in splits
    )

    return train, test, val


def _generate_split(count, length_sequence):
    """Stack ``count`` generated (x, y) pairs into two 2-D tensors."""
    # Every split has always contained at least one sequence; keeping that
    # floor also guarantees torch.cat never receives an empty list.
    pairs = [generate_sequences(length_sequence) for _ in range(max(count, 1))]
    xs, ys = zip(*pairs)
    # One concatenation per tensor instead of one per generated sequence,
    # which avoids re-copying the growing tensor on every iteration.
    return torch.cat(xs), torch.cat(ys)

In [5]:
class Model(torch.nn.Module):
    """Embedding -> recurrent layer -> linear classifier over token ids.

    Args:
        model: Recurrent layer class (the notebook passes ``nn.RNN``,
            ``nn.LSTM`` or ``nn.GRU``). It is instantiated as
            ``model(embed_dim, hidden_dim, layer_dim, batch_first=True)``.
        embed_dim: Size of each token embedding.
        hidden_dim: Hidden size of the recurrent layer.
        layer_dim: Number of stacked recurrent layers.
        input_dim: Number of distinct tokens, used both as the embedding
            vocabulary size and as the number of output classes. ``None``
            (the default) falls back to the module-level ``INPUT_DIM``.
    """

    def __init__(self, model, embed_dim, hidden_dim, layer_dim, input_dim=None):
        super().__init__()
        if input_dim is None:
            input_dim = INPUT_DIM
        self.embed = nn.Embedding(input_dim, embed_dim)
        self.model = model(embed_dim, hidden_dim, layer_dim, batch_first=True)
        self.linear = nn.Linear(hidden_dim, input_dim)

    def forward(self, sentence, state=None):
        """Return per-position class logits for a batch of token ids.

        Args:
            sentence: Integer tensor of token ids; the batch dimension comes
                first because the recurrent layer uses ``batch_first=True``.
            state: Optional initial state, forwarded unchanged to the
                recurrent layer. ``None`` lets the layer use its default
                initial state.

        Returns:
            Tensor of logits with one vector of ``input_dim`` scores per
            input position.
        """
        embed = self.embed(sentence)
        # Pass the caller's state through; it used to be accepted but ignored.
        o, _ = self.model(embed, state)
        return self.linear(o)

In [6]:
class Training:
    """Minimal train/evaluate loop for a sequence classifier.

    Args:
        model: Module mapping a batch of token ids to per-position logits
            whose last dimension is the number of classes.
        loss_fn: Callable ``loss_fn(logits, targets)`` returning a scalar
            tensor, called with flattened 2-D logits and 1-D targets.
        optimizer: Optimizer that updates ``model``'s parameters.
        target_device: Device to run on. ``None`` (the default) uses the
            module-level ``device``.
    """

    def __init__(self, model, loss_fn, optimizer, target_device=None):
        self.model = model
        self.loss_fn = loss_fn
        self.optimizer = optimizer
        # Only touch the notebook-level `device` when no device was supplied.
        self.device = device if target_device is None else target_device

    def _run_epoch(self, loader, training):
        """Run one pass over ``loader``; update weights only when ``training``.

        Returns:
            Tuple ``(summed_loss, mean_batch_accuracy)``. The loss is the sum
            of the per-batch losses, so its scale depends on the number of
            batches in ``loader``.
        """
        self.model.train(training)
        summed_loss, accuracy_sum, num_batches = 0.0, 0.0, 0

        # Skip autograd graph construction during evaluation.
        with torch.set_grad_enabled(training):
            for x, y in loader:
                x = x.to(self.device)
                # reshape(-1) always yields a 1-D target, even for a single label.
                y = y.reshape(-1).to(self.device)

                if training:
                    self.optimizer.zero_grad()

                # Call the module (not .forward) so registered hooks run.
                logits = self.model(x)
                logits = logits.reshape(-1, logits.shape[-1])

                loss = self.loss_fn(logits, y)
                summed_loss += loss.item()

                hits = logits.argmax(dim=1) == y
                accuracy_sum += hits.sum().item() / hits.shape[0]

                if training:
                    loss.backward()
                    self.optimizer.step()
                num_batches += 1

        # max(..., 1) avoids a ZeroDivisionError on an empty loader.
        return summed_loss, accuracy_sum / max(num_batches, 1)

    def train(self, train, test, num_epochs=None):
        """Train on ``train`` and evaluate on ``test`` once per epoch.

        Progress is printed for the first epoch and every tenth epoch.

        Args:
            train: Iterable of ``(x, y)`` batches used for optimisation.
            test: Iterable of ``(x, y)`` batches used for evaluation.
            num_epochs: Number of epochs. ``None`` (the default) uses the
                module-level ``NUM_EPOCHS``.
        """
        if num_epochs is None:
            num_epochs = NUM_EPOCHS

        # Moving the model once is enough; it does not need repeating per epoch.
        self.model.to(self.device)

        for epoch in range(1, num_epochs + 1):
            should_report = epoch < 2 or epoch % 10 == 0
            start_epoch_time = time.time()

            train_loss, train_accuracy = self._run_epoch(train, training=True)
            if should_report:
                print(f"Epoch: {epoch}, loss: {train_loss:.4f}, acc: " f"{train_accuracy:.4f}", end=" | ")

            test_loss, test_accuracy = self._run_epoch(test, training=False)
            if should_report:
                print(f"test loss: {test_loss:.4f}, test acc: {test_accuracy:.4f} | " f"{time.time() - start_epoch_time:.2f} sec.")

In [7]:
# Build the three loaders with the default sizes. The cells shown in this
# notebook pass only `train` and `test` to Training.train; `val` is not
# consumed by any cell visible here.
train, test, val = create_dataset()

### LSTM

In [8]:
# Registry of architectures to compare. Each entry is unpacked into
# Model(model, embed_dim, hidden_dim, layer_dim), i.e.
# [recurrent layer class, embed_dim, hidden_dim, layer_dim].
models = {
        "RNN": [nn.RNN, 32, 128, 5],
        "LSTM": [nn.LSTM, 32, 64, 1],
        "GRU": [nn.GRU, 32, 64, 1]
    }
# Train a fresh LSTM-based model with plain SGD and cross-entropy loss.
model = Model(*models["LSTM"])
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)
training = Training(model, loss_fn, optimizer)
training.train(train, test)

Epoch: 1, loss: 46.1131, acc: 0.1033 | test loss: 9.2165, test acc: 0.1038 | 0.64 sec.
Epoch: 10, loss: 45.9358, acc: 0.1525 | test loss: 9.1825, test acc: 0.1565 | 0.38 sec.
Epoch: 20, loss: 45.8148, acc: 0.1900 | test loss: 9.1580, test acc: 0.1953 | 0.39 sec.
Epoch: 30, loss: 45.7207, acc: 0.1863 | test loss: 9.1389, test acc: 0.1909 | 0.40 sec.
Epoch: 40, loss: 45.6436, acc: 0.1873 | test loss: 9.1233, test acc: 0.1930 | 0.37 sec.
Epoch: 50, loss: 45.5826, acc: 0.1864 | test loss: 9.1105, test acc: 0.1925 | 0.43 sec.
Epoch: 60, loss: 45.5322, acc: 0.1863 | test loss: 9.0997, test acc: 0.1923 | 0.42 sec.
Epoch: 70, loss: 45.4903, acc: 0.1858 | test loss: 9.0911, test acc: 0.1920 | 0.37 sec.
Epoch: 80, loss: 45.4434, acc: 0.1866 | test loss: 9.0828, test acc: 0.1920 | 0.42 sec.
Epoch: 90, loss: 45.4000, acc: 0.1875 | test loss: 9.0751, test acc: 0.1922 | 0.39 sec.
Epoch: 100, loss: 45.3793, acc: 0.1863 | test loss: 9.0681, test acc: 0.1922 | 0.36 sec.
Epoch: 110, loss: 45.3451, acc: 

### RNN

In [10]:
# Train a fresh RNN-based model; relies on `models`, `train` and `test`
# defined in earlier cells. Rebinds `model`, `loss_fn`, `optimizer`, `training`.
model = Model(*models["RNN"])
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)
training = Training(model, loss_fn, optimizer)
training.train(train, test)

Epoch: 1, loss: 46.1116, acc: 0.1002 | test loss: 9.2159, test acc: 0.0982 | 1.67 sec.
Epoch: 10, loss: 45.9807, acc: 0.1441 | test loss: 9.1943, test acc: 0.1479 | 1.53 sec.
Epoch: 20, loss: 45.8853, acc: 0.1651 | test loss: 9.1744, test acc: 0.1661 | 1.44 sec.
Epoch: 30, loss: 45.7732, acc: 0.1770 | test loss: 9.1502, test acc: 0.1834 | 1.54 sec.
Epoch: 40, loss: 45.6413, acc: 0.1800 | test loss: 9.1237, test acc: 0.1844 | 1.44 sec.
Epoch: 50, loss: 45.5102, acc: 0.1876 | test loss: 9.0965, test acc: 0.1920 | 1.35 sec.
Epoch: 60, loss: 45.3080, acc: 0.1865 | test loss: 9.0552, test acc: 0.1916 | 1.77 sec.
Epoch: 70, loss: 44.6807, acc: 0.1861 | test loss: 8.9179, test acc: 0.1911 | 1.44 sec.
Epoch: 80, loss: 43.1533, acc: 0.1877 | test loss: 8.5994, test acc: 0.1906 | 1.45 sec.
Epoch: 90, loss: 41.5113, acc: 0.1974 | test loss: 8.2725, test acc: 0.2078 | 1.40 sec.
Epoch: 100, loss: 40.3291, acc: 0.2088 | test loss: 8.0385, test acc: 0.2217 | 1.85 sec.
Epoch: 110, loss: 39.1934, acc: 

KeyboardInterrupt: 

### GRU

In [11]:
# Train a fresh GRU-based model; relies on `models`, `train` and `test`
# defined in earlier cells. Rebinds `model`, `loss_fn`, `optimizer`, `training`.
model = Model(*models["GRU"])
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)
training = Training(model, loss_fn, optimizer)
training.train(train, test)

Epoch: 1, loss: 46.1799, acc: 0.1099 | test loss: 9.2206, test acc: 0.1086 | 0.58 sec.
Epoch: 10, loss: 45.6938, acc: 0.1840 | test loss: 9.1323, test acc: 0.1885 | 0.59 sec.
Epoch: 20, loss: 45.5443, acc: 0.1862 | test loss: 9.1007, test acc: 0.1921 | 0.50 sec.
Epoch: 30, loss: 45.4809, acc: 0.1857 | test loss: 9.0870, test acc: 0.1919 | 0.57 sec.
Epoch: 40, loss: 45.4240, acc: 0.1867 | test loss: 9.0777, test acc: 0.1918 | 0.49 sec.
Epoch: 50, loss: 45.3918, acc: 0.1861 | test loss: 9.0701, test acc: 0.1917 | 0.68 sec.
Epoch: 60, loss: 45.3481, acc: 0.1864 | test loss: 9.0617, test acc: 0.1920 | 0.49 sec.
Epoch: 70, loss: 45.3105, acc: 0.1862 | test loss: 9.0533, test acc: 0.1919 | 0.53 sec.
Epoch: 80, loss: 45.2502, acc: 0.1869 | test loss: 9.0438, test acc: 0.1920 | 0.51 sec.
Epoch: 90, loss: 45.1976, acc: 0.1865 | test loss: 9.0313, test acc: 0.1923 | 0.49 sec.
Epoch: 100, loss: 45.1216, acc: 0.1868 | test loss: 9.0175, test acc: 0.1920 | 0.47 sec.
Epoch: 110, loss: 45.0297, acc: 

In [12]:
# Sanity check on one fresh sequence. `model` is whichever network the most
# recently executed training cell created.
x, y = generate_sequences()

x = x.to(device)
y = y.to(device)

# Inference only: switch to eval mode, skip autograd bookkeeping, and call the
# module itself (not .forward) so registered hooks run.
model.eval()
with torch.no_grad():
    out = model(x).view(-1, INPUT_DIM).argmax(dim=1)

print(f"Original sentence is : {x}")
print("-" * 100)
print(f"Encrypted sentence is : {y}")
print("-" * 100)
print(f"Predicted sentence is : {out}")

Original sentence is : tensor([[4, 0, 6, 0, 9, 5, 8, 6, 5, 3]])
----------------------------------------------------------------------------------------------------
Encrypted sentence is : tensor([[4, 4, 0, 4, 3, 9, 2, 0, 9, 7]])
----------------------------------------------------------------------------------------------------
Predicted sentence is : tensor([4, 4, 0, 4, 3, 9, 2, 0, 9, 7])
