# Improved Expert Model: Differentiating Class 1 vs 2
This notebook builds an improved expert model focused on distinguishing class 1 (injured) vs class 2 (heavily injured), then integrates it into the existing hierarchical pipeline (stage 1: injury vs no injury).  
Run this after the base data preparation notebook so that `X_train`, `X_test`, `y_train`, `y_test`, `cat_cols`, `num_cols` already exist.

## 1. Prepare 1-vs-2 expert dataset
Filter training/test sets to rows with y ∈ {1,2}.  
Binary encoding: y_bin = (y == 2).  
Create stratified train/validation split for expert tuning, leaving the test subset only for final evaluation.

In [None]:
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Safety checks (in case notebook is run standalone)
required_vars = ['X_train','X_test','y_train','y_test','cat_cols','num_cols']
missing = [v for v in required_vars if v not in globals()]
if missing:
    raise RuntimeError(f"Missing required variables from previous preparation notebook: {missing}")

# Filter to classes 1 and 2
mask_train_12 = y_train.isin([1,2])
mask_test_12  = y_test.isin([1,2])

X_train_1_2 = X_train[mask_train_12].copy()
y_train_1_2 = y_train[mask_train_12].copy()

X_test_1_2  = X_test[mask_test_12].copy()
y_test_1_2  = y_test[mask_test_12].copy()

# Binary label: 1 => class 2 (heavily injured), 0 => class 1 (injured)
y_train_bin = (y_train_1_2 == 2).astype(int)

# Stratified split for validation
X_tr_1_2, X_val_1_2, y_tr_bin, y_val_bin = train_test_split(
    X_train_1_2, y_train_bin, test_size=0.2, stratify=y_train_bin, random_state=42
)

cat_idx = [X_train.columns.get_loc(c) for c in X_train.columns if c not in num_cols]

print("Train 1-2 shape:", X_tr_1_2.shape, "Validation shape:", X_val_1_2.shape, "Test subset shape:", X_test_1_2.shape)
print("Class balance (train_bin):", np.bincount(y_tr_bin))
print("Class balance (val_bin):  ", np.bincount(y_val_bin))

## 2. CatBoost expert (binary 1 vs 2): strong baseline with early stopping
Use CatBoost with automatic class weights, early stopping, and compute validation F1.

In [None]:
from catboost import CatBoostClassifier

cat_expert = CatBoostClassifier(
    loss_function='Logloss',
    eval_metric='F1',
    task_type='GPU',
    devices='0',
    auto_class_weights='Balanced',
    cat_features=cat_idx,
    random_state=42,
    iterations=10000,
    verbose=False
)

cat_expert.fit(
    X_tr_1_2, y_tr_bin,
    eval_set=(X_val_1_2, y_val_bin),
    use_best_model=True,
    early_stopping_rounds=200
)

# Validation performance
val_pred_bin = (cat_expert.predict_proba(X_val_1_2)[:,1] >= 0.5).astype(int)
val_f1 = f1_score(y_val_bin, val_pred_bin, average='binary')
print(f"Baseline CatBoost (binary) validation F1: {val_f1:.4f}; best_iteration={cat_expert.get_best_iteration()}")

## 3. Manual randomized search for CatBoost hyperparameters
Sample ~30 configurations; keep the best by validation F1 (threshold=0.5).  
Search spaces:
- depth ∈ {6..10}
- learning_rate ∈ logspace(-3, -0.5)
- l2_leaf_reg ∈ logspace(-2, 2)
- subsample ∈ [0.6, 0.9]
- rsm ∈ [0.6, 1.0]
- grow_policy ∈ {'SymmetricTree','Lossguide'}
- border_count ∈ {128, 254}
Iterations large (10000) with early stopping to truncate.

In [None]:
import random
from math import log10

def sample_hparams(n=30, rng=None):
    rng = rng or np.random.default_rng(42)
    configs = []
    for _ in range(n):
        cfg = {
            'depth': int(rng.integers(6, 11)),
            'learning_rate': 10 ** rng.uniform(-3.0, -0.5),
            'l2_leaf_reg': 10 ** rng.uniform(-2.0, 2.0),
            'subsample': rng.uniform(0.6, 0.9),
            'rsm': rng.uniform(0.6, 1.0),
            'grow_policy': random.choice(['SymmetricTree','Lossguide']),
            'border_count': int(random.choice([128, 254]))
        }
        configs.append(cfg)
    return configs

search_space = sample_hparams(32)

best_cat_expert = cat_expert
best_f1 = val_f1
best_cfg = {
    'depth': cat_expert.get_param('depth'),
    'learning_rate': cat_expert.get_param('learning_rate')
}

for i, cfg in enumerate(search_space, 1):
    model = CatBoostClassifier(
        loss_function='Logloss',
        eval_metric='F1',
        task_type='GPU',
        devices='0',
        auto_class_weights='Balanced',
        cat_features=cat_idx,
        iterations=10000,
        random_state=42 + i,
        verbose=False,
        early_stopping_rounds=200,
        **cfg
    )
    model.fit(
        X_tr_1_2, y_tr_bin,
        eval_set=(X_val_1_2, y_val_bin),
        use_best_model=True
    )
    preds_val = (model.predict_proba(X_val_1_2)[:,1] >= 0.5).astype(int)
    f1v = f1_score(y_val_bin, preds_val)
    if f1v > best_f1:
        best_f1 = f1v
        best_cat_expert = model
        best_cfg = cfg
    if i % 5 == 0:
        print(f"[{i}/{len(search_space)}] current best F1={best_f1:.4f}")

print("Best CatBoost expert validation F1:", best_f1)
print("Best hyperparameters:", best_cfg)

## 4. Probability calibration (isotonic) for the expert
Calibrate predicted probabilities of class 2 (heavily injured) on validation set.

In [None]:
from sklearn.isotonic import IsotonicRegression

p_val_cat_raw = best_cat_expert.predict_proba(X_val_1_2)[:,1]
iso_cat = IsotonicRegression(y_min=0.0, y_max=1.0, increasing=True, out_of_bounds='clip')
iso_cat.fit(p_val_cat_raw, y_val_bin)

def cat_expert_proba_calibrated(X):
    p = best_cat_expert.predict_proba(X)[:,1]
    return iso_cat.predict(p)

# Quick check
raw_f1 = f1_score(y_val_bin, (p_val_cat_raw >= 0.5).astype(int))
cal_f1 = f1_score(y_val_bin, (iso_cat.predict(p_val_cat_raw) >= 0.5).astype(int))
print(f"CatBoost raw val F1 @0.5: {raw_f1:.4f} | calibrated F1 @0.5: {cal_f1:.4f}")

## 5. HGB expert (binary 1 vs 2) with ordinal encoding
Train several HistGradientBoostingClassifier configurations, pick best by validation F1, then calibrate probabilities.

In [None]:
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OrdinalEncoder
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.utils.class_weight import compute_sample_weight

hgb_transformer = ColumnTransformer([
    ('cat', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1), cat_cols),
    ('num', 'passthrough', num_cols)
], remainder='drop')

X_tr_hgb = hgb_transformer.fit_transform(X_tr_1_2)
X_val_hgb = hgb_transformer.transform(X_val_1_2)
X_te_hgb  = hgb_transformer.transform(X_test_1_2)

cat_feature_indices = list(range(len(cat_cols)))
sw_tr = compute_sample_weight(class_weight='balanced', y=y_tr_bin)

# Hyperparameter grid (compact)
learning_rates = [0.03, 0.05, 0.08]
max_iters = [500, 800, 1100]
max_depths = [6, 8, None]
min_leafs = [30, 60, 120]
l2_regs = np.logspace(-3, 2, 5)

best_hgb = None
best_hgb_f1 = -1
hgb_results = []

for lr in learning_rates:
    for mi in max_iters:
        for md in max_depths:
            for ml in min_leafs:
                for reg in l2_regs:
                    clf = HistGradientBoostingClassifier(
                        categorical_features=cat_feature_indices,
                        learning_rate=lr,
                        max_iter=mi,
                        max_depth=md,
                        min_samples_leaf=ml,
                        l2_regularization=reg,
                        early_stopping='auto',
                        random_state=42
                    )
                    clf.fit(X_tr_hgb, y_tr_bin, sample_weight=sw_tr)
                    pred_val = (clf.predict_proba(X_val_hgb)[:,1] >= 0.5).astype(int)
                    f1v = f1_score(y_val_bin, pred_val)
                    hgb_results.append((f1v, lr, mi, md, ml, reg))
                    if f1v > best_hgb_f1:
                        best_hgb_f1 = f1v
                        best_hgb = clf

print(f"Best HGB validation F1 @0.5: {best_hgb_f1:.4f}")

# Calibrate HGB
p_val_hgb_raw = best_hgb.predict_proba(X_val_hgb)[:,1]
iso_hgb = IsotonicRegression(y_min=0.0, y_max=1.0, increasing=True, out_of_bounds='clip')
iso_hgb.fit(p_val_hgb_raw, y_val_bin)

def hgb_expert_proba_calibrated(X):
    p = best_hgb.predict_proba(hgb_transformer.transform(X))[:,1] if X.shape[1] == X_tr_1_2.shape[1] else best_hgb.predict_proba(X)[:,1]
    return iso_hgb.predict(p)

raw_hgb_f1 = f1_score(y_val_bin, (p_val_hgb_raw >= 0.5).astype(int))
cal_hgb_f1 = f1_score(y_val_bin, (iso_hgb.predict(p_val_hgb_raw) >= 0.5).astype(int))
print(f"HGB raw val F1 @0.5: {raw_hgb_f1:.4f} | calibrated F1 @0.5: {cal_hgb_f1:.4f}")

## 6. Soft-voting ensemble of CatBoost and HGB experts (tune ensemble weight)
Compute calibrated probabilities and search combination weights w ∈ [0.1,0.9] and coarse thresholds τ to maximize macro F1 on labels {1,2}.

In [None]:
# Calibrated validation probabilities
P_cat_val = cat_expert_proba_calibrated(X_val_1_2)
P_hgb_val = hgb_expert_proba_calibrated(X_val_1_2)

weight_grid = np.arange(0.1, 0.91, 0.05)
threshold_coarse = np.arange(0.3, 0.81, 0.05)

best_ens = {'f1': -1}

def macro_f1_12(y_true_bin, y_pred_bin):
    # Map back to classes {1,2} for macro F1
    y_true_full = np.where(y_true_bin == 1, 2, 1)
    y_pred_full = np.where(y_pred_bin == 1, 2, 1)
    return f1_score(y_true_full, y_pred_full, average='macro')

for w in weight_grid:
    P_comb = w * P_hgb_val + (1 - w) * P_cat_val
    for tau in threshold_coarse:
        pred_bin = (P_comb >= tau).astype(int)
        f1m = macro_f1_12(y_val_bin, pred_bin)
        if f1m > best_ens['f1']:
            best_ens = {'f1': f1m, 'w': w, 'tau': float(tau)}

print("Best ensemble (coarse search):", best_ens)

## 7. Tune decision threshold (refinement) for class 2
Refine threshold τ with finer grid for the chosen ensemble weight; evaluate macro F1 over {1,2}.

In [None]:
fine_thresholds = np.linspace(max(0.05, best_ens['tau'] - 0.15),
                              min(0.95, best_ens['tau'] + 0.15), 61)
w_best = best_ens['w']

best_refined = {'f1': -1, 'tau': best_ens['tau']}
P_comb_val = w_best * P_hgb_val + (1 - w_best) * P_cat_val

for tau in fine_thresholds:
    pred_bin = (P_comb_val >= tau).astype(int)
    f1m = macro_f1_12(y_val_bin, pred_bin)
    if f1m > best_refined['f1']:
        best_refined = {'f1': f1m, 'tau': float(tau)}

print("Refined ensemble threshold:", best_refined)

## 8. Integrate expert into hierarchical pipeline
Reuse (or train if absent) a stage-1 model for (0 vs 1+2) to obtain p_inj.  
For p_heavy_cond (class 2 vs 1 among injured), use calibrated ensemble probability.  
Combine:
- p2 = p_inj * p_heavy_cond  
- p0 = 1 - p_inj  
- p1 = p_inj - p2  
Tune injury threshold t1 over a grid; heavy threshold = refined τ.  
Decision rule:
- if p_heavy_cond ≥ τ → class 2  
- elif p_inj ≥ t1 → class 1  
- else → class 0

In [None]:
from sklearn.preprocessing import OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.utils.class_weight import compute_sample_weight

# Attempt to reuse stage1_clf; if missing, train a new one
if 'stage1_clf' not in globals():
    print("stage1_clf not found. Training a new stage-1 classifier (0 vs 1+2)...")
    stage1_y = (y_train >= 1).astype(int)
    stage1_sw = compute_sample_weight(class_weight='balanced', y=stage1_y)
    stage1_transformer = ColumnTransformer([
        ('cat', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1), cat_cols),
        ('num', 'passthrough', num_cols)
    ], remainder='drop')
    X_tr_stage1 = stage1_transformer.fit_transform(X_train)
    cat_idx_stage1 = list(range(len(cat_cols)))
    stage1_clf = HistGradientBoostingClassifier(
        categorical_features=cat_idx_stage1,
        learning_rate=0.05,
        max_depth=6,
        max_iter=700,
        min_samples_leaf=60,
        l2_regularization=1.0,
        random_state=42
    )
    stage1_clf.fit(X_tr_stage1, stage1_y, sample_weight=stage1_sw)
    # Store transformer for later
    hier_transformer = stage1_transformer
else:
    # If stage1 model exists, try to infer its transformer if present
    if 'hier_transformer' not in globals():
        print("Warning: 'hier_transformer' not found; reconstructing for inference.")
        hier_transformer = ColumnTransformer([
            ('cat', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1), cat_cols),
            ('num', 'passthrough', num_cols)
        ], remainder='drop')
        hier_transformer.fit(X_train)

# p_inj on test
X_te_full = hier_transformer.transform(X_test)
p_inj = stage1_clf.predict_proba(X_te_full)[:,1]  # P(y >= 1)

# Calibrated expert ensemble probability for class 2 vs 1
def ensemble_expert_proba(X):
    # Use direct frames (expects same columns)
    p_cat = cat_expert_proba_calibrated(X)
    p_hgb = hgb_expert_proba_calibrated(X)
    return w_best * p_hgb + (1 - w_best) * p_cat

p_heavy_cond = ensemble_expert_proba(X_test)  # apply to all rows

tau = best_refined['tau']

# Combine hierarchical probabilities
p2 = p_inj * p_heavy_cond
p0 = 1.0 - p_inj
p1 = p_inj - p2
probs = np.vstack([p0, p1, p2]).T
probs = np.clip(probs, 0, 1)
probs /= probs.sum(axis=1, keepdims=True)

# Tune t1 (injury threshold) while keeping tau fixed
injury_thresholds = np.linspace(0.2, 0.9, 71)
best_hier = {'macro_f1': -1}

y_true = y_test.to_numpy()

for t1 in injury_thresholds:
    # Decision
    y_pred = np.zeros_like(y_true)
    heavy_mask = p_heavy_cond >= tau
    inj_mask = p_inj >= t1
    y_pred[inj_mask] = 1
    y_pred[heavy_mask] = 2

    mf1 = f1_score(y_true, y_pred, average='macro')
    if mf1 > best_hier['macro_f1']:
        best_hier = {'macro_f1': mf1, 't1': float(t1)}

print("Best hierarchical injury threshold search:", best_hier)

## 9. Final evaluation and plots
Report:
1. Expert performance on {1,2} test subset.
2. Full hierarchical performance (all classes).
Save artifacts (models, calibrators, parameters).

In [None]:
from joblib import dump
import os

# Final expert evaluation on {1,2} subset
P_cat_test = cat_expert_proba_calibrated(X_test_1_2)
P_hgb_test = hgb_expert_proba_calibrated(X_test_1_2)
P_comb_test = w_best * P_hgb_test + (1 - w_best) * P_cat_test
y_pred_expert_bin = (P_comb_test >= tau).astype(int)
y_pred_expert_full = np.where(y_pred_expert_bin==1, 2, 1)

print("=== Expert (classes 1 vs 2) Test Evaluation ===")
print(classification_report(y_test_1_2, y_pred_expert_full, digits=4))
ConfusionMatrixDisplay(
    confusion_matrix(y_test_1_2, y_pred_expert_full, labels=[1,2], normalize='true'),
    display_labels=['injured (1)','heavily injured (2)']
).plot()
plt.title("Expert Confusion Matrix (Normalized)")
plt.show()

# Hierarchical predictions with best thresholds
t1_best = best_hier['t1']
y_pred_hier = np.zeros_like(y_true)
heavy_mask = p_heavy_cond >= tau
inj_mask = p_inj >= t1_best
y_pred_hier[inj_mask] = 1
y_pred_hier[heavy_mask] = 2

print("=== Hierarchical Full Evaluation ===")
print(f"Chosen thresholds: t1(injury)={t1_best:.3f}, tau(heavy)={tau:.3f}")
print(classification_report(y_true, y_pred_hier, digits=4))
ConfusionMatrixDisplay(
    confusion_matrix(y_true, y_pred_hier, labels=[0,1,2], normalize='true'),
    display_labels=['uninjured (0)','injured (1)','heavily injured (2)']
).plot()
plt.title("Hierarchical Confusion Matrix (Normalized)")
plt.show()

# Save artifacts
artifacts_dir = "expert_artifacts"
os.makedirs(artifacts_dir, exist_ok=True)

dump(best_cat_expert, os.path.join(artifacts_dir, "cat_expert_best.joblib"))
dump(iso_cat, os.path.join(artifacts_dir, "iso_cat.joblib"))
dump(best_hgb, os.path.join(artifacts_dir, "hgb_expert_best.joblib"))
dump(hgb_transformer, os.path.join(artifacts_dir, "hgb_transformer.joblib"))
dump(iso_hgb, os.path.join(artifacts_dir, "iso_hgb.joblib"))
dump(stage1_clf, os.path.join(artifacts_dir, "stage1_clf.joblib"))
dump(hier_transformer, os.path.join(artifacts_dir, "hier_transformer.joblib"))

config = {
    'ensemble_weight_hgb': w_best,
    'tau_heavy': tau,
    't1_injury': t1_best,
    'cat_expert_params': best_cfg,
    'best_cat_val_f1': best_f1,
    'best_hgb_val_f1': best_hgb_f1,
    'refined_macro_f1_val_12': best_refined['f1'],
    'hierarchical_macro_f1_test': f1_score(y_true, y_pred_hier, average='macro')
}

import json
with open(os.path.join(artifacts_dir, "config.json"), "w") as f:
    json.dump(config, f, indent=2)

print("Artifacts saved to", artifacts_dir)
print("Configuration:", config)