
# Career Agent – Integrated (Prod Informa + Dev Profiles) with Robust Streaming

This notebook:
- Implements **robust Bedrock streaming** (converse_stream + fallback to invoke_model_with_response_stream).
- Adds a **Prod retriever** for `internal_curated_informa_vectorstore` (Informa internal knowledge).
- Adds a **Dev retriever** (via `PG_DSN`) for the `internal_private_employee_profiles_vectorstore` (employee profiles).
- Provides a **unified `run_workflow()`** that pulls from profile + existing tools (if defined) + Informa Prod snippets, then streams the answer.

> You can keep using your previous helpers if you have them defined elsewhere (e.g., `job_tool`, `courses_tool`, `choose_candidates`, `synthesize_answer_llm`). The code checks for them and uses them when present.


In [None]:

# --- Setup & Environment ---
import os, json, time, urllib.parse, re
from typing import Iterator, List, Dict, Any, Optional, Tuple
from dataclasses import dataclass

# Load variables from a local .env file when python-dotenv is available.
# This is best-effort: any failure (package missing, load error) is ignored.
try:
    from dotenv import load_dotenv
    load_dotenv()
except Exception:
    pass

# Region: AWS_BEDROCK_REGION wins, then AWS_REGION, then a fixed default.
AWS_BEDROCK_REGION = (
    os.environ.get("AWS_BEDROCK_REGION")
    or os.environ.get("AWS_REGION")
    or "us-east-1"
)
# Model ID used when callers do not pass one explicitly.
PRIMARY_MODEL_ID = os.environ.get(
    "PRIMARY_LLM_MODEL_NAME",
    "us.anthropic.claude-3-7-sonnet-20250219-v1:0",
)

print("Using region:", AWS_BEDROCK_REGION)
print("Primary model:", PRIMARY_MODEL_ID)


In [None]:

# --- Robust Bedrock Streaming Utilities ---
import boto3
from botocore.exceptions import ClientError

def _normalize_bedrock_model_id(model_id: str) -> str:
    if not model_id:
        return "anthropic.claude-3-7-sonnet-20250219-v1:0"
    if model_id.startswith(("us.", "eu.")):
        model_id = model_id.split(".", 1)[1]
    return model_id

def _bedrock_client():
    """Create a boto3 ``bedrock-runtime`` client configured from the environment.

    The region comes from ``AWS_BEDROCK_REGION``, then ``AWS_REGION``, then
    ``"us-east-1"``. When ``AWS_BEDROCK_ENDPOINT`` is set it is passed as the
    client's ``endpoint_url``.
    """
    client_kwargs = {
        "region_name": (
            os.getenv("AWS_BEDROCK_REGION")
            or os.getenv("AWS_REGION")
            or "us-east-1"
        ),
    }
    custom_endpoint = os.getenv("AWS_BEDROCK_ENDPOINT")  # optional override
    if custom_endpoint:
        client_kwargs["endpoint_url"] = custom_endpoint
    return boto3.client("bedrock-runtime", **client_kwargs)

# System prompt sent with every synthesis request. It is placed in the
# "system" field of the Anthropic Messages body and in the Converse API
# ``system`` parameter, never as a chat message.
SYSTEM_PROMPT = (
    "You are Informa’s internal career advisor. "
    "Write naturally and concisely, tailored to the employee’s background and the question. "
    "Explain tradeoffs, propose bridge steps if the target domain differs from the profile. "
    "Use only facts provided; do not invent links or data."
)

def _make_messages_body(user_text: str, intents: List[str], is_manager: bool, profile_fields: dict, sections: dict) -> dict:
    """Build the Anthropic Messages request body for one career-advice query.

    The query, intents, persona, a trimmed profile and trimmed retrieval
    results are serialised as JSON and appended to a fixed instruction in a
    single user message.

    Args:
        user_text: The user's question.
        intents: Intent labels detected for the question.
        is_manager: Whether the user should be treated as a manager.
        profile_fields: Mapping with optional ``name``, ``title``, ``skills``
            and ``topics`` keys. ``None`` is treated as an empty profile.
        sections: Mapping of section name to a list of dict items. ``None``
            is treated as "no retrieval results".

    Returns:
        A dict with ``anthropic_version``, ``max_tokens``, ``temperature``,
        ``system`` and ``messages`` keys.
    """
    # Tolerate callers that have no profile or no retrieval results at all.
    profile_fields = profile_fields or {}
    sections = sections or {}

    def _linked(key: str) -> list:
        # Jobs/courses: keep only title + url, first 8 items.
        return [
            {"title": x.get("title"), "url": x.get("url")}
            for x in (sections.get(key) or [])
        ][:8]

    def _titled(key: str) -> list:
        # KB sections: title may live on the item or under its "metadata"; first 6 items.
        return [
            {"title": x.get("title") or (x.get("metadata") or {}).get("title", "")}
            for x in (sections.get(key) or [])
        ][:6]

    payload = {
        "query": user_text,
        "intents": intents,
        "persona": {"is_manager": bool(is_manager)},
        "profile": {
            "name": profile_fields.get("name"),
            "title": profile_fields.get("title"),
            "skills": profile_fields.get("skills") or [],
            "topics": profile_fields.get("topics") or [],
        },
        "retrieval": {
            "jobs": _linked("jobs"),
            "courses": _linked("courses"),
            "development_plan": _titled("development_plan"),
            "manager_toolkit": _titled("manager_toolkit"),
            "leadership_strategy": _titled("leadership_strategy"),
            "informa_strategy": [
                {"title": x.get("title"), "snippet": x.get("snippet")}
                for x in (sections.get("informa_strategy") or [])
            ][:8],
        },
    }
    instruction = (
        "Using only this JSON, answer naturally. "
        "Pick items that best fit the query and profile; prefer intersection/bridge when needed. "
        "If info is insufficient, ask for the minimum missing detail.\n\n"
    )
    return {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 700,
        "temperature": 0.4,
        "system": SYSTEM_PROMPT,
        "messages": [{
            "role": "user",
            "content": [{
                "type": "text",
                # ensure_ascii=False keeps non-ASCII profile text readable for the model.
                "text": instruction + json.dumps(payload, ensure_ascii=False),
            }],
        }],
    }

def _anthropic_stream_fallback(br, model_id: str, body: dict) -> Iterator[str]:
    """Fallback using invoke_model_with_response_stream (Anthropic Messages format)."""
    resp = br.invoke_model_with_response_stream(
        modelId=model_id,
        body=json.dumps(body),
        accept="application/json",
        contentType="application/json",
    )
    for evt in resp.get("body"):
        if "chunk" not in evt:
            continue
        try:
            payload = json.loads(evt["chunk"]["bytes"].decode("utf-8"))
        except Exception:
            continue
        et = payload.get("type")
        if et == "content_block_delta":
            d = payload.get("delta", {})
            if d.get("type") == "text_delta":
                t = d.get("text", "")
                if t:
                    yield t
        elif et == "message_stop":
            break

def synthesize_answer_llm_stream(
    user_text: str,
    intents: List[str],
    is_manager: bool,
    profile_fields: dict,
    sections: dict,
    model_id: Optional[str] = None,
) -> Iterator[str]:
    """Stream the answer as text fragments.

    The Converse streaming API is tried first. If that call fails, returns no
    stream, or fails before any text has been produced, the same request is
    replayed through ``_anthropic_stream_fallback``.

    Args:
        user_text: The user's question.
        intents: Intent labels for the question.
        is_manager: Whether to treat the user as a manager.
        profile_fields: Profile mapping forwarded to ``_make_messages_body``.
        sections: Retrieval results forwarded to ``_make_messages_body``.
        model_id: Optional model ID; defaults to ``PRIMARY_LLM_MODEL_NAME``
            from the environment, then the built-in Claude 3.7 Sonnet ID.

    Yields:
        Non-empty text deltas in arrival order.

    Raises:
        Exception: Re-raises a Converse stream error that occurs after text
            was already yielded. Falling back at that point would restart the
            answer from the beginning and duplicate output for the consumer.
    """
    br = _bedrock_client()
    model_id = _normalize_bedrock_model_id(
        model_id or os.getenv("PRIMARY_LLM_MODEL_NAME", "anthropic.claude-3-7-sonnet-20250219-v1:0")
    )
    body = _make_messages_body(user_text, intents, is_manager, profile_fields, sections)

    # Converse API takes the system prompt in its own parameter, not as a message.
    system_prompts = [{"text": body.get("system") or SYSTEM_PROMPT}]

    # Translate Anthropic-style messages into Converse-style messages.
    conv_msgs = []
    for msg in body.get("messages", []):
        role = msg.get("role", "user")
        if role not in ("user", "assistant"):
            role = "user"
        text = "".join(
            part.get("text", "")
            for part in (msg.get("content") or [])
            if part.get("type") == "text"
        )
        conv_msgs.append({"role": role, "content": [{"text": text}]})
    if not conv_msgs:
        conv_msgs = [{"role": "user", "content": [{"text": "Hello"}]}]

    inference = {
        "temperature": body.get("temperature", 0.4),
        "maxTokens": body.get("max_tokens", 700),
    }

    # Fast path, step 1: open the Converse stream. Any failure here (checksum
    # or proxy issues, unsupported API, ...) simply selects the fallback.
    stream = None
    try:
        resp = br.converse_stream(
            modelId=model_id,
            system=system_prompts,
            messages=conv_msgs,
            inferenceConfig=inference,
        )
        stream = resp.get("stream")
    except Exception:
        stream = None

    # Fast path, step 2: consume the stream by iterating it directly. The
    # previous `with stream as events:` relied on context-manager support that
    # the event stream is not documented to provide, so a failure there was
    # swallowed and the fallback was used every time.
    if stream:
        emitted = False
        try:
            for event in stream:
                if "contentBlockDelta" in event:
                    delta = event["contentBlockDelta"]["delta"].get("text")
                    if delta:
                        emitted = True
                        yield delta
                elif "messageStop" in event:
                    break
            return
        except Exception:
            if emitted:
                # Partial answer already delivered; do not restart it.
                raise
        finally:
            close = getattr(stream, "close", None)
            if callable(close):
                close()

    # Fallback: Anthropic Messages format over invoke_model_with_response_stream.
    yield from _anthropic_stream_fallback(br, model_id, body)

# Simple notebook renderer
from IPython.display import display, Markdown
def render_stream(generator, refresh: float = 0.05, min_early_flush_chars: int = 60) -> str:
    """Render streamed text chunks into one live-updating Markdown display.

    Args:
        generator: Iterable of text chunks.
        refresh: Minimum seconds between throttled redraws.
        min_early_flush_chars: Once the accumulated text first reaches this
            many characters, one redraw is forced regardless of the throttle.

    Returns:
        The full concatenated text.
    """
    pieces = []
    early_flush_done = False
    last_redraw = 0.0
    handle = display(Markdown(""), display_id=True)

    def _redraw():
        handle.update(Markdown("".join(pieces)))

    for piece in generator:
        pieces.append(piece)
        # One-off early redraw so the reader sees text as soon as there is a
        # meaningful amount of it.
        if not early_flush_done and sum(map(len, pieces)) >= min_early_flush_chars:
            _redraw()
            early_flush_done = True
            last_redraw = time.time()
            continue
        current = time.time()
        if current - last_redraw >= refresh:
            _redraw()
            last_redraw = current

    # Final redraw so the display always shows the complete text.
    _redraw()
    return "".join(pieces)


In [None]:

# --- PG Retrievers: Prod Informa (Informa knowledge) + Dev Profiles (employee) ---
import numpy as np
import psycopg
from psycopg.rows import dict_row

# ---------- Prod Informa vector store ----------
def _build_prod_pg_dsn() -> str:
    """Build the Prod Informa DSN from the environment.

    SECURITY: the DSN used to embed the service-account password as a literal
    in this file. Credentials now come from the environment only; the
    previously committed password should be treated as leaked and rotated.

    Resolution order:
        1. ``PROD_PG_DSN`` -- a complete DSN, used as-is (whitespace stripped).
        2. ``PROD_PG_PASSWORD`` -- combined with the fixed service user, host
           and database this notebook has always targeted.

    Returns:
        The DSN, or ``""`` when neither variable is set.
    """
    dsn = os.getenv("PROD_PG_DSN", "").strip()
    if dsn:
        return dsn
    password = os.getenv("PROD_PG_PASSWORD", "")
    if not password:
        return ""
    # Percent-encode the password so characters such as '@', ':' or '/' cannot
    # break the URL structure.
    return (
        f"postgresql://v_svc_usr_aidb:{urllib.parse.quote(password, safe='')}"
        f"@elysiadb.iris.informa.com:5432/aidb?sslmode=require"
    )


_PROD_PG_DSN = _build_prod_pg_dsn()

# Substring pre-filter over a named collection: a row matches when the query
# text occurs (case-insensitively) in the document or in its metadata rendered
# as text. '%%' is a literal '%' under psycopg's parameter substitution.
_INFORMA_PREFILTER_SQL = '''
SELECT e.uuid AS id, e.embedding, e.document, e.cmetadata
FROM ai.langchain_pg_embedding e
JOIN ai.langchain_pg_collection c ON c.uuid = e.collection_id
WHERE c.name = %(collection)s
  AND (
    e.document ILIKE '%%' || %(query)s || '%%'
    OR CAST(e.cmetadata AS TEXT) ILIKE '%%' || %(query)s || '%%'
  )
LIMIT %(k)s;
'''

def _pg_prod_conn():
    """Open a connection to the Prod Informa database with dict-shaped rows.

    Raises:
        RuntimeError: If no Prod DSN could be built from the environment.
    """
    if not _PROD_PG_DSN:
        raise RuntimeError(
            "PROD_PG_DSN (or PROD_PG_PASSWORD) not set for Prod Informa DB"
        )
    return psycopg.connect(_PROD_PG_DSN, row_factory=dict_row)

def _cosine(a: np.ndarray, b: np.ndarray) -> float:
    d = (np.linalg.norm(a) * np.linalg.norm(b))
    if d == 0:
        return 0.0
    return float(np.dot(a, b) / d)

def _coerce_embedding(raw: Any) -> np.ndarray:
    """Convert a DB embedding value into a 1-D float32 array (empty when unusable).

    Accepts a sequence of numbers, a NumPy array, or the textual form
    ``"[0.1, 0.2, ...]"`` (which parses as JSON). The previous
    ``np.array(value or [])`` raised ValueError for text values and for
    multi-element arrays, whose truth value is ambiguous.
    """
    empty = np.zeros(0, dtype=np.float32)
    if raw is None:
        return empty
    if isinstance(raw, (bytes, bytearray, memoryview)):
        try:
            raw = bytes(raw).decode("utf-8")
        except UnicodeDecodeError:
            return empty
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except ValueError:
            return empty
    try:
        return np.asarray(raw, dtype=np.float32).ravel()
    except (TypeError, ValueError):
        return empty


def _rank_by_centroid(items: List[Dict[str, Any]], embs: List[np.ndarray]) -> List[Dict[str, Any]]:
    """Order *items* by cosine similarity of their embedding to the centroid.

    Only embeddings of the most common non-zero length take part in the
    centroid, because ``np.stack`` needs equal shapes. Items whose embedding
    is missing or of another length keep their relative order and are placed
    after the ranked ones. Ties keep the original order.
    """
    sizes = [e.size for e in embs if e.size]
    if not sizes:
        return list(items)
    dim = max(set(sizes), key=sizes.count)
    usable = [(it, e) for it, e in zip(items, embs) if e.size == dim]
    leftovers = [it for it, e in zip(items, embs) if e.size != dim]
    centroid = np.mean(np.stack([e for _, e in usable]), axis=0)
    ranked = sorted(usable, key=lambda pair: _cosine(centroid, pair[1]), reverse=True)
    return [it for it, _ in ranked] + leftovers


def retrieve_informa_snippets(query: str, collection: str = "internal_curated_informa_vectorstore",
                              k: int = 8, pre_k: int = 48, max_chars: int = 1200) -> List[Dict[str, str]]:
    """Fetch up to *k* Informa knowledge snippets for *query* from the Prod store.

    Rows are pre-filtered with a case-insensitive substring match on the
    document and its metadata. If nothing matches, one condensed fallback
    query is tried (``"digital transformation"`` when the query mentions it,
    otherwise ``"informa strategy"``). Matches are then ordered by similarity
    to the centroid of their embeddings.

    Args:
        query: Free-text query; a falsy value returns ``[]`` without touching the DB.
        collection: Vector-store collection name.
        k: Maximum number of snippets returned.
        pre_k: Maximum number of rows fetched by the pre-filter.
        max_chars: Documents longer than this are cut and suffixed with "…".

    Returns:
        A list of ``{"title": "informa_ctx#N", "snippet": text}`` dicts.
    """
    if not query:
        return []

    # One connection serves both the primary and the fallback query.
    with _pg_prod_conn() as conn, conn.cursor() as cur:
        cur.execute(_INFORMA_PREFILTER_SQL, {"collection": collection, "query": query, "k": pre_k})
        rows = cur.fetchall()
        if not rows:
            # fallback: condense a long query to improve recall
            q2 = "digital transformation" if "digital transformation" in query.lower() else "informa strategy"
            cur.execute(_INFORMA_PREFILTER_SQL, {"collection": collection, "query": q2, "k": pre_k})
            rows = cur.fetchall()
    if not rows:
        return []

    items = [{"document": r.get("document") or "", "meta": r.get("cmetadata") or {}} for r in rows]
    embs = [_coerce_embedding(r.get("embedding")) for r in rows]
    ranked = _rank_by_centroid(items, embs)

    out = []
    for i, it in enumerate(ranked[:k]):
        doc = it["document"]
        snippet = doc if len(doc) <= max_chars else (doc[:max_chars] + "…")
        out.append({"title": f"informa_ctx#{i+1}", "snippet": snippet})
    return out

# ---------- Dev employee profiles (via PG_DSN) ----------
# Connection string for the Dev employee-profile database, read once from the
# environment. An empty string means "not configured"; _pg_dev_conn() raises
# in that case.
PG_DSN = os.getenv("PG_DSN", "")  # e.g., postgresql://.../aidb?sslmode=require
# Substring pre-filter for the employee-profile collection: a row matches when
# the query text occurs (case-insensitively) in the document or in its
# metadata rendered as text. '%%' is a literal '%' under psycopg's parameter
# substitution. The statement text is the same as the Informa pre-filter.
_EMP_PREFILTER_SQL = '''
SELECT e.uuid AS id, e.embedding, e.document, e.cmetadata
FROM ai.langchain_pg_embedding e
JOIN ai.langchain_pg_collection c ON c.uuid = e.collection_id
WHERE c.name = %(collection)s
  AND (
    e.document ILIKE '%%' || %(query)s || '%%'
    OR CAST(e.cmetadata AS TEXT) ILIKE '%%' || %(query)s || '%%'
  )
LIMIT %(k)s;
'''

def _pg_dev_conn():
    """Open a connection to the Dev employee-profile database with dict-shaped rows.

    Raises:
        RuntimeError: If ``PG_DSN`` is empty.
    """
    if PG_DSN:
        return psycopg.connect(PG_DSN, row_factory=dict_row)
    raise RuntimeError("PG_DSN not set for Dev employee profile DB")

def _profile_embedding_vector(raw: Any) -> np.ndarray:
    """Convert a DB embedding value into a 1-D float32 array (empty when unusable).

    Handles sequences, NumPy arrays and the textual ``"[0.1, 0.2, ...]"`` form.
    The previous ``np.array(value or [])`` raised ValueError for text values
    and for multi-element arrays, whose truth value is ambiguous.
    """
    empty = np.zeros(0, dtype=np.float32)
    if raw is None:
        return empty
    if isinstance(raw, (bytes, bytearray, memoryview)):
        try:
            raw = bytes(raw).decode("utf-8")
        except UnicodeDecodeError:
            return empty
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except ValueError:
            return empty
    try:
        return np.asarray(raw, dtype=np.float32).ravel()
    except (TypeError, ValueError):
        return empty


def retrieve_profile_doc(email: Optional[str] = None, name: Optional[str] = None,
                         collection: str = "internal_private_employee_profiles_vectorstore",
                         k: int = 8, pre_k: int = 48) -> Optional[Dict[str, Any]]:
    """Look up one employee-profile document by e-mail (preferred) or name.

    Rows whose document or metadata contain the identifier are fetched, then
    the row closest to the centroid of the matching embeddings is returned.

    Args:
        email: Employee e-mail; used when non-empty.
        name: Employee name; used only when *email* is empty.
        collection: Vector-store collection name.
        k: Unused; kept so existing callers that pass it keep working.
        pre_k: Maximum number of rows fetched by the pre-filter.

    Returns:
        ``{"document": str, "meta": ...}`` for the selected row, or ``None``
        when no identifier was given or nothing matched.

    Raises:
        RuntimeError: Propagated from ``_pg_dev_conn`` when ``PG_DSN`` is unset.
    """
    q = (email or name or "").strip()
    if not q:
        return None

    # Escape LIKE metacharacters so the identifier is matched literally. An
    # unescaped '_' (common in e-mail addresses) matches any single character
    # and could select a different employee's profile.
    like_literal = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

    with _pg_dev_conn() as conn, conn.cursor() as cur:
        cur.execute(_EMP_PREFILTER_SQL, {"collection": collection, "query": like_literal, "k": pre_k})
        rows = cur.fetchall()
    if not rows:
        return None

    items = [{"document": r.get("document") or "", "meta": r.get("cmetadata") or {}} for r in rows]
    embs = [_profile_embedding_vector(r.get("embedding")) for r in rows]

    # Score by centroid for a tiny bit of quality. Only embeddings of the most
    # common non-zero length can be stacked together.
    sizes = [e.size for e in embs if e.size]
    if not sizes:
        return items[0]
    dim = max(set(sizes), key=sizes.count)
    usable = [(it, e) for it, e in zip(items, embs) if e.size == dim]
    centroid = np.mean(np.stack([e for _, e in usable]), axis=0)
    # max() returns the first best-scoring row, matching a stable descending sort.
    best_item, _ = max(usable, key=lambda pair: _cosine(centroid, pair[1]))
    return best_item

def _safe_json_load(txt: str) -> Any:
    try:
        return json.loads(txt)
    except Exception:
        return None

def extract_profile_fields_from_doc(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Extract name, title, skills, topics and manager flag from a profile row.

    Metadata (``doc["meta"]``) is consulted first. When name, title or skills
    are still missing, the document text is parsed as JSON and used to fill
    the gaps.

    Args:
        doc: Mapping with optional ``"meta"`` (dict, or a JSON string of a
            dict) and ``"document"`` (text, possibly JSON) entries.

    Returns:
        ``{"name", "title", "skills", "topics", "is_manager"}`` where
        ``skills`` and ``topics`` are always lists and ``is_manager`` is a bool.
    """

    def _as_list(value: Any) -> list:
        # Delimited strings are split on ';', ',' or newline; anything that is
        # not a list becomes an empty list.
        if isinstance(value, str):
            return [s.strip() for s in re.split(r"[;,\n]", value) if s.strip()]
        return value if isinstance(value, list) else []

    # Try cmetadata first; tolerate metadata delivered as JSON text or as a
    # non-mapping value instead of failing on `.get`.
    meta = doc.get("meta") or {}
    if isinstance(meta, str):
        meta = _safe_json_load(meta)
    if not isinstance(meta, dict):
        meta = {}

    name = meta.get("name") or meta.get("full_name") or meta.get("employee_name")
    title = meta.get("title") or meta.get("job_title")
    skills = meta.get("skills") or []
    topics = meta.get("topics") or []

    # bool("false") is True, so textual flags are interpreted explicitly.
    raw_manager = meta.get("is_manager", False)
    if isinstance(raw_manager, str):
        is_manager = raw_manager.strip().lower() in ("true", "1", "yes", "y")
    else:
        is_manager = bool(raw_manager)

    if not (name and title and skills):
        # Try parsing document JSON (if it's JSON)
        parsed = _safe_json_load(doc.get("document", ""))
        if isinstance(parsed, dict):
            name = name or parsed.get("name")
            title = title or parsed.get("title")
            skills = skills or parsed.get("skills") or []
            topics = topics or parsed.get("topics") or []

    return {
        "name": name,
        "title": title,
        "skills": _as_list(skills),
        # topics gets the same normalisation as skills so both are always lists.
        "topics": _as_list(topics),
        "is_manager": is_manager,
    }


In [None]:

# --- Unified run_workflow(): profile + existing KB/tools + Informa (Prod PG) ---
from types import SimpleNamespace

def _optional_helper(helper_name: str):
    """Return the notebook-level callable named *helper_name*, or ``None`` if absent.

    Optional helpers are looked up explicitly instead of being called inside
    ``try/except NameError``: that pattern also swallowed NameErrors raised
    *inside* a helper, silently hiding real bugs in user code.
    """
    candidate = globals().get(helper_name)
    return candidate if callable(candidate) else None


def _keyword_intents(user_text: Optional[str]) -> List[str]:
    """Derive intents from simple keywords; returns ``["general"]`` when none match."""
    lowered = (user_text or "").lower()
    found = []
    if "job" in lowered:
        found.append("job")
    if any(word in lowered for word in ["course", "learn", "training", "upskill"]):
        found.append("courses")
    if "manager" in lowered:
        found.append("manager_toolkit")
    return found or ["general"]


def run_workflow(
    user_text: str,
    email: Optional[str] = None,
    name: Optional[str] = None,
    division: Optional[str] = None,
    override_is_manager: Optional[bool] = None,
    stream: bool = False,
) -> Dict[str, Any]:
    """Run the career-agent pipeline: gate, profile, intents, retrieval, answer.

    Optional notebook-level helpers are used when they are defined:
    ``prohibitor``, ``setup_state``, ``get_profile_fields``, ``detect_intents``,
    ``intent_persona``, ``retrieve_sections``, ``job_tool``/``job_reflexion``,
    ``courses_tool``/``courses_reflexion``, ``choose_candidates`` and
    ``synthesize_answer_llm``. Otherwise built-in fallbacks apply.

    Args:
        user_text: The user's question.
        email: Employee e-mail used for profile lookup.
        name: Employee name used when no e-mail is given.
        division: Forwarded to ``setup_state`` / ``get_profile_fields``.
        override_is_manager: When not ``None``, forces ``state.is_manager``.
        stream: When true, return a generator under ``"stream"`` instead of
            a finished ``"answer"``.

    Returns:
        If the gate blocks the request:
        ``{"blocked": True, "gate": ..., "answer": "out_of_scope"}``.
        Otherwise a dict with ``blocked``, ``gate``, ``state``,
        ``profile_found``, ``profile_fields``, ``sections`` and either
        ``stream`` (generator of text) or ``answer`` (str).

    Raises:
        NameError: If an answer must be streamed but
            ``synthesize_answer_llm_stream`` is not defined.
    """
    # 1) Gate (optional user-defined)
    prohibitor_fn = _optional_helper("prohibitor")
    gate = prohibitor_fn(user_text) if prohibitor_fn else {"allowed": True, "intents": []}
    if not gate.get("allowed", True):
        return {"blocked": True, "gate": gate, "answer": "out_of_scope"}

    # 2) Profile/state (prefer user's setup_state/get_profile_fields if present)
    state = None
    profile_fields: Dict[str, Any] = {}
    setup_state_fn = _optional_helper("setup_state")
    if setup_state_fn:
        state, _profile_meta = setup_state_fn(
            email=email, name=name, division=division,
            override_is_manager=override_is_manager, user_text=user_text
        )
        get_fields_fn = _optional_helper("get_profile_fields")
        if get_fields_fn:
            profile_fields = get_fields_fn(email=email, name=name, division=division) or {}
    else:
        # Fallback: retrieve from Dev PG vector store
        doc = retrieve_profile_doc(email=email, name=name)
        if doc:
            profile_fields = extract_profile_fields_from_doc(doc)
    if state is None:
        # Covers both the fallback path and a setup_state that returned no state.
        state = SimpleNamespace(is_manager=bool(profile_fields.get("is_manager", False)))

    # Is manager override
    if override_is_manager is not None:
        state.is_manager = bool(override_is_manager)

    profile_found = any([profile_fields.get("name"), profile_fields.get("title"), profile_fields.get("skills")])

    # 3) Intent detection
    detect_fn = _optional_helper("detect_intents")
    intents = detect_fn(user_text) if detect_fn else _keyword_intents(user_text)
    persona_fn = _optional_helper("intent_persona")
    if persona_fn:
        # Use the gate's intents only when it actually produced some. The
        # default gate carries an empty list, which previously replaced the
        # detected intents because dict.get's default is ignored for existing keys.
        intents = persona_fn(gate.get("intents") or intents)

    # 4) Core retrieval (use existing helpers when available)
    sections: Dict[str, List[Dict[str, Any]]] = {
        "jobs": [], "courses": [], "development_plan": [],
        "manager_toolkit": [], "leadership_strategy": [], "informa_strategy": []
    }

    retrieve_sections_fn = _optional_helper("retrieve_sections")
    if retrieve_sections_fn:
        # If user has a single helper to fill sections, use it
        sections.update(retrieve_sections_fn(intents=intents, profile_fields=profile_fields) or {})
    else:
        # Otherwise call tools individually if they exist
        job_tool_fn = _optional_helper("job_tool")
        job_reflexion_fn = _optional_helper("job_reflexion")
        if "job" in intents and job_tool_fn and job_reflexion_fn:
            sections["jobs"] = job_reflexion_fn(
                job_tool_fn(user_text, profile_q=[], profile_fields=profile_fields)
            )

        courses_tool_fn = _optional_helper("courses_tool")
        courses_reflexion_fn = _optional_helper("courses_reflexion")
        if "courses" in intents and courses_tool_fn and courses_reflexion_fn:
            sections["courses"] = courses_reflexion_fn(
                courses_tool_fn(user_text, state, profile_q=[], profile_fields=profile_fields),
                state.is_manager
            )

        # Optional candidate selection
        choose_fn = _optional_helper("choose_candidates")
        if choose_fn:
            if "job" in intents:
                sections["jobs"] = choose_fn(
                    user_text, sections.get("jobs"), profile_fields, target="jobs", top_n=6
                )
            if "courses" in intents:
                sections["courses"] = choose_fn(
                    user_text, sections.get("courses"), profile_fields, target="courses", top_n=6
                )

    # 5) Informa strategy snippets from Prod (best-effort: a failure is
    #    reported and the workflow continues without them)
    try:
        sections["informa_strategy"] = retrieve_informa_snippets(
            user_text, "internal_curated_informa_vectorstore", k=8
        )
    except Exception as e:
        print(f"⚠️ Informa PG retrieval failed: {e}")

    common = {
        "blocked": False,
        "gate": {"intents": intents},
        "state": state,
        "profile_found": profile_found,
        "profile_fields": profile_fields,
        "sections": sections,
    }
    default_model = os.getenv("PRIMARY_LLM_MODEL_NAME", "us.anthropic.claude-3-7-sonnet-20250219-v1:0")

    def _open_stream():
        stream_fn = _optional_helper("synthesize_answer_llm_stream")
        if stream_fn is None:
            raise NameError("synthesize_answer_llm_stream is not defined. Run the streaming cell first.")
        return stream_fn(
            user_text=user_text,
            intents=intents,
            is_manager=state.is_manager,
            profile_fields=profile_fields or {},
            sections=sections,
            model_id=default_model,
        )

    # 6) Stream vs non-stream
    if stream:
        return {"stream": _open_stream(), **common}

    # Non-stream: prefer the user's synthesize_answer_llm; otherwise drain the stream.
    synth_fn = _optional_helper("synthesize_answer_llm")
    if synth_fn:
        text_answer = synth_fn(
            user_text=user_text,
            intents=intents,
            is_manager=state.is_manager,
            profile_fields=profile_fields,
            sections=sections,
        )
    else:
        text_answer = "".join(_open_stream())

    return {"answer": text_answer, **common}


In [None]:

# --- Example usage (run after setting AWS creds and PG_DSN in your env) ---
# Both steps are wrapped in try/except so that a missing credential or an
# unreachable database prints a message instead of aborting the cell.

# Sanity check: fetch a couple of Informa snippets from Prod
# (prints the first 160 characters of each of up to two snippets).
try:
    print("Sample Informa snippets:", [s["snippet"][:160] for s in retrieve_informa_snippets("digital transformation", k=2)])
except Exception as e:
    print("Informa snippet test error:", e)

# End-to-end (streaming): the profile is looked up via DEFAULT_USER_EMAIL and
# the streamed answer is rendered live; `rendered` holds the final text.
query = "Analyze my current skillset against Informa's digital transformation needs and recommend 5 specific learning opportunities to close these gaps."
try:
    out = run_workflow(query, email=os.getenv("DEFAULT_USER_EMAIL"), stream=True)
    rendered = render_stream(out["stream"])
except Exception as e:
    print("Workflow test error:", e)
