
# 02 · De‑identification Scorecard (k‑anonymity, l‑diversity)

This notebook takes the identifier scan from **Notebook 01** and demonstrates a **before → after** de‑identification workflow:
- Apply simple generalizations (dates → year, ZIP → ZIP3, bucket rare categories)
- Recompute **k‑anonymity** and **l‑diversity**
- Produce a **scorecard** and **visuals** to show improvement
- Save `data/deidentification_scorecard.json` for downstream compliance & ROI


In [None]:

import os, json, pandas as pd, numpy as np, matplotlib.pyplot as plt
from pathlib import Path
import sys

# Locate the repository root: the notebook may be launched either from the
# repo root (which contains "notebooks/") or from inside "notebooks/" itself.
repo_root = Path.cwd()
if not (repo_root / "notebooks").exists():
    repo_root = Path.cwd().parent
scripts_dir = repo_root / "scripts"
visuals_dir = repo_root / "visuals"

# The imports below are package-style (`scripts.…`, `visuals.…`), so the
# directory that CONTAINS those packages — repo_root — must be importable.
# Without it the imports fail whenever the kernel starts in "notebooks/".
if str(repo_root) not in sys.path:
    sys.path.insert(0, str(repo_root))
# Kept for backward compatibility with flat imports of the individual modules.
for p in (scripts_dir, visuals_dir):
    if str(p) not in sys.path:
        sys.path.append(str(p))

from scripts.privacy_checks import (
    load_dataset, build_privacy_report, detect_direct_identifiers,
    detect_quasi_identifiers, k_anonymity, l_diversity, suggest_generalization
)
from visuals.privacy_plots import plot_k_equivalence_hist, save_fig

# Input/output locations used by the rest of the notebook; created on demand.
DATA_DIR = repo_root / "data"
ASSETS = repo_root / "reports" / "assets"
DATA_DIR.mkdir(exist_ok=True, parents=True)
ASSETS.mkdir(exist_ok=True, parents=True)

print("Environment ready.")
print("repo_root =", repo_root)



## Load dataset

We reuse the **sample** dataset by default. You can point `DATA_FILE` to your own CSV/Parquet.


In [None]:

DATA_FILE = DATA_DIR / "sample_synthetic.csv"  # change to your own file if desired

if DATA_FILE.exists():
    df_raw = load_dataset(DATA_FILE)
    print(f"[info] Loaded {DATA_FILE} shape={df_raw.shape}")
else:
    # Report the file that was actually looked for, so a mistyped custom path
    # is not mistaken for the bundled sample being absent.
    print(f"[warn] {DATA_FILE.name} missing — creating a tiny synthetic frame in-memory.")
    # Small fallback frame so the rest of the notebook can still run end to end.
    df_raw = pd.DataFrame({
        "patient_id": [1,2,3,4,5,6,7,8],
        "birth_date": ["1980-01-01","1980-02-10","1978-05-03","1990-07-21","1985-10-12","1972-03-30","1972-03-30","1972-03-30"],
        "zip_code": ["94110","94110","02139","02139","10027","10027","10027","10027"],
        "sex": ["F","F","M","M","F","F","M","M"],
        "condition": ["Diabetes","Hypertension","Asthma","Cancer","COPD","Diabetes","Asthma","COPD"],
        "visit_date": ["2022-03-10","2022-03-10","2022-03-11","2022-03-11","2022-03-12","2022-03-13","2022-03-13","2022-03-14"],
        "lab_result": [7.1,"130/85","Mild","Stage II","FEV1=65%","7.4","Moderate","FEV1=60%"]
    })
    print(f"[info] Created synthetic df_raw shape={df_raw.shape}")

display(df_raw.head())



## Baseline (BEFORE) metrics
We detect identifiers and compute baseline **k** and **l**.


In [None]:

# Use "condition" as the sensitive attribute when the dataset has it;
# otherwise l-diversity is not computed.
SENSITIVE_COL = None
if "condition" in df_raw.columns:
    SENSITIVE_COL = "condition"

# Identifier scan on the untouched frame.
direct_before = detect_direct_identifiers(df_raw)
quasi_before = detect_quasi_identifiers(df_raw)

# Baseline k and l over the detected quasi-identifiers.
k_before = k_anonymity(df_raw, quasi_before)
if SENSITIVE_COL:
    l_before = l_diversity(df_raw, quasi_before, SENSITIVE_COL, method="distinct")
else:
    l_before = np.nan

report_before = build_privacy_report(
    df_raw,
    sensitive_col=SENSITIVE_COL,
    quasi_override=sorted(quasi_before),
)

for label, found in (("Direct (before):", direct_before), ("Quasi  (before):", quasi_before)):
    print(label, sorted(found) if found else "None")
print(f"k (before)={k_before}, l (before)={l_before}")
report_before



## Apply simple de‑identification transforms

- **Dates → year** (or year-month)  
- **ZIP → ZIP3**  
- **Bucket rare categories** in high-cardinality columns  
- **Drop obvious direct identifiers** if present


In [None]:

# Always start from the raw frame so re-running this cell is idempotent.
df_after = df_raw.copy()

# 1) Generalize date-like columns (selected by name) to year granularity.
date_cols = [c for c in df_after.columns if ("date" in c.lower()) or ("birth" in c.lower()) or ("dob" in c.lower())]
for c in date_cols:
    col = df_after[c]
    # pd.to_datetime reads plain numbers as offsets from the Unix epoch, so a
    # numeric column (e.g. an existing birth year) would collapse to 1970.
    if pd.api.types.is_numeric_dtype(col):
        print(f"[skip] {c}: numeric column, not parsed as dates")
        continue
    try:
        parsed = pd.to_datetime(col, errors="coerce")
        # The name matched but nothing parsed: leave the column unchanged
        # rather than silently replacing every value with <NA>.
        if col.notna().any() and not parsed.notna().any():
            print(f"[skip] {c}: no values parse as dates; column left unchanged")
            continue
        # Use year granularity by default
        df_after[c] = parsed.dt.year.astype("Int64")
    except Exception as exc:
        # Still best-effort (the column is left as-is), but no longer silent.
        print(f"[warn] {c}: could not generalize to year ({exc}); column left unchanged")

# 2) ZIP -> ZIP3
zip_like = [c for c in df_after.columns if "zip" in c.lower() or "postal" in c.lower()]
for c in zip_like:
    col = df_after[c]
    present = col.notna()
    if pd.api.types.is_numeric_dtype(col) and not pd.api.types.is_bool_dtype(col):
        # ZIPs read as numbers lose leading zeros (02139 -> 2139) and floats
        # print as "2139.0"; restore the 5-digit form before truncating.
        # fillna(0) is only a placeholder — missing rows are masked out below.
        text = col.fillna(0).round().astype("int64").astype(str).str.zfill(5)
    else:
        text = col.astype(str).str.strip()
    # Keep missing values missing instead of turning them into the text "nan".
    df_after[c] = text.str[:3].where(present, np.nan)

# 3) Bucket rare categories for object columns with many unique values
for c in df_after.columns:
    if df_after[c].dtype == "object":
        vc = df_after[c].value_counts(dropna=True)
        if len(vc) > 10:
            top = set(vc.head(10).index.tolist())
            # Missing values are not a rare category: keep them missing
            # rather than relabelling them "Other".
            keep = df_after[c].isin(top) | df_after[c].isna()
            df_after[c] = df_after[c].where(keep, other="Other")

# 4) Drop direct identifiers if any were detected
to_drop = [c for c in direct_before if c in df_after.columns]
if to_drop:
    df_after = df_after.drop(columns=to_drop)

print("Applied generalizations. Preview:")
display(df_after.head())



## Recompute (AFTER) metrics


In [None]:

# Repeat the identifier scan on the generalized frame.
direct_after = detect_direct_identifiers(df_after)
quasi_after = detect_quasi_identifiers(df_after)

# Same metrics as the baseline, now over df_after.
k_after = k_anonymity(df_after, quasi_after)
if SENSITIVE_COL:
    l_after = l_diversity(df_after, quasi_after, SENSITIVE_COL, method="distinct")
else:
    l_after = np.nan

report_after = build_privacy_report(
    df_after,
    sensitive_col=SENSITIVE_COL,
    quasi_override=sorted(quasi_after),
)

# One row per stage so the before/after numbers sit side by side.
rows = [
    {
        "stage": "Before",
        "k": k_before,
        "l": l_before,
        "direct": len(direct_before),
        "quasi": len(quasi_before),
    },
    {
        "stage": "After",
        "k": k_after,
        "l": l_after,
        "direct": len(direct_after),
        "quasi": len(quasi_after),
    },
]
summary = pd.DataFrame(rows)
summary



## Visuals

k‑anonymity **equivalence class size** distributions before and after.


In [None]:

# BEFORE: histogram for the raw frame and its quasi-identifiers.
before_png = ASSETS / "k_hist_before.png"
fig1 = plot_k_equivalence_hist(df_raw, quasi_before, title="k distribution — BEFORE")
p1 = save_fig(fig1, before_png)
plt.show()
print("[ok]", p1)

# AFTER: same plot for the generalized frame.
after_png = ASSETS / "k_hist_after.png"
fig2 = plot_k_equivalence_hist(df_after, quasi_after, title="k distribution — AFTER")
p2 = save_fig(fig2, after_png)
plt.show()
print("[ok]", p2)



## Save scorecard JSON

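Writes `data/deidentification_scorecard.json` containing the sensitive column name, the before/after privacy reports (with any suggested generalizations those reports include), and the before → after deltas for k, l, and direct identifiers removed.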


In [None]:

def _metric_delta(after, before):
    """Return ``after - before`` as a float, or None if either value is missing (None/NaN)."""
    if pd.isna(after) or pd.isna(before):
        return None
    return float(after) - float(before)


def _json_default(obj):
    """Fallback for ``json.dumps``: convert NumPy scalars/arrays and sets to plain Python."""
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, (set, frozenset)):
        # Sort for a stable file; key=str tolerates mixed element types.
        return sorted(obj, key=str)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


scorecard = {
    "sensitive_col": SENSITIVE_COL,
    "before": report_before,
    "after": report_after,
    "delta": {
        # k gets the same missing-value guard as l, so an absent k yields
        # null instead of raising on float(None).
        "k": _metric_delta(k_after, k_before),
        "l": _metric_delta(l_after, l_before),
        "direct_removed": max(0, len(direct_before) - len(direct_after))
    }
}

out_json = DATA_DIR / "deidentification_scorecard.json"
# `default` is only consulted for objects json cannot encode natively, so
# already-serializable reports are written exactly as before.
out_json.write_text(json.dumps(scorecard, indent=2, default=_json_default), encoding="utf-8")
print(f"[ok] wrote → {out_json}")



## How to use this

- If **k < 5** or **l < 2**, increase generalization beyond the defaults applied above (e.g., dates to 5- or 10-year bands instead of single years, geography wider than ZIP3, more aggressive category bucketing).  
- Re-run this notebook until metrics meet your risk posture.  
- Use the JSON output in:
  - `03_jurisdictional_compliance_matrix.ipynb` (HIPAA/GDPR pass/fail + DUA gates)  
  - `04_privacy_risk_to_roi.ipynb` (translate improvements to business value)  
  - `scripts/report_builder.py` (embed visuals + tables into a PDF)  
