In [40]:
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, roc_curve, roc_auc_score
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.decomposition import PCA
import numpy as np
from sklearn.model_selection import HalvingGridSearchCV
from sklearn.metrics import f1_score, make_scorer
from sklearn.metrics import classification_report
import pandas as pd
from sentence_transformers import SentenceTransformer
import numpy as np
import os
import pandas as pd

In [41]:
class KNNBERTClassifier(BaseEstimator, ClassifierMixin):
    """
    KNN-based hate speech classifier using DistilBERT from SentenceTransformer.

    Args:
        threshold (float): Classification threshold.
        n_neighbors (int): Number of neighbors for KNN.
        scale (bool): Whether to scale the embeddings.
        model_name (str): Name of SentenceTransformer model.
    """
    def __init__(self, threshold=0.25, n_neighbors=5, scale=True, model_name='distilbert-base-nli-mean-tokens'):
        self.threshold = threshold
        self.n_neighbors = n_neighbors
        self.scale = scale
        self.model_name = model_name

        self.embedder = SentenceTransformer(model_name)
        self.scaler = StandardScaler()
        self.knn = KNeighborsClassifier(n_neighbors=n_neighbors)

    def fit(self, X, y):
        X_emb = self.get_embeddings(X, scale=self.scale)
        self.knn.fit(X_emb, y)
        return self

    def predict_proba(self, X):
        X_emb = self.get_embeddings(X, scale=self.scale)
        return self.knn.predict_proba(X_emb)

    def predict(self, X):
        probs = self.predict_proba(X)[:, 1]
        return (probs >= self.threshold).astype(int)

    def score(self, X, y):
        return np.mean(self.predict(X) == np.array(y))

    def get_embeddings(self, texts, scale=True, batch_size=32):
        if isinstance(texts, np.ndarray):
            texts = texts.tolist()
        embeddings = self.embedder.encode(texts, batch_size=batch_size, show_progress_bar=False)
        return self.scaler.fit_transform(embeddings) if scale else embeddings

    def plot_confusion_matrix(self, y_true, y_pred):
        cm = confusion_matrix(y_true, y_pred)
        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                    xticklabels=["Not Hate", "Hate"],
                    yticklabels=["Not Hate", "Hate"])
        plt.xlabel("Predicted")
        plt.ylabel("Actual")
        plt.title("Confusion Matrix")
        plt.tight_layout()
        plt.show()

    def plot_roc_curve(self, X, y_true):
        probs = self.predict_proba(X)[:, 1]
        fpr, tpr, _ = roc_curve(y_true, probs)
        auc = roc_auc_score(y_true, probs)
        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, label=f"ROC Curve (AUC = {auc:.2f})", linewidth=2)
        plt.plot([0, 1], [0, 1], 'k--', label="Random Classifier")
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.title("ROC Curve - KNN BERT Classifier")
        plt.legend(loc="lower right")
        plt.grid(True)
        plt.tight_layout()
        plt.show()


In [44]:
base_dir =  os.getcwd()
test_path = os.path.join(base_dir, "../data/test_data_clean.csv")
train_path = os.path.join(base_dir, "../data/train_data.csv")

train_df = pd.read_csv(train_path)
test_df = pd.read_csv(test_path)

train_texts = train_df["text"].tolist()
train_labels = train_df["label"].tolist()

test_texts = test_df["comment"].tolist()
test_labels = test_df["isHate"].tolist()
train_texts = [str(t) if pd.notnull(t) else "" for t in train_texts]
test_texts = [str(t) if pd.notnull(t) else "" for t in test_texts]


In [45]:
model = KNNBERTClassifier()
X_train = model.get_embeddings(train_texts, scale=True)
X_test = model.get_embeddings(test_texts, scale=True)

# Dimensionality reduction
pca = PCA(n_components=50)
X_train_reduced = pca.fit_transform(X_train)
X_test_reduced = pca.transform(X_test)

param_grid = {
    'n_neighbors': list(range(2, 7))
}

custom_scorer = make_scorer(f1_score)

grid = HalvingGridSearchCV(
    estimator=KNNBERTClassifier(scale=False),  
    param_grid=param_grid,
    scoring=custom_scorer,
    cv=2,
    factor=2,
    n_jobs=-1,
    verbose=1
)

grid.fit(X_train_reduced, train_labels)

print("Best n_neighbors:", grid.best_params_['n_neighbors'])
best_model = grid.best_estimator_

threshold = 0.25
test_labels_bin = (np.array(test_labels) >= threshold).astype(int)
probs = best_model.predict_proba(X_test_reduced)[:, 1]
custom_preds = (probs >= threshold).astype(int)

print(classification_report(test_labels_bin, custom_preds))



KeyboardInterrupt: 

In [None]:
best_model.plot_confusion_matrix(test_labels_bin, custom_preds)

In [None]:
best_model.plot_roc_curve(X_test_reduced, test_labels_bin)