In [None]:
pip install PyMuPDF

In [None]:
import re
import fitz  # PyMuPDF
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass

@dataclass
class KnowledgeSegment:
    """One merged knowledge item extracted from the PDF.

    Instances are created by ``PDFKnowledgeExtractor.merge_related_segments``,
    which groups the raw segments sharing a section component and parent
    heading and distributes their text over the fields below.
    """
    title: str  # Title of the first raw segment in the group
    content: str  # Regular body text, pieces joined by blank lines ('' if none)
    syntax: str  # Syntax information if available ('' if none)
    examples: List[str]  # List of examples, one entry per raw example segment
    options: str  # Options information if available ('' if none)
    level: int  # Hierarchy level (0=main section, 1=subsection, etc.)
    page_number: int  # Page on which the item's heading was seen
    segment_type: str  # Type of the first raw segment in the group: 'content', 'example', 'syntax' or 'options'
    section_component: str  # The main component this segment belongs to

class PDFKnowledgeExtractor:
    def __init__(self):
        # Font size thresholds for different hierarchy levels
        self.main_heading_size = 14  # Main sections like "Definitions", "AQL (Access)"
        self.sub_heading_size = 10   # Sub-items like "assembly language", "total"
        self.body_text_size = 9      # Regular content
        
        # Patterns for different content types
        self.definition_pattern = r'^[a-zA-Z][a-zA-Z0-9\s_-]+$'
        self.example_pattern = r'^\s*Example\s*$'
        self.syntax_pattern = r'^\s*Syntax\s*$'
        self.options_pattern = r'^\s*Options\s*$'
        
        # Header/footer patterns that indicate section components
        self.header_footer_patterns = [
            r'^Pick Systems Reference Manual$',
            r'^Definitions$',
            r'^AQL \(Access\)$',
            r'^Page \d+$',
            r'^Attribute\-defining Items$',
            r'^Background\/Phantom Process$',
            r'^Pick\/BASIC\—FlashBASIC$',
            r'^FlashBASIC Debugger$',
            r'^C Functions$',
            r'^Editor$',
            r'^System Files$',
            r'^Output Processor$',
            r'^Proc$',
            r'^Processing Codes$',
            r'^Runoff$',
            r'^Spooler$',
            r'^System Debugger$',
            r'^Tape$',
            r'^TCL$',
            r'^Update processor \(UP\)$',
            r'^Unix$',
            r'^Index$',
            r'^Customer Service$',
            r'^Reader\'s Comments$',
        ]
        
        # Map section headers to component names
        self.section_component_map = {
            'Pick Systems Reference Manual': 'General',
            'Definitions': 'Definitions',
            'AQL (Access)': 'AQL',
            'Attribute-defining Items': 'Attribute-defining Items',
            'Background/Phantom Process': 'Background Process',
            'Pick/BASIC—FlashBASIC': 'FlashBASIC',
            'FlashBASIC Debugger': 'FlashBASIC Debugger',
            'C Functions': 'C Functions',
            'Editor': 'Editor',
            'System Files': 'System Files',
            'Output Processor': 'Output Processor',
            'Proc': 'Proc',
            'Processing Codes': 'Processing Codes',
            'Runoff': 'Runoff',
            'Spooler': 'Spooler',
            'System Debugger': 'System Debugger',
            'Tape': 'Tape',
            'TCL': 'TCL',
            'Update processor (UP)': 'Update Processor',
            'Unix': 'Unix',
            'Index': 'Index',
            'Customer Service': 'Customer Service',
            'Reader\'s Comments': 'Reader Comments',
        }
        
    def extract_text_with_formatting(self, pdf_path: str) -> List[Dict]:
        """Read every non-empty text span of the PDF together with its layout data.

        Args:
            pdf_path: Path of the PDF file to open with PyMuPDF.

        Returns:
            One dict per span, in the order PyMuPDF reports them, with the keys
            'text' (whitespace-stripped), 'font_size', 'font_flags', 'bbox'
            and 'page' (1-based page number).
        """
        formatted_blocks = []
        doc = fitz.open(pdf_path)
        try:
            for page_index in range(len(doc)):
                page = doc.load_page(page_index)

                for block in page.get_text("dict")["blocks"]:
                    # Only text blocks carry a "lines" entry; anything else is skipped.
                    for line in block.get("lines", ()):
                        for span in line["spans"]:
                            text = span["text"].strip()
                            if not text:  # Skip empty text
                                continue
                            formatted_blocks.append({
                                'text': text,
                                'font_size': span["size"],
                                'font_flags': span["flags"],  # Bold, italic flags
                                'bbox': span["bbox"],  # Bounding box for position
                                'page': page_index + 1,
                            })
        finally:
            # Release the document even when a page fails to load or parse.
            doc.close()

        return formatted_blocks

    
    def is_bold(self, font_flags: int) -> bool:
        """Check if text is bold based on font flags"""
        return bool(font_flags & 2**4)  # Bold flag
    
    def get_indentation_level(self, bbox: Tuple[float, float, float, float]) -> int:
        """Determine indentation level based on x-coordinate"""
        x0 = bbox[0]
        if x0 < 80:
            return 0  # Main heading
        elif x0 < 140:
            return 1  # First level indent
        elif x0 < 180:
            return 2  # Second level indent
        else:
            return 3  # Deep indent
    
    def is_page_header_footer(self, text: str) -> bool:
        """Identify page headers, footers, and page numbers"""
        text_clean = text.strip()
        
        for pattern in self.header_footer_patterns:
            if re.match(pattern, text_clean, re.IGNORECASE):
                return True
        
        return False
    
    def get_section_component_from_header(self, text: str) -> Optional[str]:
        """Extract section component name from header text, excluding generic headers"""
        text_clean = text.strip()
        
        # Skip generic headers that don't indicate section changes
        generic_headers = [
            r'^Pick Systems Reference Manual$',
            r'^Page \d+$',
        ]
        
        for generic_pattern in generic_headers:
            if re.match(generic_pattern, text_clean, re.IGNORECASE):
                return None
        
        # Check against known section headers
        for pattern in self.header_footer_patterns:
            if re.match(pattern, text_clean, re.IGNORECASE):
                # Find the corresponding component name
                for section_name, component_name in self.section_component_map.items():
                    if re.match(pattern, section_name, re.IGNORECASE):
                        return component_name
                
                # If no mapping found, clean the text and use as component
                clean_text = re.sub(r'[^\w\s]', '', text_clean).strip()
                return clean_text if clean_text else None
        
        return None
    
    def track_page_sections_smart(self, blocks: List[Dict]) -> Dict[int, str]:
        """Smart tracking of section components, handling alternating headers"""
        page_sections = {}
        current_section = "General"  # Default section
        section_changes = {}  # Track where sections change
        
        # First pass: identify actual section changes
        for block in blocks:
            page_num = block['page']
            text = block['text']
            
            # Check if this is a meaningful section header (not generic)
            section_component = self.get_section_component_from_header(text)
            if section_component:
                section_changes[page_num] = section_component
        
        # Second pass: assign sections to all pages
        current_section = "General"
        for page_num in range(1, max(block['page'] for block in blocks) + 1):
            # Check if this page has a section change
            if page_num in section_changes:
                current_section = section_changes[page_num]
            
            page_sections[page_num] = current_section
        
        return page_sections
    
    def is_content_continuation(self, current_block: Dict, next_block: Dict) -> bool:
        """Determine if next block is a continuation of current content across pages"""
        # Different pages but similar formatting
        if current_block['page'] != next_block['page']:
            # Check if formatting is similar (same font size and flags)
            font_match = abs(current_block['font_size'] - next_block['font_size']) < 1
            flag_match = current_block['font_flags'] == next_block['font_flags']
            
            # Check if the content logically continues (starts with lowercase or continuation words)
            next_text = next_block['text'].strip()
            continuation_indicators = [
                next_text[0].islower() if next_text else False,  # Starts with lowercase
                next_text.startswith(('and', 'or', 'but', 'the', 'to', 'of', 'in', 'on', 'at', 'for')),
                not next_text[0].isupper() if next_text else False,  # Not starting with capital
            ]
            
            return font_match and flag_match and any(continuation_indicators)
        
        # Same page - use original logic
        return (abs(current_block['font_size'] - next_block['font_size']) < 1 and
                current_block['font_flags'] == next_block['font_flags'] and
                abs(current_block['bbox'][0] - next_block['bbox'][0]) < 10)
    
    def classify_content_type(self, text: str, font_size: float, is_bold: bool, indent_level: int) -> str:
        """Classify the type of content based on formatting and text"""
        text_lower = text.lower().strip()
        text_clean = text.strip()
        
        # Skip page headers and footers
        if self.is_page_header_footer(text):
            return 'page_header_footer'
        
        # Main section headers
        if font_size >= self.main_heading_size and is_bold and indent_level == 0:
            return 'main_section'
        
        # Special keywords - improved detection
        # Exact matches for section headers
        if text_clean.lower() in ['example', 'syntax', 'options']:
            return text_clean.lower()
        
        # Pattern matches for section headers with colons or other punctuation
        if re.match(r'^(example|syntax|options)\s*[:.-]?\s*$', text_lower):
            if 'example' in text_lower:
                return 'example'
            elif 'syntax' in text_lower:
                return 'syntax'
            elif 'options' in text_lower:
                return 'options'
        
        # Definition headers (bold, medium size, indented)
        if is_bold and font_size >= self.sub_heading_size and indent_level == 1:
            return 'definition_header'
        
        # Regular content
        return 'content'
    
    def merge_continuation_lines(self, blocks: List[Dict]) -> List[Dict]:
        """Merge text blocks that are part of the same paragraph"""
        merged_blocks = []
        i = 0
        
        while i < len(blocks):
            current_block = blocks[i].copy()
            
            # Skip if this is a page header/footer
            if self.is_page_header_footer(current_block['text']):
                i += 1
                continue
            
            # Look ahead for continuation lines
            j = i + 1
            while j < len(blocks):
                next_block = blocks[j]
                
                # Skip page headers/footers in continuation checking
                if self.is_page_header_footer(next_block['text']):
                    j += 1
                    continue
                
                # Check if next block is a continuation
                if self.is_content_continuation(current_block, next_block):
                    # Merge the text
                    current_block['text'] += ' ' + next_block['text']
                    j += 1
                else:
                    break
            
            merged_blocks.append(current_block)
            i = j
        
        return merged_blocks
    
    def extract_knowledge_segments_with_merging(self, pdf_path: str) -> List[KnowledgeSegment]:
        """Extract the PDF's knowledge items and merge each item's parts into one segment.

        Pipeline: read the formatted spans, map pages to section components,
        merge continuation lines, cut the block stream into raw segments
        (``_collect_raw_segments``) and group those per heading with
        ``merge_related_segments``. Debug samples of both stages are printed.

        Args:
            pdf_path: Path of the PDF to process.

        Returns:
            The merged segments; an empty list when the PDF yields no text.
        """
        formatted_blocks = self.extract_text_with_formatting(pdf_path)
        if not formatted_blocks:
            # Nothing to segment, so skip page tracking and merging entirely.
            return []

        # Smart tracking of section components
        page_sections = self.track_page_sections_smart(formatted_blocks)

        merged_blocks = self.merge_continuation_lines(formatted_blocks)

        # Intermediate segments before merging
        raw_segments = self._collect_raw_segments(merged_blocks, page_sections)

        print("DEBUG: Raw segments sample (2100:2130):")
        for i, seg in enumerate(raw_segments[2100:2130], 2100):
            print(f"  [{i}] Type: {seg['segment_type']}, Parent: {seg['parent_key']}, Content: {seg['content'][:50]}...")

        # Now merge related segments by parent_key
        merged_result = self.merge_related_segments(raw_segments)

        print(f"\nDEBUG: After merging - sample segments:")
        for i, seg in enumerate(merged_result[100:110], 100):
            print(f"  [{i}] Title: {seg.title}")
            print(f"      Has Content: {bool(seg.content.strip())}")
            print(f"      Has Syntax: {bool(seg.syntax.strip())}")
            print(f"      Has Examples: {len(seg.examples)}")
            print(f"      Has Options: {bool(seg.options.strip())}")

        return merged_result

    def _collect_raw_segments(self, merged_blocks: List[Dict],
                              page_sections: Dict[int, str]) -> List[Dict]:
        """Cut the merged block stream into raw segments, one per heading and content type.

        A segment ends whenever a new main section, a definition header or an
        Example/Syntax/Options keyword is met, and once more at the end of the
        stream. Every segment is attributed to the heading that was active
        while its text was collected.

        Args:
            merged_blocks: Blocks as returned by ``merge_continuation_lines``.
            page_sections: Page number to section component mapping.

        Returns:
            Dicts with the keys 'title', 'content', 'level', 'page_number',
            'segment_type', 'section_component' and 'parent_key'.
        """
        raw_segments = []
        main_section = ""
        subsection = ""
        buffered = []              # text lines of the segment being collected
        level = 0
        start_page = 1             # page on which the active heading was seen
        page_component = "General" # component of the page currently being read
        owner_component = None     # component captured when the active heading started
        collecting = "content"     # content type of the lines in `buffered`

        def flush():
            """Store the buffered lines as one raw segment of the active heading."""
            if not buffered:
                return
            raw_segments.append({
                # Empty parts are left out so no title starts with " - ".
                'title': ' - '.join(part for part in (main_section, subsection) if part),
                'content': '\n'.join(buffered),
                'level': level,
                'page_number': start_page,
                'segment_type': collecting,
                # Use the component in effect where the heading started; the
                # block that triggers the flush may already sit on a page of
                # the next section.
                'section_component': owner_component or page_component,
                'parent_key': subsection or main_section,
            })

        for block in merged_blocks:
            text = block['text']
            page_num = block['page']
            content_type = self.classify_content_type(
                text,
                block['font_size'],
                self.is_bold(block['font_flags']),
                self.get_indentation_level(block['bbox']),
            )

            # Update current section component based on page
            page_component = page_sections.get(page_num, page_component)

            # Skip page headers/footers
            if content_type == 'page_header_footer':
                continue

            if content_type == 'main_section':
                # A repeated heading does not open a new section.
                if text == main_section:
                    continue
                # Flush under the heading that actually owns the text, which
                # may be the last subsection of the previous main section.
                flush()
                main_section = text
                subsection = ""
                buffered = []
                level = 0
                start_page = page_num
                owner_component = page_component
                collecting = "content"

            elif content_type == 'definition_header':
                # Also keeps intro text that sits directly under a main section.
                flush()
                subsection = text
                buffered = []
                level = 1
                start_page = page_num
                owner_component = page_component
                collecting = "content"

            elif content_type in ('example', 'syntax', 'options'):
                # Close what was collected so far, then switch the content type.
                flush()
                buffered = []
                collecting = content_type

            else:  # Regular content
                buffered.append(text)

        # Don't forget the last segment
        flush()
        return raw_segments

    
    def merge_related_segments(self, raw_segments: List[Dict]) -> List[KnowledgeSegment]:
        """Combine the raw segments of each knowledge item into one KnowledgeSegment.

        Raw segments are grouped by section component plus parent key. Title,
        level, page number and segment type come from the first raw segment of
        a group. Syntax, example and options texts get their keyword
        ("Syntax", "Example", "Options") prepended as a first line unless they
        already start with it; every other type is stored as regular content.
        Groups are returned in order of first appearance.
        """
        # segment_type -> (target bucket, label to put in front of the text)
        labelled_buckets = {
            'syntax': ('syntax', 'Syntax'),
            'example': ('examples', 'Example'),
            'options': ('options', 'Options'),
        }

        groups = {}
        for raw in raw_segments:
            group_key = f"{raw['section_component']}::{raw['parent_key']}"

            group = groups.get(group_key)
            if group is None:
                group = groups[group_key] = {
                    'content': [],
                    'syntax': [],
                    'examples': [],
                    'options': [],
                    'title': raw['title'],
                    'level': raw['level'],
                    'page_number': raw['page_number'],
                    'segment_type': raw['segment_type'],
                    'section_component': raw['section_component']
                }

            text = raw['content']
            bucket, label = labelled_buckets.get(raw['segment_type'], ('content', None))
            if label is not None and not text.strip().startswith(label):
                text = f"{label}\n{text}"
            group[bucket].append(text)

        # Joining an empty list yields '', so absent parts become empty strings.
        return [
            KnowledgeSegment(
                title=group['title'],
                content='\n\n'.join(group['content']),
                syntax='\n\n'.join(group['syntax']),
                examples=group['examples'],  # stays a list, one entry per example
                options='\n\n'.join(group['options']),
                level=group['level'],
                page_number=group['page_number'],
                segment_type=group['segment_type'],
                section_component=group['section_component']
            )
            for group in groups.values()
        ]

    
    def filter_segments(self, segments: List[KnowledgeSegment], 
                       min_content_length: int = 30) -> List[KnowledgeSegment]:
        """Filter out segments that are too short or not meaningful"""
        filtered = []
        for segment in segments:
            # Skip table of contents and index-like content
            if any(keyword in segment.title.lower() for keyword in 
                   ['table of contents', 'index', 'customer service', "reader's comments"]):
                continue
            
            # Calculate total content length - now including options
            total_content = segment.content + segment.syntax + segment.options + ' '.join(segment.examples)
            
            # Skip very short content
            if len(total_content.strip()) < min_content_length:
                continue
            
            filtered.append(segment)
        
        return filtered
    
    def export_for_rag(self, segments: List[KnowledgeSegment], 
                  output_format: str = 'jsonl') -> List[Dict]:
        """Export segments in a format suitable for RAG pipeline with separate fields"""
        print(f"DEBUG: Starting export_for_rag with {len(segments)} segments")
        rag_documents = []
        
        for i, segment in enumerate(segments):
            
            # Create separate fields for each content type
            doc = {
                'id': f"doc_{i:04d}",
                'title': segment.title,
                'content': segment.content.strip() if segment.content else '',
                'syntax': segment.syntax.strip() if segment.syntax else '',
                'examples': segment.examples if segment.examples else [],
                'options': segment.options.strip() if segment.options else '',
                'metadata': {
                    'level': segment.level,
                    'page_number': segment.page_number,
                    'segment_type': segment.segment_type,
                    'section_component': segment.section_component,
                    'has_syntax': bool(segment.syntax),
                    'has_examples': bool(segment.examples),
                    'has_options': bool(segment.options),
                    'example_count': len(segment.examples),
                    'content_length': len(segment.content) if segment.content else 0,
                    'syntax_length': len(segment.syntax) if segment.syntax else 0,
                    'options_length': len(segment.options) if segment.options else 0
                }
            }
            rag_documents.append(doc)
        
        print(f"DEBUG: Finished export_for_rag, created {len(rag_documents)} documents")
        return rag_documents

# Usage example
def main(pdf_path: str = r"C:\RocketBuild\Manual_D3.pdf"):
    """Run the extraction pipeline on one PDF and write the results to disk.

    Extracts and filters the knowledge segments, prints a summary per section
    component plus a few sample segments, writes the RAG documents to
    'merged_knowledge_segments.jsonl' and a statistics report to
    'extraction_report.txt' (both in the current working directory).

    Args:
        pdf_path: PDF to process; replace the default with your own path.
    """
    import json

    extractor = PDFKnowledgeExtractor()

    # Extract knowledge segments with smart merging
    segments = extractor.extract_knowledge_segments_with_merging(pdf_path)
    print(f"Extracted {len(segments)} knowledge segments from PDF")

    # Filter segments
    filtered_segments = extractor.filter_segments(segments)

    # Export for RAG pipeline
    rag_documents = extractor.export_for_rag(filtered_segments)

    # Display results with section component information
    print(f"Extracted {len(filtered_segments)} merged knowledge segments:")

    # Group by section component for better overview
    by_component = {}
    for segment in filtered_segments:
        by_component.setdefault(segment.section_component, []).append(segment)

    # Per-component counts, computed once and used for console and report:
    # (segments, segments with syntax, total examples, segments with options)
    component_stats = {
        component: (
            len(segs),
            sum(1 for seg in segs if seg.syntax),
            sum(len(seg.examples) for seg in segs),
            sum(1 for seg in segs if seg.options),
        )
        for component, segs in by_component.items()
    }

    # Show summary by component
    print("\n=== Section Component Summary ===")
    for component, (seg_count, syntax_count, example_count, options_count) in component_stats.items():
        print(f"{component}: {seg_count} segments ({syntax_count} with syntax, {example_count} total examples, {options_count} with options)")

    # Show sample merged segments
    print("\n=== Sample Merged Segments ===")
    for component, segs in list(by_component.items())[:2]:  # Show first 2 components
        print(f"\n--- {component} Component ---")
        for segment in segs[:2]:  # Show first 2 segments per component
            print(f"\nTitle: {segment.title}")
            print(f"Component: {segment.section_component}")
            print(f"Page: {segment.page_number}")
            print(f"Has Syntax: {bool(segment.syntax)}")
            print(f"Has Options: {bool(segment.options)}")
            print(f"Examples: {len(segment.examples)}")
            if segment.content:
                print(f"Content Preview: {segment.content[:100]}...")
            if segment.syntax:
                print(f"Syntax Preview: {segment.syntax[:100]}...")
            if segment.options:
                print(f"Options Preview: {segment.options[:100]}...")
            if segment.examples:
                print(f"Example Preview: {segment.examples[0][:100]}...")

    # Save to file for RAG pipeline
    with open('merged_knowledge_segments.jsonl', 'w', encoding='utf-8') as f:
        for doc in rag_documents:
            f.write(json.dumps(doc, ensure_ascii=False) + '\n')

    print(f"\nExported {len(rag_documents)} merged documents to merged_knowledge_segments.jsonl")

    # Save detailed report
    with open('extraction_report.txt', 'w', encoding='utf-8') as f:
        f.write("PDF Knowledge Extraction Report (Fixed Merging)\n")
        f.write("=" * 50 + "\n\n")

        f.write("Section Component Summary:\n")
        f.write("-" * 25 + "\n")
        for component, (seg_count, syntax_count, example_count, options_count) in component_stats.items():
            f.write(f"{component}: {seg_count} segments ({syntax_count} with syntax, {example_count} examples, {options_count} with options)\n")

        f.write(f"\nTotal merged segments: {len(filtered_segments)}\n")
        # The highest page number among the kept segments; 0 when nothing
        # survived filtering (max() without a default would raise there).
        last_page = max((seg.page_number for seg in filtered_segments), default=0)
        f.write(f"Total pages processed: {last_page}\n")

        # Statistics
        total_with_syntax = sum(1 for seg in filtered_segments if seg.syntax)
        total_with_examples = sum(1 for seg in filtered_segments if seg.examples)
        total_with_options = sum(1 for seg in filtered_segments if seg.options)
        total_examples = sum(len(seg.examples) for seg in filtered_segments)

        f.write(f"\nStatistics:\n")
        f.write(f"- Segments with syntax: {total_with_syntax}\n")
        f.write(f"- Segments with examples: {total_with_examples}\n")
        f.write(f"- Segments with options: {total_with_options}\n")
        f.write(f"- Total examples: {total_examples}\n")

    print("Detailed report saved to extraction_report.txt")

# Run the extraction pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()

DEBUG: Raw segments sample (2100:2130):
  [2100] Type: example, Parent: _CP_release_all, Content: /*The following example releases all previously se...
  [2101] Type: content, Parent: _CP_replace, Content: equivalent to the FlashBASIC statement: result = r...
  [2102] Type: syntax, Parent: _CP_replace, Content: int _CP_replace(CPSTR** result, CPSTR* string1, in...
  [2103] Type: example, Parent: _CP_replace, Content: /* The following example prints "c^b". */ CPSTR * ...
  [2104] Type: content, Parent: _CP_replace_bridge, Content: equivalent to the following FlashBASIC statement: ...
  [2105] Type: syntax, Parent: _CP_replace_bridge, Content: int _CP_replace_bridge(int expression, CPSTR* stri...
  [2106] Type: content, Parent: _CP_rewind, Content: equivalent to the FlashBASIC statement: rewind typ...
  [2107] Type: syntax, Parent: _CP_rewind, Content: int _CP_rewind(int type)...
  [2108] Type: example, Parent: _CP_rewind, Content: /* The following example rewinds the tape. */ _CP_...
  