In [None]:
import os

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.base import clone
from sklearn.calibration import calibration_curve
from sklearn.inspection import PartialDependenceDisplay, permutation_importance
from sklearn.metrics import (
    accuracy_score,
    auc,
    average_precision_score,
    brier_score_loss,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_recall_curve,
    roc_curve,
)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, label_binarize

In [None]:
# Output locations: fitted models, figures, and tabular results.
OUT_DIR = "results/models"
OUT_VIS = "results/figures"
OUT_CSV = "results/csv"

# Create every output directory up front; already-existing ones are left untouched.
for _out_path in (OUT_DIR, OUT_VIS, OUT_CSV):
    os.makedirs(_out_path, exist_ok=True)

In [None]:
# Read features and targets for the train and validation splits.
# `.squeeze()` collapses a one-column frame into a Series
# (presumably each Y file holds a single label column -- confirm).
X_train = pd.read_csv("data/processed/X/train.csv")
X_val = pd.read_csv("data/processed/X/val.csv")
y_train = pd.read_csv("data/processed/Y/train.csv").squeeze()
y_val = pd.read_csv("data/processed/Y/val.csv").squeeze()

print(f"Before filter - X_train shape: {X_train.shape}")
print(f"Before filter - X_val shape: {X_val.shape}")

# Keep numeric columns only; non-numeric columns are dropped from both splits.
numeric_only = {"include": [np.number]}
X_train = X_train.select_dtypes(**numeric_only)
X_val = X_val.select_dtypes(**numeric_only)

print(f"After filter - X_train shape: {X_train.shape}")
print(f"After filter - X_val shape: {X_val.shape}")

In [None]:
# Collapse every class with fewer than `threshold` training samples into a
# single 'Other' label. Rarity is decided on the training split only and the
# same mapping is then applied to the validation targets.
threshold = 50
counts = y_train.value_counts()
is_rare = counts < threshold
rare_classes = counts[is_rare].index.tolist()

rare_to_other = dict.fromkeys(rare_classes, 'Other')
y_train = y_train.replace(rare_to_other)
y_val = y_val.replace(rare_to_other)
print(f"Classes after combining rare into 'Other': {y_train.unique()}")

## RF Model

We now train a Random Forest model, which in theory should outperform the k-NN baseline because it can capture complex feature interactions and non-linear relationships in the data. Random Forests are also relatively robust to overfitting (averaging many decorrelated trees reduces variance) and handle high-dimensional feature spaces effectively.

In [None]:
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV, StratifiedKFold

In [None]:
# Resampling + model pipeline.
# - The scaler runs before SMOTE: SMOTE synthesises minority samples by
#   interpolating between nearest neighbours, so features should be on a
#   comparable scale when those neighbours are chosen. The forest itself is
#   insensitive to feature scaling.
# - SMOTE is seeded so the synthetic samples (and therefore the CV scores and
#   the final model) are reproducible across runs.
pipeline = ImbPipeline(
    [
        ("scaler", StandardScaler()),
        ("smote", SMOTE(random_state=42)),
        ("rf", RandomForestClassifier(class_weight="balanced", random_state=42)),
    ]
)

In [None]:
# Hyper-parameter search space for the forest. The "rf__" prefix routes each
# entry to the pipeline step named "rf".
_rf_space = {
    "n_estimators": [100, 200, 500],
    "max_depth": [None, 10, 20, 30],
    "min_samples_split": [2, 5, 10],
    "min_samples_leaf": [1, 2, 4],
}
param_dist = {f"rf__{name}": values for name, values in _rf_space.items()}

In [None]:
# 3-fold stratified CV, shuffled with a fixed seed so the folds are repeatable.
cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

# Randomized search over `param_dist`: 50 sampled candidates, scored by
# macro-averaged F1, using all available cores.
search_settings = dict(
    param_distributions=param_dist,
    n_iter=50,
    cv=cv,
    scoring="f1_macro",
    random_state=42,
    n_jobs=-1,
    verbose=1,
)
search = RandomizedSearchCV(pipeline, **search_settings)

print("Starting RandomizedSearchCV for Random Forest with cv=StratifiedKFold(3)...")
search.fit(X_train, y_train)

In [None]:
# Best pipeline found by the search (already refit on the full training split).
best_rf = search.best_estimator_
print(f"Best parameters: {search.best_params_}")
# The search is scored with "f1_macro", so label the score as macro F1
# (the previous message called it "weighted").
print(f"Best CV F1 (macro): {search.best_score_:.4f}")

In [None]:
# Stack the validation rows underneath the training rows (row-wise concat is
# the pandas default); features and targets are stacked in the same order.
X_combined = pd.concat([X_train, X_val])
y_combined = pd.concat([y_train, y_val])

In [None]:
# Final refit of the selected pipeline on the combined train + validation data.
print("Retraining best Random Forest on train + val...")
best_rf.fit(X=X_combined, y=y_combined)

In [None]:
# Load the held-out test split.
X_test = pd.read_csv("data/processed/X/test.csv")
y_test = pd.read_csv("data/processed/Y/test.csv").squeeze()

# Apply the same rare-class collapsing that was applied to y_train / y_val.
# Without it, test rows of a rare class keep their original label, which the
# model (trained on 'Other') can never predict, and which is silently missing
# from the confusion matrix and the one-vs-rest curves.
y_test = y_test.replace({cls: 'Other' for cls in rare_classes})

# Keep numeric columns only, then align to the exact training column set/order.
X_test = X_test.select_dtypes(include=[np.number])
X_test = X_test[X_train.columns]

In [None]:
# Headline test metrics: plain accuracy and support-weighted F1.
y_test_pred = best_rf.predict(X_test)
test_accuracy = accuracy_score(y_true=y_test, y_pred=y_test_pred)
test_f1 = f1_score(y_true=y_test, y_pred=y_test_pred, average="weighted")

print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test F1 (weighted): {test_f1:.4f}")

In [None]:
# Per-class precision / recall / F1 on the test set.
report = classification_report(y_test, y_test_pred)
print("Classification Report on Test Set:")
print(report)

# Persist artefacts under the configured output directories (OUT_DIR / OUT_CSV)
# rather than repeating the literal paths, so relocating the outputs only
# requires changing the constants.
model_path = os.path.join(OUT_DIR, "rf_best.pkl")
joblib.dump(best_rf, model_path)
print(f"Saved best Random Forest model to {model_path}")

cv_results_path = os.path.join(OUT_CSV, "rf_cv_results.csv")
pd.DataFrame(search.cv_results_).to_csv(cv_results_path, index=False)

metrics_path = os.path.join(OUT_DIR, "rf_test_metrics.txt")
# Explicit encoding so class names with non-ASCII characters are written the
# same way on every platform.
with open(metrics_path, "w", encoding="utf-8") as f:
    f.write(f"Test Accuracy: {test_accuracy:.4f}\n")
    f.write(f"Test F1 (weighted): {test_f1:.4f}\n")
    f.write(report)
print("Saved CV results and test metrics.")

In [None]:
# Impurity-based feature importances of the fitted forest, labelled with the
# training column names.
importances = best_rf.named_steps["rf"].feature_importances_
importances_df = pd.Series(importances, index=X_train.columns)

importances_csv_path = os.path.join(OUT_CSV, "rf_feature_importances.csv")
# Write the index as well: it holds the feature names. Saving with
# index=False produced a bare column of numbers with no way to tell which
# feature each value belonged to.
(
    importances_df.sort_values(ascending=False)
    .rename_axis("feature")
    .rename("importance")
    .to_csv(importances_csv_path)
)

In [None]:
# Horizontal bar chart of the 20 largest importances, most important on top.
importances_top20 = importances_df.sort_values(ascending=False).head(20)

fig, ax = plt.subplots(figsize=(10, 8))
ax.barh(importances_top20.index, importances_top20.values, color="skyblue")
ax.invert_yaxis()  # barh draws the first entry at the bottom; flip it
ax.set_title("Top 20 Feature Importances (Random Forest)")
ax.set_xlabel("Importance")
fig.tight_layout()

path = os.path.join(OUT_VIS, "rf_top20_feature_importances.png")
fig.savefig(path)
print(f"Saved top 20 feature importances plot to {path}")
plt.close(fig)

In [None]:
# Confusion matrix with rows = true labels and columns = predicted labels,
# both ordered like the classifier's `classes_`.
rf_classes = best_rf.named_steps["rf"].classes_
cm = confusion_matrix(y_test, y_test_pred, labels=rf_classes)
cm_df = pd.DataFrame(cm, index=rf_classes, columns=rf_classes)

cm_csv_path = os.path.join(OUT_CSV, "rf_confusion_matrix.csv")
# Keep the index in the CSV: it carries the true-label name of each row.
# With index=False the rows were unlabeled.
cm_df.to_csv(cm_csv_path, index_label="true_label")
print(f"Saved confusion matrix to {cm_csv_path}")

In [None]:
# Annotated heatmap of the confusion matrix (integer cell counts).
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(cm_df, annot=True, fmt='d', cmap='Blues', ax=ax)
ax.set(
    xlabel='Predicted label',
    ylabel='True label',
    title='Random Forest Confusion Matrix Heatmap',
)
fig.tight_layout()

heatmap_path = os.path.join(OUT_VIS, 'rf_confusion_matrix_heatmap.png')
fig.savefig(heatmap_path)
plt.close(fig)
print(f'Saved confusion matrix heatmap to {heatmap_path}')

In [None]:
# One-vs-rest ROC curve and AUC for every class.
rf_classes = best_rf.named_steps["rf"].classes_
n_classes = len(rf_classes)

# One indicator column per class, in `classes_` order.
y_test_bin = label_binarize(y_test, classes=rf_classes)
# For a two-class problem label_binarize returns a single column (the
# indicator of classes_[1]); expand it to two columns so the per-class
# indexing below -- and in the later PR / calibration cells -- stays valid.
if n_classes == 2 and y_test_bin.shape[1] == 1:
    y_test_bin = np.hstack([1 - y_test_bin, y_test_bin])

# Predicted class probabilities; columns follow `classes_` order as well.
y_score = best_rf.predict_proba(X_test)

fpr = {}
tpr = {}
roc_auc = {}
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y_test_bin[:, i], y_score[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

In [None]:
# Tabulate the per-class AUC values (one row per class) and save them.
class_labels = best_rf.named_steps["rf"].classes_
auc_columns = {
    "class": class_labels,
    "auc": [roc_auc[idx] for idx in range(n_classes)],
}
auc_df = pd.DataFrame(auc_columns)

auc_csv_path = os.path.join(OUT_CSV, "rf_roc_auc_values.csv")
auc_df.to_csv(auc_csv_path, index=False)
print(f"Saved per-class AUC values to {auc_csv_path}")

In [None]:
# One ROC curve per class (one-vs-rest) plus the chance diagonal for reference.
fig, ax = plt.subplots(figsize=(10, 8))
for idx, label in enumerate(best_rf.named_steps["rf"].classes_):
    ax.plot(fpr[idx], tpr[idx], label=f"{label} (AUC = {roc_auc[idx]:.2f})")
ax.plot([0, 1], [0, 1], "k--")
ax.set(
    xlim=(0.0, 1.0),
    ylim=(0.0, 1.05),
    xlabel="False Positive Rate",
    ylabel="True Positive Rate",
    title="ROC Curves for Random Forest (One-vs-Rest)",
)
ax.legend(loc="lower right")
fig.tight_layout()

roc_plot_path = os.path.join(OUT_VIS, "rf_roc_curves.png")
fig.savefig(roc_plot_path)
plt.close(fig)
print(f"Saved ROC curves plot to {roc_plot_path}")

In [None]:
# One-vs-rest precision-recall curve and average precision for every class,
# keyed by the class's position in `classes_`.
precision, recall, avg_prec = {}, {}, {}
for idx in range(len(best_rf.named_steps["rf"].classes_)):
    true_idx = y_test_bin[:, idx]
    score_idx = y_score[:, idx]
    precision[idx], recall[idx], _ = precision_recall_curve(true_idx, score_idx)
    avg_prec[idx] = average_precision_score(true_idx, score_idx)

In [None]:
# Tabulate the per-class average precision (one row per class) and save it.
class_labels = best_rf.named_steps["rf"].classes_
pr_columns = {
    "class": class_labels,
    "avg_precision": [avg_prec[idx] for idx in range(n_classes)],
}
pr_df = pd.DataFrame(pr_columns)

pr_csv_path = os.path.join(OUT_CSV, "rf_pr_avg_precision.csv")
pr_df.to_csv(pr_csv_path, index=False)
print(f"Saved per-class average precision to {pr_csv_path}")

In [None]:
# One precision-recall curve per class (one-vs-rest), labelled with its AP.
fig, ax = plt.subplots(figsize=(10, 8))
for idx, label in enumerate(best_rf.named_steps["rf"].classes_):
    ax.plot(recall[idx], precision[idx], label=f"{label} (AP = {avg_prec[idx]:.2f})")
ax.set(
    xlabel="Recall",
    ylabel="Precision",
    title="Precision-Recall Curves for Random Forest (One-vs-Rest)",
)
ax.legend(loc="lower left")
fig.tight_layout()

pr_plot_path = os.path.join(OUT_VIS, "rf_pr_curves.png")
fig.savefig(pr_plot_path)
plt.close(fig)
print(f"Saved PR curves plot to {pr_plot_path}")

In [None]:
# Per-class calibration curve (10 bins) and Brier score, one-vs-rest.
# Note the different keys: `calibration_data` is keyed by class position,
# `brier_scores` by class label.
y_prob = best_rf.predict_proba(X_test)

calibration_data, brier_scores = {}, {}
for class_idx, class_label in enumerate(best_rf.named_steps["rf"].classes_):
    class_true = y_test_bin[:, class_idx]
    class_prob = y_prob[:, class_idx]
    # calibration_curve returns (fraction of positives, mean predicted prob).
    observed_frac, predicted_mean = calibration_curve(class_true, class_prob, n_bins=10)
    calibration_data[class_idx] = (predicted_mean, observed_frac)
    brier_scores[class_label] = brier_score_loss(class_true, class_prob)

In [None]:
# Tabulate the per-class Brier scores (one row per class) and save them.
class_labels = best_rf.named_steps["rf"].classes_
brier_columns = {
    "class": class_labels,
    "brier_score": [brier_scores[label] for label in class_labels],
}
brier_df = pd.DataFrame(brier_columns)

brier_csv_path = os.path.join(OUT_CSV, "rf_brier_scores.csv")
brier_df.to_csv(brier_csv_path, index=False)
print(f"Saved per-class Brier scores to {brier_csv_path}")

In [None]:
# Reliability diagram: one curve per class plus the perfect-calibration diagonal.
fig, ax = plt.subplots(figsize=(10, 8))
for class_idx, class_label in enumerate(best_rf.named_steps["rf"].classes_):
    predicted_mean, observed_frac = calibration_data[class_idx]
    ax.plot(predicted_mean, observed_frac, marker="o", label=f"{class_label}")
ax.plot([0, 1], [0, 1], "k--")
ax.set(
    xlabel="Mean Predicted Probability",
    ylabel="Fraction of Positives",
    title="Calibration Curves for Random Forest",
)
ax.legend(loc="best")
fig.tight_layout()

cal_curve_path = os.path.join(OUT_VIS, "rf_calibration_curve.png")
fig.savefig(cal_curve_path)
plt.close(fig)
print(f"Saved calibration curve plot to {cal_curve_path}")

In [None]:
# Permutation importance of the whole pipeline on the test set: each feature
# is shuffled 10 times (seeded) across 4 worker processes. No `scoring`
# argument is passed, so the estimator's default scorer is used.
perm_settings = {"n_repeats": 10, "random_state": 42, "n_jobs": 4}
perm_results = permutation_importance(best_rf, X_test, y_test, **perm_settings)

# One row per feature with the mean and standard deviation over the repeats.
perm_columns = {
    "feature": X_test.columns,
    "importance_mean": perm_results.importances_mean,
    "importance_std": perm_results.importances_std,
}
perm_df = pd.DataFrame(perm_columns)

perm_csv_path = os.path.join(OUT_CSV, "rf_permutation_importance.csv")
perm_df.to_csv(perm_csv_path, index=False)
print(f"Saved permutation importances to {perm_csv_path}")

In [None]:
# Train / validation weighted-F1 as a function of forest size.
#
# Each candidate is a fresh, unfitted copy of the tuned pipeline with only
# `n_estimators` changed. Cloning `best_rf` keeps the resampling step, the
# scaler, the forest's class weighting and the tuned hyper-parameters, so the
# curve describes the same model that was selected and evaluated above. The
# previous version rebuilt a plain scaler + forest and silently dropped SMOTE
# and class_weight="balanced".
n_estimators_list = [50, 100, 200, 300, 400, 500]
train_scores = []
val_scores = []
for n in n_estimators_list:
    pipeline_temp = clone(best_rf).set_params(rf__n_estimators=n)
    # Fit on the training split only so X_val remains unseen for scoring.
    pipeline_temp.fit(X_train, y_train)
    y_train_pred = pipeline_temp.predict(X_train)
    y_val_pred = pipeline_temp.predict(X_val)
    train_scores.append(f1_score(y_train, y_train_pred, average="weighted"))
    val_scores.append(f1_score(y_val, y_val_pred, average="weighted"))

In [None]:
# Tabulate train / validation F1 per forest size and save the table.
lc_columns = {
    "n_estimators": n_estimators_list,
    "train_f1": train_scores,
    "val_f1": val_scores,
}
lc_df = pd.DataFrame(lc_columns)

lc_csv = os.path.join(OUT_CSV, "rf_learning_curve.csv")
lc_df.to_csv(lc_csv, index=False)
print(f"Saved learning curve data to {lc_csv}")

In [None]:
# Train vs. validation weighted-F1 against the number of trees.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(n_estimators_list, train_scores, label="Train F1", marker="o")
ax.plot(n_estimators_list, val_scores, label="Validation F1", marker="o")
ax.set(
    xlabel="Number of Trees",
    ylabel="F1 Score (weighted)",
    title="Learning Curve: Random Forest",
)
ax.legend()
fig.tight_layout()

lc_plot = os.path.join(OUT_VIS, "rf_learning_curve.png")
fig.savefig(lc_plot)
plt.close(fig)
print(f"Saved learning curve plot to {lc_plot}")

In [None]:
# Partial dependence of the model output on the three most important features.
top_features = importances_df.sort_values(ascending=False).head(3).index.tolist()

# scikit-learn requires `target` for multiclass estimators and raises
# ValueError("target must be specified for multi-class") without it. Use the
# first class in `classes_` when there are more than two classes; for binary
# problems `target` is ignored, so leave it unset.
pdp_classes = best_rf.named_steps["rf"].classes_
pdp_target = pdp_classes[0] if len(pdp_classes) > 2 else None
if pdp_target is not None:
    print(f"Partial dependence computed for class: {pdp_target}")

fig, ax = plt.subplots(figsize=(15, 5))
PartialDependenceDisplay.from_estimator(
    best_rf,
    X_train,
    features=top_features,
    target=pdp_target,
    kind='average',
    grid_resolution=50,
    ax=ax,
)
plt.tight_layout()
pdp_plot_path = os.path.join(OUT_VIS, 'rf_partial_dependence.png')
plt.savefig(pdp_plot_path)
plt.close(fig)
print(f"Saved partial dependence plot to {pdp_plot_path}")