<a href="https://colab.research.google.com/github/ch23s020/Assignment3/blob/main/RNN_withAttention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install wandb

In [None]:
import csv
import gdown
import random
import time
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import wandb
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.ticker as ticker

In [None]:
# Google Sheets export links that serve each dataset split as CSV
train_url = "https://docs.google.com/spreadsheets/d/11duz5Vbqay5TVn_uyglVQVcEZllTbWQt_8zTt2TcBSA/export?format=csv"
valid_url = "https://docs.google.com/spreadsheets/d/1KbKFfxFkMddkZde0r5PWKnQ0vzdh-XihxsMP7XUFDJc/export?format=csv"
test_url = "https://docs.google.com/spreadsheets/d/1ItKDweGPNtzWiF3rs0jzKjh7ZRRkas2hz7yWvbt4yzQ/export?format=csv"

# Local file names each split is saved under
train_output = 'train_data.csv'
valid_output = 'valid_data.csv'
test_output = 'test_data.csv'

# Fetch the splits in the order train, validation, test
for _url, _output in (
    (train_url, train_output),
    (valid_url, valid_output),
    (test_url, test_output),
):
    gdown.download(_url, _output, quiet=False)

# Function to load data

def load_data(file_path):
    """Read (source, target) string pairs from a UTF-8 CSV file.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A list of ``(x, y)`` tuples taken from the first and second column
        of every row that has at least two columns.

    Rows with fewer than two columns (including blank lines) are reported
    on stdout and skipped.
    """
    data = []

    # newline='' lets the csv module do its own newline handling, so line
    # breaks embedded in quoted fields are preserved instead of translated.
    with open(file_path, 'r', encoding='utf-8', newline='') as csvfile:
        for row_number, row in enumerate(csv.reader(csvfile), start=1):
            if len(row) < 2:
                print(f"IndexError in row {row_number}: {row}")
                continue
            # First column: Romanized string; second column: Devanagari string
            # (per the original author's stated assumption).
            data.append((row[0], row[1]))

    return data

# Parse the three downloaded CSV files into lists of (source, target) pairs
train_data, valid_data, test_data = (
    load_data(_split_path)
    for _split_path in (train_output, valid_output, test_output)
)

# Data Preparation

class TransliterationDataset(Dataset):
    """Dataset of (source, target) string pairs encoded as index tensors.

    Args:
        data: Sequence of ``(x, y)`` string pairs.
        char2index: Mapping from character to integer index; must contain
            the key ``'<PAD>'`` and every character that occurs in ``data``.
        max_length: Minimum encoded length. Shorter strings are right-padded
            with the ``'<PAD>'`` index up to this length; longer strings are
            left as they are (they are NOT truncated).
    """

    def __init__(self, data, char2index, max_length=20):
        self.data = data
        self.char2index = char2index
        self.max_length = max_length

    def __len__(self):
        """Return the number of pairs."""
        return len(self.data)

    def _encode(self, text):
        """Map ``text`` to indices and right-pad it to ``max_length``."""
        indices = [self.char2index[ch] for ch in text]
        # A negative repeat count produces an empty list, so strings longer
        # than max_length receive no padding at all.
        indices.extend([self.char2index['<PAD>']] * (self.max_length - len(text)))
        return torch.tensor(indices)

    def __getitem__(self, idx):
        """Return ``(x_tensor, y_tensor, len(x), len(y))`` for pair ``idx``.

        The two lengths are the unpadded string lengths.
        """
        source, target = self.data[idx]
        return self._encode(source), self._encode(target), len(source), len(target)

def collate_fn(batch):
    """Merge dataset items into one padded batch.

    Args:
        batch: Iterable of ``(x_tensor, y_tensor, x_length, y_length)`` items.

    Returns:
        ``(x, y, x_lengths, y_lengths)`` where ``x`` and ``y`` are batch-first
        tensors padded with the module-level ``char2index['<PAD>']`` index and
        the two length entries are tuples in batch order.
    """
    sources, targets, x_lengths, y_lengths = zip(*batch)
    pad_index = char2index['<PAD>']
    pad_sequence = torch.nn.utils.rnn.pad_sequence

    # Sequences are padded up to the longest one in this batch.
    x = pad_sequence(sources, batch_first=True, padding_value=pad_index)
    y = pad_sequence(targets, batch_first=True, padding_value=pad_index)
    return x, y, x_lengths, y_lengths

# Build the character vocabulary from every source and target string in all
# three splits, so no character is missing at lookup time.
all_chars = sorted(set(''.join([x for x, y in train_data + valid_data + test_data]) + ''.join([y for x, y in train_data + valid_data + test_data])))

char2index = {char: idx for idx, char in enumerate(all_chars)}

# Append the special tokens with consecutive indices. Each one takes the
# current vocabulary size, so the largest index is always len(char2index) - 1
# and fits an embedding/output layer sized len(char2index). (Adding offsets
# such as "+ 1" here would leave gaps and push '<EOS>' out of range.)
char2index['<PAD>'] = len(char2index)
char2index['<SOS>'] = len(char2index)
char2index['<EOS>'] = len(char2index)


In [None]:
# Hyperparameter search space: every entry lists the candidate values that the
# random search may pick for that hyperparameter.
# NOTE(review): the data loaders in this notebook are built with a fixed
# batch_size=32 and the dataset's default max_length — confirm whether the
# "batch_size" and "max_length" entries are meant to be applied.
_search_space = {
    "learning_rate": [0.001, 0.01, 0.1],
    "batch_size": [32],
    "num_epochs": [5, 10, 15, 20],
    "encoder_layers": [1],
    "decoder_layers": [1],
    "hidden_dim": [128, 256, 512],
    "embedding_dim": [128, 256, 512],
    "dropout_rate": [0, 0.1, 0.2],
    "rnn_cell_type": ["lstm", "rnn", "gru"],
    "bidirectional": [False],
    "max_length": [20, 60, 100, 150],
    "gradient_clip": [1, 2],
}

sweep_config = {
    "method": "random",
    "parameters": {
        name: {"values": candidates} for name, candidates in _search_space.items()
    },
}

# Register the sweep with wandb and keep its id for the agent launched later
sweep_id = wandb.sweep(sweep_config, project="Assign3_withattention")



# To reproduce the best model and evaluate it on the test data, uncomment the following configuration.


# # Initialize wandb
# sweep_config = {
#     "method": "random",
#     "parameters": {
#         "learning_rate": {"values": [0.001]},
#         "batch_size": {"values": [32]},
#         "num_epochs": {"values": [15]},
#         "encoder_layers": {"values": [2]},
#         "decoder_layers": {"values": [2]},
#         "hidden_dim": {"values": [128]},
#         "embedding_dim": {"values": [128]},
#         "dropout_rate": {"values": [0]},
#         "rnn_cell_type": {"values": ["gru"]},
#         "bidirectional": {"values": [False]},
#         "max_length": {"values": [100]},
#         "gradient_clip": {"values": [1]},
#     }
# }
# sweep_id = wandb.sweep(sweep_config, project="Assign3_withattention")


In [None]:

# Wrap each split in a dataset (default max_length) sharing one vocabulary
train_dataset, valid_dataset, test_dataset = (
    TransliterationDataset(_split, char2index)
    for _split in (train_data, valid_data, test_data)
)

# Only the training loader shuffles; all loaders use batches of 32
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
valid_loader, test_loader = (
    DataLoader(_dataset, batch_size=32, collate_fn=collate_fn)
    for _dataset in (valid_dataset, test_dataset)
)



In [None]:

# Model Components

class EmbeddingLayer(nn.Module):
    """Thin wrapper around ``nn.Embedding``.

    Args:
        input_dim: Number of distinct indices (vocabulary size).
        embedding_dim: Size of each embedding vector.
    """

    def __init__(self, input_dim, embedding_dim):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, embedding_dim)

    def forward(self, x):
        """Look up the embedding vector of every index in ``x``."""
        embedded = self.embedding(x)
        return embedded

class Attention(nn.Module):
    """Additive attention over encoder outputs.

    The score of an encoder position is ``v . tanh(W [hidden; encoder_output])``
    and the scores are normalised with a softmax over the source positions.

    Args:
        hidden_dim: Size of both the decoder hidden state and each encoder
            output vector (the linear layer takes ``hidden_dim * 2`` inputs).
    """

    def __init__(self, hidden_dim):
        super().__init__()
        self.attn = nn.Linear(hidden_dim * 2, hidden_dim)
        self.v = nn.Parameter(torch.rand(hidden_dim))

    def forward(self, hidden, encoder_outputs):
        """Return attention weights of shape ``(batch, src_len)``.

        Args:
            hidden: Query tensor of shape ``(batch, hidden_dim)``.
            encoder_outputs: Tensor of shape ``(batch, src_len, hidden_dim)``.
        """
        batch_size, src_len = encoder_outputs.size(0), encoder_outputs.size(1)

        # Pair the query with every encoder position: (batch, src_len, hidden_dim)
        query = hidden.unsqueeze(1).expand(-1, src_len, -1)
        paired = torch.cat((query, encoder_outputs), 2)

        # (batch, src_len, hidden_dim) -> (batch, hidden_dim, src_len)
        energy = torch.tanh(self.attn(paired)).transpose(1, 2)

        # One copy of v per batch item, as a row vector: (batch, 1, hidden_dim)
        v = self.v.unsqueeze(0).unsqueeze(0).expand(batch_size, -1, -1)

        scores = torch.bmm(v, energy).squeeze(1)
        return F.softmax(scores, dim=1)

class EncoderRNN(nn.Module):
    """Embeds a source index sequence and runs it through an RNN/LSTM/GRU.

    Args:
        input_dim: Source vocabulary size.
        embedding_dim: Size of the embedding vectors.
        hidden_dim: Size of the recurrent hidden state.
        num_layers: Number of stacked recurrent layers.
        dropout: Dropout probability between recurrent layers.
        rnn_type: One of ``'rnn'``, ``'lstm'`` or ``'gru'``.
        bidirectional: Whether the recurrent layer is bidirectional.

    Raises:
        KeyError: If ``rnn_type`` is not one of the supported names.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, num_layers, dropout, rnn_type='lstm', bidirectional=False):
        super().__init__()

        self.embedding = EmbeddingLayer(input_dim, embedding_dim)

        rnn_cls = {'rnn': nn.RNN, 'lstm': nn.LSTM, 'gru': nn.GRU}[rnn_type]

        # PyTorch only applies dropout between stacked layers; passing a
        # non-zero value with a single layer has no effect and just emits a
        # UserWarning, so it is zeroed in that case.
        inter_layer_dropout = dropout if num_layers > 1 else 0

        self.rnn = rnn_cls(embedding_dim, hidden_dim, num_layers, dropout=inter_layer_dropout, batch_first=True, bidirectional=bidirectional)

        self.bidirectional = bidirectional

    def forward(self, x):
        """Encode a batch of index sequences.

        Args:
            x: Batch-first tensor of source indices.

        Returns:
            ``(outputs, hidden)`` as produced by the recurrent layer. For an
            LSTM, ``hidden`` is a ``(h, c)`` tuple. When bidirectional, each
            state is replaced by the result of ``_concat_hidden``.
        """
        x = self.embedding(x)

        outputs, hidden = self.rnn(x)

        if self.bidirectional:
            if isinstance(hidden, tuple):  # LSTM
                hidden = (self._concat_hidden(hidden[0]), self._concat_hidden(hidden[1]))
            else:  # RNN or GRU
                hidden = self._concat_hidden(hidden)

        return outputs, hidden

    def _concat_hidden(self, hidden):
        """Concatenate the last two state slices along the feature axis.

        Returns a tensor with a leading dimension of 1 and twice the feature
        size of the input slices.
        NOTE(review): only the last two slices are kept, and the result (like
        the bidirectional ``outputs``) is twice ``hidden_dim`` wide — confirm
        the decoder and attention are sized for that before enabling
        ``bidirectional=True``.
        """
        return torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1).unsqueeze(0)

class DecoderRNN(nn.Module):
    """Single-step attention decoder.

    At each call the previous token is embedded, an attention context over
    the encoder outputs is computed from the top-layer hidden state, and the
    concatenation of both is fed through one recurrent step.

    Args:
        output_dim: Target vocabulary size (size of the prediction vector).
        embedding_dim: Size of the embedding vectors.
        hidden_dim: Size of the recurrent hidden state and of each encoder
            output vector.
        num_layers: Number of stacked recurrent layers.
        dropout: Dropout probability between recurrent layers.
        attention: Callable ``attention(query, encoder_outputs)`` returning
            weights of shape ``(batch, src_len)``.
        rnn_type: One of ``'rnn'``, ``'lstm'`` or ``'gru'``.

    Raises:
        KeyError: If ``rnn_type`` is not one of the supported names.
    """

    def __init__(self, output_dim, embedding_dim, hidden_dim, num_layers, dropout, attention, rnn_type='lstm'):
        super().__init__()

        self.embedding = EmbeddingLayer(output_dim, embedding_dim)

        rnn_cls = {'rnn': nn.RNN, 'lstm': nn.LSTM, 'gru': nn.GRU}[rnn_type]

        # PyTorch only applies dropout between stacked layers; a non-zero
        # value with a single layer has no effect and emits a UserWarning.
        inter_layer_dropout = dropout if num_layers > 1 else 0

        # The recurrent input is the token embedding joined with the context.
        self.rnn = rnn_cls(embedding_dim + hidden_dim, hidden_dim, num_layers, dropout=inter_layer_dropout, batch_first=True)

        # Predictions are made from the recurrent output joined with the context.
        self.fc = nn.Linear(hidden_dim * 2, output_dim)

        self.attention = attention

    def forward(self, x, hidden, encoder_outputs):
        """Run one decoding step.

        Args:
            x: Tensor of shape ``(batch,)`` with the previous token indices.
            hidden: Recurrent state; a tensor for RNN/GRU or an ``(h, c)``
                tuple for LSTM.
            encoder_outputs: Tensor of shape ``(batch, src_len, hidden_dim)``.

        Returns:
            ``(predictions, hidden, attn_weights)`` where ``predictions`` has
            shape ``(batch, output_dim)``, ``hidden`` is the updated state and
            ``attn_weights`` has shape ``(batch, src_len)``.
        """
        x = self.embedding(x).unsqueeze(1)

        # An LSTM state is an (h, c) tuple: indexing it with [-1] would pick
        # the whole cell-state tensor instead of the last layer's hidden
        # state, so unwrap h first.
        h_state = hidden[0] if isinstance(hidden, tuple) else hidden

        attn_weights = self.attention(h_state[-1], encoder_outputs)

        attn_weights = attn_weights.unsqueeze(1)

        # Weighted sum of encoder outputs: (batch, 1, hidden_dim)
        context = attn_weights.bmm(encoder_outputs)

        rnn_input = torch.cat((x, context), 2)

        outputs, hidden = self.rnn(rnn_input, hidden)

        predictions = self.fc(torch.cat((outputs.squeeze(1), context.squeeze(1)), 1))

        return predictions, hidden, attn_weights.squeeze(1)

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes one step at a time.

    Args:
        encoder: Callable returning ``(encoder_outputs, hidden)`` for a source batch.
        decoder: Callable ``decoder(tokens, hidden, encoder_outputs)`` returning
            ``(predictions, hidden, attn_weights)``; must expose ``fc.out_features``.
        device: Device the output buffer is moved to.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode ``trg.size(1) - 1`` steps for a batch.

        The first target column is used as the initial decoder input, so
        position 0 of the returned predictions is left as zeros.

        Args:
            src: Batch-first source index tensor.
            trg: Batch-first target index tensor.
            teacher_forcing_ratio: Probability, drawn once per step, of feeding
                the ground-truth token instead of the model's own argmax.

        Returns:
            ``(outputs, attn_weights_all)``: ``outputs`` has shape
            ``(batch, trg_len, output_dim)``; ``attn_weights_all`` is a list
            with one NumPy array of attention weights per decoding step.
        """
        batch_size, trg_len = src.size(0), trg.size(1)
        vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(batch_size, trg_len, vocab_size).to(self.device)
        encoder_outputs, hidden = self.encoder(src)

        decoder_input = trg[:, 0]
        attn_weights_all = []

        for step in range(1, trg_len):
            step_logits, hidden, step_attn = self.decoder(decoder_input, hidden, encoder_outputs)
            outputs[:, step] = step_logits

            greedy_tokens = step_logits.argmax(1)
            use_ground_truth = random.random() < teacher_forcing_ratio
            if use_ground_truth:
                decoder_input = trg[:, step]
            else:
                decoder_input = greedy_tokens

            attn_weights_all.append(step_attn.cpu().detach().numpy())

        return outputs, attn_weights_all



In [None]:

# Training functions

def train(model, iterator, optimizer, criterion, clip):
    """Run one training epoch and return the mean batch loss.

    Args:
        model: Module called as ``model(src, trg)`` returning ``(output, _)``;
            must expose a ``device`` attribute.
        iterator: Sized iterable of ``(src, trg, src_len, trg_len)`` batches.
        optimizer: Optimizer stepping the model's parameters.
        criterion: Loss taking ``(logits, targets)``.
        clip: Maximum gradient norm used for clipping.

    Returns:
        Sum of the per-batch losses divided by ``len(iterator)``.
    """
    model.train()
    running_loss = 0

    for src, trg, _src_len, _trg_len in iterator:
        src, trg = src.to(model.device), trg.to(model.device)

        optimizer.zero_grad()
        logits, _ = model(src, trg)

        # Position 0 is the decoder's start input, so it is excluded from the loss.
        vocab_size = logits.shape[-1]
        flat_logits = logits[:, 1:].reshape(-1, vocab_size)
        flat_targets = trg[:, 1:].reshape(-1)

        loss = criterion(flat_logits, flat_targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += loss.item()

    return running_loss / len(iterator)

def evaluate(model, iterator, criterion):
    """Compute the mean batch loss without updating the model.

    The model is switched to eval mode and called with a teacher-forcing
    ratio of 0, inside ``torch.no_grad()``.

    Args:
        model: Module called as ``model(src, trg, 0)`` returning ``(output, _)``;
            must expose a ``device`` attribute.
        iterator: Sized iterable of ``(src, trg, src_len, trg_len)`` batches.
        criterion: Loss taking ``(logits, targets)``.

    Returns:
        Sum of the per-batch losses divided by ``len(iterator)``.
    """
    model.eval()
    batch_losses = []

    with torch.no_grad():
        for src, trg, _src_len, _trg_len in iterator:
            src, trg = src.to(model.device), trg.to(model.device)

            logits, _ = model(src, trg, 0)

            # Position 0 is the decoder's start input, so it is excluded from the loss.
            vocab_size = logits.shape[-1]
            flat_logits = logits[:, 1:].reshape(-1, vocab_size)
            flat_targets = trg[:, 1:].reshape(-1)

            batch_losses.append(criterion(flat_logits, flat_targets).item())

    return sum(batch_losses) / len(iterator)


In [None]:

# def test(model, iterator, criterion, char2index, index2char):

#     model.eval()

#     epoch_loss = 0

#     predictions = []

#     ground_truths = []

#     with torch.no_grad():
#         for i, (src, trg, src_len, trg_len) in enumerate(iterator):

#             src = src.to(model.device)

#             trg = trg.to(model.device)

#             output, attn_weights_all = model(src, trg, 0)

#             output_dim = output.shape[-1]

#             output = output[:, 1:].reshape(-1, output_dim)

#             trg = trg[:, 1:].reshape(-1)

#             loss = criterion(output, trg)

#             epoch_loss += loss.item()

#             preds = output.argmax(1).cpu().numpy()

#             trg_cpu = trg.cpu().numpy()

#             predictions.extend(preds)

#             ground_truths.extend(trg_cpu)

#             # Ensure the attention weights and source sequences align

#             for b in range(len(attn_weights_all)):

#                 fig, ax = plt.subplots(figsize=(10, 8))

#                 sns.heatmap(attn_weights_all[b], ax=ax, cmap='viridis')

#                 ax.set_xlabel('Encoder Steps')

#                 ax.set_ylabel('Decoder Steps')

#                 ax.xaxis.set_major_locator(ticker.MultipleLocator(1))

#                 ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

#                 plt.title(f'Attention Heatmap for Sample {i}-{b}')

#                 wandb.log({f"Attention Heatmap {i}-{b}": wandb.Image(fig)})

#                 plt.close(fig)

#     # Log predictions and ground truths to wandb

#     prediction_strings = [''.join([index2char[idx] for idx in pred if idx in index2char]) for pred in predictions]

#     ground_truth_strings = [''.join([index2char[idx] for idx in truth if idx in index2char]) for truth in ground_truths]

#     table = wandb.Table(data=[(pred, truth) for pred, truth in zip(prediction_strings, ground_truth_strings)],
#                         columns=["Prediction", "Ground Truth"])

#     wandb.log({"Predictions vs Ground Truths": table})

#     return epoch_loss / len(iterator)

def test(model, iterator, criterion, char2index, index2char):
    """Evaluate on a test iterator and log heatmaps and predictions to wandb.

    For every batch the model is run without teacher forcing. The loss is
    accumulated, one attention heatmap per sample is logged, and after the
    loop a table of decoded predictions versus ground truths is logged.

    Args:
        model: Module called as ``model(src, trg, 0)`` returning
            ``(output, attn_weights_all)`` where ``attn_weights_all`` is a list
            with one ``(batch, src_len)`` array per decoding step; must expose
            a ``device`` attribute.
        iterator: Sized iterable of ``(src, trg, src_len, trg_len)`` batches,
            with ``src_len`` / ``trg_len`` holding one integer per sample.
        criterion: Loss taking ``(logits, targets)``.
        char2index: Character-to-index mapping (not used here; kept so the
            call signature stays unchanged).
        index2char: Index-to-character mapping used to decode sequences.

    Returns:
        Sum of the per-batch losses divided by ``len(iterator)``.
    """
    model.eval()

    epoch_loss = 0
    predictions = []    # one list of predicted indices per sample
    ground_truths = []  # one list of target indices per sample

    with torch.no_grad():
        for i, (src, trg, src_len, trg_len) in enumerate(iterator):
            src = src.to(model.device)
            trg = trg.to(model.device)

            output, attn_weights_all = model(src, trg, 0)

            # Position 0 is the decoder's start input, so it is never predicted.
            output_dim = output.shape[-1]
            step_logits = output[:, 1:]
            step_targets = trg[:, 1:]

            loss = criterion(step_logits.reshape(-1, output_dim), step_targets.reshape(-1))
            epoch_loss += loss.item()

            # Keep predictions grouped per sample (not flattened across the
            # batch) so each one can be decoded into its own string, and trim
            # both sequences to the sample's real number of decoding steps.
            pred_rows = step_logits.argmax(-1).cpu().tolist()
            target_rows = step_targets.cpu().tolist()
            for pred_row, target_row, length in zip(pred_rows, target_rows, trg_len):
                n_steps = max(length - 1, 0)
                predictions.append(pred_row[:n_steps])
                ground_truths.append(target_row[:n_steps])

            if not attn_weights_all:
                continue  # no decoding step was run, nothing to plot

            # The model returns one (batch, src_len) array per decoder step;
            # stack them into (batch, decoder_steps, src_len) so that
            # indexing by sample yields that sample's attention matrix.
            attention = torch.stack(
                [torch.as_tensor(step_weights) for step_weights in attn_weights_all], dim=1
            ).numpy()

            for b in range(src.size(0)):
                # Crop the padding: lengths are plain integers per sample.
                sample_attention = attention[b, :max(trg_len[b] - 1, 1), :src_len[b]]
                if sample_attention.size == 0:
                    continue  # empty source sequence, nothing to draw

                fig, ax = plt.subplots(figsize=(10, 8))
                sns.heatmap(sample_attention, ax=ax, cmap='viridis')
                ax.set_xlabel('Encoder Steps')
                ax.set_ylabel('Decoder Steps')
                ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
                ax.yaxis.set_major_locator(ticker.MultipleLocator(1))
                ax.set_title(f'Attention Heatmap for Sample {i}-{b}')

                wandb.log({f"Attention Heatmap {i}-{b}": wandb.Image(fig)})

                # Close the figure so figures do not accumulate in memory.
                plt.close(fig)

    # Log predictions and ground truths to wandb
    prediction_strings = [''.join(index2char[idx] for idx in pred if idx in index2char) for pred in predictions]
    ground_truth_strings = [''.join(index2char[idx] for idx in truth if idx in index2char) for truth in ground_truths]

    table = wandb.Table(data=list(zip(prediction_strings, ground_truth_strings)),
                        columns=["Prediction", "Ground Truth"])

    wandb.log({"Predictions vs Ground Truths": table})

    return epoch_loss / len(iterator)



In [None]:

# # Ensure that the run_sweep function includes wandb logging and attention heatmap generation

# def run_sweep(config=None):

#     with wandb.init(config=config):

#         config = wandb.config

#         input_dim = len(char2index)

#         output_dim = len(char2index)

#         embedding_dim = config.embedding_dim

#         hidden_dim = config.hidden_dim

#         num_layers = config.encoder_layers

#         dropout = config.dropout_rate

#         rnn_type = config.rnn_cell_type

#         bidirectional = config.bidirectional

#         max_length = config.max_length

#         attention = Attention(hidden_dim)

#         encoder = EncoderRNN(input_dim, embedding_dim, hidden_dim, num_layers, dropout, rnn_type, bidirectional)

#         decoder = DecoderRNN(output_dim, embedding_dim, hidden_dim, config.decoder_layers, dropout, attention, rnn_type)

#         model = Seq2Seq(encoder, decoder, device).to(device)

#         optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)

#         criterion = nn.CrossEntropyLoss(ignore_index=char2index['<PAD>'])

#         best_valid_loss = float('inf')

#         for epoch in range(config.num_epochs):

#             train_loss = train(model, train_loader, optimizer, criterion, config.gradient_clip)

#             valid_loss = evaluate(model, valid_loader, criterion)

#             wandb.log({"Train Loss": train_loss, "Valid Loss": valid_loss})

#             if valid_loss < best_valid_loss:

#                 best_valid_loss = valid_loss

#                 torch.save(model.state_dict(), 'best_model.pt')

#         model.load_state_dict(torch.load('best_model.pt'))

#         test_loss = test(model, test_loader, criterion, char2index, index2char)

#         wandb.log({"Test Loss": test_loss})

def _save_predictions_to_csv(model, iterator, file_path, index2char):
    """Greedy-decode ``iterator`` and write prediction/ground-truth rows to CSV.

    The first target character is the decoder's start input and is therefore
    not part of either column. Sequences are trimmed to the sample's target
    length before decoding.
    """
    model.eval()
    rows = []

    with torch.no_grad():
        for src, trg, _src_len, trg_len in iterator:
            src = src.to(model.device)
            trg = trg.to(model.device)

            output, _ = model(src, trg, 0)

            pred_rows = output[:, 1:].argmax(-1).cpu().tolist()
            truth_rows = trg[:, 1:].cpu().tolist()

            for pred, truth, length in zip(pred_rows, truth_rows, trg_len):
                n_steps = max(length - 1, 0)
                rows.append((
                    ''.join(index2char[idx] for idx in pred[:n_steps] if idx in index2char),
                    ''.join(index2char[idx] for idx in truth[:n_steps] if idx in index2char),
                ))

    with open(file_path, 'w', encoding='utf-8', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["Prediction", "Ground Truth"])
        writer.writerows(rows)


def run_sweep(config=None):
    """Train, validate and test one model for a single wandb sweep run.

    Builds the encoder/decoder from the run's hyperparameters, trains for
    ``num_epochs`` epochs while checkpointing the lowest validation loss,
    reloads that checkpoint, evaluates on the test loader and writes the
    test predictions to ``predictions_vs_ground_truths.csv``.

    Uses the module-level ``char2index``, ``index2char``, ``device`` and the
    ``train_loader`` / ``valid_loader`` / ``test_loader`` objects, so the
    batch size and padding length are whatever those loaders were built with.

    Args:
        config: Optional default configuration passed to ``wandb.init``.
    """
    with wandb.init(config=config):
        config = wandb.config

        # Source and target share one vocabulary.
        input_dim = len(char2index)
        output_dim = len(char2index)

        embedding_dim = config.embedding_dim
        hidden_dim = config.hidden_dim
        num_layers = config.encoder_layers
        dropout = config.dropout_rate
        rnn_type = config.rnn_cell_type
        bidirectional = config.bidirectional

        attention = Attention(hidden_dim)
        encoder = EncoderRNN(input_dim, embedding_dim, hidden_dim, num_layers, dropout, rnn_type, bidirectional)
        decoder = DecoderRNN(output_dim, embedding_dim, hidden_dim, config.decoder_layers, dropout, attention, rnn_type)
        model = Seq2Seq(encoder, decoder, device).to(device)

        optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)

        # Padding positions do not contribute to the loss.
        criterion = nn.CrossEntropyLoss(ignore_index=char2index['<PAD>'])

        best_valid_loss = float('inf')
        checkpoint_saved = False

        for epoch in range(config.num_epochs):
            train_loss = train(model, train_loader, optimizer, criterion, config.gradient_clip)
            valid_loss = evaluate(model, valid_loader, criterion)

            wandb.log({"Train Loss": train_loss, "Valid Loss": valid_loss})

            if valid_loss < best_valid_loss:
                best_valid_loss = valid_loss
                torch.save(model.state_dict(), 'best_model.pt')
                checkpoint_saved = True

        # Only reload a checkpoint written by THIS run; otherwise a stale file
        # from an earlier run (possibly a different architecture) would be
        # loaded, or the load would fail if no file exists.
        if checkpoint_saved:
            model.load_state_dict(torch.load('best_model.pt', map_location=device))

        test_loss = test(model, test_loader, criterion, char2index, index2char)

        wandb.log({"Test Loss": test_loss})

        # Save predictions to CSV. test() only returns the loss, so the
        # predictions are decoded again here instead of relying on names that
        # do not exist in this scope.
        _save_predictions_to_csv(model, test_loader, 'predictions_vs_ground_truths.csv', index2char)

# Runtime setup; every class and function used by run_sweep must already be defined.

# Prefer the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Inverse vocabulary: index -> character (or special-token name)
index2char = {index: char for char, index in char2index.items()}

# Execute a single run of the registered sweep.
# NOTE(review): the same agent call appears again in a later cell; running
# both cells executes two sweep runs — confirm that is intended.
wandb.agent(sweep_id, function=run_sweep, count=1)




In [None]:
# Runtime setup; every class and function used by run_sweep must already be defined.

# Prefer the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Inverse vocabulary: index -> character (or special-token name)
index2char = {index: char for char, index in char2index.items()}


In [None]:
# Execute a single run of the registered sweep with run_sweep as the job
wandb.agent(sweep_id, function=run_sweep, count=1)