In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np 

import json
import re
import pandas as pd
from stemmer_lib.stemmer import Stemmer
from sklearn.model_selection import train_test_split
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score
from tqdm import tqdm

In [2]:
# Toy encoder: a single layer with model width 10 split over 10 attention
# heads. It is only displayed below and is not used by the model trained later.
transformer_encoder = nn.TransformerEncoder(
    encoder_layer=nn.TransformerEncoderLayer(d_model=10, nhead=10),
    num_layers=1,
)



In [3]:
# Bare expression: the notebook renders the module tree of the toy encoder.
transformer_encoder

TransformerEncoder(
  (layers): ModuleList(
    (0): TransformerEncoderLayer(
      (self_attn): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=10, out_features=10, bias=True)
      )
      (linear1): Linear(in_features=10, out_features=2048, bias=True)
      (dropout): Dropout(p=0.1, inplace=False)
      (linear2): Linear(in_features=2048, out_features=10, bias=True)
      (norm1): LayerNorm((10,), eps=1e-05, elementwise_affine=True)
      (norm2): LayerNorm((10,), eps=1e-05, elementwise_affine=True)
      (dropout1): Dropout(p=0.1, inplace=False)
      (dropout2): Dropout(p=0.1, inplace=False)
    )
  )
)

In [None]:
def set_seed(seed):
    """Seed every random number generator used in this notebook.

    Python's ``random``, NumPy's global generator and PyTorch are all seeded
    with ``seed``; when CUDA is available the generators of all CUDA devices
    are seeded as well.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)


# Seed once, right after the imports, so the run is repeatable.
set_seed(12345)

In [None]:
# Run on the GPU whenever PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
# Load the raw vocabulary from disk. The `with` block closes the file when it
# exits, so no explicit close() call is needed.
with open("tokenizer.json", "r", encoding="utf-8") as file:
    tokenizer_dict = json.load(file)

In [None]:
# Keep only vocabulary entries that are at least 3 characters long (in their
# original order, duplicates collapsed), then number the survivors 0..n-1 so
# the resulting token ids are contiguous.
tokenizer_dict = dict.fromkeys(token for token in tokenizer_dict if len(token) >= 3)
tokenizer_dict = {token: index for index, token in enumerate(tokenizer_dict)}

In [None]:
# Number of ids in the filtered vocabulary; passed to the model as the size of
# its token embedding table.
vocab_size = len(tokenizer_dict)

In [None]:
# Single stemmer instance shared by every call to `tokenizer`.
stemmer = Stemmer()

In [None]:
def tokenizer(string: str) -> torch.Tensor:
    """Convert raw text into a 1-D tensor of vocabulary ids.

    Digits and punctuation are removed, the text is lower-cased, split on
    whitespace and stemmed with the module-level ``stemmer``. Each stem is
    looked up in ``tokenizer_dict``; stems that are not in the vocabulary map
    to the id stored under the ``"unknown"`` key.

    Args:
        string: Raw text to tokenize.

    Returns:
        A ``torch.long`` tensor with one id per stem. The tensor is empty (but
        still ``torch.long``) when nothing is left after cleaning; callers
        must not feed an empty sequence to the model.

    Raises:
        KeyError: If an out-of-vocabulary stem is found and ``tokenizer_dict``
            has no ``"unknown"`` entry to fall back on.
    """
    string = re.sub(r'\d', '', string)
    string = re.sub(r'[^\w\s]', '', string)
    string = re.sub(r'\s+', ' ', string)
    stems = stemmer.stem_words(string.lower().split())

    unknown_id = tokenizer_dict.get("unknown")
    token_ids = [tokenizer_dict.get(stem, unknown_id) for stem in stems]
    if None in token_ids:
        # Fail with a readable message instead of an opaque torch.tensor error.
        raise KeyError('out-of-vocabulary stem found but tokenizer_dict has no "unknown" entry')

    # An explicit dtype keeps the result integral even for an empty list, which
    # torch.tensor would otherwise turn into a float32 tensor.
    return torch.tensor(token_ids, dtype=torch.long)

In [None]:
# Read the dataset and drop the "Unnamed: 0" column (presumably an index that
# was saved together with the CSV - verify against the file).
data = pd.read_csv("data.csv").drop(columns="Unnamed: 0")

In [None]:
# Replace both text columns with tensors of token ids and recode label 0 as -1
# (the labels are later used as the target of nn.CosineEmbeddingLoss).
# NOTE: `data` is overwritten, so running this cell twice without reloading the
# CSV would tokenize the string form of the tensors.
data = data.assign(
    query=data["query"].apply(lambda text: tokenizer(str(text))),
    context=data["context"].apply(lambda text: tokenizer(str(text))),
    probability=data["probability"].apply(lambda label: -1 if label == 0 else label),
)

In [None]:
# Separate the label column from the model inputs (every other column).
target = data["probability"]
features = data.drop(columns=["probability"])

In [None]:
# Hold out 25% of the rows for evaluation; the fixed random_state makes the
# split repeatable.
features_train, features_test, target_train, target_test = train_test_split(
    features,
    target,
    test_size=0.25,
    random_state=12345,
)

In [None]:
class Transformer(nn.Module):
    """Encode one sequence of token ids into a single fixed-size vector.

    Token and learned position embeddings are summed, passed through a
    Transformer encoder, averaged over the sequence and projected by a linear
    layer.

    Args:
        vocab_size: Number of rows in the token embedding table.
        output_dim: Size of the returned vector.
        nhead: Number of attention heads in each encoder layer.
        num_encoder_layers: Number of stacked encoder layers.
        hidden_dim: Embedding width, also used as ``d_model`` of the encoder.
        tokens_max_length: Number of positions in the position embedding
            table, i.e. the longest sequence ``forward`` accepts.
        dropout: Dropout probability inside the encoder layers.
    """

    def __init__(self, vocab_size, output_dim, nhead, num_encoder_layers, hidden_dim, tokens_max_length=512, dropout=0.1):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, hidden_dim)
        self.position = nn.Embedding(tokens_max_length, hidden_dim)
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=nhead, batch_first=True,
                                                   dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, src):
        """Return the ``(output_dim,)`` encoding of one unbatched id sequence.

        Args:
            src: 1-D integer tensor of token ids with between 1 and
                ``tokens_max_length`` elements.

        Raises:
            ValueError: If ``src`` is not 1-D, is empty, or is longer than the
                position embedding table.
        """
        if src.dim() != 1:
            raise ValueError(f"expected a 1-D tensor of token ids, got shape {tuple(src.shape)}")
        seq_len = src.size(0)
        max_len = self.position.num_embeddings
        if seq_len == 0:
            # The mean over an empty sequence would be NaN.
            raise ValueError("cannot encode an empty token sequence")
        if seq_len > max_len:
            # Fail early: an out-of-range embedding index on CUDA surfaces as a
            # device-side assert rather than a readable Python error.
            raise ValueError(f"sequence length {seq_len} exceeds tokens_max_length={max_len}")

        # Build the positions on the input's device so the module does not rely
        # on a notebook-level `device` variable.
        positions = torch.arange(seq_len, device=src.device)
        hidden = self.embedding(src) + self.position(positions)
        hidden = self.transformer(hidden)
        pooled = hidden.mean(dim=0)  # average over the sequence dimension
        return self.fc(pooled)

In [None]:
# Model hyper-parameters, passed to the Transformer constructor below.
output_dim = 512   # size of the vector produced by the final linear layer
nhead = 8  # attention heads per encoder layer
num_encoder_layers = 1  # number of stacked encoder layers
hidden_dim = 1024  # embedding width, also d_model of the encoder

In [None]:
# Build the encoder and move its parameters to the selected device.
model = Transformer(vocab_size, output_dim, nhead, num_encoder_layers, hidden_dim).to(device)
# Adam over all model parameters.
# NOTE(review): `optimazer` / `citeration` look like misspellings of
# `optimizer` / `criterion`; the training cell refers to them by these names.
optimazer = optim.Adam(model.parameters(), lr=0.001)
# Cosine embedding loss; the training cell feeds it two encodings and a label.
citeration = nn.CosineEmbeddingLoss().to(device)

In [None]:
num_epochs = 10
# Longest sequence fed to the model; matches the default tokens_max_length of
# the Transformer's position embedding table.
max_tokens = 512

# A real name is used for the epoch index: `_` is reserved by IPython for the
# last cell output.
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    pairs = zip(features_train["query"], features_train["context"], target_train)
    # zip has no length, so tell tqdm how many pairs to expect.
    for query, context, label in tqdm(pairs, total=len(features_train)):
        optimazer.zero_grad()

        # Slicing a shorter tensor simply returns it whole, so truncation needs
        # no try/except; a bare `except:` here would only hide real errors.
        query = query[:max_tokens].to(device)
        context = context[:max_tokens].to(device)
        label = torch.tensor(label).to(device)

        query_encoding = model(query)
        context_encoding = model(context)

        loss = citeration(query_encoding, context_encoding, label)
        loss.backward()
        optimazer.step()
        total_loss += loss.item()

    average_loss = total_loss / len(features_train)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {average_loss:.4f}')
# Leave the model in inference mode (dropout disabled) for the evaluation cells.
model.eval()

In [None]:
# Filled by the evaluation loop with one prediction per test pair.
predictions = list()
# Undo the -1 recoding so the reference labels are 0/1 again for the metrics.
target_test = [0 if label == -1 else label for label in target_test]

In [None]:
# Start from an empty list so re-running this cell cannot append duplicates.
predictions = []
# Longest sequence fed to the model; matches the default tokens_max_length of
# the Transformer's position embedding table.
max_tokens = 512

# Make sure dropout is disabled even if this cell is run on its own.
model.eval()
pairs = zip(features_test["query"], features_test["context"])
# No gradients are needed for inference; this avoids building autograd graphs.
with torch.no_grad():
    for query, context in tqdm(pairs, total=len(features_test)):
        # Slicing a shorter tensor simply returns it whole, so truncation needs
        # no try/except; a bare `except:` here would only hide real errors.
        query = query[:max_tokens].to(device)
        context = context[:max_tokens].to(device)

        query_vector = model(query).cpu().numpy().reshape(1, -1)
        context_vector = model(context).cpu().numpy().reshape(1, -1)
        # Rounding the cosine similarity yields -1, 0 or 1.
        predictions.append(round(cosine_similarity(query_vector, context_vector)[0, 0]))

In [None]:
# Fold a rounded similarity of -1 into class 0 so predictions are 0/1 like the labels.
predictions = [0 if value == -1 else value for value in predictions]

In [None]:
# Results are stored under their own names so the imported sklearn metric
# functions are not overwritten and this cell can be run again.
accuracy = accuracy_score(target_test, predictions)
recall = recall_score(target_test, predictions)
f1 = f1_score(target_test, predictions)
precision = precision_score(target_test, predictions)

print(f"""Accuracy: {accuracy:.4f}
Precision: {precision:.4f}
Recall: {recall:.4f}
F1: {f1:.4f}""")