
# 🚀 Data Preparation (SageMaker + Redshift)

**Purpose:** Create a *stable*, **reproducible**, and **config-driven** data preparation process that is safe to promote toward production deployment.

> This notebook is designed to run on SageMaker or locally. It loads data from **Redshift** or **S3 Parquet** via the provided `load_data` function in `data_io.py`, applies deterministic preprocessing and feature engineering, infers a draft feature schema, runs lightweight validation checks, and writes versioned artifacts for downstream training.



## 📦 What You Get
- Config-first **data loader** (Redshift or S3 Parquet) using `load_data()` from `data_io.py`  
- **Cleaning & feature engineering** examples
- **Leakage-aware** train/val/test splits → `splits.json`  
- Draft **feature schema** → `feature_schema.json`  
- Export **processed dataset** → Parquet (optionally partitioned)  



## 🧰 Prerequisites
- Python 3.9+
- Packages: `pandas`, `numpy`, `pyarrow`, `scikit-learn`, `mlflow` (optional), `sqlalchemy`, `redshift_connector`, `s3fs`
- A `data_io.py` next to this notebook containing the provided `load_data(...)` implementation.


In [None]:

# If running on a fresh environment, uncomment as needed (SageMaker kernels usually have most of these)
# %pip install pandas numpy pyarrow scikit-learn mlflow sqlalchemy redshift_connector s3fs



## ♻️ Reproducibility & Environment Capture
- **Fixed seeds** for deterministic results.
- Capture **package versions** for traceability.
- All artifacts are written under a **run folder** with a unique timestamp/hash.


In [None]:

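import os, sys, json, time, hashlib, platform, random
from datetime import datetime, timezone
import numpy as np
import pandas as pd

# Fixed seeds for Python's `random` and NumPy's global RNG; sampling and splitting below also pass SEED explicitly
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Timezone-aware UTC timestamp (datetime.utcnow() is deprecated as of Python 3.12)
RUN_TS = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
RUN_ID = hashlib.sha1(f"{RUN_TS}-{SEED}".encode()).hexdigest()[:10]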

ARTIFACT_DIR = os.environ.get("ARTIFACT_DIR", f"artifacts/run_{RUN_TS}_{RUN_ID}")
os.makedirs(ARTIFACT_DIR, exist_ok=True)

env_info = {
    "python": sys.version,
    "platform": platform.platform(),
    "timestamp_utc": RUN_TS,
    "seed": SEED,
    "packages": {
        "pandas": pd.__version__,
        "numpy": np.__version__,
    },
}
with open(os.path.join(ARTIFACT_DIR, "env_info.json"), "w") as f:
    json.dump(env_info, f, indent=2)

env_info
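The cell above records versions for `pandas` and `numpy` only. A minimal sketch for capturing every package from the Prerequisites list uses the standard-library `importlib.metadata`; the distribution names in `PREREQ_PACKAGES` are assumed pip names, and `capture_package_versions` is a hypothetical helper, not part of `data_io.py`.

```python
import json
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional

# Assumed pip distribution names for the packages listed under Prerequisites
PREREQ_PACKAGES = [
    "pandas", "numpy", "pyarrow", "scikit-learn",
    "mlflow", "sqlalchemy", "redshift_connector", "s3fs",
]

def capture_package_versions(names: Iterable[str]) -> Dict[str, Optional[str]]:
    """Hypothetical helper: map each distribution name to its installed version (None if absent)."""
    versions: Dict[str, Optional[str]] = {}
    for name in names:
        try:
            versions[name] = version(name)
        except PackageNotFoundError:
            # Optional packages (e.g. mlflow) may legitimately be missing
            versions[name] = None
    return versions

pkg_versions = capture_package_versions(PREREQ_PACKAGES)
print(json.dumps(pkg_versions, indent=2))
```

To persist the result, merge it into `env_info["packages"]` before `env_info.json` is written.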



## ⚙️ Configuration
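Single source of truth for inputs, outputs, and behavior. Switch **source** between `"redshift"` and `"parquet"` by editing this cell or by setting the `SOURCE` environment variable; most other values can be overridden through environment variables in the same way.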


In [None]:

from pathlib import Path

CONFIG = {
    "data": {
        # choose 'redshift' or 'parquet'
        "source": os.environ.get("SOURCE", "redshift"),
        # parquet URI if using S3 parquet (supports wildcards)
        "parquet_uri": os.environ.get("PARQUET_URI", "s3://your-bucket/path/to/data/*.parquet"),
        # SQL for Redshift (must be deterministic if you sample!)
        "sql": os.environ.get("SQL", "SELECT * FROM your_schema.your_table ORDER BY id"),
        # Redshift connection kwargs (fill via env vars or IAM role inside SageMaker)
        "redshift_kwargs": {
            "host": os.environ.get("REDSHIFT_HOST", "example.your-cluster.redshift.amazonaws.com"),
            "database": os.environ.get("REDSHIFT_DB", "dev"),
            "user": os.environ.get("REDSHIFT_USER", "username"),
            "password": os.environ.get("REDSHIFT_PASSWORD", "password"),
            "port": int(os.environ.get("REDSHIFT_PORT", "5439")),
        },
        # Optional: limit rows deterministically for dev runs (set ROW_LIMIT=0 for None = no limit)
        "row_limit": int(os.environ.get("ROW_LIMIT", "50000")) or None,
    },
    "columns": {
        # Define a target column if already known (can be None and set later)
        "target": os.environ.get("TARGET", "churned"),

        # Optional: primary key for splits and deterministic joins
        "primary_key": os.environ.get("PRIMARY_KEY", "customer_id"),
    },
    "processing": {
        "stratify_splits": True,       # set to False for regression
        "test_size": 0.2,
        "val_size": 0.1,
        "dropna_threshold_ratio": 0.95,  # drop columns with >5% missing if needed
        "cap_outliers_iqr": True,
        "normalize_categoricals": True,
    },
    "output": {
        "artifact_dir": ARTIFACT_DIR,
        "processed_parquet_path": str(Path(ARTIFACT_DIR) / "processed" / "dataset.parquet"),
        "feature_schema_path": str(Path(ARTIFACT_DIR) / "feature_schema.json"),
        "splits_path": str(Path(ARTIFACT_DIR) / "splits.json"),
    },
    "mlflow": {
        "enabled": False,
        "tracking_uri": os.environ.get("MLFLOW_TRACKING_URI", ""),
        "experiment_name": os.environ.get("MLFLOW_EXPERIMENT", "data-prep"),
    }
}

CONFIG
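Because every later cell trusts `CONFIG`, a cheap sanity check right after it can fail fast on typos in environment variables. A minimal sketch; `validate_config` is a hypothetical helper, and its rules only encode constraints implied by this notebook (a valid `source`, split fractions that leave rows for training, a ratio in [0, 1]).

```python
from typing import Any, Dict, List

def validate_config(cfg: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: return human-readable config errors (empty list = OK)."""
    errors: List[str] = []
    data, proc = cfg["data"], cfg["processing"]

    if data["source"] not in ("redshift", "parquet"):
        errors.append(f"data.source must be 'redshift' or 'parquet', got {data['source']!r}")

    row_limit = data.get("row_limit")
    if row_limit is not None and row_limit <= 0:
        errors.append(f"data.row_limit must be a positive int or None, got {row_limit!r}")

    test_size, val_size = proc["test_size"], proc["val_size"]
    if not (0 < test_size < 1 and 0 < val_size < 1):
        errors.append("processing.test_size and processing.val_size must be in (0, 1)")
    elif test_size + val_size >= 1:
        errors.append("processing.test_size + processing.val_size must be < 1 (no rows left for training)")

    ratio = proc.get("dropna_threshold_ratio")
    if ratio is not None and not (0.0 <= ratio <= 1.0):
        errors.append(f"processing.dropna_threshold_ratio must be in [0, 1], got {ratio!r}")
    return errors
```

Usage: `problems = validate_config(CONFIG)` followed by `assert not problems, problems`.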



## 📥 Load Data (Redshift or S3 Parquet)
Uses `load_data()` defined in `data_io.py`:

```python
df = load_data(
    source=CONFIG["data"]["source"],
    uri=CONFIG["data"]["parquet_uri"],
    sql=CONFIG["data"]["sql"],
    redshift_kwargs=CONFIG["data"]["redshift_kwargs"]
)
```
If `data_io.py` is not found, we fall back to a **synthetic demo dataset** so the rest of the pipeline remains testable.


In [None]:

# Try to import the provided load_data function from data_io.py
load_data = None
try:
    from data_io import load_data  # expects file next to this notebook
except Exception as e:
    print("⚠️ Could not import `load_data` from data_io.py. Using synthetic demo data. Error:", repr(e))

def _demo_dataset(n=5000, seed=SEED):
    rng = np.random.default_rng(seed)
    df = pd.DataFrame({
        "customer_id": np.arange(1, n+1),
        "age": rng.integers(18, 90, size=n),
        "tenure_months": rng.integers(0, 120, size=n),
        "monthly_charges": rng.normal(45, 15, size=n).round(2),
        "contract_type": rng.choice(["month-to-month", "one-year", "two-year"], size=n, p=[0.6, 0.25, 0.15]),
        "payment_method": rng.choice(["card", "bank", "paypal"], size=n),
        "country": rng.choice(["PT","ES","FR","DE"], size=n, p=[0.5,0.2,0.2,0.1]),
        "churned": rng.choice([0,1], size=n, p=[0.78, 0.22]).astype(int),
    })
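    # inject a few anomalies
    df.loc[rng.choice(df.index, 20, replace=False), "monthly_charges"] = -1.0  # invalid negative price
    df["age"] = df["age"].astype(float)  # float dtype so NaN can be stored without an implicit int->float upcast
    df.loc[rng.choice(df.index, 30, replace=False), "age"] = np.nan          # missing age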
    return df

if load_data:
    source = CONFIG["data"]["source"]
    uri = CONFIG["data"]["parquet_uri"]
    sql = CONFIG["data"]["sql"]
    rs_kwargs = CONFIG["data"]["redshift_kwargs"]
    print(f"Loading data via data_io.load_data(source={source!r}) ...")
    df_raw = load_data(source=source, uri=uri, sql=sql, redshift_kwargs=rs_kwargs)
else:
    print("Using synthetic dataset for demonstration.")
    df_raw = _demo_dataset(n=CONFIG["data"]["row_limit"] or 5000)

# Optional row limit for dev runs (deterministic: sort by primary key and take the first rows, else sample with a fixed seed)
row_limit = CONFIG["data"]["row_limit"]
if row_limit and len(df_raw) > row_limit:
    # deterministic subset if primary key exists and ordering is stable
    pk = CONFIG["columns"]["primary_key"]
    if pk in df_raw.columns:
        df_raw = df_raw.sort_values(pk).head(row_limit).reset_index(drop=True)
    else:
        df_raw = df_raw.sample(n=row_limit, random_state=SEED).reset_index(drop=True)

df_raw.head(), df_raw.shape
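The row-limit logic above keeps the rows with the smallest primary keys, which is deterministic but can be biased (for example, if IDs grow over time, it keeps only the oldest customers). An alternative sketch, assuming keys that convert cleanly to strings, is hash-based sampling: each key is hashed into a fixed bucket, so the same keys are selected on every run regardless of row order, and newly arriving rows do not reshuffle the existing selection. `deterministic_subset` and `hash_bucket` are hypothetical helpers; the result is an approximate fraction of keys, not an exact row count.

```python
import hashlib

import pandas as pd

def hash_bucket(key: str, buckets: int = 10_000) -> int:
    """Map a key to a stable bucket in [0, buckets). MD5 is used only as a stable hash, not for security."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % buckets

def deterministic_subset(df: pd.DataFrame, key_col: str, fraction: float, buckets: int = 10_000) -> pd.DataFrame:
    """Keep rows whose key hashes below fraction * buckets (roughly `fraction` of the distinct keys)."""
    threshold = int(fraction * buckets)
    keep = df[key_col].astype(str).map(lambda k: hash_bucket(k, buckets) < threshold).astype(bool)
    return df[keep].reset_index(drop=True)
```

Usage, approximating the configured row limit: `deterministic_subset(df_raw, CONFIG["columns"]["primary_key"], fraction=row_limit / len(df_raw))`.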



## 🔎 Quick Profile
Lightweight overview to understand data types, nulls, and basic distributions.


In [None]:

# Per-column profile: dtype, null counts and null percentage (all columns; add .head(n) for very wide tables)
profile = pd.DataFrame({
    "dtype": df_raw.dtypes.astype(str),
    "nulls": df_raw.isna().sum(),
    "non_nulls": df_raw.notna().sum(),
})
profile["null_pct"] = (100 * profile["nulls"] / max(len(df_raw), 1)).round(2)
profile



## 🧾 Feature Schema (Draft)
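Infer **types, nullability, and basic constraints** (observed min/max, example categories) from the raw data. The result is a draft: it is exported to JSON and should be reviewed by ML + DevOps before it is treated as a contract. Because it describes `df_raw`, columns engineered later in this notebook are not included.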


In [None]:

from typing import Any, Dict

target_col = CONFIG["columns"]["target"]
primary_key = CONFIG["columns"]["primary_key"]

def infer_basic_schema(df: pd.DataFrame) -> Dict[str, Any]:
    schema = {}
    for c in df.columns:
        col_dtype = str(df[c].dtype)
        col = {
            "dtype": col_dtype,
            "nullable": bool(df[c].isna().any()),
        }
        if pd.api.types.is_numeric_dtype(df[c]):
            # robust min/max ignoring NaNs and infs
            finite_vals = pd.to_numeric(df[c], errors="coerce").replace([np.inf, -np.inf], np.nan).dropna()
            if len(finite_vals):
                col["min"] = float(finite_vals.min())
                col["max"] = float(finite_vals.max())
        else:
            # capture a sample of categories for non-numeric cols
            col["example_values"] = df[c].dropna().astype(str).unique()[:20].tolist()
        schema[c] = col
    schema_meta = {
        "_meta": {
            "target": target_col if target_col in df.columns else None,
            "primary_key": primary_key if primary_key in df.columns else None,
            "generated_at": RUN_TS,
            "seed": SEED,
        },
        "columns": schema
    }
    return schema_meta

feature_schema = infer_basic_schema(df_raw)

# write to artifacts
schema_path = CONFIG["output"]["feature_schema_path"]
os.makedirs(Path(schema_path).parent, exist_ok=True)
with open(schema_path, "w") as f:
    json.dump(feature_schema, f, indent=2)

schema_path
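Downstream jobs can reuse the exported `feature_schema.json` as a contract for newly loaded raw batches. A minimal sketch of such a check, assuming the JSON layout written above (`columns` → `dtype`, `nullable`, optional `min`/`max`); `check_against_schema` is a hypothetical helper, and its dtype test is a strict string comparison that may be too strict if a loader returns different but compatible dtypes.

```python
from typing import Any, Dict, List

import pandas as pd

def check_against_schema(df: pd.DataFrame, schema: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: compare a DataFrame with the draft schema and return violations."""
    problems: List[str] = []
    cols = schema["columns"]
    missing = [c for c in cols if c not in df.columns]
    if missing:
        problems.append(f"missing columns: {missing}")
    for name, spec in cols.items():
        if name not in df.columns:
            continue
        s = df[name]
        if str(s.dtype) != spec["dtype"]:
            problems.append(f"{name}: dtype {s.dtype} != {spec['dtype']}")
        if not spec["nullable"] and s.isna().any():
            problems.append(f"{name}: unexpected nulls")
        numeric = pd.api.types.is_numeric_dtype(s)
        if numeric and "min" in spec and (s < spec["min"]).any():
            problems.append(f"{name}: values below {spec['min']}")
        if numeric and "max" in spec and (s > spec["max"]).any():
            problems.append(f"{name}: values above {spec['max']}")
    return problems
```

Since `min`/`max` are values observed in a single extract, treat them as a starting point and widen them during review rather than enforcing them verbatim.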



## 🧼 Cleaning
- Handle missing values
- Normalize categoricals
- Fix invalid values (e.g., negative prices)
- Optional: Outlier capping via IQR
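> **Stable rule sets** are critical: avoid ad-hoc fixes. All transformations must be deterministic and versioned.
>
> **Leakage note:** the medians and IQR bounds below (and the category frequencies in the feature-engineering step) are computed on the full dataset *before* the split, so validation and test rows influence them. For strict leakage control, fit these statistics on the training split only and apply them unchanged to validation and test.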


In [None]:

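df = df_raw.copy()
proc = CONFIG["processing"]

# Treat the primary key as a string identifier. The nullable "string" dtype keeps missing keys as <NA>
# (astype(str) would turn them into the literal "nan" and hide them from the null check during validation).
if primary_key in df.columns:
    df[primary_key] = df[primary_key].astype("string")

# Example: fix invalid negatives in 'monthly_charges' if present
if "monthly_charges" in df.columns:
    df.loc[df["monthly_charges"] < 0, "monthly_charges"] = np.nan

# Drop sparse columns: keep only columns whose non-null ratio >= dropna_threshold_ratio (target and key are always kept)
thr = proc.get("dropna_threshold_ratio")
if thr is not None:
    non_null_ratio = df.notna().mean()
    sparse_cols = [c for c in df.columns if non_null_ratio[c] < thr and c not in (target_col, primary_key)]
    if sparse_cols:
        print("Dropping sparse columns:", sparse_cols)
    df = df.drop(columns=sparse_cols)

# Feature columns exclude the target and the primary key (the key must not be imputed, lowercased or encoded)
excluded = {target_col, primary_key}
num_cols = [c for c in df.columns if c not in excluded and pd.api.types.is_numeric_dtype(df[c])]
cat_cols = [c for c in df.columns if c not in excluded and not pd.api.types.is_numeric_dtype(df[c])]

# Missing value strategy (simple example; in practice consider smarter imputers)
for c in num_cols:
    df[c] = df[c].fillna(df[c].median())

for c in cat_cols:
    df[c] = df[c].fillna("__MISSING__")

# Normalize categoricals (lowercase/trim) for consistency
if proc["normalize_categoricals"]:
    for c in cat_cols:
        df[c] = df[c].astype(str).str.strip().str.lower()

# Optional IQR capping for outliers on numeric cols (values clipped to [Q1 - 1.5*IQR, Q3 + 1.5*IQR])
if proc["cap_outliers_iqr"]:
    for c in num_cols:
        q1, q3 = df[c].quantile([0.25, 0.75])
        iqr = q3 - q1
        df[c] = df[c].clip(q1 - 1.5 * iqr, q3 + 1.5 * iqr)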

df.head()
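The cell above computes medians and IQR bounds on the full dataset before the split, so validation and test rows influence the imputed and capped values. A leakage-safe variant fits these statistics on the training split only and reuses them for every split. A minimal sketch; `NumericCleaner` is a hypothetical helper (scikit-learn's `SimpleImputer(strategy="median")` covers the imputation half if you prefer an existing component).

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

import pandas as pd

@dataclass
class NumericCleaner:
    """Hypothetical helper: median imputation + IQR capping, fitted on training rows only."""
    iqr_k: float = 1.5
    medians: Dict[str, float] = field(default_factory=dict)
    bounds: Dict[str, Tuple[float, float]] = field(default_factory=dict)

    def fit(self, train: pd.DataFrame, cols: List[str]) -> "NumericCleaner":
        for c in cols:
            med = float(train[c].median())
            # Quartiles of the imputed training column, mirroring the order used in the cell above
            q1, q3 = (float(v) for v in train[c].fillna(med).quantile([0.25, 0.75]))
            iqr = q3 - q1
            self.medians[c] = med
            self.bounds[c] = (q1 - self.iqr_k * iqr, q3 + self.iqr_k * iqr)
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        out = df.copy()  # never mutate the caller's frame
        for c, med in self.medians.items():
            lower, upper = self.bounds[c]
            out[c] = out[c].fillna(med).clip(lower, upper)
        return out
```

Usage after the split: `cleaner = NumericCleaner().fit(train_df, num_cols)`, then apply `cleaner.transform` to `train_df`, `val_df`, and `test_df` alike.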



## 🧩 Feature Engineering (Examples)
- Ratios and interactions
- Encoding categoricals for model training (defer heavy encoders to training pipeline)
- Date/time features (if present)
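The demo data has no timestamp column, so date/time features are not built below. A minimal sketch, assuming a hypothetical datetime-like column: it extracts calendar parts and measures durations against a fixed `reference` date (for example, one stored in `CONFIG`) instead of "now", so reruns produce identical values. `add_date_features` is a hypothetical helper.

```python
from typing import Optional

import pandas as pd

def add_date_features(df: pd.DataFrame, col: str, reference: Optional[str] = None) -> pd.DataFrame:
    """Hypothetical helper: calendar parts of `col` plus whole days between each value and a fixed reference date."""
    out = df.copy()
    ts = pd.to_datetime(out[col], errors="coerce", utc=True)  # unparseable values become NaT
    out[f"{col}__year"] = ts.dt.year
    out[f"{col}__month"] = ts.dt.month
    out[f"{col}__dayofweek"] = ts.dt.dayofweek  # Monday=0 ... Sunday=6
    if reference is not None:
        ref = pd.Timestamp(reference, tz="UTC")  # a naive date string is interpreted as UTC
        out[f"{col}__days_before_ref"] = (ref - ts).dt.days
    return out
```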


In [None]:

# Example engineered features
if "tenure_months" in df.columns and "monthly_charges" in df.columns:
    df["est_lifetime_value"] = (df["tenure_months"] * df["monthly_charges"]).round(2)

# Frequency encoding: replace each category with its relative frequency (computed on the full dataset, so test rows contribute)
for c in cat_cols:
    freq = df[c].value_counts(normalize=True)
    df[f"{c}__freq"] = df[c].map(freq).astype(float)

df.head()
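The frequency encoding above uses category frequencies from the full dataset. A leakage-safe sketch learns the frequencies from the training split and maps categories unseen in training to 0.0 (an assumption; another sentinel may suit your model better). `fit_frequency_maps` and `apply_frequency_maps` are hypothetical helpers.

```python
from typing import Dict, List

import pandas as pd

def fit_frequency_maps(train: pd.DataFrame, cols: List[str]) -> Dict[str, Dict[str, float]]:
    """Relative frequency of each category, learned from training rows only."""
    return {c: train[c].value_counts(normalize=True).to_dict() for c in cols}

def apply_frequency_maps(df: pd.DataFrame, maps: Dict[str, Dict[str, float]], unseen: float = 0.0) -> pd.DataFrame:
    """Add `<col>__freq` columns; categories missing from the training map get `unseen`."""
    out = df.copy()
    for c, freq in maps.items():
        out[f"{c}__freq"] = out[c].map(freq).astype(float).fillna(unseen)
    return out
```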



## ✅ Lightweight Validation
Simple checks before export. For production, consider a formal framework (e.g., Great Expectations or pandera).


In [None]:

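checks = []

# Example: target present and binary (if classification)
if (target_col in df.columns) and CONFIG["processing"]["stratify_splits"]:
    is_binary = set(df[target_col].dropna().unique()) <= {0, 1}
    checks.append({"check": "target_binary", "passed": bool(is_binary)})

# Example: primary key has no nulls and no duplicates (ID-based splits require unique keys)
if primary_key in df.columns:
    pk_nulls = int(df[primary_key].isna().sum())
    pk_dupes = int(df[primary_key].duplicated().sum())
    checks.append({"check": "no_null_primary_key", "passed": pk_nulls == 0, "null_count": pk_nulls})
    checks.append({"check": "unique_primary_key", "passed": pk_dupes == 0, "duplicate_count": pk_dupes})

checks_df = pd.DataFrame(checks)

# Fail fast: do not split or export data that violates the checks
failed = [c["check"] for c in checks if not c["passed"]]
if failed:
    raise ValueError(f"Validation failed: {failed}\n{checks_df}")

checks_df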



## ✂️ Train / Validation / Test Split
- Deterministic with fixed `random_state`
- Stratified on the target when `CONFIG["processing"]["stratify_splits"]` is `True` (classification)
- Optionally **time-based** if a timestamp column is provided (edit here if needed)
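For data with a time dimension, a random split lets the model train on rows that come after the ones it is evaluated on. A minimal time-based alternative, assuming a hypothetical timestamp column: sort by time (stable sort, so ties keep their input order) and assign the oldest rows to training, the next block to validation, and the newest to test. Rows sharing a timestamp at a boundary can still land in different splits. `time_based_split` is a hypothetical helper; it preserves the original index so that `ids_of` in the artifacts cell keeps working.

```python
from typing import Tuple

import pandas as pd

def time_based_split(
    df: pd.DataFrame, time_col: str, test_size: float = 0.2, val_size: float = 0.1
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Oldest rows -> train, next -> val, newest -> test. The original index is preserved."""
    ordered = df.sort_values(time_col, kind="mergesort")  # stable sort: ties keep input order
    n = len(ordered)
    n_test = int(round(n * test_size))
    n_val = int(round(n * val_size))
    n_train = n - n_val - n_test
    train = ordered.iloc[:n_train]
    val = ordered.iloc[n_train:n_train + n_val]
    test = ordered.iloc[n_train + n_val:]
    return train, val, test
```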


In [None]:

from sklearn.model_selection import train_test_split

random_state = SEED
stratify = None
if CONFIG["processing"]["stratify_splits"] and target_col in df.columns:
    stratify = df[target_col]

# First split: train+val vs test
train_val_df, test_df = train_test_split(
    df,
    test_size=CONFIG["processing"]["test_size"],
    random_state=random_state,
    stratify=stratify
)

# Second split: train vs val
stratify_train_val = train_val_df[target_col] if stratify is not None else None
train_df, val_df = train_test_split(
    train_val_df,
    test_size=CONFIG["processing"]["val_size"] / (1.0 - CONFIG["processing"]["test_size"]),
    random_state=random_state,
    stratify=stratify_train_val
)

len(train_df), len(val_df), len(test_df)
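Before writing `splits.json`, a quick sanity check can confirm that no ID appears in more than one split and that every row is assigned exactly once. A minimal sketch; `find_split_problems` is a hypothetical helper that works on any ID collections, such as primary-key values or DataFrame index labels.

```python
from typing import Dict, Hashable, Iterable, List

def find_split_problems(splits: Dict[str, Iterable[Hashable]], expected_total: int) -> List[str]:
    """Return problems: IDs assigned more than once, or split sizes that do not add up."""
    problems: List[str] = []
    first_seen: Dict[Hashable, str] = {}
    total = 0
    for name, ids in splits.items():
        for i in ids:
            total += 1
            if i in first_seen:
                problems.append(f"ID {i!r} in {name!r} was already assigned to {first_seen[i]!r}")
            else:
                first_seen[i] = name
    if total != expected_total:
        problems.append(f"splits contain {total} IDs, expected {expected_total}")
    return problems
```

Usage: `find_split_problems({"train": train_df.index, "val": val_df.index, "test": test_df.index}, len(df))` should return an empty list.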



## 💾 Write Artifacts
- **Processed dataset** (Parquet)
- **Feature schema** (`feature_schema.json`)
- **Splits** (`splits.json`) with IDs for deterministic reuse
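Downstream training can rebuild the exact splits from the processed Parquet file and `splits.json`. A minimal sketch, assuming the layout written by the cell below (`meta.primary_key`, `train_ids`/`val_ids`/`test_ids`) and a dataset that still contains the primary-key column (or the same row index when no key exists). IDs are compared as strings because the key may be reloaded with a different dtype. `materialize_splits` is a hypothetical helper.

```python
from typing import Any, Dict

import pandas as pd

def materialize_splits(df: pd.DataFrame, splits: Dict[str, Any]) -> Dict[str, pd.DataFrame]:
    """Rebuild train/val/test frames from a loaded splits.json dictionary."""
    pk = splits["meta"]["primary_key"]
    # Compare as strings: keys may come back from Parquet/JSON with a different dtype
    keys = df[pk].astype(str) if pk else pd.Series(df.index.astype(str), index=df.index)
    out: Dict[str, pd.DataFrame] = {}
    for name in ("train", "val", "test"):
        wanted = {str(i) for i in splits[f"{name}_ids"]}
        out[name] = df[keys.isin(wanted)]
    return out
```

Usage: load the dictionary with `json.load` from `CONFIG["output"]["splits_path"]`, read the dataset with `pd.read_parquet`, and pass both to `materialize_splits`.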


In [None]:

from pathlib import Path
out_path = Path(CONFIG["output"]["processed_parquet_path"])
out_path.parent.mkdir(parents=True, exist_ok=True)

# Save full processed dataset (you can also save splits separately if preferred)
df.to_parquet(out_path, index=False)

# Save split membership by primary key (preferred) or, if there is no key, by DataFrame index
pk = primary_key if primary_key in df.columns else None

def ids_of(subdf):
    return subdf[pk].tolist() if pk else subdf.index.tolist()

splits = {
    "meta": {
        "seed": SEED,
        "created_at": RUN_TS,
        "primary_key": pk,
        "target": target_col if target_col in df.columns else None,
        "source": CONFIG["data"]["source"],
    },
    "train_ids": ids_of(train_df),
    "val_ids": ids_of(val_df),
    "test_ids": ids_of(test_df),
}

with open(CONFIG["output"]["splits_path"], "w") as f:
    json.dump(splits, f, indent=2)

{
    "processed_parquet": str(out_path),
    "feature_schema": CONFIG["output"]["feature_schema_path"],
    "splits": CONFIG["output"]["splits_path"],
}
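To make the run folder verifiable later (for example, to confirm that a training job consumed exactly these files), a manifest of SHA-256 checksums can be written next to the artifacts. A minimal sketch using only the standard library; `write_manifest` and the `manifest.json` file name are hypothetical choices, not an existing convention of this pipeline.

```python
import hashlib
import json
from pathlib import Path
from typing import Dict

def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Stream the file in chunks so large Parquet files are not loaded into memory at once."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()

def write_manifest(run_dir: str, manifest_name: str = "manifest.json") -> Dict[str, str]:
    """Checksum every file under run_dir (except the manifest itself) and write the result as JSON."""
    root = Path(run_dir)
    manifest = {
        str(p.relative_to(root)): sha256_of(p)
        for p in sorted(root.rglob("*"))
        if p.is_file() and p.name != manifest_name
    }
    with open(root / manifest_name, "w") as f:
        json.dump(manifest, f, indent=2, sort_keys=True)
    return manifest
```

Usage: call `write_manifest(ARTIFACT_DIR)` once all artifacts are written and before a local MLflow store is created inside the same folder, so only the data-prep outputs are checksummed.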



## 📈 (Optional) MLflow Trace
Record data prep parameters and artifacts for lineage. Enable by setting `CONFIG["mlflow"]["enabled"] = True`.


In [None]:

if CONFIG["mlflow"]["enabled"]:
    import mlflow
    mlflow.set_tracking_uri(CONFIG["mlflow"]["tracking_uri"] or "file://" + str(Path(ARTIFACT_DIR).absolute()))
    mlflow.set_experiment(CONFIG["mlflow"]["experiment_name"])

    with mlflow.start_run(run_name=f"data-prep-{RUN_TS}") as run:
        mlflow.log_params({
            "seed": SEED,
            "source": CONFIG["data"]["source"],
            "row_limit": CONFIG["data"]["row_limit"],
            "stratify": CONFIG["processing"]["stratify_splits"],
        })
        mlflow.log_artifact(CONFIG["output"]["feature_schema_path"])
        mlflow.log_artifact(CONFIG["output"]["splits_path"])
        mlflow.log_artifact(os.path.join(ARTIFACT_DIR, "env_info.json"))
        # logging the whole parquet can be large; consider sampling or schema-only
        # mlflow.log_artifact(CONFIG["output"]["processed_parquet_path"])

        print("MLflow run:", run.info.run_id)
