# Exercise 1
## Character-level recurrent sequence-to-sequence model
### By: Daniel Mehta

---

## Inports

In [37]:
import numpy as np
import os
from pathlib import Path
import random
import torch
import torch.nn as nn
from tqdm import tqdm

---

## Dataset Path Setup

In [6]:
# setting path to dataset
data_dir = Path("fra-eng")
data_path = data_dir/"fra.txt"

In [7]:
if not data_path.exists():
    raise FileNotFoundError(f"Dataset not found at {data_path}")

print(f"Dataset located at: {data_path}")

Dataset located at: fra-eng\fra.txt


---

## Data Exploration and Cleaning

In [28]:
# Reading the file and split into lines
with open(data_path,"r",encoding="utf-8") as f:
    lines = f.read().strip().split("\n")

In [9]:
print(f"Total sentence pairs in file: {len(lines)}")
print("Sample lines:")
for i in range(5):
    print(lines[i])

Total sentence pairs in file: 237838
Sample lines:
Go.	Va !	CC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #1158250 (Wittydev)
Go.	Marche.	CC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #8090732 (Micsmithel)
Go.	En route !	CC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #8267435 (felix63)
Go.	Bouge !	CC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #9022935 (Micsmithel)
Hi.	Salut !	CC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #509819 (Aiji)


In [10]:
# Separating into English and French
pairs =[line.split("\t") for line in lines]
english_sentences =[pair[0] for pair in pairs]
french_sentences =[pair[1] for pair in pairs]

In [11]:
print("\nExample pair:")
print("EN:",english_sentences[0])
print("FR:",french_sentences[0])


Example pair:
EN: Go.
FR: Va !


---

## Configuration

In [23]:
# setting up seed
SEED = 5501
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

In [25]:
# Settubg yo start and end tokens
START_TOKEN="\t"
END_TOKEN="\n"

In [26]:
#  hyperparameters
batch_size = 64 # Batch size
epochs =100 # epochs of training
latent_dim = 256 #Latent dimensionality of the encoding space
num_samples = 10000  # Num of samples

print(f"batch_size={batch_size}, epochs={epochs}, latent_dim={latent_dim}, num_samples={num_samples}")
print(f"Decoder tokens -> start: {repr(START_TOKEN)}, end: {repr(END_TOKEN)}")

batch_size=64, epochs=100, latent_dim=256, num_samples=10000
Decoder tokens -> start: '\t', end: '\n'


In [27]:
# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("PyTorch version:",torch.__version__)
print("CUDA available:",torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU:", torch.cuda.get_device_name(0))

PyTorch version: 2.7.1+cu118
CUDA available: True
GPU: NVIDIA GeForce RTX 4060


In [32]:
# Building vocabularies

#  sorted unique characters for each language
input_characters = sorted(list(set("".join(english_sentences))))
target_characters = sorted(list(set("".join(french_sentences))))

#mapping dicts
input_char_to_idx ={char: idx for idx, char in enumerate(input_characters)}
input_idx_to_char ={idx: char for char, idx in input_char_to_idx.items()}

target_char_to_idx ={char: idx for idx, char in enumerate(target_characters)}
target_idx_to_char ={idx: char for char, idx in target_char_to_idx.items()}

# vocabulary sizes
input_vocab_size = len(input_characters)
target_vocab_size = len(target_characters)

print(f"Input vocab size: {input_vocab_size}")
print(f"Target vocab size: {target_vocab_size}")

Input vocab size: 90
Target vocab size: 113


---

## Building the model

In [35]:
embed_dim = 128 # it must be smaller or equal to the latent dim

class Encoder(nn.Module):
    def __init__(self, input_vocab_size, embed_dim,latent_dim):
        super().__init__()
        self.embedding = nn.Embedding(input_vocab_size, embed_dim)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=latent_dim,
            num_layers=1,
            batch_first=True
        )


    def forward(self, src_idxs):
        # src_idxs:(batch, src_len)
        embedded =self.embedding(src_idxs) # (batch, src_len, embed_dim)
        outputs,(h,c) =self.lstm(embedded) #outputs not used,keep states
        return h,c

class Decoder(nn.Module):
    def __init__(self, target_vocab_size, embed_dim,latent_dim):
        super().__init__()
        self.embedding =nn.Embedding(target_vocab_size,embed_dim)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=latent_dim,
            num_layers=1,
            batch_first=True
        )
        self.fc_out =nn.Linear(latent_dim,target_vocab_size)
        
    def forward(self, tgt_idxs, hidden, cell):
        # tgt_idxs:(batch, tgt_len) with teacher forcing
        embedded = self.embedding(tgt_idxs)# (batch, tgt_len,embed_dim)
        outputs, (h,c) = self.lstm(embedded, (hidden,cell))
        logits = self.fc_out(outputs)#(batch, tgt_len, target_vocab_size)
        return logits, h,c


class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder =encoder
        self.decoder =decoder

    def forward(self, src_idxs, tgt_input_idxs):
        # training forward pass with teacher forcing
        h,c =self.encoder(src_idxs)
        logits,_,_=self.decoder(tgt_input_idxs, h,c)
        return logits


#Instantiate and move to device
encoder = Encoder(input_vocab_size, embed_dim, latent_dim)
decoder = Decoder(target_vocab_size, embed_dim, latent_dim)
model =Seq2Seq(encoder, decoder).to(device)

# Loss and optimizer
PAD_IDX =None
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX) if PAD_IDX is not None else nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

print(model)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(90, 128)
    (lstm): LSTM(128, 256, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(113, 128)
    (lstm): LSTM(128, 256, batch_first=True)
    (fc_out): Linear(in_features=256, out_features=113, bias=True)
  )
)


---

## Train model

In [None]:
for epoch in range(1, epochs+ 1):
    model.train()
    total_loss =0

    for src, tgt_input, tgt_target in tqdm(train_dataloader,desc=f"Epoch {epoch}/{epochs}"):
        src = src.to(device)
        tgt_input =tgt_input.to(device)
        tgt_target =tgt_target.to(device)

        optimizer.zero_grad()

        # Forward pass
        output_logits = model(src, tgt_input) #(batch, tgt_len, vocab_size)

        # Reshape for loss, mergeing batch & time dims
        output_logits = output_logits.reshape(-1,target_vocab_size)
        tgt_target = tgt_target.reshape(-1)

        # Compute loss
        loss = criterion(output_logits,tgt_target)
        loss.backward()

        optimizer.step()

        total_loss+=loss.item()

    avg_loss = total_loss/len(train_dataloader)
    print(f"Epoch {epoch} | Loss: {avg_loss:.4f}")

# Save model
torch.save(model.state_dict(),"seq2seq_model.pth")
print("Model saved to seq2seq_model.pth")