In [21]:
import os
import fitz  # PyMuPDF
from pdf2image import convert_from_path
from PIL import Image
import pytesseract
import numpy as np
import torch
from sentence_transformers import SentenceTransformer
from transformers import CLIPProcessor, CLIPModel
import faiss
import pickle
import gradio as gr
from typing import List, Dict, Tuple
import warnings
warnings.filterwarnings('ignore')

print("‚úÖ Libraries imported!")


‚úÖ Libraries imported!


In [None]:
# Local PDF loader (replace Colab upload)
import os
from pathlib import Path
import pytesseract

# Load PDFs from the local `data/pdfs/` folder.
# The listing is sorted because Path.glob yields entries in arbitrary,
# filesystem-dependent order, and every chunk/index position built later
# follows the order of this list.
pdf_dir = Path('data') / 'pdfs'
pdf_files = sorted(str(p) for p in pdf_dir.glob('*.pdf')) if pdf_dir.exists() else []
print(f"\nüìÅ Found {len(pdf_files)} PDF files: {pdf_files}")
if not pdf_files:
    # Later cells build embeddings and indexes from these files and cannot
    # work with an empty corpus, so say so here.
    print(f"WARNING: no PDF files found in '{pdf_dir}'. Add PDFs there before running the cells below.")

# Tesseract (Windows) - automatically set if installed in default location
tesseract_default = Path(r'C:\Program Files\Tesseract-OCR\tesseract.exe')
if tesseract_default.exists():
    pytesseract.pytesseract.tesseract_cmd = str(tesseract_default)
    print(f"‚ÑπÔ∏è Set pytesseract executable to {tesseract_default}")
else:
    print("‚ÑπÔ∏è Tesseract not found at default path. If OCR fails, install Tesseract and set pytesseract.pytesseract.tesseract_cmd manually.")

# Poppler note for pdf2image on Windows
print("‚ÑπÔ∏è If `pdf2image.convert_from_path` fails on Windows, set the `poppler_path` variable to the Poppler `bin` folder (example: r'C:\\path\\to\\poppler\\bin').")


üì§ Please upload your 3 PDF files:


Saving 1. Annual Report 2023-24.pdf to 1. Annual Report 2023-24.pdf
Saving 2. financials.pdf to 2. financials.pdf
Saving 3. FYP-Handbook-2023.pdf to 3. FYP-Handbook-2023.pdf

‚úÖ Uploaded 3 PDF files: ['1. Annual Report 2023-24.pdf', '2. financials.pdf', '3. FYP-Handbook-2023.pdf']


In [18]:
# Load the two encoders used by the retrieval pipeline. Both are defined as
# module-level globals because later cells (VectorDatabase.search_text,
# VectorDatabase.search_image, chatbot_with_image) reference them by name.
print("Loading embedding models... (this may take 1-2 minutes)")

# Text embedding model (Sentence-BERT)
text_encoder = SentenceTransformer('all-MiniLM-L6-v2')
print("‚úÖ Text encoder loaded (Sentence-BERT)")

# Image embedding model (CLIP); the same checkpoint supplies both the model
# and its matching pre-processor.
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
print("‚úÖ Image encoder loaded (CLIP)")


Loading embedding models... (this may take 1-2 minutes)
‚úÖ Text encoder loaded (Sentence-BERT)
‚úÖ Image encoder loaded (CLIP)


In [7]:
class DocumentProcessor:
    """Extract paragraph-level text chunks and embedded images from PDFs.

    Results accumulate on the instance across calls:

    * ``text_chunks``  - list of paragraph strings (longer than 50 characters).
    * ``image_chunks`` - list of dicts with keys ``path``, ``ocr_text``, ``image``.
    * ``metadata``     - one dict per chunk, laid out as *all text entries first,
      then all image entries*, so ``metadata[i]`` describes ``text_chunks[i]``
      and ``metadata[len(text_chunks) + j]`` describes ``image_chunks[j]``.
    """

    def __init__(self):
        self.text_chunks = []
        self.image_chunks = []
        # Text and image metadata are collected separately and concatenated at
        # the end of extraction. Appending both kinds to a single list while
        # walking the pages interleaves them and breaks positional lookups.
        self._text_metadata = []
        self._image_metadata = []
        self.metadata = []

    def extract_from_pdfs(self, pdf_paths: List[str]):
        """Extract text and images from PDFs.

        Args:
            pdf_paths: Paths of the PDF files to process, in order.

        Returns:
            Tuple ``(text_chunks, image_chunks, metadata)`` with the layout
            described in the class docstring.
        """
        for pdf_path in pdf_paths:
            print(f"\nüìÑ Processing: {pdf_path}")
            doc = fitz.open(pdf_path)
            try:
                for page_num in range(len(doc)):
                    page = doc[page_num]
                    self._extract_page_text(page, pdf_path, page_num)
                    self._extract_page_images(doc, page, pdf_path, page_num)
            finally:
                # Release the file handle even if a page fails to parse.
                doc.close()

        self.metadata = self._text_metadata + self._image_metadata

        print(f"\n‚úÖ Extracted {len(self.text_chunks)} text chunks and {len(self.image_chunks)} images")
        return self.text_chunks, self.image_chunks, self.metadata

    def _extract_page_text(self, page, pdf_path, page_num):
        """Split one page's text on blank lines and keep paragraphs > 50 chars."""
        text = page.get_text()
        if not text.strip():
            return
        paragraphs = [p.strip() for p in text.split('\n\n') if len(p.strip()) > 50]
        for para in paragraphs:
            self.text_chunks.append(para)
            self._text_metadata.append({
                'type': 'text',
                'source': pdf_path,
                'page': page_num + 1
            })

    def _extract_page_images(self, doc, page, pdf_path, page_num):
        """Save, load and OCR every image embedded in one page."""
        # The PDF name is part of the temp filename so images from different
        # PDFs that share a page/index position do not overwrite each other.
        pdf_stem = os.path.splitext(os.path.basename(pdf_path))[0]
        for img_index, img in enumerate(page.get_images()):
            try:
                xref = img[0]
                base_image = doc.extract_image(xref)
                image_bytes = base_image["image"]
                ext = base_image.get("ext") or "png"

                # Save image temporarily
                img_path = f"temp_img_{pdf_stem}_p{page_num}_{img_index}.{ext}"
                with open(img_path, "wb") as f:
                    f.write(image_bytes)

                # Load pixel data now so the image stays usable regardless of
                # what later happens to the temp file.
                pil_img = Image.open(img_path)
                pil_img.load()

                # OCR to extract text from image
                ocr_text = pytesseract.image_to_string(pil_img)
            except Exception as exc:
                # Best effort: one unreadable image must not abort the run,
                # but the failure is reported instead of silently dropped.
                print(f"Skipping image {img_index} on page {page_num + 1} of {pdf_path}: {exc}")
                continue

            self.image_chunks.append({
                'path': img_path,
                'ocr_text': ocr_text,
                'image': pil_img
            })
            self._image_metadata.append({
                'type': 'image',
                'source': pdf_path,
                'page': page_num + 1,
                'index': img_index
            })

# Process PDFs
# `pdf_files` comes from the PDF-discovery cell above; the three returned
# lists are consumed by the embedding and index-building cells below.
processor = DocumentProcessor()
text_chunks, image_chunks, metadata = processor.extract_from_pdfs(pdf_files)



üìÑ Processing: 1. Annual Report 2023-24.pdf

üìÑ Processing: 2. financials.pdf

üìÑ Processing: 3. FYP-Handbook-2023.pdf

‚úÖ Extracted 148 text chunks and 322 images


In [8]:
def generate_text_embeddings(texts: List[str], model) -> np.ndarray:
    """Encode each text chunk into a vector using the supplied encoder.

    Args:
        texts: Text chunks to embed.
        model: Object exposing ``encode(texts, show_progress_bar=...)``.

    Returns:
        Whatever ``model.encode`` returns for ``texts``.
    """
    print("üîÑ Generating text embeddings...")
    return model.encode(texts, show_progress_bar=True)

def generate_image_embeddings(image_data: List[Dict], clip_model, clip_processor) -> np.ndarray:
    """Generate embeddings for images using CLIP.

    Args:
        image_data: Dicts holding the image to embed under the ``'image'`` key.
        clip_model: Model exposing ``get_image_features(**inputs)``.
        clip_processor: Callable turning ``images=...`` into model inputs.

    Returns:
        2-D float32 array with one row per entry of ``image_data``. An image
        that fails to embed gets an all-zero row so row positions stay aligned
        with ``image_data``. Empty input gives an array of shape ``(0, dim)``.
    """
    print("üîÑ Generating image embeddings...")
    rows = []          # embedding vector, or None where embedding failed
    embed_dim = None   # learned from the first successful embedding

    for position, img_data in enumerate(image_data):
        try:
            # Process image with CLIP
            inputs = clip_processor(images=img_data['image'], return_tensors="pt")
            with torch.no_grad():
                image_features = clip_model.get_image_features(**inputs)
            # detach/cpu keeps this working if the model lives on an accelerator.
            vector = image_features.detach().cpu().numpy().flatten().astype(np.float32)
        except Exception as exc:
            print(f"Could not embed image {position}; using a zero vector instead: {exc}")
            rows.append(None)
            continue
        if embed_dim is None:
            embed_dim = vector.shape[0]
        rows.append(vector)

    if embed_dim is None:
        # Nothing embedded successfully: take the width from the model config,
        # falling back to 512 when the config does not declare one.
        config = getattr(clip_model, "config", None)
        embed_dim = getattr(config, "projection_dim", None) or 512

    if not rows:
        return np.zeros((0, embed_dim), dtype=np.float32)

    # Fallback rows match the real embedding width, so the result is always a
    # rectangular 2-D array.
    zero_row = np.zeros(embed_dim, dtype=np.float32)
    return np.array([zero_row if row is None else row for row in rows])

# Generate embeddings
# Inputs come from the extraction cell (`text_chunks`, `image_chunks`) and the
# model-loading cell (`text_encoder`, `clip_model`, `clip_processor`).
text_embeddings = generate_text_embeddings(text_chunks, text_encoder)
image_embeddings = generate_image_embeddings(image_chunks, clip_model, clip_processor)

# Shapes are printed as a quick sanity check: rows should match chunk counts.
print(f"‚úÖ Text embeddings shape: {text_embeddings.shape}")
print(f"‚úÖ Image embeddings shape: {image_embeddings.shape}")


üîÑ Generating text embeddings...


Batches:   0%|          | 0/5 [00:00<?, ?it/s]

üîÑ Generating image embeddings...


The channel dimension is ambiguous. Got image shape (3, 2, 3). Assuming channels are the first dimension. Use the [input_data_format](https://huggingface.co/docs/transformers/main/internal/image_processing_utils#transformers.image_transforms.rescale.input_data_format) parameter to assign the channel dimension.
The channel dimension is ambiguous. Got image shape (3, 2, 3). Assuming channels are the first dimension. Use the [input_data_format](https://huggingface.co/docs/transformers/main/internal/image_processing_utils#transformers.image_transforms.rescale.input_data_format) parameter to assign the channel dimension.
The channel dimension is ambiguous. Got image shape (3, 2, 3). Assuming channels are the first dimension. Use the [input_data_format](https://huggingface.co/docs/transformers/main/internal/image_processing_utils#transformers.image_transforms.rescale.input_data_format) parameter to assign the channel dimension.
The channel dimension is ambiguous. Got image shape (3, 2, 3). A

‚úÖ Text embeddings shape: (148, 384)
‚úÖ Image embeddings shape: (322, 512)


In [9]:
class VectorDatabase:
    """Two FAISS L2 indexes (text and image) plus the chunks they point to.

    ``search_text`` relies on the module-level ``text_encoder``; ``search_image``
    relies on the module-level ``clip_model`` and ``clip_processor``.
    """

    def __init__(self):
        self.text_index = None
        self.image_index = None
        self.text_chunks = []
        self.image_chunks = []
        self.metadata = []
        # Per-modality metadata, aligned index-for-index with the chunk lists.
        self.text_metadata = []
        self.image_metadata = []

    def build_indexes(self, text_emb, image_emb, texts, images, meta):
        """Build FAISS indexes for text and images.

        Args:
            text_emb: 2-D array, one row per entry in ``texts``.
            image_emb: 2-D array, one row per entry in ``images``.
            texts: Text chunks, in the same order as ``text_emb`` rows.
            images: Image chunk dicts, in the same order as ``image_emb`` rows.
            meta: Metadata dicts for all chunks. Entries tagged with
                ``'type': 'text'`` / ``'type': 'image'`` may appear in any
                order; untagged lists are read as texts first, then images.
        """
        # Text index
        dimension_text = text_emb.shape[1]
        self.text_index = faiss.IndexFlatL2(dimension_text)
        self.text_index.add(text_emb.astype('float32'))

        # Image index
        dimension_image = image_emb.shape[1]
        self.image_index = faiss.IndexFlatL2(dimension_image)
        self.image_index.add(image_emb.astype('float32'))

        self.text_chunks = texts
        self.image_chunks = images
        self.metadata = meta
        self.text_metadata, self.image_metadata = self._split_metadata(
            meta, len(texts), len(images)
        )

        print(f"‚úÖ Built FAISS indexes with {len(texts)} texts and {len(images)} images")

    @staticmethod
    def _split_metadata(meta, n_texts, n_images):
        """Separate combined metadata into text-aligned and image-aligned lists."""
        text_meta = [m for m in meta if m.get('type') == 'text']
        image_meta = [m for m in meta if m.get('type') == 'image']
        if len(text_meta) == n_texts and len(image_meta) == n_images:
            return text_meta, image_meta
        # Tags missing or inconsistent: fall back to the positional layout
        # (all text entries first, then all image entries).
        return list(meta[:n_texts]), list(meta[n_texts:n_texts + n_images])

    def search_text(self, query: str, k: int = 5):
        """Search for relevant text chunks.

        Returns:
            Up to ``k`` dicts with ``content``, ``metadata`` and ``score``
            (``1 / (1 + L2 distance)``), nearest first.
        """
        query_emb = text_encoder.encode([query])
        distances, indices = self.text_index.search(query_emb.astype('float32'), k)

        results = []
        for idx, dist in zip(indices[0], distances[0]):
            idx = int(idx)
            # FAISS pads with -1 when fewer than k vectors exist; a bare
            # `idx < len(...)` check would let -1 through and return the
            # last chunk.
            if 0 <= idx < len(self.text_chunks):
                results.append({
                    'content': self.text_chunks[idx],
                    'metadata': self.text_metadata[idx] if idx < len(self.text_metadata) else {},
                    'score': float(1 / (1 + dist))  # Convert distance to similarity
                })
        return results

    def search_image(self, query: str, k: int = 3):
        """Search for relevant images based on text query.

        Returns:
            Up to ``k`` dicts with ``image`` (the stored image chunk dict),
            ``metadata`` and ``score`` (``1 / (1 + L2 distance)``).
        """
        # Encode text query with CLIP. Truncation keeps long queries within
        # the text encoder's maximum sequence length instead of raising.
        inputs = clip_processor(text=[query], return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            text_features = clip_model.get_text_features(**inputs)
        query_emb = text_features.detach().cpu().numpy()

        distances, indices = self.image_index.search(query_emb.astype('float32'), k)

        results = []
        for idx, dist in zip(indices[0], distances[0]):
            idx = int(idx)
            if 0 <= idx < len(self.image_chunks):
                results.append({
                    'image': self.image_chunks[idx],
                    'metadata': self.image_metadata[idx] if idx < len(self.image_metadata) else {},
                    'score': float(1 / (1 + dist))
                })
        return results

# Build vector database
# `vector_db` is a module-level global: rag_query, chatbot_with_image and
# RAGEvaluator all look it up by name.
vector_db = VectorDatabase()
vector_db.build_indexes(text_embeddings, image_embeddings, text_chunks, image_chunks, metadata)


‚úÖ Built FAISS indexes with 148 texts and 322 images


In [22]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# Load the text generation model (FLAN-T5 large, a seq2seq model).
# `tokenizer`, `llm_model` and `device` are module-level globals used by the
# answer-generation functions defined below.
print("üîÑ Loading free LLM model... (may take 2-3 minutes)")
model_name = "google/flan-t5-large"
tokenizer = AutoTokenizer.from_pretrained(model_name)
llm_model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

# Move to GPU if available
device = "cuda" if torch.cuda.is_available() else "cpu"
llm_model = llm_model.to(device)
print(f"‚úÖ LLM loaded on {device}!")

def generate_answer_with_context(query: str, context_chunks: List[Dict]) -> str:
    """Generate an answer to ``query`` grounded in the retrieved chunks.

    Uses the module-level ``tokenizer``, ``llm_model`` and ``device``.

    Args:
        query: The user's question.
        context_chunks: Retrieval results; each dict needs ``content`` and
            ``metadata`` (``source`` / ``page`` are read when present). Only
            the first three are used.

    Returns:
        The generated answer followed by a ``Sources:`` line, or an
        ``Error generating response: ...`` string if generation fails.
    """
    top_chunks = context_chunks[:3]  # Top 3 chunks

    # Build context from retrieved chunks (limit to fit model context window)
    context_text = "\n\n".join([
        f"Document: {chunk['metadata'].get('source', 'Unknown')}, Page {chunk['metadata'].get('page', 'N/A')}\n{chunk['content'][:400]}"
        for chunk in top_chunks
    ])

    # Create prompt for T5 model (works well with instruction format)
    prompt = f"""Based on the following context from documents, answer the question accurately and concisely.

Context:
{context_text}

Question: {query}

Answer:"""

    try:
        # Tokenize and generate. The whole encoding is forwarded so the
        # attention mask reaches generate() together with the input ids.
        inputs = tokenizer(prompt, return_tensors="pt", max_length=1024, truncation=True).to(device)
        outputs = llm_model.generate(
            **inputs,
            max_length=256,
            num_beams=4,
            temperature=0.7,
            do_sample=True,
            top_p=0.9
        )
        answer = tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Add source citations. Several chunks often come from the same page;
        # dict.fromkeys removes the repeats while keeping retrieval order.
        sources = [f"{c['metadata'].get('source', 'Unknown')} (Page {c['metadata'].get('page', 'N/A')})"
                   for c in top_chunks]
        unique_sources = list(dict.fromkeys(sources))
        answer += f"\n\nSources: {', '.join(unique_sources)}"

        return answer
    except Exception as e:
        return f"Error generating response: {str(e)}"

def rag_query(query: str) -> Tuple[str, List[Dict], List[Dict]]:
    """Run retrieval and generation for one query.

    Looks up the five nearest text chunks and two nearest images in the
    module-level ``vector_db``, then generates an answer from the text hits.

    Returns:
        ``(answer, text_results, image_results)``.
    """
    print(f"\nüîç Query: {query}")

    retrieved_text = vector_db.search_text(query, k=5)
    retrieved_images = vector_db.search_image(query, k=2)

    return (
        generate_answer_with_context(query, retrieved_text),
        retrieved_text,
        retrieved_images,
    )

# Test the RAG system
# One end-to-end query as a smoke test: retrieval plus generation.
test_query = "What is the total revenue mentioned in the financial report?"
answer, texts, images = rag_query(test_query)
print(f"\nüí° Answer:\n{answer}")


üîÑ Loading free LLM model... (may take 2-3 minutes)


tokenizer_config.json: 0.00B [00:00, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json: 0.00B [00:00, ?B/s]

config.json:   0%|          | 0.00/662 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.13G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

‚úÖ LLM loaded on cuda!

üîç Query: What is the total revenue mentioned in the financial report?

üí° Answer:
Revenues

Sources: 2. financials.pdf (Page 1), 2. financials.pdf (Page 1), 2. financials.pdf (Page 1)


In [23]:
def chatbot_interface(user_query):
    """Gradio callback: answer a question and list the top retrieved chunks.

    Returns:
        ``(answer, sources)`` where ``sources`` is a markdown-style summary of
        at most three text hits. Blank input returns a prompt to enter a
        question and an empty sources string.
    """
    if not user_query.strip():
        return "Please enter a question.", ""

    # Run RAG query
    answer, text_results, image_results = rag_query(user_query)

    # One formatted entry per retrieved chunk, joined after the header.
    entries = []
    for rank, hit in enumerate(text_results[:3], start=1):
        meta = hit['metadata']
        entries.append(
            f"**[{rank}] {meta.get('source', 'Unknown')} (Page {meta.get('page', 'N/A')})**\n"
            f"Score: {hit['score']:.3f}\n"
            f"{hit['content'][:200]}...\n\n"
        )

    return answer, "\n\n---\n\n**Retrieved Sources:**\n\n" + "".join(entries)

# Create Gradio UI
# Layout: one row with the question box and submit button, then one row with
# the answer and the retrieved context side by side.
with gr.Blocks(title="Multimodal RAG System", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# ü§ñ Multimodal RAG System\nAsk questions about your PDF documents!")

    with gr.Row():
        with gr.Column():
            query_input = gr.Textbox(
                label="Enter your question",
                placeholder="e.g., What is the total revenue in 2023?",
                lines=3
            )
            submit_btn = gr.Button("üîç Search & Answer", variant="primary")

    with gr.Row():
        with gr.Column():
            answer_output = gr.Textbox(label="Answer", lines=10)

        with gr.Column():
            sources_output = gr.Textbox(label="Retrieved Context", lines=10)

    # Examples (clicking one fills the question box)
    gr.Examples(
        examples=[
            ["What are the key financial highlights from the annual report?"],
            ["What is the FYP process described in the handbook?"],
            ["Show me information about revenue or expenditure"]
        ],
        inputs=query_input
    )

    # chatbot_interface returns (answer, sources), matching the two outputs.
    submit_btn.click(
        fn=chatbot_interface,
        inputs=query_input,
        outputs=[answer_output, sources_output]
    )

# Launch the interface
# NOTE: share=True requests a public share URL and debug=True keeps the cell
# running until interrupted (see the captured output below), so cells after
# this one do not run until the server is stopped.
demo.launch(share=True, debug=True)


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://2eeec70341ec0feab8.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)



üîç Query: Explain the FYP evaluation criteria

üîç Query: tell me everything you know about fyp


üîç Query: What are the key highlights from 2023 annual report?
Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://2eeec70341ec0feab8.gradio.live




In [24]:
#CELL 10: Add Image Upload to Gradio UI

def _format_image_matches(indices, distances):
    """Describe the indexed images nearest to an uploaded image.

    Reads ``vector_db.image_chunks`` and the ``'type': 'image'`` entries of
    ``vector_db.metadata``. Returns an empty string when nothing matched.
    """
    # Select image metadata by its type tag rather than by list position, so
    # the lookup does not depend on how text and image entries are ordered.
    image_meta = [m for m in vector_db.metadata if m.get('type') == 'image']

    entries = []
    for idx, dist in zip(indices, distances):
        idx = int(idx)
        # FAISS pads with -1 when it has fewer vectors than requested.
        if not 0 <= idx < len(vector_db.image_chunks):
            continue
        meta = image_meta[idx] if idx < len(image_meta) else {}
        ocr_text = (vector_db.image_chunks[idx].get('ocr_text') or "").strip()
        entry = f"**[{len(entries) + 1}] {meta.get('source', 'Unknown')} (Page {meta.get('page', 'N/A')})**\n"
        entry += f"Similarity Score: {float(1 / (1 + dist)):.3f}\n"
        if ocr_text:
            entry += f"OCR Preview: {ocr_text[:250]}...\n"
        entries.append(entry + "\n")

    if not entries:
        return ""
    return "**Similar Images in Documents:**\n\n" + "".join(entries)


def chatbot_with_image(user_query, uploaded_image):
    """Enhanced chatbot with image upload support.

    Args:
        user_query: Question text from the UI (may be empty or None).
        uploaded_image: Array from ``gr.Image(type="numpy")`` or None.

    Returns:
        ``(answer, sources)``. When an image is uploaded, ``sources`` also
        lists the indexed document images closest to it in CLIP space.
    """
    query_text = (user_query or "").strip()
    if not query_text and uploaded_image is None:
        return "Please enter a question or upload an image.", ""

    # Text-only query
    if uploaded_image is None:
        answer, text_results, _ = rag_query(user_query)
        return answer, format_sources(text_results)

    # Convert uploaded image to PIL and embed it with CLIP
    pil_image = Image.fromarray(uploaded_image)
    inputs = clip_processor(images=pil_image, return_tensors="pt")
    with torch.no_grad():
        image_features = clip_model.get_image_features(**inputs)
    query_emb = image_features.detach().cpu().numpy().astype('float32')

    # Search in image database
    distances, indices = vector_db.image_index.search(query_emb, 3)

    if not query_text:
        user_query = "Describe this image and related information"

    # rag_query performs the text retrieval itself, so no separate text
    # search is needed here.
    answer, text_results, _ = rag_query(user_query)
    sources = format_sources(text_results)
    # Surface the image matches so the upload actually affects the output.
    sources += _format_image_matches(indices[0], distances[0])
    return answer, sources

def format_sources(text_results):
    """Render the first three retrieval hits as a markdown-style source list.

    Each hit contributes its source file, page, relevance score (three
    decimals) and the first 250 characters of its content.
    """
    pieces = ["\n\n---\n\n**Retrieved Sources:**\n\n"]
    for rank, hit in enumerate(text_results[:3], start=1):
        meta = hit['metadata']
        pieces.append(f"**[{rank}] {meta.get('source', 'Unknown')} (Page {meta.get('page', 'N/A')})**\n")
        pieces.append(f"Relevance Score: {hit['score']:.3f}\n")
        pieces.append(f"Preview: {hit['content'][:250]}...\n\n")
    return "".join(pieces)

# Enhanced Gradio UI with Image Upload
# Rebinds `demo`, `query_input`, `submit_btn`, `answer_output` and
# `sources_output` from the earlier UI cell; this version adds an image input
# and a clear button.
with gr.Blocks(title="Multimodal RAG System", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# ü§ñ Multimodal RAG System\n**Ask questions about PDFs or upload images for analysis!**")

    with gr.Row():
        with gr.Column(scale=1):
            query_input = gr.Textbox(
                label="üìù Enter your question",
                placeholder="e.g., What is the total revenue in 2023?",
                lines=3
            )
            # type="numpy" matches chatbot_with_image, which passes the
            # upload to Image.fromarray.
            image_input = gr.Image(
                label="üì∑ Upload Image (Optional)",
                type="numpy"
            )
            submit_btn = gr.Button("üîç Search & Answer", variant="primary")
            clear_btn = gr.Button("üóëÔ∏è Clear")

    with gr.Row():
        with gr.Column(scale=1):
            answer_output = gr.Textbox(label="üí° Answer", lines=12)

        with gr.Column(scale=1):
            sources_output = gr.Textbox(label="üìö Retrieved Context & Sources", lines=12)

    # Examples (each row is [question, image]; all are text-only)
    gr.Examples(
        examples=[
            ["What are the key financial highlights from the annual report?", None],
            ["What is the FYP evaluation process?", None],
            ["Show me information about total revenue in 2023", None],
            ["How many publications did Computer Science department have in 2023?", None]
        ],
        inputs=[query_input, image_input]
    )

    submit_btn.click(
        fn=chatbot_with_image,
        inputs=[query_input, image_input],
        outputs=[answer_output, sources_output]
    )

    # The lambda returns one reset value per output component, in order.
    clear_btn.click(
        fn=lambda: ("", None, "", ""),
        outputs=[query_input, image_input, answer_output, sources_output]
    )

# Launch the interface
# NOTE: debug=True keeps this cell running until interrupted (see the
# captured output below), so later cells do not run until it is stopped.
demo.launch(share=True, debug=True)


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://aed5c5441cf3bbee7c.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)



üîç Query: tell me top 3 ai tools as per the uploaded image

Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://aed5c5441cf3bbee7c.gradio.live




In [26]:
#CELL 11: Evaluation Metrics
import time
from sklearn.metrics import precision_score, recall_score, f1_score

class RAGEvaluator:
    """Collects retrieval-quality and latency measurements for the RAG pipeline.

    Relies on the module-level ``vector_db`` (retrieval) and ``rag_query``
    (end-to-end pipeline).
    """

    def __init__(self):
        self.query_times = []        # seconds per measure_response_time call
        self.retrieval_scores = []   # not written by any method in this class

    @staticmethod
    def _mean_or_nan(values):
        """Mean of ``values`` as a float, or NaN (without a warning) if empty."""
        return float(np.mean(values)) if len(values) else float('nan')

    def evaluate_retrieval(self, queries_with_expected):
        """Evaluate document-level precision and recall over the top-5 hits.

        Args:
            queries_with_expected: Iterable of ``(query, expected_docs)`` pairs,
                where ``expected_docs`` lists the source names considered
                relevant for that query.

        Returns:
            ``(mean_precision, mean_recall)`` as floats. Both are computed on
            the *set* of distinct sources retrieved. A value is NaN when no
            query contributed to it.
        """
        precisions = []
        recalls = []

        for query, expected_docs in queries_with_expected:
            # Retrieve top-k
            results = vector_db.search_text(query, k=5)
            retrieved = {r['metadata'].get('source', '') for r in results}
            relevant = set(expected_docs)
            hits = len(relevant & retrieved)

            if retrieved:
                precisions.append(hits / len(retrieved))
            if relevant:
                recalls.append(hits / len(relevant))

        return self._mean_or_nan(precisions), self._mean_or_nan(recalls)

    def measure_response_time(self, query):
        """Run ``rag_query(query)`` and return the elapsed seconds.

        The measurement is also appended to ``self.query_times``.
        """
        # perf_counter is a monotonic clock meant for interval timing;
        # time.time() can jump if the system clock is adjusted.
        start = time.perf_counter()
        rag_query(query)
        response_time = time.perf_counter() - start
        self.query_times.append(response_time)
        return response_time

    def get_statistics(self):
        """Get performance statistics.

        Returns:
            Dict with ``avg_response_time``, ``min_response_time`` and
            ``max_response_time`` in seconds. All three are NaN when no query
            has been timed yet.
        """
        if not self.query_times:
            nan = float('nan')
            return {
                'avg_response_time': nan,
                'min_response_time': nan,
                'max_response_time': nan
            }
        return {
            'avg_response_time': float(np.mean(self.query_times)),
            'min_response_time': float(np.min(self.query_times)),
            'max_response_time': float(np.max(self.query_times))
        }

# Test evaluation
evaluator = RAGEvaluator()

# Sample queries for testing
test_queries = [
    "What is the total revenue in the financial report?",
    "What are the key highlights from 2023?",
    "How many publications did CS department have?"
]

print("\nüìä EVALUATION RESULTS:\n")
print("=" * 50)

# Only latency is measured here; evaluate_retrieval is not called because no
# expected-document labels are defined in this cell.
for query in test_queries:
    response_time = evaluator.measure_response_time(query)
    print(f"Query: {query}")
    print(f"Response Time: {response_time:.2f}s\n")

# Summary over the queries timed above.
stats = evaluator.get_statistics()
print("=" * 50)
print(f"Average Response Time: {stats['avg_response_time']:.2f}s")
print(f"Min Response Time: {stats['min_response_time']:.2f}s")
print(f"Max Response Time: {stats['max_response_time']:.2f}s")
print("=" * 50)



üìä EVALUATION RESULTS:


üîç Query: What is the total revenue in the financial report?
Query: What is the total revenue in the financial report?
Response Time: 0.72s


üîç Query: What are the key highlights from 2023?
Query: What are the key highlights from 2023?
Response Time: 5.57s


üîç Query: How many publications did CS department have?
Query: How many publications did CS department have?
Response Time: 1.79s

Average Response Time: 2.69s
Min Response Time: 0.72s
Max Response Time: 5.57s


In [28]:
#CELL 12: Improve Prompting (Chain-of-Thought)
def generate_answer_with_cot(query: str, context_chunks: List[Dict]) -> str:
    """Generate answer using Chain-of-Thought prompting.

    Uses the module-level ``tokenizer``, ``llm_model`` and ``device``.

    Args:
        query: The user's question.
        context_chunks: Retrieval results; each dict needs ``content`` and
            ``metadata`` (``source`` / ``page`` are read when present). Only
            the first three are used.

    Returns:
        The generated text followed by a sources line, or ``Error: ...`` if
        generation fails.
    """
    top_chunks = context_chunks[:3]

    # Build context
    context_text = "\n\n".join([
        f"[Document: {chunk['metadata'].get('source', 'Unknown')}, Page {chunk['metadata'].get('page', 'N/A')}]\n{chunk['content'][:500]}"
        for chunk in top_chunks
    ])

    # Chain-of-Thought Few-Shot Prompt
    prompt = f"""You are an expert document analyst. Answer questions using step-by-step reasoning.

Example 1:
Question: What was the company's profit in Q2?
Reasoning: Let me break this down:
1. First, I need to find revenue data for Q2
2. Then locate the expenses for Q2
3. Finally calculate: Profit = Revenue - Expenses
Answer: Based on the financials, Q2 revenue was $5M and expenses were $3M, so profit was $2M.

Example 2:
Question: How many students enrolled?
Reasoning: Let me analyze:
1. Check the enrollment section
2. Look for specific year mentioned
3. Sum up all departments if needed
Answer: The report shows 1,200 students enrolled in 2023.

Now answer this question:

Context from Documents:
{context_text}

Question: {query}

Reasoning: Let me think step-by-step:"""

    try:
        # The whole encoding is forwarded so the attention mask reaches
        # generate() together with the input ids.
        inputs = tokenizer(prompt, return_tensors="pt", max_length=1024, truncation=True).to(device)
        outputs = llm_model.generate(
            **inputs,
            max_length=300,
            num_beams=4,
            temperature=0.7,
            do_sample=True,
            top_p=0.9
        )
        answer = tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Add sources. dict.fromkeys removes duplicates while keeping the
        # retrieval order; a set would list them in arbitrary order.
        sources = [f"{c['metadata'].get('source', 'Unknown')} (Page {c['metadata'].get('page', 'N/A')})"
                   for c in top_chunks]
        unique_sources = list(dict.fromkeys(sources))
        answer += f"\n\nüìö Sources: {', '.join(unique_sources)}"

        return answer
    except Exception as e:
        return f"Error: {str(e)}"

# Update rag_query to use CoT
def rag_query(query: str) -> Tuple[str, List[Dict], List[Dict]]:
    """Run retrieval and Chain-of-Thought generation for one query.

    Replaces the ``rag_query`` defined in an earlier cell: retrieval is the
    same (five text chunks, two images from the module-level ``vector_db``),
    but the answer comes from ``generate_answer_with_cot``.

    Returns:
        ``(answer, text_results, image_results)``.
    """
    retrieved_text = vector_db.search_text(query, k=5)
    retrieved_images = vector_db.search_image(query, k=2)
    return (
        generate_answer_with_cot(query, retrieved_text),  # Use CoT
        retrieved_text,
        retrieved_images,
    )

# Confirms this cell ran, i.e. that `rag_query` now refers to the CoT version.
print("‚úÖ Enhanced prompting with Chain-of-Thought enabled!")


‚úÖ Enhanced prompting with Chain-of-Thought enabled!
