
# Non‑Invasive Breast Cancer Prediction — Step‑by‑Step (No `def`)

**Date:** 2025-08-08 19:20

Procedural, section‑by‑section analysis using the 10% sample.  
No custom functions or classes are defined; only standard library and package APIs are used.

**Sections**
1. Setup & Config  
2. Load Data  
3. First Look (head/tail/info/dtypes/describe)  
4. Missing Values (NaN) Audit  
5. Basic Distributions & Correlations  
6. Target Column Detection & Class Balance  
7. Train/Test Split  
8. Preprocessing (Impute/Encode/Scale)  
9. Optional Feature Engineering (procedural)  
10. SMOTE (Class Imbalance)  
11. Baseline Models (LogReg, RandomForest, XGBoost if available) with Cross‑Validation  
12. Hold‑out Evaluation (Confusion Matrix, ROC, PR)  
13. Hyperparameter Tuning (RandomizedSearchCV)  
14. Final Notes


## 1) Setup & Config

In [None]:

# ---------------------------------------------------------------------------
# Notebook-wide configuration (edit here; later cells only read these names)
# ---------------------------------------------------------------------------

# Input: the 10% sample for now — point this at the full dataset later.
DATA_PATH = "/mnt/data/sample_10percent.csv"

# Evaluation protocol: hold-out fraction and number of stratified CV folds.
TEST_SIZE = 0.2
CV_FOLDS = 5

# Seed shared by sampling, splitting, resampling and the models.
RANDOM_STATE = 42

# Plots use at most this many rows so they stay responsive.
MAX_ROWS_FOR_PLOTS = 50_000

# Parallelism passed to scikit-learn / XGBoost (-1 = all available cores).
N_JOBS = -1

# Libraries
import warnings, os, math, itertools
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split, StratifiedKFold, cross_validate, RandomizedSearchCV
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,
                             roc_curve, precision_recall_curve, average_precision_score, ConfusionMatrixDisplay)

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

# Optional dependency: XGBoost.
# XGB_AVAILABLE records whether the import worked so later cells can skip the
# XGBoost model. Any exception (not only ImportError) counts as "unavailable".
try:
    from xgboost import XGBClassifier
except Exception:
    XGB_AVAILABLE = False
    print("XGBoost not available; install with: pip install xgboost")
else:
    XGB_AVAILABLE = True

# Optional dependency: imbalanced-learn (SMOTE + a resampling-aware Pipeline).
# IMB_OK is True only when both names were imported successfully.
try:
    from imblearn.over_sampling import SMOTE
    from imblearn.pipeline import Pipeline as ImbPipeline
except Exception:
    IMB_OK = False
    print("imbalanced-learn not available; install with: pip install imbalanced-learn")
else:
    IMB_OK = True


## 2) Load Data

In [None]:

# Read the CSV at DATA_PATH into the working DataFrame used by every later cell.
df = pd.read_csv(DATA_PATH)
print("Loaded shape:", df.shape)

# Peek at both ends of the file (first three rows, then last three rows).
for preview in (df.head(3), df.tail(3)):
    display(preview)


## 3) First Look (head/tail/info/dtypes/describe)

In [None]:

# Structural overview printed by pandas (info() returns None, hence the `_`).
_ = df.info()

# One-row table of column dtypes (transposed so columns stay as columns).
display(pd.DataFrame(df.dtypes, columns=["dtype"]).T)

# describe(include=...) raises ValueError when no column matches the requested
# dtype family (e.g. an all-numeric dataset has no object columns), so each
# summary is only requested when at least one matching column exists.
if df.select_dtypes(include="number").shape[1] > 0:
    display(df.describe(include="number").T.head(20))
else:
    print("No numeric columns to describe.")

if df.select_dtypes(include="object").shape[1] > 0:
    display(df.describe(include="object").T.head(20))
else:
    print("No object (string) columns to describe.")


## 4) Missing Values (NaN) Audit

In [None]:

# Boolean mask of missing cells, reused for the column-wise and row-wise checks.
na_mask = df.isna()

# Per-column missing counts (largest first) and the same expressed as a percentage.
missing_counts = na_mask.sum().sort_values(ascending=False)
missing_pct = missing_counts.div(len(df)).mul(100)
missing_table = missing_counts.to_frame("missing_count").assign(missing_pct=missing_pct)
display(missing_table.head(30))

# Columns where every single row is missing.
n_rows = len(df)
empty_cols = [col for col, n_missing in missing_counts.items() if n_missing == n_rows]
print("Completely empty columns:", empty_cols)

# Is there at least one row with no values at all?
any_row_all_nan = na_mask.all(axis=1).any()
print("Any row entirely NaN?:", any_row_all_nan)


## 5) Basic Distributions & Correlations

In [None]:

# Partition the columns by dtype: numeric vs. everything else.
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
categorical_cols = df.select_dtypes(exclude=[np.number]).columns.tolist()
print("Numeric count:", len(numeric_cols), "Categorical count:", len(categorical_cols))

# Plot from a reproducible random sample when the frame is larger than the cap.
plot_df = (
    df.sample(n=MAX_ROWS_FOR_PLOTS, random_state=RANDOM_STATE)
    if len(df) > MAX_ROWS_FOR_PLOTS
    else df
)

# One histogram per numeric column (first 8 only).
for num_col in numeric_cols[:8]:
    plt.figure()
    plot_df[num_col].hist(bins=40)
    plt.title(f"Distribution: {num_col}")
    plt.xlabel(num_col)
    plt.ylabel("count")
    plt.show()

# One bar chart of the 15 most frequent values per categorical column (first 6 only).
for cat_col in categorical_cols[:6]:
    plt.figure()
    top_counts = plot_df[cat_col].astype(str).value_counts().head(15)
    top_counts.plot(kind='bar')
    plt.title(f"Top categories: {cat_col}")
    plt.xlabel(cat_col)
    plt.ylabel("count")
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()

# Correlation heatmap over the first 20 numeric columns (needs at least two).
subset_cols = numeric_cols[:20]
if len(subset_cols) < 2:
    print("Not enough numeric columns for heatmap.")
else:
    corr = plot_df[subset_cols].corr()
    plt.figure(figsize=(8,6))
    im = plt.imshow(corr, aspect='auto')
    plt.colorbar(im)
    plt.title("Correlation heatmap (subset of numeric features)")
    # Tick labels are truncated to 12 characters to keep the axes legible.
    tick_positions = range(len(subset_cols))
    plt.xticks(tick_positions, [label[:12] for label in subset_cols], rotation=90)
    plt.yticks(tick_positions, [label[:12] for label in subset_cols])
    plt.tight_layout()
    plt.show()


## 6) Target Column Detection & Class Balance

In [None]:

# Detect the target column. Set TARGET_COL by hand here if detection picks the
# wrong column or raises below.
TARGET_COL = None

# Step 1: take the first column whose name (case-insensitive) is a common
# target name. str() keeps non-string labels (e.g. integer headers) from raising.
candidate_names = ["target","label","cancer","malignant","y","outcome"]
for c in df.columns:
    if str(c).lower() in candidate_names:
        TARGET_COL = c
        break

# Step 2: otherwise fall back to the last column, but only if its non-missing
# values look like a binary flag.
if TARGET_COL is None and len(df.columns) > 0:
    last_col = df.columns[-1]
    uniq = pd.Series(df[last_col].dropna().unique())
    # Tokens are compared after strip + lower-casing; "0.0"/"1.0" cover 0/1
    # columns that pandas stored as float.
    binary_tokens = {"0", "1", "0.0", "1.0", "yes", "no", "true", "false"}
    observed_tokens = {str(v).strip().lower() for v in uniq}
    # Require at least one observed value: an all-NaN column has an empty value
    # set, which is a subset of anything and must not be accepted as the target.
    looks_binary = 0 < len(uniq) <= 5 and observed_tokens <= binary_tokens
    TARGET_COL = last_col if looks_binary else None

print("Detected TARGET_COL:", TARGET_COL)

if TARGET_COL is None:
    raise ValueError("Please set a binary TARGET_COL manually in this cell and re-run.")

# Absolute class counts, with missing targets shown as their own row.
print("Target value counts:")
display(df[TARGET_COL].value_counts(dropna=False))

# Relative class frequencies in percent (missing targets excluded).
print("Target distribution (%):")
display((df[TARGET_COL].value_counts(normalize=True) * 100).round(3))


## 7) Train/Test Split

In [None]:

ID_COLUMNS = []  # add any id-like columns to drop

# Features = everything except the target and any listed id columns.
X = df.drop(columns=[TARGET_COL] + [c for c in ID_COLUMNS if c in df.columns], errors='ignore')
y = df[TARGET_COL]

# Basic NaN check before split
print("NaNs in X before split:", int(X.isna().sum().sum()))
print("NaNs in y before split:", int(y.isna().sum()))

# Rows without a label cannot be stratified on or learned from, so drop them.
# A positional boolean mask keeps X and y in step whatever the index looks like.
has_label = y.notna().to_numpy()
if not has_label.all():
    print("Dropping rows with missing target:", int((~has_label).sum()))
    X = X.loc[has_label]
    y = y.loc[has_label]

# Later cells expect a 0/1 target (binary scorers, probability thresholding
# cast to int, XGBoost), so boolean or yes/no-style labels are encoded here.
# Targets that are already numeric are left untouched.
if pd.api.types.is_bool_dtype(y):
    y = y.astype(int)
elif not pd.api.types.is_numeric_dtype(y):
    label_map = {
        "0": 0, "0.0": 0, "no": 0, "false": 0,
        "1": 1, "1.0": 1, "yes": 1, "true": 1,
    }
    y_tokens = y.astype(str).str.strip().str.lower()
    if y_tokens.isin(list(label_map)).all():
        y = y_tokens.map(label_map).astype(int)
        print("Encoded target labels to 0/1.")
    else:
        # Best effort only: unknown labels are kept as-is rather than guessed.
        print("Target labels not recognised as binary tokens; leaving them unchanged.")

# Stratified hold-out split so both parts keep the original class ratio.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE, stratify=y
)

print("Train shapes:", X_train.shape, y_train.shape)
print("Test shapes:", X_test.shape, y_test.shape)

print("Class balance (train):")
display(y_train.value_counts(normalize=True))

print("Class balance (test):")
display(y_test.value_counts(normalize=True))


## 8) Preprocessing (Impute/Encode/Scale)

In [None]:

# Column groups are derived from the training split only.
num_cols = X_train.select_dtypes(include=[np.number]).columns.tolist()
cat_cols = X_train.select_dtypes(exclude=[np.number]).columns.tolist()
print("Numerical:", len(num_cols), "Categorical:", len(cat_cols))

# Numeric branch: median imputation followed by standardisation.
numeric_steps = [
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler()),
]
numeric_transformer = Pipeline(steps=numeric_steps)

# Categorical branch: most-frequent imputation followed by dense one-hot
# encoding; categories unseen during fit are ignored at transform time.
categorical_steps = [
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
]
categorical_transformer = Pipeline(steps=categorical_steps)

# Route each column group through its own branch.
branches = [
    ("num", numeric_transformer, num_cols),
    ("cat", categorical_transformer, cat_cols),
]
preprocessor = ColumnTransformer(transformers=branches)

# Fit on the training split only, then apply the fitted transform to both splits.
Xt_train = preprocessor.fit_transform(X_train)
Xt_test = preprocessor.transform(X_test)

print("Transformed shapes:", Xt_train.shape, Xt_test.shape)

# Recover output column names branch by branch. The input column names are the
# fallback; they are replaced by the names reported by a branch's final step
# when that step exposes get_feature_names_out and the call succeeds.
feature_names = []
for _branch_name, fitted, cols in preprocessor.transformers_:
    branch_names = list(cols)
    if hasattr(fitted, "named_steps"):
        final_step = list(fitted.named_steps.values())[-1]
        if hasattr(final_step, "get_feature_names_out"):
            try:
                branch_names = list(final_step.get_feature_names_out(cols))
            except Exception:
                branch_names = list(cols)
    feature_names.extend(branch_names)

# Wrap the transformed arrays in DataFrames for inspection.
column_labels = [str(label) for label in feature_names]
Xt_train_df = pd.DataFrame(Xt_train, columns=column_labels)
Xt_test_df = pd.DataFrame(Xt_test, columns=list(column_labels))

print("Any NaNs after preprocessing — train?:", Xt_train_df.isna().any().any())
print("Any NaNs after preprocessing — test?:", Xt_test_df.isna().any().any())

display(Xt_train_df.head(3))


## 9) Optional Feature Engineering (procedural; no `def`)

In [None]:

# Work on copies so the raw split (X_train / X_test) is left untouched.
X_train_fe = X_train.copy()
X_test_fe = X_test.copy()

# In every loop below the train and test values are BOTH computed before either
# is assigned. A failure therefore never leaves the two frames with different
# columns, which would break the shared preprocessor at transform time.

# Age bins for every column whose name contains 'age'.
# NOTE(review): this is a substring match, so names such as "stage" also match — confirm against the real column names.
age_bins = [0, 30, 40, 50, 60, 70, 200]
age_labels = ["<30","30-39","40-49","50-59","60-69","70+"]
age_cols = [c for c in X_train_fe.columns if "age" in str(c).lower()]
for c in age_cols:
    try:
        train_binned = pd.cut(X_train_fe[c], bins=age_bins, labels=age_labels, include_lowest=True)
        test_binned = pd.cut(X_test_fe[c], bins=age_bins, labels=age_labels, include_lowest=True)
    except Exception as e:
        # Typically a non-numeric column that merely has 'age' in its name.
        print("Age binning failed for", c, ":", e)
    else:
        X_train_fe[f"{c}_bin"] = train_binned
        X_test_fe[f"{c}_bin"] = test_binned

# BMI classes for every column whose name contains 'bmi'.
bmi_bins = [0, 18.5, 25, 30, 100]
bmi_labels = ["underweight","normal","overweight","obese"]
bmi_cols = [c for c in X_train_fe.columns if "bmi" in str(c).lower()]
for c in bmi_cols:
    try:
        train_binned = pd.cut(X_train_fe[c], bins=bmi_bins, labels=bmi_labels, include_lowest=True)
        test_binned = pd.cut(X_test_fe[c], bins=bmi_bins, labels=bmi_labels, include_lowest=True)
    except Exception as e:
        print("BMI binning failed for", c, ":", e)
    else:
        X_train_fe[f"{c}_class"] = train_binned
        X_test_fe[f"{c}_class"] = test_binned

# A few ratio features: first 10 pairs drawn from the first 10 numeric columns,
# which caps the number of new columns. The denominator is |b| + 1e-6 so a zero
# never causes a division by zero.
num_cols_fe = X_train_fe.select_dtypes(include=[np.number]).columns.tolist()
pairs = list(itertools.combinations(num_cols_fe[:10], 2))  # limit
for a, b in pairs[:10]:
    new_col = f"ratio_{a}_over_{b}"
    try:
        train_ratio = X_train_fe[a] / (X_train_fe[b].abs() + 1e-6)
        test_ratio = X_test_fe[a] / (X_test_fe[b].abs() + 1e-6)
    except Exception as e:
        # Report instead of silently skipping, matching the binning loops above.
        print("Ratio feature failed for", new_col, ":", e)
    else:
        X_train_fe[new_col] = train_ratio
        X_test_fe[new_col] = test_ratio

print("NaNs after FE (train):", int(X_train_fe.isna().sum().sum()))
print("NaNs after FE (test):", int(X_test_fe.isna().sum().sum()))

# Rebuild preprocessors on FE data: the column groups are recomputed because
# the engineered columns changed the set of numeric / non-numeric columns.
num_cols2 = X_train_fe.select_dtypes(include=[np.number]).columns.tolist()
cat_cols2 = X_train_fe.select_dtypes(exclude=[np.number]).columns.tolist()

preprocessor_fe = ColumnTransformer(
    transformers=[
        ("num", Pipeline([("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]), num_cols2),
        ("cat", Pipeline([("imputer", SimpleImputer(strategy="most_frequent")), ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False))]), cat_cols2),
    ]
)

# Fit on the training split only; the test split is transformed with that fit.
Xt_train_fe = preprocessor_fe.fit_transform(X_train_fe)
Xt_test_fe = preprocessor_fe.transform(X_test_fe)

print("Shapes after FE+preprocess:", Xt_train_fe.shape, Xt_test_fe.shape)


## 10) SMOTE (Class Imbalance)

In [None]:

# No resampling happens in this cell. SMOTE is applied inside the model
# pipelines during cross-validation (to avoid leakage); this is only a check
# of the class balance in the training split.
if not IMB_OK:
    print("SMOTE unavailable. Install imbalanced-learn to enable it.")
else:
    print("Train class distribution before SMOTE:")
    display(y_train.value_counts(normalize=True))


## 11) Baseline Models (Cross‑Validation)

In [None]:

# Metrics collected for every CV fold (scikit-learn scorer names).
scorer = {
    "accuracy": "accuracy",
    "precision": "precision",
    "recall": "recall",
    "f1": "f1",
    "roc_auc": "roc_auc",
    "average_precision": "average_precision"
}

cv = StratifiedKFold(n_splits=CV_FOLDS, shuffle=True, random_state=RANDOM_STATE)

# SMOTE sits inside the pipeline so it is refit on the training part of each
# fold only. When imbalanced-learn is missing, neither ImbPipeline nor SMOTE is
# defined, so fall back to a plain scikit-learn Pipeline without resampling
# instead of failing with a NameError.
if IMB_OK:
    pipeline_cls = ImbPipeline
else:
    pipeline_cls = Pipeline
    print("imbalanced-learn unavailable: building pipelines without SMOTE.")

# Baseline classifiers, keyed by the short names used when ranking models later.
classifiers = [
    ("LogReg", LogisticRegression(max_iter=1000, n_jobs=N_JOBS)),
    ("RF", RandomForestClassifier(n_estimators=300, random_state=RANDOM_STATE, n_jobs=N_JOBS)),
]
if XGB_AVAILABLE:
    classifiers.append(("XGB", XGBClassifier(
        n_estimators=400, learning_rate=0.05, subsample=0.8, colsample_bytree=0.8,
        max_depth=6, random_state=RANDOM_STATE, n_jobs=N_JOBS, eval_metric="logloss"
    )))

# Build preprocess -> (SMOTE) -> classifier for each model and cross-validate it.
pipelines = {}
cv_scores = {}
for model_name, clf in classifiers:
    steps = [("preprocess", preprocessor_fe)]
    if IMB_OK:
        steps.append(("smote", SMOTE(random_state=RANDOM_STATE)))
    steps.append(("clf", clf))
    pipelines[model_name] = pipeline_cls(steps=steps)
    cv_scores[model_name] = cross_validate(
        pipelines[model_name], X_train_fe, y_train, cv=cv, scoring=scorer, n_jobs=N_JOBS
    )
    print(f"{model_name} CV (mean):", {k: float(np.mean(v)) for k,v in cv_scores[model_name].items()})

# Names consumed by the hold-out evaluation and tuning cells.
pipe_logreg, scores_lr = pipelines["LogReg"], cv_scores["LogReg"]
pipe_rf, scores_rf = pipelines["RF"], cv_scores["RF"]
if XGB_AVAILABLE:
    pipe_xgb, scores_xgb = pipelines["XGB"], cv_scores["XGB"]
else:
    scores_xgb = None


## 12) Hold‑out Evaluation

In [None]:

# Hold-out metrics, keyed by model name (filled in below).
results = {}

# Choose best model by CV ROC-AUC
roc_means = [
    ("LogReg", float(np.mean(scores_lr['test_roc_auc']))),
    ("RF", float(np.mean(scores_rf['test_roc_auc']))),
]
if scores_xgb is not None:
    roc_means.append(("XGB", float(np.mean(scores_xgb['test_roc_auc']))))

roc_means = sorted(roc_means, key=lambda x: x[1], reverse=True)
print("CV ROC-AUC ranking:", roc_means)

best_name = roc_means[0][0]
print("Selected model:", best_name)

# pipe_xgb only exists when XGBoost was available, which is also the only way
# "XGB" can appear in the ranking.
if best_name == "LogReg":
    final_pipe = pipe_logreg
elif best_name == "RF":
    final_pipe = pipe_rf
else:
    final_pipe = pipe_xgb

# Refit the selected pipeline on the full training split, then score the hold-out.
final_pipe.fit(X_train_fe, y_train)

# Column 1 of predict_proba is taken as the positive-class probability and
# thresholded at 0.5 for the hard predictions.
y_proba = final_pipe.predict_proba(X_test_fe)[:,1]
y_pred = (y_proba >= 0.5).astype(int)

results[best_name] = {
    "accuracy": accuracy_score(y_test, y_pred),
    "precision": precision_score(y_test, y_pred, zero_division=0),
    "recall": recall_score(y_test, y_pred, zero_division=0),
    "f1": f1_score(y_test, y_pred, zero_division=0),
    "roc_auc": roc_auc_score(y_test, y_proba),
    "average_precision": average_precision_score(y_test, y_proba),
}

print("Test Accuracy:", results[best_name]["accuracy"])
print("Test Precision:", results[best_name]["precision"])
print("Test Recall:", results[best_name]["recall"])
print("Test F1:", results[best_name]["f1"])
print("Test ROC-AUC:", results[best_name]["roc_auc"])
print("Test PR-AUC:", results[best_name]["average_precision"])

# Pass an explicit Axes: from_predictions creates its own figure when ax is
# None, so a preceding plt.figure() would only leave a blank extra figure.
fig_cm, ax_cm = plt.subplots()
ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax_cm)
ax_cm.set_title(f"Confusion Matrix — {best_name}")
plt.show()

# ROC curve with the chance diagonal for reference.
fpr, tpr, _ = roc_curve(y_test, y_proba)
plt.figure()
plt.plot(fpr, tpr, label="ROC")
plt.plot([0,1],[0,1],'--')
plt.xlabel("False Positive Rate"); plt.ylabel("True Positive Rate"); plt.title(f"ROC Curve — {best_name}")
plt.legend(); plt.show()

# Precision-recall curve.
prec, rec, _ = precision_recall_curve(y_test, y_proba)
plt.figure()
plt.plot(rec, prec, label="PR")
plt.xlabel("Recall"); plt.ylabel("Precision"); plt.title(f"Precision-Recall Curve — {best_name}")
plt.legend(); plt.show()


## 13) Hyperparameter Tuning (RandomizedSearchCV)

In [None]:

import scipy.stats as st

# Tune whichever model won the CV ranking in the hold-out evaluation cell.
model_to_tune = best_name

# Search spaces use the "clf__" prefix to reach the classifier step of the
# pipeline. Note that scipy's uniform(loc, scale) samples from [loc, loc + scale].
if model_to_tune == "RF":
    param_dist = {
        "clf__n_estimators": st.randint(200, 800),
        "clf__max_depth": st.randint(3, 20),
        "clf__min_samples_split": st.randint(2, 20),
        "clf__min_samples_leaf": st.randint(1, 20),
        "clf__max_features": ["sqrt", "log2", None],
    }
    base = pipe_rf
elif model_to_tune == "LogReg":
    param_dist = {
        "clf__C": st.loguniform(1e-3, 1e2),
        "clf__penalty": ["l2"],
        "clf__solver": ["lbfgs", "liblinear"],
    }
    base = pipe_logreg
elif model_to_tune == "XGB" and XGB_AVAILABLE:
    param_dist = {
        "clf__n_estimators": st.randint(200, 800),
        "clf__max_depth": st.randint(3, 12),
        "clf__learning_rate": st.uniform(0.01, 0.2),
        "clf__subsample": st.uniform(0.6, 0.4),
        "clf__colsample_bytree": st.uniform(0.6, 0.4),
        "clf__gamma": st.uniform(0.0, 5.0),
        "clf__reg_alpha": st.uniform(0.0, 1.0),
        "clf__reg_lambda": st.uniform(0.5, 1.5),
    }
    base = pipe_xgb
else:
    # No search space known for this model: leave both names defined.
    param_dist = None
    base = None

if param_dist is None:
    print("No hyperparameter space for the selected model; skipping.")
else:
    # Randomised search over 25 sampled configurations, scored by ROC-AUC with
    # the same stratified, seeded fold layout used for the baselines.
    tuner = RandomizedSearchCV(
        estimator=base,
        param_distributions=param_dist,
        n_iter=25,
        scoring="roc_auc",
        cv=StratifiedKFold(n_splits=CV_FOLDS, shuffle=True, random_state=RANDOM_STATE),
        random_state=RANDOM_STATE,
        n_jobs=N_JOBS,
        verbose=1
    )
    tuner.fit(X_train_fe, y_train)
    print("Best params:", tuner.best_params_)
    print("Best CV ROC-AUC:", tuner.best_score_)

    # best_estimator_ is the best configuration refit on the whole training split.
    tuned = tuner.best_estimator_
    y_proba_tuned = tuned.predict_proba(X_test_fe)[:,1]
    y_pred_tuned = (y_proba_tuned >= 0.5).astype(int)

    print("\n=== Test (Tuned) ===")
    print("Accuracy:", accuracy_score(y_test, y_pred_tuned))
    print("Precision:", precision_score(y_test, y_pred_tuned, zero_division=0))
    print("Recall:", recall_score(y_test, y_pred_tuned, zero_division=0))
    print("F1:", f1_score(y_test, y_pred_tuned, zero_division=0))
    print("ROC-AUC:", roc_auc_score(y_test, y_proba_tuned))
    print("PR-AUC:", average_precision_score(y_test, y_proba_tuned))

    # Pass an explicit Axes: from_predictions creates its own figure when ax is
    # None, so a preceding plt.figure() would only leave a blank extra figure.
    fig_cm_tuned, ax_cm_tuned = plt.subplots()
    ConfusionMatrixDisplay.from_predictions(y_test, y_pred_tuned, ax=ax_cm_tuned)
    ax_cm_tuned.set_title(f"Confusion Matrix — {model_to_tune} (Tuned)")
    plt.show()

    # ROC curve with the chance diagonal for reference.
    fpr2, tpr2, _ = roc_curve(y_test, y_proba_tuned)
    plt.figure()
    plt.plot(fpr2, tpr2, label="ROC (tuned)")
    plt.plot([0,1],[0,1],'--')
    plt.xlabel("False Positive Rate"); plt.ylabel("True Positive Rate"); plt.title(f"ROC Curve — {model_to_tune} (Tuned)")
    plt.legend(); plt.show()

    # Precision-recall curve.
    prec2, rec2, _ = precision_recall_curve(y_test, y_proba_tuned)
    plt.figure()
    plt.plot(rec2, prec2, label="PR (tuned)")
    plt.xlabel("Recall"); plt.ylabel("Precision"); plt.title(f"Precision-Recall Curve — {model_to_tune} (Tuned)")
    plt.legend(); plt.show()


## 14) Final Notes


- No custom `def`/UDFs or classes were used; everything is procedural with scikit‑learn/imbalanced‑learn components.
- NaN checks are included at three points: before the train/test split, after preprocessing, and after feature engineering (FE).
- When scaling to ~1.5M rows, consider batching, increasing CV folds/iterations if runtime permits, and enabling model logging.
- If you know the exact target column, set it explicitly in **Section 6** for reliability.
