In [None]:
! pip install crewai PyPDF2 openai

In [None]:
! pip install --upgrade PyPDF2 crewai gradio

In [None]:
! pip install ratelimit bleach python-dotenv

Key Elements

An enterprise-grade document analysis system built on large language models (LLMs) and an agent-based architecture.
It uses a multi-agent approach for document processing, analysis, and interactive querying.

Key Features:
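- Multi-agent processing (separate extraction, comparison, and reporting agents)
- Sequential analysis pipeline (extract → compare → report) with status reporting
- Error handling that reports failures in the UI instead of crashing the app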
- Stateful document chat capabilities
- Progress monitoring and callback system

Architecture Overview:
1. Document Extraction Layer
2. Analysis Pipeline Layer
3. Interactive Query Layer
4. Presentation Layer


In [None]:
# Required installations - run in a cell:
! pip install agentops plotly ipywidgets PyPDF2 crewai pandas
 

In [None]:
%pip install -U agentops

In [None]:
! pip install --upgrade agentops


In [None]:
import os
import time
import traceback
from crewai import Agent, Task, Crew
import PyPDF2
import json
import gradio as gr
from datetime import datetime
from typing import Dict, List, Any
from queue import Queue
from threading import Thread

def extract_text_from_pdf(pdf_file):
    """Extract and concatenate the text of every page in a PDF.

    Args:
        pdf_file: Either a filesystem path (``str`` / ``os.PathLike``) or an
            object whose ``.name`` attribute holds the path (for example the
            temp-file wrapper a Gradio ``File`` component may pass).

    Returns:
        The text of all pages joined in page order. A page that yields no
        text contributes an empty string.

    Raises:
        Exception: wrapping any failure (no file, missing file, unreadable
            PDF, ...). The underlying error is chained as ``__cause__``.
    """
    if pdf_file is None:
        raise Exception("Error extracting text from PDF: no file was provided")

    # Plain paths are used as-is. Note that pathlib.Path also has a ``.name``
    # attribute (the bare filename), so it must not go through the
    # ``.name`` branch or the directory would be lost.
    if isinstance(pdf_file, (str, os.PathLike)):
        path = pdf_file
    else:
        path = pdf_file.name

    try:
        pdf_reader = PyPDF2.PdfReader(path)
        # ``or ""`` guards against a page yielding None instead of a string;
        # join avoids repeated string concatenation across many pages.
        return "".join(page.extract_text() or "" for page in pdf_reader.pages)
    except Exception as e:
        raise Exception(f"Error extracting text from PDF: {str(e)}") from e


# Default reference standards for the analysis pipeline.
# Each entry maps a term the extraction agent should look for to the value
# the comparison agent treats as the expected standard. Values are kept as
# human-readable strings (durations, amounts, counts) exactly as shown to the LLM.
# NOTE(review): "limitation of liability" / "owner expiry date" mapping to
# "30 days" looks like placeholder data — confirm with the domain owner.
reference_data = dict(
    [
        ("limitation of liability", "30 days"),
        ("owner expiry date", "30 days"),
        ("notice period", "30 days"),
        ("agreement duration", "12 months"),
        ("payment terms", "30 days"),
        ("confidentiality period", "3 years"),
        ("insurance coverage", "$1,000,000"),
        ("maximum monthly hours", "120"),
    ]
)

class OutputManager:
    """Fan queued status messages out to registered callbacks on a worker thread.

    Four independent channels exist: plain output, agent progress, verbose
    log, and process flow. Producers call the ``add_*`` methods; a single
    background thread polls the queues and calls every callback registered
    for the matching channel, in registration order.
    """

    def __init__(self, poll_interval: float = 0.1):
        """Create queues and callback lists, then start the worker thread.

        Args:
            poll_interval: Seconds the worker sleeps between polling passes.
        """
        self.output_queue = Queue()
        self.agent_progress_queue = Queue()
        self.verbose_queue = Queue()  # Verbose (timestamped) output
        self.process_flow_queue = Queue()  # Process-flow steps
        # The callback lists must exist before the worker thread starts
        # reading them, otherwise an early message raises AttributeError
        # inside the thread.
        self.callbacks = []
        self.agent_progress_callbacks = []
        self.verbose_callbacks = []
        self.process_flow_callbacks = []
        self.poll_interval = poll_interval
        self.is_running = True
        # daemon=True: a forgotten stop() must not keep the interpreter alive.
        self.output_thread = Thread(target=self._process_output, daemon=True)
        self.output_thread.start()

    def _channels(self):
        """Return (queue, callbacks) pairs for every channel, in a fixed order."""
        return (
            (self.output_queue, self.callbacks),
            (self.agent_progress_queue, self.agent_progress_callbacks),
            (self.verbose_queue, self.verbose_callbacks),
            (self.process_flow_queue, self.process_flow_callbacks),
        )

    @staticmethod
    def _drain(source, listeners):
        """Deliver every item currently in *source* to each listener."""
        # The worker is the only consumer, so empty()/get() cannot race
        # with another reader.
        while not source.empty():
            item = source.get()
            for callback in list(listeners):
                try:
                    callback(item)
                except Exception:
                    # A failing callback must not kill the worker thread;
                    # that would silently stop all further output.
                    traceback.print_exc()

    def _process_output(self):
        """Worker loop: drain all channels, sleep, repeat until stopped."""
        while self.is_running:
            for source, listeners in self._channels():
                self._drain(source, listeners)
            time.sleep(self.poll_interval)
        # Flush anything enqueued just before stop() was called.
        for source, listeners in self._channels():
            self._drain(source, listeners)

    def add_output(self, output: str):
        self.output_queue.put(output)

    def add_agent_progress(self, progress: str):
        self.agent_progress_queue.put(progress)

    def add_verbose_output(self, verbose: str):
        """Queue *verbose* prefixed with the current ``[HH:MM:SS]`` time."""
        self.verbose_queue.put(f"[{datetime.now().strftime('%H:%M:%S')}] {verbose}")

    def add_process_flow(self, flow: str):
        """Queue *flow* prefixed with an arrow marker."""
        self.process_flow_queue.put(f"→ {flow}")

    def register_callback(self, callback):
        self.callbacks.append(callback)

    def register_agent_progress_callback(self, callback):
        self.agent_progress_callbacks.append(callback)

    def register_verbose_callback(self, callback):
        self.verbose_callbacks.append(callback)

    def register_process_flow_callback(self, callback):
        self.process_flow_callbacks.append(callback)

    def stop(self):
        """Stop the worker after a final drain and wait for it to exit."""
        self.is_running = False
        self.output_thread.join()

class MemoryTracker:
    """Per-agent, in-memory log of timestamped events, renderable as text."""

    def __init__(self):
        # agent name -> events in insertion order; each event is a dict with
        # "timestamp", "type" and "content" keys.
        self.memory_logs: Dict[str, List[Dict[str, Any]]] = {}

    def log_memory_event(self, agent_name: str, event_type: str, content: str):
        """Append an event for *agent_name* and return the stored event dict."""
        entry = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "type": event_type,
            "content": content,
        }
        self.memory_logs.setdefault(agent_name, []).append(entry)
        return entry

    def get_memory_summary(self, agent_name: str = None) -> str:
        """Render the log for one agent, or for every agent when *agent_name* is falsy."""
        if not agent_name:
            sections = [
                f"=== {name} ===\n" + self.get_memory_summary(name) + "\n"
                for name in self.memory_logs
            ]
            return "Complete Memory Status:\n\n" + "".join(sections)

        events = self.memory_logs.get(agent_name)
        if events is None:
            return f"No memory events logged for {agent_name}"

        body = "".join(
            f"[{e['timestamp']}] {e['type']}\n{e['content']}\n\n" for e in events
        )
        return f"Memory Events for {agent_name}:\n\n" + body

class DocumentAnalyzerCrew:
    """Three-stage CrewAI pipeline: extract key terms, compare them, write a report.

    Each stage runs as its own single-agent ``Crew``. Status text is pushed
    to ``output_manager`` through ``add_output`` / ``add_agent_progress`` /
    ``add_process_flow`` / ``add_verbose_output``, and the start and end of
    every stage are recorded in ``self.memory_tracker``.
    """

    def __init__(self, document_text, reference_data, api_key, output_manager,
                 max_document_chars=2000):
        """Create the three agents.

        Args:
            document_text: Full text of the document to analyse.
            reference_data: Mapping of term name -> expected value. Its keys
                are the points the extraction agent looks for; the whole
                mapping is given to the comparison agent.
            api_key: OpenAI API key. When empty/None, any existing
                ``OPENAI_API_KEY`` environment variable is left untouched
                (assigning None to ``os.environ`` would raise TypeError).
            output_manager: Sink for status messages (see class docstring).
            max_document_chars: Number of leading characters of
                ``document_text`` sent to the extraction agent; ``None``
                sends the whole text. Defaults to 2000, the previous fixed limit.
        """
        if api_key:
            # Process-wide side effect: overwrites OPENAI_API_KEY for the whole process.
            os.environ["OPENAI_API_KEY"] = api_key
        self.output_manager = output_manager
        self.memory_tracker = MemoryTracker()
        self.document_text = document_text
        self.reference_data = reference_data
        self.max_document_chars = max_document_chars

        self.text_extraction_agent = Agent(
            role="Text Extraction Specialist",
            goal="Extract precise key information from documents",
            backstory="An expert in parsing complex documents and identifying critical information with high accuracy",
            verbose=True,
            allow_delegation=False,
            memory=True
        )

        self.comparison_agent = Agent(
            role="Document Comparison Expert",
            goal="Compare extracted document data against reference standards",
            backstory="A meticulous analyst specializing in identifying discrepancies between document contents and expected standards",
            verbose=True,
            allow_delegation=False,
            memory=True
        )

        self.reporting_agent = Agent(
            role="Detailed Report Generator",
            goal="Create comprehensive and clear analysis reports",
            backstory="A skilled communicator who transforms technical findings into clear, actionable insights",
            verbose=True,
            allow_delegation=False,
            memory=True
        )

    def _log_memory(self, agent, event_type, content):
        """Record a memory event for *agent* and echo it to the verbose channel."""
        event = self.memory_tracker.log_memory_event(agent.role, event_type, content)
        self.output_manager.add_verbose_output(f"Memory Event: {event['type']} - {event['content']}")
        return event

    def _document_excerpt(self):
        """Return ``(label, text)`` describing the part of the document sent for extraction."""
        if self.max_document_chars is None:
            return "Document Content", self.document_text
        return (
            f"Document Content (first {self.max_document_chars} chars)",
            self.document_text[:self.max_document_chars],
        )

    def _run_phase(self, agent, *, description, expected_output,
                   start_output, start_progress, start_flow, start_memory,
                   run_progress, run_flow,
                   done_output, done_progress, done_flow, result_label):
        """Run one single-agent Crew task, emitting the phase's status messages.

        Message order per phase: start output/progress/flow + "Task Start"
        memory event, then running progress/flow, kickoff, completion
        output/progress/flow, and a "Task Complete" memory event holding the
        first 200 characters of the result.

        Returns:
            Whatever ``Crew.kickoff()`` returned for this phase.
        """
        self.output_manager.add_output(start_output)
        self.output_manager.add_agent_progress(start_progress)
        self.output_manager.add_process_flow(start_flow)
        self._log_memory(agent, "Task Start", start_memory)

        task = Task(description=description, agent=agent, expected_output=expected_output)
        crew = Crew(agents=[agent], tasks=[task], verbose=True)

        self.output_manager.add_agent_progress(run_progress)
        self.output_manager.add_process_flow(run_flow)
        result = crew.kickoff()
        self.output_manager.add_output(done_output)
        self.output_manager.add_agent_progress(done_progress)
        self.output_manager.add_process_flow(done_flow)

        self._log_memory(agent, "Task Complete", f"{result_label}: {str(result)[:200]}...")
        return result

    def analyze_document(self):
        """Run extraction, comparison and reporting in order.

        Returns:
            ``(extraction, comparison, report)`` as strings. On any failure
            all three are the same error message (with traceback), which is
            also pushed to every output channel.
        """
        try:
            excerpt_label, excerpt = self._document_excerpt()
            extraction_result = self._run_phase(
                self.text_extraction_agent,
                description=f"""
                Extract key information from the document text:
                {excerpt_label}: {excerpt}
                
                Focus on these key points:
                {', '.join(self.reference_data.keys())}
                """,
                expected_output="Dictionary containing extracted key-value pairs from the document",
                start_output="🔍 Starting Text Extraction Phase...",
                start_progress="🤖 Text Extraction Agent: Beginning document analysis...",
                start_flow="Initiating Text Extraction Phase",
                start_memory="Initiating document extraction",
                run_progress="📝 Text Extraction Agent: Processing document content...",
                run_flow="Executing Text Extraction",
                done_output="✅ Text Extraction Complete",
                done_progress="✅ Text Extraction Agent: Completed extraction process",
                done_flow="Text Extraction Complete",
                result_label="Extraction Results",
            )

            comparison_result = self._run_phase(
                self.comparison_agent,
                description=f"""
                Compare extracted data against reference standards:
                Reference Data: {json.dumps(self.reference_data, indent=2)}
                Extracted Data: {extraction_result}
                """,
                expected_output="Detailed comparison report highlighting matches and discrepancies",
                start_output="🔬 Starting Comparison Analysis...",
                start_progress="🤖 Comparison Agent: Starting comparison analysis...",
                start_flow="Initiating Comparison Phase",
                start_memory="Initiating comparison analysis",
                run_progress="📊 Comparison Agent: Analyzing data differences...",
                run_flow="Executing Comparison Analysis",
                done_output="✅ Comparison Analysis Complete",
                done_progress="✅ Comparison Agent: Completed analysis",
                done_flow="Comparison Analysis Complete",
                result_label="Comparison Results",
            )

            final_report = self._run_phase(
                self.reporting_agent,
                description=f"""
                Generate a comprehensive analysis report based on:
                Extraction Results: {extraction_result}
                Comparison Analysis: {comparison_result}
                
                Include:
                1. Executive Summary
                2. Key Findings
                3. Detailed Analysis
                4. Recommendations
                """,
                expected_output="Comprehensive analysis report with findings and recommendations",
                start_output="📝 Generating Final Report...",
                start_progress="🤖 Report Generator: Starting report creation...",
                start_flow="Initiating Report Generation",
                start_memory="Generating comprehensive report",
                run_progress="📊 Report Generator: Compiling findings and recommendations...",
                run_flow="Executing Report Generation",
                done_output="✅ Final Report Generated",
                done_progress="✅ Report Generator: Report completed",
                done_flow="Report Generation Complete",
                result_label="Final Report",
            )

            return str(extraction_result), str(comparison_result), str(final_report)

        except Exception as e:
            error_msg = f"Analysis Error: {str(e)}\n{traceback.format_exc()}"
            self.output_manager.add_output(f"❌ Error: {error_msg}")
            self.output_manager.add_agent_progress("❌ Error occurred during analysis")
            self.output_manager.add_process_flow("Error in Analysis Process")
            self.output_manager.add_verbose_output(f"Error: {error_msg}")
            return error_msg, error_msg, error_msg

class DocumentChatCrew:
    """Answer free-form questions about an analysed document with one CrewAI agent.

    Status text goes to ``output_manager``; each query and response is
    recorded in ``self.memory_tracker`` under the chat agent's role.
    """

    def __init__(self, document_report, api_key, output_manager):
        """Create the chat agent.

        Args:
            document_report: Report text used as context for every answer.
            api_key: OpenAI API key. When empty/None, any existing
                ``OPENAI_API_KEY`` environment variable is left untouched.
            output_manager: Object exposing ``add_agent_progress``,
                ``add_process_flow`` and ``add_verbose_output``.
        """
        if api_key:
            # Process-wide side effect: overwrites OPENAI_API_KEY for the whole process.
            os.environ["OPENAI_API_KEY"] = api_key
        self.output_manager = output_manager
        self.memory_tracker = MemoryTracker()
        self.document_report = document_report

        self.chat_agent = Agent(
            role="Document Chat Specialist",
            goal="Provide accurate and contextual responses about the document",
            backstory=f"""Expert AI assistant analyzing this document report:
            {document_report}""",
            verbose=True,
            allow_delegation=False,
            memory=True
        )

    def _log_memory(self, event_type, content):
        """Record a chat-agent memory event and echo it to the verbose channel."""
        event = self.memory_tracker.log_memory_event(self.chat_agent.role, event_type, content)
        self.output_manager.add_verbose_output(f"Memory Event: {event['type']} - {event['content']}")
        return event

    def chat_with_document(self, user_message):
        """Answer *user_message* using the stored report as context.

        Returns:
            ``(response, memory_summary)``. On failure ``response`` is an
            error message with traceback. The tuple shape is the same in both
            cases, so callers can always unpack it (previously the error path
            returned None and the caller's unpacking raised TypeError).
        """
        try:
            self.output_manager.add_agent_progress("🤖 Chat Agent: Starting to process your question...")
            self.output_manager.add_process_flow("Initiating Chat Processing")
            self._log_memory("Chat Query", f"User Question: {user_message}")

            self.output_manager.add_agent_progress("🤔 Chat Agent: Analyzing document context and formulating response...")
            self.output_manager.add_process_flow("Analyzing Document Context")

            chat_task = Task(
                description=f"""
                Based on this document context:
                {self.document_report}
                
                Answer this user query:
                {user_message}
                """,
                agent=self.chat_agent,
                expected_output="Detailed response to user query based on document context"
            )

            chat_crew = Crew(
                agents=[self.chat_agent],
                tasks=[chat_task],
                verbose=True
            )

            self.output_manager.add_agent_progress("✍️ Chat Agent: Generating detailed response...")
            self.output_manager.add_process_flow("Generating Response")
            response = chat_crew.kickoff()
            self.output_manager.add_agent_progress("✅ Chat Agent: Response generated successfully!")
            self.output_manager.add_process_flow("Response Generation Complete")

            self._log_memory("Chat Response", f"Response: {str(response)[:200]}...")

            return str(response), self.memory_tracker.get_memory_summary(self.chat_agent.role)

        except Exception as e:
            error_msg = f"Chat Error: {str(e)}\n{traceback.format_exc()}"
            self.output_manager.add_agent_progress("❌ Chat Agent: Error occurred during processing")
            self.output_manager.add_process_flow("Error in Chat Process")
            self.output_manager.add_verbose_output(f"Error: {error_msg}")
            return error_msg, self.memory_tracker.get_memory_summary(self.chat_agent.role)


def launch_gradio_interface():
    """Build the Gradio Blocks UI for document analysis and document chat.

    Every button click collects its status messages in a fresh, request-local
    log. The analysed report is kept in a per-session ``gr.State`` that is
    passed through event inputs/outputs, so concurrent users never see each
    other's logs or documents. (Mutating ``gr.State.value`` from a handler
    would instead change one value shared by every session.)

    Returns:
        The (not yet launched) ``gr.Blocks`` app.
    """

    class _RunLog:
        """Synchronous, per-request status sink with OutputManager's ``add_*`` API.

        Messages are appended immediately, so the handler can read the
        complete log as soon as the crew returns. There is no background
        thread to race against.
        """

        def __init__(self):
            self.outputs = []
            self.progress = []
            self.verbose = []
            self.flow = []

        def add_output(self, output: str):
            self.outputs.append(output)

        def add_agent_progress(self, progress: str):
            self.progress.append(progress)

        def add_verbose_output(self, verbose: str):
            self.verbose.append(f"[{datetime.now().strftime('%H:%M:%S')}] {verbose}")

        def add_process_flow(self, flow: str):
            self.flow.append(f"→ {flow}")

    def _resolve_api_key(key):
        # Fall back to an already-configured key so an empty textbox does not
        # overwrite it with "".
        return key or os.environ.get("OPENAI_API_KEY", "")

    with gr.Blocks(title="📑 Advanced Document Analysis System", theme=gr.themes.Soft()) as demo:
        # Per-session state: the final report that the chat tab answers questions about.
        document_state = gr.State(None)
        
        with gr.Tabs():
            # Document Analysis Tab
            with gr.TabItem("📄 Document Analysis"):
                gr.Markdown("""
                # 📄 Intelligent Document Analysis
                Upload a PDF document and let our AI agents analyze it in detail.
                """)
                
                with gr.Row():
                    with gr.Column(scale=1):
                        api_key = gr.Textbox(
                            label="OpenAI API Key",
                            type="password",
                            placeholder="Enter your OpenAI API key"
                        )
                        pdf_file = gr.File(
                            label="Upload PDF Document",
                            file_types=[".pdf"]
                        )
                        analyze_btn = gr.Button(
                            "🚀 Start Analysis",
                            variant="primary"
                        )
                
                with gr.Row():
                    with gr.Column(scale=2):
                        with gr.Group():
                            gr.Markdown("### 📊 Analysis Progress")
                            extraction_status = gr.Textbox(
                                label="Extraction Phase",
                                placeholder="Waiting to start...",
                                lines=2
                            )
                            comparison_status = gr.Textbox(
                                label="Comparison Phase",
                                placeholder="Waiting to start...",
                                lines=2
                            )
                            report_status = gr.Textbox(
                                label="Final Report",
                                placeholder="Waiting to start...",
                                lines=4
                            )
                
                with gr.Row():
                    with gr.Column(scale=1):
                        verbose_output = gr.TextArea(
                            label="🔍 Real-time Agent Activity",
                            placeholder="Agent activities will appear here in real-time...",
                            lines=10,
                            max_lines=15,
                            interactive=False
                        )
                    with gr.Column(scale=1):
                        memory_output = gr.TextArea(
                            label="🧠 Agent Memory Status",
                            placeholder="Agent memory events will be tracked here...",
                            lines=10,
                            max_lines=15,
                            interactive=False
                        )
                    with gr.Column(scale=1):
                        agent_progress = gr.TextArea(
                            label="🔄 Agent Progress",
                            placeholder="Step-by-step agent progress will appear here...",
                            lines=10,
                            max_lines=15,
                            interactive=False
                        )
                
                # Process flow visualization
                with gr.Row():
                    process_flow = gr.TextArea(
                        label="⚡ Process Flow Visualization",
                        placeholder="Step-by-step process flow will be shown here...",
                        lines=5,
                        max_lines=10,
                        interactive=False
                    )
            
            # Document Chat Tab
            with gr.TabItem("💬 Chat with Document"):
                gr.Markdown("""
                # 💬 Interactive Document Chat
                Ask questions about the analyzed document and get detailed responses.
                """)
                
                with gr.Row():
                    with gr.Column(scale=1):
                        chat_message = gr.Textbox(
                            label="Your Question",
                            placeholder="Ask anything about the document..."
                        )
                        chat_btn = gr.Button(
                            "🤖 Ask Question",
                            variant="primary"
                        )
                    
                    with gr.Column(scale=2):
                        chat_response = gr.TextArea(
                            label="💬 AI Response",
                            placeholder="AI responses will appear here...",
                            lines=8
                        )
                        chat_memory = gr.TextArea(
                            label="🧠 Chat Memory Log",
                            placeholder="Chat agent memory events will be tracked here...",
                            lines=8
                        )
                        chat_agent_progress = gr.TextArea(
                            label="🔄 Chat Agent Progress",
                            placeholder="Step-by-step chat agent progress will appear here...",
                            lines=8,
                            interactive=False
                        )
                
                # Chat process flow visualization
                with gr.Row():
                    chat_process_flow = gr.TextArea(
                        label="⚡ Chat Process Flow",
                        placeholder="Step-by-step chat process flow will be shown here...",
                        lines=5,
                        max_lines=10,
                        interactive=False
                    )

        # Analysis workflow
        def analyze_document(key, pdf, current_report):
            """Run the analysis crew; return 7 display values plus the new session report."""
            if pdf is None:
                msg = "Please upload a PDF document first."
                return msg, msg, msg, "", "", "", msg, current_report

            log = _RunLog()
            try:
                document_text = extract_text_from_pdf(pdf)
                if not document_text.strip():
                    msg = "No extractable text was found in this PDF (it may contain only scanned images)."
                    return msg, msg, msg, "", "", "", msg, current_report

                analyzer = DocumentAnalyzerCrew(
                    document_text=document_text,
                    # Use the module-level default standards; an empty dict
                    # left the agents with nothing to extract or compare.
                    reference_data=dict(reference_data),
                    api_key=_resolve_api_key(key),
                    output_manager=log
                )
                extraction, comparison, report = analyzer.analyze_document()
                memory = analyzer.memory_tracker.get_memory_summary()
            except Exception as e:
                error_msg = f"Analysis Error: {str(e)}\n{traceback.format_exc()}"
                return (error_msg,) * 7 + (current_report,)

            # DocumentAnalyzerCrew reports failures by returning its error text
            # instead of raising; don't let that text become the chat document.
            failed = report.startswith("Analysis Error:")
            status = "Analysis failed." if failed else "Analysis complete!"
            return (
                extraction,
                comparison,
                report,
                memory,
                "\n".join(log.verbose),
                "\n".join(log.flow),
                "\n".join(log.progress + [status]),
                current_report if failed else report,
            )

        # Chat workflow
        def chat_with_document(key, message, report):
            """Answer one chat question against the session's analysed report."""
            if not report:
                return "Please analyze a document first.", "", "", "No document analyzed yet."
            if not message or not message.strip():
                return "Please enter a question.", "", "", "No question provided."

            log = _RunLog()
            try:
                chat_crew = DocumentChatCrew(
                    document_report=report,
                    api_key=_resolve_api_key(key),
                    output_manager=log
                )
                response, memory = chat_crew.chat_with_document(message)
            except Exception as e:
                error_msg = f"Chat Error: {str(e)}"
                return error_msg, error_msg, "\n".join(log.flow), "\n".join(log.progress + [error_msg])

            return (
                response,
                memory,
                "\n".join(log.flow),
                "\n".join(log.progress + ["Chat processing complete!"]),
            )

        # Event handlers
        analyze_btn.click(
            fn=analyze_document,
            inputs=[api_key, pdf_file, document_state],
            outputs=[
                extraction_status,
                comparison_status,
                report_status,
                memory_output,
                verbose_output,
                process_flow,
                agent_progress,
                document_state
            ],
            show_progress=True
        )

        chat_btn.click(
            fn=chat_with_document,
            inputs=[api_key, chat_message, document_state],
            outputs=[
                chat_response,
                chat_memory,
                chat_process_flow,
                chat_agent_progress
            ],
            show_progress=True
        )

    return demo

if __name__ == "__main__":
    # Build the UI, enable Gradio's request queue, then serve it locally
    # without a public share link; debug=True keeps errors visible.
    demo = launch_gradio_interface()
    demo.queue().launch(debug=True, share=False)