In [2]:
import pandas as pd
import re
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

# 1. Caesar cipher function (shift 3 by default)
def caesar_encrypt(text, shift=3):
    result = ""
    for char in text:
        if char.isalpha():
            base = ord('a') if char.islower() else ord('A')
            result += chr((ord(char) - base + shift) % 26 + base)
        else:
            result += char
    return result

# 2. Load dataset
import csv

# df = pd.read_csv("/content/IMDB Dataset.csv", engine='python', quoting=csv.QUOTE_ALL)
# df = pd.read_csv('/content/IMDB Dataset.csv', quoting=csv.QUOTE_ALL, on_bad_lines='skip')
df = pd.read_csv(
    '/content/IMDB Dataset.csv',
    engine='python',
    quoting=csv.QUOTE_MINIMAL,
    on_bad_lines='skip'
)

df = df.drop(columns=['sentiment'])


# 3. Remove HTML tags & preprocess text
def clean_text(text):
    text = str(text).lower()
    text = re.sub(r'<.*?>', '', text)           # Remove HTML tags
    text = re.sub(r'[^a-z\s]', '', text)        # Keep only letters and spaces
    text = re.sub(r'\s+', ' ', text).strip()    # Replace multiple spaces with one
    return text

df['clean_review'] = df['review'].apply(clean_text)

# 4. Create ciphered version of the cleaned text
df['cipher_review'] = df['clean_review'].apply(lambda x: caesar_encrypt(x, shift=3))

# 5. Build character vocabulary
all_text = ' '.join(df['clean_review'].tolist() + df['cipher_review'].tolist())
chars = sorted(list(set(all_text)))
char2idx = {c: i for i, c in enumerate(chars)}
idx2char = {i: c for i, c in enumerate(chars)}
vocab_size = len(chars)
print(f'Vocabulary size: {vocab_size}')

# 6. Dataset class for cipher/plain pairs
class CipherDataset(Dataset):
    def __init__(self, cipher_texts, plain_texts, char2idx):
        self.cipher_texts = cipher_texts
        self.plain_texts = plain_texts
        self.char2idx = char2idx

    def __len__(self):
        return len(self.cipher_texts)

    def __getitem__(self, idx):
        cipher_seq = torch.tensor([self.char2idx[c] for c in self.cipher_texts[idx]], dtype=torch.long)
        plain_seq = torch.tensor([self.char2idx[c] for c in self.plain_texts[idx]], dtype=torch.long)
        return cipher_seq, plain_seq

# 7. Collate function for padding variable-length sequences
def collate_fn(batch):
    cipher_seqs, plain_seqs = zip(*batch)
    cipher_seqs_padded = pad_sequence(cipher_seqs, batch_first=True, padding_value=char2idx[' '])
    plain_seqs_padded = pad_sequence(plain_seqs, batch_first=True, padding_value=char2idx[' '])
    return cipher_seqs_padded, plain_seqs_padded

# 8. DataLoader setup
dataset = CipherDataset(df['cipher_review'].tolist(), df['clean_review'].tolist(), char2idx)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)

# 9. Define character-level LSTM model
class CharDecoderLSTM(nn.Module):
    def __init__(self, vocab_size, hidden_size=128):
        super(CharDecoderLSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        x = self.embedding(x)
        out, hidden = self.lstm(x, hidden)
        out = self.fc(out)
        return out, hidden

# 10. Training setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CharDecoderLSTM(vocab_size).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=char2idx[' '])
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 11. Training loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for cipher_batch, plain_batch in dataloader:
        cipher_batch = cipher_batch.to(device)
        plain_batch = plain_batch.to(device)

        optimizer.zero_grad()
        outputs, _ = model(cipher_batch)

        outputs = outputs.view(-1, vocab_size)
        targets = plain_batch.view(-1)

        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs} - Loss: {total_loss/len(dataloader):.4f}")

# 12. Save the trained model
torch.save(model.state_dict(), "char_decoder_lstm.pth")

def decode_output(output_tensor):
    pred_idxs = output_tensor.argmax(dim=2)
    texts = []
    for seq in pred_idxs:
        chars = [idx2char[idx.item()] for idx in seq]
        text = ''.join(chars)
        text = re.sub(r'\s+', ' ', text).strip()
        texts.append(text)
    return texts


# 14. Function to preprocess new cipher text for inference
def preprocess_text(text, char2idx):
    text = text.lower()
    text = re.sub(r'[^a-z\s]', '', text)
    return torch.tensor([char2idx.get(c, char2idx[' ']) for c in text], dtype=torch.long).unsqueeze(0)

# 15. Function to decipher a new cipher text string
def decipher_text(model, cipher_text, char2idx, idx2char, device):
    model.eval()
    with torch.no_grad():
        input_seq = preprocess_text(cipher_text, char2idx).to(device)
        output, _ = model(input_seq)
        decoded = decode_output(output)
    return decoded[0]

# Example test after training
example_plain = "this movie was great"
example_cipher = caesar_encrypt(example_plain, shift=3)
print("Cipher input:", example_cipher)
print("Deciphered output:", decipher_text(model, example_cipher, char2idx, idx2char, device))


Vocabulary size: 27
Epoch 1/10 - Loss: 0.5431
Epoch 2/10 - Loss: 0.0092
Epoch 3/10 - Loss: 0.0034
Epoch 4/10 - Loss: 0.0019
Epoch 5/10 - Loss: 0.0012
Epoch 6/10 - Loss: 0.0009
Epoch 7/10 - Loss: 0.0007
Epoch 8/10 - Loss: 0.0005
Epoch 9/10 - Loss: 0.0004
Epoch 10/10 - Loss: 0.0003
Cipher input: wklv prylh zdv juhdw
Deciphered output: thistmovietwastgreat


In [5]:
def evaluate_accuracy(model, dataloader, char2idx, idx2char, device):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for cipher_batch, plain_batch in dataloader:
            cipher_batch = cipher_batch.to(device)
            plain_batch = plain_batch.to(device)

            outputs, _ = model(cipher_batch)
            pred_indices = outputs.argmax(dim=2)

            mask = (plain_batch != char2idx[' '])  # Ignore padding
            correct += (pred_indices[mask] == plain_batch[mask]).sum().item()
            total += mask.sum().item()

    accuracy = correct / total
    print(f"Character-level Accuracy: {accuracy:.4f}")
    return accuracy
