# Interactive Document Processing Workflow

This notebook provides an improved user experience for the document processing phase of GraphRAG MCP, addressing key friction points:

- ✅ **Real-time progress feedback** during processing
- ✅ **Incremental processing** - each document is tracked individually (up to `MAX_CONCURRENT_DOCS` are processed at a time)
- ✅ **Error handling** with recovery options
- ✅ **Interactive workflow** with visual feedback
- ✅ **Processing analytics** and performance insights

## 🎯 What You'll Accomplish

Transform your document collection into a searchable knowledge graph with:
- Real-time progress tracking
- Individual document status monitoring
- Error recovery and retry mechanisms
- Performance analytics and optimization tips

## 📋 Setup and Initialization

In [None]:
import asyncio
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

import matplotlib.pyplot as plt
import pandas as pd

# Progress tracking
from tqdm.notebook import tqdm

# Add project root to path
# The notebook is assumed to sit one directory below the project root
# (Path.cwd().parent). NOTE(review): this depends on the kernel's working
# directory — confirm when launching the notebook from elsewhere.
project_root = Path.cwd().parent
# Insert at the front of the module search path so the local checkout of
# graphrag_mcp is found first. This must run before the imports below.
sys.path.insert(0, str(project_root))

# Import GraphRAG MCP components
from graphrag_mcp.core.analyzer import AdvancedAnalyzer
from graphrag_mcp.core.citation_manager import CitationTracker
from graphrag_mcp.core.document_processor import DocumentProcessor
from graphrag_mcp.core.graphiti_engine import GraphitiKnowledgeGraph

# Only reached if every import above succeeded.
print("✅ All imports successful!")
print(f"📁 Project root: {project_root}")

## 🔧 Configuration and Project Setup

In [None]:
# Project configuration
PROJECT_NAME = "my-research"  # Change this to your project name
TEMPLATE = "academic"
DOCUMENTS_FOLDER = "../examples"  # Change to your documents folder

# Processing configuration
MAX_CONCURRENT_DOCS = 3  # Process up to 3 documents simultaneously
RETRY_ATTEMPTS = 3
TIMEOUT_SECONDS = 300  # 5 minutes per document

# Echo the settings as (label, value) pairs so the cell output records
# exactly what this run was configured with.
_config_summary = (
    ("Project Name", PROJECT_NAME),
    ("Template", TEMPLATE),
    ("Documents Folder", DOCUMENTS_FOLDER),
    ("Max Concurrent", MAX_CONCURRENT_DOCS),
    ("Retry Attempts", RETRY_ATTEMPTS),
)
print("🏗️  Project Configuration:")
print("\n".join(f"   {label}: {value}" for label, value in _config_summary))

## 📁 Document Discovery and Validation

In [None]:
@dataclass
class DocumentStatus:
    """Track processing status for each document"""
    path: Path
    name: str
    size_mb: float
    status: str = "pending"  # pending, processing, completed, failed
    start_time: datetime | None = None
    end_time: datetime | None = None
    error_message: str | None = None
    entities_found: int = 0
    citations_found: int = 0
    processing_time: float = 0.0

    @property
    def processing_speed(self) -> float:
        """Pages per minute estimate"""
        if self.processing_time > 0:
            return (self.size_mb * 10) / (self.processing_time / 60)  # Rough estimate
        return 0.0

def discover_documents(folder_path: str) -> list[DocumentStatus]:
    """Discover and validate PDF documents in the specified folder"""
    documents = []
    folder = Path(folder_path)

    if not folder.exists():
        print(f"❌ Folder not found: {folder_path}")
        return documents

    print(f"🔍 Scanning folder: {folder_path}")

    for pdf_file in folder.glob("**/*.pdf"):
        try:
            size_mb = pdf_file.stat().st_size / (1024 * 1024)
            doc_status = DocumentStatus(
                path=pdf_file,
                name=pdf_file.name,
                size_mb=round(size_mb, 2)
            )
            documents.append(doc_status)
            print(f"   📄 {pdf_file.name} ({size_mb:.2f} MB)")
        except Exception as e:
            print(f"   ⚠️  Error reading {pdf_file.name}: {e}")

    print(f"\n📊 Found {len(documents)} PDF documents")
    print(f"   Total size: {sum(doc.size_mb for doc in documents):.2f} MB")

    return documents

# Discover documents
documents = discover_documents(DOCUMENTS_FOLDER)

if documents:
    # Preview the queue; 'Est. Time (min)' is a rough guess of 2 minutes per MB.
    queue_rows = [
        {
            'Document': doc.name,
            'Size (MB)': doc.size_mb,
            'Status': doc.status,
            'Est. Time (min)': round(doc.size_mb * 2, 1),
        }
        for doc in documents
    ]
    df = pd.DataFrame(queue_rows)
    print("\n📋 Document Processing Queue:")
    print(df.to_string(index=False))
else:
    print("\n❌ No documents found. Please check your DOCUMENTS_FOLDER path.")

## 🏗️ Initialize Processing Components

In [None]:
# Initialize core components
print("🔧 Initializing processing components...")


def _announce_ready(label: str) -> None:
    """Print the standard per-component success line."""
    print(f"   ✅ {label} initialized")


try:
    doc_processor = DocumentProcessor()
    _announce_ready("Document Processor")

    analyzer = AdvancedAnalyzer()
    _announce_ready("Advanced Analyzer")

    graphiti_engine = GraphitiKnowledgeGraph()
    _announce_ready("Graphiti Knowledge Graph")

    citation_tracker = CitationTracker()
    _announce_ready("Citation Tracker")

    print("\n🚀 All components ready for processing!")

except Exception as e:
    # Later cells use these globals directly, so any component that failed to
    # construct will be undefined there. The tips name the external services
    # this setup expects to be running.
    print(f"❌ Component initialization failed: {e}")
    print("\n🔧 Troubleshooting tips:")
    for tip in (
        "   - Ensure Ollama is running: ollama serve",
        "   - Check Neo4j is running: docker ps",
        "   - Verify models are installed: ollama list",
    ):
        print(tip)

## 🔄 Interactive Document Processing

In [None]:
async def process_single_document(doc_status: DocumentStatus) -> bool:
    """Process a single document with detailed tracking.

    Runs basic processing and corpus analysis (plus a placeholder citation
    step), then adds the analyzed content to the Graphiti knowledge graph.
    ``doc_status`` is updated in place with status, timings, counts and, on
    failure, an error message.

    The knowledge-graph insertion is bounded by ``TIMEOUT_SECONDS``; exceeding
    it is recorded as a failure.

    Args:
        doc_status: Tracking record for the document to process.

    Returns:
        True if the knowledge graph accepted the document, False otherwise.
        Exceptions raised by the pipeline are caught and recorded on
        ``doc_status`` rather than re-raised.
    """
    doc_status.status = "processing"
    doc_status.start_time = datetime.now()
    # Clear results from any previous attempt so a failed retry cannot report stale data.
    doc_status.error_message = None
    doc_status.entities_found = 0
    doc_status.citations_found = 0

    try:
        # Step 1: Basic document processing
        # NOTE(review): steps 1-2 are synchronous calls inside a coroutine, so they
        # block the event loop; only the awaited graph step overlaps between documents.
        print(f"📄 Processing: {doc_status.name}")
        doc_processor.process_document(str(doc_status.path))  # result not used yet

        # Step 2: Advanced analysis
        print(f"🔍 Analyzing: {doc_status.name}")
        corpus_doc = analyzer.analyze_for_corpus(str(doc_status.path))

        # Step 3: Citation tracking
        print(f"📚 Extracting citations: {doc_status.name}")
        # TODO: add citation tracking logic here (citation_tracker is not used yet)

        # Step 4: Add to knowledge graph
        print(f"🕸️  Adding to graph: {doc_status.name}")
        try:
            success = await asyncio.wait_for(
                graphiti_engine.add_document(
                    document_content=corpus_doc.content,
                    document_id=f"{PROJECT_NAME}_{doc_status.path.stem}",
                    metadata={
                        "title": corpus_doc.title,
                        "project": PROJECT_NAME,
                        "template": TEMPLATE,
                        "filename": doc_status.name,
                        "entities": corpus_doc.entities,
                        "processing_date": datetime.now().isoformat(),
                        **corpus_doc.metadata
                    },
                    source_description=f"{TEMPLATE} document from {PROJECT_NAME} project"
                ),
                timeout=TIMEOUT_SECONDS,
            )
        except asyncio.TimeoutError:
            # Re-raise with a readable message: str(TimeoutError()) is empty.
            raise RuntimeError(
                f"Timed out after {TIMEOUT_SECONDS}s while adding to the knowledge graph"
            ) from None

        doc_status.entities_found = len(corpus_doc.entities)
        doc_status.citations_found = len(getattr(corpus_doc, 'citations', []))

        if not success:
            # The graph did not accept the document: report a failure, not "Completed".
            doc_status.status = "failed"
            doc_status.error_message = (
                "Knowledge graph did not accept the document "
                "(add_document returned a falsy result)"
            )
            print(f"❌ Failed: {doc_status.name} - {doc_status.error_message}")
            return False

        doc_status.status = "completed"
        print(f"✅ Completed: {doc_status.name} ({doc_status.entities_found} entities, {doc_status.citations_found} citations)")
        return True

    except Exception as e:
        doc_status.status = "failed"
        # Some exceptions stringify to "", so fall back to the exception type name.
        doc_status.error_message = str(e) or type(e).__name__
        print(f"❌ Failed: {doc_status.name} - {doc_status.error_message}")
        return False

    finally:
        doc_status.end_time = datetime.now()
        if doc_status.start_time:
            doc_status.processing_time = (doc_status.end_time - doc_status.start_time).total_seconds()

async def process_documents_with_progress(documents: list[DocumentStatus]):
    """Process documents concurrently with a live tqdm progress bar.

    At most ``MAX_CONCURRENT_DOCS`` documents are in flight at once. Each
    ``DocumentStatus`` is updated in place by ``process_single_document``.
    A summary of successes, failures and timing is printed at the end.

    Args:
        documents: Documents to process. If empty, a message is printed and
            nothing else happens.
    """
    if not documents:
        print("No documents to process.")
        return

    print(f"🚀 Starting processing of {len(documents)} documents...")
    print(f"⚙️  Processing up to {MAX_CONCURRENT_DOCS} documents simultaneously")

    # Process documents with concurrency control
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_DOCS)
    start_time = time.time()

    # The context manager closes the bar even if processing is interrupted.
    with tqdm(total=len(documents), desc="Processing Documents", unit="doc") as progress_bar:

        async def process_with_semaphore(doc_status):
            async with semaphore:
                try:
                    return await process_single_document(doc_status)
                finally:
                    # Advance the bar for every finished document, even on an unexpected error.
                    progress_bar.update(1)

        results = await asyncio.gather(
            *(process_with_semaphore(doc) for doc in documents),
            return_exceptions=True,
        )

    # process_single_document records ordinary exceptions itself. Anything that
    # escaped it (e.g. cancellation) would otherwise leave the document stuck in
    # "processing" and missing from both counts below.
    for doc, outcome in zip(documents, results):
        if isinstance(outcome, BaseException):
            doc.status = "failed"
            doc.error_message = doc.error_message or f"{type(outcome).__name__}: {outcome}"

    # Summary (wall-clock time, so the average reflects concurrent throughput)
    total_time = time.time() - start_time
    successful = sum(1 for doc in documents if doc.status == "completed")
    failed = sum(1 for doc in documents if doc.status == "failed")

    print("\n📊 Processing Complete:")
    print(f"   ✅ Successful: {successful}/{len(documents)}")
    print(f"   ❌ Failed: {failed}/{len(documents)}")
    print(f"   ⏱️  Total time: {total_time/60:.1f} minutes")
    print(f"   ⚡ Average: {total_time/len(documents):.1f} seconds per document")

# Run the processing
if documents:
    await process_documents_with_progress(documents)
else:
    print("❌ No documents to process. Please check the DOCUMENTS_FOLDER path.")

## 📊 Processing Analytics and Results

In [None]:
# Create results DataFrame
if documents:
    results_df = pd.DataFrame([
        {
            'Document': doc.name,
            'Status': doc.status,
            'Size (MB)': doc.size_mb,
            'Processing Time (s)': doc.processing_time,
            'Entities Found': doc.entities_found,
            'Citations Found': doc.citations_found,
            # processing_speed is an estimate in pages/minute (not MB/min) and is
            # already 0.0 for documents without a recorded processing time.
            'Speed (est. pages/min)': doc.processing_speed,
            'Error': doc.error_message or '',
        }
        for doc in documents
    ])

    print("📋 Detailed Processing Results:")
    print(results_df.to_string(index=False))

    # Summary statistics
    completed_docs = results_df[results_df['Status'] == 'completed']

    if not completed_docs.empty:
        print("\n📈 Performance Statistics:")
        print(f"   Average processing time: {completed_docs['Processing Time (s)'].mean():.1f} seconds")
        print(f"   Fastest document: {completed_docs['Processing Time (s)'].min():.1f} seconds")
        print(f"   Slowest document: {completed_docs['Processing Time (s)'].max():.1f} seconds")
        print(f"   Total entities extracted: {completed_docs['Entities Found'].sum()}")
        print(f"   Total citations found: {completed_docs['Citations Found'].sum()}")
        print(f"   Average entities per document: {completed_docs['Entities Found'].mean():.1f}")

        # Visualization
        plt.figure(figsize=(15, 10))

        # Processing time by document size
        plt.subplot(2, 2, 1)
        plt.scatter(completed_docs['Size (MB)'], completed_docs['Processing Time (s)'], alpha=0.7)
        plt.xlabel('Document Size (MB)')
        plt.ylabel('Processing Time (seconds)')
        plt.title('Processing Time vs Document Size')

        # Entities found per document
        plt.subplot(2, 2, 2)
        plt.bar(range(len(completed_docs)), completed_docs['Entities Found'])
        plt.xlabel('Document Index')
        plt.ylabel('Entities Found')
        plt.title('Entities Extracted per Document')

        # Processing time distribution
        plt.subplot(2, 2, 3)
        plt.hist(completed_docs['Processing Time (s)'], bins=10, alpha=0.7)
        plt.xlabel('Processing Time (seconds)')
        plt.ylabel('Frequency')
        plt.title('Processing Time Distribution')

        # Status summary (covers all documents, not only completed ones)
        plt.subplot(2, 2, 4)
        status_counts = results_df['Status'].value_counts()
        plt.pie(status_counts.values, labels=status_counts.index, autopct='%1.1f%%')
        plt.title('Processing Status Summary')

        plt.tight_layout()
        plt.show()

    # Failed documents analysis
    failed_docs = results_df[results_df['Status'] == 'failed']
    if not failed_docs.empty:
        print("\n❌ Failed Documents Analysis:")
        for _, row in failed_docs.iterrows():
            print(f"   📄 {row['Document']}: {row['Error']}")

else:
    print("No processing results to display.")

## 🔄 Error Recovery and Retry

In [None]:
def get_failed_documents() -> list[DocumentStatus]:
    """Return the documents in the global ``documents`` list whose status is "failed"."""
    return list(filter(lambda doc: doc.status == "failed", documents))

def reset_document_for_retry(doc_status: DocumentStatus):
    """Return ``doc_status`` to a fresh "pending" state before it is retried.

    Clears timings, the error message and the extraction counts, so numbers
    from a failed attempt are not carried over (or summed into totals) if the
    retry fails before new counts are recorded. ``path``, ``name`` and
    ``size_mb`` are left unchanged.
    """
    doc_status.status = "pending"
    doc_status.start_time = None
    doc_status.end_time = None
    doc_status.error_message = None
    doc_status.processing_time = 0.0
    doc_status.entities_found = 0
    doc_status.citations_found = 0

# Check for failed documents
failed_docs = get_failed_documents()

if failed_docs:
    print(f"❌ Found {len(failed_docs)} failed documents:")
    for doc in failed_docs:
        print(f"   📄 {doc.name}: {doc.error_message}")

    # Option to retry failed documents
    retry_choice = input("\n🔄 Would you like to retry failed documents? (y/n): ")

    if retry_choice.lower() == 'y':
        print("\n🔄 Retrying failed documents...")

        # Reset failed documents
        for doc in failed_docs:
            reset_document_for_retry(doc)

        # Retry processing
        await process_documents_with_progress(failed_docs)

        # Show updated results
        still_failed = get_failed_documents()
        if still_failed:
            print(f"\n⚠️  {len(still_failed)} documents still failed after retry.")
            print("\n🔧 Troubleshooting suggestions:")
            print("   - Check if files are corrupted or password-protected")
            print("   - Verify Ollama service is stable")
            print("   - Try processing individually with smaller batch sizes")
        else:
            print("\n✅ All documents processed successfully after retry!")

else:
    print("✅ No failed documents to retry.")

## 📊 Knowledge Graph Status

In [None]:
# Check knowledge graph status
try:
    print("🕸️  Knowledge Graph Status:")

    # Only documents that reached the graph count towards its contents. Failed
    # documents can still carry entity/citation counts from their last attempt.
    successful_docs = [doc for doc in documents if doc.status == "completed"]

    # NOTE: these figures come from the notebook's own tracking. Querying Graphiti
    # directly would need to be implemented in the GraphitiKnowledgeGraph class.
    print(f"   📊 Project: {PROJECT_NAME}")
    print(f"   📄 Documents processed: {len(successful_docs)}")
    print(f"   🔗 Total entities: {sum(doc.entities_found for doc in successful_docs)}")
    print(f"   📚 Total citations: {sum(doc.citations_found for doc in successful_docs)}")

    # Knowledge graph readiness
    if successful_docs:
        print("\n✅ Knowledge graph ready for MCP server!")
        print(f"   🚀 Next step: graphrag-mcp serve {PROJECT_NAME} --transport stdio")
        print("   🔌 Connect to Claude Desktop for interactive research")
    else:
        print("\n❌ Knowledge graph not ready - no documents processed successfully")

except Exception as e:
    print(f"❌ Error checking knowledge graph status: {e}")

## 🎯 Next Steps and Recommendations

In [None]:
print("🎯 Next Steps:")
print("\n1. 🚀 Start MCP Server:")
print(f"   graphrag-mcp serve {PROJECT_NAME} --transport stdio")

print("\n2. 🔌 Connect to Claude Desktop:")
print("   Add this to claude_desktop_config.json")
print("   (macOS: ~/Library/Application Support/Claude/, Windows: %APPDATA%\\Claude\\):")
print('   "mcpServers": {')
print(f'     "{PROJECT_NAME}": {{')
print('       "command": "graphrag-mcp",')
print(f'       "args": ["serve", "{PROJECT_NAME}", "--transport", "stdio"]')
print('     }')
print('   }')

print("\n3. 💬 Start Research Chat:")
print("   Conversational Mode:")
print('   - "Ask knowledge graph: What are the main themes?"')
print('   - "Explore topic: machine learning applications"')
print('   - "Find connections between concepts"')

print("\n   Literature Review Mode:")
print('   - "Get facts with citations about transformers"')
print('   - "Verify claim with sources: [your claim]"')
print('   - "Generate bibliography in APA style"')

print("\n4. 📈 Performance Optimization:")
# Average only over documents that were actually timed; this avoids dividing
# by zero when nothing has been processed yet.
timed_docs = [doc for doc in documents if doc.processing_time > 0]
if timed_docs:
    avg_time = sum(doc.processing_time for doc in timed_docs) / len(timed_docs)
    if avg_time > 120:  # 2 minutes
        print("   ⚠️  Processing time is high. Consider:")
        print("     - Reducing document size before processing")
        print("     - Using a more powerful GPU/CPU")
        print("     - Processing in smaller batches")
    else:
        print("   ✅ Processing performance is good")
elif documents:
    print("   ℹ️  No processing timings recorded yet")

print("\n5. 📊 Quality Assurance:")
# Average entities over successfully processed documents. Documents with zero
# entities are included so that poor extraction lowers the average instead of
# being excluded from it (or causing a division by zero).
qa_docs = [doc for doc in documents if doc.status == "completed"]
if qa_docs:
    avg_entities = sum(doc.entities_found for doc in qa_docs) / len(qa_docs)
    if avg_entities < 10:
        print(f"   ⚠️  Low entity extraction ({avg_entities:.1f} per document). Consider:")
        print("     - Checking document quality (OCR, formatting)")
        print("     - Adjusting entity extraction templates")
    else:
        print(f"   ✅ Good entity extraction ({avg_entities:.1f} per document)")
elif documents:
    print("   ⚠️  No documents completed, so entity extraction could not be assessed")

print("\n🎉 Your GraphRAG MCP system is ready for interactive research!")