# RAG Vision Pipeline

## Overview
This notebook demonstrates a Retrieval-Augmented Generation (RAG) pipeline for technical documentation using:
- PDF text and image extraction
- Image description generation
- Semantic search
- Context-aware response generation

## Prerequisites and Dependencies
We'll need several libraries for our pipeline:

In [None]:
# Install required libraries
!pip install pymupdf pytesseract langchain langchain-community faiss-cpu ollama pillow

In [1]:
# Import required libraries
import os
import re
import ollama
import fitz  # PyMuPDF
from PIL import Image

# Langchain and ML imports
from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.llms import Ollama
from langchain.text_splitter import RecursiveCharacterTextSplitter

## Configuration and Setup

In [None]:
# Global Configuration
PDF_PATH = "data/project_doc.pdf"
OUTPUT_DIR = "data/extracted_images"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Model Configuration
EMBEDDING_MODEL = "llama3.2"
VISION_MODEL = "llama3.2-vision"
LLM_MODEL = "llama3.2"

# Initialize models
embeddings = OllamaEmbeddings(model=EMBEDDING_MODEL)
vision_llm = Ollama(model=VISION_MODEL)
llm = Ollama(model=LLM_MODEL)
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1500, 
    chunk_overlap=300
)
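
The `chunk_size` and `chunk_overlap` settings above control how much text each chunk holds and how much adjacent chunks share. The sketch below illustrates that relationship with a plain sliding window over characters. It is a simplification for illustration only: `sliding_window_chunks` is a hypothetical helper, and `RecursiveCharacterTextSplitter` itself prefers to break on separators such as paragraph and line boundaries rather than at fixed offsets.

```python
def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into fixed-size chunks where neighbours share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


# A 4000-character toy document, using the same settings as the notebook
sample_text = "".join(str(i % 10) for i in range(4000))
sample_chunks = sliding_window_chunks(sample_text, chunk_size=1500, chunk_overlap=300)
print([len(chunk) for chunk in sample_chunks])
```

With these settings each window advances by 1200 characters, so the last 300 characters of one chunk reappear at the start of the next. That repetition is what lets a sentence cut at a chunk boundary still appear whole in one of the two chunks.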

## Image Extraction and Description Functions

In [20]:
from typing import List


def extract_images_from_pdf(pdf_path, output_dir):
    """
    Extract images from PDF using PyMuPDF
    
    Args:
        pdf_path (str): Path to PDF file
        output_dir (str): Directory to save extracted images
    
    Returns:
        List of extracted image paths
    """
    # Open the PDF document
    doc = fitz.open(pdf_path)
    extracted_images = []

    # Iterate through each page of the PDF
    for page_num in range(len(doc)):
        page = doc[page_num]
        images = page.get_images(full=True)
        
        # Process each image in the page
        for img_index, img_info in enumerate(images):
            try:
                xref = img_info[0]
                # Create a Pixmap object from the xref
                pixmap = fitz.Pixmap(doc, xref)
                
                # Check if the pixmap has valid data
                if pixmap.width == 0 or pixmap.height == 0 or not pixmap.samples:
                    print(f"Skipping invalid image at page {page_num+1}, index {img_index}")
                    continue

                # Convert grayscale, CMYK, or other non-RGB colorspaces to RGB
                if pixmap.n - pixmap.alpha != 3:
                    pixmap = fitz.Pixmap(fitz.csRGB, pixmap)

                # Drop any alpha channel so the samples match PIL's "RGB" mode
                if pixmap.alpha:
                    pixmap = fitz.Pixmap(pixmap, 0)
                
                # Load the pixel data into a PIL Image
                img = Image.frombytes(
                    "RGB", 
                    [pixmap.width, pixmap.height], 
                    pixmap.samples
                )
                
                # Save the image to the output directory
                img_path = os.path.join(
                    output_dir, 
                    f"page_{page_num+1}_image_{img_index}.png"
                )
                img.save(img_path)
                extracted_images.append(img_path)
            
            except Exception as e:
                print(f"Error processing image at page {page_num+1}, index {img_index}: {e}")
    
    doc.close()
    return extracted_images


def describe_image(image_path, vision_llm):
    """
    Generate detailed description for an image
    
    Args:
        image_path (str): Path to image file
        vision_llm: Vision language model name
    
    Returns:
        str: Image description
    """
    print(image_path)
    # Use Ollama to analyze the image with Llama 3.2-Vision
    response = ollama.chat(
        model=vision_llm,
        messages=[{
        "role": "user",
        "content": "Analyze this technical documentation image in detail. Do not mention the colors of charts; focus on the content.",
        "images": [image_path]
        }],
    )

    # Extract the model's reply, then clean it for indexing
    raw_text = response['message']['content'].strip()
    
    return clean_text_general(raw_text)

# Function to clean and structure the data
def clean_text_general(raw_text: str) -> str:
    """
    Cleans and structures raw text data for various use cases, including vector store preparation.

    Args:
        raw_text (str): Raw input text.

    Returns:
        str: Cleaned and structured text entries as a single string separated by newlines.
    """
    # Normalize whitespace
    normalized_text = re.sub(r'\s+', ' ', raw_text).strip()

    # Remove Markdown bold markers (runs of two or more asterisks)
    normalized_text = re.sub(r'\*{2,}', '', normalized_text)

    # Split on periods into sentence fragments and drop empty ones.
    # Note: this removes the periods and also splits decimals and file names.
    lines = normalized_text.split('.')
    cleaned_lines = [line.strip() for line in lines if line.strip()]

    # Join the cleaned lines into a single string separated by newlines
    return '\n'.join(cleaned_lines)
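
Because `clean_text_general` splits on every period, values such as `0.95` and file names such as `incremental_load.py` are broken apart. One possible alternative, sketched below, splits only where sentence-ending punctuation is followed by whitespace. `split_sentences` is a hypothetical helper rather than part of the notebook, and the regex is a heuristic that will still mis-split abbreviations such as "e.g. this".

```python
import re


def split_sentences(raw_text: str) -> list[str]:
    """Split text into sentences without breaking decimals or file names."""
    # Same normalization steps as clean_text_general
    normalized = re.sub(r"\s+", " ", raw_text).strip()
    normalized = re.sub(r"\*{2,}", "", normalized)

    # Split only at whitespace that follows '.', '!' or '?'
    parts = re.split(r"(?<=[.!?])\s+", normalized)
    return [part.strip() for part in parts if part.strip()]


sample = "**Step 1:** Run `incremental_load.py` first.  Accuracy rose to 0.95. Done"
sentences = split_sentences(sample)
print("\n".join(sentences))
```

Unlike the original function, this version keeps the trailing punctuation on each sentence, which may or may not be desirable for the embedding step.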

## PDF Content Extraction Function

In [21]:
def extract_pdf_content(pdf_path, output_dir, vision_llm):
    """
    Extract text and generate image descriptions
    
    Args:
        pdf_path (str): Path to PDF file
        output_dir (str): Directory to save images
        vision_llm (str): Name of the Ollama vision model
    
    Returns:
        List of page contents with text and image descriptions
    """
    # Extract images
    image_paths = extract_images_from_pdf(pdf_path, output_dir)
    
    # Describe images
    image_descriptions = {}
    for path in image_paths:
        try:
            # describe_image returns a cleaned string ready for indexing
            description = describe_image(path, vision_llm)
            print(description)
            image_descriptions[path] = description
        except Exception as e:
            print(f"Error describing image {path}: {e}")
            image_descriptions[path] = "Error generating description."

    # Extract PDF text
    doc = fitz.open(pdf_path)
    page_contents = []
    
    for page_num in range(len(doc)):
        page = doc[page_num]
        page_text = page.get_text()
        
        # Add image descriptions for this page
        page_images_desc = [
            desc for path, desc in image_descriptions.items()
            if f"page_{page_num+1}_" in path
        ]
        
        if page_images_desc:
            page_text += "\n\n--- Image Descriptions ---\n"
            page_text += "\n".join(page_images_desc)
        
        page_contents.append({
            'page_content': page_text,
            'page_number': page_num + 1
        })
    
    doc.close()
    return page_contents
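
`extract_pdf_content` attaches descriptions to pages by checking whether `page_{n}_` occurs anywhere in the image path, and it rescans every description for every page. A slightly more explicit variant, sketched below, parses the page number out of the file name once and groups descriptions by page. `group_descriptions_by_page` and `PAGE_PATTERN` are hypothetical names, and the pattern assumes the `page_<n>_image_<i>.png` naming used by `extract_images_from_pdf`.

```python
import os
import re
from collections import defaultdict

# Matches file names produced by extract_images_from_pdf (assumed naming scheme)
PAGE_PATTERN = re.compile(r"^page_(\d+)_image_(\d+)\.png$")


def group_descriptions_by_page(image_descriptions: dict) -> dict:
    """Map each 1-based page number to the descriptions of images on that page."""
    by_page = defaultdict(list)
    for path, description in image_descriptions.items():
        match = PAGE_PATTERN.match(os.path.basename(path))
        if match:  # ignore files that do not follow the naming scheme
            by_page[int(match.group(1))].append(description)
    return dict(by_page)


example = {
    "data/extracted_images/page_1_image_0.png": "A flowchart",
    "data/extracted_images/page_11_image_0.png": "A bar chart",
    "data/extracted_images/page_1_image_1.png": "A table",
}
grouped = group_descriptions_by_page(example)
print(grouped)
```

Matching on the base name also avoids false positives when the output directory itself happens to contain a string such as `page_1_`.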

## Vector Store and Retrieval Functions

In [22]:
def create_vector_store(page_contents, embeddings, text_splitter):
    """
    Create FAISS vector store from PDF contents
    
    Args:
        page_contents (List[Dict]): Extracted page contents
        embeddings: Embedding model
        text_splitter: Text splitting utility
    
    Returns:
        FAISS vector store
    """
    # Split documents into chunks
    split_docs = text_splitter.create_documents(
        [doc['page_content'] for doc in page_contents],
        metadatas=[{'page_number': doc['page_number']} for doc in page_contents]
    )
    
    # Create FAISS vector store
    return FAISS.from_documents(split_docs, embeddings)

def retrieve_context(vector_store, query, k=5):
    """
    Retrieve relevant context from vector store
    
    Args:
        vector_store: FAISS vector store
        query (str): Search query
        k (int): Number of top results
    
    Returns:
        Retrieved context documents
    """
    return vector_store.similarity_search(query, k=k)
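
Conceptually, `similarity_search` embeds the query and returns the `k` stored chunks whose vectors are closest to it. The toy sketch below shows that idea with cosine similarity over hand-written three-dimensional vectors. It is only an illustration: the vectors and the names `cosine_similarity` and `top_k` are made up, real embeddings have far more dimensions, and the index type and distance metric FAISS actually uses depend on how the store is configured.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def top_k(query_vec: list[float], doc_vecs: dict, k: int = 2) -> list[str]:
    """Return the names of the k vectors most similar to the query."""
    scored = [(cosine_similarity(query_vec, vec), name) for name, vec in doc_vecs.items()]
    scored.sort(reverse=True)  # highest similarity first
    return [name for _, name in scored[:k]]


# Made-up vectors standing in for embedded chunks
toy_vectors = {
    "architecture": [0.9, 0.1, 0.0],
    "testing": [0.1, 0.9, 0.0],
    "deployment": [0.7, 0.3, 0.1],
}
nearest = top_k([1.0, 0.0, 0.0], toy_vectors, k=2)
print(nearest)
```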

## RAG Pipeline Execution

In [23]:

# Extract PDF content with images
page_contents = extract_pdf_content(PDF_PATH, OUTPUT_DIR, VISION_MODEL)
    
# Create vector store
vector_store = create_vector_store(page_contents, embeddings, text_splitter)

data/extracted_images/page_1_image_0.png
The provided image is a flowchart that illustrates the process of machine learning development and deployment, from data collection to model training and testing
Step 1: Data Collection * The first step in the process is collecting relevant data
* This can be done through various means such as web scraping, APIs, or manual entry
* The collected data is then stored in a database or data warehouse for further processing
Step 2: Feature Engineering * Once the data is collected, it needs to be preprocessed and transformed into a format suitable for machine learning algorithms
* This involves tasks such as feature scaling, normalization, and encoding categorical variables
* The resulting features are then stored in a separate dataset or database
Step 3: Model Training * With the preprocessed data, the next step is to train a machine learning model using one of several popular algorithms such as linear regression, decision trees, random forests, suppo

## Response Generation

In [24]:

from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

parser = StrOutputParser()

template = """
    You are an assistant that answers questions using only the context below.
    Incorporate any relevant details from the document, including image
    descriptions, and answer in a friendly way.

    Context: {context}

    Question: {question}

    If the context does not contain the answer, reply "I don't know".
    """
prompt = PromptTemplate.from_template(template)
retriever = vector_store.as_retriever()
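
The `{context}` placeholder in the template receives the retrieved chunks as a single string. Since each chunk carries a `page_number` in its metadata, one option is to label the chunks before joining them so the model can refer to pages in its answer. The sketch below shows this with a stand-in class: `StubDocument` and `format_context` are hypothetical, and `StubDocument` only mimics the `page_content` and `metadata` attributes of the documents returned by the vector store.

```python
from dataclasses import dataclass, field


@dataclass
class StubDocument:
    """Hypothetical stand-in for a retrieved document (not the LangChain class)."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def format_context(docs) -> str:
    """Join retrieved chunks into one string, labelling each with its page number."""
    blocks = []
    for doc in docs:
        page = doc.metadata.get("page_number", "unknown")
        blocks.append(f"[Page {page}]\n{doc.page_content}")
    return "\n\n".join(blocks)


stub_docs = [
    StubDocument("The pipeline validates new data.", {"page_number": 3}),
    StubDocument("A new model is trained after validation.", {"page_number": 4}),
]
context_text = format_context(stub_docs)
print(context_text)
```

The resulting string could be passed as the `context` value in place of a plain newline join.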

In [29]:

# Example queries
queries = [
    "Block diagram of project architecture"
]

# Run queries
for query in queries:
    print(f"\nQuery: {query}\n")
    
    # Retrieve context
    retrieved_context = retrieve_context(vector_store, query)
    
    # Extract context from the retrieved documents
    context_value = "\n".join(doc.page_content for doc in retrieved_context)

    chain_input = {
        "context": context_value,
        "question": query,
    }

    # Compose the chain: prompt -> LLM -> string parser
    chain = prompt | llm | parser
    
    # Generate response
    response = chain.invoke(chain_input)
    
    print("Response:", response)
    print("-" * 50)


Query: Block diagram of project architecture

Response: Based on the provided context, I'll do my best to answer your question.

The project architecture is described as an end-to-end solution for scalable and accurate predictions for product sales forecasting and scalability of application.

A block diagram of the project architecture can be represented as follows:

1. Data Ingestion:
	* New data is added to the repository.
	* `incremental_load.py` file sends an API request to the ML pipeline along with the new data.
2. Data Validation:
	* The pipeline performs data validation steps based on the data validation report.
3. Model Training and Deployment:
	* If the data passes validation, a new model is trained and deployed.
4. Continuous Integration (CI) Framework:
	* The CI framework tests whether the development code should be pushed to production.
	* It includes unit tests and integration tests using pytest in Python.
5. Prediction Consumption Pipeline:
	* The pipeline consumes pred