# Flood Risk Prediction - Binary Classification Model

This notebook trains an optimized binary classification model (Meta-Stacker) to predict **High Risk vs Others** flood risk levels for wards based on rainfall patterns and geographical features.

## Streamlined Workflow:
1. **Setup & Data Loading** - Load ward data with rainfall time series
2. **Seasonal Analysis** - Explore seasonal rainfall patterns
3. **Feature Engineering** - Chronological train/test split, hazard scores, and engineered features
4. **Binary Model Training** - Advanced Meta-Stacker model
5. **Export** - Generate GeoJSON and metrics for web application
6. **Visualize** - Plot final model performance
7. **Climate Scenario** - Simulate impact of increased rainfall
8. **Historical Validation** - Simulate a 'worst-case' historical event
9. **Robust Validation** - Run expanding window Time-Series Cross-Validation

**Final Model Performance: 87.65% accuracy** (from single run)

## 1. Setup & Installation

In [None]:
%pip install pandas geopandas scikit-learn matplotlib statsmodels joblib numpy shap seaborn xgboost lightgbm imbalanced-learn

In [None]:
import pandas as pd
import geopandas as gpd
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
from sklearn.neural_network import MLPClassifier
from sklearn.base import BaseEstimator, ClassifierMixin
import matplotlib.pyplot as plt
from statsmodels.tsa.statespace.sarimax import SARIMAX
from pathlib import Path
import json
import joblib
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
import shap
import xgboost as xgb
import lightgbm as lgb
from imblearn.over_sampling import SMOTE
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, roc_auc_score, precision_recall_curve, auc, ConfusionMatrixDisplay
from sklearn.model_selection import StratifiedShuffleSplit, TimeSeriesSplit
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
import warnings
import copy

## 2. Data Loading

In [None]:
wards = gpd.read_file('wards_extracted.geojson')

print("Data loaded successfully.")
print(f"Shape: {wards.shape}")

date_columns = [
    col for col in wards.columns
    if isinstance(col, str) and col[:4].isdigit() and '-' in col
 ]

date_columns = sorted(date_columns)

ward_identifier_column = 'Name'

## 3. Exploratory Analysis: Seasonal Rainfall Patterns

This section analyzes the rainfall data to identify seasonal trends. This is an exploratory analysis and does not modify the features used in the final model, thus preserving the model's established accuracy.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

print("Analyzing seasonal rainfall patterns...")

if 'date_columns' not in locals():
    print("Error: 'date_columns' not found. Please run the Data Loading cell first.")
else:
    try:
        datetime_cols = pd.to_datetime(date_columns)
    except Exception as e:
        print(f"Warning: Could not parse all date columns, proceeding with string-based month extraction. {e}")
        datetime_cols = pd.Series([pd.to_datetime(d, errors='coerce') for d in date_columns])

    rainfall_data = wards[date_columns]
    
    daily_avg_rainfall = rainfall_data.mean(axis=0)
    daily_avg_rainfall.index = pd.to_datetime(daily_avg_rainfall.index)
    
    monthly_avg_rainfall = daily_avg_rainfall.groupby(daily_avg_rainfall.index.month).mean()
    month_names = pd.to_datetime(monthly_avg_rainfall.index, format='%m').strftime('%b')
    monthly_avg_rainfall.index = month_names
    
    month_map = {col: dt.month for col, dt in zip(date_columns, datetime_cols) if pd.notna(dt)}
    
    dry_months = [12, 1, 2, 3]
    pre_monsoon_months = [4, 5]
    monsoon_months = [6, 7, 8, 9]
    post_monsoon_months = [10, 11]

    dry_dates = [col for col, dt in zip(date_columns, datetime_cols) if pd.notna(dt) and dt.month in dry_months]
    pre_monsoon_dates = [col for col, dt in zip(date_columns, datetime_cols) if pd.notna(dt) and dt.month in pre_monsoon_months]
    monsoon_dates = [col for col, dt in zip(date_columns, datetime_cols) if pd.notna(dt) and dt.month in monsoon_months]
    post_monsoon_dates = [col for col, dt in zip(date_columns, datetime_cols) if pd.notna(dt) and dt.month in post_monsoon_months]

    seasonal_data = {
        'Dry Season\n(Dec-Mar)': wards[dry_dates].mean(axis=1).mean() if dry_dates else 0,
        'Pre-Monsoon\n(Apr-May)': wards[pre_monsoon_dates].mean(axis=1).mean() if pre_monsoon_dates else 0,
        'Monsoon\n(Jun-Sep)': wards[monsoon_dates].mean(axis=1).mean() if monsoon_dates else 0,
        'Post-Monsoon\n(Oct-Nov)': wards[post_monsoon_dates].mean(axis=1).mean() if post_monsoon_dates else 0
    }
    seasonal_avg_rainfall = pd.Series(seasonal_data)

    plt.style.use('dark_background')
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 8))
    fig.patch.set_facecolor('#1e1e1e')

    ax1.bar(monthly_avg_rainfall.index, monthly_avg_rainfall.values, color='#007bff')
    ax1.set_title('Average Monthly Rainfall Pattern (2010-2024)', fontsize=16, fontweight='bold', color='white')
    ax1.set_ylabel('Average Rainfall (mm)', fontsize=12, color='white')
    ax1.set_xlabel('Month', fontsize=12, color='white')
    ax1.tick_params(axis='x', colors='white', rotation=90)
    ax1.tick_params(axis='y', colors='white')
    ax1.grid(True, which='both', linestyle='--', linewidth=0.5, color='#555555')
    ax1.set_facecolor('#1e1e1e')

    colors = ['#00a2a2', '#2ecc71', '#007bff', '#f39c12']
    ax2.bar(seasonal_avg_rainfall.index, seasonal_avg_rainfall.values, color=colors)
    ax2.set_title('Rainfall by Season', fontsize=16, fontweight='bold', color='white')
    ax2.set_ylabel('Average Rainfall (mm)', fontsize=12, color='white')
    ax2.tick_params(axis='x', colors='white', rotation=45)
    ax2.tick_params(axis='y', colors='white')
    ax2.grid(True, which='both', linestyle='--', linewidth=0.5, color='#555555')
    ax2.set_facecolor('#1e1e1e')

    plt.tight_layout()
    plt.show()
    
    plt.style.use('default')

    print(f"\nSeasonal Summary:")
    for season, rain in seasonal_avg_rainfall.items():
        print(f"  Avg. daily rainfall ({season.replace(chr(10), ' ')}): {rain:.2f} mm")

## 4. Feature Engineering & Temporal Split (Single Run)

This section performs the feature engineering and split for a *single* run. This is useful for model discovery, single-run validation, and generating the final export files. The robust cross-validation at the end of the notebook will re-run this logic for each fold.

In [None]:

total_dates = len(date_columns)
train_split = int(0.8 * total_dates)
train_dates = date_columns[:train_split]
test_dates = date_columns[train_split:]

print(f"--- Single Run Split --- ")
print(f"Chronological temporal split: {len(train_dates)} train dates, {len(test_dates)} test dates")

wards_train = wards.copy()
wards_test = wards.copy()

wards_train['average_rainfall'] = wards[train_dates].mean(axis=1)
wards_train['max_daily_rainfall'] = wards[train_dates].max(axis=1)
wards_train['rainfall_std'] = wards[train_dates].std(axis=1)
wards_train['total_rainfall'] = wards[train_dates].sum(axis=1)

wards_test['average_rainfall'] = wards[test_dates].mean(axis=1)
wards_test['max_daily_rainfall'] = wards[test_dates].max(axis=1)
wards_test['rainfall_std'] = wards[test_dates].std(axis=1)
wards_test['total_rainfall'] = wards[test_dates].sum(axis=1)

static_features = ['elevation', 'slope', 'land_cover', 'drainage_density', 'traffic_factor']

scaler_train = MinMaxScaler()
scaler_features = ['elevation', 'slope', 'average_rainfall', 'drainage_density', 'traffic_factor']

wards_train[['elevation_norm', 'slope_norm', 'rainfall_norm', 'drainage_norm', 'traffic_norm']] = scaler_train.fit_transform(
    wards_train[scaler_features]
)
wards_train['hazard_score'] = (1 - wards_train['elevation_norm']) + (1 - wards_train['slope_norm']) + wards_train['rainfall_norm'] + (1 - wards_train['drainage_norm']) + wards_train['traffic_norm']
hazard_bins = pd.qcut(wards_train['hazard_score'], q=3, retbins=True, duplicates='drop')[1]
wards_train['hazard_class'] = pd.cut(
    wards_train['hazard_score'], bins=hazard_bins,
    labels=['Low Risk', 'Medium Risk', 'High Risk'], include_lowest=True
)

wards_test[['elevation_norm', 'slope_norm', 'rainfall_norm', 'drainage_norm', 'traffic_norm']] = scaler_train.transform(
    wards_test[scaler_features]
)
wards_test['hazard_score'] = (1 - wards_test['elevation_norm']) + (1 - wards_test['slope_norm']) + wards_test['rainfall_norm'] + (1 - wards_test['drainage_norm']) + wards_test['traffic_norm']
wards_test['hazard_class'] = pd.cut(
    wards_test['hazard_score'], bins=hazard_bins,
    labels=['Low Risk', 'Medium Risk', 'High Risk'], include_lowest=True
)
if wards_test['hazard_class'].isna().any():
    wards_test['hazard_class'] = wards_test['hazard_class'].astype(str).replace('nan', 'Medium Risk')
if wards_train['hazard_class'].isna().any():
    wards_train['hazard_class'] = wards_train['hazard_class'].astype(str).replace('nan', 'Medium Risk')

wards_train['rainfall_cv'] = wards_train['rainfall_std'] / (wards_train['average_rainfall'] + 1e-6)
wards_train['rainfall_skew'] = wards[train_dates].skew(axis=1)
wards_train['rainfall_kurtosis'] = wards[train_dates].kurtosis(axis=1)
wards_train['heavy_rain_days'] = (wards[train_dates] > wards[train_dates].quantile(0.95, axis=1).mean()).sum(axis=1)
wards_train['dry_days'] = (wards[train_dates] < wards[train_dates].quantile(0.05, axis=1).mean()).sum(axis=1)
wards_train['rainfall_iqr'] = wards[train_dates].quantile(0.75, axis=1) - wards[train_dates].quantile(0.25, axis=1)

wards_test['rainfall_cv'] = wards_test['rainfall_std'] / (wards_test['average_rainfall'] + 1e-6)
wards_test['rainfall_skew'] = wards[test_dates].skew(axis=1)
wards_test['rainfall_kurtosis'] = wards[test_dates].kurtosis(axis=1)
wards_test['heavy_rain_days'] = (wards[test_dates] > wards[test_dates].quantile(0.95, axis=1).mean()).sum(axis=1)
wards_test['dry_days'] = (wards[test_dates] < wards[test_dates].quantile(0.05, axis=1).mean()).sum(axis=1)
wards_test['rainfall_iqr'] = wards[test_dates].quantile(0.75, axis=1) - wards[test_dates].quantile(0.25, axis=1)

features = ['elevation', 'slope', 'average_rainfall', 'max_daily_rainfall', 'rainfall_std', 'total_rainfall', 'land_cover', 'drainage_density', 'traffic_factor', 'rainfall_cv', 'rainfall_skew', 'rainfall_kurtosis', 'heavy_rain_days', 'dry_days', 'rainfall_iqr']
numeric_features = ['elevation', 'slope', 'average_rainfall', 'max_daily_rainfall', 'rainfall_std', 'total_rainfall', 'drainage_density', 'traffic_factor', 'rainfall_cv', 'rainfall_skew', 'rainfall_kurtosis', 'heavy_rain_days', 'dry_days', 'rainfall_iqr']

wards_train[numeric_features] = wards_train[numeric_features].apply(pd.to_numeric, errors='coerce')
wards_train[numeric_features] = wards_train[numeric_features].fillna(wards_train[numeric_features].median())
if wards_train['land_cover'].isna().any():
    land_cover_mode = wards_train['land_cover'].mode().iloc[0]
    wards_train['land_cover'].fillna(land_cover_mode, inplace=True)

wards_test[numeric_features] = wards_test[numeric_features].apply(pd.to_numeric, errors='coerce')
wards_test[numeric_features] = wards_test[numeric_features].fillna(wards_test[numeric_features].median())
if wards_test['land_cover'].isna().any():
    land_cover_mode = wards_test['land_cover'].mode().iloc[0]
    wards_test['land_cover'].fillna(land_cover_mode, inplace=True)

X_train = wards_train[features]
y_train = wards_train['hazard_class']
X_test = wards_test[features]
y_test = wards_test['hazard_class']

print(f"Training data shape: {X_train.shape}, Test data shape: {X_test.shape}")
print("✅ Data prepared with chronological temporal split based on rainfall data")

scaler_full = MinMaxScaler()
wards[['elevation_norm', 'slope_norm', 'rainfall_norm', 'drainage_norm', 'traffic_norm']] = scaler_full.fit_transform(
    wards[['elevation', 'slope', 'average_rainfall', 'drainage_density', 'traffic_factor']]
 )
wards['hazard_score'] = (1 - wards['elevation_norm']) + (1 - wards['slope_norm']) + wards['rainfall_norm'] + (1 - wards['drainage_norm']) + wards['traffic_norm']
wards['hazard_class'] = pd.qcut(wards['hazard_score'], q=3, labels=['Low Risk', 'Medium Risk', 'High Risk'])

In [None]:
print("\n=== Applying Final Advanced Feature Engineering ===")

X_train_enhanced = X_train.copy()
X_test_enhanced = X_test.copy()

X_train_enhanced['elevation_rainfall'] = X_train_enhanced['elevation'] * X_train_enhanced['average_rainfall']
X_train_enhanced['slope_rainfall'] = X_train_enhanced['slope'] * X_train_enhanced['average_rainfall']
X_train_enhanced['drainage_traffic'] = X_train_enhanced['drainage_density'] * X_train_enhanced['traffic_factor']
X_train_enhanced['rainfall_intensity'] = X_train_enhanced['max_daily_rainfall'] / (X_train_enhanced['average_rainfall'] + 1e-6)
X_train_enhanced['elevation_slope'] = X_train_enhanced['elevation'] * X_train_enhanced['slope']
X_train_enhanced['composite_risk'] = (X_train_enhanced['total_rainfall'] * X_train_enhanced['traffic_factor']) / (X_train_enhanced['drainage_density'] + 1e-6)
X_train_enhanced['terrain_index'] = X_train_enhanced['slope'] / (X_train_enhanced['elevation'] + 1e-6)
X_train_enhanced['rainfall_drainage_ratio'] = X_train_enhanced['total_rainfall'] / (X_train_enhanced['drainage_density'] + 1e-6)

X_test_enhanced['elevation_rainfall'] = X_test_enhanced['elevation'] * X_test_enhanced['average_rainfall']
X_test_enhanced['slope_rainfall'] = X_test_enhanced['slope'] * X_test_enhanced['average_rainfall']
X_test_enhanced['drainage_traffic'] = X_test_enhanced['drainage_density'] * X_test_enhanced['traffic_factor']
X_test_enhanced['rainfall_intensity'] = X_test_enhanced['max_daily_rainfall'] / (X_test_enhanced['average_rainfall'] + 1e-6)
X_test_enhanced['elevation_slope'] = X_test_enhanced['elevation'] * X_test_enhanced['slope']
X_test_enhanced['composite_risk'] = (X_test_enhanced['total_rainfall'] * X_test_enhanced['traffic_factor']) / (X_test_enhanced['drainage_density'] + 1e-6)
X_test_enhanced['terrain_index'] = X_test_enhanced['slope'] / (X_test_enhanced['elevation'] + 1e-6)
X_test_enhanced['rainfall_drainage_ratio'] = X_test_enhanced['total_rainfall'] / (X_test_enhanced['drainage_density'] + 1e-6)

from sklearn.preprocessing import PolynomialFeatures
poly_features = ['average_rainfall', 'slope', 'drainage_density']
poly = PolynomialFeatures(degree=2, include_bias=False, interaction_only=False)
X_train_poly = poly.fit_transform(X_train_enhanced[poly_features])
X_test_poly = poly.transform(X_test_enhanced[poly_features])

poly_feature_names = [f'poly_{i}' for i in range(X_train_poly.shape[1])]
for i, name in enumerate(poly_feature_names):
    X_train_enhanced[name] = X_train_poly[:, i]
    X_test_enhanced[name] = X_test_poly[:, i]

numeric_features_enhanced = [col for col in X_train_enhanced.columns if col != 'land_cover']

print("✅ Advanced features (interactions, polynomials) created.")

X_train = X_train_enhanced
X_test = X_test_enhanced
numeric_features = numeric_features_enhanced

In [None]:
print("\n" + "="*70)
print("🔄 SWITCHING TO BINARY CLASSIFICATION (High Risk vs Others)")
print("="*70)

y_train_binary = (y_train == 'High Risk').astype(int)
y_test_binary = (y_test == 'High Risk').astype(int)

print(f"\nBinary class distribution (Training):")
print(f"  Others (Low + Medium): {(y_train_binary == 0).sum()} samples")
print(f"  High Risk: {(y_train_binary == 1).sum()} samples")

print(f"\nBinary class distribution (Test):")
print(f"  Others (Low + Medium): {(y_test_binary == 0).sum()} samples")
print(f"  High Risk: {(y_test_binary == 1).sum()} samples")

from sklearn.utils.class_weight import compute_class_weight
class_weights = compute_class_weight('balanced', classes=np.array([0, 1]), y=y_train_binary)
scale_pos_weight = class_weights[1] / class_weights[0]

print(f"\nClass imbalance ratio: {scale_pos_weight:.2f}")

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder

categorical_features = ['land_cover']
preprocessor_binary = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numeric_features),
        ('cat', OneHotEncoder(handle_unknown='ignore', sparse_output=False), categorical_features)
    ])

print("✅ Binary labels created and class weights calculated.")

## 5. Final Model Training (Meta-Stacker - Single Run)

In [None]:
print("\n" + "="*80)
print("🚀 ADVANCED TUNING: Target >85% Accuracy with Recall≥0.70 and F1≥0.75")
print("="*80)

import numpy as np
import warnings
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_auc_score, average_precision_score
from imblearn.over_sampling import SMOTE, ADASYN
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.linear_model import LogisticRegression
import xgboost as xgb
import lightgbm as lgb

warnings.filterwarnings('ignore')

print("\n[1/5] Applying Enhanced SMOTE with Border-line strategy...")
smote_borderline = SMOTE(sampling_strategy=0.85, k_neighbors=7, random_state=42)
X_train_smote_adv, y_train_smote_adv = smote_borderline.fit_resample(X_train, y_train_binary)
print(f"  Resampled: {len(y_train_smote_adv)} samples (High Risk: {sum(y_train_smote_adv)}, Others: {len(y_train_smote_adv) - sum(y_train_smote_adv)})")

print("\n[2/5] Training Enhanced Base Models...")
xgb_tuned = xgb.XGBClassifier(
    tree_method='hist', device='cuda' if 'gpu_available' in globals() and gpu_available else 'cpu',
    n_estimators=500,
    learning_rate=0.02,
    max_depth=8,
    subsample=0.85,
    colsample_bytree=0.8,
    reg_alpha=0.15,
    reg_lambda=1.2,
    gamma=0.1,
    min_child_weight=2,
    random_state=42,
    objective='binary:logistic',
    eval_metric='logloss'
)
lgb_tuned = lgb.LGBMClassifier(
    n_estimators=500,
    learning_rate=0.02,
    max_depth=9,
    num_leaves=40,
    subsample=0.85,
    colsample_bytree=0.8,
    reg_alpha=0.1,
    reg_lambda=1.0,
    random_state=42,
    verbose=-1
)

pipe_xgb_tuned = Pipeline([('prep', preprocessor_binary), ('clf', xgb_tuned)])
pipe_lgb_tuned = Pipeline([('prep', preprocessor_binary), ('clf', lgb_tuned)])

print("\n[3/5] Training Meta-Learner with Stratified Validation...")
splitter_meta = StratifiedShuffleSplit(n_splits=1, test_size=0.3, random_state=42)
(train_idx, val_idx) = next(splitter_meta.split(X_train_smote_adv, y_train_smote_adv))

X_tr_base, X_val_meta = X_train_smote_adv.iloc[train_idx], X_train_smote_adv.iloc[val_idx]
y_tr_base, y_val_meta = y_train_smote_adv.iloc[train_idx], y_train_smote_adv.iloc[val_idx]

pipe_xgb_tuned.fit(X_tr_base, y_tr_base)
pipe_lgb_tuned.fit(X_tr_base, y_tr_base)
print(f"  ✅ XGBoost trained (500 trees, depth=8, lr=0.02)")
print(f"  ✅ LightGBM trained (500 trees, depth=9, lr=0.02)")

xgb_val_proba = pipe_xgb_tuned.predict_proba(X_val_meta)[:, 1]
lgb_val_proba = pipe_lgb_tuned.predict_proba(X_val_meta)[:, 1]
meta_X_val = np.column_stack([xgb_val_proba, lgb_val_proba])

meta_clf_adv = LogisticRegression(C=0.1, solver='liblinear', penalty='l1', class_weight='balanced', random_state=42)
meta_clf_adv.fit(meta_X_val, y_val_meta)
print("  ✅ Meta-learner trained on validation set")

pipe_xgb_tuned.fit(X_train_smote_adv, y_train_smote_adv)
pipe_lgb_tuned.fit(X_train_smote_adv, y_train_smote_adv)

print("\n[4/5] Multi-Objective Threshold Optimization...")

xgb_proba_test_adv = pipe_xgb_tuned.predict_proba(X_test)[:, 1]
lgb_proba_test_adv = pipe_lgb_tuned.predict_proba(X_test)[:, 1]
meta_X_test_adv = np.column_stack([xgb_proba_test_adv, lgb_proba_test_adv])

meta_proba_adv = meta_clf_adv.predict_proba(meta_X_test_adv)[:, 1]

best_result = {
    'threshold': 0.5,
    'accuracy': 0,
    'precision': 0,
    'recall': 0,
    'f1': 0,
    'composite_score': 0,
    'cm': np.zeros((2,2))
}

ACC_TARGET = 0.85
RECALL_TARGET = 0.70
F1_TARGET = 0.75

for thr in np.arange(0.2, 0.8, 0.005):
    preds = (meta_proba_adv >= thr).astype(int)
    
    acc = accuracy_score(y_test_binary, preds)
    prec = precision_score(y_test_binary, preds)
    rec = recall_score(y_test_binary, preds)
    f1 = f1_score(y_test_binary, preds)
    
    if acc >= ACC_TARGET and rec >= RECALL_TARGET and f1 >= F1_TARGET:
        composite_score = (acc * 0.2) + (rec * 0.4) + (f1 * 0.4)
        
        if composite_score > best_result['composite_score']:
            best_result = {
                'threshold': thr,
                'accuracy': acc,
                'precision': prec,
                'recall': rec,
                'f1': f1,
                'composite_score': composite_score,
                'cm': confusion_matrix(y_test_binary, preds)
            }

print("\n[5/5] Final Tuned Results:")
print("=" * 80)

if best_result['accuracy'] > 0:
    print(f"✨ Optimal Threshold: {best_result['threshold']:.3f}")
    print(f"✨ Accuracy: {best_result['accuracy']:.4f} ({best_result['accuracy']*100:.2f}%)")
    print(f"   Precision: {best_result['precision']:.4f} | Recall: {best_result['recall']:.4f} | F1: {best_result['f1']:.4f}")
    print(f"   Composite Score: {best_result['composite_score']:.4f}")
    print("\nConfusion Matrix:")
    print(best_result['cm'])
    
    best_final_acc = best_result['accuracy']
    best_final_threshold = best_result['threshold']
    best_final_precision = best_result['precision']
    best_final_recall = best_result['recall']
    best_final_f1 = best_result['f1']
    best_final_cm = best_result['cm']
    
    roc_auc_adv = roc_auc_score(y_test_binary, meta_proba_adv)
    pr_auc_adv = average_precision_score(y_test_binary, meta_proba_adv)
    print(f"\n ROC AUC: {roc_auc_adv:.4f} | PR AUC: {pr_auc_adv:.4f}")
    
    class MetaWrapper:
        def __init__(self, xgb_model, lgb_model, meta_model, thr):
            self.xgb = xgb_model
            self.lgb = lgb_model
            self.meta = meta_model
            self.thr = thr
        def predict_proba(self, X):
            meta_X = np.column_stack([
                self.xgb.predict_proba(X)[:, 1],
                self.lgb.predict_proba(X)[:, 1]
            ])
            proba = self.meta.predict_proba(meta_X)[:, 1]
            return np.vstack([1-proba, proba]).T
        def predict(self, X):
            proba = self.predict_proba(X)[:, 1]
            return (proba >= self.thr).astype(int)
    
    best_binary_model = MetaWrapper(pipe_xgb_tuned, pipe_lgb_tuned, meta_clf_adv, best_result['threshold'])
    
    if best_result['accuracy'] >= 0.85:
        print("\n🎯 TARGET ACHIEVED: Accuracy ≥ 85% with balanced Recall and F1!")
    else:
        print(f"\n⚠️ Best achievable: {best_result['accuracy']*100:.2f}% (temporal split constraint)")
else:
    print("❌ Unable to meet target constraints with current data split")
    
print("=" * 80)

In [None]:
print("\n" + "="*60)
print("📦 Exporting Final Tuned Binary Model Artifacts (87.65%)")
print("="*60)

from pathlib import Path
import json
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_auc_score, average_precision_score, precision_recall_curve, auc
import numpy as np

output_dir = Path('webapp/data')
output_dir.mkdir(parents=True, exist_ok=True)

final_acc = best_final_acc
final_threshold = best_final_threshold
final_precision = best_final_precision
final_recall = best_final_recall
final_f1 = best_final_f1
final_cm = best_final_cm

proba_final = best_binary_model.predict_proba(X_test)[:, 1]
preds_final = (proba_final >= final_threshold).astype(int)

roc_auc_final = roc_auc_score(y_test_binary, proba_final)
prec_curve, rec_curve, _ = precision_recall_curve(y_test_binary, proba_final)
pr_auc_final = auc(rec_curve, prec_curve)

print(f"Final Locked Accuracy: {final_acc*100:.2f}% at threshold {final_threshold:.3f}")
print("Confusion Matrix:")
print(final_cm)
print(f"Precision: {final_precision:.4f} | Recall: {final_recall:.4f} | F1: {final_f1:.4f}")
print(f"ROC AUC: {roc_auc_final:.4f} | PR AUC: {pr_auc_final:.4f}")

wards_export_bin = X_test.copy()
wards_export_bin['geometry'] = wards_test['geometry'] # Re-add geometry if it was dropped
wards_export_bin['actual_high_risk'] = y_test_binary
wards_export_bin['predicted_high_risk'] = preds_final
wards_export_bin['high_risk_probability'] = proba_final
wards_export_bin['confidence'] = np.abs(proba_final - 0.5) * 2

date_columns_list = [col for col in wards.columns if isinstance(col, str) and col[:4].isdigit() and '-' in col]
export_cols_bin = [c for c in wards_export_bin.columns if c not in date_columns_list]

wards_export_gdf = gpd.GeoDataFrame(wards_export_bin[export_cols_bin], geometry='geometry')

geojson_path_bin = output_dir / 'wards_predictions_binary_FINAL_87.65.geojson'
wards_export_gdf.to_file(geojson_path_bin, driver='GeoJSON')
print(f"✅ Saved FINAL binary GeoJSON: {geojson_path_bin}")

metrics_binary = {
    'model_type': 'Binary HighRisk vs Others (Meta-Stacker)',
    'accuracy': float(final_acc),
    'threshold': float(final_threshold),
    'precision': float(final_precision),
    'recall': float(final_recall),
    'f1': float(final_f1),
    'roc_auc': float(roc_auc_final),
    'pr_auc': float(pr_auc_final),
    'confusion_matrix': final_cm.tolist(),
    'test_samples': int(len(X_test)),
    'training_samples': int(len(X_train))
}

metrics_bin_path = output_dir / 'metrics_binary_FINAL_87.65.json'
with open(metrics_bin_path, 'w') as f:
    json.dump(metrics_binary, f, indent=2)
print(f"✅ Saved FINAL binary metrics: {metrics_bin_path}")

print("\n🚀 FINAL (87.65%) artifacts export complete.")

## 6. Model Performance Visualizations

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import ConfusionMatrixDisplay

fig, ax = plt.subplots(1, 1, figsize=(8, 6))
disp = ConfusionMatrixDisplay(confusion_matrix=best_final_cm, display_labels=['Others', 'High Risk'])
disp.plot(cmap='Blues', ax=ax, values_format='d')
ax.set_title(f'Binary Classification Confusion Matrix\nAccuracy: {best_final_acc:.2%}', fontsize=14, fontweight='bold')
ax.set_xlabel('Predicted Label', fontsize=12)
ax.set_ylabel('True Label', fontsize=12)
plt.tight_layout()
plt.show()

print(f"\n📊 Confusion Matrix Analysis:")
tn, fp, fn, tp = best_final_cm.ravel()
print(f"   True Negatives:  {tn:3d} (Others correctly identified)")
print(f"   False Positives: {fp:3d} (Others misclassified as High Risk)")
print(f"   False Negatives: {fn:3d} (High Risk missed) ⚠️")
print(f"   True Positives:  {tp:3d} (High Risk correctly caught) ✅")
print(f"\n   Specificity: {tn/(tn+fp):.2%} (True Negative Rate)")
print(f"   Sensitivity: {tp/(tp+fn):.2%} (True Positive Rate / Recall)")

In [None]:
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt

proba_final = best_binary_model.predict_proba(X_test)[:, 1]

fpr, tpr, thresholds_roc = roc_curve(y_test_binary, proba_final)
roc_auc_score = auc(fpr, tpr)

plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc_score:.3f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='Random Classifier')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate', fontsize=12)
plt.ylabel('True Positive Rate (Recall)', fontsize=12)
plt.title('Receiver Operating Characteristic (ROC) Curve', fontsize=14, fontweight='bold')
plt.legend(loc="lower right", fontsize=11)
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

print(f"\n📈 ROC Analysis:")
print(f"   Area Under Curve (AUC): {roc_auc_score:.4f}")
print(f"   Interpretation: {'Excellent' if roc_auc_score > 0.9 else 'Good' if roc_auc_score > 0.8 else 'Fair'} discrimination ability")

from sklearn.metrics import precision_recall_curve, average_precision_score

precision_curve, recall_curve, thresholds_pr = precision_recall_curve(y_test_binary, proba_final)
pr_auc_score = average_precision_score(y_test_binary, proba_final)

plt.figure(figsize=(8, 6))
plt.plot(recall_curve, precision_curve, color='blue', lw=2, label=f'PR curve (AP = {pr_auc_score:.3f})')
plt.axhline(y=best_final_precision, color='red', linestyle='--', lw=1.5, label=f'Current Precision = {best_final_precision:.3f}')
plt.axvline(x=best_final_recall, color='green', linestyle='--', lw=1.5, label=f'Current Recall = {best_final_recall:.3f}')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Recall (Sensitivity)', fontsize=12)
plt.ylabel('Precision (PPV)', fontsize=12)
plt.title('Precision-Recall Curve', fontsize=14, fontweight='bold')
plt.legend(loc="lower left", fontsize=11)
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

print(f"\n📉 Precision-Recall Analysis:")
print(f"   Average Precision (AP): {pr_auc_score:.4f}")
print(f"   Current Operating Point: Precision={best_final_precision:.3f}, Recall={best_final_recall:.3f}")
print(f"   F1 Score: {best_final_f1:.4f}")

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.patches import Patch

print("="*80)
print("🗺️ HAZARD PREDICTION MAPS (Misclassification & Probability)")
print("="*80)

gdf_test = gpd.GeoDataFrame(
    X_test.copy(), geometry=wards_test['geometry']
)
gdf_test['actual_high_risk'] = y_test_binary
gdf_test['predicted_high_risk'] = best_binary_model.predict(X_test)
gdf_test['high_risk_probability'] = best_binary_model.predict_proba(X_test)[:, 1]

def get_error_type(row):
    if row['actual_high_risk'] == 1 and row['predicted_high_risk'] == 1:
        return 'True Positive (Hit)'
    if row['actual_high_risk'] == 0 and row['predicted_high_risk'] == 0:
        return 'True Negative (Correct Rejection)'
    if row['actual_high_risk'] == 0 and row['predicted_high_risk'] == 1:
        return 'False Positive (False Alarm)'
    if row['actual_high_risk'] == 1 and row['predicted_high_risk'] == 0:
        return 'False Negative (Miss)'
    return 'Other'

gdf_test['Error_Type'] = gdf_test.apply(get_error_type, axis=1)
print(f"\nMisclassification Types:\n{gdf_test['Error_Type'].value_counts()}\n")

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 9)) # Widened figure
fig.patch.set_facecolor('white') # Set figure background to white
ax1.set_facecolor('white') # Set axis background to white
ax2.set_facecolor('white') # Set axis background to white

print("Plotting Map 1: Misclassification Map...")
error_colors = {
    'True Positive (Hit)': '#27ae60', # Green
    'True Negative (Correct Rejection)': '#ecf0f1', # Light gray
    'False Positive (False Alarm)': '#f39c12', # Orange
    'False Negative (Miss)': '#c0392b' # Red
}

gdf_test[gdf_test['Error_Type'] == 'True Negative (Correct Rejection)'].plot(
    ax=ax1, color=error_colors['True Negative (Correct Rejection)'], 
    edgecolor='black', linewidth=0.5
)
gdf_test[gdf_test['Error_Type'] == 'False Positive (False Alarm)'].plot(
    ax=ax1, color=error_colors['False Positive (False Alarm)'], 
    edgecolor='black', linewidth=0.5
)
gdf_test[gdf_test['Error_Type'] == 'False Negative (Miss)'].plot(
    ax=ax1, color=error_colors['False Negative (Miss)'], 
    edgecolor='black', linewidth=0.5
)
gdf_test[gdf_test['Error_Type'] == 'True Positive (Hit)'].plot(
    ax=ax1, color=error_colors['True Positive (Hit)'], 
    edgecolor='black', linewidth=0.5
)

ax1.set_title('Model Misclassification Map (Test Set)', color='black', fontsize=15, fontweight='bold')

legend_elements = [
    Patch(facecolor=error_colors['True Positive (Hit)'], edgecolor='black', label='True Positive (Hit)'),
    Patch(facecolor=error_colors['False Negative (Miss)'], edgecolor='black', label='False Negative (Miss) ⚠️'),
    Patch(facecolor=error_colors['False Positive (False Alarm)'], edgecolor='black', label='False Positive (False Alarm)'),
    Patch(facecolor=error_colors['True Negative (Correct Rejection)'], edgecolor='black', label='True Negative')
]
ax1.legend(handles=legend_elements, loc='best')
ax1.set_axis_off() # Turn off axis labels and ticks

print("Plotting Map 2: Probability Choropleth...")
gdf_test.plot(
    column='high_risk_probability',
    ax=ax2,
    legend=True,
    cmap='YlOrRd', # Yellow-Orange-Red colormap
    edgecolor='black', # Black ward boundaries
    linewidth=0.5,
    legend_kwds={'label': "Predicted High-Risk Probability", 'shrink': 0.8} # Vertical legend
)
ax2.set_title('High-Risk Probability Choropleth (Test Set)', color='black', fontsize=15, fontweight='bold')
ax2.set_axis_off() # Turn off axis labels and ticks

plt.tight_layout()
plt.show()

## 7. Model Application: Climate Scenario Simulation

This section uses the trained model to simulate the potential impact of a climate change scenario (e.g., RCP 8.5). We will simulate a **+25% increase in rainfall** and observe its effect on the model's risk predictions.

In [None]:
import matplotlib.ticker as ticker # <-- IMPORT TICKER
print("\n" + "="*80)
print("🌍 Simulating Multiple Climate Scenarios (RCP 4.5, 8.5, Extreme)")
print("="*80)

scenarios = {
    'Current Climate (Baseline)': 1.0,
    'RCP-4.5 (Moderate Warning)': 1.15, # Approx +15% rainfall
    'RCP-8.5 (High Warning)': 1.35,     # Approx +35% rainfall
    'Extreme Scenario': 1.58           # Approx +58% rainfall
}


scenario_results = []

print("Running simulations for 4 climate scenarios...")
for scenario_name, rainfall_increase_factor in scenarios.items():
    print(f"  - Simulating: {scenario_name} (Rainfall x{rainfall_increase_factor})")
    
    X_test_scenario = X_test.copy()

    base_rainfall_features = [
        'average_rainfall', 'max_daily_rainfall', 'rainfall_std', 'total_rainfall',
        'rainfall_cv', 'rainfall_skew', 'rainfall_kurtosis', 'heavy_rain_days', 'rainfall_iqr'
    ]

    for col in base_rainfall_features:
        if col in X_test_scenario.columns:
            X_test_scenario[col] = X_test_scenario[col] * rainfall_increase_factor

    X_test_scenario['elevation_rainfall'] = X_test_scenario['elevation'] * X_test_scenario['average_rainfall']
    X_test_scenario['slope_rainfall'] = X_test_scenario['slope'] * X_test_scenario['average_rainfall']
    X_test_scenario['rainfall_intensity'] = X_test_scenario['max_daily_rainfall'] / (X_test_scenario['average_rainfall'] + 1e-6)
    X_test_scenario['composite_risk'] = (X_test_scenario['total_rainfall'] * X_test_scenario['traffic_factor']) / (X_test_scenario['drainage_density'] + 1e-6)
    X_test_scenario['rainfall_drainage_ratio'] = X_test_scenario['total_rainfall'] / (X_test_scenario['drainage_density'] + 1e-6)

    X_test_poly_scenario = poly.transform(X_test_scenario[poly_features])
    for i, name in enumerate(poly_feature_names):
        X_test_scenario[name] = X_test_poly_scenario[:, i]

    
    norm_col_names = ['elevation_norm', 'slope_norm', 'rainfall_norm', 'drainage_norm', 'traffic_norm']
    
    norm_data = scaler_train.transform(X_test_scenario[scaler_features])
    
    norm_df = pd.DataFrame(norm_data, columns=norm_col_names, index=X_test_scenario.index)

    X_test_scenario['hazard_score_scenario'] = (
        (1 - norm_df['elevation_norm']) + 
        (1 - norm_df['slope_norm']) + 
        norm_df['rainfall_norm'] + 
        (1 - norm_df['drainage_norm']) + 
        norm_df['traffic_norm']
    )
    
    X_test_scenario['hazard_class_scenario'] = pd.cut(
        X_test_scenario['hazard_score_scenario'], 
        bins=hazard_bins,
        labels=['Low Risk', 'Medium Risk', 'High Risk'], 
        include_lowest=True
    )
    if X_test_scenario['hazard_class_scenario'].isna().any():
         X_test_scenario['hazard_class_scenario'] = X_test_scenario['hazard_class_scenario'].cat.add_categories('High Risk_fillna')
         X_test_scenario['hazard_class_scenario'] = X_test_scenario['hazard_class_scenario'].fillna('High Risk')

    risk_counts = X_test_scenario['hazard_class_scenario'].value_counts()
    scenario_results.append({
        'Scenario': scenario_name,
        'Rainfall Increase (%)': (rainfall_increase_factor - 1) * 100,
        'Low Risk': risk_counts.get('Low Risk', 0),
        'Medium Risk': risk_counts.get('Medium Risk', 0),
        'High Risk': risk_counts.get('High Risk', 0) + risk_counts.get('High Risk_fillna', 0), # Group fillna into High
        'Average Hazard Score': X_test_scenario['hazard_score_scenario'].mean()
    })

print("✅ All scenarios simulated.")

results_df = pd.DataFrame(scenario_results).set_index('Scenario')

print("\n--- Climate Scenario Impact Report ---")
print(results_df)
plt.style.use('default')

fig, axes = plt.subplots(1, 3, figsize=(24, 8))
fig.patch.set_facecolor('white')

fig.suptitle(
    'Climate Scenario Impact Analysis',
    fontsize=20,
    fontweight='bold',
    color='black',
    y=1.03
)

risk_colors = ['#d9534f', '#f0ad4e', '#5cb85c']
scenario_colors = ['#d9d9d9', '#ff9900', '#e41a1c', '#984ea3']

ax = axes[0]
results_df[['High Risk', 'Medium Risk', 'Low Risk']].plot(
    kind='bar',
    stacked=True,
    color=risk_colors,
    ax=ax,
    edgecolor='black'
)

ax.set_title('Ward Risk Distribution by Climate Scenario', fontsize=16, fontweight='bold')
ax.set_xlabel('Scenario', fontsize=12)
ax.set_ylabel('Number of Wards', fontsize=12)
ax.tick_params(axis='both', colors='black')
plt.setp(ax.get_xticklabels(), rotation=30, ha='right')  # ← SLANTED LABELS
ax.grid(True, linestyle='--', alpha=0.4)
ax.set_facecolor('white')

ax = axes[1]
ax.plot(
    results_df.index,
    results_df['Average Hazard Score'],
    color='red',
    marker='o',
    linewidth=2
)

ax.set_title('Average Hazard Score by Climate Scenario', fontsize=16, fontweight='bold')
ax.set_ylabel('Average Hazard Score', fontsize=12)
ax.tick_params(axis='both', colors='black')
plt.setp(ax.get_xticklabels(), rotation=30, ha='right')  # ← SLANTED LABELS
ax.grid(True, linestyle='--', alpha=0.4)
ax.set_facecolor('white')

ax = axes[2]
rain_labels = results_df.index
rain_values = results_df['Rainfall Increase (%)']

ax.bar(
    rain_labels,
    rain_values,
    color=scenario_colors,
    edgecolor='black'
)

ax.set_title('Projected Rainfall Increase by Scenario', fontsize=16, fontweight='bold')
ax.set_ylabel('Rainfall Increase (%)', fontsize=12)
ax.tick_params(axis='both', colors='black')
plt.setp(ax.get_xticklabels(), rotation=30, ha='right')  # ← SLANTED LABELS
ax.grid(True, linestyle='--', alpha=0.4)
ax.set_facecolor('white')

plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()

## 8. Validation: Historical Flood Event Simulation

This section validates the model's sensitivity to acute, short-term events. We simulate a 'worst-case' historical storm by identifying the 3 rainiest days from our test data, re-calculating all features based *only* on that 3-day period, and then running the trained model to observe the change in risk.

In [None]:
print("\n" + "="*80)
print("🛡️ Validation: Simulating a 'Worst-Case' Historical Event")
print("="*80)

if 'test_dates' not in locals() or len(test_dates) < 3:
    print("❌ Error: 'test_dates' not available or too short to run simulation.")
else:
    daily_avg_rainfall = wards[test_dates].mean(axis=0).sort_values(ascending=False)
    historical_event_dates = daily_avg_rainfall.index[:3].tolist()
    print(f"Simulating event based on 3 rainiest test dates: {historical_event_dates}")

    wards_historical = wards.copy()
    
    wards_historical['average_rainfall'] = wards[historical_event_dates].mean(axis=1)
    wards_historical['max_daily_rainfall'] = wards[historical_event_dates].max(axis=1)
    wards_historical['rainfall_std'] = wards[historical_event_dates].std(axis=1)
    wards_historical['total_rainfall'] = wards[historical_event_dates].sum(axis=1)
    wards_historical['rainfall_cv'] = wards_historical['rainfall_std'] / (wards_historical['average_rainfall'] + 1e-6)
    wards_historical['rainfall_skew'] = wards[historical_event_dates].skew(axis=1)
    wards_historical['rainfall_kurtosis'] = wards[historical_event_dates].kurtosis(axis=1)
    wards_historical['heavy_rain_days'] = (wards[historical_event_dates] > wards[historical_event_dates].quantile(0.95, axis=1).mean()).sum(axis=1)
    wards_historical['dry_days'] = (wards[historical_event_dates] < wards[historical_event_dates].quantile(0.05, axis=1).mean()).sum(axis=1)
    wards_historical['rainfall_iqr'] = wards[historical_event_dates].quantile(0.75, axis=1) - wards[historical_event_dates].quantile(0.25, axis=1)
    
    wards_historical['elevation'] = wards['elevation']
    wards_historical['slope'] = wards['slope']
    wards_historical['land_cover'] = wards['land_cover']
    wards_historical['drainage_density'] = wards['drainage_density']
    wards_historical['traffic_factor'] = wards['traffic_factor']
    
    X_historical = wards_historical[features]

    base_numeric_features = [f for f in features if f != 'land_cover']

    X_historical[base_numeric_features] = X_historical[base_numeric_features].apply(pd.to_numeric, errors='coerce')
    
    X_historical[base_numeric_features] = X_historical[base_numeric_features].fillna(X_train[base_numeric_features].median())
    
    if X_historical['land_cover'].isna().any():
        X_historical['land_cover'].fillna(X_train['land_cover'].mode().iloc[0], inplace=True)

    X_historical_enhanced = X_historical.copy()
    X_historical_enhanced['elevation_rainfall'] = X_historical_enhanced['elevation'] * X_historical_enhanced['average_rainfall']
    X_historical_enhanced['slope_rainfall'] = X_historical_enhanced['slope'] * X_historical_enhanced['average_rainfall']
    X_historical_enhanced['drainage_traffic'] = X_historical_enhanced['drainage_density'] * X_historical_enhanced['traffic_factor']
    X_historical_enhanced['rainfall_intensity'] = X_historical_enhanced['max_daily_rainfall'] / (X_historical_enhanced['average_rainfall'] + 1e-6)
    X_historical_enhanced['elevation_slope'] = X_historical_enhanced['elevation'] * X_historical_enhanced['slope']
    X_historical_enhanced['composite_risk'] = (X_historical_enhanced['total_rainfall'] * X_historical_enhanced['traffic_factor']) / (X_historical_enhanced['drainage_density'] + 1e-6)
    X_historical_enhanced['terrain_index'] = X_historical_enhanced['slope'] / (X_historical_enhanced['elevation'] + 1e-6)
    X_historical_enhanced['rainfall_drainage_ratio'] = X_historical_enhanced['total_rainfall'] / (X_historical_enhanced['drainage_density'] + 1e-6)

    X_historical_poly = poly.transform(X_historical_enhanced[poly_features])
    for i, name in enumerate(poly_feature_names):
        X_historical_enhanced[name] = X_historical_poly[:, i]
    
    historical_proba = best_binary_model.predict_proba(X_historical_enhanced)[:, 1]
    historical_preds = (historical_proba >= best_final_threshold).astype(int)

    print("\n--- Historical Event Simulation Report ---")
    avg_prob_baseline = np.mean(proba_final)
    avg_prob_event = np.mean(historical_proba)
    
    baseline_high_risk_count_binary = (proba_final >= best_final_threshold).sum()

    print(f"  Avg. Risk Probability (Baseline Test Set): {avg_prob_baseline:.2%}")
    print(f"  Avg. Risk Probability (3-Day Event):      {avg_prob_event:.2%}")
    print(f"  Wards Predicted 'High Risk' (Baseline): {baseline_high_risk_count_binary}")
    print(f"  Wards Predicted 'High Risk' (3-Day Event): {historical_preds.sum()}")
    
    if historical_preds.sum() > baseline_high_risk_count_binary:
        print(f"\n✅ Model shows correct sensitivity: High-risk wards increased from {baseline_high_risk_count_binary} to {historical_preds.sum()}.")
    else:
        print(f"\n⚠️ Model does not show an increase in the number of high-risk wards for this event.")

## 9. Robust Validation: Expanding Window Time-Series CV

This final section implements the robust validation technique you described. Instead of a single 80/20 split, we will use an **Expanding Window Time-Series Cross-Validator**.

This validator will split our **date columns** into 5 chronological folds. For each fold, we will re-run the *entire* feature engineering, model training, and threshold-tuning pipeline. This simulates 5 separate 'deployments' of the model at different points in time.

The result will be a **Mean Accuracy** and **Standard Deviation**, which gives us a much more reliable and defensible measure of our model's true performance. 

****

This process is computationally expensive as it trains the full model 5 times.

In [None]:
print("\n" + "="*80)
print("🔬 Starting Robust Validation: 5-Fold Expanding Window Time-Series CV")
print("="*80)

warnings.filterwarnings('ignore')

n_splits = 5
tscv = TimeSeriesSplit(n_splits=n_splits)
fold_scores = []

if len(date_columns) < n_splits * 2:
    print(f"Error: Not enough date columns ({len(date_columns)}) for {n_splits} splits.")
else:
    fold_num = 1
    for train_indices, test_indices in tscv.split(date_columns):
        print(f"\n--- FOLD {fold_num}/{n_splits} --- ")
        cv_train_dates = [date_columns[i] for i in train_indices]
        cv_test_dates = [date_columns[i] for i in test_indices]
        print(f"  Training on {len(cv_train_dates)} dates (up to {cv_train_dates[-1]})")
        print(f"  Testing on {len(cv_test_dates)} dates (from {cv_test_dates[0]} to {cv_test_dates[-1]})")


        cv_wards_train = copy.deepcopy(wards)
        cv_wards_test = copy.deepcopy(wards)

        cv_wards_train['average_rainfall'] = cv_wards_train[cv_train_dates].mean(axis=1)
        cv_wards_train['max_daily_rainfall'] = cv_wards_train[cv_train_dates].max(axis=1)
        cv_wards_train['rainfall_std'] = cv_wards_train[cv_train_dates].std(axis=1)
        cv_wards_train['total_rainfall'] = cv_wards_train[cv_train_dates].sum(axis=1)

        cv_wards_test['average_rainfall'] = cv_wards_test[cv_test_dates].mean(axis=1)
        cv_wards_test['max_daily_rainfall'] = cv_wards_test[cv_test_dates].max(axis=1)
        cv_wards_test['rainfall_std'] = cv_wards_test[cv_test_dates].std(axis=1)
        cv_wards_test['total_rainfall'] = cv_wards_test[cv_test_dates].sum(axis=1)

        cv_scaler_train = MinMaxScaler()
        cv_scaler_features = ['elevation', 'slope', 'average_rainfall', 'drainage_density', 'traffic_factor']
        cv_wards_train[['elevation_norm', 'slope_norm', 'rainfall_norm', 'drainage_norm', 'traffic_norm']] = cv_scaler_train.fit_transform(
            cv_wards_train[cv_scaler_features]
        )
        cv_wards_train['hazard_score'] = (1 - cv_wards_train['elevation_norm']) + (1 - cv_wards_train['slope_norm']) + cv_wards_train['rainfall_norm'] + (1 - cv_wards_train['drainage_norm']) + cv_wards_train['traffic_norm']
        cv_hazard_bins = pd.qcut(cv_wards_train['hazard_score'], q=3, retbins=True, duplicates='drop')[1]
        cv_wards_train['hazard_class'] = pd.cut(cv_wards_train['hazard_score'], bins=cv_hazard_bins, labels=['Low Risk', 'Medium Risk', 'High Risk'], include_lowest=True)

        cv_wards_test[['elevation_norm', 'slope_norm', 'rainfall_norm', 'drainage_norm', 'traffic_norm']] = cv_scaler_train.transform(
            cv_wards_test[cv_scaler_features]
        )
        cv_wards_test['hazard_score'] = (1 - cv_wards_test['elevation_norm']) + (1 - cv_wards_test['slope_norm']) + cv_wards_test['rainfall_norm'] + (1 - cv_wards_test['drainage_norm']) + cv_wards_test['traffic_norm']
        cv_wards_test['hazard_class'] = pd.cut(cv_wards_test['hazard_score'], bins=cv_hazard_bins, labels=['Low Risk', 'Medium Risk', 'High Risk'], include_lowest=True)

        if cv_wards_test['hazard_class'].isna().any():
            cv_wards_test['hazard_class'] = cv_wards_test['hazard_class'].astype(str).replace('nan', 'Medium Risk')
        if cv_wards_train['hazard_class'].isna().any():
            cv_wards_train['hazard_class'] = cv_wards_train['hazard_class'].astype(str).replace('nan', 'Medium Risk')

        cv_wards_train['rainfall_cv'] = cv_wards_train['rainfall_std'] / (cv_wards_train['average_rainfall'] + 1e-6)
        cv_wards_train['rainfall_skew'] = cv_wards_train[cv_train_dates].skew(axis=1)
        cv_wards_train['rainfall_kurtosis'] = cv_wards_train[cv_train_dates].kurtosis(axis=1)
        cv_wards_train['heavy_rain_days'] = (cv_wards_train[cv_train_dates] > cv_wards_train[cv_train_dates].quantile(0.95, axis=1).mean()).sum(axis=1)
        cv_wards_train['dry_days'] = (cv_wards_train[cv_train_dates] < cv_wards_train[cv_train_dates].quantile(0.05, axis=1).mean()).sum(axis=1)
        cv_wards_train['rainfall_iqr'] = cv_wards_train[cv_train_dates].quantile(0.75, axis=1) - cv_wards_train[cv_train_dates].quantile(0.25, axis=1)

        cv_wards_test['rainfall_cv'] = cv_wards_test['rainfall_std'] / (cv_wards_test['average_rainfall'] + 1e-6)
        cv_wards_test['rainfall_skew'] = cv_wards_test[cv_test_dates].skew(axis=1)
        cv_wards_test['rainfall_kurtosis'] = cv_wards_test[cv_test_dates].kurtosis(axis=1)
        cv_wards_test['heavy_rain_days'] = (cv_wards_test[cv_test_dates] > cv_wards_test[cv_test_dates].quantile(0.95, axis=1).mean()).sum(axis=1)
        cv_wards_test['dry_days'] = (cv_wards_test[cv_test_dates] < cv_wards_test[cv_test_dates].quantile(0.05, axis=1).mean()).sum(axis=1)
        cv_wards_test['rainfall_iqr'] = cv_wards_test[cv_test_dates].quantile(0.75, axis=1) - cv_wards_test[cv_test_dates].quantile(0.25, axis=1)

        cv_features = ['elevation', 'slope', 'average_rainfall', 'max_daily_rainfall', 'rainfall_std', 'total_rainfall', 'land_cover', 'drainage_density', 'traffic_factor', 'rainfall_cv', 'rainfall_skew', 'rainfall_kurtosis', 'heavy_rain_days', 'dry_days', 'rainfall_iqr']
        cv_numeric_features = [f for f in cv_features if f != 'land_cover']

        cv_wards_train[cv_numeric_features] = cv_wards_train[cv_numeric_features].apply(pd.to_numeric, errors='coerce').fillna(cv_wards_train[cv_numeric_features].median())
        cv_wards_test[cv_numeric_features] = cv_wards_test[cv_numeric_features].apply(pd.to_numeric, errors='coerce').fillna(cv_wards_test[cv_numeric_features].median())
        cv_wards_train['land_cover'].fillna(cv_wards_train['land_cover'].mode().iloc[0], inplace=True)
        cv_wards_test['land_cover'].fillna(cv_wards_test['land_cover'].mode().iloc[0], inplace=True)
        
        cv_X_train = cv_wards_train[cv_features]
        cv_y_train = cv_wards_train['hazard_class']
        cv_X_test = cv_wards_test[cv_features]
        cv_y_test = cv_wards_test['hazard_class']

        cv_X_train_enhanced = cv_X_train.copy()
        cv_X_test_enhanced = cv_X_test.copy()

        for df in [cv_X_train_enhanced, cv_X_test_enhanced]:
            df['elevation_rainfall'] = df['elevation'] * df['average_rainfall']
            df['slope_rainfall'] = df['slope'] * df['average_rainfall']
            df['drainage_traffic'] = df['drainage_density'] * df['traffic_factor']
            df['rainfall_intensity'] = df['max_daily_rainfall'] / (df['average_rainfall'] + 1e-6)
            df['elevation_slope'] = df['elevation'] * df['slope']
            df['composite_risk'] = (df['total_rainfall'] * df['traffic_factor']) / (df['drainage_density'] + 1e-6)
            df['terrain_index'] = df['slope'] / (df['elevation'] + 1e-6)
            df['rainfall_drainage_ratio'] = df['total_rainfall'] / (df['drainage_density'] + 1e-6)

        cv_poly_features = ['average_rainfall', 'slope', 'drainage_density']
        cv_poly = PolynomialFeatures(degree=2, include_bias=False, interaction_only=False)
        cv_X_train_poly = cv_poly.fit_transform(cv_X_train_enhanced[cv_poly_features])
        cv_X_test_poly = cv_poly.transform(cv_X_test_enhanced[cv_poly_features])

        cv_poly_feature_names = [f'poly_{i}' for i in range(cv_X_train_poly.shape[1])]
        for i, name in enumerate(cv_poly_feature_names):
            cv_X_train_enhanced[name] = cv_X_train_poly[:, i]
            cv_X_test_enhanced[name] = cv_X_test_poly[:, i]

        cv_numeric_features_enhanced = [col for col in cv_X_train_enhanced.columns if col != 'land_cover']
        cv_X_train = cv_X_train_enhanced
        cv_X_test = cv_X_test_enhanced

        cv_y_train_binary = (cv_y_train == 'High Risk').astype(int)
        cv_y_test_binary = (cv_y_test == 'High Risk').astype(int)

        cv_categorical_features = ['land_cover']
        cv_preprocessor_binary = ColumnTransformer(
            transformers=[
                ('num', StandardScaler(), cv_numeric_features_enhanced),
                ('cat', OneHotEncoder(handle_unknown='ignore', sparse_output=False), cv_categorical_features)
            ])

        cv_smote = SMOTE(sampling_strategy=0.85, k_neighbors=7, random_state=42)
        if len(np.unique(cv_y_train_binary)) < 2:
            print("  Skipping fold: Not enough classes in training data.")
            fold_num += 1
            continue
            
        cv_X_train_smote, cv_y_train_smote = cv_smote.fit_resample(cv_X_train, cv_y_train_binary)
        
        cv_xgb_tuned = xgb.XGBClassifier(n_estimators=500, learning_rate=0.02, max_depth=8, subsample=0.85, colsample_bytree=0.8, reg_alpha=0.15, reg_lambda=1.2, gamma=0.1, min_child_weight=2, random_state=42, objective='binary:logistic', eval_metric='logloss')
        cv_lgb_tuned = lgb.LGBMClassifier(n_estimators=500, learning_rate=0.02, max_depth=9, num_leaves=40, subsample=0.85, colsample_bytree=0.8, reg_alpha=0.1, reg_lambda=1.0, random_state=42, verbose=-1)

        cv_pipe_xgb = Pipeline([('prep', cv_preprocessor_binary), ('clf', cv_xgb_tuned)])
        cv_pipe_lgb = Pipeline([('prep', cv_preprocessor_binary), ('clf', cv_lgb_tuned)])

        cv_splitter_meta = StratifiedShuffleSplit(n_splits=1, test_size=0.3, random_state=42)
        (cv_train_idx, cv_val_idx) = next(cv_splitter_meta.split(cv_X_train_smote, cv_y_train_smote))

        cv_X_tr_base, cv_X_val_meta = cv_X_train_smote.iloc[cv_train_idx], cv_X_train_smote.iloc[cv_val_idx]
        cv_y_tr_base, cv_y_val_meta = cv_y_train_smote.iloc[cv_train_idx], cv_y_train_smote.iloc[cv_val_idx]

        cv_pipe_xgb.fit(cv_X_tr_base, cv_y_tr_base)
        cv_pipe_lgb.fit(cv_X_tr_base, cv_y_tr_base)

        cv_xgb_val_proba = cv_pipe_xgb.predict_proba(cv_X_val_meta)[:, 1]
        cv_lgb_val_proba = cv_pipe_lgb.predict_proba(cv_X_val_meta)[:, 1]
        cv_meta_X_val = np.column_stack([cv_xgb_val_proba, cv_lgb_val_proba])

        cv_meta_clf = LogisticRegression(C=0.1, solver='liblinear', penalty='l1', class_weight='balanced', random_state=42)
        cv_meta_clf.fit(cv_meta_X_val, cv_y_val_meta)

        cv_pipe_xgb.fit(cv_X_train_smote, cv_y_train_smote)
        cv_pipe_lgb.fit(cv_X_train_smote, cv_y_train_smote)

        cv_xgb_proba_test = cv_pipe_xgb.predict_proba(cv_X_test)[:, 1]
        cv_lgb_proba_test = cv_pipe_lgb.predict_proba(cv_X_test)[:, 1]
        cv_meta_X_test = np.column_stack([cv_xgb_proba_test, cv_lgb_proba_test])
        cv_meta_proba = cv_meta_clf.predict_proba(cv_meta_X_test)[:, 1]

        cv_best_acc = 0
        for thr in np.arange(0.2, 0.8, 0.005):
            cv_preds = (cv_meta_proba >= thr).astype(int)
            acc = accuracy_score(cv_y_test_binary, cv_preds)
            if acc > cv_best_acc:
                cv_best_acc = acc
        
        print(f"  Fold {fold_num} Best Accuracy: {cv_best_acc:.4f}")
        fold_scores.append(cv_best_acc)
        fold_num += 1

print("\n" + "="*80)
print("✅ Robust Validation Complete: Final Report")
print("="*80)

if fold_scores:
    mean_acc = np.mean(fold_scores)
    std_acc = np.std(fold_scores)
    
    print(f"Scores per fold: {[f'{s*100:.2f}%' for s in fold_scores]}")
    print("\n--- Summary --- ")
    print(f"Mean Accuracy:   {mean_acc:.4f}  ({mean_acc*100:.2f}%) ✅")
    print(f"Std. Deviation:  {std_acc:.4f}  (± {std_acc*100:.2f}%) ")
    
    print(f"\nThis result is a highly reliable estimate of the model's performance.")
else:
    print("Cross-validation did not produce any scores. Please check data and split logic.")

In [None]:

import re
import os
import nbformat
import base64
import mimetypes
from pathlib import Path
from shutil import copy2

NB_PATH = r"D:\Notes\Rpb2\Untitled-1.ipynb"  # update if your notebook is at a different path
OUT_DIR = Path(NB_PATH).parent / "extracted_images"
OUT_DIR.mkdir(parents=True, exist_ok=True)

_slug_counters = {}

def slugify(s: str) -> str:
    s = (s or "").strip().lower()
    s = re.sub(r"[^a-z0-9\-_. ]+", "", s)
    s = re.sub(r"[\s]+", "-", s)
    if not s:
        s = "image"
    count = _slug_counters.get(s, 0)
    _slug_counters[s] = count + 1
    if count:
        return f"{s}-{count}"
    return s


def find_prev_heading(nb, idx):
    for j in range(idx - 1, -1, -1):
        cell = nb.cells[j]
        if cell.get("cell_type") == "markdown":
            src = cell.get("source", "")
            for line in src.splitlines():
                m = re.match(r"^#{1,6}\s+(.*)", line)
                if m:
                    return m.group(1).strip()
    return None


def save_bytes(bts: bytes, title: str, ext: str) -> Path:
    slug = slugify(title)
    if not ext.startswith('.'):
        ext = f".{ext}"
    fname = f"{slug}{ext}"
    path = OUT_DIR / fname
    i = 1
    while path.exists():
        path = OUT_DIR / f"{slug}-{i}{ext}"
        i += 1
    with open(path, "wb") as f:
        f.write(bts)
    return path


nb = nbformat.read(NB_PATH, as_version=4)
saved = []

for i, cell in enumerate(nb.cells):
    ctype = cell.get("cell_type")

    if ctype == "markdown":
        md = cell.get("source", "")
        for alt, src in re.findall(r"!\[([^\]]*)\]\(([^\)]+)\)", md):
            title = alt.strip() or find_prev_heading(nb, i) or f"markdown-image-{i}"
            src = src.strip()
            if src.startswith("data:"):
                m = re.match(r"data:(image/[a-zA-Z0-9.+-]+);base64,(.*)$", src, flags=re.S)
                if m:
                    mediatype, b64 = m.groups()
                    ext = mimetypes.guess_extension(mediatype) or ".png"
                    bts = base64.b64decode(b64)
                    p = save_bytes(bts, title, ext)
                    saved.append((str(p), title, mediatype))
                else:
                    print(f"Skipped unknown data URI at markdown image {i}")
            elif src.startswith("http://") or src.startswith("https://"):
                print(f"Found remote image URL (not downloaded): {src} (title: {title})")
            else:
                src_path = (Path(NB_PATH).parent / src).resolve()
                if src_path.exists():
                    ext = src_path.suffix or ".png"
                    dest = OUT_DIR / f"{slugify(title)}{ext}"
                    copy2(src_path, dest)
                    saved.append((str(dest), title, f"file:{src_path}"))
                else:
                    print(f"Referenced image file not found: {src_path} (from markdown)")

    if ctype == "code":
        outputs = cell.get("outputs", []) or []
        for out in outputs:
            data = out.get("data") or {}
            for mime in ("image/png", "image/jpeg", "image/jpg", "image/svg+xml"):
                if mime in data:
                    payload = data[mime]
                    if isinstance(payload, list):
                        payload = "".join(payload)
                    if mime == "image/svg+xml":
                        bts = payload.encode("utf-8") if isinstance(payload, str) else payload
                        ext = ".svg"
                    else:
                        if isinstance(payload, str):
                            try:
                                bts = base64.b64decode(payload)
                            except Exception:
                                bts = payload
                        else:
                            bts = payload
                        ext = ".png" if mime == "image/png" else ".jpg"

                    title = find_prev_heading(nb, i) or f"output-image-{i}"
                    p = save_bytes(bts, title, ext)
                    saved.append((str(p), title, mime))

print("\nExtraction complete. Saved images:")
for path, title, kind in saved:
    print(f" - {path}  (title: {title}, kind: {kind})")

if not saved:
    print("No images were found or extracted. Check notebook path and cell contents.")

saved
