In [None]:
# PDF Processing Pipeline - Input Queue & Page Splitting Strategy
import asyncio
import itertools
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

PARSER_VERSION = "0.1.0"  # Version string for this pipeline notebook

# Startup banner: one line per print argument.
print(
    "Initializing PDF Processing Pipeline Components...",
    "📄 Input Queue & Page Splitting Strategy Implementation",
    sep="\n",
)


Initializing PDF Processing Pipeline Components...
📄 Input Queue & Page Splitting Strategy Implementation


In [2]:
@dataclass
class PDFPage:
    """Represents a single PDF page with metadata.

    Attributes:
        page_number: Page number within its document (the simulated loader numbers from 1).
        document_id: Identifier of the owning document.
        content: Raw page payload, if loaded; the simulated loader leaves it None.
        width: Page width (the simulated loader passes PDF points).
        height: Page height (the simulated loader passes PDF points).
        dpi: Rendering resolution hint.
        processing_metadata: Pipeline bookkeeping. ``created_at``, ``status``
            (``'queued'``), ``worker_id`` and ``batch_id`` (both None) are filled in
            at construction; values for those keys supplied by the caller are kept.
    """
    page_number: int
    document_id: str
    content: Optional[bytes] = None
    width: Optional[float] = None
    height: Optional[float] = None
    dpi: int = 150
    processing_metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        defaults = {
            'created_at': time.time(),
            'status': 'queued',
            'worker_id': None,
            'batch_id': None,
        }
        # Build a new dict instead of updating in place: in-place update mutated the
        # caller's dict (aliasing it between pages) and clobbered caller-supplied values.
        self.processing_metadata = {**defaults, **self.processing_metadata}

@dataclass
class PageBatch:
    """A group of at most 3 pages that is scheduled and processed as one unit.

    Attributes:
        batch_id: Unique identifier for the batch.
        pages: Pages in the batch; more than 3 raises ``ValueError``.
        worker_assignment: Index of the worker the batch is assigned to; None until assigned.
        context_pages: Page numbers immediately before and after the batch, recomputed
            from ``pages`` in ``__post_init__`` whenever ``pages`` is non-empty. Numbers
            below 1 are omitted, so a batch starting at page 1 lists only the following
            page. Whether the following page exists in the document is not checked here.
        processing_priority: 1=high, 2=normal, 3=low.
    """
    batch_id: str
    pages: List["PDFPage"]
    worker_assignment: Optional[int] = None
    context_pages: List[int] = field(default_factory=list)  # Adjacent page numbers for context
    processing_priority: int = 1  # 1=high, 2=normal, 3=low

    def __post_init__(self):
        if len(self.pages) > 3:
            raise ValueError("Batch cannot contain more than 3 pages")
        if self.pages:
            numbers = [p.page_number for p in self.pages]
            before, after = min(numbers) - 1, max(numbers) + 1
            # Pages are numbered from 1, so there is no page before page 1.
            self.context_pages = [n for n in (before, after) if n >= 1]

print("✅ Data structures defined: PDFPage, PageBatch")


✅ Data structures defined: PDFPage, PageBatch


In [None]:
class InputQueue:
    """
    Thread-safe PDF input queue with intelligent page loading
    Handles multiple PDF documents simultaneously

    Every method here mutates or reads ``page_queue``, ``document_registry`` and the
    counters while holding ``queue_lock``.
    """

    def __init__(self, max_queue_size: int = 1000):
        self.max_queue_size = max_queue_size
        self.page_queue = deque()  # FIFO of pages awaiting batching
        self.document_registry = {}  # document_id -> metadata
        self.queue_lock = threading.Lock()
        self.total_pages_loaded = 0  # Cumulative number of pages ever enqueued
        self.processing_stats = {
            'documents_loaded': 0,
            'pages_queued': 0,
            'queue_size': 0
        }

    def load_pdf_document(self, file_path: Path, document_id: Optional[str] = None,
                          simulated_page_count: int = 45) -> str:
        """
        Load a PDF document and add its pages to the queue.

        Loading is simulated: ``simulated_page_count`` letter-size pages are created
        (a real implementation would read the file with PyMuPDF/pdfplumber).

        Args:
            file_path: Path (or path string) of the PDF. Only its name is used here.
            document_id: Tracking id. When omitted, one is generated from the current
                time and the file stem. A numeric suffix is added if that id is already
                registered. NOTE(review): an explicit id that is already registered
                overwrites the previous registry entry.
            simulated_page_count: Number of pages to simulate (default 45).

        Returns:
            The document_id used for tracking.

        Raises:
            ValueError: if ``simulated_page_count`` is negative.

        If the queue reaches ``max_queue_size``, the remaining pages are skipped. The
        registry entry then gets status ``'partially_queued'``, and ``queued_pages``
        records how many pages actually entered the queue.
        """
        if simulated_page_count < 0:
            raise ValueError("simulated_page_count must be non-negative")
        file_path = Path(file_path)

        with self.queue_lock:
            # Generated under the lock so concurrent loads cannot pick the same id.
            if document_id is None:
                document_id = self._generate_document_id(file_path)

            self.document_registry[document_id] = {
                'file_path': str(file_path),
                'total_pages': simulated_page_count,
                'loaded_at': time.time(),
                'status': 'loading'
            }

            queued = 0
            for page_num in range(1, simulated_page_count + 1):
                if len(self.page_queue) >= self.max_queue_size:
                    print(f"⚠️ Queue full! Skipping page {page_num}")
                    break
                self.page_queue.append(PDFPage(
                    page_number=page_num,
                    document_id=document_id,
                    width=8.5 * 72,  # Letter size in points
                    height=11 * 72
                ))
                queued += 1

            self.total_pages_loaded += queued
            entry = self.document_registry[document_id]
            entry['queued_pages'] = queued
            entry['status'] = 'queued' if queued == simulated_page_count else 'partially_queued'
            self.processing_stats['documents_loaded'] += 1
            self.processing_stats['pages_queued'] = len(self.page_queue)
            self.processing_stats['queue_size'] = len(self.page_queue)

        if queued == simulated_page_count:
            print(f"📄 Loaded document '{document_id}': {queued} pages")
        else:
            print(f"📄 Loaded document '{document_id}': {queued} of {simulated_page_count} pages queued")
        return document_id

    def _generate_document_id(self, file_path: Path) -> str:
        """Return an auto-generated id not yet in the registry; caller must hold ``queue_lock``."""
        base_id = f"doc_{int(time.time())}_{file_path.stem}"
        candidate, suffix = base_id, 1
        # The timestamp has one-second resolution, so quick reloads of one file collide.
        while candidate in self.document_registry:
            suffix += 1
            candidate = f"{base_id}_{suffix}"
        return candidate

    def get_queue_status(self) -> Dict[str, Any]:
        """Get current queue statistics"""
        with self.queue_lock:
            return {
                **self.processing_stats,
                'documents_in_registry': len(self.document_registry),
                'total_pages_loaded': self.total_pages_loaded
            }

    def peek_next_pages(self, count: int = 3) -> List["PDFPage"]:
        """Return up to ``count`` pages from the front of the queue without removing them.

        Only the requested prefix is copied. A non-positive ``count`` yields ``[]``.
        """
        with self.queue_lock:
            return list(itertools.islice(self.page_queue, max(count, 0)))

print("✅ InputQueue class implemented")


In [None]:
class PageSplitter:
    """
    Intelligent page splitter with 3-page batching strategy
    Optimized for GPU memory usage while maintaining document context

    Pages are batched in queue order. A batch never mixes pages from different
    documents, so the last batch of a document may be smaller than ``batch_size``.
    NOTE: PageBatch rejects more than 3 pages, so a ``batch_size`` above 3 raises
    ValueError as soon as a larger batch is built.
    """

    def __init__(self, batch_size: int = 3, num_workers: int = 4):
        # A non-positive size makes range() raise (step 0) or silently produce nothing.
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.worker_assignments = [[] for _ in range(num_workers)]  # Per-worker list; not populated by this class
        self.batch_counter = 0
        self.batch_registry = {}  # batch_id -> PageBatch

    def create_batches_from_queue(self, input_queue: "InputQueue") -> List["PageBatch"]:
        """
        Drain ``input_queue`` into batches of up to ``batch_size`` pages.

        Consecutive queued pages that share a ``document_id`` are chunked together.
        Each batched page gets its ``batch_id`` and status ``'batched'``, is removed
        from the queue, and its batch is recorded in ``batch_registry``.

        Args:
            input_queue: Queue to drain; its ``queue_lock`` is held for the whole call.

        Returns:
            The newly created batches, in queue order.
        """
        batches = []

        with input_queue.queue_lock:
            snapshot = list(input_queue.page_queue)
            for _, run in itertools.groupby(snapshot, key=lambda page: page.document_id):
                doc_pages = list(run)
                for start in range(0, len(doc_pages), self.batch_size):
                    batch = self._make_batch(doc_pages[start:start + self.batch_size])
                    batches.append(batch)
                    # Pop only after the batch is built, so a failure leaves unbatched pages queued.
                    for _ in batch.pages:
                        input_queue.page_queue.popleft()
            # Keep the queue's reported size in sync with the drained deque.
            input_queue.processing_stats['queue_size'] = len(input_queue.page_queue)

        print(f"📦 Created {len(batches)} batches of up to {self.batch_size} pages each")
        return batches

    def _make_batch(self, batch_pages: List["PDFPage"]) -> "PageBatch":
        """Create, tag and register one batch under the next sequential batch_id."""
        batch_id = f"batch_{self.batch_counter:04d}"
        batch = PageBatch(
            batch_id=batch_id,
            pages=batch_pages,
            processing_priority=self._calculate_priority(batch_pages)
        )
        for page in batch_pages:
            page.processing_metadata['batch_id'] = batch_id
            page.processing_metadata['status'] = 'batched'
        self.batch_registry[batch_id] = batch
        self.batch_counter += 1
        return batch

    def _calculate_priority(self, pages: List["PDFPage"]) -> int:
        """
        Calculate processing priority from the lowest page number in the batch.

        Returns 1 (high) when that page is <= 5 (cover and early content) or within
        20-40 (assumed financial-statement range), otherwise 2 (normal). Priority 3
        (low) exists in PageBatch's scale but is not assigned by this heuristic.
        An empty page list gets 2.
        """
        if not pages:
            return 2
        min_page = min(p.page_number for p in pages)

        if min_page <= 5:  # Cover page and early content
            return 1
        if 20 <= min_page <= 40:  # Typical financial statement pages
            return 1
        return 2

    def get_batch_statistics(self) -> Dict[str, Any]:
        """Summarize every batch this splitter has created (size and priority mix)."""
        total_batches = len(self.batch_registry)
        total_pages = sum(len(batch.pages) for batch in self.batch_registry.values())

        priority_distribution = {}
        for batch in self.batch_registry.values():
            priority = batch.processing_priority
            priority_distribution[priority] = priority_distribution.get(priority, 0) + 1

        return {
            'total_batches_created': total_batches,
            'total_pages_batched': total_pages,
            'average_batch_size': total_pages / total_batches if total_batches > 0 else 0,
            'priority_distribution': priority_distribution,
            'batch_size_setting': self.batch_size
        }

print("✅ PageSplitter class implemented")


In [None]:
class WorkerDistributor:
    """
    Round-robin distribution of batches across GPU workers (4 by default).
    Ensures balanced workload and optimal resource utilization

    The round-robin position (``current_worker``) persists across calls, so a second
    call to ``assign_batches_to_workers`` continues where the previous one stopped.
    """

    def __init__(self, num_workers: int = 4):
        # The round-robin step takes ``% num_workers``; zero workers would divide by zero.
        if num_workers < 1:
            raise ValueError("num_workers must be at least 1")
        self.num_workers = num_workers
        self.worker_queues = [deque() for _ in range(num_workers)]  # Batches queued per worker
        self.worker_stats = [{'batches_assigned': 0, 'pages_assigned': 0} for _ in range(num_workers)]
        self.current_worker = 0  # Round-robin counter

    def assign_batches_to_workers(self, batches: List["PageBatch"]) -> Dict[int, List["PageBatch"]]:
        """
        Distribute batches across workers using round-robin strategy.

        Each batch gets ``worker_assignment`` set and is appended to that worker's
        queue. Each of its pages gets ``worker_id`` and status ``'assigned'``.

        Returns: {worker_id: [batches assigned in this call]}, with a key for every worker.
        """
        worker_assignments = {i: [] for i in range(self.num_workers)}

        for batch in batches:
            worker_id = self.current_worker
            batch.worker_assignment = worker_id

            for page in batch.pages:
                page.processing_metadata['worker_id'] = worker_id
                page.processing_metadata['status'] = 'assigned'

            self.worker_queues[worker_id].append(batch)
            worker_assignments[worker_id].append(batch)

            self.worker_stats[worker_id]['batches_assigned'] += 1
            self.worker_stats[worker_id]['pages_assigned'] += len(batch.pages)

            # Move to next worker (round-robin)
            self.current_worker = (self.current_worker + 1) % self.num_workers

        self._print_distribution_summary(worker_assignments)
        return worker_assignments

    def _print_distribution_summary(self, assignments: Dict[int, List["PageBatch"]]):
        """Print each worker's batch count and the page ranges of its first three batches."""
        print("\n🔄 Worker Distribution Summary:")
        print("=" * 50)

        for worker_id in range(self.num_workers):
            batches = assignments[worker_id]
            if batches:
                page_ranges = []
                for batch in batches:
                    pages = [p.page_number for p in batch.pages]
                    page_ranges.append(f"{min(pages)}-{max(pages)}")

                print(f"Worker {worker_id}: {len(batches)} batches | Pages: {', '.join(page_ranges[:3])}")
                if len(page_ranges) > 3:
                    print(f"             ... and {len(page_ranges) - 3} more batches")

        print("=" * 50)

    def get_worker_load_balance(self) -> Dict[str, Any]:
        """
        Analyze load balance across workers using cumulative assignment counts.

        ``variance`` is the population variance of batches per worker; the load is
        reported as ``balanced`` when it is below 1. The same keys are returned
        whether or not any batches have been assigned yet.
        """
        total_batches = sum(stats['batches_assigned'] for stats in self.worker_stats)
        total_pages = sum(stats['pages_assigned'] for stats in self.worker_stats)

        if total_batches == 0:
            return {
                'balanced': True,
                'variance': 0,
                'total_batches': 0,
                'total_pages': total_pages,
                'average_batches_per_worker': 0,
                'worker_loads': []
            }

        avg_batches_per_worker = total_batches / self.num_workers
        variance = sum((stats['batches_assigned'] - avg_batches_per_worker) ** 2
                       for stats in self.worker_stats) / self.num_workers

        worker_loads = [
            {
                'worker_id': i,
                'batches': stats['batches_assigned'],
                'pages': stats['pages_assigned'],
                'load_percentage': stats['batches_assigned'] / total_batches * 100
            }
            for i, stats in enumerate(self.worker_stats)
        ]

        return {
            'balanced': variance < 1.0,  # Considered balanced if variance < 1
            'variance': variance,
            'total_batches': total_batches,
            'total_pages': total_pages,
            'average_batches_per_worker': avg_batches_per_worker,
            'worker_loads': worker_loads
        }

    def simulate_worker_processing(self, worker_id: int) -> List[str]:
        """
        Print and return the batch IDs queued for ``worker_id``, in queue order.

        The worker queue is not consumed.

        Raises:
            IndexError: if ``worker_id`` is not in ``range(num_workers)``. Negative
                ids are rejected too, instead of silently wrapping to another worker.
        """
        if not 0 <= worker_id < self.num_workers:
            raise IndexError(f"worker_id must be in [0, {self.num_workers}), got {worker_id}")
        processing_order = [batch.batch_id for batch in self.worker_queues[worker_id]]

        print(f"\n🖥️ Worker {worker_id} Processing Simulation:")
        print(f"   Queue length: {len(processing_order)} batches")
        print(f"   Processing order: {processing_order[:5]}")  # Show first 5
        if len(processing_order) > 5:
            print(f"   ... and {len(processing_order) - 5} more")

        return processing_order

print("✅ WorkerDistributor class implemented")


In [None]:
class ContextPreserver:
    """
    Maintains document context and relationships between adjacent pages
    Ensures cross-page elements (spanning tables, sections) are handled properly

    Section and spanning-table detection are simulated with fixed page-number
    heuristics for a financial report; no page content is inspected.
    """

    def __init__(self):
        self.document_structure = {}  # document_id -> dict built by analyze_document_structure
        self.page_relationships = {}  # page_id -> adjacent pages info (not populated by this class)
        self.cross_page_elements = {}  # Elements that span multiple pages (not populated by this class)

    def analyze_document_structure(self, document_id: str, pages: List["PDFPage"]) -> Dict[str, Any]:
        """
        Build and cache the structure summary for one document.

        Args:
            document_id: Key under which the result is stored in ``document_structure``.
            pages: The document's pages, expected in ascending page order (spanning-table
                detection compares each page with the next one in this list).

        Returns:
            Dict with ``document_id``, ``total_pages`` (number of pages given),
            ``page_range`` (min, max page number), ``sections`` and
            ``potential_spanning_elements``.

        Raises:
            ValueError: if ``pages`` is empty.
        """
        if not pages:
            raise ValueError(f"Cannot analyze document '{document_id}': no pages supplied")
        page_numbers = [p.page_number for p in pages]

        structure = {
            'document_id': document_id,
            'total_pages': len(pages),
            'page_range': (min(page_numbers), max(page_numbers)),
            'sections': self._identify_sections(pages),
            'potential_spanning_elements': self._detect_spanning_elements(pages)
        }

        self.document_structure[document_id] = structure
        return structure

    def _identify_sections(self, pages: List["PDFPage"]) -> List[Dict[str, Any]]:
        """
        Map pages onto a fixed financial-report section template.
        (In real implementation, this would use layout analysis.)

        Each section's ``pages`` lists only page numbers present in ``pages`` (sorted,
        deduplicated). Sections with no matching pages are omitted, so short documents
        do not report sections they do not have.
        """
        if not pages:
            return []
        present = sorted({p.page_number for p in pages})
        template = [
            ('Cover & TOC', range(1, 4), 'front_matter'),
            ('Management Discussion', range(4, 12), 'narrative'),
            ('Financial Statements', range(12, 25), 'financial_data'),
            ('Notes to Statements', range(25, 40), 'detailed_notes'),
            ('Appendices', range(40, present[-1] + 1), 'supplementary'),
        ]

        sections = []
        for name, span, section_type in template:
            section_pages = [n for n in present if n in span]
            if section_pages:
                sections.append({'name': name, 'pages': section_pages, 'type': section_type})
        return sections

    def _detect_spanning_elements(self, pages: List["PDFPage"]) -> List[Dict[str, Any]]:
        """Flag each (page, next page in list) pair the spanning-table heuristic matches."""
        spanning_elements = []
        for page, next_page in zip(pages, pages[1:]):
            if self._might_have_spanning_table(page, next_page):
                spanning_elements.append({
                    'type': 'table',
                    'start_page': page.page_number,
                    'end_page': next_page.page_number,
                    'confidence': 0.8,
                    'description': f'Potential table spanning pages {page.page_number}-{next_page.page_number}'
                })
        return spanning_elements

    def _might_have_spanning_table(self, page1: "PDFPage", page2: "PDFPage") -> bool:
        """
        Simple heuristic to detect potential spanning tables
        (In real implementation, would analyze actual content)
        """
        # Assumes tables in financial statements often span pages 15-20.
        return 15 <= page1.page_number <= 20 and page2.page_number == page1.page_number + 1

    def get_context_for_batch(self, batch: "PageBatch") -> Dict[str, Any]:
        """
        Collect contextual information for a batch.

        Returns ``{}`` for a batch without pages. The document id is taken from the
        first page. ``adjacent_pages['before']`` is None when the batch starts at page 1.
        ``adjacent_pages['after']`` is None when the batch reaches the last page recorded
        by ``analyze_document_structure`` for the document. For a document that has not
        been analyzed, the following page number is always given.
        """
        if not batch.pages:
            return {}

        document_id = batch.pages[0].document_id
        page_numbers = [p.page_number for p in batch.pages]
        first_page, last_page = min(page_numbers), max(page_numbers)

        structure = self.document_structure.get(document_id)
        # NOTE(review): page_range only reflects the pages passed to analyze_document_structure.
        doc_last_page = structure['page_range'][1] if structure else None
        next_page = last_page + 1 if doc_last_page is None or last_page < doc_last_page else None

        return {
            'batch_id': batch.batch_id,
            'document_id': document_id,
            'page_numbers': page_numbers,
            'adjacent_pages': {
                'before': first_page - 1 if first_page > 1 else None,
                'after': next_page
            },
            'section_info': self._get_section_for_pages(document_id, page_numbers),
            'spanning_elements': self._get_spanning_elements_for_pages(document_id, page_numbers),
            'processing_hints': self._generate_processing_hints(page_numbers)
        }

    def _get_section_for_pages(self, document_id: str, page_numbers: List[int]) -> List[Dict[str, Any]]:
        """Return the analyzed sections that contain any of ``page_numbers`` ([] if not analyzed)."""
        if document_id not in self.document_structure:
            return []

        relevant_sections = []
        for section in self.document_structure[document_id]['sections']:
            overlap = [p for p in page_numbers if p in section['pages']]
            if overlap:
                relevant_sections.append({
                    'name': section['name'],
                    'type': section['type'],
                    'overlap_pages': overlap
                })
        return relevant_sections

    def _get_spanning_elements_for_pages(self, document_id: str, page_numbers: List[int]) -> List[Dict[str, Any]]:
        """Return spanning elements that start, end, or fully enclose ``page_numbers`` (non-empty)."""
        if document_id not in self.document_structure:
            return []

        first_page, last_page = min(page_numbers), max(page_numbers)
        return [
            element
            for element in self.document_structure[document_id]['potential_spanning_elements']
            if (element['start_page'] in page_numbers
                or element['end_page'] in page_numbers
                or (element['start_page'] < first_page and element['end_page'] > last_page))
        ]

    def _generate_processing_hints(self, page_numbers: List[int]) -> List[str]:
        """Generate hints for optimal processing from fixed page-number ranges."""
        hints = []

        min_page = min(page_numbers)
        max_page = max(page_numbers)

        if min_page <= 5:
            hints.append("Early pages: Expect cover page, TOC, executive summary")

        if 10 <= min_page <= 25:
            hints.append("Financial statements likely: Use enhanced table detection")

        if 25 <= min_page <= 40:
            hints.append("Notes section: Complex formatting, detailed tables")

        if max_page >= 40:
            hints.append("Appendices: May contain charts, supplementary data")

        return hints

print("✅ ContextPreserver class implemented")


In [None]:
# Example Usage and Demonstration
print("\n" + "="*60)
print("🚀 PIPELINE DEMONSTRATION")
print("="*60)

# Pipeline configuration: the splitter and distributor must agree on worker count.
MAX_QUEUE_SIZE = 1000
BATCH_SIZE = 3
NUM_WORKERS = 4

# Initialize the pipeline components
input_queue = InputQueue(max_queue_size=MAX_QUEUE_SIZE)
page_splitter = PageSplitter(batch_size=BATCH_SIZE, num_workers=NUM_WORKERS)
worker_distributor = WorkerDistributor(num_workers=NUM_WORKERS)
context_preserver = ContextPreserver()

# Simulate loading a financial document (page loading is simulated; the file is not read)
sample_pdf_path = Path("data/raw/msft_10k_2024.pdf")  # Example path
document_id = input_queue.load_pdf_document(sample_pdf_path, "MSFT_10K_2024")

# Check queue status
queue_status = input_queue.get_queue_status()
print(f"\n📊 Queue Status: {queue_status}")

# Peek (without dequeuing) at every queued page of this document for structure analysis,
# instead of assuming a fixed page count.
sample_pages = [
    page for page in input_queue.peek_next_pages(queue_status['queue_size'])
    if page.document_id == document_id
]
if sample_pages:
    structure = context_preserver.analyze_document_structure(document_id, sample_pages)
    print("\n📋 Document Structure Analysis:")
    print(f"   Total Pages: {structure['total_pages']}")
    print(f"   Sections: {len(structure['sections'])}")
    print(f"   Spanning Elements: {len(structure['potential_spanning_elements'])}")

print("\n" + "="*60)
print("✅ Pipeline components initialized and demonstrated!")
print("Ready for batch processing and worker distribution...")
print("="*60)


In [None]:
# Create batches and distribute to workers.
# Not idempotent: the first run drains the input queue, and worker stats accumulate.
print("\n🔄 BATCH CREATION AND WORKER DISTRIBUTION")
print("="*50)

# Create batches of up to 3 pages from the queue
batches = page_splitter.create_batches_from_queue(input_queue)

# Get batch statistics
batch_stats = page_splitter.get_batch_statistics()
print("\n📦 Batch Statistics:")
for key, value in batch_stats.items():
    print(f"   {key}: {value}")

# Distribute batches to workers using round-robin
worker_assignments = worker_distributor.assign_batches_to_workers(batches)

# Analyze load balance; .get() because the no-batch result may lack the average.
load_balance = worker_distributor.get_worker_load_balance()
print("\n⚖️ Load Balance Analysis:")
print(f"   Balanced: {load_balance['balanced']}")
print(f"   Variance: {load_balance['variance']:.2f}")
print(f"   Average batches per worker: {load_balance.get('average_batches_per_worker', 0):.1f}")

print("\n👥 Worker Load Distribution:")
for worker_load in load_balance['worker_loads']:
    print(f"   Worker {worker_load['worker_id']}: "
          f"{worker_load['batches']} batches ({worker_load['load_percentage']:.1f}%) "
          f"| {worker_load['pages']} pages")


In [None]:
# Demonstrate context preservation for specific batches
print("\n🧠 CONTEXT PRESERVATION DEMONSTRATION")
print("="*50)

# Show context for the first 3 batches
for batch in batches[:3]:
    context = context_preserver.get_context_for_batch(batch)
    if not context:  # get_context_for_batch returns {} for a batch without pages
        continue

    print(f"\n📄 Batch {batch.batch_id} Context:")
    print(f"   Pages: {context['page_numbers']}")
    print(f"   Adjacent pages: Before={context['adjacent_pages']['before']}, "
          f"After={context['adjacent_pages']['after']}")

    if context['section_info']:
        print(f"   Sections: {[s['name'] for s in context['section_info']]}")

    if context['spanning_elements']:
        print(f"   Spanning elements: {len(context['spanning_elements'])} detected")

    if context['processing_hints']:
        print(f"   Processing hints: {context['processing_hints'][0]}")

# Simulate processing for the first two workers
print("\n" + "="*50)
worker_distributor.simulate_worker_processing(0)  # Worker 0
worker_distributor.simulate_worker_processing(1)  # Worker 1

print("\n✅ Context preservation and worker simulation complete!")
print("\n💡 Key Benefits Demonstrated:")
print("   • 3-page batches optimize GPU memory usage")
print("   • Round-robin ensures balanced workload")
print("   • Context preservation maintains document relationships")
print("   • Cross-page elements are properly tracked")
print("   • Processing hints optimize extraction strategies")
