<a href="https://colab.research.google.com/github/MonuSingh16/code-blogs-articles/blob/main/04-mlops-data-pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install pinned versions of the notebook's core libraries (IPython `!` shell escape).
# NOTE(review): DataFrame.to_parquet in the Load phase also needs a parquet engine
# (pyarrow or fastparquet), which is not pinned here — confirm it is available.
!pip install pandas==2.2.2 numpy==2.0.2 matplotlib==3.10.0 scikit-learn==1.6.1



### Config

In [None]:
from pathlib import Path
import logging, random
import numpy as np

# Colab working directories: raw inputs, pipeline outputs and saved figures.
ROOT = Path("/content")
DATA_DIR = ROOT / "data"
OUT_DIR = ROOT / "output"
FIG_DIR = OUT_DIR / "figs"

# Create them up front; exist_ok makes re-running the cell harmless.
for _workdir in (DATA_DIR, OUT_DIR, FIG_DIR):
    _workdir.mkdir(parents=True, exist_ok=True)

# Reproducibility: seed both global RNGs the data generator draws from.
RNG_SEED = 42
random.seed(RNG_SEED)
np.random.seed(RNG_SEED)

# Log records render as "timestamp | LEVEL | message".
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)

### Helper Functions

In [None]:
import json
import pandas as pd

def _ts(s: str) -> pd.Timestamp:
    return pd.to_datetime(s, utc=False, errors="coerce")

def save_json(obj, path: Path):
    """Serialise *obj* as 2-space-indented JSON and write it to *path* in UTF-8.

    Values the ``json`` module cannot encode natively (e.g. dates, Timestamps)
    are written as their ``str()`` form.
    """
    payload = json.dumps(obj, default=str, indent=2)
    path.write_text(payload, encoding="utf-8")

### Data Generation

In [None]:
import pandas as pd
import numpy as np
import sqlite3
from datetime import datetime, timedelta

def _gen_customers(n_customers, countries, cities_by_country, start_date, days):
    """Draw synthetic customer records with ids ``1001 .. 1000 + n_customers``.

    Signup dates fall in the first ~60% of the ``days``-long window starting at
    *start_date* and are stored as ISO date strings.
    """
    rows = []
    for cid in range(1001, 1001 + n_customers):
        country = random.choice(countries)
        city = random.choice(cities_by_country[country])
        signup = start_date + timedelta(days=int(np.random.uniform(0, max(1, days * 0.6))))
        rows.append({
            "customer_id": cid,
            "country": country,
            "city": city,
            "signup_date": signup.date().isoformat(),
        })
    return rows


def _write_customers_sqlite(db_path, cust_rows):
    """Create the ``customers`` table if needed and upsert *cust_rows* into it.

    The connection is closed even when a statement fails.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS customers (
                customer_id INTEGER PRIMARY KEY,
                country TEXT,
                city TEXT,
                signup_date TEXT
            )
        """)
        conn.executemany(
            "INSERT OR REPLACE INTO customers (customer_id, country, city, signup_date) VALUES (?, ?, ?, ?)",
            [(r["customer_id"], r["country"], r["city"], r["signup_date"]) for r in cust_rows],
        )
        conn.commit()
    finally:
        conn.close()


def _gen_sales(cust_rows, start_date, end_date, days):
    """Draw noisy sales records (sequential ``sale_id`` starting at 1) for each customer.

    The purchase count is Poisson with mean proportional to the customer's active
    share of the window (at least 0.5). Draws dated before signup are discarded.
    About 2% of amounts are NaN and about 1% are negated, to simulate dirty data.
    """
    base_lambda = 10
    rows = []
    sale_id = 1
    for r in cust_rows:
        signup_dt = _ts(r["signup_date"])
        active_days = max(1, (end_date - max(start_date, signup_dt.to_pydatetime())).days)
        expected = max(0.5, base_lambda * (active_days / days))
        n = np.random.poisson(lam=expected)

        for _ in range(n):
            ts = start_date + timedelta(days=int(np.random.uniform(0, days)))
            if ts.date() < signup_dt.date():
                continue
            amt = np.random.lognormal(mean=5.0, sigma=0.6)
            if np.random.rand() < 0.02:
                amt = np.nan
            if np.random.rand() < 0.01:
                amt = -abs(amt)

            rows.append({
                "sale_id": sale_id,
                "customer_id": r["customer_id"],
                "amount": amt,
                "ts": ts.isoformat(timespec="seconds"),
            })
            sale_id += 1
    return rows


def _gen_events(cust_rows, start_date, days):
    """Draw behavioural events (about 5 per customer, Poisson), none dated before signup."""
    event_types = ["login", "view", "cart_add", "wishlist"]
    rows = []
    ev_id = 1
    for r in cust_rows:
        signup_day = _ts(r["signup_date"]).date()  # hoisted: consumes no randomness
        m = np.random.poisson(lam=5.0)
        for _ in range(m):
            ts = start_date + timedelta(days=int(np.random.uniform(0, days)))
            if ts.date() < signup_day:
                continue
            rows.append({
                "event_id": ev_id,
                "customer_id": r["customer_id"],
                "event_type": random.choice(event_types),
                "ts": ts.isoformat(timespec="seconds"),
            })
            ev_id += 1
    return rows


def bootstrap_demo_data():
    """Generate the three demo sources under DATA_DIR unless all of them already exist.

    Writes ``customers.db`` (SQLite table ``customers``), ``sales.csv`` (sorted by
    ``ts``) and ``events.json``. Randomness comes from the global ``random`` and
    ``np.random`` state seeded in the Config cell. The helpers run in the same order
    as the draws happen, so a given seed produces the same data.
    """
    sales_csv = DATA_DIR / "sales.csv"
    events_json = DATA_DIR / "events.json"
    customers_db = DATA_DIR / "customers.db"

    if sales_csv.exists() and events_json.exists() and customers_db.exists():
        logging.info("Demo data already present. Skipping bootstrap.")
        return

    logging.info("Bootstrapping demo data...")
    n_customers = 400
    countries = ["IN", "US", "UK", "DE", "SG"]
    cities_by_country = {
        "IN": ["Delhi", "Bengaluru", "Mumbai", "Hyderabad"],
        "US": ["NYC", "SF", "Austin"],
        "UK": ["London", "Manchester"],
        "DE": ["Berlin", "Munich"],
        "SG": ["Singapore"],
    }

    start_date = datetime(2023, 9, 1)
    end_date = datetime(2024, 9, 1)
    days = (end_date - start_date).days

    cust_rows = _gen_customers(n_customers, countries, cities_by_country, start_date, days)
    _write_customers_sqlite(customers_db, cust_rows)

    sale_rows = _gen_sales(cust_rows, start_date, end_date, days)
    # Explicit columns keep the schema (and sort_values("ts")) valid even with zero rows.
    (pd.DataFrame(sale_rows, columns=["sale_id", "customer_id", "amount", "ts"])
       .sort_values("ts")
       .to_csv(sales_csv, index=False))

    ev_rows = _gen_events(cust_rows, start_date, days)
    save_json(ev_rows, events_json)
    logging.info("Bootstrap complete")

bootstrap_demo_data()

### Extraction Phase - I

In [None]:
def extract_sales_csv(path: Path) -> pd.DataFrame:
    """Read the raw sales CSV at *path* as-is (type coercion happens in validation)."""
    frame = pd.read_csv(path)
    n_rows, n_cols = frame.shape
    logging.info("Extracted sales CSV: %s rows, %s cols", n_rows, n_cols)
    return frame

def extract_events_json(path: Path) -> pd.DataFrame:
    """Load the JSON event records at *path* and flatten them with ``pd.json_normalize``."""
    raw_text = path.read_text(encoding="utf-8")
    records = json.loads(raw_text)
    frame = pd.json_normalize(records)
    logging.info("Extracted events JSON: %s rows, %s cols", frame.shape[0], frame.shape[1])
    return frame

def extract_customers_sqlite(path: Path) -> pd.DataFrame:
    """Read the whole ``customers`` table from the SQLite database at *path*.

    Raises:
        FileNotFoundError: if *path* does not exist. Without this check,
            ``sqlite3.connect`` would silently create an empty database file and the
            query would then fail with a less helpful "no such table" error.

    The connection is closed even if the query raises.
    """
    if not Path(path).exists():
        raise FileNotFoundError(f"customers database not found: {path}")
    conn = sqlite3.connect(path)
    try:
        df = pd.read_sql("SELECT * FROM customers", conn)
    finally:
        conn.close()
    logging.info("Extracted customers SQLite: %s rows, %s cols", *df.shape)
    return df

# Pull each raw source into its own DataFrame (CSV, JSON and SQLite respectively).
sales, events, customers = (
    extract_sales_csv(DATA_DIR / "sales.csv"),
    extract_events_json(DATA_DIR / "events.json"),
    extract_customers_sqlite(DATA_DIR / "customers.db"),
)

# Cell output: the first three rows of each extract.
sales.head(3), events.head(3), customers.head(3)

### Validation

In [None]:
def validate_sales(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce, deduplicate and clean raw sales rows.

    Steps:
      1. Require ``sale_id``, ``customer_id``, ``amount`` and ``ts``.
      2. Coerce ids to nullable ``Int64``, ``amount`` to float64 and ``ts`` to
         datetime. Unparseable values become missing.
      3. Drop rows missing an id or a timestamp.
      4. Keep only the latest row (by ``ts``) per ``sale_id``. This happens before
         any statistics are computed, so duplicated rows cannot skew the outlier cap
         or the medians used for imputation.
      5. Treat non-positive amounts as missing, then cap amounts at 5x their 95th
         percentile.
      6. Impute missing amounts with the customer's median, falling back to the
         global median.

    Returns:
        A new DataFrame sorted by ``sale_id`` with a fresh RangeIndex. The input is
        not modified.

    Raises:
        ValueError: if a required column is absent.
    """
    df = df.copy()
    for c in ["sale_id", "customer_id", "amount", "ts"]:
        if c not in df.columns:
            raise ValueError(f"sales missing required column: {c}")
    df["sale_id"] = pd.to_numeric(df["sale_id"], errors="coerce").astype("Int64")
    df["customer_id"] = pd.to_numeric(df["customer_id"], errors="coerce").astype("Int64")
    # float64 up front, so writing NaN into the column below never needs a dtype change.
    df["amount"] = pd.to_numeric(df["amount"], errors="coerce").astype("float64")
    df["ts"] = pd.to_datetime(df["ts"], errors="coerce")

    before = len(df)
    df = df.dropna(subset=["sale_id", "customer_id", "ts"])

    # Deduplicate by latest ts per sale_id (before computing any statistics).
    df = (df.sort_values(["sale_id", "ts"])
            .drop_duplicates(subset=["sale_id"], keep="last")
            .reset_index(drop=True))

    # Negative / zero amounts are invalid -> NaN, so they get imputed below.
    df.loc[df["amount"] <= 0, "amount"] = np.nan
    # Cap extreme outliers; skipped when no valid amount exists. clip keeps NaN as NaN.
    if df["amount"].notna().any():
        df["amount"] = df["amount"].clip(upper=df["amount"].quantile(0.95) * 5)

    # Fill amounts: customer median first, then the global median of the filled column.
    df["amount"] = df["amount"].fillna(df.groupby("customer_id")["amount"].transform("median"))
    df["amount"] = df["amount"].fillna(df["amount"].median())

    logging.info("Validated sales: dropped %d bad/dup rows", before - len(df))
    return df

def validate_events(df: pd.DataFrame) -> pd.DataFrame:
    """Type-coerce raw events and drop incomplete or duplicated rows.

    ``event_id``/``customer_id`` become nullable ``Int64``, ``event_type`` becomes the
    pandas ``string`` dtype and ``ts`` becomes datetime (unparseable -> missing). Rows
    missing any of these are dropped. For a repeated ``event_id``, the row with the
    latest ``ts`` is kept. The input frame is not modified.

    Raises:
        ValueError: if any required column is absent.
    """
    required = {"event_id", "customer_id", "event_type", "ts"}
    absent = required - set(df.columns)
    if absent:
        raise ValueError(f"events missing required columns: {absent}")

    out = df.assign(
        event_id=pd.to_numeric(df["event_id"], errors="coerce").astype("Int64"),
        customer_id=pd.to_numeric(df["customer_id"], errors="coerce").astype("Int64"),
        event_type=df["event_type"].astype("string"),
        ts=pd.to_datetime(df["ts"], errors="coerce"),
    )

    n_in = len(out)
    out = out.dropna(subset=["event_id", "customer_id", "event_type", "ts"])
    ordered = out.sort_values(["event_id", "ts"])
    out = ordered.drop_duplicates(subset=["event_id"], keep="last")
    logging.info("Validated events: dropped %d bad/dup rows", n_in - len(out))
    return out

def validate_customers(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce customer columns and keep one complete row per ``customer_id``.

    ``customer_id`` becomes nullable ``Int64``, ``country``/``city`` become the pandas
    ``string`` dtype, and ``signup_date`` becomes a Python ``date`` (unparseable ->
    missing). Rows missing any of these are dropped. For repeated ids, the last row in
    input order wins. The input frame is not modified.

    Raises:
        ValueError: if any required column is absent.
    """
    required = {"customer_id", "country", "city", "signup_date"}
    absent = required - set(df.columns)
    if absent:
        raise ValueError(f"customers missing required columns: {absent}")

    out = df.assign(
        customer_id=pd.to_numeric(df["customer_id"], errors="coerce").astype("Int64"),
        country=df["country"].astype("string"),
        city=df["city"].astype("string"),
        signup_date=pd.to_datetime(df["signup_date"], errors="coerce").dt.date,
    )

    n_in = len(out)
    complete = out.dropna(subset=["customer_id", "country", "city", "signup_date"])
    out = complete.drop_duplicates(subset=["customer_id"], keep="last")
    logging.info("Validated customers: dropped %d bad/dup rows", n_in - len(out))
    return out

# Run each raw extract through its validator.
sales_v, events_v, customers_v = (
    validate_sales(sales),
    validate_events(events),
    validate_customers(customers),
)

# Cell output: first rows of each validated table.
sales_v.head(3), events_v.head(3), customers_v.head(3)

### Transform Phase - I

In [None]:
def _avg_gap_days(sales_obs: pd.DataFrame) -> pd.DataFrame:
    """Mean whole-day gap between consecutive purchases, per customer.

    Customers with a single purchase get NaN. The result has the columns
    ``customer_id`` and ``avg_gap_days``.
    """
    ordered = sales_obs.sort_values(["customer_id", "ts"])
    # Within each customer, diff() is NaT for the first purchase; .dt.days turns it into NaN.
    gap_days = ordered.groupby("customer_id")["ts"].diff().dt.days
    return (gap_days.groupby(ordered["customer_id"]).mean()
            .rename("avg_gap_days")
            .reset_index())


def transform_make_features(sales: pd.DataFrame,
                            customers: pd.DataFrame,
                            cutoff: pd.Timestamp,
                            obs_days: int,
                            label_days: int):
    """Build one RFM-style feature row plus a churn label per customer.

    Windows (half-open):
      * observation ``[cutoff - obs_days, cutoff)`` -> features
      * label ``[cutoff, cutoff + label_days)`` -> ``label_churn``

    Args:
        sales: validated sales with ``sale_id``, ``customer_id``, ``amount``, ``ts``.
        customers: validated customers with ``customer_id``, ``country``, ``city``,
            ``signup_date``.
        cutoff: boundary between the two windows. Anything ``pd.Timestamp`` accepts
            (e.g. an ISO string) is allowed.
        obs_days: observation window length in days.
        label_days: label window length in days.

    Returns:
        ``(feats, daily_obs, sales_obs)``:
          * feats: one row per customer in *customers*. A customer with no purchase
            in the observation window has ``freq == 0`` and ``monetary == 0.0`` (an
            empty sum). ``avg_amount``, ``max_amount``, ``recency_days`` and
            ``avg_gap_days`` stay NaN because they are undefined without purchases.
            ``label_churn`` is 1 when the customer made no purchase in the label window.
          * daily_obs: summed ``amount`` per calendar day (column ``daily_sales``).
          * sales_obs: the sales rows inside the observation window.
    """
    cutoff = pd.Timestamp(cutoff)
    sales = sales.copy()

    obs_start = cutoff - timedelta(days=obs_days)
    label_end = cutoff + timedelta(days=label_days)

    sales["ts"] = pd.to_datetime(sales["ts"])
    in_obs = (sales["ts"] >= obs_start) & (sales["ts"] < cutoff)
    in_label = (sales["ts"] >= cutoff) & (sales["ts"] < label_end)
    sales_obs = sales.loc[in_obs].copy()
    sales_label = sales.loc[in_label]

    # Daily time series for viz
    daily_obs = (sales_obs.assign(day=sales_obs["ts"].dt.date)
                 .groupby("day", as_index=False)["amount"].sum()
                 .rename(columns={"amount": "daily_sales"}))

    # RFM-like features
    last_ts = sales_obs.groupby("customer_id")["ts"].max().to_frame("last_ts").reset_index()
    last_ts["recency_days"] = (cutoff - last_ts["last_ts"]).dt.days

    agg = sales_obs.groupby("customer_id").agg(
        freq=("sale_id", "count"),
        monetary=("amount", "sum"),
        avg_amount=("amount", "mean"),
        max_amount=("amount", "max"),
    ).reset_index()

    gaps = _avg_gap_days(sales_obs)

    # Left joins keep every customer, including those with no purchases in the window.
    feats = (customers[["customer_id", "country", "city", "signup_date"]]
             .merge(agg, on="customer_id", how="left")
             .merge(last_ts[["customer_id", "recency_days"]], on="customer_id", how="left")
             .merge(gaps, on="customer_id", how="left"))

    feats["freq"] = feats["freq"].fillna(0).astype(int)
    # Total spend over zero purchases is 0, not "unknown". Leaving NaN here would get it
    # mean-imputed downstream as if the customer had spent an average amount.
    feats["monetary"] = feats["monetary"].fillna(0.0)
    for c in ["monetary", "avg_amount", "max_amount", "recency_days", "avg_gap_days"]:
        feats[c] = feats[c].astype(float)

    feats["tenure_days"] = (pd.to_datetime(cutoff.date()) - pd.to_datetime(feats["signup_date"])).dt.days.clip(lower=0)

    future_purchases = (sales_label.groupby("customer_id")["sale_id"].count()
                        .reindex(feats["customer_id"]).fillna(0))
    feats["label_churn"] = (future_purchases.to_numpy() == 0).astype(int)

    feats = feats[[
        "customer_id", "country", "city", "tenure_days",
        "freq", "monetary", "avg_amount", "max_amount",
        "recency_days", "avg_gap_days", "label_churn"
    ]]

    return feats, daily_obs, sales_obs

# Window lengths (days) before and after the cutoff.
obs_days = 200
label_days = 60

# Cutoff = 75th percentile of sale timestamps truncated to midnight,
# so roughly the last quarter of sales falls after it.
default_cutoff = pd.to_datetime(sales_v["ts"].quantile(0.75)).normalize()

feats, daily_obs, sales_obs = transform_make_features(
    sales_v, customers_v, default_cutoff, obs_days, label_days
)

# Cell output: the cutoff plus the shapes of the three outputs.
(default_cutoff, feats.shape, sales_obs.shape, daily_obs.shape)

### Visual Analysis

In [None]:
import matplotlib.pyplot as plt

def plot_missingness(df: pd.DataFrame, title: str, out_path: Path):
    """Save a bar chart of the fraction of missing values per column to *out_path*.

    Only columns with at least one missing value are drawn, largest fraction first.
    If no column has missing values, a message is printed and no file is written.
    """
    frac_missing = df.isna().mean()
    frac_missing = frac_missing.loc[frac_missing.gt(0)].sort_values(ascending=False)
    if len(frac_missing) == 0:
        print(f"No missing values found in: {title}")
        return

    fig, ax = plt.subplots(figsize=(8, 4))
    frac_missing.plot(kind="bar", ax=ax)
    ax.set_title(title)
    ax.set_ylabel("Fraction Missing")
    fig.tight_layout()
    fig.savefig(out_path)
    plt.close(fig)

def plot_amount_hist(sales_obs: pd.DataFrame, out_path: Path):
    """Save a 40-bin histogram of the non-missing ``amount`` values to *out_path*."""
    amounts = sales_obs["amount"].dropna()
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.hist(amounts, bins=40)
    ax.set_title("Amount Distribution (Observation Window)")
    ax.set_xlabel("Amount")
    ax.set_ylabel("Count")
    fig.tight_layout()
    fig.savefig(out_path)
    plt.close(fig)

def plot_daily_sales(daily_obs: pd.DataFrame, out_path: Path):
    """Save a line chart of ``daily_sales`` against ``day`` (parsed to datetimes)."""
    x_days = pd.to_datetime(daily_obs["day"])
    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(x_days, daily_obs["daily_sales"])
    ax.set_title("Daily Sales (Observation Window)")
    ax.set_xlabel("Day")
    ax.set_ylabel("Sales")
    fig.tight_layout()
    fig.savefig(out_path)
    plt.close(fig)

def plot_corr_heatmap(feats: pd.DataFrame, out_path: Path):
    """Save a heatmap of pairwise (Pearson) correlations between numeric columns.

    Non-numeric columns are ignored. Nothing is drawn when fewer than two numeric
    columns remain. The colour scale is pinned to [-1, 1] with a diverging colormap,
    so a colour always means the same correlation. An auto-scaled range would stretch
    whatever values happen to occur across the whole colormap, which changes between
    runs.
    """
    numeric = feats.select_dtypes(include=[np.number])
    if numeric.shape[1] < 2:
        return
    corr = numeric.corr()
    plt.figure(figsize=(6, 5))
    plt.imshow(corr, aspect="auto", cmap="coolwarm", vmin=-1.0, vmax=1.0)
    plt.xticks(range(len(corr.columns)), corr.columns, rotation=45, ha="right")
    plt.yticks(range(len(corr.index)), corr.index)
    plt.colorbar(label="Pearson r")
    plt.title("Feature Correlation (Numeric)")
    plt.tight_layout()
    plt.savefig(out_path)
    plt.close()

# Generate and show inline.
# First clear figures left over from earlier runs: plot_missingness writes nothing
# when there is nothing missing, so a stale PNG would otherwise be redisplayed.
for stale_png in FIG_DIR.glob("*.png"):
    stale_png.unlink()

# Missingness is plotted on the RAW extracts. validate_sales imputes `amount` and
# drops rows with missing keys, and validate_customers drops incomplete rows, so the
# validated frames essentially never contain missing values.
plot_missingness(sales, "Missingness: Sales (Raw)", FIG_DIR / "missing_sales.png")
plot_missingness(customers, "Missingness: Customers (Raw)", FIG_DIR / "missing_customers.png")
plot_amount_hist(sales_obs, FIG_DIR / "amount_hist_obs.png")
plot_daily_sales(daily_obs, FIG_DIR / "daily_sales_obs.png")
plot_corr_heatmap(feats.drop(columns=["customer_id"]), FIG_DIR / "feature_corr.png")

from IPython.display import Image, display
# Sorted so the display order is stable across runs and filesystems.
for f in sorted(FIG_DIR.glob("*.png")):
    display(Image(filename=str(f)))

### Load Phase - I

In [None]:
def _write_table(frame: pd.DataFrame, stem: str):
    """Write *frame* to OUT_DIR as ``<stem>.csv``, ``<stem>.parquet`` and ``<stem>.json``.

    The JSON copy uses ``orient="records"`` with ``date_format="iso"``. For this
    orient, pandas would otherwise encode datetimes as epoch milliseconds, which
    disagrees with the ISO timestamps in the CSV copy.
    """
    frame.to_csv(OUT_DIR / f"{stem}.csv", index=False)
    frame.to_parquet(OUT_DIR / f"{stem}.parquet", index=False)
    frame.to_json(OUT_DIR / f"{stem}.json", orient="records", indent=2, date_format="iso")


def dump_outputs(feats: pd.DataFrame, sales_obs: pd.DataFrame, daily_obs: pd.DataFrame,
                 cutoff: pd.Timestamp, obs_days: int, label_days: int,
                 customers=None):
    """Persist the pipeline outputs to OUT_DIR and return a small QA summary dict.

    Writes:
      * features.{csv,parquet,json} and sales_observation.{csv,parquet,json}
      * daily_sales.csv
      * data_dictionary.json (column descriptions of the feature table)
      * run_metadata.json (run time, RNG seed, window parameters, row counts)
      * qa_report.json (the returned dict)

    Args:
        customers: validated customers used for ``n_customers_total``. Defaults to the
            notebook-global ``customers_v`` (the previous, implicit behaviour).
    """
    if customers is None:
        customers = customers_v

    _write_table(feats, "features")
    _write_table(sales_obs, "sales_observation")
    daily_obs.to_csv(OUT_DIR / "daily_sales.csv", index=False)

    data_dict = {
        "features": {
            "customer_id": "Unique customer key",
            "country": "Country code",
            "city": "City name",
            "tenure_days": "Days since signup until cutoff",
            "freq": "Purchase count in observation window",
            "monetary": "Total spend in observation window",
            "avg_amount": "Average spend per purchase (obs window)",
            "max_amount": "Max spend in observation window",
            "recency_days": "Days since last purchase at cutoff",
            "avg_gap_days": "Avg days between purchases (obs window)",
            "label_churn": "1 if NO purchase in label window; else 0"
        }
    }
    save_json(data_dict, OUT_DIR / "data_dictionary.json")

    meta = {
        "run_timestamp": datetime.now().isoformat(timespec="seconds"),
        "rng_seed": RNG_SEED,
        "params": {
            "cutoff": str(cutoff.date()),
            "obs_days": obs_days,
            "label_days": label_days
        },
        "rows": {
            "features": int(len(feats)),
            "sales_obs": int(len(sales_obs)),
            "daily_obs": int(len(daily_obs))
        }
    }
    save_json(meta, OUT_DIR / "run_metadata.json")

    obs_start = cutoff - timedelta(days=obs_days)
    label_end = cutoff + timedelta(days=label_days)
    qa = {
        "n_customers_total": int(customers["customer_id"].nunique()),
        "n_customers_with_obs_purchases": int(sales_obs["customer_id"].nunique()),
        "n_features_rows": int(len(feats)),
        "churn_rate": float(feats["label_churn"].mean()),
        "obs_window_start": str(obs_start.date()),
        "cutoff": str(cutoff.date()),
        "label_window_end": str(label_end.date())
    }
    save_json(qa, OUT_DIR / "qa_report.json")
    return qa

# Write every artefact to OUT_DIR; the returned QA summary is shown as the cell output.
qa = dump_outputs(
    feats=feats,
    sales_obs=sales_obs,
    daily_obs=daily_obs,
    cutoff=default_cutoff,
    obs_days=obs_days,
    label_days=label_days,
)
qa

### Extract - II and EDA/Checks

In [None]:
import pandas as pd

# Reload the feature table written by dump_outputs. The absolute OUT_DIR path replaces
# the cwd-relative "output/..." path, so the cell works from any working directory.
# The same table is also saved as OUT_DIR / "features.csv" (pd.read_csv) and
# OUT_DIR / "features.parquet" (pd.read_parquet).
df = pd.read_json(OUT_DIR / "features.json")

# Quick EDA to confirm shape and missingness
print(df.shape)
print(df.isnull().sum())
print(df.head(10))

### Load - II

In [None]:
# Persist the reloaded feature table as a flat CSV (cwd-relative); Transform Phase - II reads it back.
df.to_csv(path_or_buf="data.csv", index=False)

### Transform Phase - II

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

# Load the flat feature table written by the Load - II cell.
_df = pd.read_csv("data.csv")

# Target vs. inputs (customer_id is an identifier, not a feature).
y = _df["label_churn"]
X = _df.drop(columns=["label_churn", "customer_id"])

# Stratified 80/20 split BEFORE fitting anything, so imputation/scaling statistics
# come from the training rows only (no leakage into validation).
X_train, X_valid, y_train, y_valid = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Column groups, inferred from the training split only.
numeric_cols = X_train.select_dtypes(include=["number"]).columns.tolist()
categorical_cols = X_train.select_dtypes(include=["object", "category"]).columns.tolist()

# Numeric: mean-impute then standardise.
numeric_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="mean")),
    ("scaler", StandardScaler()),
])
# Categorical: mode-impute then dense one-hot; categories unseen at fit time encode as all zeros.
categorical_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])

# Register only the groups that actually have columns.
transformers = [
    (group_name, group_pipe, group_cols)
    for group_name, group_pipe, group_cols in (
        ("num", numeric_pipeline, numeric_cols),
        ("cat", categorical_pipeline, categorical_cols),
    )
    if group_cols
]
preprocessor = ColumnTransformer(transformers=transformers)

# Fit on train only; apply the fitted transform to both splits.
X_train_processed = preprocessor.fit_transform(X_train)
X_valid_processed = preprocessor.transform(X_valid)

# Output column names in ColumnTransformer order: numeric block, then one-hot block.
num_features = numeric_cols
cat_features = (
    preprocessor.named_transformers_["cat"]["encoder"].get_feature_names_out(categorical_cols).tolist()
    if categorical_cols
    else []
)
all_features = num_features + cat_features

# Back to DataFrames with a fresh 0..n-1 index so they line up with the reset targets.
X_train_df = pd.DataFrame(X_train_processed, columns=all_features)
X_valid_df = pd.DataFrame(X_valid_processed, columns=all_features)

# Reattach target
train_final_df = pd.concat([X_train_df, y_train.reset_index(drop=True)], axis=1)
valid_final_df = pd.concat([X_valid_df, y_valid.reset_index(drop=True)], axis=1)

In [None]:
# Load - III: either save the final datasets (train_final_df, valid_final_df) to disk here,

# OR

# pass them directly to model training and validation from this point.