In [None]:
import os
import joblib
import numpy as np
import pandas as pd

from sklearn.model_selection import StratifiedKFold, RandomizedSearchCV, cross_val_predict
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.base import BaseEstimator, TransformerMixin, clone

from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import RandomOverSampler

from boruta import BorutaPy

import matplotlib.pyplot as plt
import seaborn as sns

# Set random seed for reproducibility
RANDOM_STATE = 42

# Create artifacts folder if not exists
os.makedirs("../artifacts", exist_ok=True)

# ------------------------
# Boruta wrapper
# ------------------------

class BorutaSelector(BaseEstimator, TransformerMixin):
    def __init__(self, estimator=None, n_estimators=100, verbose=0, random_state=None):
        self.estimator = estimator
        self.n_estimators = n_estimators
        self.verbose = verbose
        self.random_state = random_state

    def fit(self, X, y):
        X_arr = np.asarray(X)
        y_arr = np.asarray(y)
        if self.estimator is None:
            base_est = RandomForestClassifier(n_estimators=self.n_estimators,
                                              n_jobs=-1, random_state=self.random_state)
        else:
            base_est = clone(self.estimator)
        self.boruta_ = BorutaPy(base_est, n_estimators=self.n_estimators, verbose=self.verbose, random_state=self.random_state)
        self.boruta_.fit(X_arr, y_arr)
        self.support_ = self.boruta_.support_.copy()
        return self

    def transform(self, X):
        X_arr = np.asarray(X)
        if not hasattr(self, "support_"):
            raise RuntimeError("BorutaSelector must be fitted before transform()")
        if np.sum(self.support_) == 0:
            return X_arr
        return X_arr[:, self.support_]

    def get_support(self):
        return getattr(self, "support_", None)

# ------------------------
# Helper function for evaluation
# ------------------------

from sklearn.metrics import accuracy_score, f1_score

def evaluate_per_fold(pipeline, X, y, cv):
    fold_accuracies = []
    fold_macro_f1 = []
    fold_per_class_f1 = []

    for fold_idx, (train_idx, test_idx) in enumerate(cv.split(X, y)):
        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]

        model = clone(pipeline)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        acc = accuracy_score(y_test, y_pred)
        macro = f1_score(y_test, y_pred, average="macro")
        per_class = f1_score(y_test, y_pred, average=None)

        fold_accuracies.append(acc)
        fold_macro_f1.append(macro)
        fold_per_class_f1.append(per_class)

        print(f"[Fold {fold_idx+1}] acc={acc:.4f}, macroF1={macro:.4f}, per-class={per_class}")

    fold_per_class_f1 = np.array(fold_per_class_f1)

    results = {
        "accuracy_mean": np.mean(fold_accuracies),
        "accuracy_sd": np.std(fold_accuracies),
        "macro_f1_mean": np.mean(fold_macro_f1),
        "macro_f1_sd": np.std(fold_macro_f1),
        "class_f1_means": np.mean(fold_per_class_f1, axis=0),
        "class_f1_sds": np.std(fold_per_class_f1, axis=0)
    }
    return results

In [None]:
# ------------------------
# Load Data
# ------------------------

data_path = "../data/preprocessed/head_modifiers_pairs.csv"
if not os.path.exists(data_path):
    raise FileNotFoundError(f"Preprocessed CSV not found at {data_path}")

df = pd.read_csv(data_path)
print("Loaded data:", df.shape)

# Encode target labels

le_ezafe = LabelEncoder()
df['ezafe_label_enc'] = le_ezafe.fit_transform(df['ezafe_label'])

le_combined = LabelEncoder()
df['combined_label'] = df['ezafe_label_enc'].astype(str) + "_" + df['position'].astype(str)
df['combined_label_enc'] = le_combined.fit_transform(df['combined_label'])

joblib.dump(le_ezafe, "../artifacts/le_ezafe.joblib")
joblib.dump(le_combined, "../artifacts/le_combined.joblib")

# Features and targets

drop_cols = ['nominal_head_form', 'modifier_form', 'ezafe_label', 'position',
             'combined_label', 'ezafe_label_enc', 'combined_label_enc']
X = df.drop(columns=[c for c in drop_cols if c in df.columns])
y_comb = df['combined_label_enc'].values
y_ezafe = df['ezafe_label_enc'].values
y_pos = df['position'].astype(int).values

categorical_cols = X.select_dtypes(include=['object', 'category']).columns.tolist()
numeric_cols = [c for c in X.columns if c not in categorical_cols]

preprocessor = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown='ignore', sparse_output=False), categorical_cols)
    ],
    remainder='passthrough'
)

In [None]:
# ------------------------
# Define Pipeline and RandomizedSearchCV
# ------------------------

boruta_base_estimator = RandomForestClassifier(
    n_estimators=100,
    max_depth=5,
    class_weight='balanced',
    n_jobs=-1,
    random_state=RANDOM_STATE
)

ros = RandomOverSampler(random_state=RANDOM_STATE)

pipeline = ImbPipeline([
    ('pre', preprocessor),
    ('ros', ros),
    ('boruta', BorutaSelector(estimator=boruta_base_estimator, n_estimators=100, verbose=0, random_state=RANDOM_STATE)),
    ('clf', RandomForestClassifier(n_jobs=-1, random_state=RANDOM_STATE))
])

param_dist = {
    'clf__n_estimators': [100, 200, 300],
    'clf__max_depth': [None, 10, 20],
    'clf__min_samples_split': [2, 5],
    'clf__class_weight': [
        {0:1,1:1,2:1,3:1},
        {0:1,1:1,2:1,3:3},
        {0:1,1:1,2:1,3:5},
        {0:1,1:1,2:1,3:10}
    ]
}

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
random_search = RandomizedSearchCV(
    pipeline, param_distributions=param_dist,
    n_iter=10, cv=cv, scoring='f1_weighted',
    random_state=RANDOM_STATE, verbose=2, n_jobs=-1
)

In [None]:
# ------------------------
# Fit RandomizedSearchCV
# ------------------------

print("Starting RandomizedSearchCV (this may take a while)...")
random_search.fit(X, y_comb)
print("Best parameters:", random_search.best_params_)
best_pipeline = random_search.best_estimator_
joblib.dump(best_pipeline, "../artifacts/best_pipeline_combined.joblib")
print("Saved best pipeline.")

In [None]:
# ------------------------
# Feature Selection and Evaluation
# ------------------------

pre = best_pipeline.named_steps['pre']
try:
    pre_feature_names = pre.get_feature_names_out()
except Exception:
    pre_feature_names = np.array(X.columns.tolist())

boruta_mask = best_pipeline.named_steps['boruta'].get_support()
if boruta_mask is not None:
    selected_features = list(np.array(pre_feature_names)[boruta_mask])
else:
    selected_features = list(pre_feature_names)

joblib.dump(selected_features, "../artifacts/selected_feature_names.joblib")
print("Selected features saved:", len(selected_features))

print("\nRunning per-fold evaluation...")
results = evaluate_per_fold(best_pipeline, X, y_comb, cv)
print("Cross-validated results:", results)

In [None]:
# ------------------------
# Cell 5 (continued): Feature Selection
# ------------------------
if boruta_mask is not None:
    selected_features = list(np.array(pre_feature_names)[boruta_mask])
else:
    selected_features = list(pre_feature_names)

joblib.dump(selected_features, "../artifacts/selected_feature_names.joblib")
print("Selected features saved:", len(selected_features))

# ------------------------
# Cell 6: Two-stage evaluation (ezafe & position)
# ------------------------
# Recreate a fresh pipeline template with best classifier params
clf_params = {k.replace('clf__', ''): v for k, v in random_search.best_params_.items() if k.startswith('clf__')}

pipeline_for_cv = ImbPipeline([
    ('pre', preprocessor),
    ('ros', RandomOverSampler(random_state=RANDOM_STATE)),
    ('boruta', BorutaSelector(estimator=clone(boruta_base_estimator), n_estimators=100, verbose=0, random_state=RANDOM_STATE)),
    ('clf', RandomForestClassifier(random_state=RANDOM_STATE, n_jobs=-1, **clf_params))
])

# --- Stage 1: Ezafe ---
print("\n=== Two-stage CV: Ezafe ===")
y_pred_ezafe = cross_val_predict(pipeline_for_cv, X, y_ezafe, cv=cv, n_jobs=-1)
ezafe_results = evaluate_per_fold(pipeline_for_cv, X, y_ezafe, cv)
print("Ezafe per-fold results:", ezafe_results)

# --- Stage 2: Position ---
print("\n=== Two-stage CV: Position ===")
y_pred_pos = cross_val_predict(pipeline_for_cv, X, y_pos, cv=cv, n_jobs=-1)
position_results = evaluate_per_fold(pipeline_for_cv, X, y_pos, cv)
print("Position per-fold results:", position_results)

# --- Stage 3: Combined labels
y_pred_combined = [f"{e}_{p}" for e, p in zip(y_pred_ezafe, y_pred_pos)]
y_true_combined = df['ezafe_label_enc'].astype(str) + "_" + df['position'].astype(str)

mapping = {lab: idx for idx, lab in enumerate(le_combined.classes_)}
y_pred_encoded = np.array([mapping.get(lbl, -1) for lbl in y_pred_combined])
y_true_encoded = np.array([mapping[lbl] for lbl in y_true_combined])

valid_idx = (y_pred_encoded != -1)
y_pred_encoded = y_pred_encoded[valid_idx]
y_true_encoded = y_true_encoded[valid_idx]

label_mapping = {
    "0_1": "No ezafe & Head Initial",
    "0_2": "No ezafe & Head Final",
    "1_1": "With ezafe & Head Initial",
    "1_2": "With ezafe & Head Final"
}

unique_true_encoded = np.unique(y_true_encoded)
decoded_class_labels = [le_combined.inverse_transform([i])[0] for i in unique_true_encoded]
readable_labels = [label_mapping.get(label, label) for label in decoded_class_labels]

print("\nTwo-Stage Classification Report (RF-5-fold CV):")
print(classification_report(y_true_encoded, y_pred_encoded, target_names=readable_labels, digits=4))

# Confusion matrix
cm = confusion_matrix(y_true_encoded, y_pred_encoded, labels=unique_true_encoded)
plt.figure(figsize=(10, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap="cividis",
            xticklabels=readable_labels, yticklabels=readable_labels)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Composite Confusion Matrix (5-fold CV)')
plt.tight_layout()
plt.show()
