# Setup

In [None]:
import os

# Read the API key from the environment once; an unset variable becomes "".
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")

# Warn (rather than fail) so cells that do not call the API can still run.
if OPENAI_API_KEY == "":
    print("WARNING: OPENAI_API_KEY not set. Set it with %env or your environment before running API cells.")

In [None]:
import os
import re
import time
import json
import math
import numpy as np
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
from collections import Counter
import hashlib
import random
from openai import OpenAI

# Functions

In [None]:
# ---------- Text utilities ----------
def normalize_text(s: str) -> str:
    """Return *s* with line endings and whitespace runs normalized.

    Steps, in order:
      1. Convert CRLF and lone CR line endings to LF.
      2. Collapse each run of spaces/tabs into a single space.
      3. Collapse three or more consecutive newlines into exactly two.
      4. Strip leading and trailing whitespace.
    """
    text = s.replace("\r\n", "\n")
    text = text.replace("\r", "\n")
    # Order matters: horizontal whitespace is squeezed before blank-line runs.
    substitutions = (
        (r"[ \t]+", " "),
        (r"\n{3,}", "\n\n"),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()


# ---------- Simple cosine similarity ----------
def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of vectors *a* and *b* as a Python float.

    A small epsilon (1e-12) is added to the product of the norms so that a
    zero-length vector yields 0.0 instead of a division by zero.
    """
    norm_product = np.linalg.norm(a) * np.linalg.norm(b)
    inner = np.dot(a, b)
    similarity = inner / (norm_product + 1e-12)
    return float(similarity)

# Prompts

In [None]:
# ---------- Prompt store (XML) ----------
# All prompt templates live in this single XML document, which PromptStore
# parses with xml.etree.ElementTree. Each <prompt> element is keyed by its
# "id" attribute; its text is wrapped in CDATA and is stripped of surrounding
# whitespace when loaded.
#
# Placeholders such as {DOMAINS}, {QUERY} and {CONTEXT} are filled in by
# PromptStore.render via str.format_map; placeholders with no supplied value
# are left in the text unchanged. Because the text goes through format_map,
# any literal brace added to a prompt must be doubled ("{{" / "}}").
PROMPTS_XML = """<?xml version="1.0" encoding="UTF-8"?>
<prompts version="1.0">
  <prompt id="classify_scope.system"><![CDATA[
You are a domain gate for a RAG system.
Decide whether the user query is IN SCOPE for one of the allowed DOMAINS, or OUT OF SCOPE.
Rules:
1) If none of the domains clearly apply, set in_scope=false and domain=null.
2) Domain must be one of DOMAINS or null. Never invent domains.
3) Be conservative: low confidence => out of scope.
Return STRICT JSON with keys: in_scope (bool), domain (string|null), reason (string).
  ]]></prompt>

  <prompt id="classify_scope.user"><![CDATA[
DOMAINS = {DOMAINS}
QUERY = {QUERY}
  ]]></prompt>

  <prompt id="generate.system"><![CDATA[
You are a RAG assistant. Use ONLY the provided context.
If the question is out of scope, answer: "I don't have information on this topic."
Keep answers concise (≤ 6 sentences).
When evidence exists, reference sources in-text using [doc_id] markers that match citations.
Do not fabricate links or content beyond the given context.
  ]]></prompt>

  <prompt id="generate.user"><![CDATA[
Question: {QUERY}

Context:
{CONTEXT}
  ]]></prompt>
</prompts>
"""

# Classes

In [None]:

class _SafeDict(dict):
    """Dict for ``str.format_map`` that leaves unknown placeholders intact.

    Looking up a missing key returns the key wrapped in braces (``"{key}"``)
    instead of raising ``KeyError``, so a template can be rendered with only
    some of its placeholders supplied.
    """

    def __missing__(self, key):
        # Rebuild the placeholder text for the missing field name.
        return "".join(("{", key, "}"))


class PromptStore:
    """In-memory registry of prompt templates parsed from an XML document.

    Every ``<prompt>`` element below the root that carries a non-empty ``id``
    attribute is stored under that id, with its text stripped of surrounding
    whitespace. Templates are rendered with ``str.format_map``.
    """

    def __init__(self, xml_text: str):
        self._prompts: Dict[str, str] = {}
        self._load(xml_text)

    def _load(self, xml_text: str):
        """Parse *xml_text* and register each identifiable ``<prompt>``.

        Elements without an ``id`` (or with an empty one) are skipped; if an
        id occurs more than once, the last occurrence wins.
        """
        document = ET.fromstring(xml_text)
        for element in document.iterfind(".//prompt"):
            prompt_id = element.get("id")
            if prompt_id:
                self._prompts[prompt_id] = (element.text or "").strip()

    def get(self, prompt_id: str) -> str:
        """Return the raw template stored under *prompt_id*.

        Raises:
            KeyError: if no prompt with that id was loaded.
        """
        try:
            return self._prompts[prompt_id]
        except KeyError:
            raise KeyError(f"Prompt id not found: {prompt_id}") from None

    def render(self, prompt_id: str, **kwargs) -> str:
        """Return the template for *prompt_id* with *kwargs* substituted.

        Placeholders that have no matching keyword argument are left in the
        output unchanged.
        """
        template = self.get(prompt_id)
        values = _SafeDict(**kwargs)
        return template.format_map(values)


# Module-level prompt registry built from the inline XML document.
prompts = PromptStore(PROMPTS_XML)

# Sanity check for the notebook: show which prompt ids were loaded.
print(
    "Cell 1 ready: utils + PromptStore loaded with prompt ids",
    list(prompts._prompts),
)

# Embeddings

In [None]:
# Shared OpenAI client used by the embedding helpers in this cell.
client = OpenAI(api_key=OPENAI_API_KEY)

# Embedding model name; override via the OPENAI_MODEL_EMBED environment
# variable. The default model produces 1536-dimensional vectors.
EMBED_MODEL = os.environ.get("OPENAI_MODEL_EMBED", "text-embedding-3-small")

def _l2_normalize_rows(X: np.ndarray) -> np.ndarray:
    """Return a copy of 2D array *X* with every row scaled to unit L2 norm.

    A small epsilon (1e-12) is added to each row norm so that all-zero rows
    stay zero instead of triggering a division by zero.
    """
    row_lengths = np.linalg.norm(X, axis=1, keepdims=True)
    safe_lengths = row_lengths + 1e-12
    return X / safe_lengths

def embed_texts_openai(
    texts: List[str],
    model: str = EMBED_MODEL,
    *,
    batch_size: int = 2048,
) -> np.ndarray:
    """
    Embed *texts* with the OpenAI embeddings endpoint.

    Returns a 2D numpy array [n_texts, dim] of float32, L2-normalized, with
    rows in the same order as *texts*.

    Args:
        texts: Strings to embed. A single bare string is treated as a
            one-element list.
        model: Embedding model name.
        batch_size: Maximum number of texts sent per API request. The
            Embeddings API documents a cap of 2048 inputs per request, so
            longer lists are split into several requests.

    Returns:
        Array of shape [n_texts, dim]. For empty input, an empty array of
        shape (0, 0) is returned without calling the API, because the
        embedding dimension is unknown and the API rejects empty input.

    Raises:
        ValueError: if *batch_size* is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")

    # A bare string would otherwise be sliced character by character below;
    # wrap it so it is embedded as one input, as the API itself would do.
    texts = [texts] if isinstance(texts, str) else list(texts)
    if not texts:
        return np.empty((0, 0), dtype=np.float32)

    vecs = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        resp = client.embeddings.create(model=model, input=batch)
        vecs.extend(d.embedding for d in resp.data)

    X = np.array(vecs, dtype=np.float32)
    return _l2_normalize_rows(X)

def embed_text_openai(text: str, model: str = EMBED_MODEL) -> np.ndarray:
    """Embed a single string and return its vector (first row of the batch result)."""
    batch = embed_texts_openai([text], model=model)
    return batch[0]