In [None]:
from vespa.application import Vespa
from dotenv import load_dotenv  
load_dotenv()
import os

# Paths to the certificate / private key handed to the Vespa client.
# They can be overridden with the VESPA_CERT_PATH / VESPA_KEY_PATH environment
# variables (e.g. in the .env file loaded above) so the notebook is not tied to
# one machine; the defaults keep the original local locations.
cert_path = os.getenv(
    "VESPA_CERT_PATH",
    "/Users/shivamjogdand/Desktop/Learning/DeepAgents/Report_Generation_Git/Report_Generation/certs/public.pem",
)
key_path = os.getenv(
    "VESPA_KEY_PATH",
    "/Users/shivamjogdand/Desktop/Learning/DeepAgents/Report_Generation_Git/Report_Generation/certs/private.pem",
)
endpoint_url = os.getenv("VESPA_ENDPOINT")

# Fail fast with a clear message instead of building a client with url=None
# and hitting an obscure error on the first query.
if not endpoint_url:
    raise RuntimeError(
        "VESPA_ENDPOINT is not set; define it in the environment or in the .env file."
    )

app = Vespa(
    url=endpoint_url,
    cert=cert_path,
    key=key_path,
)

# Table Extraction

In [51]:
# Prompt template for the table-reconstruction LLM call.
# Placeholders filled via str.format(): {page_title} and {content}.
# Literal braces in the JSON example are escaped as {{ }}.
TABLE_RECONSTRUCTION_PROMPT = """
You are a Senior Equity Research Analyst and Financial Writer.

INPUT:
1. Vespa page title: {page_title}
2. Text content: {content}

YOUR TASK:
1. Extract ONLY true financial statement tables.
2. Extract the financial statement table and generate a high-precision, professional "description" suitable for inclusion in an Annual Report or an Investment Prospectus.

==================================================
TITLE SELECTION RULES (STRICT)
==================================================
1. **HIGHEST PRIORITY (CONTENT):** - Look at the text content immediately preceding the table data.
   - If there is a specific table heading in the text (e.g., "SUMMARIZED CONSOLIDATED STATEMENT...", "Table 1: Revenue", "Segment Information"), use that heading EXACTLY as it appears in the text.
   - This overrides the Vespa title.

2. **SECONDARY PRIORITY (VESPA):**
   - ONLY if no specific heading is found in the text content, check the Vespa page title.
   - Use the Vespa title only if it clearly describes the specific table being extracted.

3. **FORMATTING:**
   - If a title exists, prefix it with '### '.
   - The title MUST appear BEFORE the table, never inside it.
   - Do NOT invent titles. If neither source has a valid title, return null.

==================================================
TRUE FINANCIAL STATEMENT TABLE DEFINITION
==================================================
A table is a TRUE financial statement table ONLY IF it represents:
- Income statement / Comprehensive Income
- Balance sheet / Financial Position
- Cash flow statement
- Statement of changes in equity

The table MUST contain financial magnitudes (monetary figures) and multiple numeric rows.

==================================================
DESCRIPTION ARCHITECTURE (MANDATORY)
==================================================
The "description" field must be a formal 2-paragraph analysis:

Paragraph 1: Identification & Scope
- Formally identify the statement (e.g., "Consolidated Statement of Operations").
- Specify the reporting entity and the period covered (e.g., "for the fiscal years ended December 31").
- State the currency and scale (e.g., "presented in millions of USD, except per share data").

Paragraph 2: Structural Summary
- Describe the flow of the table. (e.g., "The statement begins with Revenue and flows through operating expenses and tax provisions to arrive at Net Income.")
- Mention key line items or sub-sections included (e.g., "Includes breakdowns for Research & Development, Sales and Marketing, and General Administrative costs.")
- Explain what the table tells a reader about the company's health (e.g., "This provides a view of the company's operational efficiency and bottom-line profitability over the comparative periods.")

==================================================
CRITICAL CONTINUITY & MERGING RULE
==================================================
If a statement is split into sections (e.g., "Operating activities", "Investing activities", "Financing activities"), you MUST MERGE them into ONE continuous table_markdown. DO NOT truncate the table after the first section.

==================================================
RULES FOR THE TABLE (STRICT)
==================================================
1. TITLES: Follow the TITLE SELECTION RULES above (a heading found in the content takes priority; the Vespa title is only a fallback). Prefix with '### '.
2. EXCLUSIONS: Do NOT extract narrative text, footnotes, or index tables. Only numeric financial statements.

MANDATORY:
- If table_markdown is present, "description" and "investor_relevance" MUST be populated with high-quality content.
- If the content contains NO true financial statement table, return null for "table_markdown".

==================================================
OUTPUT FORMAT (JSON ONLY)
==================================================
{{
  "title": "string | null",
  "investor_relevance": "A concise one-sentence statement on why this table is critical for valuation.",
  "description": "The 2-paragraph professional description as defined above.",
  "table_markdown": "The cleaned, reconstructed markdown table."
}}
==================================================
"""

In [55]:
import tabulate
from typing import List
def parse_results(results):
    """Flatten a Vespa search response into a list of plain result dicts.

    Args:
        results: Either an object exposing ``get_json()`` or an already
            decoded response dict with hits under ``root -> children``.

    Returns:
        A list with one dict per hit holding ``title``, ``content``,
        ``page_number``, ``url``, ``tenant_id``, ``file_id``, ``relevance``,
        a nested ``cosine_scores`` dict and a constant ``type`` of
        ``'document'``. Missing values come back as ``None`` (``title``
        defaults to two spaces). On a parsing error the error is logged and
        whatever was parsed so far is returned.
    """
    # Imported and resolved locally so this function does not depend on a
    # notebook-level `logger` variable that is created in a different cell.
    import logging

    log = logging.getLogger(__name__)
    parsed_results = []
    try:
        # Accept both a response object and an already decoded dict.
        data = results.get_json() if hasattr(results, 'get_json') else results

        # `or {}` / `or []` also cover keys that are present but null.
        children = ((data or {}).get('root') or {}).get('children') or []

        if not children:
            log.info("No results found in Vespa response")
            return parsed_results

        for result in children:
            # Null-safe field extraction: a hit may carry "fields": null or
            # "matchfeatures": null, which `.get(key, {})` would not guard.
            fields = result.get('fields') or {}
            matchfeatures = fields.get('matchfeatures') or {}

            parsed_result = {
                'title': fields.get('title', '  '),
                'content': fields.get('content'),
                'page_number': fields.get('page_number'),
                'url': fields.get('url'),
                'tenant_id': fields.get('tenant_id'),
                'file_id': fields.get('file_id'),
                'relevance': result.get('relevance'),
                'cosine_scores': {
                    'cross_max_sim': matchfeatures.get('cross_max_sim'),
                    # The output key is 'semantic_summary' but it is read from
                    # the 'semantic_question' match feature (kept as-is).
                    'semantic_summary': matchfeatures.get('semantic_question'),
                },
                'type': 'document'
            }
            parsed_results.append(parsed_result)

        return parsed_results
    except Exception as e:
        # Best-effort: report the problem and return what was parsed so far.
        log.error(f"Error parsing search results: {str(e)}")
        return parsed_results

def filter_and_dedupe_results(results: List[dict], score_field: str, threshold: float = 0.75, unique_fields: List[str] = None) -> List[dict]:
    """Filter by score threshold and remove duplicates in one pass.

    Args:
        results: Result dicts, each optionally carrying a ``cosine_scores``
            dict keyed by score name.
        score_field: Key inside ``cosine_scores`` to compare to ``threshold``.
        threshold: Minimum score (inclusive) a result needs to be kept.
        unique_fields: Optional field names whose combined values identify a
            duplicate; the first occurrence wins. When falsy, no
            de-duplication is performed.

    Returns:
        The kept results, in their original order.
    """
    seen_combinations = set()
    filtered_unique = []

    for result in results or []:
        cosine_scores = result.get('cosine_scores') or {}
        score = cosine_scores.get(score_field)
        # A score that is missing OR explicitly None counts as 0.0; comparing
        # None with a float would raise TypeError.
        if score is None:
            score = 0.0
        if score < threshold:
            continue

        if unique_fields:
            identifier = tuple(result.get(field, '') for field in unique_fields)
            if identifier in seen_combinations:
                continue
            seen_combinations.add(identifier)

        filtered_unique.append(result)

    return filtered_unique

def print_results(results):
    """
    Utility function to print results in tabular form.

    Prints the retrieved/returned document counts, then one table per hit
    listing the hit's top-level entries followed by the entries of its
    'fields' dict.

    Args:
        results: Response object exposing ``number_documents_retrieved`` and
            ``hits`` (an iterable of dict-like hits).
    """
    print('Number of documents retrieved: '+ str(results.number_documents_retrieved))
    print('Number of documents returned: '+ str(len(results.hits)))

    for record, hit in enumerate(results.hits, start=1):
        # Top-level metadata first (everything except the nested 'fields').
        rows = [[key, hit.get(key)] for key in hit if key != 'fields']

        # A hit without 'fields' (or with a null value) contributes no extra
        # rows instead of raising while iterating over None.
        fields = hit.get('fields') or {}
        rows.extend([name, value] for name, value in fields.items())

        print('record#', record)
        print(tabulate.tabulate(rows))
        print('\n')

In [None]:
import re
import logging
from typing import List
from concurrent.futures import ThreadPoolExecutor
from langchain_google_genai import ChatGoogleGenerativeAI
from pydantic import BaseModel
from dotenv import load_dotenv
import os

# Load variables from a local .env file into the process environment
# (os.getenv is used below to read the Gemini API key).
load_dotenv()

# Configure logging at INFO level and create the logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ======================================================
# 0. Models
# ======================================================

# Structured result of one table-reconstruction call. The field names mirror
# the JSON keys requested in TABLE_RECONSTRUCTION_PROMPT's output format.
# Every field is optional and defaults to None, so TableResponse() is an
# "empty" result.
class TableResponse(BaseModel):
    # Table heading, or None when no valid title was found.
    title: str | None = None
    # One-sentence statement on why the table matters for valuation.
    investor_relevance: str | None = None
    # The two-paragraph description requested by the prompt.
    description: str | None = None
    # The reconstructed table as markdown.
    table_markdown: str | None = None


# ======================================================
# 1. Gemini Setup
# ======================================================

# Chat model used for table reconstruction. The API key is read from the
# GEMINI_SEARCH_KEY_1 environment variable.
# NOTE(review): temperature=0.7 looks high for a prompt that asks for exact
# extraction of financial figures — consider lowering it; confirm the intent.
gemini_llm = ChatGoogleGenerativeAI(
    model="gemini-2.5-flash-lite",
    temperature=0.7,
    # max_output_tokens=1200,
    api_key=os.getenv("GEMINI_SEARCH_KEY_1")
)

# Same model, constrained to TableResponse's JSON schema.
# Presumably invoke() then returns a plain dict matching that schema (verify
# against the langchain version in use); process_single_hit validates the
# result with TableResponse.model_validate.
structured_model = gemini_llm.with_structured_output(
    schema=TableResponse.model_json_schema(),
    method="json_schema"
)

# ======================================================
# 2. Regex Helpers
# ======================================================

# Matches a markdown pipe table: a header row, a separator row made of
# '-', ':', '|' and whitespace, then zero or more further pipe-delimited rows.
# The single capture group spans the whole table, so findall() returns the
# table text itself.
TABLE_REGEX = re.compile(
    r'(\|.+\|\n\|[-:\s|]+\|\n(?:\|.*\|\n?)*)',
    re.MULTILINE
)

# Matches one number with an optional sign and optional decimal part.
# The integer part is either comma-grouped thousands ("1,234,567") or a plain
# run of digits ("2024"). The plain-digits alternative keeps an ungrouped
# number as ONE match instead of splitting it into 3-digit pieces
# ("2024" -> "202", "4"), which would inflate numeric counts.
NUMBER_REGEX = re.compile(r'[-+]?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?')

def has_sufficient_numerical_data(text: str, min_numbers=6) -> bool:
    """Return True when `text` contains at least `min_numbers` NUMBER_REGEX matches.

    A falsy `text` (None or empty string) is treated as an empty string.
    """
    numeric_tokens = NUMBER_REGEX.findall(text if text else "")
    return len(numeric_tokens) >= min_numbers

def is_table_present(content: str) -> bool:
    """Return True if `content` holds at least one numerically dense markdown table.

    Every TABLE_REGEX match in `content` (falsy content is treated as an empty
    string) is checked with has_sufficient_numerical_data; the first match
    that passes short-circuits the search.
    """
    for table_block in TABLE_REGEX.findall(content or ""):
        if has_sufficient_numerical_data(table_block):
            return True
    return False



# ======================================================
# 4. Vespa Logic
# ======================================================

def get_page_numbers_with_tables(hits):
    """
    Identifies pages containing tables.
    Handles both 'Parsed' results (flat) and 'Raw' hits (nested fields).

    Args:
        hits: Iterable of dicts. A hit either nests its data under a 'fields'
            key or exposes 'content' / 'page_number' at the top level.

    Returns:
        Set of page numbers whose content passes is_table_present. Hits that
        have no page number are skipped.
    """
    pages = set()
    for h in hits:
        # Compatibility check: read from the nested 'fields' dict when present,
        # otherwise from the (flattened) hit itself.
        record = h['fields'] if 'fields' in h else h
        content = record.get('content', "")
        page_num = record.get('page_number')

        # A hit without a page number cannot be used as a seed page: a None in
        # the set would break sorted() below and page arithmetic in callers.
        if page_num is None:
            continue

        if is_table_present(content):
            pages.add(page_num)

    print(f"[INFO] Initial pages with numeric tables: {sorted(pages)}")
    return pages


def fetch_specific_pages(app, tenant_id, file_id, page_list):
    """Fetch title, page number and content for the given pages of one file.

    Args:
        app: Client object exposing ``query(body=...)``.
        tenant_id: Tenant identifier interpolated into the YQL filter.
        file_id: File identifier interpolated into the YQL filter.
        page_list: Page numbers to fetch; an empty/falsy value short-circuits
            to an empty list without querying.

    Returns:
        The response's ``hits`` attribute when it has one, otherwise the value
        under its ``"hits"`` key (``[]`` if absent).
    """
    if not page_list:
        return []

    page_csv = ", ".join(str(page) for page in page_list)
    clauses = [
        "select title, page_number, content",
        "from sources pefund.pefund",
        f"where tenant_id contains \"{tenant_id}\"",
        f"AND file_id contains \"{file_id}\"",
        f"AND page_number in ({page_csv})",
    ]
    request_body = {"yql": " ".join(clauses), "hits": len(page_list)}

    response = app.query(body=request_body)

    if hasattr(response, "hits"):
        return response.hits
    return response.get("hits", [])


def get_complete_table_sequence(app, tenant_id, file_id, query):
    """Find the pages holding numeric tables relevant to `query`, plus adjacent table pages.

    Steps: run a hybrid (userQuery OR nearestNeighbor) search restricted to the
    tenant/file, keep hits above a score threshold, take the pages whose content
    contains a numeric table as seeds, then repeatedly check the pages directly
    before/after the confirmed ones and add those that also contain a table.

    Args:
        app: Client object exposing ``query(...)`` (the same object is passed
            on to fetch_specific_pages).
        tenant_id: Tenant identifier interpolated into the YQL filter.
        file_id: File identifier interpolated into the YQL filter.
        query: Query text; sent as the "query" parameter and referenced by the
            embed(...) inputs.

    Returns:
        The hits returned by fetch_specific_pages for the final page set,
        sorted by ``h["fields"]["page_number"]``.

    Side effects:
        Prints debug/info output, including the full initial result listing.
    """
    # 1. Initial query: filter by tenant and file, match on text OR embedding.
    yql = (
        f"select title, page_number, content "
        f"from sources pefund.pefund "
        f"where tenant_id contains \"{tenant_id}\" "
        f"AND file_id contains \"{file_id}\" "
        f"AND (userQuery() or ({{targetHits:100}}nearestNeighbor(question_embeddings,q)))"
    )

    res = app.query({
        "yql": yql,
        "query": query,
        "input.query(q)": f"embed(e5-small-v2, @query)",
        'input.query(qt)': f'embed(colbert, @query)',
        "ranking": "semantic-cross-sim",
        "hits": 20
    })

    print("\n--- [DEBUG] Initial Search Results ---")
    print_results(res)
    
    # 2. Parse the raw Vespa response into a flat list of result dicts.
    parsed_hits = parse_results(res)

    # 3. Filter by score threshold and remove duplicates.
    # NOTE(review): the YQL above does not select file_id, so the parsed
    # 'file_id' is presumably None and de-duplication is effectively per page.
    # NOTE(review): the sample debug output shows cross_max_sim around 101, so
    # a 0.65 threshold may filter very little — confirm the score scale.
    filtered_hits = filter_and_dedupe_results(
        results=parsed_hits, 
        score_field='cross_max_sim', 
        threshold=0.65, 
        unique_fields=['file_id', 'page_number'],
    )
    
    print(f"[INFO] Hits after filtering: {len(filtered_hits)} (Original: {len(parsed_hits)})")

    # 4. Seed pages: filtered hits whose content contains a numeric table.
    confirmed = get_page_numbers_with_tables(filtered_hits)
    # Pages already inspected and found to have no table (or no hit at all).
    checked = set()

    # 5. Neighbor expansion: grow `confirmed` one page outwards per iteration.
    while True:
        # Pages directly before/after a confirmed page that are still unknown.
        candidates = {
            n for p in confirmed
            for n in (p - 1, p + 1)
            if n > 0 and n not in confirmed and n not in checked
        }
        if not candidates:
            break

        new_hits = fetch_specific_pages(app, tenant_id, file_id, list(candidates))
        # new_hits are accessed as raw hits (data nested under "fields") below.
        
        # If several hits share a page number, the last one wins.
        hit_map = {h["fields"]["page_number"]: h for h in new_hits}

        found = False
        for p in candidates:
            hit = hit_map.get(p)
            if not hit:
                # Nothing was returned for this page: never ask for it again.
                checked.add(p)
                continue
            
            # Check content for table structure
            content = hit["fields"].get("content", "")
            if is_table_present(content):
                confirmed.add(p)
                found = True
            else:
                checked.add(p)

        # No new table page in this round: the sequence cannot grow further.
        if not found:
            break

    final_pages = sorted(confirmed)
    print(f"[INFO] Final table pages: {final_pages}")

    # Fetch final content for processing, ordered by page number.
    final_hits = fetch_specific_pages(app, tenant_id, file_id, final_pages)
    final_hits.sort(key=lambda h: h["fields"]["page_number"])
    return final_hits


# ======================================================
# 5. Parallel Processing (ORDER SAFE)
# ======================================================

def process_single_hit(args):
    """Run table reconstruction for one hit and return it tagged with its index.

    Args:
        args: ``(idx, hit)`` tuple. ``hit`` is either a raw hit with its data
            nested under 'fields' or a flat dict with 'title' / 'content'.

    Returns:
        ``(idx, TableResponse)``. An empty ``TableResponse()`` is returned
        when the model produced a table that fails
        has_sufficient_numerical_data.
    """
    idx, hit = args

    # Both supported hit shapes are read the same way once the dict that
    # carries the fields has been picked.
    source = hit["fields"] if 'fields' in hit else hit
    title = source.get("title", "")
    content = source.get("content", "")

    # TABLE_RECONSTRUCTION_PROMPT is the module-level prompt template.
    prompt = TABLE_RECONSTRUCTION_PROMPT.format(
        page_title=title,
        content=content
    )
    parsed = TableResponse.model_validate(structured_model.invoke(prompt))

    # Keep the response when there is no table at all, or when the table is
    # numerically dense enough; otherwise replace it with an empty response.
    table_md = parsed.table_markdown
    keep = not table_md or has_sufficient_numerical_data(table_md)
    return idx, (parsed if keep else TableResponse())

# ======================================================
# 6. Execute (Example)
# ======================================================

# Example run configuration.
TENANT_ID = "0f7a4b5f-a137-4a71-b518-1a04ba239b61"
FILE_ID = "9de6cf15-b315-4270-92c0-aa648b818949"
QUERY = "Net income from continuing operations grew by nearly 16% to 4.28B SAR, yet Zakat expenses dropped significantly from 1.27B SAR to 214M SAR; to what extent is this growth driven by one-time provision reversals rather than core operational efficiency?"
OUTPUT_FILE = "outputs/md_files/reconstructed_financial_tables_final_002.md"

# Create the output directory up front so a missing folder is detected before
# the (slow) retrieval and LLM calls, not when the file is finally written.
os.makedirs(os.path.dirname(OUTPUT_FILE) or ".", exist_ok=True)

# Ensure 'app' is initialized before calling this
hits = get_complete_table_sequence(app, TENANT_ID, FILE_ID, QUERY)

# Pre-sized list so each response lands at the index of the hit it came from.
results = [None] * len(hits)


with ThreadPoolExecutor(max_workers=5) as pool:
    for idx, response in pool.map(process_single_hit, enumerate(hits)):
        results[idx] = response

with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
    for r in results:
        # Skip hits for which no table was reconstructed.
        if not (r and r.table_markdown):
            continue

        # 1. Write the Title
        if r.title:
            f.write(f"{r.title}\n\n")

        # 2. Write the Description (The Analysis)
        if r.description:
            f.write(f"**Analysis & Overview:**\n{r.description}\n\n")

        # 3. Write the Investor Relevance (guarded like the fields above so a
        #    missing value is not written out as the literal text "None")
        if r.investor_relevance:
            f.write(f"**Investor Relevance:** {r.investor_relevance}\n\n")

        # 4. Write the Table
        f.write(r.table_markdown + "\n\n")
        f.write("---\n\n") # Separator for readability

print(f"[SUCCESS] Extraction completed. Data saved to {OUTPUT_FILE}")


--- [DEBUG] Initial Search Results ---
Number of documents retrieved: 238
Number of documents returned: 20
record# 1
-------------  --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id             index:pefund/0/ed36f463b6bdf4da9d7bf591
relevance      70.82115314006805
source         pefund
matchfeatures  {'closest(colbert_chunks)': {'type': 'tensor<int8>(context{},token{})', 'cells': []}, 'closest(question_embeddings)': {'type': 'tensor<bfloat16>(p{})', 'cells': {'0': 1.0}}, 'cross_max_sim': 101.17307591438293, 'semantic_question': 0.0}
title          CONSOLIDATED STATEMENT OF INCOME
content        All amounts in thousands of Saudi Riyals unless otherwise stated.

               **For the years ended 31 December**

               | | Note | 2024 | 2023 |
               | :--- | :--- | :--- | :--- |
 

INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:httpx:HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-lite:generateContent "HTTP/1.1 200 OK"
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:httpx:HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-lite:generateContent "HTTP/1.1 200 OK"
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:httpx:HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-lite:generateContent "HTTP/1.1 200 OK"
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:httpx:HTTP Request: POST https://

[SUCCESS] Extraction completed. Data saved to md_files/reconstructed_financial_tables_final_002.md
