# SPARC-P PubApps Deployment Notebook

This notebook is the runnable version of Step 4 for deploying SPARC-P to UF RC PubApps.

## Resource Profiles
- **HiPerGator (parallel jobs)**: 4 GPUs, 16 CPU cores
- **PubApps (serving)**: 1x L4 GPU (24GB), 2 CPU cores, 16GB RAM

## Before You Run
- You are on your PubApps VM via SSH
- You have your project account (`SPARCP`)
- Trained models are available from HiPerGator at `/blue/jasondeanarnold/SPARCP/trained_models`
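- Podman + systemd user services are available
- Shell commands run in dry-run mode (printed only) until you set `EXECUTE = True` in section 2; the cells that write files (5.1 and section 6) write them even in dry-run mode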

In [None]:
# 1. Configuration
import os
import shlex
import subprocess
import textwrap
from pathlib import Path

# Deployment identity and PubApps filesystem layout; every path hangs off PUBAPPS_ROOT.
PROJECT = os.getenv("SPARC_PUBAPPS_PROJECT", "SPARCP")
PUBAPPS_ROOT = Path(f"/pubapps/{PROJECT}")
MODEL_DIR = PUBAPPS_ROOT.joinpath("models")
CONDA_ENV = PUBAPPS_ROOT.joinpath("conda_envs", "sparc_backend")
BACKEND_DIR = PUBAPPS_ROOT.joinpath("backend")
RIVA_MODEL_DIR = PUBAPPS_ROOT.joinpath("riva_models")

# Source of the trained models and the SSH endpoint they are copied to.
BASE_PATH = os.getenv("SPARC_BASE_PATH", "/blue/jasondeanarnold/SPARCP")
HIPERGATOR_SOURCE_MODELS = os.getenv("SPARC_HIPERGATOR_SOURCE_MODELS", BASE_PATH + "/trained_models")
PUBAPPS_HOST = os.getenv("SPARC_PUBAPPS_HOST", "pubapps-vm.rc.ufl.edu")
PUBAPPS_SSH_USER = os.getenv("SPARC_PUBAPPS_SSH_USER", PROJECT)

# Comma-separated list of browser origins the backend will accept.
PUBAPP_ALLOWED_ORIGINS = os.getenv(
    "SPARC_CORS_ALLOWED_ORIGINS",
    "https://hpvcommunicationtraining.com,https://hpvcommunicationtraining.org",
)

_default_firebase_creds = PUBAPPS_ROOT / "config" / "firebase-credentials.json"
FIREBASE_CREDS_PATH = Path(os.getenv("SPARC_FIREBASE_CREDS", str(_default_firebase_creds)))

# Resource constraints
HPG_MAX_GPUS, HPG_MAX_CORES = 4, 16
PUBAPPS_GPU = "L4 (24GB)"
PUBAPPS_CORES, PUBAPPS_RAM_GB = 2, 16
UVICORN_WORKERS = 1  # tuned for 2 CPU cores and 16GB RAM

# Echo the effective configuration so the operator can verify it before running later cells.
_config_summary = (
    ("Project", PROJECT),
    ("PubApps root", PUBAPPS_ROOT),
    ("Conda env", CONDA_ENV),
    ("Backend dir", BACKEND_DIR),
    ("HiPerGator source models", HIPERGATOR_SOURCE_MODELS),
    ("PubApps host", PUBAPPS_HOST),
    ("PubApps SSH user", PUBAPPS_SSH_USER),
    ("Allowed CORS origins", PUBAPP_ALLOWED_ORIGINS),
    ("Firebase creds path", FIREBASE_CREDS_PATH),
    ("HiPerGator resources", f"{HPG_MAX_GPUS} GPUs, {HPG_MAX_CORES} cores"),
    ("PubApps resources", f"{PUBAPPS_GPU}, {PUBAPPS_CORES} cores, {PUBAPPS_RAM_GB}GB RAM"),
)
print("\n".join(f"{label}: {value}" for label, value in _config_summary))

In [None]:
# 2. Command runner (safe by default)
EXECUTE = False  # Set True to actually run shell commands

def run(cmd: str, check: bool = True):
    """Echo ``cmd`` and, when ``EXECUTE`` is true, run it through ``bash -lc``.

    Args:
        cmd: Shell command line, passed verbatim to bash.
        check: When true, a non-zero exit status raises ``RuntimeError``.

    Returns:
        ``None`` in dry-run mode (``EXECUTE`` is false); otherwise the
        ``subprocess.CompletedProcess`` with captured ``stdout``/``stderr``.

    Raises:
        RuntimeError: The command exited non-zero and ``check`` is true.
    """
    print(f"$ {cmd}")
    if not EXECUTE:
        print("(dry-run) command not executed\n")
        return None
    result = subprocess.run(["bash", "-lc", cmd], capture_output=True, text=True)
    if result.stdout:
        print(result.stdout)
    if result.stderr:
        print(result.stderr)
    if check and result.returncode != 0:
        # Include the exit status so different failure causes can be told apart.
        raise RuntimeError(f"Command failed (exit code {result.returncode}): {cmd}")
    print()
    return result

## 3. Transfer Models from HiPerGator
The cell below only prints the `rsync` command. Copy it and run it on HiPerGator, or from a hop host with access to both systems.

In [None]:
# 3.1 Render model sync command
# Quote both endpoints so the printed command stays correct when a configured
# path contains spaces or shell metacharacters.
rsync_source = shlex.quote(f"{HIPERGATOR_SOURCE_MODELS}/")
rsync_target = shlex.quote(f"{PUBAPPS_SSH_USER}@{PUBAPPS_HOST}:{MODEL_DIR}/")
# Join with an explicit backslash + newline: written directly inside a non-raw
# triple-quoted string, backslash-newline is a line continuation and the
# multi-line layout is silently collapsed onto one line.
rsync_cmd = " \\\n  ".join(["rsync -avz --progress", rsync_source, rsync_target])
print(rsync_cmd)

## 4. PubApps Environment Setup

In [None]:
# 4.1 Create required directories
# The paths derive from an environment variable and end up in a bash command
# line, so quote each one to keep it a single shell word.
required_dirs = [PUBAPPS_ROOT, MODEL_DIR, BACKEND_DIR, RIVA_MODEL_DIR, PUBAPPS_ROOT / "conda_envs"]
run("mkdir -p " + " ".join(shlex.quote(str(directory)) for directory in required_dirs))

In [None]:
# 4.2 Create backend conda environment
pubapps_root_arg = shlex.quote(str(PUBAPPS_ROOT))
conda_env_arg = shlex.quote(str(CONDA_ENV))

run("conda --version", check=False)
# Use && rather than ; so conda never runs from the wrong directory if the cd fails.
run(f"cd {pubapps_root_arg} && conda env create -f environment_backend.yml -p {conda_env_arg}")
# Import the key backend packages inside the new environment as a quick sanity check.
run(f"conda run -p {conda_env_arg} python -c 'import fastapi,langgraph,torch; print(\"backend env ok\")'")

## 5. Deploy Riva with Podman + Quadlet

In [None]:
# 5.1 Write quadlet service for Riva
# Render the container unit first, then place it in the per-user quadlet directory.
quadlet_content = textwrap.dedent(f"""
[Unit]
Description=SPARC-P Riva Speech Server
After=network-online.target

[Container]
Image=nvcr.io/nvidia/riva/riva-speech:2.16.0-server
ContainerName=riva-server
AddDevice=/dev/nvidia0
AddDevice=/dev/nvidiactl
AddDevice=/dev/nvidia-uvm
Volume={RIVA_MODEL_DIR}:/data:Z
PublishPort=50051:50051
Environment=NVIDIA_VISIBLE_DEVICES=all
Exec=/opt/riva/bin/riva_server --riva_model_repo=/data/models

[Service]
Restart=always
TimeoutStartSec=300

[Install]
WantedBy=default.target
""").strip()

quadlet_dir = Path.home().joinpath(".config", "containers", "systemd")
quadlet_dir.mkdir(parents=True, exist_ok=True)
quadlet_file = quadlet_dir.joinpath("riva-server.container")
quadlet_file.write_text(quadlet_content)
print(f"Wrote {quadlet_file}", quadlet_content, sep="\n")

In [None]:
# 5.2 Pull image and start Riva service
run("podman pull nvcr.io/nvidia/riva/riva-speech:2.16.0-server")
# daemon-reload makes the quadlet generator turn riva-server.container into riva-server.service.
run("systemctl --user daemon-reload")
# Quadlet units are generated, so `systemctl enable` is rejected for them; start-at-login
# comes from the [Install] section of the .container file. Only start the unit here.
run("systemctl --user start riva-server.service")
run("systemctl --user status riva-server --no-pager", check=False)

## 6. Create FastAPI Backend + Systemd Service

In [None]:
# 6.1 Write backend main.py (integration-ready)
main_py = BACKEND_DIR / 'main.py'
main_content = textwrap.dedent('''
import asyncio
import base64
import os
import logging
from typing import Any, Dict, Optional

from fastapi import Depends, FastAPI, Header, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel
import riva.client
from nemoguardrails import LLMRails, RailsConfig
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
import firebase_admin
from firebase_admin import credentials, firestore

# Runtime configuration. Each value can be overridden with a SPARC_* environment variable.
# The brace-delimited defaults below are placeholders that the deployment notebook replaces
# with concrete values before this file is written.
MODEL_BASE_PATH = os.getenv("SPARC_MODEL_BASE_PATH", "{MODEL_DIR}")
RIVA_SERVER = os.getenv("SPARC_RIVA_SERVER", "localhost:50051")
FIREBASE_CREDS = os.getenv("SPARC_FIREBASE_CREDS", "{PUBAPPS_ROOT}/config/firebase-credentials.json")
# Defaults to a "guardrails" directory next to this file.
GUARDRAILS_DIR = os.getenv("SPARC_GUARDRAILS_DIR", os.path.join(os.path.dirname(__file__), "guardrails"))

# Auth is on unless the variable is set to something other than "true" (case-insensitive, trimmed).
API_AUTH_ENABLED = os.getenv("SPARC_API_AUTH_ENABLED", "true").strip().lower() == "true"
API_KEY = os.getenv("SPARC_API_KEY", "")
# Comma-separated origin list; surrounding whitespace is trimmed and empty entries are dropped.
CORS_ALLOWED_ORIGINS = [
    origin.strip()
    for origin in os.getenv("SPARC_CORS_ALLOWED_ORIGINS", "{PUBAPP_ALLOWED_ORIGINS}").split(",")
    if origin.strip()
]
# Credentialed CORS is off unless explicitly enabled.
CORS_ALLOW_CREDENTIALS = os.getenv("SPARC_CORS_ALLOW_CREDENTIALS", "false").strip().lower() == "true"
CORS_ALLOWED_METHODS = ["GET", "POST", "OPTIONS"]
CORS_ALLOWED_HEADERS = ["Content-Type", "X-API-Key", "Authorization"]
# Reported by the health endpoint.
API_CONTRACT_VERSION = "v1"

# Fail fast at import time when the Firebase service-account file is missing.
if not FIREBASE_CREDS:
    raise RuntimeError("SPARC_FIREBASE_CREDS is empty; set Firebase service account path")
if not os.path.isfile(FIREBASE_CREDS):
    # Single braces: this file is produced by plain string replacement, not by an f-string
    # template, so doubled braces would reach the f-string below unchanged and print the
    # variable name instead of the path.
    raise RuntimeError(
        f"Firebase credentials file not found: {FIREBASE_CREDS}. "
        "Set SPARC_FIREBASE_CREDS to a valid path."
    )

# Initialise the Firebase app only when no app is registered yet, then create the
# Firestore client used for session state.
if not firebase_admin._apps:
    cred = credentials.Certificate(FIREBASE_CREDS)
    firebase_admin.initialize_app(cred)
db = firestore.client()

# Module logger; logging.basicConfig is only called when this logger has no handlers of its own.
logger = logging.getLogger("sparc_backend")
if not logger.handlers:
    logging.basicConfig(level=logging.INFO)

# Presidio engines are optional at startup. If they cannot be created, PRESIDIO_AVAILABLE is
# False and sanitize_for_storage replaces text with a fixed placeholder instead of storing it raw.
try:
    presidio_analyzer = AnalyzerEngine()
    presidio_anonymizer = AnonymizerEngine()
    PRESIDIO_AVAILABLE = True
except Exception as presidio_init_error:
    presidio_analyzer = None
    presidio_anonymizer = None
    PRESIDIO_AVAILABLE = False
    logger.warning("Presidio initialization failed; using fail-closed redaction placeholders: %s", presidio_init_error)


def sanitize_for_storage(text: Optional[str]) -> str:
    """Return a copy of text that is safe to persist or log.

    Empty or missing input yields an empty string. When Presidio is unavailable, or raises
    while analysing, the whole text is replaced by a fixed placeholder. Text with no findings
    is returned unchanged; otherwise the anonymized text is returned.
    """
    if not text:
        return ""
    placeholder = "[REDACTED]"
    if PRESIDIO_AVAILABLE:
        try:
            findings = presidio_analyzer.analyze(text=text, language="en")
            if findings:
                anonymized = presidio_anonymizer.anonymize(text=text, analyzer_results=findings)
                return anonymized.text
            return text
        except Exception:
            return placeholder
    return placeholder

# Populated by load_guardrails_runtime(); stays None when loading fails.
guardrails_engine = None
GUARDRAILS_REFUSAL = "I can only discuss topics related to HPV vaccination and clinical communication training."

def load_guardrails_runtime() -> None:
    """Build the guardrails engine from GUARDRAILS_DIR and publish it in guardrails_engine.

    Any failure is logged (with the message passed through sanitize_for_storage) and leaves
    guardrails_engine set to None.
    """
    global guardrails_engine
    engine = None
    try:
        engine = LLMRails(RailsConfig.from_path(GUARDRAILS_DIR))
        guardrails_engine = engine
        logger.info("Guardrails runtime loaded from %s", GUARDRAILS_DIR)
    except Exception as guardrails_error:
        engine = None
        logger.exception("Guardrails initialization failed: %s", sanitize_for_storage(str(guardrails_error)))
    guardrails_engine = engine

async def _run_guardrails(text: str) -> str:
    """Send text to the guardrails engine as a single user message and return its reply as a string.

    Raises RuntimeError when the engine has not been loaded.
    """
    engine = guardrails_engine
    if engine is None:
        raise RuntimeError("Guardrails runtime not initialized")
    conversation = [{"role": "user", "content": text}]
    # Fall back to the synchronous API only when the engine has no async variant.
    if not hasattr(engine, "generate_async"):
        reply = engine.generate(messages=conversation)
    else:
        reply = await engine.generate_async(messages=conversation)
    # Dict replies are reduced to their "content" value (or the whole dict when it is absent).
    return str(reply.get("content", reply)) if isinstance(reply, dict) else str(reply)

async def _evaluate_guardrails(stage: str, candidate: str, failure_log: str) -> Dict[str, Any]:
    """Shared guardrails check behind the input and output wrappers.

    stage is "input" or "output" and only affects the reason codes; failure_log is the
    logger format string used when the guardrails call raises.
    """
    def verdict(allowed: bool, text: str, reason: str) -> Dict[str, Any]:
        return {"allowed": allowed, "text": text, "reason": reason}

    if not candidate or not candidate.strip():
        return verdict(False, GUARDRAILS_REFUSAL, f"empty_{stage}")
    try:
        rails_reply = await _run_guardrails(candidate)
        # The rails signal a block by answering with the refusal sentence.
        if GUARDRAILS_REFUSAL.lower() in rails_reply.lower():
            return verdict(False, GUARDRAILS_REFUSAL, f"{stage}_rails_blocked")
        return verdict(True, candidate, f"{stage}_rails_allowed")
    except Exception as guardrails_error:
        # Fail closed: any guardrails error is treated as a block.
        logger.exception(failure_log, sanitize_for_storage(str(guardrails_error)))
        return verdict(False, GUARDRAILS_REFUSAL, f"{stage}_rails_error")


async def enforce_guardrails_input(user_text: str) -> Dict[str, Any]:
    """Check a user message; returns a dict with "allowed", "text" and "reason" keys."""
    return await _evaluate_guardrails("input", user_text, "Input guardrails failed: %s")


async def enforce_guardrails_output(output_text: str) -> Dict[str, Any]:
    """Check a generated reply; returns a dict with "allowed", "text" and "reason" keys."""
    return await _evaluate_guardrails("output", output_text, "Output guardrails failed: %s")

# FastAPI application with CORS restricted to the configured origins, methods and headers.
app = FastAPI(title="SPARC-P Multi-Agent Backend", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ALLOWED_ORIGINS,
    allow_credentials=CORS_ALLOW_CREDENTIALS,
    allow_methods=CORS_ALLOWED_METHODS,
    allow_headers=CORS_ALLOWED_HEADERS,
)

# Both are None until the startup handler has loaded the tokenizer and the adapter model.
tokenizer = None
adapter_model = None
# Session mode -> adapter name (modes not listed are handled by select_adapter_for_mode).
ADAPTER_FOR_MODE = {
    "caregiver": "caregiver",
    "coach": "coach",
    "supervisor": "supervisor",
}
# Adapter name -> directory under MODEL_BASE_PATH holding that adapter's weights.
ADAPTER_PATHS = {
    "caregiver": os.path.join(MODEL_BASE_PATH, "CaregiverAgent"),
    "coach": os.path.join(MODEL_BASE_PATH, "C-LEAR_CoachAgent"),
    "supervisor": os.path.join(MODEL_BASE_PATH, "SupervisorAgent"),
}
# Held around every set_adapter + generate pair so concurrent requests cannot switch the
# active adapter underneath each other.
inference_lock = asyncio.Lock()

def generate_tokens_sync(model, **generate_kwargs):
    """Call model.generate with the given keyword arguments under torch.inference_mode().

    Synchronous on purpose: callers run it in a worker thread.
    """
    with torch.inference_mode():
        generated = model.generate(**generate_kwargs)
    return generated


def select_adapter_for_mode(mode: str) -> str:
    """Map a session mode to an adapter name, falling back to "caregiver".

    The mode is trimmed and lower-cased first; empty, missing or unknown modes use the fallback.
    """
    fallback = "caregiver"
    key = (mode if mode else fallback).strip().lower()
    if key in ADAPTER_FOR_MODE:
        return ADAPTER_FOR_MODE[key]
    return fallback


def require_api_key(x_api_key: Optional[str] = Header(default=None, alias="X-API-Key")) -> str:
    """Defense-in-depth auth guard for in-app API access.

    Returns:
        "auth_disabled" when API auth is switched off, otherwise the accepted key.

    Raises:
        HTTPException: 503 when auth is enabled but no key is configured on the server;
            401 when the X-API-Key header is missing or does not match.
    """
    import hmac  # standard library; imported here to keep this function self-contained

    if not API_AUTH_ENABLED:
        return "auth_disabled"
    if not API_KEY:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="API key auth is enabled but SPARC_API_KEY is not configured",
        )
    # Constant-time comparison: a plain inequality test can reveal, through timing, how many
    # leading characters of a guess were correct. Comparing bytes also accepts non-ASCII
    # header values without raising.
    supplied_key = (x_api_key or "").encode("utf-8")
    if not hmac.compare_digest(supplied_key, API_KEY.encode("utf-8")):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API key",
        )
    return x_api_key


@app.on_event("startup")
async def load_models():
    """Startup hook: load the tokenizer, the base model with its three adapters, then guardrails.

    On success the module-level tokenizer and adapter_model are set.
    """
    global adapter_model, tokenizer
    # NOTE(review): bare model name; confirm it resolves to the intended checkpoint or local path.
    base_model_name = "gpt-oss-120b"

    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    base_model = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        load_in_4bit=True,
        device_map="auto",
    )

    # One PeftModel carries all three adapters under distinct names; requests switch between
    # them with set_adapter instead of loading separate model objects.
    adapter_model = PeftModel.from_pretrained(
        base_model,
        ADAPTER_PATHS["caregiver"],
        adapter_name="caregiver",
    )
    adapter_model.load_adapter(ADAPTER_PATHS["coach"], adapter_name="coach")
    adapter_model.load_adapter(ADAPTER_PATHS["supervisor"], adapter_name="supervisor")
    # Start with the caregiver adapter active.
    adapter_model.set_adapter("caregiver")

    # Guardrails load last; a failure there is logged inside load_guardrails_runtime.
    load_guardrails_runtime()


# Request body for the chat endpoint.
class ChatRequest(BaseModel):
    # Letters, digits, underscore and hyphen only; used as the Firestore document id.
    session_id: str = Field(..., min_length=1, max_length=128, pattern=r"^[a-zA-Z0-9_-]+$")
    user_message: str = Field(..., min_length=1, max_length=10000)
    # Optional and size-capped. NOTE(review): the chat handler in this file never reads it.
    audio_data: Optional[str] = Field(default=None, max_length=2_000_000)


# Response body for the chat endpoint.
class ChatResponse(BaseModel):
    response_text: str
    # Set by the chat handler to a base64 "data:audio/wav" URI when TTS succeeds, else None.
    audio_url: Optional[str] = None
    emotion: str
    animation_cues: Dict[str, str]
    coach_feedback: Optional[Dict[str, Any]] = None


# Readiness endpoint: HTTP 200 when the tokenizer and adapter model are loaded, otherwise 503.
# The JSON payload has the same shape in both cases.
@app.get("/health")
async def health_check():
    # Riva is probed by constructing a client; any exception marks it as not connected.
    # NOTE(review): constructing these objects may not perform a network round-trip, so
    # riva_connected could be optimistic. Confirm against the Riva client behaviour.
    try:
        auth = riva.client.Auth(uri=RIVA_SERVER)
        riva.client.ASRService(auth)
        riva_ok = True
    except Exception:
        riva_ok = False

    # Only model readiness drives the status text and the HTTP status; Riva and guardrails
    # state are reported but do not change them.
    model_ok = tokenizer is not None and adapter_model is not None
    status_text = "healthy" if model_ok else "degraded"
    health_payload = {
        "status": status_text,
        "models_loaded": model_ok,
        "ready_for_traffic": model_ok,
        "riva_connected": riva_ok,
        "api_auth_enabled": API_AUTH_ENABLED,
        "api_auth_configured": bool(API_KEY),
        "api_contract_version": API_CONTRACT_VERSION,
        "guardrails_loaded": guardrails_engine is not None,
        "firebase_creds_configured": bool(FIREBASE_CREDS),
    }
    http_status = status.HTTP_200_OK if model_ok else status.HTTP_503_SERVICE_UNAVAILABLE
    return JSONResponse(status_code=http_status, content=health_payload)


@app.post("/v1/chat", response_model=ChatResponse)
async def process_chat(request: ChatRequest, _api_key: str = Depends(require_api_key)):
    # Handles one chat turn:
    #   1. load session state from Firestore
    #   2. run input guardrails (a blocked message returns the refusal immediately)
    #   3. generate a reply with the adapter chosen by the session mode
    #   4. run output guardrails on the reply
    #   5. try Riva text-to-speech (failure is logged and audio_url stays None)
    #   6. generate coach feedback with the coach adapter
    #   7. persist redacted copies of the message and reply, then return the response
    # Errors: 503 when the model or tokenizer is not loaded; any other failure becomes a
    # 500 with a generic detail so internal error text never reaches the client.
    try:
        if adapter_model is None or tokenizer is None:
            raise HTTPException(status_code=503, detail="Model adapters are not initialized")

        session_ref = db.collection("sessions").document(request.session_id)
        # Firestore calls are blocking network I/O; run them in a worker thread so the
        # event loop (and the health endpoint) stays responsive.
        session_snapshot = await asyncio.to_thread(session_ref.get)
        session_state = session_snapshot.to_dict() or {}

        input_guard = await enforce_guardrails_input(request.user_message)
        if not input_guard["allowed"]:
            return ChatResponse(
                response_text=input_guard["text"],
                emotion="neutral",
                animation_cues={"gesture": "idle"},
                coach_feedback={"safe": False, "reason": input_guard["reason"]}
            )

        mode = session_state.get("mode", "caregiver")
        primary_adapter = select_adapter_for_mode(mode)

        prompt = f"[SESSION: {request.session_id}] User: {input_guard['text']}\\nAssistant:"
        model_inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024)
        model_inputs = {k: v.to(adapter_model.device) for k, v in model_inputs.items()}

        # Adapter selection and generation happen under one lock so another request cannot
        # switch the active adapter in between.
        async with inference_lock:
            adapter_model.set_adapter(primary_adapter)
            output = await asyncio.to_thread(
                generate_tokens_sync,
                adapter_model,
                **model_inputs,
                max_new_tokens=180,
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                pad_token_id=tokenizer.eos_token_id,
            )
        decoded = tokenizer.decode(output[0], skip_special_tokens=True)
        # The decoded text still contains the prompt; keep only what follows the last marker.
        response_text = decoded.split("Assistant:")[-1].strip() or "I’m here to help with HPV vaccine communication practice."

        output_guard = await enforce_guardrails_output(response_text)
        response_text = output_guard["text"]

        audio_url = None
        try:
            auth = riva.client.Auth(uri=RIVA_SERVER)
            tts = riva.client.SpeechSynthesisService(auth)
            # Synthesis is a blocking call; keep it off the event loop.
            tts_resp = await asyncio.to_thread(
                tts.synthesize,
                response_text,
                voice_name="English-US.Female-1",
            )
            audio_url = "data:audio/wav;base64," + base64.b64encode(tts_resp.audio).decode("utf-8")
        except Exception as riva_error:
            logger.warning("Riva TTS unavailable: %s", sanitize_for_storage(str(riva_error)))

        coach_feedback_text = ""
        try:
            feedback_prompt = f"Provide concise coaching feedback for this response: {response_text}"
            feedback_inputs = tokenizer(feedback_prompt, return_tensors="pt", truncation=True, max_length=512)
            feedback_inputs = {k: v.to(adapter_model.device) for k, v in feedback_inputs.items()}
            async with inference_lock:
                adapter_model.set_adapter("coach")
                feedback_tokens = await asyncio.to_thread(
                    generate_tokens_sync,
                    adapter_model,
                    **feedback_inputs,
                    max_new_tokens=80,
                    do_sample=False,
                    pad_token_id=tokenizer.eos_token_id,
                )
            # The generated sequence starts with the prompt tokens; decode only the new tokens
            # so the summary holds the feedback rather than an echo of the instruction.
            prompt_token_count = feedback_inputs["input_ids"].shape[-1]
            coach_feedback_text = tokenizer.decode(
                feedback_tokens[0][prompt_token_count:],
                skip_special_tokens=True,
            ).strip()
        finally:
            async with inference_lock:
                adapter_model.set_adapter(primary_adapter)

        # Only redacted copies of the conversation text are persisted.
        sanitized_user_message = sanitize_for_storage(request.user_message)
        sanitized_response_text = sanitize_for_storage(response_text)
        session_state["last_user_message"] = sanitized_user_message
        session_state["last_response"] = sanitized_response_text
        session_state["mode"] = mode
        session_state["phi_redaction"] = "presidio"
        session_state["phi_redaction_applied"] = True
        await asyncio.to_thread(session_ref.set, session_state, merge=True)

        return ChatResponse(
            response_text=response_text,
            audio_url=audio_url,
            emotion="supportive",
            animation_cues={"gesture": "speaking", "intensity": "low"},
            coach_feedback={"safe": output_guard["allowed"], "reason": output_guard["reason"], "summary": coach_feedback_text[:500]},
        )
    except HTTPException:
        # Deliberate HTTP errors (such as the 503 above) already carry a client-safe status
        # and detail; re-raise them instead of rewriting them as a generic 500.
        raise
    except Exception as e:
        logger.exception("/v1/chat failed after sanitization path: %s", sanitize_for_storage(str(e)))
        raise HTTPException(status_code=500, detail="Internal server error")
''').strip()
# Fill in the deployment-specific placeholders left in the backend template.
main_content = (
    main_content
    .replace("{MODEL_DIR}", str(MODEL_DIR))
    .replace("{PUBAPPS_ROOT}", str(PUBAPPS_ROOT))
    .replace("{PUBAPP_ALLOWED_ORIGINS}", str(PUBAPP_ALLOWED_ORIGINS))
)

BACKEND_DIR.mkdir(parents=True, exist_ok=True)
# The generated source contains non-ASCII characters and Python reads source files as
# UTF-8 by default, so write UTF-8 explicitly instead of relying on the locale encoding.
main_py.write_text(main_content, encoding="utf-8")
print(f"Wrote {main_py}")

In [None]:
# 6.2 C4/C5/M9/H2/H3/H5/H10/H11/H12/H13/H14 Smoke Test — Adapter/Auth/Config + Redaction + Contract + CORS + Guardrails + Async Inference + Health Readiness + Error Sanitization + Schema Constraints
# Static check only: it inspects the text of the generated main.py and never imports or runs it.

backend_text = main_py.read_text()

# Source fragments that must appear verbatim in main.py. The match is an exact substring
# test, so reformatting the backend template can fail this check without changing behaviour.
required_markers = [
    "adapter_name=\"caregiver\"",
    "load_adapter(ADAPTER_PATHS[\"coach\"], adapter_name=\"coach\")",
    "load_adapter(ADAPTER_PATHS[\"supervisor\"], adapter_name=\"supervisor\")",
    "adapter_model.set_adapter(primary_adapter)",
    "adapter_model.set_adapter(\"coach\")",
    "def require_api_key(",
    "Header(default=None, alias=\"X-API-Key\")",
    "Depends(require_api_key)",
    "SPARC_FIREBASE_CREDS",
    "SPARC_MODEL_BASE_PATH",
    "SPARC_RIVA_SERVER",
    "os.path.isfile(FIREBASE_CREDS)",
    "from presidio_analyzer import AnalyzerEngine",
    "from presidio_anonymizer import AnonymizerEngine",
    "def sanitize_for_storage(",
    "sanitized_user_message = sanitize_for_storage(request.user_message)",
    "sanitized_response_text = sanitize_for_storage(response_text)",
    "session_state[\"phi_redaction_applied\"] = True",
    "API_CONTRACT_VERSION = \"v1\"",
    "session_id: str = Field(..., min_length=1, max_length=128, pattern=r\"^[a-zA-Z0-9_-]+$\")",
    "user_message: str = Field(..., min_length=1, max_length=10000)",
    "audio_data: Optional[str] = Field(default=None, max_length=2_000_000)",
    "api_contract_version\": API_CONTRACT_VERSION",
    "CORS_ALLOWED_ORIGINS = [",
    "CORS_ALLOW_CREDENTIALS = os.getenv(\"SPARC_CORS_ALLOW_CREDENTIALS\", \"false\")",
    "allow_origins=CORS_ALLOWED_ORIGINS",
    "allow_credentials=CORS_ALLOW_CREDENTIALS",
    "from nemoguardrails import LLMRails, RailsConfig",
    "load_guardrails_runtime()",
    "enforce_guardrails_input(request.user_message)",
    "enforce_guardrails_output(response_text)",
    "guardrails_loaded\": guardrails_engine is not None",
    "import asyncio",
    "inference_lock = asyncio.Lock()",
    "def generate_tokens_sync(",
    "await asyncio.to_thread(",
    "from fastapi.responses import JSONResponse",
    "model_ok = tokenizer is not None and adapter_model is not None",
    "ready_for_traffic\": model_ok",
    "status.HTTP_503_SERVICE_UNAVAILABLE",
    "return JSONResponse(status_code=http_status, content=health_payload)",
    "logger.exception(\"/v1/chat failed after sanitization path: %s\", sanitize_for_storage(str(e)))",
    "raise HTTPException(status_code=500, detail=\"Internal server error\")",
]

# Report every missing marker at once rather than stopping at the first.
missing = [marker for marker in required_markers if marker not in backend_text]
assert not missing, f"Missing required markers: {missing}"

# Legacy patterns that must NOT appear anywhere in main.py, including in comments.
assert "caregiver_model = PeftModel.from_pretrained(base_model" not in backend_text, "Legacy shared-object adapter pattern remains"
assert "coach_model = PeftModel.from_pretrained(base_model" not in backend_text, "Legacy shared-object adapter pattern remains"
assert "supervisor_model = PeftModel.from_pretrained(base_model" not in backend_text, "Legacy shared-object adapter pattern remains"
assert "async def process_chat(request: ChatRequest):" not in backend_text, "Endpoint still lacks auth dependency"
assert "session_state[\"last_user_message\"] = request.user_message" not in backend_text, "Raw user message still persisted to Firebase"
assert "session_state[\"last_response\"] = response_text" not in backend_text, "Raw response still persisted to Firebase"
assert "user_transcript" not in backend_text, "Legacy request field still present"
assert "allow_origins=[\"*\"]" not in backend_text, "Wildcard CORS origins remain configured"
assert "allow_credentials=True" not in backend_text, "Credentialed wildcard CORS remains configured"
assert "blocked = [\"politics\", \"election\", \"gambling\", \"crypto\", \"finance advice\"]" not in backend_text, "Legacy keyword blocklist remains configured"
assert "output = adapter_model.generate(" not in backend_text, "Primary generation still blocks event loop"
assert "feedback_tokens = adapter_model.generate(" not in backend_text, "Coach generation still blocks event loop"
assert "\"models_loaded\": True" not in backend_text, "Health still hard-codes models_loaded=True"
assert "detail=str(e)" not in backend_text, "Raw exception details still leak to client"

print("✅ C4/C5/M9/H2/H3/H5/H10/H11/H12/H13/H14 validation passed: named adapters, auth guard, env config, Presidio redaction, unified v1 API contract, safe CORS policy, runtime Guardrails pipeline, non-blocking async inference path, readiness-aware health behavior, sanitized client error responses, and strict request schema constraints are configured.")

In [None]:
# 6.3 H11 Load Test Script — Health Responsiveness Under Chat Load
# Writes a standalone script that fires 30 chat requests through a thread pool while
# polling /health 60 times, then asserts on health latency and on the chat status codes.
load_test_py = BACKEND_DIR / "h11_health_load_test.py"
load_test_content = textwrap.dedent("""
import os
import time
import statistics
import concurrent.futures

import requests

BASE_URL = os.getenv("SPARC_BASE_URL", "http://localhost:8000")
API_KEY = os.getenv("SPARC_API_KEY", "")
HEADERS = {"X-API-Key": API_KEY} if API_KEY else {}
CHAT_PAYLOAD = {
    "session_id": "h11-load",
    "user_message": "Help me discuss HPV vaccines with a hesitant caregiver."
}


def post_chat() -> int:
    response = requests.post(f"{BASE_URL}/v1/chat", json=CHAT_PAYLOAD, headers=HEADERS, timeout=120)
    return response.status_code


def ping_health() -> float:
    start = time.perf_counter()
    response = requests.get(f"{BASE_URL}/health", timeout=5)
    response.raise_for_status()
    return (time.perf_counter() - start) * 1000


health_latencies = []
with concurrent.futures.ThreadPoolExecutor(max_workers=12) as pool:
    chat_futures = [pool.submit(post_chat) for _ in range(30)]
    for _ in range(60):
        health_latencies.append(ping_health())
        time.sleep(0.2)
    chat_statuses = [f.result() for f in chat_futures]

health_p95 = statistics.quantiles(health_latencies, n=20)[18] if len(health_latencies) >= 20 else max(health_latencies)
health_success_ratio = sum(1 for latency in health_latencies if latency < 1500) / len(health_latencies)

assert all(code in (200, 401, 422) for code in chat_statuses), f"Unexpected chat status codes: {sorted(set(chat_statuses))}"
assert health_success_ratio >= 0.99, f"Health responsiveness dropped below target: {health_success_ratio:.3f}"
assert health_p95 < 1500, f"Health p95 latency too high under chat load: {health_p95:.1f}ms"

print(f"✅ H11 load test passed: /health p95={health_p95:.1f}ms, success_ratio={health_success_ratio:.3f}")
""").strip()
# The script contains a non-ASCII character and Python reads source files as UTF-8 by
# default, so write UTF-8 explicitly instead of relying on the locale encoding.
load_test_py.write_text(load_test_content, encoding="utf-8")
print(f"Wrote {load_test_py}")
print("Run with: python h11_health_load_test.py")

In [None]:
# 6.4 Create systemd user service for FastAPI
systemd_dir = Path.home() / '.config/systemd/user'
systemd_dir.mkdir(parents=True, exist_ok=True)
service_file = systemd_dir / 'sparc-backend.service'
# Optional KEY=value file for secrets and overrides such as SPARC_API_KEY. The backend
# enables API-key auth by default and answers 503 on /v1/chat until a key is configured,
# and the unit previously offered no way to supply one. The leading "-" on EnvironmentFile
# makes a missing file non-fatal.
backend_env_file = PUBAPPS_ROOT / "config" / "backend.env"
service_content = textwrap.dedent(f"""
[Unit]
Description=SPARC-P FastAPI Backend
After=network.target riva-server.service
Requires=riva-server.service

[Service]
Type=simple
Environment=PATH={CONDA_ENV}/bin:/usr/bin
Environment=PYTHONUNBUFFERED=1
Environment=SPARC_FIREBASE_CREDS={FIREBASE_CREDS_PATH}
EnvironmentFile=-{backend_env_file}
WorkingDirectory={BACKEND_DIR}
ExecStart={CONDA_ENV}/bin/uvicorn main:app --host 0.0.0.0 --port 8000 --workers {UVICORN_WORKERS}
Restart=always
RestartSec=10

[Install]
WantedBy=default.target
""").strip()
service_file.write_text(service_content)
print(f"Wrote {service_file}")
print(service_content)
print(f"Configured uvicorn workers: {UVICORN_WORKERS}")
print(f"Optional environment file for secrets (e.g. SPARC_API_KEY=...): {backend_env_file}")

In [None]:
# 6.5 Enable backend service
# Each entry is (command, must_succeed); the final status query is informational only.
backend_service_steps = (
    ("systemctl --user daemon-reload", True),
    ("systemctl --user enable --now sparc-backend", True),
    ("systemctl --user status sparc-backend --no-pager", False),
)
for step_cmd, step_required in backend_service_steps:
    run(step_cmd, check=step_required)

## 7. Validation Checks
Set `EXECUTE = True` (section 2) before running these checks; in dry-run mode the cell only prints the commands.

In [None]:
# 7.1 Health and service checks
# All checks are diagnostic: a failing command is reported but never aborts the cell.
diagnostic_commands = (
    "curl -s http://localhost:8000/health",
    "journalctl --user -u riva-server -n 50 --no-pager",
    "journalctl --user -u sparc-backend -n 50 --no-pager",
    f"ls -lh {MODEL_DIR}",
)
for diagnostic in diagnostic_commands:
    run(diagnostic, check=False)