In [2]:
# src/utils/data_loader.py
import pandas as pd
from pathlib import Path

# Define the data directory path
DATA_DIR = Path('../data')
PROCESSED_DIR = DATA_DIR

def load_processed_data(filename="heart_disease_processed.csv"):
    """
    Load the processed CSV for modeling.
    """
    file_path = PROCESSED_DIR / filename
    if not file_path.exists():
        raise FileNotFoundError(f"Processed file not found: {file_path}")
    return pd.read_csv(file_path)

In [3]:
# extended_model_pipeline_best_shap.py
import pandas as pd
import numpy as np
import shap
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score

# -------------------------
# 1. Preprocess & Split
# -------------------------
def preprocess_split(df, target_col="target"):
    df = df.copy()
    for col in df.columns:
        if df[col].isnull().any():
            df[col] = df[col].mode()[0] if df[col].dtype == "int" else df[col].median()
    X = df.drop(columns=[target_col])
    y = df[target_col].astype(int)
    return train_test_split(X, y, test_size=0.3, stratify=y, random_state=42)

# -------------------------
# 2. Build Pipeline
# -------------------------
def build_pipeline(model_name="LogisticRegression"):
    scaler = StandardScaler()
    
    if model_name == "LogisticRegression":
        model = LogisticRegression(max_iter=1000, class_weight="balanced", random_state=42)
    elif model_name == "RandomForest":
        model = RandomForestClassifier(n_estimators=100, class_weight="balanced", random_state=42)
    elif model_name == "XGBoost":
        model = XGBClassifier(use_label_encoder=False, eval_metric="logloss", random_state=42)
    elif model_name == "LightGBM":
        model = LGBMClassifier(random_state=42)
    elif model_name == "AdaBoost":
        model = AdaBoostClassifier(random_state=42)
    else:
        raise ValueError(f"Unsupported model: {model_name}")
    
    pipeline = ImbPipeline([
        ("balancer", SMOTE(random_state=42)),
        ("scaler", scaler),
        ("clf", model)
    ])
    return pipeline

# -------------------------
# 3. Train & Evaluate Models
# -------------------------
def train_evaluate(df, models=None, target_col="target"):
    if models is None:
        models = ["LogisticRegression","RandomForest","XGBoost","LightGBM","AdaBoost"]
    
    X_train, X_test, y_train, y_test = preprocess_split(df, target_col)
    results = []
    pipelines = {}
    
    # Train all models
    for m in models:
        pipe = build_pipeline(m)
        pipe.fit(X_train, y_train)
        pipelines[m] = pipe
        
        y_pred = pipe.predict(X_test)
        metrics = {
            "Model": m,
            "Accuracy": accuracy_score(y_test, y_pred),
            "F1": f1_score(y_test, y_pred),
            "Recall": recall_score(y_test, y_pred),
            "Precision": precision_score(y_test, y_pred)
        }
        results.append(metrics)
    
    results_df = pd.DataFrame(results)
    
    # Identify best model (highest F1)
    best_model_name = results_df.sort_values("F1", ascending=False).iloc[0]["Model"]
    print(f"\nBest Model based on F1: {best_model_name}")
    
    # SHAP for best model (if tree-based)
    if best_model_name in ["RandomForest", "XGBoost", "LightGBM"]:
        best_pipe = pipelines[best_model_name]
        model = best_pipe.named_steps["clf"]
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_test)
        shap_importance = pd.Series(np.abs(shap_values).mean(axis=0), index=X_train.columns)
        print(f"\nSHAP Feature Importance for {best_model_name}:\n", shap_importance.sort_values(ascending=False))
    
    return results_df, best_model_name

# -------------------------
# Example Usage
# -------------------------
if __name__ == "__main__":
    df = pd.read_csv("processed_heart_disease.csv")
    results_df, best_model_name = train_evaluate(df)
    print("\nFinal Model Comparison:")
    print(results_df)

FileNotFoundError: [Errno 2] No such file or directory: 'processed_heart_disease.csv'

In [12]:
# -------------------------
# Notebook Modeling Pipeline
# -------------------------
import pandas as pd
import numpy as np
import shap
import matplotlib.pyplot as plt
from pathlib import Path
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score

# -------------------------
# Configuration
# -------------------------

# -------------------------
# 1. Preprocess & split
# -------------------------
def preprocess_split(df, target_col="target"):
    df = df.copy()
    for col in df.columns:
        if df[col].isnull().any():
            df[col] = df[col].mode()[0] if df[col].dtype == "int" else df[col].median()
    X = df.drop(columns=[target_col])
    y = df[target_col].astype(int)
    return train_test_split(X, y, test_size=0.3, stratify=y, random_state=42)

# -------------------------
# 2. Build pipeline
# -------------------------
def build_pipeline(model_name="LogisticRegression"):
    scaler = StandardScaler()
    if model_name == "LogisticRegression":
        model = LogisticRegression(max_iter=1000, class_weight="balanced", random_state=42)
    elif model_name == "XGBoost":
        model = XGBClassifier(use_label_encoder=False, eval_metric="logloss", random_state=42)
    elif model_name == "LightGBM":
        model = LGBMClassifier(random_state=42, verbose=-1)  # Added verbose=-1 to suppress warnings
    elif model_name == "RandomForest":
        model = RandomForestClassifier(n_estimators=100, class_weight="balanced", random_state=42)
    elif model_name == "AdaBoost":
        model = AdaBoostClassifier(random_state=42)
    else:
        raise ValueError("Unsupported model")
    
    pipeline = ImbPipeline([
        ("balancer", SMOTE(random_state=42)),
        ("scaler", scaler),
        ("clf", model)
    ])
    return pipeline

# -------------------------
# 3. Train & evaluate
# -------------------------
def train_evaluate(df, models=["LogisticRegression","XGBoost","LightGBM","RandomForest","AdaBoost"]):
    X_train, X_test, y_train, y_test = preprocess_split(df)
    results = []
    best_model_name = None
    best_f1 = 0
    best_pipeline = None

    for m in models:
        print(f"Training {m}...")
        pipe = build_pipeline(m)
        pipe.fit(X_train, y_train)
        y_pred = pipe.predict(X_test)
        
        f1 = f1_score(y_test, y_pred)
        metrics = {
            "Model": m,
            "Accuracy": accuracy_score(y_test, y_pred),
            "F1": f1,
            "Recall": recall_score(y_test, y_pred),
            "Precision": precision_score(y_test, y_pred)
        }
        results.append(metrics)
        
        # track best model for SHAP
        if f1 > best_f1 and m in ["XGBoost","LightGBM","RandomForest","AdaBoost"]:
            best_f1 = f1
            best_model_name = m
            best_pipeline = pipe

    # SHAP only for best tree-based model
    if best_pipeline is not None:
        print(f"Generating SHAP analysis for {best_model_name}...")
        # Get the transformed training data for SHAP
        X_train_transformed = best_pipeline.named_steps["balancer"].fit_resample(X_train, y_train)[0]
        X_train_scaled = best_pipeline.named_steps["scaler"].transform(X_train_transformed)
        
        # Get the transformed test data
        X_test_scaled = best_pipeline.named_steps["scaler"].transform(X_test)
        
        model = best_pipeline.named_steps["clf"]
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_test_scaled)
        
        # Handle multi-class case (if shap_values is a list)
        if isinstance(shap_values, list):
            shap_values = shap_values[1]  # Use positive class for binary classification
        
        shap_importance = pd.Series(np.abs(shap_values).mean(axis=0), index=X_train.columns)
        
        plt.figure(figsize=(10, 8))
        shap_importance.sort_values().plot(kind="barh", color="skyblue")
        plt.title(f"SHAP Feature Importance ({best_model_name})")
        plt.xlabel("Mean |SHAP Value|")
        plt.tight_layout()
        plt.show()
    
    return pd.DataFrame(results), best_model_name

# -------------------------
# USAGE
# -------------------------
# Create directory if it doesn't exist
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# Check if file exists
csv_path = PROCESSED_DIR / "processed_heart_disease.csv"
if not csv_path.exists():
    raise FileNotFoundError(
        f"The file {csv_path} does not exist. Please ensure the processed dataset is in the correct location."
    )

# Load the CSV file
df = pd.read_csv(str(csv_path))

# Run the modeling pipeline
results_df, best_model = train_evaluate(df)
print("\nFinal Model Comparison:")
print(results_df.round(4))
print(f"\nBest Model for SHAP: {best_model}")

FileNotFoundError: The file ..\data\processed_heart_disease.csv does not exist. Please ensure the processed dataset is in the correct location.