# Lab 03 — Resilient API Harvester + SQL Extractor with Provenance

**Focus Areas:** API Harvester (rate‑limited, paginated, with retries & snapshots) and SQL Extractor (parameterized, chunked, Parquet)

See **Appendices** below for additional information on what *with Provenance* means.

---

## Outcomes

By the end of this lab, you will be able to:

1. Run a **local REST API** backed by SQLite (Datasette) and a lightweight **auth + rate‑limit proxy** that enforces Bearer tokens and emits `429` with `Retry-After`.
2. Implement a production‑style **API harvester** that supports **query params**, **cursor pagination**, **exponential backoff with jitter**, and **idempotent incremental fetch**.
3. Persist **raw JSON snapshots** per page with a **provenance manifest** (source URL, params, status, checksum, timestamp).
4. Normalize harvested JSON into pandas DataFrames and write to **Parquet** (optionally partitioned) for downstream validation/profiling.
5. Use **parameterized SQL** against SQLite via `pd.read_sql_query` with **chunked reads** and append to Parquet in a streaming fashion.

## Prerequisites & Setup

- Python 3.13 with `requests`, `pandas`, `numpy`, `pyarrow`, `datasette`, `fastapi`, `uvicorn`, `httpx`, `aiofiles` installed (proxy uses FastAPI).
- JupyterLab or VS Code with Jupyter extension.
- Completion of Lab 02 (reuse of **SQLite** setup).

### Setup Steps

1. Create a clean workspace and env
2. Reuse / obtain the SQLite DB (Northwind)
3. Start Datasette on port 8001 (same as Lab 02)
4. Start an **Auth + Rate‑Limit Proxy** (FastAPI) on port 8002
5. Start this notebook

### 1) Create a clean workspace and env

```bash
mkdir -p lab03 && cd lab03
python -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate
pip install --upgrade pip
pip install requests pandas numpy pyarrow datasette fastapi uvicorn aiofiles httpx
```

### 2) Reuse / obtain the SQLite DB (Northwind)

```bash
# If you already have northwind.db from Lab 02, copy it here. Otherwise:
curl -L -o northwind.db \
  https://raw.githubusercontent.com/jpwhite3/northwind-SQLite3/main/dist/northwind.db
```

### 3) Start Datasette on port 8001 (same as Lab 02)

```bash
datasette northwind.db -h 127.0.0.1 -p 8001
```

Leave this running. Open a new terminal for the **proxy**.

### 4) Start an **Auth + Rate‑Limit Proxy** (FastAPI) on port 8002

Create `proxy.py` with the following contents:

```python
# proxy.py — Bearer auth + fixed-window rate limit + pass-through to Datasette
import os
import time
from typing import Optional

import httpx
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse

DATASETTE = os.getenv("UPSTREAM", "http://127.0.0.1:8001")
REQUIRED_TOKEN = os.getenv("API_TOKEN", "super-secret-token")
RATE_LIMIT = int(os.getenv("RATE_LIMIT", 60))  # requests per minute per token

# Headers that must not be copied verbatim: httpx has already decoded the body,
# so the upstream length/encoding may no longer describe what we send back.
EXCLUDED_HEADERS = {"content-length", "content-encoding", "transfer-encoding", "connection"}

app = FastAPI()

# simple in-memory counters keyed by (token, minute window); sufficient for a lab
_counters: dict[tuple[str, int], int] = {}

async def check_rate_limit(token: str) -> Optional[int]:
    """Count this request; return seconds until the window resets if over the limit."""
    now = int(time.time())
    window = now // 60
    key = (token, window)
    _counters[key] = _counters.get(key, 0) + 1
    if _counters[key] > RATE_LIMIT:
        return (window + 1) * 60 - now
    return None

@app.middleware("http")
async def enforce_auth_and_rate_limit(request: Request, call_next):
    # enforce bearer token
    auth = request.headers.get("authorization", "")
    if not auth.lower().startswith("bearer "):
        return JSONResponse({"error": "missing bearer token"}, status_code=401)
    token = auth.split(" ", 1)[1]
    if token != REQUIRED_TOKEN:
        return JSONResponse({"error": "invalid token"}, status_code=403)

    # rate limit: 429 + Retry-After (seconds until the current window ends)
    reset = await check_rate_limit(token)
    if reset is not None:
        headers = {"Retry-After": str(reset), "X-RateLimit-Reset": str(reset)}
        return JSONResponse({"error": "rate limit exceeded"}, status_code=429, headers=headers)

    return await call_next(request)

@app.api_route("/{path:path}", methods=["GET"])
async def proxy(path: str, request: Request):
    # Minimal GET pass-through to Datasette; multi_items() keeps repeated query params
    upstream_url = f"{DATASETTE}/{path}"
    async with httpx.AsyncClient(timeout=30.0) as client:
        r = await client.get(upstream_url, params=request.query_params.multi_items())
    headers = {k: v for k, v in r.headers.items() if k.lower() not in EXCLUDED_HEADERS}
    return Response(content=r.content, status_code=r.status_code, headers=headers)
```

Run the proxy:

```bash
export API_TOKEN=super-secret-token
uvicorn proxy:app --host 127.0.0.1 --port 8002 --reload
```

> The proxy exposes the same endpoints as Datasette but now requires `Authorization: Bearer super-secret-token` and enforces a per‑minute request cap (default 60). It returns `429` with `Retry-After` when exceeded.

---

## Part A — API Harvester with Retries, Pagination, and Snapshots

### A1. Project scaffolding

Set up the base configuration and create necessary directories for artifacts.

```python
import os, json, hashlib, time, datetime as dt
from pathlib import Path

BASE = "http://127.0.0.1:8002"   # go through proxy
DB   = "northwind"
TABLE = "Orders"
PAGE_SIZE = 200
TOKEN = "super-secret-token"

ARTIFACTS = Path("artifacts")
RAW_DIR = ARTIFACTS / "raw"
MANIFEST = ARTIFACTS / "manifest.jsonl"
PARQUET_DIR = ARTIFACTS / "parquet"

for d in [RAW_DIR, PARQUET_DIR]:
    d.mkdir(parents=True, exist_ok=True)

print(f"✓ Created artifact directories: {RAW_DIR}, {PARQUET_DIR}")
print(f"✓ Manifest will be saved to: {MANIFEST}")
```

### A2. Resilient GET helper with exponential backoff + jitter and `Retry-After`

This function implements robust retry logic with:
- Exponential backoff with jitter
- Respect for `Retry-After` headers (429 responses)
- Handling of transient errors (500, 502, 503, 504)
- Timeout and connection error handling

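```python
import random
import time
import datetime as dt
from email.utils import parsedate_to_datetime

import requests

class HarvestError(Exception):
    pass

RETRYABLE = {500, 502, 503, 504}

def retry_after_seconds(value):
    """Parse Retry-After (delta-seconds or HTTP-date) into seconds; None if unusable."""
    if not value:
        return None
    try:
        return max(0.0, float(value))
    except ValueError:
        pass
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=dt.UTC)
    return max(0.0, (when - dt.datetime.now(dt.UTC)).total_seconds())

def resilient_get(url, params=None, headers=None, max_retries=6):
    """GET with exponential backoff, jitter, and Retry-After support."""
    backoff = 0.5
    headers = headers or {}

    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.get(url, params=params, headers=headers, timeout=20)
        except (requests.Timeout, requests.ConnectionError) as e:
            status, delay = type(e).__name__, backoff
        else:
            if resp.status_code == 200:
                return resp
            if resp.status_code == 429:
                # the server-specified wait wins; fall back to our own backoff
                ra = retry_after_seconds(resp.headers.get("Retry-After"))
                delay = ra if ra is not None else backoff
            elif resp.status_code in RETRYABLE:
                delay = backoff
            else:
                # 401/403/404 etc. will not fix themselves: fail fast
                raise HarvestError(f"Unexpected {resp.status_code}: {resp.text[:200]}")
            status = resp.status_code

        if attempt == max_retries:
            break  # no point sleeping after the final attempt
        jitter = random.uniform(0, 0.25 * delay)
        print(f"  → Attempt {attempt}: {status}, sleeping {delay + jitter:.2f}s")
        time.sleep(delay + jitter)
        backoff = min(backoff * 2, 8.0)  # exponential growth, capped at 8s

    raise HarvestError(f"Failed after {max_retries} attempts: {url}")

print("✓ Resilient GET helper defined")
```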

### A3. Cursor pagination + raw snapshot persistence

Implement the core harvesting logic:
- Paginate through API results using cursor tokens
- Save raw JSON snapshots for each page
- Record provenance metadata in a manifest

```python
import pandas as pd

HEADERS = {"Authorization": f"Bearer {TOKEN}"}

def sha256_bytes(b: bytes) -> str:
    """Compute SHA-256 hash of bytes."""
    return hashlib.sha256(b).hexdigest()

def save_snapshot(payload_bytes: bytes, meta: dict) -> Path:
    """Save raw JSON snapshot and append metadata to manifest."""
    ts = dt.datetime.now(dt.UTC).strftime("%Y%m%dT%H%M%SZ")
    fname = f"{meta['table']}_{ts}_{meta['page']:05d}.json"
    fpath = RAW_DIR / fname
    fpath.write_bytes(payload_bytes)

    # append to manifest as JSONL
    record = {
        **meta,
        "timestamp": ts,
        "sha256": sha256_bytes(payload_bytes),
        "bytes": len(payload_bytes)
    }
    with MANIFEST.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")

    return fpath

def harvest_table(base, db, table, page_size=100, start_cursor=None, extra_params=None):
    """Harvest a table with cursor pagination, saving snapshots."""
    url = f"{base}/{db}/{table}.json"
    params = {"_size": page_size}
    if extra_params:
        params.update(extra_params)

    next_tok = start_cursor
    page = 0
    all_rows = []
    columns = None  # Store column names from first response

    while True:
        if next_tok:
            params["_next"] = next_tok

        resp = resilient_get(url, params=params, headers=HEADERS)
        payload_bytes = resp.content
        payload = resp.json()
        rows = payload.get("rows", [])

        # Capture column names from first page
        if columns is None:
            columns = payload.get("columns", [])
            print(f"Columns: {columns}")

        page += 1

        save_snapshot(payload_bytes, {
            "source": url,
            "params": params.copy(),
            "table": table,
            "status": resp.status_code,
            "page": page,
            "columns": columns  # Add to manifest
        })

        print(f"Page {page}: {len(rows)} rows")

        if not rows:
            break

        all_rows.extend(rows)
        next_tok = payload.get("next")
        if not next_tok:
            break

    # Create DataFrame with explicit column names
    df = pd.DataFrame(all_rows, columns=columns)
    return df

print("✓ Harvesting functions defined")
```
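The cursor loop in `harvest_table` can be exercised without a server. The sketch below substitutes a hypothetical in-memory `fake_page` function that returns the three fields the harvester reads (`rows`, `columns`, and a `next` token that is `None` on the last page) and drives it with the same stop conditions: stop on an empty page or a missing `next`. The real token is opaque to the client; the fake uses a plain offset only for simplicity.

```python
def fake_page(data, size, next_tok=None):
    """Return one page shaped like the fields harvest_table reads: rows, columns, next."""
    start = int(next_tok) if next_tok else 0
    rows = data[start:start + size]
    end = start + len(rows)
    return {"columns": ["id", "value"], "rows": rows,
            "next": str(end) if end < len(data) else None}

def paginate(fetch, size):
    """Follow cursor tokens until the source reports no next page."""
    out, tok, pages = [], None, 0
    while True:
        payload = fetch(size, tok)
        pages += 1
        rows = payload["rows"]
        if not rows:
            break
        out.extend(rows)
        tok = payload.get("next")
        if not tok:
            break
    return out, pages

data = [[i, i * i] for i in range(7)]
demo_rows, demo_pages = paginate(lambda size, tok: fake_page(data, size, tok), size=3)
print(len(demo_rows), demo_pages)  # → 7 3
```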

### Harvest the Orders table

Execute the initial harvest of all Orders data.

```python
print(f"Harvesting {TABLE} from {BASE}/{DB}...\n")
orders = harvest_table(BASE, DB, "Orders", page_size=PAGE_SIZE)

print(f"\n✓ Harvested {len(orders)} total rows")
print(f"✓ Shape: {orders.shape}")
print(f"\nFirst few rows:")
orders.head()
```

### Verify snapshots and manifest

Check that raw snapshots and the provenance manifest were created correctly.

```python
# List raw snapshots
snapshots = list(RAW_DIR.glob("*.json"))
print(f"✓ Created {len(snapshots)} snapshot files:")
for snap in snapshots[:5]:
    print(f"  - {snap.name}")
if len(snapshots) > 5:
    print(f"  ... and {len(snapshots) - 5} more")

# Check manifest
if MANIFEST.exists():
    manifest_lines = MANIFEST.read_text().strip().split("\n")
    print(f"\n✓ Manifest has {len(manifest_lines)} entries")
    print(f"\nSample manifest entry:")
    print(json.dumps(json.loads(manifest_lines[0]), indent=2))
```
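Because every manifest entry records a SHA-256, the snapshots can be re-verified later. A minimal sketch, assuming the manifest layout written by `save_snapshot` above. The entries do not store the snapshot filename, so this hypothetical `verify_snapshots` helper matches files to entries by checksum rather than by name. The demo builds a throwaway directory so it runs standalone; in the notebook you would call `verify_snapshots(RAW_DIR, MANIFEST)`.

```python
import hashlib
import json
import tempfile
from pathlib import Path

def verify_snapshots(raw_dir: Path, manifest: Path) -> dict:
    """Compare SHA-256 of every raw snapshot against the checksums recorded in the manifest."""
    recorded = set()
    with manifest.open(encoding="utf-8") as fh:
        for line in fh:
            if line.strip():
                recorded.add(json.loads(line)["sha256"])
    on_disk = {hashlib.sha256(p.read_bytes()).hexdigest(): p.name
               for p in sorted(raw_dir.glob("*.json"))}
    return {
        # files whose content matches no manifest entry (edited or never recorded)
        "unrecorded_files": sorted(name for h, name in on_disk.items() if h not in recorded),
        # manifest entries with no file of matching content on disk
        "missing_payloads": len(recorded - on_disk.keys()),
    }

with tempfile.TemporaryDirectory() as tmp:
    raw, man = Path(tmp) / "raw", Path(tmp) / "manifest.jsonl"
    raw.mkdir()
    good = b'{"rows": []}'
    (raw / "Orders_page1.json").write_bytes(good)
    man.write_text(json.dumps({"sha256": hashlib.sha256(good).hexdigest()}) + "\n")
    (raw / "tampered.json").write_bytes(b'{"rows": [1]}')
    print(verify_snapshots(raw, man))  # → {'unrecorded_files': ['tampered.json'], 'missing_payloads': 0}
```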

### A4. Incremental harvesting with a high‑watermark

Implement incremental data fetching using a watermark strategy based on `OrderDate`.

```python
WATERMARK_FILE = ARTIFACTS / "last_watermark.txt"

def read_watermark(default="1997-01-01"):
    """Read the last watermark value, or return default."""
    if WATERMARK_FILE.exists():
        return WATERMARK_FILE.read_text().strip()
    return default

def write_watermark(value: str):
    """Write the new watermark value."""
    WATERMARK_FILE.write_text(value)

print("✓ Watermark functions defined")
```
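`write_watermark` overwrites the file in place, so a crash mid-write could leave it empty or truncated. A common hardening, sketched here as a hypothetical `write_watermark_atomic` (not part of the lab), writes to a temporary file in the same directory and then swaps it in with `os.replace`; on POSIX systems that rename is atomic when both paths are on the same filesystem, so readers see either the old or the new value.

```python
import os
import tempfile
from pathlib import Path

def write_watermark_atomic(path: Path, value: str) -> None:
    """Write `value` to `path` via temp file + rename, so no partial file is ever visible."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(value)
            fh.flush()
            os.fsync(fh.fileno())   # push bytes to disk before the swap
        os.replace(tmp_name, path)  # rename over the old file
    except BaseException:
        Path(tmp_name).unlink(missing_ok=True)  # don't leave stray temp files behind
        raise

with tempfile.TemporaryDirectory() as tmp:
    wm = Path(tmp) / "last_watermark.txt"
    write_watermark_atomic(wm, "1997-01-01")
    write_watermark_atomic(wm, "1998-05-06")
    print(wm.read_text())  # → 1998-05-06
```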

### Perform incremental harvest

```python
last = read_watermark()
print(f"Starting watermark: {last}\n")

# Harvest only rows on or after the last watermark
orders_inc = harvest_table(
    BASE, DB, "Orders", page_size=PAGE_SIZE,
    extra_params={"OrderDate__gte": last}  # Datasette column filter: OrderDate >= last
)

# Advance watermark to the max non-null OrderDate we saw (if any).
# A string max is chronological only if dates are stored as ISO-8601 text.
seen = orders_inc["OrderDate"].dropna() if "OrderDate" in orders_inc.columns else []
if len(seen):
    new_wm = max(str(d) for d in seen)
    write_watermark(new_wm)
    print(f"\n✓ Advanced watermark to: {new_wm}")
else:
    print(f"\n✓ No new rows; watermark unchanged: {last}")

print(f"\nIncremental harvest retrieved {len(orders_inc)} rows")
```

### A5. Normalize and persist to Parquet (partitioned)

Transform the harvested data and write partitioned Parquet files.

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Inspect what the incremental harvest returned before selecting columns
print("Available columns:", orders_inc.columns.tolist())
print("DataFrame shape:", orders_inc.shape)

# Simple normalization: select a subset + parse dates (unparseable values become NaT)
use = orders_inc[["OrderID","CustomerID","OrderDate","ShipCountry","Freight"]].copy()
use["OrderDate"] = pd.to_datetime(use["OrderDate"], errors="coerce").dt.date

# Partition by ShipCountry for faster downstream filters (one file per country)
out_root = PARQUET_DIR / "orders"
out_root.mkdir(parents=True, exist_ok=True)

for country, g in use.groupby("ShipCountry"):
    table = pa.Table.from_pandas(g, preserve_index=False)
    pq.write_table(table, out_root / f"shipcountry={country}.parquet")

# Quick QA
files = list(out_root.glob("*.parquet"))
print(f"\n✓ Created {len(files)} partitioned Parquet files")
print(f"\nSample files:")
for f in files[:5]:
    print(f"  - {f.name}")
if len(files) > 5:
    print(f"  ... and {len(files) - 5} more")
```

### A6. (Optional) Harvest a second table and join

Demonstrate harvesting additional tables and performing joins.

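```python
print("Harvesting Order Details table...\n")
DETAILS_PAGE_SIZE = 1_000  # Datasette caps rows per page at max_returned_rows (1,000 by default)
# Each page costs one request against the proxy's per-minute limit. To speed this up,
# raise RATE_LIMIT (env var, or its default in proxy.py) and restart the proxy.
details = harvest_table(BASE, DB, "Order+Details", page_size=DETAILS_PAGE_SIZE)  # "+" stands for the space in "Order Details"

# Normalize column names (replace any spaces with underscores)
details.columns = [c.replace(" ", "_") for c in details.columns]

print(f"\n✓ Harvested {len(details)} detail rows")
print(f"\nColumns: {list(details.columns)}")
```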

### Join orders to details and persist

```python
# Join orders to details on OrderID
merged = details.merge(use, on="OrderID", how="inner")

print(f"✓ Joined dataset has {len(merged)} rows")
print(f"\nFirst few rows:")
merged.head()
```
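The inner join silently drops detail lines whose `OrderID` is not in `use` (for instance, orders older than the watermark). A sketch of how to make that visible with `merge`'s `indicator` and `validate` options, on tiny made-up frames whose names (`demo_details`, `demo_orders`) are chosen so they don't overwrite the notebook's variables:

```python
import pandas as pd

demo_details = pd.DataFrame({"OrderID": [1, 1, 2, 3], "ProductID": [11, 12, 11, 13]})
demo_orders = pd.DataFrame({"OrderID": [1, 2], "ShipCountry": ["USA", "UK"]})

# validate="many_to_one" raises MergeError if OrderID is duplicated on the orders side;
# indicator=True adds a _merge column: "both", "left_only" or "right_only"
checked = demo_details.merge(demo_orders, on="OrderID", how="left",
                             validate="many_to_one", indicator=True)
print(checked["_merge"].value_counts().to_dict())

# Keep only matched rows, i.e. the same rows how="inner" would return
demo_merged = checked[checked["_merge"] == "both"].drop(columns="_merge")
print(len(demo_merged))  # → 3
```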

```python
# Persist merged data to Parquet
line_items_path = PARQUET_DIR / "line_items.parquet"
merged.to_parquet(line_items_path, index=False)

print(f"✓ Saved joined data to {line_items_path}")
print(f"  File size: {line_items_path.stat().st_size:,} bytes")
```

---

## Part B — SQL Extractor with Parameterization & Chunked Reads

### B1. Parameterized queries with user inputs

Demonstrate safe SQL queries using parameterization to prevent SQL injection.

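```python
import sqlite3
from pathlib import Path

# Path is relative to the notebook's working directory (lab03/ after the setup steps);
# update as needed. sqlite3.connect() silently creates an empty database when the file
# is missing, which later fails with "no such table", so check first.
DB_PATH = Path("northwind.db")
assert DB_PATH.exists(), f"SQLite file not found: {DB_PATH.resolve()}"
conn = sqlite3.connect(DB_PATH)

country = "USA"
start_date = "1997-01-01"

# "?" placeholders: values are bound by the driver, never spliced into the SQL text
q = """
SELECT o.OrderID, o.CustomerID, o.OrderDate, o.ShipCountry,
       d.ProductID, d.UnitPrice, d.Quantity, d.Discount
FROM Orders o
JOIN [Order Details] d ON o.OrderID = d.OrderID
WHERE o.ShipCountry = ? AND o.OrderDate >= ?
ORDER BY o.OrderDate
"""
params = (country, start_date)

rows = pd.read_sql_query(q, conn, params=params)

print(f"✓ Parameterized query returned {len(rows)} rows")
print(f"\nQuery: ShipCountry = '{country}' AND OrderDate >= '{start_date}'")
print(f"\nFirst few rows:")
rows.head()
```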
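`sqlite3` also supports named placeholders (`:name`), which read better once a query has several parameters; `pd.read_sql_query` passes a dict straight through to the driver. A self-contained sketch on an in-memory database with made-up rows (the `demo_` names avoid overwriting the notebook's `conn`, `q`, and `rows`):

```python
import sqlite3

import pandas as pd

demo_conn = sqlite3.connect(":memory:")
demo_conn.executescript("""
    CREATE TABLE Orders (OrderID INTEGER PRIMARY KEY, ShipCountry TEXT, OrderDate TEXT);
    INSERT INTO Orders VALUES (1, 'USA', '1996-12-30'), (2, 'USA', '1997-02-01'),
                              (3, 'UK',  '1997-03-15'), (4, 'USA', '1998-01-05');
""")

demo_q = """
SELECT OrderID, OrderDate FROM Orders
WHERE ShipCountry = :country AND OrderDate >= :start
ORDER BY OrderDate
"""
demo_rows = pd.read_sql_query(demo_q, demo_conn,
                              params={"country": "USA", "start": "1997-01-01"})
print(demo_rows["OrderID"].tolist())  # → [2, 4]
demo_conn.close()
```

Placeholders can only stand in for values, not identifiers: a table or column name cannot be bound as a parameter, so validate such names against an allow-list if they ever come from user input.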

### B2. Chunked reads → streaming Parquet append

Process large result sets in chunks to manage memory efficiently.

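```python
out_path = PARQUET_DIR / "orders_joined.parquet"
writer = None
total_rows = 0

# ParquetWriter fixes its schema from the first chunk, so every chunk must map to the
# same Arrow types. SQLite's NUMERIC affinity can return whole-number prices as INTEGER,
# which would make one chunk int64 and another float64; pin the float columns.
DTYPES = {"UnitPrice": "float64", "Discount": "float64"}

print("Processing query in chunks of 25,000 rows...\n")

try:
    for i, chunk in enumerate(pd.read_sql_query(q, conn, params=params, chunksize=25_000), 1):
        # Optional transforms
        chunk["OrderDate"] = pd.to_datetime(chunk["OrderDate"], errors="coerce")
        chunk = chunk.astype(DTYPES)

        table = pa.Table.from_pandas(chunk, preserve_index=False)

        if writer is None:
            writer = pq.ParquetWriter(out_path, table.schema)

        writer.write_table(table)
        total_rows += len(chunk)
        print(f"  Chunk {i}: {len(chunk)} rows (total: {total_rows})")
finally:
    # close() writes the Parquet footer; the file is not readable without it
    if writer is not None:
        writer.close()

print(f"\n✓ Wrote {total_rows} total rows to {out_path}")
```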

### Verify the streamed Parquet file

```python
ver = pd.read_parquet(out_path)

print(f"✓ Verification: read {len(ver)} rows from Parquet")
print(f"\nRandom sample:")
ver.sample(3, random_state=42)
```

### B3. (Optional) Partitioned write by date

Partition data by year for optimized queries.

```python
# Partition by year of OrderDate
rows2 = pd.read_sql_query(q, conn, params=params)
rows2["OrderDate"] = pd.to_datetime(rows2["OrderDate"], errors="coerce")
# Nullable Int64 keeps years as integers even if some dates failed to parse (NaT),
# so file names read orders_year=1997 rather than orders_year=1997.0
rows2["year"] = rows2["OrderDate"].dt.year.astype("Int64")

print("Writing partitioned files by year...\n")

for yr, g in rows2.groupby("year"):
    yr_path = PARQUET_DIR / f"orders_year={yr}.parquet"
    pq.write_table(pa.Table.from_pandas(g, preserve_index=False), yr_path)
    print(f"  Year {yr}: {len(g)} rows → {yr_path.name}")

year_files = list(PARQUET_DIR.glob("orders_year=*.parquet"))
print(f"\n✓ Created {len(year_files)} year-partitioned files")
```

### Clean up database connection

```python
conn.close()
print("✓ Database connection closed")
```

---

## Part C — Wrap‑Up & Reflection

### Reflection Questions

**1. How does your harvester treat `429` vs `500`? What signal does `Retry-After` convey? Why add jitter?**

- **429 (Rate Limit):** The harvester honors the `Retry-After` header when present; it tells the client how many seconds to wait before retrying (for this proxy, the time left until the current one-minute window resets). Without the header, the harvester falls back to exponential backoff. A 429 is a "client fault" (we are sending too fast) that waiting resolves.

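- **500/502/503/504 (Server Errors):** These are treated as transient server-side failures, so the harvester retries with exponential backoff to give the server time to recover. Any other non-200 status (e.g., 401, 403, 404) raises `HarvestError` immediately, because retrying cannot fix it.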

- **Jitter:** Adding random jitter (0-25% of the delay) prevents the "thundering herd" problem where multiple clients retry simultaneously, potentially overwhelming a recovering service. It spreads out retry attempts across time.

**2. Explain your high‑watermark logic. What happens if the API returns out‑of‑order data?**

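- The **high-watermark** tracks the maximum `OrderDate` seen in each harvest run and is persisted to `last_watermark.txt` only after the run completes.
- On subsequent runs, we request only rows with `OrderDate >= last_watermark`, so each run fetches new rows plus the boundary date instead of the whole table.
- **Out-of-order data:** Because the watermark is the maximum over the entire run, the order in which pages or rows arrive within a run does not matter. The real risk is *late-arriving* data: a row that appears after we have advanced the watermark but carries an earlier `OrderDate` (e.g., a backdated order) is never requested again. Mitigations:
  - Keep `>=` (it re-fetches rows on the boundary date) and deduplicate on the key (`OrderID`) when writing
  - Filter on a modification timestamp such as `updated_at` rather than a business date such as `OrderDate`, when the source provides one
  - Run periodic backfills with an overlap window that starts some margin before the watermark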
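The first and third mitigations combine naturally: re-read a window that starts some margin before the watermark, then deduplicate on the primary key when merging into what was already stored. A sketch with made-up frames; the helper names `merge_incremental` and `overlap_start` and the `overlap_days` parameter are hypothetical.

```python
import pandas as pd

def merge_incremental(existing: pd.DataFrame, fresh: pd.DataFrame, key: str) -> pd.DataFrame:
    """Union old and new rows; on duplicate keys keep the freshly harvested version."""
    combined = pd.concat([existing, fresh], ignore_index=True)
    return (combined.drop_duplicates(subset=key, keep="last")
                    .sort_values(key, ignore_index=True))

def overlap_start(watermark: str, overlap_days: int = 7) -> str:
    """Start the next request window `overlap_days` before the stored watermark."""
    return (pd.Timestamp(watermark) - pd.Timedelta(days=overlap_days)).strftime("%Y-%m-%d")

existing = pd.DataFrame({"OrderID": [1, 2, 3], "Freight": [1.0, 2.0, 3.0]})
fresh = pd.DataFrame({"OrderID": [3, 4], "Freight": [3.5, 4.0]})  # order 3 re-fetched with a correction
print(merge_incremental(existing, fresh, "OrderID"))
print(overlap_start("1998-05-06"))  # → 1998-04-29
```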

**3. Why are parameterized queries critical, even for local labs? Provide a small example of a risky string‑formatted SQL.**

**Critical because:**
- Prevents SQL injection attacks
- Handles special characters correctly (quotes, backslashes)
- Separates code from data
- Good habit for production code

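**Risky example:**
```python
# DANGEROUS - Never do this!
country = "USA'; DROP TABLE Orders; --"
query = f"SELECT * FROM Orders WHERE ShipCountry = '{country}'"
# query is now: SELECT * FROM Orders WHERE ShipCountry = 'USA'; DROP TABLE Orders; --'
```

Python's `sqlite3` refuses to run more than one statement per `execute()` call, so this particular payload fails with an error instead of dropping the table. A single-statement payload such as `' OR '1'='1`, however, still silently changes which rows the query returns.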

**Safe example:**
```python
# SAFE - Always do this
query = "SELECT * FROM Orders WHERE ShipCountry = ?"
params = (country,)
pd.read_sql_query(query, conn, params=params)
```
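A small, self-contained demonstration of both points on an in-memory table with made-up rows: the `' OR '1'='1` payload changes the result of string-built SQL, binding the same input as a parameter does not, and `execute()` rejects the stacked `DROP TABLE`. The exception class for that last case has differed between Python versions, so the sketch catches both `sqlite3.ProgrammingError` and `sqlite3.Warning`.

```python
import sqlite3

demo_conn = sqlite3.connect(":memory:")
demo_conn.executescript("""
    CREATE TABLE Orders (OrderID INTEGER, ShipCountry TEXT);
    INSERT INTO Orders VALUES (1, 'USA'), (2, 'UK'), (3, 'France');
""")

evil = "x' OR '1'='1"

# Unsafe: the input becomes SQL text and turns the filter into a tautology
unsafe_sql = f"SELECT OrderID FROM Orders WHERE ShipCountry = '{evil}'"
unsafe_count = len(demo_conn.execute(unsafe_sql).fetchall())
print(unsafe_count)  # → 3 (every row)

# Safe: the same input is bound as a value and compared literally
safe_rows = demo_conn.execute("SELECT OrderID FROM Orders WHERE ShipCountry = ?", (evil,)).fetchall()
print(len(safe_rows))  # → 0

# Stacked statements are rejected before anything runs, so the table survives
try:
    demo_conn.execute("SELECT * FROM Orders WHERE ShipCountry = 'USA'; DROP TABLE Orders; --'")
except (sqlite3.ProgrammingError, sqlite3.Warning) as exc:
    print(type(exc).__name__, exc)
remaining = demo_conn.execute("SELECT COUNT(*) FROM Orders").fetchone()[0]
print(remaining)  # → 3
demo_conn.close()
```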

**4. When would you choose chunked reads? Name one trade‑off.**

**Use chunked reads when:**
- Working with result sets larger than available RAM
- Streaming data to disk or another system
- Starting downstream processing before the full result set has been fetched
- Reporting progress on long-running queries

**Trade-off:**
- **Complexity vs Memory:** Chunked processing adds code complexity (managing the writer, aggregating results) but prevents memory exhaustion. You also can't easily perform operations that require the full dataset (like certain aggregations or sorts) without collecting chunks.
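The trade-off shows up as soon as you need an aggregate: each chunk sees only part of the data, so you keep partial results and combine them at the end. A sketch computing revenue per country across chunks, on a made-up in-memory table; `chunksize=2` is deliberately tiny so that groups span chunk boundaries.

```python
import sqlite3

import pandas as pd

demo_conn = sqlite3.connect(":memory:")
demo_conn.executescript("""
    CREATE TABLE lines (ShipCountry TEXT, UnitPrice REAL, Quantity INTEGER);
    INSERT INTO lines VALUES ('USA', 10.0, 2), ('UK', 5.0, 1), ('USA', 2.5, 4),
                             ('UK', 1.0, 3), ('USA', 4.0, 1);
""")

partials = []
for chunk in pd.read_sql_query("SELECT * FROM lines", demo_conn, chunksize=2):
    chunk["revenue"] = chunk["UnitPrice"] * chunk["Quantity"]
    # one small partial sum per chunk, however large the chunk is
    partials.append(chunk.groupby("ShipCountry")["revenue"].sum())

# Sums combine exactly; medians or exact distinct counts cannot be merged this way
revenue = pd.concat(partials).groupby(level=0).sum()
print(revenue.to_dict())  # → {'UK': 8.0, 'USA': 34.0}
demo_conn.close()
```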

---

## Summary of Artifacts Created

Let's review what was created during this lab:

```python
print("=" * 60)
print("ARTIFACT SUMMARY")
print("=" * 60)

# Raw snapshots
snapshots = list(RAW_DIR.glob("*.json"))
print(f"\n📁 Raw JSON Snapshots: {len(snapshots)} files")
print(f"   Location: {RAW_DIR}")
total_bytes = sum(f.stat().st_size for f in snapshots)
print(f"   Total size: {total_bytes:,} bytes")

# Manifest
if MANIFEST.exists():
    manifest_lines = len(MANIFEST.read_text().strip().split("\n"))
    print(f"\n📋 Provenance Manifest: {manifest_lines} entries")
    print(f"   Location: {MANIFEST}")

# Watermark
if WATERMARK_FILE.exists():
    wm = WATERMARK_FILE.read_text().strip()
    print(f"\n🔖 High-Watermark: {wm}")
    print(f"   Location: {WATERMARK_FILE}")

# Parquet files
parquet_files = list(PARQUET_DIR.rglob("*.parquet"))
print(f"\n📊 Parquet Files: {len(parquet_files)} files")
print(f"   Location: {PARQUET_DIR}")
for pf in parquet_files:
    size = pf.stat().st_size
    rel_path = pf.relative_to(PARQUET_DIR)
    print(f"   - {rel_path} ({size:,} bytes)")

print("\n" + "=" * 60)
print("✓ Lab 03 Complete!")
print("=" * 60)
```

---

## Key Takeaways

1. **Resilient API Harvesting:** Implemented exponential backoff with jitter and proper handling of rate limits and transient errors.

2. **Provenance Tracking:** Every page fetch is recorded with checksums, timestamps, and metadata for full audit trails.

3. **Incremental Processing:** A high-watermark strategy enables incremental harvests that fetch only rows on or after the last watermark instead of re-downloading the whole table.

4. **Partitioned Storage:** Parquet files organized by dimensions (ShipCountry, year) for optimized downstream queries.

5. **SQL Best Practices:** Parameterized queries prevent injection attacks; chunked reads manage memory efficiently.

6. **Production-Ready Patterns:** Auth headers, retry budgets, structured manifests, and idempotent operations prepare you for real-world data pipelines.