In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import math

In [None]:
class Embed(nn.Module):
    """Token-embedding lookup: turns integer token ids into learned vectors."""

    def __init__(self, vocab_size, embed_size):
        super().__init__()
        # One learnable row of width `embed_size` per vocabulary entry.
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)

    def forward(self, inp):
        """Return the embedding vectors for the token ids in `inp`."""
        return self.embed(inp)

In [None]:
class Pos_Embed(nn.Module):
    """Adds fixed sinusoidal positional encodings to a batch of embeddings.

    For position ``p`` and pair index ``j`` the table holds
    ``sin(p / 10000**(2j/embed_size))`` in column ``2j`` and the matching
    cosine in column ``2j + 1``. When ``embed_size`` is odd the last column
    stays zero.

    Args:
        embed_size: width of each embedding vector.
        max_size_input_seq: number of positions the table is built for.
    """

    def __init__(self, embed_size, max_size_input_seq):
        super(Pos_Embed, self).__init__()
        self.embed_size = embed_size

        pos_em = torch.zeros(max_size_input_seq, embed_size)

        num_pairs = embed_size // 2
        positions = torch.arange(max_size_input_seq, dtype=torch.float32).unsqueeze(1)
        pair_index = torch.arange(num_pairs, dtype=torch.float32)
        wavelengths = torch.pow(10000.0, 2 * pair_index / embed_size)
        angles = positions / wavelengths  # (max_size_input_seq, num_pairs)

        # Interleave: even columns carry the sine, odd columns the cosine.
        # (Writing to columns j and j+1 would overwrite neighbouring pairs and
        # leave the upper half of every row at zero.)
        pos_em[:, 0:2 * num_pairs:2] = torch.sin(angles)
        pos_em[:, 1:2 * num_pairs:2] = torch.cos(angles)

        # Non-persistent buffer: follows the module across .to()/.cuda() calls
        # without adding a key to state_dict.
        self.register_buffer("pos_em", pos_em, persistent=False)

    def forward(self, embeded_inp):
        """Add the encodings for the first ``seq_len`` positions.

        Args:
            embeded_inp: tensor indexed as (batch, seq_len, embed_size);
                seq_len is read from dimension 1.

        Returns:
            Tensor of the same shape as ``embeded_inp``.
        """
        seq_len = embeded_inp.shape[1]

        # The (seq_len, embed_size) slice broadcasts over the batch dimension.
        return embeded_inp + self.pos_em[:seq_len, :]



In [None]:
class Multi_Head_Attention(nn.Module):
    """Multi-head scaled dot-product attention.

    Inputs are projected with separate linear layers, split into ``num_head``
    heads of width ``pos_embed_size // num_head``, attended per head, then
    concatenated and passed through a final linear layer.

    Args:
        num_head: number of attention heads.
        pos_embed_size: width of the input/output vectors; must be divisible
            by ``num_head``.

    Raises:
        ValueError: if ``num_head`` is not positive or does not divide
            ``pos_embed_size``.
    """

    def __init__(self, num_head, pos_embed_size):
        super(Multi_Head_Attention, self).__init__()

        if num_head <= 0 or pos_embed_size % num_head != 0:
            # Without this check the failure only shows up later as an opaque
            # view() shape error inside forward().
            raise ValueError(
                f"pos_embed_size ({pos_embed_size}) must be divisible by num_head ({num_head})"
            )

        self.num_head = num_head
        self.pos_embed_size = pos_embed_size
        self.each_head_pos_embed_size = pos_embed_size // num_head

        self.q_mat_multiply = nn.Linear(self.pos_embed_size, self.pos_embed_size)
        self.k_mat_multiply = nn.Linear(self.pos_embed_size, self.pos_embed_size)
        self.v_mat_multiply = nn.Linear(self.pos_embed_size, self.pos_embed_size)

        self.mult = nn.Linear(self.pos_embed_size, self.pos_embed_size)

    def forward(self, key, query, value, mask=None):
        """Attend ``query`` over ``key``/``value``.

        Note the argument order: key first, then query, then value.

        Args:
            key: (batch, key_len, pos_embed_size).
            query: (batch, query_len, pos_embed_size).
            value: (batch, key_len, pos_embed_size).
            mask: optional tensor broadcastable to
                (batch, num_head, query_len, key_len); positions where it
                equals 0 are excluded from attention.

        Returns:
            Tensor of shape (batch, query_len, pos_embed_size).
        """
        batch_size = key.shape[0]
        seq_len = key.shape[1]
        query_seq_len = query.shape[1]
        head_shape = (self.num_head, self.each_head_pos_embed_size)

        query = self.q_mat_multiply(query).view(batch_size, query_seq_len, *head_shape)
        key = self.k_mat_multiply(key).view(batch_size, seq_len, *head_shape)
        value = self.v_mat_multiply(value).view(batch_size, seq_len, *head_shape)

        # Per-head dot products: (batch, head, query_len, key_len).
        prod = torch.einsum("nqhe,nlhe->nhql", [query, key])

        # Scale by the width of the vectors that were actually dotted together
        # (one head), not by the full model width.
        prod = prod / math.sqrt(self.each_head_pos_embed_size)

        if mask is not None:
            # A large finite negative keeps softmax free of NaNs even when a
            # whole row is masked.
            prod = prod.masked_fill(mask == 0, float("-1e20"))

        att = torch.softmax(prod, dim=3)

        final = torch.einsum("nhql,nlhe->nqhe", [att, value])

        # Concatenate the heads back into one vector per position.
        final = final.reshape(batch_size, query_seq_len, self.pos_embed_size)

        return self.mult(final)


In [None]:
class Norm_and_MultiheadAttension(nn.Module):
    """Attention sub-layer: attention, residual add, layer norm, then dropout."""

    def __init__(self, num_head, pos_embed_size, drop_out):
        super().__init__()
        self.drop = drop_out

        self.multiattention = Multi_Head_Attention(num_head, pos_embed_size)
        self.norm = nn.LayerNorm(pos_embed_size)
        self.drop_out = nn.Dropout(drop_out)

    def forward(self, query, key, value, mask=None):
        """Run attention for `query` and return the normalised, dropped-out result."""
        # The wrapped attention module takes the key before the query.
        attended = self.multiattention(key, query, value, mask)

        # Residual connection around the attention, applied before normalising.
        residual = attended + query

        return self.drop_out(self.norm(residual))

In [None]:
class Attention_and_Feed_forward(nn.Module):
    """Attention sub-layer followed by a position-wise feed-forward sub-layer.

    The feed-forward part expands to `expansion_factor * pos_embed_size`,
    applies ReLU, and projects back; its output is added to its input,
    layer-normalised, and passed through dropout.
    """

    def __init__(self, num_head, pos_embed_size, drop_out, expansion_factor):
        super().__init__()
        self.drop = drop_out

        self.attention_and_norm = Norm_and_MultiheadAttension(num_head, pos_embed_size, drop_out)

        hidden_size = expansion_factor * pos_embed_size
        self.feed_forward = nn.Sequential(
            nn.Linear(pos_embed_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, pos_embed_size),
        )

        self.norm = nn.LayerNorm(pos_embed_size)
        self.drop_out = nn.Dropout(drop_out)

    def forward(self, query, key, value, mask=None):
        """Apply attention, then the residual feed-forward block."""
        attended = self.attention_and_norm(query, key, value, mask)

        # Residual connection around the feed-forward network.
        residual = attended + self.feed_forward(attended)

        return self.drop_out(self.norm(residual))


In [None]:
class Encoder(nn.Module):
    """Transformer encoder: embedding, positional encoding, dropout, then a stack of layers.

    Args:
        vocab_size: number of distinct token ids.
        embed_size: width of the token embeddings.
        max_size_input_seq: number of positions the positional table covers.
        num_of_layers_of_encoder: how many attention/feed-forward layers to stack.
        num_head: attention heads per layer.
        pos_embed_size: width the layers operate on; must equal ``embed_size``
            whenever at least one layer is used, because the embeddings are
            fed to the layers unchanged.
        drop_out: dropout probability.
        expansion_factor: hidden-width multiplier of each feed-forward block.

    Raises:
        ValueError: if layers are requested and ``embed_size != pos_embed_size``.
    """

    def __init__(self, vocab_size, embed_size, max_size_input_seq, num_of_layers_of_encoder, num_head, pos_embed_size, drop_out, expansion_factor):
        super(Encoder, self).__init__()

        if num_of_layers_of_encoder > 0 and embed_size != pos_embed_size:
            # Fail at construction instead of with a matrix-shape error deep
            # inside the first attention layer.
            raise ValueError(
                f"embed_size ({embed_size}) must equal pos_embed_size ({pos_embed_size})"
            )

        self.embed = Embed(vocab_size, embed_size)

        self.pos_embed = Pos_Embed(embed_size, max_size_input_seq)

        self.num_of_layers_of_encoder = num_of_layers_of_encoder

        self.layers = nn.ModuleList([Attention_and_Feed_forward(num_head, pos_embed_size, drop_out, expansion_factor) for i in range(num_of_layers_of_encoder)])

        self.drop_out = nn.Dropout(drop_out)

    def forward(self, inp):
        """Encode a batch of token ids.

        Args:
            inp: integer token ids; passed straight to the embedding lookup.

        Returns:
            The output of the last layer (or the dropped-out embeddings when
            there are no layers).
        """
        embed_inp = self.embed(inp)

        pos_embed_inp = self.pos_embed(embed_inp)

        output = self.drop_out(pos_embed_inp)

        for layer in self.layers:
            # Self-attention: query, key and value are all the running output;
            # no mask is applied on the encoder side.
            output = layer(output, output, output, None)

        return output


In [None]:
class Decoder_Repeat(nn.Module):
    """One decoder layer: masked self-attention, then attention over the given key/value plus feed-forward."""

    def __init__(self, num_head, pos_embed_size, drop_out, expansion_factor):
        super().__init__()

        # First sub-layer: the sequence attends to itself under the mask.
        self.norm_masked_attention = Norm_and_MultiheadAttension(num_head, pos_embed_size, drop_out)

        self.drop = drop_out
        # Kept as an attribute; forward() itself never calls it.
        self.drop_out = nn.Dropout(drop_out)

        # Second sub-layer: attention against `key`/`value`, then feed-forward.
        self.attention_and_feed_forward = Attention_and_Feed_forward(num_head, pos_embed_size, drop_out, expansion_factor)

    def forward(self, query, key, value, mask):
        """Run both sub-layers; `mask` is only used by the self-attention step."""
        self_attended = self.norm_masked_attention(query, query, query, mask)

        return self.attention_and_feed_forward(self_attended, key, value)


In [None]:
class Decoder(nn.Module):
    """Transformer decoder producing a probability distribution over the vocabulary.

    Args:
        vocab_size: number of distinct token ids (also the output width).
        embed_size: width of the token embeddings and of the final projection input.
        max_size_input_seq: number of positions the positional table covers.
        num_head: attention heads per layer.
        pos_embed_size: width the layers operate on; must equal ``embed_size``
            whenever at least one layer is used.
        drop_out: dropout probability.
        expansion_factor: hidden-width multiplier of each feed-forward block.
        num_of_layers_of_decoder: how many decoder layers to stack.

    Raises:
        ValueError: if layers are requested and ``embed_size != pos_embed_size``.
    """

    def __init__(self, vocab_size, embed_size, max_size_input_seq, num_head, pos_embed_size, drop_out, expansion_factor, num_of_layers_of_decoder):
        super(Decoder, self).__init__()

        if num_of_layers_of_decoder > 0 and embed_size != pos_embed_size:
            # The embeddings feed the layers directly and the layers feed a
            # projection sized by embed_size, so both widths have to agree.
            raise ValueError(
                f"embed_size ({embed_size}) must equal pos_embed_size ({pos_embed_size})"
            )

        self.embed = Embed(vocab_size, embed_size)

        self.pos_embed = Pos_Embed(embed_size, max_size_input_seq)

        self.layers = nn.ModuleList([Decoder_Repeat(num_head, pos_embed_size, drop_out, expansion_factor) for i in range(num_of_layers_of_decoder)])

        self.linear_layer_last = nn.Linear(embed_size, vocab_size)

        self.drop_out = nn.Dropout(drop_out)

    def forward(self, input_dec, encoder_output, mask):
        """Decode one step of teacher-forced or generated input.

        Args:
            input_dec: integer token ids of the decoder input.
            encoder_output: encoder result, used as key and value in every layer.
            mask: mask handed to each layer's self-attention.

        Returns:
            Probabilities over the vocabulary for every position (softmax over
            dimension 2). These are NOT logits: take their log before using a
            negative-log-likelihood style loss.
        """
        inp_dec = self.embed(input_dec)

        inp_dec = self.pos_embed(inp_dec)

        value = self.drop_out(inp_dec)

        for layer in self.layers:
            value = layer(value, encoder_output, encoder_output, mask)

        logits = self.linear_layer_last(value)

        return nn.functional.softmax(logits, dim=2)
        

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer with a causal mask on the decoder side.

    The constructor arguments are forwarded to ``Encoder`` and ``Decoder``;
    ``max_size_input_seq`` is also the number of tokens ``output`` generates.
    """

    def __init__(self, vocab_size, embed_size, max_size_input_seq, num_of_layers_of_encoder, num_head, pos_embed_size, drop_out, expansion_factor, num_of_layers_of_decoder):
        super(Transformer, self).__init__()

        self.max_seq_len = max_size_input_seq
        # Remembered so that output() can clamp character ids to valid indices.
        self.vocab_size = vocab_size

        self.encoder = Encoder(vocab_size, embed_size, max_size_input_seq, num_of_layers_of_encoder, num_head, pos_embed_size, drop_out, expansion_factor)
        self.decoder = Decoder(vocab_size, embed_size, max_size_input_seq, num_head, pos_embed_size, drop_out, expansion_factor, num_of_layers_of_decoder)

    def create_mask(self, trg):
        """Build a lower-triangular (causal) mask for a (batch, seq_len) target.

        Returns:
            Tensor of shape (batch, 1, seq_len, seq_len) holding 1 where
            attention is allowed and 0 elsewhere, on the same device as ``trg``.
        """
        batch_size, seq_len = trg.shape

        causal = torch.tril(torch.ones((seq_len, seq_len), device=trg.device))

        return causal.expand(batch_size, 1, seq_len, seq_len)

    def forward(self, src, trg):
        """Return the decoder's per-position vocabulary probabilities for ``trg`` given ``src``."""
        mask_made = self.create_mask(trg)

        encoder_output = self.encoder(src)

        return self.decoder(trg, encoder_output, mask_made)

    def output(self, input):
        """Greedily decode ``max_seq_len`` tokens for the given source.

        Args:
            input: either a string (each character is mapped to
                ``ord(c) - ord('a') + 1`` and clamped to ``[1, vocab_size - 1]``)
                or token ids as a tensor / array / nested list of shape
                (seq_len,) or (batch, seq_len).

        Returns:
            numpy array of shape (batch, max_seq_len) with the generated ids
            (the leading start token 0 is stripped).

        Note:
            The module's train/eval mode is left untouched; call ``eval()``
            first if dropout should be disabled.
        """
        if isinstance(input, str):
            max_token = self.vocab_size - 1
            tokens = [max(1, min(ord(c) - ord('a') + 1, max_token)) for c in input]
            src = torch.tensor([tokens])
        else:
            src = torch.as_tensor(input)
            if src.dim() == 1:
                src = src.unsqueeze(0)

        with torch.no_grad():
            encoder_out = self.encoder(src)

            batch_size = src.shape[0]

            # Every sequence starts from token 0; one token is appended per step.
            generated = torch.zeros((batch_size, 1), dtype=torch.long, device=src.device)

            for _ in range(self.max_seq_len):
                target_mask = self.create_mask(generated)
                probs = self.decoder(generated, encoder_out, target_mask)
                # Only the distribution at the last position predicts the next token.
                next_token = probs[:, -1, :].argmax(-1, keepdim=True)
                generated = torch.cat((generated, next_token), dim=1)

        return generated[:, 1:].cpu().numpy()


In [None]:
def _encode_sentences(sentences):
    """Encode each string as a row of integer ids using ord(char) - ord('a') + 1 (so 'a' -> 1).

    np.array only produces a 2-D integer matrix when every string has the
    same length.
    """
    return np.array([[ord(char) - ord('a') + 1 for char in string] for string in sentences])


train_data = pd.read_csv('./train_data.csv')
eval_data = pd.read_csv('./eval_data.csv')

train_input = _encode_sentences(train_data['Sentence'])
train_output = _encode_sentences(train_data['Transformed sentence'])
# Prepend a column of zeros: id 0 is the start token the decoder is fed first.
# The row count is taken from the data instead of being hard-coded.
sos_column = np.zeros((train_output.shape[0], 1))
train_output = np.hstack((sos_column, train_output)).astype(np.int32)

eval_input = _encode_sentences(eval_data['Sentence'])
eval_output = _encode_sentences(eval_data['Transformed sentence'])


In [None]:
# Two parallel loaders with identical batching and no shuffling, so the i-th
# batch of inputs lines up with the i-th batch of targets when zipped.
inp_train, out_train = (
    DataLoader(encoded, batch_size=64, shuffle=False)
    for encoded in (train_input, train_output)
)

In [None]:
# embed_size and pos_embed_size must be the same number: the embeddings are fed
# straight into attention layers that are sized by pos_embed_size, so a
# mismatch (previously 80 vs 128) makes the first forward pass fail.
model = Transformer(
    vocab_size=27,
    embed_size=128,
    max_size_input_seq=8,
    num_of_layers_of_encoder=1,
    num_head=4,
    pos_embed_size=128,
    drop_out=0.3,
    expansion_factor=4,
    num_of_layers_of_decoder=1,
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
losses = []

# Make sure dropout is active even if this cell is re-run after model.eval().
model.train()

for epoch in range(200):
    loss_epoch = []

    for inputs, targets in zip(inp_train, out_train):
        optimizer.zero_grad()
        # Teacher forcing: feed every target token but the last and predict
        # every token but the first.
        outputs = model(inputs, targets[:, :-1])

        # The decoder ends in a softmax, so `outputs` holds probabilities of
        # shape (batch, seq, vocab). CrossEntropyLoss applies log-softmax
        # itself, and log-softmax of log-probabilities returns them unchanged,
        # so passing log(p) yields the plain negative log-likelihood.
        log_probs = torch.log(outputs.clamp_min(1e-9))
        expected_tokens = targets[:, 1:].long()

        # CrossEntropyLoss treats dimension 1 as the class axis, so positions
        # are flattened into the batch axis: (batch*seq, vocab) vs (batch*seq,).
        loss = criterion(
            log_probs.reshape(-1, log_probs.shape[-1]),
            expected_tokens.reshape(-1),
        )
        loss.backward()
        optimizer.step()
        loss_epoch.append(loss.item())

    mean_loss = sum(loss_epoch) / len(loss_epoch)
    print("Epoch : ", epoch + 1, "\t Loss : ", mean_loss)

    losses.append(mean_loss)


In [None]:
# Switch the model to evaluation mode (disables dropout) before running inference.
model.eval()


In [None]:
def predict(model, input_str, max_len=8):
    """Greedily decode `max_len` characters for a single input string.

    Args:
        model: callable taking (src, tgt) id tensors and returning per-position
            scores of shape (1, tgt_len, vocab).
        input_str: source string; each character is encoded as
            ord(c) - ord('a') + 1.
        max_len: number of characters to generate (defaults to 8).

    Returns:
        The decoded string, one character per generated id, using
        chr(id + ord('a') - 1).

    Note:
        The model's train/eval mode is not changed here.
    """
    src = torch.tensor([[ord(c) - ord('a') + 1 for c in input_str]])
    # Decoding starts from the start token 0 and grows by one id per step.
    tgt = torch.tensor([[0]])
    decoded_chars = []
    with torch.no_grad():
        for _ in range(max_len):
            pred = model(src, tgt).argmax(dim=2)
            # Only the last position predicts the next token; keep it as (1, 1).
            next_token = pred[:, -1:]
            decoded_chars.append(chr(next_token.item() + ord('a') - 1))
            tgt = torch.cat((tgt, next_token), dim=1)
    return ''.join(decoded_chars)

# Quick check of the decoder on one example string.
print(predict(model=model, input_str='rgwuwrnh'))

In [None]:
# Count the positions at which two strings carry the same character
def check(pred: str, true: str):
    """Return how many aligned positions of `pred` and `true` match.

    Comparison stops at the end of the shorter string.
    """
    return sum(1 for a, b in zip(pred, true) if a == b)

# Score the model on the evaluation set and write per-sentence results to disk
def evaluate(model):
    """Predict every row of eval_data.csv, print a score summary, and save the results.

    Each row is unpacked into a source sentence and its reference; the
    prediction is scored by the number of matching characters. Results go to
    results_eval_2.csv.
    """
    print("Obtaining metrics for eval data:")
    eval_rows = pd.read_csv("eval_data.csv").to_numpy()

    predictions, references, scores = [], [], []
    # histogram[k] counts predictions with exactly k matching characters (k = 0..8).
    histogram = [0] * 9

    for sentence, reference in eval_rows:
        prediction = predict(model, sentence)
        print(f"Predicted: {prediction}, True: {reference}")
        matches = check(prediction, reference)

        predictions.append(prediction)
        references.append(reference)
        scores.append(matches)
        histogram[matches] += 1

    print("Eval dataset results:")
    for num_chr, count in enumerate(histogram):
        print(
            f"Number of predictions with {num_chr} correct predictions: {count}"
        )

    # 4 or 5 matches earn half a point, 6 or more earn a full point.
    points = (histogram[4] + histogram[5]) * 0.5 + sum(histogram[6:])
    marks = round(min(2, points / 1400 * 2) * 2) / 2  # Rounds to the nearest 0.5
    print(f"Points: {points}")
    print(f"Marks: {marks}")

    # Save predictions and true sentences to inspect manually if required.
    results = {"pred": predictions, "true": references, "score": scores}
    pd.DataFrame.from_dict(results).to_csv("results_eval_2.csv", index=False)


# Run the full evaluation with the trained model.
evaluate(model=model)


In [None]:
# Toy configuration for a small smoke test with numeric tokens.
src_vocab_size = target_vocab_size = 11
num_layers = 6
seq_length = 12


# let 0 be sos token and 1 be eos token
src = torch.tensor([[0, 2, 5, 6, 4, 3, 9, 5, 2, 9, 10, 1]])
trg = torch.tensor([[0]])

# Note: this rebinds `model`, replacing the model built earlier in the notebook.
model = Transformer(
    vocab_size=src_vocab_size,
    embed_size=64,
    max_size_input_seq=seq_length,
    num_of_layers_of_encoder=2,
    num_head=2,
    pos_embed_size=64,
    drop_out=0.3,
    expansion_factor=4,
    num_of_layers_of_decoder=2,
)

In [None]:
# A batch of two source sequences, each starting with 0 and ending with 1.
src = torch.tensor(
    [
        [0, 2, 5, 6, 4, 3, 9, 5, 2, 9, 10, 1],
        [0, 2, 8, 7, 3, 4, 5, 6, 7, 2, 10, 1],
    ]
)
print(model.output(src))

In [None]:
# Step-by-step greedy decoding of a single source sequence.
model.eval()  # disable dropout so the decoded tokens are deterministic

with torch.no_grad():
    # `src` was rebound to a two-row batch in the previous cell, while the
    # decoder input built below always has batch size 1; encoding only the
    # first row keeps the two batch sizes equal.
    encoder_output = model.encoder(src[:1])

    output = [0]

    # The range is evaluated once, so this generates seq_length - 1 tokens.
    for i in range(seq_length - len(output)):
        out = torch.tensor(output).unsqueeze(0)
        trg_mask = model.create_mask(out)
        out = model.decoder(out, encoder_output, trg_mask)
        # Only the distribution at the last position predicts the next token.
        out = out[:, -1, :].argmax(-1)
        output.append(out.item())

print(output)
