# Project 2 — Incremental Batch ETL + Idempotency (SQLite)

**Audience:** Technical team

This notebook simulates a daily incremental batch pipeline that:
- generates deterministic daily transactions
- loads into **staging**
- performs **idempotent upserts** into **fact** (re-runs do not duplicate)
- runs basic data quality checks

This pattern is common in pipelines scheduled by batch orchestration tools (Airflow, Prefect, Dagster). Here it runs entirely on SQLite, so no external infrastructure is required.

In [ ]:
import pandas as pd
import sqlite3
from datetime import datetime
import random

## 1) Generate deterministic daily data
Deterministic IDs allow safe reprocessing for the same `run_date`.

In [ ]:
def generate_daily_transactions(run_date: str, n: int = 25) -> pd.DataFrame:
    """Generate a deterministic dataset for a given date (safe for reruns).

    Args:
        run_date: Date string such as ``"2026-02-10"``. Dashes are removed to
            build the ``txn_id`` prefix, and the string itself seeds the
            random generator.
        n: Number of transactions to generate.

    Returns:
        A DataFrame with columns ``txn_id``, ``customer_id``, ``amount`` and
        ``run_date`` and ``n`` rows. The columns are present even when
        ``n == 0``.
    """
    # Seed a private generator with the string itself. The built-in hash() of
    # a str is salted per interpreter process, so a seed derived from it is
    # not reproducible across runs; Random() hashes str seeds deterministically.
    # A private instance also leaves the global `random` state untouched.
    rng = random.Random(run_date)
    date_key = run_date.replace("-", "")
    rows = [
        {
            "txn_id": f"{date_key}-{i:04d}",
            "customer_id": rng.choice([10, 11, 12, 13, 14]),
            "amount": round(rng.random() * 500, 2),
            "run_date": run_date,
        }
        for i in range(n)
    ]
    # Explicit columns keep the schema stable for an empty batch.
    df = pd.DataFrame(rows, columns=["txn_id", "customer_id", "amount", "run_date"])
    print(f"[GEN] {run_date} generated rows: {len(df)}")
    return df


## 2) Database schema
- `staging_transactions`: landing area
- `fact_transactions`: analytics-ready table with `loaded_at` audit column

`txn_id` is the primary key enabling idempotent upserts.

In [ ]:
def connect_sqlite(db_path: str = "pipeline.db") -> sqlite3.Connection:
    """Announce the target database path and open a SQLite connection to it.

    Args:
        db_path: Path handed to ``sqlite3.connect``.

    Returns:
        The newly opened connection; the caller is responsible for closing it.
    """
    print("[DB] Using SQLite DB:", db_path)
    connection = sqlite3.connect(db_path)
    return connection

def init_schema(conn: sqlite3.Connection) -> None:
    """Create the staging and fact tables if they do not exist yet.

    Both tables use ``txn_id`` as their primary key; the fact table carries an
    extra ``loaded_at`` column. The DDL is committed before returning.

    Args:
        conn: Open SQLite connection to create the tables on.
    """
    ddl_statements = (
        """CREATE TABLE IF NOT EXISTS staging_transactions(
        txn_id TEXT PRIMARY KEY,
        customer_id INTEGER NOT NULL,
        amount REAL NOT NULL,
        run_date TEXT NOT NULL
    );""",
        """CREATE TABLE IF NOT EXISTS fact_transactions(
        txn_id TEXT PRIMARY KEY,
        customer_id INTEGER NOT NULL,
        amount REAL NOT NULL,
        run_date TEXT NOT NULL,
        loaded_at TEXT NOT NULL
    );""",
    )
    cursor = conn.cursor()
    for ddl in ddl_statements:
        cursor.execute(ddl)
    conn.commit()
    print("[DB] Tables ensured.")


## 3) Load staging and upsert into fact
The `ON CONFLICT` clause ensures idempotency.

In [ ]:
def load_staging(conn: sqlite3.Connection, df: pd.DataFrame) -> None:
    """Upsert every row of ``df`` into ``staging_transactions``.

    Rows are keyed by ``txn_id``: an existing row with the same key has its
    ``customer_id``, ``amount`` and ``run_date`` overwritten, so reloading the
    same batch does not add rows.

    Args:
        conn: Open SQLite connection that already has the staging table.
        df: Batch with ``txn_id``, ``customer_id``, ``amount`` and ``run_date``
            columns. An empty frame is a no-op.

    Raises:
        ValueError, TypeError: If a ``customer_id`` or ``amount`` value cannot
            be converted to ``int`` / ``float``. Nothing is written in that case.
        sqlite3.Error: If the database rejects the batch; the whole batch is
            rolled back.
    """
    print("[LOAD] Loading staging...")
    # Convert the whole batch to plain Python values before touching the
    # database, so a bad value cannot leave a half-written batch behind.
    rows = [
        (t.txn_id, int(t.customer_id), float(t.amount), t.run_date)
        for t in df.itertuples(index=False)
    ]
    # The connection context manager commits on success and rolls back on
    # error, making the batch all-or-nothing.
    with conn:
        conn.executemany(
            """INSERT INTO staging_transactions(txn_id, customer_id, amount, run_date)
               VALUES(?,?,?,?)
               ON CONFLICT(txn_id) DO UPDATE SET
                 customer_id=excluded.customer_id,
                 amount=excluded.amount,
                 run_date=excluded.run_date
            """,
            rows,
        )
    n = conn.execute("SELECT COUNT(*) FROM staging_transactions").fetchone()[0]
    print("[LOAD] staging rows:", n)

def upsert_fact(conn: sqlite3.Connection) -> None:
    """Upsert all rows of ``staging_transactions`` into ``fact_transactions``.

    Rows are keyed by ``txn_id``. A conflicting fact row has ``customer_id``,
    ``amount``, ``run_date`` and ``loaded_at`` overwritten, so re-running the
    step never duplicates rows. Every row touched by one call gets the same
    ``loaded_at`` value: the current UTC time as a naive ISO-8601 string.

    Args:
        conn: Open SQLite connection that already has both tables.

    Raises:
        sqlite3.Error: If the statement fails; the transaction is rolled back.
    """
    # Function-scope import: the file only imports `datetime` from this module.
    from datetime import timezone

    print("[FACT] Upserting into fact...")
    # datetime.utcnow() is deprecated; take an aware UTC "now" and drop the
    # tzinfo so the stored text keeps the same format as before.
    loaded_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    # `WHERE true` is required: without it SQLite parses the `ON` of
    # `ON CONFLICT` as a join constraint on the SELECT and rejects the
    # statement. The timestamp is bound as a parameter, not interpolated.
    with conn:
        conn.execute(
            """INSERT INTO fact_transactions(txn_id, customer_id, amount, run_date, loaded_at)
            SELECT txn_id, customer_id, amount, run_date, ?
            FROM staging_transactions
            WHERE true
            ON CONFLICT(txn_id) DO UPDATE SET
              customer_id=excluded.customer_id,
              amount=excluded.amount,
              run_date=excluded.run_date,
              loaded_at=excluded.loaded_at
            """,
            (loaded_at,),
        )
    n = conn.execute("SELECT COUNT(*) FROM fact_transactions").fetchone()[0]
    print("[FACT] fact rows:", n)


## 4) Data quality checks
Basic checks:
- nulls
- duplicates (PK)
- non-negative amounts

In [ ]:
def dq_checks(conn: sqlite3.Connection) -> None:
    """Run basic data quality checks against ``fact_transactions``.

    Checks run in this order and stop at the first failure:
      1. no NULL ``customer_id`` or ``amount``
      2. no negative ``amount``
      3. no ``txn_id`` that appears more than once

    Args:
        conn: Open SQLite connection that already has the fact table.

    Raises:
        AssertionError: If a check finds offending rows. The message names the
            check and the number of offenders.
    """
    print("[DQ] Running checks...")

    checks = (
        (
            "Nulls found",
            "SELECT COUNT(*) FROM fact_transactions WHERE customer_id IS NULL OR amount IS NULL",
        ),
        (
            "Negative amounts found",
            "SELECT COUNT(*) FROM fact_transactions WHERE amount < 0",
        ),
        (
            "Duplicates found",
            """SELECT COUNT(*) FROM (
                SELECT txn_id FROM fact_transactions GROUP BY txn_id HAVING COUNT(*) > 1
            )""",
        ),
    )
    for label, query in checks:
        offenders = conn.execute(query).fetchone()[0]
        # Raise explicitly instead of using `assert`: assert statements are
        # stripped under `python -O`, which would silently disable the checks.
        if offenders != 0:
            raise AssertionError(f"{label}: {offenders}")

    print("[DQ] OK ✅")


## 5) Run demo
We run day 1, re-run day 1 (no duplication), then day 2.

In [ ]:
def run_pipeline_for_day(run_date: str, db_path: str = "pipeline.db") -> None:
    """Run the full pipeline for one day and show per-customer totals.

    Steps: ensure the schema, generate the day's 25 transactions, load them
    into staging, upsert into fact, run the data quality checks, then show the
    ten customers with the highest total amount across the whole fact table.

    Args:
        run_date: Date string identifying the batch, e.g. ``"2026-02-10"``.
        db_path: SQLite database file to run against.

    Raises:
        AssertionError: Propagated from ``dq_checks`` when a check fails.
    """
    conn = connect_sqlite(db_path)
    try:
        init_schema(conn)
        df = generate_daily_transactions(run_date, n=25)
        load_staging(conn, df)
        upsert_fact(conn)
        dq_checks(conn)

        report = pd.read_sql_query(
            "SELECT customer_id, SUM(amount) AS total FROM fact_transactions GROUP BY customer_id ORDER BY total DESC",
            conn,
        )
        print("[REPORT] Customer totals:")
        # `display` only exists when running under IPython/Jupyter; fall back
        # to print so the pipeline also works as a plain script.
        try:
            show = display
        except NameError:
            show = print
        show(report.head(10))
    finally:
        # Close the connection even when a step or a quality check fails.
        conn.close()
        print("[DB] Connection closed.")

day1 = "2026-02-10"
day2 = "2026-02-11"

# Demo schedule as (banner, run_date) pairs: day 1, day 1 again to show that
# the re-run is idempotent, then day 2. The first run has no banner.
for banner, run_date in (
    (None, day1),
    ("\n--- RE-RUN SAME DAY (IDEMPOTENT) ---\n", day1),
    ("\n--- NEXT DAY ---\n", day2),
):
    if banner is not None:
        print(banner)
    run_pipeline_for_day(run_date)
