# Gemini Approach

inspired by: https://www.philschmid.de/gemini-pdf-to-data

In [18]:
import os
import re
import json
from google import genai
from pydantic import BaseModel, Field
from typing import List, Optional, Literal, Union
from dotenv import load_dotenv
from utils import *

load_dotenv()

True

In [19]:
# load question json

# company = "reit"
# company = "TSX_Y_2022"
# company = "NASDAQ_CLXT_2022"

# with open(f"../data/questions_{company}.json", "r") as file:
#     questions = json.load(file)

# with open(f"../data/questions/clxt-tsxy.json", "r") as file:
#     questions = json.load(file)

with open(f"../data/round2/questions.json", "r") as file:
    questions = json.load(file)

# open companies list
# TODO
# companies_dict = get_companies_dict(r"C:\Users\felix.krause\code\trustbit\enterprise-rag-challenge\dataset_v2.json", subset_json=False)
# companies_dict = get_companies_dict(r"C:\Users\Felix\code\trustbit\enterprise-rag-challenge\dataset_v2.json", subset_json=False)
companies_dict = get_companies_dict(r"../data/round2/subset.json")

In [25]:
# open prompts
with open(f"prompts/prompt_gemini.md", "r") as file:
    system_prompt = file.read()

with open(f"prompts/prompt_company_extractor.md", "r") as file:
    prompt_company_extractor = file.read()

In [27]:
model_id = "gemini-2.0-flash"  # or "gemini-2.0-flash-lite-preview-02-05"  , "gemini-2.0-pro-exp-02-05", at least 1M token input context window

client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))

In [40]:
class SourceReferenceLLM(BaseModel):
    pdf_sha1: str = Field(..., description="SHA1 hash of the PDF file")
    page_index: int = Field(..., description="Physical page number in the PDF file")

class AnswerLLM(BaseModel):
    # question_text: str = Field(..., description="Text of the question")
    # kind: Literal["number", "name", "boolean", "names"] = Field(..., description="Kind of the question")
    chain_of_thought: str = Field(..., description="Chain of thought that led to the answer value")
    value: List[str] = Field(..., description="Answer to the question, according to the question schema")
    references: List[SourceReferenceLLM] = Field(..., description="References to the source material in the PDF file")

In [41]:
class Companies(BaseModel):
    chain_of_thought: str = Field(..., description="Chain of thought that led to the answer value")
    value: List[str] = Field(..., description="Identified company name as in list of companies.")

def get_company_name(query, companies_dict, verbose=True) -> list | None:
    companies = list(companies_dict.keys())
    company_meta = None

    # Check if query contains company name directly
    candidates = []
    for company in companies:
        # Build a regex pattern that ignores case
        pattern = re.escape(company)
        if re.search(pattern, query, re.IGNORECASE):
            candidates.append(company)

    if len(candidates) == 1:
        if verbose:
            print(f"Found company name with re: {candidates[0]}")
        company_meta = companies_dict[candidates[0]]
        company_meta["name"] = candidates[0]
        return [company_meta]

    if len(candidates) > 1:
        if verbose:
            print(f"Found multiple company names with re: {candidates}")
        company_meta = [companies_dict[c] for c in candidates]
        for meta, candidate in zip(company_meta, candidates):
            meta["name"] = candidate
        return company_meta

    # If no company name was found, ask model to extract it
    # SHOULD NEVER HAPPEN
    # prompt = prompt_company_extractor.replace("<<COMPANIES>>", ", ".join(companies))
    # prompt += "\n\nQuery: " + query
    #
    # response = client.models.generate_content(model=model_id, contents=[prompt], config={'response_mime_type': 'application/json', 'response_schema': Companies})
    #
    # if verbose:
    #     print(response.parsed)
    #
    # if response.parsed.value[0].lower() != "skip":
    #     company_meta = companies_dict[response.parsed.value]
    #     company_meta["name"] = response.parsed.value
    # else:
    #     return "N/A"

    return None

In [42]:
# Test if company is there
query = "Did Backpowder Inc mention any mergers or acquisitions in the annual report?"
response_0 = get_company_name(query, companies_dict)
response_0

In [43]:
# print(questions[53])
response_1 = get_company_name(questions[0]["text"], companies_dict)
response_1

Found company name with re: Ziff Davis, Inc.


[{'name': 'Ziff Davis, Inc.',
  'sha1': 'ecabab4934d4b80570c4bb3b8e35b7476694b3fb',
  'id': None}]

In [44]:
def get_document(company, verbose=True):
    uploaded_docs = list(client.files.list())
    files_set = {file.display_name: file for file in uploaded_docs}
    if f"{company}_annual_report" in files_set:
        if verbose:
            print(f"Found {company}_annual_report")
        return files_set[f"{company}_annual_report"]
    else:
        if verbose:
            print(f"Uploading {company} annual report...")
        document_path = f"../data/round2/pdfs/{company}.pdf"
        return client.files.upload(file=document_path, config={'display_name': f'{company}_annual_report'})

# Upload PDF manually
# company_document = client.files.upload(file=test_pdf_path, config={'display_name': f'{company}_annual_report'})

In [45]:
def ask_gemini(query, companies_dict=None, verbose=True):
    with open(f"prompts/prompt_gemini.md", "r") as file:
        system_prompt = file.read()

    prompt = system_prompt + "\n\nQuery: " + str(query)

    company_meta = get_company_name(query["text"], companies_dict, verbose)

    if not company_meta:
        return AnswerLLM(chain_of_thought="Company name not found", value=["N/A"], references=[]), None

    prompt = prompt.replace("<<COMPANY SHA1 MAPPING>>", str(company_meta))

    # Single company case
    if len(company_meta) == 1:
        document = get_document(company_meta[0]["sha1"], verbose)

        response = client.models.generate_content(model=model_id, contents=[prompt, document], config={'response_mime_type': 'application/json', 'response_schema': AnswerLLM, "temperature": 0.0})

        if verbose:
            print(response.parsed)

        return response.parsed, company_meta[0]["sha1"]
    else:
        documents = [get_document(company["sha1"], verbose) for company in company_meta]
        contents = [prompt] + documents

        response = client.models.generate_content(model=model_id, contents=contents, config={'response_mime_type': 'application/json', 'response_schema': AnswerLLM, "temperature": 0.0})

        if verbose:
            print(response.parsed)

        return response.parsed, [company["sha1"] for company in company_meta]

In [46]:
print(questions[0])
response_0, _ = ask_gemini(questions[0], companies_dict)

{'text': "For Ziff Davis, Inc., what was the value of Cloud storage capacity (TB) at the end of the period listed in annual report? If data is not available, return 'N/A'.", 'kind': 'number'}
Found company name with re: Ziff Davis, Inc.
Found ecabab4934d4b80570c4bb3b8e35b7476694b3fb_annual_report
chain_of_thought='I apologize, but I am unable to find the answer to your question within the provided document. The document does not contain any information about the value of Cloud storage capacity (TB) at the end of the period listed in the annual report for Ziff Davis, Inc.' value=['N/A'] references=[]


In [None]:
print(questions[53])
response_1, _ = ask_gemini(questions[53], companies_dict)

## Create final submission

In [35]:
# Create submission based on answers list and store json
import time

answer_items = [None] * len(questions)
failed_questions = []
for i, question in enumerate(questions):
    try:
        print("\n##############################")
        print(i, question)
        answer, sha1 = ask_gemini(question, companies_dict)
        print(sha1)

        if answer.value[0] == "N/A":
            value = "N/A"
            answer.references = []
        if len(answer.value) == 1:
            value = answer.value[0]
        else:
            value = answer.value

        if not answer.references:
            references = []
        else:
            references = [SourceReference(pdf_sha1=sha1, page_index=reference.page_index) for reference in answer.references]

        answer_item = Answer(question_text=question["text"], kind=question["kind"], value=value, references=references)
        answer_items[i] = answer_item

    except Exception as e:
        print("#+#+#+#+#+#")
        print(f"Failed to answer question {i}: {e}")
        print("#+#+#+#+#+#")
        failed_questions.append(i)

        answer_item = Answer(question_text=question["text"], kind=question["kind"], value="n/a", references=[])
        answer_items[i] = answer_item
    # wait for 20 sec
    print("\n\n\n\n")
    break
    time.sleep(20)

if failed_questions:
    print(f"WARNING: Failed to answer questions: {failed_questions}")


##############################
0 {'text': "For Ziff Davis, Inc., what was the value of Cloud storage capacity (TB) at the end of the period listed in annual report? If data is not available, return 'N/A'.", 'kind': 'number'}
Found company name with re: Ziff Davis, Inc.
Found ecabab4934d4b80570c4bb3b8e35b7476694b3fb_annual_report
None
ecabab4934d4b80570c4bb3b8e35b7476694b3fb
#+#+#+#+#+#
Failed to answer question 0: 'NoneType' object has no attribute 'value'
#+#+#+#+#+#







KeyboardInterrupt: 

In [10]:
# failed_questions of v1:
# "The request's total referenced files bytes are too large to be read"
# failed_questions = [6, 53, 60]

[6, 53, 60]

In [12]:
answer_items

[Answer(question_text="For Ziff Davis, Inc., what was the value of Cloud storage capacity (TB) at the end of the period listed in annual report? If data is not available, return 'N/A'.", kind='number', value='N/A', references=[]),
 Answer(question_text='Did Liberty Broadband Corporation announce a share buyback plan in the annual report? If there is no mention, return False.', kind='boolean', value='no', references=[]),
 Answer(question_text="What is the total number of employees let go by Pintec Technology Holdings Limited according to the annual report? If data is not available, return 'N/A'.", kind='number', value='N/A', references=[]),
 Answer(question_text="Which leadership positions changed at Westwater Resources, Inc. in the reporting period? If data is not available, return 'N/A'. Give me the title of the position.", kind='names', value=['Chief President', 'Chief Executive Officer', 'Senior Vice President of Finance', 'Chief Financial Officer', 'Chief Administrative Officer'], 

In [11]:
final_submission = AnswerSubmission(answers=answer_items, team_email="felix.krause@timetoact.at", submission_name="gemini_naive")

In [22]:
# store submission as json
final_submission_path = f"../data/round2/submissions/gemini_submission.json"

with open(final_submission_path, "w") as file:
    json.dump(final_submission.model_dump(), file, indent=4)