# Ivy Ingestion Service - Google Colab Edition

This notebook converts PDF pages to images, embeds them with ColQwen2.5 Omni, and uploads the embeddings to a Qdrant vector database.

## Setup Instructions

1. **Runtime**: Set runtime to GPU (Runtime → Change runtime type → T4 GPU or better)
2. **Google Drive**: Mount your Google Drive when prompted in Step 2
3. **Folder Structure**: Ensure these folders exist in your Google Drive:
   - `ivy/data/unprocessed/` - Place PDFs here
   - `ivy/data/processed/` - Processed PDFs will be moved here
4. **Environment Variables**: Set your Qdrant Cloud credentials when prompted

## Architecture

- **Input**: PDFs from Google Drive (`ivy/data/unprocessed/`)
- **Processing**: Convert pages to images (150 DPI) → Generate ColQwen2.5 Omni embeddings
- **Output**: Upload to Qdrant collection "embeddings_database"
- **Archive**: Move processed PDFs to `ivy/data/processed/`
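
ColQwen-style models emit one vector per image patch, so each page is stored as a multivector, and the collection created in Step 6 compares multivectors with the MaxSim comparator over cosine distance. The following is a minimal pure-Python sketch of that scoring rule on toy 2-dimensional vectors. The helper names `cosine` and `max_sim` are illustrative only, and Qdrant performs this computation server-side.

```python
import math
from typing import List

Vector = List[float]


def cosine(a: Vector, b: Vector) -> float:
    """Cosine similarity between two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def max_sim(query: List[Vector], page: List[Vector]) -> float:
    """For each query vector, take its best match on the page, then sum."""
    return sum(max(cosine(q, p) for p in page) for q in query)


# Toy example (the embeddings in this notebook are 128-dimensional)
query_vectors = [[1.0, 0.0], [0.0, 1.0]]
page_vectors = [[1.0, 0.0], [0.0, 2.0], [1.0, 1.0]]
score = max_sim(query_vectors, page_vectors)
print(score)
```

Each query vector finds a page vector pointing in the same direction, so both contribute a similarity of 1 to the total.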

## Step 1: Install Dependencies

In [None]:
%%capture
# Install required packages
# Version specifiers are quoted so the shell does not treat ">" as output redirection
!pip install "accelerate>=1.12.0"
!pip install "pdf2image>=1.17.0"
!pip install "pillow>=12.0.0"
!pip install "qdrant-client>=1.16.2"
!pip install "torch>=2.5.0"
!pip install "tqdm>=4.67.1"
!pip install "datasets>=4.4.2"
!pip install git+https://github.com/illuin-tech/colpali.git

# Install poppler (required for pdf2image)
!apt-get install -y poppler-utils

print("✓ All dependencies installed successfully!")

## Step 2: Mount Google Drive

**Instructions:**
1. Run the cell below
2. Approve the Google Drive access prompt that appears
3. Sign in to your Google account
4. If your Colab version shows a link and an authorization code instead, copy the code
5. Paste it back into the input field

Your Google Drive will be mounted at `/content/drive/MyDrive/`

In [None]:
from google.colab import drive
import os

# Mount Google Drive
drive.mount('/content/drive')

# Define base paths
DRIVE_BASE = "/content/drive/MyDrive/ivy/data"
UNPROCESSED_DIR = f"{DRIVE_BASE}/unprocessed"
PROCESSED_DIR = f"{DRIVE_BASE}/processed"

# Create directories if they don't exist
os.makedirs(UNPROCESSED_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)

print("✓ Google Drive mounted successfully!")
print(f"✓ Unprocessed directory: {UNPROCESSED_DIR}")
print(f"✓ Processed directory: {PROCESSED_DIR}")

## Step 3: Configure Qdrant Cloud Credentials

Set your Qdrant Cloud connection details as environment variables.
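
The client in Step 6 is constructed with `host=`, which expects a bare hostname, not a full URL. If the value is copied from the Qdrant Cloud dashboard it may include a scheme, port, or trailing slash. A small sketch of how the input could be normalized first follows. The helper `normalize_qdrant_host` is hypothetical and not part of this notebook or of `qdrant-client`.

```python
from urllib.parse import urlsplit


def normalize_qdrant_host(raw: str) -> str:
    """Reduce a pasted host or URL to a bare hostname."""
    raw = raw.strip()
    # urlsplit only recognises the host when a scheme or a leading "//" is present
    parts = urlsplit(raw if "://" in raw else f"//{raw}")
    if not parts.hostname:
        raise ValueError(f"Could not find a hostname in {raw!r}")
    return parts.hostname


host = normalize_qdrant_host("https://xxx.gcp.cloud.qdrant.io:6333/")
print(host)
```

A value that is already a bare hostname passes through unchanged apart from whitespace trimming and lowercasing.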

In [None]:
import os
from getpass import getpass

# Prompt for Qdrant Cloud credentials (the API key input will be hidden)
print("Enter your Qdrant Cloud credentials:")
qdrant_host = input("Qdrant Host (e.g., xxx.gcp.cloud.qdrant.io): ").strip()
qdrant_api_key = getpass("Qdrant API Key: ").strip()

# Set as environment variables
os.environ["QDRANT_HOST"] = qdrant_host
os.environ["QDRANT_API_KEY"] = qdrant_api_key

print("✓ Qdrant credentials configured!")

## Step 4: Import Libraries and Define Constants

In [None]:
import glob
import shutil
import uuid
from typing import List, Optional, Dict, Any

import torch
import numpy as np
from PIL import Image
from tqdm import tqdm
from pdf2image import convert_from_path, pdfinfo_from_path

from colpali_engine.models import ColQwen2_5Omni, ColQwen2_5OmniProcessor
from qdrant_client import QdrantClient, models

# Constants
QDRANT_COLLECTION_NAME = "embeddings_database"
EMBEDDING_DIMENSION = 128
PDF_DPI = 150
PAGES_PER_BATCH = 1  # Process one page at a time

print("✓ Libraries imported successfully!")
print(f"✓ PyTorch version: {torch.__version__}")
print(f"✓ CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"✓ GPU: {torch.cuda.get_device_name(0)}")

## Step 5: Define Helper Functions

In [None]:
def embed_images(
    image_batch: List[Image.Image],
    model_processor: ColQwen2_5OmniProcessor,
    model: ColQwen2_5Omni
) -> List[List[List[float]]]:
    """
    Generate embeddings for a batch of images using ColQwen2.5 Omni.

    Creates multivector embeddings preserving spatial structure from the image.

    Args:
        image_batch: List of PIL Image objects to embed
        model_processor: ColQwen2_5OmniProcessor for image preprocessing
        model: ColQwen2_5Omni model for generating embeddings

    Returns:
        List of multivector embeddings (one per image)
    """
    # Generate embeddings without gradient computation (inference only)
    with torch.no_grad():
        processed_images = model_processor.process_images(image_batch).to(model.device)
        image_embeddings = model(**processed_images)

    original_embeddings_batch = image_embeddings.cpu().float().numpy().tolist()

    return original_embeddings_batch


def get_pdf_info(pdf_path: str) -> Optional[Dict[str, Any]]:
    """
    Extract metadata from a PDF file.

    Args:
        pdf_path: Absolute path to the PDF file

    Returns:
        Dictionary containing PDF metadata (including 'Pages' key),
        or None if extraction fails
    """
    try:
        info = pdfinfo_from_path(pdf_path)
        return info
    except Exception as e:
        print(f"Error extracting PDF metadata from {pdf_path}: {e}")
        return None


def upload_embeddings_to_qdrant(
    client: QdrantClient,
    original_embeddings: np.ndarray,
    metadata_batch: List[Dict[str, Any]],
    collection_name: str
) -> None:
    """
    Upload a batch of embeddings with metadata to Qdrant.

    Uploads the original embeddings as named vectors within the same collection for retrieval.

    Args:
        client: QdrantClient instance
        original_embeddings: Full multivector embeddings
        metadata_batch: List of payload dictionaries (source, filename, page_number, etc.)
        collection_name: Target Qdrant collection name
    """
    try:
        client.upload_collection(
            collection_name=collection_name,
            vectors={
                "original": original_embeddings
            },
            payload=metadata_batch,
            ids=[str(uuid.uuid4()) for _ in range(len(original_embeddings))]
        )
    except Exception as e:
        print(f"Error uploading to Qdrant: {e}")

print("✓ Helper functions defined!")

## Step 6: Initialize Qdrant Client and Collection

In [None]:
# Initialize Qdrant client using environment variables
qdrant_host = os.getenv("QDRANT_HOST")
qdrant_api_key = os.getenv("QDRANT_API_KEY")

client = QdrantClient(host=qdrant_host, api_key=qdrant_api_key)
print(f"✓ Connected to Qdrant Cloud: {qdrant_host}")

# Ensure collection exists with proper multivector configuration
if client.collection_exists(QDRANT_COLLECTION_NAME):
    print(f"✓ Collection '{QDRANT_COLLECTION_NAME}' already exists")
else:
    print(f"Creating collection '{QDRANT_COLLECTION_NAME}' with multivector config...")
    client.create_collection(
        collection_name=QDRANT_COLLECTION_NAME,
        vectors_config={
            # Original multivector embeddings
            # HNSW disabled (m=0) for speed since these are only used for reranking
            "original": models.VectorParams(
                size=EMBEDDING_DIMENSION,
                distance=models.Distance.COSINE,
                multivector_config=models.MultiVectorConfig(
                    comparator=models.MultiVectorComparator.MAX_SIM
                ),
                hnsw_config=models.HnswConfigDiff(m=0)  # Disable HNSW index
            )
        }
    )
    print(f"✓ Collection '{QDRANT_COLLECTION_NAME}' created successfully!")

## Step 7: Load ColQwen2.5 Omni Model (GPU)

In [None]:
print("Loading ColQwen2.5 Omni model and processor...")
print("This may take a few minutes on first run (downloading ~3GB model)...")

# Load model with GPU acceleration
device = "cuda:0" if torch.cuda.is_available() else "cpu"

colqwen_model = ColQwen2_5Omni.from_pretrained(
    "vidore/colqwen-omni-v0.1",
    torch_dtype=torch.bfloat16,
    device_map=device,  # Use GPU if available
    low_cpu_mem_usage=True
).eval()

colqwen_processor = ColQwen2_5OmniProcessor.from_pretrained("vidore/colqwen-omni-v0.1")

print(f"✓ Model loaded successfully on {device}!")
print(f"✓ Model device: {colqwen_model.device}")

## Step 8: Process PDFs from Google Drive

This cell will:
1. Find all PDFs in your `unprocessed/` folder
2. Convert each page to an image (150 DPI)
3. Generate ColQwen2.5 Omni embeddings
4. Upload embeddings to Qdrant
5. Move processed PDFs to `processed/` folder
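
Steps 2 to 4 run one batch of pages at a time, which avoids holding every page image of a large PDF in memory at once. The batching arithmetic can be sketched on its own as follows. The generator name `page_ranges` is illustrative, and it mirrors the 1-indexed, inclusive `first_page`/`last_page` values the cell below passes to `convert_from_path`.

```python
from typing import Iterator, Tuple


def page_ranges(total_pages: int, pages_per_batch: int) -> Iterator[Tuple[int, int]]:
    """Yield (first_page, last_page) pairs, 1-indexed and inclusive."""
    if pages_per_batch < 1:
        raise ValueError("pages_per_batch must be at least 1")
    for first in range(1, total_pages + 1, pages_per_batch):
        # The final batch may be shorter than pages_per_batch
        yield first, min(first + pages_per_batch - 1, total_pages)


batches = list(page_ranges(total_pages=5, pages_per_batch=2))
print(batches)  # [(1, 2), (3, 4), (5, 5)]
```

With `PAGES_PER_BATCH = 1`, as configured in Step 4, every batch is a single page.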

**Note:** Place your PDFs in `Google Drive > ivy/data/unprocessed/` before running this cell.
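
The cell below discovers files with the pattern `*.pdf`, which on a case-sensitive filesystem can miss files named with an uppercase extension such as `.PDF`. A possible case-insensitive alternative, shown as a sketch using a temporary folder in place of the Drive directory, follows. `find_pdfs` is a hypothetical helper, not something the notebook defines.

```python
import tempfile
from pathlib import Path
from typing import List


def find_pdfs(folder: str) -> List[Path]:
    """Return PDF files in `folder`, matching the extension case-insensitively."""
    return sorted(
        p for p in Path(folder).iterdir()
        if p.is_file() and p.suffix.lower() == ".pdf"
    )


# A temporary directory stands in for ivy/data/unprocessed/
with tempfile.TemporaryDirectory() as folder:
    for name in ("a.pdf", "B.PDF", "notes.txt"):
        (Path(folder) / name).touch()
    found_names = [p.name for p in find_pdfs(folder)]

print(found_names)
```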

In [None]:
source_label = "ingestion"  # Label for tracking document source
pdf_files = glob.glob(os.path.join(UNPROCESSED_DIR, "*.pdf"))

print(f"Found {len(pdf_files)} PDF file(s) to process\n")

if len(pdf_files) == 0:
    print("⚠ No PDFs found in unprocessed directory!")
    print(f"📁 Please add PDFs to: {UNPROCESSED_DIR}")
else:
    # Process each PDF file
    for pdf_path in pdf_files:
        filename = os.path.basename(pdf_path)
        print(f"\n{'='*80}")
        print(f"Processing: {filename}")
        print(f"{'='*80}")

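        # Extract PDF metadata
        pdf_info = get_pdf_info(pdf_path)
        total_pages = pdf_info.get('Pages', 0) if pdf_info else 0
        print(f"📄 Document has {total_pages} page(s)")

        # Skip unreadable PDFs so they stay in unprocessed/ instead of being archived
        if total_pages == 0:
            print(f"⚠ Skipping {filename}: could not read page count")
            continue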

        # Process PDF in batches of pages
        for start_page_num in range(1, total_pages + 1, PAGES_PER_BATCH):
            end_page_num = min(start_page_num + PAGES_PER_BATCH - 1, total_pages)

            # Convert PDF pages to images
            page_images = convert_from_path(
                pdf_path,
                dpi=PDF_DPI,
                first_page=start_page_num,
                last_page=end_page_num,
                fmt="JPEG"
            )
            print(f"📄 Processing page(s) {start_page_num}-{end_page_num} of {total_pages}...")

            # Embed and upload each page
            for page_offset, page_image in enumerate(tqdm(
                page_images, desc="  Generating embeddings", leave=False
            )):
                current_page_num = start_page_num + page_offset

                try:
                    # Generate embeddings for this page
                    original = embed_images([page_image], colqwen_processor, colqwen_model)

                    # Prepare metadata for this page
                    metadata = {
                        "source": source_label,
                        "filename": filename,
                        "page_number": current_page_num,
                        "doc_type": "pdf"
                    }

                    # Upload to Qdrant
                    upload_embeddings_to_qdrant(
                        client,
                        np.asarray(original, dtype=np.float32),
                        [metadata],
                        QDRANT_COLLECTION_NAME
                    )
                except Exception as e:
                    print(f"Error processing page {current_page_num}: {e}")
                    continue

        print(f"✓ Completed processing {filename}")

        # Move successfully processed PDF to processed directory
        try:
            destination = os.path.join(PROCESSED_DIR, filename)
            shutil.move(pdf_path, destination)
            print(f"✓ Moved to processed directory: {filename}")
        except Exception as e:
            print(f"⚠ Warning: Could not move {filename}: {e}")

    print(f"\n{'='*80}")
    print("✓ ALL PROCESSING COMPLETE!")
    print(f"✓ Processed {len(pdf_files)} PDF(s)")
    print(f"✓ Embeddings uploaded to Qdrant collection: {QDRANT_COLLECTION_NAME}")
    print(f"{'='*80}")

## Optional: Verify Upload

Run this cell to check how many embeddings are now in your Qdrant collection.
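
Each point in the collection corresponds to one PDF page, so the count should match the number of pages ingested. Because the upload helper assigns a random `uuid4` ID to every point, ingesting the same PDF twice adds new points instead of overwriting the old ones. One possible alternative, which is not what this notebook does, is to derive a deterministic ID from the filename and page number with `uuid5`. The helper `page_point_id` and the key format below are assumptions made for illustration.

```python
import uuid


def page_point_id(filename: str, page_number: int) -> str:
    """Derive a stable UUID string from a filename and 1-indexed page number."""
    # Hypothetical key format; any string that is unique per page would work
    key = f"{filename}#page={page_number}"
    return str(uuid.uuid5(uuid.NAMESPACE_URL, key))


first = page_point_id("report.pdf", 1)
again = page_point_id("report.pdf", 1)
other = page_point_id("report.pdf", 2)
print(first == again, first == other)  # True False
```

Qdrant accepts UUID strings as point IDs, so an ID built this way could be passed in place of the random one, at the cost of treating two different files with the same name as the same document.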

In [None]:
# Get collection info
collection_info = client.get_collection(QDRANT_COLLECTION_NAME)

print(f"Collection: {QDRANT_COLLECTION_NAME}")
print(f"Total points (one per page): {collection_info.points_count}")
print(f"Vector config: {collection_info.config.params.vectors}")
print("\n✓ Verification complete!")