recall: three key motivations for medical RAG
- evidence-based medicine
- credibility and trust
- dynamic knowledge updates

document processing pipeline

In [1]:

"""
Document processing service for RAG system
Handles PDF extraction, text chunking, and preprocessing
"""

import os
import fitz # PyMuPDF
import re
from typing import List, Dict, Tuple
from pathlib import Path

class DocumentProcessor:
    """Carries the chunking settings used when splitting document text."""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """
        Record the chunking parameters on the instance.

        Args:
            chunk_size: Maximum size of each text chunk
            chunk_overlap: Overlap between chunks for context preservation
        """
        # Both values are stored as given; no validation happens here.
        self.chunk_size, self.chunk_overlap = chunk_size, chunk_overlap


ModuleNotFoundError: No module named 'fitz'

In [None]:

def extract_text_from_pdf(self, pdf_path: str) -> str:
    """
    Extract the text of every page of a PDF file

    Args:
        pdf_path: Path to the PDF file

    Returns:
        Text of all pages concatenated in page order, or an empty string
        if the file could not be opened or read (the error is printed)
    """
    try:
        doc = fitz.open(pdf_path)
        try:
            # Join once instead of growing a string page by page.
            return "".join(page.get_text() for page in doc)
        finally:
            # Release the document even when a page fails to extract;
            # previously an exception mid-loop skipped the close() call.
            doc.close()
    except Exception as e:
        # Best-effort by design: callers receive "" rather than an exception.
        print(f"Error extracting text from {pdf_path}: {str(e)}")
        return ""


In [None]:

def clean_text(self, text: str) -> str:
    """
    Clean and preprocess extracted text

    The line-oriented step runs first and whitespace is collapsed last.
    Collapsing whitespace first (as this function used to) removes every
    newline, which made the page-number step blind to line boundaries and
    left double spaces wherever a special character was deleted.

    Args:
        text: Raw extracted text

    Returns:
        Cleaned text with no newlines; words are separated by single spaces
    """
    # Drop lines that hold nothing but a number (page numbers). Only whole
    # lines are removed so a number that merely ends a wrapped line of
    # running text (e.g. "risk was 25\npercent") is kept.
    text = re.sub(r'^[ \t]*\d+[ \t]*\r?$', '', text, flags=re.MULTILINE)

    # Remove special characters but keep medical terminology
    # NOTE(review): '/' is not in the keep-list, so "mg/dL" becomes "mgdL" —
    # confirm this is intended.
    text = re.sub(r'[^\w\s\-\.\,\:\;\(\)\%\+\=\<\>]', '', text)

    # Collapse all whitespace runs (including newlines) to a single space.
    # Done last so gaps left by the removals above are collapsed too.
    text = re.sub(r'\s+', ' ', text)

    return text.strip()


vector store service implementation

In [None]:
"""
Vector store service for RAG system
Handles embeddings generation, storage, and similarity search (brute-force cosine similarity over in-memory NumPy vectors)
"""

import json
import numpy as np
import requests
from typing import List, Dict, Tuple, Optional

class SimpleVectorStore:
    """Container for embedding vectors and their per-vector metadata."""

    def __init__(self, dimension: int = 1536):
        """
        Create an empty store.

        Args:
            dimension: Dimension of embedding vectors (OpenAI embeddings are 1536)
        """
        self.dimension = dimension
        # Parallel lists: one metadata entry is kept per stored vector.
        self.vectors, self.metadata = list(), list()
        # Id bookkeeping starts empty, with ids counted from zero.
        self.index_to_id = dict()
        self.next_id = 0


In [None]:
def search(self, query_vector: np.ndarray, k: int = 5) -> List[Tuple[Dict, float]]:
    """
    Rank the stored vectors by cosine similarity to a query.

    Args:
        query_vector: Vector to compare against every stored vector
        k: Number of top-ranked entries to return

    Returns:
        (metadata, similarity) pairs, most similar first; an empty list
        when nothing is stored
    """
    if not self.vectors:
        return []

    query_length = np.linalg.norm(query_vector)

    scored = []
    for position, candidate in enumerate(self.vectors):
        candidate_length = np.linalg.norm(candidate)
        # A zero-length vector has no direction, so its score is pinned to 0.
        if candidate_length == 0 or query_length == 0:
            score = 0
        else:
            score = np.dot(query_vector, candidate) / (query_length * candidate_length)
        scored.append((position, score))

    # Stable descending sort: equal scores keep their storage order.
    ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)

    return [(self.metadata[position], score) for position, score in ranked[:k]]


In [None]:
def generate_embeddings(self, texts: List[str], timeout: float = 60.0) -> List[List[float]]:
    """
    Generate embeddings for list of texts using OpenAI API

    Texts are sent in batches. A batch that fails for any reason is replaced
    by zero vectors, so the result always holds exactly one vector per input
    text, in input order.

    Args:
        texts: List of text strings to embed
        timeout: Seconds to wait on each HTTP request before treating that
            batch as failed

    Returns:
        List of embedding vectors, one per entry of texts

    Raises:
        ValueError: If self.api_key is not set
    """
    if not self.api_key:
        raise ValueError("OpenAI API key not available")

    url = "https://api.openai.com/v1/embeddings"
    headers = {
        "Authorization": f"Bearer {self.api_key}",
        "Content-Type": "application/json"
    }

    batch_size = 100  # OpenAI API limit
    fallback_dimension = 1536  # length of the zero vectors used for failed batches
    total_batches = (len(texts) - 1) // batch_size + 1

    all_embeddings = []
    for batch_number, start in enumerate(range(0, len(texts), batch_size), start=1):
        batch_texts = texts[start:start + batch_size]

        data = {
            "input": batch_texts,
            "model": "text-embedding-ada-002"
        }

        try:
            # Without a timeout, requests waits indefinitely on a stalled
            # connection and the whole indexing run hangs.
            response = requests.post(url, headers=headers, json=data, timeout=timeout)
            response.raise_for_status()

            embeddings = [item["embedding"] for item in response.json()["data"]]
            # A short or long reply would shift every later vector against
            # its text; treat it as a failed batch instead.
            if len(embeddings) != len(batch_texts):
                raise ValueError(
                    f"expected {len(batch_texts)} embeddings, got {len(embeddings)}"
                )
            all_embeddings.extend(embeddings)

            print(f"Generated embeddings for batch {batch_number}/{total_batches}")

        except Exception as e:
            # Deliberately broad: one bad batch must not abort the others.
            print(f"Error generating embeddings for batch {batch_number}: {e}")
            # Add zero vectors for failed batch
            all_embeddings.extend([[0.0] * fallback_dimension for _ in batch_texts])

    return all_embeddings

building the complete RAG pipeline

In [None]:

"""
RAG Service - Retrieval Augmented Generation
Integrates document processing, vector search, and LLM generation
"""

import os
from typing import List, Dict, Optional
from .document_processor import DocumentProcessor
from .vector_store import VectorStoreService
import requests
import json

class RAGService:
    """Wires together document processing, the vector store and the API key."""

    def __init__(
        self,
        documents_dir: str = "rag_resources",
        vector_store_path: str = "medical_vector_store.json",
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
    ):
        """
        Set up the collaborators the service works with.

        Args:
            documents_dir: Directory containing medical documents
            vector_store_path: Path to vector store file
            chunk_size: Size of text chunks
            chunk_overlap: Overlap between chunks
        """
        self.documents_dir = documents_dir

        # Chunking settings are handed straight to the document processor.
        processor = DocumentProcessor(chunk_size, chunk_overlap)
        self.doc_processor = processor

        store = VectorStoreService(vector_store_path)
        self.vector_store = store

        # Resolved last, after both collaborators exist.
        self.api_key = self._get_api_key()


In [None]:

def initialize_knowledge_base(self, force_rebuild: bool = False) -> bool:
    """
    Build the medical knowledge base from the documents directory.

    An already-populated vector store is reused unless force_rebuild is set.
    Otherwise the documents are processed, filtered for medical content and
    added to the vector store. Progress and errors are printed.

    Args:
        force_rebuild: Whether to rebuild even if vector store exists

    Returns:
        True if successful, False otherwise
    """
    # Short-circuit when the store already holds vectors
    existing = self.vector_store.get_store_info()
    if existing['total_vectors'] > 0 and not force_rebuild:
        print(f"Knowledge base already initialized with {existing['total_vectors']} vectors")
        print(f"Sources: {existing['sources']}")
        return True

    print("Initializing medical knowledge base...")

    # Nothing to do without a documents directory
    source_dir = self.documents_dir
    if not os.path.exists(source_dir):
        print(f"Documents directory {source_dir} not found!")
        return False

    try:
        # Step 1: extract chunks from every document
        print(f"Processing documents from {self.documents_dir}...")
        processor = self.doc_processor
        extracted = processor.process_documents_directory(self.documents_dir)
        if not extracted:
            print("No documents were processed successfully!")
            return False

        # Step 2: keep only the medically relevant chunks
        print("Filtering for medical content...")
        relevant = processor.filter_medical_content(extracted)
        print(f"Found {len(relevant)} relevant medical text chunks")
        if not relevant:
            print("No relevant medical content found!")
            return False

        # Step 3: index the surviving chunks
        print("Adding documents to vector store...")
        self.vector_store.add_documents(relevant)

        # Report what the store holds after indexing
        summary = self.vector_store.get_store_info()
        print("\n✅ Knowledge base initialized successfully!")
        print(f"Total vectors: {summary['total_vectors']}")
        print(f"Sources: {summary['sources']}")
    except Exception as e:
        print(f"Error initializing knowledge base: {e}")
        return False

    return True


enhanced medical analysis with RAG

In [None]:

def enhanced_medical_analysis(self,
                            original_prompt: str,
                            patient_context: str = "",
                            prediction_results: Dict = None) -> str:
    """
    Answer an analysis request with retrieved literature added to the prompt.

    A search query is built from the request (plus the predicted grade and
    class when prediction results are given), the matching literature
    context is retrieved, and both are embedded in the prompt sent to the
    model.

    Args:
        original_prompt: Original analysis prompt
        patient_context: Patient information context
        prediction_results: AI prediction results

    Returns:
        Enhanced analysis with medical literature support, or an error
        message string when no API key is available
    """
    if not self.api_key:
        return "Error: OpenAI API key not available"

    # Assemble the literature search query piece by piece
    query_parts = [f"diabetic retinopathy {original_prompt}"]
    if prediction_results:
        grade = prediction_results.get('value', '')
        label = prediction_results.get('class', '')
        query_parts.append(f"grade {grade} {label}")
    search_query = " ".join(query_parts)

    # Retrieve the literature excerpt that goes into the prompt
    medical_context = self.get_relevant_medical_context(search_query, max_context_length=2500)

    # Prompt text is sent to the model verbatim
    enhanced_prompt = f"""You are an expert ophthalmologist providing analysis based on current medical literature and best practices.

{patient_context}

RELEVANT MEDICAL LITERATURE:
{medical_context}

ORIGINAL ANALYSIS REQUEST:
{original_prompt}

Please provide a comprehensive analysis that:
1. Incorporates insights from the relevant medical literature above
2. Follows evidence-based best practices
3. Cites specific findings from the literature when relevant
4. Provides both technical medical assessment and patient-friendly explanation
5. Includes appropriate recommendations based on current guidelines

**IMPORTANT FORMATTING INSTRUCTIONS:**
- When referencing literature or research findings, use the format: ***According to the literature, [finding]*** or ***Research indicates that [finding]*** or ***Studies show that [finding]***
- Make all literature citations bold and italic using ***text*** format
- This will help patients easily identify evidence-based information
- Example: ***According to recent studies, patients with moderate diabetic retinopathy have a 25% risk of progression within one year***

When referencing the literature, mention the source papers to add credibility to your analysis."""

    answer = self._call_gpt_with_enhanced_prompt(enhanced_prompt)
    return answer


VQA integration and enhanced prompting

In [None]:

def analyze_retinal_image_and_heatmap(original_image, heatmap_figure, prediction_results, patient_age=None, diabetes_duration=None):
    """Analyze retinal image with heatmap using GPT-4o-mini vision capabilities enhanced with RAG

    Args:
        original_image: Retinal image, passed to encode_image_to_base64
        heatmap_figure: Heatmap figure, passed to encode_image_to_base64
        prediction_results: Mapping read for its 'value' and 'class' entries
            when building the literature search query
        patient_age: Patient age in years (optional)
        diabetes_duration: Duration of diabetes in years (optional)

    Raises:
        ValueError: If get_api_key() returns no key

    NOTE(review): this block ends after the literature context is retrieved;
    the prompt construction and the API request are not part of it.
    """
    api_url = "https://api.openai.com/v1/chat/completions"
    api_key = get_api_key()

    if not api_key:
        raise ValueError("API key not found")

    # Initialize RAG service
    rag = initialize_rag_service()

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    # Encode images to base64
    original_b64 = encode_image_to_base64(original_image)
    heatmap_b64 = encode_image_to_base64(heatmap_figure)

    # Construct detailed prompt
    patient_info = ""
    # Compare against None rather than truthiness: a diabetes duration of 0
    # (newly diagnosed) is valid and must not suppress the patient section.
    if patient_age is not None and diabetes_duration is not None:
        patient_info = f"\nPatient Information:\n- Age: {patient_age} years\n- Duration of diabetes: {diabetes_duration} years\n"

    # Get relevant medical literature context using RAG
    medical_context = ""
    if rag:
        try:
            search_query = f"diabetic retinopathy grade {prediction_results['value']} {prediction_results['class']} analysis heatmap fundus examination"
            medical_context = rag.get_relevant_medical_context(search_query, max_context_length=2000)
        except Exception as e:
            # Retrieval is optional; continue with an empty literature context.
            print(f"Warning: Could not retrieve medical context: {e}")


advanced prompt engineering for medical RAG

In [None]:


python
# Enhanced prompt with medical literature context
# This is a fragment of a function body: patient_info, prediction_results and
# medical_context come from code outside this fragment. base_prompt embeds
# the patient information and the 'class', 'value' and 'probability' entries
# of prediction_results (probability is rendered as a percentage with two
# decimals).
    base_prompt = f"""You are an expert ophthalmologist AI assistant analyzing retinal images for diabetic retinopathy.

{patient_info}
AI Model Results:
- Predicted Class: {prediction_results['class']}
- Severity Grade: {prediction_results['value']}
- Confidence: {prediction_results['probability']:.2%}

I'm showing you two images:
1. The original retinal fundus photograph
2. A GradCAM heatmap visualization showing which areas the AI model focused on for its prediction"""

    # Only when literature context was retrieved: extend the base prompt with
    # the literature and the instructions for citing it. No else branch is
    # visible in this fragment, so enhanced_prompt is not assigned here when
    # medical_context is empty.
    if medical_context:
        enhanced_prompt = f"""{base_prompt}

RELEVANT MEDICAL LITERATURE:
{medical_context}

Please provide a comprehensive analysis that incorporates insights from the current medical literature above, including:

1. **Clinical Assessment**: Explain what the AI prediction means in medical terms, referencing relevant literature
2. **Heatmap Analysis**: Describe what the highlighted areas represent and their clinical significance based on current research
3. **Key Findings**: Identify specific retinal features visible in the image that support the diagnosis, citing literature when relevant
4. **Evidence-Based Patient Explanation**: Provide a clear, patient-friendly explanation supported by research findings
5. **Current Guidelines Recommendations**: Suggest appropriate next steps based on the latest clinical guidelines
6. **Monitoring Protocol**: Advise on follow-up frequency and warning signs based on evidence-based practices

**IMPORTANT FORMATTING INSTRUCTIONS:**
- When referencing literature or research findings, use the format: ***According to the literature, [finding]*** or ***Research indicates that [finding]*** or ***Studies show that [finding]***
- Make all literature citations bold and italic using ***text*** format
- This will help patients easily identify evidence-based information
- Example: ***According to recent studies, GradCAM highlighted areas typically indicate microaneurysms which are early signs of diabetic retinopathy***

When relevant, cite the medical literature to support your analysis and recommendations."""
        

QA enhancement

In [None]:

def answer_retinal_question(question, context_analysis, prediction_results, patient_age=None, diabetes_duration=None):
    """Answer specific questions about the retinal analysis using RAG-enhanced responses

    Args:
        question: The question to answer
        context_analysis: Earlier analysis, forwarded as previous_analysis
        prediction_results: Mapping read for its 'class', 'value' and
            'probability' entries
        patient_age: Patient age in years (optional)
        diabetes_duration: Duration of diabetes in years (optional)

    Returns:
        The result of rag.enhanced_question_answering when the RAG service is
        available and the call succeeds. The fallback path is elided from
        this block.
    """
    # Initialize RAG service
    rag = initialize_rag_service()

    # Prepare patient context
    patient_info = ""
    # Compare against None rather than truthiness: a diabetes duration of 0
    # (newly diagnosed) is valid and must not suppress the patient details.
    if patient_age is not None and diabetes_duration is not None:
        patient_info = f"Patient: {patient_age} years old, diabetes for {diabetes_duration} years. "

    patient_context = f"{patient_info}Current AI Results: {prediction_results['class']} (Grade {prediction_results['value']}, {prediction_results['probability']:.2%} confidence)"

    # Use RAG-enhanced question answering if available
    if rag:
        try:
            return rag.enhanced_question_answering(
                question=question,
                previous_analysis=context_analysis,
                patient_context=patient_context,
                prediction_results=prediction_results
            )
        except Exception as e:
            # A RAG failure is not fatal; fall through to the basic method.
            print(f"Warning: RAG-enhanced answering failed, falling back to basic method: {e}")

    # Fallback to original method
    # ... [existing fallback code]
