In [1]:
import fitz  # PyMuPDF
import pytesseract
import os

# ----------- STEP 1: Setup Paths ----------- 
PDF_FOLDER = "data"  # Data folder containing the PDF files
BASE_OUTPUT_DIR = "outputs"  # Fixed output directory name

# Ensure the base output directory exists, and it is a directory (not a file)
if not os.path.exists(BASE_OUTPUT_DIR):
    os.makedirs(BASE_OUTPUT_DIR, exist_ok=True)
elif os.path.isfile(BASE_OUTPUT_DIR):
    print(f"[Error] {BASE_OUTPUT_DIR} exists as a file, but it should be a directory. Exiting...")
    exit()

# ----------- STEP 2: Process Each PDF ----------- 
pdf_files = [f for f in os.listdir(PDF_FOLDER) if f.lower().endswith('.pdf')]  # List all PDF files in the data folder

if not pdf_files:
    print("No PDF files found in the 'data' folder. Exiting...")
    exit()

# Process each PDF file one by one
for pdf_file in pdf_files:
    print(f"[*] Processing {pdf_file}...")

    # Setup file-specific paths
    PDF_PATH = os.path.join(PDF_FOLDER, pdf_file)
    IMAGE_DIR = os.path.join(BASE_OUTPUT_DIR, f"{os.path.splitext(pdf_file)[0]}_pages")
    OCR_DIR = os.path.join(BASE_OUTPUT_DIR, f"{os.path.splitext(pdf_file)[0]}_ocr")
    TEXT_PATH = os.path.join(BASE_OUTPUT_DIR, f"{os.path.splitext(pdf_file)[0]}_extracted_text.txt")

    # Ensure that the specific subdirectories for this PDF are created
    os.makedirs(IMAGE_DIR, exist_ok=True)
    os.makedirs(OCR_DIR, exist_ok=True)

    # ----------- STEP 2.1: Extract Text with PyMuPDF ----------- 
    doc = fitz.open(PDF_PATH)
    all_text = ""

    for page_num in range(len(doc)):
        page = doc.load_page(page_num)
        text = page.get_text()
        all_text += f"\n--- Page {page_num + 1} ---\n{text}"

    # Save extracted text
    with open(TEXT_PATH, "w", encoding="utf-8") as f:
        f.write(all_text)

    print(f"[✓] Text extracted to {TEXT_PATH}")

    # ----------- STEP 2.2: Convert PDF to Images using PyMuPDF ----------- 
    print("[*] Converting PDF to images using PyMuPDF...")
    
    for i in range(len(doc)):
        page = doc.load_page(i)
        pix = page.get_pixmap()  # This is the image representation of the page

        # Save image as PNG
        img_path = os.path.join(IMAGE_DIR, f"page_{i + 1}.png")
        pix.save(img_path)
        print(f"[✓] Saved: {img_path}")

    # ----------- STEP 2.3: OCR on Each Page Image ----------- 
    print("[*] Running OCR on page images...")

    for img_file in os.listdir(IMAGE_DIR):
        img_path = os.path.join(IMAGE_DIR, img_file)
        text = pytesseract.image_to_string(img_path)
        
        ocr_file = os.path.join(OCR_DIR, img_file.replace(".png", ".txt"))
        with open(ocr_file, "w", encoding="utf-8") as f:
            f.write(text)
        
        print(f"[✓] OCR saved: {ocr_file}")

    print(f"[✓] Finished processing {pdf_file}\n")

print("[✓] All PDFs processed successfully.")


[*] Processing file1.pdf...
[✓] Text extracted to outputs\file1_extracted_text.txt
[*] Converting PDF to images using PyMuPDF...
[✓] Saved: outputs\file1_pages\page_1.png
[✓] Saved: outputs\file1_pages\page_2.png
[✓] Saved: outputs\file1_pages\page_3.png
[✓] Saved: outputs\file1_pages\page_4.png
[✓] Saved: outputs\file1_pages\page_5.png
[✓] Saved: outputs\file1_pages\page_6.png
[✓] Saved: outputs\file1_pages\page_7.png
[✓] Saved: outputs\file1_pages\page_8.png
[✓] Saved: outputs\file1_pages\page_9.png
[✓] Saved: outputs\file1_pages\page_10.png
[✓] Saved: outputs\file1_pages\page_11.png
[✓] Saved: outputs\file1_pages\page_12.png
[✓] Saved: outputs\file1_pages\page_13.png
[✓] Saved: outputs\file1_pages\page_14.png
[✓] Saved: outputs\file1_pages\page_15.png
[✓] Saved: outputs\file1_pages\page_16.png
[✓] Saved: outputs\file1_pages\page_17.png
[✓] Saved: outputs\file1_pages\page_18.png
[✓] Saved: outputs\file1_pages\page_19.png
[✓] Saved: outputs\file1_pages\page_20.png
[✓] Saved: outputs\f

In [1]:
import os
import torch
from sentence_transformers import SentenceTransformer
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import numpy as np

# ----------- STEP 3: Setup Embedding Models -----------

# Load the text embedding model (Sentence-BERT)
text_model = SentenceTransformer('all-MiniLM-L6-v2')

# Load the CLIP model and processor for image embeddings
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")

# ----------- STEP 3.1: Embedding Text -----------

def embed_text(text):
    """Embed the text using Sentence-BERT"""
    embeddings = text_model.encode([text])  # Returns a list of embeddings
    return embeddings[0]  # Extract the embedding for the first sentence (since we have one)

# ----------- STEP 3.2: Embedding Images -----------

def embed_image(image_path):
    """Embed the image using CLIP"""
    image = Image.open(image_path)
    inputs = clip_processor(images=image, return_tensors="pt", padding=True)
    outputs = clip_model.get_image_features(**inputs)
    image_embedding = outputs.detach().cpu().numpy()  # Directly use the tensor
    return image_embedding


# ----------- STEP 3.3: Save the Embeddings for Each PDF -----------

# Base output directory for embeddings
VECTOR_DIR = "outputs/embeddings"
os.makedirs(VECTOR_DIR, exist_ok=True)

# Process each PDF (we assume you've already extracted text and saved images)
PDF_FOLDER = "data"
BASE_OUTPUT_DIR = "outputs"

# List all PDF files in the data folder
pdf_files = [f for f in os.listdir(PDF_FOLDER) if f.lower().endswith('.pdf')]

for pdf_file in pdf_files:
    print(f"[*] Processing {pdf_file} for embeddings...")

    # Define specific directories for each PDF
    IMAGE_DIR = os.path.join(BASE_OUTPUT_DIR, f"{os.path.splitext(pdf_file)[0]}_pages")
    TEXT_PATH = os.path.join(BASE_OUTPUT_DIR, f"{os.path.splitext(pdf_file)[0]}_extracted_text.txt")
    EMBEDDINGS_TEXT_PATH = os.path.join(VECTOR_DIR, f"{os.path.splitext(pdf_file)[0]}_text_embeddings.npy")
    EMBEDDINGS_IMAGE_PATH = os.path.join(VECTOR_DIR, f"{os.path.splitext(pdf_file)[0]}_image_embeddings.npy")

    # ----------- STEP 3.4: Embed and Store Text Embeddings -----------

    # Read extracted text
    with open(TEXT_PATH, "r", encoding="utf-8") as f:
        all_text = f.read()

    # Embed the text
    text_embedding = embed_text(all_text)

    # Save text embeddings as a numpy array
    np.save(EMBEDDINGS_TEXT_PATH, text_embedding)
    print(f"[✓] Saved text embeddings to {EMBEDDINGS_TEXT_PATH}")

    # ----------- STEP 3.5: Embed and Store Image Embeddings -----------

    # List all images in the page directory
    image_files = [f for f in os.listdir(IMAGE_DIR) if f.lower().endswith('.png')]
    all_image_embeddings = []

    for img_file in image_files:
        img_path = os.path.join(IMAGE_DIR, img_file)
        image_embedding = embed_image(img_path)
        all_image_embeddings.append(image_embedding)
        print(f"[✓] Processed image: {img_file}")

    # Convert the list of image embeddings into a numpy array
    all_image_embeddings = np.array(all_image_embeddings)

    # Save image embeddings as a numpy array
    np.save(EMBEDDINGS_IMAGE_PATH, all_image_embeddings)
    print(f"[✓] Saved image embeddings to {EMBEDDINGS_IMAGE_PATH}")

    print(f"[✓] Finished processing {pdf_file} for embeddings.\n")

print("[✓] All embeddings processed and saved successfully.")


  from .autonotebook import tqdm as notebook_tqdm
Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


[*] Processing file1.pdf for embeddings...
[✓] Saved text embeddings to outputs/embeddings\file1_text_embeddings.npy
[✓] Processed image: page_1.png
[✓] Processed image: page_10.png
[✓] Processed image: page_11.png
[✓] Processed image: page_12.png
[✓] Processed image: page_13.png
[✓] Processed image: page_14.png
[✓] Processed image: page_15.png
[✓] Processed image: page_16.png
[✓] Processed image: page_17.png
[✓] Processed image: page_18.png
[✓] Processed image: page_19.png
[✓] Processed image: page_2.png
[✓] Processed image: page_20.png
[✓] Processed image: page_21.png
[✓] Processed image: page_22.png
[✓] Processed image: page_23.png
[✓] Processed image: page_24.png
[✓] Processed image: page_25.png
[✓] Processed image: page_26.png
[✓] Processed image: page_27.png
[✓] Processed image: page_28.png
[✓] Processed image: page_29.png
[✓] Processed image: page_3.png
[✓] Processed image: page_30.png
[✓] Processed image: page_31.png
[✓] Processed image: page_32.png
[✓] Processed image: page_33

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


In [37]:
import numpy as np
import faiss
import os
from sklearn.preprocessing import normalize
from sentence_transformers import SentenceTransformer
from PIL import Image
from transformers import CLIPProcessor, CLIPModel
import torch  # Ensure this is imported for tensor operations

# ----------- Configuration ----------- 
EMBEDDING_FOLDER = "outputs/embeddings"

# ----------- Load Embeddings ----------- 
def load_embeddings(file_path):
    return np.load(file_path)

def get_embedding_files(folder):
    return [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith('.npy')]

embedding_files = get_embedding_files(EMBEDDING_FOLDER)
text_embeddings = [load_embeddings(f) for f in embedding_files if 'text' in f]
image_embeddings = [load_embeddings(f) for f in embedding_files if 'image' in f]

all_text_embeddings = np.concatenate(text_embeddings, axis=0) if text_embeddings else np.array([])
all_image_embeddings = np.concatenate(image_embeddings, axis=0) if image_embeddings else np.array([])

if all_text_embeddings.size:
    all_text_embeddings = all_text_embeddings.reshape(-1, all_text_embeddings.shape[-1])
    all_text_embeddings = normalize(all_text_embeddings)

if all_image_embeddings.size:
    all_image_embeddings = all_image_embeddings.reshape(-1, all_image_embeddings.shape[-1])
    all_image_embeddings = normalize(all_image_embeddings)

# ----------- FAISS Indexing ----------- 
def create_faiss_index(embedding_dim):
    return faiss.IndexFlatL2(embedding_dim)

embedding_dim_text = all_text_embeddings.shape[1] if all_text_embeddings.size else None
embedding_dim_image = all_image_embeddings.shape[1] if all_image_embeddings.size else None

if embedding_dim_text and embedding_dim_image and embedding_dim_text != embedding_dim_image:
    print("[Warning] Text and image embeddings have different dimensions. Creating separate indexes.")

text_index = create_faiss_index(embedding_dim_text) if all_text_embeddings.size else None
image_index = create_faiss_index(embedding_dim_image) if all_image_embeddings.size else None

if text_index is not None:
    text_index.add(all_text_embeddings.astype(np.float32))

if image_index is not None:
    image_index.add(all_image_embeddings.astype(np.float32))

print("[✓] FAISS indexes created and embeddings added.")

# ----------- Determine Number of PDFs ----------- 
num_pdfs = len(set(f.split('_')[0] for f in os.listdir(EMBEDDING_FOLDER) if f.endswith('.npy')))

# ----------- Search Function ----------- 
def search(query_embedding, index, top_k=5):
    distances, indices = index.search(query_embedding.astype(np.float32), top_k)
    return distances, indices

# ----------- Text Embedding Model ----------- 
text_model = SentenceTransformer('all-MiniLM-L6-v2')

# ----------- Image Embedding Model ----------- 
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch16")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch16")

def embed_text(query):
    # Split query into 3 equal parts for consistent embedding structure
    parts = [query[i::3] for i in range(3)]  # Simple split into 3 parts
    embeddings = []

    for part in parts:
        inputs = clip_processor(text=[part], return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            outputs = clip_model.get_text_features(**inputs)
        embeddings.append(outputs.cpu().numpy())

    # Concatenate all embeddings (each is (1, 512)) → result (1, 1536)
    concatenated = np.concatenate(embeddings, axis=1)

    # Truncate to 1152 (if that's what was done during storage)
    embedding = concatenated[:, :1152]

    return normalize(embedding)

def embed_image(image_path):
    image = Image.open(image_path)
    inputs = clip_processor(images=image, return_tensors="pt", padding=True)
    outputs = clip_model.get_image_features(**inputs)
    embedding = outputs.detach().cpu().numpy()
    return normalize(embedding)

# ----------- Take Query from User ----------- 
query_type = input("Enter query type (text/image): ").strip().lower()

if query_type == "text":
    query_text = input("Enter your text query: ")
    query_text_embedding = embed_text(query_text)

    if query_text_embedding.shape[1] != embedding_dim_text:
        raise ValueError(f"Query embedding dimension {query_text_embedding.shape[1]} does not match index dimension {embedding_dim_text}.")

    if text_index is not None:
        top_k = num_pdfs  # Ensure it's the number of PDFs, not just the number of index entries
        print("[*] Searching for text query...")
        text_distances, text_indices = search(query_text_embedding, text_index, top_k=top_k)

        print("\nTop results for text query:")
        found_valid_results = False
        for i, (idx, dist) in enumerate(zip(text_indices[0], text_distances[0])):
            if idx != -1:  # Check if the result is valid
                pdf_name = pdf_names[idx] if idx < len(pdf_names) else "Unknown PDF"
                print(f"Rank {i+1}: PDF File '{pdf_name}', Distance {dist:.4f}")
                found_valid_results = True
            else:
                print(f"Rank {i+1}: No valid result found.")
        
        if not found_valid_results:
            print("No valid results found for the text query.")

elif query_type == "image":
    image_path = input("Enter path to image query: ").strip()
    query_image_embedding = embed_image(image_path)

    if query_image_embedding.shape[1] != embedding_dim_image:
        raise ValueError(f"Image embedding dimension {query_image_embedding.shape[1]} does not match index dimension {embedding_dim_image}.")

    if image_index is not None:
        top_k = num_pdfs  # Ensure it's the number of PDFs, not just the number of index entries
        print("[*] Searching for image query...")
        image_distances, image_indices = search(query_image_embedding, image_index, top_k=top_k)

        print("\nTop results for image query:")
        found_valid_results = False
        for i, (idx, dist) in enumerate(zip(image_indices[0], image_distances[0])):
            if idx != -1:  # Check if the result is valid
                pdf_name = pdf_names[idx] if idx < len(pdf_names) else "Unknown PDF"
                print(f"Rank {i+1}: PDF File '{pdf_name}', Distance {dist:.4f}")
                found_valid_results = True
            else:
                print(f"Rank {i+1}: No valid result found.")
        
        if not found_valid_results:
            print("No valid results found for the image query.")

else:
    print("❌ Invalid query type. Please enter 'text' or 'image'.")


[*] Searching for text query...

Top results for text query:
Rank 1: PDF File 'file1', Distance 1.9480
Rank 2: No valid result found.
Rank 3: No valid result found.


In [None]:
import os
import streamlit as st
from google import genai
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from dotenv import load_dotenv

# Set your API key for Gemini
load_dotenv()
GEMINI_API_KEY = 'AIzaSyC89YzwZQEzKiFCl-8tz8PIxXTUvD8RzQM'

# Initialize the Gemini client
client = genai.Client(api_key=GEMINI_API_KEY)

# ----------- Function to Generate Text Using Gemini ----------- #
def generate_text_from_gemini(query, retrieved_docs):
    """
    Generate a response using Gemini by combining the query and retrieved documents.
    
    Args:
    - query (str): The user's query or question.
    - retrieved_docs (list of str): List of retrieved documents (context) from the database.
    
    Returns:
    - response (str): The generated response based on the query and context.
    """
    
    # Combine the query with the retrieved documents to form a context string
    context = "\n".join(retrieved_docs)  # Join documents into a single context string
    prompt = f"Query: {query}\nContext:\n{context}\nAnswer:"

    try:
        # Generate content using Gemini API
        response = client.models.generate_content(
            model="gemini-2.5-pro-exp-03-25",  # Replace with the appropriate model name if different
            contents=prompt
        )
        
        # Extract the generated text from the response
        generated_text = response.text.strip() if response.text else 'Error: No valid response.'
        return generated_text
    
    except Exception as e:
        print(f"Error generating text: {e}")
        return "Error generating response."

# ----------- Function to Extract Text from PDF Files ----------- #
def get_pdf_text(pdf_docs):
    """Extract text from PDF files"""
    text = ""
    for pdf in pdf_docs:
        pdf_reader = PdfReader(pdf)
        for page in pdf_reader.pages:
            text += page.extract_text() or ""  
    return text

# ----------- Function to Split Text into Chunks ----------- #
def get_text_chunks(text):
    """Split text into chunks for better processing"""
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=15000, chunk_overlap=100)
    return text_splitter.split_text(text)

# ----------- Function to Load All .txt Files from Outputs/ ----------- #
def load_text_from_file(file_path):
    """Load text content from a file"""
    if os.path.exists(file_path):
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    return ""

# ----------- Streamlit UI Setup ----------- #
st.title("RAG Bot: Ask Questions from PDF Documents")
st.markdown("""
    Upload multiple PDF files, and the bot will answer your questions based on the content.
    """)

# File uploader for PDFs
pdf_docs = st.file_uploader("Upload Your PDF Files", accept_multiple_files=True, type="pdf", key="pdf_uploader")

if pdf_docs:
    with st.spinner("Processing PDFs..."):
        # Extract and process text from PDFs
        extracted_text = get_pdf_text(pdf_docs)
        text_chunks = get_text_chunks(extracted_text)
        
        # Save extracted text to the outputs/ directory
        output_dir = 'outputs/'
        os.makedirs(output_dir, exist_ok=True)
        extracted_file_path = os.path.join(output_dir, "extracted_text.txt")
        with open(extracted_file_path, 'w', encoding='utf-8') as f:
            f.write(extracted_text)
        
        st.success("PDFs processed successfully!")

# Fetch documents for RAG from all extracted_text.txt files in the outputs/ directory
output_dir = 'outputs/'
retrieved_docs = []
for filename in os.listdir(output_dir):
    if filename.endswith("extracted_text.txt"):
        file_path = os.path.join(output_dir, filename)
        retrieved_docs.append(load_text_from_file(file_path))

# User question input
user_query = st.text_input("Ask a question related to the uploaded documents:")

# When the user asks a question
if user_query and retrieved_docs:
    with st.spinner("Generating response..."):
        # Generate response using Gemini
        response = generate_text_from_gemini(user_query, retrieved_docs)
        
        # Display the response
        st.write("### Answer:")
        st.write(response)

# If no PDFs are uploaded
if not pdf_docs:
    st.warning("Please upload one or more PDF files to get started.")


In [1]:
from nltk.translate.bleu_score import sentence_bleu
from rouge_score import rouge_scorer

def evaluate_bleu_rouge(generated_response, reference_response):
    """
    Evaluate BLEU and ROUGE scores for the generated response.

    Args:
    - generated_response (str): Response generated by the system.
    - reference_response (str): Ideal/reference response.

    Returns:
    - dict: BLEU and ROUGE scores.
    """
    # BLEU Score
    bleu_score = sentence_bleu(
        [reference_response.split()],  # Reference is tokenized
        generated_response.split()     # Generated response is tokenized
    )

    # ROUGE Scores
    rouge = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
    rouge_scores = rouge.score(reference_response, generated_response)

    return {
        "BLEU": bleu_score,
        "ROUGE-1": rouge_scores['rouge1'].fmeasure,
        "ROUGE-2": rouge_scores['rouge2'].fmeasure,
        "ROUGE-L": rouge_scores['rougeL'].fmeasure
    }
