In [None]:
from fastapi import FastAPI, HTTPException
import argparse
from pydantic import BaseModel
import os
import json
import torch
from doctr.models import ocr_predictor
from doctr.io import DocumentFile
from PIL import Image, ImageEnhance
import io
import re
from pdf2image import convert_from_path
from byaldi import RAGMultiModalModel
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info

# FastAPI application object for this module.
app = FastAPI()

# Scratch locations for temporary images.
TEMP_IMAGE_PATH = "./temp_image.jpg"
TEMP_IMAGE_DIR = "./temp_images/"

# Environment switches for library compatibility, applied in one call.
# NOTE(review): these are set after the import statements at the top of the
# file; any library that reads such a variable at import time would not see
# the value set here -- confirm the ordering is intended.
os.environ.update(
    {
        "USE_TORCH": "YES",
        "USE_TF": "NO",
        "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
        "TOKENIZERS_PARALLELISM": "false",
    }
)

# Where the source PDFs live, where the doc_id -> path mapping is stored,
# and where the index named INDEX_NAME is kept on disk.
PDF_DIRECTORY = "./document_test"
MAPPING_FILE = "./doc_id_to_path.json"
INDEX_ROOT = "/home/ubuntu/Educational_VQA/.byaldi"
INDEX_NAME = "global_index"

# Module-level overwrite flag; presumably handed to the indexing helpers by
# code outside this section -- verify against the callers.
overwrite = False

In [None]:
def initialize_rag_model(overwrite=False, device="cuda", verbose=1):
    """Load the stored index if it can be reused, else start from the pretrained model.

    The index location is ``INDEX_ROOT/INDEX_NAME``. When that path exists and
    ``overwrite`` is false, the model is restored from it. Otherwise any
    existing index directory is deleted (only when ``overwrite`` is true) and a
    fresh model is created from the ``vidore/colqwen2-v1.0`` checkpoint.

    Args:
        overwrite: If true, discard an existing on-disk index instead of
            loading it.
        device: Device string forwarded to the model loader.
        verbose: Verbosity level forwarded to the model loader.

    Returns:
        The ``RAGMultiModalModel`` instance, either restored from the index or
        freshly created from the pretrained checkpoint.
    """
    # shutil is not among the imports at the top of this file, so it is
    # imported here; without it the overwrite branch raised NameError.
    import shutil

    index_path = os.path.join(INDEX_ROOT, INDEX_NAME)
    print("overwrite-----", overwrite)
    print('index_path---', index_path)
    print(f"Checking index path: {index_path}")

    index_exists = os.path.exists(index_path)
    if index_exists and not overwrite:
        print("Index exists and overwrite=False. Loading existing index.")
        RAG = RAGMultiModalModel.from_index(
            index_path=index_path,
            index_root=INDEX_ROOT,
            device=device,
            verbose=verbose
        )
        print("Loaded existing index.")
        return RAG

    if index_exists:
        # Reaching here with an existing index means overwrite was requested.
        print("Index exists and overwrite=True. Deleting existing index.")
        shutil.rmtree(index_path)

    # Initialize RAG from pretrained. The caller's device/verbose are forwarded
    # (they were previously ignored on this path), and index_root is pinned to
    # INDEX_ROOT so the root handed to the fresh model is the same one the
    # existence check above looks in.
    print("Initializing RAG from pretrained.")
    RAG = RAGMultiModalModel.from_pretrained(
        "vidore/colqwen2-v1.0",
        index_root=INDEX_ROOT,
        device=device,
        verbose=verbose,
    )
    return RAG

In [None]:
def index_documents_in_folder(RAG, folder_path, index_name, overwrite=False):
    """Index a folder with ``RAG.index`` and record a doc_id-to-path mapping.

    Args:
        RAG: Object exposing an ``index(...)`` method.
        folder_path: Directory that is listed for ``.pdf`` files and passed to
            ``RAG.index`` as ``input_path``.
        index_name: Name forwarded to ``RAG.index``.
        overwrite: Forwarded to ``RAG.index``; when true the mapping file is
            also rewritten even if it already exists.

    Returns:
        None.
    """
    print(f"Indexing documents in {folder_path} with index_name {index_name}, overwrite={overwrite}")

    # Collect the full path of every entry whose name ends in ".pdf".
    pdf_paths = []
    for entry_name in os.listdir(folder_path):
        if entry_name.endswith(".pdf"):
            pdf_paths.append(os.path.join(folder_path, entry_name))

    # doc_id is simply the position of the file in the listing above.
    id_to_pdf = dict(enumerate(pdf_paths))

    # An existing mapping file is left alone unless an overwrite was requested.
    keep_existing_mapping = not overwrite and os.path.exists(MAPPING_FILE)
    if not keep_existing_mapping:
        with open(MAPPING_FILE, "w") as mapping_fh:
            mapping_fh.write(json.dumps(id_to_pdf))

    # The folder itself (not the filtered list) is handed to the indexer.
    index_options = {
        "input_path": folder_path,
        "index_name": index_name,
        "store_collection_with_index": False,
        "overwrite": overwrite,
    }
    RAG.index(**index_options)


def add_index_documents_in_folder(RAG, folder_path, index_name, overwrite=False):
    """
    Add the folder's not-yet-mapped PDFs to the index with `add_to_index`
    and keep the doc_id-to-path mapping file in step with what was added.

    New doc IDs are ``max existing id + position in the folder listing + 1``,
    so PDFs that are skipped as already mapped leave gaps in the numbering.
    A document is recorded in the mapping only after `add_to_index` returns
    for it, and the mapping file is written even when an add raises, so a
    failed document is not marked as indexed and is retried on the next run.

    Args:
        RAG: Object exposing ``add_to_index(input_item=..., doc_id=..., ...)``.
        folder_path: Directory that is listed for ``.pdf`` files.
        index_name: Only used in the log line; this function does not pass it
            to ``RAG``.
        overwrite: If true, an existing mapping file is ignored and replaced.
            This function does not clear the index itself.

    Returns:
        None. Any exception raised by ``RAG.add_to_index`` propagates after
        the mapping file has been written.
    """
    print(f"Indexing documents in {folder_path} with index_name {index_name}, overwrite={overwrite}")

    # Get list of PDF files
    pdf_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith(".pdf")]

    if not overwrite and os.path.exists(MAPPING_FILE):
        with open(MAPPING_FILE, "r") as f:
            doc_id_to_path = json.load(f)
        # Keys come back from JSON as strings; continue numbering after the
        # largest one (0 when the stored mapping is empty).
        max_doc_id = max(map(int, doc_id_to_path), default=0)
    else:
        doc_id_to_path = {}
        max_doc_id = 0

    # Build the lookup set once instead of scanning dict values per file.
    known_paths = set(doc_id_to_path.values())
    new_doc_id_to_path = {
        max_doc_id + i + 1: pdf_path
        for i, pdf_path in enumerate(pdf_files)
        if pdf_path not in known_paths  # Avoid duplicate file entries
    }

    added_count = 0
    try:
        # Add documents to the index individually
        for doc_id, pdf_path in new_doc_id_to_path.items():
            print(f"Adding document {pdf_path} to index with doc_id {doc_id}")
            RAG.add_to_index(
                input_item=pdf_path,
                store_collection_with_index=False,  # Adjust based on your requirement
                doc_id=doc_id,
                metadata={"filename": os.path.basename(pdf_path)}  # Optional metadata
            )
            # Record the document only once the add has succeeded.
            doc_id_to_path[doc_id] = pdf_path
            added_count += 1
    finally:
        # Persist whatever was actually added, including after a failure
        # part-way through the loop.
        with open(MAPPING_FILE, "w") as f:
            json.dump(doc_id_to_path, f, indent=4)

    # Report the documents added in this call, not every PDF in the folder.
    print(f"Indexing completed for {added_count} documents.")