Task: Train a neural network to decrypt text encoded with the Caesar cipher.

The essence of the Caesar cipher is as follows: each letter of the alphabet is replaced by another letter that is shifted a fixed number of positions forward or backward in the alphabet.

In this case, the encryption process involves the following steps:

A key (shift) is chosen, which determines how many positions each letter will be shifted.

Each letter of the original text is replaced with the one that is located the chosen number of positions ahead in the alphabet.
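As a minimal sketch of these two steps, the shift can be written as modular arithmetic on a letter's position in the alphabet, and decryption is the same operation with the negated key. The helper name `shift_letter` and the sample string are illustrative and not part of the notebook.

```python
import string

ALPHABET = string.ascii_lowercase  # the 26 letters 'a'..'z'

def shift_letter(letter: str, key: int) -> str:
    """Shift one lowercase letter by `key` positions, wrapping around at 'z'."""
    position = ALPHABET.index(letter)
    return ALPHABET[(position + key) % len(ALPHABET)]

key = 3
plain = "xyz"
cipher = "".join(shift_letter(ch, key) for ch in plain)
restored = "".join(shift_letter(ch, -key) for ch in cipher)

print(cipher)    # abc
print(restored)  # xyz
```

The modulo wraps letters at the end of the alphabet back to the start, which is why 'x' shifted by 3 becomes 'a'.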


For training the neural network, we will use the text from the practical session (nietzsche.txt). We load it, convert it to lowercase, replace every character other than the letters a-z and the space with a space, collapse repeated whitespace, and slice the result into chunks of 60 characters to obtain approximately 10,000 training examples.

In [25]:
!wget https://s3.amazonaws.com/text-datasets/nietzsche.txt

--2025-05-09 17:36:15--  https://s3.amazonaws.com/text-datasets/nietzsche.txt
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.179.61, 52.217.19.38, 52.217.122.96, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.179.61|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 600901 (587K) [text/plain]
Saving to: ‘nietzsche.txt.1’


2025-05-09 17:36:18 (397 KB/s) - ‘nietzsche.txt.1’ saved [600901/600901]



In [27]:
import torch
from torch import nn
import re
import random
import tqdm
import time
from torch.utils.data import DataLoader, Dataset

with open('nietzsche.txt', encoding='utf-8') as f:
    text = f.read()

print('Original length:', len(text))

def preprocess_text(text):
    text = text.lower()
    text = re.sub('[^a-z ]', ' ', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text
    
text = preprocess_text(text)
print('Processed length:', len(text))

# Trim the text so its length is a multiple of the chunk size; otherwise the last slice would be shorter.
example_length = 60
text = text[:(len(text) // example_length) * example_length]

# Create slices
examples = [text[i:i + example_length] for i in range(0, len(text), example_length)]

# Keep only full-length slices (a safeguard: after trimming, every slice already has example_length characters)
examples = [example for example in examples if len(example) == example_length]

print(f"Processed examples: {len(examples)}")
print(f"Example length: {len(examples[0]) if examples else 'No examples'}")

Original length: 600893
Processed length: 579262
Processed examples: 9654
Example length: 60


In [29]:
print(text)



In [31]:
print(examples)



Let's create an encryption function and encrypt each example with a random shift between 1 and 10.

In [36]:
import random

def caesar_cipher(text, shift):
    encrypted_text = ''
    for char in text:
        if char.isalpha():
            if char.islower():
                new_char = chr(((ord(char) - ord('a') + shift) % 26) + ord('a'))
            else:
                new_char = chr(((ord(char) - ord('A') + shift) % 26) + ord('A'))
        else:
            new_char = char
        
        encrypted_text += new_char

    return encrypted_text

encrypted_examples = [caesar_cipher(example, random.randint(1, 10)) for example in examples]
print(f"Original: {examples[1:3]}")
print(f"Encrypted: {encrypted_examples[1:3]}")


Original: ['ot ground for suspecting that all philosophers in so far as ', 'they have been dogmatists have failed to understand women th']
Encrypted: ['xc paxdwm oxa bdbynlcrwp cqjc juu yqruxbxyqnab rw bx oja jb ', 'bpmg pidm jmmv lwouibqaba pidm niqtml bw cvlmzabivl ewumv bp']


Let's pair each encrypted example with its original in a single list of tuples.

In [40]:
data = list(zip(encrypted_examples, examples))
print(data)



Let's build the alphabet as the sorted set of unique characters in the text: the space gets index 0 and the letters a-z get indices 1-26. Then we define functions to convert characters to indices and back. From these indices we create tensors, since the model works with numerical representations.

In [44]:
chars = sorted(list(set(text)))
char_to_idx = {ch: i for i, ch in enumerate(chars)}
idx_to_char = {i: ch for i, ch in enumerate(chars)}

def text_to_tensor(text, char_to_idx):
    return torch.tensor([char_to_idx[ch] for ch in text], dtype=torch.long)

def tensor_to_text(tensor, idx_to_char):
    return ''.join([idx_to_char[idx.item()] for idx in tensor])

data_tensors = [(text_to_tensor(enc, char_to_idx), text_to_tensor(dec, char_to_idx)) for enc, dec in data]

In [46]:
print(chars)
print(char_to_idx)
print(idx_to_char)

[' ', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
{' ': 0, 'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6, 'g': 7, 'h': 8, 'i': 9, 'j': 10, 'k': 11, 'l': 12, 'm': 13, 'n': 14, 'o': 15, 'p': 16, 'q': 17, 'r': 18, 's': 19, 't': 20, 'u': 21, 'v': 22, 'w': 23, 'x': 24, 'y': 25, 'z': 26}
{0: ' ', 1: 'a', 2: 'b', 3: 'c', 4: 'd', 5: 'e', 6: 'f', 7: 'g', 8: 'h', 9: 'i', 10: 'j', 11: 'k', 12: 'l', 13: 'm', 14: 'n', 15: 'o', 16: 'p', 17: 'q', 18: 'r', 19: 's', 20: 't', 21: 'u', 22: 'v', 23: 'w', 24: 'x', 25: 'y', 26: 'z'}


Let's create a CaesarDataset class that wraps the list of tensor pairs so it can be fed to a DataLoader, which expects a dataset object implementing __len__ and __getitem__.

In [50]:
class CaesarDataset(Dataset):
    def __init__(self, data):
        self.data = data
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx]

dataset = CaesarDataset(data_tensors)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

Creating an LSTM Network Architecture

nn.Embedding — a layer for converting input character indices into dense vectors of fixed size (hidden_size).

nn.LSTM — the LSTM layer itself, which will process the sequence by passing it through hidden states. It takes an input tensor of shape [batch_size, seq_length, hidden_size] and returns a sequence of hidden states.

nn.Linear — a linear layer that converts the LSTM hidden state into output data of size output_size.

Forward Pass:

h0 and c0 — initial hidden states and cell states of the LSTM. For each batch, zero vectors are created corresponding to the number of LSTM layers.

embedding(x) — the input sequence of characters (as indices) is converted into dense vectors of size hidden_size.

lstm(out, (h0, c0)) — the LSTM processes the input sequence, updating the hidden states at each step. It returns the hidden state for every time step (used below) together with the final hidden and cell states (discarded here).

fc(out) — the linear layer converts the output hidden states of the LSTM into the result of the desired output_size.
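The shapes described above can be traced with a small standalone sketch. The sizes here are deliberately tiny and illustrative (the notebook uses hidden_size=512), and dropout is omitted. Leaving out the initial states is equivalent to passing zero tensors, which is the documented default of `nn.LSTM`.

```python
import torch
from torch import nn

# Illustrative sizes only; the alphabet has 27 symbols (space plus a-z).
vocab_size, hidden_size, num_layers = 27, 16, 2
batch_size, seq_length = 4, 60

embedding = nn.Embedding(vocab_size, hidden_size)
lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
fc = nn.Linear(hidden_size, vocab_size)

x = torch.randint(0, vocab_size, (batch_size, seq_length))  # character indices
emb = embedding(x)                # [batch_size, seq_length, hidden_size]
out, (h_n, c_n) = lstm(emb)       # initial states default to zeros when omitted
logits = fc(out)                  # [batch_size, seq_length, vocab_size]
predicted = logits.argmax(dim=2)  # [batch_size, seq_length], one index per character

print(emb.shape, out.shape, h_n.shape, logits.shape, predicted.shape)
```

Because the linear layer is applied to every time step, the network emits one prediction per input character, which is what a character-for-character substitution cipher needs.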

In [54]:
class CaesarEndecoder(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=2):
        super(CaesarEndecoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, dropout=0.2, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        out = self.embedding(x)
        out, _ = self.lstm(out, (h0, c0))
        out = self.fc(out)
        return out


Let's also create a decryption function that will use our model

In [57]:
def decode_text(model, tensor, device):
    model.eval()
    with torch.no_grad():
        tensor = tensor.unsqueeze(0).to(device)
        output = model(tensor)
        _, decoded = torch.max(output, 2)
    return decoded.squeeze(0)  # drop only the batch dimension, even for one-character inputs

In [59]:
import os
torch.cuda.empty_cache()
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

Starting the Training.

We will define the loss function (cross-entropy) and the optimizer, then iterate over the batches from the DataLoader. Here enc is the encrypted text that we feed into the model, and dec is the original, unchanged text, i.e., the expected output from the model. The loop also trains the reverse direction (original text in, encrypted text out) and sums the two losses.

In [63]:


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = CaesarEndecoder(input_size=len(chars), hidden_size=512, output_size=len(chars)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

num_epochs = 50
for epoch in range(num_epochs):
    total_loss = 0

    for enc, dec in dataloader:
        enc, dec = enc.to(device), dec.to(device)

        optimizer.zero_grad()

        output_dec = model(enc)
        loss_dec = criterion(output_dec.view(-1, len(chars)), dec.view(-1))

        output_enc = model(dec)
        loss_enc = criterion(output_enc.view(-1, len(chars)), enc.view(-1))

        loss = loss_dec + loss_enc
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')

print("Training is completed!")


Epoch [1/50], Loss: 5.1345
Epoch [2/50], Loss: 3.6742
Epoch [3/50], Loss: 3.1004
Epoch [4/50], Loss: 2.8142
Epoch [5/50], Loss: 2.6021
Epoch [6/50], Loss: 2.4356
Epoch [7/50], Loss: 2.3192
Epoch [8/50], Loss: 2.2435
Epoch [9/50], Loss: 2.1957
Epoch [10/50], Loss: 2.1609
Epoch [11/50], Loss: 2.1357
Epoch [12/50], Loss: 2.1176
Epoch [13/50], Loss: 2.1035
Epoch [14/50], Loss: 2.0927
Epoch [15/50], Loss: 2.0829
Epoch [16/50], Loss: 2.0806
Epoch [17/50], Loss: 2.0689
Epoch [18/50], Loss: 2.0630
Epoch [19/50], Loss: 2.0581
Epoch [20/50], Loss: 2.0534
Epoch [21/50], Loss: 2.0489
Epoch [22/50], Loss: 2.0456
Epoch [23/50], Loss: 2.0417
Epoch [24/50], Loss: 2.0389
Epoch [25/50], Loss: 2.0353
Epoch [26/50], Loss: 2.0330
Epoch [27/50], Loss: 2.0305
Epoch [28/50], Loss: 2.0275
Epoch [29/50], Loss: 2.0244
Epoch [30/50], Loss: 2.0281
Epoch [31/50], Loss: 2.0206
Epoch [32/50], Loss: 2.0171
Epoch [33/50], Loss: 2.0141
Epoch [34/50], Loss: 2.0122
Epoch [35/50], Loss: 2.0087
Epoch [36/50], Loss: 2.0056
E

Increasing the number of epochs to 50 improved accuracy by about 1%, which can be significant for text, where a single wrong character changes a word.

We can see that the loss is decreasing. However, to assess the model's performance we still need to test it on a specific phrase.
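One plausible reason the loss levels off near 2.0 instead of approaching zero: the training loop also asks the model to predict the encrypted text from the original, but the shift is drawn at random from 1 to 10 and the original text does not reveal it. For that direction the best possible prediction spreads probability evenly over the 10 candidate letters, which costs ln(10) per letter, while spaces cost nothing. The sketch below computes this floor. The function name and the sample sentence are illustrative, and the calculation assumes uniformly distributed shifts.

```python
import math

def encryption_loss_floor(text: str, num_shifts: int) -> float:
    """Lowest achievable average cross-entropy (nats per character) when every
    letter has `num_shifts` equally likely encrypted forms and spaces are unchanged."""
    letters = sum(ch != " " for ch in text)
    return (letters / len(text)) * math.log(num_shifts)

sample = "the quick brown fox jumps over the lazy dog"
print(f"{encryption_loss_floor(sample, num_shifts=10):.2f}")
```

If roughly 80-85% of the characters in the corpus are letters (an assumption, not measured here), the floor is about 1.8 to 2.0, so most of the remaining loss would come from the encryption direction rather than from decryption errors.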

Let's preprocess the test text

In [66]:
test_text = """He who seeks to go downward
will be quickly swallowed by the abyss of this place!
Only you, Zarathustra,
love to grow on the slope
of the towering pine above it!
Her roots reach down
where the rock itself
looks in horror into the abyss;
she clings to it
where all falls downward;
where impatience reigns,
of the falling stones and the waterfall,
she, the patient one,
stands firm, silent, and alone"""

test_text = preprocess_text(test_text)
test_text_enc = caesar_cipher(test_text, random.randint(1, 5))
print(test_text)


he who seeks to go downward will be quickly swallowed by the abyss of this place only you zarathustra love to grow on the slope of the towering pine above it her roots reach down where the rock itself looks in horror into the abyss she clings to it where all falls downward where impatience reigns of the falling stones and the waterfall she the patient one stands firm silent and alone


Decode it

In [69]:
test_tensor = text_to_tensor(test_text_enc, char_to_idx).to(device)

decoded_tensor = decode_text(model, test_tensor, device)
decoded_text = tensor_to_text(decoded_tensor, idx_to_char)

print("Original text:", test_text)
print("\n" + "-"*50 + "\n") 
print("Decoded text:", decoded_text)




Original text: he who seeks to go downward will be quickly swallowed by the abyss of this place only you zarathustra love to grow on the slope of the towering pine above it her roots reach down where the rock itself looks in horror into the abyss she clings to it where all falls downward where impatience reigns of the falling stones and the waterfall she the patient one stands firm silent and alone

--------------------------------------------------

Decoded text: ee who seeks to go downward will be quickly swallowed by the abyss of this place only you yarathustra love to grow on the slope of the towering pine above it her roots reach down where the rock itself looks in horror into the abyss she clings to it where all falls downward where impatience reigns of the falling stones and the waterfall she the patient one stands firm silent and alone


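It works correctly, though not perfectly: the first character was decoded as 'e' instead of 'h', and 'zarathustra' came out as 'yarathustra'.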

Let's take one more example

In [71]:
test_text2 = '''Here, where an island rises between two seas,
Amidst a chaos of rocks, like a rugged altar,
It is here, under the blackened skies,
That Zarathustra kindles his towering fires—
Fiery signals for shipwrecked sailors,
Question marks for those who know the answer…

This flame, with its ashen-gray womb,
Its tongues reach into the cold distance,
Its neck stretches ever higher—
A serpent, impatiently uncoiling skyward.
With these markers, I stake my place.

For my soul is this very flame,
Insatiably yearning outward and upward,
Upward and upward in its silent heat.

Why did Zarathustra flee from man and beast?
What did he escape, abandoning solid ground?
One by one, he has known six solitudes—
The sea alone was not lonely enough.
The island cast him ashore—he climbed its peak and became flame.
He has stopped at the seventh solitude—
Catching his own forehead on a fisherman’s hook.

Shipwrecked sailors!
Ruined constellations!
Seas of the future! Skies uninhabited by knowledge!
Now I shall cast my line for all that is lonely.

Answer me, I call upon you, impatient flame—
Answer me, catch me, the fisherman upon the mountain’s peak—
For my seventh solitude shall be my last!'''

test_text2 = preprocess_text(test_text2)
test_text_enc2 = caesar_cipher(test_text2, random.randint(1, 5))
print(test_text2)


here where an island rises between two seas amidst a chaos of rocks like a rugged altar it is here under the blackened skies that zarathustra kindles his towering fires fiery signals for shipwrecked sailors question marks for those who know the answer this flame with its ashen gray womb its tongues reach into the cold distance its neck stretches ever higher a serpent impatiently uncoiling skyward with these markers i stake my place for my soul is this very flame insatiably yearning outward and upward upward and upward in its silent heat why did zarathustra flee from man and beast what did he escape abandoning solid ground one by one he has known six solitudes the sea alone was not lonely enough the island cast him ashore he climbed its peak and became flame he has stopped at the seventh solitude catching his own forehead on a fisherman s hook shipwrecked sailors ruined constellations seas of the future skies uninhabited by knowledge now i shall cast my line for all that is lonely answe

In [73]:
test_tensor2 = text_to_tensor(test_text_enc2, char_to_idx).to(device)

decoded_tensor2 = decode_text(model, test_tensor2, device)
decoded_text2 = tensor_to_text(decoded_tensor2, idx_to_char)

print("Original text:", test_text2)
print("\n" + "-"*50 + "\n") 
print("Decoded text:", decoded_text2)


Original text: here where an island rises between two seas amidst a chaos of rocks like a rugged altar it is here under the blackened skies that zarathustra kindles his towering fires fiery signals for shipwrecked sailors question marks for those who know the answer this flame with its ashen gray womb its tongues reach into the cold distance its neck stretches ever higher a serpent impatiently uncoiling skyward with these markers i stake my place for my soul is this very flame insatiably yearning outward and upward upward and upward in its silent heat why did zarathustra flee from man and beast what did he escape abandoning solid ground one by one he has known six solitudes the sea alone was not lonely enough the island cast him ashore he climbed its peak and became flame he has stopped at the seventh solitude catching his own forehead on a fisherman s hook shipwrecked sailors ruined constellations seas of the future skies uninhabited by knowledge now i shall cast my line for all that 

Thus, the model performs well, and it makes sense to calculate its accuracy on a larger dataset. Let's evaluate it on the text of Frankenstein from Project Gutenberg.

In [75]:
!wget https://www.gutenberg.org/files/84/84-0.txt


--2025-05-09 17:44:43--  https://www.gutenberg.org/files/84/84-0.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 2610:28:3090:3000:0:bad:cafe:47, 152.19.134.47
Connecting to www.gutenberg.org (www.gutenberg.org)|2610:28:3090:3000:0:bad:cafe:47|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 428995 (419K) [text/plain]
Saving to: ‘84-0.txt.1’


2025-05-09 17:44:45 (345 KB/s) - ‘84-0.txt.1’ saved [428995/428995]



In [77]:
with open('84-0.txt', encoding='utf-8') as f:
    text = f.read()

print('Original length:', len(text))

text = preprocess_text(text)
print('Processed length:', len(text))

example_length = 60
text = text[:(len(text) // example_length) * example_length]
test_examples = [text[i:i + example_length] for i in range(0, len(text), example_length)]

test_examples = [example for example in test_examples if len(example) == example_length]

print(f"Processed examples: {len(test_examples)}")
print(f"Example length: {len(test_examples[0]) if test_examples else 'No examples'}")

encrypted_test_examples = [caesar_cipher(example, random.randint(1, 10)) for example in test_examples]
print(f"Original: {test_examples[0]}")
print(f"Encrypted: {encrypted_test_examples[0]}")

test_examples_tensor = [text_to_tensor(example, char_to_idx) for example in encrypted_test_examples]

test_dataset = CaesarDataset(test_examples_tensor)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)

print(f"Test examples: {len(test_examples_tensor)}")
print(f"Example length: {len(test_examples_tensor[0]) if test_examples_tensor else 'No examples'}")




Original length: 419434
Processed length: 407791
Processed examples: 6796
Example length: 60
Original: start of the project gutenberg ebook frankenstein or the mod
Encrypted: abizb wn bpm xzwrmkb ocbmvjmzo mjwws nzivsmvabmqv wz bpm uwl
Test examples: 6796
Example length: 60


There was a further issue with shuffling: with shuffle=True in the DataLoader, the indices were lost, and it became difficult to match each decoded example to its original. At the same time, the goal was to keep shuffling so that consecutive examples were not related to each other. Therefore, the examples are shuffled once up front with a fixed index permutation, and the DataLoader is then run with shuffle=False.
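An alternative sketch, not what this notebook does: keep each encrypted example paired with its original, as `data` does for training, so that any shuffle moves both together and no index bookkeeping is needed. The helper name `shuffle_pairs` and the toy strings are illustrative.

```python
import random

def shuffle_pairs(encrypted, originals, seed=0):
    """Shuffle (encrypted, original) pairs together so they stay aligned."""
    pairs = list(zip(encrypted, originals))
    random.Random(seed).shuffle(pairs)
    return pairs

originals = ["abc", "def", "ghi"]
encrypted = ["bcd", "efg", "hij"]  # each original shifted by 1

for enc, orig in shuffle_pairs(encrypted, originals):
    print(enc, "->", orig)
```

The same idea carries over to tensors: a dataset that returns (encrypted, original) pairs can be loaded with shuffle=True, as in the training loop, and each batch stays aligned.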

In [80]:
import random

# Create indices and shuffle them
indices = list(range(len(test_examples)))
random.shuffle(indices)

# Shuffle original and encrypted texts in the same way
shuffled_test_examples = [test_examples[i] for i in indices]
shuffled_encrypted_examples = [encrypted_test_examples[i] for i in indices]

# Convert encrypted examples to tensors
test_examples_tensor = [text_to_tensor(example, char_to_idx) for example in shuffled_encrypted_examples]

# Create dataset and DataLoader (shuffle=False to keep the order!)
test_dataset = CaesarDataset(test_examples_tensor)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

def calculate_accuracy(model, test_loader, device, idx_to_char, shuffled_originals, indices):
    model.eval()  # Set the model to evaluation mode

    correct = 0
    total = 0

    # The loader iterates in the already-shuffled order (shuffle=False), so the
    # reference texts are used in that same order; `indices` is not needed here
    # and is kept only so the call signature stays unchanged.
    restored_originals = list(shuffled_originals)

    with torch.no_grad():
        batch_size = test_loader.batch_size
        for batch_idx, data in enumerate(test_loader):
            inputs = data.to(device)

            outputs = model(inputs)
            _, predicted = torch.max(outputs, 2)

            for i in range(inputs.shape[0]):  # Loop through all examples in the batch
                index = batch_idx * batch_size + i
                if index >= len(restored_originals):  # To avoid out of bounds
                    continue
                
                original_text = restored_originals[index]  # Get the original text
                predicted_text = ''.join([idx_to_char[idx.item()] for idx in predicted[i]])

                print(f"Original:   {original_text}")
                print(f"Decoded:    {predicted_text}")
                print("-" * 50)

                # Count matching characters
                correct += sum(1 for orig, pred in zip(original_text, predicted_text) if orig == pred)
                total += len(original_text)

    accuracy = correct / total
    return accuracy

test_accuracy = calculate_accuracy(model, test_loader, device, idx_to_char, shuffled_test_examples, indices)


Original:   february we accordingly determined to commence our journey t
Decoded:    edartary we accordingly determined to commence our journey t
--------------------------------------------------
Original:   tenderly and sincerely i never saw any woman who excited as 
Decoded:    tenderly and sincerely i never saw any woman who excited as 
--------------------------------------------------
Original:   ed them felix soon learned that the treacherous turk for who
Decoded:    ee them felix soon learned that the treacherous turk for who
--------------------------------------------------
Original:   s the nearer i approached to your habitation the more deeply
Decoded:    t the nearer i approached to your habitation the more deeply
--------------------------------------------------
Original:   eel that burning hatred and ardent desire of revenge i once 
Decoded:    eul that burning hatred and ardent desire of revenge i once 
--------------------------------------------------
Original:    th

In [82]:
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")

Test Accuracy: 95.88%
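The figure above is a character-level accuracy: the share of positions where the decoded character equals the original. A minimal standalone sketch of the same metric on plain strings, with an illustrative function name and two short sample pairs:

```python
def char_accuracy(originals, decoded):
    """Fraction of positions where the decoded character equals the original."""
    correct = 0
    total = 0
    for orig, pred in zip(originals, decoded):
        correct += sum(o == p for o, p in zip(orig, pred))
        total += len(orig)
    return correct / total if total else 0.0

originals = ["ed them felix", "tenderly and"]
decoded = ["ee them felix", "tenderly and"]
print(f"{char_accuracy(originals, decoded) * 100:.2f}%")  # 96.00%
```

One wrong character out of 25 gives 96%, which shows how a score in the mid-90s corresponds to an error every few words.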


**Conclusion**

The model works well: it reaches 95.88% character-level accuracy on the test text. It decrypts without being told the shift, which a conventional decryption function such as caesar_cipher requires as an argument. This is a clear advantage of the model, despite its more complex implementation.
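For comparison, a non-neural baseline can also recover an unknown shift by trying all 26 candidates and keeping the one whose output looks most like English. The sketch below is an assumption-laden illustration, not part of the original experiment: the function names are hypothetical and the letter weights are approximate English frequencies.

```python
import string

ALPHABET = string.ascii_lowercase
# Approximate relative frequencies of common English letters (assumed values).
COMMON = {"e": 12.7, "t": 9.1, "a": 8.2, "o": 7.5, "i": 7.0,
          "n": 6.7, "s": 6.3, "h": 6.1, "r": 6.0}

def shift_text(text, key):
    """Shift every lowercase letter by `key`; leave other characters unchanged."""
    return "".join(
        ALPHABET[(ALPHABET.index(ch) + key) % 26] if ch in ALPHABET else ch
        for ch in text
    )

def guess_shift(ciphertext):
    """Return the shift whose decryption scores highest on common English letters."""
    def score(key):
        return sum(COMMON.get(ch, 0.0) for ch in shift_text(ciphertext, -key))
    return max(range(26), key=score)

plain = "they have been dogmatists have failed to understand women th"
cipher = shift_text(plain, 8)
key = guess_shift(cipher)
print(key, shift_text(cipher, -key))
```

Such a baseline gives a useful reference point when judging how much the neural model adds.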