In [1]:
# Section 1: Imports

import os
import re
import glob
import fitz  # PyMuPDF
import faiss
import numpy as np
import gradio as gr
import google.genai as genai
from dotenv import load_dotenv
from datetime import datetime
from sentence_transformers import SentenceTransformer

print("Libraries imported successfully")

Libraries imported successfully


In [2]:
# Section 2: Load environment variables

# Load settings into the process environment (presumably from a local .env
# file, per the error message used later for a missing GEMINI_API_KEY).
load_dotenv()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")  # None when unset; validated before the Gemini client is created
CONTENT_FOLDER_PATH = os.getenv("CONTENT_FOLDER_PATH")  # None when unset; load_documents() rejects a falsy path
DOCUMENT_TITLE = os.getenv("DOCUMENT_TITLE", "Uploaded Documents")  # display title used by the UI

# NOTE: this message is printed unconditionally; it does not verify that the
# variables above were actually found.
print("Environment variables Loaded successfully")

Environment variables Loaded successfully


In [3]:
# Section 3: Load text content from multiple PDF and TXT files in a folder

def load_documents(folder_path):
    """Read every ``*.pdf`` and ``*.txt`` file in a folder into one string.

    Files are processed in sorted filename order (PDFs first, then TXTs) so
    the combined text, and therefore the chunking built from it, is
    reproducible between runs. A file that cannot be read is reported and
    skipped entirely; it contributes no partial text.

    Args:
        folder_path (str): Directory to scan (non-recursive).

    Returns:
        str: Concatenated text of all readable files, each followed by a
        newline so adjacent documents never run into each other.

    Raises:
        ValueError: If ``folder_path`` is empty or None.
        FileNotFoundError: If no PDF/TXT file could be loaded from the folder.
    """
    if not folder_path:
        raise ValueError("Folder path is not set.")

    text_parts = []  # joined once at the end instead of repeated string +=
    total_files = 0

    # Load PDFs
    for pdf_file in sorted(glob.glob(os.path.join(folder_path, "*.pdf"))):
        try:
            # The context manager closes the document even if a page fails.
            with fitz.open(pdf_file) as doc:
                pages = [page.get_text() for page in doc]
        except Exception as e:
            print(f"Error reading PDF {pdf_file}: {e}")
            continue
        text_parts.extend(pages)
        text_parts.append("\n")
        total_files += 1

    # Load TXTs
    for txt_file in sorted(glob.glob(os.path.join(folder_path, "*.txt"))):
        try:
            with open(txt_file, "r", encoding="utf-8") as f:
                content = f.read()
        except Exception as e:
            print(f"Error reading TXT {txt_file}: {e}")
            continue
        text_parts.append(content + "\n")
        total_files += 1

    if total_files == 0:
        raise FileNotFoundError(f"No PDF or TXT files found in folder: {folder_path}")

    print(f"Loaded text from {total_files} document(s) in '{folder_path}'")
    return "".join(text_parts)

# Load all documents from the configured folder into one string.
# load_documents() prints a summary on success and raises if the folder path
# is unset or no PDF/TXT file could be loaded.
notes_text = load_documents(CONTENT_FOLDER_PATH)

Loaded text from 1 document(s) in 'source_materials'


In [4]:
# Section 4: Split text into chunks

# Recommended defaults for good context balance with Gemini (chunk_size=400, overlap=100)
def split_text(text, chunk_size=400, overlap=100):
    """Split text into sentence-aligned chunks of roughly ``chunk_size`` words.

    Sentences (split after ``.``, ``!`` or ``?``) are accumulated until the
    running word count reaches ``chunk_size``; the chunk is then emitted and
    its last ``overlap`` words are carried into the next chunk for context.

    Args:
        text (str): Full document text.
        chunk_size (int): Minimum number of words that closes a chunk.
        overlap (int): Number of trailing words repeated at the start of the
            next chunk. Values <= 0 disable overlap.

    Returns:
        list[str]: The chunks, in document order. Text shorter than
        ``chunk_size`` words yields a single chunk; empty text yields ``[]``.
    """
    sentences = re.split(r'(?<=[.!?])\s+', text)
    chunks = []
    current_chunk = []
    word_count = 0  # words currently held in current_chunk (incl. carried overlap)
    new_words = 0   # words added since the last emitted chunk (excl. carried overlap)

    for sentence in sentences:
        sentence = sentence.strip()
        sentence_words = len(sentence.split())
        current_chunk.append(sentence)
        # Track counts incrementally instead of re-joining the chunk each time.
        word_count += sentence_words
        new_words += sentence_words

        if word_count >= chunk_size:
            chunk = " ".join(current_chunk).strip()
            chunks.append(chunk)

            if overlap > 0:
                overlap_words = chunk.split()[-overlap:]
                current_chunk = [" ".join(overlap_words)]
            else:
                overlap_words = []
                current_chunk = []
            word_count = len(overlap_words)
            new_words = 0

    # Keep the tail only if it holds text not already present in the previous
    # chunk. Comparing the tail length against `overlap` (the old check) threw
    # away entire documents that were shorter than `overlap` words.
    if new_words > 0:
        chunks.append(" ".join(current_chunk).strip())

    return chunks

# Chunk the combined document text using split_text's default
# chunk_size/overlap; `chunks` is the corpus that gets embedded and searched.
chunks = split_text(notes_text)

In [5]:
# Section 5: Generate embeddings and set up FAISS index

# Fail early with a clear message: encoding an empty list produces an array
# without a second dimension, which would otherwise surface as an IndexError.
if not chunks:
    raise ValueError("No text chunks were produced; cannot build the FAISS index.")

embed_model = SentenceTransformer('all-MiniLM-L6-v2')  # Fast & lightweight embedding model

# FAISS expects a contiguous float32 matrix of shape (n_chunks, dimension).
embeddings = np.ascontiguousarray(
    embed_model.encode(chunks, show_progress_bar=True), dtype="float32"
)
dimension = embeddings.shape[1]

# Use HNSW index (M=32: controls the number of graph links per node).
# No metric is passed, so the index uses the FAISS default (L2); the distances
# returned by index.search() are therefore "smaller = more similar".
index = faiss.IndexHNSWFlat(dimension, 32)
index.hnsw.efConstruction = 200  # build-time search depth; must be set before add()
index.add(embeddings)

print(f"Embeddings generated and added to FAISS index (total: {len(embeddings)})")

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Embeddings generated and added to FAISS index (total: 5)


In [6]:
# Section 6: Define Search Function

def search_notes(question, top_k=5, max_distance_threshold=0.3, enable_fallback=True):
    """
    Retrieves the most relevant chunks based on vector similarity.

    Args:
        question (str): User question.
        top_k (int): Number of chunks to retrieve.
        max_distance_threshold (float): Maximum distance (smaller = more similar).
            Only hits with a distance strictly below this value are kept.
        enable_fallback (bool): If True and no hit passes the threshold, the
            single closest hit is returned anyway (even though its distance
            exceeds the threshold).

    Returns:
        list of (chunk, distance) tuples: Matching chunks ordered as returned
        by the index (closest first). Empty if the index returned no usable
        hit, or if nothing passed the threshold and fallback is disabled.
    """
    question_vec = embed_model.encode([question])
    distances, indices = index.search(np.array(question_vec), top_k)

    # FAISS pads missing results with label -1. A bare `i < len(chunks)` check
    # accepts -1 and would silently return chunks[-1], so require 0 <= i.
    valid_hits = [
        (chunks[i], dist)
        for i, dist in zip(indices[0], distances[0])
        if 0 <= i < len(chunks)
    ]

    results = [hit for hit in valid_hits if hit[1] < max_distance_threshold]

    if enable_fallback and not results and valid_hits:
        results.append(valid_hits[0])

    return results

In [7]:
# Section 7: Connect to Gemini model

# Fail fast with a clear message if the API key was not found in the environment.
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY is not set in the .env file")

# Module-level Gemini client; generate_answer() uses it for every request.
client = genai.Client(api_key=GEMINI_API_KEY)

In [8]:
# Section 8: Define Answer Generation Function

_NO_ANSWER_MESSAGE = "Sorry, I couldn't find an answer based on the available content."


def _is_fallback_answer(text):
    """Return True if the model reply is empty or looks like a refusal / non-answer."""
    if not text or text.strip() == "":
        return True
    normalized = text.lower().strip()
    fallback_phrases = [
        "i cannot answer", "not available", "not enough information",
        "i don't know", "do not know", "not covered"
    ]
    # NOTE(review): `count(".") > 10` also rejects legitimate long answers
    # (more than ten sentences, numbered lists, decimals). Kept as-is to
    # preserve behavior; confirm whether it was meant to catch "....." runs.
    return text.startswith(".") or text.count(".") > 10 or any(phrase in normalized for phrase in fallback_phrases)


def generate_answer(question, top_k=5, max_context_words=400, model="gemini-2.0-flash"):
    """Answer a question with Gemini using only retrieved document chunks.

    Args:
        question (str): User question.
        top_k (int): Number of chunks requested from ``search_notes``.
        max_context_words (int | None): Cap on the number of context words put
            in the prompt; ``None`` disables the cap. With the default of 400
            and chunks of about 400 words, only the best-ranked chunk fits.
            Raise this to let the model see more of the retrieved chunks.
        model (str): Gemini model name passed to the API.

    Returns:
        str: The model's answer; a fixed "Sorry, ..." message when no context
        was retrieved or the reply is empty / a refusal; or a string starting
        with "Gemini API error:" if the API call raised.
    """
    context_chunks = search_notes(question, top_k=top_k)
    context = "\n".join([chunk for chunk, _ in context_chunks])

    # Nothing was retrieved: the prompt forbids answering without context, so
    # skip the API round-trip.
    if not context.strip():
        return _NO_ANSWER_MESSAGE

    # Limit context to avoid token overflow
    context_words = context.split()
    if max_context_words is not None and len(context_words) > max_context_words:
        context = " ".join(context_words[:max_context_words])

    prompt = f"""You are a helpful, knowledgeable, and concise assistant. Answer the following question using only the information provided in the context below.

Use complete sentences or point form as appropriate. If the answer is not found in the context, respond with:
"I'm sorry, but the information needed to answer this question is not available in the provided documents."

Context:
{context}

Question: {question}

Answer:"""

    try:
        response = client.models.generate_content(
            model=model,
            contents=[prompt]
        )
        # response.text can be None (e.g. no text returned); treat that as an
        # empty answer instead of letting .strip() raise and be misreported
        # as an API error.
        answer = (response.text or "").strip()
    except Exception as e:
        return f"Gemini API error: {e}"

    if _is_fallback_answer(answer):
        return _NO_ANSWER_MESSAGE

    return answer

In [24]:
# Section 9: Set Up Gradio Chatbot UI

def chatbot_interface(question):
    """Gradio callback: validate the question, answer it, and log the exchange.

    Args:
        question (str): Text from the question box.

    Returns:
        str: The answer from ``generate_answer``, or a prompt to enter a
        question when the input is empty or whitespace-only.
    """
    if not question or not question.strip():
        return "Please enter a valid question."

    answer = generate_answer(question)

    # Logging is best-effort: a read-only or full disk must not discard an
    # answer that has already been generated.
    try:
        with open("answer_log.txt", "a", encoding="utf-8") as f:
            f.write(f"Q: {question}\nA: {answer}\n{datetime.now()}\n\n")
    except OSError as e:
        print(f"Warning: could not write to answer_log.txt: {e}")

    return answer


with gr.Blocks(title=f"Q&A Assistant for {DOCUMENT_TITLE}") as demo:
    gr.Markdown(f"## Q&A Assistant for {DOCUMENT_TITLE}")
    gr.Markdown(f"Ask any question related to your {DOCUMENT_TITLE.lower()}. The assistant will only use the provided content to generate answers.")

    with gr.Row():
        question_input = gr.Textbox(
            label="Your Question",
            placeholder=f"Ask a question based on your {DOCUMENT_TITLE.lower()}...",
            lines=3,
            scale=3
        )
        submit_btn = gr.Button("Get Answer", scale=1)

    with gr.Row():
        answer_output = gr.Textbox(
            label="Answer",
            lines=8,
            max_lines=12,
            interactive=False,
            show_copy_button=True
        )

    submit_btn.click(
        fn=chatbot_interface,
        inputs=question_input,
        outputs=answer_output,
        show_progress=True
    )

# Launch only after the Blocks context has closed, i.e. once the layout and
# event wiring are complete.
demo.launch(share=False)

* Running on local URL:  http://127.0.0.1:7863
* To create a public link, set `share=True` in `launch()`.
