In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import math

In [12]:
class CustomDataset(Dataset):
    def __init__(self, csv_file):
        self.data = pd.read_csv(csv_file)
#         self.data.sample(frac=1).reset_index(drop=True)
        self.start = 26

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_sentence = torch.tensor([self.start]+[ord(c) - ord('a') for c in self.data.iloc[idx, 0]])
#         input_sentence = self.embedding(input_sentence) 
        target_sentence = torch.tensor([self.start]+[ord(c) - ord('a') for c in self.data.iloc[idx, 1]])
       
        return (input_sentence,target_sentence)

In [13]:
training_data = DataLoader(CustomDataset("Data/train_data.csv"))
training_source = []
training_target = []
for input_,target in training_data:
    training_source.append(input_)
    training_target.append(target)
#     print(x.shape)
training_source = torch.cat(training_source,dim=0)[:,1:]
training_target = torch.cat(training_target,dim=0)

In [14]:
test_data = DataLoader(CustomDataset("Data/eval_data.csv"))
test_source = []
test_target = []
for input_,target in test_data:
    test_source.append(input_)
    test_target.append(target)
#     print(x.shape)
test_source = torch.cat(test_source,dim=0)[:,1:]
test_target = torch.cat(test_target,dim=0)

In [15]:
print(test_source.shape,test_target.shape)
print(training_source.shape,training_target.shape)

torch.Size([2000, 8]) torch.Size([2000, 9])
torch.Size([7000, 8]) torch.Size([7000, 9])


In [16]:
# Make Dataset and DataLoader
class MyDataset(Dataset):
    def __init__(self, source, target):
        self.source = source
        self.target = target

    def __getitem__(self, index):
        return self.source[index], self.target[index]

    def __len__(self):
        return len(self.source)
    
train_dataset = MyDataset(training_source, training_target)
eval_dataset = MyDataset(test_source, test_target)

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
eval_loader = DataLoader(eval_dataset, batch_size=128, shuffle=True)

In [17]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout_p, max_len = 10):
        super().__init__()
        # Info
        self.dropout = nn.Dropout(dropout_p)
        
        # Encoding - From formula
        pos_encoding = torch.zeros(max_len, d_model)
        positions_list = torch.arange(0, max_len, dtype=torch.float).view(-1, 1)
        division_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0)) / d_model) # 1000^(2i/d_model)
        
        # PE(pos, 2i) = sin(pos/1000^(2i/dim_model))
        pos_encoding[:, 0::2] = torch.sin(positions_list * division_term)
        
        # PE(pos, 2i + 1) = cos(pos/1000^(2i/dim_model))
        pos_encoding[:, 1::2] = torch.cos(positions_list * division_term)
        
        # Saving buffer (same as parameter without gradients needed)
        self.register_buffer("pos_encoding",pos_encoding)
        
    def forward(self, token_embedding):
        # Residual connection + pos encoding
        return self.dropout(token_embedding + self.pos_encoding[:token_embedding.size(1), :])

class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model, dropout_p, nhead, num_encoder_layers, num_decoder_layers):
        super(TransformerModel, self).__init__()
        self.d_model = d_model
        self.embedding_src = nn.Embedding(vocab_size-1, d_model)
        self.embedding_tgt = nn.Embedding(vocab_size, d_model)
        self.positional_encoder = PositionalEncoding(
            d_model=d_model, dropout_p=dropout_p, max_len=10
        )
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            batch_first = True
        )
        self.fc = nn.Linear(d_model, vocab_size-1)
        self.output_layer = nn.Softmax(dim = 2)

    def forward(self, src, tgt):
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.shape[1])
        src = self.embedding_src(src)
        tgt = self.embedding_tgt(tgt)
        src = self.positional_encoder(src)
        tgt = self.positional_encoder(tgt)
        
        output = self.transformer(src, tgt, tgt_mask=tgt_mask)
        prob_outputs = self.fc(output)
        return prob_outputs
    
    def predict(self, inp):
        start = 26
        input_sentence = torch.tensor([ord(c) - ord('a') for c in inp])
        src = input_sentence.view(1,-1)
        tgt = torch.tensor([start]).reshape(1,-1)
        for i in range(8):         
            logits = self.forward(src,tgt)              # logits = [batch_size,seq_length,27]
            
            logits = self.output_layer(logits).argmax(dim=2)            # logits = [batch_size,seq_length]
            
            ## Append this to tgt
            tgt = torch.cat((tgt,logits[:,-1].reshape(1,-1)),dim=1)
            
        
        # print(tgt.shape)      # [1,9]
        # Convert tgt to string
        final = ""
        
        for i in range(1,9):
            # print(tgt[0][i])
            final += chr(ord('a')+int(tgt[0][i]))

        return final
        

In [18]:
vocab_size = 27  # Lowercase English alphabet letters with start token
d_model = 128
nhead = 8
num_encoder_layers = 2
num_decoder_layers = 2

In [20]:
transformer = TransformerModel(vocab_size, d_model,0.0, nhead, num_encoder_layers, num_decoder_layers)
# transformer = TransformerModel(d_model,nhead,num_encoder_layers,9,vocab_size,vocab_size,dropout = 0.1)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(transformer.parameters(), lr=0.0005)

# Set up early stopping parameters
patience = 20  # Number of epochs to wait for improvement
best_val_loss = float('inf')
epochs_since_improvement = 0
loss_val = 0.0

num_epochs = 50
for epoch in range(num_epochs):
    # Training Loop
    transformer.train()
    total_loss = 0.0
    for source, target in train_loader:
        optimizer.zero_grad()
        output = transformer(source,target)
        output = output.permute(0,2,1)
        loss = criterion(output[:,:,:-1],target[:,1:])
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {total_loss / len(train_loader):.4f}')
    
    # Validation Loop
    transformer.eval()
    with torch.no_grad():
        output_ = transformer(test_source,test_target)
        output_ = output_.permute(0,2,1)
        loss_val = criterion(output_[:,:,:-1], test_target[:,1:])
        print(f'Epoch [{epoch+1}/{num_epochs}], Eval Loss: {loss_val.item():.4f}')
    
        if loss_val < best_val_loss:
            best_val_loss = loss_val
            epochs_since_improvement = 0
            torch.save(transformer.state_dict(), 'Model/transformer.pth')

        else:
            epochs_since_improvement += 1

    # Check if we should stop training early
    if epochs_since_improvement >= patience:
        print(f"Early stopping after {epoch+1} epochs with no improvement.")
        break

if loss_val < best_val_loss:
    torch.save(transformer.state_dict(), 'Model/transformer.pth')


Epoch [1/50], Train Loss: 3.2819
Epoch [1/50], Eval Loss: 3.2616
Epoch [2/50], Train Loss: 3.2654
Epoch [2/50], Eval Loss: 3.2532
Epoch [3/50], Train Loss: 3.2427
Epoch [3/50], Eval Loss: 3.1981
Epoch [4/50], Train Loss: 3.0348
Epoch [4/50], Eval Loss: 2.9409
Epoch [5/50], Train Loss: 2.8568
Epoch [5/50], Eval Loss: 2.6507
Epoch [6/50], Train Loss: 2.6477
Epoch [6/50], Eval Loss: 2.6325
Epoch [7/50], Train Loss: 2.6328
Epoch [7/50], Eval Loss: 2.6288
Epoch [8/50], Train Loss: 2.6254
Epoch [8/50], Eval Loss: 2.6291
Epoch [9/50], Train Loss: 2.6179
Epoch [9/50], Eval Loss: 2.6250
Epoch [10/50], Train Loss: 2.6090
Epoch [10/50], Eval Loss: 2.5939
Epoch [11/50], Train Loss: 2.5901
Epoch [11/50], Eval Loss: 2.5806
Epoch [12/50], Train Loss: 2.5693
Epoch [12/50], Eval Loss: 2.5632
Epoch [13/50], Train Loss: 2.5221
Epoch [13/50], Eval Loss: 2.4581
Epoch [14/50], Train Loss: 2.4234
Epoch [14/50], Eval Loss: 2.3968
Epoch [15/50], Train Loss: 2.3820
Epoch [15/50], Eval Loss: 2.3707
Epoch [16/50]

In [22]:
model = TransformerModel(vocab_size, d_model,0.0, nhead, num_encoder_layers, num_decoder_layers)
model.load_state_dict(torch.load('Model/transformer.pth')) 

<All keys matched successfully>

In [23]:
def predict(model, inp):
        start = 26
        input_sentence = torch.tensor([ord(c) - ord('a') for c in inp])
        src = input_sentence.view(1,-1)
        tgt = torch.tensor([start]).reshape(1,-1)
        for i in range(8):         
            logits = model.forward(src,tgt)              # logits = [batch_size,seq_length,27]
            
            logits = model.output_layer(logits).argmax(dim=2)            # logits = [batch_size,seq_length]
            
            ## Append this to tgt
            tgt = torch.cat((tgt,logits[:,-1].reshape(1,-1)),dim=1)
            
        
        # print(tgt.shape)      # [1,9]
        # Convert tgt to string
        final = ""
        
        for i in range(1,9):
            # print(tgt[0][i])
            final += chr(ord('a')+int(tgt[0][i]))

        return final

In [24]:
# Function to check how many characters match in the two strings
def check(pred: str, true: str):
    correct = 0
    for a, b in zip(pred, true):
        if a == b:
            correct += 1
            
    return correct

# Function to score the model's performance
def evaluate(model):

    # Train data
    print("Obtaining results for training data:")
    train_data = pd.read_csv("Data/train_data.csv").to_numpy()
    results = {
        "pred": [],
        "true": [],
        "score": [],
    }
    correct = [0 for _ in range(9)]
    for x, y in train_data:
        pred = predict(model,x)
        score = check(pred, y)
        results["pred"].append(pred)
        results["true"].append(y)
        results["score"].append(score)

        correct[score] += 1
    print("Train dataset results:")
    for num_chr in range(9):
        print(
            f"Number of predictions with {num_chr} correct predictions: {correct[num_chr]}"
        )
    # Save predicitons and true sentences to inspect manually if required.
    pd.DataFrame.from_dict(results).to_csv("results_train.csv", index=False)

    #----------------------------------------------------------------------------------

    print("Obtaining metrics for eval data:")
    eval_data = pd.read_csv("Data/eval_data.csv").to_numpy()
    results = {
        "pred": [],
        "true": [],
        "score": [],
    }
    correct = [0 for _ in range(9)]
    for x, y in eval_data:
        pred = predict(model, x)
        score = check(pred, y)
        results["pred"].append(pred)
        results["true"].append(y)
        results["score"].append(score)

        correct[score] += 1
    print("Eval dataset results:")
    for num_chr in range(9):
        print(
            f"Number of predictions with {num_chr} correct predictions: {correct[num_chr]}"
        )
    # Save predicitons and true sentences to inspect manually if required.
    pd.DataFrame.from_dict(results).to_csv("results_eval.csv", index=False)

In [25]:
evaluate(model)

Obtaining results for training data:
Train dataset results:
Number of predictions with 0 correct predictions: 0
Number of predictions with 1 correct predictions: 0
Number of predictions with 2 correct predictions: 0
Number of predictions with 3 correct predictions: 0
Number of predictions with 4 correct predictions: 0
Number of predictions with 5 correct predictions: 0
Number of predictions with 6 correct predictions: 8
Number of predictions with 7 correct predictions: 417
Number of predictions with 8 correct predictions: 6575
Obtaining metrics for eval data:
Eval dataset results:
Number of predictions with 0 correct predictions: 0
Number of predictions with 1 correct predictions: 0
Number of predictions with 2 correct predictions: 0
Number of predictions with 3 correct predictions: 0
Number of predictions with 4 correct predictions: 0
Number of predictions with 5 correct predictions: 0
Number of predictions with 6 correct predictions: 6
Number of predictions with 7 correct predictions