In [None]:
!pip install -q optuna xgboost lightgbm catboost category_encoders


In [None]:
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.base import clone
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import (
    accuracy_score, f1_score, roc_auc_score, classification_report,
    confusion_matrix, precision_recall_fscore_support
)
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.model_selection import cross_val_predict
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

import category_encoders as ce
import lightgbm as lgb
import optuna
import xgboost as xgb
from catboost import CatBoostClassifier, Pool

# Single seed shared by the train/test split, the CV splitters and the XGBoost models below.
RANDOM_STATE = 42
# Also seed NumPy's legacy global RNG for any code that draws from np.random directly.
np.random.seed(RANDOM_STATE)

def summarize_results(y_true, y_prob, threshold=0.5, label="Model"):
    """Print and return headline binary-classification metrics at one cutoff.

    Scores in ``y_prob`` at or above ``threshold`` are labelled 1, all others 0.
    Accuracy and F1 are computed from those hard labels; ROC-AUC is computed
    from the raw scores and therefore does not depend on ``threshold``.

    Args:
        y_true: Ground-truth binary labels.
        y_prob: Positive-class scores; must support element-wise ``>=`` and
            ``.astype`` (e.g. a NumPy array or pandas Series).
        threshold: Decision cutoff applied to ``y_prob``.
        label: Tag shown in the printed summary line.

    Returns:
        dict with keys ``accuracy``, ``f1``, ``roc_auc`` and ``threshold``.
    """
    is_positive = y_prob >= threshold
    y_pred = is_positive.astype(int)

    # All three metrics are computed up front, before anything is printed.
    acc, f1, auc = (
        accuracy_score(y_true, y_pred),
        f1_score(y_true, y_pred),
        roc_auc_score(y_true, y_prob),
    )

    print(f"[{label}]  Acc: {acc:.4f} | F1: {f1:.4f} | ROC-AUC: {auc:.4f} | thr={threshold:.3f}")
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, digits=4))
    print("Confusion Matrix:\n", confusion_matrix(y_true, y_pred))

    return dict(accuracy=acc, f1=f1, roc_auc=auc, threshold=threshold)

def best_threshold_by_metric(y_true, y_prob, metric="accuracy"):
    """Grid-search the decision threshold that maximizes a metric.

    Candidate thresholds are 181 evenly spaced values from 0.05 to 0.95
    (a step of 0.005). On ties the lowest threshold wins, because a later
    candidate only replaces the incumbent when it scores strictly higher.

    Args:
        y_true: Ground-truth binary labels.
        y_prob: Positive-class scores (array-like).
        metric: ``"accuracy"`` or ``"f1"``.

    Returns:
        Tuple ``(best_threshold, best_score)``.

    Raises:
        ValueError: If ``metric`` is not ``"accuracy"`` or ``"f1"``. This is
            checked before the inputs are touched, so a typo fails fast.
    """
    # Validate once, up front, instead of on every threshold inside the loop.
    if metric not in ("accuracy", "f1"):
        raise ValueError("metric must be 'accuracy' or 'f1'")
    scorer = f1_score if metric == "f1" else accuracy_score

    # Normalize to an ndarray so the comparison below is purely positional.
    scores = np.asarray(y_prob)

    thresholds = np.linspace(0.05, 0.95, 181)
    best_thr, best_score = 0.5, -1
    for t in thresholds:
        score = scorer(y_true, (scores >= t).astype(int))
        if score > best_score:
            best_score, best_thr = score, t
    return best_thr, best_score


In [None]:
# Location of the bookings CSV; the default matches a file uploaded to a Colab session.
csv_path = "/content/hotel_bookings.csv"
assert Path(csv_path).exists(), f"CSV not found at {csv_path}"

df = pd.read_csv(csv_path)

# Quick look: dimensions, first rows, the ten columns with the most missing
# values, and the class balance of the target.
print(df.shape)
print(df.head(3))
null_counts = df.isnull().sum()
print(null_counts.sort_values(ascending=False).head(10))
target_share = df['is_canceled'].value_counts(normalize=True)
print(target_share)


In [None]:
# Work on a copy so the raw frame `df` is left untouched.
# Leakage columns are removed when present:
# reservation_status_date leaks the future; reservation_status is a direct outcome label.
leakage_cols = [c for c in ("reservation_status_date", "reservation_status") if c in df.columns]
df_clean = df.copy().drop(columns=leakage_cols)

# Month names -> month numbers (only when the column exists and is still text)
month_col = "arrival_date_month"
if month_col in df_clean.columns and df_clean[month_col].dtype == object:
    month_names = ['January', 'February', 'March', 'April', 'May', 'June',
                   'July', 'August', 'September', 'October', 'November', 'December']
    month_map = {name: number for number, name in enumerate(month_names, start=1)}
    df_clean[month_col] = df_clean[month_col].map(month_map)

# Engineered feature: total length of stay
if {"stays_in_weekend_nights", "stays_in_week_nights"} <= set(df_clean.columns):
    df_clean["total_nights"] = df_clean["stays_in_weekend_nights"] + df_clean["stays_in_week_nights"]

# Engineered features: party size and family indicators
if {"adults", "children", "babies"} <= set(df_clean.columns):
    df_clean["children"] = df_clean["children"].fillna(0)  # occasional NaNs
    df_clean["total_guests"] = df_clean["adults"] + df_clean["children"] + df_clean["babies"]
    with_children = df_clean["children"] > 0
    with_babies = df_clean["babies"] > 0
    df_clean["has_children"] = with_children.astype(int)
    df_clean["has_babies"]   = with_babies.astype(int)
    df_clean["is_family"]    = (with_children | with_babies).astype(int)

# Separate the features from the binary target
X = df_clean.drop(columns=["is_canceled"])
y = df_clean["is_canceled"].astype(int)

# Column groups by dtype; the preprocessing cell routes each group to its own transformer
num_cols = list(X.select_dtypes(include=["number","float","int"]).columns)
cat_cols = list(X.select_dtypes(include=["object"]).columns)

print("Numerical:", len(num_cols), num_cols[:10], "...")
print("Categorical:", len(cat_cols), cat_cols[:10], "...")

# Stratified 80/20 hold-out split so both parts keep the same cancellation rate
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    stratify=y,
    random_state=RANDOM_STATE,
)

print("Train:", X_train.shape, " Test:", X_test.shape)


In [None]:
# Split the categorical columns by cardinality, measured on the training split only:
# more than 30 distinct values counts as high-cardinality.
cat_cardinality = {c: X_train[c].nunique() for c in cat_cols}
high_card_cols = [c for c, n_unique in cat_cardinality.items() if n_unique > 30]
low_card_cols  = [c for c in cat_cols if c not in high_card_cols]

print("High-cardinality:", high_card_cols)
print("Low-cardinality:", low_card_cols)

# One transformer per column group:
#  - numeric: median imputation only (tree models do not need scaling)
#  - low-cardinality categoricals: most-frequent imputation, then one-hot
#    (categories unseen at fit time are ignored)
#  - high-cardinality categoricals: most-frequent imputation, then target encoding;
#    this step is supervised, so it must only ever be fit inside a pipeline on training folds
numeric_transformer = SimpleImputer(strategy="median")

low_card_transformer = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=True)),
])

high_card_transformer = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("target", ce.TargetEncoder(
        handle_missing="value",
        handle_unknown="value",
        smoothing=0.3,
        min_samples_leaf=20,
    )),
])

# Columns not listed in any group are dropped.
preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, num_cols),
        ("lowcard", low_card_transformer, low_card_cols),
        ("highcard", high_card_transformer, high_card_cols),
    ],
    remainder="drop",
)


In [None]:
# Negative-to-positive class ratio on the training labels, passed to XGBoost as
# scale_pos_weight. It is floored at 1.0, and the denominator is floored at 1
# so an all-negative training set cannot divide by zero.
pos = y_train.sum()
neg = y_train.eq(0).sum()
imbalance_ratio = neg / max(1, pos)
scale_pos_weight = max(1.0, imbalance_ratio)

def xgb_objective(trial):
    """Optuna objective: mean 5-fold cross-validated ROC-AUC of an XGBoost pipeline.

    Hyperparameters are sampled from ``trial``. For each stratified fold a
    fresh clone of the full pipeline (preprocessing + model) is fitted on the
    training part and scored on the validation part, so supervised
    preprocessing (target encoding) never sees validation rows and the shared
    module-level ``preprocessor`` is not mutated by tuning.

    Reads the module-level ``X_train``, ``y_train``, ``preprocessor``,
    ``scale_pos_weight`` and ``RANDOM_STATE``.

    Args:
        trial: The Optuna trial used to suggest hyperparameters.

    Returns:
        Mean ROC-AUC over the five validation folds.
    """
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 300, 1200),
        "max_depth": trial.suggest_int("max_depth", 3, 10),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
        "subsample": trial.suggest_float("subsample", 0.6, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
        "min_child_weight": trial.suggest_int("min_child_weight", 1, 12),
        "gamma": trial.suggest_float("gamma", 0.0, 5.0),
        "reg_lambda": trial.suggest_float("reg_lambda", 1e-3, 10.0, log=True),
        "reg_alpha": trial.suggest_float("reg_alpha", 1e-4, 3.0, log=True),
        "random_state": RANDOM_STATE,
        "objective": "binary:logistic",
        "eval_metric": "auc",
        "tree_method": "hist",
        "scale_pos_weight": scale_pos_weight
    }

    # Template only: it is never fitted itself. Every fold fits its own clone,
    # so no fitted state is shared between folds, trials, or the final pipeline.
    pipe_template = Pipeline(steps=[
        ("prep", preprocessor),
        ("model", xgb.XGBClassifier(**params)),
    ])

    # Same seeded splitter for every trial, so trials are compared on identical folds.
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    aucs = []

    for train_idx, valid_idx in cv.split(X_train, y_train):
        X_tr, X_va = X_train.iloc[train_idx], X_train.iloc[valid_idx]
        y_tr, y_va = y_train.iloc[train_idx], y_train.iloc[valid_idx]

        fold_pipe = clone(pipe_template)
        fold_pipe.fit(X_tr, y_tr)  # No early stopping here
        y_va_prob = fold_pipe.predict_proba(X_va)[:, 1]
        aucs.append(roc_auc_score(y_va, y_va_prob))

    return np.mean(aucs)

# Run Optuna study.
# The sampler is seeded explicitly: with the default (unseeded) sampler the
# search proposes different trials on every run, so the "best params" would
# not be reproducible even though every model and splitter uses RANDOM_STATE.
xgb_study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=RANDOM_STATE),
)
xgb_study.optimize(xgb_objective, n_trials=40, show_progress_bar=True)

print("Best XGB params:", xgb_study.best_params)
print("Best CV AUC:", xgb_study.best_value)

# Final model: best sampled hyperparameters plus the same fixed settings used during tuning
best_xgb = xgb.XGBClassifier(
    **xgb_study.best_params,
    random_state=RANDOM_STATE,
    objective="binary:logistic",
    eval_metric="auc",
    tree_method="hist",
    scale_pos_weight=scale_pos_weight
)

xgb_pipe = Pipeline(steps=[("prep", preprocessor), ("model", best_xgb)])

# Pick decision thresholds on out-of-fold predictions from the TRAINING data.
# Tuning the threshold on the test set and then reporting metrics on that same
# test set would make the reported numbers optimistically biased.
# cross_val_predict fits clones of the pipeline, so xgb_pipe itself stays unfitted here.
threshold_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
y_train_oof_prob_xgb = cross_val_predict(
    xgb_pipe, X_train, y_train, cv=threshold_cv, method="predict_proba"
)[:, 1]
thr_acc_xgb, acc_xgb = best_threshold_by_metric(y_train, y_train_oof_prob_xgb, metric="accuracy")
thr_f1_xgb, f1_xgb = best_threshold_by_metric(y_train, y_train_oof_prob_xgb, metric="f1")

# Fit the final pipeline on the full training set
xgb_pipe.fit(X_train, y_train)

# Evaluate once on the untouched test set, using the thresholds chosen above
y_test_prob_xgb = xgb_pipe.predict_proba(X_test)[:, 1]

print(f"\n[XGB] Best threshold by accuracy: {thr_acc_xgb:.3f} (OOF acc={acc_xgb:.4f})")
res_xgb_acc = summarize_results(y_test, y_test_prob_xgb, threshold=thr_acc_xgb, label="XGB-OPT (accuracy-opt)")

print(f"\n[XGB] Best threshold by F1: {thr_f1_xgb:.3f} (OOF f1={f1_xgb:.4f})")
res_xgb_f1 = summarize_results(y_test, y_test_prob_xgb, threshold=thr_f1_xgb, label="XGB-OPT (f1-opt)")


In [None]:
import joblib

# Save trained pipeline (preprocessor + XGB model) and the Optuna study
# to the current working directory so they can be reloaded without re-running the search.
# NOTE: joblib files are pickle-based — only load them from sources you trust.
joblib.dump(xgb_pipe, "xgb_pipe_model.pkl")
joblib.dump(xgb_study, "xgb_optuna_study.pkl")


In [None]:
import shap

# --- SHAP explainability for tuned XGBoost ---
# Pull the fitted model and the fitted preprocessor out of the pipeline
best_xgb_model = xgb_pipe.named_steps["model"]
fitted_prep = xgb_pipe.named_steps["prep"]

# Column names of the transformed feature matrix
feature_names = fitted_prep.get_feature_names_out()

# Use TreeExplainer for XGBoost
explainer = shap.TreeExplainer(best_xgb_model)

# SHAP cost grows with rows x trees, so explain at most SHAP_MAX_ROWS training
# rows, drawn with a fixed seed so the plots are reproducible.
SHAP_MAX_ROWS = 5000
if len(X_train) > SHAP_MAX_ROWS:
    X_shap_raw = X_train.sample(n=SHAP_MAX_ROWS, random_state=RANDOM_STATE)
else:
    X_shap_raw = X_train

# The explainer sees the model only, so apply the preprocessing step by hand.
# X_train_processed holds exactly the rows that shap_values describes.
X_train_processed = fitted_prep.transform(X_shap_raw)

# SHAP values are computed on the matrix exactly as the pipeline produces it.
shap_values = explainer.shap_values(X_train_processed)

# The plotting helpers index the feature matrix column by column; give them a
# dense array in case the preprocessor returned a scipy sparse matrix.
if hasattr(X_train_processed, "toarray"):
    X_train_display = X_train_processed.toarray()
else:
    X_train_display = X_train_processed

# --- Plots ---
# Summary plot (global importance)
shap.summary_plot(shap_values, X_train_display, feature_names=feature_names)

# Bar plot version
shap.summary_plot(shap_values, X_train_display, feature_names=feature_names, plot_type="bar")

# Dependence plot for the feature with the largest mean |SHAP value|
top_feature = feature_names[np.argsort(np.abs(shap_values).mean(0))[::-1][0]]
shap.dependence_plot(top_feature, shap_values, X_train_display, feature_names=feature_names)
