# Automatic Back-Transliteration

This notebook gets you started with running the automatic back-transliteration model for romanized Bangla. It uses the TB-Encoder-enhanced T5 model, which was the best-performing model in our experiments.

## Importing and Installing libraries

In [None]:
import os
import gc
import re
import ast
import sys
import copy
import json
import time
import math
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.nn import Parameter
import torch.nn.functional as F
from torch.optim import Adam, SGD, AdamW
import torch.optim as optim
from transformers import T5Tokenizer, T5ForConditionalGeneration, TrainingArguments
from torch.utils.data import DataLoader, Dataset
from normalizer import normalize
from transformers import AutoTokenizer, AutoModel
from tqdm import tqdm

In [None]:
! pip install git+https://github.com/csebuetnlp/normalizer

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

## Configuration Class

In [None]:
class CFG:
    """Central place for the hyper-parameters and switches read by the other cells.

    Later cells also attach the loaded tokenizers, models and the device to this class.
    """

    # ---- run switches ----
    debug = False   # True -> the data-split cell keeps only a small slice of every split
    train = True
    apex = True     # flag intended for faster training
    seed = 42       # seed number for random initialization

    # ---- checkpoints ----
    model_name = 'csebuetnlp/banglat5'
    encoder_name = "aplycaebous/tb-BERT-fpt"
    fusion_mode = "sum"  # "concat" or "sum"

    # ---- data ----
    filtering_th = 70   # samples whose word count exceeds this are dropped
    max_len = 50        # sequence length; overwritten once token lengths have been measured
    batch_size = 6
    num_workers = 4

    # ---- optimisation ----
    epochs = 8
    learning_rate = 2e-5
    eps = 1e-6
    betas = (0.9, 0.999)    # Adam moment coefficients
    weight_decay = 0.01     # Adam regularization parameter
    gradient_accumulation_steps = 1
    max_grad_norm = 1000

    # ---- logging ----
    print_freq = 1000   # print a progress line every `print_freq` steps

## Loading Tokenizer and Model

In [None]:
# Fetch the pretrained T5 model and the auxiliary encoder, each with its own tokenizer,
# and attach them (plus the chosen device) to CFG so later cells reach them via the config.
t5_checkpoint, encoder_checkpoint = CFG.model_name, CFG.encoder_name

CFG.t5_tokenizer = T5Tokenizer.from_pretrained(t5_checkpoint)
CFG.t5_model = T5ForConditionalGeneration.from_pretrained(t5_checkpoint)

CFG.encoder_tokenizer = AutoTokenizer.from_pretrained(encoder_checkpoint)
CFG.encoder_model = AutoModel.from_pretrained(encoder_checkpoint)

CFG.device = device

## Reading the Dataset

In [None]:
# Read the three splits (train, test, validation - in that order) from the Kaggle input folder.
df_train, df_test, df_val = (
    pd.read_csv(csv_path)
    for csv_path in (
        "/kaggle/input/pentabd-transliteration-dataset-all-combined/train.csv",
        "/kaggle/input/pentabd-transliteration-dataset-all-combined/test.csv",
        "/kaggle/input/pentabd-transliteration-dataset-all-combined/val.csv",
    )
)
df_train.head()

## Simple Data Analysis

In [None]:
# For every split, report how many samples exceed the word-count threshold that the
# next section filters on. The threshold is taken from CFG so the message always
# states the value that was actually used.
for split_name, split_df in (("train", df_train), ("test", df_test), ("val", df_val)):
    count_greater = (split_df['word_count_transliterated'] > CFG.filtering_th).sum()
    print(f"Number of {split_name} samples greater than {CFG.filtering_th} word count:", count_greater)

## Data Splits

In [None]:
# Keep only the samples whose transliterated word count is within the threshold.
# `.copy()` detaches each result from the frame it was sliced from, so the columns
# added in later cells are written to an independent frame instead of a slice
# (which would raise pandas' SettingWithCopyWarning).
df_train = df_train[df_train['word_count_transliterated'] <= CFG.filtering_th].copy()
df_val = df_val[df_val['word_count_transliterated'] <= CFG.filtering_th].copy()
df_test = df_test[df_test['word_count_transliterated'] <= CFG.filtering_th].copy()
if CFG.debug:
    # Debug runs work on a small prefix of every split.
    df_train = df_train[:160].copy()
    df_test = df_test[:40].copy()
    df_val = df_val[:40].copy()

## Data Preprocessing

In [None]:
def normalize_bengali(text):
    """Return `text` after passing it through the imported `normalize` function.

    Swap the body for a different routine if another normalization is wanted.
    """
    return normalize(text)


# Add the normalized Bengali column to the train, test and validation frames (in that order).
for _split_df in (df_train, df_test, df_val):
    _split_df['normalized_bengali'] = _split_df['text_bengali'].apply(normalize_bengali)

In [None]:
def _max_token_count(tokenizer, texts, desc):
    """Return the largest number of tokens `tokenizer` produces for any item of `texts`.

    Special tokens are not counted. Returns 0 when `texts` is empty. Every call gets
    its own progress bar, so no bar is ever iterated a second time.
    """
    longest = 0
    for text in tqdm(texts, total=len(texts), desc=desc):
        token_ids = tokenizer(text, truncation=True, add_special_tokens=False)['input_ids']
        longest = max(longest, len(token_ids))
    return longest


# Missing cells are measured as empty strings.
source_texts = df_train['text_transliterated'].fillna("").values
target_texts = df_train['normalized_bengali'].fillna("").values

# The +5 is extra headroom on top of the longest sequence
# (presumably for special tokens - TODO confirm).
t5_tokenizer_max_len = _max_token_count(CFG.t5_tokenizer, source_texts, "source / t5 tokenizer") + 5
encoder_tokenizer_max_len = _max_token_count(CFG.encoder_tokenizer, source_texts, "source / encoder tokenizer") + 5
target_max_len = _max_token_count(CFG.t5_tokenizer, target_texts, "target / t5 tokenizer") + 5
print(t5_tokenizer_max_len, encoder_tokenizer_max_len, target_max_len)

# One shared length for inputs and targets, capped at 512.
CFG.max_len = min(max(t5_tokenizer_max_len, encoder_tokenizer_max_len, target_max_len), 512)

print(CFG.max_len)

## Loading the Dataset

In [None]:
# Define a simple dataset for demonstration purposes
class SimpleDataset(Dataset):
    """Pairs every romanized sentence with its Bangla target and tokenizes both on access.

    Args:
        t5_tokenizer: tokenizer applied to the romanized input and to the Bangla target.
        encoder_tokenizer: tokenizer of the auxiliary encoder, applied to the romanized input.
        data: DataFrame with the columns 'text_transliterated' and 'text_bengali'.
        max_length: every sequence is padded / truncated to this many tokens.

    NOTE(review): targets are read from 'text_bengali', not from the
    'normalized_bengali' column computed elsewhere in the notebook - confirm this is intended.
    """

    def __init__(self, t5_tokenizer, encoder_tokenizer, data, max_length=512):
        self.t5_tokenizer = t5_tokenizer
        self.encoder_tokenizer = encoder_tokenizer
        # Missing cells come out of pandas as float NaN, which is not text; they are
        # replaced by the empty string so that every item handed to a tokenizer is a str.
        self.transliterated_texts = data['text_transliterated'].fillna("").tolist()
        self.bangla_texts = data['text_bengali'].fillna("").tolist()
        self.max_length = max_length

    def __len__(self):
        return len(self.bangla_texts)

    def _encode(self, tokenizer, text):
        """Tokenize one string to fixed-length 'pt' tensors (leading batch axis of size 1)."""
        return tokenizer(text, return_tensors='pt', padding='max_length',
                         truncation=True, max_length=self.max_length)

    def __getitem__(self, idx):
        """Return (t5_input_ids, encoder_input_ids, encoder_attention_mask, target_ids).

        Each element is a 1-D tensor: only the leading batch axis added by the
        tokenizer is removed (`squeeze(0)`), so no other axis can be dropped by accident.
        """
        input_text = self.transliterated_texts[idx]
        target_text = self.bangla_texts[idx]

        input_ids = self._encode(self.t5_tokenizer, input_text).input_ids
        encoder_inputs = self._encode(self.encoder_tokenizer, input_text)
        target_ids = self._encode(self.t5_tokenizer, target_text).input_ids

        return (input_ids.squeeze(0),
                encoder_inputs.input_ids.squeeze(0),
                encoder_inputs.attention_mask.squeeze(0),
                target_ids.squeeze(0))

In [None]:
# Prepare dataset and dataloader
def _build_loader(frame, shuffle):
    """Wrap one split in a SimpleDataset and return a DataLoader over it."""
    split_dataset = SimpleDataset(CFG.t5_tokenizer, CFG.encoder_tokenizer, frame, max_length=CFG.max_len)
    return DataLoader(split_dataset, batch_size=CFG.batch_size, shuffle=shuffle)


# Only the training split is shuffled; validation and test keep the frame order.
train_dataloader = _build_loader(df_train, shuffle=True)
valid_dataloader = _build_loader(df_val, shuffle=False)
test_dataloader = _build_loader(df_test, shuffle=False)

# `dataset` stays bound to the test split's dataset.
dataset = test_dataloader.dataset

In [None]:
# Sanity check: print the shapes of the first three tensors of one training batch.
first_batch = next(iter(train_dataloader), None)
if first_batch is not None:
    for batch_tensor in first_batch[:3]:
        print(batch_tensor.shape)

## Model

In [None]:
class CustomModel(nn.Module):
    """T5 whose encoder output is fused with the hidden states of a second encoder.

    The T5 encoder and the auxiliary encoder both read the same sentence (each through
    its own token ids). Their last hidden states are combined according to
    `cfg.fusion_mode` and the result replaces the T5 encoder output seen by the decoder.

    Args:
        cfg: config object providing `t5_model`, `encoder_model`, `t5_tokenizer`,
            `device` and `fusion_mode`.
        is_train: True  -> `forward` runs the decoder on the targets and computes a loss;
                  False -> `forward` runs the decoder on a start sequence only.
    """

    def __init__(self, cfg, is_train = True):
        super().__init__()
        self.cfg = cfg
        # The sub-models are taken from cfg by reference, not copied: every CustomModel
        # built from the same cfg shares (and updates) the same weights.
        self.t5_model = self.cfg.t5_model
        self.encoder_model = self.cfg.encoder_model
        self.t5_tokenizer = self.cfg.t5_tokenizer
        self.device = self.cfg.device
        self.is_train = is_train
        self.hidden_size = 768
        # Projection used by the "concat" fusion: 2 * hidden_size -> hidden_size.
        self.mlp = nn.Linear(self.hidden_size * 2, self.hidden_size)

    def forward(self, input_ids, encoder_input_ids, encoder_attn_mask, target_ids= None):
        """Fuse both encoders and run the T5 decoder on top of the fused states.

        Args:
            input_ids: T5 token ids of the source sentences.
            encoder_input_ids: auxiliary-encoder token ids of the same sentences.
            encoder_attn_mask: attention mask belonging to `encoder_input_ids`.
            target_ids: T5 token ids of the targets; required when `is_train` is True.

        Returns:
            (outputs, t5_encoder_outputs): the T5 model output and the encoder output
            object whose `last_hidden_state` holds the fused representation.

        Raises:
            ValueError: if the model is in training mode and `target_ids` is None.
        """
        t5_encoder_outputs = self.t5_model.encoder(input_ids=input_ids)
        tb_model_ouputs = self.encoder_model(input_ids=encoder_input_ids,
                                                 attention_mask = encoder_attn_mask)

        t5_final_repr = t5_encoder_outputs.last_hidden_state
        tb_final_repr = tb_model_ouputs.last_hidden_state

        if self.cfg.fusion_mode == "concat":
            # Concatenate along the feature axis, then project back to hidden_size.
            updated_repr = torch.cat((t5_final_repr, tb_final_repr), dim = -1)
            updated_repr = self.mlp(updated_repr)

        elif self.cfg.fusion_mode == "sum":
            # Element-wise sum; the two hidden-state tensors must be shape-compatible.
            updated_repr = t5_final_repr + tb_final_repr

        else:
            # Any other mode: no fusion, keep the plain T5 hidden states. This has to be
            # the tensor, not the output object, because it is stored as last_hidden_state.
            updated_repr = t5_final_repr

        t5_encoder_outputs['last_hidden_state'] = updated_repr

        if self.is_train:
            if target_ids is None:
                raise ValueError("target_ids is required when the model is built with is_train=True")
            # Teacher forcing with manually shifted targets: position k of the decoder
            # input is paired with target token k+1 as its label.
            # NOTE(review): the first target token is therefore never used as a label and
            # no decoder start token is prepended - confirm this matches how generation is started.
            decoder_input_ids = target_ids[:, :-1]
            labels = target_ids[:, 1:].clone()
            labels[labels == self.t5_tokenizer.pad_token_id] = -100  # Ignore pad token positions in loss
            outputs = self.t5_model(
                encoder_outputs=t5_encoder_outputs,  # Use modified encoder hidden states
                decoder_input_ids=decoder_input_ids,
                labels=labels,
                )
        else:
            # Start sequence obtained by encoding the empty string, repeated once per
            # batch row so that its batch size matches the encoder states.
            decoder_input_ids = self.t5_tokenizer.encode("", return_tensors='pt')
            decoder_input_ids = decoder_input_ids.to(self.device)
            decoder_input_ids = decoder_input_ids.repeat(input_ids.size(0), 1)
            outputs = self.t5_model(
                encoder_outputs=t5_encoder_outputs,
                decoder_input_ids=decoder_input_ids,
                )

        return outputs, t5_encoder_outputs

## Helper Functions

In [None]:
class AverageMeter:
    """Running tracker that keeps the latest value and the weighted mean of all values."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Set the latest value, the mean, the weighted sum and the count back to zero."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record `val` as observed `n` times and refresh the running mean."""
        self.val = val
        self.count += n
        self.sum += val * n
        self.avg = self.sum / self.count


def asMinutes(s):
    """Format a duration given in seconds as '<minutes>m <seconds>s' (fractions dropped)."""
    whole_minutes = math.floor(s / 60)
    leftover_seconds = s - whole_minutes * 60
    return '%dm %ds' % (whole_minutes, leftover_seconds)


def timeSince(since, percent):
    """Describe the time elapsed since `since` and the projected time still remaining.

    Args:
        since: start timestamp as returned by `time.time()`.
        percent: fraction of the work finished so far; must be non-zero.

    Returns:
        A string of the form '<elapsed> (remain <remaining>)'.
    """
    elapsed = time.time() - since
    # Extrapolate the total run time from the share of work already done.
    projected_total = elapsed / (percent)
    remaining = projected_total - elapsed
    return '%s (remain %s)' % (asMinutes(elapsed), asMinutes(remaining))

## Training

In [None]:
def train_fn(train_loader, model, optimizer, epoch, device=CFG.device):
    """Train `model` for one pass over `train_loader`.

    Args:
        train_loader: yields batches of (input_ids, encoder_inputs, encoder_attn_mask, target_ids).
        model: called as model(input_ids, encoder_inputs, encoder_attn_mask, target_ids);
            the first element of its return value must expose `.loss`.
        optimizer: optimizer stepping the model's parameters.
        epoch: zero-based epoch index, used only for the progress line.
        device: device the batch tensors are moved to.

    Returns:
        The mean loss over the epoch, weighted by batch size.
    """
    # Enabling Model Training Mode
    model.train()

    losses = AverageMeter()
    start = time.time()
    total_steps = len(train_loader)

    for step, batch in enumerate(train_loader):
        input_ids, encoder_inputs, encoder_attn_mask, target_ids = batch

        input_ids = input_ids.to(device)
        encoder_inputs = encoder_inputs.to(device)
        encoder_attn_mask = encoder_attn_mask.to(device)
        target_ids = target_ids.to(device)

        batch_size = input_ids.size(0)

        outputs, _ = model(input_ids, encoder_inputs, encoder_attn_mask, target_ids)
        loss = outputs.loss
        losses.update(loss.item(), batch_size)

        optimizer.zero_grad()
        loss.backward()
        # Clip after backward() and before step(): only then does the clipping act on
        # the gradients of this batch, and the reported norm belongs to this step.
        grad_norm = torch.nn.utils.clip_grad_norm_(model.parameters(), CFG.max_grad_norm)
        optimizer.step()

        if step % CFG.print_freq == 0 or step == (total_steps-1):
            print('Epoch: [{0}][{1}/{2}] '
                  'Elapsed {remain:s} '
                  'Loss: {loss.val:.4f}({loss.avg:.4f}) '
                  'Grad: {grad_norm:.4f}  '
                  .format(epoch+1, step, total_steps,
                          remain=timeSince(start, float(step+1)/total_steps),
                          loss=losses,
                          grad_norm=grad_norm))

    return losses.avg

In [None]:
def valid_fn(valid_loader, model, epoch, device=CFG.device):
    """Evaluate `model` on `valid_loader` without gradient tracking.

    Args:
        valid_loader: yields batches of (input_ids, encoder_inputs, encoder_attn_mask, target_ids).
        model: called exactly as during training; its first return value must expose `.loss`.
        epoch: accepted for signature symmetry with the training function; not used.
        device: device the batch tensors are moved to.

    Returns:
        The mean validation loss, weighted by batch size.
    """
    # Switch the model to evaluation mode
    model.eval()

    losses = AverageMeter()
    start = time.time()
    last_step = len(valid_loader) - 1

    for step, (input_ids, encoder_inputs, encoder_attn_mask, target_ids) in enumerate(valid_loader):
        input_ids = input_ids.to(device)
        encoder_inputs = encoder_inputs.to(device)
        encoder_attn_mask = encoder_attn_mask.to(device)
        target_ids = target_ids.to(device)

        with torch.no_grad():
            outputs, _ = model(input_ids, encoder_inputs, encoder_attn_mask, target_ids)
        losses.update(outputs.loss.item(), input_ids.size(0))

        # Progress line every `print_freq` steps and once more on the final step.
        if step % CFG.print_freq != 0 and step != last_step:
            continue
        progress = ('EVAL: [{0}/{1}] '
                    'Elapsed {remain:s} '
                    'Loss: {loss.val:.4f}({loss.avg:.4f}) '
                    .format(step, len(valid_loader),
                            loss=losses,
                            remain=timeSince(start, float(step+1)/len(valid_loader))))
        print(progress)

    return losses.avg

In [None]:
# train loop

def train_loop():
    """Train a fresh CustomModel for `CFG.epochs` epochs and checkpoint the best one.

    After every epoch the model is evaluated on the validation loader; whenever the
    validation loss improves, the state dict is written to
    OUTPUT_DIR + "<fusion_mode>_best_loss.pth".

    Returns:
        The lowest validation loss observed.
    """
    # Apply the configured seed so that shuffling, dropout and the random initialization
    # of the fusion layer are repeatable between runs.
    torch.manual_seed(CFG.seed)

    model = CustomModel(CFG)
    model = model.to(CFG.device) # GPU Config

    optimizer = AdamW(model.parameters(), lr=CFG.learning_rate, eps=CFG.eps,
                      betas=CFG.betas, weight_decay=CFG.weight_decay)

    # Infinity guarantees that the first finite validation loss is stored.
    best_loss = float("inf")

    for epoch in range(CFG.epochs):

        start_time = time.time()

        # train function
        avg_loss = train_fn(train_dataloader, model, optimizer, epoch)

        # eval function (the third argument is the epoch index, not the device)
        avg_val_loss = valid_fn(valid_dataloader, model, epoch)

        elapsed = time.time() - start_time
        print(f"Epoch {epoch+1} - avg_train_loss: {avg_loss:.4f}  "
              f"avg_val_loss: {avg_val_loss:.4f}  time: {elapsed:.0f}s")

        if best_loss > avg_val_loss: # Saving the best model w.r.t the score
            best_loss = avg_val_loss
            torch.save({'model': model.state_dict(),
                        },
                        OUTPUT_DIR+f"{CFG.fusion_mode}_best_loss.pth")

    torch.cuda.empty_cache()
    gc.collect()

    return best_loss

In [None]:
# Directory the checkpoint is written to and read back from.
OUTPUT_DIR = "./"

# Run the full training; the best checkpoint is saved as a side effect.
best_loss = train_loop()

# Rebuild the wrapper on the target device and restore the best weights into it.
# The checkpoint is deserialized on the CPU first.
model = CustomModel(CFG).to(CFG.device)
checkpoint_path = OUTPUT_DIR+f"{CFG.fusion_mode}_best_loss.pth"
state = torch.load(checkpoint_path, map_location=torch.device('cpu'))

model.load_state_dict(state['model'])
print(model)

## Inferencing

In [None]:
def generate_text_fn(test_loader, model, tokenizer, max_length=50, device=CFG.device):
    """Generate one prediction per sample of `test_loader` from the fused encoder states.

    Every batch is first passed through `model` to obtain the fused encoder output
    (T5 encoder combined with the auxiliary encoder); beam search then decodes from
    these states. Calling `generate` on the raw input ids instead would skip the
    auxiliary encoder, and the auxiliary encoder's attention mask does not belong
    to the T5 token ids.

    Args:
        test_loader: yields batches of (input_ids, encoder_inputs, encoder_attn_mask, target_ids).
        model: CustomModel-like module returning (outputs, encoder_outputs) and
            exposing the underlying seq2seq model as `model.t5_model`.
        tokenizer: tokenizer used to decode the generated ids.
        max_length: maximum length of each generated sequence.
        device: device the batch tensors are moved to.

    Returns:
        A list of decoded strings, in loader order.
    """
    # Set model to evaluation mode
    model.eval()

    generated_texts = []

    with torch.no_grad():
        for batch in test_loader:
            input_ids, encoder_inputs, encoder_attn_mask, target_ids = batch
            input_ids = input_ids.to(device)
            encoder_inputs = encoder_inputs.to(device)
            encoder_attn_mask = encoder_attn_mask.to(device)
            target_ids = target_ids.to(device)

            # The targets are forwarded only because the model's forward needs them
            # when it was built in training mode; the decoder output of this call is
            # discarded and only the fused encoder states are kept.
            _, fused_encoder_outputs = model(input_ids, encoder_inputs,
                                             encoder_attn_mask, target_ids)

            # Decode from the fused states. No attention mask is passed, which matches
            # how the decoder consumes the encoder states inside the model's forward.
            output_sequences = model.t5_model.generate(
                encoder_outputs=fused_encoder_outputs,
                max_length=max_length,
                num_beams=4,  # Adjust as needed
                early_stopping=True
            )

            # Decode generated sequences
            generated_texts.extend(tokenizer.batch_decode(output_sequences, skip_special_tokens=True))

    return generated_texts

In [None]:
# Generate predictions for the whole test split. Generation may run up to CFG.max_len
# tokens (the length the sequences are padded to); the function's default of 50 would
# cut longer outputs short.
generated_texts = generate_text_fn(test_dataloader, model, CFG.t5_tokenizer, max_length=CFG.max_len)
df_test['predictions'] = generated_texts
df_test.head()

In [None]:
# Experiment tag: "<t5 checkpoint>_<encoder checkpoint>_<fusion mode>", with '/' replaced
# by '-' so that the tag can be used inside a file name.
name_parts = [
    CFG.model_name.replace('/', '-'),
    CFG.encoder_name.replace('/', '-'),
    CFG.fusion_mode,
]
exp_name = "_".join(name_parts)
print(exp_name)

# Write the test frame, predictions included, without the index column.
df_test.to_csv(f"./Our_Model_{exp_name}_Predictions.csv", index = False)

In [None]:
# Build the wrapper in inference mode (is_train=False) on the target device and restore
# the best checkpoint into it. The checkpoint is deserialized on the CPU first.
model = CustomModel(CFG, is_train = False).to(CFG.device)
checkpoint_path = OUTPUT_DIR+f"{CFG.fusion_mode}_best_loss.pth"
state = torch.load(checkpoint_path, map_location=torch.device('cpu'))

model.load_state_dict(state['model'])

In [None]:
def generate_single_text_fn(input_text, model, device=CFG.device, max_length=50):
    """Back-transliterate a single romanized sentence.

    Args:
        input_text: the romanized sentence.
        model: CustomModel built with is_train=False (its forward is called without targets).
        device: device used for the model and all tensors.
        max_length: maximum length of the generated sequence.

    Returns:
        The decoded prediction as a string.
    """
    # Set model to evaluation mode
    model.eval()
    model.to(device)

    # Start sequence for the decoder: the encoding of the empty string.
    decoder_input_ids = CFG.t5_tokenizer.encode("", return_tensors='pt')
    decoder_input_ids = decoder_input_ids.to(device)

    input_ids = CFG.t5_tokenizer.encode(input_text, return_tensors='pt', padding='max_length',
                                                truncation=True, max_length=CFG.max_len)

    encoder_inputs = CFG.encoder_tokenizer(input_text, return_tensors='pt', padding='max_length',
                                                truncation=True, max_length=CFG.max_len)
    encoder_ids = encoder_inputs.input_ids
    encoder_attn_mask = encoder_inputs.attention_mask

    input_ids = input_ids.to(device)
    encoder_ids = encoder_ids.to(device)
    encoder_attn_mask = encoder_attn_mask.to(device)

    # Inference only: without no_grad the forward pass would record an autograd graph
    # that is never used.
    with torch.no_grad():
        # Only the fused encoder output of this call is needed below.
        _, t5_encoder_outputs = model(input_ids, encoder_ids,
                                      encoder_attn_mask)

        # Generate text
        output_sequences = model.t5_model.generate(
                input_ids=None,  # The input_ids will be None since we use encoder_outputs directly
                encoder_outputs=t5_encoder_outputs,
                decoder_input_ids=decoder_input_ids,
                max_length=max_length,  # Set the max length for generated text
                num_beams=5,  # Beam search for better quality text
                early_stopping=True  # Stop when the end token is generated
            )

    generated_text = CFG.t5_tokenizer.decode(output_sequences[0], skip_special_tokens=True)
    return generated_text

## Sample Inference

In [None]:
# Back-transliterate one romanized example sentence with the model currently bound to
# `model`; the bare `text` on the last line makes the notebook display the result.
input_text = "Ami vaat khai"
text = generate_single_text_fn(input_text, model)
text

## Additional Code

In [None]:
# # Set the model to evaluation mode
# model.eval()

# # Function to generate text for a test sample
# def generate_text(input_text):
#     input_ids = tokenizer(input_text, return_tensors='pt').input_ids


# t5_encoder_outputs
#     # Prepare the decoder input
#     decoder_input_ids = tokenizer.encode("", return_tensors='pt')

#     # Generate text using the decoder
#     outputs = model.generate(
#         input_ids=None,  # The input_ids will be None since we use encoder_outputs directly
#         encoder_outputs=encoder_outputs,
#         decoder_input_ids=decoder_input_ids,
#         max_length=50,  # Set the max length for generated text
#         num_beams=5,  # Beam search for better quality text
#         early_stopping=True  # Stop when the end token is generated
#     )

#     # Decode the generated tokens to text
#     generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
#     return generated_text

# # Test input text
# test_input_text = "translate English to French: I love natural language processing."

# # Generate text for the test sample
# generated_output_text = generate_text(test_input_text)
# print("Generated text:", generated_output_text)

# # For batch wise

# from transformers import MarianMTModel, MarianTokenizer

# # Load the MarianMTModel for English to French translation
# model_name = "Helsinki-NLP/opus-mt-en-fr"
# tokenizer = MarianTokenizer.from_pretrained(model_name)
# model = MarianMTModel.from_pretrained(model_name)

# # Define a list of input sentences in English
# input_texts = ["Hello, how are you?", "What is your name?", "How was your day?"]

# # Tokenize the input texts
# inputs = tokenizer(input_texts, return_tensors="pt", padding=True, truncation=True)

# # Translate the input texts in batch
# translated = model.generate(**inputs)

# # Decode the translated outputs
# translated_texts = tokenizer.batch_decode(translated, skip_special_tokens=True)

# # Print the translated texts
# for input_text, translated_text in zip(input_texts, translated_texts):
#     print("Input:", input_text)
#     print("Translated:", translated_text)