# Bulk HTML Ingestion with ColPali and Astra Late Interaction Pipeline

## Requirements

**Python version**: 3.12+ 

### Packages:

**For connecting to AstraDB**:
`astrapy==2.0.0rc`

**For convenient Late Interaction processing**:
`astra-multivector[late-interaction]`

**For converting HTML files to images**:
`weasyprint`
`pdf2image`

Alternatively, you can install `astra-multivector[all]`, which pulls in everything this notebook needs, although it may also install other dependencies that you do not need.

In [None]:
import os
from dotenv import load_dotenv

from astrapy import AsyncDatabase, DataAPIClient

from astra_multivector.late_interaction import ColPaliModel, LateInteractionPipeline


# Load environment variables (load_dotenv presumably reads a local .env file
# into the process environment -- confirm where the file is expected to live).
load_dotenv()

# os.getenv returns None when a variable is unset; neither value is validated
# here, so a missing setting only surfaces when the client below is used.
ASTRA_DB_APPLICATION_TOKEN = os.getenv("ASTRA_DB_APPLICATION_TOKEN")
ASTRA_DB_API_ENDPOINT = os.getenv("ASTRA_DB_API_ENDPOINT")


# Initialize the Astra DB client and obtain an async database handle for the
# configured API endpoint.
astra_client: DataAPIClient = DataAPIClient(token=ASTRA_DB_APPLICATION_TOKEN)
db: AsyncDatabase = astra_client.get_async_database(api_endpoint=ASTRA_DB_API_ENDPOINT)


# Initialize the ColPali model.
# NOTE(review): device="mps" is hard-coded; presumably this targets Apple
# Silicon and must be changed on other hardware -- confirm against ColPaliModel.
model = ColPaliModel(
    model_name="vidore/colpali-v1.3",
    device="mps",
)

# Initialize the late interaction pipeline that ties the database and the
# model together.
# NOTE(review): doc_pool_factor=10 presumably controls how aggressively page
# vectors are pooled before storage -- confirm against LateInteractionPipeline.
pipeline = LateInteractionPipeline(
    db=db,
    model=model,
    base_table_name="colpali_table",
    doc_pool_factor=10,
)



In [2]:
import logging
import sys


def setup_logging(level=logging.DEBUG):
    """
    Reset the root logger so it writes only to stdout.

    Every handler currently attached to the root logger is detached, then a
    single stream handler bound to ``sys.stdout`` is installed. Both the root
    logger and the new handler are set to ``level``.

    Args:
        level: Logging threshold applied to the root logger and its handler

    Returns:
        The reconfigured root logger
    """
    root = logging.getLogger()

    # Iterate over a snapshot: removeHandler mutates root.handlers in place.
    for existing in list(root.handlers):
        root.removeHandler(existing)
    root.setLevel(level)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(level)
    stdout_handler.setFormatter(
        logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
        )
    )
    root.addHandler(stdout_handler)

    return root

def config_module_logging(module_name, level=logging.DEBUG):
    """
    Set the logging threshold of one named logger.

    Args:
        module_name: Dotted logger name, e.g. a package or module path
        level: Threshold to apply to that logger

    Returns:
        The logger registered under ``module_name``
    """
    named_logger = logging.getLogger(module_name)
    named_logger.setLevel(level)
    return named_logger

# Route all log output to stdout at DEBUG level; `logger` is the root logger.
logger = setup_logging(logging.DEBUG)

# Keep full DEBUG detail for the late-interaction package itself.
config_module_logging('astra_multivector.late_interaction', logging.DEBUG)

# Raise astrapy's threshold to INFO so its DEBUG records are not emitted.
config_module_logging('astrapy', logging.INFO)

<Logger astrapy (INFO)>

In [None]:
# Run a sample query against the pipeline; the bare `await` relies on the
# notebook's top-level await support.
# NOTE(review): k presumably is the number of results returned and
# n_ann_tokens the number of ANN candidates fetched per query token --
# confirm against LateInteractionPipeline.search.
await pipeline.search(
    query="What are the monthly access charges for 2 lines with autopay and without autopay?",
    k=5,
    n_ann_tokens=20,
)

## HTML to Image Pipeline

ColPali requires the input documents to be images. Furthermore, the training method treated PDF pages as individual input documents, so if your HTML page is large it will be beneficial to convert it into a multi-page PDF and vectorize each page as an image.

ColPali generates 1024 + 7 vectors for each page image. The 1024 vectors are akin to the token-wise embeddings from ColBERT: the underlying Vision Transformer partitions the image into a 32 x 32 grid and produces one vector per patch. The other 7 vectors encode contextual information derived from ColPali’s special tokens (e.g., for the beginning of the sequence, and task-specific instructions like “Describe the image”).

We can use pooling to reduce the number of vectors we return per page.

In [2]:
from pathlib import PosixPath
from PIL import Image
from uuid import UUID
from typing import Dict, Generator, List, Optional, Union

from weasyprint import HTML
from pdf2image import convert_from_bytes


def html_to_pages(
    html_path: Optional[PosixPath] = None,
    html_content: Optional[str] = None,
    dpi: int = 200
) -> List[Image.Image]:
    """
    Render an HTML document to a PDF and rasterize every PDF page.

    The source is taken from ``html_path`` when it is truthy, otherwise from
    ``html_content``. An empty string counts as "not provided".

    Args:
        html_path: Location of an HTML file on disk
        html_content: Raw HTML markup
        dpi: Rasterization resolution passed to pdf2image

    Returns:
        One PIL image per rendered PDF page

    Raises:
        ValueError: If neither html_path nor html_content is supplied
    """
    # Guard first so the happy path below stays flat.
    if not html_path and not html_content:
        raise ValueError("Either html_path or html_content must be provided")

    # The file path wins when both sources are given.
    if html_path:
        source = HTML(filename=html_path)
    else:
        source = HTML(string=html_content)

    rendered_pdf = source.write_pdf()
    return convert_from_bytes(rendered_pdf, dpi=dpi)


def html_page_generator(
    directory_path: PosixPath
) -> Generator[Dict[str, Union[Image.Image, str]], None, None]:
    """
    Generator that yields one dictionary per rendered page of each HTML file.

    The directory is searched recursively for entries whose name ends in
    "html". Files that fail to convert are reported on stdout and skipped, so
    one bad document does not stop the rest of the run.

    Args:
        directory_path: Path object pointing to the directory to search

    Yields:
        Dict with content (page image) and image_url (source HTML path)
    """
    # rglob("*html") also matches directories whose name ends in "html"
    # (e.g. "htmlcov/../html"); keep regular files only so they do not show up
    # as conversion errors. Sorting makes the ingestion order reproducible
    # instead of depending on filesystem traversal order.
    html_files: List[PosixPath] = sorted(
        candidate
        for candidate in directory_path.rglob("*html")
        if candidate.is_file()
    )

    for html_path in html_files:
        # Only the conversion is guarded: keeping the yield outside the try
        # means an exception raised into the generator by its consumer is not
        # swallowed and misreported as a conversion failure.
        try:
            page_images: List[Image.Image] = html_to_pages(html_path)
        except Exception as e:
            print(f"Error processing {html_path}: {str(e)}")
            continue

        source_url = str(html_path)
        for page_image in page_images:
            yield {
                "content": page_image,
                "image_url": source_url,
            }


async def bulk_index_html_directory(
    pipeline: LateInteractionPipeline,
    directory_path: PosixPath,
    batch_size: int = 5,
    concurrency: Optional[int] = 3) -> List[UUID]:
    """
    Index HTML files from a directory into the late interaction pipeline.

    Pages are pulled lazily from html_page_generator, grouped into batches of
    ``batch_size`` and handed to ``pipeline.bulk_index_documents``. A final
    partial batch is indexed as well. Progress is printed to stdout.

    Args:
        pipeline: LateInteractionPipeline instance
        directory_path: Path to directory containing HTML files
        batch_size: Number of pages to process in a batch
        concurrency: Forwarded unchanged as ``embedding_concurrency``. None is
            accepted here because this notebook's ingest cell passes it;
            presumably the pipeline then applies its own default -- confirm
            against LateInteractionPipeline.bulk_index_documents.

    Returns:
        List of document IDs indexed, in the order the batches completed
    """
    all_doc_ids: List[UUID] = []

    async def index_batch(pages: List[Dict[str, Union[Image.Image, str]]]) -> int:
        # Index one batch, record its IDs, and report how many were added.
        doc_ids = await pipeline.bulk_index_documents(
            pages,
            embedding_concurrency=concurrency
        )
        all_doc_ids.extend(doc_ids)
        return len(doc_ids)

    batch = []
    for page_dict in html_page_generator(directory_path):
        print(f"Indexing document {page_dict['image_url']} with {page_dict['content'].size}")
        batch.append(page_dict)

        if len(batch) >= batch_size:
            indexed = await index_batch(batch)
            # Start a fresh list rather than clearing in place, so the batch
            # just handed to the pipeline is never mutated afterwards.
            batch = []
            print(f"Indexed {indexed} pages, total so far: {len(all_doc_ids)}")

    # Flush whatever is left when the page count is not a multiple of
    # batch_size.
    if batch:
        indexed = await index_batch(batch)
        print(f"Indexed {indexed} pages, total: {len(all_doc_ids)}")

    return all_doc_ids


## Ingest HTML Files

In [None]:
from pathlib import Path, PosixPath


# NOTE(review): this is a hard-coded absolute path on the author's machine;
# point it at your own directory of HTML files before running the cell.
file_directory: PosixPath = Path("/Users/brian.ogrady/src/python3/astra-multivector/examples/notebooks/")

# Index every HTML page found under file_directory, one page per batch.
# concurrency=None is forwarded unchanged as embedding_concurrency;
# presumably this selects the pipeline's default -- confirm.
await bulk_index_html_directory(
    pipeline=pipeline,
    directory_path=file_directory,
    batch_size=1,
    concurrency=None,
)

