<a href="https://colab.research.google.com/github/gitsu11/Test-Cases-RAG/blob/rag_test_visible/rag_test_v3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Installing prerequisites**

In [None]:
!pip -q install faiss-cpu sentence-transformers langchain langchain-community langchain-text-splitters pypdf python-docx llama-cpp-python pandas tqdm requests

## **Setting up Google Drive and folders**

In [None]:
from google.colab import drive
drive.mount('/content/drive')  # mount Drive so data, indices, and the model persist across sessions

import os, pathlib
BASE_DIR = "/content/drive/MyDrive/Colab"
DATA_DIR = f"{BASE_DIR}/dataset_raw"   # raw zips/files
DOCS_DIR = f"{BASE_DIR}/docs"          # normalized text
OUT_DIR  = f"{BASE_DIR}/rag_outputs"   # indices, exports, model
for d in (DATA_DIR, DOCS_DIR, OUT_DIR): pathlib.Path(d).mkdir(parents=True, exist_ok=True)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## **Getting data from Zenodo**

In [None]:
import requests, os, json, re, shutil, zipfile
from pathlib import Path

ZENODO_REC_ID = "13880060"  # Requirements data sets (user stories)
api_url = f"https://zenodo.org/api/records/{ZENODO_REC_ID}"
resp = requests.get(api_url, timeout=60)
resp.raise_for_status()  # fail early on a bad record id or an API error
rec = resp.json()

downloaded = []

if "files" in rec:
    for f in rec["files"]:
        url = f["links"]["self"]
        name = f["key"]
        dest = Path(DATA_DIR)/name
        if not dest.exists():
            with requests.get(url, stream=True, timeout=120) as r:
                r.raise_for_status()
                with open(dest, "wb") as fh:
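                    # r.raw is the undecoded byte stream: a gzip Content-Encoding, if any,
                    # is written to disk as-is (the decompression cell below handles that)
                    shutil.copyfileobj(r.raw, fh)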
        downloaded.append(str(dest))
else:
    print("Could not find file list in record JSON; open the Zenodo page and download manually.")

print(f"Downloaded {len(downloaded)} files")
downloaded[:8]

Downloaded 22 files


['/content/drive/MyDrive/Colab/dataset_raw/g16-mis.txt',
 '/content/drive/MyDrive/Colab/dataset_raw/g21-badcamp.txt',
 '/content/drive/MyDrive/Colab/dataset_raw/g17-cask.txt',
 '/content/drive/MyDrive/Colab/dataset_raw/g10-scrumalliance.txt',
 '/content/drive/MyDrive/Colab/dataset_raw/g26-racdam.txt',
 '/content/drive/MyDrive/Colab/dataset_raw/g13-planningpoker.txt',
 '/content/drive/MyDrive/Colab/dataset_raw/g12-camperplus.txt',
 '/content/drive/MyDrive/Colab/dataset_raw/g19-alfred.txt']

In [None]:
for p in list(Path(DATA_DIR).glob("*.zip")):
    with zipfile.ZipFile(p, 'r') as zf:
        zf.extractall(DATA_DIR)
    print("Extracted:", p.name)

In [None]:
import gzip, shutil
from pathlib import Path

RAW_DIR      = "/content/drive/MyDrive/Colab/dataset_raw"   # your raw data folder
EXTRACT_DIR  = "/content/drive/MyDrive/Colab/extracted_raw" # new folder for decompressed text

Path(EXTRACT_DIR).mkdir(parents=True, exist_ok=True)

def is_gzip_sig(path):
    with open(path, "rb") as f:
        return f.read(3) == b"\x1f\x8b\x08"  # gzip magic bytes (1f 8b) + deflate method byte (08)

decompressed_files = []

for p in Path(RAW_DIR).rglob("*"):
    if not p.is_file():
        continue
    # Treat a file as gzip if it has a .gz extension OR starts with the gzip signature
    if p.suffix.lower() == ".gz" or is_gzip_sig(str(p)):
        out_name = p.stem  # drop the last extension (.gz, or .txt for mislabeled files)
        out_path = Path(EXTRACT_DIR) / (out_name + ".txt")

        try:
            with gzip.open(p, "rb") as f_in, open(out_path, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
            decompressed_files.append(out_path)
            print(f"✅ Decompressed: {p.name} → {out_path.name}")
        except Exception as e:
            print(f"⚠️ Failed on {p.name}: {e}")

print(f"\nTotal decompressed: {len(decompressed_files)}")

✅ Decompressed: g16-mis.txt → g16-mis.txt
✅ Decompressed: g21-badcamp.txt → g21-badcamp.txt
✅ Decompressed: g17-cask.txt → g17-cask.txt
✅ Decompressed: g10-scrumalliance.txt → g10-scrumalliance.txt
✅ Decompressed: g26-racdam.txt → g26-racdam.txt
✅ Decompressed: g13-planningpoker.txt → g13-planningpoker.txt
✅ Decompressed: g12-camperplus.txt → g12-camperplus.txt
✅ Decompressed: g19-alfred.txt → g19-alfred.txt
✅ Decompressed: g08-frictionless.txt → g08-frictionless.txt
✅ Decompressed: g22-rdadmp.txt → g22-rdadmp.txt
✅ Decompressed: g25-duraspace.txt → g25-duraspace.txt
✅ Decompressed: g27-culrepo.txt → g27-culrepo.txt
✅ Decompressed: g24-unibath.txt → g24-unibath.txt
✅ Decompressed: g28-zooniverse.txt → g28-zooniverse.txt
✅ Decompressed: g02-federalspending.txt → g02-federalspending.txt
✅ Decompressed: g03-loudoun.txt → g03-loudoun.txt
✅ Decompressed: g14-datahub.txt → g14-datahub.txt
✅ Decompressed: g18-neurohub.txt → g18-neurohub.txt
✅ Decompressed: g11-nsf.txt → g11-nsf.txt
✅ Decompre
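
The log above shows files with a `.txt` extension going through the gzip branch, which suggests their bytes start with the gzip signature despite the name. The following is a minimal in-memory sketch of that signature check, using only the standard library. The helper names `looks_gzipped` and `maybe_decompress` are hypothetical and not part of this notebook; unlike the loop above, the sketch also passes non-gzip payloads through unchanged.

```python
import gzip

GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream


def looks_gzipped(data: bytes) -> bool:
    """Return True when the payload starts with the gzip magic bytes."""
    return data[:2] == GZIP_MAGIC


def maybe_decompress(data: bytes) -> bytes:
    """Decompress gzip payloads; return anything else unchanged."""
    return gzip.decompress(data) if looks_gzipped(data) else data


# Illustrative data only: one user story, once plain and once gzip-compressed
plain = b"As a user, I want to register, so that I can log in.\n"
packed = gzip.compress(plain)

print(looks_gzipped(plain), looks_gzipped(packed))
print(maybe_decompress(packed) == plain)
```

Checking the signature rather than the extension is what lets mislabeled files be recovered; the pass-through branch is one possible way to keep files that were never compressed in the pipeline.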

## **Normalize everything to plain text**

In [None]:
import re, glob
from pypdf import PdfReader
from docx import Document

def file_to_text(path):
    """Return the text of a PDF, DOCX, or plain-text file; "" if unsupported or unreadable."""
    p = path.lower()
    if p.endswith(".pdf"):
        try:
            pages = []
            reader = PdfReader(path)
            for i, pg in enumerate(reader.pages, start=1):
                t = pg.extract_text() or ""
                pages.append(f"[PAGE {i}] {t}")
            return "\n".join(pages)
        except Exception:  # corrupt or encrypted PDF
            return ""
    if p.endswith(".docx"):
        try:
            doc = Document(path)
            return "\n".join(para.text for para in doc.paragraphs)
        except Exception:
            return ""
    if p.endswith((".txt", ".md", ".rst", ".csv")):
        try:
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                return f.read()
        except OSError:
            return ""
    return ""  # unsupported

def clean_text(s):
    s = s.replace("\x00"," ")
    s = re.sub(r"[ \t]+"," ", s)
    s = re.sub(r"\n{3,}","\n\n", s)
    return s.strip()

import os, pathlib
count=0
for root, _, files in os.walk(EXTRACT_DIR):
    for fn in files:
        src = os.path.join(root, fn)
        txt = clean_text(file_to_text(src))
        if txt and len(txt) > 50:
            out = f"{DOCS_DIR}/{pathlib.Path(fn).stem}.txt"
            with open(out, "w", encoding="utf-8") as f: f.write(txt)
            count += 1
print("Normalized text files:", count)

Normalized text files: 22


## **Chunking, Embedding, FAISS**

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
import numpy as np, faiss, pickle, glob, os

# Load normalized docs
docs = []
for fp in sorted(glob.glob(f"{DOCS_DIR}/*.txt")):
    with open(fp, "r", encoding="utf-8", errors="ignore") as f:
        docs.append({"source": os.path.basename(fp), "text": f.read()})
print("Docs:", len(docs))

# Chunk
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1200, chunk_overlap=150,
    separators=["\n## ","\n# ","\n\n","\n"," ",""]
)
chunks = []
for d in docs:
    for ch in splitter.split_text(d["text"]):
        chunks.append({"source": d["source"], "text": ch})
print("Chunks:", len(chunks))

# Embeddings + FAISS (cosine via normalized IP)
EMB_NAME = "sentence-transformers/all-MiniLM-L6-v2"
embed_model = SentenceTransformer(EMB_NAME)

B=128; vecs=[]
texts = [c["text"] for c in chunks]
for i in range(0, len(texts), B):
    vecs.append(embed_model.encode(texts[i:i+B], convert_to_numpy=True, normalize_embeddings=True))
vecs = np.vstack(vecs).astype("float32")

index = faiss.IndexFlatIP(vecs.shape[1])
index.add(vecs)

# Persist
with open(f"{OUT_DIR}/chunks.pkl","wb") as f: pickle.dump(chunks,f)
faiss.write_index(index, f"{OUT_DIR}/faiss.index")
with open(f"{OUT_DIR}/emb_model.txt","w") as f: f.write(EMB_NAME)
print("Index built.")

Docs: 22
Chunks: 216
Index built.
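
To make the `chunk_size` and `chunk_overlap` settings concrete, here is a simplified sliding-window chunker. It is a sketch only: `RecursiveCharacterTextSplitter` additionally tries to break on the listed separators, which this version ignores, and the function name `sliding_chunks` is hypothetical.

```python
def sliding_chunks(text: str, chunk_size: int = 1200, overlap: int = 150) -> list[str]:
    """Split text into fixed-size windows where neighbours share `overlap` characters."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


# Tiny illustrative input so the overlap is easy to see
demo = sliding_chunks("abcdefghij", chunk_size=4, overlap=1)
print(demo)  # ['abcd', 'defg', 'ghij']
```

Each window starts one character before the previous one ends, so a sentence that straddles a boundary still appears whole in at least one chunk when the overlap is large enough.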


## **Retriever helper**

In [None]:
def retrieve(query, k=8):
    """Return the top-k chunks for `query`, ranked by cosine similarity."""
    q = embed_model.encode([query], convert_to_numpy=True, normalize_embeddings=True).astype("float32")
    D, I = index.search(q, k)
    hits = []
    for idx, score in zip(I[0], D[0]):
        if idx < 0:  # FAISS pads results with -1 when the index holds fewer than k vectors
            continue
        hits.append({"rank": len(hits) + 1, "score": float(score), "source": chunks[idx]["source"], "text": chunks[idx]["text"]})
    return hits
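
The index is an inner-product index over L2-normalized embeddings, so the `score` returned by `retrieve` can be read as a cosine similarity. A small standard-library check of that identity follows; the two vectors are made up for illustration and stand in for real embeddings.

```python
import math


def dot(a, b):
    """Inner product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def l2_normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(dot(v, v))
    return [x / norm for x in v]


def cosine(a, b):
    """Cosine similarity computed directly from the definition."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


# Illustrative 2-D vectors, not real embeddings
a, b = [3.0, 4.0], [4.0, 3.0]
ip_of_normalized = dot(l2_normalize(a), l2_normalize(b))

print(ip_of_normalized, cosine(a, b))  # both are 24/25, up to floating-point rounding
```

This is why both the stored vectors and the query are encoded with `normalize_embeddings=True`: if either side were left unnormalized, the inner product would no longer be bounded by 1 and scores would not be comparable across queries.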

## **Local LLM via llama_cpp**


In [None]:
import os

MODEL_URL = "https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.2-GGUF/resolve/main/mistral-7b-instruct-v0.2.Q4_K_M.gguf"
MODEL_PATH = f"{OUT_DIR}/mistral-7b-instruct.Q4_K_M.gguf"
if not os.path.exists(MODEL_PATH):
    !wget -q -O "$MODEL_PATH" "$MODEL_URL"

from llama_cpp import Llama
# Sampling temperature is set per request in chat_local, not on the constructor
llm = Llama(model_path=MODEL_PATH, n_ctx=8192, n_threads=8, n_gpu_layers=2)

def chat_local(messages, max_tokens=1024, temperature=0.2):
    out = llm.create_chat_completion(messages=messages, temperature=temperature, max_tokens=max_tokens)
    return out["choices"][0]["message"]["content"]

llama_model_loader: loaded meta data with 24 key-value pairs and 291 tensors from /content/drive/MyDrive/Colab/rag_outputs/mistral-7b-instruct.Q4_K_M.gguf (version GGUF V3 (latest))
llama_model_loader: Dumping metadata keys/values. Note: KV overrides do not apply in this output.
llama_model_loader: - kv   0:                       general.architecture str              = llama
llama_model_loader: - kv   1:                               general.name str              = mistralai_mistral-7b-instruct-v0.2
llama_model_loader: - kv   2:                       llama.context_length u32              = 32768
llama_model_loader: - kv   3:                     llama.embedding_length u32              = 4096
llama_model_loader: - kv   4:                          llama.block_count u32              = 32
llama_model_loader: - kv   5:                  llama.feed_forward_length u32              = 14336
llama_model_loader: - kv   6:                 llama.rope.dimension_count u32              = 128
llama_model

## **Prompt and RAG generation**

In [None]:
TESTCASE_SYSTEM = """You are a senior QA engineer. Generate rigorous, unambiguous test cases
based ONLY on the provided context snippets. If context is missing details, list assumptions and gaps."""

TESTCASE_USER_TEMPLATE = """Context (RAG snippets):
---
{context}
---
Task: Generate test cases for the scope: "{scope}".
If acceptance criteria (AC) appear in the context, map each test to at least one AC id/phrase.

Output (Markdown):
- Feature: <name>
- Scope: <scope>
- Assumptions/Notes: <gaps or clarifications needed>
- Test Cases (table):
  | ID | Title | Type (pos/neg/edge) | Pre-Conditions | Steps | Expected Result | Priority (H/M/L) | MapsTo (AC id/phrase) |
After the table:
- Coverage: bullets of ACs covered and any missing ACs
- Additional Negative/Edge Ideas: bullets
"""

def make_context(hits, max_chars=1000):
    blocks=[]
    for h in hits:
        t = h["text"].strip()
        if len(t) > max_chars: t = t[:max_chars] + "..."
        blocks.append(f"[{h['source']} • score={h['score']:.2f}]\n{t}")
    return "\n\n".join(blocks)

def generate_test_cases(scope_query, k=8, use="local"):
    # Only the local llama.cpp backend is defined in this notebook
    if use != "local":
        raise ValueError(f"Unsupported backend: {use!r}")
    hits = retrieve(scope_query, k=k)
    ctx = make_context(hits)
    messages = [
        {"role":"system","content":TESTCASE_SYSTEM},
        {"role":"user","content":TESTCASE_USER_TEMPLATE.format(context=ctx, scope=scope_query)}
    ]
    out = chat_local(messages)
    return out, hits
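
`make_context` caps each snippet at `max_chars` but places no limit on the combined context, so the prompt grows linearly with `k`. One possible guard, sketched below under the assumption that hits are dicts shaped like those returned by `retrieve`, is to keep the best-ranked hits until a total character budget is used up. The helper `fit_hits_to_budget` and the 6000-character default are hypothetical, not part of this notebook.

```python
def fit_hits_to_budget(hits, max_total_chars=6000):
    """Keep the best-ranked hits whose combined text length stays within the budget."""
    kept, used = [], 0
    for h in sorted(hits, key=lambda h: h["rank"]):
        size = len(h["text"])
        if used + size > max_total_chars:
            break  # stop at the first hit that would overflow the budget
        kept.append(h)
        used += size
    return kept


# Made-up hits for illustration; real ones come from retrieve()
sample_hits = [
    {"rank": 2, "score": 0.21, "source": "b.txt", "text": "y" * 40},
    {"rank": 1, "score": 0.28, "source": "a.txt", "text": "x" * 50},
    {"rank": 3, "score": 0.19, "source": "c.txt", "text": "z" * 30},
]
kept = fit_hits_to_budget(sample_hits, max_total_chars=100)
print([h["source"] for h in kept])  # ['a.txt', 'b.txt']
```

A character budget is only a rough proxy for tokens; it could be applied to `hits` before they are passed to `make_context`.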

## **Saving model outputs to Drive**

In [None]:
import os, time, re
import pandas as pd

def parse_markdown_table(md_text: str):
    """
    Extracts the first Markdown table from LLM output and returns as DataFrame.
    Assumes '|' separated table rows with a header line.
    """
    lines = [ln.strip() for ln in md_text.splitlines() if ln.strip().startswith("|")]
    if not lines:
        return pd.DataFrame()

    # Remove markdown alignment row (| --- | --- | etc.)
    table_lines = [ln for ln in lines if not re.match(r'^\|\s*:?-+:?\s*\|', ln)]
    rows = [[c.strip() for c in ln.strip("|").split("|")] for ln in table_lines]

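    if len(rows) < 2:
        return pd.DataFrame()

    header, body = rows[0], rows[1:]
    # Pad short rows and trim long ones so a ragged or cut-off row cannot break DataFrame construction
    body = [(r + [""] * len(header))[:len(header)] for r in body]
    return pd.DataFrame(body, columns=header)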

def generate_and_save(scope_query, k=8, use="local"):
    # Run your existing generator
    md_text, evidence = generate_test_cases(scope_query, k=k, use=use)

    # Timestamped filenames
    ts = time.strftime("%Y%m%d-%H%M%S")
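    # Replace anything that is not a word character or hyphen (spaces, ':', ',', '/') to get a safe filename
    base = re.sub(r"[^\w-]+", "_", scope_query.strip())[:50]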

    md_path = os.path.join(OUT_DIR, f"{base}_{ts}.md")
    ev_path = os.path.join(OUT_DIR, f"{base}_{ts}_evidence.txt")
    csv_path = os.path.join(OUT_DIR, f"{base}_{ts}.csv")

    # Save markdown
    with open(md_path, "w", encoding="utf-8") as f:
        f.write(md_text)

    # Save evidence
    with open(ev_path, "w", encoding="utf-8") as f:
        for h in evidence:
            f.write(f"[{h['source']} • score={h['score']:.2f}]\n{h['text']}\n\n")

    # Parse table → CSV
    df = parse_markdown_table(md_text)
    if not df.empty:
        df.to_csv(csv_path, index=False)
        print(f"✅ Saved CSV with {len(df)} rows: {csv_path}")
    else:
        print("⚠️ No markdown table parsed, skipping CSV export.")

    print(f"✅ Saved outputs:\n- {md_path}\n- {ev_path}")
    return md_text, evidence
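
Generation stops at `max_tokens`, so the last table row in the model output may be cut off mid-cell. The sketch below shows, with the standard library only, how pipe-delimited rows can be parsed while tolerating such a ragged final row. The function `table_rows` and the sample table are illustrative and not part of this notebook.

```python
import re


def table_rows(md_text):
    """Parse the pipe-delimited rows in Markdown text into a list of dicts."""
    lines = [ln.strip() for ln in md_text.splitlines() if ln.strip().startswith("|")]
    # Drop the |---|---| separator row
    lines = [ln for ln in lines if not re.match(r"^\|\s*:?-+:?\s*\|", ln)]
    cells = [[c.strip() for c in ln.strip("|").split("|")] for ln in lines]
    if len(cells) < 2:
        return []
    header, body = cells[0], cells[1:]
    rows = []
    for r in body:
        r = (r + [""] * len(header))[:len(header)]  # pad short rows, trim long ones
        rows.append(dict(zip(header, r)))
    return rows


# Made-up table whose last row is cut off, as can happen at the token limit
sample = """
| ID | Title | Type |
| --- | --- | --- |
| 1 | Valid login | pos |
| 2 | Wrong password
"""
rows = table_rows(sample)
print(rows)
```

Padding keeps the truncated row visible, with empty cells, rather than silently dropping it, which makes incomplete generations easier to spot in the exported CSV.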

In [None]:
# Try it on a user-registration scope
md_text, evidence = generate_test_cases(
    "User registration: email verification, strong password policy, duplicate email handling",
    k=10, use="local"
)
print(md_text[:1500])

llama_perf_context_print:        load time =   48945.95 ms
llama_perf_context_print: prompt eval time =   48945.47 ms /  2605 tokens (   18.79 ms per token,    53.22 tokens per second)
llama_perf_context_print:        eval time =   76518.28 ms /  1023 runs   (   74.80 ms per token,    13.37 tokens per second)
llama_perf_context_print:       total time =  126427.37 ms /  3628 tokens
llama_perf_context_print:    graphs reused =        990


 - Feature: User Registration
- Scope: Email verification, strong password policy, duplicate email handling
- Assumptions/Notes: The system allows users to register with a valid email address and a strong password.

- Test Cases (table):
  | ID | Title | Type (pos/neg/edge) | Pre-Conditions | Steps | Expected Result | Priority (H/M/L) | MapsTo (AC id/phrase) |
  | 1 | Valid Email Registration | Positive | None | User enters a valid email address and a strong password during registration | User is successfully registered and receives a verification email | High | As a user, I want to register with a valid email address (g21-badcamp.txt, g10-scrumalliance.txt, g13-planningpoker.txt, g10-scrumalliance.txt, g03-loudoun.txt, g27-culrepo.txt, g04-recycling.txt, g26-racdam.txt, g02-federalspending.txt, g12-camperplus.txt) |
  | 2 | Invalid Email Registration | Negative | None | User enters an invalid email address during registration | Registration fails with an error message | Medium | As a 

In [None]:
print(evidence)

[{'rank': 1, 'score': 0.28261128067970276, 'source': 'g21-badcamp.txt', 'text': "As a anonymoususer, I want to view a list of sponsors, so that I can thank all the awesome sponsors.\nAs a anonymoususer, I want to view a list of user profiles, so that I know who is attending the conference.\nAs a trainingcoordinator, I want to email all the trainers at once from the website for info and updates, so that It is easier to use.\nAs a attendee, I want to have a very clear map where the buildings and rooms are, so that I can make it to class on time.\nAs a trainer, I want to edit my training node myself, so that the training coordinators don't have to and links to slides.\nAs a trainee, I want to limit to one registration per day per authenticated user, so that we don't have duplicate spaces.\nAs a trainingcoordinator, I want to have an admin view that helps track registration status for each attendee so that, so that we can see if attendend, refunded, no show.\nAs a attendee, I want to be ad

In [None]:
# Try it on a user-login scope
md_text, evidence = generate_test_cases(
    "User login: email verification, wrong credentials, password change",
    k=10, use="local"
)
print(md_text[:1500])

Llama.generate: 17 prefix-match hit, remaining 2577 prompt tokens to eval
llama_perf_context_print:        load time =   48945.95 ms
llama_perf_context_print: prompt eval time =   47374.71 ms /  2577 tokens (   18.38 ms per token,    54.40 tokens per second)
llama_perf_context_print:        eval time =   76186.71 ms /  1023 runs   (   74.47 ms per token,    13.43 tokens per second)
llama_perf_context_print:       total time =  124550.12 ms /  3600 tokens
llama_perf_context_print:    graphs reused =        990


 - Feature: User Login
- Scope: Email verification, wrong credentials, password change
- Assumptions/Notes: The system allows users to register with a valid email address and create a password.

- Test Cases (table):
  | ID | Title | Type (pos/neg/edge) | Pre-Conditions | Steps | Expected Result | Priority (H/M/L) | MapsTo (AC id/phrase) |
  | 1 | Valid Email and Correct Credentials | Positive | User is registered with a valid email and password | Enter valid email and password in login form | User is granted access to the system | High | As a user, I want to log in with valid email and password (g04-recycling.txt) |
  | 2 | Valid Email and Incorrect Credentials | Negative | User is registered with a valid email and incorrect password | Enter valid email and incorrect password in login form | User is not granted access to the system | Medium | As a user, I want to log in with incorrect credentials (g04-recycling.txt) |
  | 3 | Invalid Email and Correct Credentials | Negative | User ent

In [None]:
print(evidence)

[{'rank': 1, 'score': 0.2052965611219406, 'source': 'g04-recycling.txt', 'text': "As a user, I want to view all locations of recycling centers on a map, so that I can check which routes to take to drop off waste.\nAs a user, I want to upload my week's schedule, so that I can get recommendations for recycling centers that best fit my availability.\nAs a user, I want to link my email account to my profile, so that I can get a temporary password in case I forget my own one.\nAs a user, I want to contact the administrators, so that I can give feedback or ask for help.\nAs an admin, I want to add recycling center information, so that I can keep the database up-to-date over time.\nAs an admin, I want to view user error logs, so that I can fix or review any issues that are being faced by users of the system.\nAs an admin, I want to onboard recycling centers on the platform, so that I can increase information accuracy.\nAs a superuser, I want to update the recycling center information, so that