## **Import Libraries**

In [228]:
import torch
import csv
import torch.nn as nn
import random
import numpy as np
import torch.optim as optim
import pandas as pd
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset

## **SET DEVICE (CPU / GPU)**

In [229]:
# Set Device which to use
def set_device():
    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda"
    return device

device = set_device()
print(device)

cpu


## **LOAD DATA**

In [230]:
def load_data(lan = 'hin'):
    train_data_path = "D:\DL_A3\Dataset\\" + lan + f'\{lan}_train.csv'
    test_data_path = "D:\DL_A3\Dataset\\" + lan + f'\{lan}_test.csv'
    val_data_path = "D:\DL_A3\Dataset\\" + lan + f'\{lan}_valid.csv'
    
    train_x = []
    train_y = []
    with open(train_data_path, 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        # next(csv_reader)  # Skip the header row if it exists
        for row in csv_reader:
            train_x.append(row[0] + '$')
            train_y.append('#' + row[1] + '$')
       
    
    val_x, val_y = [], []
    with open(val_data_path, 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        # next(csv_reader)  # Skip the header row if it exists
        for row in csv_reader:
            val_x.append(row[0] + '$')
            val_y.append('#' + row[1] + '$')
    
    test_x, test_y = [], []
    with open(test_data_path, 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        # next(csv_reader)  # Skip the header row if it exists
        for row in csv_reader:
            test_x.append(row[0] + '$')
            test_y.append('#' + row[1] + '$')

    # Conversion in NP Array
    train_x, train_y = np.array(train_x), np.array(train_y)
    val_x, val_y = np.array(val_x), np.array(val_y)
    test_x, test_y = np.array(test_x), np.array(test_y)

    #get size of data
    max_decoder_length_train = np.max(np.array([len(s) for s in train_y]))
    max_decoder_length_val = np.max(np.array([len(s) for s in val_y]))
    max_decoder_length_test = np.max(np.array([len(s) for s in test_y]))
    max_decoder_length = max(max_decoder_length_train, max(max_decoder_length_val, max_decoder_length_test))

    max_encoder_length_train = np.max(np.array([len(s) for s in train_x]))
    max_encoder_length_val = np.max(np.array([len(s) for s in val_x]))
    max_encoder_length_test = np.max(np.array([len(s) for s in test_x]))
    max_encoder_length = max(max_encoder_length_train, max(max_encoder_length_val, max_encoder_length_test))


    print(max_decoder_length)
    print(max_encoder_length)

    

# Find the maximum length
    return_res = {
        "train_x" : train_x,
        "train_y" : train_y,
        "val_x" : val_x,
        "val_y" : val_y,
        "test_x" : test_x,
        "test_y" : test_y,
        "max_decoder_length" : max_decoder_length,
        "max_encoder_length" : max_encoder_length,
    }

    return return_res


res = load_data("hin")
tx = res["train_x"]
ty = res["train_y"]
vx = res["val_x"]
vy = res["val_y"]
tex = res["test_x"]
tey = res["test_y"]
max_decoder_length = res["max_decoder_length"]
max_encoder_length = res["max_encoder_length"]
train_len = tx.shape[0]
val_len = vx.shape[0]


22
27
51200
4096


In [231]:
def create_corpus(train_x, train_y, val_x, val_y, test_x, test_y):

    english_vocab = "#$abcdefghijklmnopqrstuvwxyz"
    input_corpus_dict = {}
    input_corpus_dict[''] = 0
    index = 1
    for characters in english_vocab:
        input_corpus_dict[characters] = index
        index += 1
    
    output_vocab = set()
    for word in train_y:
        for characters in word:
            output_vocab.add(characters)
    for word in val_y:
        for characters in word:
            output_vocab.add(characters)
    for word in test_y:
        for characters in word:
            output_vocab.add(characters)
    output_vocab.add('')
    sorted_output_vocab = sorted(output_vocab)

    output_corpus_dict = {}
    index = 0
    for characters in sorted_output_vocab:
        output_corpus_dict[characters] = index
        index += 1


    

    


    return_dict = {
        "input_corpus_length" : len(english_vocab),
        "output_corpus_length" : len(output_vocab),
        "input_corpus_dict" : input_corpus_dict,
        "output_corpus_dict" : output_corpus_dict,
    }
    
    return return_dict

res_dict = create_corpus(tx, ty, vx, vy, tex, tey)
print(res_dict["input_corpus_length"])
print(res_dict["output_corpus_length"])
print(res_dict["input_corpus_dict"])
print(res_dict["output_corpus_dict"])

input_corpus_length = res_dict["input_corpus_length"]
output_corpus_length = res_dict["output_corpus_length"]
output_corpus_dict = res_dict["output_corpus_dict"]
input_corpus_dict = res_dict["input_corpus_dict"]
        

28
68
{'': 0, '#': 1, '$': 2, 'a': 3, 'b': 4, 'c': 5, 'd': 6, 'e': 7, 'f': 8, 'g': 9, 'h': 10, 'i': 11, 'j': 12, 'k': 13, 'l': 14, 'm': 15, 'n': 16, 'o': 17, 'p': 18, 'q': 19, 'r': 20, 's': 21, 't': 22, 'u': 23, 'v': 24, 'w': 25, 'x': 26, 'y': 27, 'z': 28}
{'': 0, '#': 1, '$': 2, 'ँ': 3, 'ं': 4, 'ः': 5, 'अ': 6, 'आ': 7, 'इ': 8, 'ई': 9, 'उ': 10, 'ऊ': 11, 'ऋ': 12, 'ए': 13, 'ऐ': 14, 'ऑ': 15, 'ओ': 16, 'औ': 17, 'क': 18, 'ख': 19, 'ग': 20, 'घ': 21, 'ङ': 22, 'च': 23, 'छ': 24, 'ज': 25, 'झ': 26, 'ञ': 27, 'ट': 28, 'ठ': 29, 'ड': 30, 'ढ': 31, 'ण': 32, 'त': 33, 'थ': 34, 'द': 35, 'ध': 36, 'न': 37, 'प': 38, 'फ': 39, 'ब': 40, 'भ': 41, 'म': 42, 'य': 43, 'र': 44, 'ल': 45, 'ळ': 46, 'व': 47, 'श': 48, 'ष': 49, 'स': 50, 'ह': 51, '़': 52, 'ऽ': 53, 'ा': 54, 'ि': 55, 'ी': 56, 'ु': 57, 'ू': 58, 'ृ': 59, 'ॅ': 60, 'े': 61, 'ै': 62, 'ॉ': 63, 'ॊ': 64, 'ो': 65, 'ौ': 66, '्': 67}


In [232]:
def create_tensor(tx, ty, vx, vy, res, dict):
    max_len = max(res["max_encoder_length"], res["max_decoder_length"])
    train_input = np.zeros((max_len, train_len), dtype = 'int64')
    train_output = np.zeros((max_len, train_len), dtype = 'int64')
    val_input = np.zeros((max_len, val_len), dtype = 'int64')
    val_output = np.zeros((max_len, val_len), dtype = 'int64')

    print(dict["input_corpus_dict"])
    print(dict["output_corpus_dict"])
    word_count = 0
    for words in tx:
        index = 0
        for chars in words:
            # print(chars)
            train_input[index, word_count] = dict["input_corpus_dict"][chars]
            index += 1
        word_count += 1

    word_count = 0
    for words in ty:
        index = 0
        for chars in words:
            # print(chars)
            train_output[index, word_count] = dict["output_corpus_dict"][chars]
            index += 1
        word_count += 1

    word_count = 0
    for words in vx:
        index = 0
        for chars in words:
            # print(chars)
            val_input[index, word_count] = dict["input_corpus_dict"][chars]
            index += 1
        word_count += 1

    word_count = 0
    for words in vy:
        index = 0
        for chars in words:
            # print(chars)
            val_output[index, word_count] = dict["output_corpus_dict"][chars]
            index += 1
        word_count += 1


    # Convert in tensor
    train_input = torch.tensor(train_input)
    train_output = torch.tensor(train_output)
    val_input = torch.tensor(val_input)
    val_output = torch.tensor(val_output)

    return_dict = {
        "train_input" : train_input,
        "train_output" : train_output,
        "val_input" : val_input,
        "val_output" : val_output,
    }
    return return_dict

In [233]:
tensors = create_tensor(tx, ty, vx, vy, res, res_dict)
train_input = tensors["train_input"].T
train_output = tensors["train_output"].T
# print(train_output.shape)

{'': 0, '#': 1, '$': 2, 'a': 3, 'b': 4, 'c': 5, 'd': 6, 'e': 7, 'f': 8, 'g': 9, 'h': 10, 'i': 11, 'j': 12, 'k': 13, 'l': 14, 'm': 15, 'n': 16, 'o': 17, 'p': 18, 'q': 19, 'r': 20, 's': 21, 't': 22, 'u': 23, 'v': 24, 'w': 25, 'x': 26, 'y': 27, 'z': 28}
{'': 0, '#': 1, '$': 2, 'ँ': 3, 'ं': 4, 'ः': 5, 'अ': 6, 'आ': 7, 'इ': 8, 'ई': 9, 'उ': 10, 'ऊ': 11, 'ऋ': 12, 'ए': 13, 'ऐ': 14, 'ऑ': 15, 'ओ': 16, 'औ': 17, 'क': 18, 'ख': 19, 'ग': 20, 'घ': 21, 'ङ': 22, 'च': 23, 'छ': 24, 'ज': 25, 'झ': 26, 'ञ': 27, 'ट': 28, 'ठ': 29, 'ड': 30, 'ढ': 31, 'ण': 32, 'त': 33, 'थ': 34, 'द': 35, 'ध': 36, 'न': 37, 'प': 38, 'फ': 39, 'ब': 40, 'भ': 41, 'म': 42, 'य': 43, 'र': 44, 'ल': 45, 'ळ': 46, 'व': 47, 'श': 48, 'ष': 49, 'स': 50, 'ह': 51, '़': 52, 'ऽ': 53, 'ा': 54, 'ि': 55, 'ी': 56, 'ु': 57, 'ू': 58, 'ृ': 59, 'ॅ': 60, 'े': 61, 'ै': 62, 'ॉ': 63, 'ॊ': 64, 'ो': 65, 'ौ': 66, '्': 67}
torch.Size([51200, 27])


## **RNN**

In [234]:
class Encoder(nn.Module):
    def __init__(self, PARAM):
        super(Encoder, self).__init__()
        self.input_size = PARAM["input_size"]
        self.embedding_size = PARAM["embedding_size"]
        self.hidden_size = PARAM["hidden_size"]
        self.output_size = PARAM["output_size"]
        self.num_layers = PARAM["num_layers"]
        self.drop_prob = PARAM["drop_prob"]
        self.cell_name = PARAM["cell_name"]

        self.dropout = nn.Dropout(self.drop_prob)
        self.embedding = nn.Embedding(self.input_size, self.embedding_size)
        
        if self.cell_name == "RNN":
            self.cell = nn.RNN(self.embedding_size, self.hidden_size, self.num_layers, dropout = self.drop_prob)
        elif self.cell_name == "LSTM":
            self.cell = nn.LSTM(self.embedding_size, self.hidden_size, self.num_layers, dropout = self.drop_prob)
        elif self.cell_name == "GRU":
            self.cell = nn.GRU(self.embedding_size, self.hidden_size, self.num_layers, dropout = self.drop_prob)
            
    def forward(self, x):
        embedding = self.embedding(x)
        drops = self.dropout(embedding)
        outputs, hidden = self.cell(drops)
        return hidden
    
class Decoder(nn.Module):
    def __init__(self, PARAM):
        super(Decoder, self).__init__()
        self.input_size = PARAM["input_size"]
        self.embedding_size = PARAM["embedding_size"]
        self.hidden_size = PARAM["hidden_size"]
        self.output_size = PARAM["output_size"]
        self.num_layers = PARAM["num_layers"]
        self.drop_prob = PARAM["drop_prob"]
        self.cell_name = PARAM["cell_name"]
        self.dropout = nn.Dropout(self.drop_prob)
        self.embedding = nn.Embedding(self.input_size, self.embedding_size)

        if self.cell_name == "RNN":
            self.cell = nn.RNN(self.embedding_size, self.hidden_size, self.num_layers, dropout = self.drop_prob)
        elif self.cell_name == "LSTM":
            self.cell = nn.LSTM(self.embedding_size, self.hidden_size, self.num_layers, dropout = self.drop_prob)
        elif self.cell_name == "GRU":
            self.cell = nn.GRU(self.embedding_size, self.hidden_size, self.num_layers, dropout = self.drop_prob)
        self.fc = nn.Linear(self.hidden_size, self.output_size)
    
    def forward(self, x, hidden):
        x = x.unsqueeze(0)
        # print("In a decoder   ", x.shape)
        # print(self.embedding(x))
        drops = self.dropout(self.embedding(x))
        outputs, hidden = self.cell(drops, hidden)
        predictions = self.fc(outputs).squeeze(0)

        return predictions, hidden

class SeqToSeq(nn.Module):
    def __init__(self, encoder, decoder):
        super(SeqToSeq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, target, teacher_forcing_ratio = 0.5):
        batch_size = src.shape[1]
        target_len = target.shape[0]

        target_vocab_size = len(output_corpus_dict)
        
        outputs = torch.zeros(target_len, batch_size, target_vocab_size).to(device)
        
        encoder_hidden = self.encoder(src)
        # print("encoder_hidden", len(encoder_hidden))
        decoder_input = target[0, :]  # <SOS> token
        # print("decoder_input :", decoder_input)
        
    
        for t in range(1, target_len):
            output, encoder_hidden = self.decoder(decoder_input, encoder_hidden)
            outputs[t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            decoder_input = target[t] if teacher_force else top1
        
        return outputs
        


In [236]:
PARAM_Encoder = {
    "input_size" : len(input_corpus_dict),
    "embedding_size" : 256,
    "hidden_size" : 512,
    "output_size" : len(input_corpus_dict),
    "num_layers" : 2,
    "drop_prob" : 0.1,
    "cell_name" : "LSTM"
}

PARAM_Decoder = {
    "input_size" : len(output_corpus_dict),
    "embedding_size" : 256,
    "hidden_size" : 512,
    "output_size" : len(output_corpus_dict),
    "num_layers" : 2,
    "drop_prob" : 0.1,
    "cell_name" : "LSTM"
}


eta = 0.01
batch_size = 32
epochs = 10


encoder = Encoder(PARAM_Encoder).to(device)
decoder = Decoder(PARAM_Decoder).to(device)

model = SeqToSeq(encoder, decoder).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=eta)



# print(train_input.shape, train_output.shape)
train_data = TensorDataset(train_input, train_output)
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

# print(len(train_data), len(train_loader))

first_batch = next(iter(train_loader))

# Get the size and shape of the first batch
size_first_batch = len(first_batch)
shape_first_batch = tuple(item.shape for item in first_batch)

print("Size of the first batch:", size_first_batch)
print("Shape of each item in the first batch:", shape_first_batch)

for epoch in range(epochs):
    model.train()
    total_loss = 0
    correct_pred = 0
    total_pred = 0

    for source, target in(train_loader):
        source = source.to(device)
        target = target.to(device)
        optimizer.zero_grad()

        output = model(source, target)
        output_dim = output.shape[-1]
        output = output[1:].view(-1, output_dim)
        target = target[1:].view(-1)
    
        loss = criterion(output, target)
    
        loss.backward()
        optimizer.step()
    
        total_loss += loss.item()
    
        # Calculate accuracy
        predicted = output.argmax(dim=1)
        correct_pred += (predicted == target).sum().item()
        total_pred += target.numel()
    
    epoch_loss = total_loss / len(train_loader)
    epoch_accuracy = correct_pred / total_pred

    print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")





torch.Size([51200, 27]) torch.Size([51200, 27])
51200 1600
Size of the first batch: 2
Shape of each item in the first batch: (torch.Size([32, 27]), torch.Size([32, 27]))
encoder_hidden 2
decoder_input : tensor([ 1, 40, 61, 20, 56, 38, 65, 44, 54,  2,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0])
In a decoder    torch.Size([1, 27])
tensor([[[ 0.3806,  0.8761, -1.1138,  ..., -0.8168,  0.7457,  1.0330],
         [-0.4402, -0.3560, -2.2350,  ..., -2.4741, -1.4690, -0.7216],
         [ 0.1628, -0.2817, -1.1511,  ...,  0.0616,  0.6821,  0.5406],
         ...,
         [-1.2739, -1.6960,  0.4608,  ...,  0.8871, -0.0473, -0.7064],
         [-1.2739, -1.6960,  0.4608,  ...,  0.8871, -0.0473, -0.7064],
         [-1.2739, -1.6960,  0.4608,  ...,  0.8871, -0.0473, -0.7064]]],
       grad_fn=<EmbeddingBackward0>)
In a decoder    torch.Size([1, 27])
tensor([[[ 0.3806,  0.8761, -1.1138,  ..., -0.8168,  0.7457,  1.0330],
         [ 0.2379,  0.8244,  1.4402,  ..., -0.4450

KeyboardInterrupt: 