# In [None]:
import hashlib
import logging
import sys
import streamlit as st
import tempfile
from collections.abc import Iterable, Iterator
from functools import partial
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Type, Union
import shutil
from pydantic import BaseModel, ConfigDict, model_validator, validate_call

from docling.datamodel.base_models import (
    ConversionStatus,
    DoclingComponentType,
    DocumentStream,
    ErrorItem,
    InputFormat,
)
from docling.datamodel.document import (
    ConversionResult,
    InputDocument,
    _DocumentConversionInput,
)
from docling.datamodel.pipeline_options import PipelineOptions
from docling.datamodel.settings import (
    DEFAULT_PAGE_RANGE,
    DocumentLimits,
    PageRange,
    settings,
)
from docling.exceptions import ConversionError
from docling.pipeline.asr_pipeline import AsrPipeline
from docling.pipeline.base_pipeline import BasePipeline
from docling.pipeline.simple_pipeline import SimplePipeline
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
from docling.utils.utils import chunkify

import importlib

# Module-level logger named after this module.
_log = logging.getLogger(__name__)

# Backend class name -> dotted path of the module that defines it.
backend_classes = {
    "CsvDocumentBackend": "docling.backend.csv_backend",
    "MsWordDocumentBackend": "docling.backend.msword_backend",
    "MsExcelDocumentBackend": "docling.backend.msexcel_backend",
    "MsPowerpointDocumentBackend": "docling.backend.mspowerpoint_backend",
    "AsciiDocBackend": "docling.backend.asciidoc_backend",
    "MarkdownDocumentBackend": "docling.backend.md_backend",
    "HTMLDocumentBackend": "docling.backend.html_backend",
    "DoclingParseV4DocumentBackend": "docling.backend.docling_parse_v4_backend",
    "PatentUsptoDocumentBackend": "docling.backend.xml.uspto_backend",
    "JatsDocumentBackend": "docling.backend.xml.jats_backend",
    "DoclingJSONBackend": "docling.backend.json.docling_json_backend",
    "NoOpBackend": "docling.backend.noop_backend",
    "AbstractDocumentBackend": "docling.backend.abstract_backend",
}

# Import every listed module and bind each class into this module's namespace
# under its own name, so the code below can refer to the backend classes
# directly. All classes are resolved first; globals are updated in one step.
globals().update(
    {
        class_name: getattr(importlib.import_module(module_path), class_name)
        for class_name, module_path in backend_classes.items()
    }
)

class FormatOption(BaseModel):
    """Pipeline class, pipeline options and backend class used for one format.

    When ``pipeline_options`` is left as ``None`` it is filled in after
    validation with ``pipeline_cls.get_default_options()``.
    """

    # Needed because the fields hold arbitrary (non-pydantic) class objects.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    pipeline_cls: Type[BasePipeline]
    pipeline_options: Optional[PipelineOptions] = None
    backend: Type[AbstractDocumentBackend]

    @model_validator(mode="after")
    def set_defaults_if_needed(self) -> "FormatOption":
        """Populate ``pipeline_options`` from the pipeline class if unset."""
        if self.pipeline_options is not None:
            return self
        self.pipeline_options = self.pipeline_cls.get_default_options()
        return self

# Default (pipeline class, backend class) pair for each input format.
# Read by get_format_option() to build a FormatOption when the caller
# supplies no override for a format.
FORMAT_DEFAULTS: dict[InputFormat, tuple[Type[BasePipeline], Type[AbstractDocumentBackend]]] = {
    # Formats handled by SimplePipeline, each with its own backend.
    InputFormat.CSV: (SimplePipeline, CsvDocumentBackend),
    InputFormat.XLSX: (SimplePipeline, MsExcelDocumentBackend),
    InputFormat.DOCX: (SimplePipeline, MsWordDocumentBackend),
    InputFormat.PPTX: (SimplePipeline, MsPowerpointDocumentBackend),
    InputFormat.MD: (SimplePipeline, MarkdownDocumentBackend),
    InputFormat.ASCIIDOC: (SimplePipeline, AsciiDocBackend),
    InputFormat.HTML: (SimplePipeline, HTMLDocumentBackend),
    InputFormat.XML_USPTO: (SimplePipeline, PatentUsptoDocumentBackend),
    InputFormat.XML_JATS: (SimplePipeline, JatsDocumentBackend),
    # Images and PDFs share the same pipeline and the same backend.
    InputFormat.IMAGE: (StandardPdfPipeline, DoclingParseV4DocumentBackend),
    InputFormat.PDF: (StandardPdfPipeline, DoclingParseV4DocumentBackend),
    InputFormat.JSON_DOCLING: (SimplePipeline, DoclingJSONBackend),
    # Audio goes through AsrPipeline paired with the no-op backend.
    InputFormat.AUDIO: (AsrPipeline, NoOpBackend),
}

def get_format_option(input_format: InputFormat) -> FormatOption:
    """Build the default ``FormatOption`` for ``input_format``.

    Args:
        input_format: Format whose default pipeline/backend pair is looked
            up in ``FORMAT_DEFAULTS``.

    Returns:
        A ``FormatOption`` built from the default pipeline class and backend
        class registered for the format.

    Raises:
        RuntimeError: If ``FORMAT_DEFAULTS`` has no entry for ``input_format``.
    """
    # Guard only the table lookup: a KeyError raised while constructing the
    # FormatOption must not be misreported as a missing default.
    try:
        pipeline_cls, backend = FORMAT_DEFAULTS[input_format]
    except KeyError as err:
        raise RuntimeError(f"No default options configured for {input_format}") from err
    return FormatOption(pipeline_cls=pipeline_cls, backend=backend)

class DocumentConverter:
    """Convert input documents by routing each one to a pipeline chosen by format.

    Pipelines are created on first use and cached under the key
    ``(pipeline class, hash of its options)``, so formats configured with the
    same pipeline class and options that dump to the same string share one
    pipeline instance.
    """

    def __init__(
        self,
        allowed_formats: Optional[List[InputFormat]] = None,
        format_options: Optional[Dict[InputFormat, FormatOption]] = None,
    ):
        """Set up the allowed formats and the per-format options.

        Args:
            allowed_formats: Formats this converter accepts. ``None`` allows
                every ``InputFormat`` member; an explicit empty list allows
                none.
            format_options: Per-format overrides. Allowed formats without an
                override get their options from ``get_format_option``.
        """
        # Compare against None rather than truthiness: an explicit empty list
        # means "allow nothing" and must not silently widen to every format.
        if allowed_formats is None:
            allowed_formats = list(InputFormat)
        self.allowed_formats = allowed_formats

        overrides = format_options or {}
        self.format_to_options: Dict[InputFormat, FormatOption] = {
            fmt: overrides[fmt] if fmt in overrides else get_format_option(fmt)
            for fmt in self.allowed_formats
        }
        # Pipeline cache, filled lazily by get_or_create_pipeline().
        self.initialized_pipelines: Dict[Tuple[Type[BasePipeline], str], BasePipeline] = {}

    def get_pipeline_hash(self, pipeline_options: PipelineOptions) -> str:
        """Return the MD5 hex digest of the stringified ``model_dump()`` of the options.

        The digest is only used as part of a cache key, hence
        ``usedforsecurity=False``.
        """
        options_str = str(pipeline_options.model_dump())
        return hashlib.md5(options_str.encode("utf-8"), usedforsecurity=False).hexdigest()

    def get_or_create_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]:
        """Return the cached pipeline for ``doc_format``, creating it on first use.

        Returns:
            The pipeline instance, or ``None`` when the format has no options
            entry or that entry carries no pipeline options.
        """
        fmt_option = self.format_to_options.get(doc_format)
        if not fmt_option or not fmt_option.pipeline_options:
            return None

        pipeline_cls = fmt_option.pipeline_cls
        options_hash = self.get_pipeline_hash(fmt_option.pipeline_options)
        key = (pipeline_cls, options_hash)

        if key not in self.initialized_pipelines:
            self.initialized_pipelines[key] = pipeline_cls(pipeline_options=fmt_option.pipeline_options)

        return self.initialized_pipelines[key]

    @validate_call(config=ConfigDict(strict=True))
    def convert_single(self, source: Union[Path, str, DocumentStream], **kwargs) -> ConversionResult:
        """Convert one source and return its result.

        Args:
            source: The document to convert.
            **kwargs: Forwarded unchanged to ``convert_multiple``.

        Returns:
            The first result produced by ``convert_multiple`` for ``source``.
        """
        return next(self.convert_multiple([source], **kwargs))

    @validate_call(config=ConfigDict(strict=True))
    def convert_multiple(
        self,
        source: Iterable[Union[Path, str, DocumentStream]],
        headers: Optional[Dict[str, str]] = None,
        raises_on_error: bool = True,
        max_num_pages: int = sys.maxsize,
        max_file_size: int = sys.maxsize,
        page_range: PageRange = DEFAULT_PAGE_RANGE,
    ) -> Iterator[ConversionResult]:
        """Lazily convert each source, yielding one result per document.

        Args:
            source: Documents to convert.
            headers: Forwarded unchanged to ``_DocumentConversionInput``.
            raises_on_error: Forwarded to ``process_document``; when true,
                failures raise instead of being returned as results.
            max_num_pages: Page-count limit placed in ``DocumentLimits``.
            max_file_size: File-size limit placed in ``DocumentLimits``.
            page_range: Page range placed in ``DocumentLimits``.

        Yields:
            The ``ConversionResult`` of each input document, in input order.
        """
        limits = DocumentLimits(max_num_pages=max_num_pages, max_file_size=max_file_size, page_range=page_range)
        conv_input = _DocumentConversionInput(path_or_stream_iterator=source, limits=limits, headers=headers)

        # Documents are pulled in batches of settings.perf.doc_batch_size and
        # processed one at a time within each batch.
        for batch in chunkify(conv_input.docs(self.format_to_options), settings.perf.doc_batch_size):
            for in_doc in batch:
                yield self.process_document(in_doc, raises_on_error=raises_on_error)

    def process_document(self, in_doc: InputDocument, raises_on_error: bool) -> ConversionResult:
        """Run the pipeline matching ``in_doc.format`` on a single document.

        Args:
            in_doc: The document to convert.
            raises_on_error: When true, raise instead of returning a
                non-success result; also forwarded to ``pipeline.execute``.

        Returns:
            The pipeline's result; a ``SKIPPED`` result carrying a
            ``USER_INPUT`` error item when the format is not allowed; or a
            ``FAILURE`` result when no pipeline is available.

        Raises:
            ConversionError: If ``raises_on_error`` is true and the format is
                not allowed or no pipeline is available.
        """
        if in_doc.format not in self.allowed_formats:
            error_msg = f"Unsupported format: {in_doc.file}"
            if raises_on_error:
                raise ConversionError(error_msg)
            return ConversionResult(
                input=in_doc,
                status=ConversionStatus.SKIPPED,
                errors=[ErrorItem(component_type=DoclingComponentType.USER_INPUT, module_name="", error_message=error_msg)]
            )

        pipeline = self.get_or_create_pipeline(in_doc.format)
        if not pipeline:
            if raises_on_error:
                raise ConversionError(f"No pipeline available for: {in_doc.file}")
            return ConversionResult(input=in_doc, status=ConversionStatus.FAILURE)

        return pipeline.execute(in_doc, raises_on_error=raises_on_error)

# --- Streamlit UI ---
# Single-page app: upload a document, convert it with DocumentConverter,
# then show the resulting Markdown and offer it as a download.
st.set_page_config(page_title="Automated Metadata Generation", layout="centered", page_icon="📄")

st.markdown("## 📄 Automated Meta Data Generation")
st.caption("Easily convert documents into clean, structured Markdown.")

with st.container():
    st.markdown("### 📤 Upload Your Document")
    uploaded_file = st.file_uploader("Supported formats: PDF, DOCX, XLSX, PPTX, MD, HTML, TXT, etc.", type=None)

if uploaded_file:
    file_suffix = Path(uploaded_file.name).suffix.lower()
    # Plain-text uploads are stored with a ".md" suffix so the converter is
    # handed a Markdown path; every other upload keeps its own suffix.
    tmp_suffix = ".md" if file_suffix == ".txt" else file_suffix

    # delete=False because the converter opens the file by path after this
    # handle is closed; the file is removed in the ``finally`` below.
    with tempfile.NamedTemporaryFile(delete=False, suffix=tmp_suffix) as tmp_file:
        # getvalue() returns the full upload regardless of the stream's
        # current read position.
        tmp_file.write(uploaded_file.getvalue())
        tmp_path = Path(tmp_file.name)

    try:
        converter = DocumentConverter()

        with st.spinner("⚙️ Converting your document..."):
            try:
                result = converter.convert_single(tmp_path)
                md = result.document.export_to_markdown()
            except Exception as e:
                # Top-level UI boundary: record the traceback and report the
                # failure to the user instead of crashing the page.
                _log.exception("Conversion failed for %s", uploaded_file.name)
                st.error(f"❌ Error during conversion: `{e}`")
            else:
                st.success("✅ Conversion successful!")
                st.markdown("---")
                st.markdown("### 📝 Markdown Output")
                # NOTE(review): unsafe_allow_html=True renders any raw HTML
                # present in the converted document, and uploads are
                # untrusted input. Confirm this is actually required.
                st.markdown(md, unsafe_allow_html=True)
                st.download_button("⬇️ Download Markdown File", md, file_name="converted.md", mime="text/markdown")
    finally:
        # Always remove the temporary copy of the upload, even on failure.
        tmp_path.unlink(missing_ok=True)
else:
    st.info("📎 Please upload a document to begin the conversion.")
# How to run this app (convert the notebook to a script, then launch it):
#   cd C:\Users\arhat\OneDrive\Desktop\metadata_genai
#   python -m jupyter nbconvert --to script document_converter_ui.ipynb
#   streamlit run document_converter_ui.py
# Status: the code runs.

# Notebook cell output (truncated Streamlit message), kept for reference:
#   2025-06-25 21:55:55.456
#     command:
#
#       streamlit run C:\Users\arhat\AppData\Roaming\Python\Python311\site-packages\ipykernel_launcher.py [ARGUMENTS]
