In [1]:
# PT Training Plan Generation + QA Demo (Synthetic Data)
# Outputs:
# - pt_training_plans.csv
# - pt_training_run_report.json
#
# Assumes these two input files exist in the same notebook directory:
# - medical_redacted.csv
# - patient_survey.csv
#
# Standard library only (works without pandas).

import csv
import json
from collections import Counter
from datetime import datetime, timezone

# Announce the start of the run so notebook output shows where the pipeline begins.
print("PT demo: pipeline starting")

# Input file names. They are passed to open() as-is, so they are looked up
# relative to the current working directory.
MEDICAL_CSV = "medical_redacted.csv"
SURVEY_CSV = "patient_survey.csv"

def load_csv(path):
    """Read the UTF-8 CSV file at ``path`` into a list of row dicts.

    The first line of the file is used as the header; each following row
    becomes a dict keyed by those header names.
    """
    with open(path, mode="r", encoding="utf-8", newline="") as handle:
        reader = csv.DictReader(handle)
        rows = list(reader)
    return rows

medical = load_csv(MEDICAL_CSV)
survey = load_csv(SURVEY_CSV)

# Merge step
# Index each table by its whitespace-trimmed patient id. When an id occurs
# more than once in a file, the last row for that id is the one kept.
medical_by_id = {}
for medical_row in medical:
    medical_by_id[medical_row["patient_id"].strip()] = medical_row

survey_by_id = {}
for survey_row in survey:
    survey_by_id[survey_row["patient_id"].strip()] = survey_row

all_ids = sorted(medical_by_id.keys() | survey_by_id.keys())

merged = []
merge_errors = []

for pid in all_ids:
    medical_side = medical_by_id.get(pid)
    survey_side = survey_by_id.get(pid)

    if medical_side and survey_side:
        # Survey columns are applied last, so they win on any shared column name.
        combined = dict(medical_side)
        combined.update(survey_side)
        merged.append(combined)
    else:
        # Record which side is absent instead of producing a partial row.
        merge_errors.append({
            "patient_id": pid,
            "missing_medical": not medical_side,
            "missing_survey": not survey_side,
        })

print("Merged rows:", len(merged), "| Merge errors:", len(merge_errors))

# Knowledge base stand-ins
#
# CONTRA_RULES maps a primary-condition string to the constraint tags that
# always apply to it. extract_constraints() looks the condition up with an
# exact string match, so a condition not listed here contributes no tags.
CONTRA_RULES = {
    "ACL reconstruction": ["closed_chain_only", "no_deep_squats"],
    "Rotator cuff strain": ["avoid_overhead_loading_if_painful"],
    "Lower back pain": ["avoid_spinal_flexion_if_painful"],
}

# EXERCISE_LIBRARY maps a constraint tag to candidate exercises for that tag.
# Each exercise is a dict with:
#   "exercise" - display name printed in the plan text
#   "category" - "mobility" / "stability" / "strength"; generate_plan() orders by it
#   "dose"     - display string for sets/reps or hold time
EXERCISE_LIBRARY = {
    "no_deep_squats": [
        {"exercise": "Supported sit-to-stand (partial range)", "category": "strength", "dose": "2 x 8 reps"},
        {"exercise": "Heel slides", "category": "mobility", "dose": "2 x 10 reps"},
    ],
    "closed_chain_only": [
        {"exercise": "Wall-supported mini-squat (shallow)", "category": "strength", "dose": "2 x 6 reps"},
        {"exercise": "Terminal knee extension with band", "category": "strength", "dose": "2 x 10 reps"},
    ],
    "avoid_overhead_loading_if_painful": [
        {"exercise": "Scapular retraction holds", "category": "stability", "dose": "2 x 20 seconds"},
        {"exercise": "External rotation with light band (elbow tucked)", "category": "strength", "dose": "2 x 8 reps"},
    ],
    "avoid_spinal_flexion_if_painful": [
        {"exercise": "McGill curl-up (modified)", "category": "stability", "dose": "2 x 6 reps"},
        {"exercise": "Bird-dog", "category": "stability", "dose": "2 x 6 reps each side"},
    ],
}

def normalize(text):
    """Collapse whitespace in ``text`` to single spaces and trim both ends.

    Falsy input (``None`` or an empty string) yields an empty string.
    """
    if not text:
        return ""
    # split() with no arguments already discards leading/trailing whitespace
    # and treats any run of whitespace as a single separator.
    words = text.split()
    return " ".join(words)

def extract_constraints(primary_condition, self_constraints_text):
    """Return the sorted, de-duplicated constraint tags for one patient.

    Tags come from two sources: the fixed rules registered for
    ``primary_condition`` in CONTRA_RULES, and key phrases found
    (case-insensitively) in the patient's free-text ``self_constraints_text``.
    """
    tags = list(CONTRA_RULES.get(primary_condition, []))

    free_text = self_constraints_text.lower() if self_constraints_text else ""

    # Phrase in the free text -> constraint tag it implies.
    phrase_to_tag = (
        ("deep squat", "no_deep_squats"),
        ("spinal flexion", "avoid_spinal_flexion_if_painful"),
    )
    for phrase, tag in phrase_to_tag:
        if phrase in free_text:
            tags.append(tag)

    return sorted(set(tags))

def _parse_pain(raw):
    """Return the pain score in ``raw`` as an int, or None if it is unusable.

    Surrounding whitespace is ignored. Anything that is not a plain
    non-negative integer (empty, None, negative, decimal, text) yields None.
    The value is not range-checked against 0-10.
    """
    text = "" if raw is None else str(raw).strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as superscript digits, which int() then rejects with ValueError.
    if not text.isdecimal():
        return None
    return int(text)


def generate_plan(row):
    """Build one training-plan record from a merged patient row.

    Args:
        row: dict for one patient. ``patient_id`` is required; the optional
            keys read are ``primary_condition``, ``mobility_level``,
            ``session_goal``, ``pain_0_10``, ``self_reported_constraints``
            and ``red_flags``.

    Returns:
        dict of strings with keys ``patient_id``, ``primary_condition``,
        ``mobility_level``, ``pain_0_10`` (empty when unparseable),
        ``session_goal``, ``constraints`` (``;``-joined tags),
        ``plan_item_count`` and ``plan_text`` (newline-joined, human readable).

    Raises:
        KeyError: if ``row`` has no ``patient_id`` key.
    """
    pid = row["patient_id"].strip()
    condition = normalize(row.get("primary_condition"))
    mobility = (row.get("mobility_level") or "").strip().lower()
    goal = normalize(row.get("session_goal"))
    pain = _parse_pain(row.get("pain_0_10"))

    constraints = extract_constraints(condition, row.get("self_reported_constraints", ""))

    # Pull candidate exercises from constraints
    exercises = []
    for constraint in constraints:
        exercises.extend(EXERCISE_LIBRARY.get(constraint, []))

    # Sequence: mobility -> stability -> strength. sorted() is stable, so
    # exercises in the same category keep their constraint order; an unknown
    # category sorts last.
    order = {"mobility": 0, "stability": 1, "strength": 2}
    exercises = sorted(exercises, key=lambda ex: order.get(ex["category"], 9))

    # Simple complexity cap based on mobility. Any value other than
    # "low"/"medium" (including a blank one) gets the larger cap.
    max_items = 4 if mobility in ("low", "medium") else 5
    exercises = exercises[:max_items]

    pain_label = pain if pain is not None else "NA"
    lines = [
        f"Patient: {pid}",
        f"Condition: {condition} | Mobility: {mobility} | Pain: {pain_label}",
    ]
    if goal:
        lines.append(f"Goal: {goal}")
    if constraints:
        lines.append("Constraints: " + ", ".join(constraints))

    # Any non-blank red_flags value triggers the safety note; the value
    # itself is not printed.
    red_flags = normalize(row.get("red_flags", ""))
    if red_flags:
        lines.append("Safety note: red flags present. Escalate to clinician review if symptoms worsen.")

    lines.append("Session plan:")
    for i, ex in enumerate(exercises, 1):
        lines.append(f"{i}. {ex['exercise']} ({ex['dose']})")

    return {
        "patient_id": pid,
        "primary_condition": condition,
        "mobility_level": mobility,
        "pain_0_10": str(pain) if pain is not None else "",
        "session_goal": goal,
        "constraints": ";".join(constraints),
        "plan_item_count": str(len(exercises)),
        "plan_text": "\n".join(lines),
    }

# Numeric score for each QA quality label (higher is better); the run loop
# uses it to fill the "quality_score" column.
QUALITY_SCORE = {"low": 0, "medium": 1, "high": 2}

def qa_check(plan_row):
    """Run the QA checks on one generated plan record.

    Args:
        plan_row: dict with the string fields ``session_goal``,
            ``plan_item_count``, ``constraints`` (``;``-joined tags) and
            ``plan_text``.

    Returns:
        ``(issues, quality)`` where ``issues`` is a list of issue codes and
        ``quality`` is ``"high"`` (no issues), ``"medium"`` (one issue) or
        ``"low"`` (two or more).

    Raises:
        KeyError: if one of the fields listed above is missing.
    """
    issues = []

    if not plan_row["session_goal"]:
        issues.append("missing_session_goal")

    if plan_row["plan_item_count"] == "0":
        issues.append("no_exercises_generated")

    # Constraint violation example.
    # Compare whole tags, not substrings of the ";"-joined string.
    constraint_tags = plan_row["constraints"].split(";")
    if "no_deep_squats" in constraint_tags:
        plan_lines = plan_row["plan_text"].splitlines()
        # Only the prescribed exercises can violate the constraint. Header
        # lines such as "Goal: ..." may legitimately mention deep squats, so
        # scan only what follows the "Session plan:" marker. If the marker is
        # absent, fall back to scanning the whole text.
        try:
            first_item = plan_lines.index("Session plan:") + 1
        except ValueError:
            first_item = 0
        exercise_text = "\n".join(plan_lines[first_item:]).lower()
        if "deep squat" in exercise_text:
            issues.append("constraint_violation_deep_squat")

    # Quality label
    if not issues:
        quality = "high"
    elif len(issues) == 1:
        quality = "medium"
    else:
        quality = "low"

    return issues, quality

# Generate a plan for every merged row and attach its QA result.
plans = []

for merged_row in merged:
    record = generate_plan(merged_row)
    found_issues, label = qa_check(record)
    # Insertion order matters: these three keys become the last CSV columns.
    record.update({
        "qa_issues": ";".join(found_issues),
        "ai_plan_quality": label,
        "quality_score": str(QUALITY_SCORE[label]),
    })
    plans.append(record)

# Tally how many plans received each quality label.
quality_counts = Counter(entry["ai_plan_quality"] for entry in plans)

# Write outputs
PLANS_CSV = "pt_training_plans.csv"
REPORT_JSON = "pt_training_run_report.json"

# Column order normally comes from the first plan record. When no rows were
# merged there is no record to copy it from, so fall back to the columns
# produced by generate_plan() plus the three QA columns; the CSV then still
# gets a header instead of the run crashing on plans[0].
_DEFAULT_PLAN_FIELDS = [
    "patient_id",
    "primary_condition",
    "mobility_level",
    "pain_0_10",
    "session_goal",
    "constraints",
    "plan_item_count",
    "plan_text",
    "qa_issues",
    "ai_plan_quality",
    "quality_score",
]
plan_fields = list(plans[0].keys()) if plans else _DEFAULT_PLAN_FIELDS

with open(PLANS_CSV, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=plan_fields)
    writer.writeheader()
    writer.writerows(plans)

# datetime.utcnow() is deprecated; take an aware UTC timestamp and render
# its "+00:00" offset as "Z" so the report keeps the same "...Z" format.
run_timestamp_utc = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

report = {
    "run_timestamp_utc": run_timestamp_utc,
    "inputs": {"medical_csv": MEDICAL_CSV, "survey_csv": SURVEY_CSV},
    "outputs": {"plans_csv": PLANS_CSV, "run_report_json": REPORT_JSON},
    "merged_rows": len(merged),
    "merge_errors_count": len(merge_errors),
    "quality_distribution": dict(quality_counts),
    # Only the first few errors are included, to keep the report small.
    "merge_errors_sample": merge_errors[:5],
}

with open(REPORT_JSON, "w", encoding="utf-8") as f:
    json.dump(report, f, indent=2)

print("Wrote outputs:", PLANS_CSV, REPORT_JSON)
print("Quality distribution:", dict(quality_counts))
print("\nFirst plan preview:\n")
# Guard the preview: there is no first plan when nothing merged.
print(plans[0]["plan_text"] if plans else "(no plans generated)")


PT demo: pipeline starting
Merged rows: 3 | Merge errors: 0


  "run_timestamp_utc": datetime.utcnow().isoformat() + "Z",


Wrote outputs: pt_training_plans.csv pt_training_run_report.json
Quality distribution: {'high': 2, 'medium': 1}

First plan preview:

Patient: P001
Condition: ACL reconstruction | Mobility: low | Pain: 5
Goal: Restore knee flexion
Constraints: closed_chain_only, no_deep_squats
Session plan:
1. Heel slides (2 x 10 reps)
2. Wall-supported mini-squat (shallow) (2 x 6 reps)
3. Terminal knee extension with band (2 x 10 reps)
4. Supported sit-to-stand (partial range) (2 x 8 reps)
