In [1]:
import os
import re
import random
import math
import torch 
from torch import nn
from torch.utils.data import Dataset, DataLoader

## configurations

In [2]:
DATA_PATH = "../data/eng-fra.txt"
BATCH_SIZE = 32
EPOCHS = 5
MAX_SENTENCE_LENGTH = 15    # discard pairs longer than this, for simplicity
MIN_FREQ = 2                # minimum word frequency to keep in vocab 

In [3]:
PAD_TOKEN = "<pad>"
SOS_TOKEN = "<sos>"
EOS_TOKEN = "<eos>"
UNK_TOKEN = "<unk>"

## 1 Get Data

### Basic preprocessing

In [5]:
def basic_tokenize(s: str):
    s = s.lower().strip()

    s = re.sub(r"[.!?]", r" ", s)

    return s.split()

In [32]:
class EngFraRawDataset:
    """
    Load data from the file and store the pairs as a tuple
    """

    def __init__(self, file_path, max_length=MAX_SENTENCE_LENGTH):
        self.pair = []
        with open(file_path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                
                if not line:
                    continue
                parts = line.split("\t")
                
                if len(parts) < 2:
                    continue
                
                eng, fra = parts[0], parts[1]
                eng_tokens = basic_tokenize(eng)
                fra_tokens = basic_tokenize(fra)

                if len(eng_tokens) > max_length or len(fra_tokens) > max_length:
                    continue

                self.pair.append((eng_tokens, fra_tokens))
    

    def __len__(self):
        return len(self.pair)
    
    def __getitem__(self, index):
        return self.pair[index]

### Build Vocabulary

In [7]:
def build_vocab(pairs, min_freq = MIN_FREQ):

    eng_freq = {}
    fra_freq = {}

    for eng, fra in pairs:
        for word in eng:
            eng_freq[word] = eng_freq.get(word, 0) + 1
        for word in fra:
            fra_freq[word] = fra_freq.get(word, 0) + 1
    
    def make_vocab(freq_dict):

        idx2word = [PAD_TOKEN, SOS_TOKEN, EOS_TOKEN, UNK_TOKEN]
        for w, c in freq_dict.items():
            if c >= min_freq:
                idx2word.append(w)
        word2idx = {word: idx for idx, word in enumerate(idx2word)}
        return word2idx, idx2word
    
    eng_word2idx, eng_idx2word = make_vocab(eng_freq)
    fra_word2idx, fra_idx2word = make_vocab(fra_freq)

    return eng_word2idx, eng_idx2word, fra_word2idx, fra_idx2word

### Dataset and Dataloader

In [9]:
class EngFraDataset(Dataset):

    def __init__(self, pairs, eng_word2idx, fra_word2idx):
        self.pairs = pairs
        self.eng_word2idx = eng_word2idx
        self.fra_word2idx = fra_word2idx
    
    def __len__(self):
        return len(self.pairs)
    
    def __getitem__(self, index):
        eng_tokens, fra_tokens = self.pairs[index]

        eng_indices = [self.eng_word2idx.get(w, self.eng_word2idx[UNK_TOKEN]) for w in eng_tokens]
        fra_indices = [self.fra_word2idx[SOS_TOKEN]] + \
            [self.fra_word2idx.get(w, self.fra_word2idx[UNK_TOKEN]) for w in fra_tokens] + \
            [self.fra_word2idx[EOS_TOKEN]]

        return torch.LongTensor(eng_indices), torch.LongTensor(fra_indices)

In [12]:
def collate_fn(batch):
    eng_max_len = max(x[0].size(0) for x in batch)
    fra_max_len = max(x[1].size(0) for x in batch)

    eng_batch = []
    fra_batch = []

    for eng_index, fra_index in batch:
        eng_pad = nn.functional.pad(eng_index, (0, eng_max_len - eng_index.size(0)), value=0)
        fra_pad = nn.functional.pad(fra_index, (0, fra_max_len - fra_index.size(0)), value=0)
        
        eng_batch.append(eng_pad.unsqueeze(0))
        fra_batch.append(fra_pad.unsqueeze(0))
    
    eng_batch = torch.cat(eng_batch, dim=0)
    fra_batch = torch.cat(fra_batch, dim=0)
    return eng_batch, fra_batch

### Script

In [None]:

DATA_PATH = "../data/eng-fra.txt"
BATCH_SIZE = 32
EPOCHS = 5
MAX_SENTENCE_LENGTH = 15    # discard pairs longer than this, for simplicity
MIN_FREQ = 2                # minimum word frequency to keep in vocab 

PAD_TOKEN = "<pad>"
SOS_TOKEN = "<sos>"
EOS_TOKEN = "<eos>"
UNK_TOKEN = "<unk>"

### Basic preprocessing

def basic_tokenize(s: str):
    s = s.lower().strip()

    s = re.sub(r"[.!?]", r" ", s)

    return s.split()

class EngFraRawDataset:
    """
    Load data from the file and store the pairs as a tuple
    """

    def __init__(self, file_path, max_length=MAX_SENTENCE_LENGTH):
        self.pair = []
        with open(file_path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                
                if not line:
                    continue
                parts = line.split("\t")
                
                if len(parts) < 2:
                    continue
                
                eng, fra = parts[0], parts[1]
                eng_tokens = basic_tokenize(eng)
                fra_tokens = basic_tokenize(fra)

                if len(eng_tokens) > max_length or len(fra_tokens) > max_length:
                    continue

                self.pair.append((eng_tokens, fra_tokens))
    

    def __len__(self):
        return len(self.pair)
    
    def __getitem__(self, index):
        return self.pair[index]
    

### Build Vocabulary

def build_vocab(pairs, min_freq = MIN_FREQ):

    eng_freq = {}
    fra_freq = {}

    for eng, fra in pairs:
        for word in eng:
            eng_freq[word] = eng_freq.get(word, 0) + 1
        for word in fra:
            fra_freq[word] = fra_freq.get(word, 0) + 1
    
    def make_vocab(freq_dict):

        idx2word = [PAD_TOKEN, SOS_TOKEN, EOS_TOKEN, UNK_TOKEN]
        for w, c in freq_dict.items():
            if c >= min_freq:
                idx2word.append(w)
        word2idx = {word: idx for idx, word in enumerate(idx2word)}
        return word2idx, idx2word
    
    eng_word2idx, eng_idx2word = make_vocab(eng_freq)
    fra_word2idx, fra_idx2word = make_vocab(fra_freq)

    return eng_word2idx, eng_idx2word, fra_word2idx, fra_idx2word


### Dataset and Dataloader

class EngFraDataset(Dataset):

    def __init__(self, pairs, eng_word2idx, fra_word2idx):
        self.pairs = pairs
        self.eng_word2idx = eng_word2idx
        self.fra_word2idx = fra_word2idx
    
    def __len__(self):
        return len(self.pairs)
    
    def __getitem__(self, index):
        eng_tokens, fra_tokens = self.pairs[index]

        eng_indices = [self.eng_word2idx.get(w, self.eng_word2idx[UNK_TOKEN]) for w in eng_tokens]
        fra_indices = [self.fra_word2idx[SOS_TOKEN]] + \
            [self.fra_word2idx.get(w, self.fra_word2idx[UNK_TOKEN]) for w in fra_tokens] + \
            [self.fra_word2idx[EOS_TOKEN]]

        return torch.LongTensor(eng_indices), torch.LongTensor(fra_indices)
    

def collate_fn(batch):
    eng_max_len = max(x[0].size(0) for x in batch)
    fra_max_len = max(x[1].size(0) for x in batch)

    eng_batch = []
    fra_batch = []

    for eng_index, fra_index in batch:
        eng_pad = nn.functional.pad(eng_index, (0, eng_max_len - eng_index.size(0)), value=0)
        fra_pad = nn.functional.pad(fra_index, (0, fra_max_len - fra_index.size(0)), value=0)
        
        eng_batch.append(eng_pad.unsqueeze(0))
        fra_batch.append(fra_pad.unsqueeze(0))
    
    eng_batch = torch.cat(eng_batch, dim=0)
    fra_batch = torch.cat(fra_batch, dim=0)
    return eng_batch, fra_batch

def create_dataloader(file_path: str = DATA_PATH, 
                      batch_size: int = BATCH_SIZE) -> DataLoader:
    
    raw_dataset = EngFraRawDataset(file_path=DATA_PATH, 
                               max_length=MAX_SENTENCE_LENGTH)
    print(f"Dataset size: {len(raw_dataset)}")

    eng_word2idx, eng_idx2word, fra_word2idx, fra_idx2word = build_vocab(pairs=raw_dataset.pair, 
                                                                        min_freq=MIN_FREQ)
    print(f"English vocab size: {len(eng_word2idx)}")
    print(f"French vocab size: {len(fra_word2idx)}")

    random.shuffle(raw_dataset.pair)
    train_size = int(0.8 * len(raw_dataset))
    test_size = len(raw_dataset) - train_size
    train_pairs = raw_dataset.pair[:train_size]
    test_pairs = raw_dataset.pair[train_size:]
    print(f"Train size: {len(train_pairs)}")
    print(f"Test size: {len(test_pairs)}")

    train_dataset = EngFraDataset(pairs=train_pairs, 
                                eng_word2idx=eng_word2idx, 
                                fra_word2idx=fra_word2idx)
    test_dataset = EngFraDataset(pairs=test_pairs, 
                                eng_word2idx=eng_word2idx, 
                                fra_word2idx=fra_word2idx)

    train_dataloader = DataLoader(train_dataset, 
                                batch_size=BATCH_SIZE, 
                                shuffle=True, 
                                collate_fn=collate_fn)

    test_dataloader = DataLoader(test_dataset,
                                batch_size=BATCH_SIZE, 
                                shuffle=False, 
                                collate_fn=collate_fn)

    return train_dataloader, test_dataloader, eng_word2idx, eng_idx2word, fra_word2idx, fra_idx2word

## 2 Making a model

In [24]:
import os
import sys

# Get the absolute path of the parent directory (containing both folders)
parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

# Now you can import from RNN_self
from RNN_self import model_builder

In [43]:
class RNNEncoder(nn.Module):

    def __init__(self, vocab_size, embed_size, hidden_size):
        super().__init__()

        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)
        self.rnncell = model_builder.SimpleRNN(input_size=embed_size,
                                               hidden_units=hidden_size,
                                               output_size=0)
    
    def forward(self, src):
        batch_size, scr_len = src.shape
        hidden_state = self.rnncell.init_zero_hidden(batch_size).to(src.device)

        for i in range(scr_len):
            x = self.embedding(src[:, i])
            _, hidden_state = self.rnncell(x, hidden_state)
        
        return hidden_state

In [60]:
class RNNDecoder(nn.Module):

    def __init__(self, vocab_size, embed_size, hidden_size):
        super().__init__()

        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)
        self.rnncell = model_builder.SimpleRNN(input_size=embed_size,
                                               hidden_units=hidden_size,
                                               output_size=vocab_size)
        self.softmax = nn.Softmax(dim=-1)
    
    def forward(self, target, hidden_state):

        batch_size, target_len = target.shape

        output = []

        for i in range(target_len):
            token_i = target[:, i]
            x = self.embedding(token_i)
            logits, hidden_state = self.rnncell(x, hidden_state)
            logits = self.softmax(logits)
            output.append(logits.unsqueeze(1))

        return torch.cat(output, dim=1)


In [45]:
class Seq2Seq(nn.Module):

    def __init__(self, src_vocab_size, tgt_vocab_size, embed_size, hidden_size):
        super().__init__()

        self.encoder = RNNEncoder(src_vocab_size, embed_size, hidden_size)
        self.decoder = RNNDecoder(tgt_vocab_size, embed_size, hidden_size)

    def forward(self, src, tgt):
        hidden_state = self.encoder(src)
        output = self.decoder(tgt, hidden_state)

        return output

## 3. Creating `train_step()` and `test_step()` functions and `train()` to combine them

In [None]:
def train_step(model: torch.nn.Module, 
               dataloader: torch.utils.data.DataLoader,
               optimizer: torch.optim.Optimizer,
               loss_fn: torch.nn.Module,
               device: torch.device) -> tuple[float, float]:
    
    model.train()

    total_loss = 0.0
    total_bleu = 0.0
    
    for batch in dataloader:
        src, tgt = batch
        src, tgt = src.to(device), tgt.to(device)

        optimizer.zero_grad()

        output = model(src, tgt[:, :-1])
        
        loss = loss_fn(output.reshape(-1, output.size(-1)), tgt[:, 1:].reshape(-1))
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        # print(f"loss: {loss.item()}")
    
    return total_loss / len(dataloader)

In [53]:
def test_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              device: torch.device) -> float:
    
    model.eval()
    total_loss = 0.0

    with torch.inference_mode():
        for batch in dataloader:
            src, tgt = batch
            src, tgt = src.to(device), tgt.to(device)

            output = model(src, tgt[:, :-1])
            
            loss = loss_fn(output.reshape(-1, output.size(-1)), tgt[:, 1:].reshape(-1))
            total_loss += loss.item()
            # print(f"loss: {loss.item()}")
    
    return total_loss / len(dataloader)

In [48]:
from typing import Dict, List

from tqdm.auto import tqdm

def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module,
          epochs: int,
          device: torch.device) -> Dict[str, List[float]]:
    
    results = {
        "train_loss": [],
        "test_loss": []
    }

    for epoch in tqdm(range(epochs)):
        train_loss = train_step(model, train_dataloader, optimizer, loss_fn, device)
        test_loss = test_step(model, test_dataloader, loss_fn, device)

        results["train_loss"].append(train_loss)
        results["test_loss"].append(test_loss)

        print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.4f} - Test Loss: {test_loss:.4f}")
    
    return results

### 4. Creating function to load and save the model

In [49]:
from pathlib import Path

def save_model(model: torch.nn.Module,
               save_path: str,
               model_name: str) -> None:
    
    target_dir_path = Path(save_path)
    target_dir_path.mkdir(parents=True, exist_ok=True)

    assert model_name.endswith(".pt") or model_name.endswith(".pth"), "model_name should end with .pt or .pth"
    model_path = target_dir_path / model_name

    torch.save(obj=model.state_dict(), f=model_path)
    print(f"Model saved to {model_path}")
        

In [50]:
def load_model(model: torch.nn.Module,
               load_path: str,
               model_name: str) -> None:
    
    model_path = Path(load_path) / model_name

    if not model_path.exists():
        raise FileNotFoundError(f"Model file {model_path} does not exist")

    model.load_state_dict(torch.load(model_path, map_location="cpu"))
    print(f"Model loaded from {model_path}")

## 5. Train, evaluate and save the model

In [None]:
torch.manual_seed(0)
torch.cuda.manual_seed(0)
random.seed(0)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

raw_dataset = EngFraRawDataset(file_path=DATA_PATH, 
                               max_length=MAX_SENTENCE_LENGTH)
print(f"Dataset size: {len(raw_dataset)}")

eng_word2idx, eng_idx2word, fra_word2idx, fra_idx2word = build_vocab(pairs=raw_dataset.pair, 
                                                                     min_freq=MIN_FREQ)
print(f"English vocab size: {len(eng_word2idx)}")
print(f"French vocab size: {len(fra_word2idx)}")

random.shuffle(raw_dataset.pair)
train_size = int(0.8 * len(raw_dataset))
test_size = len(raw_dataset) - train_size
train_pairs = raw_dataset.pair[:train_size]
test_pairs = raw_dataset.pair[train_size:]
print(f"Train size: {len(train_pairs)}")
print(f"Test size: {len(test_pairs)}")

train_dataset = EngFraDataset(pairs=train_pairs, 
                              eng_word2idx=eng_word2idx, 
                              fra_word2idx=fra_word2idx)
test_dataset = EngFraDataset(pairs=test_pairs, 
                             eng_word2idx=eng_word2idx, 
                             fra_word2idx=fra_word2idx)

train_dataloader = DataLoader(train_dataset, 
                              batch_size=BATCH_SIZE, 
                              shuffle=True, 
                              collate_fn=collate_fn)

test_dataloader = DataLoader(test_dataset,
                             batch_size=BATCH_SIZE, 
                             shuffle=False, 
                             collate_fn=collate_fn)

embed_size = 128
hidden_size = 256
model = Seq2Seq(src_vocab_size=len(eng_word2idx),
                tgt_vocab_size=len(fra_word2idx),
                embed_size=embed_size,
                hidden_size=hidden_size).to(device)

print(model)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.CrossEntropyLoss(ignore_index=0)

from timeit import default_timer as timer

start_time = timer()

model_results = train(model=model,
                      train_dataloader=train_dataloader,
                      test_dataloader=test_dataloader,
                      optimizer=optimizer,
                      loss_fn=loss_fn,
                      epochs=EPOCHS,
                      device=device)

end_time = timer()

print(f"[INFO] Total training time: {end_time-start_time:.3f} seconds")

save_model(model=model,
           save_path="../models",
           model_name="encoder_decoder_rnn.pth")

Using device: cuda
Dataset size: 134282
English vocab size: 9944
French vocab size: 17347
Train size: 107425
Test size: 26857
Seq2Seq(
  (encoder): RNNEncoder(
    (embedding): Embedding(9944, 128, padding_idx=0)
    (rnncell): SimpleRNN(
      (i2h): Linear(in_features=128, out_features=256, bias=True)
      (h2h): Linear(in_features=256, out_features=256, bias=True)
      (h2o): Linear(in_features=256, out_features=0, bias=True)
    )
  )
  (decoder): RNNDecoder(
    (embedding): Embedding(17347, 128, padding_idx=0)
    (rnncell): SimpleRNN(
      (i2h): Linear(in_features=128, out_features=256, bias=True)
      (h2h): Linear(in_features=256, out_features=256, bias=True)
      (h2o): Linear(in_features=256, out_features=17347, bias=True)
    )
    (softmax): Softmax(dim=-1)
  )
)




  0%|          | 0/5 [00:00<?, ?it/s]

AttributeError: 'tuple' object has no attribute 'reshape'

In [72]:
model_trail = Seq2Seq(src_vocab_size=len(eng_word2idx),
                     tgt_vocab_size=len(fra_word2idx),
                     embed_size=embed_size,
                     hidden_size=hidden_size).to(device)

load_model(model=model_trail,
           load_path="../models",
           model_name="encoder_decoder_rnn.pth")

model_trail.eval()
# sample_input = "hello world this is a test"
sample_input = "I am happy"

def tokenize_input(input_str: str) -> List[str]:
    tokens = basic_tokenize(input_str)
    return [eng_word2idx.get(w, eng_word2idx[UNK_TOKEN]) for w in tokens]

sample_input = tokenize_input(sample_input)
sample_input = torch.LongTensor(sample_input).unsqueeze(0).to(device)

with torch.inference_mode():
    hidden_state = model_trail.encoder(sample_input)

    dec_input = torch.LongTensor([fra_word2idx[SOS_TOKEN]]).unsqueeze(0).to(device)
    output_sentence = []

    for _ in range(MAX_SENTENCE_LENGTH):
        x = model_trail.decoder.embedding(dec_input[:, -1])
        logits, hidden_state = model_trail.decoder.rnncell(x, hidden_state)
        logits = model_trail.decoder.softmax(logits)
        next_token = torch.argmax(logits, dim=-1)

        if next_token.item() == fra_word2idx[EOS_TOKEN]:
            break

        output_sentence.append(next_token.item())
        dec_input = torch.cat([dec_input, next_token.unsqueeze(0)], dim=1)
    
    idx2word_fr = {idx: word for word, idx in fra_word2idx.items()}
    translated = [idx2word_fr.get(i, UNK_TOKEN) for i in output_sentence]

translated = " ".join(translated).replace("<sos>", "").replace("<eos>", "").strip()
print(f"Translated sentence: {translated}")
print(f"Original sentence: {sample_input}")

Model loaded from ..\models\encoder_decoder_rnn.pth
Translated sentence: je ne me pas de
Original sentence: tensor([[ 12,  92, 750]], device='cuda:0')




In [71]:
# filepath: (if placing in notebook)
def translate_sentence(model: Seq2Seq, input_str: str, device: torch.device) -> str:
    tokens = tokenize_input(input_str)
    src_tensor = torch.LongTensor(tokens).unsqueeze(0).to(device)
    with torch.inference_mode():
        hidden_state = model.encoder(src_tensor)
        dec_input = torch.LongTensor([fra_word2idx[SOS_TOKEN]]).unsqueeze(0).to(device)
        output_sentence = []
        for _ in range(MAX_SENTENCE_LENGTH):
            x = model.decoder.embedding(dec_input[:, -1])
            logits, hidden_state = model.decoder.rnncell(x, hidden_state)
            logits = model.decoder.softmax(logits)
            next_token = torch.argmax(logits, dim=-1)
            if next_token.item() == fra_word2idx[EOS_TOKEN]:
                break
            output_sentence.append(next_token.item())
            dec_input = torch.cat([dec_input, next_token.unsqueeze(0)], dim=1)

        idx2word_fr = {idx: word for word, idx in fra_word2idx.items()}
        translated = " ".join(idx2word_fr.get(i, UNK_TOKEN) for i in output_sentence)
    return translated

In [74]:
model_trail = Seq2Seq(src_vocab_size=len(eng_word2idx),
                     tgt_vocab_size=len(fra_word2idx),
                     embed_size=embed_size,
                     hidden_size=hidden_size).to(device)

load_model(model=model_trail,
           load_path="../models",
           model_name="encoder_decoder_rnn.pth")

sample_input = "I am happy"

translated = translate_sentence(model_trail, sample_input, device)
print(f"Translated sentence: {translated}")
print(f"Original sentence: {sample_input}")

Model loaded from ..\models\encoder_decoder_rnn.pth
Translated sentence: je ne me pas de
Original sentence: I am happy
