In [53]:
import torch
import numpy as np

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [1]:
class Script:
    def __init__(self, script_name):
        self.script_name = script_name
        self.char2idx = {}
        self.inx2char = {}
        self.vocab_size = 0

    def create_vocab(self, char_list):
        for i, char in enumerate(char_list):
            self.char2idx[char] = i
            self.inx2char[i] = char
        self.vocab_size = len(char_list)
    
    def add_char(self, char):
        if char not in self.char2idx:
            self.char2idx[char] = self.vocab_size
            self.inx2char[self.vocab_size] = char
            self.vocab_size += 1
        else:
            print("Character already exists in the script")



In [8]:
import os
dataset_name = "aksharantar_sampled"
languages_dataset = os.listdir(dataset_name)
print(languages_dataset)

['asm', 'ben', 'brx', 'guj', 'hin', 'kan', 'kas', 'kok', 'mai', 'mal', 'mar', 'mni', 'ori', 'pan', 'san', 'sid', 'tam', 'tel', 'urd']


In [57]:
MAX_LENGTH=10
language = 'kan'
START='<'
END='>'
def load_dataset_csv(path):
    X, y = [], []
    with open(path, 'r', encoding='UTF-8') as f:
        for line in f:
            line = line.strip().split(',')
            X.append(f'{START}{line[0]}{END}')
            y.append(f'{START}{line[1]}{END}')
    
    return X, y

list_files = os.listdir(f'{dataset_name}/{language}')
path = f'{dataset_name}/{language}'
X_test, y_test = load_dataset_csv(f'{path}/{list_files[0]}')


29

In [63]:
MAX_LENGTH = max([len(x) for x in X_test] + [len(y) for y in y_test])

unique_chars = set()
[unique_chars.update(list(x)) for x in y_test]
unique_chars = list(unique_chars)
unique_chars.sort()

local_script = Script(language)
local_script.create_vocab(unique_chars)
local_script.inx2char

unique_chars = set()
[unique_chars.update(list(x)) for x in X_test]
unique_chars = list(unique_chars)
unique_chars.sort()

latin_script = Script('latin')
latin_script.create_vocab(unique_chars)
# latin_script.inx2char


In [50]:
transliter_pairs = list(zip(X_test, y_test))

In [65]:
def get_dataloader(transliter_pairs, latin_script, local_script, batch_size=32):
    n = len(transliter_pairs)
    input_ids = np.zeros((n, MAX_LENGTH), dtype=int)
    output_ids = np.zeros((n, MAX_LENGTH), dtype=int)


    for idx, (latin, local) in enumerate(transliter_pairs):
        inp_ids = [latin_script.char2idx[c] for c in latin]
        out_ids = [local_script.char2idx[c] for c in local]
        input_ids[idx, :len(inp_ids)] = inp_ids
        output_ids[idx, :len(out_ids)] = out_ids

    
    

    dataset = torch.utils.data.TensorDataset(torch.LongTensor(input_ids).to(device),
                               torch.LongTensor(output_ids).to(device))
    sampler = torch.utils.data.RandomSampler(dataset)
    dataloader = torch.utils.data.DataLoader(dataset, sampler=sampler, batch_size=batch_size)
    return dataloader

In [67]:
dataloader = get_dataloader(transliter_pairs, latin_script, local_script, batch_size=32)

In [42]:
class Encoder(torch.nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, dropout):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = torch.nn.Dropout(dropout)
        self.embedding = torch.nn.Embedding(input_size, hidden_size)
        self.rnn = torch.nn.LSTM(hidden_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        
    
    def forward(self, x):
        x = self.embedding(x)
        x, (hidden, cell) = self.rnn(x)
        return hidden, cell

In [96]:
class DecoderRNN(torch.nn.Module):
    def __init__(self, hidden_size, output_size):
        super(DecoderRNN, self).__init__()
        self.embedding = torch.nn.Embedding(output_size, hidden_size)
        self.gru = torch.nn.GRU(hidden_size, hidden_size, batch_first=True)
        self.out = torch.nn.Linear(hidden_size, output_size)

    def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
        batch_size = encoder_outputs.size(0)
        decoder_input = torch.empty(batch_size, 1, dtype=torch.long, device=device).fill_(0)
        decoder_hidden = encoder_hidden
        decoder_outputs = []

        for i in range(MAX_LENGTH):
            decoder_output, decoder_hidden  = self.forward_step(decoder_input, decoder_hidden)
            decoder_outputs.append(decoder_output)

            if target_tensor is not None:
                # Teacher forcing: Feed the target as the next input
                decoder_input = target_tensor[:, i].unsqueeze(1) # Teacher forcing
            else:
                # Without teacher forcing: use its own predictions as the next input
                _, topi = decoder_output.topk(1)
                decoder_input = topi.squeeze(-1).detach()  # detach from history as input

        decoder_outputs = torch.cat(decoder_outputs, dim=1)
        decoder_outputs = torch.nn.functional.log_softmax(decoder_outputs, dim=-1)
        return decoder_outputs, decoder_hidden, None # We return `None` for consistency in the training loop

    def forward_step(self, input, hidden):
        output = self.embedding(input)
        output = torch.nn.functional.relu(output)
        output, hidden = self.gru(output, hidden)
        output = self.out(output)
        return output, hidden

In [97]:
hidden_size = 128
batch_size=32
encoder = Encoder(latin_script.vocab_size, hidden_size, num_layers=1, dropout=0).to(device)
decoder = DecoderRNN(hidden_size, local_script.vocab_size).to(device)



In [98]:
for data in dataloader:
    input_tensor, target_tensor = data

    encoder_outputs, encoder_hidden = encoder(input_tensor)
    decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden, target_tensor)

    print(decoder_outputs.shape, encoder_hidden.shape)
    print(decoder_outputs, encoder_hidden)
    break

RuntimeError: Expected hidden size (1, 1, 128), got [1, 32, 128]

In [54]:
def train_epoch(dataloader, encoder, decoder, encoder_optimizer,
          decoder_optimizer, criterion):

    total_loss = 0
    for data in dataloader:
        input_tensor, target_tensor = data

        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden, target_tensor)

        loss = criterion(
            decoder_outputs.view(-1, decoder_outputs.size(-1)),
            target_tensor.view(-1)
        )
        loss.backward()

        encoder_optimizer.step()
        decoder_optimizer.step()

        total_loss += loss.item()

    return total_loss / len(dataloader)

device(type='cpu')