In [None]:
%pip install -q hyperopt matplotlib nltk numpy pandas python-dotenv scikit-learn sentence-transformers tqdm xgboost

In [None]:
import os
import pickle
import random
from typing import Any, Dict, Optional, Union

import joblib
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from hyperopt import STATUS_OK, hp, Trials, fmin, tpe
from matplotlib import pyplot as plt
from numpy import average, ndarray
from pandas import read_csv, DataFrame
from sklearn import clone
from sklearn.base import ClassifierMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    auc,
    balanced_accuracy_score,
    classification_report,
    f1_score,
    precision_score,
    recall_score,
    roc_curve,
)
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from tqdm.contrib.itertools import product
from xgboost import XGBClassifier

In [None]:
# Read the notebook configuration from the environment (optionally populated from a .env file).
load_dotenv()
# Indexing os.environ (instead of os.getenv) makes a missing variable fail with a
# KeyError that names it, rather than an opaque TypeError from os.chdir(None) / int(None).
os.chdir(os.environ["ROOT"])

CV = int(os.environ["CV"])
RETRAIN = True  # NOTE(review): not referenced by any cell visible in this notebook - confirm it is still needed
OVERWRITE = False  # when True, the tuned filter is re-saved before being loaded back
RANDOM_STATE = int(os.environ["RANDOM_STATE"])
TEST_SIZE = float(os.environ["TEST_SIZE"])

# Seed both global generators so stochastic steps are repeatable across runs.
np.random.seed(RANDOM_STATE)
random.seed(RANDOM_STATE)

In [None]:
from notebooks.utils import preprocess_texts, replace_text_components

In [None]:
# Baseline estimators compared in the model sweep, keyed by the short tag used in model names.
CLASSIFIERS = dict(
    lr=LogisticRegression(random_state=RANDOM_STATE, solver="liblinear", max_iter=1000),
    gnb=GaussianNB(),
    rf=RandomForestClassifier(random_state=RANDOM_STATE),
    xgb=XGBClassifier(random_state=RANDOM_STATE),
)

def _embedding_source(name, path_env_var, embedding_dim=None):
    """Build one embedding-source record; the model path is read from the environment."""
    return {'name': name, 'model-path': os.getenv(path_env_var), 'embedding-dim': embedding_dim}


# Embedding backends evaluated in the sweep. Only the GloVe entries carry an explicit
# dimensionality; the remaining entries pass embedding-dim=None to `preprocess_texts`.
EMBEDDING_SOURCES = [
    _embedding_source('GloVe.6B.50D', "GLOVE_6B_50D_PATH", 50),
    _embedding_source('GloVe.6B.100D', "GLOVE_6B_100D_PATH", 100),
    _embedding_source('GloVe.6B.200D', "GLOVE_6B_200D_PATH", 200),
    _embedding_source('GloVe.6B.300D', "GLOVE_6B_300D_PATH", 300),
    _embedding_source('DistilRoBERTa', "DISTILROBERTA_PATH"),
    _embedding_source('SBERT', "SBERT_PATH"),
    _embedding_source("ATT&CK-BERT", "ATTACK_BERT_PATH"),
]

In [None]:
def _evaluate_split(pipeline: Any, x: ndarray, y: ndarray) -> Dict[str, Any]:
    """Compute the metric bundle of an already fitted pipeline on one data split."""
    y_predicted = pipeline.predict(X=x)
    # The ROC curve is built from the second column of predict_proba
    # (the score of the class listed second in the estimator's classes).
    fpr, tpr, _ = roc_curve(y_true=y, y_score=pipeline.predict_proba(x)[:, 1])

    return {
        'accuracy': balanced_accuracy_score(y_true=y, y_pred=y_predicted),
        'precision': precision_score(y_true=y, y_pred=y_predicted),
        'recall': recall_score(y_true=y, y_pred=y_predicted),
        'f1': f1_score(y_true=y, y_pred=y_predicted),
        'fpr': fpr,
        'tpr': tpr,
        'auc': auc(x=fpr, y=tpr)
    }


def train_classifier(
        x: ndarray, y: ndarray, cv: int = 1, random_state: Optional[int] = None, test_size: float = 0.2,
        base_estimator: Optional[ClassifierMixin] = None
) -> Dict[str, Union[float, Any]]:
    """
    Fit a scaled copy of ``base_estimator`` and score it on a stratified hold-out split.

    Parameters
    ----------
    x : ndarray
        Feature matrix, one row per sample.
    y : ndarray
        Labels aligned with ``x`` (assumed binary: the ROC curve uses a single score column).
    cv : int
        Forwarded to ``cross_val_score``. scikit-learn's k-fold splitters need at least
        2 folds, so callers should pass ``cv >= 2``.
    random_state : int, optional
        Seed of the train/test split.
    test_size : float
        Fraction of the samples held out for testing.
    base_estimator : ClassifierMixin
        Unfitted classifier; it is cloned, so the caller's instance is left untouched.

    Returns
    -------
    dict
        ``'cv-accuracy'`` (mean balanced accuracy across folds), ``'model'`` (the fitted
        pipeline) and ``'train'`` / ``'test'`` metric dictionaries (accuracy, precision,
        recall, f1, fpr, tpr, auc).

    Raises
    ------
    TypeError
        If ``base_estimator`` is None.
    """
    if base_estimator is None:
        # clone(None) would fail anyway; fail early with an actionable message instead.
        raise TypeError("train_classifier() requires a scikit-learn classifier as `base_estimator`")

    classifier_dict = {}

    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('estimator', clone(base_estimator))
    ])

    x_train, x_test, y_train, y_test = train_test_split(
        x, y,
        test_size=test_size,
        random_state=random_state,
        shuffle=True,
        stratify=y
    )

    # Cross-validate on the training portion only: 'cv-accuracy' is used for model
    # selection, so the hold-out rows must not take part in it.
    classifier_dict['cv-accuracy'] = average(a=cross_val_score(
        estimator=pipeline,
        X=x_train,
        y=y_train,
        scoring='balanced_accuracy',
        cv=cv
    ))

    pipeline.fit(X=x_train, y=y_train)

    classifier_dict['model'] = pipeline
    classifier_dict['train'] = _evaluate_split(pipeline, x_train, y_train)
    classifier_dict['test'] = _evaluate_split(pipeline, x_test, y_test)

    return classifier_dict

In [None]:
with open(os.getenv("FILTER_TRAIN_CSV"), 'rb') as f:
    tweets = pd.read_csv(f)

texts = tweets['text']
texts = pd.Series([replace_text_components(t) for t in texts])
target = tweets['relevant']

models_dict = {}

# The embeddings depend only on the embedding source, not on the classifier, so they
# are computed once per source and reused for every classifier paired with it.
# `product` varies its last argument fastest, hence all classifiers of one source are
# visited consecutively and a single cached matrix is enough.
cached_source_name = None
x = None

for (embedding_info, classifier_name) in product(EMBEDDING_SOURCES, CLASSIFIERS, desc="Training models", unit="model"):
    if embedding_info['name'] != cached_source_name:
        x = preprocess_texts(
            list_str=texts,
            model_path=embedding_info['model-path'],
            embedding_dim=embedding_info['embedding-dim']
        )
        cached_source_name = embedding_info['name']

    model_name = f"{classifier_name.upper()} {embedding_info['name']}"

    model_dict = train_classifier(
        x=x,
        y=target,
        cv=CV,
        random_state=RANDOM_STATE,
        test_size=TEST_SIZE,
        base_estimator=CLASSIFIERS[classifier_name]
    )

    model_dict['name'] = model_name

    # Keep the best-scoring entry per model name (a new name is always stored).
    previous = models_dict.get(model_name)
    if previous is None or model_dict['cv-accuracy'] > previous['cv-accuracy']:
        models_dict[model_name] = model_dict

    print(
        f"· {classifier_name.upper()} {embedding_info['name']} - CV Accuracy:\t{models_dict[model_name]['cv-accuracy'] * 100:.2f}%"
    )

In [None]:
# One ROC curve per trained model, drawn on the hold-out split; the legend shows each AUC.
fig, ax = plt.subplots(figsize=(18, 12))

for model_label, entry in models_dict.items():
    test_metrics = entry['test']
    ax.plot(
        test_metrics['fpr'], test_metrics['tpr'],
        label=f"{model_label}: {test_metrics['auc']:.2f}"
    )

ax.set_title('ROC Curve')
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.legend()
ax.grid(True)
plt.show()

In [None]:
# Select the entry with the highest cross-validated score (first one wins on ties).
cv_scores = [entry["cv-accuracy"] for entry in models_dict.values()]
best_model_name = list(models_dict.keys())[np.argmax(cv_scores)]
optimal_model = models_dict[best_model_name]

optimal_model

In [None]:
# Persist every trained model together with its metrics.
models_path = os.getenv("FILTER_MODELS")
with open(models_path, 'wb') as models_file:
    pickle.dump(models_dict, models_file)

In [None]:
# Reload the persisted models.
# NOTE: pickle.load executes arbitrary code embedded in the file - only load files
# produced by this notebook or another trusted source.
models_path = os.getenv("FILTER_MODELS")
with open(models_path, 'rb') as models_file:
    models_dict = pickle.load(models_file)

In [None]:
# Training-split metrics, one row per model and one column per scalar metric.
# The ROC arrays ('fpr' / 'tpr') are skipped up front so the table is built directly
# from scalars and keeps a numeric dtype.
train_rows = [
    {metric.upper(): value for metric, value in entry['train'].items() if metric not in ('fpr', 'tpr')}
    for entry in models_dict.values()
]

performances = DataFrame(data=train_rows, index=list(models_dict.keys()))
performances

In [None]:
# Hold-out metrics, one row per model and one column per scalar metric.
# The ROC arrays ('fpr' / 'tpr') are skipped up front so the table is built directly
# from scalars and keeps a numeric dtype.
test_rows = [
    {metric.upper(): value for metric, value in entry['test'].items() if metric not in ('fpr', 'tpr')}
    for entry in models_dict.values()
]

performances = DataFrame(data=test_rows, index=list(models_dict.keys()))
# The Series is aligned on the model names that form the index.
performances["CV Accuracy"] = pd.Series({k: v["cv-accuracy"] for k, v in models_dict.items()})
performances

In [None]:
# Express every metric as a percentage rounded to 2 decimal places.
# The cast to float makes the rounding work even if the table holds object-typed cells.
# NOTE: this rescales `performances` in place, so re-running the cell without
# rebuilding the table first multiplies the values by 100 again.
performances = (performances * 100).astype(float).round(2)

# '&' is used as the column separator of the exported file.
performances.to_csv(os.getenv("FILTERS_PERFORMANCES_CSV"), header=True, sep='&')

In [None]:
# Load the second tweet collection and normalise its text the same way as the training data.
test_csv_path = os.getenv("FILTER_TEST_CSV")
with open(test_csv_path, 'rb') as csv_file:
    test_tweets = pd.read_csv(csv_file)

target = test_tweets['relevant']
texts = pd.Series(list(map(replace_text_components, test_tweets['text'])))

In [None]:
# Embed the texts once, then carve out a stratified hold-out split for the tuning step.
tuning_embeddings = preprocess_texts(
    list_str=texts,
    model_path='sentence-transformers/all-mpnet-base-v2',
    embedding_dim=None,
)

x_train, x_test, y_train, y_test = train_test_split(
    tuning_embeddings, target,
    stratify=target,
    shuffle=True,
    test_size=TEST_SIZE,
    random_state=RANDOM_STATE
)

In [None]:
def objective(params):
    """
    Hyperopt objective: negated mean balanced accuracy of an XGBoost classifier,
    measured with a 2-fold cross-validation over ``x_train`` / ``y_train``.

    Parameters
    ----------
    params : dict
        Sampled hyperparameters for the xgboost model. ``n_estimators`` and
        ``max_depth`` are cast to int before use.

    Returns
    -------
    dict
        ``'loss'`` (the negated score, since hyperopt minimizes) and ``'status'``.
    """
    sampled = {
        'n_estimators': int(params['n_estimators']),
        'max_depth': int(params['max_depth']),
        'learning_rate': params['learning_rate'],
        'gamma': params['gamma'],
        'min_child_weight': params['min_child_weight'],
        'subsample': params['subsample'],
        'colsample_bytree': params['colsample_bytree'],
    }
    clf = XGBClassifier(
        objective='binary:logistic',
        n_jobs=-1,
        random_state=RANDOM_STATE,
        **sampled
    )
    fold_scores = cross_val_score(
        estimator=clf,
        X=x_train,
        y=y_train,
        scoring='balanced_accuracy',
        cv=2
    )
    return {'loss': -fold_scores.mean(), 'status': STATUS_OK}


# Search space explored by the optimizer; every label matches its dictionary key.
space = dict(
    n_estimators=hp.quniform('n_estimators', 100, 1000, 1),
    max_depth=hp.choice('max_depth', [0, 10, 20, 30, 40, 50]),
    learning_rate=hp.uniform('learning_rate', 0.01, 0.3),
    gamma=hp.uniform('gamma', 0.0, 0.5),
    min_child_weight=hp.quniform('min_child_weight', 1, 10, 1),
    subsample=hp.uniform('subsample', 0.5, 1.0),
    colsample_bytree=hp.uniform('colsample_bytree', 0.5, 1.0),
)

# Seeded generator so the sequence of sampled configurations is repeatable.
search_rng = np.random.default_rng(seed=RANDOM_STATE)
trials = Trials()
best = fmin(
    fn=objective,
    space=space,
    algo=tpe.suggest,
    max_evals=100,
    trials=trials,
    rstate=search_rng
)

print("Best parameters:")
print(best)

In [None]:
# `fmin` reports an `hp.choice` parameter as the index of the selected option, so this
# list must mirror the 'max_depth' options of the search space.
max_depth_options = [0, 10, 20, 30, 40, 50]

# Only parameters that were actually part of the XGBoost search space are read here.
best_params = {
    'n_estimators': int(best['n_estimators']),
    'max_depth': max_depth_options[best['max_depth']],
    'learning_rate': best['learning_rate'],
    'gamma': best['gamma'],
    'min_child_weight': best['min_child_weight'],
    'subsample': best['subsample'],
    'colsample_bytree': best['colsample_bytree']
}

# Refit the same kind of model that the objective function tuned, on the full training split.
final_clf = XGBClassifier(**best_params, objective='binary:logistic', n_jobs=-1, random_state=RANDOM_STATE)
final_clf.fit(x_train, y_train)

test_accuracy = final_clf.score(x_test, y_test)
print(f"\nTest Accuracy: {test_accuracy * 100:.4f}%")

# Performance over Cotov's Dataset


In [None]:
cotov = read_csv(filepath_or_buffer=os.getenv("COTOV_CSV"))
# Keep the English rows only; .copy() lets the prediction column be added without
# writing into a slice of the original frame.
cotov = cotov[cotov['lang'] == 'en'].copy()

# Model names are built as "<CLASSIFIER> <embedding name>", so the embedding that the
# selected model was trained on can be recovered from its name. Using it (rather than a
# fixed label and path) keeps the column name and the feature space in sync with
# `optimal_model`.
model_name = optimal_model['name']
optimal_embedding = {source['name']: source for source in EMBEDDING_SOURCES}[model_name.split(' ', 1)[1]]

# NOTE(review): the training texts go through `replace_text_components` before being
# embedded, these do not - confirm whether that is intended.
cotov[model_name] = optimal_model["model"].predict(X=preprocess_texts(
    list_str=cotov['full_text'],
    model_path=optimal_embedding['model-path'],
    embedding_dim=optimal_embedding['embedding-dim'],
))

In [None]:
# Compare the stored predictions with the dataset's own relevance labels.
cotov_truth = cotov['Related']
cotov_predicted = cotov[f'{model_name}']

rf_accuracy = balanced_accuracy_score(y_true=cotov_truth, y_pred=cotov_predicted)
print(f"Accuracy of prediction over Cotov's dataset is:\t{rf_accuracy * 100:.2f}")
print(classification_report(y_true=cotov_truth, y_pred=cotov_predicted))

In [None]:
# Optionally replace the stored filter with the freshly tuned one, then load whatever is on disk.
optimal_filter_path = os.getenv("OPTIMAL_FILTER_PICKLE")

if OVERWRITE:
    joblib.dump(final_clf, optimal_filter_path, compress=9)

optimal_filter = joblib.load(optimal_filter_path)
optimal_filter

In [None]:
# Smoke test: run the stored filter on two hand-written sentences.
sample_texts = pd.Series(['I\'m studying computer security', 'That\'s a teardrop'])
sample_embeddings = preprocess_texts(
    list_str=sample_texts,
    model_path='sentence-transformers/all-mpnet-base-v2',
    embedding_dim=None,
)
optimal_filter.predict(X=sample_embeddings)