# Fiscal Report Analysis with a Custom NumPy Vector Store

This notebook demonstrates how to build a custom, in-memory vector search system to analyze fiscal reports. We'll extract the text and tables from a PDF, chunk the combined content, generate embeddings (cached on disk so later runs can reuse them), and use a custom retriever to answer questions about the document.

## Step 1: Install Dependencies

In [None]:
# Install the SDK and helper libraries; `%pip` (rather than `!pip`) targets the running kernel's environment.
# NOTE(review): versions are unpinned, so a fresh install may pull different releases — consider pinning.
%pip install airefinery-sdk python-dotenv numpy pdfplumber

## Step 2: Import Libraries and Set Up the Environment

In [None]:
import asyncio
import os
import numpy as np
from dotenv import load_dotenv
from air import AIRefinery, DistillerClient, login
from air.utils import async_print
from library.data_processing import extract_text_with_pdfplumber, extract_tables_with_pdfplumber, chunk_text, generate_document_embeddings, cache_embeddings, load_cached_embeddings
from library.vector_store import InMemoryVectorStore

# Load variables from a local .env file; override=True lets values from the file
# replace variables that are already set in the process environment.
load_dotenv(override=True)

## Step 3: Load and Process the PDF Document

In [None]:
# --- Configuration -----------------------------------------------------------
# On-disk cache holding the chunked documents and their embedding vectors, so a
# re-run of this cell does not have to re-embed the PDF.
EMBEDDINGS_CACHE_PATH = "embeddings.pickle"
PDF_PATH = "data/acn-third-quarter-fiscal-2025-earnings-release.pdf"

# --- Authentication ----------------------------------------------------------
# Fail fast when credentials are absent. Wrapping os.getenv() in str() would turn
# a missing variable into the literal string "None" and hand that to login().
account = os.getenv("ACCOUNT")
api_key = os.getenv("API_KEY")
missing_vars = [
    name
    for name, value in (("ACCOUNT", account), ("API_KEY", api_key))
    if not value
]
if missing_vars:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(missing_vars)
        + ". Set them in the environment or in a .env file."
    )

auth = login(account=account, api_key=api_key)
base_url = os.getenv("AIREFINERY_ADDRESS", "")
air_client = AIRefinery(**auth.openai(base_url=base_url))
embedding_client = air_client.embeddings

# --- Documents and embeddings --------------------------------------------------
if os.path.exists(EMBEDDINGS_CACHE_PATH):
    # NOTE(review): the file extension suggests a pickle; only load cache files
    # you created yourself, since unpickling untrusted data can execute code.
    # NOTE: the cache is looked up by path only, not by PDF_PATH. Delete the file
    # after switching PDFs or changing how the content is chunked or embedded.
    cached_data = load_cached_embeddings(EMBEDDINGS_CACHE_PATH)
    documents_from_pdf = cached_data["documents"]
    document_vectors = cached_data["vectors"]
else:
    # Extract both text and tables using pdfplumber for better results
    plain_text = extract_text_with_pdfplumber(PDF_PATH)
    table_html = extract_tables_with_pdfplumber(PDF_PATH)
    combined_content = plain_text + "\n\n--- Extracted Tables ---\n" + table_html

    documents_from_pdf = chunk_text(combined_content)
    document_vectors = generate_document_embeddings(documents_from_pdf, embedding_client)

    # Explicit emptiness checks: bare truthiness raises ValueError when the
    # vectors are a NumPy array with more than one element.
    has_documents = documents_from_pdf is not None and len(documents_from_pdf) > 0
    has_vectors = document_vectors is not None and len(document_vectors) > 0
    if has_documents and has_vectors:
        cache_embeddings(documents_from_pdf, document_vectors, EMBEDDINGS_CACHE_PATH)

# Build the in-memory store that the custom retriever searches.
vector_store = InMemoryVectorStore(documents_from_pdf, document_vectors)

## Step 4: Define the Custom Retriever

In [None]:
def _format_document_result(doc_id, doc, source_weight=1, retriever_name=""):
    base_score = doc.get("score", 0.0)
    final_score = float(base_score * source_weight)
    content_text = doc.get("content", {}).get("text", "")
    formatted_result = f"Source: {retriever_name}\nID: {doc_id}\nContent: {content_text[:500]}..."
    return {"result": formatted_result, "score": final_score}

async def custom_in_memory_vector_search(query: str):
    """Embed ``query``, search the in-memory vector store, and format the hits.

    Args:
        query: Natural-language question to look up.

    Returns:
        list[dict]: One ``{"result", "score"}`` dict per hit, or a single
        placeholder entry with score 0 when the store returns nothing.
    """
    print(f"Received query for vector search: '{query}'")

    # NOTE(review): this call is not awaited, so it appears to run synchronously
    # inside the coroutine — confirm that blocking here is acceptable.
    # NOTE(review): the model presumably has to match the one used to embed the
    # documents — verify against generate_document_embeddings.
    embedding_response = embedding_client.create(
        input=[query],
        model="nvidia/nv-embedqa-mistral-7b-v2",
        encoding_format="float",
        extra_body={"input_type": "query", "truncate": "NONE"},
    )
    raw_embedding = embedding_response.data[0].embedding
    # Reshape to a single-row 2-D array before handing it to the store.
    query_vector = np.array(raw_embedding, dtype=np.float32).reshape(1, -1)

    print("Searching for relevant documents in the vector store...")
    matches = vector_store.search(query_vector)

    if matches:
        formatted_hits = []
        for match in matches:
            formatted_hits.append(
                _format_document_result(
                    match["id"],
                    match,
                    source_weight=1,
                    retriever_name="PDF-NumPy-Retriever",
                )
            )
        return formatted_hits

    # Nothing came back from the store: return a single placeholder entry.
    return [{"result": "There is no relevant document from the PDF.", "score": 0}]

## Step 5: Run a Simple Query with the Research Agent

In [None]:
async def run_simple_query():
    """Ask one question through the "DocumentSearch" project and print each reply.

    Creates the project from ``custom_vector_search.yaml``, registers
    ``custom_in_memory_vector_search`` as the "Fiscal Reports Database"
    executor of the "Research Agent", sends a single query, and prints every
    response as it arrives.
    """
    distiller_client = DistillerClient(base_url=base_url)
    project = "DocumentSearch"
    uuid = os.getenv("UUID", "test_user_simple")

    distiller_client.create_project(
        config_path="custom_vector_search.yaml", project=project
    )

    # Agent name -> {retriever name -> callable}.
    # NOTE(review): these names presumably have to match the entries in
    # custom_vector_search.yaml — confirm against the config file.
    executor_dict = {
        "Research Agent": {
            "Fiscal Reports Database": custom_in_memory_vector_search,
        }
    }

    async with distiller_client(
        project=project,
        uuid=uuid,
        executor_dict=executor_dict,
    ) as dc:
        query = "how much generative ai bookings were there?"
        responses = await dc.query(query=query)
        print(f"----\nQuery: {query}")
        async for response in responses:
            # Use .get() so a message without "content" does not abort the
            # stream with a KeyError.
            await async_print(f"Response: {response.get('content', '')}")

# Top-level `await` relies on IPython/Jupyter running the cell inside an event loop;
# in a plain Python script, use asyncio.run(run_simple_query()) instead.
await run_simple_query()

## Step 6: Run a Comparative Analysis with the Flow Super Agent

In [None]:
async def run_flow_super_agent():
    """Run a comparative-analysis query through the "FinancialAnalysisFlow" project.

    Creates the project from ``flow_super_agent.yaml``, exposes
    ``custom_in_memory_vector_search`` to the "Fiscal Report Researcher"
    agent, and prints the role and content of every response received.
    """
    project_name = "FinancialAnalysisFlow"
    user_id = os.getenv("UUID", "test_user_flow")
    question = "Provide a comparative analysis of the attached fiscal report."

    # Agent name -> {retriever name -> callable}.
    retriever_executors = {
        "Fiscal Report Researcher": {
            "Fiscal Reports Database": custom_in_memory_vector_search,
        }
    }

    client = DistillerClient(base_url=base_url)
    client.create_project(config_path="flow_super_agent.yaml", project=project_name)

    session = client(
        project=project_name,
        uuid=user_id,
        executor_dict=retriever_executors,
    )
    async with session as dc:
        reply_stream = await dc.query(query=question)
        print(f"----\nQuery: {question}")
        async for message in reply_stream:
            # Fall back to defaults when a message lacks a role or content.
            role = message.get('role', 'System')
            content = message.get('content', '')
            await async_print(f"Role: {role}\nContent: {content}\n")

# Top-level `await` relies on IPython/Jupyter running the cell inside an event loop;
# in a plain Python script, use asyncio.run(run_flow_super_agent()) instead.
await run_flow_super_agent()