# Session 6 — Pipelines & Orchestration

Full vs Incremental loads, job scheduling, monitoring (Airflow concepts), parameterization, and Medallion architecture.


## Environment Setup

In [None]:
import sqlite3
import sys
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

sns.set_theme()
print(sys.version)

# Open (or create) the course database and enforce foreign keys.
DB_PATH = Path('course.db')
conn = sqlite3.connect(DB_PATH)
conn.execute('PRAGMA foreign_keys=ON;')
print('SQLite ready at', DB_PATH.resolve())

def run_sql(q, params=None):
    """Run a query, display the result, and return it as a DataFrame."""
    params = params or {}
    df = pd.read_sql_query(q, conn, params=params)
    display(df)
    return df

## Full Load vs Incremental Loads

**Full Load** copies the entire source into the destination each run.  
**Incremental Load** copies only **new or changed** records since the last run.

![Full vs Incremental](/mnt/data/images/full_vs_incremental.png)

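**Why it matters:** Incremental loads reduce compute, bandwidth, and load-window time, but they need reliable change tracking (here, an `updated_at` column) and will not notice rows deleted at the source. Full loads are simpler and suit initial backfills or small tables.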
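
A full load is easiest to reason about when rerunning it replaces the destination instead of appending to it. The following is a minimal sketch of that idea, assuming hypothetical in-memory tables `src_demo` and `dw_demo` and a helper named `full_reload`; none of these names come from the course database.

```python
import sqlite3

# Hypothetical in-memory tables, kept separate from the course database.
full_conn = sqlite3.connect(":memory:")
full_conn.executescript("""
CREATE TABLE src_demo(id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE dw_demo(id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO src_demo VALUES (1, 'Aria'), (2, 'Ben');
""")

def full_reload(conn, source, target):
    """Replace the target's contents with the source's in one transaction."""
    # Table names are interpolated into SQL, so pass only trusted, hard-coded names.
    with conn:  # commits on success, rolls back if either statement fails
        conn.execute(f"DELETE FROM {target}")
        conn.execute(f"INSERT INTO {target} SELECT * FROM {source}")
    return conn.execute(f"SELECT COUNT(*) FROM {target}").fetchone()[0]

first_count = full_reload(full_conn, "src_demo", "dw_demo")
second_count = full_reload(full_conn, "src_demo", "dw_demo")  # rerun is safe
print(first_count, second_count)
```

Because the delete and the insert share one transaction, a failed run leaves the previous copy in place.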

In [None]:
# Demo: emulate a source table with change tracking (updated_at)
conn.executescript('''
DROP TABLE IF EXISTS src_customers;
DROP TABLE IF EXISTS dw_customers;

CREATE TABLE src_customers(
  id INTEGER PRIMARY KEY, name TEXT, city TEXT, updated_at TEXT
);
INSERT INTO src_customers VALUES
 (1,'Aria','Austin','2024-10-01'),
 (2,'Ben','Berlin','2024-10-01'),
 (3,'Chloe','Chicago','2024-10-01');

CREATE TABLE dw_customers(
  id INTEGER PRIMARY KEY, name TEXT, city TEXT, updated_at TEXT
);
'''); conn.commit()

print("FULL LOAD #1 → copy all rows")
conn.execute("INSERT INTO dw_customers SELECT * FROM src_customers")
conn.commit()
print("DW after full load:")
display(pd.read_sql_query("SELECT * FROM dw_customers", conn))

print("Simulate changes at source (1 update, 1 insert)")
conn.executescript('''
UPDATE src_customers SET city='Aurora', updated_at='2024-10-05' WHERE id=3;
INSERT INTO src_customers VALUES (4,'Dai','Denver','2024-10-05');
'''); conn.commit()

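print("INCREMENTAL LOAD using updated_at watermark")
# Watermark = newest updated_at already in the warehouse. ISO-8601 date strings sort correctly as text.
last_watermark = pd.read_sql_query("SELECT MAX(updated_at) AS wm FROM dw_customers;", conn).iloc[0]['wm']
print("Last watermark:", last_watermark)
# Strict ">" skips rows already loaded, but would also miss late-arriving rows that share the watermark's date.
changed = pd.read_sql_query("SELECT * FROM src_customers WHERE updated_at > ?", conn, params=[last_watermark])
display(changed)

# Upsert via INSERT OR REPLACE: SQLite deletes any row with the same primary key, then inserts the new one
conn.executemany("INSERT OR REPLACE INTO dw_customers(id,name,city,updated_at) VALUES (?,?,?,?)", changed.values.tolist())
conn.commit()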

print("DW after incremental:")
display(pd.read_sql_query("SELECT * FROM dw_customers ORDER BY id", conn))
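
The demo above reads its watermark from the warehouse table itself. A common variation stores the watermark in a small state table and advances it in the same transaction as the upsert. The sketch below illustrates that variation under stated assumptions: the tables `src`, `dw`, and `etl_state`, the connection `inc_conn`, and the function `run_incremental` are all hypothetical, and the `ON CONFLICT ... DO UPDATE` syntax requires SQLite 3.24 or newer.

```python
import sqlite3

# Hypothetical in-memory tables, kept separate from the course database.
inc_conn = sqlite3.connect(":memory:")
inc_conn.executescript("""
CREATE TABLE src(id INTEGER PRIMARY KEY, city TEXT, updated_at TEXT);
CREATE TABLE dw(id INTEGER PRIMARY KEY, city TEXT, updated_at TEXT);
CREATE TABLE etl_state(pipeline TEXT PRIMARY KEY, watermark TEXT);
INSERT INTO src VALUES (1, 'Austin', '2024-10-01'), (2, 'Berlin', '2024-10-05');
""")

def run_incremental(conn, pipeline="customers"):
    """Copy rows newer than the stored watermark, then advance the watermark."""
    row = conn.execute(
        "SELECT watermark FROM etl_state WHERE pipeline = ?", (pipeline,)
    ).fetchone()
    watermark = row[0] if row else ""  # empty string sorts before any ISO date
    changed = conn.execute(
        "SELECT id, city, updated_at FROM src WHERE updated_at > ?", (watermark,)
    ).fetchall()
    if not changed:
        return 0
    new_watermark = max(r[2] for r in changed)
    with conn:  # upsert and watermark update commit (or roll back) together
        conn.executemany(
            """INSERT INTO dw(id, city, updated_at) VALUES (?, ?, ?)
               ON CONFLICT(id) DO UPDATE SET
                 city = excluded.city, updated_at = excluded.updated_at""",
            changed,
        )
        conn.execute(
            """INSERT INTO etl_state(pipeline, watermark) VALUES (?, ?)
               ON CONFLICT(pipeline) DO UPDATE SET watermark = excluded.watermark""",
            (pipeline, new_watermark),
        )
    return len(changed)

first_run = run_incremental(inc_conn)   # initial run copies every row
inc_conn.execute("UPDATE src SET city = 'Bonn', updated_at = '2024-10-06' WHERE id = 2")
inc_conn.commit()
second_run = run_incremental(inc_conn)  # only the changed row
third_run = run_incremental(inc_conn)   # nothing new
print(first_run, second_run, third_run)
```

Unlike `INSERT OR REPLACE`, this form updates the existing row in place instead of deleting and reinserting it, which can matter when other tables reference the row through foreign keys.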

## Job Scheduling

Schedulers coordinate when pipelines run.
![Scheduling](/mnt/data/images/job_scheduling.png)

**Common options:** cron, Airflow, Prefect, Dagster, dbt Cloud.  
Below are a **cron** entry and an **Airflow-style DAG**, both illustrative only.

In [None]:
# Example: a cron entry (documentation-only string)
cron_example = """
# Run incremental load at 2:05 AM daily
5 2 * * * /usr/bin/python3 /opt/pipelines/incremental_load.py >> /var/log/pipeline.log 2>&1
"""
print(cron_example)
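
To make the five cron fields (minute, hour, day of month, month, day of week) concrete, here is a deliberately simplified matcher. It is a sketch, not a cron implementation: the function name `cron_matches` is hypothetical, and each field may only be `*` or a single integer (no ranges, lists, steps, names, or `7` for Sunday).

```python
from datetime import datetime

def cron_matches(expr, when):
    """Return True if `when` matches a simplified five-field cron expression."""
    minute, hour, dom, month, dow = expr.split()
    # cron counts Sunday as 0; Python's weekday() counts Monday as 0.
    cron_dow = (when.weekday() + 1) % 7

    def field_ok(field, value):
        return field == "*" or int(field) == value

    time_ok = (
        field_ok(minute, when.minute)
        and field_ok(hour, when.hour)
        and field_ok(month, when.month)
    )
    dom_ok = field_ok(dom, when.day)
    dow_ok = field_ok(dow, cron_dow)
    # Standard cron rule: if both day fields are restricted, either may match.
    if dom != "*" and dow != "*":
        return time_ok and (dom_ok or dow_ok)
    return time_ok and dom_ok and dow_ok

daily_schedule = "5 2 * * *"  # same schedule as the cron entry above
print(cron_matches(daily_schedule, datetime(2024, 10, 5, 2, 5)))   # 02:05
print(cron_matches(daily_schedule, datetime(2024, 10, 5, 14, 5)))  # 14:05
```

The first call matches because the time is 02:05; the second does not because the hour field is fixed at 2.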

In [None]:
# Airflow DAG pseudo-example (doesn't execute here, for reference)
airflow_dag = '''from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def full_load(**context):
    # call your SQL scripts / Python to do initial full load
    pass

def incremental_load(**context):
    # pull last watermark, extract changes, upsert
    pass

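with DAG(
    dag_id="customers_etl",
    start_date=datetime(2024,10,1),
    schedule="@daily",  # Airflow 2.4+; older releases use schedule_interval
    catchup=False,
    default_args={"retries": 1},
) as dag:
    # Illustrative chain only: a real DAG would run the full load once as a backfill,
    # not ahead of every daily incremental run.
    t1 = PythonOperator(task_id="full_load", python_callable=full_load)
    t2 = PythonOperator(task_id="incremental_load", python_callable=incremental_load)
    t1 >> t2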
'''
print(airflow_dag)
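
The two ideas the DAG expresses, dependency order (`t1 >> t2`) and retries, can be tried without installing Airflow. The sketch below uses the standard library's `graphlib` (Python 3.9+) as a stand-in; it is not how Airflow works internally, and `run_dag`, `full_load_task`, and `incremental_load_task` are hypothetical names chosen for this example.

```python
from graphlib import TopologicalSorter

run_log = []
attempts = {"incremental_load": 0}

def full_load_task():
    run_log.append("full_load")

def incremental_load_task():
    # Fail on the first attempt to simulate a transient error.
    attempts["incremental_load"] += 1
    if attempts["incremental_load"] == 1:
        raise RuntimeError("simulated transient failure")
    run_log.append("incremental_load")

tasks = {"full_load": full_load_task, "incremental_load": incremental_load_task}
# Map each task to the set of tasks that must finish before it starts.
dependencies = {"full_load": set(), "incremental_load": {"full_load"}}

def run_dag(tasks, dependencies, retries=1):
    """Run tasks in dependency order, retrying a failed task up to `retries` times."""
    for name in TopologicalSorter(dependencies).static_order():
        for attempt in range(retries + 1):
            try:
                tasks[name]()
                break
            except Exception:
                if attempt == retries:
                    raise  # out of retries: fail the whole run

run_dag(tasks, dependencies, retries=1)
print(run_log, attempts)
```

With `retries=1`, the simulated failure is absorbed by the second attempt; with `retries=0` the same run would raise.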

## Monitoring & Observability

What to track: **task success/failure, run duration, data volumes and row counts, and anomalies**, with **alerts** when any of them goes out of bounds.  
![Monitoring](/mnt/data/images/monitoring.png)

Below are simple **row-count checks** and **alert stubs**.

In [None]:
# Row-count check before/after load
before = pd.read_sql_query("SELECT COUNT(*) AS n FROM dw_customers", conn).iloc[0]['n']
# simulate another change
conn.execute("INSERT INTO src_customers VALUES (5,'Eva','El Paso','2024-10-06')"); conn.commit()
# incremental again
last_wm = pd.read_sql_query("SELECT MAX(updated_at) AS wm FROM dw_customers", conn).iloc[0]['wm']
delta = pd.read_sql_query("SELECT * FROM src_customers WHERE updated_at > ?", conn, params=[last_wm])
conn.executemany("INSERT OR REPLACE INTO dw_customers(id,name,city,updated_at) VALUES (?,?,?,?)", delta.values.tolist())
conn.commit()
after = pd.read_sql_query("SELECT COUNT(*) AS n FROM dw_customers", conn).iloc[0]['n']

if after < before:
    print("ALERT: Row count dropped unexpectedly!")
else:
    print("OK: Row count non-decreasing.", before, "->", after)
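
Beyond a single before/after comparison, it often helps to keep a history of runs so that duration and volume trends are visible. The following sketch records each run in a hypothetical `pipeline_runs` table; the wrapper name `monitored`, the table layout, and the "zero rows is suspicious" rule are assumptions for illustration, and the `print` call stands in for a real alert channel.

```python
import sqlite3
import time

# Hypothetical in-memory run log, kept separate from the course database.
mon_conn = sqlite3.connect(":memory:")
mon_conn.execute("""
CREATE TABLE pipeline_runs(
  run_id INTEGER PRIMARY KEY AUTOINCREMENT,
  task TEXT, status TEXT, rows_loaded INTEGER, duration_s REAL
)""")

def monitored(conn, task_name, fn):
    """Run fn(), which returns a row count, and log the outcome to pipeline_runs."""
    start = time.perf_counter()
    status, rows = "success", None
    try:
        rows = fn()
    except Exception:
        status = "failed"  # swallowed here so the run is still logged
    duration = time.perf_counter() - start
    with conn:
        conn.execute(
            "INSERT INTO pipeline_runs(task, status, rows_loaded, duration_s) "
            "VALUES (?, ?, ?, ?)",
            (task_name, status, rows, duration),
        )
    if status == "failed" or rows == 0:
        print(f"ALERT: {task_name} status={status} rows={rows}")
    return status

def failing_load():
    raise RuntimeError("simulated failure")

monitored(mon_conn, "load_ok", lambda: 3)
monitored(mon_conn, "load_empty", lambda: 0)
monitored(mon_conn, "load_broken", failing_load)
history = mon_conn.execute(
    "SELECT task, status, rows_loaded FROM pipeline_runs ORDER BY run_id"
).fetchall()
print(history)
```

A production wrapper would more likely re-raise after logging so the scheduler can apply its own retry policy.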

## Parameterization

Pass **environment-specific parameters** (e.g., schema names, file paths, watermarks) to avoid hard-coding.  
![Parameterization](/mnt/data/images/parameterization.png)

We'll use a simple dictionary; in production use YAML/JSON or environment variables.

In [None]:
import json
CONFIG = {
    "watermark_table": "dw_customers",
    "batch_size": 5000,
    "email_alerts": False,
    "target_schema": "main"
}
print(json.dumps(CONFIG, indent=2))

def get_watermark(conn, table, col="updated_at"):
    # Identifiers cannot be bound as SQL parameters, so pass only trusted table/column names.
    q = f"SELECT MAX({col}) AS wm FROM {table}"
    return pd.read_sql_query(q, conn).iloc[0]['wm']

print("Current watermark:", get_watermark(conn, CONFIG["watermark_table"]))
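
As one possible way to move from a hard-coded dictionary to environment variables, the sketch below reads overrides with a `PIPELINE_` prefix and falls back to defaults. The prefix, the `load_config` function, and the identifier check are assumptions made for this example, not part of the course code.

```python
import os
import re

# Defaults are strings because environment variables are always strings.
DEFAULTS = {"watermark_table": "dw_customers", "batch_size": "5000", "email_alerts": "false"}
IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def load_config(env=None, prefix="PIPELINE_"):
    """Build a typed config from environment variables, falling back to DEFAULTS."""
    env = os.environ if env is None else env
    raw = {key: env.get(prefix + key.upper(), default) for key, default in DEFAULTS.items()}
    # The table name ends up inside SQL text, so reject anything but a plain identifier.
    if not IDENTIFIER.fullmatch(raw["watermark_table"]):
        raise ValueError(f"unsafe table name: {raw['watermark_table']!r}")
    return {
        "watermark_table": raw["watermark_table"],
        "batch_size": int(raw["batch_size"]),
        "email_alerts": raw["email_alerts"].strip().lower() in {"1", "true", "yes"},
    }

# A plain dict stands in for os.environ so the example is repeatable.
prod_like = load_config({"PIPELINE_BATCH_SIZE": "250", "PIPELINE_EMAIL_ALERTS": "true"})
print(prod_like)
```

Calling `load_config()` with no argument reads the real process environment instead.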

## Medallion Architecture (Bronze → Silver → Gold)

**Bronze:** raw ingested data (minimal changes)  
**Silver:** cleaned & conformed data (types fixed, duplicates removed)  
**Gold:** business-ready aggregates and marts

![Medallion](/mnt/data/images/medallion_architecture.png)

Below we stage customer data through bronze→silver→gold using SQLite tables.

In [None]:
conn.executescript('''
DROP TABLE IF EXISTS bronze_customers;
DROP TABLE IF EXISTS silver_customers;
DROP TABLE IF EXISTS gold_customer_stats;

CREATE TABLE bronze_customers AS SELECT * FROM src_customers; -- raw
CREATE TABLE silver_customers AS
  -- DISTINCT drops only exact duplicate rows; TRIM normalizes stray whitespace in city
  SELECT DISTINCT id, name, TRIM(city) AS city, updated_at FROM bronze_customers
  WHERE name IS NOT NULL; -- drop rows with no name
CREATE TABLE gold_customer_stats AS
  SELECT city, COUNT(*) AS n_customers
  FROM silver_customers GROUP BY city;
'''); conn.commit()

print("Bronze sample:"); display(pd.read_sql_query("SELECT * FROM bronze_customers LIMIT 5", conn))
print("Silver sample:"); display(pd.read_sql_query("SELECT * FROM silver_customers LIMIT 5", conn))
print("Gold sample:"); display(pd.read_sql_query("SELECT * FROM gold_customer_stats", conn))
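
When bronze keeps every version of a record, the silver step usually needs to keep only the latest version per key, which `SELECT DISTINCT` alone cannot do. The sketch below shows one way to express that, assuming hypothetical tables `bronze_demo` and `silver_demo` in a separate in-memory database. If two versions of an id share the same `updated_at`, this query keeps both, so a real pipeline would need a tie-breaker.

```python
import sqlite3

# Hypothetical in-memory tables, kept separate from the course database.
med_conn = sqlite3.connect(":memory:")
med_conn.executescript("""
CREATE TABLE bronze_demo(id INTEGER, name TEXT, city TEXT, updated_at TEXT);
INSERT INTO bronze_demo VALUES
  (3, 'Chloe', 'Chicago',  '2024-10-01'),
  (3, 'Chloe', ' Aurora ', '2024-10-05'),
  (4, 'Dai',   'Denver',   '2024-10-05');

-- Keep the newest version of each id and trim stray whitespace.
CREATE TABLE silver_demo AS
  SELECT b.id, b.name, TRIM(b.city) AS city, b.updated_at
  FROM bronze_demo AS b
  JOIN (SELECT id, MAX(updated_at) AS updated_at
        FROM bronze_demo GROUP BY id) AS latest
    ON latest.id = b.id AND latest.updated_at = b.updated_at;
""")

silver_rows = med_conn.execute(
    "SELECT id, city FROM silver_demo ORDER BY id"
).fetchall()
print(silver_rows)
```

The older `Chicago` version of id 3 stays in bronze for auditing but does not reach silver.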