In [1]:
"""
hirag_fix_pipeline.py

End-to-end pipeline:
- PDF -> text
- hierarchical parse (numbered headings)
- AzureOpenAIEmbeddings -> FAISS index
- hierarchical retrieval
- AzureChatOpenAI -> answers saved to .jsonl

Usage:
    - Set environment variables:
        AZURE_OPENAI_API_KEY
        AZURE_OPENAI_ENDPOINT (e.g. https://azure-openai-models-devs.openai.azure.com/)
        AZURE_EMBEDDING_DEPLOYMENT (e.g. text-embedding-3-large)
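        AZURE_CHAT_DEPLOYMENT (e.g. gpt-4o)
        AZURE_EMBEDDING_API_VERSION (API version passed to the embeddings client)
        AZURE_CHAT_API_VERSION (API version passed to the chat client)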

    - Then run: python hirag_fix_pipeline.py
"""

'\nhirag_fix_pipeline.py\n\nEnd-to-end pipeline:\n- PDF -> text\n- hierarchical parse (numbered headings)\n- AzureOpenAIEmbeddings -> FAISS index\n- hierarchical retrieval\n- AzureChatOpenAI -> answers saved to .jsonl\n\nUsage:\n    - Set environment variables:\n        AZURE_OPENAI_API_KEY\n        AZURE_OPENAI_ENDPOINT (e.g. https://azure-openai-models-devs.openai.azure.com/)\n        AZURE_EMBEDDING_DEPLOYMENT (e.g. text-embedding-3-large)\n        AZURE_CHAT_DEPLOYMENT (e.g. gpt-4o)\n\n    - Then run: python hirag_fix_pipeline.py\n'

In [2]:
import os
import json
from pathlib import Path
from typing import List, Dict, Optional
from pypdf import PdfReader
import re
import uuid
import numpy as np
import faiss
from tqdm import tqdm
from dotenv import load_dotenv
from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI

load_dotenv() 

True

### config

In [3]:
# Azure OpenAI settings taken from the process environment.
# Every value is None when its variable is not set.

# Credentials and service endpoint shared by the embedding and chat clients.
AZURE_API_KEY = os.environ.get("AZURE_OPENAI_API_KEY")
AZURE_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT")

# Deployment names for the embedding model and the chat model.
EMBED_DEPLOY = os.environ.get("AZURE_EMBEDDING_DEPLOYMENT")
CHAT_DEPLOY = os.environ.get("AZURE_CHAT_DEPLOYMENT")

# API versions, configured separately for each client.
OPENAI_API_VERSION_EMB = os.environ.get("AZURE_EMBEDDING_API_VERSION")
OPENAI_API_VERSION_CHAT = os.environ.get("AZURE_CHAT_API_VERSION")

### I/O paths 

In [4]:
# Input PDFs; main() prints a warning and skips any path that does not exist.
PDF_FILES = [
    "../data/raw/AUTOSAR_SWS_ECUStateManager.pdf",
    "../data/raw/Automotive_SPICE_PAM_31_EN.pdf",
]
# Directory where main() writes the extracted text, one .txt file per PDF.
TXT_DIR = Path("../data/txt")
# Created by main(); nothing in the code shown here writes into it.
MD_DIR = Path("../data/md")
# Directory that receives the FAISS index file and its JSON metadata.
INDEX_DIR = Path("../outputs/indices")
# JSON Lines file that receives one record per answered query.
OUTPUT_ANS_FILE = Path("./hirag_answers.jsonl")

### Queries

In [5]:
# Evaluation questions about the AUTOSAR ECU State Manager specification.
# main() answers them in this order and writes one JSONL record per question.
AUTOSAR_QUERIES = [
    "What are the activities executed before EcuM_Init in the Startup phase?",
    "Which ECU Manager responsibilities remain active during SLEEP phases when the BSW Mode Manager is not operational?",
    "How does the ECU Manager ensure consistency of configuration parameters across pre-compile, link-time, and post-build phases?",
    "What steps must be taken if a wakeup event is detected during the shutdown process?",
    "How does the ECU Manager handle a situation where the RAM integrity check fails after waking up from a sleep state?",
    "Is the EcuM_ValidateWakeupEvent function allowed to be called from an Interrupt Service Routine (ISR)?",
    "What is the responsibility of the BswM in relation to the EcuM during the UP phase?",
    "Explain the interaction between the SchM, Os, and EcuM during the StartPostOS sequence.",
    "Can the Dem module report errors during the driver initialization performed by EcuM_AL_DriverInitZero?",
    "What is the role of the BswM in ECU state transitions?",
    "Explain the difference between EcuM and Os in managing ECU lifecycle",
    "How does NVRAM interact with EcuM during reset cycles?",
    "Which shutdown targets are supported by the ECU Manager and how are they configured?",
    "What is the sequence of actions in the StartPostOS phase of the ECU Manager?",
    "List the steps of wakeup validation and their timeout handling in ECU Manager",
    "List all the activities in the exact order they are performed in the StartPreOS sequence.",
    "Describe the complete wakeup validation protocol, starting from the event detection by a driver to the final validation by a manager module.",
    "What are the differences between the Halt Sequence and the Poll Sequence in the SLEEP phase?",
    "What are the AUTOSAR Basic Software requirements [SRS_BSW] that the ECU Manager fulfills, and which remain unfulfilled?",
    "According to the configuration tables, what is the purpose of EcuMSetClockAllowedUsers?",
    "In Figure 10.1, what are the main containers and configuration parameters of EcuM?",
    "Explain the wakeup handling process as shown in Figure 9.12 (FlexRay transceiver wakeup by polling).",
    "According to Table 7.1, what is the second activity in the StartPreOS sequence and is it optional?",
    "In Figure 7.12 (Halt Sequence), what happens immediately after the Mcu_SetMode call if a wakeup is detected?",
    "From the Requirements Tracing table (6.1), which specific SWS requirement satisfies the SRS requirement [SRS_ModelMgm_09136]?",
]

# Evaluation questions about the Automotive SPICE Process Assessment Model.
# main() answers them after the AUTOSAR questions, against the same combined index.
SPICE_QUERIES = [
    "What are the specific rating scale refinements (like P+ or L-) and what percentage ranges do they represent? Explain the purpose of having this refinement.",
    "For a process to achieve Capability Level 3, what is the required rating for the 'Work Product Management' process attribute (PA 2.2) and why is it different from the requirement for Level 2?",
    "Describe the 'plug-in concept' mentioned in the document and its significance.",
    "What are the outcomes of the ACQ.11 Technical Requirements process, and how are they linked to acquisition needs?",
    "Explain the purpose and main outcomes of SYS.2 System Requirements Analysis.",
    "Which indicators does the Process Assessment Model use to judge whether process outcomes are achieved?",
    "What does ECU stand for, and in what context is it used in Automotive SPICE?",
    "Differentiate between PRM and PAM in Automotive SPICE.",
    "How is BP different from WP in the Process Assessment Model?",
    "What is the purpose of the PAM according to the SPICE framework?",
    "In the context of SYS.3, what does establishing bidirectional traceability support?",
    "Who are the members of the AutoSIG/SUG mentioned in the document history?",
    "What is the final outcome of successfully implementing the ACQ.4 Supplier Monitoring process?",
    "According to the measurement framework, what does a 'Largely achieved' (L) rating signify in terms of percentage and evidence?",
    "What are the two types of indicators provided by the Process Assessment Model?",
    "List the capability levels defined in ISO/IEC 33020 and the process attributes associated with each level.",
    "What are the steps of ACQ.3 Contract Agreement, and what are its output work products?",
    "How does the Process Capability Level Model (Table 16) define requirements for achieving Level 3?",
    "List all processes belonging to the 'Supporting life cycle processes category' along with their process IDs",
    "What is the process attribute ID for 'Quantitative control' and at which capability level is it first introduced?",
    "In the work product characteristics table, what is the WP ID for 'Software architectural design' and list three of its characteristics.",
    "According to Table 12 and Table 13, what are the rating scale categories and their percentage values?",
    "In Table 2–9, what are the different process groups under the Primary, Supporting, and Organizational life cycle categories?",
    "Explain the structure of Work Product Characteristics (Annex B, Table B.2).",
    "From Table 1, list five abbreviations with their meanings that are critical to Automotive SPICE.",
]

### Utilities 

In [6]:
def extract_text_from_pdf(pdf_path: str) -> str:
    """Return the text of every page of ``pdf_path``, pages separated by a blank line.

    Extraction is best-effort per page: a page that yields nothing, or whose
    extraction raises, contributes an empty string, so the number of joined
    segments always equals the number of pages.
    """
    def _page_text(page) -> str:
        # One unreadable page must not abort the whole document.
        try:
            return page.extract_text() or ""
        except Exception:
            return ""

    document = PdfReader(pdf_path)
    return "\n\n".join(_page_text(page) for page in document.pages)

# Header patterns used by the numbered-heading parser (with parent linking).

# Numbered header such as "7.3.1 Title": one to four dot-separated numeric
# components, an optional trailing dot, then a title that starts with an
# upper-case letter or a digit and stays on one line.
# NOTE(review): `\s` also matches newlines, so a bare number on one line
# followed by text on the next line is accepted as a header — confirm that
# this is intended.
numbered_header_re = re.compile(
    r"^\s*(\d+(?:\.\d+){0,3})\s*\.?\s+(?P<title>[A-Z0-9][^\n]{0,300})\s*$",
    flags=re.MULTILINE,
)

# Code-style header such as "ACQ.11 Title": 1-6 upper-case letters, a dot and
# a number at the start of a line, followed by the title.
code_header_re = re.compile(
    r"^(?P<code>[A-Z]{1,6}\.\d+)\s+(?P<title>.+)$",
    flags=re.MULTILINE,
)

In [7]:
def parse_numbered_headings(text: str) -> List[Dict]:
    """
    Split ``text`` into one chunk per detected header.

    Headers are located with ``numbered_header_re``; when that finds nothing,
    ``code_header_re`` is tried. When neither matches, a single level-1 chunk
    titled "Document" that holds the whole stripped text is returned.

    Each chunk is a dict with the fields:
      id         - fresh UUID4 string
      level      - number of dot-separated components in the section number
                   (1 when the header has no leading number)
      title      - header title (the raw header line if the title is empty)
      parent_num - despite the name, the chunk's OWN section number
                   (e.g. "2.1"), or None when the header has no leading number
      parent_id  - id of the nearest preceding chunk whose section number is
                   this chunk's number without its last component, else None
      content    - stripped text between this header and the next header
                   (or the end of ``text``)

    Text that precedes the first header is not part of any chunk.
    """
    matches = list(numbered_header_re.finditer(text)) or list(code_header_re.finditer(text))

    if not matches:
        return [{
            "id": str(uuid.uuid4()),
            "level": 1,
            "title": "Document",
            "parent_id": None,
            "parent_num": None,
            "content": text.strip(),
        }]

    chunks: List[Dict] = []
    # Section number -> id of the most recent chunk carrying that number.
    # Replaces a backwards scan over all earlier chunks with one dict lookup,
    # while still resolving to the nearest preceding chunk.
    latest_id_by_num: Dict[str, str] = {}

    for pos, m in enumerate(matches):
        header_text = m.group(0)
        raw_title = m.group("title")
        title = raw_title.strip() if m.lastgroup and raw_title else header_text.strip()
        title = title or header_text

        # Only headers that begin with digits get a section number; headers
        # matched by code_header_re (e.g. "ACQ.11 ...") therefore have None.
        num_match = re.match(r"^\s*(\d+(?:\.\d+)*)", header_text)
        own_num = num_match.group(1) if num_match else None

        # The body runs up to the next header, or to the end of the text.
        body_end = matches[pos + 1].start() if pos + 1 < len(matches) else len(text)

        parent_id = None
        if own_num and "." in own_num:
            parent_id = latest_id_by_num.get(own_num.rsplit(".", 1)[0])

        chunk_id = str(uuid.uuid4())
        chunks.append({
            "id": chunk_id,
            "level": own_num.count(".") + 1 if own_num else 1,
            "title": title,
            "parent_num": own_num,
            "parent_id": parent_id,
            "content": text[m.end():body_end].strip(),
        })
        if own_num:
            latest_id_by_num[own_num] = chunk_id

    return chunks

### Embeddings & index 

In [8]:
def init_azure_embeddings():
    """Create the Azure OpenAI embeddings client from the module-level settings.

    Raises:
        RuntimeError: if AZURE_API_KEY or AZURE_ENDPOINT is unset or empty.
    """
    if not (AZURE_API_KEY and AZURE_ENDPOINT):
        raise RuntimeError("Azure API key and endpoint must be set in environment variables.")
    return AzureOpenAIEmbeddings(
        api_key=AZURE_API_KEY,
        azure_endpoint=AZURE_ENDPOINT,
        azure_deployment=EMBED_DEPLOY,
        openai_api_version=OPENAI_API_VERSION_EMB,
    )

In [9]:
def build_embeddings_for_chunks(chunks: List[Dict], emb_model: AzureOpenAIEmbeddings, batch_size: int = 16) -> np.ndarray:
    """Embed every chunk and return the vectors as a single float32 array.

    The text embedded for a chunk is its "content"; when that is missing or
    empty the "title" is used, and an empty string if both are absent.

    Args:
        chunks: chunk dicts, read through their "content" and "title" keys.
        emb_model: object whose ``embed_documents(list_of_str)`` returns one
            vector per input text.
        batch_size: number of texts sent per ``embed_documents`` call.

    Returns:
        float32 array of the vectors, in the same order as ``chunks``.
    """
    texts = [chunk.get("content") or chunk.get("title") or "" for chunk in chunks]

    # Send the texts in batches to avoid one very long single request.
    vectors = []
    for offset in range(0, len(texts), batch_size):
        vectors += emb_model.embed_documents(texts[offset:offset + batch_size])

    return np.array(vectors).astype("float32")

In [10]:
def build_faiss_index(embeddings: np.ndarray, out_dir: Path, index_name: str):
    """Build an inner-product FAISS index over L2-normalised vectors and save it.

    Args:
        embeddings: 2-D array-like with one vector per row. It is copied
            before normalisation, so the caller's data is left untouched.
        out_dir: directory for the index file; created if missing.
        index_name: file stem; the index is written to ``<index_name>.index``.

    Returns:
        Path of the written index file.

    Raises:
        ValueError: if ``embeddings`` is not two-dimensional (for example an
            empty array produced from zero chunks).
    """
    # normalize_L2 rewrites its argument in place, so work on a private
    # float32, C-contiguous copy instead of the caller's array.
    vectors = np.array(embeddings, dtype="float32", order="C")
    if vectors.ndim != 2:
        raise ValueError(
            f"embeddings must be a 2-D array of shape (n_vectors, dim); got shape {vectors.shape}"
        )

    out_dir.mkdir(parents=True, exist_ok=True)
    faiss.normalize_L2(vectors)
    # Inner product over unit-length vectors ranks results by cosine similarity.
    index = faiss.IndexFlatIP(vectors.shape[1])
    index.add(vectors)
    idx_path = out_dir / f"{index_name}.index"
    faiss.write_index(index, str(idx_path))
    return idx_path

In [11]:
def save_meta(chunks: List[Dict], out_dir: Path, index_name: str):
    """Write the per-chunk metadata next to the FAISS index as JSON.

    The file maps each chunk's position in ``chunks`` (as a string key, which
    is the row number used when the vectors were added to the index) to its
    id, title, level, parent_id, content and source. "source" is included so
    that results from an index built over several documents can be traced to
    their originating file; it is None for chunks without that tag.

    Args:
        chunks: chunk dicts; "id" is required, the other keys are optional.
        out_dir: directory for the metadata file; created if missing.
        index_name: file stem; the file is named ``<index_name>_meta.json``.

    Returns:
        Path of the written metadata file.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    meta = {
        str(position): {
            "id": chunk["id"],
            "title": chunk.get("title"),
            "level": chunk.get("level"),
            "parent_id": chunk.get("parent_id"),
            "content": chunk.get("content"),
            "source": chunk.get("source"),
        }
        for position, chunk in enumerate(chunks)
    }
    path = out_dir / f"{index_name}_meta.json"
    path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
    return path

### Hierarchical Retriever

In [12]:
class HierarchicalRetriever:
    """Query-time wrapper around a saved FAISS index and its chunk metadata.

    Every search runs over the whole index. The "hierarchy" is expressed only
    through the query text, which is progressively extended with the titles
    of the best matches found so far.
    """

    def __init__(self, index_path: Path, meta_path: Path, emb_model: AzureOpenAIEmbeddings):
        """Load the index and the JSON metadata from disk and keep the embedding model.

        Args:
            index_path: path of the FAISS index file.
            meta_path: path of the JSON file mapping index rows to chunk info.
            emb_model: object whose ``embed_query(str)`` returns one vector.
        """
        self.index = faiss.read_index(str(index_path))
        self.meta = json.loads(Path(meta_path).read_text(encoding="utf-8"))
        self.emb_model = emb_model

    def _embed_query(self, query: str):
        """Return the L2-normalised embedding of ``query`` as a (1, dim) float32 array."""
        v = np.array(self.emb_model.embed_query(query), dtype="float32").reshape(1, -1)
        faiss.normalize_L2(v)  # normalises in place
        return v

    def search_topk(self, query: str, k=5):
        """Return up to ``k`` hits for ``query``, best score first.

        Each hit is a dict holding "idx" (index row as a plain ``int``),
        "score" (plain ``float``) and the metadata stored for that row.
        Rows reported as -1 (no result) are skipped.
        """
        v = self._embed_query(query)
        D, I = self.index.search(v, k)
        results = []
        for raw_idx, score in zip(I[0], D[0]):
            if raw_idx == -1:
                continue
            # Convert the numpy integer to a built-in int so result dicts can
            # be JSON-serialised without a custom default hook.
            idx = int(raw_idx)
            info = self.meta.get(str(idx))
            if info is None:
                # try numeric key
                info = self.meta.get(idx)
            results.append({"idx": idx, "score": float(score), **(info or {})})
        results = sorted(results, key=lambda x: -x["score"])
        return results

    def hierarchical_retrieve(self, query: str, k_header=3, k_section=3, k_clause=3):
        """Retrieve clauses for ``query`` in three chained searches.

        1. Search with the raw query and keep the level-1 hits as headers
           (falling back to the top hits when none is level 1).
        2. For each header, search again with the header title appended.
        3. Search once more with the header title and the best section title
           appended; those hits are the candidate clauses.

        Candidates from all headers are de-duplicated by index row, keeping
        the highest-scoring occurrence.

        Returns:
            {"headers": [...], "clauses": [...]} where "clauses" holds at most
            ``k_clause`` hits, best score first.
        """
        top = self.search_topk(query, k=max(k_header, k_section, k_clause))
        # headers = level==1
        headers = [r for r in top if r.get("level", 0) == 1][:k_header]
        if not headers:
            headers = top[:k_header]
        final_clauses = []
        for h in headers:
            q_section = f"{query} context: {h.get('title')}"
            sections = self.search_topk(q_section, k=k_section)
            if sections:
                top_section = sections[0]
                q_clause = f"{query} context: {h.get('title')} > {top_section.get('title')}"
                clauses = self.search_topk(q_clause, k=k_clause)
                final_clauses.extend(clauses)
        # dedupe by idx; candidates are visited best-first, so the first
        # occurrence of a row is its highest-scoring one
        seen = set()
        deduped = []
        for c in sorted(final_clauses, key=lambda x: -x["score"]):
            if c["idx"] in seen:
                continue
            seen.add(c["idx"])
            deduped.append(c)
        return {"headers": headers, "clauses": deduped[:k_clause]}

### LLM Azure Chat 

In [13]:
def init_azure_chat():
    """Create the Azure OpenAI chat client from the module-level settings.

    The client is configured with temperature 0.2 and at most 2 retries.

    Raises:
        RuntimeError: if AZURE_API_KEY or AZURE_ENDPOINT is unset or empty.
    """
    if not (AZURE_API_KEY and AZURE_ENDPOINT):
        raise RuntimeError("Azure API key and endpoint must be set in environment variables.")
    return AzureChatOpenAI(
        api_key=AZURE_API_KEY,
        azure_endpoint=AZURE_ENDPOINT,
        azure_deployment=CHAT_DEPLOY,
        openai_api_version=OPENAI_API_VERSION_CHAT,
        temperature=0.2,
        max_retries=2,
    )

In [14]:
def answer_query_with_context(llm: AzureChatOpenAI, context: str, question: str) -> str:
    """Ask ``llm`` to answer ``question`` using ``context`` and return the reply text.

    The model is called through ``llm.invoke(messages)``. Call style
    (``llm(messages)``) is used only when the object has no ``invoke``
    method, so a request that failed is never sent a second time through
    the other path.

    Args:
        llm: chat model object exposing ``invoke`` (or being callable).
        context: retrieved text placed ahead of the question in the prompt.
        question: the user question.

    Returns:
        The reply content. This function does not raise on model errors:
        if the call fails, a string starting with "LLM Error:" that carries
        the exception message is returned instead.
    """
    system_prompt = "You are an expert in AUTOSAR and Automotive SPICE. Answer concisely and cite which section (title) of the context you're using when relevant."
    user_prompt = f"Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    # Prefer invoke(); fall back to call style only when invoke is unavailable.
    send = getattr(llm, "invoke", None)
    if not callable(send):
        send = llm

    try:
        resp = send(messages)
    except Exception as e:
        # Best effort: report the failure as the answer instead of aborting the run.
        return f"LLM Error: {e}"

    # Different langchain versions may return differently shaped responses.
    if hasattr(resp, "content"):
        return resp.content
    if isinstance(resp, dict) and "content" in resp:
        return resp["content"]
    if isinstance(resp, list) and resp:
        return str(resp[0])
    return str(resp)

### main flow 

In [17]:
def to_serializable(obj):
    """``json.dumps`` default hook: convert numpy values to built-in types.

    Handles every numpy boolean, integer and floating scalar type (not only
    the 32/64-bit ones) as well as arrays. Any other object is converted
    with ``str()``.
    """
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    return str(obj)

In [18]:
def main():
    """Run the whole pipeline: PDFs -> text -> chunks -> index -> answers.

    Steps:
      1. Extract text from every existing file in PDF_FILES into TXT_DIR.
      2. Parse each text file into hierarchical chunks, tagged with the
         source file name.
      3. Embed all chunks.
      4. Build the FAISS index and save the chunk metadata in INDEX_DIR.
      5. Create the retriever and the chat model.
      6. Answer every AUTOSAR and SPICE query and write one JSON record per
         query to OUTPUT_ANS_FILE.

    Returns early (after printing a message) when no PDF could be read.
    """
    TXT_DIR.mkdir(parents=True, exist_ok=True)
    MD_DIR.mkdir(parents=True, exist_ok=True)
    INDEX_DIR.mkdir(parents=True, exist_ok=True)

    # 1) Extract text from PDFs -> save txt
    print("Extracting text from PDFs...")
    txt_paths = []
    for pdf in PDF_FILES:
        pdf = Path(pdf)
        if not pdf.exists():
            print(f"Warning: PDF not found: {pdf}")
            continue
        text = extract_text_from_pdf(str(pdf))
        out_txt = TXT_DIR / (pdf.stem + ".txt")
        out_txt.write_text(text, encoding="utf-8")
        txt_paths.append(out_txt)
        print(f"Saved text: {out_txt}")

    if not txt_paths:
        print("No text files found. Exiting.")
        return

    # 2) Parse each txt into hierarchical chunks and combine
    all_chunks = []
    print("Parsing text files into hierarchical chunks...")
    for txt in txt_paths:
        raw = txt.read_text(encoding="utf-8")
        chunks = parse_numbered_headings(raw)
        # tag source
        for c in chunks:
            c["source"] = txt.name
        all_chunks.extend(chunks)
    print(f"Total chunks: {len(all_chunks)}")

    # 3) init embeddings & generate embeddings
    print("Initializing Azure embeddings...")
    emb_model = init_azure_embeddings()
    print("Building embeddings (this may take a while)...")
    embeddings = build_embeddings_for_chunks(all_chunks, emb_model, batch_size=8)
    print("Embeddings shape:", embeddings.shape)

    # 4) Build FAISS index + save meta
    index_name = "hirag_combined_clauses"
    print("Building FAISS index...")
    idx_path = build_faiss_index(embeddings, INDEX_DIR, index_name)
    meta_path = save_meta(all_chunks, INDEX_DIR, index_name)
    print(f"Index saved: {idx_path}, meta saved: {meta_path}")

    # 5) initialize retriever & llm
    retriever = HierarchicalRetriever(idx_path, meta_path, emb_model)
    llm = init_azure_chat()

    # 6) process queries and save answers
    # Both query sets go through the same steps against the combined index.
    query_sets = (("AUTOSAR", AUTOSAR_QUERIES), ("SPICE", SPICE_QUERIES))
    # The context manager closes the file even if a query raises midway, so
    # the records written so far are flushed to disk.
    with OUTPUT_ANS_FILE.open("w", encoding="utf-8") as out_f:
        for label, queries in query_sets:
            print(f"Answering {label} queries...")
            for q in queries:
                res = retriever.hierarchical_retrieve(q, k_header=2, k_section=2, k_clause=4)
                retrieved_texts = []
                for clause in res["clauses"]:
                    title = clause.get("title") or ""
                    content = clause.get("content") or ""
                    retrieved_texts.append(f"{title}\n{content}")
                context = "\n\n".join(retrieved_texts) or "No context retrieved from documents."
                ans = answer_query_with_context(llm, context, q)
                record = {"query": q, "answer": ans, "context_snippets": res["clauses"]}
                out_f.write(json.dumps(record, ensure_ascii=False, default=to_serializable) + "\n")
                print("Answered:", q)

    print(f"All answers saved to {OUTPUT_ANS_FILE}")

In [19]:
# Entry point: run the full pipeline when this code is executed as the main module.
if __name__ == "__main__":
    main()

Extracting text from PDFs...
Saved text: ..\data\txt\AUTOSAR_SWS_ECUStateManager.txt
Saved text: ..\data\txt\Automotive_SPICE_PAM_31_EN.txt
Parsing text files into hierarchical chunks...
Total chunks: 682
Initializing Azure embeddings...
Building embeddings (this may take a while)...
Embeddings shape: (682, 3072)
Building FAISS index...
Index saved: ..\outputs\indices\hirag_combined_clauses.index, meta saved: ..\outputs\indices\hirag_combined_clauses_meta.json
Answering AUTOSAR queries...
Answered: What are the activities executed before EcuM_Init in the Startup phase?
Answered: Which ECU Manager responsibilities remain active during SLEEP phases when the BSW Mode Manager is not operational?
Answered: How does the ECU Manager ensure consistency of configuration parameters across pre-compile, link-time, and post-build phases?
Answered: What steps must be taken if a wakeup event is detected during the shutdown process?
Answered: How does the ECU Manager handle a situation where the RAM i