<a href="https://colab.research.google.com/github/appliedcode/mthree-c422/blob/mthree-c422-Likhitha/Threat_and_Security_Practice_2_md.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os, json, math
from datetime import datetime
import numpy as np
import pandas as pd
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,
    confusion_matrix, classification_report
)
import shap
import matplotlib.pyplot as plt

In [2]:
np.random.seed(2025)
os.makedirs("artifacts", exist_ok=True)

In [3]:
print("Loading Pima Indians Diabetes dataset...")
pima = fetch_openml(name="diabetes", version=1, as_frame=True)

# Some OpenML versions place target in .target or inside frame. Handle both.
if getattr(pima, "target", None) is not None:
    y_raw = pima.target.copy()
    X_raw = pima.data.copy()
else:
    df_all = pima.frame.copy()
    # detect likely target column name
    cand = [c for c in df_all.columns if c.lower() in ("class", "target", "diabetes", "outcome", "y")]
    if not cand:
        # fallback: assume last column is target
        cand = [df_all.columns[-1]]
    target_col = cand[0]
    y_raw = df_all[target_col].copy()
    X_raw = df_all.drop(columns=[target_col]).copy()

# Map target to 0/1 robustly
if pd.api.types.is_numeric_dtype(y_raw):
    y = (y_raw != 0).astype(int)
else:
    y = y_raw.astype(str).map(lambda s: 1 if "positive" in s.lower() or "tested_positive" in s.lower() or "pos" in s.lower() or s.strip() in ("1","yes","True","true") else 0).astype(int)

print("Dataset shape:", X_raw.shape)
print("Class distribution:\n", y.value_counts())

# Basic cleaning: fill numeric NaNs with median
X = X_raw.copy()
num_cols = X.select_dtypes(include=[np.number]).columns.tolist()
for c in num_cols:
    if X[c].isnull().any():
        X[c] = X[c].fillna(X[c].median())

# Keep a copy for privacy variants
X_orig = X.copy()

Loading Pima Indians Diabetes dataset...
Dataset shape: (768, 8)
Class distribution:
 class
0    500
1    268
Name: count, dtype: int64


In [4]:
X_train, X_test, y_train, y_test = train_test_split(
    X_orig, y, test_size=0.25, random_state=42, stratify=y
)
print(f"Train size: {len(X_train)}, Test size: {len(X_test)}")
print("Train fraud/diabetes rate:", float(y_train.mean()))


Train size: 576, Test size: 192
Train fraud/diabetes rate: 0.3489583333333333


In [5]:
def train_evaluate(clf, X_tr, y_tr, X_te, y_te):
    clf.fit(X_tr, y_tr)
    y_pred = clf.predict(X_te)
    # handle predict_proba safe extraction
    y_proba = None
    if hasattr(clf, "predict_proba"):
        proba = clf.predict_proba(X_te)
        if proba.ndim == 2 and proba.shape[1] >= 2:
            y_proba = proba[:, 1]
        else:
            # only one column present (single-class training) -> fallback constant proba
            y_proba = np.zeros(len(X_te))
    # compute metrics, guard roc_auc if single-class in y_test
    metrics = {
        "accuracy": float(accuracy_score(y_te, y_pred)),
        "precision": float(precision_score(y_te, y_pred, zero_division=0)),
        "recall": float(recall_score(y_te, y_pred, zero_division=0)),
        "f1": float(f1_score(y_te, y_pred, zero_division=0)),
        "roc_auc": float(roc_auc_score(y_te, y_proba)) if (y_proba is not None and len(np.unique(y_te))>1) else None,
        "confusion_matrix": confusion_matrix(y_te, y_pred).tolist()
    }
    return metrics, y_pred, y_proba


In [6]:
def poison_labels(y_series, fraction=0.03, flip_from=1, flip_to=0, rng_seed=2025):
    y_copy = y_series.copy().astype(int)
    rng = np.random.default_rng(rng_seed)
    candidate_idx = y_copy[y_copy == flip_from].index.to_numpy()
    n_poison = max(1, int(math.floor(fraction * len(candidate_idx)))) if len(candidate_idx)>0 else 0
    poisoned_idx = rng.choice(candidate_idx, size=n_poison, replace=False) if n_poison>0 else np.array([], dtype=int)
    y_copy.loc[poisoned_idx] = flip_to
    return y_copy, poisoned_idx

poison_frac = 0.03   # flip 3% of positives -> negatives (hiding attacks)
y_train_poisoned, poisoned_idx = poison_labels(y_train, fraction=poison_frac)
print(f"Poisoned {len(poisoned_idx)} training positive labels (indices sample):", poisoned_idx[:5].tolist())

# quick check: ensure training still has both classes
print("Unique labels in y_train_poisoned:", np.unique(y_train_poisoned, return_counts=True))

Poisoned 6 training positive labels (indices sample): [666, 664, 218, 155, 603]
Unique labels in y_train_poisoned: (array([0, 1]), array([381, 195]))


In [7]:
clf_baseline = RandomForestClassifier(n_estimators=200, random_state=42, n_jobs=-1)
baseline_metrics, baseline_pred, baseline_proba = train_evaluate(clf_baseline, X_train, y_train_poisoned, X_test, y_test)
print("\nBaseline metrics (trained on poisoned data):")
print(json.dumps(baseline_metrics, indent=2))


Baseline metrics (trained on poisoned data):
{
  "accuracy": 0.75,
  "precision": 0.6792452830188679,
  "recall": 0.5373134328358209,
  "f1": 0.6,
  "roc_auc": 0.8104477611940298,
  "confusion_matrix": [
    [
      108,
      17
    ],
    [
      31,
      36
    ]
  ]
}


In [8]:
drop_cols = [c for c in ["Age", "BMI"] if c in X_train.columns]
# Variant A
X_train_privA = X_train.drop(columns=drop_cols, errors='ignore')
X_test_privA  = X_test.drop(columns=drop_cols, errors='ignore')
clf_privA = RandomForestClassifier(n_estimators=200, random_state=42, n_jobs=-1)
privA_metrics, privA_pred, privA_proba = train_evaluate(clf_privA, X_train_privA, y_train_poisoned, X_test_privA, y_test)
print("\nPrivacy Variant A (drop Age & BMI) metrics:")
print(json.dumps(privA_metrics, indent=2))

# Variant B: noisy Age & BMI in training (small relative noise)
X_train_privB = X_train.copy()
noise_frac = 0.05
for c in drop_cols:
    sigma = X_train_privB[c].std() * noise_frac if X_train_privB[c].std() > 0 else 0.0
    if sigma > 0:
        X_train_privB[c] = X_train_privB[c] + np.random.default_rng(2026).normal(0, sigma, size=len(X_train_privB))
clf_privB = RandomForestClassifier(n_estimators=200, random_state=42, n_jobs=-1)
privB_metrics, privB_pred, privB_proba = train_evaluate(clf_privB, X_train_privB, y_train_poisoned, X_test, y_test)
print("\nPrivacy Variant B (noisy Age & BMI in training) metrics:")
print(json.dumps(privB_metrics, indent=2))



Privacy Variant A (drop Age & BMI) metrics:
{
  "accuracy": 0.75,
  "precision": 0.6792452830188679,
  "recall": 0.5373134328358209,
  "f1": 0.6,
  "roc_auc": 0.8104477611940298,
  "confusion_matrix": [
    [
      108,
      17
    ],
    [
      31,
      36
    ]
  ]
}

Privacy Variant B (noisy Age & BMI in training) metrics:
{
  "accuracy": 0.75,
  "precision": 0.6792452830188679,
  "recall": 0.5373134328358209,
  "f1": 0.6,
  "roc_auc": 0.8104477611940298,
  "confusion_matrix": [
    [
      108,
      17
    ],
    [
      31,
      36
    ]
  ]
}


In [9]:
print("\nComputing SHAP explanations (sampled to speed up)...")
sample_size = min(500, len(X_test))
sample_idx = np.random.default_rng(2027).choice(X_test.index, size=sample_size, replace=False)
X_test_sample = X_test.loc[sample_idx]

explainer = shap.TreeExplainer(clf_baseline)
shap_values = explainer.shap_values(X_test_sample, check_additivity=False)  # may be list

if isinstance(shap_values, list):
    shap_for_positive = shap_values[1]
else:
    shap_for_positive = shap_values

# create and save bar summary
plt.figure(figsize=(8,4))
shap.summary_plot(shap_for_positive, X_test_sample, plot_type="bar", show=False)
plt.title("SHAP - Top features for positive (diabetes)")
plt.tight_layout()
shap_png = "artifacts/shap_pima_bar.png"
plt.savefig(shap_png, dpi=150)
plt.close()
print("Saved SHAP plot ->", shap_png)



Computing SHAP explanations (sampled to speed up)...


  shap.summary_plot(shap_for_positive, X_test_sample, plot_type="bar", show=False)
  summary_legacy(
  summary_legacy(


Saved SHAP plot -> artifacts/shap_pima_bar.png


<Figure size 800x400 with 0 Axes>

In [16]:
from sklearn.preprocessing import OneHotEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import shap

# Example dataset
data = pd.DataFrame({
    "Gender": ["Male", "Female", "Female", "Male", "Male"],
    "City": ["New York", "London", "London", "Paris", "New York"],
    "Age": [25, 32, 40, 28, 35],
    "Income": [0, 1, 1, 0, 1]  # Target
})

# Separate features and target
X = data.drop(columns=["Income"])
y = data["Income"]

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=42)

# One-hot encode categorical features
encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
X_train_encoded = encoder.fit_transform(X_train)
X_test_encoded = encoder.transform(X_test)

# Get encoded feature names
encoded_feature_names = encoder.get_feature_names_out(X_train.columns)

# Train a model (example: Random Forest)
model = RandomForestClassifier(random_state=42)
model.fit(X_train_encoded, y_train)

# Create SHAP explainer
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test_encoded)

# Get SHAP values for positive class
shap_for_positive = shap_values[1] if isinstance(shap_values, list) else shap_values[..., 1]
mean_abs_shap = np.abs(shap_for_positive).mean(axis=0)

# Create DataFrame of SHAP values
shap_df = pd.DataFrame({
    "feature": encoded_feature_names,
    "mean_abs_shap": mean_abs_shap
}).sort_values(by="mean_abs_shap", ascending=False)

print(shap_df)


         feature  mean_abs_shap
1    Gender_Male       0.079167
0  Gender_Female       0.069167
2    City_London       0.060000
7         Age_40       0.030833
5         Age_25       0.026667
4     City_Paris       0.026667
3  City_New York       0.025000
6         Age_28       0.017500


In [18]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.ensemble import RandomForestClassifier
import shap
import json

# -------------------------------
# 1. Example dataset
# -------------------------------
data = pd.DataFrame({
    "age": [25, 32, 47, 51, 62, 41, 29, 36, 55, 48],
    "education": ["Bachelors", "Masters", "PhD", "Bachelors", "PhD",
                  "Masters", "Bachelors", "Masters", "PhD", "Bachelors"],
    "hours_per_week": [40, 50, 60, 40, 45, 50, 40, 60, 55, 50],
    "income_gt_50k": [0, 1, 1, 0, 1, 1, 0, 1, 1, 0]
})

X = data.drop("income_gt_50k", axis=1)
y = data["income_gt_50k"]

# -------------------------------
# 2. Train-test split
# -------------------------------
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# -------------------------------
# 3. Encode categorical features
# -------------------------------
encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
X_train_encoded = encoder.fit_transform(X_train)
X_test_encoded = encoder.transform(X_test)
encoded_feature_names = encoder.get_feature_names_out(X_train.columns)

# -------------------------------
# 4. Train model
# -------------------------------
model = RandomForestClassifier(random_state=42)
model.fit(X_train_encoded, y_train)

# -------------------------------
# 5. SHAP values
# -------------------------------
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test_encoded)

# Handle binary/multi-class
shap_for_positive = shap_values[1] if isinstance(shap_values, list) else shap_values[..., 1]
mean_abs_shap = np.abs(shap_for_positive).mean(axis=0)

shap_df = pd.DataFrame({
    "feature": encoded_feature_names,
    "mean_abs_shap": mean_abs_shap
})

top_shap = shap_df.sort_values("mean_abs_shap", ascending=False).head(10).to_dict(orient="records")

# -------------------------------
# 6. Governance audit report
# -------------------------------
audit = {
    "model_type": "RandomForestClassifier",
    "data_shape": {
        "train_samples": X_train.shape[0],
        "test_samples": X_test.shape[0],
        "features": len(encoded_feature_names)
    },
    "top_features_by_shap": top_shap,
    "shap_summary": shap_df.to_dict(orient="records")
}

# Print audit report
print(json.dumps(audit, indent=2))


{
  "model_type": "RandomForestClassifier",
  "data_shape": {
    "train_samples": 7,
    "test_samples": 3,
    "features": 14
  },
  "top_features_by_shap": [
    {
      "feature": "education_Bachelors",
      "mean_abs_shap": 0.25778902116402125
    },
    {
      "feature": "hours_per_week_40",
      "mean_abs_shap": 0.09071296296296298
    },
    {
      "feature": "education_PhD",
      "mean_abs_shap": 0.06007804232804234
    },
    {
      "feature": "hours_per_week_60",
      "mean_abs_shap": 0.03220171957671958
    },
    {
      "feature": "hours_per_week_50",
      "mean_abs_shap": 0.026285714285714284
    },
    {
      "feature": "education_Masters",
      "mean_abs_shap": 0.025363756613756613
    },
    {
      "feature": "hours_per_week_45",
      "mean_abs_shap": 0.01783465608465608
    },
    {
      "feature": "age_48",
      "mean_abs_shap": 0.013700396825396824
    },
    {
      "feature": "age_36",
      "mean_abs_shap": 0.013589947089947088
    },
    {
      "

In [11]:
print("\n=== Summary ===")
print(f"Baseline accuracy: {baseline_metrics['accuracy']:.4f}, recall: {baseline_metrics['recall']:.4f}")
print(f"Privacy(drop) accuracy: {privA_metrics['accuracy']:.4f}, recall: {privA_metrics['recall']:.4f}")
print(f"Privacy(noisy) accuracy: {privB_metrics['accuracy']:.4f}, recall: {privB_metrics['recall']:.4f}")
print("Artifacts saved in ./artifacts: pima_audit.json, pima_governance_report.md, shap_pima_bar.png")


=== Summary ===
Baseline accuracy: 0.7500, recall: 0.5373
Privacy(drop) accuracy: 0.7500, recall: 0.5373
Privacy(noisy) accuracy: 0.7500, recall: 0.5373
Artifacts saved in ./artifacts: pima_audit.json, pima_governance_report.md, shap_pima_bar.png
