In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, Dataset
import pandas as pd
import numpy as np
import re
import random



https://github.com/bentrevett/pytorch-seq2seq/tree/main

https://github.com/pankajrawat9075/English-Hindi-Language-Transliteration-using-Deep-Learning/tree/main

https://github.com/biku1998/Neural-machine-transliteration-using-Pytorch/tree/master

In [2]:
# set the device we will be using to train the model
device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")

In [3]:
device

device(type='cuda', index=1)

In [4]:
# storing all the alphabets of English and the pad char to a dictionary to create OHE representation later.
eng_alphabets = 'abcdefghijklmnopqrstuvwxyz'
pad_char = '-PAD-'

eng_alpha2index = {pad_char: 0}
for index, alpha in enumerate(eng_alphabets):
    eng_alpha2index[alpha] = index+1

print(eng_alpha2index)

{'-PAD-': 0, 'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6, 'g': 7, 'h': 8, 'i': 9, 'j': 10, 'k': 11, 'l': 12, 'm': 13, 'n': 14, 'o': 15, 'p': 16, 'q': 17, 'r': 18, 's': 19, 't': 20, 'u': 21, 'v': 22, 'w': 23, 'x': 24, 'y': 25, 'z': 26}


In [9]:
# Tamil Unicode Hex Range is 2944:3071. Source: https://en.wikipedia.org/wiki/Tamil_(Unicode_block)
tamil_alphabets = [chr(alpha) for alpha in range(2944, 3072)]
tamil_alphabet_size = len(tamil_alphabets)

tamil_alpha2index = {0:pad_char}
for index, alpha in enumerate(tamil_alphabets):
    tamil_alpha2index[index+1] = alpha

print(tamil_alpha2index)

{0: '-PAD-', 1: '\u0b80', 2: '\u0b81', 3: 'ஂ', 4: 'ஃ', 5: '\u0b84', 6: 'அ', 7: 'ஆ', 8: 'இ', 9: 'ஈ', 10: 'உ', 11: 'ஊ', 12: '\u0b8b', 13: '\u0b8c', 14: '\u0b8d', 15: 'எ', 16: 'ஏ', 17: 'ஐ', 18: '\u0b91', 19: 'ஒ', 20: 'ஓ', 21: 'ஔ', 22: 'க', 23: '\u0b96', 24: '\u0b97', 25: '\u0b98', 26: 'ங', 27: 'ச', 28: '\u0b9b', 29: 'ஜ', 30: '\u0b9d', 31: 'ஞ', 32: 'ட', 33: '\u0ba0', 34: '\u0ba1', 35: '\u0ba2', 36: 'ண', 37: 'த', 38: '\u0ba5', 39: '\u0ba6', 40: '\u0ba7', 41: 'ந', 42: 'ன', 43: 'ப', 44: '\u0bab', 45: '\u0bac', 46: '\u0bad', 47: 'ம', 48: 'ய', 49: 'ர', 50: 'ற', 51: 'ல', 52: 'ள', 53: 'ழ', 54: 'வ', 55: 'ஶ', 56: 'ஷ', 57: 'ஸ', 58: 'ஹ', 59: '\u0bba', 60: '\u0bbb', 61: '\u0bbc', 62: '\u0bbd', 63: 'ா', 64: 'ி', 65: 'ீ', 66: 'ு', 67: 'ூ', 68: '\u0bc3', 69: '\u0bc4', 70: '\u0bc5', 71: 'ெ', 72: 'ே', 73: 'ை', 74: '\u0bc9', 75: 'ொ', 76: 'ோ', 77: 'ௌ', 78: '்', 79: '\u0bce', 80: '\u0bcf', 81: 'ௐ', 82: '\u0bd1', 83: '\u0bd2', 84: '\u0bd3', 85: '\u0bd4', 86: '\u0bd5', 87: '\u0bd6', 88: 'ௗ', 89: '\u0bd8', 90: '

In [15]:
word = [8, 34, 28, 80, 24, 77, 50, 80, 50, 65]
word = [24, 79, 39, 49, 65, 44, 80]
decoded_word = "".join([tamil_alpha2index.get(char-2) for char in word])
decoded_word

'கௌதமான்'

**Helper Functions**

In [None]:
# Funcitons used to do some pre-processing.
# removing all non-alphabetic char in English as well as Tamil.

non_eng_letters_regex = re.compile('[^a-zA-Z ]')

# Remove all English non-letters
def cleanEnglishVocab(line):
    line = line.replace('-', ' ').replace(',', ' ').lower()
    line = non_eng_letters_regex.sub('', line)
    return line.split()

# Remove all Tamil non-letters
def cleanTamilVocab(line):
    line = line.replace('-', ' ').replace(',', ' ')
    cleaned_line = ''
    for char in line:
        if char in tamil_alpha2index or char == ' ':
            cleaned_line += char
    return cleaned_line.split()

**Dataloader**

In [None]:
class TransliterationDataLoader(Dataset):
    def __init__(self, filename):
        self.eng_words, self.hindi_words = self.readXmlDataset(filename, cleanTamilVocab)
        self.shuffle_indices = list(range(len(self.eng_words)))
        random.shuffle(self.shuffle_indices)
        self.shuffle_start_index = 0

    def __len__(self):
        return len(self.eng_words)

    def __getitem__(self, idx):
        return self.eng_words[idx], self.hindi_words[idx]

    def readXmlDataset(self, filename, lang_vocab_cleaner):
        '''Task : to read the xml file and store all the contents in a list.
                  Then we will do some pre-processing of data to remove noise as well as delimeters. '''
        transliterationCorpus = pd.read_csv(filename,  header=None)
        en_words, ta_words = transliterationCorpus.iloc[:, 0], transliterationCorpus.iloc[:, 1]

        lang1_words = []
        lang2_words = []

        for idx in range(len(en_words)):
            wordlist1 = cleanEnglishVocab(en_words[idx]) # clean English words.
            wordlist2 = cleanTamilVocab(ta_words[idx])# clean hindi words.

            # Skip noisy data
            if len(wordlist1) != len(wordlist2):
                print('Skipping: ', en_words[idx], ' - ', ta_words[idx])
                continue

            for word in wordlist1:
                lang1_words.append(word)
            for word in wordlist2:
                lang2_words.append(word)

        return lang1_words, lang2_words

    def get_random_sample(self):
        return self.__getitem__(np.random.randint(len(self.eng_words)))

    def get_batch_from_array(self, batch_size, array): # child function of get_batch() function.
        '''Given an array , and batch size , this fucntion will return some samples from the array i.e can be HindiWords or EnglishWords etc. '''
        end = self.shuffle_start_index + batch_size # what index till i want to go.
        batch = []
        if end >= len(self.eng_words): # if we overflow the words array , we have to loop back.
            batch = [array[i] for i in self.shuffle_indices[0:end%len(self.eng_words)]]
            end = len(self.eng_words)
        return batch + [array[i] for i in self.shuffle_indices[self.shuffle_start_index : end]]

    def get_batch(self, batch_size, postprocess = True):
        eng_batch = self.get_batch_from_array(batch_size, self.eng_words)
        hindi_batch = self.get_batch_from_array(batch_size, self.hindi_words)
        self.shuffle_start_index += batch_size + 1

        # Reshuffle if 1 epoch is complete
        if self.shuffle_start_index >= len(self.eng_words):
            random.shuffle(self.shuffle_indices)
            self.shuffle_start_index = 0

        return eng_batch, hindi_batch

In [None]:
train_data = TransliterationDataLoader('tam_train.csv')
val_data = TransliterationDataLoader('tam_valid.csv')

**Basic Data Visualization**

In [None]:
print("Train Set Size:\t", len(train_data))
print("Validation Set Size:\t", len(val_data))

print('\nSample data from train-set:')
for i in range(10):
    eng, tam = train_data.get_random_sample()
    print(eng + ' - ' + tam)

Train Set Size:	 51200
Validation Set Size:	 4096

Sample data from train-set:
ravipudram - ரவிபுத்ரம்
anumathikkuriya - அனுமதிக்குரிய
sumikkasha - சுமிக்கஷா
inthappulligalaal - இந்தப்புள்ளிகளால்
ubaga - உபக
kavalaikkollaathirukka - கவலைக்கொள்ளாதிருக்க
maiyavaatham - மையவாதம்
stalinukumtan - ஸ்டாலினுக்கும்தான்
avasiyamaakkiya - அவசியமாக்கிய
kannalmozhi - கன்னல்மொழி


In [None]:
train_data.get_batch(10)

(['anandrathu',
  'aalaakiyullaen',
  'padiganga',
  'malaiyaalaththirkum',
  'valia',
  'kurippidaththodangiya',
  'iruthiyaattralaiyum',
  'vaikkappaduvathundu',
  'puumaraangk',
  'evidae'],
 ['அனன்றது',
  'ஆளாகியுள்ளேன்',
  'பாடிகங்கா',
  'மலையாளத்திற்கும்',
  'வாலியா',
  'குறிப்பிடத்தொடங்கிய',
  'இறுதியாற்றலையும்',
  'வைக்கப்படுவதுண்டு',
  'பூமராங்க்',
  'எவிடே'])

**Sequence to Sequence Modelling**

In [None]:
class Seq2SeqModel(nn.Module):
    def __init__(self, input_size, output_size, embedding_dim, hidden_dim, num_layers, cell_type='lstm', bidirectional=False, dropout=0.0):
        super(Seq2SeqModel, self).__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.cell_type = cell_type
        self.bidirectional = bidirectional
        self.dropout = dropout

        self.embedding = nn.Embedding(input_size, embedding_dim)

        if cell_type == 'rnn':
            self.encoder = nn.RNN(embedding_dim, hidden_dim, num_layers, bidirectional=bidirectional, dropout=dropout)
            self.decoder = nn.RNN(hidden_dim * (2 if bidirectional else 1), hidden_dim, num_layers, dropout=dropout)
        elif cell_type == 'lstm':
            self.encoder = nn.LSTM(embedding_dim, hidden_dim, num_layers, bidirectional=bidirectional, dropout=dropout)
            self.decoder = nn.LSTM(hidden_dim * (2 if bidirectional else 1), hidden_dim, num_layers, dropout=dropout)
        elif cell_type == 'gru':
            self.encoder = nn.GRU(embedding_dim, hidden_dim, num_layers, bidirectional=bidirectional, dropout=dropout)
            self.decoder = nn.GRU(hidden_dim * (2 if bidirectional else 1), hidden_dim, num_layers, dropout=dropout)

        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self, input_seq, hidden=None, beam_size=1):
        batch_size = input_seq.size(1)
        seq_length = input_seq.size(0)

        embedded = self.embedding(input_seq)

        outputs, hidden = self.encoder(embedded, hidden)

        if self.bidirectional:
            hidden = (hidden[0].view(self.num_layers, 2, batch_size, self.hidden_dim)[:, :1].contiguous(),
                      hidden[1].view(self.num_layers, 2, batch_size, self.hidden_dim)[:, :1].contiguous())
        else:
            hidden = (hidden[0].view(self.num_layers, 1, batch_size, self.hidden_dim),
                      hidden[1].view(self.num_layers, 1, batch_size, self.hidden_dim))

        if beam_size > 1:
            hidden = (hidden[0].repeat(1, beam_size, 1, 1),
                      hidden[1].repeat(1, beam_size, 1, 1))
            batch_size = batch_size * beam_size

        decoder_input = torch.zeros(1, batch_size, self.hidden_dim * (2 if self.bidirectional else 1), device=input_seq.device)
        decoder_outputs = []

        hypotheses = torch.zeros(batch_size, beam_size, seq_length, device=input_seq.device).long()
        scores = torch.zeros(batch_size, beam_size, device=input_seq.device)

        for t in range(seq_length):
            output, hidden = self.decoder(decoder_input, hidden)
            decoder_output = self.fc(output.squeeze(0))
            decoder_outputs.append(decoder_output)

            if beam_size > 1:
                decoder_output = decoder_output.view(batch_size, beam_size, -1)
                log_probs = torch.log_softmax(decoder_output, dim=-1)
                scores = scores.unsqueeze(-1) + log_probs
                scores, indices = scores.view(batch_size, -1).topk(beam_size, dim=-1)
                hypotheses[:, :, t] = indices % self.output_size
                decoder_input = hypotheses[:, :, t].view(1, batch_size * beam_size, -1)
            else:
                decoder_input = output

        if beam_size > 1:
            decoder_outputs = [output.view(batch_size, beam_size, -1) for output in decoder_outputs]
            decoder_outputs = torch.stack(decoder_outputs, dim=2)
            decoder_outputs = decoder_outputs.transpose(1, 2)
        else:
            decoder_outputs = torch.stack(decoder_outputs)

        return decoder_outputs

In [None]:
# Define a custom dataset class
class TransliterationDataset(Dataset):
    def __init__(self, csv_file):
        self.data = pd.read_csv(csv_file)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        src_word = self.data.iloc[idx, 0]
        target_word = self.data.iloc[idx, 1]

        # Convert words to tensors (you might need additional preprocessing)
        # Example: Convert words to indices using some kind of vocabulary mapping
        # For simplicity, let's assume the words are already converted to indices
        src_tensor = torch.tensor(src_word)
        target_tensor = torch.tensor(target_word)

        return src_tensor, target_tensor

In [None]:
# Load data from CSV files
train_data = TransliterationDataset("tam_train.csv")
val_data = TransliterationDataset("tam_valid.csv")
test_data = TransliterationDataset("tam_test.csv")

# Create DataLoaders
batch_size = 32
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size)
test_loader = DataLoader(test_data, batch_size=batch_size)

In [None]:
input_size = 256  # Size of the input character vocabulary
output_size = 256  # Size of the output character vocabulary
embedding_dim = 128  # Dimension of the character embeddings
hidden_dim = 512  # Dimension of the hidden state in the encoder and decoder RNNs
num_layers = 2  # Number of layers in the encoder and decoder RNNs
cell_type = 'lstm'  # Type of RNN cell ('rnn', 'lstm', or 'gru')
bidirectional = True  # Whether the encoder should be bidirectional
dropout = 0.2  # Dropout rate

In [None]:
# Instantiate the model
model = Seq2SeqModel(input_size, output_size, embedding_dim, hidden_dim, num_layers, cell_type, bidirectional, dropout)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=0)  # Assuming 0 is the padding index
optimizer = optim.Adam(model.parameters())

In [None]:
num_epochs = 10

In [None]:
# Training loop
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0

    for inputs, targets in train_loader:
        print(inputs)
        print(targets)
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        outputs = outputs.view(-1, outputs.size(-1))
        targets = targets.view(-1)

        # Calculate loss
        loss = criterion(outputs, targets)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    # Validation loop
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            # Forward pass with beam search
            beam_size = 4  # Example beam size
            outputs = model(inputs, beam_size=beam_size)

            # Calculate loss (you may need to modify this based on your evaluation metric)
            outputs = outputs.view(-1, outputs.size(-1))
            targets = targets.view(-1)
            loss = criterion(outputs, targets)

            val_loss += loss.item()

    train_loss = epoch_loss / len(train_loader)
    val_loss = val_loss / len(val_loader)

    print(f'Epoch: {epoch+1}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

# Save the trained model
torch.save(model.state_dict(), 'model.pth')

TypeError: new(): invalid data type 'str'