### Source-
https://github.com/Alejandro-Garcia-Uceda/Translators-with-RNNs/blob/main/Basic%20bidirectional%20LSTM%20translator%20from%20English%20to%20French.ipynb
https://pytorch.org/tutorials/intermediate/seq2seq_translation_tutorial.html

In [1]:
import pandas as pd
import indicnlp
import indicnlp.tokenize
from indicnlp.tokenize.indic_tokenize import trivial_tokenize_indic
# from indicnlp.tokenize.
from indicnlp.normalize.indic_normalize import IndicNormalizerFactory
from indicnlp.transliterate.unicode_transliterate import UnicodeIndicTransliterator
import unicodedata
import re
from io import open


In [2]:
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

In [3]:
from tqdm.auto import tqdm


In [4]:
from torch.utils.data import TensorDataset, DataLoader, RandomSampler

In [5]:
device = torch.device("cpu")

In [6]:
import os
# os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
# os.environ['TORCH_USE_CUDA_DSA'] = '1'

## Tokenisation

In [7]:
class Lang:
    def __init__(self,lang):
        self.lang=lang
        self.word2index={}
        self.word2count={}
        self.index2word={1:"SOS",2:"EOS"}
        self.n_words=2
    
    def unicodeToAscii(self,s):
        return ''.join(
            c for c in unicodedata.normalize('NFD', s)
            if unicodedata.category(c) != 'Mn'
        )

    def normalizeString(self,s):
        s = self.unicodeToAscii(s.lower().strip())
        s = re.sub(r"([.!?,])", r" \1 ", s)
        s = re.sub(r"[^a-zA-Z!?]+", " ", s)
        return s.strip()

    
    def normal(self,text):
        if self.lang=="ta":
            text=re.sub(r'([.!?,])', r' \1 ', text)
            # text = re.sub(r'[]',r'\1', text)
            norm=IndicNormalizerFactory()
            n=norm.get_normalizer('ta')
            string=n.normalize(text)
            string.strip()
        else:
            string=self.normalizeString(text)
            
        return string.strip()
    
    def token(self,text):
        if self.lang=="ta":
            # print(self.lang)
            text=self.normal(text)
            words=trivial_tokenize_indic(text)

        else:
            # print(self.lang)
            text=self.normal(text)
            words=[word for word in text.split(' ')]

        for word in words:
            if word not in self.word2index:
                self.word2index[word]=self.n_words
                self.word2count[word]=1
                self.index2word[self.n_words]=word
                self.n_words+=1

            else:
                self.word2count[word]+=1
                    

 
                   
                
    
        
    

In [8]:
def readData():
    data=pd.read_csv("D:\\Language Translation\\first_attempt.ipynb\\data\\ta-en.txt",delimiter="\t",header=None,on_bad_lines='skip')
    tamil_text=[ i for i in data.iloc[:,0] ]
    eng_text=[ i for i in data.iloc[:,1] ]
    count=0

    
    tamil_vocabs=Lang('ta')
    eng_vocabs=Lang('eng')
    for i in tamil_text:
        tamil_vocabs.token(i)
    
    for i in eng_text:
        eng_vocabs.token(i)
    return tamil_vocabs,eng_vocabs,tamil_text,eng_text
    
tamil,eng,tamil_text,eng_text=readData()

In [9]:
len(eng.word2index)

732

## Preparaing Data

In [10]:
def indexesFromSentence(lang, sentence):
    sentence=lang.normal(sentence)
    return [lang.word2index[word] for word in sentence.split()]

In [11]:
test="நீ எங்கே இருக்கிறாய்?"
test1="Where are you?"
test_res=indexesFromSentence(tamil,test)
test1_res=indexesFromSentence(eng,test1)

In [12]:
print(test_res,"\n",test1_res)

[21, 19, 22, 17] 
 [17, 18, 4, 16]


In [13]:
SOS_token = 1
EOS_token = 2
MAX_LENGTH = 10

def tensorFromSentence(lang, sentence):
    indexes = indexesFromSentence(lang, sentence)
    indexes.append(EOS_token)
    return indexes

In [14]:
test2_res=tensorFromSentence(tamil,test)
test3_res=tensorFromSentence(eng,test1)

In [15]:
print(test2_res,"\n",test3_res)

[21, 19, 22, 17, 2] 
 [17, 18, 4, 16, 2]


In [16]:
def tensorsFromPair(sen1,sen2):
    input_tensor = tensorFromSentence(tamil, sen1)
    target_tensor = tensorFromSentence(eng, sen2)
    return input_tensor, target_tensor

In [17]:
def pad_tensors(tensors,num):
    """
    Takes a list of `N` M-dimensional tensors (M<4) and returns a padded tensor.

    The padded tensor is `M+1` dimensional with size `N, S1, S2, ..., SM`
    where `Si` is the maximum value of dimension `i` amongst all tensors.
    """
    rep = tensors[0]
    padded_dim = []
    for dim in range(rep.dim()):
        max_dim = max([tensor.size(dim) for tensor in tensors])
        padded_dim.append(num)
    padded_dim = [len(tensors)] + padded_dim
    padded_tensor = torch.zeros(padded_dim)
    padded_tensor = padded_tensor.type_as(rep)
    for i, tensor in enumerate(tensors):
        size = list(tensor.size())
        if len(size) == 1:
            padded_tensor[i, :size[0]] = tensor
        elif len(size) == 2:
            padded_tensor[i, :size[0], :size[1]] = tensor
        elif len(size) == 3:
            padded_tensor[i, :size[0], :size[1], :size[2]] = tensor
        else:
             raise ValueError('Padding is supported for upto 3D tensors at max.')
    return padded_tensor

In [18]:
def ints_to_tensor(ints,num=28):
    """
    Converts a nested list of integers to a padded tensor.
    """
    if isinstance(ints, torch.Tensor):
        return ints
    if isinstance(ints, list):
        if isinstance(ints[0], int):
            return torch.LongTensor(ints)
        if isinstance(ints[0], torch.Tensor):
            return pad_tensors(ints,num)
        if isinstance(ints[0], list):
            return ints_to_tensor([ints_to_tensor(inti) for inti in ints])

In [19]:
def dataPreparation():
    train=[]
    input_tensors=[]
    label_tensors=[]
    for i, (x, y) in enumerate(zip(tamil_text, eng_text)):
        i,o=tensorsFromPair(x,y)
        input_tensors.append(i)
        label_tensors.append(o)
    
    input_tensor=ints_to_tensor(input_tensors,num=28)
    output_tensor=ints_to_tensor(label_tensors,num=28)
    train_data=TensorDataset(input_tensor.to(device),output_tensor.to(device))
    train_sampler = RandomSampler(train_data)
    train_dataloader = DataLoader(train_data, sampler=train_sampler)
    
    return train_data,train_dataloader,input_tensor,output_tensor
    
    
train_data,train_dataloader,x,y=dataPreparation()

## Training

### LSTM

In [127]:
class RNN(nn.Module):

    def __init__(self, en_dic =tamil.n_words, fr_dic=eng.n_words, device='cpu'):
        super(RNN, self).__init__()
        
        self.device = device
        
        self.emb = nn.Embedding(en_dic, 250,padding_idx=0)            
        
        self.LSTM1 = nn.LSTM(250, 250, num_layers=1, batch_first=True, dropout=0.5)
        
        self.Dense1 = nn.Linear(250, fr_dic*5)
        self.Dense2 = nn.Linear(fr_dic*5, fr_dic)
        
        self.batch_norm1 = nn.BatchNorm1d(23)
        self.batch_norm2 = nn.BatchNorm1d(23)
        
        self.drop = nn.Dropout(p=0.5)
        
    def forward(self, x):
        
        self.batch_size = x.shape[0]
        self.hidden = ( torch.zeros(1, self.batch_size, 250).to(self.device), 
                       torch.zeros(1, self.batch_size, 250).to(self.device) )
        
        x = self.emb(x)
        out, self.hidden = self.LSTM1(x, self.hidden)
        out = F.relu(out) 
        out = self.Dense1(out)
        out = F.relu(out)
        out = self.drop(out)
        out = self.Dense2(out)
        
        return(out)
    
print(RNN())

RNN(
  (emb): Embedding(1091, 250, padding_idx=0)
  (LSTM1): LSTM(250, 250, batch_first=True, dropout=0.5)
  (Dense1): Linear(in_features=250, out_features=3670, bias=True)
  (Dense2): Linear(in_features=3670, out_features=734, bias=True)
  (batch_norm1): BatchNorm1d(23, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batch_norm2): BatchNorm1d(23, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (drop): Dropout(p=0.5, inplace=False)
)


In [128]:
batch_size = 20
epochs = 10
# device = 'cuda'
device = 'cpu'
display_step = 100
rnn = RNN().to(device)

criterion = nn.NLLLoss()

optimizer = torch.optim.Adam(rnn.parameters(), lr=0.001, betas=(0.9, 0.999))

tr_ds = DataLoader(train_data, batch_size=batch_size, shuffle=True)

In [129]:
h_loss = []
h_loss_it = []
val_loss = []
val_loss_it = []
avg_loss = 0
count = 0
T_it = 0
best_model = False

it = 0

for epoch in range(epochs):
    
    for ta, en in tqdm(tr_ds):
        it += 1
        T_it += 1
        
        ta = ta.to(device)
        en = en.to(device)
        optimizer.zero_grad()
        pred = rnn(ta)
        loss = criterion(pred.view(-1,eng.n_words), en.view(-1)) 
        loss.backward()
        optimizer.step()
        
        count += pred.shape[0]
        avg_loss += loss.item() * pred.shape[0]
        

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

In [130]:
torch.save(rnn.state_dict(),"Models/LSTM.pth")

### GRU ###


In [132]:
class GRU(nn.Module):
    def __init__(self,t_dic=tamil.n_words,e_dic=eng.n_words,device='cpu'):
        super(GRU,self).__init__()
        self.device=device
        self.emb=nn.Embedding(t_dic,64)
        self.GRU1=nn.GRU(64,64,num_layers=1,batch_first=True)
        self.dense1=nn.Linear(64,3*e_dic)
        self.dense2 = nn.Linear(e_dic*3, e_dic)

    def forward(self,x):
        self.batch_size=x.shape[0]
        self.hidden = ( torch.zeros(1, self.batch_size, 64).to(self.device) )
        x=self.emb(x)
        out,self.hidden=self.GRU1(x,self.hidden)
        out=F.relu(out)
        out=self.dense1(out)
        out=F.relu(out)
        out=self.dense2(out)
        return (out)

print(GRU())

GRU(
  (emb): Embedding(1091, 64)
  (GRU1): GRU(64, 64, batch_first=True)
  (dense1): Linear(in_features=64, out_features=2202, bias=True)
  (dense2): Linear(in_features=2202, out_features=734, bias=True)
)


In [134]:
batch_size = 20
epochs = 10
device = 'cpu'
display_step = 100
gru = GRU().to(device)
criterion = nn.NLLLoss()
optimizer = torch.optim.Adam(gru.parameters(), lr=0.001, betas=(0.9, 0.999))
tr_ds = DataLoader(train_data, batch_size=batch_size, shuffle=True)

In [135]:
avg_loss = 0
count = 0
T_it = 0
it = 0

for epoch in range(epochs):
    
    for ta, en in tqdm(tr_ds):
        it += 1
        T_it += 1
        
        ta = ta.to(device)
        en = en.to(device)
        optimizer.zero_grad()
        pred = gru(ta)
        loss = criterion(pred.view(-1,eng.n_words), en.view(-1)) 
        loss.backward()
        optimizer.step()
        
        count += pred.shape[0]
        avg_loss += loss.item() * pred.shape[0]
        

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

  0%|          | 0/27 [00:00<?, ?it/s]

In [136]:
torch.save(rnn.state_dict(),"Models/GRU.pth")