# Deep Learning
## Assignment 3

### Question1
### Transformers

In this assignment we will build the transformer architecture and train it on the given dataset, let us start by loading the respective datasets and converting them into torch tensors and then making the dataset loaders.

In [2]:
import torch
import torch.nn as nn
import pandas as pd
import string
from torch.utils.data import Dataset, DataLoader
import math

In [3]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_data = pd.read_csv('train_data.csv')
eval_data = pd.read_csv('eval_data.csv')

print(train_data.head())

   Sentence Transformed sentence
0  udaxihhe             fmvmfthn
1  xdvxrcsn             suiaveib
2  bacghqta             zgvwmloh
3  rgwuwrnh             lmhdulik
4  osizayzf             wfysmuhe


  return torch._C._cuda_getDeviceCount() > 0


In [4]:
char_vocab = [ord(char) - 96 for char in list(string.ascii_lowercase)]
char_vocab = [0] + char_vocab

def to_word(x):
    output = [chr(i + 96) for i in x]
    str = ''
    return str.join(output)

In [5]:
class CustomDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        self.data = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data.iloc[idx]

        original_sen = [ord(char) - 96 for char in list(sample['Sentence'])]
        transformed_sen = [ord(char) - 96 for char in list(sample['Transformed sentence'])]

        features = torch.tensor(original_sen, dtype=torch.long)
        target = torch.tensor(transformed_sen, dtype=torch.long)

        if self.transform:
            features = self.transform(features)

        return features, target

In [6]:
batch_size = 500

train_dataset = CustomDataset(train_data)
eval_dataset = CustomDataset(eval_data)

train_loader = DataLoader(dataset=train_dataset, batch_size= batch_size, shuffle = True)
eval_loader = DataLoader(dataset = eval_dataset, batch_size = batch_size, shuffle = False)

Now we will proceed to making the model, we will start by making the individual parts of the model one by one.

### Transformer Encoder

In [7]:
# Single Encoder Layer
class TransformerEncoderLayer(nn.Module):

    def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):
        super(TransformerEncoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=True)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.ReLU(),
            nn.Linear(dim_feedforward, d_model)
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, x_mask=None):
        x2 = self.self_attn(x, x, x, attn_mask=x_mask)[0]
        x = x + self.dropout(x2)
        x2 = self.feed_forward(x)
        x = x + self.dropout(x2)
        x = self.norm1(x)
        x = self.norm2(x)
        return x


class TransformerEncoder(nn.Module):
    
    def __init__(self, num_layers, d_model, nhead, dim_feedforward, dropout=0.1):
        super(TransformerEncoder, self).__init__()
        self.layers = nn.ModuleList(
            [TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_layers)]
        )

    def forward(self, src, src_mask):
        for layer in self.layers:
            src = layer(src, src_mask)
        return src



### Transformer Decoder

In [8]:
class TransformerDecoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):
        super(TransformerDecoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=True)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=True)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.ReLU(),
            nn.Linear(dim_feedforward, d_model)
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, tgt, memory, tgt_mask, memory_mask):
        tgt2 = self.self_attn(tgt, tgt, tgt, attn_mask=tgt_mask)[0]
        tgt = tgt + self.dropout(tgt2)
        tgt = self.norm1(tgt)

        tgt2 = self.multihead_attn(tgt, memory, memory, attn_mask=memory_mask)[0]
        tgt = tgt + self.dropout(tgt2)
        tgt = self.norm2(tgt)

        tgt2 = self.feed_forward(tgt)
        tgt = tgt + self.dropout(tgt2)
        tgt = self.norm3(tgt)

        return tgt


class TransformerDecoder(nn.Module):
    def __init__(self, num_layers, d_model, nhead, dim_feedforward, dropout=0.1):
        super(TransformerDecoder, self).__init__()
        self.layers = nn.ModuleList(
            [TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_layers)]
        )

    def forward(self, tgt, memory, tgt_mask, memory_mask):
        for layer in self.layers:
            tgt = layer(tgt, memory, tgt_mask, memory_mask)
        return tgt


### Embedding Layer

In [9]:
class EmbeddingLayer(nn.Module):
    
    def __init__(self, d_model, max_len=8):
        super(EmbeddingLayer, self).__init__()

        self.embedding = nn.Embedding(num_embeddings=len(char_vocab), embedding_dim=d_model)
        self.input_size = d_model

        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        embeddings = torch.zeros(x.size(0), x.size(1), self.input_size)
        for i in range(len(x)):
            embeddings[i] = self.embedding(x[i])
            
        embeddings = embeddings + self.pe.view(1, -1, self.input_size)[:, :x.size(1), :]
        return embeddings


### Final Tranformer Model

In [10]:
class Transformer(nn.Module):

    def __init__(self, num_layers, d_model, nhead, dim_feedforward ,dropout=0.1, max_len = 8, train=True):
        super(Transformer, self).__init__()
        self.encoder = TransformerEncoder(num_layers, d_model, nhead, dim_feedforward, dropout)
        self.decoder = TransformerDecoder(num_layers, d_model, nhead, dim_feedforward, dropout)
        self.embedding = EmbeddingLayer(d_model, max_len)
        self.linear = nn.Linear(d_model, len(char_vocab))
        self.dropout = nn.Dropout(dropout)
        self.nhead = nhead

        self.train = True

    def forward(self, src, tgt):
        encoder_input = self.embedding(src.long())
        decoder_input = self.embedding(tgt.long())

        src_mask = None
        
        if self.train:
            tgt_mask = torch.tril(torch.ones((tgt.shape[1], tgt.shape[1]))).expand(tgt.shape[0]*self.nhead, tgt.shape[1], tgt.shape[1]).to(device)
        else:
            tgt_mask = None

        memory = self.encoder(encoder_input, src_mask)
        out = self.decoder(decoder_input, memory, tgt_mask, src_mask)

        out = self.linear(out)
        out = self.dropout(out)
    
        return out
        

Now we will proceed to training the above transformer model with the training samples given to us.

In [21]:
#Hyperparameters
num_layers = 6
d_model = 512
nhead = 8
dim_feedforward = 2048
dropout = 0.6
learning_rate = 0.01
num_epochs = 10

In [22]:
model = Transformer(num_layers, d_model, nhead, dim_feedforward, dropout)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)

In [23]:
n_total_steps = len(train_loader)
n_score(images)
print(f'Inception Score is: {inception_score}')
for epoch in range(num_epochs):
    for i, (X_train, Y_train) in enumerate(train_loader):
        input = torch.zeros(Y_train.size(0), Y_train.size(1))
        for j in range(len(Y_train)):
            input[j] = torch.cat((torch.tensor([0]), Y_train[j, :-1]), 0)
        input = input.to(device)

        X_train = X_train.to(device)
        
        outputs = model(X_train, input)
        
        loss = criterion(outputs.view(-1, len(char_vocab)), Y_train.view(-1))

        optimizer.zero_grad()        
        loss.backward()

        optimizer.step()


        if (i+1)%7 == 0:
            print(f'epoch {epoch+1}/{num_epochs}, step {i+1}/{n_total_steps}, loss  {loss.item():.4f}')

epoch 1/10, step 7/14, loss  4.7416
epoch 1/10, step 14/14, loss  3.9979
epoch 2/10, step 7/14, loss  3.6529
epoch 2/10, step 14/14, loss  3.4855
epoch 3/10, step 7/14, loss  3.4115
epoch 3/10, step 14/14, loss  3.3505
epoch 4/10, step 7/14, loss  3.3311
epoch 4/10, step 14/14, loss  3.3084
epoch 5/10, step 7/14, loss  3.3198
epoch 5/10, step 14/14, loss  3.3095
epoch 6/10, step 7/14, loss  3.3071
epoch 6/10, step 14/14, loss  3.2926
epoch 7/10, step 7/14, loss  3.3029
epoch 7/10, step 14/14, loss  3.2980
epoch 8/10, step 7/14, loss  3.3052
epoch 8/10, step 14/14, loss  3.2985
epoch 9/10, step 7/14, loss  3.2954
epoch 9/10, step 14/14, loss  3.3001
epoch 10/10, step 7/14, loss  3.3001
epoch 10/10, step 14/14, loss  3.2965


Now that we have trained the model, it is time to run inference on the model and obtain the final results.

In [24]:
def check(pred: str, true: str):
    correct = 0
    for a, b in zip(pred, true):
        if a == b:
            correct += 1
    return correct

total_correct = 0
total = 0

with torch.no_grad():
    for i, (X_test, Y_test) in enumerate(eval_loader):
        X_test = X_test.to(device)
        input = torch.zeros(batch_size, 1)

        model.train = False

        for i in range(8):
            out = model(X_test, input)

            out1 = out[:, -1, :]
            out1 = torch.argmax(out1, -1, keepdims = True)
            
            input = torch.cat([input, out1], -1)
        
        input = input[:, 1:]

        for j in range(len(Y_test)):
            print('-----------')
            print(f'Original string : {to_word(X_test[j])}')
            print(f'Transformed String : {to_word(Y_test[j])}')
            print(f'Predicted String: {to_word(input[j].to(torch.int32))}')
            print(f'Correct character : {check(Y_test[j], input[j])}')
            total_correct += check(Y_test[j], input[j])
            total += 8


-----------
Original string : atxwvepa
Transformed String : ldlhgjuj
Predicted String: rssidpqk
Correct character : 0
-----------
Original string : iaayjhyr
Transformed String : mffolhhl
Predicted String: gdaakdsw
Correct character : 0
-----------
Original string : nmnqjobz
Transformed String : xjwbqnnq
Predicted String: scsskind
Correct character : 1
-----------
Original string : puembxfo
Transformed String : nqxwxmtb
Predicted String: pfdsfdvw
Correct character : 0
-----------
Original string : kpxqtnng
Transformed String : dlbdbgvx
Predicted String: aspdsasd
Correct character : 1
-----------
Original string : nqalgpgq
Transformed String : fwkibsou
Predicted String: nandsegd
Correct character : 0
-----------
Original string : bryqeggb
Transformed String : ulafntih
Predicted String: npcawcrd
Correct character : 0
-----------
Original string : soactzsr
Transformed String : ontitdlb
Predicted String: yydgpkad
Correct character : 0
-----------
Original string : fjopbuvo
Transformed Strin

In [25]:
print(f'The correct number of characters are : {total_correct}/{total}')

The correct number of characters are : 619/16000
