> Safety note: This notebook includes optional integrations (OpenAI/Azure, Microsoft Graph, Whisper). Only enable what you need and keep secrets in `.env` or WordPress `wp-config.php` per `docs/SECRETS.md`.

# Lumina Contextual Research Assistant (AM-Class) — Notebook

This notebook wires Lumina (Contextual Research Assistant), a scheduling agent, audio ingestion, resonance overlay, and pipeline glue for the `tec-tgcr` repository. It’s designed to run in VS Code with Python 3.10+ and uses only local creds/env per `docs/SECRETS.md`.

## 1) Configure Python Environment

- Uses `pyproject.toml` as the source of truth.
- Installs extra runtime tools used here (dotenv, yaml, pydantic, httpx, msal, msgraph-sdk, apscheduler, soundfile, openai-whisper or azure-cognitiveservices-speech).
- Verify interpreter and versions.

In [None]:
# Verify interpreter and install optional extras if missing (idempotent)
import sys, subprocess, json
print("Python:", sys.version)

extras = [
    "python-dotenv", "pyyaml", "pydantic>=2", "httpx", "msal", "msgraph-sdk",
    "apscheduler", "soundfile", # for AMR/WAV IO
]
# Whisper or Azure Speech; prefer Whisper CPU if not on Azure
preferred_asr = ["openai-whisper"]

def pip_install(pkgs):
    """Install *pkgs* into the running interpreter's environment via pip.

    Args:
        pkgs: Iterable of pip requirement strings (e.g. ``"pydantic>=2"``).

    Returns:
        ``True`` if pip succeeded or there was nothing to install,
        ``False`` if pip failed or could not be launched.
    """
    pkgs = list(pkgs or ())
    if not pkgs:
        # pip exits with an error when given no requirements; treat as a no-op.
        return True
    try:
        # ``sys.executable -m pip`` targets the same environment as this kernel.
        subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", *pkgs])
        return True
    except (subprocess.CalledProcessError, OSError) as e:
        print("pip install error:", e)
        return False

_ = pip_install(extras)
_ = pip_install(preferred_asr)

# ``import importlib`` alone does not guarantee the ``util`` submodule is loaded;
# import it explicitly so ``find_spec`` is always available.
import importlib.util

# Map each optional module's import name to whether it can be located.
mods = {m: importlib.util.find_spec(m) is not None for m in [
    "dotenv", "yaml", "pydantic", "httpx", "msal", "msgraph", "apscheduler", "soundfile", "whisper"
]}
print("Modules:", json.dumps(mods, indent=2))

## 2) Load Project Paths and .env

Detect repo root, update `sys.path`, load `.env`, and set commonly used directories.

In [None]:
from pathlib import Path
import os, sys
from datetime import datetime

# Start from this file's location when one exists; otherwise (e.g. in a
# notebook kernel) start from the current working directory.
if '__file__' in globals():
    ROOT = Path(__file__).resolve().parents[2]
else:
    ROOT = Path.cwd()

if not (ROOT / 'pyproject.toml').exists():
    # Fallback: walk upward from the working directory (the filesystem root
    # itself is not inspected) and adopt the first directory that contains
    # ``pyproject.toml``. ROOT keeps its initial value if none is found.
    here = Path.cwd()
    for candidate in [here, *here.parents][:-1]:
        if (candidate / 'pyproject.toml').exists():
            ROOT = candidate
            break

SRC = ROOT / 'src'
DATA = ROOT / 'data'
DOCS = ROOT / 'docs'
AIWF = ROOT / 'ai-workflow'
APPS = ROOT / 'apps'
OUTPUT = AIWF / 'output'
OUTPUT.mkdir(parents=True, exist_ok=True)

if str(SRC) not in sys.path:
    sys.path.insert(0, str(SRC))

from dotenv import load_dotenv
load_dotenv(ROOT / '.env')

print('Resolved paths:')
for p in [ROOT, SRC, DATA, DOCS, AIWF, APPS, OUTPUT]:
    print(' -', p)

## 3) Read Agent Config and Credentials

Parse `config/agent.yml` and `config/tec-verified-credential.json`; read select secrets from environment and validate.

In [None]:
import yaml, json
from pydantic import BaseModel

class Creds(BaseModel):
    """Optional credentials for Microsoft Graph and Azure Speech.

    Every field defaults to ``None``, so the model can be built even when no
    credential is configured.
    """
    graph_client_id: str | None = None
    graph_tenant_id: str | None = None
    azure_speech_key: str | None = None
    azure_speech_region: str | None = None

creds = Creds(
    graph_client_id=os.getenv('GRAPH_CLIENT_ID'),
    graph_tenant_id=os.getenv('GRAPH_TENANT_ID'),
    azure_speech_key=os.getenv('AZURE_SPEECH_KEY'),
    azure_speech_region=os.getenv('AZURE_SPEECH_REGION'),
)

# Optional config files: a missing file yields an empty dict instead of an error.
agent_yml = ROOT / 'config' / 'agent.yml'
vc_json = ROOT / 'config' / 'tec-verified-credential.json'
# Read explicitly as UTF-8 so parsing does not depend on the platform's locale
# encoding. ``safe_load`` returns None for an empty document, hence ``or {}``.
agent_cfg = (yaml.safe_load(agent_yml.read_text(encoding='utf-8')) or {}) if agent_yml.exists() else {}
vc_cfg = json.loads(vc_json.read_text(encoding='utf-8')) if vc_json.exists() else {}

def masked(s):
    """Return a redacted form of secret *s* that is safe to print.

    Values longer than six characters keep their first three and last two
    characters around ``***``. Shorter non-empty values are replaced entirely
    by ``***`` so a short secret is never echoed verbatim. ``None`` and the
    empty string yield ``None``.
    """
    if not s:
        return None
    if len(s) > 6:
        return s[:3] + '***' + s[-2:]
    return '***'
print('GRAPH_CLIENT_ID:', masked(creds.graph_client_id))
print('GRAPH_TENANT_ID:', masked(creds.graph_tenant_id))
print('AZURE_SPEECH_KEY:', masked(creds.azure_speech_key))
print('AZURE_SPEECH_REGION:', creds.azure_speech_region or None)
print('agent.yml keys:', list(agent_cfg.keys()) if agent_cfg else [])
print('verified-credential fields:', list(vc_cfg.keys()) if vc_cfg else [])

## 4) Import TEC Modules and Prompt Templates

Expose helpers to list available templates and call the agent runner programmatically.

In [None]:
from importlib import import_module

# Both modules are optional. Importing project code can fail in many ways, so
# any exception is tolerated, but the reason is reported rather than hidden.

# prompt_templates.py helper
try:
    pt = import_module('ai-workflow.prompt_templates')
except Exception as exc:
    print('prompt_templates import failed:', repr(exc))
    pt = None

# tec_agent_runner (CLI-ish module)
try:
    agent_runner = import_module('tec_agent_runner')
except Exception as exc:
    print('tec_agent_runner import failed:', repr(exc))
    agent_runner = None

print('prompt_templates available:', bool(pt))
print('tec_agent_runner available:', bool(agent_runner))

available_templates = []
if pt and hasattr(pt, 'TEMPLATES'):
    available_templates = list(getattr(pt, 'TEMPLATES').keys())
print('Templates:', available_templates)

## 5) Define Lumina Instruction Schema (AM-Class)

Pydantic schema to serialize persona, structural rules, command templates, and safety rails (versioned).

In [None]:
from pydantic import BaseModel, Field
from typing import List, Dict, Any
import json, yaml

class StructuralRules(BaseModel):
    """Boolean switches for the persona's structural rules; all default to ``True``."""
    hypothesis_anchoring: bool = True
    dual_channel: bool = True
    clause_stack: bool = True
    neurochem_map: bool = True
    cutaway_logic: bool = True
    symbolic_recursion: bool = True

class CommandTemplate(BaseModel):
    """A named command: a short description plus its template string."""
    name: str
    description: str
    template: str

class SafetyRails(BaseModel):
    """Safety settings for the persona.

    Medical and financial advice are disallowed by default, and
    ``disallowed_sources`` starts from a fresh two-item list per instance.
    """
    allow_medical_advice: bool = False
    allow_financial_advice: bool = False
    disallowed_sources: List[str] = Field(default_factory=lambda: ["reddit-only", "x-says"])  # illustrative

class LuminaPersona(BaseModel):
    """Versioned persona spec.

    Bundles the designation, traits, anchors, structural rules, command
    templates, and safety rails. Every field has a default, so
    ``LuminaPersona()`` yields the stock persona.
    """
    version: str = "AM-1.0"
    designation: str = "Lumina — Contextual Research Assistant (AM-Class)"
    traits: List[str] = ["myth-scientific", "scholarly", "snappy", "precise"]
    anchors: List[str] = ["Lumina", "Kaznak", "Karma", "Arcadia"]
    structural: StructuralRules = StructuralRules()
    # The single default command is a str.format-style template with named
    # placeholders (hypothesis, scholarly, resonant, oxy, dop, adr, mic).
    commands: List[CommandTemplate] = Field(default_factory=lambda: [
        CommandTemplate(
            name="default-summary",
            description="Summarize content in Arcadia’s myth-scientific voice",
            template="Working Hypothesis: {hypothesis}\nScholarly: {scholarly}\nResonant: {resonant}\nNeurochemistry: OXY={oxy}, DOP={dop}, ADR={adr}\nMic-Line: {mic}"
        )
    ])
    safety: SafetyRails = SafetyRails()

persona = LuminaPersona()
print(persona.model_dump_json(indent=2))

# Save reusable spec for apps/agents
spec_path_json = OUTPUT / 'lumina_persona_am.json'
spec_path_yaml = OUTPUT / 'lumina_persona_am.yaml'
# The persona text contains non-ASCII characters (em dash, curly apostrophe),
# so write as UTF-8 explicitly instead of relying on the locale encoding.
spec_path_json.write_text(persona.model_dump_json(indent=2), encoding='utf-8')
# Round-trip through JSON so YAML receives plain dicts/lists/scalars only.
spec_path_yaml.write_text(
    yaml.safe_dump(json.loads(persona.model_dump_json()), sort_keys=False),
    encoding='utf-8',
)
print('Saved persona spec to:', spec_path_json.name, 'and', spec_path_yaml.name)

## 6) Compile Prompt Stack (System + Persona + Task)

Compose chat prompts from policy + persona + task; provide simple renderers for chat APIs.

In [None]:
from typing import Dict, List

def compile_chat_messages(persona: LuminaPersona, task: str) -> List[Dict[str,str]]:
    """Build the three-message chat stack for a task.

    Args:
        persona: Persona whose JSON dump is embedded in the second message.
        task: User request placed in the final message.

    Returns:
        Three ``{"role", "content"}`` dicts in order: the fixed system policy,
        a system message carrying the persona JSON, and the user task.
    """
    policy_text = "".join([
        "You are Lumina (AM-Class), a myth-scientific research assistant. ",
        "Follow the structural rules: Working Hypothesis, dual-channel (scholarly + resonant), ",
        "neurochemical indexing, and close with a mic-line. Maintain TEC cosmology continuity.",
    ])
    persona_text = "Persona:: " + persona.model_dump_json()
    stack = [("system", policy_text), ("system", persona_text), ("user", task)]
    return [{"role": role, "content": content} for role, content in stack]

# quick unit check
test_msgs = compile_chat_messages(persona, "Summarize the resonance of Sleep Token - The Summoning.")
print('Stacked messages:', len(test_msgs))

## 7) Context Ingestion from repo

Index text from `docs/`, `data/`, and `ai-workflow/output` with provenance tracking.

In [None]:
import re
from typing import Tuple

def iter_text_files(root: Path, patterns=(".md", ".txt", ".json")):
    """Lazily yield files under *root* (recursively) with a matching suffix.

    A path is yielded when its lowercased suffix is in *patterns* and it is a
    regular file; directories that happen to carry such a suffix are skipped.
    """
    yield from (
        entry
        for entry in root.rglob('*')
        if entry.suffix.lower() in patterns and entry.is_file()
    )

def load_text_with_provenance(paths: list[Path], max_chars=10000) -> list[dict]:
    """Read each file and return whitespace-collapsed previews with their origin.

    Args:
        paths: Files to read as UTF-8 (undecodable bytes are dropped).
        max_chars: Maximum number of characters kept in each preview.

    Returns:
        One ``{"path", "preview"}`` dict per readable file. ``path`` is
        relative to ``ROOT`` when the file lies under it, otherwise the path
        as given. Unreadable files are skipped.
    """
    out = []
    for p in paths:
        try:
            raw = p.read_text(encoding='utf-8', errors='ignore')
        except OSError:
            # Missing file, permission problem, directory, etc.: skip it.
            continue
        try:
            shown = p.relative_to(ROOT)
        except ValueError:
            # File is outside the repo root; keep its path instead of crashing.
            shown = p
        # Collapse every whitespace run to a single space for compact previews.
        txt = re.sub(r"\s+", " ", raw).strip()
        out.append({"path": str(shown), "preview": txt[:max_chars]})
    return out

sources = list(iter_text_files(DOCS)) + list(iter_text_files(DATA)) + list(iter_text_files(AIWF / 'output'))
ingest = load_text_with_provenance(sources[:50])  # cap for demo
print('Ingested docs (previewed):', len(ingest))

## 8) Resonance Overlay Scoring

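$R = w_o \cdot \mathrm{OXY} + w_d \cdot \mathrm{DOP} + w_a \cdot \mathrm{ADR}$, with each input and the final score clamped to $[0, 1]$ (default weights $w_o = 0.4$, $w_d = 0.4$, $w_a = 0.2$).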

In [None]:
from typing import NamedTuple

class Resonance(NamedTuple):
    """Immutable record of three component values and their combined score."""
    oxy: float
    dop: float
    adr: float
    score: float

def clamp01(x: float) -> float:
    """Clamp *x* into the closed interval [0.0, 1.0]."""
    capped = min(1.0, x)  # apply the upper bound first
    return max(0.0, capped)

def resonance_score(oxy: float, dop: float, adr: float, w=(0.4,0.4,0.2)) -> Resonance:
    """Combine three component values into a weighted score.

    Each input is clamped to [0, 1], multiplied by its weight from *w*
    (ordered oxy, dop, adr), and the weighted sum is clamped again.

    Returns:
        ``Resonance`` holding the clamped inputs and the clamped score.
    """
    o = clamp01(oxy)
    d = clamp01(dop)
    a = clamp01(adr)
    weighted = o*w[0] + d*w[1] + a*w[2]
    return Resonance(oxy=o, dop=d, adr=a, score=clamp01(weighted))

print('Resonance demo:', resonance_score(0.8, 0.6, 0.2))

## 9) Scheduling Agent (Microsoft Graph, device-code flow)

Create and list calendar events (updating existing events is not implemented in this cell). Requires `GRAPH_CLIENT_ID` and `GRAPH_TENANT_ID` in `.env`.

In [None]:
import sqlite3, time
import msal
from msgraph import GraphServiceClient
try:
    from azure.identity import UsernamePasswordCredential  # optional alt flow if needed
except Exception:
    UsernamePasswordCredential = None

DB = OUTPUT / 'schedule_meta.sqlite3'
conn = sqlite3.connect(DB)
conn.execute("CREATE TABLE IF NOT EXISTS events (id TEXT PRIMARY KEY, subject TEXT, when_utc TEXT, created_at TEXT)")
conn.commit()

SCOPES = ["Calendars.ReadWrite"]
CLIENT_ID = creds.graph_client_id
TENANT_ID = creds.graph_tenant_id

def get_token_device_code() -> str | None:
    """Obtain a Graph access token through the MSAL device-code flow.

    Prints the user code and verification URL, then waits for the flow to
    finish.

    Returns:
        The access token string, or ``None`` when the client/tenant IDs are
        missing, the flow cannot be created, or authentication fails.
    """
    if not (CLIENT_ID and TENANT_ID):
        print('Graph creds missing; skip auth.')
        return None

    authority_url = f"https://login.microsoftonline.com/{TENANT_ID}"
    public_app = msal.PublicClientApplication(CLIENT_ID, authority=authority_url)
    device_flow = public_app.initiate_device_flow(scopes=["https://graph.microsoft.com/.default"])
    if 'user_code' not in device_flow:
        print('Failed to create device flow:', device_flow)
        return None

    print('Device code:', device_flow['user_code'])
    print('Visit:', device_flow['verification_uri'])

    result = public_app.acquire_token_by_device_flow(device_flow)
    if 'access_token' not in result:
        print('Auth error:', result)
        return None
    return result['access_token']

# Lazy client wrapper
class GraphClient:
    """Wrapper around ``GraphServiceClient`` that also records created events locally.

    NOTE(review): the SDK client is built from a callable returning an
    ``Authorization`` header. Confirm this matches the constructor of the
    installed msgraph-sdk version (it may expect a credential object) — TODO verify.
    """
    def __init__(self, token: str):
        # Keep the raw bearer token and build the SDK client from it.
        self.token = token
        self.client = GraphServiceClient(lambda: {"Authorization": f"Bearer {token}"})

    async def create_event(self, subject: str, start_iso: str, end_iso: str):
        """Create a calendar event and record it in the local ``events`` table.

        Both timestamps are sent with ``timeZone`` set to ``"UTC"``. Returns
        the object produced by the SDK call.
        """
        body = {
            "subject": subject,
            "start": {"dateTime": start_iso, "timeZone": "UTC"},
            "end": {"dateTime": end_iso, "timeZone": "UTC"}
        }
        # NOTE(review): a plain dict is posted here; confirm the SDK accepts it
        # rather than requiring a typed event model.
        evt = await self.client.me.events.post(body)
        # Upsert by event id using the module-level sqlite connection;
        # created_at is the current UTC time as a naive ISO string.
        conn.execute("INSERT OR REPLACE INTO events(id, subject, when_utc, created_at) VALUES(?,?,?,?)",
                     (evt.id, subject, start_iso, datetime.utcnow().isoformat()))
        conn.commit()
        return evt

    async def list_upcoming(self, top=5):
        """Query the signed-in user's calendar view, passing ``top`` as a query parameter.

        NOTE(review): no start/end window is supplied; confirm whether the
        calendar-view request needs one and whether this keyword is accepted.
        """
        return await self.client.me.calendarview.get(query_parameters={"top": top})

print('Scheduling DB:', DB)

## 10) Audio Notes: Transcribe (Whisper CPU or Azure Speech)

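Discovers audio files matching `data/evidence/*.amr` and `data/evidence/*.wav`. Transcription is not run automatically; call `transcribe_local_whisper` to transcribe a file locally with Whisper.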

In [None]:
import soundfile as sf
import glob

try:
    import whisper
except Exception:
    whisper = None

AUDIO_DIR = DATA / 'evidence'

def transcribe_local_whisper(audio_path: Path, model_name='base') -> dict:
    """Transcribe *audio_path* with a locally loaded Whisper model.

    Args:
        audio_path: Audio file to transcribe.
        model_name: Name passed to ``whisper.load_model``.

    Returns:
        Whatever the model's ``transcribe`` call returns.

    Raises:
        RuntimeError: If the ``whisper`` package could not be imported.
    """
    if whisper is None:
        raise RuntimeError('whisper not installed')
    # The model is loaded on every call; reuse the result if calling in a loop.
    return whisper.load_model(model_name).transcribe(str(audio_path))

# Demo discovery only (do not force long jobs)
audio_files = [Path(p) for p in glob.glob(str(AUDIO_DIR / '*.amr'))] + [Path(p) for p in glob.glob(str(AUDIO_DIR / '*.wav'))]
print('Found audio files:', [p.name for p in audio_files])

## 11) Task Queue and Orchestration (asyncio + APScheduler)

Minimal async queue and periodic jobs.

In [None]:
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

queue: asyncio.Queue = asyncio.Queue()

async def job_research(payload):
    """Placeholder research job: pause briefly, then echo *payload* in a success record."""
    await asyncio.sleep(0.1)
    outcome = dict(ok=True, kind="research", payload=payload)
    return outcome

async def worker():
    """Consume tasks from the module-level ``queue`` forever.

    Tasks whose ``type`` is ``'research'`` are run through ``job_research``
    and the result is printed. Any other task is acknowledged without doing
    work. ``task_done`` is called for every task, even if the job raises.
    """
    while True:
        item = await queue.get()
        try:
            if item.get('type') != 'research':
                continue  # the ``finally`` below still marks the task done
            outcome = await job_research(item.get('payload', {}))
            print('Job done:', outcome)
        finally:
            queue.task_done()

scheduler = AsyncIOScheduler()
scheduler.add_job(lambda: print('housekeeping tick'), 'interval', seconds=60)
scheduler.start()

print('Scheduler started. Queue ready.')

## 12) Integrate `tec_agent_runner`

Invoke the agent runner from the notebook, capturing logs to `ai-workflow/output/`.

In [None]:
import io, contextlib

RUN_LOG = OUTPUT / f"agent_run_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.log"

def run_agent_from_notebook(manifest_path: Path | None = None):
    """Run ``tec_agent_runner.main`` in-process and save its output to ``RUN_LOG``.

    Args:
        manifest_path: Optional manifest, passed as ``--manifest <path>``.

    Returns:
        ``None``. Stdout and stderr produced during the run, including any
        error message, are written to ``RUN_LOG``.
    """
    if agent_runner is None:
        print('tec_agent_runner not importable')
        return None
    argv = ["--manifest", str(manifest_path)] if manifest_path else []
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf):
        try:
            if hasattr(agent_runner, 'main'):
                agent_runner.main(argv)
            else:
                print('No main() in tec_agent_runner')
        except SystemExit:
            # CLI-style entry points may call sys.exit(); do not stop the kernel.
            pass
        except Exception as e:
            # Printed inside the redirect, so the message lands in the log.
            print('run error:', e)
    # Explicit UTF-8: captured output may contain non-ASCII text that the
    # platform's locale encoding cannot represent.
    RUN_LOG.write_text(buf.getvalue(), encoding='utf-8')
    print('Wrote run log:', RUN_LOG.name)

# Example (commented):
# run_agent_from_notebook(ROOT / 'agents' / 'manifests' / 'airth_research_guard.json')

## 13) Persist Outputs to `ai-workflow/output` and `docs/exports`

Write compiled prompts, transcripts, resonance reports, and schedule receipts.

In [None]:
from datetime import date
import json

EXPORTS = DOCS / 'exports'
EXPORTS.mkdir(parents=True, exist_ok=True)

def save_json(obj: dict, name: str, outdir: Path = OUTPUT) -> Path:
    """Write *obj* as pretty-printed JSON to ``outdir / name`` and return that path."""
    p = outdir / name
    # ``ensure_ascii=False`` emits raw non-ASCII characters, so the file must
    # be written as UTF-8 explicitly rather than in the locale encoding.
    p.write_text(json.dumps(obj, indent=2, ensure_ascii=False), encoding='utf-8')
    return p

def daily_summary_path(stem: str) -> Path:
    """Return ``EXPORTS/<stem>_<today's ISO date>.json`` (the file is not created)."""
    today_iso = date.today().isoformat()
    filename = f"{stem}_{today_iso}.json"
    return EXPORTS / filename

print('Exports dir:', EXPORTS)

## 14) Run Unit Tests with pytest

Runs repo tests and emits JUnit XML to `docs/exports/test-results.xml`.

In [None]:
import os

XML_OUT = EXPORTS / 'test-results.xml'
print('Running pytest...')
!pytest -q --junitxml "{XML_OUT}"

## 15) End-to-End Demo (prompt → research → schedule → transcript → report)

This cell demonstrates a stitched flow using previously defined helpers.

In [None]:
import asyncio, random

async def demo_flow():
    """Run the stitched demo and write ``demo_report.json`` via ``save_json``.

    The Graph scheduling and Whisper transcription steps are left commented
    out, so by default the report records ``scheduled`` as ``False`` and only
    lists the names of up to three discovered audio files.
    """
    # 1) Persona + prompt stack
    task = "Summarize TEC Codex resonance of recent docs and propose next 1 appointment to improve stability."
    msgs = compile_chat_messages(persona, task)

    # 2) Ingest a few docs
    ctx = ingest[:5]

    # 3) Compute resonance for a fake (random) feature vector
    oxy_in, dop_in, adr_in = random.random(), random.random(), random.random()
    r = resonance_score(oxy_in, dop_in, adr_in)

    # 4) (Optional) schedule event via Graph — only if token obtained
    token = None  # set via get_token_device_code() if you want to run it live
    created_event = None
    # NOTE(review): in the sample below ``start`` and ``end`` are identical;
    # give ``end`` a later time before running it live.
    # token = get_token_device_code()
    # if token:
    #     gc = GraphClient(token)
    #     now = datetime.utcnow()
    #     start = (now.replace(microsecond=0)).isoformat() + 'Z'
    #     end = (now.replace(microsecond=0)).isoformat() + 'Z'
    #     created_event = await gc.create_event('TEC Health Check', start, end)

    # 5) Transcribe first audio (if any) — skip heavy run by default
    transcript = {"skipped": True}
    # if audio_files:
    #     transcript = transcribe_local_whisper(audio_files[0])

    audio_names = [p.name for p in audio_files][:3]
    report = {
        "messages": msgs,
        "context_sample": ctx,
        "resonance": r._asdict(),
        "scheduled": bool(created_event),
        "transcript_meta": audio_names,
        "timestamp": datetime.utcnow().isoformat(),
    }
    p = save_json(report, 'demo_report.json')
    print('Saved demo_report:', p)

# To run the demo, uncomment:
# await demo_flow()

## Call LUMINAI_API_URL (WordPress agent) — Example

Reads `LUMINAI_API_URL` from environment and posts a simple message array.

In [None]:
import os, httpx, json

LUMINAI_API_URL = os.getenv('LUMINAI_API_URL', '').strip() or None
print('LUMINAI_API_URL:', 'set' if LUMINAI_API_URL else 'not set')

async def call_wp_agent(prompt: str) -> dict:
    """POST *prompt* as a single user message to ``LUMINAI_API_URL``.

    Returns:
        The decoded JSON response. If the body is not valid JSON, a dict with
        the HTTP ``status`` and the first 500 characters of the body as
        ``text``. If the URL is not configured, ``{"error": ...}``.

    Transport errors raised by ``httpx`` are not caught and propagate to the
    caller. The HTTP status code is not checked before decoding.
    """
    if not LUMINAI_API_URL:
        return {"error": "LUMINAI_API_URL not set"}
    payload = {"messages": [{"role": "user", "content": prompt}]}
    async with httpx.AsyncClient(timeout=60) as client:
        r = await client.post(LUMINAI_API_URL, json=payload)
        try:
            return r.json()
        except ValueError:
            # JSON and Unicode decode errors are ValueError subclasses; any
            # other failure is a real bug and should surface.
            return {"status": r.status_code, "text": r.text[:500]}

# Example (commented to avoid accidental external call):
# resp = await call_wp_agent('Hello Lumina, quick resonance check on today\'s plan.')
# print(resp)