In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader
import math
import os

class SentenceTransformDataset(Dataset):
    def __init__(self, csv_file):
        data = pd.read_csv(csv_file)
        self.sentences = data['Sentence'].apply(lambda x: x).values  
        self.transformed_sentences = data['Transformed sentence'].apply(lambda x: '#' + x ).values 
        self.vocab = self.build_vocab()
        self.src_vocab_size = len(self.vocab)
        self.idx2char = {i: char for i, char in enumerate(self.vocab)}
        self.char2idx = {char: i for i, char in enumerate(self.vocab)}

    def build_vocab(self):
        sentences = np.concatenate((self.sentences, self.transformed_sentences))
        vocab = set(''.join(sentences))
        return sorted(list(vocab))

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        source_sentence = self.sentences[idx]
        transformed_sentence = self.transformed_sentences[idx]

        source_idxs = [self.char2idx[char] for char in source_sentence]
        transformed_idxs = [self.char2idx[char] for char in transformed_sentence]

        return {
            'source': torch.tensor(source_idxs, dtype=torch.long),
            'transformed': torch.tensor(transformed_idxs, dtype=torch.long),
            'transformed_sentence': transformed_sentence[1:]
        }

train_dataset = SentenceTransformDataset('train_data.csv')
eval_dataset = SentenceTransformDataset('eval_data.csv')

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
eval_loader = DataLoader(eval_dataset, batch_size=1)
final_train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)

# Transformer Model
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        
    def scaled_dot_product_attention(self, Q, K, V):
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        attn_probs = torch.softmax(attn_scores, dim=-1)
        output = torch.matmul(attn_probs, V)
        return output
        
    def split_heads(self, x):
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
        
    def combine_heads(self, x):
        batch_size, _, seq_length, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
        
    def forward(self, Q, K, V):
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        
        attn_output = self.scaled_dot_product_attention(Q, K, V)
        output = self.W_o(self.combine_heads(attn_output))
        return output

class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()
        
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        self.register_buffer('pe', pe.unsqueeze(0))
        
    def forward(self, x):
        seq_length = x.size(1)
        return x + self.pe[:, :seq_length]

class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        attn_output = self.self_attn(x, x, x)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, enc_output):
        attn_output = self.self_attn(x, x, x)
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.cross_attn(x, enc_output, enc_output)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, src, tgt):
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output)

        output = self.fc(dec_output)
        return output

# Check if the pre-trained model exists, and if not, train a new model
if not torch.cuda.is_available():
    device = 'cpu'
else:
    device = 'cuda'

pretrained_model_path = 'transformer_model.pth'

if not os.path.exists(pretrained_model_path):
    src_vocab_size = 28
    tgt_vocab_size = 28
    d_model = 256
    num_heads = 8
    num_layers = 6
    d_ff = 2048
    max_seq_length = 9
    dropout = 0.1

    transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)

    # Training Loop
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(transformer.parameters(), lr=0.0001)

    def train(model, data_loader, criterion, optimizer):
        model.train()
        total_loss = 0.0
        for batch in data_loader:
            src_data = batch['source']
            tgt_data = batch['transformed']
            optimizer.zero_grad()
            output = model(src_data,tgt_data[:, :-1])
            loss = criterion(output.contiguous().view(-1, tgt_vocab_size),tgt_data[:, 1:].contiguous().view(-1))
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        return total_loss / len(data_loader)


    def evaluate(model, data_loader, criterion):
        model.eval()
        total_loss = 0.0
        for batch in data_loader:
            src_data = batch['source']
            tgt_data = batch['transformed']
            output = model(src_data, tgt_data[:, :-1])
            loss = criterion(output.contiguous().view(-1, tgt_vocab_size),tgt_data[:, 1:].contiguous().view(-1))

            total_loss += loss.item()
        
        return total_loss / len(data_loader)

    num_epochs = 10

    for epoch in range(num_epochs):
        train_loss = train(transformer, train_loader, criterion, optimizer)
        eval_loss = evaluate(transformer, eval_loader, criterion)
        print(f"Epoch {epoch+1}: Train Loss = {train_loss:.4f}, Eval Loss = {eval_loss:.4f}")

    torch.save(transformer.state_dict(), 'transformer_model.pth')

else:
    # Load the pre-trained model
    transformer = Transformer(28, 28, 256, 8, 6, 2048,9, 0.1)
    transformer.load_state_dict(torch.load(pretrained_model_path))
    transformer.to(device)

def check(pred: str, true: str):
    correct = 0
    for a, b in zip(pred, true):
        if a == b:
            correct += 1
    return correct
    
def predict_transformed_string(model,idx2char, dataloader,max_seq_length=9):
    model.eval()
    results = {
        "pred": [],
        "true": [],
        "score": []
    }
    correct = [0 for _ in range(9)]
    cnt=0
    with torch.no_grad():
        for batch in dataloader:
            cnt = cnt+1
            src = batch['source']
            tgt = batch['transformed']
            y=batch['transformed_sentence']
            y=str(y[0])
            output = torch.tensor([train_dataset.char2idx['#']] * max_seq_length, dtype=torch.long).unsqueeze(0)
            output = model(src, tgt[:, :-1])
            output = torch.argmax(output, dim=-1).tolist()
            predicted_string = [idx2char[idx] for idx in output[0]]
            final_y_hat=''
            for ch in predicted_string:
                final_y_hat+=ch
            score = check(final_y_hat,y)
            results["pred"].append(final_y_hat)
            results["true"].append(y)
            results["score"].append(score)
            correct[score] += 1
    print("Eval dataset results:")
    for num_chr in range(9):
        print(
            f"Number of predictions with {num_chr} correct predictions: {correct[num_chr]}"
        )
        
print("Obtaining metrics for train data:")
predict_transformed_string(transformer,train_dataset.idx2char,final_train_loader)
print("Obtaining metrics for eval data:")
predict_transformed_string(transformer,train_dataset.idx2char,eval_loader)


Epoch 1: Train Loss = 2.5746, Eval Loss = 1.4659
Epoch 2: Train Loss = 0.7056, Eval Loss = 0.3692
Epoch 3: Train Loss = 0.2653, Eval Loss = 0.1119
Epoch 4: Train Loss = 0.1325, Eval Loss = 0.0974
Epoch 5: Train Loss = 0.1088, Eval Loss = 0.0965
Epoch 6: Train Loss = 0.1064, Eval Loss = 0.0961
Epoch 7: Train Loss = 0.1037, Eval Loss = 0.0996
Epoch 8: Train Loss = 0.0952, Eval Loss = 0.0769
Epoch 9: Train Loss = 0.0428, Eval Loss = 0.0044
Epoch 10: Train Loss = 0.0118, Eval Loss = 0.0021
Obtaining metrics for train data:
Eval dataset results:
Number of predictions with 0 correct predictions: 0
Number of predictions with 1 correct predictions: 0
Number of predictions with 2 correct predictions: 0
Number of predictions with 3 correct predictions: 0
Number of predictions with 4 correct predictions: 59
Number of predictions with 5 correct predictions: 3358
Number of predictions with 6 correct predictions: 2759
Number of predictions with 7 correct predictions: 96
Number of predictions with 8 