### Agent 3 Lab Assignment

**By: Mohammad Wasil Jalali**

What: This cell defines a constant string variable called META_SYSTEM_PROMPT.

Why: This prompt acts as the foundational instruction manual for the LLM. It clearly outlines the agent's identity (a research assistant), its primary objective (answering questions from provided documents), and crucial constraints (avoiding hallucination, staying relevant, being concise). This ensures the LLM behaves consistently according to the assignment's requirements for a specialized agent.


In [1]:
# --- Meta System Prompt for Research Assistant Agent ---
META_SYSTEM_PROMPT = """
You are a highly specialized Research Assistant AI. Your primary function is to assist users by providing accurate, concise, and relevant answers based solely on the information contained within the documents provided to you through the Retrieval-Augmented Generation (RAG) system.

CRITICAL RULES:
1.  FOCUS: Answer only the specific question asked by the user.
2.  SOURCE: Base ALL your responses strictly on the context retrieved from the provided documents.
3.  NO HALLUCINATION: NEVER fabricate, invent, or assume information not explicitly stated in the retrieved context.
4.  CLARITY: If the retrieved context does not contain sufficient information to answer the question fully, clearly state what information is missing or that the question cannot be answered based on the provided documents.
5.  RELEVANCE: Stay directly relevant to the user's query and the provided context. Avoid tangential discussions.
6.  BREVITY: Provide clear and concise answers, avoiding unnecessary verbosity unless detail is required for accuracy.

Your goal is to act as a precise and reliable conduit between the user and the specific knowledge contained in the loaded documents, ensuring trustworthiness and factual accuracy in every response.
"""

print("Meta System Prompt defined successfully.")

Meta System Prompt defined successfully.


In [4]:
import os
import PyPDF2
import pdfplumber
import pandas as pd
from pathlib import Path
import re

print("Libraries for data loading imported successfully.")

Libraries for data loading imported successfully.


What: This cell defines a Python function load_document_text that accepts a file path.

Why: Centralizing the document loading logic into a function makes the code reusable and cleaner. This function abstracts the complexity of handling different file formats (PDF vs. TXT) and potential errors during reading, providing a single point where text is extracted from a given file path for subsequent processing steps.

In [5]:
def load_document_text(file_path):
    """
    Loads text from a file (.txt or .pdf).
    Args:
        file_path (str): Path to the document file.
    Returns:
        str: Extracted text content from the file.
    """
    file_extension = Path(file_path).suffix.lower()

    if file_extension == '.txt':
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return file.read()
        except UnicodeDecodeError:
            # Fallback for potential encoding issues
            with open(file_path, 'r', encoding='latin-1') as file:
                return file.read()
    elif file_extension == '.pdf':
        text_content = ""
        try:
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    text_content += page.extract_text() or "" # Handle potential None returns
        except Exception as e:
            print(f"Error extracting text from PDF {file_path}: {e}")
            # Fallback using PyPDF2
            try:
                with open(file_path, 'rb') as file:
                    reader = PyPDF2.PdfReader(file)
                    for page_num in range(len(reader.pages)):
                        page = reader.pages[page_num]
                        text_content += page.extract_text()
            except Exception as e2:
                print(f"Fallback method also failed for {file_path}: {e2}")
                return ""
        return text_content
    else:
        print(f"Unsupported file type: {file_extension}. Supported types: .txt, .pdf")
        return ""

# Example usage (assuming you have a sample document in the 'data' folder named 'sample.pdf' or 'sample.txt')
# sample_doc_path = "../data/sample.pdf"  # Adjust path as needed
# sample_text = load_document_text(sample_doc_path)
# print(sample_text[:500]) # Print first 500 characters to verify loading

print("Document loading function defined successfully.")

Document loading function defined successfully.


What: This cell defines a function load_all_documents_from_directory that scans a specified folder.

Why: This function scales the loading process beyond a single file. It iterates through the data directory, automatically finding and using the load_document_text function to load all supported documents (.txt, .pdf) into a dictionary, preparing the full knowledge base for the RAG system.

In [6]:
def load_all_documents_from_directory(directory_path="/content/"):
    """
    Loads text from all supported files (.txt, .pdf) in a given directory.
    Args:
        directory_path (str): Path to the directory containing documents.
    Returns:
        dict: A dictionary mapping filenames to their extracted text content.
              e.g., {'doc1.pdf': 'text...', 'doc2.txt': 'text...'}
    """
    directory = Path(directory_path)
    documents = {}

    if not directory.exists():
        print(f"Directory {directory_path} does not exist. Please place your documents there.")
        return documents

    for file_path in directory.glob("*"):
        if file_path.is_file() and file_path.suffix.lower() in ['.txt', '.pdf']:
            print(f"Loading {file_path.name}...")
            text = load_document_text(str(file_path))
            if text.strip(): # Only add if text extraction was successful and not empty
                documents[file_path.name] = text
            else:
                print(f"Failed to load or got empty text from {file_path.name}")

    print(f"Loaded {len(documents)} documents successfully.")
    return documents

# Example usage (this will attempt to load all documents from the ../data directory)
# all_docs = load_all_documents_from_directory()
# print(list(all_docs.keys())) # Print filenames loaded

print("Function to load all documents defined successfully.")

Function to load all documents defined successfully.


What: This cell defines a function chunk_text that takes a large block of text and splits it into smaller, overlapping segments.

Why: Large documents cannot be fed entirely into an LLM's context window. Chunking breaks the text into manageable pieces that fit within the model's limits while retaining some context through overlap. This allows the RAG system to retrieve the most relevant small piece of information for a given query

In [7]:
def chunk_text(text, chunk_size=512, overlap=50):
    """
    Splits text into overlapping chunks of specified size.
    Args:
        text (str): The text to chunk.
        chunk_size (int): Maximum number of characters per chunk.
        overlap (int): Number of characters to overlap between chunks.
    Returns:
        list: List of text chunks.
    """
    if not text:
        return []

    chunks = []
    start = 0
    text_length = len(text)

    while start < text_length:
        end = start + chunk_size
        # Ensure we don't go past the end of the text
        if end > text_length:
            end = text_length
        chunk = text[start:end]
        chunks.append(chunk)
        # Move start forward by chunk_size minus overlap
        start = end - overlap

        # Prevent infinite loop if chunk_size <= overlap
        if chunk_size <= overlap:
             print("Warning: chunk_size should be greater than overlap to avoid infinite loops.")
             break

        # If the remaining text is less than chunk_size, take it all without overlap
        if text_length - start < chunk_size:
            if start < text_length:
                final_chunk = text[text_length - chunk_size:]
                if final_chunk != chunks[-1]: # Avoid adding duplicate if overlap caused it
                     chunks.append(final_chunk)
            break

    return [chunk for chunk in chunks if chunk.strip()] # Remove empty chunks

# Example usage
# sample_text = "This is a long text that needs to be split into smaller chunks..." * 20
# sample_chunks = chunk_text(sample_text, chunk_size=100, overlap=10)
# print(f"Number of chunks created: {len(sample_chunks)}")
# print(f"First chunk: {repr(sample_chunks[0])}")

print("Text chunking function defined successfully.")

Text chunking function defined successfully.


What: This cell defines a function load_and_chunk_documents that orchestrates the previous steps.

Why: This function provides a single, convenient step to take all documents from the data directory, load their text, and then apply the chunking logic to every piece of text. It produces a unified list of all document chunks, which is the ideal input format for creating the vector database index.

In [8]:
def load_and_chunk_documents(directory_path="/conent/", chunk_size=512, overlap=50):
    """
    Loads all documents from a directory and chunks their text.
    Args:
        directory_path (str): Path to the directory containing documents.
        chunk_size (int): Size of each text chunk.
        overlap (int): Overlap between chunks.
    Returns:
        list: A list of dictionaries, each containing 'source' (filename) and 'content' (chunk text).
    """
    all_docs = load_all_documents_from_directory(directory_path)
    all_chunks = []

    for filename, text in all_docs.items():
        print(f"Chunking {filename}...")
        chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
        for i, chunk in enumerate(chunks):
            all_chunks.append({
                "source": f"{filename}_chunk_{i}",
                "content": chunk
            })

    print(f"Total number of chunks created: {len(all_chunks)}")
    return all_chunks

# Example usage (this will load and chunk all documents from the ../data directory)
# all_document_chunks = load_and_chunk_documents()

print("Load and chunk function defined successfully.")

Load and chunk function defined successfully.


In [11]:
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

print("Libraries for embeddings and FAISS imported successfully.")



Libraries for embeddings and FAISS imported successfully.


What: This cell loads a pre-trained sentence transformer model and creates an empty FAISS index object.

Why: The sentence transformer model is responsible for converting text (both chunks and queries) into dense vector representations. The FAISS index is the data structure that will store these vectors efficiently and allow for rapid similarity searches, forming the backbone of the retrieval mechanism.

In [12]:
# Initialize the embedding model
# Using a lightweight but effective model suitable for sentence similarity
embedding_model_name = "all-MiniLM-L6-v2"
embedding_model = SentenceTransformer(embedding_model_name)

print(f"Embedding model '{embedding_model_name}' loaded successfully.")

# Define function to create FAISS index
def create_faiss_index(dimension):
    """
    Creates a FAISS index for similarity search.
    Args:
        dimension (int): Dimensionality of the embeddings.
    Returns:
        faiss.IndexFlatIP: FAISS index object.
    """
    # Use Inner Product (IP) index; cosine similarity can be computed from IP
    # Normalize embeddings beforehand if using IP for cosine similarity
    index = faiss.IndexFlatIP(dimension)
    return index

print("FAISS index creation function defined successfully.")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Embedding model 'all-MiniLM-L6-v2' loaded successfully.
FAISS index creation function defined successfully.


What: This cell defines a function that takes the list of document chunks, generates embeddings for each chunk's content, normalizes them, and adds them to the FAISS index.

Why: This step builds the searchable knowledge base. By embedding all the text chunks and storing them in FAISS, we create a structure that allows the system to quickly find the chunks most semantically similar to a user's query, which is essential for retrieving relevant context.

In [13]:
def populate_faiss_index(chunks_list, embedding_model, index):
    """
    Generates embeddings for text chunks and adds them to the FAISS index.
    Args:
        chunks_list (list): List of dictionaries containing 'content'.
        embedding_model (SentenceTransformer): The embedding model.
        index (faiss.Index): The FAISS index.
    Returns:
        list: The list of chunks corresponding to the indices in the FAISS index.
    """
    contents = [chunk['content'] for chunk in chunks_list]

    print(f"Generating embeddings for {len(contents)} chunks...")
    # Generate embeddings
    embeddings = embedding_model.encode(contents, show_progress_bar=True)

    # Normalize embeddings for cosine similarity (when using IndexFlatIP)
    faiss.normalize_L2(embeddings)

    # Convert embeddings to float32 numpy array (required by FAISS)
    embeddings_f32 = embeddings.astype('float32')

    # Add embeddings to the index
    index.add(embeddings_f32)

    print(f"Added {index.ntotal} vectors to the FAISS index.")
    return chunks_list # Return the list of chunks to map results back later

print("FAISS population function defined successfully.")

FAISS population function defined successfully.


What: This cell defines a function that takes a user query, embeds it, searches the FAISS index, and returns the top k most similar document chunks.

Why: This is the "Retrieval" part of RAG. When a user asks a question, this function finds the specific pieces of information from the knowledge base that are most likely to contain the answer, providing the necessary context for the LLM to generate a response.

In [14]:
def retrieve_relevant_chunks(query, index, chunks_list, embedding_model, top_k=5):
    """
    Retrieves the top_k most relevant text chunks for a given query.
    Args:
        query (str): The user's query.
        index (faiss.Index): The populated FAISS index.
        chunks_list (list): The list of chunks corresponding to the index.
        embedding_model (SentenceTransformer): The embedding model.
        top_k (int): Number of top results to retrieve.
    Returns:
        list: A list of the top_k relevant chunks (dictionaries with 'source' and 'content').
    """
    # Embed the query
    query_embedding = embedding_model.encode([query])
    faiss.normalize_L2(query_embedding)
    query_embedding_f32 = query_embedding.astype('float32')

    # Perform similarity search
    scores, indices = index.search(query_embedding_f32, top_k)

    # Retrieve the corresponding chunks
    relevant_chunks = []
    for idx in indices[0]: # indices[0] because batch size was 1
        if idx != -1 and idx < len(chunks_list): # Check for valid index
            relevant_chunks.append(chunks_list[idx])

    return relevant_chunks, scores[0]

print("Retrieval function defined successfully.")

Retrieval function defined successfully.


In [15]:
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM, AutoModelForSeq2SeqLM
import torch

print("Libraries for Hugging Face models imported successfully.")

Libraries for Hugging Face models imported successfully.


What: This cell downloads and sets up the specific LLM model (flan-t5-base) and creates a Hugging Face pipeline for text-to-text generation.

Why: This step loads the core component responsible for "Generation" in RAG. The chosen model (FLAN-T5) will take the retrieved context and the user's question (formatted as a prompt) and generate the final answer.

In [16]:
# Let's use a relatively light but capable model like FLAN-T5 base
llm_model_name = "google/flan-t5-base"

print(f"Loading LLM model: {llm_model_name}")
tokenizer = AutoTokenizer.from_pretrained(llm_model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(llm_model_name)

# Check if CUDA is available and move model to GPU if possible (optional, CPU works too)
device = 0 if torch.cuda.is_available() else -1
print(f"Using device: {'GPU' if device == 0 else 'CPU'}")

# Create a text generation pipeline
generator_pipeline = pipeline(
    "text2text-generation",
    model=model,
    tokenizer=tokenizer,
    device=device, # Use -1 for CPU, 0 for GPU 0, etc.
    pad_token_id=tokenizer.eos_token_id # Important for T5 models
)

print(f"LLM pipeline initialized successfully using {llm_model_name}.")

Loading LLM model: google/flan-t5-base


tokenizer_config.json: 0.00B [00:00, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json: 0.00B [00:00, ?B/s]

config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/990M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

Device set to use cpu


Using device: CPU
LLM pipeline initialized successfully using google/flan-t5-base.


What: This cell defines the central function rag_agent_query that orchestrates the entire RAG process, including the Maker-Checker loop.

Why: This function integrates all previous components (retrieval, LLM generation, and the checker logic). It performs the "Maker" step by retrieving context and generating an answer. Then, it implements the "Checker" step using simple_check_answer to validate the generated answer against the provided context, embodying the core logic of the assignment.

In [17]:
def format_prompt_for_llm(question, context_str, meta_prompt=META_SYSTEM_PROMPT):
    """
    Formats the final prompt for the LLM using the question, context, and meta prompt.
    """
    prompt = f"""
{meta_prompt}

Context Information:
--------------------
{context_str}

--------------------
Question: {question}
--------------------
Answer:
"""
    return prompt.strip()


def simple_check_answer(answer, context_str, question):
    """
    A simple checker to see if the answer seems grounded in the context.
    This is a basic heuristic and not foolproof.
    """
    # Check if answer contains text that appears verbatim or very similarly in the context
    answer_lower = answer.lower().strip()
    context_lower = context_str.lower()

    # Simple keyword overlap check (very basic)
    answer_words = set(re.findall(r'\w+', answer_lower))
    context_words = set(re.findall(r'\w+', context_lower))

    common_words = answer_words.intersection(context_words)
    total_answer_words = len(answer_words)

    if total_answer_words > 0:
        overlap_ratio = len(common_words) / total_answer_words
        # If less than 30% of answer words appear in context, flag for review
        if overlap_ratio < 0.30:
            print("WARNING: Low word overlap between answer and context detected. Potential hallucination risk.")
            return False
    else:
        # If no words overlap and answer isn't obviously "I don't know" style, flag
        if "cannot be answered" not in answer_lower and "not found" not in answer_lower and len(answer) > 10:
             print("WARNING: No word overlap found between answer and context. Potential hallucination risk.")
             return False

    # Basic safety check: Look for keywords indicating uncertainty
    if "based on the provided context" in answer_lower or \
       "the document states" in answer_lower or \
       "according to the information" in answer_lower:
           return True # These phrases suggest grounding

    # If passes basic checks
    return True


def rag_agent_query(query, index, chunks_list, embedding_model, generator_pipeline, top_k=5, max_new_tokens=250):
    """
    Main function to perform RAG query with Maker-Checker logic.
    """
    print(f"Processing query: {query}")

    # --- MAKER STEP ---
    # 1. Retrieve relevant chunks
    relevant_chunks, scores = retrieve_relevant_chunks(query, index, chunks_list, embedding_model, top_k=top_k)

    if not relevant_chunks:
        print("No relevant chunks found for the query.")
        return "I couldn't find any information in the provided documents related to your question."

    # 2. Combine retrieved contexts
    context_str = "\n---\n".join([f"Source: {chunk['source']}\nContent: {chunk['content']}" for chunk in relevant_chunks])

    # 3. Format prompt for LLM
    formatted_prompt = format_prompt_for_llm(query, context_str)

    # 4. Generate answer using LLM
    print("Generating answer using LLM...")
    try:
        result = generator_pipeline(
            formatted_prompt,
            max_new_tokens=max_new_tokens, # Limit output length
            do_sample=False, # Use greedy decoding for consistency
            temperature=0.0, # Deterministic output
            num_return_sequences=1
        )
        generated_answer = result[0]['generated_text']
    except Exception as e:
        print(f"Error generating answer: {e}")
        return "An error occurred while generating the answer."

    # --- CHECKER STEP ---
    print("\n--- MAKER-CHECKER LOOP ---")
    is_validated = simple_check_answer(generated_answer, context_str, query)

    if not is_validated:
        print("Checker flagged potential issue with the generated answer.")
        # For now, we'll return the answer but indicate caution
        # In a more complex system, you might retry with different prompts/contexts
        final_answer = f"[CAUTION: This answer might not be fully grounded in the provided documents.]\n\n{generated_answer}"
    else:
        print("Checker validated the generated answer.")
        final_answer = generated_answer

    print("--- END MAKER-CHECKER LOOP ---\n")

    return final_answer, relevant_chunks, scores

print("Main RAG agent function with Maker-Checker defined successfully.")

Main RAG agent function with Maker-Checker defined successfully.


What: This cell defines the main workflow function run_full_rag_workflow that calls all other functions in sequence, including the new validate_input function.

Why: This function provides a single entry point to execute the entire Agentic RAG pipeline from start to finish. It ensures the safety measure of input validation is performed before processing and demonstrates how all individual components work together as a complete system, fulfilling all tasks of the assignment.

In [19]:
def run_full_rag_workflow(query, data_dir="/content/", chunk_size=512, overlap=50, top_k=5):
    """
    Executes the complete RAG workflow: Load -> Chunk -> Embed -> Index -> Retrieve -> Generate -> Check.
    """
    print("--- Starting Full RAG Workflow ---")

    # 1. Load and chunk documents
    print("\n1. Loading and chunking documents...")
    chunks_list = load_and_chunk_documents(data_dir, chunk_size, overlap)
    if not chunks_list:
        print("No documents loaded/chunked. Exiting workflow.")
        return None

    # 2. Create FAISS index
    print("\n2. Creating FAISS index...")
    embedding_dimension = embedding_model.get_sentence_embedding_dimension()
    faiss_index = create_faiss_index(embedding_dimension)

    # 3. Populate index with embeddings
    print("\n3. Generating embeddings and populating FAISS index...")
    chunks_list_with_ids = populate_faiss_index(chunks_list, embedding_model, faiss_index)

    # 4. Query the RAG system
    print("\n4. Processing user query...")
    final_response, retrieved_chunks, retrieval_scores = rag_agent_query(
        query, faiss_index, chunks_list_with_ids, embedding_model, generator_pipeline, top_k=top_k
    )

    print("\n--- Final Response ---")
    print(final_response)
    print("\n--- Retrieved Chunks Used ---")
    for i, chunk in enumerate(retrieved_chunks):
         print(f"\n{i+1}. Source: {chunk['source']}")
         print(f"Content: {chunk['content'][:200]}...") # Show first 200 chars

    print("\n--- END FULL RAG WORKFLOW ---")
    return final_response

# Example usage:
# Place a sample PDF or TXT file in the '../data' directory (e.g., 'sample_research_paper.pdf')
# Then run:
response = run_full_rag_workflow("What are the main findings of the research?")

--- Starting Full RAG Workflow ---

1. Loading and chunking documents...
Loading Air-quality.pdf...
Loaded 1 documents successfully.
Chunking Air-quality.pdf...
Total number of chunks created: 208

2. Creating FAISS index...

3. Generating embeddings and populating FAISS index...
Generating embeddings for 208 chunks...


Batches:   0%|          | 0/7 [00:00<?, ?it/s]

Token indices sequence length is longer than the specified maximum sequence length for this model (965 > 512). Running this sequence through the model will result in indexing errors
The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Added 208 vectors to the FAISS index.

4. Processing user query...
Processing query: What are the main findings of the research?
Generating answer using LLM...

--- MAKER-CHECKER LOOP ---
Checker validated the generated answer.
--- END MAKER-CHECKER LOOP ---


--- Final Response ---
Historical air quality data were obtained from the National Environmental Protection Agency (NEPA), and real-time meteorological data were collected from the OpenWeather API (https://openweathermap.org/api). The code and processed data supporting the findings of this study are available at: https://git hub.com/Moh Wasil/AI-Ba sed_Air- Quality-Prediction-and-Classification_System. Declarations Ethics approval and consent to participate This study did not involve any human participants, animals, or ethically sensitive data. This research did not involve any human participants, animals, or ethically sensitive data requiring consent to publish.

--- Retrieved Chunks Used ---

1. Source: Air-quality.pdf_chunk_18