# Sherpa Digital Brain Technical Task

## Setup Instructions

*Git instructions*

## Task

*Task instructions*


In [None]:
# Git magic to copy additional folders from github repo
import os

REPO_URL = "https://github.com/Charter-AI/sherpa-digital-brain-technical-task.git"
REPO_NAME = "sherpa-digital-brain-technical-task"
REPO_PATH = f"/content/{REPO_NAME}"


# Clone only if the repo folder doesn't already exist
if not os.path.exists(REPO_PATH):
    print(f"Cloning {REPO_URL} into {REPO_PATH}")
    os.chdir('/content')
    !git clone $REPO_URL
    os.chdir(REPO_PATH)
else:
    print(f"Repo '{REPO_NAME}' already exists at {REPO_PATH}. \nSkipping clone and pulling instead...")
    os.chdir(REPO_PATH)
    !git pull

# Confirm it worked
print("Current directory:", os.getcwd())
print("Files in current directory:", os.listdir('.'))




In [5]:
# Used later to make all references line up:
# the byaldi helper functions below build the index path (`<project_root>/.byaldi/<index_name>`)
# and the documents path (`<project_root>/<docs_folder_name>`) from this value.
# NOTE: this captures whatever directory the kernel is in when the cell runs.
project_root = os.getcwd()

In [None]:
# Download byaldi embeddings and save to /content/sherpa-digital-brain-technical-task/.byaldi/default_index

%pip install requests
%pip install tqdm

import os
import zipfile
import requests
from tqdm import tqdm

# Public URL to the ZIP file in Azure Blob Storage
ZIP_URL = "https://sherpapublicdata.blob.core.windows.net/byaldi-embeddings/MBB_AI_byaldi_embeddings.zip"

# Where the extracted files should go
EXTRACTED_FOLDER = "/content/sherpa-digital-brain-technical-task/.byaldi/default_index"
ZIP_FILENAME = "MBB_AI_byaldi_embeddings.zip"

# Check if folder is already extracted
if not os.path.exists(EXTRACTED_FOLDER):
    # Download ZIP file
    print(f"Downloading {ZIP_FILENAME}...")
    response = requests.get(ZIP_URL, stream=True)
    if response.status_code != 200:
        raise Exception(f"Failed to download ZIP file: {response.status_code}")

    total_size = int(response.headers.get('content-length', 0))
    with open(ZIP_FILENAME, "wb") as f, tqdm(desc=ZIP_FILENAME, total=total_size, unit='B', unit_scale=True, unit_divisor=1024) as bar:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)
                bar.update(len(chunk))

    # Extract ZIP
    print(f"Extracting {ZIP_FILENAME}...")
    with zipfile.ZipFile(ZIP_FILENAME, 'r') as zip_ref:
        zip_ref.extractall(EXTRACTED_FOLDER)

    # Cleanup if needed
    os.remove(ZIP_FILENAME)
    print(f"Extraction complete. Files are in '{EXTRACTED_FOLDER}'")
else:
    print(f"Folder '{EXTRACTED_FOLDER}' already exists. Skipping download and extraction.")


In [None]:
# Install dependencies
# Takes ~2min

# Upgrade pip itself before installing anything else
%pip install --upgrade pip

# Python packages imported by the cells below (numpy, torch, openai, dotenv)
%pip install numpy
%pip install torch
%pip install openai
%pip install python-dotenv

# System package, installed through apt rather than pip (shell command)
# NOTE(review): presumably required for PDF handling during indexing — confirm
!apt-get install -y poppler-utils

# byaldi provides the RAGMultiModalModel class imported below
%pip install byaldi
# NOTE(review): flash-attn is not imported directly in the visible cells — confirm it is still needed
%pip install flash-attn

In [2]:
# Import dependencies
import base64
import os
import sys

import numpy as np
import torch
from byaldi import RAGMultiModalModel
from dotenv import load_dotenv
from IPython.display import Image as IPythonImage
from IPython.display import display
from openai import AzureOpenAI

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# LLM functions (OpenAI GPT)
# Load variables from a local .env file (if present) into the process environment
load_dotenv()

# Access environment variables
AZURE_OPENAI_MODEL = os.getenv("AZURE_OPENAI_MODEL")
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_VERSION = os.getenv("AZURE_OPENAI_VERSION")

# Name any setting that is unset or empty up front, so a later client or request
# failure can be traced back to the missing variable. This only warns; it does not
# stop the cell, leaving the final decision to the client constructor below.
_missing_settings = [
    name
    for name, value in {
        "AZURE_OPENAI_MODEL": AZURE_OPENAI_MODEL,
        "AZURE_OPENAI_API_KEY": AZURE_OPENAI_API_KEY,
        "AZURE_OPENAI_ENDPOINT": AZURE_OPENAI_ENDPOINT,
        "AZURE_OPENAI_VERSION": AZURE_OPENAI_VERSION,
    }.items()
    if not value
]
if _missing_settings:
    print("Warning: environment variables not set: " + ", ".join(_missing_settings))

# Shared client used by `query_llm`
openai_client = AzureOpenAI(
    api_key=AZURE_OPENAI_API_KEY,
    api_version=AZURE_OPENAI_VERSION,
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
)


def query_llm(
        query: str,
        system_prompt: str = "You are a helpful assistant.",
        b64_images: list[str] | None = None,
        raw_output: bool = False
) -> str:
    user_content = [{"type": "text", "text": query}]

    # Send b64 images to LLM if provided
    if b64_images is not None:
        user_content.extend([
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/jpeg;base64,{b64_image}"
                }
            } for b64_image in b64_images
        ])

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_content}
    ]

    raw_response = openai_client.chat.completions.create(
        model=AZURE_OPENAI_MODEL,
        messages=messages,
        temperature=0.0
    )

    if raw_output:
        return raw_response
    else:
        return raw_response.choices[0].message.content


In [9]:
# Byaldi functions
default_index = "default_index"

def initialise_rag_model(index_name: str = default_index, device: str | None = None) -> RAGMultiModalModel:
    """
    Initialises the RAG model, loading the index stored at `<project_root>/.byaldi/<index_name>`
    when that folder exists. Otherwise the generic `vidore/colqwen2-v1.0` model is
    loaded without an index (call `index_documents` afterwards to build one).

    Parameters:
    - `index_name`: Name of the index folder under `.byaldi`.
    - `device`: Device to run on. `None` selects CUDA when available and CPU otherwise.
      `"gpu"` is accepted as an alias for `"cuda"`; any other value (e.g. `"cpu"`,
      `"cuda"`, `"cuda:0"`) is lower-cased and passed through unchanged.

    Returns the loaded `RAGMultiModalModel`.
    """
    # Use CUDA if available, otherwise use CPU
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    else:
        # Previously every value other than "gpu" (including "cuda") fell back to CPU.
        device = device.strip().lower()
        if device == "gpu":
            device = "cuda"

    print(f"Using device: {device}")

    # If .byaldi/<index_name> folder exists, load index from there
    # Otherwise, load generic model with vidore/colqwen2-v1.0 vision model
    index_path = project_root + "/.byaldi/" + index_name
    if os.path.exists(index_path):
        print(f"Loading index from {index_path}")
        return RAGMultiModalModel.from_index(index_path, device=device)

    print(f"No index found at {index_path}. Loading generic vidore/colqwen2-v1.0 model.")
    return RAGMultiModalModel.from_pretrained(
        "vidore/colqwen2-v1.0",
        device=device
    )

def index_documents(rag_model: RAGMultiModalModel, index_name: str = default_index, docs_folder_name: str = "documents"):
    """
    Indexes the documents in `<project_root>/<docs_folder_name>` with `rag_model`
    under the index name `index_name`.
    If index already exists, it will be overwritten.

    Parameters:
    - `rag_model`: Model used to build the index.
    - `index_name`: Name of the index to create.
    - `docs_folder_name`: Folder (relative to `project_root`) containing the documents.

    Raises `FileNotFoundError` if the documents folder does not exist.
    """
    docs_path = project_root + "/" + docs_folder_name

    # Check if documents folder exists
    if not os.path.exists(docs_path):
        raise FileNotFoundError(f"Documents folder {docs_path} does not exist")

    rag_model.index(
        input_path=docs_path,
        index_name=index_name,
        store_collection_with_index=True,    # Store base64 encodings of documents with index
        max_image_height=2048,
        max_image_width=2048,
        overwrite=True
    )

def query_rag_model(rag_model: RAGMultiModalModel, query: str, k: int = 5):
    """
    Searches `rag_model` for `query` and returns the `k` most relevant hits.

    Every hit is converted to a plain dictionary (the attribute dictionary of the
    result object), with the following keys:
    - `doc_id`: ID of document
    - `page_num`: Page number of image within document
    - `score`: Score of document
    - `metadata`: Metadata attached to image
    - `base64`: Base64 encoded image
    """
    hits = rag_model.search(query=query, k=k)
    return [vars(hit) for hit in hits]


In [10]:
# Index all 3 files in /documents folder
# Takes ~mins on CPU, so the work is skipped when an index is already present
# (for example the pre-computed embeddings unpacked into .byaldi/default_index).
FORCE_REINDEX = False  # set to True to rebuild (and overwrite) an existing index

index_path = os.path.join(project_root, ".byaldi", default_index)
if FORCE_REINDEX or not os.path.exists(index_path):
    rag_model = initialise_rag_model(device="cpu")
    index_documents(rag_model)
else:
    print(f"Index already exists at {index_path}. Skipping indexing (set FORCE_REINDEX = True to rebuild).")

Using device: cpu
No index found at /home/sebastian/Coding/sherpa-digital-brain-technical-task/.byaldi/default_index. Loading generic vidore/colqwen2-v1.0 model.
Verbosity is set to 1 (active). Pass verbose=0 to make quieter.


Fetching 2 files: 100%|██████████| 2/2 [00:00<00:00, 35394.97it/s]
Loading checkpoint shards: 100%|██████████| 2/2 [00:11<00:00,  5.59s/it]
Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


Indexing file: /home/sebastian/Coding/sherpa-digital-brain-technical-task/documents/bain_report_luxury_and_technology_artificial_intelligence_the_quiet_revolution.pdf.pdf
Added page 1 of document 0 to index.


: 

In [None]:
# Query RAG model and display retrieved images
example_queries = [
    "Who at Bain and Company should be assigned to a project on integrating AI into the luxury industry?",
    "What frameworks can be used to conceptualise AI integration into modern businesses?",
    "Summarise that McKinsey report on generative AI",
]

# Set to True to render every retrieved page image below its score line
SHOW_IMAGES = False

rag_model = initialise_rag_model()

# Base64 page images per query, so earlier queries are not lost when the loop moves on.
# After the loop, `images` still holds the pages retrieved for the last query.
retrieved_images = {}

for query in example_queries:
    results = query_rag_model(rag_model, query)
    images = [result["base64"] for result in results]
    retrieved_images[query] = images

    print(f"Query: {query}")
    for result in results:
        print(f"Retrieved page {result['page_num']} from document {result['doc_id']} with score {result['score']}")
        if SHOW_IMAGES:
            display(IPythonImage(data=base64.b64decode(result["base64"])))

In [None]:
# Ask the LLM to answer using the retrieved pages.
# `images` holds the pages retrieved for the last entry of `example_queries`
# (the retrieval loop reassigns it on every iteration), so pair it with that query.
example_query = example_queries[-1]
llm_response = query_llm(example_query, b64_images=images)
print(llm_response)