<a href="https://colab.research.google.com/github/abdullahmujahidali/Vet-LangGraph/blob/main/pymupdf_VetAI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
!pip install -q langchain langgraph langchain_openai openai pinecone langchain_community pymupdf

import os
from typing import List, Dict, Tuple, Annotated, TypedDict, Union, Any, Optional, Literal
from datetime import datetime
from enum import Enum
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage, AIMessage
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import PyPDFLoader
from pinecone import Pinecone
import base64
from google.colab import userdata
import json
import fitz

# API keys are read from the Colab secret store and mirrored into the process
# environment; the clients below are constructed without explicit OpenAI keys,
# so they presumably pick the key up from the environment (verify against the
# langchain_openai defaults).
OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
PINECONE_API_KEY = userdata.get('PINECONE_API_KEY')
os.environ.update(
    OPENAI_API_KEY=OPENAI_API_KEY,
    PINECONE_API_KEY=PINECONE_API_KEY,
)

# Chat model shared by every agent; temperature 0 is requested for all calls.
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

# Pinecone client, the index queried by query_pinecone(), and the embedding
# model used to vectorise queries.
pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))
index_name = "veterinary-ai-index"
embeddings = OpenAIEmbeddings()


class VetState(TypedDict):
    """State dictionary passed between the nodes of the veterinary analysis graph."""

    # LLM responses appended by each agent node as it runs.
    messages: List[Union[HumanMessage, AIMessage]]
    # Text of the case being analysed (the joined PDF text chunks).
    current_input: str
    # Initialised empty by analyze_veterinary_case; not read by the visible code.
    medical_records: Dict[str, Any]
    # Per-agent output keyed by agent name, plus an "errors" list added by handle_error.
    results: Dict[str, Any]
    # Initialised empty by analyze_veterinary_case; not read by the visible code.
    metadata: Dict[str, Any]
    # Retrieved text snippets (and an optional image summary) used as prompt context.
    context: List[str]
    # Image records in the shape produced by process_pdf
    # (page_num, image_index, extension, content, metadata).
    images: List[Dict[str, Any]]

In [5]:
def query_pinecone(query: str, top_k: int = 5) -> List[dict]:
    """Embed ``query`` and return the ``top_k`` nearest matches from the Pinecone index.

    The query is embedded with the module-level ``embeddings`` model and looked
    up in the index named by ``index_name``; metadata is requested with each
    match. The ``matches`` attribute of the query response is returned as-is.
    """
    vector = embeddings.embed_query(query)
    response = pc.Index(index_name).query(
        vector=vector,
        top_k=top_k,
        include_metadata=True,
    )
    return response.matches

def handle_error(state: "VetState", error_message: str) -> "VetState":
    """Return a copy of ``state`` with ``error_message`` recorded in ``results["errors"]``.

    The incoming state is left untouched: the ``results`` dict and its
    ``errors`` list are copied before the new entry is appended, so callers
    that still hold the original state do not see it change underneath them.

    Args:
        state: Current workflow state. A missing ``results`` or ``context``
            key is treated as empty rather than raising.
        error_message: Human-readable description of the failure.

    Returns:
        A new state dict whose ``results["errors"]`` list ends with a record
        holding the ISO timestamp, the message and the context at failure time.
    """
    # state.copy() is shallow, so the nested containers are copied explicitly.
    results = dict(state.get("results") or {})
    errors = list(results.get("errors") or [])
    errors.append({
        "timestamp": datetime.now().isoformat(),
        "message": error_message,
        "context": state.get("context", []),
    })
    results["errors"] = errors

    new_state = state.copy()
    new_state["results"] = results
    return new_state

def process_pdf(file_path: str) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Extract text chunks and embedded images from a PDF.

    Page text is collected page by page, joined with blank lines and split
    into overlapping chunks (1000 characters, 200 overlap). Every embedded
    image is returned base64-encoded together with its page number, index,
    file extension and basic metadata.

    Args:
        file_path: Path of the PDF to read.

    Returns:
        ``(text_chunks, images)``. Both lists are empty if the document could
        not be processed; the error is printed rather than raised. A failure
        on a single image is printed and that image is skipped.
    """
    try:
        page_texts: List[str] = []
        images: List[Dict[str, Any]] = []

        doc = fitz.open(file_path)
        try:
            for page_num, page in enumerate(doc, start=1):
                text = page.get_text()
                if text.strip():
                    page_texts.append(text)

                for img_index, img in enumerate(page.get_images()):
                    try:
                        # First item of each image entry is the xref used for extraction.
                        base_image = doc.extract_image(img[0])
                        images.append({
                            'page_num': page_num,
                            'image_index': img_index,
                            'extension': base_image["ext"],
                            'content': base64.b64encode(base_image["image"]).decode(),
                            'metadata': {
                                'width': base_image.get('width'),
                                'height': base_image.get('height'),
                                'colorspace': base_image.get('colorspace')
                            }
                        })
                    except Exception as e:
                        # Best effort: one bad image must not abort the document.
                        print(f"Error extracting image {img_index} from page {page_num}: {str(e)}")
        finally:
            # Release the document even if a page fails part-way through.
            doc.close()

        if not page_texts:
            return [], images

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )
        return text_splitter.split_text("\n\n".join(page_texts)), images

    except Exception as e:
        print(f"Error processing PDF: {str(e)}")
        return [], []



def extract_tables(page: fitz.Page) -> List[Dict[str, Any]]:
    """Collect the tables detected on ``page``.

    Each entry holds the extracted ``rows``, the table's ``bbox`` and its
    ``table_index`` on the page. Errors are printed and whatever was gathered
    up to that point is returned.
    """
    extracted: List[Dict[str, Any]] = []
    try:
        finder = page.find_tables()
        if not finder:
            return extracted
        for position, found in enumerate(finder):
            extracted.append({
                'rows': found.extract(),
                'bbox': found.bbox,
                'table_index': position
            })
    except Exception as exc:
        print(f"Error extracting tables: {str(exc)}")
    return extracted

def create_data_processing_agent():
    """Build the graph node that performs the initial structured extraction.

    The returned ``process_data`` function queries Pinecone with the case text,
    adds a textual summary of any images in the state, prompts the LLM and
    stores the reply under ``results["DataProcessor"]``. The retrieved context
    is saved in ``state["context"]`` for later nodes. Any exception is turned
    into an entry in ``results["errors"]`` via ``handle_error``.
    """
    def process_data(state: VetState) -> VetState:
        try:
            relevant_docs = query_pinecone(state["current_input"])
            # A match without stored metadata contributes an empty string
            # instead of raising on ``None.get``.
            context = [(doc.metadata or {}).get("text", "") for doc in relevant_docs]

            if state.get("images"):
                image_lines = ["Document contains the following images:\n"]
                for idx, img in enumerate(state["images"], start=1):
                    image_lines.append(
                        f"\nImage {idx} on page {img['page_num']}:\n"
                        f"- Type: {img['extension']}\n"
                        f"- Dimensions: {img['metadata']['width']}x{img['metadata']['height']}\n"
                    )
                context.append("".join(image_lines))

            context_text = "\n\n".join(context)

            prompt = ChatPromptTemplate.from_messages([
                ("system", """You are a veterinary data processing agent conducting detailed initial examination.
                Consider medical history, current findings, and any images or tables present: {context}

                EXTRACT AND STRUCTURE:
                1. Detailed History
                   - Duration and progression of symptoms
                   - Previous treatments and responses
                   - Specific symptom frequencies and patterns

                2. Current Presentation
                   - Vital signs with normal ranges
                   - Physical exam findings with abnormalities highlighted
                   - Current medications with exact dosing

                3. Key Clinical Findings
                   - Specific abnormalities noted
                   - Response to treatments
                   - Changes in condition

                4. Laboratory Results
                   - All test results with reference ranges
                   - Highlight abnormal values
                   - Note trending changes

                5. Diagnostic Imaging
                   - All imaging findings
                   - Compare with previous if available

                6. Visual Data Analysis
                   - Describe any relevant images
                   - Analyze table contents
                   - Note any visual abnormalities

                OUTPUT FORMAT:
                - Patient Information (detailed demographics)
                - Comprehensive History (including timeline)
                - Current Clinical Status
                - Specific Abnormal Findings
                - Treatment History with Responses
                - Detailed Lab Results Analysis
                - Image and Table Analysis"""),
                ("human", "Process this veterinary case data in detail: {input}")
            ])

            messages = prompt.format_messages(
                input=state["current_input"],
                context=context_text
            )
            response = llm.invoke(messages)

            # state.copy() is shallow: build fresh containers rather than
            # mutating the list/dict still referenced by the incoming state.
            new_state = state.copy()
            new_state["messages"] = [*state["messages"], response]
            new_state["results"] = {**state["results"], "DataProcessor": response.content}
            new_state["context"] = context

            return new_state

        except Exception as e:
            return handle_error(state, f"Data Processing Error: {str(e)}")

    return process_data


def create_history_analysis_agent():
    """Build the graph node that analyses the case history and timeline.

    The returned ``analyze_history`` function reads
    ``results["DataProcessor"]`` and the shared ``context``, prompts the LLM
    and stores the reply under ``results["HistoryAnalyzer"]``. Any exception
    is recorded through ``handle_error``.
    """
    def analyze_history(state: VetState) -> VetState:
        try:
            processed_data = state["results"]["DataProcessor"]
            context = state.get("context", [])

            # The context is passed to format_messages below, so the system
            # prompt needs a {context} placeholder for it to reach the model
            # (as in the other agents' prompts).
            prompt = ChatPromptTemplate.from_messages([
                ("system", """You are a veterinary history specialist analyzing disease progression.
                Consider this case context: {context}

                DETAILED ANALYSIS REQUIREMENTS:
                1. Timeline Construction
                   - Initial onset of symptoms
                   - Progression pattern
                   - Treatment trials and responses
                   - Key clinical events

                2. Treatment History Analysis
                   - Specific medications used (with doses)
                   - Duration of treatments
                   - Response patterns
                   - Treatment failures or successes
                   - Side effects noted

                3. Clinical Pattern Recognition
                   - Disease progression rate
                   - Symptom clusters
                   - Trigger factors
                   - Complicating factors
                   - Risk factors

                4. Owner Compliance & Home Care
                   - Treatment adherence
                   - Home monitoring
                   - Environmental factors
                   - Quality of life impacts

                FORMAT:
                Provide chronological analysis with:
                - Specific dates
                - Treatment durations
                - Response patterns
                - Clinical correlations"""),
                ("human", "Analyze this case history with detailed timeline: {processed_data}")
            ])

            messages = prompt.format_messages(
                processed_data=processed_data,
                context="\n\n".join(context)
            )
            response = llm.invoke(messages)

            # state.copy() is shallow: build fresh containers rather than
            # mutating the list/dict still referenced by the incoming state.
            new_state = state.copy()
            new_state["messages"] = [*state["messages"], response]
            new_state["results"] = {**state["results"], "HistoryAnalyzer": response.content}

            return new_state

        except Exception as e:
            return handle_error(state, f"History Analysis Error: {str(e)}")

    return analyze_history

def create_clinical_analysis_agent():
    """Build the graph node that produces the clinical analysis.

    The returned ``analyze_clinical`` function reads
    ``results["HistoryAnalyzer"]`` and the shared ``context``, prompts the LLM
    and stores the reply under ``results["ClinicalAnalyzer"]``. Any exception
    is recorded through ``handle_error``.
    """
    def analyze_clinical(state: VetState) -> VetState:
        try:
            history_analysis = state["results"]["HistoryAnalyzer"]
            context = state.get("context", [])

            prompt = ChatPromptTemplate.from_messages([
                ("system", """You are a veterinary clinical analyst providing comprehensive medical insights.
                Consider this historical context: {context}

                CLINICAL ANALYSIS REQUIREMENTS:
                1. Physical Examination Findings
                   - Detailed review of all systems
                   - Vital signs trends
                   - Specific abnormalities with clinical significance
                   - Compare to previous exams

                2. Laboratory Data Analysis
                   - Review all results with reference ranges
                   - Trend analysis of serial results
                   - Clinical correlation of abnormalities
                   - Impact on treatment decisions

                3. Diagnostic Imaging Review
                   - Detailed findings analysis
                   - Progressive changes noted
                   - Clinical correlation
                   - Impact on treatment plan

                4. Treatment Response Assessment
                   - Effectiveness of current therapy
                   - Side effects observed
                   - Need for adjustments
                   - Compliance factors

                FORMAT:
                Provide detailed analysis with:
                - Specific clinical findings
                - Laboratory result interpretations
                - Treatment response evaluation
                - Recommendations for adjustments"""),
                ("human", "Provide detailed clinical analysis for: {history_analysis}")
            ])

            messages = prompt.format_messages(
                history_analysis=history_analysis,
                context="\n\n".join(context)
            )
            response = llm.invoke(messages)

            # state.copy() is shallow: build fresh containers rather than
            # mutating the list/dict still referenced by the incoming state.
            new_state = state.copy()
            new_state["messages"] = [*state["messages"], response]
            new_state["results"] = {**state["results"], "ClinicalAnalyzer": response.content}

            return new_state

        except Exception as e:
            return handle_error(state, f"Clinical Analysis Error: {str(e)}")

    return analyze_clinical

def create_diagnostic_agent():
    """Build the graph node that produces the diagnostic assessment.

    The returned ``diagnose`` function reads ``results["ClinicalAnalyzer"]``,
    runs a second Pinecone query using that analysis as the query text, and
    prompts the LLM with the existing plus newly retrieved context. The reply
    is stored under ``results["Diagnostics"]`` and the combined context
    replaces ``state["context"]``. Any exception is recorded through
    ``handle_error``.
    """
    def diagnose(state: VetState) -> VetState:
        try:
            clinical_analysis = state["results"]["ClinicalAnalyzer"]
            context = state.get("context", [])

            diagnostic_context = query_pinecone(clinical_analysis)
            # A match without stored metadata contributes an empty string
            # instead of raising on ``None.get``.
            additional_context = [
                (doc.metadata or {}).get("text", "") for doc in diagnostic_context
            ]
            combined_context = context + additional_context

            prompt = ChatPromptTemplate.from_messages([
                ("system", """You are a veterinary diagnostician providing expert assessment.
                Consider all findings and clinical context: {context}

                DIAGNOSTIC PROCESS:
                1. Primary Problem Assessment
                   - Define and rank clinical issues
                   - Document supporting evidence
                   - Note symptom patterns and severity
                   - Assess disease stage/progression

                2. Differential Analysis
                   - Comprehensive differential list
                   - Evidence supporting/refuting each
                   - Most likely diagnosis with rationale
                   - Rule-outs with justification

                3. Diagnostic Plan
                   - Specific tests needed with rationale
                   - Expected findings
                   - Monitoring parameters
                   - Follow-up timeline

                4. Treatment Recommendations
                   - Current treatment assessment
                   - Specific medication adjustments
                   - Dosing recommendations
                   - Monitoring plan

                FORMAT:
                Provide detailed assessment with:
                - Definitive diagnoses
                - Supporting evidence
                - Specific recommendations
                - Follow-up plan"""),
                ("human", "Provide comprehensive diagnostic assessment for: {clinical_analysis}")
            ])

            messages = prompt.format_messages(
                clinical_analysis=clinical_analysis,
                context="\n\n".join(combined_context)
            )
            response = llm.invoke(messages)

            # state.copy() is shallow: build fresh containers rather than
            # mutating the list/dict still referenced by the incoming state.
            new_state = state.copy()
            new_state["messages"] = [*state["messages"], response]
            new_state["results"] = {**state["results"], "Diagnostics": response.content}
            new_state["context"] = combined_context

            return new_state

        except Exception as e:
            return handle_error(state, f"Diagnostic Error: {str(e)}")

    return diagnose

def create_specialist_summary_agent():
    """Build the graph node that writes the specialist-level summary.

    The returned ``generate_summaries`` function reads
    ``results["Diagnostics"]`` and the shared ``context``, prompts the LLM and
    stores the reply under ``results["SpecialistSummary"]``. Any exception is
    recorded through ``handle_error``.
    """
    def generate_summaries(state: VetState) -> VetState:
        try:
            diagnostic_assessment = state["results"]["Diagnostics"]
            context = state.get("context", [])

            prompt = ChatPromptTemplate.from_messages([
                ("system", """You are a veterinary specialist providing comprehensive case assessment.
                Consider all diagnostic findings: {context}

                SPECIALIST ASSESSMENT REQUIREMENTS:
                1. Disease Process Analysis
                   - Detailed pathophysiology
                   - Disease stage and severity
                   - Complicating factors
                   - Prognosis assessment

                2. Treatment Plan Development
                   - Comprehensive treatment strategy
                   - Medication recommendations with dosing
                   - Therapeutic goals
                   - Expected outcomes

                3. Client Education
                   - Disease process explanation
                   - Treatment rationale
                   - Home care instructions
                   - Warning signs to monitor

                4. Follow-up Planning
                   - Monitoring parameters
                   - Recheck schedule
                   - Treatment milestones
                   - Quality of life assessment

                FORMAT:
                1. Specialist-Level Assessment
                2. Owner-Friendly Summary
                3. Detailed Treatment Plan
                4. Monitoring Strategy
                5. Follow-up Schedule"""),
                ("human", "Generate comprehensive specialist assessment for: {diagnostic_assessment}")
            ])

            messages = prompt.format_messages(
                diagnostic_assessment=diagnostic_assessment,
                context="\n\n".join(context)
            )
            response = llm.invoke(messages)

            # state.copy() is shallow: build fresh containers rather than
            # mutating the list/dict still referenced by the incoming state.
            new_state = state.copy()
            new_state["messages"] = [*state["messages"], response]
            new_state["results"] = {**state["results"], "SpecialistSummary": response.content}

            return new_state

        except Exception as e:
            return handle_error(state, f"Specialist Summary Error: {str(e)}")

    return generate_summaries

def create_connections_agent():
    """Build the graph node that integrates all previous analyses.

    The returned ``analyze_connections`` function gathers the five earlier
    results, serialises them as indented JSON for the human message, prompts
    the LLM and stores the reply under ``results["Connections"]``. Any
    exception is recorded through ``handle_error``.
    """
    def analyze_connections(state: VetState) -> VetState:
        try:
            previous_analyses = {
                "data_processing": state["results"]["DataProcessor"],
                "history": state["results"]["HistoryAnalyzer"],
                "clinical": state["results"]["ClinicalAnalyzer"],
                "diagnostics": state["results"]["Diagnostics"],
                "specialist": state["results"]["SpecialistSummary"]
            }

            context = state.get("context", [])

            prompt = ChatPromptTemplate.from_messages([
                ("system", """You are a veterinary case integration specialist.
                Consider all case analyses: {context}

                INTEGRATION REQUIREMENTS:
                1. Disease Pattern Analysis
                   - Connect symptoms to pathophysiology
                   - Identify disease interactions
                   - Note complicating factors
                   - Assess disease synergy

                2. Treatment Integration
                   - Analyze treatment interactions
                   - Identify synergistic approaches
                   - Note potential conflicts
                   - Optimize therapeutic strategy

                3. Monitoring Integration
                   - Key parameters to track
                   - Critical thresholds
                   - Warning signs
                   - Quality of life indicators

                4. Long-term Management
                   - Disease progression expectations
                   - Preventive strategies
                   - Quality of life considerations
                   - Client support needs

                FORMAT:
                Provide integrated assessment with:
                1. Unified Case Theory
                2. Comprehensive Treatment Strategy
                3. Integrated Monitoring Plan
                4. Long-term Management Guidelines"""),
                ("human", "Provide comprehensive case integration analysis for: {previous_analyses}")
            ])

            messages = prompt.format_messages(
                previous_analyses=json.dumps(previous_analyses, indent=2),
                context="\n\n".join(context)
            )
            response = llm.invoke(messages)

            # state.copy() is shallow: build fresh containers rather than
            # mutating the list/dict still referenced by the incoming state.
            new_state = state.copy()
            new_state["messages"] = [*state["messages"], response]
            new_state["results"] = {**state["results"], "Connections": response.content}

            return new_state

        except Exception as e:
            return handle_error(state, f"Connections Analysis Error: {str(e)}")

    return analyze_connections

def create_workflow():
    """Assemble and compile the linear six-stage analysis graph.

    Nodes are registered in pipeline order, ``process_data`` is the entry
    point, each stage feeds the next and the last stage leads to ``END``.
    """
    graph = StateGraph(VetState)

    # (node name, agent factory) in execution order.
    stages = [
        ("process_data", create_data_processing_agent),
        ("analyze_history", create_history_analysis_agent),
        ("analyze_clinical", create_clinical_analysis_agent),
        ("diagnose", create_diagnostic_agent),
        ("specialist_summary", create_specialist_summary_agent),
        ("analyze_connections", create_connections_agent),
    ]
    for node_name, make_agent in stages:
        graph.add_node(node_name, make_agent())

    graph.set_entry_point("process_data")

    # Chain every stage to its successor, ending at END.
    route = [node_name for node_name, _ in stages] + [END]
    for source, target in zip(route, route[1:]):
        graph.add_edge(source, target)

    return graph.compile()

def analyze_veterinary_case(file_path: str):
    """Run the full analysis workflow on the PDF at ``file_path``.

    The PDF is split into text chunks and images, an initial state is built
    from them, and the compiled workflow is invoked on that state.

    Args:
        file_path: Path of the veterinary case PDF.

    Returns:
        The final workflow state; per-agent output is under ``result["results"]``.

    Raises:
        Exception: Any error raised while building or running the workflow is
            printed and re-raised unchanged.
    """
    try:
        print("Processing document...")
        text_chunks, images = process_pdf(file_path)

        initial_state = {
            "messages": [],
            "current_input": "\n\n".join(text_chunks),
            "medical_records": {},
            "results": {
                "DataProcessor": "",
                "HistoryAnalyzer": "",
                "ClinicalAnalyzer": "",
                "Diagnostics": "",
                "SpecialistSummary": "",
                "Connections": ""
            },
            "metadata": {},
            "context": [],
            # Hand the extracted images to the graph; the data-processing
            # agent reads state["images"] to describe them in its prompt.
            "images": images
        }

        print("Creating workflow...")
        app = create_workflow()

        print("Running analysis...")
        result = app.invoke(initial_state)

        return result

    except Exception as e:
        print(f"Error in analysis: {str(e)}")
        raise

In [6]:
    # Analyse the sample PDF and print each agent's section of the result.
    file_path = "./input.pdf"
    result = analyze_veterinary_case(file_path)

    print("\nAnalysis Results:")
    print("-" * 50)

    # (heading shown to the user, key under result["results"])
    sections = [
        ("Data Processing", "DataProcessor"),
        ("History Analysis", "HistoryAnalyzer"),
        ("Clinical Analysis", "ClinicalAnalyzer"),
        ("Diagnostic Assessment", "Diagnostics"),
        ("Specialist Summary", "SpecialistSummary"),
        ("Connections Analysis", "Connections")
    ]

    for title, key in sections:
        print(f"\n{title}:")
        print("-" * 30)
        print(result["results"].get(key, "Not available"))

    # Errors recorded by handle_error during the run, if any.
    if "errors" in result["results"]:
        print("\nErrors encountered:")
        for error in result["results"]["errors"]:
            # Closing parenthesis added so the "(at ...)" suffix is balanced.
            print(f"- {error['message']} (at {error['timestamp']})")

Processing document...
Creating workflow...
Running analysis...

Analysis Results:
--------------------------------------------------

Data Processing:
------------------------------
### Patient Information:
- **Patient Name:** Jax Miracle
- **Patient ID:** CA2441B8
- **Species:** Canine Pomeranian
- **Sex:** Male, Neutered
- **Age:** 4 years old
- **Date of Birth:** December 20, 2020
- **Weight:** 6.1 kg
- **Microchip:** None

### Comprehensive History:
- **Symptoms:** Coughing, hacking, panting loudly
- **Duration of Symptoms:** Not specified
- **Vomiting:** None reported
- **Diarrhea:** Normal feces
- **Coughing/Sneezing:** Dry cough
- **Changes in Drinking:** No change
- **Changes in Urination:** No change
- **Changes in Appetite:** No change
- **New Lumps/Bumps:** None reported
- **Travel History:** No recent travel outside Kentucky/Southern IN region
- **Tick Exposure:** No ticks found in the past year
- **Heartworm Prevention:** Yes
- **Current Diet:** Variety, Pedigree canned f