In [None]:
#----------------------------------------------------------------------
# Imports
#----------------------------------------------------------------------

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset, random_split

import string, random
import pandas as pd

from tqdm import tqdm
from typing import Union, List, Optional

In [None]:
# ---------------------------------------------------------------------
# LSTM Sequence Model
# ---------------------------------------------------------------------
class CipherNoiseLSTM(nn.Module):
    """Bidirectional LSTM classifier: maps a token sequence to one class prediction.

    Args:
        embed_dim: Size of each token embedding vector.
        vocab_size: Number of distinct input token ids (embedding rows).
        hidden_size: Hidden units per LSTM direction.
        output_size: Number of output classes.
        pad_idx: Token id passed to ``nn.Embedding`` as ``padding_idx``.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability between stacked layers; forced to 0 when
            ``num_layers == 1`` because there is no "between layers" then.
    """
    def __init__(self,
                 embed_dim: int,
                 vocab_size: int,
                 hidden_size: int,
                 output_size: int,
                 pad_idx: int,
                 num_layers: int = 1,
                 dropout: float = 0.4):

        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(embed_dim,
                           hidden_size,
                           num_layers,
                           batch_first=True,
                           dropout=dropout if num_layers > 1 else 0,  # dropout only between layers
                           bidirectional=True)
        self.fc = nn.Linear(hidden_size * 2, output_size)  # Output size = num_classes

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Returns logits shaped [batch_size, output_size]"""
        embedded = self.embedding(x)  # [B, L, D]
        _, (h_n, _) = self.lstm(embedded)  # h_n: [num_layers * 2, B, H]
        # Summarise the sequence with the top layer's final hidden state of each
        # direction. Slicing the per-step output at t = -1 would pair the forward
        # state with a backward state that has only read the last token, so the
        # backward half would carry almost no information about the sequence.
        # NOTE: padded positions are still fed through the LSTM (no packing).
        summary = torch.cat((h_n[-2], h_n[-1]), dim=1)  # [B, H*2]
        logits = self.fc(summary)  # [B, output_size]
        return logits

In [None]:
# ---------------------------------------------------------------------
# GRU Sequence Model
# ---------------------------------------------------------------------
class CipherNoiseGRU(nn.Module):
    """Bidirectional GRU classifier: maps a token sequence to one class prediction.

    Args:
        embed_dim: Size of each token embedding vector.
        vocab_size: Number of distinct input token ids (embedding rows).
        hidden_size: Hidden units per GRU direction.
        output_size: Number of output classes.
        pad_idx: Token id passed to ``nn.Embedding`` as ``padding_idx``.
        num_layers: Number of stacked GRU layers.
        dropout: Dropout probability between stacked layers; forced to 0 when
            ``num_layers == 1`` because there is no "between layers" then.
    """
    def __init__(self,
                 embed_dim: int,
                 vocab_size: int,
                 hidden_size: int,
                 output_size: int,
                 pad_idx: int,
                 num_layers: int = 1,
                 dropout: float = 0.4):

        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        self.gru = nn.GRU(embed_dim,
                         hidden_size,
                         num_layers,
                         batch_first=True,
                         # Guarded like the LSTM variant: PyTorch warns when a
                         # non-zero dropout is combined with a single layer.
                         dropout=dropout if num_layers > 1 else 0,
                         bidirectional=True)
        self.fc = nn.Linear(hidden_size * 2, output_size)  # Output size = num_classes

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Returns logits shaped [batch_size, output_size]"""
        embedded = self.embedding(x)  # [B, L, D]
        _, h_n = self.gru(embedded)  # h_n: [num_layers * 2, B, H]
        # Summarise the sequence with the top layer's final hidden state of each
        # direction. Slicing the per-step output at t = -1 would pair the forward
        # state with a backward state that has only read the last token, so the
        # backward half would carry almost no information about the sequence.
        # NOTE: padded positions are still fed through the GRU (no packing).
        summary = torch.cat((h_n[-2], h_n[-1]), dim=1)  # [B, H*2]
        logits = self.fc(summary)  # [B, output_size]
        return logits

In [None]:
def flexible_tokenizer(
    word: Optional[str] = None,
    tokens: Optional[List[int]] = None,
    vocab: Optional[List[str]] = None,
    pad_len: Optional[int] = None,
    unk_token: str = 'X'
) -> Union[int, List[int], str]:
    """
    Convert between text and token ids using a caller-supplied vocabulary.

    Modes, checked in this order:
    - ``tokens`` given: detokenize. Each id is mapped to ``vocab[id]``; ids
      outside ``0 .. len(vocab) - 1`` become ``unk_token``. The joined string
      is whitespace-stripped.
    - ``word`` and ``pad_len`` given: tokenize.
        * A one-character ``word`` returns a single int (its index in
          ``vocab``, or the index of ``unk_token`` if absent). It is looked up
          as-is, without lower-casing.
        * Any other ``word`` is lower-cased, mapped character by character
          (unknown characters map to ``unk_token``), then truncated or
          right-padded with the ``unk_token`` index to exactly ``pad_len`` ids.

    Raises:
        ValueError: if ``vocab`` is None, if neither mode's arguments are
            supplied, or if ``unk_token`` is needed but missing from ``vocab``.
    """
    # The vocabulary is mandatory; there is no built-in default alphabet.
    if vocab is None:
        raise ValueError("No vocab was provided")

    # Detokenization mode (tokens → word)
    if tokens is not None:
        vocab_size = len(vocab)
        # Explicit lower bound: a negative id would otherwise wrap around and
        # silently decode to a symbol from the end of the vocabulary.
        return ''.join(
            vocab[i] if 0 <= i < vocab_size else unk_token
            for i in tokens
        ).strip()

    # Tokenization mode (word → tokens)
    if word is not None and pad_len is not None:
        if len(word) == 1:
            try:
                return vocab.index(word)
            except ValueError:
                return vocab.index(unk_token)

        unk_idx = vocab.index(unk_token)
        # One dict lookup per character instead of two linear scans of vocab.
        # setdefault keeps the first occurrence, matching list.index semantics.
        lookup = {}
        for idx, symbol in enumerate(vocab):
            lookup.setdefault(symbol, idx)

        indices = [lookup.get(c, unk_idx) for c in word.lower()]
        return indices[:pad_len] + [unk_idx] * max(0, pad_len - len(indices))

    raise ValueError("Must provide either (word + pad_len) or tokens")

In [None]:
from google.colab import drive
### import drive here

In [None]:

import os

# Dataset locations (placeholders: replace with real CSV paths before running)
cipher_dataset_path = 'enter path here'
post_train_dataset_path = 'enter path here'

# Sanity check: one True/False line per path, in the order defined above
print(
    os.path.exists(cipher_dataset_path),
    os.path.exists(post_train_dataset_path),
    sep="\n",
)

In [None]:
#----------------------------------------------------------------------
# Load the cipher dataset from CSV.
# Later cells read its "input" and "target" columns.
#----------------------------------------------------------------------
dataset_df  = pd.read_csv(cipher_dataset_path)

In [None]:
#----------------------------------------------------------------------
# Calculate the input pad length: the longest "input" string in the
# dataset. Targets are single class labels, so no output pad is needed.
#----------------------------------------------------------------------
in_pad = max(len(s) for s in dataset_df["input"])

In [None]:
#----------------------------------------------------------------------
# Build the vocabularies and encode the dataset as token-id tensors
#----------------------------------------------------------------------
# Inputs: lowercase letters plus brackets, the 'X' pad/unknown symbol and space.
input_vocab = list(string.ascii_lowercase) + ['[', ']', 'X', ' ']
# Targets: every distinct target value, in sorted order.
target_vocab = sorted(set(dataset_df["target"]))

input_tokens = [
    flexible_tokenizer(word=text, pad_len=in_pad, vocab=input_vocab)
    for text in dataset_df["input"]
]
target_tokens = [
    flexible_tokenizer(word=label, pad_len=in_pad, vocab=target_vocab)
    for label in dataset_df["target"]
]

input_tensor = torch.tensor(input_tokens, dtype=torch.long)
target_tensor = torch.tensor(target_tokens, dtype=torch.long)



In [None]:
# ---------------------------------------------------------------------
# Hyper-parameters
# ---------------------------------------------------------------------
VOCAB_SIZE   = len(input_vocab)        # number of input token ids (embedding rows)
PAD_IDX      = input_vocab.index('X')  # 'X' is also flexible_tokenizer's default unknown/pad symbol
HIDDEN_SIZE  = 48                      # hidden units per RNN direction
EMBED_DIM    = 16                      # token embedding width
NUM_LAYERS   = 3                       # stacked recurrent layers
BATCH_SIZE   = 128                     # default batch size for the DataLoaders
LR           = 1e-3                    # default Adam learning rate in train()
EPOCHS       = 50                      # default number of epochs in train()
TRAIN_SPLIT  = 0.70                    # fraction of samples used for training; the rest is validation
SEED         = 42                      # seeds the generator used by random_split in split_loaders()

In [None]:
# ---------------------------------------------------------------------
# Helper Functions (Adjusted for Many-to-1)
# ---------------------------------------------------------------------
def df_to_loader(df, batch_size=BATCH_SIZE, i_v=input_vocab, t_v=target_vocab, i_p=in_pad):
    """Tokenize a DataFrame's "input"/"target" columns into a shuffled DataLoader.

    Args:
        df: DataFrame with "input" and "target" columns.
        batch_size: Batch size of the returned loader.
        i_v: Vocabulary used to tokenize the "input" column.
        t_v: Vocabulary used to tokenize the "target" column.
        i_p: Pad length passed to ``flexible_tokenizer`` for both columns.

    Returns:
        A ``DataLoader`` over ``TensorDataset(inputs, targets)`` with
        ``shuffle=True``.

    Note:
        The defaults are captured when this function is defined, so rebinding
        the module-level names afterwards does not change them.
    """
    # Honour the vocab arguments; the globals are only their defaults.
    x = [flexible_tokenizer(word=w, pad_len=i_p, vocab=i_v) for w in df["input"]]
    y = [flexible_tokenizer(word=w, pad_len=i_p, vocab=t_v) for w in df["target"]]

    inputs = torch.tensor(x, dtype=torch.long)
    targets = torch.tensor(y, dtype=torch.long)

    # Create TensorDataset and DataLoader
    dataset = TensorDataset(inputs, targets)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

def get_device() -> torch.device:
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")

def split_loaders(input_tensor: torch.Tensor,
                 target_tensor: torch.Tensor,
                 batch_size: int = BATCH_SIZE,
                 train_split: float = TRAIN_SPLIT,
                 seed: int = SEED):
    """Split paired tensors into a training and a validation DataLoader.

    The first ``int(train_split * N)`` slots of a seeded random permutation
    form the training set; the remaining samples form the validation set.
    Only the training loader shuffles between epochs.

    Returns:
        ``(train_loader, val_loader)``
    """
    full_ds = TensorDataset(input_tensor, target_tensor)
    n_total = len(full_ds)
    n_train = int(train_split * n_total)
    n_val = n_total - n_train

    # A dedicated generator makes the split reproducible for a given seed.
    split_rng = torch.Generator().manual_seed(seed)
    train_ds, val_ds = random_split(full_ds, [n_train, n_val], generator=split_rng)

    return (
        DataLoader(train_ds, batch_size=batch_size, shuffle=True),
        DataLoader(val_ds, batch_size=batch_size, shuffle=False),
    )

def train(model: nn.Module,
          train_loader: DataLoader,
          val_loader: DataLoader,
          num_epochs: int = EPOCHS,
          lr: float = LR,
          pad_idx: int = PAD_IDX):
    """Train a many-to-one classifier in place and print per-epoch metrics.

    Args:
        model: Network returning logits shaped [batch, num_classes].
        train_loader: Yields ``(inputs, targets)`` batches for optimisation.
        val_loader: Yields ``(inputs, targets)`` batches for evaluation.
        num_epochs: Number of passes over ``train_loader``.
        lr: Initial Adam learning rate. It is reduced by 10x (down to 1e-5)
            after 5 epochs without validation-loss improvement.
        pad_idx: Unused; kept so existing callers that pass it keep working.
            It is an *input* token id, whereas the targets here are class
            indices, so using it as ``ignore_index`` would silently drop
            whichever class happens to share that number.

    Reported losses are the mean of per-batch mean losses; accuracies are
    correct predictions divided by the loader's dataset size.
    """
    device = get_device()
    print(f"Using device: {device}\n")
    model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # Every target is a real class label, so no label is excluded from the loss.
    criterion = nn.CrossEntropyLoss()
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer,
        mode='min',     # Monitor val loss
        factor=0.1,     # LR *= 0.1
        patience=5,     # Wait 5 epochs w/o improvement
        min_lr=1e-5,    # Lower bound
    )

    for epoch in range(1, num_epochs + 1):
        # Training
        model.train()
        train_loss, train_correct = 0.0, 0
        pbar = tqdm(train_loader, desc=f"Epoch {epoch}/{num_epochs}")

        # Count batches ourselves: tqdm only updates `pbar.n` when it redraws
        # the bar, so it is not a reliable denominator for the running mean.
        for step, (inputs, targets) in enumerate(pbar, start=1):
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            logits = model(inputs)
            loss = criterion(logits, targets)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            train_correct += (logits.argmax(1) == targets).sum().item()
            pbar.set_postfix(train_loss=train_loss / step)

        # Validation
        model.eval()
        val_loss, val_correct = 0.0, 0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                logits = model(inputs)
                val_loss += criterion(logits, targets).item()
                val_correct += (logits.argmax(1) == targets).sum().item()

        # Calculate metrics
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        train_acc = train_correct / len(train_loader.dataset)
        val_acc = val_correct / len(val_loader.dataset)

        # Update learning rate at the end of each epoch
        scheduler.step(val_loss)

        print(f"Epoch {epoch:02d} | "
              f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | "
              f"Train Acc: {train_acc:.2%} | Val Acc: {val_acc:.2%}")



In [None]:
# ---------------------------------------------------------------------
# Example usage: split the tokenized dataset, build the GRU, train it
# ---------------------------------------------------------------------

train_loader, val_loader = split_loaders(input_tensor, target_tensor)

# One output logit per distinct target value.
gru_model = CipherNoiseGRU(
    embed_dim=EMBED_DIM,
    vocab_size=VOCAB_SIZE,
    hidden_size=HIDDEN_SIZE,
    output_size=len(target_vocab),
    pad_idx=PAD_IDX,
    num_layers=NUM_LAYERS,
)

train(gru_model, train_loader, val_loader, lr=1e-3)


In [None]:
# Load the post-training dataset; the cells below read its "input" and "target" columns.
post_train_df  = pd.read_csv(post_train_dataset_path)

In [None]:
# Hard-coded pad length for the post-training data; it replaces the value
# computed from the first dataset. flexible_tokenizer truncates any input
# longer than this (indices[:pad_len]).
# NOTE(review): the origin of 230 is not visible here; confirm it covers the
# longest post-train input.
in_pad = 230

In [None]:

# Encode the post-training set with the same vocabularies as the first dataset.
# This rebinds input_tokens / target_tokens / input_tensor / target_tensor.
input_tokens = [
    flexible_tokenizer(word=text, pad_len=in_pad, vocab=input_vocab)
    for text in post_train_df["input"]
]
target_tokens = [
    flexible_tokenizer(word=label, pad_len=in_pad, vocab=target_vocab)
    for label in post_train_df["target"]
]

input_tensor = torch.tensor(input_tokens, dtype=torch.long)
target_tensor = torch.tensor(target_tokens, dtype=torch.long)

In [None]:
# Re-split the post-training tensors and continue training the same gru_model.
# train() builds a new optimizer and LR scheduler on every call, so only the
# model weights carry over from the first run.
train_loader, val_loader = split_loaders(input_tensor, target_tensor)
train(gru_model, train_loader, val_loader, lr=1e-3)

In [None]:
# Save only the model parameters (state_dict); replace the placeholder with a real file path.
torch.save(gru_model.state_dict(), 'enter path here')