In [None]:
# Imports
import os
from json import dump
from pathlib import Path
from time import perf_counter
from time import time

import joblib
import matplotlib.pyplot as plt
import polars as pl
from dotenv import load_dotenv
from langchain_huggingface import HuggingFaceEmbeddings
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, roc_auc_score

In [None]:
# Load environment variables (python-dotenv)
load_dotenv()

# Base directory for the relative data paths below.
# os.path.abspath("") resolves to the current working directory, which is
# assumed to be the directory containing this notebook.
__dir__ = Path(os.path.abspath(""))
"""
The current working directory (assumed to be the notebook's directory)
"""

# Read the required settings; a missing variable raises KeyError right here
DATASET_NAME = os.environ["DATASET_NAME"]
"""
Dataset name
"""

EMBEDDING_MODEL_NAME = os.environ["EMBEDDING_MODEL_NAME"]
"""
Embedding model name
"""

# Create the output directory.
# Any "/" in the names is replaced with "-" so each name maps to a single path segment.
# The slugs are computed outside the f-string because reusing the enclosing quote
# character inside a replacement field is a SyntaxError before Python 3.12.
_dataset_slug = DATASET_NAME.replace("/", "-")
_embedding_model_slug = EMBEDDING_MODEL_NAME.replace("/", "-")
OUTPUT_DIRECTORY = __dir__ / f"../data/notebooks/classifier-embedding/{_dataset_slug}/{_embedding_model_slug}"
OUTPUT_DIRECTORY.mkdir(parents=True, exist_ok=True)

In [None]:
# Instantiate the Hugging Face embedding model selected by EMBEDDING_MODEL_NAME,
# with progress display enabled
embedding_model = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL_NAME, show_progress=True)

In [None]:
# Load the prepared datasets
# The shared directory is built once with plain path joins: the previous f-strings
# reused the enclosing quote character inside a replacement field, which is a
# SyntaxError before Python 3.12. Any "/" in the dataset name is replaced with "-".
prepared_directory = __dir__ / "../data/notebooks/prepare-datasets" / DATASET_NAME.replace("/", "-")
train_df = pl.read_parquet(prepared_directory / "train.parquet")
test_df = pl.read_parquet(prepared_directory / "test.parquet")

In [None]:
# Embed the datasets
# The training set is embedded without timing; the "embeddings" column is
# added to (or replaced in) the frame.
train_df = train_df.with_columns(
    pl.Series(
        "embeddings",
        embedding_model.embed_documents(texts=train_df["text"].to_list())
    )
)

# The test-set embedding is timed. perf_counter() is a monotonic, high-resolution
# clock intended for measuring durations; time() follows the wall clock and can
# jump if the system time is adjusted mid-run.
start = perf_counter()
test_df = test_df.with_columns(
    pl.Series(
        "embeddings",
        embedding_model.embed_documents(texts=test_df["text"].to_list())
    )
)
end = perf_counter()

# Seconds spent embedding the test set
elapsed_test_embed = end - start

In [None]:
# Write both embedded frames to the output directory as parquet files
for frame, filename in (
    (train_df, "train-embedded.parquet"),
    (test_df, "test-embedded.parquet"),
):
    frame.write_parquet(OUTPUT_DIRECTORY / filename)

In [None]:
# Fit a random forest on the training embeddings and labels
# See https://github.com/AhsanAyub/malicious-prompt-detection/blob/main/binary_classification.py
base_classifier = RandomForestClassifier(
    n_estimators=200,
    criterion="gini",
    random_state=0,  # fixed seed
    verbose=2,
)
base_classifier.fit(
    train_df.get_column("embeddings").to_list(),
    train_df.get_column("label").to_list(),
)

In [None]:
# Persist the fitted classifier to the output directory with joblib
joblib.dump(value=base_classifier, filename=OUTPUT_DIRECTORY / "random-forest.joblib")

In [None]:
# Benchmark the classifier
# Only the predict_proba call is timed. perf_counter() is a monotonic,
# high-resolution clock intended for measuring durations; time() follows the
# wall clock and can jump if the system time is adjusted mid-run.
start = perf_counter()
raw_y_probabilities = base_classifier.predict_proba(test_df["embeddings"].to_list())
end = perf_counter()

# Seconds spent classifying the test set
elapsed_test_classify = end - start

In [None]:
# Reduce the per-class probabilities to a flat list of Python floats holding the
# probability in column 1, i.e. label 1 ("malicious")
# NOTE: assumes the classifier's class order is [0, 1]
y_probabilities = raw_y_probabilities[:, 1].tolist()

In [None]:
# Preview only the first few probabilities; echoing the whole list would flood
# the cell output with one entry per test sample
y_probabilities[:10]

In [None]:
# Threshold each probability at 0.5: values below it become label 0, all others label 1
y_predictions = list(
    map(lambda probability: 0 if probability < 0.5 else 1, y_probabilities)
)

In [None]:
# Extract the ground-truth labels of the test set as a plain Python list
y_actual = test_df.get_column("label").to_list()

In [None]:
# Save the results
# ROC AUC is computed from the predicted malicious-class probabilities. Feeding it
# the thresholded 0/1 labels collapses the curve to a single operating point and
# does not measure ranking quality.
# The metrics are computed before the file is opened so that a failure while
# computing them cannot leave a truncated results.json behind.
results = {
    "auc": roc_auc_score(y_actual, y_probabilities),
    "report": classification_report(y_actual, y_predictions, target_names=["benign", "malicious"], output_dict=True),
    "embed_time": elapsed_test_embed,
    "classify_time": elapsed_test_classify,
    "total_time": elapsed_test_embed + elapsed_test_classify,
}

with open(OUTPUT_DIRECTORY / "results.json", "w", encoding="utf-8") as results_file:
    dump(results, results_file, indent=2)

In [None]:
# COLUMNS = ["text", "label", "predicted_label", "predicted_malicious_probability"]
# SAMPLE = 50

# # Save some randomly-selected true positives, true negatives, false positives, and false negatives
# results_df = test_df.with_columns(
#     pl.Series("predicted_label", y_predictions),
#     pl.Series(
#         "predicted_malicious_probability",
#         y_probabilities,
#     ),
# )
# tp_df = results_df.filter((pl.col("label") == 1) & (pl.col("predicted_label") == 1))
# tn_df = results_df.filter((pl.col("label") == 0) & (pl.col("predicted_label") == 0))
# fp_df = results_df.filter((pl.col("label") == 0) & (pl.col("predicted_label") == 1))
# fn_df = results_df.filter((pl.col("label") == 1) & (pl.col("predicted_label") == 0))

# pl.concat([df.sample(SAMPLE) for df in [tp_df, tn_df, fp_df, fn_df]]).sample(
#     n=4 * SAMPLE,
#     shuffle=True
# ).select(COLUMNS).write_csv(OUTPUT_DIRECTORY / "samples.csv")

In [None]:
# Create an overlapping histogram of the prediction probabilities for each class
# Split the predicted probabilities by ground-truth label (0 = benign, 1 = malicious)
true_benign_probabilities = [
    probability for probability, label in zip(y_probabilities, y_actual) if label == 0
]
true_malicious_probabilities = [
    probability for probability, label in zip(y_probabilities, y_actual) if label == 1
]

fig, ax = plt.subplots()

# Both histograms use the same 25 bins over [0, 1]. Without an explicit range each
# call derives its bin edges from its own data, so the overlaid bars would have
# different widths and their heights would not be comparable.
ax.hist(true_benign_probabilities, bins=25, range=(0, 1), alpha=0.5, label="Benign", color="green")
ax.hist(true_malicious_probabilities, bins=25, range=(0, 1), alpha=0.5, label="Malicious", color="red")

ax.set_xlim(0, 1)
ax.set_xlabel("Predicted Probability of Being Malicious")
ax.set_ylabel("Frequency")
ax.set_title("Embedding Classifier Ground Truth Class Probability Distribution")

ax.legend()
fig.savefig(OUTPUT_DIRECTORY / "probability-distribution.png", dpi=600)