## Environment Variables

In [1]:
# Storage stuff
PROCESSED_FOLDER = "processed_data"
EXTRACTION_FOLDER = "extracted_data"

# Database stuff
LOCAL_DB_PATH = "mydatabase.db"

# Vectorization stuff
VECT_MODEL_NAME = "MrLight/dse-qwen2-2b-mrl-v1"
VECT_MODEL_LOCAL_PATH = "weights_vect/"

## Imports

In [2]:
%pip install llama-index
%pip install llama-index-embeddings-huggingface
%pip install datasets
%pip install sqlite_vec
%pip install pymupdf


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.11 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.11 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.11 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.



In [3]:
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from typing import List, Tuple, Dict, Any, Set, Optional, Deque
from huggingface_hub import snapshot_download
from datasets import load_dataset
from collections import deque
from datetime import datetime
from uuid import uuid4, UUID
import concurrent.futures
from io import BytesIO
from PIL import Image
import numpy as np
import sqlite_vec
import traceback
import aiosqlite
import argparse
import tempfile
import asyncio
import sqlite3
import struct
import torch
import fitz
import time
import json
import tqdm
import sys
import os
import io

  from .autonotebook import tqdm as notebook_tqdm


## Create DB

In [4]:
conn = sqlite3.connect(LOCAL_DB_PATH)
conn.enable_load_extension(True)
sqlite_vec.load(conn)
conn.enable_load_extension(False)

vec_version, = conn.execute("select vec_version()").fetchone()
print(f"vec_version={vec_version}")

cursor = conn.cursor()



# Create regular documents table
cursor.execute("""
CREATE TABLE documents (
    document_id TEXT PRIMARY KEY,
    title TEXT NOT NULL,
    upload_date DATETIME DEFAULT CURRENT_TIMESTAMP,
    total_pages INTEGER NOT NULL
)
""")

# Create regular page_images table
cursor.execute("""
CREATE TABLE page_images (
    page_id TEXT PRIMARY KEY,
    document_id TEXT NOT NULL,
    page_number INTEGER NOT NULL,
    page_text TEXT NOT NULL,
    latex_code TEXT,
    FOREIGN KEY (document_id) REFERENCES documents(document_id)
)
""")

# Create virtual table for page_images' vector data
cursor.execute("""
CREATE VIRTUAL TABLE page_images_vectors USING vec0(
    page_id TEXT PRIMARY KEY,
    vector_data FLOAT[1536]
)
""")

# Create trigger to delete from page_images_vectors when a page_image is deleted
cursor.execute("""
CREATE TRIGGER delete_page_images_vector
AFTER DELETE ON page_images
BEGIN
    DELETE FROM page_images_vectors WHERE page_id = OLD.page_id;
END;
""")

conn.commit()
conn.close()

vec_version=v0.1.6


OperationalError: table documents already exists

## Extract from dataset

In [6]:
def ensure_directory_exists(directory):
    """Create directory if it doesn't exist."""
    if not os.path.exists(directory):
        os.makedirs(directory)
        print(f"Created directory: {directory}")



def process_sample(sample, index, output_dir):
    """Process a single sample from the dataset."""
    try:
        # Extract image and save as JPG
        img_bytes = sample["page_image"]
        img = Image.open(BytesIO(img_bytes))
        img_path = os.path.join(output_dir, f"sample_{index}.jpg")
        img.save(img_path, optimize=True)
        
        # Extract LaTeX content and save as TXT
        latex_content = sample["latex_content"]
        txt_path = os.path.join(output_dir, f"sample_{index}.txt")
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(latex_content)
        
        return True
    except Exception as e:
        print(f"Error processing sample {index}: {str(e)}")
        return False



def extract_dataset(num_samples, output_dir, batch_size=4):
    """Extract images and LaTeX code from the ArXiv-tables dataset."""
    # Ensure output directory exists
    ensure_directory_exists(output_dir)
    
    print(f"Loading dataset 'staghado/ArXiv-tables'...")
    dataset = load_dataset("staghado/ArXiv-tables", split="train")
    
    # Determine actual number of samples to extract
    available_samples = len(dataset)
    num_samples = min(num_samples, available_samples)
    
    print(f"Dataset loaded successfully! Total available samples: {available_samples}")
    print(f"Extracting {num_samples} samples to {output_dir}...")
    
    successful = 0
    
    # Use ThreadPoolExecutor for parallel processing
    with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
        # Create a dictionary of futures to their index
        future_to_idx = {
            executor.submit(process_sample, dataset[i], i, output_dir): i 
            for i in range(num_samples)
        }
        
        # Process as completed with progress bar
        for future in tqdm.tqdm(concurrent.futures.as_completed(future_to_idx), 
                               total=num_samples, 
                               desc="Extracting samples"):
            idx = future_to_idx[future]
            try:
                if future.result():
                    successful += 1
            except Exception as e:
                print(f"Exception occurred while processing sample {idx}: {str(e)}")
    
    print(f"Extraction complete! Successfully processed {successful}/{num_samples} samples.")
    print(f"Files saved to: {output_dir}")



def main():
    """Main entry point."""
    extract_dataset(
        num_samples=10,
        output_dir=EXTRACTION_FOLDER,
        batch_size=4
    )



if __name__ == "__main__":
    main()

Loading dataset 'staghado/ArXiv-tables'...
Dataset loaded successfully! Total available samples: 1328
Extracting 10 samples to extracted_data...


Extracting samples: 100%|██████████| 10/10 [00:00<00:00, 67.50it/s]

Extraction complete! Successfully processed 10/10 samples.
Files saved to: extracted_data





## Download embedding model

In [7]:
if not os.path.exists(VECT_MODEL_LOCAL_PATH):
    print(f"Downloading model to {VECT_MODEL_LOCAL_PATH}")
    snapshot_download(
        repo_id="MrLight/dse-qwen2-2b-mrl-v1",
        local_dir=VECT_MODEL_LOCAL_PATH,
        local_dir_use_symlinks=False
    )
else:
    print(f"Model already exists at {VECT_MODEL_LOCAL_PATH}")

Model already exists at weights_vect/


## Load embedding Model (takes 6m on M4 Pro (a lot))

In [9]:
if torch.backends.mps.is_available():
    DEVICE = "mps"
    print("MPS backend is available. Using Apple Silicon GPU.")
elif torch.cuda.is_available():
    DEVICE = "cuda"
    print(f"CUDA Available: {torch.cuda.is_available()}")
    try:
        print(f"Device: {torch.cuda.current_device()}")
        print(f"Device Name: {torch.cuda.get_device_name(0)}")
    except Exception as e:
        print(f"Could not get CUDA device details: {e}")
else:
    DEVICE = "cpu"
    print("MPS and CUDA not available. Using CPU.")

print(f"Loading model: {VECT_MODEL_LOCAL_PATH}")
model = HuggingFaceEmbedding(
    model_name=VECT_MODEL_LOCAL_PATH,
    device=DEVICE,
    trust_remote_code=True,
    local_files_only=True
)
print(model)

No sentence-transformers model found with name weights_vect/. Creating a new one with mean pooling.


MPS backend is available. Using Apple Silicon GPU.
Loading model: weights_vect/
model_name='weights_vect/' embed_batch_size=10 callback_manager=<llama_index.core.callbacks.base.CallbackManager object at 0x309546650> num_workers=None max_length=32768 normalize=True query_instruction=None text_instruction=None cache_folder=None show_progress_bar=False


## Functions for vectorization

In [32]:
async def embed_text(request: dict):
    try:
        embeddings = []
        
        for text in request["texts"]:
            print(f"Processing user query: {text}\n")
            
            query_embedding = model.get_query_embedding(text)
            
            if isinstance(query_embedding, np.ndarray):
                query_embedding = query_embedding.tolist()
            
            embeddings.append(query_embedding)

        return embeddings

    except Exception as e:
        import traceback
        print(f"Error details: {traceback.format_exc()}")
        raise Exception(f"Text embedding failed: {str(e)}")



async def embed_images(files: dict):
    try:
        if len(files) != 1:
            raise ValueError("Expected exactly one file for embedding")
        file_tuple = list(files.values())[0]
        _, image_bytes, _ = file_tuple
        
        # Save to a temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as tmp_file:
            tmp_file.write(image_bytes)
            tmp_file_path = tmp_file.name
        
        print(f"Processing image from temporary file: {tmp_file_path}")
        
        # Generate embedding using the file path
        image_embedding = model.get_image_embedding(tmp_file_path)
        
        # Clean up the temporary file
        os.unlink(tmp_file_path)
        
        if isinstance(image_embedding, np.ndarray):
            image_embedding = image_embedding.tolist()
        
        return image_embedding

    except Exception as e:
        import traceback
        print(f"Error details: {traceback.format_exc()}")
        raise Exception(f"Image embedding failed: {str(e)}")

## Fill DB

### Functions for processing status

In [33]:
def _calculate_estimated_time(embedding_queue, document_id, total_pages, processed_pages) -> str:
    """Calculate estimated completion time based on current processing rate"""
    if document_id not in embedding_queue.document_start_times:
        return "Unknown"
        
    if processed_pages == 0:
        return "Calculating..."
    
    start_time = embedding_queue.document_start_times[document_id]
    current_time = time.time()
    elapsed_time = current_time - start_time
    
    # Calculate pages per second
    pages_per_second = processed_pages / elapsed_time
    
    if pages_per_second <= 0:
        return "Unknown"
        
    # Calculate remaining time
    remaining_pages = total_pages - processed_pages
    remaining_seconds = remaining_pages / pages_per_second
    
    # Format remaining time
    if remaining_seconds < 60:
        return f"Less than a minute"
    elif remaining_seconds < 3600:
        minutes = int(remaining_seconds / 60)
        return f"About {minutes} minute{'s' if minutes > 1 else ''}"
    else:
        hours = int(remaining_seconds / 3600)
        return f"About {hours} hour{'s' if hours > 1 else ''}"



async def get_document_processing_status(embedding_queue, document_id: str) -> Optional[Dict[str, Any]]:
    """
    Get detailed processing status for a specific document.

    Args:
        document_id (str): The unique identifier of the document.

    Returns:
        Optional[Dict[str, Any]]: A dictionary with processing status or None if not found.
    """
    # Check if document is actively being processed
    is_in_progress = (document_id in embedding_queue.document_total_pages)
    
    if not is_in_progress:
        # Check database for completed document
        async with aiosqlite.connect(LOCAL_DB_PATH) as conn:
            await conn._execute(conn._conn.enable_load_extension, True)
            await conn._execute(sqlite_vec.load, conn._conn)

            query = """
                SELECT 
                    d.total_pages,
                    (SELECT COUNT(*) FROM page_images_vectors piv 
                     WHERE piv.page_id IN (
                         SELECT pi.page_id FROM page_images pi 
                         WHERE pi.document_id = d.document_id
                     )) as processed_pages
                FROM documents d
                WHERE d.document_id = ?
            """
            async with conn.execute(query, (document_id,)) as cursor:
                record = await cursor.fetchone()
                if record:
                    total_pages, processed_pages = record
                    
                    # A document is considered complete if it has processed pages 
                    # and has been fully processed through the pipeline
                    is_complete = processed_pages > 0
                    
                    return {
                        "status": "completed" if is_complete else "not_found",
                        "total_pages": total_pages,
                        "processed_pages": processed_pages,
                        "progress_percentage": round((processed_pages / total_pages) * 100, 2)
                    }
            await conn._execute(conn._conn.enable_load_extension, False)
        return None

    # Document is currently being processed
    total_pages = embedding_queue.document_total_pages[document_id]
    processed_pages = embedding_queue.processed_pages_count.get(document_id, 0)
    
    # Calculate estimated time based on current processing rate
    estimated_time = _calculate_estimated_time(embedding_queue, document_id, total_pages, processed_pages)
    
    return {
        "status": "processing",
        "total_pages": total_pages,
        "processed_pages": processed_pages,
        "progress_percentage": round((processed_pages / total_pages) * 100, 2),
        "estimated_completion_time": estimated_time
    }

### Functions to store PDF files in DB

In [34]:
async def _db_store_pdf_data(
    conn: aiosqlite.Connection,
    document_id: str,
    title: str,
    total_pages: int
) -> None:
    print(f"Inputs to store pdf data: {document_id}, {title}, {total_pages}")

    query = """
        INSERT INTO documents (
            document_id, title, total_pages
        ) VALUES (?, ?, ?)
    """
    await conn.execute(query, (
        document_id, title, total_pages
    ))



async def _db_store_image_with_vector(
    conn: aiosqlite.Connection,
    document_id: str,
    page_number: int,
    page_text: str,
    vector_data: Optional[List[float]] = None,
    page_id: Optional[str] = None,
    latex_code: Optional[str] = None
) -> str:
    print(f"Vector data: {vector_data}")

    if page_id is None:
        page_id = str(uuid4())

    query = """
        INSERT INTO page_images (
            page_id, document_id, page_number, page_text, latex_code
        ) VALUES (?, ?, ?, ?, ?)
    """
    await conn.execute(query, (page_id, document_id, page_number, page_text, latex_code))

    if vector_data:
        vector_query = """
            INSERT INTO page_images_vectors (page_id, vector_data)
            VALUES (?, ?)
        """
        await conn.execute(vector_query, (page_id, vector_data))

    return page_id



async def db_store_pdf_file(
    document_id: str,
    title: str,
    page_texts: List[str],
    vectors: Optional[List[List[float]]] = None,
    page_ids: Optional[List[str]] = None,
    latex_code: Optional[List[str]] = None,
) -> None:
    async with aiosqlite.connect(LOCAL_DB_PATH) as conn:
        await conn.execute("BEGIN")
        try:
            await _db_store_pdf_data(
                conn, document_id, title, len(page_texts)
            )
            print(f"Storing {len(page_texts)} images and vectors")
            print(f"Vectors: {vectors}")
            for i, page_text in enumerate(page_texts):
                page_vector = vectors[i] if vectors and i < len(vectors) else None
                page_id = page_ids[i] if page_ids and i < len(page_ids) else None
                await _db_store_image_with_vector(
                    conn, document_id, i, page_text, page_vector, page_id, latex_code
                )
            await conn.commit()
        except Exception as e:
            await conn.rollback()
            raise ValueError(f"Failed to store PDF file: {str(e)}")

### Functions to store page vectors in DB

In [35]:
async def store_page_vector(document_id: str, page_number: int, vector: list, page_id: Optional[str] = None) -> None:
    """Store page vector in database with page_id if provided"""
    print(f"[{datetime.now()}] Storing page vector in database - Document: {document_id}, Page: {page_number}")
    print(f"SAMPLE VECTOR: {vector[:1]}")  # Log first element for verification

    # Ensure vector length matches the expected dimension (1536)
    if len(vector) != 1536:
        raise ValueError(f"Vector length {len(vector)} does not match expected dimension 1536")

    # Convert vector to binary blob
    vector_blob = struct.pack(f'<{len(vector)}f', *vector)

    async with aiosqlite.connect(LOCAL_DB_PATH) as conn:
        await conn._execute(conn._conn.enable_load_extension, True)
        await conn._execute(sqlite_vec.load, conn._conn)

        if page_id:
            # Use page_id if provided
            query = """
                INSERT OR REPLACE INTO page_images_vectors (page_id, vector_data) VALUES (?, ?)
            """
            try:
                await conn.execute(query, (page_id, vector_blob))
                await conn.commit()
                print(f"[{datetime.now()}] Successfully stored page vector - Document: {document_id}, Page: {page_number}, Page ID: {page_id}")
            except Exception as e:
                print(f"[{datetime.now()}] Database error while storing page vector: {str(e)}")
                raise
            finally:
                await conn._execute(conn._conn.enable_load_extension, False)
        else:
            # Fall back to document_id and page_number
            select_query = """
                SELECT page_id FROM page_images 
                WHERE document_id = ? AND page_number = ?
            """
            async with conn.execute(select_query, (document_id, page_number)) as cursor:
                row = await cursor.fetchone()
                if row:
                    page_id = row[0]
                    query = """
                        INSERT OR REPLACE INTO page_images_vectors (page_id, vector_data) VALUES (?, ?)
                    """
                    try:
                        await conn.execute(query, (page_id, vector_blob))
                        await conn.commit()
                        print(f"[{datetime.now()}] Successfully stored page vector - Document: {document_id}, Page: {page_number}")
                    except Exception as e:
                        print(f"[{datetime.now()}] Database error while storing page vector: {str(e)}")
                        raise
                    finally:
                        await conn._execute(conn._conn.enable_load_extension, False)
                else:
                    print(f"[{datetime.now()}] ERROR: Page not found for Document: {document_id}, Page: {page_number}")
                    await conn._execute(conn._conn.enable_load_extension, False)
                    raise ValueError("Page not found")

### Class embedding Queue

In [36]:
class EmbeddingQueue:
    def __init__(self):
        self.image_queue: Deque[Dict[str, Any]] = deque()
        self.current_task = None
        self.lock = asyncio.Lock()
        self.processing = False
        self.processed_pages_count: Dict[str, int] = {}  # Track number of processed pages per document
        self.document_total_pages: Dict[str, int] = {}
        self.failed_pages: Dict[str, List[int]] = {}
        self.document_start_times: Dict[str, float] = {}

        print(f"[{datetime.now()}] Initialized EmbeddingQueue instance")


    def start_background_tasks(self):
        """Start background tasks that require a running event loop"""
        asyncio.create_task(self._check_stalled_documents_periodically())
        print(f"[{datetime.now()}] Started background task for checking stalled documents")


    async def _check_stalled_documents_periodically(self):
        """Periodically check for and process stalled documents"""
        while True:
            try:
                await asyncio.sleep(300)  # Check every 5 minutes
                await self._check_stalled_documents()
            except Exception as e:
                print(f"[{datetime.now()}] Error in stalled document checker: {str(e)}")


    def _start_stalled_checker(self):
        """Start a background task to periodically check for stalled documents"""
        asyncio.create_task(self._check_stalled_documents_periodically())


    async def _check_stalled_documents(self):
        """Check for documents that have stalled processing and compute their vectors"""
        current_time = time.time()
        documents_to_process = []
        
        print(f"[{datetime.now()}] Checking for stalled documents...")
        
        async with self.lock:
            for doc_id in list(self.processed_pages_count.keys()):
                # Skip documents that are already completed
                if doc_id not in self.document_total_pages:
                    continue
                    
                # Check if this document has been processing for more than 10 minutes
                if doc_id not in self.document_start_times:
                    self.document_start_times[doc_id] = current_time
                    continue
                    
                time_processing = current_time - self.document_start_times[doc_id]
                if time_processing < 600:  # Less than 10 minutes
                    continue
                
                # Calculate percentage processed
                processed_count = self.processed_pages_count[doc_id]
                total_expected = self.document_total_pages[doc_id]
                percent_processed = (processed_count / total_expected) * 100
                
                # Get count of pages still in queue for this document
                pages_in_queue = 0
                for task in self.image_queue:
                    if task['document_id'] == doc_id:
                        pages_in_queue += 1
                
                # If no tasks are pending and we have at least 70% of pages, 
                # or if more than 20 minutes have passed and we have at least 50% of pages
                stalled_timeout = (pages_in_queue == 0 and percent_processed >= 70) or \
                                (time_processing >= 1200 and percent_processed >= 50)
                                
                if stalled_timeout:
                    print(f"[{datetime.now()}] Document {doc_id} appears stalled:")
                    print(f"- Processing time: {time_processing:.1f} seconds")
                    print(f"- Processed {processed_count}/{total_expected} pages ({percent_processed:.1f}%)")
                    print(f"- Pages still in queue: {pages_in_queue}")
                    
                    # Document is considered complete if stalled conditions are met
                    self._mark_document_complete(doc_id, processed_count, total_expected)
        
        # Process any stalled documents (outside of lock)
        for doc_id in documents_to_process:
            print(f"[{datetime.now()}] Processing stalled document: {doc_id}")
            try:
                await self._process_document_vector(doc_id)
            except Exception as e:
                print(f"[{datetime.now()}] Error processing stalled document {doc_id}: {str(e)}")


    async def add_image_task(self, document_id: str, document_title: str, image_bytes: bytes, page_number: int, total_pages: int, page_id: Optional[str] = None):
        """Add a new image embedding task to the queue"""
        print(f"[{datetime.now()}] Adding new task - Document ID: {document_id}, Page: {page_number+1}/{total_pages}")

        if document_id not in self.processed_pages_count:
            self.processed_pages_count[document_id] = 0
            self.document_total_pages[document_id] = total_pages
            self.failed_pages[document_id] = []
            self.document_start_times[document_id] = time.time()
            print(f"[{datetime.now()}] Initialized new document tracking - ID: {document_id}, Total Pages: {total_pages}")

        task = {
            'document_id': document_id,
            'document_title': document_title,
            'image_bytes': image_bytes,
            'page_number': page_number,
            'page_id': page_id,
            'timestamp': time.time(),
            'retry_count': 0
        }
        self.image_queue.append(task)

        queue_stats = {
            'queue_size': len(self.image_queue),
            'documents_in_progress': len(self.processed_pages_count),
            'current_document': document_id,
            'page_number': page_number
        }
        print(f"[{datetime.now()}] Task added to queue. Queue stats: {json.dumps(queue_stats)}")

        if not self.processing:
            print(f"[{datetime.now()}] Queue processor not running. Starting new processing task.")
            asyncio.create_task(self.process_queue())


    async def process_queue(self):
        """Process all tasks in the queue with improved error handling and retries"""
        if self.processing:
            print(f"[{datetime.now()}] Queue processor already running")
            return

        self.processing = True
        print(f"[{datetime.now()}] Starting queue processor")

        failed_tasks = []  # Track failed tasks for retry

        while self.image_queue:
            async with self.lock:
                task = self.image_queue.popleft()
                self.current_task = task

            document_id = task['document_id']
            page_number = task['page_number']
            page_id = task.get('page_id')
            wait_time = time.time() - task['timestamp']

            print(f"""[{datetime.now()}] Starting task processing:
                - Document ID: {document_id}
                - Page: {page_number}
                - Queue size: {len(self.image_queue)}
                - Wait time: {wait_time:.2f} seconds""")

            try:
                processing_start = time.time()
                files = {
                    'files': ('image.png', task['image_bytes'], 'image/png')
                }

                try:
                    print(f"[{datetime.now()}] Calling embed_image API for document {document_id}, page {page_number}")
                    vector = await embed_images(files)  # This has built-in retry logic
                    
                    # Store vector in database
                    await store_page_vector(document_id, page_number, vector, page_id)
                    
                    # Update processed page count
                    self.processed_pages_count[document_id] = self.processed_pages_count.get(document_id, 0) + 1
                    
                    # Check if document is complete
                    await self._check_document_completion(document_id)
                    
                    processing_time = time.time() - processing_start

                    print(f"""[{datetime.now()}] Task completed successfully:
                        - Document ID: {document_id}
                        - Page: {page_number}
                        - Processing time: {processing_time:.2f} seconds
                        - Vector size: {len(vector)}""")

                except Exception as e:
                    print(f"[{datetime.now()}] Error embedding image for document {document_id}, page {page_number}: {str(e)}")
                    
                    # Track this failed page
                    if document_id in self.failed_pages:
                        self.failed_pages[document_id].append(page_number)
                    
                    # Add to failed tasks queue for later retry if under retry limit
                    retry_count = task.get('retry_count', 0) + 1
                    if retry_count <= 3:  # Limit retries to 3 attempts
                        task['retry_count'] = retry_count
                        task['last_error'] = str(e)
                        task['last_attempt'] = time.time()
                        failed_tasks.append(task)
                        print(f"[{datetime.now()}] Added to retry queue (attempt {retry_count}/3)")
                    else:
                        print(f"[{datetime.now()}] Max retries exceeded for document {document_id}, page {page_number}")
                        # Check if we should compute document vector despite this failure
                        await self._check_document_completion(document_id)

            except Exception as e:
                print(f"[{datetime.now()}] Error processing task for document {document_id}, page {page_number}: {str(e)}")
            finally:
                self.current_task = None

        # Process failed tasks if any
        if failed_tasks:
            print(f"[{datetime.now()}] Processing {len(failed_tasks)} failed tasks after a delay")
            # Wait a bit before retrying
            await asyncio.sleep(10)
            # Add failed tasks back to the queue
            async with self.lock:
                for task in failed_tasks:
                    self.image_queue.append(task)
            # Process the queue again
            return await self.process_queue()
            
        self.processing = False
        print(f"[{datetime.now()}] Queue processor finished - no more tasks in queue")


    async def _check_document_completion(self, document_id: str):
        """Check if a document's page processing is complete"""
        if document_id not in self.processed_pages_count or document_id not in self.document_total_pages:
            return
            
        total_pages = self.document_total_pages[document_id]
        processed_pages = self.processed_pages_count[document_id]
        
        # Calculate what percentage of pages are done
        percent_complete = (processed_pages / total_pages) * 100
        
        # Count pages still in queue for this document
        pages_in_queue = 0
        for task in self.image_queue:
            if task['document_id'] == document_id:
                pages_in_queue += 1
        
        # Document is considered complete if either:
        # 1. All pages processed successfully (100%)
        # 2. No more pages in queue AND at least 70% processed
        # 3. At least 95% processed (regardless of queue)
        is_complete = (
            (processed_pages == total_pages) or
            (pages_in_queue == 0 and percent_complete >= 70) or
            (percent_complete >= 95)
        )
        
        if is_complete:
            print(f"[{datetime.now()}] Document processing complete: {document_id} with {processed_pages}/{total_pages} pages ({percent_complete:.1f}%)")
            self._mark_document_complete(document_id, processed_pages, total_pages)


    def _mark_document_complete(self, document_id: str, processed_pages: int, total_pages: int):
        """Mark a document as complete and clean up tracking data"""
        print(f"[{datetime.now()}] Marking document as complete: {document_id}")
        print(f"[{datetime.now()}] - Processed pages: {processed_pages}/{total_pages}")
        
        # Clean up tracking data
        if document_id in self.processed_pages_count:
            del self.processed_pages_count[document_id]
        if document_id in self.document_total_pages:
            del self.document_total_pages[document_id]
        if document_id in self.failed_pages:
            del self.failed_pages[document_id]
        if document_id in self.document_start_times:
            del self.document_start_times[document_id]


    async def pause_current_task(self):
        """Pause the current task and return it to the queue"""
        async with self.lock:
            if self.current_task:
                print(f"[{datetime.now()}] Pausing current task and returning to queue")
                self.image_queue.appendleft(self.current_task)
                self.current_task = None
            self.processing = False
            print(f"[{datetime.now()}] Queue processing paused")


    async def resume_processing(self):
        """Resume queue processing if paused"""
        async with self.lock:
            if not self.processing and self.image_queue:
                print(f"[{datetime.now()}] Resuming queue processing")
                self.processing = True
                try:
                    await self.process_queue()
                except Exception as e:
                    print(f"[{datetime.now()}] Error during queue processing: {str(e)}")
                    self.processing = False

#### Start it

In [37]:
print(f"[{datetime.now()}] Initializing global embedding queue")
embedding_queue = EmbeddingQueue()

[2025-05-25 17:39:33.946785] Initializing global embedding queue
[2025-05-25 17:39:33.946865] Initialized EmbeddingQueue instance


### Run the filling process

In [38]:
# Global tracking of documents being processed
documents_in_process: Dict[str, Dict[str, Any]] = {}
# Set to track documents that have been completed
completed_documents: Set[str] = set()



async def process_page(page, zoom: float = 1.5) -> Tuple[bytes, str]:
    """Process a single page to extract image and text"""
    mat = fitz.Matrix(zoom, zoom)
    pix = page.get_pixmap(matrix=mat)
    img_bytes = pix.tobytes("jpeg")
    text = page.get_text()
    return img_bytes, text



async def pdf_to_images_and_text(pdf_path: str) -> Tuple[List[bytes], List[str]]:
    """Convert PDF to list of images and text with better error handling"""
    doc = fitz.open(pdf_path)
    try:
        # Process pages in chunks to avoid memory issues
        chunk_size = 4
        pages = [page for page in doc]
        results = []
        
        for i in range(0, len(pages), chunk_size):
            chunk = pages[i:i + chunk_size]
            try:
                chunk_results = await asyncio.gather(
                    *(process_page(page) for page in chunk),
                    return_exceptions=True
                )
                
                # Handle any exceptions in processing
                valid_results = []
                for j, result in enumerate(chunk_results):
                    if isinstance(result, Exception):
                        print(f"Error processing page {i+j}: {str(result)}")
                        # Add empty placeholder
                        valid_results.append((b'', ''))
                    else:
                        valid_results.append(result)
                        
                results.extend(valid_results)
            except Exception as e:
                print(f"Error processing chunk {i//chunk_size}: {str(e)}")
                # Add empty placeholders for the whole chunk
                results.extend([(b'', '') for _ in range(len(chunk))])
            
        images_bytes, texts = zip(*results) if results else ([], [])
        return list(images_bytes), list(texts)
    finally:
        doc.close()



async def image_to_image_and_text(image_path: str) -> Tuple[List[bytes], List[str]]:
    """Convert an image file to a list with one image and empty text for DB compatibility"""
    try:
        with open(image_path, 'rb') as f:
            image_bytes = f.read()
        
        # Validate that this is actually an image
        try:
            img = Image.open(io.BytesIO(image_bytes))
            # Convert to JPEG format for consistency with PDF processing
            if img.format != 'JPEG':
                byte_array = io.BytesIO()
                # Convert to RGB if needed (for PNG with transparency, etc.)
                if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
                    background = Image.new('RGB', img.size, (255, 255, 255))
                    background.paste(img, mask=img if img.mode == 'RGBA' else None)
                    img = background
                img.save(byte_array, format='JPEG', optimize=True)
                image_bytes = byte_array.getvalue()
            img.close()
        except Exception as e:
            print(f"Error validating image: {str(e)}")
            raise ValueError(f"Invalid image file: {image_path}")
            
        # Return as a list to be compatible with PDF processing
        return [image_bytes], [""]  # Empty text as placeholder
    except Exception as e:
        print(f"Error processing image file {image_path}: {str(e)}")
        traceback.print_exc()
        raise



def create_thumbnail(image_data: bytes, size=(400, 400)) -> bytes:
    """Create thumbnail from image bytes with error handling"""
    try:
        image = Image.open(io.BytesIO(image_data))
        image.thumbnail(size)
        byte_array = io.BytesIO()
        image.save(byte_array, format='JPEG', optimize=True)
        return byte_array.getvalue()
    except Exception as e:
        print(f"Error creating thumbnail: {str(e)}")
        # Return a minimal valid JPEG as fallback
        return b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00H\x00H\x00\x00\xff\xdb\x00C\x00\x08\x06\x06\x07\x06\x05\x08\x07\x07\x07\t\t\x08\n\x0c\x14\r\x0c\x0b\x0b\x0c\x19\x12\x13\x0f\x14\x1d\x1a\x1f\x1e\x1d\x1a\x1c\x1c $.\' ",#\x1c\x1c(7),01444\x1f\'9=82<.342\xff\xdb\x00C\x01\t\t\t\x0c\x0b\x0c\x18\r\r\x182!\x1c!22222222222222222222222222222222222222222222222222\xff\xc0\x00\x11\x08\x00\x01\x00\x01\x03\x01"\x00\x02\x11\x01\x03\x11\x01\xff\xc4\x00\x1f\x00\x00\x01\x05\x01\x01\x01\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\xff\xc4\x00\xb5\x10\x00\x02\x01\x03\x03\x02\x04\x03\x05\x05\x04\x04\x00\x00\x01}\x01\x02\x03\x00\x04\x11\x05\x12!1A\x06\x13Qa\x07"q\x142\x81\x91\xa1\x08#B\xb1\xc1\x15R\xd1\xf0$3br\x82\t\n\x16\x17\x18\x19\x1a%&\'()*456789:CDEFGHIJSTUVWXYZcdefghijstuvwxyz\x83\x84\x85\x86\x87\x88\x89\x8a\x92\x93\x94\x95\x96\x97\x98\x99\x9a\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xff\xc4\x00\x1f\x01\x00\x03\x01\x01\x01\x01\x01\x01\x01\x01\x01\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\xff\xc4\x00\xb5\x11\x00\x02\x01\x02\x04\x04\x03\x04\x07\x05\x04\x04\x00\x01\x02w\x00\x01\x02\x03\x11\x04\x05!1\x06\x12AQ\x07aq\x13"2\x81\x08\x14B\x91\xa1\xb1\xc1\t#3R\xf0\x15br\xd1\n\x16$4\xe1%\xf1\x17\x18\x19\x1a&\'()*56789:CDEFGHIJSTUVWXYZcdefghijstuvwxyz\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x92\x93\x94\x95\x96\x97\x98\x99\x9a\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xff\xda\x00\x0c\x03\x01\x00\x02\x11\x03\x11\x00?\x00\xfe\xfe(\xa2\x8a\x00\xff\xd9'



async def save_to_local(file_content: bytes, filename: str):
    """
    Save the file content to a local directory.

    Args:
        file_content (bytes): The content of the file to save.
        filename (str): The filename to use for saving, including extension (e.g., '123.pdf' or '456_full.jpg').
    """
    # Ensure the storage directory exists
    os.makedirs(PROCESSED_FOLDER, exist_ok=True)
    
    # Construct the full file path
    file_path = os.path.join(PROCESSED_FOLDER, filename)
    
    # Use a helper function via run_in_executor to avoid blocking
    def write_file():
        with open(file_path, 'wb') as f:
            f.write(file_content)
    
    # Run the synchronous file operation in a thread pool
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, write_file)
    
    print(f"File saved locally: {file_path}")



def check_and_resize_for_vect(image_bytes: bytes, max_pixels: int = 1000000) -> bytes:
    """Resize image if needed for vector embedding with error handling"""
    try:
        if not image_bytes:
            print("Warning: Empty image bytes provided")
            return image_bytes
            
        image = Image.open(io.BytesIO(image_bytes))
        current_pixels = image.width * image.height
        
        if current_pixels > max_pixels:
            scale_factor = (max_pixels / current_pixels) ** 0.5
            new_width = int(image.width * scale_factor)
            new_height = int(image.height * scale_factor)
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            
            buffer = io.BytesIO()
            image.save(buffer, format='JPEG', optimize=True)
            return buffer.getvalue()
        
        return image_bytes
    except Exception as e:
        print(f"Error resizing image: {str(e)}")
        return image_bytes  # Return original bytes on error



async def verify_vector_in_database(page_id: UUID) -> bool:
    async with aiosqlite.connect(LOCAL_DB_PATH) as conn:
        await conn._execute(conn._conn.enable_load_extension, True)
        await conn._execute(sqlite_vec.load, conn._conn)

        conn.row_factory = aiosqlite.Row
        query = """
            SELECT EXISTS(
                SELECT 1
                FROM page_images_vectors
                WHERE page_id = ?
            ) as has_vector
        """
        async with conn.execute(query, (str(page_id),)) as cursor:
            result = await cursor.fetchone()
            await conn._execute(conn._conn.enable_load_extension, False)
            return result['has_vector'] == 1



async def retry_missing_page_vectors(document_id: UUID, page_ids: List[UUID], images: List[bytes], max_retries: int = 3) -> int:
    """Retry storing vectors for pages that are missing them"""
    retry_count = 0
    fixed_pages = 0
    
    while retry_count < max_retries:
        missing_vectors = []
        
        # Check which pages are missing vectors
        for idx, page_id in enumerate(page_ids):
            if not await verify_vector_in_database(page_id):
                if idx < len(images) and images[idx]:  # Make sure we have image data
                    missing_vectors.append((idx, page_id))
        
        if not missing_vectors:
            print(f"All vectors present for document {document_id} after retries")
            return fixed_pages
            
        print(f"Found {len(missing_vectors)} pages with missing vectors for document {document_id}, retry {retry_count + 1}/{max_retries}")
        
        # Retry each missing vector
        for idx, page_id in missing_vectors:
            try:
                # Resize image for vector embedding
                resized_image = check_and_resize_for_vect(images[idx])
                
                # Queue image for embedding with higher priority
                await embedding_queue.add_image_task(
                    str(document_id),
                    "Retry", # Title doesn't matter for retries
                    resized_image,
                    idx,
                    len(page_ids),
                    str(page_id),
                    priority=True  # Mark as high priority retry
                )
                
                print(f"Requeued page {idx} (ID: {page_id}) for document {document_id}")
                fixed_pages += 1
            except Exception as e:
                print(f"Error requeuing page {idx} for document {document_id}: {str(e)}")
        
        # Wait before checking again
        retry_delay = 60 * (retry_count + 1)  # Increase wait time with each retry
        print(f"Waiting {retry_delay} seconds before checking vectors again...")
        await asyncio.sleep(retry_delay)
        retry_count += 1
    
    return fixed_pages



async def verify_document_vector(document_id: UUID) -> bool:
    async with aiosqlite.connect(LOCAL_DB_PATH) as conn:
        await conn._execute(conn._conn.enable_load_extension, True)
        await conn._execute(sqlite_vec.load, conn._conn)

        conn.row_factory = aiosqlite.Row
        query = """
            SELECT 
                (SELECT COUNT(*) FROM page_images WHERE document_id = ?) as total_pages,
                (SELECT COUNT(*) FROM page_images_vectors piv 
                 JOIN page_images pi ON pi.page_id = piv.page_id 
                 WHERE pi.document_id = ?) as pages_with_vectors
        """
        async with conn.execute(query, (str(document_id), str(document_id))) as cursor:
            result = await cursor.fetchone()
            await conn._execute(conn._conn.enable_load_extension, False)
            
            if not result or result['total_pages'] == 0:
                return False
                
            # Document is considered complete if at least 90% of pages have vectors
            return (result['pages_with_vectors'] / result['total_pages']) >= 0.9


async def compute_and_store_document_vector(document_id: UUID) -> bool:
    async with aiosqlite.connect(LOCAL_DB_PATH) as conn:
        await conn._execute(conn._conn.enable_load_extension, True)
        await conn._execute(sqlite_vec.load, conn._conn)

        conn.row_factory = aiosqlite.Row
        
        # Check if page vectors exist
        vector_query = """
            SELECT COUNT(*) as vector_count
            FROM page_images_vectors piv
            JOIN page_images pi ON piv.page_id = pi.page_id
            WHERE pi.document_id = ?
        """
        async with conn.execute(vector_query, (str(document_id),)) as cursor:
            result = await cursor.fetchone()
        
        if not result or result['vector_count'] == 0:
            print(f"No page vectors found for document {document_id}")
            await conn._execute(conn._conn.enable_load_extension, False)
            return False
        
        # Count total pages
        pages_query = """
            SELECT COUNT(*) as page_count
            FROM page_images
            WHERE document_id = ?
        """
        async with conn.execute(pages_query, (str(document_id),)) as cursor:
            pages_result = await cursor.fetchone()
            
        total_pages = pages_result['page_count'] if pages_result else 0
        vectorized_pages = result['vector_count']
        
        # Document is considered successfully processed if at least 90% of pages have vectors
        success = total_pages > 0 and (vectorized_pages / total_pages) >= 0.9
        
        if success:
            print(f"Document {document_id} successfully processed: {vectorized_pages}/{total_pages} pages vectorized")
            # Mark document as completed in memory
            completed_documents.add(str(document_id))
        else:
            print(f"Document {document_id} not fully processed: {vectorized_pages}/{total_pages} pages vectorized")
            
        await conn._execute(conn._conn.enable_load_extension, False)
        return success



async def monitor_document_progress(document_id: UUID, filename: str, images: List[bytes], page_ids: List[UUID], total_timeout: int = 3600):
    """Monitor progress of document processing with timeout and verify vectors exist"""
    doc_str = str(document_id)
    start_time = time.time()
    last_check_time = start_time
    last_processed = 0
    
    print(f"Starting monitoring for document {doc_str} ({filename})")
    
    while True:
        try:
            # Get document status
            status = await get_document_processing_status(embedding_queue, doc_str)
            
            current_time = time.time()
            elapsed = current_time - start_time
            
            # If document is not found in queue or database
            if not status:
                if elapsed > 300:  # After 5 minutes, if not found, we have a problem
                    print(f"Document {doc_str} not found in queue or database after 5 minutes. Possible error.")
                    return False
                await asyncio.sleep(30)
                continue
                
            # If document is completed according to queue
            if status.get("status") == "completed":
                # Verify document vector exists
                if not await verify_document_vector(document_id):
                    print(f"Document {doc_str} marked as completed but missing document vector. Computing now.")
                    if await compute_and_store_document_vector(document_id):
                        print(f"Successfully created document vector for {doc_str}")
                    else:
                        print(f"Failed to create document vector for {doc_str}")
                        # Try to fix missing page vectors first
                        fixed_pages = await retry_missing_page_vectors(document_id, page_ids, images)
                        if fixed_pages > 0:
                            # Wait for vectors to be processed
                            await asyncio.sleep(60)
                            # Try computing document vector again
                            if await compute_and_store_document_vector(document_id):
                                print(f"Successfully created document vector for {doc_str} after fixing page vectors")
                            else:
                                print(f"Still failed to create document vector for {doc_str}")
                                return False
                
                print(f"Document {doc_str} ({filename}) processing completed successfully.")
                completed_documents.add(doc_str)
                return True
                
            # Check progress
            total_pages = status.get("total_pages", 0)
            processed = status.get("processed_pages", 0)
            failed = status.get("failed_pages", 0)
            progress = status.get("progress_percentage", 0)
            
            # Calculate processing rate
            time_since_last = current_time - last_check_time
            if time_since_last >= 60:  # Update every minute
                pages_per_minute = (processed - last_processed) / (time_since_last / 60) if time_since_last > 0 else 0
                last_check_time = current_time
                last_processed = processed
                
                # Calculate estimated time remaining
                pages_remaining = total_pages - processed
                estimated_minutes = pages_remaining / max(0.1, pages_per_minute) if pages_per_minute > 0 else float('inf')
                
                print(f"""Document {doc_str} ({filename}) progress:
                    - Elapsed time: {elapsed:.0f} seconds
                    - Pages: {processed}/{total_pages} ({progress:.1f}%)
                    - Failed pages: {failed}
                    - Processing rate: {pages_per_minute:.1f} pages/minute
                    - Estimated remaining: {estimated_minutes:.1f} minutes""")
                
                # If no progress for 5 minutes and we have processed pages, retry missing vectors
                if pages_per_minute < 0.1 and processed > 0 and elapsed > 300:
                    print(f"Document {doc_str} processing appears stalled. Retrying missing vectors.")
                    fixed_pages = await retry_missing_page_vectors(document_id, page_ids, images)
                    if fixed_pages > 0:
                        print(f"Requeued {fixed_pages} pages for vector processing")
            
            # Check for timeout
            if elapsed > total_timeout:
                print(f"Document {doc_str} processing timed out after {total_timeout} seconds.")
                # Force document vector calculation if we have enough pages
                if processed / total_pages >= 0.7:  # If at least 70% complete, calculate vector
                    print(f"Document {doc_str} has {processed}/{total_pages} pages processed. Forcing vector calculation.")
                    
                    # Try to fix missing page vectors first
                    fixed_pages = await retry_missing_page_vectors(document_id, page_ids, images)
                    
                    # Wait a bit for vectors to be processed
                    if fixed_pages > 0:
                        print(f"Waiting for {fixed_pages} requeued vectors to be processed...")
                        await asyncio.sleep(60)
                    
                    # Calculate document vector
                    if await compute_and_store_document_vector(document_id):
                        print(f"Successfully created document vector for timed out document {doc_str}")
                        completed_documents.add(doc_str)
                        return True
                    else:
                        print(f"Failed to create document vector for timed out document {doc_str}")
                        return False
                return False
                
            # Sleep before next check
            await asyncio.sleep(30)
            
        except Exception as e:
            print(f"Error monitoring document {doc_str}: {str(e)}")
            traceback.print_exc()
            await asyncio.sleep(30)



async def process_single_pdf(pdf_path: str, timeout: int = 3600):
    """Process a single PDF file with consistent IDs across folder and DB"""
    try:
        # Generate document ID - will be used consistently in both folder and DB
        document_id = uuid4()
        filename = os.path.basename(pdf_path)
        print(f"Processing {pdf_path}")
        
        # Track start time
        start_time = time.time()
        documents_in_process[str(document_id)] = {
            "filename": filename,
            "start_time": start_time,
            "status": "extracting"
        }

        # Extract images and text from PDF
        images, text_pages = await pdf_to_images_and_text(pdf_path)
        total_pages = len(images)

        print(f"Extracted {total_pages} images and text from {filename}")
        
        documents_in_process[str(document_id)]["status"] = "metadata"
        documents_in_process[str(document_id)]["total_pages"] = total_pages

        # Store PDF to folder with document_id
        documents_in_process[str(document_id)]["status"] = "storing"
        with open(pdf_path, 'rb') as f:
            pdf_content = f.read()
        await save_to_local(pdf_content, f"{document_id}.pdf")

        # Generate page_ids before processing to ensure consistency between folder and DB
        page_ids = [str(uuid4()) for _ in range(total_pages)]
        
        # Store in database with page_ids first to ensure DB entries exist
        documents_in_process[str(document_id)]["status"] = "database_init"
        await db_store_pdf_file(
            document_id=str(document_id),
            title=filename,
            page_texts=text_pages,
            page_ids=page_ids
        )
        
        # Track successfully processed pages
        processed_pages = []
        
        # Process and store images to folder
        documents_in_process[str(document_id)]["status"] = "image_processing"
        for page_idx, (image_data, page_id) in enumerate(zip(images, page_ids)):
            # Skip empty images (from extraction errors)
            if not image_data:
                print(f"Warning: Empty image data for page {page_idx}. Skipping folder storing.")
                continue
                
            try:
                # Use consistent page_id for both folder and database
                # store full resolution image
                await save_to_local(image_data, f"{page_id}_full.jpg")

                # Create and store thumbnail
                thumb_data = create_thumbnail(image_data)
                await save_to_local(thumb_data, f"{page_id}_thumb.jpg")

                # Queue image for embedding with the specific page_id
                resized_image = check_and_resize_for_vect(image_data)
                await embedding_queue.add_image_task(
                    str(document_id),
                    filename,
                    resized_image,
                    page_idx,
                    total_pages,
                    str(page_id)  # Pass page_id to ensure consistency
                )
                
                processed_pages.append(page_idx)
            except Exception as e:
                print(f"Error processing page {page_idx} for {filename}: {str(e)}")
                traceback.print_exc()
                # Continue with other pages despite error

        documents_in_process[str(document_id)]["status"] = "monitoring"
        # Launch background task to monitor progress (don't await it)
        monitor_task = asyncio.create_task(
            monitor_document_progress(document_id, filename, images, page_ids, timeout)
        )
        documents_in_process[str(document_id)]["monitor_task"] = monitor_task

        print(f"Successfully queued {filename} for processing")
        return document_id, monitor_task

    except Exception as e:
        print(f"Error processing {pdf_path}: {e}")
        traceback.print_exc()
        return None, None



async def process_single_image(image_path: str, timeout: int = 3600):
    """Process a single image file as a one-page document"""
    try:
        # Generate document ID - will be used consistently in both folder and DB
        document_id = uuid4()
        filename = os.path.basename(image_path)
        print(f"Processing image {image_path}")

        latex_path = image_path.replace(".jpg", ".txt")
        print(f"Looking for LaTeX file: {latex_path}")

        # Track start time
        start_time = time.time()
        documents_in_process[str(document_id)] = {
            "filename": filename,
            "start_time": start_time,
            "status": "extracting"
        }

        try:
            with open(latex_path, 'r', encoding='utf-8') as f:
                latex_code = f.read()
        except Exception as e:
            print(f"Error reading LaTeX file {latex_path}: {str(e)}")

        # Extract image bytes
        images, text_pages = await image_to_image_and_text(image_path)
        total_pages = 1  # Single page for images

        print(f"Extracted image as a single-page document: {filename}")
        documents_in_process[str(document_id)]["status"] = "metadata"
        documents_in_process[str(document_id)]["total_pages"] = total_pages

        documents_in_process[str(document_id)]["metadata"] = {
            "title": filename,
        }

        # Store original image to folder with document_id
        documents_in_process[str(document_id)]["status"] = "storing"
        
        # Determine the file extension from the original file
        file_ext = os.path.splitext(filename)[1].lower()
        await save_to_local(images[0], f"{document_id}{file_ext}")

        # Generate page_id for consistency between folder and DB
        page_ids = [str(uuid4())]
        
        # Store in database with page_id first to ensure DB entry exists
        documents_in_process[str(document_id)]["status"] = "database_init"
        await db_store_pdf_file(
            document_id=str(document_id),
            title=filename,
            page_texts=text_pages,
            page_ids=page_ids,
            latex_code=latex_code
        )
        
        # Process and store images to folder
        documents_in_process[str(document_id)]["status"] = "image_processing"
        
        try:
            # Use consistent page_id for both folder and database
            # store full resolution image
            await save_to_local(images[0], f"{page_ids[0]}_full.jpg")

            # Create and store thumbnail
            thumb_data = create_thumbnail(images[0])
            await save_to_local(thumb_data, f"{page_ids[0]}_thumb.jpg")

            # Queue image for embedding with the specific page_id
            resized_image = check_and_resize_for_vect(images[0])
            await embedding_queue.add_image_task(
                str(document_id),
                filename,
                resized_image,
                0,  # page_idx
                total_pages,
                str(page_ids[0])  # Pass page_id to ensure consistency
            )
            
        except Exception as e:
            print(f"Error processing image {filename}: {str(e)}")
            traceback.print_exc()

        documents_in_process[str(document_id)]["status"] = "monitoring"
        # Launch background task to monitor progress (don't await it)
        monitor_task = asyncio.create_task(
            monitor_document_progress(document_id, filename, images, page_ids, timeout)
        )
        documents_in_process[str(document_id)]["monitor_task"] = monitor_task

        print(f"Successfully queued image {filename} for processing")
        return document_id, monitor_task

    except Exception as e:
        print(f"Error processing {image_path}: {e}")
        traceback.print_exc()
        return None, None



async def process_pdf_folder(folder_path: str, concurrent_limit: int = 2, timeout: int = 3600):
    """Process PDFs from subfolders, using subfolder names with concurrency control"""
    successful = 0
    failed = 0
    pending_tasks = []
    
    # Create a semaphore to limit concurrent processing
    semaphore = asyncio.Semaphore(concurrent_limit)
    
    async def process_with_semaphore(file_path):
        async with semaphore:
            # Check file extension to determine processing method
            file_ext = os.path.splitext(file_path)[1].lower()
            if file_ext == '.pdf':
                return await process_single_pdf(file_path, timeout)
            elif file_ext in ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff', '.webp'):
                return await process_single_image(file_path, timeout)
            else:
                print(f"Unsupported file type: {file_ext}")
                return None, None
    
    # Walk through all subfolders
    for root, dirs, files in os.walk(folder_path):
        # Find PDF, image, and text files
        pdf_files = [f for f in files if f.lower().endswith('.pdf')]
        image_files = [f for f in files if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff', '.webp'))]
        total_files = len(pdf_files) + len(image_files)
        
        if not total_files:
            print(f"No PDF or image files found in folder: {os.path.basename(root)}")
            continue
        
        print(f"\nProcessing {len(pdf_files)} PDF files, {len(image_files)} image files from folder: {os.path.basename(root)}")
        
        # Launch tasks for all PDFs in this folder
        for pdf_file in pdf_files:
            file_path = os.path.join(root, pdf_file)
            print(f"\nQueuing PDF: {pdf_file}")
            
            # Create and start the task
            task = asyncio.create_task(process_with_semaphore(file_path))
            pending_tasks.append((pdf_file, task))
        
        # Launch tasks for all image files in this folder
        for image_file in image_files:
            file_path = os.path.join(root, image_file)
            print(f"\nQueuing image: {image_file}")
            
            # Create and start the task
            task = asyncio.create_task(process_with_semaphore(file_path))
            pending_tasks.append((image_file, task))
    
    # Wait for all tasks to complete
    for file_name, task in pending_tasks:
        try:
            doc_id, monitor_task = await task
            
            if doc_id:
                print(f"Successfully queued {file_name}")
                # Wait for monitoring to complete
                if monitor_task:
                    result = await monitor_task
                    if result:
                        successful += 1
                        print(f"Successfully processed {file_name}")
                    else:
                        # Even if monitoring failed, the document might have been processed
                        if str(doc_id) in completed_documents:
                            successful += 1
                            print(f"Document {file_name} was eventually processed successfully")
                        else:
                            failed += 1
                            print(f"Failed to fully process {file_name}")
                else:
                    # No monitoring task means something failed early
                    failed += 1
            else:
                failed += 1
                print(f"Failed to process {file_name}")
                
        except Exception as e:
            failed += 1
            print(f"Error processing {file_name}: {e}")
            traceback.print_exc()
    
    # Summary
    print(f"\nProcessing complete:")
    print(f"Successfully processed: {successful}")
    print(f"Failed: {failed}")
    return successful, failed



async def main():
    """Main async function to run the whole process"""
    # Folder containing PDFs and images
    DOCS_FOLDER = EXTRACTION_FOLDER
    
    # Make sure the embedding queue background tasks are started
    # This will work because we're already inside an event loop (asyncio.run creates one)
    embedding_queue.start_background_tasks()
    
    # Run the processor
    success, failed = await process_pdf_folder(DOCS_FOLDER, concurrent_limit=2, timeout=3600)
    
    return success, failed



if __name__ == "__main__":
    # Run everything in a single asyncio.run() call
    await main()

[2025-05-25 17:39:36.188373] Started background task for checking stalled documents

Processing 0 PDF files, 10 image files from folder: extracted_data

Queuing image: sample_9.jpg

Queuing image: sample_8.jpg

Queuing image: sample_5.jpg

Queuing image: sample_4.jpg

Queuing image: sample_6.jpg

Queuing image: sample_7.jpg

Queuing image: sample_3.jpg

Queuing image: sample_2.jpg

Queuing image: sample_0.jpg

Queuing image: sample_1.jpg
Processing image extracted_data/sample_9.jpg
Looking for LaTeX file: extracted_data/sample_9.txt
Extracted image as a single-page document: sample_9.jpg
Processing image extracted_data/sample_8.jpg
Looking for LaTeX file: extracted_data/sample_8.txt
Extracted image as a single-page document: sample_8.jpg
File saved locally: processed_data/d7fb4012-dcdb-4582-842f-48abe1a1e54f.jpg
Inputs to store pdf data: d7fb4012-dcdb-4582-842f-48abe1a1e54f, sample_9.jpg, 1
File saved locally: processed_data/d6d720eb-e09e-43d6-9bdd-5363f2962b38.jpg
Storing 1 images and

### 