### **Importing Libraries**

In [None]:
import torch
import math
import copy
import torch.nn as nn
import torch.nn.functional as F
import nltk
# nltk.download('punkt')
from sklearn.model_selection import train_test_split
from nltk.tokenize import word_tokenize
from nltk.tokenize import sent_tokenize
from gensim.models import Word2Vec
import numpy as np
import torch.optim as optim
from torch.autograd import Variable


# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

### **Importing and Cleaning Dataset**

In [None]:
# Read the whole corpus into memory and lower-case it in one go.
with open('./datasets/Auguste_Maquet.txt', mode='r', encoding='utf-8') as file:
    corpus = file.read().lower()

# Split the lower-cased text into sentences and report how many were found.
clean_text = sent_tokenize(corpus)
print(len(clean_text))

### **Tokenization and Embedding**

In [None]:
# Split every sentence into word tokens.
tokenized_corpus = [word_tokenize(sentence) for sentence in clean_text]

# Index 0 is reserved for padding: the rest of the notebook pads sequences
# with 0, maps unknown words to 0 and masks positions equal to 0, so no real
# token may share that id.
word_to_ind = {'<pad>': 0}
longest_seq = 1  # longest raw sentence, counted before the <sos>/<eos> markers are added
for i, token_arr in enumerate(tokenized_corpus):
    longest_seq = max(longest_seq, len(token_arr))

    # Vocabulary: ids are handed out in order of first appearance.
    for token in token_arr:
        if token not in word_to_ind:
            word_to_ind[token] = len(word_to_ind)

    # Surround each sentence with five start and five end markers.
    tokenized_corpus[i] = ['<sos>'] * 5 + token_arr + ['<eos>'] * 5

# print(tokenized_corpus[2])
# setdefault keeps the ids contiguous even if a marker already has an entry.
word_to_ind.setdefault("<sos>", len(word_to_ind))
word_to_ind.setdefault("<eos>", len(word_to_ind))
print(len(word_to_ind))


# The extra one-token sentence gives '<pad>' a Word2Vec entry, so the number
# of embedding rows stays equal to len(word_to_ind).
word2vec_model = Word2Vec(sentences=tokenized_corpus + [['<pad>']], vector_size=200, window=5, min_count=1, workers=4)

### **Test-Train Split**

In [None]:
# Fixed seed so that Restart & Run All always produces the same split.
SPLIT_SEED = 42

# Hold out 20% of the sentences for testing ...
train_val_data, test_data = train_test_split(tokenized_corpus, test_size=0.2, random_state=SPLIT_SEED)

# ... then take 12.5% of the remaining 80% (10% of the whole) for validation.
train_data, validation_data = train_test_split(train_val_data, test_size=0.125, random_state=SPLIT_SEED)

# Print the sizes of each set
print(f"Training data size: {len(train_data)}")
print(f"Validation data size: {len(validation_data)}")
print(f"Test data size: {len(test_data)}")


## **Encoder**

### **Positional Encoding**

In [None]:
class PosEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to batch-first embeddings.

    Even feature columns hold sin(pos * scale) and odd columns hold
    cos(pos * scale), with scale_k = exp(-2k * ln(10000) / model_dim).

    Args:
        model_dim: size of the feature dimension (odd sizes are supported).
        max_len: number of positions to precompute.
    """

    def __init__(self, model_dim, max_len):
        super(PosEncoding, self).__init__()
        pos = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        scale = torch.exp(torch.arange(0, model_dim, 2, dtype=torch.float) *
                          -(math.log(10000.0) / model_dim))
        angles = pos * scale  # (max_len, ceil(model_dim / 2))

        pos_code = torch.zeros(max_len, model_dim)
        pos_code[:, 0::2] = torch.sin(angles)
        # For an odd model_dim there is one odd column fewer than even columns,
        # so the cosine half must be trimmed to fit.
        pos_code[:, 1::2] = torch.cos(angles[:, :model_dim // 2])

        # Registering the positional encoding matrix as a buffer to avoid updating during training.
        # A buffer also follows the module through .to(...) and is stored in state_dict.
        self.register_buffer('pos_code', pos_code.unsqueeze(0))

    def forward(self, inp):
        """Return `inp` plus the encoding of its first `inp.size(1)` positions.

        `inp` is indexed as (batch, seq_len, model_dim); seq_len must not
        exceed the `max_len` given at construction.
        """
        seq_len = inp.size(1)
        return inp + self.pos_code[:, :seq_len].to(inp.device)

### **Multi Head Attention**

In [None]:
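# NOTE: disabled reference implementation, kept for reading only. The active
# Transformer class uses torch.nn's built-in attention layers instead.
# class MultiHeadAttention(nn.Module):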
#     def __init__(self, model_dim, num_layers):
#         super(MultiHeadAttention, self).__init__()
#         self.model_dim = model_dim
#         self.num_layers = num_layers
#         self.dim_key = self.model_dim // self.num_layers

#         # Linear layers for query, key, and value
#         self.query = nn.Linear(model_dim, model_dim).to(device)
#         self.key = nn.Linear(model_dim, model_dim).to(device)
#         self.value = nn.Linear(model_dim, model_dim).to(device)
#         self.out = nn.Linear(model_dim, model_dim).to(device)

#     def attention_val(self, Q, K, V, mask=None):
#         score = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.dim_key)

#         if mask is not None:
#             score = score.masked_fill(mask == 0, -1e9)

#         attn_weight = torch.softmax(score, dim=-1)

#         new_val = torch.matmul(attn_weight, V)
#         return new_val

#     def split_layers(self, x):
#         batch_size, seq_len, model_dim = x.size()  
#         return x.view(batch_size, seq_len, self.num_layers, self.dim_key).transpose(1, 2)

#     def combine_layers(self, x):
#         batch_size, _, seq_len, dim_key = x.size()
#         return x.transpose(1, 2).contiguous().view(batch_size, seq_len, self.model_dim)

#     def forward(self, Q, K, V, mask=None):
#         # Split into multiple heads
#         Q = self.split_layers(self.query(Q).to(device))
#         K = self.split_layers(self.key(K).to(device))
#         V = self.split_layers(self.value(V).to(device))

#         layer_out = self.attention_val(Q, K, V, mask)
#         final_output = self.out(self.combine_layers(layer_out)).to(device)
        
#         return final_output


### **FeedForward**

In [None]:
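# NOTE: disabled reference implementation, kept for reading only. The built-in
# torch.nn layers used by Transformer bring their own feed-forward sub-layer.
# class FeedForward(nn.Module):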
#     def __init__(self, model_dim, hid_dim):
#         super(FeedForward, self).__init__()
#         self.l1 = nn.Linear(model_dim, hid_dim).to(device)
#         self.ac1 = nn.ReLU().to(device)
#         self.l2 = nn.Linear(hid_dim, model_dim).to(device)

#     def forward(self, inp):
#         inp = self.l1(inp).to(device)
#         inp = self.ac1(inp).to(device)
#         inp = self.l2(inp).to(device)
#         return inp

### **Combining Encoder**

In [None]:
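# NOTE: disabled reference implementation, kept for reading only. The active
# Transformer class uses nn.TransformerEncoderLayer instead.
# class Encoder(nn.Module):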
#     def __init__(self, model_dim, num_layers, hid_dim, dropout):
#         super(Encoder, self).__init__()
#         self.self_attn = MultiHeadAttention(model_dim, num_layers).to(device)
#         self.norm1 = nn.LayerNorm(model_dim).to(device)
#         self.ffn = FeedForward(model_dim, hid_dim).to(device)
#         self.norm2 = nn.LayerNorm(model_dim).to(device)
#         self.dropout = nn.Dropout(dropout).to(device)

#     def forward(self, inp, mask):

#         att_score = self.self_attn(inp, inp, inp, mask).to(device)
#         inp = self.norm1(inp + self.dropout(att_score).to(device)).to(device)
#         ffn_out = self.ffn(inp).to(device)
#         inp = self.norm2(inp + self.dropout(ffn_out).to(device)).to(device)
        
#         return inp

## **Decoder**

### **Combining Decoder**

In [None]:
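# NOTE: disabled reference implementation, kept for reading only. The active
# Transformer class uses nn.TransformerDecoderLayer instead.
# class Decoder(nn.Module):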
#     def __init__(self, model_dim, num_layers, hid_dim, dropout):
#         super(Decoder, self).__init__()
#         self.self_attn = MultiHeadAttention(model_dim, num_layers).to(device)
#         self.norm1 = nn.LayerNorm(model_dim).to(device)
#         self.cross_attn = MultiHeadAttention(model_dim, num_layers).to(device)
#         self.norm2 = nn.LayerNorm(model_dim).to(device)
#         self.ffn = FeedForward(model_dim, hid_dim).to(device)
#         self.norm3 = nn.LayerNorm(model_dim).to(device)
#         self.dropout = nn.Dropout(dropout).to(device)

#     def forward(self, inp, encoder_out, source_mask, target_mask):

#         att_score = self.self_attn(inp, inp, inp, target_mask).to(device)
#         inp = self.norm1(inp + self.dropout(att_score).to(device)).to(device)
#         att_score_cross = self.cross_attn(inp, encoder_out, encoder_out, source_mask).to(device)
#         inp = self.norm2(inp + self.dropout(att_score_cross).to(device)).to(device)
#         ffn_out = self.ffn(inp).to(device)
#         inp = self.norm3(inp + self.dropout(ffn_out).to(device)).to(device)

#         return inp

## **Transformer**

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer over frozen pretrained word embeddings.

    All tensors are batch-first: token ids are (batch, seq_len) and the
    returned logits are (batch, target_len, vocab_size).

    Args:
        vocab_size: number of output classes of the final projection.
        model_dim: feature size; must equal the embedding width.
        num_layers: number of attention heads (passed as `nhead`).
        num_times: number of stacked encoder layers and of decoder layers.
        hid_dim: width of the feed-forward sub-layer.
        max_len: number of positions precomputed by the positional encoding.
        dropout: dropout probability.
        pretrained_embeddings: 2-D array/tensor of shape (rows, model_dim);
            row i is used as the vector of token id i.

    Raises:
        ValueError: if `pretrained_embeddings` is not 2-D with `model_dim` columns.

    The module is built on the default device; move it with `.to(...)`.
    """

    # Token id treated as padding when building masks.
    PAD_IDX = 0

    def __init__(self, vocab_size, model_dim, num_layers, num_times, hid_dim, max_len, dropout, pretrained_embeddings):
        super(Transformer, self).__init__()
        emb_weights = torch.as_tensor(pretrained_embeddings, dtype=torch.float).detach()
        if emb_weights.dim() != 2 or emb_weights.size(1) != model_dim:
            raise ValueError(
                f"pretrained_embeddings must have shape (rows, {model_dim}), got {tuple(emb_weights.shape)}"
            )

        # Each embedding table gets its own copy of the weights.
        self.encoderEmb = nn.Embedding.from_pretrained(emb_weights.clone(), freeze=True)
        self.pos_enc = PosEncoding(model_dim, max_len)

        # batch_first=True: the layers receive (batch, seq, feature) tensors.
        encoder_layer = nn.TransformerEncoderLayer(d_model=model_dim, nhead=num_layers, dim_feedforward=hid_dim,
                                                   dropout=dropout, batch_first=True)
        self.enclayer = nn.TransformerEncoder(encoder_layer, num_layers=num_times)

        self.decoderEmb = nn.Embedding.from_pretrained(emb_weights.clone(), freeze=True)
        decoder_layer = nn.TransformerDecoderLayer(d_model=model_dim, nhead=num_layers, dim_feedforward=hid_dim,
                                                   dropout=dropout, batch_first=True)
        self.declayer = nn.TransformerDecoder(decoder_layer, num_layers=num_times)

        self.final_layer = nn.Linear(model_dim, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def masking(self, src, target):
        """Build boolean masks in which True means "may be attended to".

        Returns:
            source_mask: (batch, 1, 1, src_len), False at padding positions.
            target_mask: (batch, 1, tgt_len, tgt_len), False for padded query
                rows and for keys that lie in the future of the query.

        Note that torch.nn's attention layers use the opposite polarity
        (True = ignore), so forward() does not pass these masks to them as is.
        """
        source_mask = (src != self.PAD_IDX).unsqueeze(1).unsqueeze(2)
        target_mask = (target != self.PAD_IDX).unsqueeze(1).unsqueeze(3)
        length = target.size(1)

        # Lower triangle (diagonal included): a position sees itself and the past.
        no_mask = torch.tril(torch.ones(1, length, length, dtype=torch.bool, device=target.device))
        target_mask = target_mask & no_mask
        return source_mask, target_mask

    def forward(self, source, target):
        """Compute logits of shape (batch, target_len, vocab_size).

        Args:
            source: (batch, src_len) integer token ids for the encoder.
            target: (batch, tgt_len) integer token ids for the decoder.
        """
        # PyTorch convention: True marks a key position that must be ignored.
        src_padding = source.eq(self.PAD_IDX)
        tgt_len = target.size(1)
        # True above the diagonal blocks attention to future target positions.
        causal_block = torch.triu(
            torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=target.device), diagonal=1
        )

        source_emb = self.dropout(self.pos_enc(self.encoderEmb(source)))
        target_emb = self.dropout(self.pos_enc(self.decoderEmb(target)))

        enc_out = self.enclayer(source_emb, src_key_padding_mask=src_padding)

        # No target key-padding mask is passed: under the causal mask a
        # position never attends to later positions, which is where trailing
        # padding sits, and masking a leading PAD_IDX token would leave its
        # row with no key at all.
        dec_out = self.declayer(target_emb, enc_out, tgt_mask=causal_block,
                                memory_key_padding_mask=src_padding)

        return self.final_layer(dec_out)

### **Creating Datasets**

In [None]:
class LM_Dataset(torch.utils.data.Dataset):
    """Pairs of (input ids, target ids) served as long tensors.

    When `max_len` is given, every sequence is cut to `max_len` items and
    right-padded with 0 up to that length; otherwise sequences are returned
    unchanged.
    """

    def __init__(self, sentences, targets, max_len=None):
        self.sentences, self.targets, self.max_len = sentences, targets, max_len

    def __len__(self):
        return len(self.sentences)

    def _fit(self, ids):
        """Truncate/pad `ids` to `self.max_len`, or pass it through when unset."""
        if self.max_len is None:
            return ids
        clipped = ids[:self.max_len]
        return clipped + [0] * (self.max_len - len(clipped))

    def __getitem__(self, idx):
        pair = (self.sentences[idx], self.targets[idx])
        return tuple(torch.tensor(self._fit(ids), dtype=torch.long) for ids in pair)

### **Creating Input**

In [None]:
def process_sentences(sentences, word_to_index, max_len=None):
    """Turn token lists into shifted (input, target) id sequences.

    Each sentence is mapped to ids (unknown words become 0). When `max_len`
    is given the id list is cut or right-padded with 0 to that length first.
    The input is the id list without its last element and the target is the
    id list without its first element.

    Returns:
        (context_indices, central_word_indices): two lists, one entry per
        sentence.
    """
    context_indices, central_word_indices = [], []

    for tokens in sentences:
        ids = [word_to_index.get(tok, 0) for tok in tokens]

        if max_len is not None:
            shortfall = max_len - len(ids)  # negative when the sentence is too long
            ids = ids[:max_len] + [0] * shortfall

        context_indices.append(ids[:-1])
        central_word_indices.append(ids[1:])

    return context_indices, central_word_indices

# Encode the three splits the same way: ids cut/padded to 20, then shifted
# into (input, target) pairs.
(
    (train_gram_inp, train_cen_inp),
    (val_gram_inp, val_cen_inp),
    (test_gram_inp, test_cen_inp),
) = (
    process_sentences(split, word_to_ind, max_len=20)
    for split in (train_data, validation_data, test_data)
)



### **Train Model**

In [None]:
# Id used for padding (LM_Dataset, process_sentences) and masked by the model.
PAD_IDX = 0

# Seed weight initialisation, dropout and batch shuffling for reproducible runs.
torch.manual_seed(42)

dataset_train = LM_Dataset(train_gram_inp, train_cen_inp, max_len=longest_seq)
dataloader_train = torch.utils.data.DataLoader(dataset_train, batch_size=128, shuffle=True)

dataset_val = LM_Dataset(val_gram_inp, val_cen_inp, max_len=longest_seq)
dataloader_val = torch.utils.data.DataLoader(dataset_val, batch_size=128)

# Row i of the embedding matrix must be the vector of the token whose id is i.
# word2vec_model.wv.vectors follows gensim's own key order rather than
# word_to_ind, so the matrix is rebuilt by looking every token up by name.
# Assumes the ids in word_to_ind are contiguous in [0, len(word_to_ind)).
embedding_dim = word2vec_model.wv.vector_size
pretrained_embeddings = np.zeros((len(word_to_ind), embedding_dim), dtype=np.float32)
for token, token_id in word_to_ind.items():
    if token in word2vec_model.wv.key_to_index:
        pretrained_embeddings[token_id] = word2vec_model.wv[token]
    # Tokens without a Word2Vec entry keep an all-zero vector.

model = Transformer(vocab_size=len(word_to_ind), model_dim=embedding_dim, num_layers=4, num_times=6, hid_dim=300, max_len=longest_seq, dropout=0.1, pretrained_embeddings=pretrained_embeddings)
model.to(device)

num_epochs = 10
learning_rate = 0.001
# Padded positions carry no information and would otherwise dominate the loss.
# If word_to_ind also maps a real token to PAD_IDX, targets of that token are
# skipped as well.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# NOTE(review): the decoder is fed `target_words` and the loss is computed
# against the same `target_words`, and the encoder sees the whole input
# sequence. The model can therefore read the token it is asked to predict;
# the reported loss/accuracy should be interpreted with that in mind.
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for context_words, target_words in dataloader_train:
        context_words = context_words.to(device)
        target_words = target_words.to(device)

        outputs = model(context_words, target_words)

        # Flatten to (batch * seq_len, vocab) / (batch * seq_len,) for the loss.
        outputs = outputs.reshape(-1, outputs.size(-1))
        target_words = target_words.reshape(-1)
        loss = criterion(outputs, target_words)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_train_loss = total_loss / max(len(dataloader_train), 1)

    # Validation loop
    model.eval()
    total_val_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for context_words, target_words in dataloader_val:
            context_words = context_words.to(device)
            target_words = target_words.to(device)

            outputs = model(context_words, target_words)

            outputs = outputs.reshape(-1, outputs.size(-1))
            target_words = target_words.reshape(-1)
            loss = criterion(outputs, target_words)
            total_val_loss += loss.item()

            # Accuracy is measured on real (non-padding) positions only.
            predicted = outputs.argmax(dim=1)
            is_real = target_words != PAD_IDX
            total += is_real.sum().item()
            correct += ((predicted == target_words) & is_real).sum().item()

    avg_val_loss = total_val_loss / max(len(dataloader_val), 1)
    accuracy = 100 * correct / total if total else 0.0

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Val Accuracy: {accuracy:.2f}%')

### **Evaluate Model**

In [None]:
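# NOTE: disabled evaluation cell, out of date with the model above: it calls
# model(context_words) although forward() takes (source, target), and it
# creates `criteria` but then uses `criterion`. Update before re-enabling.
# dataset_test = LM_Dataset(test_gram_inp, test_cen_inp)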
# dataloader_test = torch.utils.data.DataLoader(dataset_test, batch_size=128)

# model.eval()
# correct = 0
# total = 0
# total_loss = 0
# total_tokens = 0
# criteria = nn.CrossEntropyLoss()


# with torch.no_grad():
#     for batch in dataloader_test:
#         context_words, target_words = batch
#         context_words = context_words.to(device)
#         target_words = target_words.to(device)

#         outputs = model(context_words)  
#         outputs = outputs.view(-1, outputs.size(-1))
#         target_words = target_words.view(-1)
        
#         loss = criterion(outputs, target_words)
#         total_loss += loss.item()
#         # total_tokens += target_words.numel()
        
#         _, predicted = torch.max(outputs, 1)
#         total += target_words.size(0)
#         correct += (predicted == target_words).sum().item()

# accuracy = 100 * correct / total
# print(f'Test Accuracy: {accuracy:.2f}%')
# print(math.exp(total_loss/len(dataloader_test)))
