
# CSC API Pipeline (Flattened for Fabric)

This notebook contains the full flattened logic of the CSC API CLI version:
- Config loading and environment setup
- SQL Server connectivity
- OAuth2 token authentication
- JSON payload generation (full or partial mode)
- API submission and response handling
- Optional diagnostics: DB connectivity test, schema check, dry-run diff, and the smoke test `run_smoke()`, which is set to run by default

The entire notebook (all cells) can be run safely out of the box with the existing default settings.
Only the smoke test with a fake payload runs until `# main()` in the 'Run Submission' section is uncommented.

> Designed for standalone use in Microsoft Fabric (no external .py dependencies)



## Configuration
File: `config.py`

In [22]:
# api_pipeline/config.py

import os
from dotenv import load_dotenv
from pathlib import Path

# Dev
# Try .env first, fallback to env.txt
# Robust load: notebook and CLI
for candidate in [".env", "env.txt"]:
    env_path = Path(candidate)
    if env_path.exists():
        load_dotenv(env_path)
        print(f"[config] Loaded environment from: {env_path}")
        break
else:
    print("[config] No .env or env.txt found – using system environment variables only")

# Config flags
DEBUG = os.getenv("DEBUG", "false").strip().lower() == "true"
USE_PARTIAL_PAYLOAD = os.getenv("USE_PARTIAL_PAYLOAD", "true").strip().lower() == "true"
TABLE_NAME = os.getenv("TABLE_NAME", "ssd_api_data_staging_anon")

# debug current mode
print(f"[config] DEBUG = {DEBUG}")
print(f"[config] USE_PARTIAL_PAYLOAD = {USE_PARTIAL_PAYLOAD}")
print(f"[config] TABLE_NAME = {TABLE_NAME}")

      
    

# --- Required individual components ---
USER_SERVER = os.getenv("USER_SERVER")
USER_DATABASE = os.getenv("USER_DATABASE")
API_ENDPOINT = os.getenv("API_ENDPOINT")
LA_CODE = os.getenv("LA_CODE")


# --- Derived vals ---
# fallback: try SQL_CONN_STR if present, else build from parts
_sql_conn_str_env = os.getenv("SQL_CONN_STR")
if _sql_conn_str_env:
    SQL_CONN_STR = _sql_conn_str_env
else:
    SQL_CONN_STR = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={USER_SERVER};DATABASE={USER_DATABASE};Trusted_Connection=yes;"

API_ENDPOINT_LA = f"{API_ENDPOINT}/children_social_care_data/{LA_CODE}/children"

# --- Other config ---
CLIENT_ID = os.getenv("CLIENT_ID")
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
SCOPE = os.getenv("SCOPE")
TOKEN_ENDPOINT = os.getenv("TOKEN_ENDPOINT")
SUPPLIER_KEY = os.getenv("SUPPLIER_KEY")
BATCH_SIZE = int(os.getenv("BATCH_SIZE", 100))



# # ----------- Config OVERRIDE -----------
# # D2I override block

# # Within 'Subscription details' block, ensure that your 'Status' has been set to 'Active' by the DfE

# # https://pp-find-and-use-an-api.education.gov.uk/
# $api_endpoint = "https://pp-api.education.gov.uk/children-in-social-care-data-receiver-test/1" # 'Base URL'


# # From the 'subscription key' block
# $supplier_key = "6736ad89172548dcaa3529896892ab3f" # 'Primary key' or 'Secondary key'

# # From the 'Native OAuth Application-flow' block
# $token_endpoint = "https://login.microsoftonline.com/cc0e4d98-b9f9-4d58-821f-973eb69f19b7/oauth2/v2.0/token" # 'OAuth token endpoint'
# $client_id = "fe28c5a9-ea4f-4347-b419-189eb761fa42"  # 'OAuth Client ID'
# $client_secret = "mR_8Q~G~Wdm2XQ2E8-_hr0fS5FKEns4wHNLtbdw7" # 'Primary key' or 'Secondary key'
# $scope = "api://children-in-social-care-data-receiver-test-1-live_6b1907e1-e633-497d-ac74-155ab528bc17/.default" # 'OAuth Scope'

# $la_code        = 845   # e.g. 846 etc.
# # ----------- Config OVERRIDE END -----------




# --- Enforced json structure elements
REQUIRED_FIELDS = [
    "la_child_id",
    "mis_child_id",
    "child_details"
]

ALLOWED_PURGE_BLOCKS = [
    "social_care_episodes",
    "child_protection_plans",
    "child_in_need_plans",
    "health_and_wellbeing",
    "care_leavers"
]



# --- Env var validation helper ---
def validate_env_vars(required_vars):
    missing = [var for var in required_vars if not os.getenv(var)]
    if missing:
        raise EnvironmentError(f"Missing required environment variables: {', '.join(missing)}")


[config] Loaded environment from: env.txt
[config] DEBUG = True
[config] USE_PARTIAL_PAYLOAD = True
[config] TABLE_NAME = ssd_api_data_staging_anon
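
The `validate_env_vars` helper is defined in the cell above. Note also that `API_ENDPOINT_LA` is built even when `API_ENDPOINT` or `LA_CODE` is unset, in which case the f-string contains the literal text `None`. A minimal sketch of a pre-flight check follows. The `REQUIRED_ENV` list and the `check_config` wrapper are hypothetical names, and which variables count as required is an assumption rather than something this notebook states.

```python
import os

def validate_env_vars(required_vars):
    """Same check as the helper above: raise if any variable is unset or empty."""
    missing = [var for var in required_vars if not os.getenv(var)]
    if missing:
        raise EnvironmentError(f"Missing required environment variables: {', '.join(missing)}")

# Hypothetical list: the choice of required variables is an assumption
REQUIRED_ENV = [
    "API_ENDPOINT", "LA_CODE", "CLIENT_ID", "CLIENT_SECRET",
    "SCOPE", "TOKEN_ENDPOINT", "SUPPLIER_KEY",
]

def check_config(required=REQUIRED_ENV):
    """Return (ok, message) instead of raising, which is convenient in a notebook cell."""
    try:
        validate_env_vars(required)
        return True, "all required variables set"
    except EnvironmentError as exc:
        return False, str(exc)
```

Calling `check_config()` before the submission step would surface a missing value as a readable message rather than as a failed request against a malformed URL.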


## Utility Functions
File: `utils.py`

In [23]:
# api_pipeline/utils.py

import time
import tracemalloc
import pyodbc  # for notebook/test_db_connection snippet(end block)

# --- Optional memory_profiler support ----------------------------------------
# If memory_profiler not installed (e.g., in PyInstaller EXE), degrade
# gracefully and omit "Mem delta" from benchmark output
try:
    from memory_profiler import memory_usage as _memory_usage
except Exception:
    _memory_usage = None

def _safe_memory_usage_first_sample():
    """
    Returns the first memory usage sample in MiB if memory_profiler is available,
    otherwise returns None. Never raises.
    """
    if _memory_usage is None:
        return None
    try:
        samples = _memory_usage()
        return samples[0] if samples else None
    except Exception:
        return None

# # --- CLI / .exe fallback config (not needed in notebook)
# try:
#     # running as proper package
#     from .config import DEBUG, USE_PARTIAL_PAYLOAD
# except ImportError:
#     try:
#         # Fallback -running loose scripts
#         from config import DEBUG, USE_PARTIAL_PAYLOAD
#     except Exception:
#         # Last‑ditch -defaults avoid import‑time crashes in --help or EXE smoke tests
#         DEBUG = False
#         USE_PARTIAL_PAYLOAD = False



# --- Logging / mode helpers ---------------------------------------------------
def log_debug(msg: str):
    if DEBUG:
        print(msg)

def announce_mode():
    print("Running in development mode" if DEBUG else "▶ Running in production mode")
    print("Partial delta payload mode enabled" if USE_PARTIAL_PAYLOAD else "▶ Full non-delta payload mode only")

# --- Benchmark decorator ------------------------------------------------------
def benchmark_section(label: str):
    """
    Decorator to benchmark a function section.
    - Always measures elapsed time and tracemalloc peak memory.
    - If memory_profiler is available, also reports process "Mem delta" in MiB.
    """
    def decorator(func):
        def wrapper(*args, **kwargs):
            print(f"Starting {label}...")
            start = time.time()
            tracemalloc.start()

            mem_before = _safe_memory_usage_first_sample()

            result = func(*args, **kwargs)

            mem_after = _safe_memory_usage_first_sample()
            current, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()

            elapsed = time.time() - start

            if DEBUG:
                peak_mib = peak / (1024 ** 2)
                if mem_before is not None and mem_after is not None:
                    mem_delta = mem_after - mem_before
                    print(f"Finished {label}: {elapsed:.2f}s | Mem delta: {mem_delta:.2f} MiB | Peak mem: {peak_mib:.2f} MiB")
                else:
                    # memory_profiler not present (or failed): omit Mem delta
                    print(f"Finished {label}: {elapsed:.2f}s | Peak mem: {peak_mib:.2f} MiB")

            return result
        return wrapper
    return decorator

# ----------------------------------------------------------------------
# # Test DB connection
# # drop-in block to .ipynb or main()
# import sys
# from .config import SQL_CONN_STR  # or: from config import SQL_CONN_STR
# from utils import test_db_connection
# if not test_db_connection(SQL_CONN_STR):
#     sys.exit(1)  # Stop
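
To show how a decorator like `benchmark_section` wraps a function, here is a reduced standalone sketch. It is not the notebook's decorator: `benchmark_sketch` and the `stats` dict are hypothetical, results are recorded rather than printed, `functools.wraps` preserves the wrapped function's name, and tracing is only started and stopped by the outermost call, so that a nested decorated call does not stop a trace started further up the stack.

```python
import functools
import time
import tracemalloc

def benchmark_sketch(label, stats):
    """Reduced variant: record elapsed seconds and tracemalloc peak bytes into `stats`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Only the outermost decorated call owns the trace
            started_here = not tracemalloc.is_tracing()
            if started_here:
                tracemalloc.start()
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                elapsed = time.perf_counter() - start
                _, peak = tracemalloc.get_traced_memory()
                if started_here:
                    tracemalloc.stop()
                stats[label] = {"elapsed_s": elapsed, "peak_bytes": peak}
        return wrapper
    return decorator

stats = {}

@benchmark_sketch("build_list()", stats)
def build_list(n):
    return [i * i for i in range(n)]

squares = build_list(1000)
```

After the call, `stats["build_list()"]` holds the timing and peak-memory figures for that run.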


## Authentication
File: `auth.py`

In [24]:
# api_pipeline/auth.py
import requests

# from .config import CLIENT_ID, CLIENT_SECRET, SCOPE, TOKEN_ENDPOINT
# from .utils import log_debug

# # ---- local imports with fallback for notebook/debug use ----
# try:
#     from .config import CLIENT_ID, CLIENT_SECRET, SCOPE, TOKEN_ENDPOINT
#     from .utils import log_debug
# except ImportError:
#     from config import CLIENT_ID, CLIENT_SECRET, SCOPE, TOKEN_ENDPOINT
#     from utils import log_debug


def get_oauth_token():
    payload = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": SCOPE,
        "grant_type": "client_credentials"
    }

    try:
        response = requests.post(TOKEN_ENDPOINT, data=payload)
        response.raise_for_status()
        token = response.json()["access_token"]
        print("OAuth token retrieved.")
        
        log_debug(f"TOKEN (first 10 chars): {token[:10]}...")
        
        return token
    except Exception as e:
        print(f"OAuth Token Error: {e}")
        return None
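
Because `get_oauth_token()` returns `None` on failure, a caller needs to check the token before building request headers. The sketch below assumes the header names used in the smoke-test cell later in this notebook (`Authorization`, `Content-Type`, `SupplierKey`); `build_headers` itself is a hypothetical helper, and the token and key values are placeholders.

```python
def build_headers(token, supplier_key):
    """Assemble request headers; raise if the token request failed (token is None)."""
    if not token:
        raise ValueError("No OAuth token available; cannot build request headers")
    return {
        "Authorization": f"Bearer {token.strip()}",
        "Content-Type": "application/json; charset=utf-8",
        "SupplierKey": str(supplier_key).strip(),
    }

# Placeholder values, not real credentials
headers = build_headers("example-token ", "example-key")
```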


## Database Access
File: `db.py`

In [25]:
import json

# from .config import TABLE_NAME, USE_PARTIAL_PAYLOAD
# from .payload import generate_partial_payload, generate_deletion_payload
# from .utils import benchmark_section

# # ---- local imports with fallback for notebook/debug use ----
# try:
#     from .config import TABLE_NAME, USE_PARTIAL_PAYLOAD
#     from .payload import generate_partial_payload, generate_deletion_payload
#     from .utils import benchmark_section
# except ImportError:
#     from config import TABLE_NAME, USE_PARTIAL_PAYLOAD
#     from payload import generate_partial_payload, generate_deletion_payload
#     from utils import benchmark_section


# ---- DATA ----
# PEP 484 signature:
# def update_partial_payloads(conn: pyodbc.Connection) -> None:
@benchmark_section("update_partial_payloads()")
def update_partial_payloads(conn):
    """
    Update table with generated partial JSON payloads

    Args:
        conn: Open database connection
    """
    cursor = conn.cursor()

    # Select rows that have both current and previous JSON
    cursor.execute(f"""
        SELECT person_id, row_state, json_payload, previous_json_payload 
        FROM {TABLE_NAME}
        WHERE 
            json_payload IS NOT NULL
            AND previous_json_payload IS NOT NULL
            AND row_state <> 'unchanged'
    """)

    updates = []

    # ---- DIAGNOSTIC COUNTERS ----
    total_checked = 0
    skipped_due_to_state = 0
    skipped_due_to_equal_json = 0
    new_record_count = 0
    deletion_count = 0
    delta_count = 0
    error_count = 0

    # ---------------------------------------

    for person_id, row_state, curr_raw, prev_raw in cursor.fetchall():
        total_checked += 1
        try:
            # ---- EARLY EXIT: skip if unchanged (defensive; the query already filters these) ----
            if row_state.lower() == "unchanged":
                skipped_due_to_state += 1
                continue

            # ---- EARLY EXIT: skip if identical JSON strings ----
            if curr_raw == prev_raw:
                skipped_due_to_equal_json += 1
                continue

            curr = json.loads(curr_raw)  # Parse current JSON
            prev = json.loads(prev_raw)  # Parse previous JSON

            # Generate appropriate payload by row state
            if row_state.lower() == "deleted":
                partial = generate_deletion_payload(prev)
                deletion_count += 1
            else:
                partial = generate_partial_payload(curr, prev)
                delta_count += 1

            # Serialise partial; no manual quote escaping, because the UPDATE below binds it as a parameter
            json_out = json.dumps(partial, separators=(',', ':'), ensure_ascii=False)
            updates.append((json_out, person_id))

        except Exception as e:
            print(f"Error for {person_id}: {e}")
            error_count += 1

    # Apply updates to db
    for json_out, pid in updates:
        cursor.execute(f"""
            UPDATE {TABLE_NAME}
            SET partial_json_payload = ?
            WHERE person_id = ?
        """, json_out, pid)

    conn.commit()
    print(f"Updated {len(updates)} partial_json_payload records")

    # ---- DIAGS OUTPUT ----
    print(f"[DIAG] Checked: {total_checked}")
    print(f"[DIAG] Skipped (state): {skipped_due_to_state}")
    print(f"[DIAG] Skipped (identical JSON): {skipped_due_to_equal_json}")
    print(f"[DIAG] Deleted payloads: {deletion_count}")
    print(f"[DIAG] Delta payloads: {delta_count}")
    print(f"[DIAG] Errors: {error_count}")
    # --------------------------------------



# PEP 484 signature:
# def get_pending_records(cursor: pyodbc.Cursor) -> List[Dict[str, Any]]:
@benchmark_section("get_pending_records()")  # Performance monitor
def get_pending_records(cursor):
    """
    Fetch pending/error records with non-empty payload

    Args:
        cursor: Active database cursor

    Returns:
        List of records with parsed JSON payload.
    """
    col = "partial_json_payload" if USE_PARTIAL_PAYLOAD else "json_payload"

    # Select valid rows by status and payload
    cursor.execute(f"""
        SELECT person_id, {col}
        FROM {TABLE_NAME}
        WHERE submission_status IN ('pending', 'error')
        AND {col} IS NOT NULL AND LTRIM(RTRIM({col})) <> ''
    """)

    results = []

    for pid, payload in cursor.fetchall():
        try:
            results.append({
                "person_id": pid,
                "json": json.loads(payload)  # Parse JSON safely
            })
        except Exception:
            print(f"Skipping invalid JSON for person_id {pid}")

    return results



# ---- DB UPDATES ----
# PEP 484 signature:
# def update_api_success(cursor: pyodbc.Cursor, person_id: str, uuid: str, timestamp: str) -> None:
@benchmark_section("update_api_success()")  # Performance monitor
def update_api_success(cursor, person_id, uuid, timestamp):
    """
    Mark record as sent with API response and timestamp

    Args:
        cursor: Active database cursor
        person_id: Person identifier
        uuid: API response reference
        timestamp: Submission timestamp
    """
    cursor.execute(f"""
        UPDATE {TABLE_NAME}
        SET submission_status='sent',
            api_response=?,
            submission_timestamp=?,
            previous_hash=current_hash,
            previous_json_payload=json_payload,
            row_state='unchanged'
        WHERE person_id = ?
    """, uuid, timestamp, person_id)



# PEP 484 signature:
# def update_api_failure(cursor: pyodbc.Cursor, person_id: str, message: str) -> None:
@benchmark_section("update_api_failure()")  # Performance monitor
def update_api_failure(cursor, person_id, message):
    """
    Mark record as failed, store API error message

    Args:
        cursor: Active database cursor
        person_id: Person identifier
        message: Error message from API or client
    """
    cursor.execute(f"""
        UPDATE {TABLE_NAME}
        SET submission_status='error',
            api_response=?
        WHERE person_id = ?
    """, message[:500], person_id)  # Truncate to max allowed size


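With `?` placeholders the driver handles quoting, so a JSON string that contains an apostrophe can be bound as-is. The sketch below illustrates this with an in-memory `sqlite3` database standing in for SQL Server, on the assumption that both drivers use `?` placeholders in the same way. The `staging` table and the sample record are made up for the illustration. Note that `sqlite3` takes parameters as one sequence, whereas the notebook passes them to `pyodbc` as separate arguments.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging (person_id TEXT PRIMARY KEY, partial_json_payload TEXT)")
conn.execute("INSERT INTO staging (person_id) VALUES (?)", ("P1",))

# Sample payload with an apostrophe in a value
partial = {"la_child_id": "C1", "child_details": {"surname": "O'Brien"}}
json_out = json.dumps(partial, separators=(",", ":"), ensure_ascii=False)

# Bound as a parameter: no manual doubling of quotes
conn.execute(
    "UPDATE staging SET partial_json_payload = ? WHERE person_id = ?",
    (json_out, "P1"),
)
conn.commit()

stored = conn.execute(
    "SELECT partial_json_payload FROM staging WHERE person_id = ?", ("P1",)
).fetchone()[0]
round_tripped = json.loads(stored)
```

The stored text parses back to the original dict, which would not be the case if the apostrophe had been doubled before binding.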

## Payload Builder
File: `payload.py`

In [26]:
# api_pipeline/payload.py

import json

# from .config import REQUIRED_FIELDS, ALLOWED_PURGE_BLOCKS
# from .utils import benchmark_section

# # ---- local imports with fallback for notebook/debug use ----
# try:
#     from .config import REQUIRED_FIELDS, ALLOWED_PURGE_BLOCKS
#     from .utils import benchmark_section
# except ImportError:
#     from config import REQUIRED_FIELDS, ALLOWED_PURGE_BLOCKS
#     from utils import benchmark_section





# [DIAG] Add diagnostics
from time import perf_counter  # [DIAG]
_recursive_diff_call_count = 0  # [DIAG]
_recursive_diff_total_time = 0  # [DIAG]

_prune_call_count = 0  # [DIAG]
_prune_total_time = 0  # [DIAG]


@benchmark_section("generate_partial_payload()")  # Performance monitor
def generate_partial_payload(current, previous):
    """
    Create partial payload with required and changed fields

    Args:
        current: Current full JSON payload
        previous: Previous full JSON payload

    Returns:
        Partial payload with required identifiers and detected changes
    """
    # Copy required fields
    partial = {k: current[k] for k in REQUIRED_FIELDS if k in current}

    # Compute structural differences
    diff = recursive_diff(current, previous)

    # Add new fields not already present
    for k, v in diff.items():
        if k not in partial:
            partial[k] = v

    return partial


@benchmark_section("generate_deletion_payload()")  # Performance monitor
def generate_deletion_payload(previous):
    """
    Create deletion payload using identifiers and purge flag

    Args:
        previous: Previous full JSON payload

    Returns:
        Minimal payload with identifiers and deletion marker
    """
    return {
        "la_child_id": previous.get("la_child_id"),  # Unique child identifier
        "mis_child_id": previous.get("mis_child_id"),  # MIS identifier
        "purge": True  # Purge signal for deletion
    }


@benchmark_section("recursive_diff()")  # Performance monitor
def recursive_diff(curr, prev):
    # [DIAG] Start timing and count
    global _recursive_diff_call_count, _recursive_diff_total_time  # [DIAG]
    _recursive_diff_call_count += 1  # [DIAG]
    _start = perf_counter()  # [DIAG]

    if isinstance(curr, dict) and isinstance(prev, dict):
        diff = {}

        for key in curr:
            # Skip if key unchanged
            if key not in prev or curr[key] != prev[key]:

                # Recurse into nested dicts
                if isinstance(curr[key], dict) and isinstance(prev.get(key), dict):
                    nested = recursive_diff(curr[key], prev[key])
                    if nested:
                        diff[key] = nested  # Include only if changes present

                # Recurse into lists
                elif isinstance(curr[key], list) and isinstance(prev.get(key), list):
                    if curr[key] != prev[key]:
                        # Pass parent key to control 'purge' inclusion
                        diff[key] = prune_unchanged_list(
                            curr[key], prev[key], parent_key=key
                        )

                # Handle scalars or mismatched types
                else:
                    diff[key] = curr[key]

        _recursive_diff_total_time += perf_counter() - _start  # [DIAG]
        return diff  # Return dict of differences

    # Return scalar diff, or empty if no change
    result = {} if curr == prev else curr
    _recursive_diff_total_time += perf_counter() - _start  # [DIAG]
    return result


@benchmark_section("prune_unchanged_list()")  # Performance monitor
def prune_unchanged_list(curr_list, prev_list, parent_key=None):
    # [DIAG] Start timing and count
    global _prune_call_count, _prune_total_time  # [DIAG]
    _prune_call_count += 1  # [DIAG]
    _start = perf_counter()  # [DIAG]

    result = []

    for curr_item in curr_list:
        matched_prev = None

        # Detect ID key
        id_key = next((k for k in curr_item if k.endswith("_id")), None)

        if id_key:
            # Find match in previous by ID
            for prev_item in prev_list:
                if prev_item.get(id_key) == curr_item.get(id_key):
                    matched_prev = prev_item
                    break

        if matched_prev:
            # Diff current item against matched previous
            item_diff = recursive_diff(curr_item, matched_prev)

            # Always retain ID key
            if id_key:
                item_diff[id_key] = curr_item[id_key]

            if item_diff:
                # Set 'purge' flag only for allowed blocks
                if parent_key in ALLOWED_PURGE_BLOCKS:
                    item_diff["purge"] = False
                result.append(item_diff)

        else:
            # Unmatched: treat as new item
            result.append(curr_item)

    _prune_total_time += perf_counter() - _start  # [DIAG]
    return result


# [DIAG] summarise usage and timings
def print_diff_stats():
    print(f"\n[DIAG] recursive_diff() calls: {_recursive_diff_call_count}")
    print(f"[DIAG] Total time in recursive_diff(): {_recursive_diff_total_time:.2f}s")
    if _recursive_diff_call_count:
        print(f"[DIAG] Avg time per recursive_diff(): {_recursive_diff_total_time / _recursive_diff_call_count:.6f}s")

    print(f"\n[DIAG] prune_unchanged_list() calls: {_prune_call_count}")
    print(f"[DIAG] Total time in prune_unchanged_list(): {_prune_total_time:.2f}s")
    if _prune_call_count:
        print(f"[DIAG] Avg time per prune_unchanged_list(): {_prune_total_time / _prune_call_count:.6f}s")
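
A small worked example may help show what the diff produces. The sketch below is a compact re-statement of the logic above without the benchmarking and timing code; `diff_dict`, `diff_list` and `partial_payload` are hypothetical names, and the field names inside the episode records are invented for illustration.

```python
REQUIRED_FIELDS = ["la_child_id", "mis_child_id", "child_details"]
ALLOWED_PURGE_BLOCKS = ["social_care_episodes"]  # subset of the notebook's list

def diff_dict(curr, prev):
    """Keep only keys of curr that are new or changed relative to prev."""
    diff = {}
    for key, value in curr.items():
        if key in prev and value == prev[key]:
            continue  # unchanged
        old = prev.get(key)
        if isinstance(value, dict) and isinstance(old, dict):
            nested = diff_dict(value, old)
            if nested:
                diff[key] = nested
        elif isinstance(value, list) and isinstance(old, list):
            diff[key] = diff_list(value, old, parent_key=key)
        else:
            diff[key] = value  # scalar change, new key, or type mismatch
    return diff

def diff_list(curr_list, prev_list, parent_key=None):
    """Match list items on their first '*_id' key and diff each matched pair."""
    result = []
    for item in curr_list:
        id_key = next((k for k in item if k.endswith("_id")), None)
        match = None
        if id_key:
            match = next((p for p in prev_list if p.get(id_key) == item.get(id_key)), None)
        if match is None:
            result.append(item)  # unmatched: sent in full as a new item
            continue
        item_diff = diff_dict(item, match)
        item_diff[id_key] = item[id_key]  # the ID is always retained
        if parent_key in ALLOWED_PURGE_BLOCKS:
            item_diff["purge"] = False
        result.append(item_diff)
    return result

def partial_payload(current, previous):
    """Required fields in full, plus any changed top-level blocks."""
    partial = {k: current[k] for k in REQUIRED_FIELDS if k in current}
    for key, value in diff_dict(current, previous).items():
        partial.setdefault(key, value)
    return partial

previous = {
    "la_child_id": "C1",
    "mis_child_id": "M1",
    "child_details": {"first_name": "John", "postcode": "AB12 3DE"},
    "social_care_episodes": [
        {"social_care_episode_id": "E1", "referral_date": "2024-01-01"},
    ],
}
current = {
    "la_child_id": "C1",
    "mis_child_id": "M1",
    "child_details": {"first_name": "John", "postcode": "ZZ99 9ZZ"},
    "social_care_episodes": [
        {"social_care_episode_id": "E1", "referral_date": "2024-01-01", "closure_date": "2024-03-01"},
        {"social_care_episode_id": "E2", "referral_date": "2024-04-01"},
    ],
}

partial = partial_payload(current, previous)
```

Two consequences are visible in `partial`: `child_details` is sent in full because it is a required field, and the matched episode `E1` is reduced to its ID, the changed `closure_date` and `purge: False`, while the new episode `E2` is sent whole.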




## API Submission
File: `api.py`

In [27]:
# api_pipeline/api.py
import json
import re
import time
from datetime import datetime
import requests

# from .config import BATCH_SIZE, API_ENDPOINT_LA
# from .db import update_api_success, update_api_failure
# from .utils import benchmark_section, log_debug


# # ---- local imports with fallback for notebook/debug use ----
# try:
#     from .config import BATCH_SIZE, API_ENDPOINT_LA
#     from .db import update_api_success, update_api_failure
#     from .utils import benchmark_section, log_debug
# except ImportError:
#     from config import BATCH_SIZE, API_ENDPOINT_LA
#     from db import update_api_success, update_api_failure
#     from utils import benchmark_section, log_debug


# ---- API ----
# PEP 484 signature:
# def process_batches(records: List[Dict[str, Any]], headers: Dict[str, str], conn: pyodbc.Connection, max_retries: int = 3) -> None:
@benchmark_section("process_batches()")  # Performance monitor
def process_batches(records, headers, conn, max_retries=3):
    """
    Submit payloads in batches to API with retry logic

    Args:
        records: List of dicts with 'person_id' and parsed JSON
        headers: HTTP headers for API call
        conn: Open database connection
        max_retries: Retry count before marking as failure
    """
    total = len(records)
    cursor = conn.cursor()

    for i in range(0, total, BATCH_SIZE):
        batch = records[i:i + BATCH_SIZE]


        log_debug(f"Processing batch {i + 1} to {i + len(batch)} of {total}")  # DEBUG
        
        payload = [r["json"] for r in batch]
        payload_str = json.dumps(payload)

        retries = 0
        retry_delay = 5  # Seconds

        while retries < max_retries:
            try:
                # Submit batch to API
                resp = requests.post(API_ENDPOINT_LA, headers=headers, data=payload_str)
                raw_text = resp.text.strip()

                if resp.status_code == 200:
                    try:
                        # Parse JSON response
                        response_items = json.loads(raw_text)
                        if not isinstance(response_items, list):
                            print("Invalid API response: expected list, got", type(response_items).__name__)
                            for rec in batch:
                                update_api_failure(cursor, rec["person_id"], "Invalid JSON structure from API")
                            conn.commit()
                            break
                    except Exception:
                        # Response invalid or unreadable
                        print("Failed to parse JSON response. Logging all as failed.")
                        for rec in batch:
                            update_api_failure(cursor, rec["person_id"], f"Invalid JSON response: {raw_text}")
                        conn.commit()
                        break

                    if len(response_items) == len(batch):
                        for rec, item in zip(batch, response_items):
                            try:
                                # Parse UUID and timestamp
                                date_part, time_part, uuid = item.split("_")
                                timestamp = datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S.%f")
                            except Exception:
                                # Fallback to partial match and current time
                                uuid = item.split("_")[-1]
                                timestamp = datetime.now()

                            update_api_success(cursor, rec["person_id"], uuid, timestamp)

                        conn.commit()
                        break  # Exit retry loop

                    else:
                        # Response count mismatch
                        print(f"Mismatched response count to sent records: expected {len(batch)}, got {len(response_items)}")
                        for rec in batch:
                            update_api_failure(cursor, rec["person_id"], "Response count mismatch")
                        conn.commit()
                        break

                else:
                    status = resp.status_code
                    detail = resp.text

                    # Map known statuses to explanation
                    msg_map = {
                        204: "No content",
                        400: "Malformed Payload",
                        401: "Invalid API token",
                        403: "API access disallowed",
                        413: "Payload exceeds limit",
                        429: "Rate limit exceeded"
                    }

                    api_msg = msg_map.get(status, f"Unexpected Error: {status}")
                    print(f"API error {status}: {api_msg}")
                    print("API response (truncated):", detail[:250])

                    retryable = status in [401, 403, 429]

                    if not retryable or retries == max_retries - 1:
                        # Final failure
                        handle_batch_failure(cursor, batch, status, api_msg, detail)
                        conn.commit()
                        break
                    else:
                        # Wait and retry
                        print(f"Retrying in {retry_delay}s (retry {retries + 1}/{max_retries})...")
                        time.sleep(retry_delay)
                        retry_delay = min(30, retry_delay * 2)
                        retries += 1

            except Exception as e:
                # Network or unexpected exception
                print(f"Request failed: {e}")
                for rec in batch:
                    update_api_failure(cursor, rec["person_id"], str(e))
                conn.commit()
                break


# PEP 484 signature:
# def handle_batch_failure(cursor: pyodbc.Cursor, batch: List[Dict[str, Any]], status_code: int, error_message: str, error_detail: str) -> None:
@benchmark_section("handle_batch_failure()")  # Performance monitor
def handle_batch_failure(cursor, batch, status_code, error_message, error_detail):
    """
    Handle API batch failure by logging error messages per record

    Args:
        cursor: Active database cursor
        batch: List of records submitted in batch
        status_code: HTTP status returned by API
        error_message: General API error description
        error_detail: Raw response detail from API
    """
    # Extract failing record indexes from API response
    index_matches = re.findall(r"\[(\d+)\]", error_detail)
    failed_indexes = set(index_matches)

    for i, record in enumerate(batch):
        person_id = record["person_id"]

        # Assign message based on match to error index
        if str(i) in failed_indexes:
            msg = f"API error ({status_code}): {error_message} — {error_detail}"
        else:
            msg = f"API error ({status_code}): {error_message} — Record valid but batch failed"

        update_api_failure(cursor, person_id, msg)
        print(f"Logged API error for person_id {person_id}: {msg}")
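
The success branch of `process_batches()` expects each response item to look like `<date>_<time>_<uuid>`. The sketch below isolates that parsing step. `parse_response_item` is a hypothetical helper, and the sample value is made up: the format is inferred from the parsing code above, not from API documentation.

```python
from datetime import datetime

def parse_response_item(item):
    """Split '<date>_<time>_<uuid>' into (uuid, timestamp), with the same fallback as the batch loop."""
    try:
        date_part, time_part, uuid = item.split("_")
        timestamp = datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S.%f")
    except ValueError:
        # Wrong number of parts or unparseable date: keep the last part, use current time
        uuid = item.split("_")[-1]
        timestamp = datetime.now()
    return uuid, timestamp

# Made-up sample value
uuid, ts = parse_response_item("2025-01-31_12:34:56.789_0b1c2d3e")
```

One thing to be aware of with this approach: a reference that itself contains an underscore would produce more than three parts and take the fallback path, so only its final segment would be stored.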


## Diagnostics and Smoke Test
File: `test.py`

In [31]:
# api_pipeline/test.py

import json
import time
import requests
import pyodbc

# from .config import (
#     SQL_CONN_STR,
#     CLIENT_ID,
#     CLIENT_SECRET,
#     SCOPE,
#     TOKEN_ENDPOINT,
#     SUPPLIER_KEY,
#     API_ENDPOINT_LA,
# )

# ----------------- helpers -----------------

def _print_header(title: str):
    print("\n" + "=" * 80)
    print(title)
    print("=" * 80)

def _ok(label: str):
    print(f"[OK] {label}")

def _fail(label: str, err):
    print(f"[FAIL] {label}: {err}")

# ----------------- public tests -----------------

def test_db_connection():
    """
    Verifies SQL connectivity using configured SQL_CONN_STR
    Uses harmless SELECT 1. Does not touch staging tables
    Returns True/False
    """
    _print_header("Database connectivity (test_db_connection)")
    try:
        with pyodbc.connect(SQL_CONN_STR, timeout=5) as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
                cur.fetchone()
        _ok("Database connection successful (SELECT 1)")
        return True
    except Exception as e:
        _fail("Database connection failed", e)
        return False


def test_schema():
    """
    Checks essential columns exist in staging table
    This is *advisory* and does not change data
    Returns True/False
    """
    _print_header("Schema check (test_schema)")
    table_name = TABLE_NAME  # check the configured staging table (defaults to ssd_api_data_staging_anon)
    expected = ["id", "json_payload", "partial_json_payload", "submission_status"]
    recommended = ["row_state", "previous_json_payload"]

    try:
        with pyodbc.connect(SQL_CONN_STR, timeout=10) as conn:
            cur = conn.cursor()
            cur.execute(f"SELECT TOP 0 * FROM {table_name}")
            columns = [column[0] for column in cur.description]

        missing = [col for col in expected if col not in columns]
        if missing:
            print(f"Missing REQUIRED columns in {table_name}: {', '.join(missing)}")
            return False
        _ok("Required columns present: " + ", ".join(expected))

        missing_recommended = [col for col in recommended if col not in columns]
        if missing_recommended:
            print("Note: missing recommended columns: " + ", ".join(missing_recommended))
        else:
            _ok("Recommended columns present: " + ", ".join(recommended))

        return True
    except Exception as e:
        _fail("Schema check failed", e)
        return False


# ----------------- optional orchestrator -----------------

def run_smoke():
    """
    Composite 'smoke' run that checks:
      - DB connectivity (SELECT 1)
      - Token acquisition
      - Harmless POST to API_ENDPOINT_LA with dummy payload
      - Advisory schema check (won't block install if table doesn't exist yet)
    Prints a compact summary and returns True/False; also suitable for CI or scheduled tasks
    """
    _print_header("Smoke run (no data required)")
    start = time.time()
    db_ok = test_db_connection()
    api_ok = _smoke_post_to_api()
    schema_ok = test_schema()

    overall = db_ok and api_ok  # schema check is advisory, so it does not affect the result
    duration = time.time() - start

    print("\nSummary:")
    print(f"  DB connectivity : {'PASS' if db_ok else 'FAIL'}")
    print(f"  API POST/Auth   : {'PASS' if api_ok else 'FAIL'}")
    print(f"  Schema advisory : {'PASS' if schema_ok else 'FAIL'}")
    print(f"Completed in {duration:.2f}s")

    return overall


def _smoke_post_to_api():
    """
    Acquires token and sends a dummy but valid POST to the API.
    This tests auth + POST body formatting without using real child data.
    """
    # LA_CODE is defined in the Configuration cell; no import is needed in the flattened notebook

    print("Requesting token for smoke test...")
    token_data = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": SCOPE
    }

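    try:
        response = requests.post(TOKEN_ENDPOINT, data=token_data)
        response.raise_for_status()
        access_token = response.json().get("access_token")
    except Exception as e:
        print("Token acquisition failed:", e)
        # Not every exception carries a .response attribute, so look it up defensively
        print("Response body:", getattr(getattr(e, "response", None), "text", "No response body"))
        return False

    if not access_token:
        # A 2xx response without an access_token would otherwise fail on .strip() below
        print("Token acquisition failed: no access_token in response")
        return False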

    headers = {
        "Authorization": f"Bearer {access_token.strip()}",
        "Content-Type": "application/json; charset=utf-8",
        "SupplierKey": str(SUPPLIER_KEY).strip(),
        "User-Agent": "Microsoft PowerShell/6.0.0"
    }


    # dummy payload from DfE 0.8.0 spec
    dummy_child = {
        "la_child_id": f"Child1234{LA_CODE}",  # make id la specific for tests
        "mis_child_id": "Supplier-Child-1234",
        "child_details": {
            "unique_pupil_number": "ABC0123456789",
            "former_unique_pupil_number": "DEF0123456789",
            "unique_pupil_number_unknown_reason": "UN1",
            "first_name": "John",
            "surname": "Doe",
            "date_of_birth": "2022-06-14",
            "expected_date_of_birth": "2022-06-14",
            "sex": "M",
            "ethnicity": "WBRI",
            "disabilities": ["HAND", "VIS"],
            "postcode": "AB12 3DE",
            "uasc_flag": True,
            "uasc_end_date": "2022-06-14",
            "purge": False
        },
        "purge": False
    }

    dummy_payload = [dummy_child]  # Must be array

    print("Sending dummy POST to API...")
    try:
        response = requests.post(API_ENDPOINT_LA, headers=headers, json=dummy_payload, timeout=30)
        print(f"Status: {response.status_code}")
        if response.status_code in [200, 400, 422]:
            print("API responded to dummy POST (as expected)")
            return True
        else:
            print("Unexpected API response:")
            print(response.text[:1000])
            return False
    except Exception as e:
        print("POST request failed:", e)
        return False
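
The smoke POST above treats 200, 400 and 422 as proof that the API is reachable and everything else as unexpected. A minimal sketch of how that decision could be made more descriptive is shown below. `classify_smoke_status` and its labels are hypothetical names, not part of the pipeline, and the reading of 401/403 as an authentication or key problem follows general HTTP semantics rather than anything confirmed for this API.

```python
def classify_smoke_status(status_code: int) -> str:
    """Hypothetical helper: label the HTTP status returned by the dummy POST."""
    if status_code in (200, 400, 422):
        # Same set the smoke test accepts: the API parsed the request and answered
        return "reachable"
    if status_code in (401, 403):
        # General HTTP meaning: credentials missing, invalid or not permitted
        return "auth"
    if 500 <= status_code < 600:
        return "server"
    return "unexpected"


for code in (200, 422, 401, 503, 302):
    print(code, classify_smoke_status(code))
```

Only the "reachable" label corresponds to the PASS case in `_smoke_post_to_api()`; the other labels would simply make a FAIL easier to triage.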


## Main Controller
File: `main.py`

In [29]:
# api_pipeline/main.py
# Core pipeline execution logic: connect to the DB, generate payloads, authenticate, and send to the API
import pyodbc
import json
from datetime import datetime
import re

# from .config import SQL_CONN_STR, USE_PARTIAL_PAYLOAD, SUPPLIER_KEY, API_ENDPOINT_LA
# from .auth import get_oauth_token
# from .db import update_partial_payloads, get_pending_records, update_api_success, update_api_failure
# from .api import process_batches
# from .utils import benchmark_section, log_debug, announce_mode


# # ---- local imports with fallback for notebook/debug use ----
# try:
#     from .config import SQL_CONN_STR, USE_PARTIAL_PAYLOAD, SUPPLIER_KEY, API_ENDPOINT_LA
#     from .auth import get_oauth_token
#     from .db import update_partial_payloads, get_pending_records, update_api_success, update_api_failure
#     from .api import process_batches
#     from .utils import benchmark_section, log_debug, announce_mode
# except ImportError:
#     from config import SQL_CONN_STR, USE_PARTIAL_PAYLOAD, SUPPLIER_KEY, API_ENDPOINT_LA
#     from auth import get_oauth_token
#     from db import update_partial_payloads, get_pending_records, update_api_success, update_api_failure
#     from api import process_batches
#     from utils import benchmark_section, log_debug, announce_mode


@benchmark_section("main()")
def main():

    announce_mode()
    log_debug(f"Connecting to DB using: {SQL_CONN_STR}")
    try:
        conn = pyodbc.connect(SQL_CONN_STR, timeout=10)
    except Exception as e:
        print(f"Database connection failed: {e}")
        log_debug("Failed to connect to DB or timeout occurred.")
        return

    if USE_PARTIAL_PAYLOAD:
        update_partial_payloads(conn)
        log_debug("Partial payloads updated.")

    token = get_oauth_token()
    if not token:
        print("No token retrieved. Exiting.")
        conn.close()  # release the DB connection before the early exit
        return

    headers = {
        "Authorization": f"Bearer {token.strip()}",
        "Content-Type": "application/json",
        "SupplierKey": str(SUPPLIER_KEY).strip(),
        "User-Agent": "Microsoft PowerShell/6.0.0"
    }

    
    # DEBUG
    # restrict output of secret(s) in console debug|logging    
    def format_header_value(key, value, mask_len=5):
        """Safe, readable debug formatting for headers."""
        k = str(key).lower()
        v = str(value)
    
        if k == "authorization":
            # match "<scheme> <credentials>" with any spacing, any case
            m = re.match(r'^\s*([A-Za-z]+)\s+(.+)\s*$', v)
            if m and m.group(1).lower() == "bearer":
                scheme = "Bearer"
                token = m.group(2)
                return f"{scheme} {token[:mask_len]}..."
            # if not Bearer or no space, mask first part anyway
            return f"{v[:mask_len]}..."
    
        if k == "supplierkey":
            return f"{v[:mask_len]}..."
        return v
    
    # DEBUG
    header_preview = "\n".join(f"    {k}: {format_header_value(k, v)}" for k, v in headers.items())
    log_debug("Headers preview:\n" + header_preview)
    log_debug(f"API endpoint: {API_ENDPOINT_LA}")
    log_debug("\nFetching pending records from DB...")
    
    cursor = conn.cursor()
    
    records = get_pending_records(cursor)
    log_debug(f"Fetched {len(records)} pending records.")  

    if not records:
        print("No pending records to send.")
        conn.close()  # release the DB connection before the early exit
        return

    print(f"Sending {len(records)} records...")
    log_debug("Beginning batch API submission...") 
    process_batches(records, headers, conn)

    conn.close()
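
`main()` closes the connection on its final line, so an exception raised inside `process_batches()` would leave it open. One way to cover every exit path is `contextlib.closing`, sketched below under stated assumptions: `FakeConnection` and `run_with_connection` are hypothetical stand-ins so the example runs without a database, and `closing` is chosen because a pyodbc connection used directly in a `with` statement is understood to manage the transaction rather than close the connection.

```python
from contextlib import closing


class FakeConnection:
    """Stand-in for a pyodbc connection (hypothetical, for illustration only)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run_with_connection(conn, work):
    """Run work(conn) and call conn.close() on every exit path."""
    with closing(conn):
        return work(conn)


# Normal path: the work completes and the connection is closed afterwards
ok_conn = FakeConnection()
result = run_with_connection(ok_conn, lambda c: "sent")


# Failure path: the work raises, yet the connection is still closed
def failing_work(conn):
    raise RuntimeError("simulated failure during batch submission")


failing_conn = FakeConnection()
try:
    run_with_connection(failing_conn, failing_work)
except RuntimeError:
    pass

print(result, ok_conn.closed, failing_conn.closed)
```

In `main()` the equivalent would be wrapping everything after `pyodbc.connect(...)` in `with closing(conn):`, which also makes the explicit `conn.close()` unnecessary.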





## Optional: Diagnostics | Dry Run

In [33]:

# # Dry run: compare hashes, generate diff report (no submission)
# print_diff_stats()

# # DB connection test
# test_db_connection(SQL_CONN_STR)

# # Schema advisory check
# test_schema()

# Run smoke tests (includes the DB connection and schema checks above) with a fake payload
run_smoke()


[DIAG] recursive_diff() calls: 0
[DIAG] Total time in recursive_diff(): 0.00s

[DIAG] prune_unchanged_list() calls: 0
[DIAG] Total time in prune_unchanged_list(): 0.00s
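
The `run_smoke()` docstring notes that its True/False result is also meant for CI or scheduled tasks. A minimal sketch of how a wrapper script might turn that result into a process exit code follows. `smoke_exit_code` and the exit code values are assumptions for illustration, not part of the pipeline, and lambdas stand in for `run_smoke` so the example runs on its own.

```python
import sys
from typing import Callable


def smoke_exit_code(run: Callable[[], bool]) -> int:
    """Hypothetical wrapper: 0 if the smoke run passes, 1 if it fails, 2 if it crashes."""
    try:
        return 0 if run() else 1
    except Exception as exc:
        print(f"Smoke run crashed: {exc}", file=sys.stderr)
        return 2


def crashing_run() -> bool:
    raise RuntimeError("simulated crash")


# Stand-ins for run_smoke() so the sketch needs no DB or API access
print(smoke_exit_code(lambda: True))
print(smoke_exit_code(lambda: False))
print(smoke_exit_code(crashing_run))
```

Outside the notebook, a scheduled task could then end with `sys.exit(smoke_exit_code(run_smoke))` so that the scheduler or CI runner sees a non-zero status on failure.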


## Run Submission

Uncomment the call below to run the full pipeline. By default it pulls only from the anonymised staging table (the `TABLE_NAME` default, `ssd_api_data_staging_anon`) as a failsafe.

In [None]:
# main()

Starting main()...
Running in development mode
Partial delta payload mode enabled
Connecting to DB using: DRIVER={ODBC Driver 17 for SQL Server};SERVER=ESLLREPORTS04V;DATABASE=HDM_Local;Trusted_Connection=yes;
Starting update_partial_payloads()...
Updated 0 partial_json_payload records
[DIAG] Checked: 0
[DIAG] Skipped (state): 0
[DIAG] Skipped (identical JSON): 0
[DIAG] Deleted payloads: 0
[DIAG] Delta payloads: 0
[DIAG] Errors: 0
Finished update_partial_payloads(): 0.23s | Mem delta: 0.03 MiB | Peak mem: 0.15 MiB
Partial payloads updated.
OAuth token retrieved.
TOKEN (first 10 chars): eyJ0eXAiOi...
Headers preview:
    Authorization: Bearer eyJ0e...
    Content-Type: application/json
    SupplierKey: 6736a...
    User-Agent: Microsoft PowerShell/6.0.0
API endpoint: https://pp-api.education.gov.uk/children-in-social-care-data-receiver-test/1/children_social_care_data/845/children

Fetching pending records from DB...
Starting get_pending_records()...
Finished get_pending_records(): 0.23