In [None]:
# Cell 1: Imports & setup
# import re
# import json
from pathlib import Path
from typing import List, Dict, Tuple

import torch
import numpy as np
import faiss
import pandas as pd
import requests

from PyPDF2 import PdfReader
import docx
import openpyxl
from sentence_transformers import SentenceTransformer

In [39]:
# Cell 2: Utility functions for text extraction
def extract_text_from_pdf(path: str) -> str:
    reader = PdfReader(path)
    pages = []
    for p in reader.pages:
        try:
            pages.append(p.extract_text() or "")
        except Exception:
            pages.append("")
    return "\n".join(pages)

def extract_text_from_docx(path: str) -> str:
    doc = docx.Document(path)
    return "\n".join(p.text for p in doc.paragraphs)

def extract_text_from_txt(path: str) -> str:
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        return f.read()

def extract_text_from_xlsx(path: str) -> str:
    wb = openpyxl.load_workbook(path, data_only=True)
    texts = []
    for sheet in wb.worksheets:
        for row in sheet.iter_rows(values_only=True):
            row_txt = " ".join("" if v is None else str(v) for v in row)
            texts.append(row_txt)
    return "\n".join(texts)

def extract_text(path: str) -> str:
    p = Path(path)
    if p.suffix.lower() == ".pdf":
        return extract_text_from_pdf(str(p))
    if p.suffix.lower() in [".docx", ".doc"]:
        return extract_text_from_docx(str(p))
    if p.suffix.lower() in [".txt", ".md"]:
        return extract_text_from_txt(str(p))
    if p.suffix.lower() in [".xlsx", ".xls"]:
        return extract_text_from_xlsx(str(p))
    try:
        return extract_text_from_txt(str(p))
    except Exception:
        return ""


In [40]:
# Cell 3: Chunking utility
def chunk_text(text: str, chunk_size: int = 800, overlap: int = 200) -> List[str]:
    cleaned = " ".join(text.split())
    chunks = []
    i = 0
    L = len(cleaned)
    while i < L:
        end = min(i + chunk_size, L)
        chunks.append(cleaned[i:end])
        i = end - overlap if end < L else end
    return [c for c in chunks if len(c.strip()) > 20]

In [41]:
# Cell 4: RAG index builder class
class SimpleRAGIndex:
    def __init__(self, embedding_model_name: str = "sentence-transformers/all-mpnet-base-v2"):
        device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"[RAG] Loading embedding model on device: {device}")
        self.model = SentenceTransformer(embedding_model_name, device=device)
        self.dim = self.model.get_sentence_embedding_dimension()
        self.index = None
        self.metadatas = []

    def build(self, docs: List[Tuple[str, str]], batch_size: int = 64):
        all_chunks = []
        metas = []
        for doc_id, text in docs:
            chunks = chunk_text(text)
            for i, c in enumerate(chunks):
                metas.append({"doc_id": doc_id, "chunk_id": f"{doc_id}__{i}", "text": c})
                all_chunks.append(c)
        embs = []
        for i in range(0, len(all_chunks), batch_size):
            batch = all_chunks[i:i+batch_size]
            print(f"[Embedding] batch {i}/{len(all_chunks)}")
            emb = self.model.encode(batch, convert_to_numpy=True, show_progress_bar=False, device=self.model.device)
            embs.append(emb)
        vectors = np.vstack(embs).astype("float32") if embs else np.zeros((0, self.dim), dtype='float32')
        self.index = faiss.IndexFlatL2(self.dim)
        self.index.add(vectors)
        self.metadatas = metas
        print(f"[RAG] Built index with {self.index.ntotal} vectors.")

    def retriever(self, query: str, top_k: int = 6) -> List[Dict]:
        q_emb = self.model.encode([query], convert_to_numpy=True, device=self.model.device).astype("float32")
        D, I = self.index.search(q_emb, top_k)
        results = []
        for score, idx in zip(D[0], I[0]):
            if idx < 0 or idx >= len(self.metadatas):
                continue
            meta = dict(self.metadatas[idx])
            meta["score"] = float(score)
            results.append(meta)
        return results


In [42]:
# Cell 5: Mistral API call (strict JSON output)
def call_mistral_generate(api_key: str, prompt: str, model: str,
                          max_tokens: int, temperature: float = 0.0) -> str:
    """
    Mistral Chat API call with strict system instruction to return ONLY valid JSON.
    """
    url = "https://api.mistral.ai/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are an assistant that ONLY outputs strictly valid JSON. "
                                          "No markdown, no comments, no explanations. "
                                          "If unsure, output an empty JSON object {\"tests\": []}."},
            {"role": "user", "content": prompt}
        ],
        "max_tokens": max_tokens,
        "temperature": temperature
    }

    resp = requests.post(url, headers=headers, json=payload, timeout=120)
    if resp.status_code != 200:
        print("❌ Error:", resp.status_code, resp.text)
        resp.raise_for_status()
    data = resp.json()
    return data["choices"][0]["message"]["content"]


In [43]:
# Cell 6: Prompt template (strict JSON, with style rules)
PROMPT_TEMPLATE = """
You are an expert automotive HMI test author. 
Your ONLY task is to output a **valid JSON object** with the following schema:

{
  "tests": [
    {
      "test_name": "string",
      "test_description": "string",
      "steps": [
        {
          "step_name": "string",
          "action": "string",
          "expected_result": "string"
        }
      ]
    }
  ]
}

===== STYLE RULES (MANDATORY) =====
1. Test Name:
   - Must start with a 3-digit sequence number (001_, 002_, 003_, …).
   - Then repeat the test description text.
   - Example: "001_To Check the Availability of Plug and charge during different MMI states".

2. Test Description:
   - Should be identical to the Test Name but without the 3-digit prefix.
   - Example: "To Check the Availability of Plug and charge during different MMI states".

3. Step Names:
   - Always numbered: "Step 1", "Step 2", "Step 3", restarting at Step 1 for each test case.
   - Never use descriptive names like "Activate Plug & Charge". Only "Step X".

4. Action Descriptions:
   - Must be **multi-line and descriptive**, not single short sentences.
   - Should include: preconditions, configurations, CAN signals, and actions.
   - Use bullet points or numbered sub-steps when needed (just as in the expected output).

5. Expected Results:
   - Must exist for EVERY step.
   - Write them clearly, not vague. If unclear, set to: "Observe / Verify behavior matches requirement".

===== Example (shortened) =====
{
  "tests": [
    {
      "test_name": "001_To Check the Availability of Plug and charge during different MMI states",
      "test_description": "To Check the Availability of Plug and charge during different MMI states",
      "steps": [
        {
          "step_name": "Step 1",
          "action": "Set Below Condition:\\n+ Bat Unit Powered\\n+ Power State - ON\\n+ Language: English",
          "expected_result": "Is the user able to set the preconditions?"
        },
        {
          "step_name": "Step 2",
          "action": "Set below CAN signals:\\n1. V_WakeUpType = 000 (Full Wake-Up Mode) [Frame: BCM_A110]\\n2. V_WakeUpSleepCommand = 11 (WakeUp) [Frame: BCM_A110]",
          "expected_result": "Plug and Charge feature should not be available during 'Check Welcome screen'."
        }
      ]
    }
  ]
}

===== INPUTS =====
Requirements excerpt (the new doc to test):
<<<INCOMING_REQ>>>

Relevant context from indexed documents (RAG):
{context}

===== FINAL INSTRUCTIONS =====
- Ensure output is valid JSON (no comments, no markdown, no trailing commas).
- Follow the style rules strictly.
- Return ONLY JSON, nothing else.
"""

In [50]:
# Cell 7: Parse the output safely

import json
import re

def load_json_strict(s: str):
    """Strip markdown fences/BOM and parse strict JSON."""
    s = s.strip()
    s = s.lstrip("\ufeff")          # remove BOM if present
    s = re.sub(r"^\s*```json", "", s)
    s = re.sub(r"```$", "", s)
    return json.loads(s)


def safe_json_parse(raw: str, chunk_id, api_key, model) -> dict:
    """
    Parse model output into JSON safely.
    If parsing fails, call Mistral again to repair/complete the JSON.
    """
    cleaned = raw.strip().replace("```json", "").replace("```", "")
    start, end = cleaned.find("{"), cleaned.rfind("}")
    if start == -1 or end == -1:
        raise ValueError("No JSON object found in model output.")
    json_str = cleaned[start:end+1]

    # --- cleanup ---
    json_str = re.sub(r",(\s*[}\]])", r"\1", json_str)  # remove trailing commas
    json_str = re.sub(r"[\x00-\x1F]", "", json_str)     # remove control chars

    try:
        data = load_json_strict(json_str)
    except Exception:
        # Save broken output
        fname = f"failed_chunk_{chunk_id or 'unknown'}.json"
        with open(fname, "w", encoding="utf-8") as f:
            f.write(raw)
        print(f"⚠️ Saved broken JSON to {fname}. Attempting repair with Mistral...")

        if api_key and model:
            repair_prompt = f"""
You are given partial or invalid JSON for automotive test cases.
Please fix it so that it becomes strictly valid JSON, following this schema:

{{
  "tests": [
    {{
      "test_name": "string",
      "test_description": "string",
      "steps": [
        {{
          "step_name": "string",
          "action": "string",
          "expected_result": "string"
        }}
      ]
    }}
  ]
}}

Broken JSON starts below:
{json_str}

Return ONLY the corrected JSON.
"""
            fixed = call_mistral_generate(
                api_key=api_key,
                prompt=repair_prompt,
                model=model,
                max_tokens=1600
            )
            try:
                data = load_json_strict(fixed)
            except Exception as e2:
                fname2 = f"failed_chunk_{chunk_id or 'unknown'}_repair.json"
                with open(fname2, "w", encoding="utf-8") as f:
                    f.write(fixed)
                raise RuntimeError(f"❌ Repair attempt also failed. Saved to {fname2}\nError: {e2}")
        else:
            raise RuntimeError(f"❌ JSON parse failed for chunk {chunk_id}. No repair attempted (missing API key/model).")

    # --- numbering enforcement ---
    tests = data.get("tests", [])
    for i, t in enumerate(tests, start=1):
        desc = t.get("test_description", "").strip()
        if not desc:
            continue
        prefix = f"{i:03d}_"
        if not t.get("test_name", "").startswith(prefix):
            t["test_name"] = prefix + desc
        t["test_description"] = desc
        for j, s in enumerate(t.get("steps", []), start=1):
            s["step_name"] = f"Step {j}"

    return {"tests": tests}



In [None]:
# Cell 9: Load docs
index_dir = "./requirements_docs"
query_doc = "./PlugNCharge_requirements_doc.xlsx"
mistral_api_key = "YOUR_MISTRAL_API_KEY_HERE"  # replace with your key
out_excel = "./generated_tests.xlsx"
embed_model = "sentence-transformers/all-MiniLM-L6-v2"  # faster
mistral_model = "codestral-2508"

In [52]:
# Cell 8: Save to Excel
def tests_to_excel(tests: List[Dict], out_path: str):
    rows = []
    for t in tests:
        test_name, test_desc = t.get("test_name", ""), t.get("test_description", "")
        steps = t.get("steps", [])
        if not steps:
            rows.append({
                "Test Name": test_name,
                "Test Description": test_desc,
                "Step Name": "",
                "Action Description": "",
                "Expected Results": ""
            })
        else:
            for s in steps:
                rows.append({
                    "Test Name": test_name,
                    "Test Description": test_desc,
                    "Step Name": s.get("step_name", ""),
                    "Action Description": s.get("action", "").replace("\n", "\n"),
                    "Expected Results": s.get("expected_result", "")
                })
    df = pd.DataFrame(rows, columns=[
        "Test Name", "Test Description", "Step Name", "Action Description", "Expected Results"
    ])
    # Excel preserves line breaks if keep_default_na=False and strings contain "\n"
    df.to_excel(out_path, index=False, engine="openpyxl")
    print(f"[Output] written {len(df)} rows to {out_path}")


In [34]:
# Cell 10: Build RAG
docs = []
for p in Path(index_dir).glob("*"):
    if p.is_file():
        txt = extract_text(str(p))
        docs.append((p.name, txt))
print(f"[Indexing] found {len(docs)} documents to index.")

# Build RAG (only once!)
rag = SimpleRAGIndex(embedding_model_name=embed_model)
rag.build(docs)


[Indexing] found 7 documents to index.
[RAG] Loading embedding model on device: cuda
[Embedding] batch 0/31258
[Embedding] batch 64/31258
[Embedding] batch 128/31258
[Embedding] batch 192/31258
[Embedding] batch 256/31258
[Embedding] batch 320/31258
[Embedding] batch 384/31258
[Embedding] batch 448/31258
[Embedding] batch 512/31258
[Embedding] batch 576/31258
[Embedding] batch 640/31258
[Embedding] batch 704/31258
[Embedding] batch 768/31258
[Embedding] batch 832/31258
[Embedding] batch 896/31258
[Embedding] batch 960/31258
[Embedding] batch 1024/31258
[Embedding] batch 1088/31258
[Embedding] batch 1152/31258
[Embedding] batch 1216/31258
[Embedding] batch 1280/31258
[Embedding] batch 1344/31258
[Embedding] batch 1408/31258
[Embedding] batch 1472/31258
[Embedding] batch 1536/31258
[Embedding] batch 1600/31258
[Embedding] batch 1664/31258
[Embedding] batch 1728/31258
[Embedding] batch 1792/31258
[Embedding] batch 1856/31258
[Embedding] batch 1920/31258
[Embedding] batch 1984/31258
[Embed

In [53]:
# Cell 11: Extract input text
incoming_text = extract_text(query_doc)

In [54]:
# Cell 12: Chunked test generation workflow
def generate_tests_chunked(
    rag: SimpleRAGIndex,
    incoming_text: str,
    api_key: str = mistral_api_key,
    model: str = mistral_model,
    chunk_size: int = 2000,
    overlap: int = 500
) -> List[Dict]:
    """
    Split incoming_text into smaller chunks, generate test cases per chunk,
    and merge results.
    """
    chunks = chunk_text(incoming_text, chunk_size=chunk_size, overlap=overlap)
    print(f"[Chunked] split incoming doc into {len(chunks)} chunks.")

    all_tests = []
    for i, chunk in enumerate(chunks):
        print(f"\n[Chunk {i+1}/{len(chunks)}] processing...")
        retrieved = rag.retriever(chunk, top_k=3)
        context_blocks = "\n\n---\n\n".join([f"[{r['doc_id']}|score={r['score']}]\n{r['text']}" for r in retrieved])
        prompt = PROMPT_TEMPLATE.replace("{context}", context_blocks).replace("<<<INCOMING_REQ>>>", chunk[:1500])

        raw = call_mistral_generate(api_key=api_key, prompt=prompt, model=model, max_tokens=1200, temperature=0.0)

        try:
            payload = safe_json_parse(raw, chunk_id=i+1, api_key=mistral_api_key, model=mistral_model)
            tests = payload.get("tests", [])
            print(f"   -> got {len(tests)} tests.")
            all_tests.extend(tests)
        except Exception as e:
            print(f"   ❌ Failed to parse chunk {i+1}: {e}")
            continue

    # Final renumbering across all chunks
    for i, t in enumerate(all_tests, start=1):
        desc = t.get("test_description", "").strip()
        prefix = f"{i:03d}_"
        t["test_name"] = prefix + desc if desc else f"{prefix}Untitled Test"
        t["test_description"] = desc if desc else t["test_name"]
        for j, s in enumerate(t.get("steps", []), start=1):
            s["step_name"] = f"Step {j}"

    print(f"[Chunked] total tests collected: {len(all_tests)}")
    return all_tests


In [55]:
# Cell 13: Save to Excel
tests = generate_tests_chunked(
    rag,
    incoming_text,
    api_key=mistral_api_key,
    model=mistral_model
)

tests_to_excel(tests, out_excel)

[Chunked] split incoming doc into 8 chunks.

[Chunk 1/8] processing...
   -> got 2 tests.

[Chunk 2/8] processing...
   -> got 2 tests.

[Chunk 3/8] processing...
   -> got 2 tests.

[Chunk 4/8] processing...
   -> got 3 tests.

[Chunk 5/8] processing...
   -> got 2 tests.

[Chunk 6/8] processing...
⚠️ Saved broken JSON to failed_chunk_6.json. Attempting repair with Mistral...
   -> got 4 tests.

[Chunk 7/8] processing...
   -> got 0 tests.

[Chunk 8/8] processing...
   -> got 2 tests.
[Chunked] total tests collected: 17
[Output] written 59 rows to ./generated_tests.xlsx
