# Merged RAG system — Road Safety Intervention GPT

Options selected by user:
- Citation style: Strict Metadata Citation (Option A)
- Output format: Topic-wise + Structured (Option 1)

This is a single-file, production-ready Python module that:
- Loads a knowledge_base.json (expects each item to have `full_text` and `metadata`)
- Splits long texts into chunks and preserves metadata for each chunk
- Builds a FAISS vectorstore with SentenceTransformer embeddings
- Loads a local Llama-style model via Hugging Face (token placeholder)
- Performs intent detection
- Retrieves top-k chunks
- Synthesizes a topic-wise + structured answer (Problem, IRC Clauses, Interventions grouped by topic, Step-by-step fix, Cost estimate, Compliance check)
- Emits strict metadata citations such as `[IRC:67-2022, Clause 12.3]`, taken from the `irc_clause` metadata field when present, otherwise assembled from `source_reference`, `id` and `intervention_name`

USAGE:
- Put this file next to your `knowledge_base.json`.
- Set HF_TOKEN and, if needed, adjust device settings.
- Run `python Merged_RAG_Road_Safety_GPT.py` or import the functions in a notebook.
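
The loader expects `knowledge_base.json` to hold a list of objects, each with `full_text` and `metadata`. The sketch below shows one such entry and a basic shape check. Only the two key names come from the description above; the sample values and the `validate_kb` helper are hypothetical.

```python
import json

# Hypothetical sample entry; only the `full_text` and `metadata` keys are
# taken from the module description, the values are made up for illustration.
sample_kb = [
    {
        "full_text": "Example text describing a road safety intervention.",
        "metadata": {
            "id": "example-1",
            "source_reference": "Example source",
            "intervention_name": "Example intervention",
        },
    }
]


def validate_kb(items):
    """Return a list of human-readable problems found in the knowledge base."""
    if not isinstance(items, list):
        return ["top-level JSON value must be a list"]
    problems = []
    for i, item in enumerate(items):
        if not isinstance(item, dict):
            problems.append(f"item {i}: not an object")
            continue
        if not isinstance(item.get("full_text"), str) or not item["full_text"].strip():
            problems.append(f"item {i}: missing or empty `full_text`")
        if not isinstance(item.get("metadata", {}), dict):
            problems.append(f"item {i}: `metadata` must be an object")
    return problems


# Round-trip through JSON to mimic reading the file from disk.
loaded = json.loads(json.dumps(sample_kb))
print(validate_kb(loaded))
```

Running a check like this before indexing catches empty entries early; the loader itself silently skips items without text.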

In [1]:
import os
import json
import time
import math
from typing import List, Dict, Any, Tuple

# --- third-party libs ---
try:
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_community.embeddings import SentenceTransformerEmbeddings
    from langchain_community.vectorstores import FAISS
except ImportError as e:
    # Chain the original error so the missing package name stays visible.
    raise ImportError(f"Please install dependencies: langchain, langchain-community. Error: {e}") from e

try:
    from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
    import torch
except ImportError as e:
    raise ImportError("Please install transformers and torch.") from e



In [None]:
# ---------------------------
# Config
# ---------------------------
JSON_FILE_PATH = "knowledge_base.json"  # expected: list of {full_text: str, metadata: dict}
EMBEDDING_MODEL = "BAAI/bge-large-en-v1.5"
HF_MODEL_NAME = "meta-llama/Llama-3.2-3B-Instruct"  # change if you use another LLM
HF_TOKEN = os.getenv("HF_TOKEN", None) # set your HF token here or via env variable
DEVICE = "cuda" if torch.cuda.is_available() else ("mps" if getattr(torch.backends, 'mps', False) and torch.backends.mps.is_available() else "cpu")

# Retrieval settings
CHUNK_SIZE = 600
CHUNK_OVERLAP = 120
TOP_K = 5

# Cost estimation defaults (simple heuristic if cost metadata absent)
COST_RATE_PER_MAN_DAY = 1500  # ₹ per man-day (example)
WORK_DAYS_FOR_MINOR_FIX = 1
WORK_DAYS_FOR_MEDIUM_FIX = 3
WORK_DAYS_FOR_MAJOR_FIX = 7
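
The cost defaults above are declared but no function in this module reads them. One way they might be combined is sketched below as a labour-only estimate. The `estimate_cost` helper, the severity labels and the `crew_size` parameter are assumptions for illustration, not part of the module.

```python
# Values copied from the config cell above.
COST_RATE_PER_MAN_DAY = 1500  # ₹ per man-day (example)
WORK_DAYS = {"minor": 1, "medium": 3, "major": 7}


def estimate_cost(severity: str, crew_size: int = 1) -> int:
    """Labour-only estimate in ₹: rate x days x crew size.

    `crew_size` is a hypothetical parameter; materials are not included.
    """
    if severity not in WORK_DAYS:
        raise ValueError(f"unknown severity: {severity!r}")
    if crew_size < 1:
        raise ValueError("crew_size must be at least 1")
    return COST_RATE_PER_MAN_DAY * WORK_DAYS[severity] * crew_size


for level in WORK_DAYS:
    print(f"{level}: ESTIMATE ₹{estimate_cost(level):,}")
```

A figure computed this way could be passed into the prompt so the model labels it as ESTIMATE instead of inventing a range.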

In [3]:
# ---------------------------
# Helper: load and chunk docs (preserve metadata)
# ---------------------------

def load_and_chunk_documents(file_path: str) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Load JSON and split `full_text` into chunks while copying metadata for each chunk.

    Returns: (chunks, chunk_metadatas)
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"{file_path} not found. Place your knowledge base JSON in the same directory.")

    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        length_function=len
    )

    chunks = []
    metadatas = []

    for item in data:
        full_text = item.get('full_text') or item.get('text') or ""
        metadata = item.get('metadata', {})

        if not full_text:
            # skip empty
            continue

        splits = text_splitter.split_text(full_text)
        for s in splits:
            chunks.append(s)
            # shallow copy so each chunk gets its own metadata dict
            m = dict(metadata)
            # Ensure required citation fields exist, falling back to top-level item keys
            m.setdefault('id', item.get('id', 'N/A'))
            m.setdefault('intervention_name', item.get('intervention_name', 'N/A'))
            m.setdefault('source_reference', item.get('source_reference', 'N/A'))
            # Optional: a lightweight 'topic' field to help grouping
            # (only inferred when the metadata does not already carry one)
            if 'topic' not in m:
                m['topic'] = infer_topic_from_text(s)
            metadatas.append(m)

    return chunks, metadatas
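
To see what `CHUNK_SIZE` and `CHUNK_OVERLAP` control, here is a simplified fixed-window splitter. It is a stand-in for illustration only: `RecursiveCharacterTextSplitter` additionally tries to break at separators such as paragraphs and sentences, which this sketch does not attempt. The function names are hypothetical.

```python
from typing import Any, Dict, List, Tuple


def sliding_window_chunks(text: str, size: int, overlap: int) -> List[str]:
    """Fixed-width character windows; consecutive windows share `overlap` characters."""
    if not 0 <= overlap < size:
        raise ValueError("need 0 <= overlap < size")
    step = size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + size])
        if start + size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


def chunk_with_metadata(
    text: str, metadata: Dict[str, Any], size: int, overlap: int
) -> Tuple[List[str], List[Dict[str, Any]]]:
    chunks = sliding_window_chunks(text, size, overlap)
    # One independent (shallow) copy of the metadata per chunk.
    return chunks, [dict(metadata) for _ in chunks]


text = "abcdefghijklmnopqrstuvwxyz"
chunks, metas = chunk_with_metadata(text, {"id": "demo"}, size=10, overlap=4)
print(chunks)
# → ['abcdefghij', 'ghijklmnop', 'mnopqrstuv', 'stuvwxyz']
```

The overlap means a sentence cut at a chunk boundary still appears whole in one of the two neighbouring chunks, at the price of some duplicated text in the index.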

In [4]:
# ---------------------------
# Simple topic inference fallback
# ---------------------------

def infer_topic_from_text(text: str) -> str:
    """Handy fallback: infer simple topic labels from the chunk text.
    This is lightweight and heuristic — you can replace with a classifier later.
    """
    t = text.lower()
    # 'hump' already covers 'speed hump' and 'hump/' under substring matching
    if any(k in t for k in ['speedbreaker', 'rumble', 'hump']):
        return 'Traffic Calming'
    if any(k in t for k in ['sign', 'signage', 'stop sign', 'warning sign', 'retro-reflect']):
        return 'Signing & Marking'
    if any(k in t for k in ['pedestrian', 'zebra', 'crosswalk', 'footpath']):
        return 'Pedestrian Facilities'
    if any(k in t for k in ['lighting', 'street light', 'illumination']):
        return 'Lighting'
    if any(k in t for k in ['barrier', 'guardrail', 'crash barrier']):
        return 'Road Restraint Systems'
    if any(k in t for k in ['speed', 'enforcement', 'camera', 'radar']):
        return 'Enforcement & Monitoring'
    return 'General'
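
Plain substring matching can misfire: `'sign'` also matches inside "design". The sketch below contrasts the substring rule with a variant that only accepts a keyword at the start of a word. It uses a subset of the keyword lists above, and both function names are hypothetical.

```python
import re

# Subset of the keyword lists above, kept in the same priority order.
TOPIC_KEYWORDS = [
    ("Traffic Calming", ["speedbreaker", "rumble", "hump"]),
    ("Signing & Marking", ["sign", "signage", "retro-reflect"]),
    ("Pedestrian Facilities", ["pedestrian", "zebra", "crosswalk", "footpath"]),
]


def infer_topic_substring(text: str) -> str:
    """Plain substring matching, as in the function above."""
    t = text.lower()
    for topic, keywords in TOPIC_KEYWORDS:
        if any(k in t for k in keywords):
            return topic
    return "General"


def infer_topic_word_start(text: str) -> str:
    """Variant: a keyword only counts when it starts at a word boundary."""
    t = text.lower()
    for topic, keywords in TOPIC_KEYWORDS:
        if any(re.search(r"\b" + re.escape(k), t) for k in keywords):
            return topic
    return "General"


sample = "Pavement design thickness for footpath"
print(infer_topic_substring(sample))   # → Signing & Marking
print(infer_topic_word_start(sample))  # → Pedestrian Facilities
```

Anchoring only the start of the keyword keeps plurals and suffixes working ("signs", "retro-reflective") while rejecting matches buried inside other words.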

In [5]:
# ---------------------------
# Build vectorstore
# ---------------------------

def build_vector_store(chunks: List[str], metadatas: List[Dict[str, Any]]):
    print("[+] Building embedding model and vector store...")
    embeddings = SentenceTransformerEmbeddings(model_name=EMBEDDING_MODEL)
    store = FAISS.from_texts(chunks, embeddings, metadatas=metadatas)
    print(f"[+] Vector store created with {len(chunks)} chunks.")
    return store
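
Conceptually, the store answers a query by embedding it and returning the chunks whose vectors lie closest. The toy sketch below does that by brute force in pure Python. The 2-D vectors and the choice of Euclidean distance are assumptions for illustration; the real index holds the vectors produced by `EMBEDDING_MODEL` and is searched by FAISS.

```python
import math
from typing import List, Sequence, Tuple


def euclidean(a: Sequence[float], b: Sequence[float]) -> float:
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def top_k(
    query_vec: Sequence[float],
    index: List[Tuple[str, Sequence[float]]],
    k: int,
) -> List[str]:
    """Return the texts of the k vectors closest to query_vec."""
    ranked = sorted(index, key=lambda item: euclidean(query_vec, item[1]))
    return [text for text, _ in ranked[:k]]


# Toy 2-D "embeddings"; a real model produces far higher-dimensional vectors.
toy_index = [
    ("road marking chunk", (0.9, 0.1)),
    ("lighting chunk", (0.1, 0.9)),
    ("signage chunk", (0.8, 0.3)),
]
print(top_k((1.0, 0.0), toy_index, k=2))
# → ['road marking chunk', 'signage chunk']
```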

In [6]:
# ---------------------------
# Load LLM
# ---------------------------

def load_llm(model_name: str = HF_MODEL_NAME, token: str = HF_TOKEN):
    if token == "YOUR_HF_TOKEN_HERE" or not token:
        print("WARNING: HF_TOKEN not set. Set HF_TOKEN env var or edit the script to provide one.")

    print(f"[+] Loading model: {model_name} on device {DEVICE} (this may take a while)...")
    tokenizer = AutoTokenizer.from_pretrained(model_name, token=token)
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        device_map="auto" if DEVICE == 'cuda' else None,
        torch_dtype=torch.bfloat16 if DEVICE == 'cuda' else None,
        trust_remote_code=True,
        # `token` replaces the deprecated `use_auth_token` and matches the tokenizer call above
        token=token if token else None
    )

    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    llm = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=900,
        do_sample=False
    )
    print("[+] LLM pipeline ready.")
    return llm, tokenizer

In [7]:
# ---------------------------
# Intent detection (simple rule-based). Expandable.
# ---------------------------

def detect_intent(query: str) -> str:
    q = query.lower()
    if any(x in q for x in ['cost', 'estimate', 'price', 'how much']):
        return 'cost_estimate'
    if any(x in q for x in ['fix', 'intervention', 'solution', 'what should i do', 'how to fix']):
        return 'find_intervention'
    if any(x in q for x in ['standard', 'specification', 'clause', 'irc', 'rule', 'compliance']):
        return 'find_standard'
    if any(x in q for x in ['compare', 'difference between', 'vs ', 'v/s']):
        return 'compare_interventions'
    if any(x in q for x in ['quiz', 'test me', 'questions']):
        return 'request_quiz'
    return 'ask_question'
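
The same rules can be written as an ordered table, which makes the first-match-wins priority explicit and easier to extend. This is a restatement of the function above for demonstration; the `INTENT_RULES` name is hypothetical.

```python
# Same rules as above, expressed as an ordered table: first match wins.
INTENT_RULES = [
    ("cost_estimate", ["cost", "estimate", "price", "how much"]),
    ("find_intervention", ["fix", "intervention", "solution", "what should i do", "how to fix"]),
    ("find_standard", ["standard", "specification", "clause", "irc", "rule", "compliance"]),
    ("compare_interventions", ["compare", "difference between", "vs ", "v/s"]),
    ("request_quiz", ["quiz", "test me", "questions"]),
]


def detect_intent(query: str) -> str:
    q = query.lower()
    for intent, keywords in INTENT_RULES:
        if any(k in q for k in keywords):
            return intent
    return "ask_question"


print(detect_intent("How much does it cost to fix a faded zebra crossing?"))  # → cost_estimate
print(detect_intent("What does the clause say about sign height?"))           # → find_standard
print(detect_intent("Tell me about guardrails"))                              # → ask_question
```

Because matching is by substring, order matters (the first query mentions both "cost" and "fix") and short keywords can over-match: "traffic circle" contains `irc` and is therefore classified as `find_standard`.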

In [8]:
# ---------------------------
# Utility: Format strict metadata citation
# ---------------------------

def format_citation(metadata: Dict[str, Any]) -> str:
    """Return a strict metadata citation string.
    Preferred fields (in order): 'irc_clause' (string), 'source_reference', 'id', 'intervention_name'.

    Examples:
      [IRC:67-2022, Clause 12.3]
      [Source: Stop_Sign_Std_01]
    """
    if not metadata:
        return '[Source: N/A]'

    if 'irc_clause' in metadata and metadata['irc_clause']:
        return f"[{metadata['irc_clause']}]"

    parts = []
    if metadata.get('source_reference') and metadata['source_reference'] != 'N/A':
        parts.append(str(metadata['source_reference']))
    if metadata.get('id') and metadata['id'] != 'N/A':
        parts.append(str(metadata['id']))
    if metadata.get('intervention_name') and metadata['intervention_name'] != 'N/A':
        parts.append(str(metadata['intervention_name']))

    if parts:
        return '[' + ', '.join(parts) + ']'

    return '[Source: N/A]'
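
A few sample calls make the field priority concrete. The block restates the function compactly so that it runs on its own; the metadata values are illustrative.

```python
from typing import Any, Dict


def format_citation(metadata: Dict[str, Any]) -> str:
    """Compact restatement of the function above, for demonstration only."""
    if not metadata:
        return "[Source: N/A]"
    if metadata.get("irc_clause"):
        return f"[{metadata['irc_clause']}]"
    parts = [
        str(metadata[key])
        for key in ("source_reference", "id", "intervention_name")
        if metadata.get(key) and metadata[key] != "N/A"
    ]
    return "[" + ", ".join(parts) + "]" if parts else "[Source: N/A]"


print(format_citation({"irc_clause": "IRC:67-2022, Clause 12.3"}))  # → [IRC:67-2022, Clause 12.3]
print(format_citation({"source_reference": "N/A", "id": "std-7"}))  # → [std-7]
print(format_citation({}))                                          # → [Source: N/A]
```

Note that `irc_clause` short-circuits everything else: when it is set, `id` and `intervention_name` never appear in the label.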

In [9]:
# ---------------------------
# Synthesize topic-wise + structured answer
# ---------------------------

def synthesize_answer(llm, tokenizer, retrieved_docs: List[Any], query: str, intent: str) -> Tuple[str, List[str]]:
    """Create a single prompt that instructs the LLM to produce topic-wise grouped output, with strict metadata citations.

    Returns: (answer_text, list_of_used_citations)
    """
    # Prepare context: for each retrieved doc we provide:
    # - metadata fields (id, source_reference, irc_clause if present, topic)
    # - text chunk

    context_blocks = []
    citations_used = []

    for i, doc in enumerate(retrieved_docs):
        meta = doc.metadata if hasattr(doc, 'metadata') else doc.get('metadata', {})
        text = doc.page_content if hasattr(doc, 'page_content') else doc.get('text', '')
        citation = format_citation(meta)
        citations_used.append(citation)
        block = f"---\nCitation: {citation}\nTopic: {meta.get('topic','N/A')}\nMetadata: {json.dumps(meta)}\nContent: {text}\n"
        context_blocks.append(block)

    context_text = "\n".join(context_blocks)

    # Structured template with topic-wise grouping instruction
    system_prompt = (
        "You are an Expert Road Safety Analyst. Use ONLY the provided context blocks "
        "to answer. Do NOT hallucinate other standards. "
        "Produce a TOPIC-WISE and STRUCTURED response with the sections below. "
        "Every factual claim must include an inline citation using the citation labels provided like [IRC:67-2022, Clause 12.3] or [Source_ID]. "
    )

    # The structure we enforce (Option 1)
    user_instructions = f"""
Context Blocks:
{context_text}

User Query:
{query}

Task: Produce an answer with the following exact structure. Use the same section headings and keep the order.

### 1) Problem Interpretation
- Short paraphrase of what the problem/query means (1-3 lines).

### 2) Applicable IRC Clauses / Sources
- List the exact clauses or sources found in the context that apply. Use the provided citation labels exactly.

### 3) Topic-wise Recommended Interventions
- Group interventions into topics (e.g., 'Engineering Measures', 'Enforcement Measures', 'Education & Awareness').
- For each topic include:
  - Short description of the intervention
  - Exact actionable parameters (dimensions, placement, materials) if present in the context
  - Inline citation(s) for each claim

### 4) Why This Works (engineering justification)
- For each topic, provide 1-2 lines justification referencing context.

### 5) Step-by-Step Fix Guide
- Provide a numbered 3-7 step procedure to implement the top recommended intervention(s). Cite sources where steps derive from standards.

### 6) Estimated Cost (if applicable)
- Provide a simple cost band: Low / Medium / High with numeric range in ₹ and state assumptions. If precise cost info is not in the context, estimate using internal heuristics and label them as ESTIMATE.

### 7) Compliance Check (if user asked about compliance or if standards are in context)
- State 'Compliant' or 'Not Compliant' with short reasoning and cite clause(s).

### 8) Final Summary
- 3–5 line concise summary and the top 1 recommended intervention.

Notes:
- If the context does NOT contain necessary information, explicitly state: "Cannot answer from provided knowledge base." and cite the retrieved sources used to determine absence.
- Avoid adding any new standards beyond what appears in the context blocks.
"""

    full_prompt = system_prompt + '\n\n' + user_instructions

    # Generate
    try:
        output = llm(full_prompt, num_return_sequences=1)
        raw = output[0]['generated_text'] if isinstance(output, list) else str(output)

        # If the pipeline returns prompt + output, attempt to chop — models vary in behavior.
        if raw.startswith(full_prompt):
            answer = raw[len(full_prompt):].strip()
        else:
            answer = raw.strip()

        # Deduplicate citations while preserving first-seen order
        unique_citations = list(dict.fromkeys(citations_used))
        return answer, unique_citations
    except Exception as e:
        return f"ERROR: LLM generation failed: {e}", []
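
In the sample run at the end of this file, the model continues the "Notes" list with an extra bullet before it starts the structured answer. A small post-processing step could drop anything that precedes the first required heading. The `trim_to_first_section` helper is hypothetical and not part of the module.

```python
def trim_to_first_section(answer: str, marker: str = "### 1)") -> str:
    """Drop any text the model emits before the first required heading.

    If the marker is absent, the answer is returned unchanged.
    """
    idx = answer.find(marker)
    return answer[idx:] if idx != -1 else answer


raw_answer = (
    "- Use the exact wording of the context blocks for citations and intervention names.\n"
    "\n---\n\n"
    "### 1) Problem Interpretation\n"
    "The user's road markings are faded.\n"
)
print(trim_to_first_section(raw_answer))
```

Leaving marker-less answers untouched keeps the "Cannot answer from provided knowledge base." fallback intact.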

In [10]:
# ---------------------------
# Query processing top-level
# ---------------------------

def process_query(vector_store, llm, tokenizer, query: str, top_k: int = TOP_K) -> None:
    print("\n" + "="*60)
    print(f"Query: {query}\n")
    intent = detect_intent(query)
    print(f"Detected intent: {intent}\n")

    # Retrieve
    retrieved = vector_store.similarity_search(query, k=top_k)
    print(f"Retrieved {len(retrieved)} chunks (top {top_k}).\n")

    if not retrieved:
        print("No relevant documents found in vector store. Make sure knowledge_base.json has content and was indexed.")
        return

    answer, citations = synthesize_answer(llm, tokenizer, retrieved, query, intent)

    print("--- Generated Answer ---\n")
    print(answer)
    print("\n--- Citations used ---")
    for c in citations:
        print(c)
    print("\n" + "="*60 + "\n")

In [11]:
# ---------------------------
# CLI / Example usage
# ---------------------------

if __name__ == '__main__':
    print("Merged RAG — Road Safety Intervention GPT (Topic-wise + Structured, strict metadata citations)")

    # 1. Load and chunk
    chunks, metadatas = load_and_chunk_documents(JSON_FILE_PATH)

    # 2. Build vector store
    vector_store = build_vector_store(chunks, metadatas)

    # 3. Load LLM
    llm, tokenizer = load_llm()

    # 4. Interactive loop
    print('\nReady. Type your question (type "exit" to quit).')
    while True:
        q = input('\nYour query > ').strip()
        if q.lower() in ('exit', 'quit'):
            print('Exiting.')
            break
        if not q:
            continue
        process_query(vector_store, llm, tokenizer, q, top_k=TOP_K)

# End of file

Merged RAG — Road Safety Intervention GPT (Topic-wise + Structured, strict metadata citations)
[+] Building embedding model and vector store...




[+] Vector store created with 298 chunks.
[+] Loading model: meta-llama/Llama-3.2-3B-Instruct on device mps (this may take a while)...


Loading checkpoint shards: 100%|██████████| 2/2 [00:14<00:00,  7.15s/it]
Device set to use mps:0
The following generation flags are not valid and may be ignored: ['temperature', 'top_p']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


[+] LLM pipeline ready.

Ready. Type your question (type "exit" to quit).

Query: My road markings are faded and not retro-reflective. What's the rule for that?

Detected intent: find_standard

Retrieved 5 chunks (top 5).

--- Generated Answer ---

- Use the exact wording of the context blocks for citations and intervention names.

---

### 1) Problem Interpretation
The user's road markings are faded and lack retro-reflectivity, which poses a visibility issue, especially at night or in low-light conditions.

### 2) Applicable IRC Clauses / Sources
- [IRC:35-2015 - Clause 2.7, std-21, Word Message TRAM & BUS ONLY Marking]
- [IRC:35-2015 - Clause 2.7, std-25, Direction Information NO ENTRY Marking]
- [IRC:67-2022 - Clause 14.6.22, std-7, U-Turn Prohibited Sign]
- [IRC:35-2015 - Clause 2.2, std-24, Straight Arrow Marking]

### 3) Topic-wise Recommended Interventions
#### Signing & Marking
- **Lane Marking Refurbishment**: Improves lane discipline and reduces side-swipe crashes by repainti