In [49]:
# Work by:
# Amit shomrat - 308032218
# Leon Nizovtsov - 314801713

In [None]:
import os
import subprocess
import fitz  # PyMuPDF
import json
import re
import easyocr
import numpy as np
from langchain_text_splitters import RecursiveCharacterTextSplitter
import cv2
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue, MatchAny
from sklearn.metrics.pairwise import cosine_similarity
import uuid

In [51]:
# === STEP 1: CHUNKING ===

def debug_print_chunking(text_added, image_added):
    """
    Print a debug summary of the text and image chunks added.
    """
    summary = []
    if text_added:
        summary.append("✅ text")
    if image_added:
        summary.append(f"✅ image")
    if not text_added and not image_added:
        summary.append("⚪ skipped")
    print(f"({', '.join(summary)})")

In [52]:
def add_text_chunk(text_splitter, text_content, page_num, all_metadata, pdf_path):
    """
    Add a text chunk to the metadata.
    Args:
        text_content (str): The text content to add
        page_num (int): The page number of the chunk
        all_metadata (dict): The metadata dictionary
        pdf_path (str): The path to the PDF file

    Returns:
        bool: True if the text chunk was added successfully, False otherwise
    """
    print("Converting text chunk")

    try:
        if text_content.strip():
            text_chunks = text_splitter.split_text(text_content)
            for i, chunk in enumerate(text_chunks):
                all_metadata[pdf_path]["chunks"].append({
                    "type": "text",
                    "page": page_num + 1,
                    "chunk_number": i,
                    "content": chunk.strip()
                })
    except Exception as e:
        print(f"Error converting text chunk: {e}")
        return False
    return True

In [53]:
def sheet_descriptions(image_path, page_num, model="llava:7b", max_chars=300):
    """
    Convert an image to text using Llava via subprocess.
    Args:
        image_path (str): Path to the image file
        page_num (int): The page number of the chunk
        model (str): The model to use for text extraction (ignored in subprocess version)
        max_chars (int): The maximum number of characters to return
    
    Returns:
        dict: A dictionary containing the image description and metadata
        None: If the image file is not found or the processing fails
    """
    if not os.path.exists(image_path):
        print(f"❌ Image file not found: {image_path}")
        return None
    prompt = (
        f"Image: {image_path}\n"
        "You are an OCR-style diagram transcriber.\n"
        "Analyze the image and output ONLY in this format:\n"
        "Type: [Flowchart / Directed Graph / UML Diagram]\n"
        "NODES:\n"
        "- <NodeID or order>: \"<Exact text inside node>\" (approx. number of text lines)\n"
        "EDGES:\n"
        "<Node1> -> <Node2>\n"
        "<Node2> -> <Node3>\n"
        "...\n"
        "ALL TEXT:\n"
        f"Copy verbatim ALL text seen anywhere in the image (limit {max_chars} characters).\n"
        "RULES:\n"
        "- Do NOT add introductions or explanations.\n"
        "- Do NOT infer or interpret meaning.\n"
        "- If a node has no visible label, assign an incremental ID (Box1, Box2, …).\n"
        "- Output plain text only, following the format above."
    )

    try:
        # Construct the llava command
        cmd = ["ollama","run","llava:7b"]

        # Run the command and capture output
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",     # ← add this
            errors="replace"      # ← and this (never crash on odd bytes)
        )

        # Send prompt to stdin
        stdout, stderr = process.communicate(input=prompt)

        if process.returncode != 0:
            print(f"❌ Llava process failed: {stderr}")
            return None

        text = stdout.strip()
        if not text:
            print("❌ No valid response received from Llava")
            return None

        # safety trim to max_chars at word boundary
        if len(text) > max_chars:
            text = text[:max_chars].rsplit(" ", 1)[0] + "…"

        return {
            "type": "image_description",
            "page": page_num,
            "content": text,
            "image_path": image_path
        }

    except Exception as e:
        print(f"❌ Failed to run Llava: {e}")
        return None


In [54]:
def add_image_chunk(page, page_num, all_metadata, output_dir, pdf_path):
    """
    Add an image chunk to the metadata.
    Args:
        page (fitz.Page): The page to extract the image from
        page_num (int): The page number of the chunk
        all_metadata (dict): The metadata dictionary
        output_dir (str): The output directory
        pdf_path (str): The path to the PDF file

    Returns:
        bool: True if the image chunk was added successfully, False otherwise
    """
    print("Converting image chunk")
    try:
        pix = page.get_pixmap()
        img_data = pix.tobytes("png")
        image_filename = f'{pdf_path.replace(".pdf", "")}_page_{page_num + 1}.png'
        image_path = os.path.join(output_dir, image_filename)
        with open(image_path, 'wb') as f:
            f.write(img_data)
        all_metadata[pdf_path]['chunks'].append(sheet_descriptions(image_path, page_num + 1))
    except Exception as e:
        print(f"Error converting image chunk: {e}")
        return False

    return True

In [55]:
def ocr_text_extraction (page):
    """
    Extract text from a page using OCR.
    Args:
        page (fitz.Page): The page to extract the text from
    
    Returns:
        str: The extracted text
    """
    # Page extraction pre - processing and cleaning:
    pix = page.get_pixmap(dpi=300)
    img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
    if pix.n == 4:
        img = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
    elif pix.n == 3:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    
    img = cv2.GaussianBlur(img, (3, 3), 0)
    img = cv2.resize(img, None, fx=2, fy=2, interpolation=cv2.INTER_CUBIC)
    
    if img.ndim == 3:
        img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)  # or COLOR_RGBA2GRAY if needed

    _, img = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    
    # Text extraction:
    reader = easyocr.Reader(['en'])
    # Tries two angles: 0 and 90 and provide the most confident result. 
    # We need to find the best angle for the page. the rotation info should try the angles that it set to, 
    # but the results are not accurate as rotating the page through the page.set_rotation(90) of fitz.
    ocr_results = reader.readtext(img, rotation_info=[0, 90])
    ocr_text = " ".join([result[1] for result in ocr_results])
    return ocr_text


In [56]:
def extract_text_and_images_from_patent(pdf_path, output_dir="extracted_images"):
    """
    Extract text and images from a patent PDF file.
    
    Args:
        pdf_path (str): Path to the patent PDF file
        output_dir (str): Directory to save extracted images
        enable_ocr (bool): Whether to use OCR for pages with minimal text (SLOW!)
    
    Returns:
        list: List of chunks with metadata in the format:
              {"type": "text", "page": page_number, "content": text or image_path}
              {"type": "image", "page": page_number, "image_description": image_description, "content": image_path}
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    doc = fitz.open(pdf_path)
    total_pages = len(doc)
    all_metadata = {pdf_path: {
                      "chunks": []}}
    
    separators = [
    "\n\n",  # First try to split on double newlines (paragraphs)
    "! ",    # Split on exclamation marks followed by space
    "? ",    # Split on question marks followed by space
    ". ",    # Split on periods followed by space
    "\n",    # Then try single newlines
    " ",     # Then spaces
    ""       # Finally, character by character if needed
    ]

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=100,
        length_function=len,
        separators=separators,
        is_separator_regex=False
    )

    print(f"Processing {total_pages} pages...")
    
    for page_num in range(total_pages):
        page = doc[page_num]
        print(f"📄 Processing page {page_num + 1}/{total_pages}...", end=" ")
        
        # Extract text from page pdf plain text
        text_content = page.get_text()
        if not text_content.strip():
            text_content = ocr_text_extraction(page)

        matches = re.findall(r'sheet\s+.+?\s+of\s+.+?(?=[\.\n]|$)', text_content, flags=re.IGNORECASE)
        image_added = False
        text_added = False
        if matches:
            image_added = add_image_chunk(page, page_num, all_metadata, output_dir, pdf_path)
        else:
            text_added = add_text_chunk(text_splitter, text_content, page_num, all_metadata, pdf_path)
        debug_print_chunking(text_added, image_added)

    # Close the document
    doc.close()
    print(f"Extraction complete! Found {len([c for c in all_metadata[pdf_path]['chunks'] if c['type'] == 'text'])} text chunks and {len([c for c in all_metadata[pdf_path]['chunks'] if c['type'] == 'image'])} images.")
    
    return all_metadata


In [57]:
def save_chunks_metadata(chunks, metadata_file="all_metadata.json"):
    """
    Save the chunks metadata to a JSON file.
    If file exists, merge new data with existing data.
    
    Args:
        chunks (dict): Dictionary with PDF path as key and chunk data as value
        metadata_file (str): Path to save the metadata file
    """
    # Load existing data if file exists
    existing_data = {}
    if os.path.exists(metadata_file):
        try:
            with open(metadata_file, 'r', encoding='utf-8') as f:
                existing_data = json.load(f)
            print(f"Loaded existing data from {metadata_file}")
        except (json.JSONDecodeError, Exception) as e:
            print(f"Warning: Could not load existing data ({e}), starting fresh")
            existing_data = {}
    
    # Merge new chunks with existing data
    existing_data.update(chunks)
    
    # Save combined data
    with open(metadata_file, 'w', encoding='utf-8') as f:
        json.dump(existing_data, f, indent=2, ensure_ascii=False)
        
    print(f"Metadata saved to {metadata_file}")

In [58]:
def load_chunks_metadata(metadata_file="all_metadata.json"):
    """
    Load chunks metadata from JSON file.
    
    Args:
        metadata_file (str): Path to the metadata file
        
    Returns:
        list: List of chunk dictionaries
    """
    if not os.path.exists(metadata_file):
        # Create empty JSON file if it doesn't exist
        print(f"Creating new metadata file: {metadata_file}")
        with open(metadata_file, 'w', encoding='utf-8') as f:
            json.dump({}, f, indent=2)
        return []
    
    # Context manager form - file always closed even if error
    with open(metadata_file, 'r', encoding='utf-8') as f:
        chunks = json.load(f)
    
    print(f"Loaded {len(chunks)} groups of chunks from {metadata_file}")
    return chunks


In [59]:
# === STEP 2: VECTOR STORE ===
def create_vector_store(chunks, model_name="all-MiniLM-L6-v2", collection_name="patent_chunks"):
    """
    Create vector store using SentenceTransformer and Qdrant.
    
    Args:
        chunks (list): List of chunk dictionaries
        model_name (str): SentenceTransformer model name 
                          (passed default "all-MiniLM-L6-v2" which is popular and balanced
                          embedding vector size 384)
        collection_name (str): Qdrant collection name
        
    Returns:
        tuple: (qdrant_client, sentence_transformer_model)
    """
    print(f"\n=== Step 2: Creating Vector Store ===")

    # Initialize SentenceTransformer
    print(f"Loading SentenceTransformer model: {model_name}")
    model = SentenceTransformer(model_name)
    
    # Extract text content for encoding
    texts = [chunk['content'] for chunk in chunks]
    
    # Create embeddings
    print("Creating embeddings for all text and image chunks...")
    embeddings = model.encode(texts, show_progress_bar=True)
    vector_size = embeddings.shape[1]
    print(f"Created embeddings: {embeddings.shape[0]} vectors of size {vector_size}")
    
    # Initialize in-memory (RAM) Qdrant client
    print("Setting up in-memory Qdrant vector database...")
    client = QdrantClient(":memory:")
    
    # Create collection:
    # 1. vectors_config: size=vector_size, distance=Distance.COSINE
    # 2. size: number of dimensions in the vector space
    # 3. distance: distance metric used for similarity search
    # 4. COSINE: cosine similarity
    # 5. id: unique identifier for each point
    # 6. vector: embedding vector
    client.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
    )
    print(f"Created Qdrant collection: {collection_name}")
    
    # Prepare points for insertion
    points = []
    # Each chunk and its corresponding embedding are zipped together
    # and then enumerated to get the index and the chunk and embedding
    # the index is used to create a unique ID for each point
    # the chunk is used to create the payload
    # the embedding is used to create the vector
    for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
        point = PointStruct(
            id=str(uuid.uuid4()),  # Unique ID for each point
            vector=embedding.tolist(),  # Convert numpy array to list
            payload={
                "type": chunk["type"],
                "page": chunk["page"],
                "content": chunk["content"],
                "chunk_index": i
            }
        )
        points.append(point)
    
    # Insert vectors into Qdrant
    client.upsert(
        collection_name=collection_name,
        points=points
    )
    
    print(f"✅ Stored {len(points)} vectors in Qdrant collection")
    print(f"✅ Vector store ready for semantic search!")
    
    return client, model

In [60]:
# === STEP 3: QUESTION INPUT ===
def load_questions(questions_file="questions.txt"):
    """
    Load questions from a text file.
    
    Args:
        questions_file (str): Path to the questions file
        
    Returns:
        list: List of question strings
    """
    print(f"\n=== Step 3: Loading Questions ===")
    
    # Check if questions file exists
    if not os.path.exists(questions_file):
        print(f"❌ Error: Questions file '{questions_file}' not found!")
        return []
    
    # Load questions from file
    inside_hidden = False
    count_hidden = 0
    questions = []
    try:
        with open(questions_file, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                question = line.strip()
                if question == "[[HIDDEN]]":
                    inside_hidden = True
                    count_hidden += 1
                    continue
                if question == "[[/HIDDEN]]":
                    inside_hidden = False
                    count_hidden += 1
                    continue
                if not inside_hidden:
                    questions.append(question)
                    print(f"  Q{line_num - count_hidden}: {question}")
        
        print(f"✅ Loaded {len(questions)} questions from '{questions_file}'")
        
    except Exception as e:
        print(f"❌ Error reading questions file: {e}")
        return []
    
    if not questions:
        print("⚠️  No questions found in file!")
        return []
    
    return questions

In [61]:
# === STEP 4: RAG PROMPT CONSTRUCTION ===
def retrieve_relevant_chunks(question, client, model, collection_name="patent_chunks", top_k=3):
    """
    Retrieve top-k relevant text chunks for a question using vector similarity.
    
    Args:
        question (str): The question to search for
        client: Qdrant client
        model: SentenceTransformer model
        collection_name (str): Name of Qdrant collection
        top_k (int): Number of chunks to retrieve
        
    Returns:
        list: List of relevant chunks with metadata including embeddings of the form:
                                {
                                    'content': str,
                                    'page': int,
                                    'chunk_index': int,
                                    'similarity': float,
                                    'embedding': list
                                }
    """
    # Convert question to embedding
    question_embedding = model.encode([question])

    # Search for similar chunks in Qdrant (with vectors) - using query_points (newer API)
    search_results = client.query_points(
        collection_name=collection_name,
        query=question_embedding[0].tolist(),
        limit=top_k,
        query_filter=Filter(
            must=[
                FieldCondition(key="type", match=MatchValue(value="text"))
            ]
        ),
        with_vectors=True  # Include vectors in results
    )
    
    # Extract chunks with similarity scores and embeddings
    relevant_chunks = []
    for result in search_results.points:
        relevant_chunks.append({
            'content': result.payload['content'],
            'page': result.payload['page'],
            'chunk_index': result.payload['chunk_index'],
            'similarity': result.score,
            'embedding': result.vector  # Include the stored embedding
        })
    
    return relevant_chunks

In [62]:
# TODO: Instead of taking the greatest score, we have to take the greatest for the top chosen texts chunks (For each chosen text, take the top image).
def top_similar_images(relevant_chunks, chunks, max_images=2, client=None, collection_name="patent_chunks", max_threshold=0.4):
    """
    Find up to 2 most relevant image chunks based on similarity to the relevant text chunks.
    
    Args:
        relevant_chunks (list): Already retrieved relevant text chunks (with embeddings) of the form:
                                {
                                    'content': str,
                                    'page': int,
                                    'chunk_index': int,
                                    'similarity': float,
                                    'embedding': list
                                }
        chunks (list): All chunks (text and image)
        max_images (int): Maximum number of images to return
        client: Qdrant client
        
    Returns:
        dict of top-k most relevant image chunks of the form:
                                {
                                    'page': int,
                                    'content': str,
                                    'chunk_index': int,
                                    'similarity': float,
                                    'embedding': list
                                }
    """
    
    # Find image chunks, for candidate store only the page number then compare with client Qdrant.
    candidate_images = [chunk for chunk in chunks if chunk['type'] == 'image_description']
    
    if not candidate_images:
        return []
    
    # Use pre-computed embeddings from relevant chunks (no re-encoding!)
    relevant_text_embeddings = [chunk['embedding'] for chunk in relevant_chunks]
    

    # take the embeddings that match the candidate_images form client qdrant - using query_points (newer API)
    candidates_images_results = client.query_points(
        collection_name=collection_name,
        query=[0.0] * 384,  # Dummy vector (not used for filtering)
        limit=1000,  # Large limit to get all matches
        query_filter=Filter(
            must=[
                FieldCondition(
                    key="page",
                    match=MatchAny(any=[img['page'] for img in candidate_images])
                ),
                FieldCondition(
                    key="type", 
                    match=MatchValue(value="image_description")
                )
            ]
        ), with_vectors=True )
    
    candidates_images_embeddings = []
    for result in candidates_images_results.points:
        candidates_images_embeddings.append({
            'page': result.payload['page'],
            'content': result.payload['content'],
            'chunk_index': result.payload['chunk_index'],
            'similarity': result.score,
            'embedding': result.vector
        })


    # TODO: we need to modify this to take img from given similirity score threshold.
    
    # Calculate max similarity between each image and any relevant text chunk
    similarities = []
    for candidate_embedding in candidates_images_embeddings:
        # Get similarity scores between this image and all relevant text chunks
        img_similarities = cosine_similarity([candidate_embedding['embedding']], relevant_text_embeddings)[0]
        # Take the maximum similarity (best match with any relevant text)
        max_similarity = max(img_similarities)
        similarities.append(max_similarity)
    
    # Add similarity scores to image data
    for i, img in enumerate(candidate_images):
        img['similarity'] = similarities[i]
    
    # Sort by similarity (highest first) and return top max_images
    candidate_images.sort(key=lambda x: x['similarity'], reverse=True)
    selected_images = []
    for img in candidate_images[:max_images]:
        if img['similarity'] >= max_threshold:
            selected_images.append(img)
        else:
            break
    if selected_images:
        # Debug: Print similarity scores
        print(f"     Top {max_images} image similarity scores (vs pre-computed relevant text embeddings):")
        for i, img in enumerate(selected_images):
            print(f"       Image {i+1}: Page {img['page']}, Max Similarity = {img['similarity']:.3f}, Path = {img['image_path']}")
        return selected_images
    else:
        print(f"     No images found with similarity score >= {max_threshold}")
        return None

In [63]:
def construct_rag_prompt(question, question_index, relevant_chunks, selected_images_chunks, max_context_bytes=2000):
    """
    Construct RAG prompt with question, context, and images.
    
    Args:
        question (str): The question
        relevant_chunks (list): Retrieved text chunks
        selected_images (list): Paths to relevant images
        max_context_bytes (int): Maximum context length in bytes
        
    Returns:
        str1: Formatted prompt for Llava
        str2: Formatted prompt for Llama
    """
    # Sort relevant chunks by similarity (highest first)
    relevant_chunks.sort(key=lambda x: x['similarity'], reverse=True)
    # Add relevant chunks to the context, but not exceeding the max_context_bytes limit
    context_parts = []
    total_bytes = 0
    for chunk in relevant_chunks:
        chunk_text = chunk['content']
        chunk_bytes = len(chunk_text.encode('utf-8'))
        
        # Check if adding this chunk would exceed limit
        if total_bytes + chunk_bytes <= max_context_bytes:
            context_parts.append(f"[Page {chunk['page']}] {chunk_text}")
            total_bytes += chunk_bytes
        else:
            # Add partial chunk to reach exactly max_context_bytes
            remaining_bytes = max_context_bytes - total_bytes
            if remaining_bytes > 50:  # Only add if meaningful amount remaining
                # encode for snniping the exact amount of bytes, then decode to utf-8 back.
                partial_text = chunk_text.encode('utf-8')[:remaining_bytes].decode('utf-8', errors='ignore')
                context_parts.append(f"[Page {chunk['page']}] {partial_text}")
            break

    question_bytes = len(question.encode('utf-8'))
    if selected_images_chunks:
        images_context = []
        image_list = ""
        for index, image_chunk in enumerate(selected_images_chunks):
            image_list += f"\nImage {index+1}: {image_chunk['image_path']}"
            images_context.append(f"\nImage {index+1}-{image_chunk['content']}")
        images_list_bytes = len("".join(image_list).encode('utf-8'))
        images_context_bytes = len("".join(images_context).encode('utf-8'))
        
        # Join the context parts with newlines.
        text_context = "\n".join(context_parts)

        # Construct final prompt for Llava
        prompt_llava = f"""Question {question_index} [bytes: {question_bytes}]:\n{question}\nText-Context [bytes: {max_context_bytes - images_list_bytes - question_bytes}]:\n{text_context.encode('utf-8')[:max_context_bytes - images_list_bytes - question_bytes].decode('utf-8', errors='ignore')}\nImages-Paths[bytes: {images_list_bytes}]: {image_list}"""

        # Build context from relevant chunks for Llama:
        images_context = "".join(images_context)
        context_bytes_with_images = max_context_bytes - images_context_bytes - question_bytes
        prompt_llama = f"""Question {question_index} [bytes: {question_bytes}]:\n{question}\nText-Context [bytes: {context_bytes_with_images}]:\n{text_context.encode('utf-8')[:context_bytes_with_images].decode('utf-8', errors='ignore')}\nImages-Context [bytes: {images_context_bytes}]:\n{images_context}"""
    else:
        prompt_llava = prompt_llama = f"""Question {question_index} [bytes: {question_bytes}]:\n{question}\nText-Context [bytes: {total_bytes}]:\n{context_parts[:max_context_bytes]}"""    
    
    return prompt_llava, prompt_llama


In [64]:
# Using the models based on the question prompt.
def process_questions_with_rag(questions, chunks, client, model):
    """
    Process all questions using RAG pipeline. (retrieve relevant chunks, top similar images, construct rag prompt)
    
    Args:
        questions (list): List of questions
        chunks (list): All chunks (text and image)
        client: Qdrant client 
        model: SentenceTransformer model
        
    Returns:
        list: List of constructed prompts of the form:
                                {
                                    'question': str,
                                    'llava_prompt': str,
                                    'llama_prompt': str,
                                    'relevant_pages': list,
                                    'selected_images_chunks': list
                                }
    """
    print(f"\n=== Step 4: RAG Prompt Construction ===")
    print(f"Processing {len(questions)} questions...")
    
    # Clear prompt files at the start of each run
    with open("prompt_llama.txt", "w") as f:
        f.write("")  # Clear the file
    with open("prompt_llava.txt", "w") as f:
        f.write("")  # Clear the file
    
    prompts = []
    
    for i, question in enumerate(questions, 1):
        print(f"\n🔍 Processing Question {i}/{len(questions)}: '{question[:50]}...'")
        
        # 1. Retrieve top-k relevant text chunks
        relevant_chunks = retrieve_relevant_chunks(question, client, model, top_k=3)
        print(f"   Retrieved {len(relevant_chunks)} relevant chunks")
        
        # Show text similarity scores
        for j, chunk in enumerate(relevant_chunks):
            print(f"     Chunk {j+1}: Page {chunk['page']}, Similarity = {chunk['similarity']:.3f}")
        
        # 2. Find nearby images using similarity scoring with relevant text
        relevant_pages = [chunk['page'] for chunk in relevant_chunks]
        selected_images_chunks = top_similar_images(relevant_chunks,chunks, max_images=2, client=client)
        
        # 3. Construct prompt
        llava_prompt, llama_prompt = construct_rag_prompt(question, i, relevant_chunks, selected_images_chunks)
        prompts.append({
            'question': question,
            'llava_prompt': llava_prompt,
            'llama_prompt': llama_prompt,
            'relevant_pages': relevant_pages,
            'selected_images_chunks': selected_images_chunks
        })
        
    print(f"\n✅ Constructed {len(prompts)} RAG prompts")
    return prompts

In [65]:
# === STEP 5: ANSWER GENERATION ===
def call_ollama_llama(prompt, model="llama3:latest", max_chars=300):
    """
    Call LLaMA via ollama for text-only questions.
    
    Args:
        prompt (str): The input prompt
        model (str): LLaMA model to use
        max_chars (int): Maximum characters for the answer
        
    Returns:
        str: Generated answer
    """
    try:
        # Prepare the command to call ollama
        cmd = ["ollama", "run", model]
        
        # Add instruction to limit response length
        full_prompt = f"""{prompt}

Please provide a concise answer based ONLY on the provided context. Do not use external knowledge. Keep your answer under {max_chars} characters."""
        
        # Use subprocess to call ollama
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding='utf-8',
            errors= "replace"
        )
        with open("prompt_llama.txt", "a") as f:
            f.write(full_prompt + "\n\n")
        # Send prompt and get response
        stdout, stderr = process.communicate(input=full_prompt, timeout=60)
        
        if process.returncode != 0:
            print(f"❌ Error calling ollama: {stderr}")
            return "Error: Unable to generate answer"
        
        # Clean and truncate the response
        answer = stdout.strip()
        if len(answer) > max_chars:
            answer = answer[:max_chars].rsplit(' ', 1)[0] + "..."
        
        return answer
        
    except subprocess.TimeoutExpired:
        print("❌ Timeout calling ollama")
        return "Error: Timeout generating answer"
    except Exception as e:
        print(f"❌ Error calling ollama: {e}")
        return "Error: Unable to generate answer"

In [66]:
def call_ollama_llava(prompt, model="llava:7b", max_chars=300):
    """
    Call LLaVA via ollama for text and image questions.
    
    Args:
        prompt (str): The input prompt
        model (str): LLaVA model to use
        max_chars (int): Maximum characters for the answer
    """

    try:
        cmd = ["ollama", "run", model]
        
        full_prompt = f"""{prompt}
Please provide a concise answer based ONLY on the provided context. Do not use external knowledge. Keep your answer under {max_chars} characters."""
        
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding='utf-8',
            errors="replace"
        )
        
        with open("prompt_llava.txt", "a") as f:
            f.write(full_prompt + "\n\n")

        stdout, stderr = process.communicate(input=full_prompt, timeout=120)
        
        if process.returncode != 0:
            print(f"❌ Error calling ollama llava: {stderr}")
            return 
        
        answer = stdout.strip()
        if len(answer) > max_chars:
            answer = answer[:max_chars].rsplit(' ', 1)[0] + "..."
        return answer
    
    except subprocess.TimeoutExpired:
        print("❌ Timeout calling ollama llava")
        return 
    except Exception as e:
        print(f"❌ Error calling ollama llava: {e}")
        return 

In [68]:
def test_ollama_models():
    result = subprocess.run(["ollama", "list"], capture_output=True, text=True)
    available = result.stdout.lower()

    return {
        "llama3": "llama3" in available,   # catches llama3:latest / llama3:8b / llama3:70b
        "llava": "llava" in available
    }



In [67]:
# === STEP 6: ANSWERS TO FILE ===
def generate_answers(rag_prompts, output_file="both_models_answers.txt"):
    """
    Generate answers for all questions using ollama (LLaMA/LLaVA).
    
    Args:
        rag_prompts (list): List of RAG prompt dictionaries
        output_file (str): File to save answers
        
    Returns:
        list: List of answers
    """
    print(f"\n=== Step 5: Answer Generation ===")
    print(f"Generating answers for {len(rag_prompts)} questions using ollama...")
    
    answers = []
    
    # Check if ollama is available and test models
    try:
        result = subprocess.run(["ollama", "--version"], 
                              capture_output=True, text=True, timeout=10)


        if result.returncode != 0:
            print("❌ Error: ollama is not available or not working properly")
            return []
        
        # Test available models
        models = test_ollama_models()
        
        if not models.get('llama3'):
            print("⚠️  Warning: No LLaMA model found. Run: ollama pull llama3")
        if not models['llava']:
            print("⚠️  Warning: No LLaVA model found. You may need to run 'ollama pull llava'")
            
    except Exception as e:
        print(f"❌ Error: Cannot access ollama - {e}")
        print("   Please make sure ollama is installed and running")
        return []
    
    # Process each question
    for i, prompt_data in enumerate(rag_prompts, 1):
        question = prompt_data['question']
        llava_prompt = prompt_data['llava_prompt']
        llama_prompt = prompt_data['llama_prompt']
        
        # # Extract image paths safely (handle cases with 0, 1, or 2+ images)
        # selected_images = prompt_data['selected_images_chunks']
        # image_paths = [img['image_path'] for img in selected_images] if selected_images else []
        
        print(f"\n🤖 Generating answer {i}/{len(rag_prompts)}")
        print(f"   Question: {question}")
        
        # # ( We will use both Llama and Llava for image questions and Llama only for text questions (we have))
        # if image_paths and len(image_paths) > 0:
        #     print(f"   Images include, using LLaVA with {len(image_paths)} images: {image_paths}")

        answer_llava = call_ollama_llava(llava_prompt)
        answer_llava = answer_llava[:300]  # Ensure answers don't exceed 300 characters and handle None values
        
        answer_llama = call_ollama_llama(llama_prompt)
        answer_llama = answer_llama[:300]
        
        # Handle None values for character counting
        llama_chars = len(answer_llama) if answer_llama else 0
        llava_chars = len(answer_llava) if answer_llava else 0
        
        answers.append({
            'question_number': i,
            'question': question,
            'answer_llama': answer_llama or "Error: LLaMA failed",
            'answer_llava': answer_llava or "Error: LLaVA failed", 
            'char_count_llama': llama_chars,
            'char_count_llava': llava_chars
        })
        
        print(f"Answer LLaVA ({llava_chars} chars):\n{answer_llava}")
        print(f"Answer LLaMA ({llama_chars} chars):\n{answer_llama}")
    
    # Save answers to file
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            for ans_data in answers:
                f.write(f"Question {ans_data['question_number']}: {ans_data['question']}\n")
                f.write(f"Answer LLaMA ({ans_data['char_count_llama']} chars):\n{ans_data['answer_llama']}\n")
                f.write(f"Answer LLaVA ({ans_data['char_count_llava']} chars):\n{ans_data['answer_llava']}\n")
        
        print(f"\n✅ Answers saved to {output_file}")
        
        print(f"   Total answers: {len(answers)}")
        
    except Exception as e:
        print(f"❌ Error saving answers: {e}")
    
    return answers

In [69]:
def save_similarity_results(evaluations, llama_avg, llava_avg, output_file="evaluation_results.txt"):
    """
    Save similarity evaluation results to a file.
    
    Args:
        evaluations (dict): All evaluation data
        llama_avg (float): LLaMA average similarity score
        llava_avg (float): LLaVA average similarity score
        output_file (str): Output file path
    """
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("=== SEMANTIC SIMILARITY EVALUATION RESULTS ===\n\n")
            
            # Write detailed results for each question
            for i, (question, data) in enumerate(evaluations.items(), 1):
                f.write(f"Question {i}: {data['question']}\n")
                f.write("-" * 80 + "\n")
                
                # LLaMA evaluation
                f.write(f"LLaMA Answer: {data['llama_answer'][:100]}...\n")
                f.write(f"LLaMA Similarity Score: {data['llama_similarity']:.4f}\n\n")
                
                # LLaVA evaluation
                f.write(f"LLaVA Answer: {data['llava_answer'][:100]}...\n")
                f.write(f"LLaVA Similarity Score: {data['llava_similarity']:.4f}\n")
                
                f.write("\n" + "="*80 + "\n\n")
            
            # Write summary
            f.write("=== SUMMARY ===\n\n")
            f.write(f"LLaMA Average Similarity: {llama_avg:.4f}\n")
            f.write(f"LLaVA Average Similarity: {llava_avg:.4f}\n")
            f.write(f"Better Model: {'LLaVA' if llava_avg > llama_avg else 'LLaMA' if llama_avg > llava_avg else 'Tie'}\n")
        
        print(f"✅ Detailed evaluation results saved to {output_file}")
        
    except Exception as e:
        print(f"❌ Error saving evaluation results: {e}")

In [70]:
def evaluate_single_answer(prompt, answer, model_name, collection_name="prompts_chunks"):
    """
    Evaluate a single answer based on semantic similarity with the prompt.
    
    Args:
        prompt (str): The RAG prompt containing question and context
        answer (str): The model's answer
        model_name (str): SentenceTransformer model name
        
    Returns:
        float: Semantic similarity score between prompt and answer (0-1 scale)
    """
    # Handle error cases
    if not answer or answer.startswith("Error:"):
        return 0.0
    
    # Initialize model
    model = SentenceTransformer(model_name)
    
    # Encode entire prompt and answer as single embeddings
    prompt_embedding = model.encode([prompt], show_progress_bar=False)
    answer_embedding = model.encode([answer], show_progress_bar=False)
    
    # Compute cosine similarity between the two single embeddings
    similarity_matrix = cosine_similarity(prompt_embedding, answer_embedding)
    similarity_score = similarity_matrix[0][0]  # Extract the single similarity value
    
    return float(similarity_score)

In [71]:
def answers_eval(rag_prompts, answers, output_file="answers.txt", model_name="all-MiniLM-L6-v2"):
    """
    Evaluate answers from both LLaMA and LLaVA models using semantic similarity.
    
    This function computes the semantic similarity between each model's answer 
    and its corresponding RAG prompt (containing question + context).
    
    Args:
        rag_prompts (list): List of RAG prompt dictionaries with 'llama_prompt' and 'llava_prompt'
        answers (list): List of answer dictionaries with 'answer_llama' and 'answer_llava'
        output_file (str): Path to save evaluation results
        model_name (str): SentenceTransformer model for computing embeddings
        
    Returns:
        dict: Evaluation results with similarity scores for each question and model
    """
    print(f"\n=== Answer Evaluation ===")
    if not answers:
        print(f"❌ Error: No answers to evaluate!")
        return {}
    

    # Evaluate both answers for this question
    evaluations = {}
    llama_scores = []
    llava_scores = []
    file_output = open(output_file, "w")
    
    for i, (answer, prompt) in enumerate(zip(answers, rag_prompts)):
        llama_similarity = evaluate_single_answer(prompt['llama_prompt'], answer['answer_llama'], model_name)
        llava_similarity = evaluate_single_answer(prompt['llava_prompt'], answer['answer_llava'], model_name) if prompt['llava_prompt'] else 0.0
        
        evaluations[answer['question']] = {
            'question': answer['question'],
            'llama_answer': answer['answer_llama'],
            'llava_answer': answer['answer_llava'],
            'llama_similarity': llama_similarity,
            'llava_similarity': llava_similarity
        }
        
        llama_scores.append(llama_similarity)
        llava_scores.append(llava_similarity)

        print(f"Question {i+1} Similarity Scores:")
        print(f"  LLaMA: {llama_similarity:.4f}")
        print(f"  LLaVA: {llava_similarity:.4f}")
        
    # Calculate and display averages
    llama_avg = sum(llama_scores) / len(llama_scores) if llama_scores else 0.0
    llava_avg = sum(llava_scores) / len(llava_scores) if llava_scores else 0.0
    
    print(f"\n=== Evaluation Summary ===")
    print(f"LLaMA Average Similarity: {llama_avg:.4f}")
    print(f"LLaVA Average Similarity: {llava_avg:.4f}")
    
    # Save results to file
    save_similarity_results(evaluations, llama_avg, llava_avg, output_file)
    
    # Write best answers to answers.txt
    try:
        with open("answers.txt", "w", encoding='utf-8') as f:
            f.write("=== BEST ANSWERS BASED ON SIMILARITY SCORES ===\n\n")
            
            for i, (question, evaluation_data) in enumerate(evaluations.items(), 1):
                # Determine which model has higher similarity
                if evaluation_data['llama_similarity'] > evaluation_data['llava_similarity']:
                    best_model = "LLaMA"
                    best_answer = evaluation_data['llama_answer']
                    best_similarity = evaluation_data['llama_similarity']
                else:
                    best_model = "LLaVA"
                    best_answer = evaluation_data['llava_answer']
                    best_similarity = evaluation_data['llava_similarity']
                
                # Write to file
                f.write(f"Question {i}: {evaluation_data['question']}\n")
                f.write(f"Best Answer ({best_model} - Similarity: {best_similarity:.4f}):\n")
                f.write(f"{best_answer}\n\n")
                f.write("-" * 80 + "\n\n")
                
                print(f"Question {i}: {best_model} wins (Similarity: {best_similarity:.4f})")
        
        print(f"✅ Best answers written to answers.txt")
        
    except Exception as e:
        print(f"❌ Error writing best answers: {e}")
        
    print(f"✅ Evaluated {len(evaluations)} questions")    
    return evaluations


In [72]:
def main():
    """
    Main function to execute the RAG pipeline steps
    """
    # TODO: add stoper for the entire process
    # pdf switch
    pdf_path = "US11960514.pdf"
    
    # Check if patent PDF exists
    if not os.path.exists(pdf_path):
        print(f"Error: {pdf_path} not found in the current directory.")
        return
    
    print("=== RAG Pipeline for Patent Analysis ===")
    print(f"Processing: {pdf_path}\n")
    
    # === STEP 1: CHUNKING ===
    print("=== Step 1: Chunking the Patent ===")
    
    # Check if we already have processed any chunks
    all_metadata = load_chunks_metadata()
    if pdf_path in all_metadata:
        print(f"Loaded existing chunks for {pdf_path}")
        chunks = all_metadata[pdf_path]["chunks"]
    else:
        print("Processing patent PDF...")
        all_metadata = extract_text_and_images_from_patent(pdf_path)
        save_chunks_metadata(all_metadata)
        chunks = all_metadata[pdf_path]["chunks"]
    
    # Print Step 1 summary
    text_chunks = [c for c in chunks if c['type'] == 'text']
    image_chunks = [c for c in chunks if c['type'] == 'image_description']
    
    print(f"\n=== Step 1 Summary ===")
    print(f"Total chunks: {len(chunks)}")
    print(f"Text chunks: {len(text_chunks)}")
    print(f"Image chunks: {len(image_chunks)}")
    
    # === STEP 2: VECTOR STORE ===
    client, model = create_vector_store(chunks)
    
    # === STEP 3: QUESTION INPUT ===
    questions = load_questions()
    
    # === STEP 4: RAG PROMPT CONSTRUCTION ===
    if questions:  # Only proceed if we have questions
        rag_prompts = process_questions_with_rag(questions, chunks, client, model)
    else:
        print("⚠️  No questions to process - skipping RAG prompt construction")
        rag_prompts = []
    
    # === STEP 5: ANSWER GENERATION ===
    answers = []
    if rag_prompts:  # Only proceed if we have prompts
        answers = generate_answers(rag_prompts)
    else:
        print("⚠️  No prompts to process - skipping answer generation")
    
    print(f"\n=== Pipeline Complete ===")
    print(f"✅ Step 1: Patent chunked into {len(chunks)} pieces")
    print(f"✅ Step 2: {len(text_chunks)} text chunks vectorized and stored")
    print(f"✅ Step 3: {len(questions)} questions loaded and ready")
    print(f"✅ Step 4: {len(rag_prompts)} RAG prompts constructed")
    print(f"✅ Step 5: {len(answers)} answers generated and saved")
    
    # Optional: Run evaluation if answers were generated
    if answers and rag_prompts:
        print(f"\n=== Optional: Running Answer Evaluation ===")
        evaluation_results = answers_eval(rag_prompts, answers)
        return chunks, client, model, questions, rag_prompts, answers, evaluation_results
    
    return chunks, client, model, questions, rag_prompts, answers

In [None]:
if __name__ == "__main__":
    main()

=== RAG Pipeline for Patent Analysis ===
Processing: US11960514.pdf

=== Step 1: Chunking the Patent ===
Creating new metadata file: all_metadata.json
Processing patent PDF...
Processing 26 pages...
📄 Processing page 1/26... 

Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


Converting text chunk
(✅ text)
📄 Processing page 2/26... Converting image chunk


Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


(✅ image)
📄 Processing page 3/26... Converting image chunk


Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


(✅ image)
📄 Processing page 4/26... Converting image chunk


Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


(✅ image)
📄 Processing page 5/26... Converting image chunk


Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


(✅ image)
📄 Processing page 6/26... Converting image chunk


Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


(✅ image)
📄 Processing page 7/26... Converting image chunk


Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


(✅ image)
📄 Processing page 8/26... Converting image chunk
