**<h4>Author: IDIKA, UDUMA UDUMA</h4>**

**<h4>AI Project: Mini-RAG Chatbot System</h4>**

In [3]:
#Confirming that this project is executed in my virtual environment (rag_env)
import sys
print(sys.executable)

C:\Users\Admin\rag_env\Scripts\python.exe


In [4]:
#Creating a pip_install function to neatly install the required packages in the virtual environment
def pip_install(package):
    import sys
    import subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

In [5]:
#Installing the required packages
packages = ["sentence-transformers", "faiss-cpu", "pdfplumber", "python-docx", "transformers", "accelerate", "torch"]

for package in packages:
    print(f"Installing '{package}'...")
    pip_install(package)
    print(f"Installation of '{package}' completed✅\n")

Installing 'sentence-transformers'...
Installation of 'sentence-transformers' completed✅

Installing 'faiss-cpu'...
Installation of 'faiss-cpu' completed✅

Installing 'pdfplumber'...
Installation of 'pdfplumber' completed✅

Installing 'python-docx'...
Installation of 'python-docx' completed✅

Installing 'transformers'...
Installation of 'transformers' completed✅

Installing 'accelerate'...
Installation of 'accelerate' completed✅

Installing 'torch'...
Installation of 'torch' completed✅



**<h4><u>Steps to be Taken:</u></h4>**
> - **Step 1:** Document Upload & Text Extraction
> - **Step 2:** Text Cleaning, Preprocessing, and Chunking
> - **Step 3:** Embedding Computation
> - **Step 4:** Vector Index (FAISS)
> - **Step 5:** Retrieval & Prompt Construction
> - **Step 6:** Answer Generation with an LLM
> - **Step 7:** Simple Interactive Loop (Mini Chatbot)

**<h4><u>Step 1:</u> Document Upload & Text Extraction</h4>**
> - Create an uploads/ folder for storing temporary uploaded files
> - Define a function that accepts a file path
> - Automatically determine the file type (.pdf, .docx, .txt)
> - Extract text from the document using the right parser:<br>
         * PDF: pdfplumber<br>
         * DOCX: python-docx<br>
         * TXT: simple open()

In [8]:
#Importing the required libraries
import os
import pdfplumber
from docx import Document

In [9]:
# ----------------------------------
# Creating an upload directory
# ----------------------------------
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# ----------------------------------
# Function: Extracting text from PDF
# ----------------------------------
def extract_text_from_pdf(file_path):
    text = []
    with pdfplumber.open(file_path) as pdf:
        for page in pdf.pages:
            content = page.extract_text()
            if content:
                text.append(content)
    return "\n".join(text)


# ----------------------------------
# Function: Extracting text from DOCX
# ----------------------------------
def extract_text_from_docx(file_path):
    doc = Document(file_path)
    return "\n".join([p.text for p in doc.paragraphs])


# ----------------------------------
# Function: Extracting text from TXT
# ----------------------------------
def extract_text_from_txt(file_path):
    with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
        return f.read()


# ----------------------------------
# File type detection 
# ----------------------------------
def detect_file_type(file_path):
    #1. Try using the file extension
    ext = os.path.splitext(file_path)[1].lower()
    if ext in [".pdf", ".docx", ".txt"]:
        return ext

    #2. Peeking into the file signature (magic bytes)
    with open(file_path, "rb") as f:
        header = f.read(4)

    if header.startswith(b"%PDF"):
        return ".pdf"

    elif header.startswith(b"PK"):  #DOCX files are ZIP containers
        return ".docx"

    else:
        #Deafult fallback: assume text
        return ".txt"
    

# ----------------------------------
# Main Extraction Function 
# ----------------------------------
def extract_text(file_path):
    """
    This function extracts text from a document (PDF, DOCX, or TXT).
    It automatically detects the file type by extension.
    """
    file_type = detect_file_type(file_path)
    
    if file_type == ".pdf":         #Call the pdf extractor
        return extract_text_from_pdf(file_path)

    elif file_type == ".docx":     #Call the docx extractor
        return extract_text_from_docx(file_path)

    elif file_type == ".txt":      #Call the txt extractor
        return extract_text_from_txt(file_path)

    else:
        raise ValueError(f"❌Unsupported file type: '{ext}'\n Only .pdf, .docx, or .txt files are supported")

In [10]:
# ----------------------------------
# Interactive File Upload + Process
# ----------------------------------
from IPython.display import display
import ipywidgets as widgets
import shutil

upload_widget = widgets.FileUpload(
    accept='.pdf,.docx,.txt',
    multiple=False,             #Allows only single file upload
    description='Upload File'
)

process_button = widgets.Button(description="Process File", button_style='success')
output_area = widgets.Output()

# Global storage for access after processing
uploaded_file_path = None
extracted_text = None

def on_process_clicked(b):
    global uploaded_file_path, extracted_text   #Declaring global variables
    
    if not upload_widget.value:
        with output_area:
            output_area.clear_output()
            print("⚠ Please upload a file first.")
        return

    #Support both dict and tuple return formats
    upload_value = upload_widget.value
    if isinstance(upload_value, dict):
        uploaded_file = list(upload_value.values())[0]
    elif isinstance(upload_value, tuple):
        uploaded_file = upload_value[0]
    else:
        with output_area:
            output_area.clear_output()
            print("❌ Unexpected upload format:", type(upload_value))
        return

    #Get uploaded file data
    filename = uploaded_file.get("metadata", {}).get("name", "uploaded_file")
    file_path = os.path.join(UPLOAD_DIR, filename)

    #Save uploaded content to disk
    with open(file_path, "wb") as f:
        f.write(uploaded_file["content"])

    #Extract text
    text = extract_text(file_path)

    #Store results in global variables
    uploaded_file_path = file_path
    extracted_text = text

    with output_area:
        output_area.clear_output()
        print(f"✅ File uploaded and saved to: {file_path}")
        print(f"📄 Extracted {len(text)} characters of text.\n")
        print("Preview of first few characters:")
        print(text[:200], "...\n")    #Preview of first 200 characters

        
process_button.on_click(on_process_clicked)

display(upload_widget, process_button, output_area)

FileUpload(value=(), accept='.pdf,.docx,.txt', description='Upload File')

Button(button_style='success', description='Process File', style=ButtonStyle())

Output()

In [24]:
#Asserting that a file was uploaded and processed
assert uploaded_file_path is not None, "⚠ You must upload and process a file before proceeding!"

print("✅ File upload confirmed. Proceeding...")

✅ File upload confirmed. Proceeding...


**<h4><u>Step 2:</u> Text Cleaning, Preprocessing and Chunking**</h4>
> - Clean and normalize the extracted text.
> - Split text into overlapping chunks for better context continuity
> - Display sample chunks to verify the process

In [26]:
import re    #regular expression (regex) library
# ----------------------------------
# Text Cleaning Function
# ----------------------------------
def clean_text(text):
    """Normalize and clean extracted text."""
    #Normalizing line breaks and spaces
    text = re.sub(r'\s+', ' ', text)  #Replaces multiple spaces/newlines with one space

    #Removing continuous dot leaders (e.g., "Introduction ..... 3")
    text = re.sub(r'\.{3,}\s*\d*', ' ', text)

    #Removing spaced dot leaders
    text = re.sub(r'(\.\s){2,}\d*', ' ', text)

    #Removing isolated page numbers (e.g., "Page 2", "2")
    text = re.sub(r'\bPage\s*\d+\b', ' ', text, flags=re.IGNORECASE)
    text = re.sub(r'\b\d+\b', ' ', text)     #Removing stray digits

    #Removing non-ASCII characters
    text = re.sub(r'[^\x00-\x7F]+', ' ', text)  #Removes non-ASCII characters
  
    #Normalizing multiple spaces again
    text = re.sub(r'\s{2,}', ' ', text)

    #Trimming final whitespaces
    text = text.strip()

    #Return the cleaned text
    return text

# ----------------------------------
# Text Chunking Function
# ----------------------------------
def chunk_text(text, chunk_size=1000, overlap=100):
    """
    This function splits text into overlapping chunks
    :param text: Cleaned text.
    :param chunk_size: Number of characters per chunk
    :param overlap: Overlap between chunks to maintain context
    """
    chunks = []
    start = 0
    text_length = len(text)

    while start < text_length:
        end = min(start + chunk_size, text_length)
        chunk = text[start:end]
        chunks.append(chunk)
        start += chunk_size - overlap  #Redefining the start for the next iteration

    return chunks

In [27]:
# ----------------------------------
# Example Run After Extraction
# ----------------------------------
def process_extracted_text(file_path):
    cleaned = clean_text(extracted_text)
    chunks = chunk_text(cleaned)

    print(f"✅ Cleaned text length: {len(cleaned)} characters")
    print(f"✅ Created {len(chunks)} chunks (avg {len(chunks[0])} chars each)\n")

    #Preview first 2 chunks
    for i, c in enumerate(chunks[:2]):
        print(f"--- Chunk {i+1} ---")
        print(c[:200], "...\n")

    return chunks

chunks = process_extracted_text(uploaded_file_path)

✅ Cleaned text length: 1269 characters
✅ Created 2 chunks (avg 1000 chars each)

--- Chunk 1 ---
Reference (the registration number from the file management system for foreign nationals in France; this must be provided in any correspondence): Once your VLS-TS long stay visa is validated, you are  ...

--- Chunk 2 ---
/ / , you validated your VLS-TS long stay visa and paid the fees for your initial residence permit: Type: ETUDIANT Regulatory reference number: CESEDA R311- Fee amount: . Following this process, the f ...



**<h4><u>Step 3:</u> Creating Embeddings and Building a Vector Index</h4>**
> - Utilize a Sentence Transformer model to create the embeddings
> - Store the embeddings in a vector search index (FAISS)
> - Find the most semantically relevant chunks to a user query using a similarity search

In [29]:
#Importing the relevant libraries
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import pickle

In [30]:
# ----------------------------------
# Loading pre-trained embedding model
# ----------------------------------
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

# ----------------------------------
# Creating embeddings from chunks
# ----------------------------------
def create_embeddings(chunks):
    print("🔍 Generating embeddings for all chunks...")
    embeddings = embedding_model.encode(chunks, show_progress_bar=True)
    return np.array(embeddings).astype("float32")

# ----------------------------------
# Normalize Function
# ----------------------------------
def normalize(vectors):
    """
    Normalizing vectors to unit length for cosine similarity
    """
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms[norms == 0] = 1e-10   #Avoiding zero division error
    return vectors/norms


# ----------------------------------
# Process embedding & creating FAISS vector index
# ----------------------------------
def build_faiss_index(chunks, embedding_model):
    """
    Given text chunks and embedding model, creating a FAISS index using cosine similarity
    """

    #Encoding chunks
    embeddings = embedding_model.encode(chunks)

    #Normalizing embeddings
    embeddings_norm = normalize(embeddings)

    #Creating Index using Inner Product
    index = faiss.IndexFlatIP(embeddings.shape[1])            # IP = Inner Product
    index.add(np.array(embeddings_norm, dtype=np.float32))
    
    dim = embeddings.shape[1]     #embedding dimension
    print(f"✅ FAISS index built with {index.ntotal} vectors of dim {dim}")
    return index, embeddings_norm

# ----------------------------------
# Saving and loading index for reuse
# ----------------------------------
def save_index(index, chunks, path="vector_store"):
    os.makedirs(path, exist_ok=True)
    faiss.write_index(index, os.path.join(path, "index.faiss"))

    with open(os.path.join(path, "chunks.pkl"), "wb") as f:
        pickle.dump(chunks, f)

    print("💾 Vector store saved successfully.")

def load_index(path="vector_store"):
    index = faiss.read_index(os.path.join(path, "index.faiss"))
    with open(os.path.join(path, "chunks.pkl"), "rb") as f:
        chunks = pickle.load(f)

    print("📂 Vector store loaded successfully.")
    return index, chunks

In [31]:
# ----------------------------------
# Example test run
# ----------------------------------
embeddings = create_embeddings(chunks)
index, embeddings_norm = build_faiss_index(chunks, embedding_model)
save_index(index, chunks)

🔍 Generating embeddings for all chunks...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ FAISS index built with 2 vectors of dim 384
💾 Vector store saved successfully.


**<h4><u>Step 4:</u></h4>**

In [33]:
!{sys.executable} -m pip install hf_xet



In [34]:
from transformers import pipeline, AutoTokenizer

In [35]:
# ----------------------------------
# Load the saved FAISS index and chunks
# ----------------------------------
index, chunks = load_index("vector_store")

📂 Vector store loaded successfully.


In [36]:
# Define model and tokenizer names
qa_model_name = "google/flan-t5-base"
qa_tokenizer = AutoTokenizer.from_pretrained(qa_model_name)
qa_model = pipeline(
    "text2text-generation",
    model=qa_model_name,
    tokenizer=qa_tokenizer,
    device=-1     #Uses CPU
)

MAX_TOKENS = 512  # FLAN-T5 max input length

def trim_context(context, max_tokens=MAX_TOKENS):
    """
    Ensures that the context doesn't exceed the model's max token length.
    """
    tokens = qa_tokenizer.encode(context, truncation=True, max_length=max_tokens)
    return qa_tokenizer.decode(tokens, skip_special_tokens=True)


def answer_question(query, index, chunks, embedding_model=embedding_model, top_k=2):
    """
    Retrieve relevant chunks, construct prompt, and generate answer using the QA model.
    """
    # Step 1: Encoding and normalizing the user query
    query_embedding = embedding_model.encode([query])
    query_embedding_norm = normalize(np.array(query_embedding, dtype=np.float32))

    # Step 2: Searching FAISS index using cosine similarity and returning only relevant result
    distances, indices = index.search(query_embedding_norm, top_k)
        
    # Step 3: Returning relevant chunks
    #Converting to list for easier handling
    scores = distances[0]
    retrieved_indices = indices[0]

    #Setting a similarity threshold
    SIMILARITY_THRESHOLD = 0.11        #Lowered value for normalized embeddings

    #Filtering out irrelevant chunks
    relevant_chunks = [
        chunks[i] for i, score in zip(retrieved_indices, scores)
        if score >= SIMILARITY_THRESHOLD
    ]

    #Optional:
    print("Similarity scores:", scores)

    #If no relevant chunk meets the threshold
    if not relevant_chunks:
        return "🤔 I couldn't find relevant information in the uploaded document to answer the question."
    
    # Step 5: Concatenate retrieved chunks into context
    context = " ".join(relevant_chunks)
    
    # Step 6: Trim context to fit model's token limit
    trimmed_context = trim_context(context)
    
    # Step 7: Create the full prompt
    prompt = (
        f"You are a helpful assistant. Use only the information in the context below to answer.\n"
        f"Context: {trimmed_context}\n\n"
        f"Question: {query}\n\n"
        f"Answer the question concisely based on the context above."
        f"If the answer cannot be found in the context, reply exactly with: "
        f"'🤔 I couldn't find relevant information in the uploaded document to answer the question.'"
    )
    
    # Step 8: Generate answer using the model
    response = qa_model(
        prompt,
        max_new_tokens=100,     # safer than max_length
        temperature=0.7,
        top_p=0.9,
        do_sample=True
    )
    
    # Step 9: Extract the generated text
    answer = response[0]["generated_text"].strip()
    
    # Step 10: Print results
    print(f"\n🧠 Question: {query}")
    print(f"\n💬 Answer:\n {answer}")


Device set to use cpu


In [37]:
#Testing the solution
question = input("Enter your question: ")
answer_question(question, index, chunks)

Enter your question:  When is one supposed to complete their medical visit?


Similarity scores: [ 0.3474173  -0.00118358]

🧠 Question: When is one supposed to complete their medical visit?

💬 Answer:
 within four months of your arrival in France


In [38]:
import gradio as gr
import os
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, pipeline

# ----------------------------------
# Global State
# ----------------------------------
state = {"index": None, "chunks": None}

# ----------------------------------
# Load Embedding Model
# ----------------------------------
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

# ----------------------------------
# Load QA Model (FLAN-T5)
# ----------------------------------
qa_model_name = "google/flan-t5-base"
qa_tokenizer = AutoTokenizer.from_pretrained(qa_model_name)
qa_model = pipeline(
    "text2text-generation",
    model=qa_model_name,
    tokenizer=qa_tokenizer,
    device=-1    #CPU
)

MAX_TOKENS = 512

# ----------------------------------
# Utility Functions
# ----------------------------------
def trim_context(context, max_tokens=MAX_TOKENS):
    tokens = qa_tokenizer.encode(context, truncation=True, max_length=max_tokens)
    return qa_tokenizer.decode(tokens, skip_special_tokens=True)

def normalize(vectors):
    """
    Normalizing vectors(embeddings) to unit length for cosine similarity
    """
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms[norms == 0] = 1e-10   #Avoiding zero division error
    return vectors/norms

# ----------------------------------
# Helper Functions
# ----------------------------------
def extract_text(file_path):
    """
    This function extracts text from a document (PDF, DOCX, or TXT).
    It automatically detects the file type by extension.
    """
    file_type = detect_file_type(file_path)
    
    if file_type == ".pdf":         #Call the pdf extractor
        return extract_text_from_pdf(file_path)

    elif file_type == ".docx":     #Call the docx extractor
        return extract_text_from_docx(file_path)

    elif file_type == ".txt":      #Call the txt extractor
        return extract_text_from_txt(file_path)

    else:
        raise ValueError(f"❌Unsupported file type: '{ext}'\n Only .pdf, .docx, or .txt files are supported")

def clean_text(text):
    """Normalize and clean extracted text."""
    #Normalizing line breaks and spaces
    text = re.sub(r'\s+', ' ', text)  #Replaces multiple spaces/newlines with one space

    #Removing continuous dot leaders (e.g., "Introduction ..... 3")
    text = re.sub(r'\.{3,}\s*\d*', ' ', text)

    #Removing spaced dot leaders
    text = re.sub(r'(\.\s){2,}\d*', ' ', text)

    #Removing isolated page numbers (e.g., "Page 2", "2")
    text = re.sub(r'\bPage\s*\d+\b', ' ', text, flags=re.IGNORECASE)
    text = re.sub(r'\b\d+\b', ' ', text)     #Removing stray digits

    #Removing non-ASCII characters
    text = re.sub(r'[^\x00-\x7F]+', ' ', text)  #Removes non-ASCII characters
  
    #Normalizing multiple spaces again
    text = re.sub(r'\s{2,}', ' ', text)

    #Trimming final whitespaces
    text = text.strip()

    #Return the cleaned text
    return text


def chunk_text(text, chunk_size=1000, overlap=100):
    """
    This function splits text into overlapping chunks
    :param text: Cleaned text.
    :param chunk_size: Number of characters per chunk
    :param overlap: Overlap between chunks to maintain context
    """
    chunks = []
    start = 0
    text_length = len(text)

    while start < text_length:
        end = min(start + chunk_size, text_length)
        chunk = text[start:end]
        chunks.append(chunk)
        start += chunk_size - overlap  #Redefining the start for the next iteration

    return chunks

# ----------------------------------
# File Processing
# ----------------------------------
def process_uploaded_file(file_path):
    """
    Save uploaded file, extract text, create chunks, and build embeddings + FAISS index.
    """
    if file_path is None:
        return "⚠️ No file uploaded."

    # Step 1: Extracting text from the document and clean it
    text = extract_text(file_path)
    cleaned_text = clean_text(text)
    chunks = chunk_text(cleaned_text)

    if len(chunks) == 0:
        return "⚠ No readable text found in this document."

    # Step 2: Creating embeddings
    print("🔍 Generating embeddings for uploaded document...")   
    embeddings = embedding_model.encode(chunks)

    #Step 3: Normalizing embeddings (for cosine similarity)
    embeddings_norm = normalize(np.array(embeddings, dtype=np.float32))

    #Step 4: Building FAISS Index (Inner Product for Cosine Similarity)
    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings_norm)

    # Store in global state
    state["index"] = index
    state["chunks"] = chunks

    return f"✅ Document '{os.path.basename(file_path)}' processed and indexed successfully!"
    

# ----------------------------------
# QA Pipeline
# ----------------------------------
def answer_question_gradio(query, index, chunks, embedding_model, top_k=2):
    """Retrieves top_k relevant chunks and generates an answer"""
    
    #if state["index"] is None or state["chunks"] is None:
        #return "⚠️ Knowledge base not loaded. Please upload or load a document first."
    
    # Step 1: Encoding and normalizing the user query
    query_embedding = embedding_model.encode([query])
    query_embedding_norm = normalize(np.array(query_embedding, dtype=np.float32))

    # Step 2: Searching FAISS index using cosine similarity and returning only relevant result
    distances, indices = index.search(query_embedding_norm, top_k)
    
     # Step 3: Returning relevant chunks
    #Converting to list for easier handling
    scores = distances[0]
    retrieved_indices = indices[0]

    #Setting a similarity threshold
    SIMILARITY_THRESHOLD = 0.11        #Lowered value for normalized embeddings

    #Filtering out irrelevant chunks
    relevant_chunks = [
        chunks[i] for i, score in zip(retrieved_indices, scores)
        if score >= SIMILARITY_THRESHOLD
    ]

    #Optional:
    print("Similarity scores:", scores)

    #If no relevant chunk meets the threshold
    if not relevant_chunks:
        return "🤔 I couldn't find relevant information in the uploaded document to answer the question."
      
    # Step 4: Concatenating retrieved chunks into context
    context = " ".join(relevant_chunks)
        
    # Step 5: Trimming context to fit model's token limit
    trimmed_context = trim_context(context)
       
    # Step 6: Creating the prompt
    prompt = (
        f"You are a helpful assistant. Use only the information in the context below to answer.\n"
        f"Context: {trimmed_context}\n\n"
        f"Question: {query}\n\n"
        f"Answer the question concisely based on the context above."
        f"If the answer cannot be found in the context, reply exactly with: "
        f"'🤔 I couldn't find relevant information in the uploaded document to answer the question.'"
    )
    
    # Step 7: Generating an answer
    response = qa_model(
        prompt,
        max_new_tokens=100,
        temperature=0.7,
        top_p=0.9,
        do_sample=True
    )
    
    # Step 8: Returning the final answer (for Gradio)
    answer = response[0]["generated_text"].strip()
    return answer

# ----------------------------------
# Gradio UI Functions
# ----------------------------------


def ask_question_gradio(question):
    """Interface function for Gradio to answer user queries."""
    if state["index"] is None or state["chunks"] is None:
        return "⚠ Please upload and process a document first"

    return answer_question_gradio(
        query=question, 
        index=state["index"], 
        chunks=state["chunks"],
        embedding_model=embedding_model
    )

# ----------------------------------
# Gradio Layout
# ----------------------------------

with gr.Blocks() as demo:
    gr.Markdown("<h2>📚 Mini RAG Chatbot</h2>")
    
    with gr.Tab("Upload & Process Document"):
        file_input = gr.File(
            label="Upload a PDF, DOCX, or TXT file",
            type="filepath",
            file_types=[".pdf", ".docx", ".txt"]
        )
        process_btn = gr.Button("Process Document")
        process_output = gr.Textbox(label="Status", interactive=False)
        process_btn.click(process_uploaded_file, inputs=file_input, outputs=process_output)

    with gr.Tab("Ask Questions"):
        question_input = gr.Textbox(label="Enter your question here")
        ask_btn = gr.Button("💬Ask")
        answer_output = gr.Textbox(label="Answer", interactive=False)
        ask_btn.click(ask_question_gradio, inputs=question_input, outputs=answer_output)

# ----------------------------------
# Launch the app
# ----------------------------------
import nest_asyncio
nest_asyncio.apply()  # Patching asyncio to allow nested event loops in notebooks

demo.launch(
    share=True,
    inbrowser=True,
    prevent_thread_lock=True
)

Device set to use cpu


* Running on local URL:  http://127.0.0.1:7860
* Running on public URL: https://8f5e1191b21fe864b4.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


