In [3]:
# !pip install Docling

In [12]:
import json
from typing import Dict, List, Any, Optional
from docling.document_converter import DocumentConverter
from docling_core.types.doc import DoclingDocument, NodeItem, TextItem, TableItem, PictureItem


class DoclingHierarchyMapper:
    """
    Creates AWS Textract-like hierarchical mapping from Docling documents.
    Maps sections -> subsections -> paragraphs with relationship tracking.
    """

    def __init__(self):
        """Start with an empty hierarchy, an ID counter at zero and a new Docling converter."""
        self.element_counter = 0  # last number handed out by generate_element_id
        self.hierarchy_map = dict()  # replaced on each extract_hierarchy_mapping call
        self.converter = DocumentConverter()

    def convert_document(self, source: str) -> DoclingDocument:
        """Convert document using Docling"""
        result = self.converter.convert(source)
        return result.document

    def generate_element_id(self, element_type: str) -> str:
        """Generate unique element ID"""
        self.element_counter += 1
        return f"{element_type}_{self.element_counter:04d}"

    def extract_hierarchy_mapping(self, document: DoclingDocument) -> Dict[str, Any]:
        """
        Extract hierarchical mapping from DoclingDocument.
        Returns AWS Textract-like structure with sections, subsections, and paragraphs.
        """
        self.hierarchy_map = {
            "document_metadata": self._extract_document_metadata(document),
            "sections": [],
            "element_relationships": {},
            "content_summary": {
                "total_sections": 0,
                "total_subsections": 0,
                "total_paragraphs": 0,
                "total_tables": 0,
                "total_images": 0
            }
        }

        # Debug: Print document structure
        print(f"Document has body: {document.body is not None}")
        if document.body:
            print(f"Body has children: {hasattr(document.body, 'children')}")
            if hasattr(document.body, 'children'):
                print(f"Number of body children: {len(document.body.children)}")

        # Process the document body structure
        if document.body:
            self._process_body_structure(document, document.body)

        # If no sections found from body, try processing all text items directly
        if not self.hierarchy_map["sections"]:
            print("No sections found in body, trying direct text processing...")
            self._process_text_items_directly(document)

        # Process tables separately
        self._process_tables(document)

        # Process images separately
        self._process_images(document)

        # Update summary counts
        self._update_content_summary()

        return self.hierarchy_map

    def _extract_document_metadata(self, document: DoclingDocument) -> Dict[str, Any]:
        """Extract document-level metadata"""
        metadata = {
            "page_count": len(document.pages) if document.pages else 0,
            "total_text_items": len(document.texts),
            "total_tables": len(document.tables),
            "total_pictures": len(document.pictures),
            "document_type": "multi_format_document"
        }

        # Add origin information if available
        if hasattr(document, 'origin') and document.origin:
            metadata["source"] = document.origin.filename if hasattr(document.origin, 'filename') else str(document.origin)

        return metadata

    def _process_body_structure(self, document: DoclingDocument, body_node: NodeItem, parent_id: str = None, level: int = 0):
        """Process the document body structure recursively"""
        if not body_node or not hasattr(body_node, 'children'):
            return

        current_section = None

        for child_ref in body_node.children:
            try:
                child_item = self._resolve_reference(document, child_ref)

                if child_item and hasattr(child_item, 'label'):
                    item_type = self._classify_item_type(child_item)

                    if item_type == "section_heading":
                        current_section = self._create_section(child_item, parent_id, level)
                        self.hierarchy_map["sections"].append(current_section)

                        # Process children of this section
                        if hasattr(child_item, 'children') and child_item.children:
                            self._process_section_children(document, child_item, current_section["id"], level + 1)

                    elif item_type == "paragraph" and current_section:
                        paragraph = self._create_paragraph(child_item, current_section["id"])
                        current_section["paragraphs"].append(paragraph)
                        self._add_relationship(current_section["id"], paragraph["id"], "contains")

                    elif item_type == "list" and current_section:
                        list_item = self._create_list_item(child_item, current_section["id"])
                        current_section["lists"].append(list_item)
                        self._add_relationship(current_section["id"], list_item["id"], "contains")

            except Exception as e:
                print(f"Error processing child reference {child_ref}: {e}")
                continue

    def _process_text_items_directly(self, document: DoclingDocument):
        """Process text items directly when no body structure is found"""
        current_section = None

        for i, text_item in enumerate(document.texts):
            item_type = self._classify_item_type(text_item)

            if item_type == "section_heading":
                current_section = self._create_section(text_item, None, 0)
                self.hierarchy_map["sections"].append(current_section)

            elif item_type == "paragraph":
                if current_section is None:
                    # Create a default section if none exists
                    default_section = {
                        "id": self.generate_element_id("section"),
                        "type": "section",
                        "level": 0,
                        "title": "Document Content",
                        "parent_id": None,
                        "subsections": [],
                        "paragraphs": [],
                        "lists": [],
                        "tables": [],
                        "images": [],
                        "bounding_box": None,
                        "metadata": {
                            "item_type": "default",
                            "page_number": None
                        }
                    }
                    self.hierarchy_map["sections"].append(default_section)
                    current_section = default_section

                paragraph = self._create_paragraph(text_item, current_section["id"])
                current_section["paragraphs"].append(paragraph)
                self._add_relationship(current_section["id"], paragraph["id"], "contains")

    def _process_section_children(self, document: DoclingDocument, section_node: NodeItem, parent_section_id: str, level: int):
        """Process children of a section node"""
        if not hasattr(section_node, 'children'):
            return

        for child_ref in section_node.children:
            try:
                child_item = self._resolve_reference(document, child_ref)

                if child_item:
                    item_type = self._classify_item_type(child_item)

                    if item_type == "section_heading":
                        # This is a subsection
                        subsection = self._create_section(child_item, parent_section_id, level)

                        # Find parent section and add subsection
                        parent_section = self._find_section_by_id(parent_section_id)
                        if parent_section:
                            parent_section["subsections"].append(subsection)
                            self._add_relationship(parent_section_id, subsection["id"], "contains")

                        # Process subsection children
                        if hasattr(child_item, 'children') and child_item.children:
                            self._process_section_children(document, child_item, subsection["id"], level + 1)

                    elif item_type == "paragraph":
                        paragraph = self._create_paragraph(child_item, parent_section_id)
                        parent_section = self._find_section_by_id(parent_section_id)
                        if parent_section:
                            parent_section["paragraphs"].append(paragraph)
                            self._add_relationship(parent_section_id, paragraph["id"], "contains")
            except Exception as e:
                print(f"Error processing section child {child_ref}: {e}")
                continue

    def _classify_item_type(self, item) -> str:
        """Classify the type of document item"""
        if hasattr(item, 'label'):
            label = item.label.lower()
            # More comprehensive heading detection
            if any(heading in label for heading in ['title', 'heading', 'section', 'header', 'caption']):
                return "section_heading"
            elif any(list_word in label for list_word in ['list', 'item', 'bullet']):
                return "list"
            elif any(para in label for para in ['paragraph', 'text', 'body']):
                return "paragraph"

        # Default classification based on content
        if hasattr(item, 'text') and item.text:
            text = item.text.strip()
            text_length = len(text)

            # Heuristics for section headings
            if text_length < 200 and text_length > 0:
                # Check if it looks like a heading
                lines = text.split('\n')
                if len(lines) <= 2:  # Short, likely a heading
                    # Additional checks for heading characteristics
                    if (text.isupper() or
                        any(char.isdigit() for char in text[:10]) or  # Starts with numbers
                        text.endswith(':') or
                        len(text.split()) <= 10):  # Short phrases
                        return "section_heading"

            return "paragraph"

        return "unknown"

    def _create_section(self, item, parent_id: str, level: int) -> Dict[str, Any]:
        """Create a section dictionary"""
        section_id = self.generate_element_id("section")

        section = {
            "id": section_id,
            "type": "section",
            "level": level,
            "title": item.text if hasattr(item, 'text') else "",
            "parent_id": parent_id,
            "subsections": [],
            "paragraphs": [],
            "lists": [],
            "tables": [],
            "images": [],
            "bounding_box": self._extract_bounding_box(item),
            "metadata": {
                "item_type": item.label if hasattr(item, 'label') else "unknown",
                "page_number": self._get_page_number(item)
            }
        }

        return section

    def _create_paragraph(self, item, parent_id: str) -> Dict[str, Any]:
        """Create a paragraph dictionary"""
        paragraph_id = self.generate_element_id("paragraph")

        paragraph = {
            "id": paragraph_id,
            "type": "paragraph",
            "parent_id": parent_id,
            "text": item.text if hasattr(item, 'text') else "",
            "bounding_box": self._extract_bounding_box(item),
            "metadata": {
                "item_type": item.label if hasattr(item, 'label') else "unknown",
                "page_number": self._get_page_number(item),
                "word_count": len(item.text.split()) if hasattr(item, 'text') and item.text else 0
            }
        }

        return paragraph

    def _create_list_item(self, item, parent_id: str) -> Dict[str, Any]:
        """Create a list item dictionary"""
        list_id = self.generate_element_id("list")

        list_item = {
            "id": list_id,
            "type": "list",
            "parent_id": parent_id,
            "items": [],
            "bounding_box": self._extract_bounding_box(item),
            "metadata": {
                "item_type": item.label if hasattr(item, 'label') else "unknown",
                "page_number": self._get_page_number(item)
            }
        }

        return list_item

    def _process_tables(self, document: DoclingDocument):
        """Process tables in the document"""
        for table in document.tables:
            table_id = self.generate_element_id("table")

            table_data = {
                "id": table_id,
                "type": "table",
                "rows": [],
                "columns": [],
                "bounding_box": self._extract_bounding_box(table),
                "metadata": {
                    "page_number": self._get_page_number(table)
                }
            }

            # Extract table structure if available
            if hasattr(table, 'data') and table.data:
                table_data["rows"] = self._extract_table_rows(table.data)

            # Try to associate table with nearest section
            nearest_section = self._find_nearest_section(table)
            if nearest_section:
                nearest_section["tables"].append(table_data)
                self._add_relationship(nearest_section["id"], table_id, "contains")

    def _process_images(self, document: DoclingDocument):
        """Process images in the document"""
        for image in document.pictures:
            image_id = self.generate_element_id("image")

            image_data = {
                "id": image_id,
                "type": "image",
                "bounding_box": self._extract_bounding_box(image),
                "metadata": {
                    "page_number": self._get_page_number(image)
                }
            }

            # Try to associate image with nearest section
            nearest_section = self._find_nearest_section(image)
            if nearest_section:
                nearest_section["images"].append(image_data)
                self._add_relationship(nearest_section["id"], image_id, "contains")

    def _extract_bounding_box(self, item) -> Optional[Dict[str, float]]:
        """Extract bounding box information if available"""
        if hasattr(item, 'bbox') and item.bbox:
            return {
                "left": item.bbox.l,
                "top": item.bbox.t,
                "width": item.bbox.w,
                "height": item.bbox.h
            }
        return None

    def _get_page_number(self, item) -> Optional[int]:
        """Get page number for an item"""
        if hasattr(item, 'prov') and item.prov:
            for prov in item.prov:
                if hasattr(prov, 'page_no'):
                    return prov.page_no
        return None

    def _extract_table_rows(self, table_data) -> List[Dict[str, Any]]:
        """Extract table rows from table data"""
        rows = []
        if hasattr(table_data, 'table_cells'):
            # Group cells by row
            row_groups = {}
            for cell in table_data.table_cells:
                row_idx = cell.row_idx if hasattr(cell, 'row_idx') else 0
                if row_idx not in row_groups:
                    row_groups[row_idx] = []
                row_groups[row_idx].append({
                    "text": cell.text if hasattr(cell, 'text') else "",
                    "column_idx": cell.col_idx if hasattr(cell, 'col_idx') else 0
                })

            # Convert to list of rows
            for row_idx in sorted(row_groups.keys()):
                rows.append({
                    "row_idx": row_idx,
                    "cells": sorted(row_groups[row_idx], key=lambda x: x['column_idx'])
                })

        return rows

    def _resolve_reference(self, document: DoclingDocument, ref):
        """Resolve a JSON pointer reference to an actual item"""
        try:
            # Handle different types of references
            if hasattr(ref, 'ref'):
                # This is a RefItem object, get the actual reference string
                ref_str = ref.ref
            elif isinstance(ref, str):
                # This is already a string reference
                ref_str = ref
            else:
                # Try to convert to string
                ref_str = str(ref)

            # Parse JSON pointer (e.g., "#/texts/0")
            if ref_str.startswith('#/'):
                parts = ref_str[2:].split('/')  # Remove '#/' and split
            else:
                parts = ref_str.split('/')

            if len(parts) >= 2:
                collection_name = parts[0]
                try:
                    index = int(parts[1])
                except ValueError:
                    print(f"Warning: Could not resolve reference {ref_str}: invalid literal for int() with base 10: '{parts[1]}'")
                    return None

                if collection_name == 'texts' and index < len(document.texts):
                    return document.texts[index]
                elif collection_name == 'tables' and index < len(document.tables):
                    return document.tables[index]
                elif collection_name == 'pictures' and index < len(document.pictures):
                    return document.pictures[index]
                elif collection_name == 'groups' and hasattr(document, 'groups') and index < len(document.groups):
                    return document.groups[index]
                else:
                    print(f"Warning: Reference {ref_str} points to non-existent item")

        except (ValueError, IndexError, AttributeError) as e:
            print(f"Warning: Could not resolve reference {ref}: {e}")

        return None

    def _find_section_by_id(self, section_id: str) -> Optional[Dict[str, Any]]:
        """Find a section by ID in the hierarchy"""
        def search_sections(sections):
            for section in sections:
                if section["id"] == section_id:
                    return section
                # Search in subsections
                found = search_sections(section["subsections"])
                if found:
                    return found
            return None

        return search_sections(self.hierarchy_map["sections"])

    def _find_nearest_section(self, item) -> Optional[Dict[str, Any]]:
        """Find the nearest section for an item (simplified heuristic)"""
        # This is a simplified implementation
        # In practice, you'd use spatial relationships and page positioning
        if self.hierarchy_map["sections"]:
            return self.hierarchy_map["sections"][-1]  # Return last section for now
        return None

    def _add_relationship(self, parent_id: str, child_id: str, relationship_type: str):
        """Add a relationship between elements"""
        if parent_id not in self.hierarchy_map["element_relationships"]:
            self.hierarchy_map["element_relationships"][parent_id] = []

        self.hierarchy_map["element_relationships"][parent_id].append({
            "child_id": child_id,
            "relationship": relationship_type
        })

    def _update_content_summary(self):
        """Update content summary counts"""
        def count_elements(sections):
            section_count = len(sections)
            subsection_count = 0
            paragraph_count = 0
            table_count = 0
            image_count = 0

            for section in sections:
                subsection_count += len(section["subsections"])
                paragraph_count += len(section["paragraphs"])
                table_count += len(section["tables"])
                image_count += len(section["images"])

                # Recursively count subsections
                sub_counts = count_elements(section["subsections"])
                subsection_count += sub_counts[1]
                paragraph_count += sub_counts[2]
                table_count += sub_counts[3]
                image_count += sub_counts[4]

            return section_count, subsection_count, paragraph_count, table_count, image_count

        counts = count_elements(self.hierarchy_map["sections"])
        self.hierarchy_map["content_summary"].update({
            "total_sections": counts[0],
            "total_subsections": counts[1],
            "total_paragraphs": counts[2],
            "total_tables": counts[3],
            "total_images": counts[4]
        })

    def get_section_tree(self) -> Dict[str, Any]:
        """Get a simplified section tree view"""
        def build_tree(sections):
            tree = {}
            for section in sections:
                tree[section["title"]] = {
                    "id": section["id"],
                    "level": section["level"],
                    "paragraph_count": len(section["paragraphs"]),
                    "subsections": build_tree(section["subsections"])
                }
            return tree

        return build_tree(self.hierarchy_map["sections"])

    def export_to_json(self, filepath: str):
        """Export hierarchy mapping to JSON file"""
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(self.hierarchy_map, f, indent=2, ensure_ascii=False)

    def print_hierarchy_summary(self):
        """Print a summary of the document hierarchy"""
        print("Document Hierarchy Summary:")
        print("-" * 50)

        summary = self.hierarchy_map["content_summary"]
        print(f"Total Sections: {summary['total_sections']}")
        print(f"Total Subsections: {summary['total_subsections']}")
        print(f"Total Paragraphs: {summary['total_paragraphs']}")
        print(f"Total Tables: {summary['total_tables']}")
        print(f"Total Images: {summary['total_images']}")

        print("\nSection Structure:")
        tree = self.get_section_tree()
        self._print_tree(tree, 0)

    def _print_tree(self, tree, level):
        """Print tree structure with indentation"""
        for title, data in tree.items():
            indent = "  " * level
            print(f"{indent}- {title} (ID: {data['id']}, Paragraphs: {data['paragraph_count']})")
            if data['subsections']:
                self._print_tree(data['subsections'], level + 1)


In [13]:
# Example usage and debugging
def main():
    """
    Example run: convert one document, print debug details and a hierarchy
    summary, export the hierarchy to ``document_hierarchy.json`` and show the
    first section. Any exception is printed together with its traceback.
    """
    mapper = DoclingHierarchyMapper()

    # Source document (can be PDF, DOCX, etc.)
    document_path = "Contract document.pdf"  # Replace with actual path

    try:
        document = mapper.convert_document(document_path)

        # Debug information: sizes of the flat item collections
        print("=== DEBUG INFORMATION ===")
        for collection_name in ("texts", "tables", "pictures"):
            print(f"Document {collection_name} count: {len(getattr(document, collection_name))}")

        # Label and first 100 characters of the leading text items
        print("\nFirst 5 text items:")
        for position, entry in enumerate(document.texts[:5]):
            label = entry.label if hasattr(entry, 'label') else 'No label'
            snippet = entry.text[:100] if hasattr(entry, 'text') else 'No text'
            print(f"  {position}: {label} - {snippet}...")

        hierarchy = mapper.extract_hierarchy_mapping(document)
        mapper.print_hierarchy_summary()
        mapper.export_to_json("document_hierarchy.json")

        # Example: access specific elements of the result
        print("\n=== RESULTS ===")
        sections = hierarchy["sections"]
        if not sections:
            print("No sections found in document")
        else:
            lead = sections[0]
            print(f"First section title: {lead['title']}")
            print(f"First section level: {lead['level']}")
            print(f"First section paragraphs: {len(lead['paragraphs'])}")
            print(f"First section subsections: {len(lead['subsections'])}")

    except Exception as e:
        print(f"Error processing document: {e}")
        import traceback
        traceback.print_exc()


# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

=== DEBUG INFORMATION ===
Document texts count: 279
Document tables count: 2
Document pictures count: 0

First 5 text items:
  0: section_header - This draft agreement is subject to change/fine tuning before final award of the contract...
  1: section_header - (Sample Contract Agreement)...
  2: section_header - AGREEMENT FOR HALL OF RESIDENCE NO. - __...
  3: text - THIS  AGREEMENT has  been  made  on  this  __ th day  of  October,  2012  at  IIT  Kanpur BETWEEN In...
  4: text - AND M/s __________ registered under ________________________ and having it's office at _____________...
Document has body: True
Body has children: True
Number of body children: 152
No sections found in body, trying direct text processing...
Document Hierarchy Summary:
--------------------------------------------------
Total Sections: 69
Total Subsections: 0
Total Paragraphs: 72
Total Tables: 2
Total Images: 0

Section Structure:
- This draft agreement is subject to change/fine tuning before final award of the

In [7]:
!ls

'Contract document.pdf'   sample_data
