In [5]:
# """
# Healthcare User Story Extractor (Vertex AI + RAG + Async + BigQuery Export)
# ---------------------------------------------------------------------------
# Parses healthcare requirement docs and extracts validated user stories
# using Google Vertex AI (Gemini + Embeddings). Optionally exports results
# to BigQuery with dynamic schema inference.

# Deps:
#   pip install google-cloud-aiplatform langchain-google-vertexai PyPDF2 python-docx pydantic numpy tqdm
# Auth:
#   gcloud auth application-default login
# """

# LLM_MODEL = "gemini-2.0-flash"  # e.g., "gemini-1.5-pro"

# import os
# import re
# import json
# import uuid
# import asyncio
# import docx
# import xml.etree.ElementTree as ET
# import numpy as np
# from tqdm.auto import tqdm
# from PyPDF2 import PdfReader
# from typing import List, Iterable, Dict, Any, Optional
# from pydantic import BaseModel, Field, ValidationError

# from google.cloud import bigquery
# from langchain_google_vertexai import VertexAIEmbeddings, VertexAI

# # ========================== Helpers & Schema ==========================

# def _story_text_for_embedding(s: dict) -> str:
#     """Flatten story + ACs for richer similarity signals."""
#     ac = s.get("acceptance_criteria", []) or []
#     ac_text = " | ".join([f"G:{a.get('given','')} W:{a.get('when','')} T:{a.get('then','')}" for a in ac])
#     return f"{s.get('user_story','')} || {ac_text}"

# class AcceptanceCriteria(BaseModel):
#     given: str
#     when: str
#     then: str

# class Citation(BaseModel):
#     page: int
#     snippet: str

# class UserStory(BaseModel):
#     epic: str
#     story_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
#     user_story: str
#     acceptance_criteria: List[AcceptanceCriteria]
#     priority: str
#     dependencies: List[str] = []
#     non_functional: List[str] = []
#     source_requirement_ids: List[str] = []
#     assumptions: List[str] = []
#     open_questions: List[str] = []
#     citations: List[Citation] = []   # NEW

# SCHEMA = {
#     "epic": "",
#     "story_id": "",
#     "user_story": "As a <role>, I want <capability> so that <benefit>.",
#     "acceptance_criteria": [
#         {"given": "", "when": "", "then": ""}
#     ],
#     "priority": "Must|Should|Could|Won't",
#     "dependencies": [],
#     "non_functional": [],
#     "source_requirement_ids": [],
#     "assumptions": [],
#     "open_questions": [],
#     "citations": [{"page": 0, "snippet": ""}]  # NEW
# }

# # ========================== Parsing & Normalization ==========================

# BULLET_RE = re.compile(r"^\s*(?:[-*•]|[0-9]+[.)])\s+")
# REQ_ID_RE  = re.compile(r"^(REQ[-\s]?\d+|[A-Z]{2,}\d+|[0-9]+(?:\.[0-9]+)*)\b")
# REQ_KEYWORDS = re.compile(r"\b(system shall|shall|must|should|ability to|capability to|enable|allow)\b", re.I)

# def parse_pdf_pages(path: str) -> List[Dict[str, Any]]:
#     """Return list of {'page': int, 'text': str} for PDFs."""
#     reader = PdfReader(path)
#     pages = []
#     for idx, page in enumerate(reader.pages, start=1):
#         pages.append({"page": idx, "text": page.extract_text() or ""})
#     return pages

# def parse_docx(path: str) -> str:
#     doc = docx.Document(path)
#     return "\n".join([p.text for p in doc.paragraphs if p.text.strip()])

# def parse_xml(path: str) -> str:
#     tree = ET.parse(path)
#     root = tree.getroot()
#     return ET.tostring(root, encoding="unicode")

# def parse_json_text(path: str) -> str:
#     with open(path, "r", encoding="utf-8") as f:
#         return json.dumps(json.load(f), indent=2, ensure_ascii=False)

# def parse_file_text_or_pages(path: str) -> Dict[str, Any]:
#     """Return {'text': str} for non-PDFs, or {'pages': [{page, text}, ...]} for PDFs."""
#     path_l = path.lower()
#     if path_l.endswith(".pdf"):
#         return {"pages": parse_pdf_pages(path)}
#     if path_l.endswith(".docx"):
#         return {"text": parse_docx(path)}
#     if path_l.endswith(".xml"):
#         return {"text": parse_xml(path)}
#     if path_l.endswith(".json"):
#         return {"text": parse_json_text(path)}
#     with open(path, "r", encoding="utf-8", errors="ignore") as f:
#         return {"text": f.read()}

# def normalize_text(text: str) -> str:
#     """Join soft-wrapped lines, preserve bullets/headings, keep paragraph breaks."""
#     lines = text.splitlines()
#     out = []
#     buf = ""
#     for raw in lines:
#         line = raw.rstrip()
#         # blank line => paragraph break
#         if not line.strip():
#             if buf:
#                 out.append(buf)
#                 buf = ""
#             out.append("")
#             continue
#         # new bullet or heading/ID => start new requirement block
#         if BULLET_RE.match(line) or REQ_ID_RE.match(line):
#             if buf:
#                 out.append(buf)
#             buf = line.strip()
#             continue
#         # soft wrap: if buf doesn't end sentence punctuation, join
#         if buf and not buf.endswith((".", ":", ";")):
#             buf += " " + line.strip()
#         else:
#             if buf:
#                 out.append(buf)
#             buf = line.strip()
#     if buf:
#         out.append(buf)
#     # collapse consecutive blanks
#     cleaned = []
#     for s in out:
#         if s == "" and cleaned and cleaned[-1] == "":
#             continue
#         cleaned.append(s)
#     return "\n".join(cleaned)

# def normalize_page_text(pages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
#     return [{"page": p["page"], "text": normalize_text(p["text"])} for p in pages]

# import re
# from typing import List, Dict

# # ========================== Heading-based Epic Extraction ==========================

# # ========================== Heading-based Epic Extraction ==========================

# HEADING_RE = re.compile(r"^(\d+(?:\.\d+)*)(?:\s+)([A-Z][\w\s-]+.*)$")

# # Generic section titles we don't want to use as epics
# GENERIC_HEADINGS = {
#     "introduction", "purpose", "scope",
#     "functional requirements", "non-functional requirements",
#     "references", "appendix"
# }

# def extract_headings(text: str) -> Dict[str, str]:
#     """
#     Extract headings like '2.1.2 Clinician Portal Development'
#     Returns dict: {'2.1.2': 'Clinician Portal Development'}
#     Skips generic titles.
#     """
#     headings = {}
#     for line in text.splitlines():
#         m = HEADING_RE.match(line.strip())
#         if m:
#             sec_id = m.group(1)
#             title = m.group(2).strip()
#             # Skip generic container titles
#             if title.lower() not in GENERIC_HEADINGS:
#                 headings[sec_id] = title
#     return headings

# def assign_epic(req_id: str, headings: Dict[str, str]) -> str:
#     """
#     Assign epic based on the closest heading prefix.
#     Example: req_id='2.1.2.5' → epic='Clinician Portal Development'
#     Falls back to 'General' if none found.
#     """
#     for h in sorted(headings.keys(), key=lambda x: -len(x)):  # longest prefix first
#         if req_id.startswith(h):
#             return headings[h]
#     return "General"



# def split_requirements_with_epics(text: str) -> List[Dict[str, str]]:
#     """
#     Segment into atomic requirements with IDs (AUTO-n fallback),
#     and attach inferred epics from document headings.
#     """
#     headings = extract_headings(text)
#     lines = [l.strip() for l in text.split("\n") if l.strip()]
#     requirements = []
#     cur = None

#     def flush():
#         nonlocal cur
#         if cur and cur["text"].strip():
#             cur["text"] = re.sub(r"\s+", " ", cur["text"]).strip()
#             # Assign epic from headings
#             cur["epic"] = assign_epic(cur["req_id"], headings)
#             requirements.append(cur)
#         cur = None

#     for i, line in enumerate(lines):
#         rid = None
#         content = line
#         start_new = False

#         # Requirement ID pattern
#         m = re.match(r"^(REQ[-\s]?\d+|[0-9]+(?:\.[0-9]+)*)\b", line)
#         if m:
#             rid = m.group(0)
#             start_new = True
#             content = line[m.end():].strip() or line.strip()
#         elif re.match(r"^\s*[-*•]\s+", line):  # bullet point
#             start_new = True
#             content = line.strip()

#         if start_new:
#             flush()
#             rid = rid or f"AUTO-{len(requirements)+1}"
#             cur = {"req_id": rid, "text": content}
#         else:
#             if cur:
#                 cur["text"] += " " + content
#             else:
#                 cur = {"req_id": f"AUTO-{len(requirements)+1}", "text": content}

#     flush()
#     return requirements


# def chunked(seq: List[Any], n: int) -> Iterable[List[Any]]:
#     for i in range(0, len(seq), n):
#         yield seq[i:i+n]

# # ========================== RAG Index & Retrieval ==========================

# def page_chunks(pages: List[Dict[str, Any]], max_chars=1500, overlap=200) -> List[Dict[str, Any]]:
#     """Create sliding-window chunks with page provenance."""
#     chunks = []
#     for p in pages:
#         t = p["text"]
#         i = 0
#         while i < len(t):
#             j = min(len(t), i + max_chars)
#             chunk = t[i:j]
#             chunks.append({"page": p["page"], "text": chunk})
#             if j == len(t): break
#             i = j - overlap
#     return chunks

# def _cosines(query_emb, matrix):
#     sims = []
#     q = np.array(query_emb, dtype=float)
#     qn = np.linalg.norm(q)
#     for v in matrix:
#         v = np.array(v, dtype=float)
#         vn = np.linalg.norm(v)
#         sims.append(0.0 if qn == 0 or vn == 0 else float(np.dot(q, v) / (qn * vn)))
#     return sims

# def build_retriever(embedder: VertexAIEmbeddings, chunks: List[Dict[str, Any]]):
#     texts = [c["text"] for c in chunks]
#     embs = embedder.embed_documents(texts)
#     return {"chunks": chunks, "embs": embs}

# def retrieve_context(retriever: Dict[str, Any], embedder: VertexAIEmbeddings, query: str, top_k=3):
#     q_emb = embedder.embed_query(query)
#     sims = _cosines(q_emb, retriever["embs"])
#     idxs = np.argsort(sims)[::-1][:top_k]
#     results = []
#     for i in idxs:
#         c = retriever["chunks"][i]
#         results.append({"page": c["page"], "text": c["text"], "score": float(sims[i])})
#     return results

# # ========================== LLM Utils ==========================

# async def safe_llm_batch_async(llm, prompts, timeout=60):
#     """Run multiple LLM calls concurrently with timeout handling."""
#     tasks = [llm.ainvoke(p) for p in prompts]  # VertexAI async call
#     try:
#         return await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=timeout)
#     except asyncio.TimeoutError:
#         print("⏳ Timeout reached, skipping batch.")
#         return ["{}" for _ in prompts]

# FENCE_OPEN_RE = re.compile(r"^```(?:json|JSON)?\s*")
# FENCE_CLOSE_RE = re.compile(r"\s*```$")

# def clean_response(resp) -> str:
#     """Convert VertexAI response to a plain JSON string. Strips ```json fences."""
#     if isinstance(resp, Exception):
#         return "{}"
#     if isinstance(resp, str):
#         text = resp.strip()
#     elif hasattr(resp, "content") and isinstance(resp.content, str):
#         text = resp.content.strip()
#     elif hasattr(resp, "content"):
#         text = str(resp.content).strip()
#     else:
#         text = str(resp).strip()

#     # Strip code fences
#     text = FENCE_OPEN_RE.sub("", text)
#     text = FENCE_CLOSE_RE.sub("", text)
#     if "```" in text:
#         text = text.replace("```json", "").replace("```JSON", "").replace("```", "")
#     return text.strip()

# def extract_json_object(text: str) -> str:
#     """Try to salvage the first balanced {...} block from text."""
#     start = text.find("{")
#     if start == -1:
#         return text
#     depth = 0
#     for i in range(start, len(text)):
#         ch = text[i]
#         if ch == "{":
#             depth += 1
#         elif ch == "}":
#             depth -= 1
#             if depth == 0:
#                 return text[start:i+1]
#     return text[start:]  # fallback

# def cosine_similarity(a, b) -> float:
#     a = np.array(a, dtype=float)
#     b = np.array(b, dtype=float)
#     denom = (np.linalg.norm(a) * np.linalg.norm(b))
#     if denom == 0.0:
#         return 0.0
#     return float(np.dot(a, b) / denom)

# # ========================== Alignment Checks ==========================

# STOP = set(("the","and","of","to","in","a","for","on","with","by","is","be","as","at","or","an","from"))

# def key_terms(s: str):
#     toks = re.findall(r"[A-Za-z0-9_]+", s.lower())
#     return set(t for t in toks if t not in STOP and len(t) > 2)

# def alignment_score(req_text: str, citations: List[dict]) -> float:
#     req_terms = key_terms(req_text)
#     ctx_terms = set()
#     for c in citations or []:
#         ctx_terms |= key_terms(c.get("snippet",""))
#     if not req_terms or not ctx_terms:
#         return 0.0
#     return len(req_terms & ctx_terms) / float(len(req_terms | ctx_terms))

# def validate_alignment(requirement: Dict[str, str], story: Dict[str, Any], min_score=0.15):
#     score = alignment_score(requirement.get("text",""), story.get("citations", []))
#     ok = score >= min_score and len(story.get("citations", [])) > 0
#     return ok, score
# import csv

# import csv

# def export_testcases_csv(
#     stories,
#     requirements,
#     out_csv="testcases.csv",
#     area_path="Healthcare\\DayHealth",
#     iteration_path="Release 1",
#     synth_step_on_missing_ac=True
# ):
#     """
#     ADO-style export.
#     Writes one row per acceptance criterion.
#     If a story has no ACs and synth_step_on_missing_ac=True,
#     writes a single synthetic step so nothing is dropped.
#     """

#     if not stories:
#         print("⚠️ No stories to export.")
#         return

#     req_map = {r["req_id"]: r for r in requirements}

#     total_stories = len(stories)
#     stories_with_acs = 0
#     stories_without_acs = 0
#     rows_written = 0

#     with open(out_csv, "w", encoding="utf-8", newline="") as f:
#         w = csv.writer(f)
#         w.writerow([
#             "Test Case Title",
#             "Step Action",
#             "Step Expected",
#             "Requirement ID",
#             "Priority",
#             "Tags",
#             "Pages",
#             "Story Id",
#             "Epic",
#             "Area Path",
#             "Iteration Path",
#         ])

#         for s in stories:
#             rid = (s.get("source_requirement_ids") or [None])[0]
#             req = req_map.get(rid, {})
#             epic = (s.get("epic") or req.get("epic") or "General").strip()

#             # build tags
#             priority = (s.get("priority") or "").strip()
#             tags = []
#             if priority:
#                 tags.append(f"@priority_{priority}")
#             if rid:
#                 tags.append(f"@req_{rid}")
#             nf_join = " ".join(s.get("non_functional", []) or [])
#             if "HIPAA" in nf_join.upper():
#                 tags.append("@HIPAA")
#             if "21 CFR" in nf_join.upper() or "FDA" in nf_join.upper():
#                 tags.append("@FDA21CFR11")
#             tags_str = " ".join(tags)

#             # pages from citations
#             pages = ";".join(
#                 [str(c.get("page")) for c in (s.get("citations") or []) if c.get("page")]
#             )

#             acs = s.get("acceptance_criteria") or []
#             if acs:
#                 stories_with_acs += 1
#                 for ac in acs:
#                     given_ = (ac.get("given") or "").strip()
#                     when_  = (ac.get("when") or "").strip()
#                     then_  = (ac.get("then") or "").strip()

#                     step_action = " | ".join([p for p in [given_, when_] if p])
#                     step_expected = then_ or "Then: expected outcome is observed."

#                     w.writerow([
#                         s.get("user_story","").strip(),
#                         step_action,
#                         step_expected,
#                         rid,
#                         priority,
#                         tags_str,
#                         pages,
#                         s.get("story_id",""),
#                         epic,
#                         area_path,
#                         iteration_path,
#                     ])
#                     rows_written += 1
#             else:
#                 stories_without_acs += 1
#                 if synth_step_on_missing_ac:
#                     # create a single synthetic step so the test case isn’t lost
#                     title = s.get("user_story","").strip() or "User story (no AC specified)"
#                     w.writerow([
#                         title,
#                         "Given the system is available | When I perform the described capability",
#                         "Then the described outcome is achieved",
#                         rid,
#                         priority,
#                         tags_str,
#                         pages,
#                         s.get("story_id",""),
#                         epic,
#                         area_path,
#                         iteration_path,
#                     ])
#                     rows_written += 1
#                 # else: intentionally skip writing rows for no-AC stories

#     print("✅ Exported test cases to", out_csv)
#     print(f"   Stories total:            {total_stories}")
#     print(f"   Stories with ACs:         {stories_with_acs}")
#     print(f"   Stories without ACs:      {stories_without_acs}")
#     print(f"   Rows written (test steps):{rows_written}")


# # ========================== Extractor ==========================

# class HealthcareStoryExtractor:
#     def __init__(self, project_id, location="us-central1",
#                  embedding_model="text-embedding-005",
#                  classifier_model=LLM_MODEL):
#         self.project_id = project_id
#         self.location = location
#         self.embedder = VertexAIEmbeddings(
#             model=embedding_model,
#             project=project_id,
#             location=location
#         )
#         self.llm = VertexAI(
#             model_name=classifier_model,
#             temperature=0.2,   # slight diversity, still stable JSON
#             top_p=0.9,
#             top_k=40,
#             project=project_id,
#             location=location,
#         )

#     async def generate_user_stories_batch(
#         self,
#         requirements,
#         glossary,
#         actors,
#         constraints,
#         batch_size=5,
#         retriever: Optional[Dict[str, Any]] = None,
#     ):
#         # Use dumps to avoid quote-escaping hell in long strings
#         abstain = {
#             "epic": "",
#             "story_id": "",
#             "user_story": "",
#             "acceptance_criteria": [],
#             "priority": "",
#             "dependencies": [],
#             "non_functional": [],
#             "source_requirement_ids": [],
#             "assumptions": ["Insufficient context"],
#             "open_questions": ["Need clarification"],
#             "citations": [],
#         }

#         system_prompt = (
#             "You are a senior BA in healthcare software.\n"
#             "Use ONLY the provided glossary, actors, constraints, and CONTEXT SNIPPETS.\n"
#             "Cite which page(s) you used in the 'citations' field; include a short snippet from each page.\n"
#             "If the context is insufficient or unrelated, return EXACTLY this JSON:\n"
#             f"{json.dumps(abstain, indent=2)}\n"
#             "Return ONLY valid JSON with this schema (no markdown, no commentary):\n"
#             f"{json.dumps(SCHEMA, indent=2)}"
#         )

#         stories = []
#         for i in tqdm(range(0, len(requirements), batch_size), desc="LLM batches"):
#             batch_reqs = requirements[i:i + batch_size]
#             prompts = []
#             for req in batch_reqs:
#                 ctx = []
#                 if retriever is not None:
#                     hits = retrieve_context(retriever, self.embedder, req["text"], top_k=3)
#                     ctx = [{"page": h["page"], "snippet": h["text"][:500]} for h in hits]  # cap snippet

#                 user_prompt = (
#                     f"GLOSSARY: {glossary}\n"
#                     f"ACTORS: {actors}\n"
#                     f"CONSTRAINTS: {constraints}\n"
#                     f"CONTEXT SNIPPETS: {json.dumps(ctx, ensure_ascii=False)}\n"
#                     f"REQUIREMENT (ID: {req['req_id']}): {req['text']}"
#                 )
#                 prompts.append(f"{system_prompt}\n\n{user_prompt}")

#             responses = await safe_llm_batch_async(self.llm, prompts)

#             for req, resp in zip(batch_reqs, responses):
#                 raw_text = clean_response(resp)
#                 try:
#                     try:
#                         story_json = json.loads(raw_text)
#                     except json.JSONDecodeError:
#                         story_json = json.loads(extract_json_object(raw_text))

#                     us = UserStory(**story_json)

#                     # Force unique story_id & keep provenance
#                     us.story_id = str(uuid.uuid4())
#                     if not us.source_requirement_ids:
#                         us.source_requirement_ids = [req["req_id"]]

#                     stories.append(us.model_dump())  # pydantic v2
#                 except (json.JSONDecodeError, ValidationError) as e:
#                     print(f"❌ Invalid JSON for {req['req_id']}: {e}\nRaw output:\n{raw_text}\n")

#         self._last_requirements = requirements
#         return stories



#     def check_duplicates(self, stories, threshold=0.99):
#         """Return list of (story_id_i, story_id_j, similarity) for near-duplicates across different source reqs."""
#         texts = [_story_text_for_embedding(s) for s in stories if s.get("user_story")]
#         if not texts:
#             return []

#         embeddings = self.embedder.embed_documents(texts)
#         flagged = []

#         total_pairs = (len(embeddings) * (len(embeddings) - 1)) // 2
#         pbar = tqdm(total=total_pairs, desc="Duplicate check")
#         for i in range(len(embeddings)):
#             for j in range(i + 1, len(embeddings)):
#                 # provenance-aware: skip same-source pairs
#                 src_i = set(stories[i].get("source_requirement_ids", []))
#                 src_j = set(stories[j].get("source_requirement_ids", []))
#                 if not (src_i & src_j):
#                     sim = cosine_similarity(embeddings[i], embeddings[j])
#                     if sim >= threshold:
#                         flagged.append((stories[i]["story_id"], stories[j]["story_id"], sim))
#                 pbar.update(1)
#         pbar.close()
#         return flagged

#     def dedupe_stories(self, stories, threshold=0.99):
#         """Cluster near-duplicates and keep the best representative per cluster."""
#         if not stories:
#             return stories

#         texts = [_story_text_for_embedding(s) for s in stories]
#         embeddings = self.embedder.embed_documents(texts)

#         # Union-Find with progress
#         parent = list(range(len(stories)))
#         def find(x):
#             while parent[x] != x:
#                 parent[x] = parent[parent[x]]
#                 x = parent[x]
#             return x
#         def union(a, b):
#             ra, rb = find(a), find(b)
#             if ra != rb:
#                 parent[rb] = ra

#         total_pairs = (len(embeddings) * (len(embeddings) - 1)) // 2
#         pbar = tqdm(total=total_pairs, desc="Clustering dupes")
#         for i in range(len(embeddings)):
#             for j in range(i + 1, len(embeddings)):
#                 src_i = set(stories[i].get("source_requirement_ids", []))
#                 src_j = set(stories[j].get("source_requirement_ids", []))
#                 if not (src_i & src_j):
#                     sim = cosine_similarity(embeddings[i], embeddings[j])
#                     if sim >= threshold:
#                         union(i, j)
#                 pbar.update(1)
#         pbar.close()

#         clusters = {}
#         for idx in range(len(stories)):
#             r = find(idx)
#             clusters.setdefault(r, []).append(idx)

#         # pick representative: more ACs, then longer text
#         def score(k):
#             ac_len = len(stories[k].get("acceptance_criteria", []) or [])
#             txt_len = len(texts[k])
#             return (ac_len, txt_len)

#         kept = []
#         dropped_pairs = []  # (kept_id, dropped_id, cluster_size)
#         for _, idxs in clusters.items():
#             if len(idxs) == 1:
#                 kept.append(stories[idxs[0]])
#                 continue
#             best = max(idxs, key=score)
#             kept.append(stories[best])
#             for other in idxs:
#                 if other != best:
#                     dropped_pairs.append((stories[best]["story_id"], stories[other]["story_id"], len(idxs)))

#         if dropped_pairs:
#             preview = [(k, d, int(n)) for k, d, n in dropped_pairs]
#             print("🧹 Dedupe kept/dropped:", preview)

#         return kept


#     async def extract_from_file(
#         self,
#         file_path,
#         constraints="HIPAA, FDA 21 CFR Part 11",
#         batch_llm_size=20,
#         llm_inner_batch=5,
#         dedupe=True,
#         dup_threshold=0.99,
#         min_alignment=0.15,
#     ):
#         print("📥 Parsing document...")
#         parsed = parse_file_text_or_pages(file_path)

#         retriever = None
#         if "pages" in parsed:
#             print("🧹 Normalizing PDF pages & building RAG index...")
#             norm_pages = normalize_page_text(parsed["pages"])
#             chunks = page_chunks(norm_pages, max_chars=1500, overlap=200)
#             retriever = build_retriever(self.embedder, chunks)
#             full_text = "\n".join([p["text"] for p in norm_pages])
#         else:
#             full_text = parsed["text"]

#         print("🧩 Segmenting requirements...")
#         requirements = split_requirements_with_epics(full_text)
#         print(f"📌 Found requirements: {len(requirements)}")

#         # ✅ Limit for test mode
#         if TEST:
#             requirements = requirements[:10]
#             print(f"⚡ TEST mode active → using only {len(requirements)} requirements")

#         glossary = {"EHR": "Electronic Health Record", "HL7": "Data exchange standard"}
#         actors = {"Doctor": "Reviews patient data", "Nurse": "Updates vitals", "Patient": "Views reports"}

#         stories: List[Dict[str, Any]] = []
#         print("🧠 Generating stories (LLM)...")
#         for batch in tqdm(list(chunked(requirements, batch_llm_size)), desc="Requirement chunks"):
#             part = await self.generate_user_stories_batch(
#                 batch, glossary, actors, constraints, batch_size=llm_inner_batch, retriever=retriever
#             )
#             stories.extend(part)

#         print(f"🧾 Generated stories (pre-alignment, pre-dedupe): {len(stories)}")

#         # Alignment pass
#         print("🔎 Checking alignment with citations...")
#         req_map = {r["req_id"]: r for r in requirements}
#         aligned, needs_review = [], []
#         for s in stories:
#             rid = (s.get("source_requirement_ids") or [None])[0]
#             req = req_map.get(rid, {"text": ""})
#             ok, score = validate_alignment(req, s, min_score=min_alignment)
#             s["alignment_score"] = round(score, 3)
#             s["needs_review"] = not ok
#             (aligned if ok else needs_review).append(s)
#         print(f"✅ Aligned: {len(aligned)} | 🚩 Needs review: {len(needs_review)}")

#         final_stories = aligned + needs_review
#         if dedupe and final_stories:
#             dups = self.check_duplicates(final_stories, threshold=dup_threshold)
#             if dups:
#                 print("⚠️ Near-duplicate pairs:", [(a, b, round(sim, 3)) for a, b, sim in dups])
#             final_stories = self.dedupe_stories(final_stories, threshold=dup_threshold)
#             print(f"✅ Final stories after dedupe: {len(final_stories)}")
#         else:
#             print("⏭️ Skipping dedupe.")

#         return final_stories



#     # ------------------ BigQuery Export ------------------

#     def _infer_bq_schema(self, sample_row: dict):
#         """Infer BigQuery schema dynamically from a sample story dict."""
#         schema = []
#         for key, val in sample_row.items():
#             if isinstance(val, list) and val and isinstance(val[0], dict):
#                 schema.append(bigquery.SchemaField(key, "JSON"))
#             elif isinstance(val, list):
#                 schema.append(bigquery.SchemaField(key, "STRING", mode="REPEATED"))
#             elif isinstance(val, dict):
#                 schema.append(bigquery.SchemaField(key, "JSON"))
#             elif isinstance(val, str):
#                 schema.append(bigquery.SchemaField(key, "STRING"))
#             else:
#                 schema.append(bigquery.SchemaField(key, "STRING"))
#         return schema

#     def export_to_bq(self, stories, dataset_id="stories_dataset", table_id="user_stories"):
#         if not stories:
#             print("No stories to export")
#             return

#         client = bigquery.Client(project=self.project_id)
#         table_ref = f"{self.project_id}.{dataset_id}.{table_id}"

#         schema = self._infer_bq_schema(stories[0])

#         # Ensure dataset
#         try:
#             client.get_dataset(dataset_id)
#         except Exception:
#             dataset = bigquery.Dataset(f"{self.project_id}.{dataset_id}")
#             dataset.location = self.location
#             client.create_dataset(dataset, exists_ok=True)

#         # Ensure table
#         try:
#             client.get_table(table_ref)
#         except Exception:
#             table = bigquery.Table(table_ref, schema=schema)
#             client.create_table(table)

#         # Stringify dict fields (BQ JSON is fine, but this is robust)
#         rows = []
#         for story in stories:
#             row = story.copy()
#             for k, v in row.items():
#                 if isinstance(v, dict):
#                     row[k] = json.dumps(v, ensure_ascii=False)
#             rows.append(row)

#         print("⬆️ Exporting to BigQuery...")
#         errors = client.insert_rows_json(table_ref, rows)
#         if errors:
#             print(f"❌ Errors inserting rows: {errors}")
#         else:
#             print(f"✅ Inserted {len(rows)} stories into {table_ref}")


In [10]:
LLM_MODEL = "gemini-2.0-flash"  # e.g., "gemini-1.5-pro"

import os
import re
import json
import uuid
import asyncio
import docx
import xml.etree.ElementTree as ET
import numpy as np
from tqdm.auto import tqdm
from PyPDF2 import PdfReader
from typing import List, Iterable, Dict, Any, Optional
from pydantic import BaseModel, Field, ValidationError
from requirement_builder import HealthcareStoryExtractor
from google.cloud import bigquery
from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
# Auto-detect project from your auth context
bq_client = bigquery.Client()
PROJECT_ID = bq_client.project
TEST = True   # set this to False for full run

# Configs (override via env)
FILE_PATH = os.environ.get("INPUT_FILE", "data/srs.pdf")
OUTPUT_JSON = os.environ.get("OUTPUT_JSON", "generated_user_stories.json")
DEDUPE = os.environ.get("DEDUPE", "true").lower() in {"1", "true", "yes"}
DUP_THRESHOLD = float(os.environ.get("DUP_THRESHOLD", "0.99"))
EXPORT = os.environ.get("EXPORT_TO_BQ", "false").lower() in {"1", "true", "yes"}
BATCH_LLM_SIZE = int(os.environ.get("BATCH_LLM_SIZE", "20"))
LLM_INNER_BATCH = int(os.environ.get("LLM_INNER_BATCH", "5"))

extractor = HealthcareStoryExtractor(project_id=PROJECT_ID)

# Run extraction
stories = await  extractor.extract_from_file(
        FILE_PATH,
        dedupe=DEDUPE,
        dup_threshold=DUP_THRESHOLD,
        batch_llm_size=BATCH_LLM_SIZE,
        llm_inner_batch=LLM_INNER_BATCH,
    TEST=TEST
    
    )
# Save test cases to CSV
# export_testcases_csv(stories, extractor._last_requirements, out_csv="testcases.csv")


# Save to JSON
with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
    json.dump(stories, f, indent=2, ensure_ascii=False)
print(f"✅ Extracted user stories saved to {OUTPUT_JSON}")

import json

# Save segmented requirements
with open("requirements.json", "w", encoding="utf-8") as f:
    json.dump(extractor._last_requirements, f, indent=2, ensure_ascii=False)

# Save generated user stories
with open("stories.json", "w", encoding="utf-8") as f:
    json.dump(stories, f, indent=2, ensure_ascii=False)

print("✅ Saved requirements.json and stories.json")

# Optional: export to BigQuery
# if EXPORT:
#     extractor.export_to_bq(stories)




📥 Parsing document...


ERROR:asyncio:_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

asyncio.exceptions.CancelledError


🧹 Normalizing PDF pages & building RAG index...
🧩 Segmenting requirements...
📌 Found requirements: 170
⚡ TEST mode active → using only 10 requirements
🧠 Generating stories (LLM)...


Requirement chunks:   0%|          | 0/1 [00:00<?, ?it/s]

LLM batches:   0%|          | 0/2 [00:00<?, ?it/s]

🧾 Generated stories (pre-alignment, pre-dedupe): 10
🔎 Checking alignment with citations...
✅ Aligned: 2 | 🚩 Needs review: 8


Duplicate check:   0%|          | 0/45 [00:00<?, ?it/s]

Clustering dupes:   0%|          | 0/45 [00:00<?, ?it/s]

✅ Final stories after dedupe: 10
✅ Extracted user stories saved to generated_user_stories.json
✅ Saved requirements.json and stories.json


In [17]:
# !pip install google-cloud-aiplatform langchain-google-vertexai PyPDF2 python-docx pydantic


In [18]:
# stories

In [19]:
# ========================== Layer-3: Test Generation ==========================
# Generates:
#   - Gherkin .feature files (1 Scenario per AC) with tags: @priority_*, @req_*, @HIPAA...
#   - Step-definition stubs (pytest-bdd or behave) with E2E placeholders
#   - RTM coverage CSV: requirement_id ↔ story_id ↔ scenario_id
import csv
from pathlib import Path
from collections import defaultdict

_SAFE = re.compile(r"[^A-Za-z0-9._-]+")

def _safe_name(s: str, default: str = "item") -> str:
    s = (s or "").strip()
    if not s:
        return default
    s = re.sub(_SAFE, "_", s)
    return s[:80] or default  # keep filenames short-ish

def _ensure_dir(path: str | Path):
    Path(path).mkdir(parents=True, exist_ok=True)

def _detect_compliance_tags(story: Dict[str, Any]) -> list[str]:
    """Return compliance tags like HIPAA, FDA21CFR11 if mentioned in non_functional or citations."""
    tags = set()
    text_blobs = []
    for nfr in story.get("non_functional") or []:
        text_blobs.append(nfr or "")
    for c in story.get("citations") or []:
        text_blobs.append((c or {}).get("snippet", "") or "")
    blob = " ".join(text_blobs).lower()
    if "hipaa" in blob:
        tags.add("HIPAA")
    if "21 cfr part 11" in blob or "fda 21 cfr part 11" in blob or "21cfr part 11" in blob:
        tags.add("FDA21CFR11")
    if "iso 13485" in blob:
        tags.add("ISO13485")
    if "iec 62304" in blob:
        tags.add("IEC62304")
    if "iso 27001" in blob:
        tags.add("ISO27001")
    return sorted(tags)

def _story_scenarios(story: Dict[str, Any]) -> List[tuple[str, Dict[str, str]]]:
    """Return [(scenario_id, ac_dict), ...] where ac_dict has given/when/then."""
    out: List[tuple[str, Dict[str, str]]] = []
    acs = story.get("acceptance_criteria") or []
    for idx, ac in enumerate(acs, 1):
        sid = f"{story.get('story_id','S')}_AC{idx}"
        out.append((sid, ac))
    return out

def _extract_requirements(story: Dict[str, Any]) -> List[str]:
    r = story.get("source_requirement_ids") or []
    return [re.sub(_SAFE, "_", str(x))[:80] for x in r if x]

def _priority_tag(priority: str | None) -> str:
    p = (priority or "").strip() or "Unspecified"
    p = re.sub(r"\s+", "", p)
    return f"priority_{p}"

def export_gherkin_features(
    stories: List[Dict[str, Any]],
    out_dir: str = "features",
    feature_per_epic: bool = True,
) -> List[Path]:
    """
    Write .feature files from stories.
    - Group by 'epic' (default) OR one file per story if feature_per_epic=False
    - One Scenario per AC
    - Tags: @priority_*, @req_REQ-123, @HIPAA, @FDA21CFR11, ...
    """
    _ensure_dir(out_dir)

    groups: dict[str, list[Dict[str, Any]]] = defaultdict(list)
    if feature_per_epic:
        for s in stories:
            key = s.get("epic") or "General"
            groups[key].append(s)
    else:
        for s in stories:
            key = s.get("story_id") or "Story"
            groups[key].append(s)

    files: List[Path] = []
    for group_key, group_stories in groups.items():
        fname = _safe_name(group_key or "feature")
        path = Path(out_dir) / f"{fname}.feature"

        with open(path, "w", encoding="utf-8") as f:
            f.write(f"Feature: {group_key or 'User Stories'}\n\n")
            for s in group_stories:
                # Comment with the story text (nice for humans)
                f.write(f"  # {s.get('user_story','')}\n")

                # Build base tags
                tags = [f"@{_priority_tag(s.get('priority'))}"]
                # Requirement tags
                for rid in _extract_requirements(s):
                    tags.append(f"@req_{rid}")
                # Compliance tags
                for t in _detect_compliance_tags(s):
                    tags.append(f"@{t}")

                # Emit Scenarios (one per AC)
                for scenario_id, ac in _story_scenarios(s):
                    f.write("  " + " ".join(tags) + "\n")
                    scen_title = _safe_name(scenario_id, "Scenario")
                    f.write(f"  Scenario: {scen_title}\n")
                    f.write(f"    Given {ac.get('given','<given TBD>')}\n")
                    f.write(f"    When {ac.get('when','<when TBD>')}\n")
                    f.write(f"    Then {ac.get('then','<then TBD>')}\n\n")

        files.append(path)

    print(f"🧪 Wrote {len(files)} Gherkin feature file(s) to {out_dir}")
    return files

# -------- Step Stubs (pytest-bdd or behave) --------

_PYTEST_BDD_TEMPLATE = """# Auto-generated pytest-bdd step definitions.
# Run: pytest -k feature
import pytest
from pytest_bdd import given, when, then, scenarios

# Link feature(s)
scenarios("{feature_glob}")

# Example shared test data (E2E placeholders)
TEST_CONTEXT = {{
    "patient_id": "PAT-001",
    "session_id": "SES-001",
    "clinician_id": "DOC-123",
}}

@given("{given_text}")
def step_given():
    # TODO: implement setup for: {given_text}
    # e.g., create patient in DB using TEST_CONTEXT["patient_id"]
    pass

@when("{when_text}")
def step_when():
    # TODO: implement action for: {when_text}
    # e.g., call API to sign in/out, upload document, etc.
    pass

@then("{then_text}")
def step_then():
    # TODO: implement assertion for: {then_text}
    # e.g., assert response.status_code == 200 or record exists in DB
    pass
"""

_BEHAVE_TEMPLATE = """# Auto-generated behave step definitions.
# Run: behave -i {feature_glob}
from behave import given, when, then

# Example shared test data (E2E placeholders)
TEST_CONTEXT = {{
    "patient_id": "PAT-001",
    "session_id": "SES-001",
    "clinician_id": "DOC-123",
}}

@given('{given_text}')
def step_impl_given(context):
    # TODO: implement setup for: {given_text}
    pass

@when('{when_text}')
def step_impl_when(context):
    # TODO: implement action for: {when_text}
    pass

@then('{then_text}')
def step_impl_then(context):
    # TODO: implement assertion for: {then_text}
    pass
"""

def export_step_stubs(
    stories: List[Dict[str, Any]],
    out_dir: str = "steps",
    framework: str = "pytest-bdd",  # or "behave"
    feature_glob: str = "features/*.feature",
) -> Path:
    """Produce a single step file with example stubs and E2E placeholders."""
    _ensure_dir(out_dir)

    # Pick first non-empty AC as examples
    given_text = when_text = then_text = "TBD"
    for s in stories:
        for _, ac in _story_scenarios(s):
            given_text = ac.get("given") or "a precondition"
            when_text = ac.get("when") or "an action occurs"
            then_text = ac.get("then") or "an expected result"
            break
        if given_text != "TBD":
            break

    if framework.lower() == "behave":
        body = _BEHAVE_TEMPLATE.format(
            feature_glob=feature_glob,
            given_text=given_text.replace('"', '\\"'),
            when_text=when_text.replace('"', '\\"'),
            then_text=then_text.replace('"', '\\"'),
        )
        path = Path(out_dir) / "steps_behave.py"
    else:
        body = _PYTEST_BDD_TEMPLATE.format(
            feature_glob=feature_glob,
            given_text=given_text,
            when_text=when_text,
            then_text=then_text,
        )
        path = Path(out_dir) / "test_steps_bdd.py"

    with open(path, "w", encoding="utf-8") as f:
        f.write(body)

    print(f"🧩 Wrote step stubs for {framework} to {path}")
    return path

# -------- Coverage Matrix (RTM) --------

def _build_rtm_rows(stories: List[Dict[str, Any]]) -> List[List[str]]:
    """Rows: requirement_id, story_id, epic, priority, scenario_id, tags, pages"""
    rows: List[List[str]] = []
    for s in stories:
        pages = sorted({c.get("page") for c in (s.get("citations") or []) if isinstance(c.get("page"), int)})
        tags = [f"@{_priority_tag(s.get('priority'))}"] + [f"@req_{rid}" for rid in _extract_requirements(s)]
        tags += [f"@{t}" for t in _detect_compliance_tags(s)]

        scenarios = _story_scenarios(s)
        if scenarios:
            for scen_id, _ in scenarios:
                rows.append([
                    ";".join(s.get("source_requirement_ids") or ["-"]),
                    s.get("story_id", ""),
                    s.get("epic", ""),
                    s.get("priority", ""),
                    scen_id,
                    " ".join(tags),
                    ";".join(map(str, pages)) if pages else "",
                ])
        else:
            rows.append([
                ";".join(s.get("source_requirement_ids") or ["-"]),
                s.get("story_id", ""),
                s.get("epic", ""),
                s.get("priority", ""),
                "",  # no scenario
                " ".join(tags),
                ";".join(map(str, pages)) if pages else "",
            ])
    return rows

def export_traceability_csv(stories: List[Dict[str, Any]], path: str = "traceability.csv") -> Path:
    rows = _build_rtm_rows(stories)
    with open(path, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["requirement_id", "story_id", "epic", "priority", "scenario_id", "tags", "pages"])
        w.writerows(rows)
    print(f"📊 Wrote RTM to {path} ({len(rows)} rows)")
    return Path(path)

def flag_requirements_with_no_scenarios(stories: List[Dict[str, Any]]) -> List[str]:
    """Return list of requirement IDs that do not map to any Scenario."""
    req_to_scen: dict[str, int] = defaultdict(int)
    for s in stories:
        rids = s.get("source_requirement_ids") or ["-"]
        scen_count = len(_story_scenarios(s))
        for rid in rids:
            req_to_scen[rid] += scen_count
    return sorted([rid for rid, n in req_to_scen.items() if n == 0])

def generate_tests_from_stories(
    stories: List[Dict[str, Any]],
    feature_dir: str = "features",
    steps_dir: str = "steps",
    framework: str = "pytest-bdd",  # or "behave"
    feature_per_epic: bool = True,
    traceability_csv: str = "traceability.csv",
) -> Dict[str, Any]:
    """One-call orchestrator for Layer-3 outputs."""
    feature_files = export_gherkin_features(stories, out_dir=feature_dir, feature_per_epic=feature_per_epic)
    step_file = export_step_stubs(stories, out_dir=steps_dir, framework=framework, feature_glob=f"{feature_dir}/*.feature")
    rtm_file = export_traceability_csv(stories, path=traceability_csv)
    gaps = flag_requirements_with_no_scenarios(stories)

    if gaps:
        print(f"⚠️ Requirements with 0 scenarios: {gaps}")
    else:
        print("✅ All requirements have at least one scenario.")

    return {
        "feature_files": [str(p) for p in feature_files],
        "step_file": str(step_file),
        "rtm_csv": str(rtm_file),
        "gaps": gaps,
    }
# ======================== End Layer-3: Test Generation ========================


In [20]:
# ========================== Layer-3: CSV Exporters (Jira & ADO) ==========================
# Creates CSVs ready to import into Jira test plugins (Xray/Zephyr) and ADO Test Plans.

def _flatten_scenarios(stories: List[Dict[str, Any]]):
    """
    Yields rows: {
      requirement_id, story_id, epic, priority, scenario_id,
      given, when, then, tags (space-delimited), pages (semicolon-delimited)
    }
    Each AC (Given/When/Then) becomes ONE scenario row.
    """
    rows = []
    for s in stories:
        rids = s.get("source_requirement_ids") or ["-"]
        pages = sorted({c.get("page") for c in (s.get("citations") or []) if isinstance(c.get("page"), int)})
        tags = [f"@{_priority_tag(s.get('priority'))}"] + [f"@req_{rid}" for rid in _extract_requirements(s)]
        tags += [f"@{t}" for t in _detect_compliance_tags(s)]

        scenarios = _story_scenarios(s)
        if not scenarios:
            # still emit a row with empty scenario so importers can see the gap
            rows.append({
                "requirement_id": ";".join(rids),
                "story_id": s.get("story_id", ""),
                "epic": s.get("epic", ""),
                "priority": s.get("priority", ""),
                "scenario_id": "",
                "given": "",
                "when": "",
                "then": "",
                "tags": " ".join(tags),
                "pages": ";".join(map(str, pages)) if pages else "",
                "user_story": s.get("user_story", ""),
            })
            continue

        for scen_id, ac in scenarios:
            rows.append({
                "requirement_id": ";".join(rids),
                "story_id": s.get("story_id", ""),
                "epic": s.get("epic", ""),
                "priority": s.get("priority", ""),
                "scenario_id": scen_id,
                "given": (ac or {}).get("given", ""),
                "when": (ac or {}).get("when", ""),
                "then": (ac or {}).get("then", ""),
                "tags": " ".join(tags),
                "pages": ";".join(map(str, pages)) if pages else "",
                "user_story": s.get("user_story", ""),
            })
    return rows


def export_to_jira_csv(
    stories: List[Dict[str, Any]],
    path: str = "jira_testcases.csv",
    project_key: str | None = None,
    default_labels: list[str] | None = None,
    test_type: str = "Manual",     # Xray/Zephyr friendly
):
    """
    Produce a CSV that works well with Jira test plugins (Xray/Zephyr/TM4J) via CSV import.
    One row per Scenario (AC). Map fields during import:
      - Issue Type        -> "Test"
      - Project Key       -> your project (if not in file, select in UI)
      - Summary           -> Test Case title
      - Priority          -> Priority
      - Labels            -> space/comma separated
      - Requirement Keys  -> requirement_id (map to your "Requirement Link" field)
      - Test Type         -> Manual
      - Step Action       -> Given + When
      - Step Result       -> Then
      - Description       -> (optional) the original user story

    NOTE: Different Jira plugins have slightly different CSV headers. These columns are broadly compatible.
    You can re-map columns during the import wizard.
    """
    default_labels = default_labels or ["auto-generated", "vertex-ai", "traceable"]
    rows = _flatten_scenarios(stories)

    headers = [
        "Issue Type", "Project Key", "Summary", "Priority", "Labels",
        "Requirement Keys", "Test Type", "Step Action", "Step Data", "Step Result",
        "Description", "Tags", "Pages", "Story Id", "Epic"
    ]

    with open(path, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(headers)
        for r in rows:
            summary = r["user_story"][:255] if r["user_story"] else (r["scenario_id"] or "Generated Test")
            step_action = " | ".join([x for x in [r["given"], r["when"]] if x])
            step_result = r["then"]
            labels_str = ",".join(default_labels)
            req_keys = r["requirement_id"]  # If your requirements already exist in Jira, use their keys here

            w.writerow([
                "Test",
                project_key or "",      # leave blank to pick in UI
                summary,
                r["priority"] or "",
                labels_str,
                req_keys,
                test_type,
                step_action,
                "",                     # Step Data (optional)
                step_result,
                r["user_story"],
                r["tags"],
                r["pages"],
                r["story_id"],
                r["epic"],
            ])

    print(f"🗂️  Wrote Jira-friendly CSV to {path}")
    return Path(path)


def export_to_ado_csv(
    stories: List[Dict[str, Any]],
    path: str = "ado_testcases.csv",
    area_path: str | None = None,
    iteration_path: str | None = None,
):
    """
    Produce an Azure DevOps Test Plans friendly CSV.
    Recommended field mapping in ADO import:
      - "Test Case Title"    -> Title of the test
      - "Step Action"        -> Given + When
      - "Step Expected"      -> Then
      - "Requirement ID"     -> requirement_id
      - "Priority"           -> Priority
      - "Tags"               -> Tags
      - "Area Path"          -> (optional) area_path (string)
      - "Iteration Path"     -> (optional) iteration_path (string)

    One row per Scenario (AC).
    """
    rows = _flatten_scenarios(stories)

    headers = [
        "Test Case Title", "Step Action", "Step Expected",
        "Requirement ID", "Priority", "Tags", "Pages",
        "Story Id", "Epic", "Area Path", "Iteration Path"
    ]

    with open(path, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(headers)
        for r in rows:
            title = r["user_story"][:255] if r["user_story"] else (r["scenario_id"] or "Generated Test")
            step_action = " | ".join([x for x in [r["given"], r["when"]] if x])
            step_expected = r["then"]

            w.writerow([
                title,
                step_action,
                step_expected,
                r["requirement_id"],
                r["priority"] or "",
                r["tags"],
                r["pages"],
                r["story_id"],
                r["epic"],
                area_path or "",
                iteration_path or "",
            ])

    print(f"🗂️  Wrote ADO-friendly CSV to {path}")
    return Path(path)
# ====================== End Layer-3: CSV Exporters (Jira & ADO) ======================


In [11]:
from testcase_generator import TestCaseGenerator

tcgen = testcase_generator()
result = tcgen.generate(
    stories,
    feature_dir="features",
    steps_dir="steps",
    framework="pytest-bdd",     # or "behave"
    feature_per_epic=True,
    traceability_csv="traceability.csv",
)

NameError: name 'testcase_generator' is not defined

In [21]:
# After extraction + (optional) dedupe:
# stories = asyncio.run(extractor.extract_from_file(FILE_PATH, ...))

# 1) Generate features/steps/RTM as before
result = generate_tests_from_stories(
    stories,
    feature_dir="features",
    steps_dir="steps",
    framework="pytest-bdd",     # or "behave"
    feature_per_epic=True,
    traceability_csv="traceability.csv",
)

# 2) Create CSVs for Jira and ADO imports
export_to_jira_csv(
    stories,
    path="jira_testcases.csv",
    project_key="",                 # put your Jira project key or leave blank and select in UI
    default_labels=["auto-generated","vertex-ai","traceable"],
    test_type="Manual",
)

export_to_ado_csv(
    stories,
    path="ado_testcases.csv",
    area_path="Healthcare\\DayHealth",   # optional
    iteration_path="Release 1",          # optional
)


🧪 Wrote 6 Gherkin feature file(s) to features
🧩 Wrote step stubs for pytest-bdd to steps/test_steps_bdd.py
📊 Wrote RTM to traceability.csv (187 rows)
✅ All requirements have at least one scenario.
🗂️  Wrote Jira-friendly CSV to jira_testcases.csv
🗂️  Wrote ADO-friendly CSV to ado_testcases.csv


PosixPath('ado_testcases.csv')

In [22]:
"""
Coverage Matrix Generator (Enhanced)
------------------------------------
Merges Requirements ↔ Stories ↔ Test Cases into one CSV/Excel
with coverage flags and citations for traceability.

Inputs:
  - requirements.json
  - stories.json
  - testcases.csv

Output:
  - coverage_matrix.csv
"""

import json
import pandas as pd

# ------------------ Load Inputs ------------------
reqs = json.load(open("requirements.json", "r", encoding="utf-8"))
stories = json.load(open("stories.json", "r", encoding="utf-8"))
testcases = pd.read_csv("testcases.csv")

# Normalize requirements
df_reqs = pd.DataFrame(reqs)
if "epic" not in df_reqs.columns:
    df_reqs["epic"] = "General"

df_reqs = df_reqs.rename(columns={
    "req_id": "Requirement ID",
    "text": "Requirement Text",
    "epic": "Epic (Req)"
})

# Normalize stories
df_stories = pd.DataFrame(stories)
df_stories["Story Id"] = df_stories["story_id"]  # for join with testcases

# Ensure source req IDs exist
df_stories["source_requirement_ids"] = df_stories["source_requirement_ids"].apply(
    lambda x: x if isinstance(x, list) else []
)

# Flatten citations (page numbers + snippet preview)
def format_citations(cites):
    if not cites:
        return ""
    return "; ".join([f"p{c.get('page')}:{c.get('snippet','')[:80]}" for c in cites])

df_stories["Citations"] = df_stories["citations"].apply(format_citations)

# Explode stories by requirement ID
rows = []
for _, s in df_stories.iterrows():
    for rid in s["source_requirement_ids"]:
        rows.append({
            "Requirement ID": rid,
            "Story Id": s["story_id"],
            "User Story": s["user_story"],
            "Epic (Story)": s.get("epic", ""),
            "Alignment Score": s.get("alignment_score", 0.0),
            "Needs Review": s.get("needs_review", False),
            "Citations": s.get("Citations", "")
        })
df_map = pd.DataFrame(rows)

# Count test cases per story
tc_counts = testcases.groupby("Story Id").size().reset_index(name="Test Case Count")

# Merge requirements ↔ stories ↔ testcases
matrix = (
    df_reqs
    .merge(df_map, on="Requirement ID", how="left")
    .merge(tc_counts, on="Story Id", how="left")
    .fillna({"Test Case Count": 0})
)

# Coverage classification
def classify(row):
    has_story = pd.notna(row["Story Id"])
    has_test = row["Test Case Count"] > 0

    if has_story and has_test:
        return "✅ Covered"
    elif has_story and not has_test:
        return "⚠️ No tests"
    elif not has_story:
        # if requirement text exists but no story mapped
        return "⚠️ No stories"
    else:
        return "❌ Missing everything"

matrix["Coverage Status"] = matrix.apply(classify, axis=1)

# Save outputs
matrix.to_csv("coverage_matrix.csv", index=False, encoding="utf-8")
# matrix.to_excel("coverage_matrix.xlsx", index=False)

print("✅ Coverage matrix generated: coverage_matrix.csv")


✅ Coverage matrix generated: coverage_matrix.csv


In [23]:
"""
Coverage Matrix + Epic Rollup
-----------------------------
Generates requirement-level coverage matrix and
aggregated coverage metrics per Epic.

Inputs:
  - requirements.json
  - stories.json
  - testcases.csv

Outputs:
  - coverage_matrix.csv
  - epic_coverage.csv
"""

import json
import pandas as pd

# ------------------ Load Inputs ------------------
reqs = json.load(open("requirements.json", "r", encoding="utf-8"))
stories = json.load(open("stories.json", "r", encoding="utf-8"))
testcases = pd.read_csv("testcases.csv")

# ------------------ Normalize Requirements ------------------
df_reqs = pd.DataFrame(reqs)
if "epic" not in df_reqs.columns:
    df_reqs["epic"] = "General"

df_reqs = df_reqs.rename(columns={
    "req_id": "Requirement ID",
    "text": "Requirement Text",
    "epic": "Epic (Req)"
})

# ------------------ Normalize Stories ------------------
df_stories = pd.DataFrame(stories)
df_stories["Story Id"] = df_stories["story_id"]

df_stories["source_requirement_ids"] = df_stories["source_requirement_ids"].apply(
    lambda x: x if isinstance(x, list) else []
)

def format_citations(cites):
    if not cites:
        return ""
    return "; ".join([f"p{c.get('page')}:{c.get('snippet','')[:80]}" for c in cites])

df_stories["Citations"] = df_stories["citations"].apply(format_citations)

# Explode stories by requirement ID
rows = []
for _, s in df_stories.iterrows():
    for rid in s["source_requirement_ids"]:
        rows.append({
            "Requirement ID": rid,
            "Story Id": s["story_id"],
            "User Story": s["user_story"],
            "Epic (Story)": s.get("epic", ""),
            "Alignment Score": s.get("alignment_score", 0.0),
            "Needs Review": s.get("needs_review", False),
            "Citations": s.get("Citations", "")
        })
df_map = pd.DataFrame(rows)

# ------------------ Test Case Mapping ------------------
tc_counts = testcases.groupby("Story Id").size().reset_index(name="Test Case Count")

# Merge all
matrix = (
    df_reqs
    .merge(df_map, on="Requirement ID", how="left")
    .merge(tc_counts, on="Story Id", how="left")
    .fillna({"Test Case Count": 0})
)

# Coverage classification
def classify(row):
    has_story = pd.notna(row["Story Id"])
    has_test = row["Test Case Count"] > 0

    if has_story and has_test:
        return "✅ Covered"
    elif has_story and not has_test:
        return "⚠️ No tests"
    elif not has_story:
        return "⚠️ No stories"
    else:
        return "❌ Missing everything"

matrix["Coverage Status"] = matrix.apply(classify, axis=1)

# ------------------ Epic Rollup ------------------
epic_rollup = (
    matrix.groupby("Epic (Req)")
    .agg(
        total_reqs=("Requirement ID", "nunique"),
        with_stories=("Story Id", lambda x: x.notna().sum()),
        with_tests=("Test Case Count", lambda x: (x > 0).sum())
    )
    .reset_index()
)

epic_rollup["% Story Coverage"] = (epic_rollup["with_stories"] / epic_rollup["total_reqs"] * 100).round(1)
epic_rollup["% Test Coverage"] = (epic_rollup["with_tests"] / epic_rollup["total_reqs"] * 100).round(1)

# ------------------ Save Outputs ------------------
matrix.to_csv("coverage_matrix.csv", index=False, encoding="utf-8")
epic_rollup.to_csv("epic_coverage.csv", index=False, encoding="utf-8")

print("✅ Requirement-level coverage: coverage_matrix.csv")
print("✅ Epic-level rollup: epic_coverage.csv")


✅ Requirement-level coverage: coverage_matrix.csv
✅ Epic-level rollup: epic_coverage.csv


In [29]:
from compliance_validator import build_compliance_report
build_compliance_report(
    stories_path="stories.json",
    testcases_path="testcases.csv",
    out_csv="compliance_evidence.csv",
    out_xlsx="compliance_evidence.xlsx",
    project_id=PROJECT_ID,
    use_embeddings=True
)




✅ Compliance Evidence Report: compliance_evidence.csv and compliance_evidence.xlsx
   Stories analyzed: 150 | Stories with missing controls: 150


Unnamed: 0,Requirement ID,Story Id,Epic,Priority,User Story,Pages (Citations),Alignment Score,Needs Review,Matched Clauses,Clause Scores,Expected Controls,Detected Controls,Missing Controls,Evidence (Story + Steps)
0,1.1,7c4c1495-1f6d-4462-bd63-191fd15ea6ba,Digitize patient records and automate tracking.,Must,"As a Nurse, I want to access digitized patient...",4;4;6,0.449,False,FDA 21 CFR Part 11 11.10(e); FDA 21 CFR Part 1...,0.63; 0.61; 0.601; 0.552,audit_trail; data_integrity; e_signature; rbac...,,audit_trail; data_integrity; e_signature; rbac...,"As a Nurse, I want to access digitized patient..."
1,1.4,855160d4-16c6-4f25-a186-5279dcbbcfd3,Replace Trillium's current system with a new s...,Must,"As a Doctor, I want to access patient data thr...",5;5;6,0.179,False,FDA 21 CFR Part 11 11.10(e); FDA 21 CFR Part 1...,0.626; 0.594; 0.58; 0.56,audit_trail; data_integrity; e_signature; rbac...,,audit_trail; data_integrity; e_signature; rbac...,"As a Doctor, I want to access patient data thr..."
2,2.1.1,a99367a2-ad05-4a4b-a4d2-91390abb0115,Implement Database for Trillium Health,Must,"As a Doctor, I want to access patient clinical...",6;6,0.622,False,FDA 21 CFR Part 11 11.10(e); FDA 21 CFR Part 1...,0.612; 0.58; 0.573; 0.56,audit_trail; data_integrity; e_signature; rbac...,,audit_trail; data_integrity; e_signature; rbac...,"As a Doctor, I want to access patient clinical..."
3,2.1.2,5dd0d188-2605-4668-8305-d3155760a7e6,Clinician Portal Development,Must,"As a Doctor, I want to access and view patient...",6;13,0.222,False,ISO 13485 4.2.5; FDA 21 CFR Part 11 11.10(e); ...,0.549; 0.549; 0.542; 0.512,audit_trail; data_integrity; e_signature; rbac...,,audit_trail; data_integrity; e_signature; rbac...,"As a Doctor, I want to access and view patient..."
4,2.1.3,bf688cd2-f118-4e36-9bfd-432eebbbf55d,,Must,"As a Patient, I want to check in to Day Health...",10;9,0.184,False,FDA 21 CFR Part 11 11.100; FDA 21 CFR Part 11 ...,0.613; 0.608; 0.558; 0.543,audit_trail; data_integrity; e_signature; rbac...,,audit_trail; data_integrity; e_signature; rbac...,"As a Patient, I want to check in to Day Health..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
145,5.4.1.1,4246c8f4-e5ad-4a05-a704-fe8c5127152e,,Must,"As a Clinician, I want to be able to view a pa...",11;6;6;13,0.000,True,ISO 13485 4.2.5; ISO 27001 A.9; IEC 62304 5.1;...,0.538; 0.527; 0.521; 0.505,audit_trail; data_integrity; rbac; risk_manage...,,audit_trail; data_integrity; rbac; risk_manage...,"As a Clinician, I want to be able to view a pa..."
146,5.4.1.2,4b3aa627-8d93-433f-9189-fcd1de80e9c8,,Must,"As a Clinician, I want the application to comm...",15,0.000,True,ISO 27001 A.10; ISO 27001 A.9; FDA 21 CFR Part...,0.59; 0.561; 0.554; 0.549,audit_trail; data_integrity; e_signature; encr...,encryption,audit_trail; data_integrity; e_signature; rbac...,"As a Clinician, I want the application to comm..."
147,5.4.2,dcca3410-e2cd-48f3-8658-325a412eb531,,Must,"As a Nurse, I want to update patient vitals, s...",4,0.000,True,FDA 21 CFR Part 11 11.10(e); FDA 21 CFR Part 1...,0.633; 0.629; 0.588; 0.541,audit_trail; data_integrity; e_signature; rbac...,,audit_trail; data_integrity; e_signature; rbac...,"As a Nurse, I want to update patient vitals, s..."
148,5.4.3,85a4042f-8972-40b9-8def-ef8d803315ab,,Must,"As a user, I want to add, modify, or check pat...",5,0.000,True,FDA 21 CFR Part 11 11.10(e); FDA 21 CFR Part 1...,0.667; 0.631; 0.628; 0.624,audit_trail; data_integrity; e_signature; rbac...,,audit_trail; data_integrity; e_signature; rbac...,"As a user, I want to add, modify, or check pat..."


In [28]:
! pip install openpyxl

I0000 00:00:1756969455.938828   20878 fork_posix.cc:71] Other threads are currently calling into gRPC, skipping fork() handlers


Collecting openpyxl
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting et-xmlfile (from openpyxl)
  Downloading et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Downloading et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Installing collected packages: et-xmlfile, openpyxl
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2/2[0m [openpyxl]1/2[0m [openpyxl]
[1A[2KSuccessfully installed et-xmlfile-2.0.0 openpyxl-3.1.5
