In [1]:
import os
import time
import re
import torch
import numpy as np
from typing import Dict, Generator, Iterable, List
from sentence_transformers import SentenceTransformer
from fastlite import database
from openai import OpenAI
import httpx
import json
import app_db
try:
    from dotenv import load_dotenv
except ImportError:
    load_dotenv = None

In [2]:
# sentence-transformers model used to embed queries (presumably the same model
# that produced the vectors stored in the `embeddings` table).
MODEL_NAME = "BAAI/bge-small-en-v1.5"
# OpenAI model passed to the Responses API when answering questions.
LLM_MODEL = "gpt-5.2"

# Synonym table for query expansion: every alias in a group is treated as
# interchangeable when generating search variants of a query.
SYNONYM_GROUPS = [
    dict(
        canonical="myway",
        aliases=["myway", "prepay", "prepaid"],
        note="Prepay and MyWay refer to the same program.",
    ),
]

# Extra notes prepended to the LLM context so the model also treats the
# synonyms as identical when reading retrieved extracts.
GLOSSARY_SNIPPETS = [
    "Glossary: 'Prepay' and 'MyWay' refer to the same program; treat them as identical terms.",
]

In [3]:
# Open the scraper database (path relative to the notebook's working directory).
# The functions below read its `embeddings`, `chunks`, `extracts` and `pages`
# tables through `db.t.<table>`.
db = database("scraper.db")

In [4]:
# Run the embedding model on the GPU when one is visible to torch, else the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

model = SentenceTransformer(MODEL_NAME, device=device)
# Maximum input length (in tokens) the encoder is fed.
model.max_seq_length = 512

In [5]:
# Load settings (e.g. OPENAI_API_KEY) from a local .env file when python-dotenv
# was importable; otherwise remind the user to export the key themselves.
if load_dotenv is None:
    print("python-dotenv not installed; set OPENAI_API_KEY in the environment.")
else:
    load_dotenv()

In [6]:
def _expand_query_variants(query: str) -> List[str]:
    """Return query variants by swapping known synonym aliases."""
    variants = {query}
    for group in SYNONYM_GROUPS:
        aliases = group.get("aliases", [])
        if not aliases:
            continue
        pattern = re.compile(r"\b(" + "|".join(map(re.escape, aliases)) + r")\b", re.IGNORECASE)
        next_variants = set()
        for v in variants:
            if pattern.search(v):
                for alias in aliases:
                    next_variants.add(pattern.sub(alias, v))
            else:
                next_variants.add(v)
        variants = next_variants
    return list(variants)


def _query_embeddings(queries: Iterable[str]) -> np.ndarray:
    """Embed each query string with the notebook's sentence-transformers model.

    ``normalize_embeddings=True`` L2-normalises every vector, so a dot product
    with another normalised vector is a cosine similarity. The progress bar is
    suppressed to keep cell output clean.
    """
    batch = [text for text in queries]
    return model.encode(batch, show_progress_bar=False, normalize_embeddings=True)


def search_embeddings(db, query, top_k=5):
    """Rank stored chunk embeddings against a text query.

    The query is expanded into synonym variants (``_expand_query_variants``) and
    each variant is embedded. A chunk's score is its best dot product against any
    variant. That equals cosine similarity if the stored vectors are also
    L2-normalised (assumed, not checked here).

    Args:
        db: Database exposing an ``embeddings`` table whose rows carry a
            ``chunk_id`` and a float32 ``embedding`` blob.
        query: Search text.
        top_k: Number of results to keep (slice semantics; ``None`` keeps all).

    Returns:
        List of ``(score, chunk_id)`` tuples, highest score first; ``[]`` when the
        table is empty.
    """
    t0 = time.perf_counter()
    rows = list(db.t.embeddings())
    if not rows:
        print("No embeddings found in database")
        return []

    query_matrix = np.atleast_2d(np.asarray(_query_embeddings(_expand_query_variants(query))))
    # Stack all stored vectors once and score them with a single matrix product
    # instead of one np.dot per row inside a Python loop.
    chunk_matrix = np.vstack(
        [np.frombuffer(row["embedding"], dtype=np.float32) for row in rows]
    )
    # (n_chunks, n_variants) similarities -> keep each chunk's best variant.
    scores = (chunk_matrix @ query_matrix.T).max(axis=1)
    # A stable sort on the negated scores gives descending order and keeps ties in
    # table order, matching the previous list.sort(reverse=True).
    order = np.argsort(-scores, kind="stable")[:top_k]
    results = [(float(scores[i]), rows[i]["chunk_id"]) for i in order]

    print(f"timing: search_embeddings {time.perf_counter() - t0:.3f}s")
    return results

In [7]:
def get_parent_extracts(db, scored_results, max_extracts=None):
    """Map scored chunk hits to their parent extracts, de-duplicated.

    Args:
        db: Database exposing ``chunks`` (with ``extract_id``) and ``extracts``
            (with ``text``) tables.
        scored_results: ``(score, chunk_id)`` pairs, e.g. from
            ``search_embeddings``.
        max_extracts: Maximum number of extracts to return. ``None`` means no
            limit, and ``0`` returns an empty list.

    Returns:
        List of dicts with ``score``, ``chunk_id``, ``extract_id`` and the
        whitespace-stripped extract ``text``. Each extract appears once,
        attributed to the first chunk in ``scored_results`` that points at it.
    """
    t0 = time.perf_counter()
    extracts = []
    seen_extract_ids = set()

    for score, chunk_id in scored_results:
        # Check the limit before any lookup, so max_extracts=0 yields [] and no
        # rows are fetched once the quota is filled.
        if max_extracts is not None and len(extracts) >= max_extracts:
            break
        extract_id = db.t.chunks[chunk_id]["extract_id"]
        if extract_id in seen_extract_ids:
            continue
        seen_extract_ids.add(extract_id)

        extract = db.t.extracts[extract_id]
        extracts.append(
            {
                "score": score,
                "chunk_id": chunk_id,
                "extract_id": extract_id,
                "text": extract["text"].strip(),
            }
        )

    print(f"timing: get_parent_extracts {time.perf_counter() - t0:.3f}s")
    return extracts

In [8]:
def build_context(extracts, glossary: Iterable[str] | None = None):
    """Assemble extracts into a single context string."""
    parts = []
    if glossary:
        for note in glossary:
            parts.append("[glossary]\n" + note)
    for item in extracts:
        header = f"[extract_id={item['extract_id']} score={item['score']:.4f}]"
        parts.append(header + "\n" + item["text"])
    return "\n\n---\n\n".join(parts)

In [9]:
def build_source_links(db, scored_results, max_sources=3):
    """Return one source link per distinct parent extract of the scored chunks.

    Args:
        db: Database exposing ``chunks`` (``extract_id``), ``extracts``
            (``page_id``) and ``pages`` (``url``) tables.
        scored_results: ``(score, chunk_id)`` pairs, e.g. from
            ``search_embeddings``.
        max_sources: Maximum number of links. ``None`` means no limit, and ``0``
            returns an empty list.

    Returns:
        List of dicts with ``score``, ``chunk_id``, ``extract_id`` and the page
        ``url``, in ``scored_results`` order.
    """
    sources = []
    seen_extract_ids = set()

    for score, chunk_id in scored_results:
        # Enforce the limit before any lookup, so max_sources=0 yields [] and no
        # extra rows are fetched once enough links are collected.
        if max_sources is not None and len(sources) >= max_sources:
            break
        extract_id = db.t.chunks[chunk_id]["extract_id"]
        if extract_id in seen_extract_ids:
            continue
        seen_extract_ids.add(extract_id)

        page_id = db.t.extracts[extract_id]["page_id"]
        sources.append(
            {
                "score": score,
                "chunk_id": chunk_id,
                "extract_id": extract_id,
                "url": db.t.pages[page_id]["url"],
            }
        )

    return sources

In [10]:
SYSTEM_PROMPT = (
    "Answer the question using only the provided context. "
    "If the answer is not in the context, say you don't know."
)


def build_llm_input(query, context) -> List[Dict]:
    """Build the Responses API ``input`` messages for one question.

    Both the blocking and the streaming answer functions send exactly this
    payload, so the system instruction and user-message layout live in one place.
    """
    return [
        {
            "role": "system",
            "content": [{"type": "input_text", "text": SYSTEM_PROMPT}],
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "input_text",
                    "text": f"Question: {query}\n\nContext:\n{context}",
                }
            ],
        },
    ]


def _make_http_client():
    """Return a new httpx client for the OpenAI SDK; the caller must close it."""
    # NOTE(review): verify=False disables TLS certificate verification for every
    # LLM request. This is kept from the original code, presumably to get through
    # an intercepting proxy. Prefer verify="<path to the corporate CA bundle>".
    return httpx.Client(verify=False)


def replay_cached_answer(cached, chunk_size=80) -> Generator[Dict, None, None]:
    """Re-emit a cached answer using the same event protocol as a live stream.

    Args:
        cached: Mapping returned by ``app_db.get_cache_answer``. Its ``id``,
            ``answer_text`` and ``sources_json`` entries are read via ``.get``.
        chunk_size: Characters per ``delta`` event (must be positive).

    Yields:
        A ``cache`` event, zero or more ``delta`` events, then ``sources`` and
        ``done``.
    """
    sources = []
    sources_json = cached.get("sources_json")
    if sources_json:
        try:
            sources = json.loads(sources_json)
        except (TypeError, ValueError):
            # Corrupt or unexpected cache payload: still serve the answer text.
            sources = []
    yield {"type": "cache", "cache_id": cached.get("id")}
    # `or ""` also covers a stored NULL, which would break len() and slicing.
    text = cached.get("answer_text") or ""
    for start in range(0, len(text), chunk_size):
        yield {"type": "delta", "text": text[start : start + chunk_size]}
    yield {"type": "sources", "sources": sources}
    yield {"type": "done"}


def answer_query_with_context(query, top_k=5, max_extracts=3):
    """Search, gather context, and ask the LLM to answer (non-streaming).

    Prints timings, the answer text and the source URLs.

    Returns:
        ``(answer_text, sources)``, or ``None`` when retrieval found no extracts.
    """
    t0 = time.perf_counter()
    scored = search_embeddings(db, query, top_k=top_k)
    extracts = get_parent_extracts(db, scored, max_extracts=max_extracts)
    # Check the extracts, not the context string: build_context always includes
    # the glossary notes, so the context would never be empty.
    if not extracts:
        print("No context available to send to the LLM.")
        return None
    context = build_context(extracts, glossary=GLOSSARY_SNIPPETS)

    # Close the HTTP client (and its connection pool) once the call completes.
    with _make_http_client() as http_client:
        client = OpenAI(http_client=http_client)
        t_llm = time.perf_counter()
        response = client.responses.create(
            model=LLM_MODEL,
            reasoning={"effort": "none"},
            text={"verbosity": "low"},
            input=build_llm_input(query, context),
        )

    print(f"timing: llm_response {time.perf_counter() - t_llm:.3f}s")
    print(f"timing: total {time.perf_counter() - t0:.3f}s")
    print(response.output_text)
    sources = build_source_links(db, scored, max_sources=max_extracts)
    if sources:
        print("\nSources:")
        for source in sources:
            print(f"- {source['url']}")
    return response.output_text, sources


def _event_get(event, key, default=None):
    """Read ``key`` from a stream event that may be a dict or an SDK object."""
    if isinstance(event, dict):
        return event.get(key, default)
    return getattr(event, key, default)


def stream_answer_with_context(
    query, top_k=5, max_extracts=3
) -> Generator[Dict, None, None]:
    """Stream an answer as events: ``delta`` text chunks, then ``sources``, then ``done``.

    A cache hit from ``app_db.get_cache_answer`` is replayed with an extra
    leading ``cache`` event, skipping retrieval and the LLM call entirely.
    """
    t0 = time.perf_counter()
    cached = app_db.get_cache_answer(query)
    if cached:
        yield from replay_cached_answer(cached)
        return

    scored = search_embeddings(db, query, top_k=top_k)
    extracts = get_parent_extracts(db, scored, max_extracts=max_extracts)
    sources = build_source_links(db, scored, max_sources=max_extracts)

    # Check the extracts, not the context string: build_context always includes
    # the glossary notes, so the context would never be empty.
    if not extracts:
        yield {
            "type": "delta",
            "text": "I couldn't find relevant context in the embeddings database.",
        }
        yield {"type": "sources", "sources": sources}
        yield {"type": "done"}
        return

    context = build_context(extracts, glossary=GLOSSARY_SNIPPETS)
    # Keep the HTTP client open for the whole stream. It is closed afterwards,
    # including when the consumer closes this generator early.
    with _make_http_client() as http_client:
        client = OpenAI(http_client=http_client)
        t_llm = time.perf_counter()
        stream = client.responses.create(
            model=LLM_MODEL,
            reasoning={"effort": "none"},
            text={"verbosity": "low"},
            input=build_llm_input(query, context),
            stream=True,
        )
        for event in stream:
            if _event_get(event, "type") == "response.output_text.delta":
                delta = _event_get(event, "delta")
                if delta:
                    yield {"type": "delta", "text": delta}

    print(f"timing: llm_stream {time.perf_counter() - t_llm:.3f}s")
    print(f"timing: total {time.perf_counter() - t0:.3f}s")
    yield {"type": "sources", "sources": sources}
    yield {"type": "done"}

In [None]:
# answer_query_with_context("transfer service")

timing: search_embeddings 0.073s
timing: get_parent_extracts 0.001s
timing: llm_response 4.310s
timing: total 4.445s
A transfer of service is when a customer with current active service moves to another location within JEA’s service area and needs service moved with **no interruption**.

To process a transfer, the customer must provide **both** the **stop date** for the current premise and the **start date** for the new premise **during the same call**. The stop and start dates can **overlap up to 30 days**; if they need both active more than 30 days, use the **additional service** process.

When processing, you must select the **Transfer** button (or the transfer may bill a new/incorrect deposit). If the transfer isn’t processed until the end of the script, **no part of the start or stop will be completed**.

Sources:
- https://connections/?docs=residential/start-stop-transfer-traditional-service/transfer-service
- https://connections/?docs=residential/start-stop-transfer-traditional-

'A transfer of service is when a customer with current active service moves to another location within JEA’s service area and needs service moved with **no interruption**.\n\nTo process a transfer, the customer must provide **both** the **stop date** for the current premise and the **start date** for the new premise **during the same call**. The stop and start dates can **overlap up to 30 days**; if they need both active more than 30 days, use the **additional service** process.\n\nWhen processing, you must select the **Transfer** button (or the transfer may bill a new/incorrect deposit). If the transfer isn’t processed until the end of the script, **no part of the start or stop will be completed**.'