In [1]:
import os
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd 

from datasets import load_dataset

from byaldi import RAGMultiModalModel

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

In [2]:
# Each credential is stored as the sole content of a text file under keys/.
# Both files are read (surrounding whitespace stripped) before anything is
# exported, so a missing file leaves the environment untouched.
api_keys = {}
for env_name, key_path in (("HF_TOKEN", 'keys/hf_key.txt'), ("OPENAI_API_KEY", "keys/openai_api_key.txt")):
    with open(key_path, 'r') as file:
        api_keys[env_name] = file.read().strip()

hf_key = api_keys["HF_TOKEN"]
openai_key = api_keys["OPENAI_API_KEY"]

# Export the credentials through the process environment.
os.environ.update(api_keys)

# NOTE(review): presumably set to silence the Hugging Face tokenizers
# fork/parallelism warning — confirm it is still needed.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [3]:
import warnings

warnings.filterwarnings('ignore')

## Dataset 

In [4]:
# Fetch FinQA from the Hugging Face Hub.
# NOTE: trust_remote_code=True allows the dataset repository's loading script
# to run on this machine — only keep it for repositories you trust.
dataset = load_dataset("ibm/finqa", trust_remote_code=True)

# Convert each split to a DataFrame.
train_data, validation_data, test_data = (
    dataset[split].to_pandas() for split in ("train", "validation", "test")
)

# Stack the three splits into one frame with a fresh 0..n-1 index and keep
# only the columns used by the rest of the notebook.
data = pd.concat([train_data, validation_data, test_data], ignore_index=True)
data = data[["id", "question", "answer", "gold_inds"]]

In [5]:
# The first two "/"-separated fields of `id` are used as company and year
# (both stay strings). Split once and reuse the pieces for both columns.
id_parts = data.id.str.split("/")
data["Company"] = [parts[0] for parts in id_parts]
data["Year"] = [parts[1] for parts in id_parts]

In [6]:
# Distinct companies that appear in the question set.
unique_companies = set(data.Company.unique())

# For each company, the years referenced by at least one question.
needed_years = {
    company: list(data[data.Company == company].Year.unique())
    for company in unique_companies
}

# Count the files present on disk for every needed company/year pair.
file_count = 0

for company, years in needed_years.items():
    for year in years:
        year_dir = f"docs/{company}/{year}/"
        try:
            file_count += len(os.listdir(year_dir))
        except OSError:
            # Only filesystem failures (e.g. a missing directory) are reported
            # and skipped; anything else — including KeyboardInterrupt —
            # now propagates instead of being silently swallowed.
            print(year_dir)

file_count

docs/AAP/2006/


29159

In [7]:
# Restrict the question set to company "AAL", year "2014" — the same filing
# that is hard-coded in the indexing cell. Year is compared as a string
# because it was produced by splitting the id.
is_aal = data.Company.eq("AAL")
is_2014 = data.Year.eq("2014")
data = data[is_aal & is_2014]

## Indexing 

In [8]:
# Load the pretrained "vidore/colqwen2-v1.0" retriever through byaldi.
# NOTE(review): device="mps" targets the Apple Metal backend; presumably this
# must be changed (e.g. "cuda" or "cpu") when running on other hardware.
RAG = RAGMultiModalModel.from_pretrained("vidore/colqwen2-v1.0", device="mps")

Verbosity is set to 1 (active). Pass verbose=0 to make quieter.


`Qwen2VLRotaryEmbedding` can now be fully parameterized by passing the model config through the `config` argument. All other arguments will be removed in v4.46


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [9]:
# Create the "finqa" index from the contents of docs/temp/. With
# overwrite=True an existing index of the same name is deleted first.
# The filing pages themselves are appended to this index afterwards via
# RAG.add_to_index.
RAG.index(
    input_path="docs/temp/",
    index_name="finqa",
    overwrite=True,
)

overwrite is on. Deleting existing index finqa to build a new one.
Indexing file: docs/temp/page_108.pdf
Added page 1 of document 0 to index.
Index exported to .byaldi/finqa
Index exported to .byaldi/finqa


{0: 'docs/temp/page_108.pdf'}

In [None]:
# Define companies
companies = ["AAL"]


def process_page_for_index(company, year, page):
    """Append one page file to the existing RAG index.

    Args:
        company: Company directory name under ``docs/``.
        year: Year directory name under ``docs/<company>/``.
        page: File name of the page inside ``docs/<company>/<year>/``.

    The page is stored together with the index
    (``store_collection_with_index=True``) and tagged with ``Company``,
    ``Year`` and ``Page`` metadata, which the retrieval step later filters on
    and reads back. Returns nothing.
    """
    page_path = f"docs/{company}/{year}/{page}"
    page_metadata = {"Company": company, "Year": year, "Page": page}

    RAG.add_to_index(
        input_item=page_path,
        store_collection_with_index=True,
        metadata=page_metadata,
    )

# Index every page of the selected filings, one thread-pool task per page.
# NOTE(review): all tasks call RAG.add_to_index on the same RAG object
# concurrently — confirm byaldi supports that; otherwise run this loop
# sequentially.
with ThreadPoolExecutor() as executor:
    futures = []

    for company in companies:
        # Only the 2014 filing is indexed, matching the company/year the
        # question set was filtered down to. To index every available year
        # instead, use: years = os.listdir(f"docs/{company}/")
        years = ["2014"]

        for year in years:
            pages = os.listdir(f"docs/{company}/{year}/")

            # Submit tasks for each page
            for page in pages:
                futures.append(executor.submit(process_page_for_index, company, year, page))

    # Wait for all tasks to complete; result() re-raises any exception a
    # task hit, so indexing failures are not silently lost.
    for future in futures:
        future.result()

## Retrieve and Generate

In [None]:
# Chat model used to answer each question from the retrieved page image.
# NOTE(review): presumably authenticates via the OPENAI_API_KEY environment
# variable exported at the top of the notebook — confirm.
model = ChatOpenAI(model="gpt-4o")

In [None]:
def image_prompt(image, question, mime_type="image/png"):
    """Build the multimodal chat message for one question and one page image.

    Args:
        image: Base64-encoded image data (no ``data:`` prefix).
        question: The question to answer from the image.
        mime_type: Media type declared in the data URL. The default is
            ``image/png`` rather than the previous ``image/pdf``, which is not
            a real image media type. NOTE(review): byaldi is understood to
            store page images as PNG — confirm for the installed version.

    Returns:
        A ``HumanMessage`` whose content holds the text instruction followed
        by the image as a base64 data URL.
    """

    query = f"""
    Answer the following query based solely on the provided image,  Give a short answer, 2-3 words at most. Then explain the steps you took to arrive at your answer.

    Query: {question}
    """

    message = HumanMessage(
        content=[
            {"type": "text", "text": query},
            {
                "type": "image_url",
                "image_url": {"url": f"data:{mime_type};base64,{image}"},
            },
        ],
    )

    return message

In [None]:
# Empty results table, one row per question (same index as `data`); the
# cells are filled in once retrieval and generation have run.
result_columns = ["Retrieved Context", "Correct Documents", "Generated Answer", "Correct Answer"]
results = pd.DataFrame(index=data.index, columns=result_columns)

# Define the function to process a single item
def process_item(idx):
    """Retrieve the best-matching page for one question and have the model answer it.

    Args:
        idx: Index label of the row in ``data`` to process.

    Returns:
        Tuple ``(idx, retrieved_context, generated_answer)``.
        ``retrieved_context`` is ``"<Company>/<Year>/<Page>"`` built from the
        row's company/year and the retrieved page's ``Page`` metadata;
        ``generated_answer`` is the ``content`` of the model's reply. Both are
        ``None`` when the search returns no page, so one empty retrieval no
        longer aborts the whole batch with an ``IndexError``.
    """
    query = data.loc[idx, "question"]
    company = data.loc[idx, "Company"]
    year = data.loc[idx, "Year"]

    # Perform retrieval: top-1 page, restricted to this company's filing for this year
    retrieved = RAG.search(query, k=1, filter_metadata={"Company": company, "Year": year})

    if not retrieved:
        return idx, None, None

    top_hit = retrieved[0]

    # Populate the results row
    retrieved_context = f"{company}/{year}/{top_hit.metadata['Page']}"
    generated_answer = model.invoke([image_prompt(top_hit["base64"], query)]).content

    return idx, retrieved_context, generated_answer

# Answer all questions concurrently: one process_item task per row of `data`.
with ThreadPoolExecutor() as executor:
    futures = []
    for idx in data.index:
        futures.append(executor.submit(process_item, idx))

    # Collect in submission order. result() blocks until the task is done
    # and re-raises any exception the task hit.
    for idx, retrieved_context, generated_answer in (future.result() for future in futures):
        results.loc[idx, "Retrieved Context"] = retrieved_context
        results.loc[idx, "Generated Answer"] = generated_answer

In [None]:
# Attach the ground truth next to the generated output (rows align on the
# shared index): the reference answer, the question id (compared against
# "Retrieved Context") and the gold supporting facts.
results = results.assign(
    **{
        "Correct Answer": data.answer,
        "Correct Documents": data.id,
        "Golden Context": data.gold_inds,
    }
)

# Persist the evaluation table.
results.to_csv("colpali.csv")