In [1]:
from pathlib import Path
from typing import List, Optional, Union, Literal, Tuple, Any
from PIL import Image
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.base_models import InputFormat, DocumentStream
from docling.datamodel.pipeline_options import (
    AcceleratorDevice,
    AcceleratorOptions, 
    PdfPipelineOptions,
    smolvlm_picture_description
)
from docling_core.types.doc import PictureItem
import io
import base64
import json

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
class DocumentProcessor:
    """Convert a document with docling into per-page markdown and images.

    The docling converter is configured once in ``__init__``. Calling the
    instance with a file path runs the conversion and returns the markdown of
    every page together with the extracted pictures and the rendered pages.
    """

    def __init__(
        self,
        device: Optional[str] = None,
        num_threads: int = 8
    ):
        """
        Initialize the processor

        Args:
            device: Device for processing ('cuda', 'mps', 'cpu', or 'auto').
                If None, will use 'auto'.
            num_threads: Number of threads to use for processing

        Raises:
            ValueError: If ``device`` is not one of the supported names.
        """
        device_map = {
            "cuda": AcceleratorDevice.CUDA,
            "mps": AcceleratorDevice.MPS,
            "cpu": AcceleratorDevice.CPU,
            "auto": AcceleratorDevice.AUTO,
        }

        self.device = device or "auto"
        if self.device not in device_map:
            msg = f"Invalid device '{device}'. Must be one of: {list(device_map.keys())}"
            raise ValueError(msg)

        self.pipeline_options = PdfPipelineOptions()
        # NOTE(review): presumably 300 DPI relative to a 72 DPI base - confirm.
        self.pipeline_options.images_scale = 300/72.0
        self.pipeline_options.generate_page_images = True
        self.pipeline_options.generate_picture_images = True
        self.pipeline_options.do_formula_enrichment = True
        self.pipeline_options.accelerator_options = AcceleratorOptions(
            num_threads=num_threads,
            device=device_map[self.device]
        )
        # NOTE(review): with this flag False the description options configured
        # below are presumably unused - confirm before relying on them.
        self.pipeline_options.do_picture_description = False

        # Work on a private copy: `smolvlm_picture_description` is a shared
        # module-level object, so editing it in place would leak this prompt
        # into every other user of it in the same process.
        picture_options = smolvlm_picture_description.model_copy(deep=True)
        picture_options.prompt = (
            "Describe what you can see in this image. Focus only on what is visually present. "
            "Be concise and accurate in three sentences maximum."
        )
        picture_options.generation_config = {
            "max_new_tokens": 500,
            "do_sample": False,
        }
        self.pipeline_options.picture_description_options = picture_options

        self.converter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(
                    pipeline_options=self.pipeline_options
                )
            }
        )

    @staticmethod
    def _attach_descriptions(page_md: str, descriptions: List[Optional[str]]) -> str:
        """Insert each picture description right after its image placeholder.

        ``descriptions`` is positional: entry ``i`` belongs to the ``i``-th
        ``<!-- image -->`` placeholder in ``page_md``; ``None`` (or a missing
        entry) leaves that placeholder untouched.
        """
        marker = "<!-- image -->"
        head, *tail = page_md.split(marker)
        pieces = [head]
        for idx, part in enumerate(tail):
            description = descriptions[idx] if idx < len(descriptions) else None
            if description is None:
                pieces.append(f"{marker}{part}")
            else:
                pieces.append(f"{marker}\n{description}\n{part}")
        return "".join(pieces)

    def __call__(
        self,
        file_path: Union[str, Path]
    ) -> Tuple[List[str], List[Image.Image], List[Image.Image]]:
        """
        Process a document and return markdown pages and images

        Args:
            file_path: Path to the file to convert

        Returns:
            Tuple containing:
            - List of markdown strings (one per page, in page-number order)
            - List of extracted images (figures/graphs)
            - List of page images
        """
        file_path = Path(file_path) if isinstance(file_path, str) else file_path

        document = self.converter.convert(file_path).document

        # Rendered page images, for every page that carries one.
        page_images = [
            page.image.pil_image
            for page in document.pages.values()
            if getattr(page, "image", None) is not None
        ]

        # Collect pictures and, per page, one description slot per picture.
        extracted_images = []
        picture_descriptions = {}
        for element, _level in document.iterate_items():
            if not isinstance(element, PictureItem):
                continue

            if getattr(element, "image", None) is not None:
                extracted_images.append(element.image.pil_image)

            # Without provenance the picture cannot be attributed to a page.
            if not element.prov:
                continue
            page_no = element.prov[0].page_no

            # Use the first annotation that actually carries text; a picture
            # without one still occupies a slot (None) so that later
            # descriptions stay aligned with their own placeholders.
            annotation = next(
                (ann for ann in (element.annotations or []) if hasattr(ann, "text")),
                None,
            )
            description = None
            if annotation is not None:
                description = (
                    f"**AI-Generated Image Description:** {annotation.text}"
                    "\n<!-- end image description -->"
                )
            picture_descriptions.setdefault(page_no, []).append(description)

        # Export the pages that are actually present in the document rather
        # than assuming they are numbered 1..N.
        markdown_pages = []
        for page_no in sorted(document.pages):
            page_md = document.export_to_markdown(page_no=page_no)
            if page_no in picture_descriptions:
                page_md = self._attach_descriptions(page_md, picture_descriptions[page_no])
            markdown_pages.append(page_md)

        return markdown_pages, extracted_images, page_images

In [None]:
class DocumentProcessor:
    """Convert a document with docling into per-page markdown and pictures.

    This variant does not post-process the markdown; besides the markdown and
    the extracted pictures it reports which pages contain a picture.
    """

    def __init__(
        self,
        device: Optional[str] = None,
        num_threads: int = 8
    ):
        """
        Initialize the processor

        Args:
            device: Device for processing ('cuda', 'mps', 'cpu', or 'auto').
                If None, will use 'auto'.
            num_threads: Number of threads to use for processing

        Raises:
            ValueError: If ``device`` is not one of the supported names.
        """
        device_map = {
            "cuda": AcceleratorDevice.CUDA,
            "mps": AcceleratorDevice.MPS,
            "cpu": AcceleratorDevice.CPU,
            "auto": AcceleratorDevice.AUTO,
        }

        self.device = device or "auto"
        if self.device not in device_map:
            msg = f"Invalid device '{device}'. Must be one of: {list(device_map.keys())}"
            raise ValueError(msg)

        self.pipeline_options = PdfPipelineOptions()
        # NOTE(review): presumably 150 DPI relative to a 72 DPI base - confirm.
        self.pipeline_options.images_scale = 150/72.0
        self.pipeline_options.generate_page_images = True
        self.pipeline_options.generate_picture_images = True
        self.pipeline_options.do_formula_enrichment = True
        self.pipeline_options.accelerator_options = AcceleratorOptions(
            num_threads=num_threads,
            device=device_map[self.device]
        )
        self.pipeline_options.do_picture_description = False

        self.converter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(
                    pipeline_options=self.pipeline_options
                )
            }
        )

    def __call__(
        self,
        file_path: Union[str, Path]
    ) -> Tuple[List[str], List[Image.Image], List[int]]:
        """
        Process a document and return markdown pages, pictures and picture pages

        Args:
            file_path: Path to the file to convert

        Returns:
            Tuple containing:
            - List of markdown strings (one per page, in page-number order)
            - List of extracted images (figures/graphs)
            - List of page numbers that contain at least one extracted image,
              in order of first appearance and without duplicates
        """
        file_path = Path(file_path) if isinstance(file_path, str) else file_path

        document = self.converter.convert(file_path).document

        # Extract images and remember the pages they were found on.
        extracted_images = []
        pages_with_images = []
        for element, _level in document.iterate_items():
            if not isinstance(element, PictureItem):
                continue
            if getattr(element, "image", None) is None:
                continue

            extracted_images.append(element.image.pil_image)

            # A picture without provenance is still returned, but it cannot
            # be attributed to a page.
            if element.prov:
                page_no = element.prov[0].page_no
                if page_no not in pages_with_images:
                    pages_with_images.append(page_no)

        # Export the pages that are actually present in the document rather
        # than assuming they are numbered 1..N.
        markdown_pages = [
            document.export_to_markdown(page_no=page_no)
            for page_no in sorted(document.pages)
        ]

        return markdown_pages, extracted_images, pages_with_images

In [None]:
class DocumentProcessor:
    """Convert a base64-encoded PDF with docling into markdown and image URIs.

    The docling converter is configured once in ``__init__``. Calling the
    instance with base64 text decodes it, runs the conversion on the bytes and
    returns per-page markdown plus the URI strings of pictures and pages.
    """

    def __init__(
        self,
        device: Optional[str] = "cuda",
        num_threads: int = 8
    ):
        """
        Initialize the processor

        Args:
            device: Device for processing ('cuda', 'mps', 'cpu', or 'auto').
                Defaults to 'cuda'. If None, will use 'auto'.
            num_threads: Number of threads to use for processing

        Raises:
            ValueError: If ``device`` is not one of the supported names.
        """
        device_map = {
            "cuda": AcceleratorDevice.CUDA,
            "mps": AcceleratorDevice.MPS,
            "cpu": AcceleratorDevice.CPU,
            "auto": AcceleratorDevice.AUTO,
        }

        self.device = device or "auto"
        if self.device not in device_map:
            msg = f"Invalid device '{device}'. Must be one of: {list(device_map.keys())}"
            raise ValueError(msg)

        self.pipeline_options = PdfPipelineOptions()
        # NOTE(review): presumably 300 DPI relative to a 72 DPI base - confirm.
        self.pipeline_options.images_scale = 300/72.0
        self.pipeline_options.generate_page_images = True
        self.pipeline_options.generate_picture_images = True
        self.pipeline_options.do_formula_enrichment = True
        self.pipeline_options.accelerator_options = AcceleratorOptions(
            num_threads=num_threads,
            device=device_map[self.device]
        )
        # NOTE(review): with this flag False the description options configured
        # below are presumably unused - confirm before relying on them.
        self.pipeline_options.do_picture_description = False

        # Work on a private copy: `smolvlm_picture_description` is a shared
        # module-level object, so editing it in place would leak this prompt
        # into every other user of it in the same process.
        picture_options = smolvlm_picture_description.model_copy(deep=True)
        picture_options.prompt = (
            "Describe what you can see in this image. Focus only on what is visually present. "
            "Be concise and accurate in three sentences maximum."
        )
        picture_options.generation_config = {
            "max_new_tokens": 500,
            "do_sample": False,
        }
        self.pipeline_options.picture_description_options = picture_options

        self.converter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(
                    pipeline_options=self.pipeline_options
                )
            }
        )

    @staticmethod
    def _attach_descriptions(page_md: str, descriptions: List[Optional[str]]) -> str:
        """Insert each picture description right after its image placeholder.

        ``descriptions`` is positional: entry ``i`` belongs to the ``i``-th
        ``<!-- image -->`` placeholder in ``page_md``; ``None`` (or a missing
        entry) leaves that placeholder untouched.
        """
        marker = "<!-- image -->"
        head, *tail = page_md.split(marker)
        pieces = [head]
        for idx, part in enumerate(tail):
            description = descriptions[idx] if idx < len(descriptions) else None
            if description is None:
                pieces.append(f"{marker}{part}")
            else:
                pieces.append(f"{marker}\n{description}\n{part}")
        return "".join(pieces)

    def __call__(
        self,
        base64_content: str
    ) -> Tuple[List[str], List[str], List[str]]:
        """
        Process a document and return markdown pages and image URI strings

        Args:
            base64_content: Base64 encoded PDF file content

        Returns:
            Tuple containing:
            - List of markdown strings (one per page, in page-number order)
            - List of ``str(image.uri)`` for each extracted image
              (figures/graphs); presumably base64 data URIs - confirm
            - List of ``str(image.uri)`` for each page image
        """
        # Decode the base64 text and hand the bytes to docling as a stream.
        pdf_stream = io.BytesIO(base64.b64decode(base64_content))
        source = DocumentStream(name="doc.pdf", stream=pdf_stream)

        document = self.converter.convert(source).document

        # URI of the rendered image of every page that carries one.
        page_images = [
            str(page.image.uri)
            for page in document.pages.values()
            if getattr(page, "image", None) is not None
        ]

        # Collect pictures and, per page, one description slot per picture.
        extracted_images = []
        picture_descriptions = {}
        for element, _level in document.iterate_items():
            if not isinstance(element, PictureItem):
                continue

            if getattr(element, "image", None) is not None:
                extracted_images.append(str(element.image.uri))

            # Without provenance the picture cannot be attributed to a page.
            if not element.prov:
                continue
            page_no = element.prov[0].page_no

            # Use the first annotation that actually carries text; a picture
            # without one still occupies a slot (None) so that later
            # descriptions stay aligned with their own placeholders.
            annotation = next(
                (ann for ann in (element.annotations or []) if hasattr(ann, "text")),
                None,
            )
            description = None
            if annotation is not None:
                description = (
                    f"**AI-Generated Image Description:** {annotation.text}"
                    "\n<!-- end image description -->"
                )
            picture_descriptions.setdefault(page_no, []).append(description)

        # Export the pages that are actually present in the document rather
        # than assuming they are numbered 1..N.
        markdown_pages = []
        for page_no in sorted(document.pages):
            page_md = document.export_to_markdown(page_no=page_no)
            if page_no in picture_descriptions:
                page_md = self._attach_descriptions(page_md, picture_descriptions[page_no])
            markdown_pages.append(page_md)

        return markdown_pages, extracted_images, page_images

In [None]:
# Instantiate the processor with its default settings.
doc_processor = DocumentProcessor()

In [None]:
import base64

# Read the sample invoice PDF as raw bytes and base64-encode it; the encoded
# bytes are then turned into text so they can be embedded in JSON or a file.
with open(
    "samples/[table] Black White Minimalist Simple Creative Freelancer Invoice (1).pdf",
    mode="rb",
) as pdf_file:
    encoded_string = base64.b64encode(pdf_file.read())

base64_string = encoded_string.decode("utf-8")

In [None]:
output_file = "../models/v0.1.0/document_base64.txt"

# Store the base64 text wrapped in a pair of double quotes.
with open(output_file, mode="w") as f:
    f.write(f'"{base64_string}"')

In [None]:
# Request body carrying the base64-encoded PDF under the "pdf_content" key.
payload = {"pdf_content": base64_string}

output_file = "../models/v0.1.0/sample_payload.json"

# Serialize the payload to JSON text and write it out in one go.
with open(output_file, mode="w") as f:
    f.write(json.dumps(payload))

In [None]:
# Run the processor on the base64-encoded PDF and unpack its three results.
markdown_pages, extracted_images, page_images = doc_processor(base64_string)

In [32]:
# Build a processor that runs on CUDA with eight worker threads.
# NOTE(review): this call needs a DocumentProcessor whose __init__ accepts
# `device` and `num_threads`; the last DocumentProcessor defined in this
# notebook takes no arguments, so this cell depends on execution order.
doc_processor = DocumentProcessor(
    device="cuda",
    num_threads=8,
)

In [36]:
# Convert the sample PowerPoint deck from its file path and unpack the results.
# NOTE(review): the third name matches the DocumentProcessor variant that
# returns `pages_with_images` and takes a file path, not base64 text.
markdown_pages, extracted_images, pages_with_images = doc_processor("samples/sample.pptx")

In [37]:
# Display the list of per-page markdown strings.
markdown_pages

['Harnessing High Frequency Data To Inform Development and Humanitarian Interventions\n\nChristopher B. Barrett\n\nKeynote address to the World Bank conference\n\nThe Pulse of Progress: Harnessing High-Frequency Survey Data for Development Research in the Polycrisis Era\n\nWashington, DC\n\nDecember 17, 2024\n\n<!-- image -->',
 'Statistically representative observational data essential for accurate descriptive/predictive analysis. Often useful for inferential analysis.\n\nWorld Bank established LSMS &gt;40 years ago for comparable measurement of living standards defined broadly. With improved measurement came improved analysis. \n\n\t\t\t\t– Angus Deaton 1997 (and 2018)\n\n“To direct scarce resources to where they can do the greatest good, actions must be guided by reliable information … Measurement drives diagnosis and response. ” \n\n\t\t\t\t– Barrett (Science 2010)\n\n<!-- image -->\n\nWhy national survey data?\n\n<!-- image -->\n\n<!-- image -->',
 'Living standards are dynamic. A

In [30]:
# NOTE(review): this compares the first extracted image with itself, so it
# cannot tell two images apart - presumably a different index was intended
# on one side; confirm.
extracted_images[0] == extracted_images[0]

True

In [None]:
# Print the markdown of the seventh page (index 6).
print(markdown_pages[6])

In [None]:
# Save up to the first five extracted images as test0.png ... test4.png.
# Slicing instead of indexing 0..4 by hand means a document with fewer than
# five pictures no longer raises IndexError.
for image_index, image in enumerate(extracted_images[:5]):
    image.save(f"test{image_index}.png")

In [None]:
# Save every extracted image as test<N>.png, numbered in extraction order.
for counter, image in enumerate(extracted_images):
    image.save(f"test{counter}.png")