Diya Kharel-24152363

# Mushroom Classification using XGBoost – Simplified MLOps Workflow

This notebook demonstrates a streamlined machine learning operations (MLOps) workflow tailored to mushroom classification, leveraging XGBoost as the core algorithm. The process includes thorough experiment logging and monitoring to ensure reproducibility and reliability.

## Workflow Stages:
1. **Data Ingestion and Quality Checks**
2. **Data Preparation and Feature Engineering**
3. **Training with XGBoost**
4. **Model Assessment and Continuous Tracking**
5. **Summary of Outcomes**


In [None]:
%pip install xgboost



In [None]:
%pip install great-expectations



In [None]:
# --- Environment & paths ------------------------------------------------------
import os, sys, warnings
warnings.filterwarnings("ignore")

# Ensure project root is on sys.path (use absolute path to avoid surprises)
PROJECT_ROOT = os.path.abspath(".")
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

# --- Core libs ----------------------------------------------------------------
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# MLflow
import mlflow
import mlflow.sklearn

# Sklearn utilities
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, learning_curve
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, matthews_corrcoef, roc_auc_score, roc_curve
)

from scipy.stats import zscore
import xgboost as xgb

# Great Expectations (optional)
# Probe for the package once and remember the outcome in GE_AVAILABLE so
# later cells can decide whether to run data checks.
try:
    import great_expectations as gx
    from great_expectations.core import ExpectationSuite
except Exception:
    # Any import problem is treated the same way: data checks are skipped.
    GE_AVAILABLE = False
    print("ℹGreat Expectations not installed — skipping data checks.")
else:
    GE_AVAILABLE = True
    print("Great Expectations detected — data checks can run.")

# --- MLflow setup -------------------------------------------------------------
# Configures the tracking URI (remote server preferred, local file store as a
# fallback) and makes sure the notebook's experiment exists and is active.
# Defines: ARTIFACTS_DIR, REMOTE_URI, LOCAL_URI, TRACKING_URI,
#          EXPERIMENT_NAME, EXPERIMENT_ID.
from pathlib import Path  # local import: only needed to build a portable file URI

print("Initializing MLflow tracking...")

# where local artifacts will be stored when no server is reachable
ARTIFACTS_DIR = os.path.join(PROJECT_ROOT, "mlflow_artifacts")
os.makedirs(ARTIFACTS_DIR, exist_ok=True)

REMOTE_URI = "http://mlflow-server:5001"
# Path.as_uri() yields a well-formed file URI on every platform (drive letters,
# back-slashes and spaces included); PROJECT_ROOT is absolute, as as_uri requires.
LOCAL_URI = Path(PROJECT_ROOT, "mlruns").as_uri()

# Prefer remote server; if not reachable, fall back to local file store
try:
    mlflow.set_tracking_uri(REMOTE_URI)
    # lightweight call to confirm connectivity
    _ = mlflow.search_experiments(max_results=1)
    TRACKING_URI = REMOTE_URI
    print(f"Using remote MLflow server: {REMOTE_URI}")
except Exception as err:
    print(f"Remote MLflow unreachable ({err}). Falling back to local store.")
    mlflow.set_tracking_uri(LOCAL_URI)
    TRACKING_URI = LOCAL_URI
    print(f"Using local MLflow store: {LOCAL_URI}")

# Create or fetch an experiment and set it active
EXPERIMENT_NAME = "mushroom_classification_comprehensive_notebook"
try:
    exp = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
    if exp is None:
        EXPERIMENT_ID = mlflow.create_experiment(
            EXPERIMENT_NAME,
            artifact_location=os.path.join(ARTIFACTS_DIR, EXPERIMENT_NAME),
        )
        print(f" Created experiment: {EXPERIMENT_NAME} (id={EXPERIMENT_ID})")
    else:
        EXPERIMENT_ID = exp.experiment_id
        print(f"Found existing experiment: {EXPERIMENT_NAME} (id={EXPERIMENT_ID})")

    mlflow.set_experiment(EXPERIMENT_NAME)
except Exception as err:
    # Best effort: keep the notebook usable by reporting MLflow's default experiment.
    print(f"Could not set experiment ({err}). Falling back to default.")
    EXPERIMENT_NAME = "Default"
    EXPERIMENT_ID = "0"

print("MLflow configured.")
print(f"   • Experiment: {EXPERIMENT_NAME}")
print(f"   • Experiment ID: {EXPERIMENT_ID}")
print(f"   • Tracking URI: {mlflow.get_tracking_uri()}")
print(f"   • Artifacts dir: {ARTIFACTS_DIR}")


In [None]:
from pathlib import Path
import os
import tempfile

def validate_dataset(
    df,
    run_name: str = "data_quality_check",
    min_rows: int = 101,
    min_cols: int = 6,
    max_rows: int = 100_000,
    class_candidates=("class", "class_encoded"),
    artifact_dir: str | None = None
):
    """
    Lightweight data quality gate with optional MLflow logging.

    Checks (default thresholds can be changed):
      - has at least `min_rows` rows
      - has at least `min_cols` columns
      - dataframe is not empty
      - contains any of the class columns in `class_candidates`
      - row count is <= `max_rows`

    Returns a dict with overall pass/fail and per-check results.
    Saves simple artifacts (dtypes and summary) if possible.
    """

    # Great Expectations is optional — skip quietly if not available
    try:
        import great_expectations as ge  # noqa: F401
        ge_available = True
    except Exception:
        ge_available = False

    # MLflow is optional — only log if available
    try:
        import mlflow  # type: ignore
        mlflow_available = True
    except Exception:
        mlflow_available = False

    # Prepare checks
    checks = {
        "enough_rows": len(df) >= min_rows,
        "enough_columns": df.shape[1] >= min_cols,
        "not_empty": not df.empty,
        "has_class_column": any(c in df.columns for c in class_candidates),
        "reasonable_size": len(df) <= max_rows,
    }
    passed = all(checks.values())

    # Pretty console feedback
    for name, ok in checks.items():
        mark = "✅" if ok else "❌"
        print(f"{mark} {name}")

    # Metrics to log if MLflow is available
    base_metrics = {
        "dq_passed": int(passed),
        "n_rows": int(len(df)),
        "n_columns": int(df.shape[1]),
        "n_missing_total": int(df.isna().sum().sum()),
    }

    # Choose artifact directory
    if artifact_dir is None:
        # temp dir inside CWD to keep behaviour predictable in notebooks
        artifact_dir = os.path.join(os.getcwd(), "ml_artifacts")
    Path(artifact_dir).mkdir(parents=True, exist_ok=True)

    # Write artifacts to disk
    try:
        dtypes_path = os.path.join(artifact_dir, "data_types.txt")
        with open(dtypes_path, "w", encoding="utf-8") as f:
            f.write(df.dtypes.to_string())

        summary_path = os.path.join(artifact_dir, "data_summary.txt")
        with open(summary_path, "w", encoding="utf-8") as f:
            # include='all' so object columns are summarised too
            f.write(str(df.describe(include="all", datetime_is_numeric=True)))

        missing_by_col_path = os.path.join(artifact_dir, "missing_by_column.txt")
        with open(missing_by_col_path, "w", encoding="utf-8") as f:
            f.write((df.isna().sum().sort_values(ascending=False)).to_string())
    except Exception as e:
        print(f" Unable to write artifacts: {e}")

    # Optional MLflow logging
    if mlflow_available:
        try:
            with mlflow.start_run(run_name=run_name):
                for k, v in base_metrics.items():
                    mlflow.log_metric(k, v)
                for k, v in checks.items():
                    mlflow.log_metric(f"check_{k}", int(v))

                # Log artifacts if present
                for p in (dtypes_path, summary_path, missing_by_col_path):
                    if os.path.exists(p):
                        mlflow.log_artifact(p)

                # Record whether GE was available (informational)
                mlflow.log_param("great_expectations_available", ge_available)
        except Exception as e:
            print(f"MLflow logging skipped due to error: {e}")

    # Optional Great Expectations hook (placeholder to avoid heavy deps)
    # You can extend this to run actual GE expectations if desired.
    if ge_available:
        print("ℹGreat Expectations detected (no expectations suite provided; skipping).")

    return {
        "passed": passed,
        "checks": checks,
        "metrics": base_metrics,
        "artifacts_dir": artifact_dir,
        "message": "PASSED" if passed else "FAILED",
    }

print("Data quality validator ready.")


In [None]:
from pathlib import Path

def _read_flexible_csv(path: Path):
    """Read a CSV whose delimiter is not known in advance.

    Tries ``;`` and then ``,``. A parse is accepted only when it yields more
    than one column: reading with the wrong separator does not fail, it
    silently returns a single fused column, so "did not raise" is not a
    usable success signal. If no explicit separator splits the file, the
    first successful single-column parse is returned (genuine one-column
    files). Delimiter sniffing is used only when every explicit attempt
    raised.

    Raises:
        RuntimeError: if the file cannot be read by any strategy.
    """
    import pandas as pd

    single_column_parse = None

    # 1) Try ; then , (fast, explicit)
    for sep in (";", ","):
        try:
            frame = pd.read_csv(path, sep=sep)
        except Exception:
            # Not fatal yet: another separator or the sniffer may still work.
            continue
        if frame.shape[1] > 1:
            return frame
        if single_column_parse is None:
            single_column_parse = frame

    if single_column_parse is not None:
        return single_column_parse

    # 2) Fall back to Python engine with delimiter sniffing
    try:
        return pd.read_csv(path, sep=None, engine="python")
    except Exception as e:
        raise RuntimeError(f"Failed to read {path.name}: {e}") from e


# ---------- ETL: EXTRACT ----------
# Loads the first available raw CSV into `df`, normalises its column names,
# logs basic extract metrics to MLflow (best effort) and runs the data
# quality gate when it is defined.
print("▶ Starting data extraction...")

# Search candidates in priority order
raw_dir = Path("data/raw")
candidates = [
    raw_dir / "secondary_data.csv",
    raw_dir / "fraction_of_dataset.csv",
]

df = None
data_source = None

for p in candidates:
    if p.exists():
        df = _read_flexible_csv(p)
        data_source = p.name
        print(f"✓ Loaded {data_source} with shape {tuple(df.shape)}")
        break

if df is None:
    msg = "No data files found. Please place a CSV in data/raw/ (e.g., secondary_data.csv)."
    print(msg)
    raise FileNotFoundError(msg)

# Clean column names: trim, normalise punctuation to underscores, collapse repeats
df.columns = (
    df.columns
      .str.strip()
      .str.replace(r"[^\w]+", "_", regex=True)
      .str.strip("_")
)

print(f"✓ Columns after cleaning: {list(df.columns)}")

# Optional MLflow logging (only if MLflow is available)
try:
    import mlflow  # type: ignore

    with mlflow.start_run(run_name="extract_raw_data"):
        mlflow.log_param("data_source", data_source)
        mlflow.log_metric("raw_rows", int(df.shape[0]))
        mlflow.log_metric("raw_columns", int(df.shape[1]))
    print("✓ Logged extract metrics to MLflow.")
except Exception as e:
    print(f"⚠️ MLflow logging skipped: {e}")

# Validate data quality (uses the revised function name).
# The validator's presence is checked explicitly instead of catching NameError,
# so a NameError raised *inside* the validator is reported as a validation
# error rather than being mistaken for "validator not defined".
if "validate_dataset" not in globals():
    print("ℹ️ validate_dataset not found; skipping validation step.")
else:
    try:
        results = validate_dataset(df, run_name="raw_data_validation")
        # The icon reflects the actual outcome; a failed gate must not show a check mark.
        status_icon = "✅" if results["passed"] else "❌"
        print(f"{status_icon} Data validation: {results['message']}")
    except Exception as e:
        print(f"❌ Validation error: {e}")


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# --- Visualise gaps in the dataset ---
# One cell per (row, column): highlighted where the value is missing.
_heatmap_fig, _heatmap_ax = plt.subplots(figsize=(14, 6))
sns.heatmap(
    df.isna(),
    cmap="viridis",
    cbar=True,
    yticklabels=False,
    ax=_heatmap_ax,
)
_heatmap_ax.set_title("Heatmap of Missing Data", fontsize=14)
_heatmap_ax.set_xlabel("Columns")
_heatmap_ax.set_ylabel("Rows")
_heatmap_fig.tight_layout()
plt.show()

# --- Summary of missing values by column ---
missing_counts = df.isna().sum()
print("Missing values per column:", missing_counts, sep="\n")


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Colour palettes, reused cyclically when there are more features than palettes
palettes = [
    "Set1", "Set2", "Set3",
    "Paired", "Pastel1", "Pastel2",
    "Dark2", "tab10", "tab20"
]

# Categorical features are the object-dtype (string) columns
cat_columns = df.select_dtypes(include="object").columns

# One bar chart per categorical feature, bars ordered by descending frequency
for i, feature in enumerate(cat_columns):
    palette_choice = palettes[i % len(palettes)]
    by_frequency = df[feature].value_counts().index

    _fig, _ax = plt.subplots(figsize=(10, 4))
    sns.countplot(
        data=df,
        x=feature,
        order=by_frequency,
        palette=palette_choice,
        ax=_ax,
    )
    _ax.set_title(f"Distribution of values in '{feature}'", fontsize=13)
    plt.xticks(rotation=45, ha="right")
    _ax.set_ylabel("Frequency")
    _ax.set_xlabel(feature)
    _fig.tight_layout()
    plt.show()


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Numeric features: float64/int64 columns, leaving out the encoded target if it exists
_numeric_frame = df.select_dtypes(include=["float64", "int64"])
num_columns = _numeric_frame.columns.drop("class_encoded", errors="ignore")

# Fill colours, reused cyclically across features
colors = [
    "skyblue", "salmon", "lightgreen", "plum",
    "gold", "teal", "coral", "khaki"
]

# Histogram with a KDE overlay for every numeric feature
for i, feature in enumerate(num_columns):
    color_choice = colors[i % len(colors)]

    _fig, _ax = plt.subplots(figsize=(8, 4))
    sns.histplot(df[feature], kde=True, color=color_choice, bins=30, ax=_ax)
    _ax.set_title(f"Distribution of '{feature}'", fontsize=13)
    _ax.set_xlabel(feature)
    _ax.set_ylabel("Count")
    _fig.tight_layout()
    plt.show()


In [None]:
import os
import json
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder

print("Starting data transformation...")

# ---------- MLflow (optional) ----------
# Probe for MLflow once; `_mlflow_available` gates every helper below.
try:
    import mlflow  # type: ignore
except Exception as _e:
    _mlflow_available = False
    print(f"MLflow not available: {_e}")
else:
    _mlflow_available = True


# Helper: log metrics/params if MLflow is available
class _ML:
    """Best-effort MLflow facade: each call is a no-op when MLflow is missing
    and reports (rather than raises) logging failures."""

    class _Noop:
        """Stand-in context manager used when MLflow is not available."""

        def __enter__(self):
            return None

        def __exit__(self, *args):
            # Never suppress exceptions raised inside the `with` body.
            return False

    @staticmethod
    def start_run(run_name):
        """Return an MLflow run context, or a do-nothing context without MLflow."""
        if not _mlflow_available:
            return _ML._Noop()
        return mlflow.start_run(run_name=run_name)

    @staticmethod
    def log_metric(k, v):
        """Log metric `k` with `v` converted to float."""
        if not _mlflow_available:
            return
        try:
            mlflow.log_metric(k, float(v))
        except Exception as err:
            print(f"Warning: failed to log metric {k}: {err}")

    @staticmethod
    def log_param(k, v):
        """Log parameter `k`; dict/list/tuple values are serialised as JSON first."""
        if not _mlflow_available:
            return
        try:
            # Ensure stringifiable types
            value = json.dumps(v, ensure_ascii=False) if isinstance(v, (dict, list, tuple)) else v
            mlflow.log_param(k, str(value))
        except Exception as err:
            print(f"Warning: failed to log param {k}: {err}")

    @staticmethod
    def log_artifact(path):
        """Upload the file at `path` as a run artifact."""
        if not _mlflow_available:
            return
        try:
            mlflow.log_artifact(path)
        except Exception as err:
            print(f"Warning: failed to log artifact {path}: {err}")

# ---------- Encoding / Imputation helpers ----------
def sample_impute_categorical(series: pd.Series, rng: np.random.Generator | None = None) -> pd.Series:
    """
    Impute missing values in a categorical series by random sampling from observed non-null values.
    Returns the imputed series with original (string) categories.
    """
    s = series.astype("string")
    mask = s.isna()
    if not mask.any():
        return s
    non_null = s[~mask]
    if non_null.empty:
        # If a column is entirely missing, fill with a placeholder
        return s.fillna("Unknown")
    rng = rng or np.random.default_rng()
    sampled = rng.choice(non_null.to_numpy(), size=int(mask.sum()), replace=True)
    s.loc[mask] = sampled
    return s

def label_encode(series: pd.Series) -> tuple[pd.Series, dict]:
    """
    Turn a categorical series into integer codes with scikit-learn's LabelEncoder.

    Returns a pair: the codes as a Series aligned to the input index, and a
    dict mapping each original label to its integer code.
    """
    encoder = LabelEncoder()
    codes = encoder.fit_transform(series.astype("string"))

    # classes_ is ordered by code, so the position of a label is its code
    mapping = {}
    for code, label in enumerate(encoder.classes_):
        mapping[label] = int(code)

    return pd.Series(codes, index=series.index), mapping

# ---------- Transform pipeline ----------
# Cleans `df` in place (rebinding the global): drops sparse columns, imputes
# selected categoricals, label-encodes the target/boolean columns, collapses
# rare categories, saves the result and re-runs the data quality gate.

# Fixed seed so the sampled imputations are identical on every run; an
# unseeded generator would make the processed data differ between runs.
IMPUTATION_SEED = 42

with _ML.start_run(run_name="data_preprocessing"):
    # Log original shape
    _ML.log_metric("original_rows", df.shape[0])
    _ML.log_metric("original_columns", df.shape[1])

    # 1) Drop columns with too many missing (as per your list, if present)
    columns_to_drop = ['gill_spacing', 'stem_surface', 'stem_root', 'spore_print_color', 'veil_type', 'veil_color']
    existing_to_drop = [c for c in columns_to_drop if c in df.columns]
    if existing_to_drop:
        df = df.drop(columns=existing_to_drop)
    _ML.log_param("dropped_columns", existing_to_drop)

    # 2) Impute selected categorical columns (if present)
    #    - impute missing by sampling from observed values
    #    - keep them as strings (post-imputation)
    rng = np.random.default_rng(IMPUTATION_SEED)
    _ML.log_param("imputation_seed", IMPUTATION_SEED)
    encoded_columns = []

    for col in ["cap_surface", "gill_attachment", "ring_type"]:
        if col in df.columns:
            # Impute missing with sampling on strings
            df[col] = sample_impute_categorical(df[col], rng=rng)
            encoded_columns.append(col)
    _ML.log_param("encoded_columns", encoded_columns)

    # 3) Encode target and boolean columns into new *_encoded columns (if present)
    target_bool_cols = [
        ("class", "class_encoded"),
        ("does_bruise_or_bleed", "does_bruise_or_bleed_encoded"),
        ("has_ring", "has_ring_encoded"),
    ]
    encoding_maps = {}
    for src, dst in target_bool_cols:
        if src in df.columns:
            enc_series, mapping = label_encode(df[src])
            df[dst] = enc_series
            encoding_maps[dst] = mapping
    _ML.log_param("encoding_maps", encoding_maps)

    # 4) Handle rare categories by collapsing infrequent labels to "Other".
    #    Counting ignores missing values and the replacement is vectorised:
    #    membership-testing pd.NA against a list element by element raises
    #    ("boolean value of NA is ambiguous"), whereas isin() treats NA as
    #    "not rare" and leaves it untouched.
    possible_columns = ['habitat', 'stem_color', 'gill_color', 'cap_color', 'cap_shape', 'cap_surface', 'ring_type']
    rare_threshold = 1000  # labels seen fewer times than this are collapsed
    rare_category_mapping = {}
    for col in (c for c in possible_columns if c in df.columns):
        labels = df[col].astype("string")
        counts = labels.value_counts()
        rare_vals = counts[counts < rare_threshold].index.tolist()
        if rare_vals:
            rare_category_mapping[col] = [str(v) for v in rare_vals]
            df[col] = labels.mask(labels.isin(rare_vals), "Other")

    _ML.log_param("rare_category_mapping", rare_category_mapping)

    # 5) Drop original target/boolean text columns if they existed
    drop_after_encoding = [c for c, _ in target_bool_cols if c in df.columns]
    if drop_after_encoding:
        df = df.drop(columns=drop_after_encoding)

    # 6) Log processed shape and missingness
    _ML.log_metric("processed_rows", df.shape[0])
    _ML.log_metric("processed_columns", df.shape[1])
    _ML.log_metric("missing_values_after_processing", int(df.isna().sum().sum()))

    # 7) Save processed data
    processed_dir = "data/processed"
    os.makedirs(processed_dir, exist_ok=True)
    processed_data_path = os.path.join(processed_dir, "notebook_processed_data.csv")
    df.to_csv(processed_data_path, index=False)
    _ML.log_artifact(processed_data_path)

    print(f"Data preprocessing completed. Final shape: {df.shape}")

# ---------- Validate processed data ----------
try:
    results = validate_dataset(df, run_name="processed_data_validation")
    print(f"Processed data validation: {results.get('message', 'UNKNOWN')}")
except NameError:
    print("validate_dataset not found; skipping processed data validation.")
except Exception as e:
    print(f"Validation error: {e}")


Starting data transformation...
MLflow not available: No module named 'mlflow'


NameError: name 'df' is not defined

In [None]:
import numpy as np
from scipy.stats import zscore

# Numeric features screened for outliers (columns absent from df are skipped)
cols_to_check = ["cap_diameter", "stem_height", "stem_width"]
_screened = [name for name in cols_to_check if name in df.columns]

# A row survives only if, for every screened column, its |z-score| is below
# 2.5 or undefined (NaN); z-scores are computed with NaNs left out.
mask = np.ones(len(df), dtype=bool)
for _name in _screened:
    _abs_z = np.abs(zscore(df[_name], nan_policy="omit"))
    _within_limit = _abs_z < 2.5
    _undefined = np.isnan(_abs_z)   # keep NaNs, only filter true outliers
    mask &= _within_limit | _undefined

df = df[mask].reset_index(drop=True)

print(f"Outlier filtering complete. Remaining shape: {df.shape}")


In [None]:
# Streamlined XGBoost Model - Single Run (no A/B testing)
# `model_results` is only assigned by the training cell that appears later in
# this notebook, so when cells are run top to bottom it does not exist yet.
# Report that instead of failing with a NameError.
reported_accuracy = None
try:
    reported_accuracy = model_results["accuracy"]
except NameError:
    print("XGBoost model has not been trained yet; run the training cell first.")
else:
    print(f"XGBoost model trained successfully. Accuracy: {reported_accuracy:.4f}")
    print("Single model approach applied for efficiency.")
    print("Model is ready for evaluation and deployment.")


In [None]:
# --- Enhanced XGBoost Model Training with Comprehensive MLflow Tracking (no emojis) ---

import os
import json
from datetime import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    matthews_corrcoef, roc_auc_score, confusion_matrix
)

import xgboost as xgb

# ---------- Optional MLflow wrappers ----------
# Probe for MLflow (and its sklearn flavor) once; the flag gates every helper below.
try:
    import mlflow
    import mlflow.sklearn  # ensure sklearn flavor available
except Exception as _e:
    _mlflow_available = False
    print(f"MLflow not available: {_e}")
else:
    _mlflow_available = True


class _ML:
    """Best-effort MLflow facade for the training cell: no-ops without MLflow,
    and logging failures are printed instead of raised."""

    class _RunInfoStub:
        """Mimics the `info` attribute of an MLflow run."""
        run_id = "NO_MLFLOW"

    class _RunStub:
        """Object handed to the `with` body when MLflow is unavailable."""

    class _OfflineRun:
        """Context manager standing in for an MLflow run."""

        def __enter__(self):
            return _ML._RunStub()

        def __exit__(self, *args):
            # Never suppress exceptions raised inside the `with` body.
            return False

    @staticmethod
    def start_run(run_name, experiment_id=None, nested=False):
        """Open an MLflow run (optionally nested / in a given experiment) or a stub."""
        if not _mlflow_available:
            return _ML._OfflineRun()
        options = {"run_name": run_name, "nested": nested}
        if experiment_id is not None:
            options["experiment_id"] = experiment_id
        return mlflow.start_run(**options)

    @staticmethod
    def log_param(k, v):
        """Log parameter `k`; dict/list/tuple values are serialised as JSON first."""
        if not _mlflow_available:
            return
        try:
            value = json.dumps(v, ensure_ascii=False) if isinstance(v, (dict, list, tuple)) else v
            mlflow.log_param(k, str(value))
        except Exception as err:
            print(f"Warning: could not log param {k}: {err}")

    @staticmethod
    def log_metric(k, v):
        """Log metric `k` with `v` converted to float."""
        if not _mlflow_available:
            return
        try:
            mlflow.log_metric(k, float(v))
        except Exception as err:
            print(f"Warning: could not log metric {k}: {err}")

    @staticmethod
    def log_artifact(path):
        """Upload `path` as a run artifact; silently skipped if the file does not exist."""
        if not (_mlflow_available and os.path.exists(path)):
            return
        try:
            mlflow.log_artifact(path)
        except Exception as err:
            print(f"Warning: could not log artifact {path}: {err}")

    @staticmethod
    def get_tracking_uri():
        """Return the tracking URI, "UNKNOWN" on error, or "NO_MLFLOW" without MLflow."""
        if not _mlflow_available:
            return "NO_MLFLOW"
        try:
            return mlflow.get_tracking_uri()
        except Exception:
            return "UNKNOWN"


# The stub's `info` is attached after the class body because nested classes
# cannot see each other by bare name while the enclosing body is executing.
_ML._RunStub.info = _ML._RunInfoStub

print("Starting comprehensive XGBoost model training with MLflow tracking...")

# ---------- Prepare features/target ----------
if "class_encoded" not in df.columns:
    raise KeyError("Target column 'class_encoded' not found in dataframe.")

X = df.drop(columns=["class_encoded"])
y = df["class_encoded"]

# XGBoost rejects text (object/string) feature columns, and preprocessing
# leaves several categorical features as strings. Replace every non-numeric,
# non-boolean column with integer category codes; missing values stay missing
# (NaN). The label -> code tables are kept in `feature_code_maps` so the codes
# can be interpreted later.
feature_code_maps = {}
for _col in X.select_dtypes(exclude=["number", "bool"]).columns:
    _as_category = X[_col].astype("category")
    feature_code_maps[_col] = {
        str(label): code for code, label in enumerate(_as_category.cat.categories)
    }
    _codes = _as_category.cat.codes
    # cat.codes marks missing values with -1; turn those back into NaN
    X[_col] = _codes.where(_codes >= 0)

print("Data prepared:")
print(f"  Features shape: {X.shape}")
print(f"  Target shape:   {y.shape}")

try:
    target_counts = y.value_counts().to_dict()
    print(f"  Target distribution: {target_counts}")
except Exception:
    # Purely informational output; never block training on it.
    pass

# ---------- Train/Test split ----------
# Stratify only when there is more than one class to balance across the split.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.30, random_state=42, stratify=y if len(np.unique(y)) > 1 else None
)
print("Data split complete:")
print(f"  Training set: {X_train.shape}")
print(f"  Test set:     {X_test.shape}")

# ---------- MLflow Parent Run ----------
# Reuse the experiment id chosen by the setup cell. That cell names it
# `EXPERIMENT_ID`; the lower-case `experiment_id` is still honoured for
# backward compatibility. None means "use MLflow's active experiment".
_experiment_id = globals().get("EXPERIMENT_ID", globals().get("experiment_id"))

parent_run_name = f"notebook_comprehensive_xgboost_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

# Parent run: dataset-level parameters. Child run: the XGBoost fit, its
# metrics, the confusion-matrix artifact and the logged model.
# Produces the globals `xgb_model`, `y_pred`, `proba` and `model_results`.
with _ML.start_run(run_name=parent_run_name, experiment_id=_experiment_id) as parent_run:
    parent_run_id = getattr(getattr(parent_run, "info", None), "run_id", "NO_MLFLOW")
    print(f"Started MLflow parent run: {parent_run_id}")

    # Log dataset info
    _ML.log_param("dataset_source", "notebook_comprehensive")
    _ML.log_param("model_approach", "single_xgboost_comprehensive")
    _ML.log_param("total_samples", len(df))
    _ML.log_param("total_features", X.shape[1])
    _ML.log_param("training_samples", len(X_train))
    _ML.log_param("test_samples", len(X_test))
    _ML.log_param("target_classes", len(pd.Series(y).unique()))

    # Log a small sample of feature names
    _ML.log_param("sample_features", X.columns.tolist()[:20])

    print("Training XGBoost model...")

    # ---------- Child run for actual training ----------
    with _ML.start_run(run_name="xgboost_comprehensive_training", nested=True) as child_run:
        child_run_id = getattr(getattr(child_run, "info", None), "run_id", "NO_MLFLOW")
        print(f"Started MLflow child run: {child_run_id}")

        # Model definition. `use_label_encoder` is intentionally not passed:
        # it is deprecated in recent XGBoost releases, and the target is
        # already integer-encoded by the preprocessing step.
        xgb_model = xgb.XGBClassifier(
            eval_metric="logloss",
            random_state=42,
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            subsample=0.8,
            colsample_bytree=0.8,
            gamma=0,
            min_child_weight=1,
            reg_alpha=0,
            reg_lambda=1
        )

        # Log all hyperparameters
        for param_name, param_value in xgb_model.get_params().items():
            _ML.log_param(f"xgb_{param_name}", param_value)
        print("Hyperparameters logged.")

        # Train
        xgb_model.fit(X_train, y_train)

        # Predictions
        y_pred = xgb_model.predict(X_test)

        # Probabilities are only needed for AUC and the model signature, so a
        # failure here is reported and those steps fall back gracefully.
        proba = None
        try:
            proba = xgb_model.predict_proba(X_test)
        except Exception as e:
            print(f"Warning: class probabilities unavailable: {e}")

        # Determine metrics according to number of classes
        classes = np.unique(y_train)
        binary = len(classes) == 2
        # Positive label for binary metrics: prefer 1, otherwise the largest label
        pos_label = (1 if 1 in classes else classes.max()) if binary else None

        # Accuracy, precision, recall, F1
        accuracy = accuracy_score(y_test, y_pred)
        if binary:
            precision = precision_score(y_test, y_pred, pos_label=pos_label, zero_division=0)
            recall    = recall_score(y_test, y_pred, pos_label=pos_label, zero_division=0)
            f1        = f1_score(y_test, y_pred, pos_label=pos_label, zero_division=0)
        else:
            precision = precision_score(y_test, y_pred, average="weighted", zero_division=0)
            recall    = recall_score(y_test, y_pred, average="weighted", zero_division=0)
            f1        = f1_score(y_test, y_pred, average="weighted", zero_division=0)

        # MCC (defined for binary and multiclass)
        mcc = matthews_corrcoef(y_test, y_pred)

        # AUC
        auc = None
        if proba is not None:
            try:
                if binary:
                    # Use the probability column that corresponds to the positive label
                    class_to_col = {c: i for i, c in enumerate(xgb_model.classes_)}
                    auc = roc_auc_score(y_test, proba[:, class_to_col[pos_label]])
                else:
                    auc = roc_auc_score(y_test, proba, multi_class="ovr", average="weighted")
            except Exception as e:
                print(f"Warning: AUC could not be computed: {e}")

        # Train accuracy and generalisation gap
        y_train_pred = xgb_model.predict(X_train)
        train_accuracy = accuracy_score(y_train, y_train_pred)
        overfitting_gap = train_accuracy - accuracy

        # Log metrics
        metrics_dict = {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1_score": f1,
            "mcc": mcc,
            "train_accuracy": train_accuracy,
            "overfitting_gap": overfitting_gap
        }
        if auc is not None:
            metrics_dict["auc"] = auc

        for k, v in metrics_dict.items():
            _ML.log_metric(k, v)
        print(f"Logged {len(metrics_dict)} metrics to MLflow.")

        # Training metadata
        _ML.log_param("training_timestamp", datetime.now().isoformat())
        _ML.log_param("notebook_version", "comprehensive_v1.0")
        _ML.log_param("data_preprocessing", "label_encoding_outlier_removal")

        # Feature importance (top 20)
        if hasattr(xgb_model, "feature_importances_"):
            importances = dict(zip(X.columns, xgb_model.feature_importances_))
            top20 = sorted(importances.items(), key=lambda x: x[1], reverse=True)[:20]
            for rank, (feat, imp) in enumerate(top20, start=1):
                _ML.log_metric(f"feature_importance_rank_{rank:02d}_{feat}", float(imp))

        # Confusion matrix (labels from actual classes)
        cm = confusion_matrix(y_test, y_pred, labels=xgb_model.classes_)
        class_labels = [str(c) for c in xgb_model.classes_]
        plt.figure(figsize=(8, 6))
        sns.heatmap(
            cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=class_labels,
            yticklabels=class_labels
        )
        plt.title("XGBoost - Confusion Matrix")
        plt.xlabel("Predicted")
        plt.ylabel("Actual")
        plt.tight_layout()

        cm_path = "comprehensive_confusion_matrix_xgboost.png"
        plt.savefig(cm_path, dpi=300, bbox_inches="tight")
        plt.close()
        _ML.log_artifact(cm_path)
        print("Confusion matrix artifact saved and logged (if MLflow available).")

        # Log model with signature and input example (best-effort); on failure
        # retry once without signature/registration before giving up.
        try:
            if _mlflow_available:
                signature = mlflow.models.infer_signature(X_train, xgb_model.predict_proba(X_train) if proba is not None else xgb_model.predict(X_train))
                input_example = X_train.head(3)
                mlflow.sklearn.log_model(
                    xgb_model,
                    "comprehensive_xgboost_model",
                    signature=signature,
                    input_example=input_example,
                    registered_model_name="mushroom_classifier_comprehensive_xgboost"
                )
                print("Model logged to MLflow with signature and example.")
        except Exception as e:
            print(f"Warning: could not log model with signature: {e}")
            try:
                if _mlflow_available:
                    mlflow.sklearn.log_model(xgb_model, "comprehensive_xgboost_model")
                    print("Model logged to MLflow without signature.")
            except Exception as e2:
                print(f"Warning: could not log model at all: {e2}")

        # Store results for later use: same values as the logged metrics,
        # with the F1 entry exposed under the shorter key "f1".
        model_results = {
            ("f1" if name == "f1_score" else name): value
            for name, value in metrics_dict.items()
        }

        print("\nComprehensive XGBoost training completed.")
        print(f"  MLflow Parent Run ID: {parent_run_id}")
        print(f"  MLflow Child  Run ID: {child_run_id}")
        print("  Model performance:")
        for metric, value in model_results.items():
            print(f"    {metric.replace('_', ' ').title()}: {value:.4f}")

        print(f"\nMLflow tracking URI: {_ML.get_tracking_uri()}")
        # The setup cell names this `EXPERIMENT_NAME`; the lower-case spelling
        # is still honoured. Printed only when one of them is defined.
        _active_experiment_name = globals().get("EXPERIMENT_NAME", globals().get("experiment_name"))
        if _active_experiment_name is not None:
            print(f"Experiment name: {_active_experiment_name}")

print("Comprehensive XGBoost model training with MLflow tracking completed.")


In [None]:
# Report final XGBoost training result
# `acc` ends up as a float only if `model_results` is a dict holding a usable
# "accuracy" entry; any problem while reading it leaves `acc` as None.
acc = None
try:
    if isinstance(model_results, dict):
        acc = float(model_results.get("accuracy"))
except Exception:
    acc = None

if acc is None:
    print("XGBoost model trained. Accuracy not available.")
else:
    print(f"XGBoost model trained. Accuracy: {acc:.4f}")

print("Model is ready for evaluation and monitoring.")


In [None]:
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    matthews_corrcoef, roc_auc_score, confusion_matrix
)

# Optional MLflow wrappers
# Probe for MLflow once; the `_ML` helpers consult `_mlflow_available`
# before touching the `mlflow` module.
try:
    import mlflow
except Exception as _e:
    _mlflow_available = False
    print(f"MLflow not available: {_e}")
else:
    _mlflow_available = True

class _ML:
    """Best-effort facade over MLflow that turns into no-ops without it.

    Every method checks the module-level ``_mlflow_available`` flag first and
    reports logging failures with a printed warning instead of raising, so a
    tracking problem never aborts the surrounding evaluation code.
    """

    @staticmethod
    def start_run(run_name):
        """Return a context manager for a run called *run_name*.

        With MLflow available this is ``mlflow.start_run(run_name=run_name)``.
        Otherwise it is a stand-in whose ``__enter__`` yields an object with
        ``info.run_id == "NO_MLFLOW"`` and which never suppresses exceptions.
        """
        if _mlflow_available:
            return mlflow.start_run(run_name=run_name)

        class _Noop:
            def __enter__(self):
                return type("obj", (), {"info": type("obj", (), {"run_id": "NO_MLFLOW"})})()

            def __exit__(self, *args):
                return False

        return _Noop()

    @staticmethod
    def log_metric(k, v):
        """Log metric *k* with value *v* coerced to ``float``."""
        if _mlflow_available:
            try:
                mlflow.log_metric(k, float(v))
            except Exception as e:
                print(f"Warning: failed to log metric {k}: {e}")

    @staticmethod
    def log_param(k, v):
        """Log parameter *k*; dict/list/tuple values are JSON-encoded first."""
        if _mlflow_available:
            try:
                if isinstance(v, (dict, list, tuple)):
                    v = json.dumps(v, ensure_ascii=False)
                mlflow.log_param(k, str(v))
            except Exception as e:
                print(f"Warning: failed to log param {k}: {e}")

    @staticmethod
    def log_artifact(path):
        """Log the file at *path* as an artifact if it exists on disk."""
        if _mlflow_available and os.path.exists(path):
            try:
                mlflow.log_artifact(path)
            except Exception as e:
                print(f"Warning: failed to log artifact {path}: {e}")

    @staticmethod
    def get_tracking_uri():
        """Return the MLflow tracking URI.

        Returns ``"NO_MLFLOW"`` when MLflow is unavailable and ``"UNKNOWN"``
        when the lookup itself fails. Other cells of this notebook call
        ``_ML.get_tracking_uri()``, so every redefinition of ``_ML`` has to
        provide it; otherwise re-running those cells raises ``AttributeError``.
        """
        if _mlflow_available:
            try:
                return mlflow.get_tracking_uri()
            except Exception:
                return "UNKNOWN"
        return "NO_MLFLOW"


def evaluate_xgboost_model_final(model, X_train, y_train, X_test, y_test,
                                 overfit_threshold: float = 0.10,
                                 high_conf_p: float = 0.80,
                                 deploy_thresh: dict | None = None):
    """
    Evaluate a fitted classifier on the held-out split and report deployment readiness.

    Computes accuracy, precision, recall, F1, MCC and (when probabilities are
    available) ROC AUC, plus training accuracy and the train/test accuracy gap.
    Metrics go through the ``_ML`` wrapper, confusion-matrix and
    feature-importance figures are written under ``plots/``, and a set of
    pass/fail deployment checks is printed and logged.

    Args:
        model: Fitted estimator exposing ``predict``. ``predict_proba``,
            ``classes_`` and ``feature_importances_`` are used when present.
        X_train, y_train: Training split; used for the training accuracy and
            to decide whether the task is binary (exactly two unique labels).
        X_test, y_test: Held-out split the reported metrics are computed on.
        overfit_threshold: Largest train-minus-test accuracy gap that is
            still treated as acceptable.
        high_conf_p: A binary prediction counts as high-confidence when its
            positive-class probability is above this value or below
            ``1 - high_conf_p``.
        deploy_thresh: Optional minimums for ``"accuracy"``, ``"precision"``
            and ``"recall"``. Missing keys fall back to 0.95.

    Returns:
        dict: metric name -> value with keys ``accuracy``, ``precision``,
        ``recall``, ``f1_score``, ``mcc``, ``train_accuracy``,
        ``overfitting_gap`` and, when it could be computed, ``auc``.
    """
    default_thresh = {"accuracy": 0.95, "precision": 0.95, "recall": 0.95}
    # Merge instead of replace so a partial override keeps the other defaults
    # (a bare dict lookup would raise KeyError for the missing keys).
    deploy_thresh = {**default_thresh, **(deploy_thresh or {})}

    with _ML.start_run(run_name="final_xgboost_evaluation"):
        # Predictions
        y_pred_train = model.predict(X_train)
        y_pred_test = model.predict(X_test)

        # Probabilities are optional: probability-based metrics are skipped without them.
        proba = None
        try:
            proba = model.predict_proba(X_test)
        except Exception as e:
            print(f"Note: predict_proba unavailable, probability-based metrics skipped: {e}")

        # Binary vs multiclass handling
        classes = np.unique(y_train)
        binary = len(classes) == 2

        # Positive label is chosen once (prefer 1 if present) so every metric agrees on it.
        pos_label = (1 if 1 in classes else classes.max()) if binary else None

        # Positive-class probability column, resolved once and guarded: the
        # column order is taken from model.classes_, never guessed.
        pos_scores = None
        if binary and proba is not None:
            class_to_col = {c: i for i, c in enumerate(getattr(model, "classes_", []))}
            pos_idx = class_to_col.get(pos_label)
            if pos_idx is None:
                print("Warning: positive label not found in model.classes_; "
                      "probability-based checks skipped.")
            else:
                pos_scores = proba[:, pos_idx]

        # Metrics
        accuracy = accuracy_score(y_test, y_pred_test)
        if binary:
            precision = precision_score(y_test, y_pred_test, pos_label=pos_label, zero_division=0)
            recall    = recall_score(y_test, y_pred_test, pos_label=pos_label, zero_division=0)
            f1        = f1_score(y_test, y_pred_test, pos_label=pos_label, zero_division=0)
        else:
            precision = precision_score(y_test, y_pred_test, average="weighted", zero_division=0)
            recall    = recall_score(y_test, y_pred_test, average="weighted", zero_division=0)
            f1        = f1_score(y_test, y_pred_test, average="weighted", zero_division=0)

        mcc = matthews_corrcoef(y_test, y_pred_test)

        auc = None
        if proba is not None:
            try:
                if binary:
                    if pos_scores is not None:
                        # Binarise against pos_label so the scores and the
                        # targets refer to the same class. Passing raw labels
                        # lets roc_auc_score pick its own positive class,
                        # which can disagree with pos_label (e.g. labels {1, 2}).
                        auc = roc_auc_score(np.asarray(y_test) == pos_label, pos_scores)
                else:
                    auc = roc_auc_score(y_test, proba, multi_class="ovr", average="weighted")
            except Exception as e:
                print(f"Warning: AUC not computed: {e}")

        # Training accuracy and overfitting gap
        train_accuracy = accuracy_score(y_train, y_pred_train)
        overfitting_gap = train_accuracy - accuracy

        # Collect and log metrics
        metrics = {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1_score": f1,
            "mcc": mcc,
            "train_accuracy": train_accuracy,
            "overfitting_gap": overfitting_gap
        }
        if auc is not None:
            metrics["auc"] = auc

        for k, v in metrics.items():
            _ML.log_metric(k, v)

        # Overfitting message; the same flag feeds the deployment check below
        # so the warning and the check can never disagree.
        overfit = overfitting_gap > overfit_threshold
        if overfit:
            _ML.log_param("overfitting_warning", f"gap>{overfit_threshold}")
            print(f"Potential overfitting detected (gap: {overfitting_gap:.3f})")
        else:
            print(f"Generalisation gap acceptable (gap: {overfitting_gap:.3f})")

        # Print summary
        print("\n--- Final XGBoost Results ---")
        for k, v in metrics.items():
            print(f"{k.replace('_', ' ').title()}: {v:.4f}")

        # Confusion matrices (counts and row-normalised)
        labels = getattr(model, "classes_", np.unique(y_test))
        tick_labels = [str(c) for c in labels]
        cm = confusion_matrix(y_test, y_pred_test, labels=labels)
        cm_row = cm.astype(float) / np.maximum(cm.sum(axis=1, keepdims=True), 1.0)  # avoid div-by-zero

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                    xticklabels=tick_labels, yticklabels=tick_labels, ax=ax1)
        ax1.set_title("Confusion Matrix (Counts)")
        ax1.set_xlabel("Predicted")
        ax1.set_ylabel("Actual")

        sns.heatmap(cm_row * 100.0, annot=True, fmt=".1f", cmap="Blues",
                    xticklabels=tick_labels, yticklabels=tick_labels, ax=ax2)
        ax2.set_title("Confusion Matrix (Row %)")
        ax2.set_xlabel("Predicted")
        ax2.set_ylabel("Actual")

        plt.tight_layout()
        os.makedirs("plots", exist_ok=True)
        plot_path = "plots/final_confusion_matrix_xgboost.png"
        plt.savefig(plot_path, dpi=300, bbox_inches="tight")
        _ML.log_artifact(plot_path)
        plt.show()
        plt.close(fig)  # release the figure; repeated runs would otherwise accumulate them

        # Feature importance (top 20)
        if hasattr(model, "feature_importances_"):
            importances = model.feature_importances_
            # Fall back to positional names when X_test is not a DataFrame.
            feature_names = getattr(X_test, "columns", None)
            if feature_names is None:
                feature_names = [f"f{i}" for i in range(len(importances))]

            fi = pd.DataFrame({
                "feature": feature_names,
                "importance": importances
            }).sort_values("importance", ascending=False)

            top = fi.head(20)
            fi_fig = plt.figure(figsize=(12, 8))
            plt.barh(range(len(top)), top["importance"])
            plt.yticks(range(len(top)), top["feature"])
            plt.xlabel("Feature Importance")
            plt.title("Top 20 Feature Importances - XGBoost Model")
            plt.gca().invert_yaxis()
            fi_path = "plots/final_feature_importance_xgboost.png"
            plt.savefig(fi_path, dpi=300, bbox_inches="tight")
            _ML.log_artifact(fi_path)
            plt.show()
            plt.close(fi_fig)

        # Deployment readiness checks
        # High-confidence check applies only to binary problems with usable
        # probabilities: more than 80% of predictions must be decisive.
        high_conf_ok = False
        if pos_scores is not None:
            decisive = (pos_scores > high_conf_p) | (pos_scores < (1 - high_conf_p))
            high_conf_ok = bool(decisive.sum() > len(pos_scores) * 0.8)

        deployment_readiness = {
            "accuracy_threshold": accuracy > deploy_thresh["accuracy"],
            "precision_threshold": precision > deploy_thresh["precision"],
            "recall_threshold": recall > deploy_thresh["recall"],
            "no_severe_overfitting": not overfit,
            "high_confidence_predictions": high_conf_ok
        }

        deployment_ready = all(deployment_readiness.values())
        _ML.log_param("deployment_ready", deployment_ready)
        for check, passed in deployment_readiness.items():
            _ML.log_metric(f"deployment_check_{check}", int(passed))
            status = "PASS" if passed else "FAIL"
            print(f"{status} - {check.replace('_', ' ').title()}: {passed}")

        print(f"\nModel Deployment Ready: {deployment_ready}")

        return metrics

# Run the final evaluation on the trained model.
# `xgb_model` and the train/test splits are not created in this cell; they are
# expected to exist already in the notebook namespace. The returned metrics
# dict is kept in `final_metrics`, which the pipeline-summary cell reads.
print("Starting final XGBoost evaluation...")
final_metrics = evaluate_xgboost_model_final(xgb_model, X_train, y_train, X_test, y_test)
print("Final XGBoost evaluation completed.")


In [None]:
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.calibration import calibration_curve

# ---------- Optional MLflow wrappers ----------
# Probe for MLflow once; the `_ML` helpers consult `_mlflow_available`
# before touching the `mlflow` module.
try:
    import mlflow
except Exception as _e:
    _mlflow_available = False
    print(f"MLflow not available: {_e}")
else:
    _mlflow_available = True

class _ML:
    """Best-effort facade over MLflow that turns into no-ops without it.

    Every method checks the module-level ``_mlflow_available`` flag first and
    reports logging failures with a printed warning instead of raising, so a
    tracking problem never aborts the surrounding monitoring code.
    """

    @staticmethod
    def start_run(run_name):
        """Return a context manager for a run called *run_name*.

        With MLflow available this is ``mlflow.start_run(run_name=run_name)``.
        Otherwise it is a stand-in whose ``__enter__`` yields an object with
        ``info.run_id == "NO_MLFLOW"`` and which never suppresses exceptions.
        """
        if _mlflow_available:
            return mlflow.start_run(run_name=run_name)

        class _Noop:
            def __enter__(self):
                return type("obj", (), {"info": type("obj", (), {"run_id": "NO_MLFLOW"})})()

            def __exit__(self, *args):
                return False

        return _Noop()

    @staticmethod
    def log_metric(k, v):
        """Log metric *k* with value *v* coerced to ``float``."""
        if _mlflow_available:
            try:
                mlflow.log_metric(k, float(v))
            except Exception as e:
                print(f"Warning: failed to log metric {k}: {e}")

    @staticmethod
    def log_param(k, v):
        """Log parameter *k*; dict/list/tuple values are JSON-encoded first."""
        if _mlflow_available:
            try:
                if isinstance(v, (dict, list, tuple)):
                    v = json.dumps(v, ensure_ascii=False)
                mlflow.log_param(k, str(v))
            except Exception as e:
                print(f"Warning: failed to log param {k}: {e}")

    @staticmethod
    def log_artifact(path):
        """Log the file at *path* as an artifact if it exists on disk."""
        if _mlflow_available and os.path.exists(path):
            try:
                mlflow.log_artifact(path)
            except Exception as e:
                print(f"Warning: failed to log artifact {path}: {e}")

    @staticmethod
    def get_tracking_uri():
        """Return the MLflow tracking URI.

        Returns ``"NO_MLFLOW"`` when MLflow is unavailable and ``"UNKNOWN"``
        when the lookup itself fails. Other cells of this notebook call
        ``_ML.get_tracking_uri()``, so every redefinition of ``_ML`` has to
        provide it; otherwise re-running those cells raises ``AttributeError``.
        """
        if _mlflow_available:
            try:
                return mlflow.get_tracking_uri()
            except Exception:
                return "UNKNOWN"
        return "NO_MLFLOW"


def monitor_xgboost_production_readiness(
    model,
    X_test: pd.DataFrame,
    y_test: pd.Series,
    high_conf_threshold: float = 0.80,
    balance_target: float = 0.50
):
    """
    Monitor an already-trained classifier for production readiness.

    Derives prediction-distribution and confidence statistics on ``X_test``,
    runs four heuristic stability checks, and (binary problems with
    ``predict_proba`` only) plots a calibration curve saved under ``plots/``.
    Everything is logged through the ``_ML`` wrapper.

    Args:
        model: Fitted estimator exposing ``predict``. ``predict_proba``,
            ``classes_`` and ``feature_importances_`` are used when present.
        X_test: Held-out features the model is scored on.
        y_test: Held-out labels; used for the calibration curve and as the
            class list when the model has no ``classes_``.
        high_conf_threshold: Probability above which a prediction counts as
            high-confidence (binary: also below ``1 - high_conf_threshold``).
        balance_target: Expected share of positive predictions in the binary
            case; ``prediction_balance`` is the absolute deviation from it.

    Returns:
        dict with ``stability_score`` (fraction of stability checks passed),
        ``calibration_error`` (mean absolute gap between observed and
        predicted positive rate per bin, NaN when calibration was skipped)
        and ``production_ready`` (plain ``bool``).
    """

    with _ML.start_run(run_name="production_readiness_monitoring"):
        # Predictions
        y_pred = model.predict(X_test)

        # Probabilities (best-effort); confidence metrics stay NaN without them.
        proba = None
        try:
            proba = model.predict_proba(X_test)
        except Exception as e:
            print(f"Note: predict_proba unavailable, confidence metrics skipped: {e}")

        # Determine classes and binary/multiclass. np.asarray makes .max()
        # safe even if classes_ is a plain list.
        classes = np.asarray(getattr(model, "classes_", np.unique(y_test)))
        binary = len(classes) == 2

        # Choose a positive label consistently (prefer 1 if present)
        pos_label = (1 if 1 in classes else classes.max()) if binary else None

        # Positive-class probability column, resolved once for both the
        # confidence statistics and the calibration curve.
        pos_scores = None
        if binary and proba is not None:
            class_to_col = {c: i for i, c in enumerate(classes)}
            pos_scores = proba[:, class_to_col[pos_label]]

        # Confidence handling
        avg_conf = np.nan
        conf_var = np.nan
        high_conf_rate = np.nan

        if proba is not None:
            if binary:
                # NOTE(review): in the binary case "confidence" is the mean
                # positive-class probability, not the mean max-probability.
                avg_conf = float(np.mean(pos_scores))
                conf_var = float(np.var(pos_scores))
                high_conf_rate = float(np.mean(
                    (pos_scores > high_conf_threshold) | (pos_scores < (1 - high_conf_threshold))
                ))
            else:
                # For multiclass, use the max class probability as confidence
                max_conf = proba.max(axis=1)
                avg_conf = float(np.mean(max_conf))
                conf_var = float(np.var(max_conf))
                high_conf_rate = float(np.mean(max_conf > high_conf_threshold))

        # Positive prediction rate and variance
        if binary:
            pred_pos = (y_pred == pos_label).astype(float)
            positive_rate = float(pred_pos.mean())
            prediction_variance = float(pred_pos.var())
            prediction_balance = float(abs(positive_rate - balance_target))
        else:
            # For multiclass, define "positive rate" as proportion of the most frequent predicted class
            _, counts = np.unique(y_pred, return_counts=True)
            positive_rate = float(counts.max() / len(y_pred))
            # Variance of predicted class indices as a simple dispersion proxy
            # (still loggable, though less interpretable than binary case)
            class_to_idx = {c: i for i, c in enumerate(classes)}
            pred_idx = np.vectorize(class_to_idx.get)(y_pred)
            prediction_variance = float(np.var(pred_idx))
            # Balance relative to uniform distribution across classes
            uniform = 1.0 / len(classes)
            prediction_balance = float(abs(positive_rate - uniform))

        # Aggregate metrics
        metrics = {
            "positive_prediction_rate": positive_rate,
            "prediction_variance": prediction_variance,
            "avg_prediction_confidence": avg_conf,
            "confidence_variance": conf_var,
            "high_confidence_rate": high_conf_rate,
            "prediction_balance": prediction_balance
        }

        # Log metrics (NaN/inf values are skipped rather than sent to the tracker)
        for k, v in metrics.items():
            if not (isinstance(v, float) and (np.isnan(v) or np.isinf(v))):
                _ML.log_metric(k, v)

        # Feature importance
        if hasattr(model, "feature_importances_"):
            importances = model.feature_importances_
            # Fall back to positional names when X_test is not a DataFrame.
            feature_names = getattr(X_test, "columns", None)
            if feature_names is None:
                feature_names = [f"f{i}" for i in range(len(importances))]

            fi_df = pd.DataFrame({
                "feature": feature_names,
                "importance": importances
            }).sort_values("importance", ascending=False)
            top_features = fi_df.head(10)["feature"].tolist()
            _ML.log_param("top_10_features", top_features)
            if not fi_df.empty:
                _ML.log_metric("top_feature_dominance", float(fi_df.iloc[0]["importance"]))

        # Stability checks (simple heuristics)
        if binary:
            balanced_predictions = metrics["prediction_balance"] < 0.30
        else:
            # For multiclass, require no class to dominate excessively
            balanced_predictions = positive_rate < 0.70

        confident_predictions = (not np.isnan(high_conf_rate)) and (high_conf_rate > 0.70)
        stable_variance = metrics["prediction_variance"] < (0.30 if binary else 1.00)
        # NOTE(review): for multiclass avg_conf is the mean max-probability, so
        # a confidently-predicting model tends to fail this 0.30-0.70 window
        # while passing `confident_predictions` - confirm this is intended.
        reasonable_confidence = (not np.isnan(avg_conf)) and (0.30 < avg_conf < 0.70)

        stability_checks = {
            "balanced_predictions": balanced_predictions,
            "confident_predictions": confident_predictions,
            "stable_variance": stable_variance,
            "reasonable_confidence": reasonable_confidence
        }

        stability_score = sum(bool(v) for v in stability_checks.values()) / len(stability_checks)
        _ML.log_metric("stability_score", stability_score)

        print("Production Readiness Monitoring Results:")
        print(f"  Positive prediction rate: {metrics['positive_prediction_rate']:.3f}")
        if not np.isnan(avg_conf):
            print(f"  Average confidence: {metrics['avg_prediction_confidence']:.3f}")
        if not np.isnan(high_conf_rate):
            print(f"  High confidence rate: {metrics['high_confidence_rate']:.3f}")
        print(f"  Stability score: {stability_score:.3f}")

        for check, passed in stability_checks.items():
            status = "PASS" if passed else "FAIL"
            print(f"  {status} - {check.replace('_', ' ').title()}: {passed}")

        # Calibration curve (binary only with probabilities)
        calibration_error = np.nan
        if pos_scores is not None:
            # Binarise the targets against pos_label so they describe the same
            # class as pos_scores; raw labels other than {0, 1} would
            # otherwise be rejected or interpreted against a different class.
            y_true_bin = (np.asarray(y_test) == pos_label).astype(int)
            frac_pos, mean_pred = calibration_curve(y_true_bin, pos_scores, n_bins=10, strategy="uniform")

            cal_fig = plt.figure(figsize=(10, 6))
            plt.plot(mean_pred, frac_pos, "s-", label="Model", linewidth=2, markersize=6)
            plt.plot([0, 1], [0, 1], "k--", label="Perfectly calibrated", linewidth=1)
            plt.xlabel("Mean Predicted Probability")
            plt.ylabel("Fraction of Positives")
            plt.title("Model Calibration")
            plt.legend()
            plt.grid(alpha=0.3)

            os.makedirs("plots", exist_ok=True)
            cal_path = "plots/production_calibration_xgboost.png"
            plt.savefig(cal_path, dpi=300, bbox_inches="tight")
            _ML.log_artifact(cal_path)
            plt.show()
            plt.close(cal_fig)  # release the figure; repeated runs would otherwise accumulate them

            calibration_error = float(np.mean(np.abs(frac_pos - mean_pred)))
            _ML.log_metric("calibration_error", calibration_error)
            print(f"  Calibration error: {calibration_error:.4f}")
        else:
            print("  Calibration skipped (requires binary classification with predict_proba).")

        # Final production readiness flag. A skipped calibration (NaN) does not
        # block readiness. bool() keeps np.bool_ out of the returned dict.
        production_ready = bool(
            (stability_score > 0.70) and (np.isnan(calibration_error) or calibration_error < 0.10)
        )
        _ML.log_param("production_ready", production_ready)
        print(f"\nProduction readiness: {production_ready}")

        return {
            "stability_score": stability_score,
            "calibration_error": calibration_error,
            "production_ready": production_ready
        }

# Run production readiness monitoring on the held-out split.
# `xgb_model`, `X_test` and `y_test` are not created in this cell; they are
# expected to exist already in the notebook namespace. The result dict is kept
# in `production_metrics`, which the pipeline-summary cell reads.
print("Starting production readiness monitoring...")
production_metrics = monitor_xgboost_production_readiness(xgb_model, X_test, y_test)
print("Production readiness monitoring completed.")


In [None]:
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# ---------- Optional MLflow wrappers ----------
# Probe for MLflow once; the `_ML` helpers consult `_mlflow_available`
# before touching the `mlflow` module.
try:
    import mlflow
except Exception as _e:
    _mlflow_available = False
    print(f"MLflow not available: {_e}")
else:
    _mlflow_available = True

class _ML:
    """MLflow helpers that silently become no-ops when MLflow is missing.

    All methods consult the module-level ``_mlflow_available`` flag; logging
    failures are printed as warnings rather than raised.
    """

    @staticmethod
    def start_run(run_name):
        """Open an MLflow run, or hand back an inert context manager."""
        if not _mlflow_available:
            class _Noop:
                def __enter__(self):
                    info_stub = type("obj", (), {"run_id": "NO_MLFLOW"})
                    return type("obj", (), {"info": info_stub})()

                def __exit__(self, *args):
                    return False

            return _Noop()
        return mlflow.start_run(run_name=run_name)

    @staticmethod
    def log_metric(k, v):
        """Record metric ``k`` as ``float(v)``."""
        if not _mlflow_available:
            return
        try:
            mlflow.log_metric(k, float(v))
        except Exception as e:
            print(f"Warning: failed to log metric {k}: {e}")

    @staticmethod
    def log_param(k, v):
        """Record parameter ``k``; containers are serialised to JSON text."""
        if not _mlflow_available:
            return
        try:
            if isinstance(v, (dict, list, tuple)):
                text = json.dumps(v, ensure_ascii=False)
            else:
                text = str(v)
            mlflow.log_param(k, text)
        except Exception as e:
            print(f"Warning: failed to log param {k}: {e}")

    @staticmethod
    def log_artifact(path):
        """Upload ``path`` as an artifact when the file is present."""
        if not _mlflow_available or not os.path.exists(path):
            return
        try:
            mlflow.log_artifact(path)
        except Exception as e:
            print(f"Warning: failed to log artifact {path}: {e}")

    @staticmethod
    def get_tracking_uri():
        """Tracking URI, ``"UNKNOWN"`` on lookup failure, ``"NO_MLFLOW"`` without MLflow."""
        if not _mlflow_available:
            return "NO_MLFLOW"
        try:
            return mlflow.get_tracking_uri()
        except Exception:
            return "UNKNOWN"


def _get_metric(mdict, *keys, default=None):
    """Look up the first of several key aliases in *mdict*, ignoring case.

    Aliases are tried in the order given; keys and aliases are compared via
    their lower-cased ``str()`` form. Returns *default* when *mdict* is not a
    dict or none of the aliases is present.
    """
    if not isinstance(mdict, dict):
        return default

    folded = {}
    for name, val in mdict.items():
        folded[str(name).lower()] = val

    wanted = (str(alias).lower() for alias in keys)
    return next((folded[alias] for alias in wanted if alias in folded), default)


# Build, log, plot and persist a one-row summary of the whole pipeline.
# Inputs come from earlier cells (`final_metrics`, `production_metrics`, data
# splits); each is looked up defensively so the summary still renders when an
# upstream cell was skipped.
print("Generating streamlined XGBoost MLOps pipeline summary...")

with _ML.start_run(run_name="streamlined_xgboost_pipeline_summary"):
    # Upstream results; None when the producing cell has not been run.
    _final_metrics = globals().get("final_metrics")
    _production_metrics = globals().get("production_metrics")
    _has_production = isinstance(_production_metrics, dict)

    # Pull metrics with robust key handling (supports both 'Accuracy' and 'accuracy')
    acc = _get_metric(_final_metrics, "Accuracy", "accuracy")
    f1  = _get_metric(_final_metrics, "F1 Score", "f1_score", "f1")
    auc = _get_metric(_final_metrics, "AUC", "auc")
    prec = _get_metric(_final_metrics, "Precision", "precision")
    rec  = _get_metric(_final_metrics, "Recall", "recall")

    # Build pipeline summary dict
    pipeline_summary = {
        "model_type": "XGBoost_Only",
        "pipeline_approach": "Streamlined_Single_Model",
        "final_accuracy": acc,
        "final_f1_score": f1,
        "final_auc": auc,
        "final_precision": prec,
        "final_recall": rec,
        "data_source": data_source if "data_source" in globals() else "UNKNOWN",
        "feature_count": int(X.shape[1]) if "X" in globals() else -1,
        "training_samples": int(len(X_train)) if "X_train" in globals() else -1,
        "test_samples": int(len(X_test)) if "X_test" in globals() else -1,
        "stability_score": float(_production_metrics.get("stability_score", np.nan)) if _has_production else np.nan,
        "calibration_error": float(_production_metrics.get("calibration_error", np.nan)) if _has_production else np.nan,
        "production_ready": bool(_production_metrics.get("production_ready", False)) if _has_production else False,
        "pipeline_type": "streamlined_xgboost_mlops"
    }

    # Log all metrics/params. bool is a subclass of int, so it is excluded
    # explicitly: flags such as production_ready are logged as params, not as
    # 0/1 metrics. Non-finite numbers and None also go to params.
    for key, value in pipeline_summary.items():
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if is_number and np.isfinite(value):
            _ML.log_metric(key, value)
        else:
            _ML.log_param(key, value)

    # Prepare performance metrics for plotting (only include those available)
    metrics_data = {}
    if acc is not None:  metrics_data["Accuracy"] = acc
    if prec is not None: metrics_data["Precision"] = prec
    if rec is not None:  metrics_data["Recall"] = rec
    if f1 is not None:   metrics_data["F1 Score"] = f1
    if auc is not None:  metrics_data["AUC"] = auc  # absent when AUC could not be computed

    # Create plots
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

    # --- Metrics bar chart ---
    if metrics_data:
        bars = ax1.bar(list(metrics_data.keys()), list(metrics_data.values()), alpha=0.85)
        ax1.set_title("XGBoost Model Performance Metrics", fontsize=14, fontweight="bold")
        ax1.set_ylabel("Score", fontsize=12)
        ax1.set_ylim(0, 1.0)
        ax1.grid(axis="y", alpha=0.3)
        for bar, value in zip(bars, metrics_data.values()):
            ax1.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01,
                     f"{value:.3f}", ha="center", va="bottom", fontweight="bold", fontsize=10)
    else:
        ax1.set_title("XGBoost Model Performance Metrics (no metrics available)", fontsize=14)

    # --- Production readiness gauge-like bars ---
    # For calibration quality, if calibration_error is NaN, treat quality as NaN too
    cal_err = pipeline_summary["calibration_error"]
    cal_quality = (1 - cal_err) if isinstance(cal_err, (int, float)) and not np.isnan(cal_err) else np.nan
    overall_perf = float(np.nanmean(list(metrics_data.values()))) if metrics_data else np.nan

    readiness_data = {
        "Stability Score": pipeline_summary["stability_score"],
        "Calibration Quality": cal_quality,
        "Overall Performance": overall_perf
    }

    # Assign colours by threshold (green/yellow/red) when value is valid; fall back to grey for NaN
    def colour_for(v):
        if isinstance(v, (int, float)) and not np.isnan(v):
            if v > 0.8: return "#28a745"
            if v > 0.6: return "#ffc107"
            return "#dc3545"
        return "#6c757d"

    colors = [colour_for(v) for v in readiness_data.values()]
    bars2 = ax2.bar(list(readiness_data.keys()), list(readiness_data.values()), color=colors, alpha=0.85)
    ax2.set_title("Production Readiness Assessment", fontsize=14, fontweight="bold")
    ax2.set_ylabel("Score", fontsize=12)
    ax2.set_ylim(0, 1.0)
    ax2.grid(axis="y", alpha=0.3)
    for bar, value in zip(bars2, readiness_data.values()):
        if isinstance(value, (int, float)) and not np.isnan(value):
            ax2.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01,
                     f"{value:.3f}", ha="center", va="bottom", fontweight="bold", fontsize=10)
        else:
            ax2.text(bar.get_x() + bar.get_width() / 2, 0.02, "N/A", ha="center", va="bottom", fontsize=10)

    plt.tight_layout()

    # Save summary plot
    os.makedirs("plots", exist_ok=True)
    os.makedirs("results", exist_ok=True)
    summary_path = "plots/streamlined_xgboost_summary.png"
    plt.savefig(summary_path, dpi=300, bbox_inches="tight")
    _ML.log_artifact(summary_path)
    plt.show()
    plt.close(fig)  # release the figure; repeated runs would otherwise accumulate them

    # Save CSV artifacts (best-effort: a write failure is reported, not raised)
    try:
        if metrics_data:
            pd.DataFrame([metrics_data]).to_csv("results/performance_metrics.csv", index=False)
            _ML.log_artifact("results/performance_metrics.csv")

        summary_df = pd.DataFrame([pipeline_summary])
        summary_df.to_csv("results/pipeline_summary.csv", index=False)
        _ML.log_artifact("results/pipeline_summary.csv")
    except Exception as e:
        print(f"Could not save results: {e}")

    # Final summary printout
    print("\n" + "=" * 70)
    print("STREAMLINED MUSHROOM CLASSIFICATION XGBOOST PIPELINE COMPLETE")
    print("=" * 70)
    print("Model: XGBoost (Single Model Approach)")
    if acc is not None: print(f"Accuracy: {acc:.4f}")
    if f1  is not None: print(f"F1 Score: {f1:.4f}")
    if auc is not None: print(f"AUC: {auc:.4f}")
    if prec is not None: print(f"Precision: {prec:.4f}")
    if rec  is not None: print(f"Recall: {rec:.4f}")
    print(f"Stability Score: {pipeline_summary['stability_score']}")
    print(f"Calibration Error: {pipeline_summary['calibration_error']}")
    print(f"Production Ready: {pipeline_summary['production_ready']}")
    print(f"MLflow URI: {_ML.get_tracking_uri()}")
    print("Results: plots/ and results/ directories")
    print("=" * 70)
    print("No A/B testing required - single model deployment target.")
    print("Focused, efficient, and production-ready pipeline.")
    print("=" * 70)

print("Streamlined XGBoost MLOps pipeline completed successfully.")
