In [9]:
# Installs

#%pip install minio

# 01 API Ingestion bronze

1. Load config.
2. Build endpoint URL.
3. Test connectivity.
4. Fetch paginated raw JSON pages.
5. Save raw JSON pages + manifest to bronze.


In [10]:
# Library imports
from datetime import datetime, timezone
import json
from pathlib import Path

import requests
import yaml
from minio import Minio
import io

# Prereq - check storage works

In [11]:
# Storage settings for the local MinIO instance used by this notebook
# Host and port only: the MinIO SDK takes the endpoint without a URL scheme.
MINIO_ENDPOINT = "localhost:9000"
# False = plain HTTP, which is what the local instance uses.
MINIO_SECURE = False
# Bucket that receives the bronze objects written further down.
BUCKET = "incident-pipeline-test"
# Relative path, so it is resolved against the current working directory.
# NOTE(review): the previous comment said "repo root relative", but the path
# climbs one level up — presumably the notebook runs from a sub-folder of the
# repo; confirm.
ENV_FILE = Path("../docker/.env")

In [12]:
# Load env file for creds
def load_env_file(path: Path) -> dict:
    """Parse a simple ``KEY=VALUE`` env file into a dict of strings.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Each remaining line is split on the first ``=`` only, so values
    may themselves contain ``=``. Keys and values are whitespace-stripped, and
    one pair of matching surrounding quotes (``"..."`` or ``'...'``) is removed
    from the value so that ``KEY="secret"`` yields ``secret`` rather than the
    quoted text.

    Args:
        path: Location of the env file.

    Returns:
        Mapping of variable name to value. Later duplicates overwrite earlier
        ones.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Missing env file: {path.resolve()}")

    env = {}
    # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading BOM, which
    # would otherwise end up glued to the first key.
    for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        value = value.strip()
        # Drop one pair of matching quotes; a lone quote character is kept.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            value = value[1:-1]
        env[key.strip()] = value
    return env

In [13]:
# Read the MinIO credentials from the env file and build the client
env = load_env_file(ENV_FILE)
client = Minio(
    MINIO_ENDPOINT,
    access_key=env["MINIO_ROOT_USER"],
    secret_key=env["MINIO_ROOT_PASSWORD"],
    secure=MINIO_SECURE,
)

# Listing the buckets needs a real round trip, so it doubles as a connection check
print("Buckets:", [bucket.name for bucket in client.list_buckets()])

Buckets: ['incident-pipeline', 'incident-pipeline-test', 'mlflow-artifacts']


## Step 1: Load config from YAML file

In [14]:
# Read the pipeline settings from the YAML config file
config_path = Path('../config/config.yaml')
with config_path.open('r', encoding='utf-8') as f:
    cfg = yaml.safe_load(f)

# Echo the values this notebook relies on so they can be checked by eye
source_cfg = cfg['source']
for setting in ('api_base_url', 'incident_path', 'page_size', 'max_records'):
    print(f'{setting}:', source_cfg[setting])

# The bronze location is optional in the config; fall back to the local default
print('bronze:', cfg.get('storage', {}).get('bronze', '.local/data/bronze'))


api_base_url: https://incidents.j-hollands.workers.dev/
incident_path: api/now/table/incident
page_size: 1000
max_records: 10000
bronze: .local/data/bronze


## Step 2: Build endpoint URL

In [15]:
# Function to build the full API endpoint from config values
def build_endpoint_url(cfg: dict) -> str:
    """Join the configured base URL and incident path into one endpoint URL.

    Reads ``cfg['source']['api_base_url']`` and
    ``cfg['source']['incident_path']``. Trailing slashes on the base and
    leading slashes on the path are removed, so the two parts are always
    separated by exactly one ``/``.
    """
    source = cfg['source']
    base = source['api_base_url'].rstrip('/')
    path = source['incident_path'].lstrip('/')
    return base + '/' + path

# Build the endpoint from the loaded config; later cells reuse `endpoint_url`.
endpoint_url = build_endpoint_url(cfg)
# Bare expression so the notebook displays the resulting URL as the cell output.
endpoint_url


'https://incidents.j-hollands.workers.dev/api/now/table/incident'

## Step 3: Connectivity test

In [16]:
# Request a single record to confirm the endpoint answers and to see the payload shape
test_params = dict(
    sysparm_limit=1,
    sysparm_offset=0,
    sysparm_display_value='true',
)
test_response = requests.get(endpoint_url, params=test_params, timeout=30)
print('Status code:', test_response.status_code)

# Fail the cell on an HTTP error status before trying to parse the body
test_response.raise_for_status()
test_payload = test_response.json()

top_level_keys = list(test_payload.keys())
print('Top-level keys:', top_level_keys)
print('Records returned:', len(test_payload.get('result', [])))


Status code: 200
Top-level keys: ['result']
Records returned: 1


## Step 4: Production functions

In [17]:
# Function to fetch all incident pages with pagination
def fetch_incident_pages(cfg: dict, timeout: int = 30) -> tuple[list[dict], int]:
    """Fetch incident records page by page and return the raw payloads.

    Pages are requested with ``sysparm_limit`` / ``sysparm_offset`` until
    ``max_records`` records have been collected, a page comes back empty, or a
    page holds fewer records than were asked for.

    Args:
        cfg: Parsed config. ``cfg['source']`` must provide ``api_base_url``,
            ``incident_path``, ``page_size`` and ``max_records``. The optional
            ``cfg['runtime']['use_env_proxy']`` (default ``False``) controls
            whether the session honours environment proxy settings.
        timeout: Timeout in seconds passed to every HTTP request.

    Returns:
        ``(pages, total_records)``: ``pages`` holds each decoded JSON payload
        in request order, and ``total_records`` is the summed length of their
        ``result`` lists.

    Raises:
        requests.HTTPError: If any page request returns an error status.
    """
    source_cfg = cfg['source']
    # `or {}` also covers a `runtime:` key that is present but empty (None).
    runtime_cfg = cfg.get('runtime') or {}
    page_size = int(source_cfg['page_size'])
    max_records = int(source_cfg['max_records'])
    use_env_proxy = bool(runtime_cfg.get('use_env_proxy', False))
    endpoint_url = build_endpoint_url(cfg)

    pages = []
    offset = 0
    total_records = 0

    # The context manager closes the session (and its pooled connections)
    # even when a request raises part-way through the run.
    with requests.Session() as session:
        session.trust_env = use_env_proxy

        while total_records < max_records:
            # Never ask for more than is still needed to reach max_records.
            request_limit = min(page_size, max_records - total_records)
            params = {
                'sysparm_limit': request_limit,
                'sysparm_offset': offset,
                'sysparm_display_value': 'true',
            }

            response = session.get(endpoint_url, params=params, timeout=timeout)
            response.raise_for_status()
            payload = response.json()
            # Treat a missing or null `result` as an empty page.
            batch = payload.get('result') or []

            # The payload is recorded before the emptiness check, so a final
            # empty page is still kept in `pages`.
            pages.append(payload)
            batch_count = len(batch)

            if batch_count == 0:
                break

            total_records += batch_count

            # A short page means the source has no further records.
            if batch_count < request_limit:
                break

            # Advance by what was actually received so the next request lines
            # up even if the server returned a different count than requested.
            offset += batch_count

    return pages, total_records

# Save raw output to bronze storage, including manifest metadata
def save_raw_pages_to_bronze(pages: list[dict], cfg: dict, endpoint_url: str, total_records: int, minio_client, bucket: str, prefix_root: str = "bronze") -> str:
    """Upload each raw page plus a manifest under a new timestamped run prefix.

    Objects are written to
    ``<prefix_root>/incidents_raw/run_ts=<UTC timestamp>/``: one
    ``incidents_raw_page_NNN.json`` per entry in ``pages`` (numbered from 1),
    followed by ``manifest.json`` describing the run.

    Args:
        pages: Decoded JSON payloads, stored one object per payload.
        cfg: Parsed config. Accepted for interface compatibility; the body
            does not read it.
        endpoint_url: Source URL recorded in the manifest.
        total_records: Record count recorded in the manifest.
        minio_client: Object exposing ``put_object(bucket_name, object_name,
            data, length, content_type)``.
        bucket: Target bucket name.
        prefix_root: Top-level key prefix; trailing slashes are ignored.

    Returns:
        The run prefix under which all objects were written.
    """
    def _upload_json(object_key: str, body: bytes) -> None:
        # Single place that knows how a JSON blob is sent to the bucket.
        minio_client.put_object(
            bucket_name=bucket,
            object_name=object_key,
            data=io.BytesIO(body),
            length=len(body),
            content_type="application/json",
        )

    # One UTC timestamp identifies the run and namespaces all of its objects.
    run_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    run_prefix = "/".join((prefix_root.rstrip('/'), "incidents_raw", f"run_ts={run_id}"))

    # Raw pages first, numbered from 001 in the order they were fetched.
    for page_number, page in enumerate(pages, start=1):
        page_bytes = json.dumps(page, ensure_ascii=False).encode("utf-8")
        _upload_json(f"{run_prefix}/incidents_raw_page_{page_number:03d}.json", page_bytes)

    # The manifest is built after the pages, so saved_at_utc reflects the
    # moment the page uploads had finished.
    manifest = {
        "endpoint_url": endpoint_url,
        "run_id": run_id,
        "page_count": len(pages),
        "record_count": total_records,
        "saved_at_utc": datetime.now(timezone.utc).isoformat(),
        "format": "raw_json_pages",
        "storage": {"bucket": bucket, "prefix": run_prefix},
    }
    _upload_json(f"{run_prefix}/manifest.json", json.dumps(manifest, indent=2).encode("utf-8"))

    return run_prefix


# Main function to run the ingestion process
def run_ingestion(minio_client, bucket: str = BUCKET, config_path: str = '../config/config.yaml') -> tuple[str, int, str]:
    """Run one ingestion end to end: load config, fetch pages, store them.

    Args:
        minio_client: Client passed through to ``save_raw_pages_to_bronze``.
        bucket: Target bucket; defaults to the module-level ``BUCKET`` as it
            was when this function was defined.
        config_path: Path of the YAML config to load for this run.

    Returns:
        ``(run_prefix, record_count, endpoint)``: where the objects were
        written, how many records were fetched, and the URL they came from.
    """
    # Load the config fresh for each run rather than relying on notebook state.
    with Path(config_path).open('r', encoding='utf-8') as handle:
        settings = yaml.safe_load(handle)

    target_url = build_endpoint_url(settings)
    fetched_pages, fetched_count = fetch_incident_pages(cfg=settings)

    written_prefix = save_raw_pages_to_bronze(
        pages=fetched_pages,
        cfg=settings,
        endpoint_url=target_url,
        total_records=fetched_count,
        minio_client=minio_client,
        bucket=bucket,
        prefix_root="bronze",
    )
    return written_prefix, fetched_count, target_url


## Step 5: Demonstrate the raw JSON bronze output

In [18]:
# Pull the pages from the API (the fetch stops at max_records from the config)
pages, record_count = fetch_incident_pages(cfg=cfg)

# Save the output to the storage location: raw pages plus the run manifest
run_prefix = save_raw_pages_to_bronze(
    pages=pages,
    cfg=cfg,
    endpoint_url=endpoint_url,
    total_records=record_count,
    minio_client=client,
    bucket=BUCKET,
    prefix_root="bronze",
)

# Report what was fetched and where it was written
print(f"Successfully ingested {record_count} records from endpoint {endpoint_url}")
print(f"Bronze raw JSON written to: s3://{BUCKET}/{run_prefix}")

# List the objects under this run and show the first few names as a spot check
objs = list(client.list_objects(BUCKET, prefix=run_prefix, recursive=True))
print("First objects:", [stored.object_name for stored in objs[:5]])

Successfully ingested 10000 records from endpoint https://incidents.j-hollands.workers.dev/api/now/table/incident
Bronze raw JSON written to: s3://incident-pipeline-test/bronze/incidents_raw/run_ts=20260228T130029Z
First objects: ['bronze/incidents_raw/run_ts=20260228T130029Z/incidents_raw_page_001.json', 'bronze/incidents_raw/run_ts=20260228T130029Z/incidents_raw_page_002.json', 'bronze/incidents_raw/run_ts=20260228T130029Z/incidents_raw_page_003.json', 'bronze/incidents_raw/run_ts=20260228T130029Z/incidents_raw_page_004.json', 'bronze/incidents_raw/run_ts=20260228T130029Z/incidents_raw_page_005.json']


In [19]:
# Pick the first page object from the listing made in the previous cell
page_obj = next(
    (
        stored
        for stored in objs
        if "page_" in stored.object_name and stored.object_name.endswith(".json")
    ),
    None,
)
if page_obj is None:
    raise RuntimeError("No page_*.json objects found under this run_prefix.")

page_key = page_obj.object_name
print("Reading object:", page_key)

# Download and decode the object; the response is always closed and its
# connection released, even if reading or parsing fails
resp = client.get_object(BUCKET, page_key)
try:
    raw_bytes = resp.read()
    payload = json.loads(raw_bytes.decode("utf-8"))
finally:
    resp.close()
    resp.release_conn()

# A missing or null "result" counts as no records
records = payload.get("result") or []
if not records:
    raise RuntimeError(f"{page_key} had no records in payload['result'].")

row = records[0]

# Print a few attributes of the first record as an example
for field in ("sys_id", "number", "state", "active", "sys_updated_on"):
    print(f"Sample {field}:", row.get(field))

Reading object: bronze/incidents_raw/run_ts=20260228T130029Z/incidents_raw_page_001.json
Sample sys_id: d98abc2af1c9ef3b3071043ad7526b01
Sample number: INC1200000
Sample state: Closed
Sample active: false
Sample sys_updated_on: 17-11-2025 20:47:57
