In [1]:
from docling.datamodel.base_models import InputFormat
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode
from docling_core.transforms.chunker import HierarchicalChunker
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from pathlib import Path
import json
from tqdm import tqdm
import time
from concurrent.futures import ThreadPoolExecutor
import logging
# Turn on table-structure recognition and select the more accurate
# TableFormer model for it.
pipeline_options = PdfPipelineOptions(do_table_structure=True)
pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE

# Converter whose PDF inputs are processed with the options configured above.
doc_converter = DocumentConverter(
    format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)},
)

ModuleNotFoundError: No module named 'docling'

In [None]:
from transformers import AutoTokenizer
from huggingface_hub import login
from PIL import Image
import torch
from colpali_engine import ColPali
from transformers.models.paligemma.configuration_paligemma import PaliGemmaConfig
import gradio as gr

def setup_models():
    """Build the ColPali model and load Qwen2-VL together with its tokenizer.

    Returns:
        tuple: ``(colpali, qwen_model, qwen_tokenizer)`` in that order.

    Raises:
        Exception: any failure while building or loading is printed and then
            re-raised unchanged.
    """
    try:
        # NOTE(review): ColPali is constructed from a hand-written config rather
        # than from a checkpoint, so it presumably starts with untrained
        # weights -- confirm whether ``from_pretrained`` was intended.
        pali_config = PaliGemmaConfig(
            vocab_size=32000,
            hidden_size=4096,
            intermediate_size=11008,
            num_hidden_layers=32,
            num_attention_heads=32,
            max_position_embeddings=8192,
            rms_norm_eps=1e-6,
            use_cache=True,
            pad_token_id=0,
            bos_token_id=1,
            eos_token_id=2,
            tie_word_embeddings=False,
            use_memory_efficient_attention=True,
            hidden_act="silu"
        )

        # ColPali initialisation
        colpali = ColPali(config=pali_config)

        # Load Qwen2-VL straight from transformers. The library publishes the
        # generation-capable model as ``Qwen2VLForConditionalGeneration``; the
        # previously imported ``Qwen2VLForCausalLM`` does not exist there.
        from transformers import Qwen2VLForConditionalGeneration

        qwen_model = Qwen2VLForConditionalGeneration.from_pretrained(
            "Qwen/Qwen2-VL-7B",
            device_map="auto",
            trust_remote_code=True,
            torch_dtype=torch.float16
        )

        # NOTE(review): process_image calls this object with ``images=...``;
        # a plain tokenizer presumably cannot preprocess images -- confirm
        # whether ``AutoProcessor`` is what should be loaded here.
        qwen_tokenizer = AutoTokenizer.from_pretrained(
            "Qwen/Qwen2-VL-7B",
            trust_remote_code=True
        )

        return colpali, qwen_model, qwen_tokenizer

    except Exception as e:
        print(f"Ошибка при загрузке моделей: {str(e)}")
        raise

def process_image(image, colpali, qwen_model, qwen_tokenizer):
    """Run one image through ColPali and Qwen2-VL and merge both results.

    Args:
        image: the image to analyse (the Gradio input in this notebook is
            configured with ``type="pil"``).
        colpali: object exposing ``process_image(image)``.
        qwen_model: model exposing ``device`` and ``generate(**inputs, ...)``.
        qwen_tokenizer: callable that builds the model inputs from
            ``text``/``images`` and exposes ``decode``.

    Returns:
        str: a newline-framed block with the ColPali result followed by the
        decoded Qwen2-VL output. If anything raises, the exception's message
        text is returned instead, so the return type is always ``str``.
    """
    try:
        # ColPali pass
        colpali_result = colpali.process_image(image)

        # Qwen2-VL pass: build the inputs and move them to the model's device.
        qwen_inputs = qwen_tokenizer(
            text="Describe this image in detail:",
            images=image,
            return_tensors="pt"
        ).to(qwen_model.device)

        # Inference only -- no gradients are needed.
        with torch.no_grad():
            qwen_outputs = qwen_model.generate(
                **qwen_inputs,
                max_new_tokens=100,
                num_return_sequences=1,
                do_sample=True,
                temperature=0.7,
                top_p=0.9
            )

        qwen_result = qwen_tokenizer.decode(qwen_outputs[0], skip_special_tokens=True)

        return f"\n{colpali_result}\n\n{qwen_result}\n"

    except Exception as e:
        # Best-effort for the UI: report the failure as text rather than
        # handing the exception object to the caller.
        return str(e)

def main():
    """Log in, load the models and launch the Gradio demo.

    NOTE(review): ``login_to_hf`` is not defined in the visible part of this
    notebook -- confirm it is provided elsewhere before running.
    """
    login_to_hf()

    colpali, qwen_model, qwen_tokenizer = setup_models()

    # Input and output widgets of the demo.
    image_input = gr.Image(type="pil")
    result_box = gr.Textbox(label="Результат")

    # The lambda binds the loaded models so the UI only has to supply the image.
    demo = gr.Interface(
        fn=lambda img: process_image(img, colpali, qwen_model, qwen_tokenizer),
        inputs=image_input,
        outputs=result_box,
        title="ColPali + Qwen2-VL Demo",
        description="Загрузите изображение для анализа обеими моделями",
    )

    demo.launch(share=True)

# Start the demo only when this code runs as the main module.
if __name__ == "__main__":
    main()

In [2]:
import requests
import os

# Assembly-instruction PDFs to download, keyed by the name used for the file.
pdfs = {
    "MALM": "https://www.ikea.com/us/en/assembly_instructions/malm-4-drawer-chest-white__AA-2398381-2-100.pdf",
    "BILLY": "https://www.ikea.com/us/en/assembly_instructions/billy-bookcase-white__AA-1844854-6-2.pdf",
    "BOAXEL": "https://www.ikea.com/us/en/assembly_instructions/boaxel-wall-upright-white__AA-2341341-2-100.pdf",
    "ADILS": "https://www.ikea.com/us/en/assembly_instructions/adils-leg-white__AA-844478-6-2.pdf",
    "MICKE": "https://www.ikea.com/us/en/assembly_instructions/micke-desk-white__AA-476626-10-100.pdf"
}

output_dir = "data"
os.makedirs(output_dir, exist_ok=True)

for name, url in pdfs.items():
    # Bound the wait and fail loudly on HTTP errors, so an error page is
    # never written to disk under a .pdf name.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    pdf_path = os.path.join(output_dir, f"{name}.pdf")

    with open(pdf_path, "wb") as f:
        f.write(response.content)

    print(f"Downloaded {name} to {pdf_path}")

print("Downloaded files:", os.listdir(output_dir))

Downloadeчd MALM to data/MALM.pdf
Downloadeчd BILLY to data/BILLY.pdf
Downloadeчd BOAXEL to data/BOAXEL.pdf
Downloadeчd ADILS to data/ADILS.pdf
Downloadeчd MICKE to data/MICKE.pdf
Downloaded files: ['MICKE.pdf', 'BOAXEL.pdf', 'ADILS.pdf', 'MALM.pdf', 'BILLY.pdf']


In [12]:
from docling.document_converter import DocumentConverter
from docling_core.transforms.chunker import HierarchicalChunker
from transformers import AutoModel, AutoTokenizer
import torch
from typing import List, Dict, Any
import numpy as np
from dataclasses import dataclass
from pathlib import Path
import json

In [17]:
    def _convert_document(self, document_path: str):
        print(f"\n=== Шаг 1: Конвертация документа ===")
        print(f"Исходный файл: {document_path}")
        
        result = self.converter.convert(document_path)
        
        print(f"Тип документа: {type(result.document)}")
        print(f"Количество страниц: {result.document.num_pages if hasattr(result.document, 'num_pages') else 'Неизвестно'}")
        
        return result

In [18]:
    def _get_chunks(self, doc):        
        chunks = list(self.chunker.chunk(doc))
        
        if chunks:
            print(f"Текст: {chunks[0].text[:200]}..." if hasattr(chunks[0], 'text'))
            print(f"Заголовки: {chunks[0].meta.headings if hasattr(chunks[0].meta, 'headings') )
        
        return chunks

In [19]:
    def _process_content(self, chunks: List) -> List[Dict]:
        
        processed_chunks = []
        
        for i, chunk in enumerate(chunks, 1):
            chunk_data = {
                'text': chunk.text if hasattr(chunk, 'text') else None,
                'metadata': {
                    'headings': chunk.meta.headings if hasattr(chunk.meta, 'headings') else [],
                    'page_number': chunk.meta.doc_items[0].prov[0].page_no if hasattr(chunk.meta, 'doc_items') else None,
                }
            }
            
            if hasattr(chunk.meta, 'doc_items') and chunk.meta.doc_items:
                bbox = chunk.meta.doc_items[0].prov[0].bbox
                chunk_data['metadata']['position'] = {
                    'left': bbox.l,
                    'top': bbox.t,
                    'right': bbox.r,
                    'bottom': bbox.b
                }
            
            processed_chunks.append(chunk_data)
            
            if i % 10 == 0:
                print({i}/{len(chunks)})
        
        print({len(processed_chunks)})
        return processed_chunks
        

In [20]:
    def _save_results(self, document_path: str, processed_content: List[Dict]) -> Dict:
        
        output_dir = Path("processed_documents")
        output_dir.mkdir(exist_ok=True)
        
        doc_name = Path(document_path).stem
        base_path = output_dir / doc_name
        
        # Сохраняем контент
        content_path = f"{base_path}_content.json"
        with open(content_path, 'w', encoding='utf-8') as f:
            json.dump(processed_content, f, ensure_ascii=False, indent=2)
        print(content_path})
            
        metadata = {
            'document_path': document_path,
            'num_chunks': len(processed_content),
            'output_files': {
                'content': str(content_path)
            }
        }
        
        # Сохраняем метаданные
        metadata_path = f"{base_path}_metadata.json"
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)
        print(f"Метаданные сохранены в: {metadata_path}")
            
        return metadata

In [21]:
    def process_document(self, document_path: str) -> ProcessingResult:
        """Convert, chunk, process and save one document.

        Runs the four steps in order -- ``_convert_document``, ``_get_chunks``,
        ``_process_content`` and ``_save_results`` -- and bundles their
        outputs into a ``ProcessingResult``.

        NOTE(review): the ``ProcessingResult`` visible later in this notebook
        declares ``processing_time`` and has no ``chunked_doc`` field --
        confirm which definition this method is meant to be used with.

        Args:
            document_path: path of the document to process.

        Returns:
            ProcessingResult with the processed chunks, the saved metadata,
            the converted document and the raw chunk objects.
        """
        converted = self._convert_document(document_path)
        document = converted.document

        raw_chunks = self._get_chunks(document)
        content = self._process_content(raw_chunks)
        saved_metadata = self._save_results(document_path, content)

        return ProcessingResult(
            chunks=content,
            metadata=saved_metadata,
            raw_doc=document,
            chunked_doc=raw_chunks,
        )

In [None]:
# Configure the root logger to show INFO and above as
# "<time> - <level> - <message>", then take a logger named after this module.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

@dataclass
class ProcessingResult:
    """Results of processing one document.

    The two trailing fields have defaults so that both ``process_document``
    variants in this notebook can build a result: one passes
    ``processing_time``, the other passes ``chunked_doc``.
    """
    # Processed chunks / elements as plain dictionaries.
    chunks: List[Dict]
    # Metadata dictionary returned by the save step.
    metadata: Dict
    # The converted document object.
    raw_doc: Any
    # Wall-clock processing time in seconds; 0.0 when not measured.
    processing_time: float = 0.0
    # Raw chunk objects from the chunker, when the caller provides them.
    chunked_doc: Any = None

class DocumentProcessor:
    """Convert a document and collect its elements in page batches.

    ``process_document`` converts the file, splits the requested page range
    into batches of ``page_batch_size`` pages, processes the batches on a
    thread pool and writes the collected elements plus a metadata summary to
    ``processed_documents/``.
    """

    def __init__(self,
                 chunk_size: int = 300,           # reduced chunk size
                 chunk_overlap: int = 30,         # reduced overlap
                 max_workers: int = 4,            # number of worker threads
                 page_batch_size: int = 10):      # pages per batch
        """Create the converter and chunker and store the batching settings.

        Args:
            chunk_size: forwarded to ``HierarchicalChunker``.
            chunk_overlap: forwarded to ``HierarchicalChunker``.
            max_workers: size of the thread pool used for page batches.
            page_batch_size: number of pages handled per batch.
        """
        self.converter = DocumentConverter()
        # NOTE(review): confirm HierarchicalChunker accepts these keyword
        # arguments; no method of this class reads ``self.chunker``.
        self.chunker = HierarchicalChunker(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            split_on_headings=True
        )
        self.max_workers = max_workers
        self.page_batch_size = page_batch_size
        self.start_time = time.time()

    @staticmethod
    def _count_pages(doc) -> int:
        """Return the page count of ``doc``, or 0 when it exposes none.

        ``num_pages`` is accepted both as a plain attribute and as a
        zero-argument method; using an uncalled method as a number would make
        the later ``min()`` comparison raise ``TypeError``.
        """
        num_pages = getattr(doc, 'num_pages', 0)
        if callable(num_pages):
            num_pages = num_pages()
        return int(num_pages or 0)

    def _process_page_batch(self, doc, start_page: int, end_page: int) -> List[Dict]:
        """Process one batch of pages.

        Selects the elements of ``doc.body.get_elements()`` whose
        ``page_number`` lies in ``[start_page, end_page]`` and describes each
        as a dictionary with ``type``, ``page_number``, ``content`` and
        ``metadata``.

        NOTE(review): confirm that the converted document really exposes
        ``body.get_elements()`` and per-element ``page_number``; if it does
        not, the error is logged below and the batch comes back empty.

        Returns:
            The element dictionaries, or ``[]`` if anything raised (the error
            is logged, not propagated).
        """
        try:
            # Keep only the elements that belong to the requested pages.
            elements = [elem for elem in doc.body.get_elements()
                        if hasattr(elem, 'page_number') and
                        start_page <= elem.page_number <= end_page]

            processed_elements = []
            for element in elements:
                has_text = hasattr(element, 'text')
                elem_data = {
                    'type': 'text' if has_text else 'image',
                    'page_number': element.page_number,
                    'content': None,
                    'metadata': {}
                }

                # Text handling
                if has_text:
                    elem_data['content'] = element.text
                    if hasattr(element, 'meta'):
                        elem_data['metadata']['headings'] = getattr(element.meta, 'headings', [])

                # Image handling: only a marker and basic info are stored,
                # not the image bytes.
                elif hasattr(element, 'image_data'):
                    elem_data['content'] = 'image_data_present'
                    elem_data['metadata']['image_info'] = {
                        'format': getattr(element, 'format', 'unknown'),
                        'size': getattr(element, 'size', 'unknown')
                    }

                processed_elements.append(elem_data)

            return processed_elements

        except Exception as e:
            logger.error(f"Ошибка при обработке страниц {start_page}-{end_page}: {str(e)}")
            return []

    def process_document(self,
                         document_path: str,
                         start_page: Optional[int] = None,
                         end_page: Optional[int] = None) -> "ProcessingResult":
        """Process a document, optionally restricted to a page range.

        Args:
            document_path: path of the document to convert.
            start_page: first page to process; defaults to 1.
            end_page: last page to process; defaults to, and is capped at,
                the document's page count.

        Returns:
            ProcessingResult with the collected elements, the saved metadata,
            the converted document and the elapsed time in seconds.

        Raises:
            Exception: any failure is logged with its traceback and re-raised.
        """
        self.start_time = time.time()
        logger.info(f"Начало обработки документа: {document_path}")

        try:
            # Convert the document
            doc_result = self.converter.convert(document_path)
            doc = doc_result.document

            # Work out the page range
            total_pages = self._count_pages(doc)
            start_page = start_page or 1
            end_page = min(end_page or total_pages, total_pages)

            logger.info(f"Всего страниц: {total_pages}, обрабатываем: {start_page}-{end_page}")

            # Split the range into inclusive (first, last) page batches
            page_ranges = [
                (i, min(i + self.page_batch_size - 1, end_page))
                for i in range(start_page, end_page + 1, self.page_batch_size)
            ]

            # Process the batches in parallel
            processed_chunks = []
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                futures = [
                    executor.submit(self._process_page_batch, doc, start, end)
                    for start, end in page_ranges
                ]

                # Collect in submission order (keeps pages ordered) with a
                # progress bar
                for future in tqdm(futures, desc="Обработка пакетов страниц"):
                    processed_chunks.extend(future.result())

            # Save the results
            metadata = self._save_results(document_path, processed_chunks, start_page, end_page)

            total_time = time.time() - self.start_time
            logger.info(f"Обработка завершена за {total_time:.2f} секунд")

            return ProcessingResult(
                chunks=processed_chunks,
                metadata=metadata,
                raw_doc=doc,
                processing_time=total_time
            )

        except Exception as e:
            logger.error(f"Ошибка при обработке документа: {str(e)}", exc_info=True)
            raise

    def _save_results(self,
                      document_path: str,
                      processed_content: List[Dict],
                      start_page: int,
                      end_page: int) -> Dict:
        """Write the processed content and a metadata summary as JSON files.

        Files are placed in ``processed_documents/`` (created if missing) and
        named ``<stem>_pages_<start>-<end>_content.json`` and
        ``<stem>_pages_<start>-<end>_metadata.json``.

        Returns:
            The metadata dictionary that was written to disk; its
            ``processing_time`` is measured from ``self.start_time``.
        """
        output_dir = Path("processed_documents")
        output_dir.mkdir(exist_ok=True)

        doc_name = Path(document_path).stem
        base_path = output_dir / f"{doc_name}_pages_{start_page}-{end_page}"

        # Save the content
        content_path = f"{base_path}_content.json"
        with open(content_path, 'w', encoding='utf-8') as f:
            json.dump(processed_content, f, ensure_ascii=False, indent=2)

        metadata = {
            'document_path': document_path,
            'pages_processed': {
                'start': start_page,
                'end': end_page
            },
            'num_chunks': len(processed_content),
            'processing_time': time.time() - self.start_time,
            'output_files': {
                'content': str(content_path)
            }
        }

        # Save the metadata
        metadata_path = f"{base_path}_metadata.json"
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)

        return metadata

# Driver: process one PDF with DocumentProcessor and print a short summary.
# Failures are logged with their traceback and not re-raised.
if __name__ == "__main__":
    # NOTE(review): absolute, machine-specific path -- adjust before running elsewhere.
    document_path = "/Users/ivan/Downloads/digital_production_5.pdf"
    
    # Create a processor with tuned parameters
    processor = DocumentProcessor(
        chunk_size=300,          # small chunk size
        chunk_overlap=30,        # minimal overlap
        max_workers=4,           # number of worker threads
        page_batch_size=5        # pages per batch
    )
    
    try:
        # Process the whole document
        results = processor.process_document(
            document_path,
            start_page=1,    # first page
            end_page=79      # last page
        )
        
        print("\n=== Результаты обработки ===")
        print(f"Обработано чанков: {len(results.chunks)}")
        print(f"Время обработки: {results.processing_time:.2f} секунд")
        
        # Break the results down by element type
        text_chunks = [c for c in results.chunks if c['type'] == 'text']
        image_chunks = [c for c in results.chunks if c['type'] == 'image']
        
        print(f"\nНайдено текстовых блоков: {len(text_chunks)}")
        print(f"Найдено изображений: {len(image_chunks)}")
        
    except Exception as e:
        logger.error(f"Ошибка при обработке: {str(e)}", exc_info=True)

2024-12-07 18:30:29,973 - INFO - Начало обработки документа: /Users/ivan/Downloads/digital_production_5.pdf
2024-12-07 18:30:30,605 - INFO - Going to convert document batch...


Fetching 9 files:   0%|          | 0/9 [00:00<?, ?it/s]

2024-12-07 18:30:45,525 - INFO - Processing document digital_production_5.pdf
