In [None]:
# Step 1 — Environment sanity check

import sys
print("Python version:", sys.version)

# Core libraries we will use
import chromadb
from sentence_transformers import SentenceTransformer

# Report the installed Chroma version so environment mismatches are visible in the output.
print("Chroma version:", chromadb.__version__)

# Load embedding model (small + fast)
# Smoke test only: instantiating the model proves sentence-transformers works here.
# NOTE(review): `model` is not referenced again in the visible cells; Step 3 creates
# its own `embed_model` with the same model name — confirm whether one can be reused.
model = SentenceTransformer("all-MiniLM-L6-v2")
print("Embedding model loaded successfully")


In [None]:
# Step 2 — Bugs-only documents (in-memory dataset)

bugs = [
    {
        "id": "BUG-1001",
        "title": "Checkout button disabled on iOS 17",
        "component": "Checkout",
        "severity": "P0",
        "created_date": "2025-12-18",
        "closed_date": None,
        "text": "Steps: Open cart -> tap Checkout. Actual: button stays disabled. Expected: proceed to payment. Environment: iOS 17. Impact: users cannot place orders. Notes: happens more when returning from background."
    },
    {
        "id": "BUG-1002",
        "title": "Tracking page loads blank screen on slow network",
        "component": "Tracking",
        "severity": "P1",
        "created_date": "2025-12-12",
        "closed_date": None,
        "text": "Steps: Place order -> open tracking. Actual: blank screen for 10-15s then sometimes recovers. Expected: tracking map and ETA visible. Environment: 3G/poor Wi-Fi. Suspected: API timeout not handled."
    },
    {
        "id": "BUG-1003",
        "title": "Promo code applied but discount not reflected in total",
        "component": "Checkout",
        "severity": "P1",
        "created_date": "2025-11-29",
        "closed_date": "2025-12-05",
        "text": "Steps: Enter valid promo -> Apply. Actual: promo shows applied but total remains unchanged. Expected: total reduced. Environment: Android 14. Root cause: pricing refresh not triggered after promo response."
    },
    {
        "id": "BUG-1004",
        "title": "Login OTP auto-fill fails on iOS",
        "component": "Auth",
        "severity": "P2",
        "created_date": "2025-12-02",
        "closed_date": "2025-12-09",
        "text": "Steps: Request OTP -> wait for SMS. Actual: OTP not auto-filled; user must type manually. Expected: auto-fill OTP. Environment: iOS 16/17. Notes: regression after input component change."
    },
    {
        "id": "BUG-1005",
        "title": "Apple Pay payment succeeds but order stuck in pending",
        "component": "Payments",
        "severity": "P0",
        "created_date": "2025-12-20",
        "closed_date": None,
        "text": "Steps: Checkout with Apple Pay. Actual: payment success shown but order status remains Pending and no confirmation screen. Expected: order confirmed + receipt. Environment: iOS 17. Notes: backend callback received late or not processed."
    },
    {
        "id": "BUG-1006",
        "title": "Search results flicker and reset after scroll",
        "component": "Search",
        "severity": "P3",
        "created_date": "2025-12-01",
        "closed_date": "2025-12-03",
        "text": "Steps: Search for restaurant -> scroll results. Actual: list flickers and jumps to top. Expected: stable scrolling. Environment: Android 13. Suspected: state refresh triggers full rebuild."
    },
    {
        "id": "BUG-1007",
        "title": "Cart quantity changes not saved when reopening app",
        "component": "Cart",
        "severity": "P1",
        "created_date": "2025-12-08",
        "closed_date": None,
        "text": "Steps: Add item -> increase quantity -> force close app -> reopen. Actual: quantity resets to 1. Expected: quantity persists. Environment: iOS 17 + Android 14. Notes: local cache write might be failing."
    },
    {
        "id": "BUG-1008",
        "title": "Address selection selects wrong address when two are similar",
        "component": "Address",
        "severity": "P2",
        "created_date": "2025-11-22",
        "closed_date": "2025-11-28",
        "text": "Steps: Choose address from list (two with similar names). Actual: wrong address selected. Expected: selected address matches tapped row. Environment: Android 14. Notes: list key/ID mapping issue."
    },
    {
        "id": "BUG-1009",
        "title": "Refund status not updated after cancellation",
        "component": "Payments",
        "severity": "P2",
        "created_date": "2025-12-06",
        "closed_date": None,
        "text": "Steps: Cancel paid order -> open order details. Actual: refund status stays 'Processing' for days. Expected: status updated when refund completed. Environment: iOS/Android. Notes: missing polling or webhook event."
    },
    {
        "id": "BUG-1010",
        "title": "Push notification opens app to home instead of order details",
        "component": "Notifications",
        "severity": "P3",
        "created_date": "2025-12-10",
        "closed_date": "2025-12-15",
        "text": "Steps: Tap 'Order updated' push. Actual: opens Home screen. Expected: deep link to order details. Environment: iOS 17. Root cause: deep link parsing failure for certain payloads."
    },
]

# Optional: load bugs from CSV if present (fallback to in-memory list)
import os, csv

CSV_PATH = "bugs_sample_20.csv"  # put the file in the same folder as the notebook

def load_bugs_from_csv(path: str):
    """
    Read bug records from a CSV file into a list of dicts.

    Each returned dict has the keys: id, title, component, severity,
    created_date, closed_date, text. Columns missing from the file yield None.

    closed_date is normalized: an empty or whitespace-only cell becomes None
    (the bug is still open); otherwise the stripped value is kept. Without the
    strip, a cell containing only spaces is truthy and the bug would be counted
    as closed by the `closed_date is None` checks elsewhere in the notebook.

    The file is opened as "utf-8-sig" so a leading byte-order mark (typical of
    spreadsheet exports) does not end up glued to the first header name; files
    without a BOM are read exactly as plain UTF-8.
    """
    rows = []
    with open(path, newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            # `or ""` covers both a missing column (None) and an empty cell.
            closed_date = (row.get("closed_date") or "").strip() or None
            rows.append({
                "id": row.get("id"),
                "title": row.get("title"),
                "component": row.get("component"),
                "severity": row.get("severity"),
                "created_date": row.get("created_date"),
                "closed_date": closed_date,
                "text": row.get("text"),
            })
    return rows

# Prefer the CSV sitting next to the notebook; otherwise keep the in-memory list.
if not os.path.exists(CSV_PATH):
    print(f"[Cell 2] CSV not found ({CSV_PATH}). Using in-memory dataset with {len(bugs)} bugs.")
else:
    bugs = load_bugs_from_csv(CSV_PATH)
    print(f"[Cell 2] Loaded {len(bugs)} bugs from CSV: {CSV_PATH}")


# Quick summary: a bug counts as open when it has no closed_date.
total_bugs = len(bugs)
open_count = len([bug for bug in bugs if bug["closed_date"] is None])
print("Bugs count:", total_bugs)
print("Open bugs:", open_count, "| Closed bugs:", total_bugs - open_count)
#print("Sample bug:", bugs[0]["id"], "-", bugs[0]["severity"], "-", bugs[0]["component"])




In [None]:
import pandas as pd

def bugs_to_df(bugs):
    """
    Turn a list of bug dicts into a DataFrame ready for analytics.

    created_date / closed_date are parsed to datetimes (unparseable or missing
    values become NaT). Two derived columns are appended:
      - is_open: True where closed_date is missing
      - resolution_days: closed_date - created_date expressed in days (NaN when open)
    """
    frame = pd.DataFrame(bugs).copy()

    # Coerce both date columns; bad values turn into NaT instead of raising.
    for date_col in ("created_date", "closed_date"):
        frame[date_col] = pd.to_datetime(frame[date_col], errors="coerce")

    time_to_close = frame["closed_date"] - frame["created_date"]
    return frame.assign(
        is_open=frame["closed_date"].isna(),
        resolution_days=time_to_close.dt.total_seconds() / 86400,  # seconds per day
    )

def analytics_reports(df):
    """
    Compute the four summary tables used by the notebook.

    Expects the columns produced by bugs_to_df (id, component, severity,
    created_date, is_open, resolution_days).

    Returns a 4-tuple:
      open_by_component          - open bug count per component, largest first
      resolution_by_component    - per-component stats of resolution_days over closed
                                   bugs (count, median, mean, p75, p90), slowest median first
      open_critical              - open rows whose severity normalizes to "p0", oldest first
      open_critical_by_component - count of those rows per component, largest first
    """
    is_open = df["is_open"]

    # Report 1: how many open bugs each component has.
    open_counts = df[is_open].groupby("component")["id"].count()
    open_by_component = (
        open_counts.sort_values(ascending=False).rename("open_bugs").reset_index()
    )

    # Report 2: resolution-time statistics, closed bugs with a known duration only.
    closed_with_duration = df[~is_open].dropna(subset=["resolution_days"])
    days_per_component = closed_with_duration.groupby("component")["resolution_days"]
    resolution_stats = days_per_component.agg(
        closed_bugs="count",
        median_days="median",
        avg_days="mean",
        p75_days=lambda s: s.quantile(0.75),
        p90_days=lambda s: s.quantile(0.90),
    )
    resolution_by_component = resolution_stats.sort_values(
        by="median_days", ascending=False
    ).reset_index()

    # Report 3: open critical bugs. Severity is normalized so that missing
    # values, stray whitespace and letter case do not break the comparison.
    normalized_severity = df["severity"].fillna("").astype(str).str.strip().str.lower()
    critical_mask = is_open & (normalized_severity == "p0")
    open_critical = df[critical_mask].copy().sort_values(by="created_date", ascending=True)

    critical_counts = open_critical.groupby("component")["id"].count()
    open_critical_by_component = (
        critical_counts.sort_values(ascending=False)
        .rename("open_critical_bugs")
        .reset_index()
    )

    return open_by_component, resolution_by_component, open_critical, open_critical_by_component


# ---------- RUN REPORTS ----------
# Build the analytics frame once, then derive the four report tables from it.
df = bugs_to_df(bugs)
(
    open_by_component,
    resolution_by_component,
    open_critical,
    open_critical_by_component,
) = analytics_reports(df)

# Only show the detail columns that actually exist in the critical-bugs table.
cols = ["id", "title", "component", "severity", "created_date", "closed_date"]
available_cols = [c for c in cols if c in open_critical.columns]

# (heading, table) pairs, printed in order as plain text without the index.
report_sections = [
    ("\n=== Open bugs by component (Top 20) ===", open_by_component.head(20)),
    (
        "\n=== Resolution time by component (closed only) (Top 20 by median_days) ===",
        resolution_by_component.head(20),
    ),
    ("\n=== Open critical bugs (oldest first) (Top 50) ===", open_critical[available_cols].head(50)),
    ("\n=== Open critical bugs by component ===", open_critical_by_component),
]
for heading, table in report_sections:
    print(heading)
    print(table.to_string(index=False))


# ---------- OPTIONAL: EXPORT TO CSV ----------
# Set to True if you want files saved next to your notebook/script
# EXPORT_CSV = True

# if EXPORT_CSV:
#     open_by_component.to_csv("open_by_component.csv", index=False)
#     resolution_by_component.to_csv("resolution_by_component.csv", index=False)
#     open_critical.to_csv("open_critical.csv", index=False)
#     open_critical_by_component.to_csv("open_critical_by_component.csv", index=False)
#     df.to_csv("bugs_all.csv", index=False)  # optional: full dataset
#     print("\nCSV saved: open_by_component.csv, resolution_by_component.csv, open_critical.csv, open_critical_by_component.csv, bugs_all.csv")


In [None]:
# Step 3 — Build a Chroma collection from bug docs (you fill the TODOs)

import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
from typing import Any, cast
from chromadb.utils import embedding_functions

# Embedding function object handed to the Chroma collection in build_chroma_collection.
# NOTE(review): presumably Chroma uses it to embed `query_texts` at query time — confirm.
chroma_embedding = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"
)


# 1) embedding model (already tested in Cell 1)
# Used by embed_texts() to produce the document vectors that are upserted explicitly.
# It is created with the same model name as `chroma_embedding` above.
embed_model = SentenceTransformer("all-MiniLM-L6-v2")

def bug_to_text(bug: dict) -> str:
    """
    Convert one bug dict into a single searchable text string.

    The text contains every field of the bug: id, severity, component,
    created_date, closed_date (rendered as "open" when empty/None), title and
    the free-text details. Component was previously read but never written to
    the output, so it could not influence retrieval.

    Raises KeyError if any of the expected keys is missing from `bug`.
    """
    bug_id = bug["id"]
    title = bug["title"]
    component = bug["component"]
    severity = bug["severity"]
    created = bug["created_date"]
    # Falsy closed_date (None or "") means the bug has not been closed yet.
    closed = bug["closed_date"] if bug["closed_date"] else "open"
    details = bug["text"]

    return (
        f"{bug_id} | {severity} | {component} | created : {created} | closed : {closed}\n"
        f"Title : {title}\n"
        f"Details : {details}"
    )
#test function
#print(bug_to_text(bugs[0]))

def embed_texts(texts: list[str]) -> list[list[float]]:
    """
    Embed each string in `texts` with the module-level `embed_model`.

    The model is asked for a NumPy result, which is converted to plain nested
    Python lists before being returned.
    """
    return embed_model.encode(texts, convert_to_numpy=True).tolist()
# ---- test embed_texts ----

# sample_texts = [
#     bug_to_text(bugs[0]),
#     bug_to_text(bugs[1])
# ]

# embeddings = embed_texts(sample_texts)

# print("Number of embeddings:", len(embeddings))
# print("Embedding vector length:", len(embeddings[0]))

def build_chroma_collection(
    bugs: list[dict],
    collection_name: str = "bugs",
    persist_path: str = "./chroma_db_st",
):
    """
    Create/get a local Chroma collection and upsert bug docs + embeddings.

    Args:
        bugs: bug dicts with the keys id, title, component, severity,
            created_date, closed_date, text.
        collection_name: name of the Chroma collection to create or reuse.
        persist_path: directory of the persistent Chroma store (defaults to the
            location the notebook has always used).

    Returns:
        The Chroma collection object. When `bugs` is empty the collection is
        returned as-is and nothing is upserted.

    Stored metadata per bug: title, component, severity, created_date and
    closed_date ("OPEN" when the bug has no closed_date). The title is stored
    because build_llm_context reads meta["title"]; without it the TITLE line of
    the LLM context is always blank.
    """
    client = chromadb.PersistentClient(
        path=persist_path,
        settings=Settings(anonymized_telemetry=False),
    )
    collection = client.get_or_create_collection(
        name=collection_name,
        embedding_function=chroma_embedding,
    )

    # Nothing to index. NOTE(review): Chroma is expected to reject an empty
    # id list, so skip the upsert rather than pass one — confirm for your version.
    if not bugs:
        return collection

    bug_ids = [b["id"] for b in bugs]
    texts = [bug_to_text(b) for b in bugs]
    embeddings = embed_texts(texts)

    metadatas = [
        {
            "title": b["title"],
            "component": b["component"],
            "severity": b["severity"],
            "created_date": b["created_date"],
            # A sentinel string is stored for open bugs instead of None.
            "closed_date": b["closed_date"] if b["closed_date"] is not None else "OPEN",
        }
        for b in bugs
    ]

    # Casts only silence static type checkers; the values are unchanged at runtime.
    embeddings = cast(Any, embeddings)
    metadatas = cast(Any, metadatas)

    # Upsert so re-running the cell updates existing ids instead of failing on duplicates.
    collection.upsert(
        ids=bug_ids,
        documents=texts,
        embeddings=embeddings,
        metadatas=metadatas,
    )

    return collection

# ---- Build the collection and verify it round-trips ----

collection = build_chroma_collection(bugs, collection_name="bugs")
print("Chroma collection ready. Count:", collection.count())

# Read the first bug back by id to confirm ids and metadata were stored.
first_bug_id = bugs[0]["id"]
item = collection.get(ids=[first_bug_id])
print("Fetched id:", item["ids"][0])
print("Fetched metadata:", item["metadatas"][0])




In [None]:
# Step 4 — Simple similarity search (retrieve top matching bugs)

query = "From the retrieved bugs only: are there any OPEN P0 (critical) issues related to Apple Pay pending orders? List bug IDs and explain briefly."
results = collection.query(query_texts=[query], n_results=3)

# One query was sent, so index [0] selects its hit lists; print them ranked from 1.
hits = zip(results["ids"][0], results["documents"][0], results["metadatas"][0])
for rank, (bug_id, document, meta) in enumerate(hits, start=1):
    doc_preview = document[:160].replace("\n", " ")
    print(f"{rank}) {bug_id} | {meta['severity']} | {meta['component']} | {meta['closed_date']}")
    print("   ", doc_preview, "...\n")


In [None]:
import requests
import textwrap

# URL that ollama_generate() POSTs its JSON payload to (a server on localhost).
OLLAMA_URL = "http://localhost:11434/api/generate"
# Default model name for ollama_generate() and llm_route().
MODEL = "qwen2.5"   # change to your installed model name

def build_llm_context(results, max_chars=4000):
    """
    Render query results as a text context for the LLM prompt.

    `results` must carry "ids", "metadatas" and "documents", each a list whose
    first element holds the hits of the (single) query. Every hit becomes one
    block with BUG_ID / TITLE / SEVERITY / COMPONENT / TEXT lines; metadata keys
    that are missing render as empty strings.

    Blocks are added in result order until the next one would push the summed
    block length past `max_chars`; that block and all later ones are dropped.
    The "\\n---\\n" separators are not counted against the budget.
    """
    hits = zip(results["ids"][0], results["metadatas"][0], results["documents"][0])

    blocks = []
    remaining = max_chars
    for bug_id, meta, doc in hits:
        block = (
            f"BUG_ID: {bug_id}\n"
            f"TITLE: {meta.get('title', '')}\n"
            f"SEVERITY: {meta.get('severity', '')}\n"
            f"COMPONENT: {meta.get('component', '')}\n"
            f"TEXT:\n{doc}\n"
        )
        if len(block) > remaining:
            break
        blocks.append(block)
        remaining -= len(block)

    return "\n---\n".join(blocks)

def ollama_generate(prompt, model=MODEL, temperature: float = 0.2, timeout: float = 60):
    """
    Send a single non-streaming generation request to the Ollama server.

    Args:
        prompt: full prompt text.
        model: Ollama model name (defaults to MODEL).
        temperature: sampling temperature. It is sent inside the "options"
            object, which is where the Ollama generate API reads sampling
            parameters from; as a top-level payload field it was not applied.
        timeout: seconds to wait for the HTTP response.

    Returns:
        The "response" field of the JSON reply.

    Raises:
        requests.HTTPError: on a non-2xx status (via raise_for_status).
        Other requests exceptions on connection problems or timeout.
    """
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": temperature},
    }
    r = requests.post(OLLAMA_URL, json=payload, timeout=timeout)
    r.raise_for_status()
    return r.json()["response"]

# Use the same query + results from Cell 4
context = build_llm_context(results)

prompt = f"""
You are a QA assistant.
Answer the user's question using ONLY the bug context below.
If context is insufficient, say: "Not enough evidence in retrieved bugs."

USER QUESTION:
{query}

BUG CONTEXT:
{context}

Return:
1) Short answer (2-4 lines)
2) List the bug IDs you used
"""

answer = ollama_generate(prompt)

print("\n=== LLM Answer ===")
# Wrap line by line: textwrap.fill() on the whole answer replaces newlines with
# spaces, which would merge the numbered sections the prompt asks for into a
# single paragraph. Empty lines stay empty, so paragraph breaks survive.
wrapped_lines = [textwrap.fill(line, width=100) for line in answer.strip().splitlines()]
print("\n".join(wrapped_lines))


In [None]:
# Cell 6 — Hybrid Router (Rules + LLM fallback) — REFACTORED + supports list/count for open & closed

import re

# Lower-case keyword lists checked against the lower-cased question via has_any()
# in analytics_dispatch(). Multi-word entries (e.g. "how many") are matched as phrases.
CLOSED_SYNONYMS   = ["closed", "resolved", "solved", "fixed", "done"]
OPEN_SYNONYMS     = ["open", "pending", "active"]
CRITICAL_SYNONYMS = ["critical", "p0", "blocker", "sev0"]
LIST_WORDS        = ["list", "show", "display"]
COUNT_WORDS       = ["how many", "count", "number", "total"]


# -----------------------------
# Helpers (parsing + filtering)
# -----------------------------
def extract_metric(q: str) -> str | None:
    ql = (q or "").lower()
    if "median" in ql:
        return "median_days"
    if any(x in ql for x in ["average", "avg", "mean"]):
        return "avg_days"
    if "p75" in ql or "75th" in ql:
        return "p75_days"
    if "p90" in ql or "90th" in ql:
        return "p90_days"
    return None


def extract_component(q: str, known_components: list[str]) -> str | None:
    ql = (q or "").lower()

    norm = [(c, str(c).strip().lower()) for c in known_components if str(c).strip()]
    norm.sort(key=lambda x: len(x[1]), reverse=True)

    for original, lc in norm:
        if lc and lc in ql:
            return str(original).strip()
    return None


def filter_df_by_component(view_df, component: str | None):
    if not component:
        return view_df
    return view_df[view_df["component"].astype(str).str.strip().str.lower() == component.strip().lower()]


def known_components_from_df(df):
    """Return the distinct non-null values of df["component"] as a sorted list of strings."""
    names = df["component"].dropna().astype(str).unique().tolist()
    names.sort()
    return names


def has_any(ql: str, words: list[str]) -> bool:
    """
    Return True if any entry of `words` occurs in `ql` as a whole word or phrase.

    Matching is case-sensitive (callers pass a lower-cased question and
    lower-case keywords) and requires a non-word character or a string edge on
    both sides of the keyword. Plain substring matching produced false
    positives such as "resolved" inside "unresolved", "active" inside
    "inactive" or "open" inside "reopen"; whole-word matching also agrees with
    the \\b-anchored patterns used by rule_route.

    Empty keywords are ignored; an empty/None `ql` or an empty `words` list
    yields False.
    """
    text = ql or ""
    return any(
        re.search(rf"(?<!\w){re.escape(word)}(?!\w)", text) is not None
        for word in words
        if word
    )


# -----------------------------
# Analytics handlers
# -----------------------------
def show_resolution_metric(question: str, resolution_by_component, df):
    """
    Print one resolution-time metric per component.

    The metric column is chosen by extract_metric (median_days when the
    question names none). If the question mentions a known component, only that
    component's row is shown; otherwise all components are listed, largest
    metric value first.

    Args:
        question: the user's question.
        resolution_by_component: table with a "component" column plus the metric columns.
        df: full bug frame, used only to learn the known component names.

    The previous unused `question.lower()` local was removed; it made the
    function raise on a None question although the parsing helpers accept None.
    """
    metric = extract_metric(question) or "median_days"
    component = extract_component(question, known_components_from_df(df))

    view = resolution_by_component[["component", metric]].copy()
    view = filter_df_by_component(view, component)

    # Ranking only makes sense when more than one component can be listed.
    if not component:
        view = view.sort_values(by=metric, ascending=False)

    print(f"\n--- Resolution metric: {metric}" + (f" | Component: {component}" if component else "") + " ---")
    if view.empty:
        print("No data found for that component/metric (maybe no closed bugs for it yet).")
    else:
        print(view.to_string(index=False))

def show_open_bugs_list(df, question: str):
    """
    Print the open bugs, oldest first, optionally limited to the component
    named in the question.
    """
    component = extract_component(question, known_components_from_df(df))
    open_rows = filter_df_by_component(df[df["is_open"]].copy(), component)

    wanted_cols = ("id", "title", "component", "severity", "created_date")
    shown_cols = [name for name in wanted_cols if name in open_rows.columns]

    header_suffix = f" | Component: {component}" if component else ""
    print("\n--- Open bugs (detailed)" + header_suffix + " ---")

    if not open_rows.empty:
        ordered = open_rows[shown_cols].sort_values(by="created_date", ascending=True)
        print(ordered.to_string(index=False))
        return

    if component:
        print("No open bugs found for that component.")
    else:
        print("No open bugs found.")


def show_closed_bugs_list(df, question: str):
    """
    Print the closed bugs, optionally limited to the component named in the
    question. Rows are ordered by closed_date (created_date if that column is absent).
    """
    component = extract_component(question, known_components_from_df(df))
    closed_rows = filter_df_by_component(df[~df["is_open"]].copy(), component)

    wanted_cols = (
        "id", "title", "component", "severity",
        "created_date", "closed_date", "resolution_days",
    )
    shown_cols = [name for name in wanted_cols if name in closed_rows.columns]

    header_suffix = f" | Component: {component}" if component else ""
    print("\n--- Closed bugs (detailed)" + header_suffix + " ---")

    if not closed_rows.empty:
        if "closed_date" in closed_rows.columns:
            sort_col = "closed_date"
        else:
            sort_col = "created_date"
        ordered = closed_rows[shown_cols].sort_values(by=sort_col, ascending=True)
        print(ordered.to_string(index=False))
        return

    if component:
        print("No closed bugs found for that component.")
    else:
        print("No closed bugs found.")


def show_open_bugs_count(open_by_component, df, question: str):
    """
    Print open-bug counts per component, or only the count of the component
    named in the question. A named component without open bugs prints as 0.
    """
    component = extract_component(question, known_components_from_df(df))
    counts = filter_df_by_component(open_by_component.copy(), component)

    header_suffix = f" | Component: {component}" if component else ""
    print("\n--- Open bugs count" + header_suffix + " ---")

    # The component is known but absent from the open-count table: report zero.
    if component and counts.empty:
        print(f"{component}  0")
        return
    print(counts.to_string(index=False))


def show_closed_bugs_count(df, question: str):
    """
    Print how many bugs are closed, either in total or for the component
    named in the question.
    """
    component = extract_component(question, known_components_from_df(df))
    closed_rows = filter_df_by_component(df[~df["is_open"]].copy(), component)
    n = len(closed_rows)

    if not component:
        print(f"\nClosed bugs (total): {n}")
    else:
        print(f"\nClosed bugs for {component}: {n}")


def show_open_critical(df, open_critical, open_critical_by_component, question: str):
    """
    Print two views of the open P0 bugs: the detailed rows (oldest first) and
    the per-component counts. Both are narrowed to the component named in the
    question, when there is one.
    """
    component = extract_component(question, known_components_from_df(df))
    header_suffix = f" | Component: {component}" if component else ""

    # --- View 1: detailed rows ---
    critical_rows = filter_df_by_component(open_critical.copy(), component)
    wanted_cols = ("id", "title", "component", "severity", "created_date")
    shown_cols = [name for name in wanted_cols if name in critical_rows.columns]

    print("\n--- Open P0 (critical) bugs (oldest first)" + header_suffix + " ---")
    if not critical_rows.empty:
        ordered = critical_rows[shown_cols].sort_values(by="created_date", ascending=True)
        print(ordered.to_string(index=False))
    elif component:
        print("No open P0 bugs found for that component.")
    else:
        print("No open P0 bugs found.")

    # --- View 2: counts per component ---
    print("\n--- Open P0 (critical) bugs by component" + header_suffix + " ---")
    critical_counts = filter_df_by_component(open_critical_by_component.copy(), component)

    if component and critical_counts.empty:
        print(f"{component}  0")
    else:
        print(critical_counts.to_string(index=False))


def analytics_dispatch(user_question: str, df, open_by_component, resolution_by_component, open_critical, open_critical_by_component):
    """
    Pick and run the analytics handler that fits the question.

    Rules are checked in priority order; the first hit wins:
      1. resolution-time keywords      -> show_resolution_metric
      2. open + "not only"/"not just"  -> open list (if a list or "all open" is
                                          requested), else open count
      3. critical keywords             -> show_open_critical
      4. closed keywords               -> closed list if a list is requested, else closed count
      5. open keywords + list request  -> open list
      6. anything else                 -> open count
    Output is printed; nothing is returned.
    """
    ql = user_question.lower()
    print("=== Analytics (computed by Python, not guessed) ===")

    resolution_keywords = [
        "median", "average", "avg", "mean", "p75", "p90",
        "percentile", "resolution", "time to close", "sla",
    ]

    wants_list = has_any(ql, LIST_WORDS)
    mentions_open = has_any(ql, OPEN_SYNONYMS) or "open bugs" in ql
    mentions_closed = has_any(ql, CLOSED_SYNONYMS) or "closed bugs" in ql
    mentions_critical = has_any(ql, CRITICAL_SYNONYMS)
    excludes_critical_only = ("not only" in ql) or ("not just" in ql)

    if has_any(ql, resolution_keywords):
        show_resolution_metric(user_question, resolution_by_component, df)
    elif mentions_open and excludes_critical_only:
        # "open bugs, not only critical": the user wants all open bugs.
        if wants_list or ("all open" in ql):
            show_open_bugs_list(df, user_question)
        else:
            show_open_bugs_count(open_by_component, df, user_question)
    elif mentions_critical:
        show_open_critical(df, open_critical, open_critical_by_component, user_question)
    elif mentions_closed:
        # A count is the answer both when asked for explicitly and when neither
        # a list nor a count was requested.
        if wants_list:
            show_closed_bugs_list(df, user_question)
        else:
            show_closed_bugs_count(df, user_question)
    elif mentions_open and wants_list:
        show_open_bugs_list(df, user_question)
    else:
        # Open count doubles as the default view.
        show_open_bugs_count(open_by_component, df, user_question)


# -----------------------------
# Routing (Hybrid)
# -----------------------------
def rule_route(q: str) -> str | None:
    ql = (q or "").strip().lower()

    analytics_patterns = [
        r"\bmedian\b", r"\baverage\b", r"\bavg\b", r"\bmean\b",
        r"\bp\d{2}\b",
        r"\bpercentile\b",
        r"\bhow many\b", r"\bcount\b", r"\bnumber of\b", r"\btotal\b",
        r"\bby component\b", r"\bbreakdown\b",
        r"\bresolution time\b", r"\btime to close\b", r"\bsla\b",
        r"\btrend\b", r"\bover (last|past)\b", r"\bper week\b", r"\bper month\b",
        r"\bopen\b", r"\bclosed\b", r"\bresolved\b", r"\bfixed\b", r"\bsolved\b",
    ]
    if any(re.search(p, ql) for p in analytics_patterns):
        return "ANALYTICS"

    rag_patterns = [
        r"\bis there (a|any) (known )?bug\b",
        r"\bknown issue\b",
        r"\bsimilar bug\b",
        r"\brelated to\b",
        r"\bwhy does\b",
        r"\bwhat causes\b",
        r"\bwhich bug\b",
    ]
    if any(re.search(p, ql) for p in rag_patterns):
        return "RAG"

    return None


def llm_route(q: str, model: str = MODEL) -> str:
    """
    LLM-based router, used when the rules cannot decide.

    Asks the model for a one-word classification and returns "ANALYTICS" when
    that word appears in the upper-cased reply; every other reply (including
    unparseable ones) is treated as "RAG".
    """
    router_prompt = f"""
You are a strict classifier for a QA assistant.

Decide which tool should answer the user's question:
- ANALYTICS: counts/medians/percentiles/trends/breakdowns/open/closed totals, comparisons across components, listing open/closed bugs.
- RAG: finding/identifying/summarizing specific bugs or known issues based on bug text.

Return ONLY one word: ANALYTICS or RAG.

USER QUESTION:
{q}
""".strip()

    verdict = ollama_generate(router_prompt, model=model).strip().upper()
    # RAG is both the explicit alternative and the fallback.
    return "ANALYTICS" if "ANALYTICS" in verdict else "RAG"


# -----------------------------
# Main entry point
# -----------------------------
def answer_question(user_question: str, top_k: int = 3, max_dist_threshold: float = 0.55):
    """
    Answer a question through the hybrid router and print the result.

    Routing: rule_route decides first; when it returns None, llm_route is asked.
      - ANALYTICS: answered from the pandas reports via analytics_dispatch.
      - RAG: the top_k bugs are retrieved from the Chroma collection, the LLM
        answers from that context only, and the answer is printed only if it
        passes validate_llm_answer; otherwise a safe refusal is printed.

    Args:
        user_question: the question to answer.
        top_k: number of bugs to retrieve on the RAG route.
        max_dist_threshold: retrieval is considered too weak to answer from when
            the best hit's distance is above this value.

    Requires from other cells:
      - df, open_by_component, resolution_by_component, open_critical, open_critical_by_component (Cell 2.5)
      - collection (Cell 3)
      - build_llm_context, ollama_generate (Cell 5)
      - MODEL, OLLAMA_URL already set (Cell 5)
      - retrieval_is_weak, validate_llm_answer, format_safe_refusal (Cell 7)
    """
    route = rule_route(user_question)
    if route is None:
        route = llm_route(user_question)

    print(f"\n[Router] Route = {route}\n")

    if route == "ANALYTICS":
        analytics_dispatch(
            user_question=user_question,
            df=df,
            open_by_component=open_by_component,
            resolution_by_component=resolution_by_component,
            open_critical=open_critical,
            open_critical_by_component=open_critical_by_component,
        )
        return

    # --- RAG route ---
    rag_query = user_question
    rag_results = collection.query(
        query_texts=[rag_query],
        n_results=top_k,
        include=["documents", "metadatas", "distances"]
    )

    print("=== RAG Top Matches ===")
    hits = zip(
        rag_results["ids"][0],
        rag_results["metadatas"][0],
        rag_results["distances"][0],
        rag_results["documents"][0],
    )
    for rank, (bug_id, meta, dist, doc) in enumerate(hits, start=1):
        doc_preview = doc[:160].replace("\n", " ")
        print(f"{rank}) {bug_id} | {meta.get('severity')} | {meta.get('component')} | {meta.get('closed_date')} | dist={dist:.4f}")
        print("   ", doc_preview, "...\n")

    # --- Grounding gate 1: evidence quality ---
    # Checked BEFORE calling the LLM: when retrieval is weak the answer is a
    # refusal regardless of what the model says, so generating it is wasted time.
    if retrieval_is_weak(rag_results, max_dist_threshold=max_dist_threshold):
        print("=== Final Answer (Safe Refusal) ===")
        print(format_safe_refusal(rag_query, rag_results))
        return

    context = build_llm_context(rag_results)
    prompt = f"""
You are a QA assistant.
Answer the user's question using ONLY the bug context below.
If context is insufficient, say: "Not enough evidence in retrieved bugs."

USER QUESTION:
{rag_query}

BUG CONTEXT:
{context}

Return:
1) Short answer (2-4 lines)
2) List the bug IDs you used
""".strip()

    llm_answer = ollama_generate(prompt)

    # --- Grounding gate 2: the answer must cite only retrieved bug IDs ---
    ok, reason, used_ids = validate_llm_answer(llm_answer, rag_results, min_ids=1)
    if not ok:
        print("=== Final Answer (Safe Refusal) ===")
        print(format_safe_refusal(rag_query, rag_results))
        # (Optional debug)
        # print(f"[Debug] Validation failed: {reason}")
        return

    print("=== Final Answer (Grounded) ===")
    print(llm_answer.strip())
    print("\nEvidence bug IDs:", ", ".join(used_ids))

    


# --- Try examples ---
# NOTE: on the RAG route answer_question() calls retrieval_is_weak(),
# validate_llm_answer() and format_safe_refusal(), which are defined in Cell 7
# (the next cell). On a fresh kernel, run Cell 7 before this call; otherwise a
# question routed to RAG fails with NameError.
#answer_question("list the open bugs for payments?")
answer_question("App is slow sometimes and feels buggy?")
#answer_question("how many resolved bugs for checkout?")
#answer_question("list closed bugs for checkout")
#answer_question("What’s the p90 resolution time by component?")
#answer_question("Is there a known bug where Apple Pay succeeds but order stays pending?")


In [88]:
# Cell 7 — Grounding / Validation Layer (RAG answers)

import re

# A bug ID is the literal "BUG-" followed by digits, delimited by word boundaries.
BUG_ID_PATTERN = r"\bBUG-\d+\b"

def extract_bug_ids(text: str):
    """Return the distinct bug IDs found in `text`, sorted as strings ([] for empty/None)."""
    unique_ids = {match.group(0) for match in re.finditer(BUG_ID_PATTERN, text or "")}
    return sorted(unique_ids)

def retrieval_is_weak(rag_results, max_dist_threshold: float = 0.55):
    """
    Decide whether the retrieved evidence is too weak to answer from.

    Weak (True) when:
      - there are no distances for the first query, or
      - the smallest distance is strictly greater than `max_dist_threshold`, or
      - the distances cannot be read at all (any exception is treated as weak).
    A lower distance is taken to mean a closer match.
    """
    try:
        distances = rag_results.get("distances", [[]])[0]
        return (not distances) or min(distances) > max_dist_threshold
    except Exception:
        # Deliberately conservative: unreadable results count as "no evidence".
        return True

def validate_llm_answer(llm_answer: str, rag_results, min_ids: int = 1):
    """
    Check that an LLM answer is grounded in the retrieved bugs.

    Rules:
      - the answer must cite at least `min_ids` bug IDs (BUG-<digits>)
      - every cited ID must be one of the IDs in rag_results["ids"][0]

    Returns:
        (is_ok, reason, used_ids) where used_ids is the sorted list of IDs
        cited in the answer, whether or not validation passed.
    """
    allowed_ids = set(rag_results.get("ids", [[]])[0])
    cited_ids = extract_bug_ids(llm_answer)

    if len(cited_ids) < min_ids:
        return (False, "LLM did not cite any bug IDs.", cited_ids)

    # IDs the model mentioned that were never part of the retrieved context.
    unknown_ids = [bug_id for bug_id in cited_ids if bug_id not in allowed_ids]
    if unknown_ids:
        return (False, f"LLM cited bug IDs not in retrieved context: {unknown_ids}", cited_ids)

    return (True, "OK", cited_ids)

def format_safe_refusal(rag_query: str, rag_results):
    """
    Build the refusal message shown when no grounded answer can be given.

    With no retrieved IDs the message says so; otherwise it lists up to three
    of the closest matches and suggests how to rephrase the question.
    """
    retrieved_ids = rag_results.get("ids", [[]])[0]

    if retrieved_ids:
        # Show the top 1-3 candidates to help the user rephrase.
        closest = ", ".join(retrieved_ids[:3])
        return (
            f"Not enough evidence in retrieved bugs for: '{rag_query}'.\n"
            f"Closest matches found: {closest}.\n"
            "Try rephrasing with more details (platform, screen, steps, error message)."
        )

    return f"Not enough evidence in retrieved bugs for: '{rag_query}'. (No matches returned.)"


In [96]:
import sys

# Parent directory of the notebook's working directory.
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), ".."))

# PROJECT_ROOT was computed but never used, so it had no effect on the import below.
# NOTE(review): assumes the `qa_rag` module/package lives directly under
# PROJECT_ROOT — confirm. The recorded output of this cell shows the import
# failing with ImportError, and "(unknown location)" suggests `qa_rag` resolved
# to a directory without an __init__.py rather than to the intended module.
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

# NOTE: importing `answer_question` rebinds the notebook-level function of the
# same name defined in Cell 6.
from qa_rag import build_state_from_csv_or_memory, answer_question



ImportError: cannot import name 'build_state_from_csv_or_memory' from 'qa_rag' (unknown location)