Cell 1 — Imports + Config

In [None]:
import os
import json
from typing import Any, Dict, List, Optional

import requests
import pandas as pd
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

from sqlalchemy import create_engine, text
from sqlalchemy import Table, MetaData, Column, Integer, Text, DateTime
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.engine import URL

# Runtime configuration. Every value can be overridden via an environment variable;
# the fallbacks target a local demo setup.

# REST API endpoint and optional token (an empty token means no Authorization header).
API_BASE_URL = os.environ.get("API_BASE_URL", "https://jsonplaceholder.typicode.com")
API_TOKEN = os.environ.get("API_TOKEN", "")

# MySQL connection settings.
# NOTE(review): the user/password fallbacks are demo credentials — set MYSQL_USER and
# MYSQL_PASSWORD in the environment for any real database.
MYSQL_HOST = os.environ.get("MYSQL_HOST", "db")
MYSQL_PORT = int(os.environ.get("MYSQL_PORT", "3306"))
MYSQL_DATABASE = os.environ.get("MYSQL_DATABASE", "demo")
MYSQL_USER = os.environ.get("MYSQL_USER", "demo_user")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", "demo_pass")

# Echo the effective targets; the password is intentionally left out of the output.
print("API_BASE_URL:", API_BASE_URL)
print("MYSQL:", f"{MYSQL_USER}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}")


Cell 2 — MySQL Engine (SQLAlchemy)

In [None]:
# Build the connection URL with SQLAlchemy's URL.create (PyMySQL driver) instead of
# f-string interpolation: URL.create escapes each component, so credentials containing
# characters such as "@", ":", "/" or "%" no longer produce a broken or misparsed URL.
mysql_url = URL.create(
    drivername="mysql+pymysql",
    username=MYSQL_USER,
    password=MYSQL_PASSWORD,
    host=MYSQL_HOST,
    port=MYSQL_PORT,
    database=MYSQL_DATABASE,
)

# pool_pre_ping tests a pooled connection before handing it out, replacing stale ones.
engine = create_engine(mysql_url, pool_pre_ping=True)

# Quick connectivity check: the cell fails right here if the database is unreachable.
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
print("✅ Connected to MySQL")


Cell 3 — API Client Helpers (with retries)

In [None]:
class ApiError(RuntimeError):
    """Error raised by ``get_json`` when the API answers with an HTTP status >= 400."""

def build_headers() -> Dict[str, str]:
    """Return a fresh dict of HTTP headers for API requests.

    JSON is always requested via ``Accept``. A bearer ``Authorization`` header is
    included only when ``API_TOKEN`` is non-empty.
    """
    # Common pattern; adjust the scheme to your API (e.g. "Token", "x-api-key", etc.).
    auth_header = {"Authorization": f"Bearer {API_TOKEN}"} if API_TOKEN else {}
    return {"Accept": "application/json", **auth_header}

class _TransientApiError(ApiError):
    """ApiError for HTTP responses worth retrying (408, 429 and any 5xx)."""


# Non-5xx statuses that signal a temporary condition (request timeout, rate limiting).
_RETRYABLE_STATUS_CODES = frozenset({408, 429})


@retry(
    reraise=True,
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    # Retry only network failures and transient HTTP errors. A plain ApiError
    # (e.g. 400/401/403/404) is permanent and is raised immediately instead of
    # being retried five times with backoff.
    retry=retry_if_exception_type((requests.RequestException, _TransientApiError)),
)
def get_json(path: str, params: Optional[Dict[str, Any]] = None) -> Any:
    """GET ``path`` relative to ``API_BASE_URL`` and return the decoded JSON body.

    Args:
        path: Endpoint path; leading/trailing slashes are normalized when joining.
        params: Optional query-string parameters passed to ``requests.get``.

    Returns:
        The parsed JSON payload (whatever ``resp.json()`` yields).

    Raises:
        ApiError: For HTTP status >= 400. Statuses 408, 429 and 5xx are retried
            (up to 5 attempts in total) before the final error is re-raised; other
            statuses are raised on the first attempt.
        requests.RequestException: Network-level failures, re-raised after the
            retry attempts are exhausted.
    """
    url = f"{API_BASE_URL.rstrip('/')}/{path.lstrip('/')}"
    resp = requests.get(url, headers=build_headers(), params=params, timeout=30)
    if resp.status_code >= 400:
        message = f"API error {resp.status_code}: {resp.text[:300]}"
        if resp.status_code >= 500 or resp.status_code in _RETRYABLE_STATUS_CODES:
            raise _TransientApiError(message)
        raise ApiError(message)
    return resp.json()


Cell 4 — Read Data from the API

In [None]:
# Fetch all posts; the endpoint is expected to return a JSON array of objects.
data = get_json("/posts")

# Fail with a clear message if the payload is not a list, rather than with a confusing
# error further down (indexing a dict, normalizing a scalar, ...).
assert isinstance(data, list), f"Expected a JSON array from /posts, got {type(data).__name__}"
print(type(data), len(data))

# Peek at the first record only when there is one; an empty payload is then reported
# by the data-quality assertions later instead of an IndexError here.
print(data[0] if data else "(no records returned)")


Cell 5 — Normalize to a DataFrame

In [None]:
# Flatten the list of JSON records into a DataFrame; json_normalize joins nested
# keys with `sep` to form column names.
df = pd.json_normalize(data, sep=".")
df.head(n=5)

Cell 6 — Basic Data Quality Checks (prototype)

In [None]:
# Fail fast on payload problems before anything is written to MySQL.
assert not df.empty, "API returned no data"
assert "id" in df.columns, "Expected a primary key field 'id'"

# Columns written to api_posts by the load step; checking them here turns an API schema
# change into a clear message instead of a KeyError during the load.
missing_columns = {"id", "userId", "title", "body"} - set(df.columns)
assert not missing_columns, f"Missing expected columns: {sorted(missing_columns)}"

# Every row needs a primary key, and astype(int) cannot represent NaN.
assert df["id"].notna().all(), "Primary key 'id' contains null values"

# Example: enforce types (matches the INT PRIMARY KEY column of api_posts)
df["id"] = df["id"].astype(int)

# Checked after the cast so values like 1 and 1.0 count as the same key; duplicates
# would otherwise be merged into a single row by the ON DUPLICATE KEY upsert.
assert df["id"].is_unique, "Primary key 'id' contains duplicate values"

df.isna().sum().head(20)

Cell 7 — Create a Target Table (idempotent)

In [None]:
# DDL for the landing table. IF NOT EXISTS makes the cell safe to re-run, and
# ingested_at defaults to CURRENT_TIMESTAMP when a row is inserted.
create_sql = """
CREATE TABLE IF NOT EXISTS api_posts (
  id INT PRIMARY KEY,
  userId INT,
  title TEXT,
  body TEXT,
  ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""

ddl_statement = text(create_sql)

# engine.begin() opens a transaction that commits when the block exits without error.
with engine.begin() as conn:
    conn.execute(ddl_statement)

print("✅ Ensured table exists: api_posts")


Cell 8 — Load into MySQL (Upsert Pattern)

In [None]:
# SQLAlchemy Core description of the target table. Only the columns written here are
# declared; ingested_at is handled by its default in the DDL.
metadata = MetaData()

api_posts = Table(
    "api_posts",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("userId", Integer),
    Column("title", Text),
    Column("body", Text),
)

UPSERT_COLUMNS = ["id", "userId", "title", "body"]
# Rows per INSERT statement; bounds statement size (MySQL rejects statements larger
# than max_allowed_packet) when the API returns many records.
UPSERT_BATCH_SIZE = 1000

# pandas marks missing values as NaN, which the MySQL driver cannot bind as NULL.
# Cast to object first so the None replacements are not coerced back to NaN.
load_frame = df[UPSERT_COLUMNS]
records = load_frame.astype(object).where(load_frame.notna(), None).to_dict(orient="records")

# One transaction for all batches: either every batch commits or none does. An empty
# `records` list simply skips the loop instead of building an invalid INSERT.
with engine.begin() as conn:
    for start in range(0, len(records), UPSERT_BATCH_SIZE):
        stmt = mysql_insert(api_posts).values(records[start:start + UPSERT_BATCH_SIZE])
        # On a primary-key collision, overwrite the non-key columns with the new values.
        upsert_stmt = stmt.on_duplicate_key_update(
            userId=stmt.inserted.userId,
            title=stmt.inserted.title,
            body=stmt.inserted.body,
        )
        conn.execute(upsert_stmt)

print(f"✅ Upserted {len(records)} records into api_posts")


Cell 9 — Verify Load

In [None]:
# Read back what landed in MySQL: total row count plus a small ordered sample.
with engine.connect() as conn:
    rows = conn.execute(text("SELECT COUNT(*) AS c FROM api_posts")).mappings().one()
    sample = conn.execute(text("SELECT * FROM api_posts ORDER BY id LIMIT 5")).mappings().all()

# After the upsert every distinct id in `records` exists in the table, so it must hold
# at least that many rows (it may hold more from earlier loads).
expected_min_rows = len({record["id"] for record in records})
assert rows["c"] >= expected_min_rows, (
    f"api_posts has {rows['c']} rows, expected at least {expected_min_rows}"
)

rows, sample[:2]
