# **Project Setup and Environment**


## Unzipping Project Files

This cell extracts the contents of the uploaded zip file `swirl-rag-pipeline-candidate-test.zip` into the current working directory.  

This is the initial setup step, which makes all necessary code files, data, and resources available for the subsequent pipeline execution.

After successfully unzipping, the notebook will have access to all files required to run the retrieval-augmented generation pipeline.



In [None]:
!unzip swirl-rag-pipeline-candidate-test.zip

Archive:  swirl-rag-pipeline-candidate-test.zip
   creating: rag/
  inflating: rag/.DS_Store           
  inflating: __MACOSX/rag/._.DS_Store  
  inflating: rag/test.md             
  inflating: __MACOSX/rag/._test.md  
   creating: rag/data/
  inflating: rag/data/byd_seal_external.json  
  inflating: __MACOSX/rag/data/._byd_seal_external.json  
  inflating: rag/data/byd_seal_facts.md  
  inflating: __MACOSX/rag/data/._byd_seal_facts.md  


## Installing Required Libraries

This cell installs the essential Python packages needed for the project:

- **sentence-transformers:** For generating semantic embeddings of text.
- **faiss-cpu:** Facebook's library for efficient similarity search, used to quickly find relevant document chunks.
- **transformers:** Hugging Face library for loading and running pre-trained language models like GPT-Neo.
- **accelerate:** Utility to improve the efficiency and ease of running large models on different hardware setups.

These packages enable the retrieval-augmented generation pipeline to encode, retrieve, and generate text effectively.


In [None]:
!pip install sentence-transformers faiss-cpu transformers accelerate


Collecting faiss-cpu
  Downloading faiss_cpu-1.12.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.1 kB)
Downloading faiss_cpu-1.12.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (31.4 MB)
Installing collected packages: faiss-cpu
Successfully installed faiss-cpu-1.12.0


In [None]:
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch


# **Data Preparation**

## Convert Markdown Facts to JSON Format

This cell reads the BYD Seal facts stored in a markdown file (`byd_seal_facts.md`), extracts individual fact lines, and converts them into a structured JSON format.

**Process details:**
- Reads the markdown file line by line.
- Selects lines that represent facts: lines starting with `*` (and longer than two characters), or lines containing both a colon and bold markers (`**`).
- Cleans up formatting such as removing markdown bold syntax (`**`) and bullet characters.
- Tags facts containing sensitive keywords like *warranty*, *price*, or *AED*.
- Creates a JSON array with fact chunks, each containing an ID, the cleaned fact text, and sensitivity flag.
- Saves the structured data as `byd_seal_facts.json` for use in retrieval tasks.

This transformation prepares raw textual data into a machine-readable format needed for embedding and similarity search steps later.
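
As a worked illustration of the cleaning rules listed above, the sketch below applies the same selection test, the same two regular expressions, and the same keyword check to a single line. The helper name `clean_fact_line` and the sample line are made up for illustration and are not taken from the dataset.

```python
import re

SENSITIVE_WORDS = ["warranty", "price", "aed"]

def clean_fact_line(line):
    """Return (fact_text, is_sensitive) for one markdown line, or None if it is not a fact line."""
    line = line.strip()
    is_bullet = line.startswith("*") and len(line) > 2
    is_bold_field = ":" in line and "**" in line
    if not (is_bullet or is_bold_field):
        return None
    fact = re.sub(r"\*\*([^*]+)\*\*", r"\1", line)      # drop bold markers
    fact = re.sub(r"^(\*+|\-+)\s*", "", fact).strip()   # drop the leading bullet
    if not fact:
        return None
    is_sensitive = any(word in fact.lower() for word in SENSITIVE_WORDS)
    return fact, is_sensitive

sample = "* **Price:** AED 154,900"  # hypothetical input line
print(clean_fact_line(sample))       # ('Price: AED 154,900', True)
```

Because `chunk_id` is built from the line index in the markdown file rather than from a running counter, the IDs are not contiguous: 118 facts can carry IDs well above `c118`.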


In [None]:
import re
import json

md_path = "rag/data/byd_seal_facts.md"
json_path = "rag/data/byd_seal_facts.json"

chunks = []
with open(md_path, encoding='utf-8') as f:
    text = f.read()

lines = text.split('\n')

for i, line in enumerate(lines):
    line = line.strip()
    if (line.startswith('*') and len(line) > 2) or (':' in line and '**' in line):
        fact = re.sub(r'\*\*([^*]+)\*\*', r'\1', line)
        fact = re.sub(r'^(\*+|\-+)\s*', '', fact)
        fact = fact.strip()
        if fact:
            is_sensitive = any(word in fact.lower() for word in ['warranty', 'price', 'aed'])
            chunks.append({
                "doc_id": "BYDSEAL",
                "chunk_id": f"c{i}",
                "text": fact,
                "is_sensitive": is_sensitive
            })

with open(json_path, "w", encoding='utf-8') as f:
    json.dump([{"id": "BYDSEAL", "chunks": chunks}], f, ensure_ascii=False, indent=2)

print(f"Converted {len(chunks)} facts from markdown to JSON at {json_path}")


Converted 118 facts from markdown to JSON at rag/data/byd_seal_facts.json


## Processing External BYD Seal Transcript Data

This cell loads external data from a JSON file (`byd_seal_external.json`) containing video transcripts related to the BYD Seal.

**What this code does:**

- Reads the JSON data which contains video IDs and their transcript text.
- Splits the transcript content into smaller chunks by splitting sentences at periods (`.`).
- Strips whitespace and discards empty chunks.
- Marks all chunks as non-sensitive (you can modify sensitivity based on keywords if required).
- Constructs a list of processed documents, each with an ID and its associated list of text chunks.
- Saves the processed documents and chunks to a new JSON file (`byd_seal_external_processed.json`) for downstream embedding and retrieval use.

---

This step prepares unstructured transcript text into manageable chunks for semantic embedding and similarity search in the retrieval-augmented generation pipeline.
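
The steps above can be sketched as follows. This is a minimal illustration, not the notebook's actual cell: the input keys `video_id` and `transcript` are assumptions about the layout of `byd_seal_external.json`, and the helper name `chunk_transcripts` is hypothetical. The output layout (`id`, `chunks`, `chunk_id`, `text`, `is_sensitive`) mirrors the facts JSON so both files can be read the same way.

```python
import json

def chunk_transcripts(records):
    """Split each transcript into sentence-like chunks at periods."""
    processed = []
    for record in records:
        doc_id = record.get("video_id", "unknown")   # assumed key name
        transcript = record.get("transcript", "")    # assumed key name
        sentences = [s.strip() for s in transcript.split(".")]
        sentences = [s for s in sentences if s]      # discard empty chunks
        chunks = [
            {"chunk_id": f"c{i}", "text": s, "is_sensitive": False}
            for i, s in enumerate(sentences)
        ]
        processed.append({"id": doc_id, "chunks": chunks})
    return processed

# Hypothetical in-memory sample instead of the real file
sample = [{"video_id": "vid001", "transcript": "The cabin is quiet. The screen rotates.  "}]
processed = chunk_transcripts(sample)
print(json.dumps(processed, indent=2))

# To persist the result for the later cells:
# with open("rag/data/byd_seal_external_processed.json", "w", encoding="utf-8") as f:
#     json.dump(processed, f, ensure_ascii=False, indent=2)
```

Splitting on every period also breaks decimals such as `82.5` into two chunks, so a sentence tokenizer may be a better fit for transcripts that contain numeric specifications.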


In [None]:
import json

def load_json(filepath, source_name):
    with open(filepath, 'r', encoding='utf-8') as f:
        data = json.load(f)
    chunks = []
    for doc in data:
        for chunk in doc.get('chunks', []):
            chunks.append({
                'text': chunk['text'],
                'source': source_name,
                'doc_id': doc.get('id', 'unknown'),
                'chunk_id': chunk['chunk_id'],
                'is_sensitive': chunk.get('is_sensitive', False)
            })
    return chunks

facts_chunks = load_json("rag/data/byd_seal_facts.json", "facts")
external_chunks = load_json("rag/data/byd_seal_external_processed.json", "external")

all_chunks = facts_chunks + external_chunks
print(f"Loaded {len(facts_chunks)} fact chunks and {len(external_chunks)} external chunks")


Loaded 118 fact chunks and 32418 external chunks


# Embedding and Indexing

## Generating and Indexing Text Embeddings

This cell performs the following key steps:

- **Initialize the embedding model:** Loads the `all-MiniLM-L6-v2` SentenceTransformer, a lightweight model that converts each text chunk into a 384-dimensional dense vector.
- **Extract text chunks:** Prepares a list of all text chunks from the combined dataset (`all_chunks`).
- **Generate embeddings:** Encodes all text chunks into numerical vector representations using the embedding model. This transforms text into a format suitable for similarity search.
- **Build FAISS index:** Creates an `IndexFlatL2` index sized to the embedding dimensionality. This index type performs exact (brute-force) nearest-neighbor search under L2 distance.
- **Add embeddings to index:** Stores all generated embeddings in the FAISS index for fast nearest neighbor retrieval.
- **Output:** Prints the shape of the embeddings array and confirms the number of vectors indexed.

---

This setup enables rapid retrieval of the most relevant text chunks during the question-answering step by semantic similarity.
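
To make the retrieval step concrete, the sketch below reproduces in plain Python what an exact L2 index computes: the squared Euclidean distance from the query to every stored vector, followed by a sort. The toy 3-dimensional vectors and the function name `brute_force_l2_search` are illustrative only; the pipeline itself uses 384-dimensional embeddings and FAISS.

```python
def brute_force_l2_search(vectors, query, k):
    """Return (squared_distances, indices) of the k stored vectors nearest to the query."""
    scored = []
    for idx, vec in enumerate(vectors):
        dist = sum((a - b) ** 2 for a, b in zip(vec, query))
        scored.append((dist, idx))
    scored.sort()                      # smallest distance first
    top = scored[:k]
    return [d for d, _ in top], [i for _, i in top]

# Toy "embeddings": vector 1 equals the query, vector 2 is close, vector 0 is far
vectors = [
    [0.0, 0.0, 1.0],
    [1.0, 0.0, 0.0],
    [0.9, 0.1, 0.0],
]
distances, indices = brute_force_l2_search(vectors, [1.0, 0.0, 0.0], k=2)
print(indices)  # [1, 2]
```

`IndexFlatL2` likewise reports squared distances, so smaller values mean closer matches.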


In [None]:
embed_model = SentenceTransformer('all-MiniLM-L6-v2')

texts = [chunk['text'] for chunk in all_chunks]
print("Generating embeddings...")
embeddings = embed_model.encode(texts, convert_to_numpy=True)
print(f"Embeddings shape: {embeddings.shape}")

dim = embeddings.shape[1]
index = faiss.IndexFlatL2(dim)
index.add(embeddings)
print(f"FAISS index contains {index.ntotal} vectors")


Generating embeddings...
Embeddings shape: (32536, 384)
FAISS index contains 32536 vectors


## Loading Data, Generating Embeddings, and Building FAISS Index

This cell performs several essential steps to prepare the data for retrieval:

1. **`load_json(filepath, source_name)` function:**
   - Loads JSON documents containing text chunks from a given filepath.
   - Extracts each chunk with its metadata and tags it with the source name (e.g., "facts" or "external").
   - Returns a flat list of chunks with keys: `text`, `source`, `doc_id`, `chunk_id`, and `is_sensitive`.

2. **Loading chunks from data files:**
   - Loads fact-based chunks from `byd_seal_facts.json`.
   - Loads processed external chunks from `byd_seal_external_processed.json`.
   - Combines both sets into `all_chunks`.

3. **Generating embeddings for all chunks:**
   - Extracts the text from each chunk to a list.
   - Uses the SentenceTransformer embedding model to convert all texts into dense vector embeddings.
   - Prints the shape of the resulting embeddings array.

4. **Creating a FAISS index for fast similarity search:**
   - Initializes a FAISS index with L2 distance for the embedding dimension.
   - Adds all embeddings to the index for later efficient retrieval.
   - Outputs the total number of vectors stored.

---

These steps set up the retrieval infrastructure allowing quick and accurate similarity search over the combined knowledge base.


In [None]:
import json
import faiss
import numpy as np

def load_json(filepath, source_name):
    with open(filepath, 'r', encoding='utf-8') as f:
        data = json.load(f)
    chunks = []
    for doc in data:
        for chunk in doc.get('chunks', []):
            chunks.append({
                'text': chunk['text'],
                'source': source_name,
                'doc_id': doc.get('id', 'unknown'),
                'chunk_id': chunk['chunk_id'],
                'is_sensitive': chunk.get('is_sensitive', False)
            })
    return chunks

facts_chunks = load_json("rag/data/byd_seal_facts.json", "facts")
external_chunks = load_json("rag/data/byd_seal_external_processed.json", "external")

all_chunks = facts_chunks + external_chunks
print(f"Loaded {len(facts_chunks)} fact chunks and {len(external_chunks)} external chunks")

texts = [chunk['text'] for chunk in all_chunks]
print("Generating embeddings...")
embeddings = embed_model.encode(texts, convert_to_numpy=True)
print(f"Embeddings shape: {embeddings.shape}")

dim = embeddings.shape[1]
index = faiss.IndexFlatL2(dim)
index.add(embeddings)
print(f"FAISS index contains {index.ntotal} vectors")

Loaded 118 fact chunks and 32418 external chunks
Generating embeddings...
Embeddings shape: (32536, 384)
FAISS index contains 32536 vectors


# **Language Model Integration**


## Loading and Using the GPT-Neo Language Model

This cell sets up the generative language model used for answering user queries in the RAG pipeline.

**Details:**

- Imports necessary modules from the `transformers` and `torch` libraries.
- Specifies the model name: `"EleutherAI/gpt-neo-1.3B"`—a 1.3 billion parameter causal language model.
- Loads the tokenizer and model using Hugging Face’s `AutoTokenizer` and `AutoModelForCausalLM`.
- Uses `device_map="auto"` to automatically place model layers on available devices (CPU/GPU).
- Uses `torch_dtype=torch.float16` to reduce memory usage with half precision.
- Employs `offload_folder="offload_dir"` to offload model data to disk if GPU memory is limited.
- Defines the `generate_answer` function:
  - Takes a text prompt as input.
  - Tokenizes and moves input tensors to the model’s device.
  - Generates a response with a maximum of 150 new tokens without sampling (greedy decoding).
  - Decodes and returns the generated text, skipping special tokens.

---

This setup enables the RAG pipeline to generate natural language answers based on retrieved context.


In [None]:
# model_name = "TheBloke/vicuna-7b-1.1-HF"

# tokenizer = AutoTokenizer.from_pretrained(model_name)
# model = AutoModelForCausalLM.from_pretrained(
#     model_name,
#     device_map="auto",
#     torch_dtype=torch.float16,
#     offload_folder="offload_dir"  # <-- Add this line here
# )
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

model_name = "EleutherAI/gpt-neo-1.3B"  # 1.3B-parameter causal language model

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    torch_dtype=torch.float16,
    offload_folder="offload_dir"  # Use if memory constrained
)

def generate_answer(prompt):
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(**inputs, max_new_tokens=150, do_sample=False)
    return tokenizer.decode(outputs[0], skip_special_tokens=True)


In [None]:
!pip install safetensors




## Local Answer Generation Function

This function `generate_answer_local` generates a natural language answer to a user question based strictly on provided context.

**How it works:**

- Constructs a prompt instructing the model to answer **only** using the given context, which discourages (but cannot rule out) hallucination.
- The prompt includes:
  - A brief instruction as a helpful assistant.
  - The provided context text.
  - The user's question.
- Tokenizes the prompt and moves it to the model's device (CPU/GPU).
- Uses the language model to generate a response with up to 150 new tokens, using greedy decoding (no sampling).
- Decodes the generated tokens into readable text.
- Extracts and returns the text after the last "Answer:" marker in the decoded output, trimming excess whitespace.

---

This method enables controlled, context-grounded answer generation in the retrieval-augmented generation pipeline.
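
Greedy decoding does not stop at the end of the answer on its own: a causal model may keep going and start a new `Question:` line, which then ends up in the returned text. A minimal post-processing sketch follows; `extract_answer` and its stop markers are hypothetical additions, not part of the pipeline as written.

```python
def extract_answer(generated_text, prompt, stop_markers=("\nQuestion:", "\nContext:")):
    """Strip the prompt from the decoded output and cut at the first stop marker."""
    if generated_text.startswith(prompt):
        answer = generated_text[len(prompt):]
    else:
        # Fall back to the text after the last "Answer:" marker
        answer = generated_text.rpartition("Answer:")[2]
    cut_points = [answer.find(m) for m in stop_markers if m in answer]
    if cut_points:
        answer = answer[:min(cut_points)]
    return answer.strip()

# Hypothetical decoded output in which the model continued past the answer
prompt = "Context:\nBattery capacity: 82.5 kWh\n\nQuestion: What is the battery capacity?\nAnswer:"
decoded = prompt + " 82.5 kWh\n\nQuestion: What is the battery capacity"
print(extract_answer(decoded, prompt))  # 82.5 kWh
```

Slicing off the prompt is more robust than splitting on `Answer:` when the context or the continuation contains that marker as well.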


In [None]:
def generate_answer_local(question, context):
    prompt = (f"You are a helpful assistant. Answer the question using ONLY the context below.\n"
              f"Context:\n{context}\n\nQuestion: {question}\nAnswer:")
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=150,
        do_sample=False,
        pad_token_id=tokenizer.eos_token_id
    )
    answer = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return answer.split("Answer:")[-1].strip()

# **Retrieval and Answer Generation**

## Retrieval and Sensitivity Check Functions

This cell defines two important functions used in the RAG pipeline:

1. **`retrieve_top_chunks(query, k=5)`**
   - Takes a user query string and retrieves the top `k` most relevant text chunks.
   - Steps:
     - Encodes the query into an embedding using the SentenceTransformer model.
     - Performs a nearest neighbor search on the FAISS index to find the closest text chunks.
     - Returns the corresponding chunks from `all_chunks` for use in answer generation.
   - Default `k=5` means it retrieves the top 5 relevant chunks.

2. **`is_sensitive_question(question)`**
   - Checks if a user’s question contains keywords indicating sensitivity.
   - Currently looks for the keywords `'price'`, `'warranty'`, and `'availability'`.
   - Returns `True` if any of these keywords occurs as a substring of the lowercased question, otherwise `False`.
   - This helps decide whether to flag or handle the question differently based on content sensitivity.

---

These functions enable efficient retrieval of relevant context and apply basic filtering for sensitive questions in the pipeline.


In [None]:
def retrieve_top_chunks(query, k=5):
    query_emb = embed_model.encode([query], convert_to_numpy=True)
    distances, indices = index.search(query_emb, k)
    return [all_chunks[i] for i in indices[0]]

def is_sensitive_question(question):
    keywords = ['price', 'warranty', 'availability']
    return any(word in question.lower() for word in keywords)


## Generating Answers with Context and Guardrails

This cell defines two functions central to generating user answers while handling sensitive content appropriately.

---

## `generate_answer_local(question, context)`

- Constructs a prompt instructing the language model to answer using *only* the provided context.
- Tokenizes and moves the input prompt to the model device.
- Generates a response with a maximum of 150 new tokens using greedy decoding.
- Extracts and returns the generated answer text after the "Answer:" label.
- Constrains the model to the given context through the prompt, which reduces but does not eliminate hallucination.

---

## `answer_with_guardrails(question)`

- Retrieves the top 10 relevant chunks based on the user question.
- Separates chunks into `facts` (verified data) and `external` (broader sources).
- Checks if the question is sensitive (e.g., about price, warranty).
  - If sensitive and facts chunks exist, returns only factual information with citation.
  - If sensitive but no verified facts, refuses to answer.
- For non-sensitive questions:
  - Prefers to answer from `facts` chunks if available.
  - Otherwise, uses non-sensitive external chunks.
  - If no safe external chunks exist, refuses to answer.
- Generates a final natural language answer using the chosen chunk's text as context.
- Returns a structured response with:
  - The answer text appended with citation (document ID and chunk ID).
  - Status indicating whether answered or refused.
  - Citation metadata specifying source and chunk.

---

This approach ensures trustworthy, context-grounded answers with responsible handling of sensitive information.


In [None]:
def generate_answer_local(question, context):
    prompt = (f"You are a helpful assistant. Answer the question using ONLY the context below.\n"
              f"Context:\n{context}\n\nQuestion: {question}\nAnswer:")
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=150,
        do_sample=False,
        pad_token_id=tokenizer.eos_token_id
    )
    answer = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return answer.split("Answer:")[-1].strip()

def answer_with_guardrails(question):
    retrieved = retrieve_top_chunks(question, k=10)
    facts_chunks = [c for c in retrieved if c['source'] == 'facts']
    external_chunks = [c for c in retrieved if c['source'] == 'external']

    if is_sensitive_question(question):
        if facts_chunks:
            chosen = facts_chunks[0]
            return {
                "answer": f"{chosen['text']} [{chosen['doc_id']}:{chosen['chunk_id']}]",
                "status": "answered",
                "citations": [{"source": "facts", "doc_id": chosen['doc_id'], "chunk_id": chosen['chunk_id']}]
            }
        else:
            return {
                "answer": "Sorry, this information is not available in our verified facts dataset.",
                "status": "refused"
            }

    if facts_chunks:
        chosen = facts_chunks[0]  # use the top-ranked fact chunk, not the whole list
    else:
        safe_external = [c for c in external_chunks if not c['is_sensitive']]
        if safe_external:
            chosen = safe_external[0]
        else:
            return {
                "answer": "No safe answer can be provided based on our datasets.",
                "status": "refused"
            }

    answer_text = generate_answer_local(question, chosen['text'])
    return {
        "answer": f"{answer_text} [{chosen['doc_id']}:{chosen['chunk_id']}]",
        "status": "answered",
        "citations": [{"source": chosen['source'], "doc_id": chosen['doc_id'], "chunk_id": chosen['chunk_id']}]
    }


## Enhanced Retrieval and Answer Generation with Keyword Prioritization and Guardrails

This section defines a comprehensive approach to retrieving relevant context chunks and generating answers while respecting sensitivity and topical relevance.

---

## Core Functions

### 1. `retrieve_top_chunks(query, k=10)`
- Encodes the user query into an embedding.
- Performs a similarity search using FAISS to retrieve the top `k` relevant text chunks.
- Returns the associated chunks from the combined knowledge base.

### 2. `is_sensitive_question(question)`
- Checks if the question contains any sensitivity-related keywords (e.g., "price", "warranty", "availability", etc.).
- Returns `True` if any keyword matches (case-insensitive).
- Used to apply stricter answer policies on sensitive topics.

### 3. `generate_answer_local(question, context)`
- Creates a prompt instructing the language model to answer *only* using the given context.
- Runs the prompt through the loaded causal LM to generate a response.
- Returns the cleaned generated answer.

---

## Keyword-Based Prioritization

- `FACT_KEYWORDS` defines thematic categories with associated keywords related to the product attributes (battery, power, range, pricing, safety, etc.).
- `get_relevant_keyword_groups(question)` scans the question for these keywords, returning the relevant thematic groups.
- Based on these groups, the pipeline prioritizes chunks from the verified facts dataset (`facts_chunks`) that match those keywords, improving answer relevance.
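
Plain substring checks can over-match: the keyword `acc` in the `safety` group is also a substring of "accelerate", and `kw` in the `power` group is a substring of "kwh". The sketch below shows one possible variant that matches whole words or phrases instead. `match_keyword_groups` and the trimmed keyword table are illustrative assumptions, not the notebook's implementation.

```python
import re

KEYWORD_GROUPS = {  # trimmed copy of the keyword table, for illustration only
    "power": ["power", "kw", "torque", "acceleration"],
    "battery": ["battery", "capacity", "kwh", "charging"],
    "safety": ["airbags", "safety", "abs", "brake", "acc"],
}

def match_keyword_groups(question, groups=KEYWORD_GROUPS):
    """Return the groups that have at least one keyword present as a whole word or phrase."""
    text = question.lower()
    matched = []
    for name, keywords in groups.items():
        for keyword in keywords:
            # Require a non-word character (or string edge) on both sides of the keyword
            pattern = r"(?<!\w)" + re.escape(keyword) + r"(?!\w)"
            if re.search(pattern, text):
                matched.append(name)
                break
    return matched

print(match_keyword_groups("How fast can the BYD Seal accelerate?"))  # []
print(match_keyword_groups("What is the battery capacity in kWh?"))   # ['battery']
```

The trade-off is that whole-word matching misses inflected forms such as "brakes", so the keyword lists would need to include those variants explicitly.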

---

## Guardrail Logic in `answer_with_guardrails(question)`

- Retrieves the top 20 chunks to allow filtering and prioritization.
- Separates chunks into:
  - `facts_chunks`: Verified factual information.
  - `external_chunks`: Broader external data.
- Prioritizes `facts_chunks` relevant to the question’s keywords.
- If the question is sensitive:
  - Responds strictly with the top matching fact chunk or refuses if unavailable.
- For non-sensitive questions:
  - Prefers to answer from top prioritized fact chunks.
  - Falls back on non-sensitive external chunks if no fact chunks are available.
  - Refuses to answer if no safe data is available.
- Answers are generated locally with the chosen chunk’s text as context.
- The final response includes the answer text, status, and citation metadata for transparency.

---

This design ensures trustworthiness by grounding responses in verified facts, improves focus with keyword prioritization, and responsibly handles sensitive queries.


In [None]:
def retrieve_top_chunks(query, k=10):
    query_emb = embed_model.encode([query], convert_to_numpy=True)
    distances, indices = index.search(query_emb, k)
    return [all_chunks[i] for i in indices[0]]

def is_sensitive_question(question):
    keywords = ['price', 'warranty', 'availability', 'pricing', 'cost', 'guarantee', 'available']
    return any(word in question.lower() for word in keywords)

def generate_answer_local(question, context):
    prompt = (f"You are a helpful assistant. Answer the question using ONLY the context below.\n"
              f"Context:\n{context}\n\nQuestion: {question}\nAnswer:")
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=150,
        do_sample=False,
        pad_token_id=tokenizer.eos_token_id
    )
    answer = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return answer.split("Answer:")[-1].strip()

FACT_KEYWORDS = {
    "battery": ["battery", "capacity", "kwh", "charging", "fast charge", "charging time"],
    "power": ["power", "kw", "torque", "acceleration", "speed", "km/h", "top speed"],
    "range": ["range", "wltp", "km", "efficiency", "electric range"],
    "dimensions": ["length", "width", "height", "wheelbase", "mm", "dimensions"],
    "pricing": ["price", "pricing", "cost", "warranty", "guarantee", "available"],
    "exterior": ["color", "exterior", "paint", "design", "style"],
    "safety": ["airbags", "safety", "abs", "brake", "acc", "lane assist"],
    "interior": ["interior", "seats", "audio", "steering", "infotainment", "space"],
}

def get_relevant_keyword_groups(question):
    question_lower = question.lower()
    relevant_groups = []
    for category, keywords in FACT_KEYWORDS.items():
        if any(word in question_lower for word in keywords):
            relevant_groups.append(category)
    return relevant_groups

def answer_with_guardrails(question):
    retrieved = retrieve_top_chunks(question, k=20) # Retrieve more chunks to filter
    facts_chunks = [c for c in retrieved if c['source'] == 'facts']
    external_chunks = [c for c in retrieved if c['source'] == 'external']

    relevant_groups = get_relevant_keyword_groups(question)

    # Prioritize facts based on keyword relevance
    prioritized_facts = []
    if relevant_groups:
        for chunk in facts_chunks:
            if any(keyword in chunk['text'].lower() for group in relevant_groups for keyword in FACT_KEYWORDS[group]):
                prioritized_facts.append(chunk)
    # Add any remaining facts if not enough prioritized facts were found
    if len(prioritized_facts) < 5:  # Arbitrary threshold, can be adjusted
        prioritized_facts.extend([c for c in facts_chunks if c not in prioritized_facts])
    facts_chunks = prioritized_facts[:10] # Use top 10 prioritized facts

    # For sensitive questions, return the top fact verbatim or refuse
    if is_sensitive_question(question):
        if facts_chunks:
            chosen = facts_chunks[0]
            return {
                "answer": f"{chosen['text']} [{chosen['doc_id']}:{chosen['chunk_id']}]",
                "status": "answered",
                "citations": [{"source": "facts", "doc_id": chosen['doc_id'], "chunk_id": chosen['chunk_id']}]
            }
        else:
            return {
                "answer": "Sorry, this information is not available in our verified facts dataset.",
                "status": "refused"
            }

    # If not a sensitive question, but facts are available, use facts
    if facts_chunks:
        chosen = facts_chunks[0]
        answer_text = generate_answer_local(question, chosen['text'])
        return {
            "answer": f"{answer_text} [{chosen['doc_id']}:{chosen['chunk_id']}]",
            "status": "answered",
            "citations": [{"source": chosen['source'], "doc_id": chosen['doc_id'], "chunk_id": chosen['chunk_id']}]
        }
    else:
        # If no facts, use non-sensitive external chunks
        safe_external = [c for c in external_chunks if not c['is_sensitive']]
        if safe_external:
            chosen = safe_external[0]
            answer_text = generate_answer_local(question, chosen['text'])
            return {
                "answer": f"{answer_text} [{chosen['doc_id']}:{chosen['chunk_id']}]",
                "status": "answered",
                "citations": [{"source": chosen['source'], "doc_id": chosen['doc_id'], "chunk_id": chosen['chunk_id']}]
            }
        else:
            return {
                "answer": "No safe answer can be provided based on our datasets.",
                "status": "refused"
            }

# **Testing and Interaction**

## Testing the RAG Pipeline with Sample Questions

This cell contains a list of example questions designed to test the retrieval-augmented generation (RAG) pipeline.

**Purpose:**

- Verify how the system handles a variety of question types including:
  - *Non-sensitive* questions expected to be answered from factual or external sources.
  - *Sensitive* questions related to pricing, availability, or warranty that trigger guardrails.
- Assess the accuracy and safety of responses by checking if sensitive queries are answered appropriately or refused.

**Usage:**

- The commented-out code block loops through each test question.
- For each question, it calls `answer_with_guardrails()` to obtain the answer.
- Prints the question and the corresponding retrieved answer or refusal.

**Note:**

- Uncomment and run this block to perform comprehensive testing of your RAG pipeline.
- Adjust or expand the test question list to cover additional scenarios as needed.

---

Running these tests helps ensure the pipeline performs reliably and respects content sensitivity policies before deployment or submission.
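
Beyond reading the printed output, the response structure can be checked automatically. The sketch below validates the dictionary shape returned by the guardrail function (an `answer`, a `status` of `answered` or `refused`, and `citations` for answered responses). `validate_response` is a hypothetical helper, and the sample dictionaries stand in for real pipeline output.

```python
def validate_response(result):
    """Return a list of problems found in a guardrail response (empty if none)."""
    problems = []
    answer = result.get("answer")
    if not isinstance(answer, str) or not answer:
        problems.append("missing answer text")
    status = result.get("status")
    if status not in ("answered", "refused"):
        problems.append(f"unexpected status: {status!r}")
    if status == "answered":
        citations = result.get("citations") or []
        if not citations:
            problems.append("answered without citations")
        for citation in citations:
            missing = {"source", "doc_id", "chunk_id"} - set(citation)
            if missing:
                problems.append(f"citation missing keys: {sorted(missing)}")
    return problems

answered = {
    "answer": "BYD Seal Premium - AED 154,900 [BYDSEAL:c287]",
    "status": "answered",
    "citations": [{"source": "facts", "doc_id": "BYDSEAL", "chunk_id": "c287"}],
}
refused = {
    "answer": "Sorry, this information is not available in our verified facts dataset.",
    "status": "refused",
}
print(validate_response(answered), validate_response(refused))  # [] []
```

A check like this only confirms the shape of the response. Whether the cited chunk actually answers the question still needs manual review.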


In [None]:
# test_questions = [
#     "What is the battery capacity of the BYD Seal?", # Non-sensitive, should be in facts
#     "Tell me about the BYD Seal's design features.", # Non-sensitive, likely in external
#     "What is the price of the BYD Seal?", # Sensitive, should be in facts or refused
#     "What is the warranty period for the BYD Seal?", # Sensitive, should be in facts or refused
#     "Is the BYD Seal available for purchase?", # Sensitive, should be in facts or refused
#     "What is the range of the BYD Seal?", # Non-sensitive, should be in facts
#     "What is the maximum power output of the BYD Seal?", # Non-sensitive, should be in facts
#     "Can you tell me about the interior of the BYD Seal?", # Non-sensitive, likely in external
#     "How fast can the BYD Seal accelerate from 0 to 100 km/h?", # Non-sensitive, should be in facts
#     "What kind of charging port does the BYD Seal use?", # Non-sensitive, might be in either
#     "Where can I buy a BYD Seal?", # Sensitive, should be in facts or refused
#     "What are the exterior color options for the BYD Seal?", # Non-sensitive, likely in external
#     "Tell me about the safety features of the BYD Seal.", # Non-sensitive, might be in either
#     "What is the price range?", # Sensitive, should be in facts or refused
#     "Is there any information about the extended warranty?", # Sensitive, should be in facts or refused
# ]

# print("Testing RAG pipeline with various questions:")
# for question in test_questions:
#     print(f"\nQuestion: {question}")
#     result = answer_with_guardrails(question)
#     print(f"Result: {result}")


In [None]:
import json

test_questions = [
    "What is the battery capacity of the BYD Seal?",  # Non-sensitive, should be answered from facts
    "What is the price of the BYD Seal?",             # Sensitive, should be answered from facts if available, otherwise refused
    "What is the warranty for the BYD Seal?",         # Sensitive, should be answered from facts if available, otherwise refused
    "Is the BYD Seal available for purchase?",        # Sensitive, should be answered from facts if available, otherwise refused
    "Tell me about the design features of the BYD Seal.",  # Non-sensitive, should be answered
    "What is the capital of France?",                  # General knowledge, likely outside context, should be refused or answered cautiously
    "What is the top speed of the BYD Seal?",         # Non-sensitive, should be answered
    "Can you tell me about the interior of the BYD Seal?"  # Non-sensitive, should be answered
]

for question in test_questions:
    print(f"Question: {question}")
    result = answer_with_guardrails(question)
    print(json.dumps(result, indent=2))
    print("-" * 20)

Question: What is the battery capacity of the BYD Seal?
{
  "answer": "82.5 kWh\n\nQuestion: What is the battery capacity of the BYD Seal [BYDSEAL:c75]",
  "status": "answered",
  "citations": [
    {
      "source": "facts",
      "doc_id": "BYDSEAL",
      "chunk_id": "c75"
    }
  ]
}
--------------------
Question: What is the price of the BYD Seal?
{
  "answer": "BYD Seal Premium - AED 154,900 [BYDSEAL:c287]",
  "status": "answered",
  "citations": [
    {
      "source": "facts",
      "doc_id": "BYDSEAL",
      "chunk_id": "c287"
    }
  ]
}
--------------------
Question: What is the warranty for the BYD Seal?
{
  "answer": "BYD Seal Premium - AED 154,900 [BYDSEAL:c287]",
  "status": "answered",
  "citations": [
    {
      "source": "facts",
      "doc_id": "BYDSEAL",
      "chunk_id": "c287"
    }
  ]
}
--------------------
Question: Is the BYD Seal available for purchase?
{
  "answer": "Sorry, this information is not available in our verified facts dataset.",
  "status": "refu

## Interactive Command-Line Question Answering Loop

This cell implements a simple interactive loop for testing the RAG pipeline locally.

**How it works:**

- Continuously prompts the user to enter a question via the console.
- Typing `"exit"` (case-insensitive) ends the loop and exits.
- For each entered question:
  - Calls the `answer_with_guardrails()` function to generate an answer with retrieval and sensitivity checks.
  - Formats the response dictionary as a nicely indented JSON string.
  - Prints the answer and related metadata to the console.
- Prints a separator line after each answer for readability.

---

This loop provides an easy way to manually test and interact with the pipeline in a local environment without a web interface.


In [None]:
import json

while True:
    question = input("Please enter your question (or type 'exit' to quit): ").strip()
    if question.lower() == "exit":
        print("Exiting. Goodbye!")
        break
    if not question:
        continue  # Ignore empty input instead of querying the pipeline

    response = answer_with_guardrails(question)
    # Pretty-print the response dictionary as indented JSON
    print("Answer:", json.dumps(response, indent=2))
    print("-" * 40)

Please enter your question (or type 'exit' to quit): exit
Exiting. Goodbye!


# **Deployment**

## Flask API for Question Answering

This cell sets up a simple Flask web API to serve the RAG pipeline for answering questions remotely.

---

**Key components:**

- **Flask app initialization:** Creates a Flask application instance.
- **`/ask` endpoint:** Accepts POST requests containing a JSON payload with a `"question"` field.
- **Request handling:**
  - Parses the incoming JSON data.
  - Validates that a question is provided, returning an error response if not.
  - Passes the question to the `answer_with_guardrails` function to get an answer.
- **Response:**
  - Returns the generated answer and metadata as a JSON response.

---

**Note:**
- The `app.run()` line is commented out because it would block the notebook cell for as long as the development server runs.
- For production deployment, a WSGI server like Gunicorn should be used.
- This API can be used to integrate your RAG pipeline with other applications or frontends.

---

This setup allows the RAG system to be accessed programmatically over HTTP.


In [None]:
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/ask', methods=['POST'])
def ask_question():
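    # silent=True returns None instead of raising on a missing or malformed JSON body
    data = request.get_json(silent=True)
    question = data.get('question') if isinstance(data, dict) else None

    if not question:
        return jsonify({"error": "No question provided"}), 400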

    result = answer_with_guardrails(question)
    return jsonify(result)

if __name__ == '__main__':
    # This is for running the Flask app directly for testing.
    # In a production environment, you might use a production-ready WSGI server like Gunicorn.
    # app.run(debug=True)
    pass


In [None]:
design_content = """
# BYD Seal RAG Pipeline Design

## Data Ingestion Process

The data ingestion process involves loading information from two distinct sources: `byd_seal_facts.md` and `byd_seal_external.json`.

1.  **Loading Facts Data:** The `byd_seal_facts.md` file, which is a markdown file containing structured facts about the BYD Seal, is read line by line. Lines starting with '*' or containing '**' and ':' are identified as potential facts. Markdown formatting (like '*' and '**') is removed, and each extracted fact is treated as a separate chunk. A simple check for keywords like 'warranty', 'price', and 'aed' is performed to initially mark chunks as potentially sensitive. Each fact chunk is assigned a `doc_id` of "BYDSEAL" and a unique `chunk_id`.

2.  **Loading External Data:** The `byd_seal_external.json` file contains unstructured data, simulated from video transcripts. The JSON data is loaded, and the `transcriptText` content for each document is extracted. This text content is then split into chunks based on sentence boundaries (using '.' as a delimiter). Each external chunk is assigned its video ID as `doc_id`, a unique `chunk_id`, and is initially marked as non-sensitive (`is_sensitive: False`).

3.  **Combining Chunks:** The chunks from both the facts and external sources are combined into a single list (`all_chunks`). Each chunk object includes the text content, its source (`"facts"` or `"external"`), `doc_id`, `chunk_id`, and the `is_sensitive` flag.
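
The three ingestion steps can be sketched roughly as follows. This is a minimal illustration rather than the notebook's actual code: the helper names `parse_fact_lines` and `chunk_transcript`, the `text` key, the `c<N>` id scheme, the `video-001` id, and the sample lines are all assumptions. Only the line filter, the keyword check, and the `'.'` delimiter come from the description above.

```python
SENSITIVE_KEYWORDS = ("warranty", "price", "aed")  # keywords named in step 1

def parse_fact_lines(lines):
    # Hypothetical helper: keep fact-like markdown lines and wrap them as chunks.
    chunks = []
    for line in lines:
        stripped = line.strip()
        is_bullet = stripped.startswith("*")
        is_bold_pair = "**" in stripped and ":" in stripped
        if not (is_bullet or is_bold_pair):
            continue
        # Remove markdown emphasis and the leading bullet marker
        text = stripped.replace("**", "").lstrip("*").strip()
        if not text:
            continue
        chunks.append({
            "text": text,
            "source": "facts",
            "doc_id": "BYDSEAL",
            "chunk_id": "c" + str(len(chunks)),
            "is_sensitive": any(k in text.lower() for k in SENSITIVE_KEYWORDS),
        })
    return chunks

def chunk_transcript(video_id, transcript_text):
    # Hypothetical helper: split a transcript on '.' and wrap each sentence as a chunk.
    sentences = [s.strip() for s in transcript_text.split(".") if s.strip()]
    return [
        {
            "text": sentence,
            "source": "external",
            "doc_id": video_id,
            "chunk_id": "c" + str(i),
            "is_sensitive": False,
        }
        for i, sentence in enumerate(sentences)
    ]

# Sample input, made up for illustration
facts_lines = [
    "# Specs",
    "* **Battery:** 82.5 kWh",
    "**Price:** AED 154,900",
    "Plain paragraph without a fact marker",
]
facts = parse_fact_lines(facts_lines)
external = chunk_transcript("video-001", "The Seal is quick. It handles well.")
all_chunks = facts + external
print(len(all_chunks))  # → 4
```

One caveat of a plain `'.'` split is that it also breaks decimals such as `82.5` into two chunks, so a transcript that quotes figures may need a more careful sentence splitter.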

## Vector Database Setup (FAISS)

A FAISS (Facebook AI Similarity Search) index is used as the vector database to enable efficient similarity search for retrieving relevant chunks based on user queries.

1.  **Embedding Model:** The `SentenceTransformer('all-MiniLM-L6-v2')` model is used to generate dense vector embeddings for the text content of each chunk. This model is chosen for its balance of performance and computational efficiency.

2.  **Embedding Generation:** The `encode()` method of the Sentence Transformer model is used to generate embeddings for all the text chunks in `all_chunks`. The embeddings are converted to a NumPy array.

3.  **FAISS Index Creation:** A `faiss.IndexFlatL2` index is created. This is a basic index type that performs a brute-force L2 (Euclidean) distance search. The dimension of the index is set to the dimension of the generated embeddings.

4.  **Adding Embeddings to Index:** The generated embeddings are added to the FAISS index using the `add()` method. This makes the chunks searchable based on their vector representations.
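
To make the index behaviour concrete, here is a dependency-free sketch of what an exact L2 index computes. It is not FAISS code: `flat_l2_search` and the toy 2-D vectors are illustrative assumptions that stand in for `IndexFlatL2.add()` and `search()` over the 384-dimensional `all-MiniLM-L6-v2` embeddings.

```python
def flat_l2_search(stored, query, k):
    # Exact nearest-neighbour search: score every stored vector, keep the k closest.
    # Distances are squared L2, which is also what faiss.IndexFlatL2 reports.
    scored = []
    for idx, vec in enumerate(stored):
        dist = sum((a - b) ** 2 for a, b in zip(vec, query))
        scored.append((dist, idx))
    scored.sort()  # smallest distance first
    top = scored[:k]
    distances = [d for d, _ in top]
    indices = [i for _, i in top]
    return distances, indices

# Toy 2-D "embeddings", made up for illustration
stored_vectors = [
    [0.0, 0.0],
    [1.0, 0.0],
    [0.0, 3.0],
]
distances, indices = flat_l2_search(stored_vectors, [0.9, 0.1], k=2)
print(indices)  # → [1, 0]
```

Because every query is compared against every stored vector, the cost grows linearly with the number of chunks. That is acceptable for a corpus of a few hundred chunks and keeps the results exact.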

## Retrieval Logic and Fact Prioritization

The retrieval logic is designed to find the most relevant chunks to a user's query while prioritizing information from the verified "facts" dataset.

1.  **Query Embedding:** When a user submits a query, the same Sentence Transformer model used for chunk embedding is used to generate an embedding for the query.

2.  **FAISS Search:** The query embedding is used to perform a similarity search (`index.search()`) on the FAISS index. This returns the indices of the top-k most similar chunks in the `all_chunks` list, along with their distances.

3.  **Chunk Retrieval:** The indices returned by the FAISS search are used to retrieve the actual chunk objects from the `all_chunks` list.

4.  **Fact Prioritization:** The retrieved chunks are separated into `facts_chunks` and `external_chunks` based on their 'source' attribute. In the `answer_with_guardrails` function, facts are checked first for answering, ensuring that verified information is preferred.
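
Steps 3 and 4 can be sketched as below, assuming chunk dictionaries with a `source` key as described under data ingestion. The function name `split_by_source` and the sample chunks are hypothetical. The guard on the index range reflects that FAISS pads its result with `-1` when fewer than `k` vectors are available.

```python
def split_by_source(all_chunks, indices):
    # Map search result indices back to chunk objects (nearest first),
    # skipping the -1 placeholders FAISS returns for missing neighbours.
    retrieved = [all_chunks[i] for i in indices if 0 <= i < len(all_chunks)]
    facts_chunks = [c for c in retrieved if c["source"] == "facts"]
    external_chunks = [c for c in retrieved if c["source"] == "external"]
    return facts_chunks, external_chunks

# Sample chunks, made up for illustration
all_chunks = [
    {"text": "Battery: 82.5 kWh", "source": "facts", "doc_id": "BYDSEAL", "chunk_id": "c0"},
    {"text": "The Seal is quick", "source": "external", "doc_id": "video-001", "chunk_id": "c0"},
    {"text": "Price: AED 154,900", "source": "facts", "doc_id": "BYDSEAL", "chunk_id": "c1"},
]
facts_chunks, external_chunks = split_by_source(all_chunks, [2, 1, -1])
print([c["chunk_id"] for c in facts_chunks])  # → ['c1']
```

Both lists keep the similarity order of the search result, so the first element of `facts_chunks` is the closest verified fact.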

## Guardrails Implementation

Guardrails are implemented to handle sensitive information and ensure grounded responses.

1.  **Sensitive Question Detection:** A simple keyword-based approach (`is_sensitive_question`) is used to identify questions that are likely to be sensitive (e.g., containing "price", "warranty", "availability").

2.  **Sensitive Information Handling Logic:**
    *   If a question is detected as sensitive: The pipeline *only* attempts to find a relevant chunk within the `facts_chunks` list. If a relevant fact is found, it is used to directly provide the answer (as facts are assumed to be verified for sensitivity). If no relevant fact chunk is found, the pipeline returns a predefined "refused" message, preventing the use of potentially unverified external data for sensitive topics.
    *   If a question is *not* sensitive: The pipeline first checks if any `facts_chunks` were retrieved. If yes, the most relevant fact chunk is used as context for the LLM. If no `facts_chunks` are available, it then looks at the `external_chunks`. From the external chunks, it *only* considers those explicitly marked as non-sensitive (`is_sensitive: False`) during ingestion. If a safe external chunk is found, it's used as context. If no facts or safe external chunks are available, a "refused" status is returned.

3.  **Grounded Generation:** The `generate_answer_local` function explicitly instructs the LLM in the prompt to "Answer the question using ONLY the context below." This prompt engineering aims to reduce the likelihood of the LLM generating information not present in the provided retrieved chunk.

4.  **Citations:** The `answer_with_guardrails` function includes the `doc_id` and `chunk_id` of the chosen chunk in the final answer string and in a separate `citations` field in the JSON response. This provides traceability for the source of the information.
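
The decision logic above can be condensed into a short sketch. It simplifies the real `answer_with_guardrails` function: `choose_context` and `build_response` are hypothetical helpers, the keyword tuple contains only the examples named in step 1, and reusing the same refusal message for non-sensitive questions is an assumption.

```python
SENSITIVE_TERMS = ("price", "warranty", "availability")  # examples from step 1
REFUSAL = "Sorry, this information is not available in our verified facts dataset."

def is_sensitive_question(question):
    q = question.lower()
    return any(term in q for term in SENSITIVE_TERMS)

def choose_context(question, facts_chunks, external_chunks):
    # Return the chunk to ground the answer on, or None to refuse.
    if is_sensitive_question(question):
        # Sensitive: verified facts only, never external data
        return facts_chunks[0] if facts_chunks else None
    if facts_chunks:
        return facts_chunks[0]
    safe = [c for c in external_chunks if not c["is_sensitive"]]
    return safe[0] if safe else None

def build_response(answer_text, chunk):
    # Attach the citation to both the answer string and the citations field.
    if chunk is None:
        return {"answer": REFUSAL, "status": "refused"}
    tag = "[" + chunk["doc_id"] + ":" + chunk["chunk_id"] + "]"
    return {
        "answer": answer_text + " " + tag,
        "status": "answered",
        "citations": [{
            "source": chunk["source"],
            "doc_id": chunk["doc_id"],
            "chunk_id": chunk["chunk_id"],
        }],
    }

# Sample chunks, made up for illustration
fact = {"source": "facts", "doc_id": "BYDSEAL", "chunk_id": "c287", "is_sensitive": True}
ext = {"source": "external", "doc_id": "video-001", "chunk_id": "c3", "is_sensitive": False}

answered = build_response("AED 154,900", choose_context("What is the price?", [fact], [ext]))
refused = build_response("", choose_context("What is the price?", [], [ext]))
print(answered["status"], refused["status"])  # → answered refused
```

Substring matching is deliberately simple. A question phrased with a related word form that is not in the keyword list would be treated as non-sensitive, so the list has to cover the phrasings users are expected to use.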

## Integration with Local LLM

A local Large Language Model (`TheBloke/vicuna-7b-1.1-HF`) is used to generate human-readable answers based on the retrieved context.

1.  **Model Loading:** The `AutoTokenizer` and `AutoModelForCausalLM` classes from the `transformers` library are used to load the tokenizer and model. `device_map="auto"` is used to automatically distribute the model layers across available devices (like GPU), and `offload_folder` is specified to handle larger models that might exceed available GPU memory.

2.  **Answer Generation:** The `generate_answer_local` function prepares the prompt with the user question and the retrieved context. This prompt is tokenized and fed to the loaded LLM. The `model.generate()` method is used to produce a response, with parameters set to ensure deterministic generation (`do_sample=False`).
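
A causal language model returns the prompt followed by its continuation, and under greedy decoding the continuation can run on into a new "Question:" turn. The sketch below shows one way to post-process such output. `trim_generation` is a hypothetical helper, not part of `generate_answer_local`, and the Context / Question / Answer prompt layout is an assumption; only the first instruction line is quoted from this document.

```python
def trim_generation(generated, prompt):
    # Drop the echoed prompt so only the model's continuation remains.
    answer = generated[len(prompt):] if generated.startswith(prompt) else generated
    # Stop at the first follow-on "Question:" line the model may have invented.
    kept = []
    for line in answer.splitlines():
        if line.strip().startswith("Question:"):
            break
        kept.append(line)
    return " ".join(part.strip() for part in kept if part.strip())

# Assumed prompt layout and a made-up run-on continuation
prompt = '''Answer the question using ONLY the context below.
Context: Battery capacity: 82.5 kWh
Question: What is the battery capacity of the BYD Seal?
Answer:'''
generated = prompt + ''' 82.5 kWh

Question: What is the battery capacity of the BYD Seal'''

print(trim_generation(generated, prompt))  # → 82.5 kWh
```

Trimming before the citation tag is appended keeps the `answer` field limited to the grounded statement.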

## API Endpoint Design

A simple REST API endpoint is created using Flask to interact with the RAG pipeline.

1.  **Endpoint:** A single `/ask` endpoint is exposed.
2.  **Method:** It accepts `POST` requests.
3.  **Request:** The request body is expected to be a JSON object containing the user's question under the key `"question"`.
4.  **Processing:** Upon receiving a request, the API extracts the question and passes it to the `answer_with_guardrails` function, which returns the result from the RAG pipeline.
5.  **Response:** The API returns a JSON response containing the generated `"answer"`, the `"status"` (e.g., "answered" or "refused"), and a list of `"citations"` (if available). Error handling is included for cases where no question is provided.
"""

with open("DESIGN.md", "w", encoding="utf-8") as f:
    f.write(design_content)

print("Created DESIGN.md")

Created DESIGN.md


In [None]:
readme_content = """
# BYD Seal RAG Pipeline

## Project Description

This project implements a Retrieval-Augmented Generation (RAG) pipeline that answers user questions about the BYD Seal car. It draws on two knowledge sources: a structured "facts" dataset and an unstructured "external" dataset (simulated from video transcripts). The pipeline prioritizes the facts dataset, never answers sensitive questions (price, warranty, availability) from external data, and attaches citations to every generated answer.

## Setup Instructions

1.  **Clone the repository (if applicable):**
    ```bash
    # Assuming the code is in a repository
    # git clone <repository_url>
    # cd <repository_name>
    ```

2.  **Install dependencies:**
    This project requires the following Python packages. You can install them using pip:
    ```bash
    pip install sentence-transformers faiss-cpu transformers accelerate flask safetensors torch numpy
    ```
    *(Note: Some dependencies like `torch` might require specific installation based on your hardware, e.g., for GPU support. Refer to their official documentation for details.)*

3.  **Provide the knowledge source data:**
    Ensure that `byd_seal_facts.md` and `byd_seal_external.json` are present in the `rag/data/` directory. The pipeline reads both files during ingestion.

4.  **Download the Sentence Transformer model:**
    The code will automatically download the `all-MiniLM-L6-v2` model on the first run if it's not cached.

5.  **Download the Local LLM:**
    The code uses the `TheBloke/vicuna-7b-1.1-HF` model. This model will be downloaded automatically on the first run. Ensure you have enough disk space and potentially a compatible GPU for efficient inference. The model is offloaded to disk if necessary.

## How to Run

The pipeline involves two main steps: data ingestion and running the API.

### 1. Data Ingestion and Vector Database Creation

Run the provided Python script (or Jupyter notebook cells) that performs the following:
*   Loads data from `rag/data/byd_seal_facts.md` and `rag/data/byd_seal_external.json`.
*   Processes and chunks the data.
*   Generates embeddings for all chunks using the Sentence Transformer model.
*   Builds a FAISS index for efficient similarity search.
*   Loads the local LLM (`TheBloke/vicuna-7b-1.1-HF`).

This step prepares the vector database and the language model for answering questions.

### 2. Running the API

The API is implemented using Flask. To run the API, execute the Python script containing the Flask application.

```python
# Assuming your API code is in a file named app.py
# from flask import Flask, request, jsonify
# # ... (your existing API code) ...
# if __name__ == '__main__':
#     app.run(debug=True) # Or use a production server like Gunicorn
```

If you are running in a Jupyter environment, you can execute the cell containing the Flask app definition. For a production setup, consider using a WSGI server like Gunicorn.

## How to Use the API Endpoint

Once the API is running, you can send POST requests to the `/ask` endpoint with a JSON body containing your question.

*   **Endpoint:** `/ask`
*   **Method:** `POST`
*   **Request Body:**
```json
{
  "question": "What is the battery capacity of the BYD Seal?"
}
```

**Example using `curl`:**
```bash
curl -X POST -H "Content-Type: application/json" -d '{"question": "What is the battery capacity of the BYD Seal?"}' http://127.0.0.1:5000/ask
```
*(Note: Replace `http://127.0.0.1:5000` with the actual address and port where your Flask app is running.)*
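
The same request can be sent from Python using only the standard library. This is a minimal sketch: the helper names `build_ask_request` and `ask` are illustrative, and the default base URL assumes the Flask development server address used in the `curl` example.

```python
import json
import urllib.request

def build_ask_request(question, base_url="http://127.0.0.1:5000"):
    # Build (but do not send) a POST request for the /ask endpoint.
    body = json.dumps({"question": question}).encode("utf-8")
    return urllib.request.Request(
        base_url + "/ask",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def ask(question, base_url="http://127.0.0.1:5000"):
    # Send the request and decode the JSON response; requires the API to be running.
    with urllib.request.urlopen(build_ask_request(question, base_url)) as resp:
        return json.loads(resp.read().decode("utf-8"))

req = build_ask_request("What is the battery capacity of the BYD Seal?")
print(req.get_method(), req.full_url)  # → POST http://127.0.0.1:5000/ask
```

Note that `urllib.request.urlopen` raises `urllib.error.HTTPError` for the 400 response returned when no question is provided, so callers that expect invalid input should catch it.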

**Response Body:**
The API will return a JSON object with the following structure:
```json
{
  "answer": "The BYD Seal has a battery capacity of 82.5 kWh [BYDSEAL:c5]",
  "status": "answered",
  "citations": [
    {
      "source": "facts",
      "doc_id": "BYDSEAL",
      "chunk_id": "c5"
    }
  ]
}
```
or for a refused question:
```json
{
  "answer": "Sorry, this information is not available in our verified facts dataset.",
  "status": "refused"
}
```

## Guardrails Overview

The RAG pipeline incorporates the following guardrails:

1.  **Fact Prioritization:** When retrieving relevant information for a query, chunks from the "facts" dataset are prioritized over chunks from the "external" dataset.
2.  **Sensitive Information Handling:** The pipeline identifies sensitive questions based on keywords like "price", "warranty", and "availability".
    *   If a sensitive question is asked, the pipeline *only* considers answers derived from the "facts" dataset.
    *   If no relevant information is found in the "facts" dataset for a sensitive question, the pipeline refuses to answer and returns a predefined message.
    *   Chunks from the "external" dataset are never used to answer sensitive questions. For non-sensitive questions, only external chunks marked as non-sensitive are considered.
3.  **Grounded Answers:** The language model is instructed to generate answers *only* based on the provided context (retrieved chunks) to minimize hallucinations.
4.  **Citations:** All answered questions include citations indicating the source, document ID, and chunk ID of the information used to generate the answer.
"""

with open("README.md", "w", encoding="utf-8") as f:
    f.write(readme_content)

print("Created README.md")

Created README.md
