# Data Quality Pipeline


This notebook performs:
- Data loading
- Profiling (types, missingness, uniques, numeric summaries)
- Plausibility rules (dates, age, BMI, generic bounds)
- Outlier governance (IQR-based flags)
- Imputation using your *variable action plan* ('Impute', 'Drop', 'Flag', etc.)
- Exports: cleaned data, reports, and logs.

> Note: Columns with more than 40% missing were removed upstream, as specified.

In [1]:

import pandas as pd
import numpy as np
from pathlib import Path
from pandas.api.types import is_numeric_dtype, is_categorical_dtype
from datetime import datetime

# --- Input / output locations (edit these for your machine) ---
DATA_PATH = Path(r"C:\Users\HP\OneDrive\Desktop\VERO_code\Phase_1\data\processed\cleaned_data.xlsx")
PLAN_PATH = Path(r"C:\Users\HP\OneDrive\Desktop\VERO_code\Phase_1\outputs\recommendations\variable_action_plan.xlsx")  # action plan workbook
OUT_DIR   = Path(r"C:\Users\HP\OneDrive\Desktop\VERO_code\Phase_1\data\processed\outputs")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# --- Read the action plan workbook (all sheets) and the dataset ---
# sheet_name=None returns a dict of {sheet name: DataFrame}.
plan_all = pd.read_excel(PLAN_PATH, sheet_name=None)
if "action_plan" not in plan_all:
    # No sheet called 'action_plan': use whichever sheet comes first.
    first_sheet = list(plan_all)[0]
    action_plan_df = plan_all[first_sheet].copy()
else:
    action_plan_df = plan_all["action_plan"].copy()

df = pd.read_excel(DATA_PATH)

print(f"Loaded data shape: {df.shape}")
print(f"Action plan rows: {len(action_plan_df)}")

# --- Normalization helper ---
def norm(s):
    """Return a canonical lookup key for a column or variable name.

    The value is converted to text, non-breaking spaces (U+00A0) become
    ordinary spaces, zero-width spaces (U+200B) are removed, and the result is
    trimmed and lower-cased.

    The invisible characters are handled *before* trimming: a zero-width space
    is not whitespace for ``str.strip``, so stripping first would leave the
    spaces next to it in the key.
    """
    text = str(s).replace("\u00a0", " ").replace("\u200b", "")
    return text.strip().lower()

# Normalize plan headers so the required columns are found regardless of case/spacing
action_plan_df.columns = [norm(c) for c in action_plan_df.columns]
if "variable" not in action_plan_df.columns or "recommended_action" not in action_plan_df.columns:
    raise KeyError("Action plan must contain 'variable' and 'recommended_action' columns (case-insensitive).")

# Build lookup from normalized variable -> original recommended action.
# na_action="ignore" keeps missing variable names as NaN; without it norm()
# would turn them into the string "nan", the dropna below would keep the row,
# and a bogus "nan" key would end up in the lookup.
action_plan_df["variable_norm"] = action_plan_df["variable"].map(norm, na_action="ignore")
plan_lookup = (
    action_plan_df
      .dropna(subset=["variable_norm", "recommended_action"])
      .drop_duplicates(subset=["variable_norm"], keep="first")  # first plan row wins
      .set_index("variable_norm")["recommended_action"]
      .to_dict()
)

# Data columns that have no entry in the plan
missing_in_plan = [c for c in df.columns if norm(c) not in plan_lookup]
print(f"Columns missing in plan: {len(missing_in_plan)}")


FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\HP\\OneDrive\\Desktop\\VERO_code\\Phase_1\\outputs\\recommendations\\variable_action_plan.xlsx'

## Profiling

In [None]:

# Per-column profile: dtype, missingness, cardinality and, for numeric
# columns, the usual distribution statistics.
profile_rows = []
for column_name in df.columns:
    column = df[column_name]
    missing_mask = column.isna()

    record = {
        "column": column_name,
        "dtype": str(column.dtype),
        "n_missing": int(missing_mask.sum()),
        "pct_missing": float(missing_mask.mean()) * 100,
        "n_unique": int(column.nunique(dropna=True)),
    }

    if is_numeric_dtype(column):
        # Numeric extras are added only here, so other columns get NaN in these fields.
        record["min"] = column.min(skipna=True)
        record["q1"] = column.quantile(0.25)
        record["median"] = column.median()
        record["q3"] = column.quantile(0.75)
        record["max"] = column.max(skipna=True)
        record["mean"] = column.mean(skipna=True)
        record["std"] = column.std(skipna=True)

    profile_rows.append(record)

profile_df = pd.DataFrame(profile_rows).sort_values("column")

profile_path = OUT_DIR / "profiling_summary.xlsx"
with pd.ExcelWriter(profile_path) as writer:
    profile_df.to_excel(writer, sheet_name="profile", index=False)
print(f"Saved profiling summary -> {profile_path}")


In [None]:
import pandas as pd
from pandas.api.types import is_numeric_dtype

# Collect the names of all numeric columns, in dataframe order.
numeric_columns = []
for col in df.columns:
    if is_numeric_dtype(df[col]):
        numeric_columns.append(col)

# Print the list as copy-pasteable Python source.
listing = ["numeric_columns = ["]
listing.extend(f"    '{col}'," for col in numeric_columns)
listing.append("]")
print("\n".join(listing))


## Plausibility Rules

In [None]:
import pandas as pd
import numpy as np
from pandas.api.types import is_numeric_dtype
from pathlib import Path



# Numeric plausibility bounds, keyed by column name.
#   key   -> column name in lower case; the summary code below matches it
#            against the stripped, lower-cased dataframe column name
#   value -> (min_allowed, max_allowed); both ends are inclusive
# Columns that are not numeric are skipped by the summary code even if listed here.
plaus_rules = {
    "active_principles_n": (0, 50),
    "adr_chemo_correlation": (0, 3),
    "adr_ctcae_grade": (1, 5),
    "adr_left": (0, 1),
    "adr_n_grado1": (0, 50),
    "adr_n_grado2": (0, 50),
    "adr_n_grado3": (0, 50),
    "adr_n_grado4": (0, 50),
    "adr_n_grado5": (0, 50),
    "adr_n_tot": (0, 100),
    "age": (0, 120),
    "bmi_value": (10, 80),
    "altre_pat_n": (0, 50),
    "altro": (0, 1),
    "chemio_fine_tossicita": (0, 1),
    "chemo_cycles_n": (0, 50),
    "comorbidity_category_list": (0, 50),
    "comorbilita_cat": (0, 20),
    "farmaci_cat_n": (0, 50),
    "gastroesophageal_reflux_full": (0, 1),
    "hypertensive_heart_disease": (0, 1),
    "intervento_chirurgico_altro": (0, 1),
    "linea_trattamento_oncologico": (0, 10),
    "observation_days": (0, 3650),
    "oncology_treatment_lines_n": (0, 10),
    "ordinary_hospitalizations_n": (0, 50),
    "pregresso_numero_linee_trattamento": (0, 10),
    "radiotherapy_status": (0, 2),
    "ricovero_n": (0, 50),
    "smoking_status_binary": (0, 1),
    "smoking_years": (0, 80),
    "tipo_left": (0, 1),
    "transfusions_total_n": (0, 100),
    "treatment_line_n": (0, 10),
}

# --- Build summary ---
# Output columns in report order. Passing them to the DataFrame constructor
# keeps the schema stable, so the sort below still works when no dataframe
# column matches a rule (an empty, column-less frame would raise KeyError).
summary_columns = [
    "column", "min_allowed", "max_allowed",
    "n_total_rows", "n_nonmissing", "n_missing",
    "n_within", "n_outside",
    "pct_within_nonmissing", "pct_outside_nonmissing",
    "pct_within_allrows", "pct_outside_allrows", "pct_missing_allrows",
    "observed_min", "observed_max"
]

rule_lookup = {k.lower(): v for k, v in plaus_rules.items()}

rows = []
for col in df.columns:
    col_key = str(col).strip().lower()
    if col_key not in rule_lookup:
        continue
    s = df[col]
    if not is_numeric_dtype(s):
        # Bounds are only meaningful for numeric data.
        continue

    lo, hi = rule_lookup[col_key]
    n_total = len(s)
    n_nonmissing = int(s.notna().sum())
    n_missing = n_total - n_nonmissing

    if n_nonmissing > 0:
        within = (s >= lo) & (s <= hi)   # inclusive bounds
        within = within[s.notna()]       # judge observed values only
        n_within = int(within.sum())
        n_outside = int(n_nonmissing - n_within)
        pct_within_nonmissing = (n_within / n_nonmissing) * 100
        pct_outside_nonmissing = (n_outside / n_nonmissing) * 100
        pct_within_allrows = (n_within / n_total) * 100
        pct_outside_allrows = (n_outside / n_total) * 100
        observed_min = float(s.min(skipna=True))
        observed_max = float(s.max(skipna=True))
    else:
        # Nothing observed: counts are zero and value-based statistics are undefined.
        n_within = 0
        n_outside = 0
        pct_within_nonmissing = np.nan
        pct_outside_nonmissing = np.nan
        pct_within_allrows = 0.0
        pct_outside_allrows = 0.0
        observed_min = np.nan
        observed_max = np.nan

    # Guard against a zero-row dataframe (would otherwise divide by zero).
    pct_missing_allrows = (n_missing / n_total) * 100 if n_total > 0 else np.nan

    rows.append({
        "column": col,
        "min_allowed": float(lo),
        "max_allowed": float(hi),
        "n_total_rows": n_total,
        "n_nonmissing": n_nonmissing,
        "n_missing": n_missing,
        "n_within": n_within,
        "n_outside": n_outside,
        "pct_within_nonmissing": round(pct_within_nonmissing, 2) if pd.notna(pct_within_nonmissing) else np.nan,
        "pct_outside_nonmissing": round(pct_outside_nonmissing, 2) if pd.notna(pct_outside_nonmissing) else np.nan,
        "pct_within_allrows": round(pct_within_allrows, 2),
        "pct_outside_allrows": round(pct_outside_allrows, 2),
        "pct_missing_allrows": round(pct_missing_allrows, 2) if pd.notna(pct_missing_allrows) else np.nan,
        "observed_min": observed_min if pd.isna(observed_min) else round(observed_min, 2),
        "observed_max": observed_max if pd.isna(observed_max) else round(observed_max, 2),
    })

summary = pd.DataFrame(rows, columns=summary_columns)

# Most out-of-range first, then most missing, then alphabetical by column
summary = summary.sort_values(
    by=["pct_outside_nonmissing", "pct_missing_allrows", "column"],
    ascending=[False, False, True]
).reset_index(drop=True)

# --- Save to Excel ---
out_path = Path(r"C:\Users\HP\OneDrive\Desktop\VERO_code\Phase_1\data\processed\outputs\plausibility_flags_summary.xlsx")
with pd.ExcelWriter(out_path) as xw:
    summary.to_excel(xw, sheet_name="flag_summary", index=False)

print(f"✅ Saved: {out_path}")


print(summary.head(20).to_string(index=False))


## Outlier Governance (IQR Method)

In [None]:
import pandas as pd
import numpy as np
from pandas.api.types import is_numeric_dtype
from pathlib import Path

# Assumptions: `df` and `OUT_DIR` are already defined by the loading cell at the top of the notebook.
# If OUT_DIR is not defined, create it first (same folder as in the loading cell):
# OUT_DIR = Path(r"C:\Users\HP\OneDrive\Desktop\VERO_code\Phase_1\data\processed\outputs")
# OUT_DIR.mkdir(parents=True, exist_ok=True)

def iqr_outlier_flags_and_summary(
    df_in: pd.DataFrame,
    multiplier: float = 1.5,
    min_nonmissing: int = 8,
    skip_binary: bool = True
):
    """
    Flag IQR-based outliers in every eligible numeric column.

    A value is an outlier when it lies below ``Q1 - multiplier * IQR`` or
    above ``Q3 + multiplier * IQR``. Missing values are never flagged.

    Args:
      df_in: input data; it is not modified.
      multiplier: width of the fences in IQR units.
      min_nonmissing: columns with fewer observed values are skipped.
      skip_binary: when True, columns with at most two distinct observed
        values are skipped.

    Returns:
      flags_df: one 0/1 column per analysed input column, named
        '<col>_iqr_outlier_flag'; has no columns if nothing qualified.
      summary_df: per-column counts and percentages, sorted by outlier rate
        (descending), then missing rate (descending), then column name.
      thresholds_df: per-column Q1, Q3, IQR and the low/high fences.

    Columns that are non-numeric, or whose IQR is zero or undefined, are left
    out of all three outputs.
    """
    summary_fields = [
        "column", "iqr_multiplier", "n_total_rows", "n_nonmissing", "n_missing",
        "n_inliers", "n_outliers", "pct_outliers_nonmissing", "pct_outliers_allrows",
        "pct_missing_allrows", "observed_min", "observed_max",
    ]
    threshold_fields = ["column", "q1", "q3", "iqr", "low_threshold", "high_threshold"]

    def _rounded(value, digits):
        # Round to `digits` decimals; missing values stay NaN.
        return round(float(value), digits) if pd.notna(value) else np.nan

    def _percent(part, whole):
        # Share of `part` in `whole` as a percentage; NaN when `whole` is zero.
        return (part / whole) * 100 if whole > 0 else np.nan

    flag_columns = {}
    summary_records = []
    threshold_records = []

    for name in df_in.columns:
        values = df_in[name]

        # --- eligibility checks ---
        if not is_numeric_dtype(values):
            continue
        if skip_binary and values.dropna().nunique() <= 2:
            continue

        n_rows = len(values)
        n_present = int(values.notna().sum())
        if n_present < min_nonmissing:
            # too few observations for stable quartiles
            continue

        q1 = values.quantile(0.25)
        q3 = values.quantile(0.75)
        spread = q3 - q1
        if pd.isna(spread) or spread == 0:
            # no spread -> fences would collapse, so nothing is flagged
            continue

        # --- fences and flags ---
        lower_fence = q1 - multiplier * spread
        upper_fence = q3 + multiplier * spread
        is_outlier = (values < lower_fence) | (values > upper_fence)
        flag_columns[f"{name}_iqr_outlier_flag"] = is_outlier.fillna(False).astype(int)

        # --- per-column bookkeeping ---
        n_absent = n_rows - n_present
        n_flagged = int(is_outlier.sum(skipna=True))

        summary_records.append({
            "column": name,
            "iqr_multiplier": multiplier,
            "n_total_rows": n_rows,
            "n_nonmissing": n_present,
            "n_missing": n_absent,
            "n_inliers": int(n_present - n_flagged),
            "n_outliers": n_flagged,
            "pct_outliers_nonmissing": _rounded(_percent(n_flagged, n_present), 2),
            "pct_outliers_allrows": _rounded(_percent(n_flagged, n_rows), 2),
            "pct_missing_allrows": _rounded(_percent(n_absent, n_rows), 2),
            "observed_min": _rounded(values.min(skipna=True), 4),
            "observed_max": _rounded(values.max(skipna=True), 4),
        })

        threshold_records.append({
            "column": name,
            "q1": _rounded(q1, 4),
            "q3": _rounded(q3, 4),
            "iqr": _rounded(spread, 4),
            "low_threshold": _rounded(lower_fence, 4),
            "high_threshold": _rounded(upper_fence, 4),
        })

    # Assemble outputs; empty results keep the expected index / column layout.
    if flag_columns:
        flags_df = pd.DataFrame(flag_columns)
    else:
        flags_df = pd.DataFrame(index=df_in.index)

    if summary_records:
        summary_df = pd.DataFrame(summary_records)
    else:
        summary_df = pd.DataFrame(columns=summary_fields)

    if threshold_records:
        thresholds_df = pd.DataFrame(threshold_records)
    else:
        thresholds_df = pd.DataFrame(columns=threshold_fields)

    # Highest outlier rate first
    if not summary_df.empty:
        summary_df = summary_df.sort_values(
            by=["pct_outliers_nonmissing", "pct_missing_allrows", "column"],
            ascending=[False, False, True]
        ).reset_index(drop=True)

    return flags_df, summary_df, thresholds_df


# ---- Run and Save ----
# Prefer df_plaus (when chaining after a plausibility step); otherwise use df.
outlier_input = df_plaus if 'df_plaus' in globals() else df
outlier_flags, outlier_summary, outlier_thresholds = iqr_outlier_flags_and_summary(
    outlier_input,
    multiplier=1.5,
    min_nonmissing=8,
    skip_binary=True
)

out_path = OUT_DIR / "outlier_flags_summary.xlsx"

# (sheet name, table) in the order the sheets are written:
#   flags          -> 0/1 flag matrix
#   outlier_counts -> column-level counts and percentages
#   thresholds     -> fences used per column
outlier_sheets = [
    ("flags", outlier_flags),
    ("outlier_counts", outlier_summary),
    ("thresholds", outlier_thresholds),
]
with pd.ExcelWriter(out_path) as xw:
    for sheet_label, table in outlier_sheets:
        # An empty result is written as a blank sheet.
        sheet_data = pd.DataFrame() if table.empty else table
        sheet_data.to_excel(xw, sheet_name=sheet_label, index=False)

print(f"✅ Outlier flags saved -> {out_path}")


In [18]:
import pandas as pd
from pandas.api.types import is_numeric_dtype

# 1. Replace the implausible extreme value in bmi_value
# Upper plausibility limit: taken from plaus_rules["bmi_value"] when that dict
# is defined, otherwise the same (10, 80) bounds are used as a fallback.
bmi_upper_bound = globals().get("plaus_rules", {}).get("bmi_value", (10, 80))[1]

# Median and current maximum, both ignoring NaN
bmi_median = df['bmi_value'].median(skipna=True)
bmi_max = df['bmi_value'].max(skipna=True)

# Replace the maximum ONLY when it is implausible. Without this guard every
# re-run of the cell would overwrite the next-largest (legitimate) value too.
if pd.notna(bmi_max) and bmi_max > bmi_upper_bound:
    df.loc[df['bmi_value'] == bmi_max, 'bmi_value'] = bmi_median
    print(f"Replaced bmi_value max ({bmi_max}) with median ({bmi_median}).")
else:
    print(f"bmi_value max ({bmi_max}) does not exceed {bmi_upper_bound}; nothing replaced.")

# 2. Median-impute all remaining missing numeric variables
# NOTE(review): this fills every numeric column of `df` in place. The later
# action-plan step (impute_with_plan) skips columns that have no missing
# values, so plan actions for numeric columns will not fire after this cell
# — confirm this is intended.
unfilled_cols = []
for col in df.columns:
    if not is_numeric_dtype(df[col]):
        continue
    median_val = df[col].median(skipna=True)
    if pd.isna(median_val):
        # No observed values: there is no median to fill with.
        if df[col].isna().any():
            unfilled_cols.append(col)
        continue
    df[col] = df[col].fillna(median_val)

if unfilled_cols:
    print(f"⚠️ Numeric columns left unfilled (no observed values): {unfilled_cols}")
else:
    print("✅ All numeric missing values have been imputed with column medians.")


Replaced bmi_value max (150.695) with median (24.08).
✅ All numeric missing values have been imputed with column medians.


## Imputation (Action Plan Driven)

In [20]:
import pandas as pd
from pathlib import Path
from pandas.api.types import is_numeric_dtype, is_categorical_dtype

# ---------- Config ----------
# Working dataframe: `df_plaus` takes precedence over `df` when both exist.
if "df_plaus" in globals():
    BASE_DF = globals()["df_plaus"]
else:
    BASE_DF = globals().get("df")
if BASE_DF is None:
    raise RuntimeError("No dataframe found. Define either `df_plaus` or `df` before running.")

# Output folder: reuse an existing OUT_DIR, else fall back to the default below.
OUT_DIR = Path(globals().get("OUT_DIR", Path(r"C:\Users\HP\OneDrive\Desktop\VERO_code\Phase_1\outputs")))
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Action plan lookup: {lower-cased variable name -> recommended action}.
# An `action_plan_df` (columns 'variable' and 'recommended_action') is used
# when available; otherwise an existing `plan_lookup` dict is kept as is.
if "action_plan_df" in globals():
    plan_pairs = zip(action_plan_df["variable"], action_plan_df["recommended_action"])
    plan_lookup = {
        str(variable).strip().lower(): str(recommended).strip()
        for variable, recommended in plan_pairs
        if not (pd.isna(variable) or pd.isna(recommended))  # skip incomplete rows
    }
elif "plan_lookup" in globals():
    # user-provided lookup, left untouched
    plan_lookup = globals()["plan_lookup"]
else:
    raise RuntimeError("No action plan found. Provide `action_plan_df` with columns "
                       "['variable','recommended_action'] or a `plan_lookup` dict.")

def norm(x) -> str:
    """Return the value as text, trimmed and lower-cased (used as a lookup key)."""
    return str(x).strip().lower()

def impute_with_plan(df_input: pd.DataFrame, plan_lookup: dict):
    """
    Apply the per-variable action plan to columns that contain missing values.

    Actions supported (case-insensitive):
      - 'impute' : mode for object / string / categorical columns,
                   median for numeric columns
      - 'drop' or 'remove' : drop the column
      - 'flag' or 'indicator' : create <col>_missing_flag (1 if missing)

    Args:
      df_input: data to process; it is copied, never modified.
      plan_lookup: maps a normalized column name (see `norm`) to an action.

    Returns:
      (df_out, logs): the processed copy and a list of log lines, one per
      column that had missing values.

    Columns without missing values are skipped before the plan is consulted,
    so they are left untouched whatever their action is (including 'drop').
    """
    df_out = df_input.copy()
    logs = []

    # Snapshot of columns to avoid mutation-during-iteration issues
    for raw_col in list(df_out.columns):
        s = df_out[raw_col]
        miss_n = int(s.isna().sum())
        if miss_n == 0:
            continue

        action = plan_lookup.get(norm(raw_col))
        if not action:
            logs.append(f"[SKIP] {raw_col}: no action configured in plan")
            continue

        action_norm = norm(action)

        if action_norm == "impute":
            # isinstance checks replace the deprecated is_categorical_dtype and
            # also cover pandas' dedicated string dtype.
            uses_mode = s.dtype == "object" or isinstance(
                s.dtype, (pd.CategoricalDtype, pd.StringDtype)
            )
            if uses_mode:
                mode_val = s.mode(dropna=True)
                if not mode_val.empty:
                    df_out[raw_col] = s.fillna(mode_val.iloc[0])
                    logs.append(f"[IMPUTE-MODE] {raw_col} -> {mode_val.iloc[0]} (filled {miss_n})")
                else:
                    logs.append(f"[WARN] {raw_col}: mode empty, no fill performed")
            elif is_numeric_dtype(s):
                med = s.median(skipna=True)
                if pd.isna(med):
                    # No observed values: filling with NaN does nothing, so do
                    # not log it as a successful imputation.
                    logs.append(f"[WARN] {raw_col}: median undefined, no fill performed")
                else:
                    df_out[raw_col] = s.fillna(med)
                    logs.append(f"[IMPUTE-MEDIAN] {raw_col} -> {med} (filled {miss_n})")
            else:
                logs.append(f"[SKIP] {raw_col}: unsupported dtype for impute")

        elif action_norm in {"drop", "remove"}:
            df_out = df_out.drop(columns=[raw_col])
            logs.append(f"[DROP] {raw_col}")

        elif action_norm in {"flag", "indicator"}:
            flag_col = f"{raw_col}_missing_flag"
            df_out[flag_col] = s.isna().astype(int)
            logs.append(f"[FLAG] {raw_col} -> {flag_col} (created indicator)")

        else:
            logs.append(f"[SKIP] {raw_col}: unrecognized action '{action}'")

    return df_out, logs

# ---------- Run ----------
before_missing = BASE_DF.isna().sum()
df_imputed, impute_logs = impute_with_plan(BASE_DF, plan_lookup)
after_missing = df_imputed.isna().sum()

# Missing counts per column before/after, ordered by the change (after - before).
miss_summary = pd.DataFrame({"missing_before": before_missing, "missing_after": after_missing})
miss_summary["delta"] = miss_summary["missing_after"] - miss_summary["missing_before"]
miss_summary = miss_summary.sort_values("delta")

# ---------- Save ----------
clean_path = OUT_DIR / "cleaned_imputed.xlsx"
log_path = OUT_DIR / "imputation_log.txt"
miss_path = OUT_DIR / "missingness_before_after.xlsx"

df_imputed.to_excel(clean_path, index=False)
miss_summary.to_excel(miss_path)

# One log entry per line
with open(log_path, "w", encoding="utf-8") as log_file:
    log_file.writelines(entry + "\n" for entry in impute_logs)

for label, saved_path in (
    ("cleaned data", clean_path),
    ("imputation log", log_path),
    ("missingness summary", miss_path),
):
    print(f"Saved {label} -> {saved_path}")


  df_imputed.to_excel(clean_path, index=False)


Saved cleaned data -> C:\Users\HP\OneDrive\Desktop\VERO_code\Phase_1\data\processed\outputs\cleaned_imputed.xlsx
Saved imputation log -> C:\Users\HP\OneDrive\Desktop\VERO_code\Phase_1\data\processed\outputs\imputation_log.txt
Saved missingness summary -> C:\Users\HP\OneDrive\Desktop\VERO_code\Phase_1\data\processed\outputs\missingness_before_after.xlsx


  miss_summary.to_excel(miss_path)


## Combined Report Export

In [22]:
report_path = OUT_DIR / "data_quality_report.xlsx"

# Optional inputs are looked up by name, so a cell that was not run cannot
# raise NameError while the workbook is open.
_report_ns = globals()
_plaus_flags = _report_ns.get("plaus_flags")
# The plausibility cell stores its table in `summary`; `plaus_summary` is
# honoured first when it exists.
_plaus_counts = _report_ns.get("plaus_summary", _report_ns.get("summary"))
_outlier_flags = _report_ns.get("outlier_flags")
_outlier_counts = _report_ns.get("outlier_summary")


def _write_counts(table, writer, sheet_name, series_label):
    """Write a counts table to `sheet_name`; do nothing if it is missing or empty.

    A Series is written with its index under the column name `series_label`;
    a DataFrame is written without its index.
    """
    if isinstance(table, pd.Series):
        table.to_frame(series_label).to_excel(writer, sheet_name=sheet_name)
    elif isinstance(table, pd.DataFrame) and not table.empty:
        table.to_excel(writer, sheet_name=sheet_name, index=False)


with pd.ExcelWriter(report_path) as xw:

    # Always include profiling
    profile_df.to_excel(xw, sheet_name="profile", index=False)

    # ---- Plausibility Flags + Counts ----
    if isinstance(_plaus_flags, pd.DataFrame) and not _plaus_flags.empty:
        _plaus_flags.to_excel(xw, sheet_name="plausibility_flags", index=False)
    # Counts no longer depend on a flags matrix being available.
    _write_counts(_plaus_counts, xw, "plausibility_counts", "n_flags")

    # ---- Outlier Flags + Counts ----
    if isinstance(_outlier_flags, pd.DataFrame) and not _outlier_flags.empty:
        _outlier_flags.to_excel(xw, sheet_name="outlier_flags", index=False)
    _write_counts(_outlier_counts, xw, "outlier_counts", "n_outliers")

    # ---- Missingness Summary ----
    miss_summary.to_excel(xw, sheet_name="missingness_before_after", index=True)

print(f"✅ Saved combined report -> {report_path}")


  with pd.ExcelWriter(report_path) as xw:


✅ Saved combined report -> C:\Users\HP\OneDrive\Desktop\VERO_code\Phase_1\data\processed\outputs\data_quality_report.xlsx
