In [None]:
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel
import torch
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, accuracy_score, recall_score, precision_score, confusion_matrix, ConfusionMatrixDisplay

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Tokenizer and encoder come from the same pretrained checkpoint; the encoder
# weights are moved to the selected device once, up front.
tokenizer = AutoTokenizer.from_pretrained("intfloat/multilingual-e5-large-instruct")
encoder = AutoModel.from_pretrained("intfloat/multilingual-e5-large-instruct")
encoder = encoder.to(device)

def encode(sentences, pooling="pooler"):
    """Embed a batch of sentences with the notebook-level `encoder`.

    Uses the module-level `tokenizer`, `encoder` and `device`. Inputs are
    padded to the longest sentence in the batch and truncated to 512 tokens.

    Args:
        sentences: A string or list of strings accepted by the tokenizer.
        pooling: How token states are reduced to one vector per sentence.
            "pooler" (default) keeps the original behaviour and returns the
            model's `pooler_output`. "mean" averages `last_hidden_state` over
            non-padding tokens, which is the pooling shown on the E5 model card.
            NOTE(review): embeddings produced with different pooling modes are
            not interchangeable; the cached `X_np.npy` was presumably built
            with the default - confirm before switching.

    Returns:
        A CPU tensor with one L2-normalised embedding per input sentence.

    Raises:
        ValueError: If `pooling` is not "pooler" or "mean".
    """
    # Validate before tokenising so a typo fails fast, not after a forward pass.
    if pooling not in ("pooler", "mean"):
        raise ValueError(f"pooling must be 'pooler' or 'mean', got {pooling!r}")

    encoded_input = tokenizer(
        sentences, padding=True, truncation=True, max_length=512, return_tensors='pt'
    ).to(device)

    with torch.no_grad():
        model_output = encoder(**encoded_input)
        if pooling == "mean":
            # Zero out padding positions so they do not contribute to the average.
            mask = encoded_input["attention_mask"]
            hidden = model_output.last_hidden_state.masked_fill(~mask[..., None].bool(), 0.0)
            embeddings = hidden.sum(dim=1) / mask.sum(dim=1)[..., None]
        else:
            embeddings = model_output.pooler_output
        embeddings = torch.nn.functional.normalize(embeddings)
    return embeddings.to("cpu")

# Load the labelled dataset from the working directory; later cells read its
# 'Tag' column as the classification target.
dataset = pd.read_csv("dataset.csv")
# Last expression of the cell, so the summary statistics are rendered as output.
dataset.describe()

In [None]:
# Target column as a pandas Series and as a plain NumPy array.
y_text = dataset['Tag']
y_np = np.array(y_text)

# Class frequencies keyed by label value; a later cell looks up y_counts[0]
# and y_counts[1] to weight the loss.
unique, counts = np.unique(y_np, return_counts=True)
y_counts = {label: count for label, count in zip(unique, counts)}

# Displayed as the cell output.
y_np.shape

In [None]:
# Sentence embeddings are expensive to compute, so they are cached on disk.
# On a cache hit the array is simply loaded; on a miss it is rebuilt from the
# 'Sentense' column in fixed-size batches and saved for the next run. This
# replaces the previously commented-out, hand-enumerated per-batch files.
EMBEDDINGS_PATH = "X_np.npy"
EMBED_BATCH_SIZE = 1000

try:
    X_np = np.load(EMBEDDINGS_PATH)
except FileNotFoundError:
    x_text = dataset['Sentense']
    X_np = np.concatenate([
        encode(x_text[start:start + EMBED_BATCH_SIZE].to_list()).numpy()
        for start in range(0, len(x_text), EMBED_BATCH_SIZE)
    ])
    print(X_np.shape)
    np.save(EMBEDDINGS_PATH, X_np)

# Displayed as the cell output.
X_np.shape

In [None]:
# Features and labels as float32 tensors (the loss below expects float targets).
X = torch.from_numpy(X_np).float()
y = torch.from_numpy(y_np).float()

# Stratified 90/10 split with a fixed seed; every resulting part is moved to
# the training device in one pass.
X_train, X_test, y_train, y_test = (
    part.to(device)
    for part in train_test_split(X, y, test_size=0.1, random_state=42, stratify=y)
)

In [None]:
# Fully connected binary classifier: four 1024->1024 ReLU layers followed by a
# single-logit output layer (no sigmoid; the loss below works on raw logits).
classificator = torch.nn.Sequential(
    torch.nn.Linear(in_features=1024, out_features=1024),
    torch.nn.ReLU(),
    torch.nn.Linear(in_features=1024, out_features=1024),
    torch.nn.ReLU(),
    torch.nn.Linear(in_features=1024, out_features=1024),
    torch.nn.ReLU(),
    torch.nn.Linear(in_features=1024, out_features=1024),
    torch.nn.ReLU(),
    torch.nn.Linear(in_features=1024, out_features=1)
)
classificator.to(device)

# Weight positive examples by the negative/positive ratio to offset class imbalance.
neg_count = y_counts[0]
pos_count = y_counts[1]
# The counts are NumPy scalars, so their ratio is a NumPy float64. Pin the
# dtype explicitly so the weight matches the float32 logits and targets and
# the loss is not promoted to float64.
pos_weight = torch.tensor([neg_count / pos_count], dtype=torch.float32, device=device)

loss_fn = torch.nn.BCEWithLogitsLoss(pos_weight=pos_weight)

optimizer = torch.optim.SGD(params=classificator.parameters(), lr=0.02)

def accuracy_fn(y_true, y_pred):
    """Return the percentage (0-100) of predictions equal to their targets.

    Args:
        y_true: Tensor of ground-truth labels.
        y_pred: Tensor of predicted labels, same shape as `y_true`.

    Returns:
        Accuracy as a Python float in percent; 0.0 when there are no predictions.
    """
    # numel() also works for 0-d tensors (where len() raises) and counts every
    # compared element, so the denominator always matches the numerator.
    total = y_pred.numel()
    if total == 0:
        return 0.0
    correct = torch.eq(y_true, y_pred).sum().item()  # element-wise matches
    return (correct / total) * 100

def f1_fn(y_true, y_pred):
    """Weighted-average F1 score of `y_pred` against `y_true`.

    Both tensors are detached and copied to the CPU before being handed to
    scikit-learn's `f1_score`.
    """
    targets_cpu = y_true.detach().cpu()
    preds_cpu = y_pred.detach().cpu()
    return f1_score(targets_cpu, preds_cpu, average='weighted')

In [None]:
def learn_binaryclass_classification_model(cl_model, n_epochs, print_every_epoch=None, batch_size=128):
    """Train a single-logit binary classifier and checkpoint its best epoch.

    Relies on notebook-level globals: `X_train`, `y_train`, `X_test`, `y_test`,
    `loss_fn`, `optimizer`, `accuracy_fn` and `f1_fn`. The global `optimizer`
    must have been created over `cl_model.parameters()`, otherwise the model
    passed in is never updated.

    Each epoch shuffles the training set, runs one SGD pass in mini-batches,
    then scores the model on the held-out split. Whenever the held-out F1
    improves (ties broken by lower loss) the weights of `cl_model` are written
    to "best_clf_interest_model.pth".

    NOTE(review): the checkpoint is selected on the same split that is later
    reported as the test result, so those metrics are optimistic.

    Args:
        cl_model: Module mapping a feature batch to one logit per sample.
        n_epochs: Number of passes over the training data.
        print_every_epoch: Progress is printed every this many epochs;
            defaults to roughly ten reports over the whole run.
        batch_size: Mini-batch size.

    Raises:
        ValueError: If the training set is empty.
    """
    if print_every_epoch is None:
        # Integer period (at least 1) so the modulo check below is exact.
        print_every_epoch = max(1, n_epochs // 10)

    n_train = X_train.size()[0]
    if n_train == 0:
        raise ValueError("X_train is empty; nothing to train on")

    torch.manual_seed(42)
    best_loss = float("inf")
    best_f1 = 0.0

    for epoch in range(n_epochs):
        ### Training
        cl_model.train()
        permutation = torch.randperm(n_train)

        for i in range(0, n_train, batch_size):
            indices = permutation[i:i+batch_size]
            x_batch, y_batch = X_train[indices], y_train[indices]

            # Drop only the trailing logit axis: a bare squeeze() would also
            # collapse a one-sample batch to 0-d and break the loss shape check.
            y_logits = cl_model(x_batch).squeeze(-1)
            loss = loss_fn(y_logits, y_batch)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Only the last batch's train metrics are ever reported, so compute
        # them once per epoch instead of syncing to the CPU on every batch.
        y_pred = torch.round(torch.sigmoid(y_logits.detach()))
        acc = accuracy_fn(y_batch, y_pred)
        f1 = f1_fn(y_batch, y_pred)

        ### Evaluation
        cl_model.eval()
        with torch.inference_mode():
            test_logits = cl_model(X_test).squeeze(-1)
            test_pred = torch.round(torch.sigmoid(test_logits))

            test_loss = loss_fn(test_logits, y_test).item()
            test_acc = accuracy_fn(y_test, test_pred)
            test_f1 = f1_fn(y_test, test_pred)

        if test_f1 > best_f1 or (test_f1 >= best_f1 and test_loss < best_loss):
            best_f1 = test_f1
            best_loss = test_loss
            print("Save best model")
            print(f"Epoch: {epoch} | Loss: {loss:.5f}, Acc: {acc:.2f}%, F1: {f1:.2f} | T.Loss: {test_loss:.5f}, T.Acc: {test_acc:.2f}%, T.F1: {test_f1:.2f}")
            print("-------")
            # Save the model that was actually trained, not the notebook global.
            torch.save(cl_model.state_dict(), "best_clf_interest_model.pth")

        if epoch % print_every_epoch == 0:
            print(f"Epoch: {epoch} | Loss: {loss:.5f}, Acc: {acc:.2f}%, F1: {f1:.2f} | T.Loss: {test_loss:.5f}, T.Acc: {test_acc:.2f}%, T.F1: {test_f1:.2f}")
            
# Full training run: 5000 epochs, progress every 100 epochs, mini-batches of 512.
learn_binaryclass_classification_model(
    cl_model=classificator,
    n_epochs=5000,
    print_every_epoch=100,
    batch_size=512,
)

In [None]:
# Rebuild the same layer stack that was trained so the saved state dict keys
# (Sequential indices 0, 2, 4, 6, 8) line up, then restore the best checkpoint.
model = torch.nn.Sequential(
    torch.nn.Linear(1024, 1024), torch.nn.ReLU(),
    torch.nn.Linear(1024, 1024), torch.nn.ReLU(),
    torch.nn.Linear(1024, 1024), torch.nn.ReLU(),
    torch.nn.Linear(1024, 1024), torch.nn.ReLU(),
    torch.nn.Linear(1024, 1),
)
model.load_state_dict(
    torch.load("best_clf_interest_model.pth", map_location=device, weights_only=True)
)

# Last expression of the cell: moves the model and displays its structure.
model.to(device)

In [None]:
# Score the restored checkpoint on the held-out split. Inference only, so put
# the model in eval mode and skip autograd bookkeeping for the forward pass.
model.eval()
with torch.no_grad():
    # squeeze(-1) removes just the logit axis, so a one-row split keeps shape (1,).
    test_logits = model(X_test).squeeze(-1)
    test_pred = torch.round(torch.sigmoid(test_logits))
y_test_a = y_test.detach().to('cpu')
y_pred_a = test_pred.detach().to('cpu')

# Precision and recall use scikit-learn's default averaging; F1 is 'weighted'
# to match the criterion used for checkpoint selection during training.
print('accuracy:', round(accuracy_score(y_test_a, y_pred_a), 3))
print('precision:', round(precision_score(y_test_a, y_pred_a), 3))
print('recall:', round(recall_score(y_test_a, y_pred_a), 3))
print('f1:', round(f1_score(y_test_a, y_pred_a, average='weighted'), 3))

# Confusion matrix with the positive class (1) listed first.
cm = confusion_matrix(y_test_a, y_pred_a, labels=[1, 0])
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[1, 0])
disp.plot()
plt.show()