# Preamble

Using a simpler LSTM model based on the one described in [Learning the Enigma with Recurrent Neural Networks by Sam Greydanus](https://doi.org/10.48550/arXiv.1708.07576), we achieve an effective Seq2Seq translation from Caesar-cipher-encrypted text back to the unencrypted text.

## Variables

In [None]:
# Special token ids. The example cell at the end of this notebook decodes id 0
# as a space and ids 1-26 as 'a'-'z', so the special tokens use the ids above that.
PAD_TOKEN = 27  # passed as ignore_index to the loss, so these positions add no loss
SOS_TOKEN = 29  # prepended to the input sequence in the example cell
EOS_TOKEN = 28  # appended to the input sequence; decoding stops when it is predicted

In [None]:
# Unpacked as the two positional arguments after `encryption` in
# lstm_data.initialise(); presumably [number of samples, characters per sample]
# -- TODO confirm against modules.data_handling.lstm_data.
DATA_AMOUNT = [50000, 150]
# Forwarded as `fixed_length=`; five times the second DATA_AMOUNT entry (750).
FIXED_LENGTH = int(DATA_AMOUNT[1]*5)
# Forwarded positionally to lstm_data.initialise(); presumably the training
# fraction of the data -- TODO confirm.
TRAIN_SPLIT = 0.8
# Forwarded as `stream=` to lstm_data.initialise().
STREAM = False

In [None]:
# Hyperparameters used by the cells below.
BATCH_SIZE = 64       # forwarded to lstm_data.data2loader()
EMBEDDING_DIM = 256   # default embedding width of Seq2Seq
HIDDEN_DIM = 300      # default LSTM hidden size of Seq2Seq
DROPOUT = 0.5         # default dropout probability of Seq2Seq
VOCAB_SIZE = 30       # default embedding rows / output logits of Seq2Seq (ids 0-29)
NUM_EPOCHS = 5        # number of iterations of the training loop
LR = 0.001            # learning rate given to Adam
CLIP = 1.0            # max gradient norm given to clip_grad_norm_ in train_fn

## Imports

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from modules.data_handling import get_text
from modules.data_handling import lstm_data
from modules.encryption import caesar
from modules.encryption import substitution
from modules.encryption import enigma_encryptor as enigma



from tqdm.autonotebook import tqdm
import random
import os

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
# Prefer the GPU whenever CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
device

## Creating Data

In [None]:
# Select the cipher used to generate the training pairs; exactly one of the
# three lines below should be active.
# NOTE(review): the Enigma encryptor is active here, while the preamble and the
# example cell near the end of the notebook use the Caesar cipher -- confirm
# which one is intended for this run.
# encryption = lambda x: caesar.rand_encrypt(x)
# encryption = lambda x: substitution.rand_encrypt(x)
encryption = lambda x: enigma.rand_encrypt(x)


# Build the train/test datasets and wrap them in loaders that yield (X, y)
# batches, as consumed by train_fn / evaluate_fn below.
trainData, testData = lstm_data.initialise(encryption, *DATA_AMOUNT, TRAIN_SPLIT, stream=STREAM, fixed_length=FIXED_LENGTH )
train_loader, test_loader = lstm_data.data2loader(trainData, testData, BATCH_SIZE=BATCH_SIZE)

# Model


The model used borrows heavily from the [PyTorch Seq2Seq tutorial](https://github.com/bentrevett/pytorch-seq2seq/blob/main/1%20-%20Sequence%20to%20Sequence%20Learning%20with%20Neural%20Networks.ipynb) and from a paper on exactly this topic, to which I am very grateful: [Learning the Enigma with Recurrent Neural Networks by Sam Greydanus](https://doi.org/10.48550/arXiv.1708.07576).

In [None]:
class Seq2Seq(nn.Module):
    """Per-position sequence tagger: embedding -> dropout -> LSTM -> linear.

    Every input token id is mapped to a vector of logits over the same
    vocabulary, so the output sequence has the same length as the input.

    Args:
        input_dim: Vocabulary size; used for both the embedding table and the
            number of output logits.
        embedding_dim: Width of the token embeddings.
        hidden_dim: Hidden size of the LSTM.
        dropout: Dropout probability applied to the embeddings and, when
            ``num_layers > 1``, between stacked LSTM layers.
        num_layers: Number of stacked LSTM layers (default 1, which matches
            the original single-layer model and its state-dict layout).
    """

    def __init__(self, input_dim=VOCAB_SIZE, embedding_dim=EMBEDDING_DIM, hidden_dim=HIDDEN_DIM, dropout=DROPOUT, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        # nn.LSTM applies its dropout only *between* stacked layers. Passing a
        # non-zero value to a single-layer LSTM has no effect and makes PyTorch
        # emit a UserWarning, so it is only forwarded when layers are stacked.
        self.rnn = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0.0,
        )
        self.dropout = nn.Dropout(dropout)
        self.fc_out = nn.Linear(hidden_dim, input_dim)

    def forward(self, x):
        """Return logits for every position of ``x``.

        The LSTM uses the default ``batch_first=False`` layout, so ``x`` is
        expected as token ids shaped (seq_len, batch); the result is shaped
        (seq_len, batch, input_dim).
        """
        embedded = self.dropout(self.embedding(x))
        # The final hidden/cell states are not needed: predictions are made
        # from the per-step outputs.
        outputs, _ = self.rnn(embedded)
        return self.fc_out(outputs)

In [None]:
# Instantiate the network with its default hyperparameters on the chosen device.
model = Seq2Seq().to(device)
# torch.compile is skipped on Windows (os.name == "nt").
if os.name != "nt":
    model = torch.compile(model)
    print("compiled")
# Bare expression so the notebook displays the module structure.
model

In [None]:
def init_weights(m):
    for name, param in m.named_parameters():
        nn.init.uniform_(param.data, -0.08, 0.08)


# Module.apply calls init_weights on every sub-module and on the model itself,
# re-initialising all weights before training starts.
model.apply(init_weights)

# Training

In [None]:
def count_parameters(model):
    """Return the total element count of all parameters that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Report the model size (with thousands separators) before training.
print(f"The model has {count_parameters(model):,} trainable parameters")

In [None]:
# Adam over all model parameters with the configured learning rate.
optimiser = optim.Adam(model.parameters(), lr=LR)
# Targets equal to PAD_TOKEN are excluded from the cross-entropy loss.
loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN)

In [None]:
def train_fn(model, clip, data_loader=train_loader, optimiser=optimiser, loss_fn=loss_fn,  device=device):
    """Run one training pass over ``data_loader`` and return the mean batch loss.

    Args:
        model: Network to optimise; switched to training mode.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.
        data_loader: Iterable of ``(X, y)`` batches; must support ``len()``.
        optimiser: Optimiser stepped once per batch.
        loss_fn: Criterion applied to the flattened logits and targets.
        device: Device the batches are moved to.

    Returns:
        Sum of the per-batch losses divided by the number of batches.
    """
    model.train()
    running_loss = 0
    for X, y in data_loader:
        # Swap the two axes of each batch before feeding the model.
        src = X.to(device).permute(1, 0)
        trg = y.to(device).permute(1, 0)

        optimiser.zero_grad()
        logits = model(src)
        n_classes = logits.shape[-1]

        # The first position along axis 0 is left out of the loss for both
        # predictions and targets; the rest is flattened for the criterion.
        flat_logits = logits[1:].contiguous().view(-1, n_classes)
        flat_trg = trg[1:].contiguous().view(-1)

        batch_loss = loss_fn(flat_logits, flat_trg)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimiser.step()

        running_loss += batch_loss.item()
    return running_loss / len(data_loader)

In [None]:
def evaluate_fn(model, data_loader=test_loader, loss_fn=loss_fn, device=device):
    """Compute the mean batch loss over ``data_loader`` without updating weights.

    Args:
        model: Network to evaluate; switched to eval mode.
        data_loader: Iterable of ``(X, y)`` batches; must support ``len()``.
        loss_fn: Criterion applied to the flattened logits and targets.
        device: Device the batches are moved to.

    Returns:
        Sum of the per-batch losses divided by the number of batches.
    """
    model.eval()
    running_loss = 0
    with torch.inference_mode():
        for X, y in data_loader:
            # Swap the two axes of each batch before feeding the model.
            src = X.to(device).permute(1, 0)
            trg = y.to(device).permute(1, 0)

            logits = model(src)
            n_classes = logits.shape[-1]

            # Same slicing and flattening as in training: the first position
            # along axis 0 is excluded from the loss.
            flat_logits = logits[1:].contiguous().view(-1, n_classes)
            flat_trg = trg[1:].contiguous().view(-1)

            running_loss += loss_fn(flat_logits, flat_trg).item()
    return running_loss / len(data_loader)



In [None]:
# One pass over the training data followed by one evaluation pass per epoch;
# both losses are printed after each epoch.
for epoch in tqdm(range(NUM_EPOCHS)):
    train_loss = train_fn(model=model, clip=CLIP)
    valid_loss = evaluate_fn(model)
    print(f"Train Loss: {train_loss:.3f} | Test Loss {valid_loss:.3f}")

# Example usage

In [None]:
# Decrypt a single Caesar-encrypted sentence with the trained model.
# NOTE(review): this cell encrypts with the Caesar cipher; make sure the
# `encryption` selected in the data-creation cell matches, otherwise the model
# was trained on a different cipher than the one being tested here.
model.eval()
test_string = "The quick brown fox jumps over the lazy dog"
true_key = 14
enc_text, _ = caesar.encrypt(test_string, key=true_key)
print(f"Original Text: '{test_string}'")
print(f"Encrypted Text: '{enc_text}'")

# Wrap the encoded ciphertext in SOS/EOS and add a batch axis of size 1, giving
# the (seq_len, batch) layout the model is trained with.
token_ids = [SOS_TOKEN] + get_text.string2_num_list(enc_text) + [EOS_TOKEN]
input_tensor = torch.tensor(token_ids, dtype=torch.long).unsqueeze(1).to(device)

with torch.inference_mode():
    output = model(input_tensor)

# Position 0 lines up with the SOS input token and is excluded from the loss in
# train_fn / evaluate_fn (output[1:]), so its prediction is untrained; drop it.
predicted_indexes = output.argmax(2).squeeze(1)[1:]
predicted_chars = []
for idx in predicted_indexes.tolist():
    if idx == EOS_TOKEN:
        break
    if idx in (PAD_TOKEN, SOS_TOKEN):
        # Special tokens have no character; without this guard they would be
        # rendered as '{' / '}' by the chr() arithmetic below.
        continue
    # Id 0 is the space character, ids 1-26 map to 'a'-'z'.
    predicted_chars.append(" " if idx == 0 else chr(idx - 1 + ord("a")))

predicted_text = "".join(predicted_chars)
print(f"Model Prediction (Text): {predicted_text}")

# Save the Model

In [None]:
# torch.save does not create missing directories, so make sure the target exists.
os.makedirs("models", exist_ok=True)
# When the model was wrapped by torch.compile, the original module lives under
# `_orig_mod` and the wrapper's state_dict keys carry an "_orig_mod." prefix.
# Saving the underlying module keeps the checkpoint loadable into a plain
# Seq2Seq, whether or not the model was compiled on this platform.
torch.save(obj=getattr(model, "_orig_mod", model).state_dict(), f="models/01_LSTM.pt")