In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os
local_dir = 's18-transformer-speeding-up-strategy'
repo_url = 'https://github.com/aakashvardhan/s18-transformer-speeding-up-strategy.git'

# Check if the local directory already exists
if not os.path.exists(local_dir):
    # Clone the repository because it does not exist
    !git clone --quiet {repo_url}
else:
    # Change directory to the local repository
    %cd {local_dir}
    # Pull the latest changes because the repository already exists
    !git pull

/content/s18-transformer-speeding-up-strategy
Already up to date.


In [3]:
import sys
sys.path.append('/content/s18-transformer-speeding-up-strategy')

In [4]:
!pip install -q -r /content/s18-transformer-speeding-up-strategy/requirements.txt

In [5]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from lightning import LightningDataModule
from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.trainers import WordLevelTrainer
from torch.nn.utils.rnn import pad_sequence
from pathlib import Path


class BillingualDataset(Dataset):
    def __init__(self, ds, tokenizer_src, tokenizer_tgt, src_lang, tgt_lang, seq_len):
        super().__init__()
        self.seq_len = seq_len
        self.ds = ds
        self.tokenizer_src = tokenizer_src
        self.tokenizer_tgt = tokenizer_tgt
        self.src_lang = src_lang
        self.tgt_lang = tgt_lang

        self.sos_token = torch.tensor(
            [tokenizer_tgt.token_to_id("[SOS]")], dtype=torch.int64
        )
        self.eos_token = torch.tensor(
            [tokenizer_tgt.token_to_id("[EOS]")], dtype=torch.int64
        )
        self.pad_token = torch.tensor(
            [tokenizer_tgt.token_to_id("[PAD]")], dtype=torch.int64
        )

    def __len__(self):
        return len(self.ds)

    def __getitem__(self, idx):
        src_tgt_pair = self.ds[idx]
        src_text = src_tgt_pair["translation"][self.src_lang]
        tgt_text = src_tgt_pair["translation"][self.tgt_lang]

        enc_input_tokens = self.tokenizer_src.encode(src_text).ids
        dec_input_tokens = self.tokenizer_tgt.encode(tgt_text).ids

        enc_num_padding_tokens = self.seq_len - len(enc_input_tokens) - 2
        dec_num_padding_tokens = self.seq_len - len(dec_input_tokens) - 1
        # For encoding, we PAD both SOS and EOS. For decoding, we only pad SOS.
        # THe model is required to predict EOS and stop on its own.

        # Make sure that padding is not negative (ie the sentance is too long)
        if enc_num_padding_tokens < 0 or dec_num_padding_tokens < 0:
            raise ValueError("Sentence too long")

        encoder_input = torch.cat(
            [
                self.sos_token,
                torch.tensor(enc_input_tokens, dtype=torch.int64),
                self.eos_token,
                torch.tensor(
                    [self.pad_token] * enc_num_padding_tokens, dtype=torch.int64
                ),
            ],
            dim=0,
        )

        decoder_input = torch.cat(
            [
                self.sos_token,
                torch.tensor(dec_input_tokens, dtype=torch.int64),
                torch.tensor(
                    [self.pad_token] * dec_num_padding_tokens, dtype=torch.int64
                ),
            ],
            dim=0,
        )

        label = torch.cat(
            [
                torch.tensor(dec_input_tokens, dtype=torch.int64),
                self.eos_token,
                torch.tensor(
                    [self.pad_token] * dec_num_padding_tokens, dtype=torch.int64
                ),
            ],
            dim=0,
        )

        assert encoder_input.size(0) == self.seq_len
        assert decoder_input.size(0) == self.seq_len
        assert label.size(0) == self.seq_len

        return {
            "encoder_input": encoder_input,
            "decoder_input": decoder_input,
            # "encoder_mask": (encoder_input != self.pad_token)
            # .unsqueeze(0)
            # .unsqueeze(0)
            # .int(),
            # # encoder mask: (1, 1, seq_len) -> Has 1 when there is text and 0 when there is pad (no text)
            # "decoder_mask": (decoder_input != self.pad_token).unsqueeze(0).int()
            # & causal_mask(decoder_input.size(0)),
            # (1, seq_len) and (1, seq_len, seq_len)
            # Will get 0 for all pads. And 0 for earlier text.
            "label": label,
            "src_text": src_text,
            "tgt_text": tgt_text,
            "encoder_str_length": len(enc_input_tokens),
            "decoder_str_length": len(dec_input_tokens),
        }

    def has_nan(self):
        for idx in range(len(self)):
            sample = self[idx]
            for key, value in sample.items():
                if isinstance(value, torch.Tensor) and torch.isnan(value).any():
                    print(f"NaN found in sample {idx}, key: {key}")
                    return True
        return False

In [6]:
# run tokenizer
def get_all_sentences(ds, lang):
    for item in ds:
        yield item['translation'][lang]

def get_or_build_tokenizer(config, ds,lang):

    tokenizer_path = Path(config['tokenizer_file'].format(lang))
    if not Path.exists(tokenizer_path):
        tokenizer = Tokenizer(WordLevel(unk_token="[UNK]"))
        tokenizer.pre_tokenizer = Whitespace()
        trainer = WordLevelTrainer(special_tokens=["[UNK]","[PAD]","[SOS]","[EOS]"],min_frequency = 2)
        tokenizer.train_from_iterator(get_all_sentences(ds,lang),trainer = trainer)
        tokenizer.save(str(tokenizer_path))
    else:
        tokenizer = Tokenizer.from_file(str(tokenizer_path))
    return tokenizer

In [7]:
from utils import clean_long_text
from config_file import get_config
config = get_config()


ds_raw = load_dataset('opus_books',f"{config['lang_src']}-{config['lang_tgt']}",split='train' )
ds_raw = clean_long_text(config,ds_raw)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [8]:
tokenizer_src = get_or_build_tokenizer(config, ds_raw, config['lang_src'])
tokenizer_tgt = get_or_build_tokenizer(config, ds_raw, config['lang_tgt'])

In [9]:
tokenizer_src

<tokenizers.Tokenizer at 0x7c4d55661c30>

In [10]:
train_ds = BillingualDataset(ds_raw, tokenizer_src, tokenizer_tgt, config['lang_src'], config['lang_tgt'], config['seq_len'])

In [11]:
train_ds.__getitem__(2)

{'encoder_input': tensor([  2, 114,  32,   3,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
           1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
           1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
           1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
           1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
           1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
           1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
           1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
           1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
           1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
           1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
           1,   1,   1,   1,   1,   1]),
 'decoder_input': tensor([2, 0, 5, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,

In [12]:
if train_ds.has_nan():
    print("NaN values found.")
else:
    print("No NaN values found.")

No NaN values found.


In [14]:
from dataset import LiTDataModule

# Initialize and setup the LightningDataModule
data_module = LiTDataModule(config)
data_module.setup()

# Get train DataLoader and iterate over it to find NaN values
train_dl = data_module.train_dataloader()

for batch_idx, batch in enumerate(train_dl):
    nan_found = False

    # Check for NaN values in each tensor within the batch
    for key, tensor in batch.items():
        if isinstance(tensor, torch.Tensor) and torch.isnan(tensor).any():
            print(f"NaN found in batch {batch_idx}, key: {key}")
            nan_found = True

Max length of the source sentence : 45
Max length of the source target : 41


