Of course. This is the perfect time to architect a scalable and robust pipeline. Moving from a single-table proof-of-concept to a multi-document, multi-table system requires careful planning around data flow, context management, and graph modeling.

Here is a robust, phased plan of action that addresses your requirements. We will focus on solving the multi-table/multi-document problem first, laying a strong foundation for integrating unstructured text later.

---

### **Phase 1: The Scalable Multi-Document & Multi-Table Pipeline**

The goal of this phase is to process a directory of PDF documents, extract all tables from each, convert them into a single, unified knowledge graph in Neo4j, and, critically, **preserve the origin (provenance)** of every piece of data.

#### **Plan of Action**

**Step 1: Design a Hierarchical Data Model for Provenance**

Before writing any code, we must update our graph model. We'll introduce `Document` and `Table` nodes to track where every fact comes from.

*   **`(:Document)` Node**: Represents a single PDF file.
    *   **ID**: A unique identifier, e.g., a hash of the filename or the filename itself (`doc_id`).
    *   **Properties**: `{filePath: "...", fileName: "...", url: "...", processedAt: datetime()}`
*   **`(:Table)` Node**: Represents a single table within a document.
    *   **ID**: A composite ID to ensure uniqueness, e.g., `{doc_id}_table_{table_index}`.
    *   **Properties**: `{title: "...", pageNumber: 1, tableIndex: 0}`
*   **`(:FinancialMetric)` Node**: Your existing metric nodes (e.g., "Total Operating Revenues").
    *   **ID**: A normalized version of the metric's label (e.g., `total_operating_revenues`).
*   **Relationships to Connect the Hierarchy**:
    *   `(:Document)-[:CONTAINS_TABLE]->(:Table)`
    *   `(:Table)-[:REPORTS_ON]->(:FinancialMetric)`
    *   `(:FinancialMetric)-[r:HAS_VALUE_FOR_PERIOD]->(:TimePeriod)` (with properties like `amount` on the relationship `r`).

This model allows you to query your graph for data provenance, e.g., "Show me the exact table and document where this financial figure originated."
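
As a minimal sketch of the ID scheme above, the helpers below derive a metric ID and a composite table ID. The names `normalize_metric_id` and `make_table_id` are hypothetical, and the normalization rule (lowercase, with runs of non-alphanumerics collapsed to underscores) is an assumption rather than something the plan fixes. The Cypher string shows what a provenance lookup against this model could look like.

```python
import re

def normalize_metric_id(label: str) -> str:
    """Lowercase the label and collapse runs of non-alphanumerics into '_'."""
    return re.sub(r"[^a-z0-9]+", "_", label.lower()).strip("_")

def make_table_id(doc_id: str, table_index: int) -> str:
    """Composite ID: unique as long as doc_id itself is unique."""
    return f"{doc_id}_table_{table_index}"

# Hypothetical provenance query: which table and document report a metric?
PROVENANCE_QUERY = """
MATCH (d:Document)-[:CONTAINS_TABLE]->(t:Table)-[:REPORTS_ON]->(m:FinancialMetric {id: $metric_id})
RETURN d.id AS document, t.id AS table, t.pageNumber AS page
"""

print(normalize_metric_id("Total Operating Revenues"))  # total_operating_revenues
print(make_table_id("verizon_1q25", 0))                 # verizon_1q25_table_0
```

The query would be run with a parameter such as `{"metric_id": "total_operating_revenues"}`.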

**Step 2: Implement the Main Processing Loop**

Your main script will orchestrate the entire pipeline. It will no longer work on a single hardcoded file but on a directory.



In [None]:
import os
from pathlib import Path

# Assume you have these classes/functions defined from previous steps
# from your_code import PDFProcessor, TableReconstructionAgent, GraphExtractor, Neo4jGraphConstructor

# --- Configuration ---
PDF_DIRECTORY = Path("i:/My Drive/M. Tech AI ML/AIML SEM 4/Dissertation/Project/downloaded_verizon_financial_pdfs/downloaded_verizon_quarterly_pdfs/2025/1Q")
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "your_password"

# --- Initialization ---
pdf_processor = PDFProcessor()
table_agent = TableReconstructionAgent()
graph_extractor = GraphExtractor()
graph_constructor = Neo4jGraphConstructor(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)

# --- Main Loop ---
def process_all_documents():
    pdf_files = list(PDF_DIRECTORY.glob("*.pdf"))
    print(f"Found {len(pdf_files)} PDF documents to process.")

    for pdf_path in pdf_files:
        doc_id = pdf_path.stem # e.g., "Download Financial statements PDF_5a9b6fa4"
        print(f"\n--- Processing Document: {doc_id} ---")
        
        # Add document node to the graph
        graph_constructor.add_document_node(doc_id, str(pdf_path))

        # 1. Parse the PDF to get tables and text
        docling_data = pdf_processor.parse(pdf_path)
        
        # 2. Loop through each table found in the document
        for table_index, table_data in enumerate(docling_data.get('tables', [])):
            table_id = f"{doc_id}_table_{table_index}"
            print(f"  -> Processing Table #{table_index}")

            # Add table node and link it to the document
            graph_constructor.add_table_node(table_id, doc_id, table_data.get('metadata'))

            # 3. Reconstruct the table into markdown (your existing logic)
            reconstructed_table = table_agent.reconstruct_table(table_data)
            
            # 4. Extract graph data (nodes and relationships) from the markdown
            # This now needs the document and table context
            graph_json = graph_extractor.extract_graph_from_table(
                reconstructed_table, 
                doc_id, 
                table_id
            )
            
            # 5. Add the extracted data to Neo4j
            graph_constructor.add_data_to_graph(graph_json, table_id)

    graph_constructor.close()
    print("\n✅ Pipeline finished. All documents processed.")

# process_all_documents()



**Step 3: Update the Graph Extraction Prompt**

This is a critical change. The prompt must now receive the `doc_id` and `table_id` and be instructed to use them.

**New `graph_prompt` Template:**



In [None]:
prompt_template = """You are an expert financial analyst AI creating a knowledge graph.
The input contains a markdown table extracted from a specific table within a source document.

**CONTEXT:**
- **Document ID**: `{doc_id}`
- **Table ID**: `{table_id}`
- **Financial Context**: `{financial_context}`
- **Markdown Table**: `{markdown_table}`

**YOUR TASK:**
Create a JSON object with "nodes" and "relationships" representing the information.
1.  Create a `Document` node with the id `{doc_id}`.
2.  Create a `Table` node with the id `{table_id}`.
3.  For each financial metric (e.g., "Total Operating Revenues"), create a `FinancialMetric` node.
4.  Create a relationship from the `Document` node to the `Table` node.
5.  Create a relationship from the `Table` node to each `FinancialMetric` node it contains.
6.  Extract all financial values as relationships between `FinancialMetric` and `TimePeriod` nodes, as previously instructed.

**Output MUST be a single, clean JSON object.**

---
TEXT TO ANALYZE:
{input} 
---
"""
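
Because the LLM's reply is free text, it helps to check its shape before passing it to Neo4j. The sketch below is one way to do that. `validate_graph_json` is a hypothetical helper, and the assumed schema (each node has `id` and `label`, each relationship has `source`, `target`, and `type`) is an illustration, since the prompt above does not pin down field names.

```python
import json
from typing import Any, Dict, List

def validate_graph_json(raw: str, doc_id: str, table_id: str) -> Dict[str, List[Dict[str, Any]]]:
    """Parse the LLM reply and reject graphs that break the assumed schema."""
    graph = json.loads(raw)
    if not isinstance(graph, dict):
        raise ValueError("expected a JSON object with 'nodes' and 'relationships'")
    nodes = graph.get("nodes", [])
    rels = graph.get("relationships", [])

    # Every node needs an id and a label, and ids must not repeat.
    node_ids = {n["id"] for n in nodes if "id" in n and "label" in n}
    if len(node_ids) != len(nodes):
        raise ValueError("every node needs a unique 'id' and a 'label'")

    # The prompt asks for exactly these two provenance nodes.
    for required in (doc_id, table_id):
        if required not in node_ids:
            raise ValueError(f"missing provenance node: {required}")

    # Relationships must connect known nodes and carry a type.
    for rel in rels:
        if rel.get("source") not in node_ids or rel.get("target") not in node_ids:
            raise ValueError(f"relationship points at an unknown node: {rel}")
        if not rel.get("type"):
            raise ValueError(f"relationship has no type: {rel}")
    return {"nodes": nodes, "relationships": rels}
```

Rejecting a malformed reply at this point keeps half-formed graphs out of the database and makes it easy to log which table needs a retry.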




**Step 4: Create a `Neo4jGraphConstructor` Class**

This class will abstract away the Cypher queries, making your main loop clean and readable. It will use idempotent `MERGE` queries to prevent data duplication on re-runs.



In [None]:
from neo4j import GraphDatabase

class Neo4jGraphConstructor:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

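    def execute_query(self, query, parameters=None):
        # Materialize records inside the session: a Result cannot be read
        # once the session that produced it has been closed.
        with self.driver.session() as session:
            return list(session.run(query, parameters))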

    def add_document_node(self, doc_id, file_path):
        query = """
        MERGE (d:Document {id: $doc_id})
        ON CREATE SET d.filePath = $file_path, d.processedAt = datetime()
        """
        self.execute_query(query, {'doc_id': doc_id, 'file_path': file_path})

    def add_table_node(self, table_id, doc_id, metadata):
        # The parser may not supply table metadata, so fall back to an empty dict.
        metadata = metadata or {}
        query = """
        MERGE (t:Table {id: $table_id})
        ON CREATE SET t.title = $title, t.pageNumber = $page
        WITH t
        MATCH (d:Document {id: $doc_id})
        MERGE (d)-[:CONTAINS_TABLE]->(t)
        """
        self.execute_query(query, {
            'table_id': table_id,
            'doc_id': doc_id,
            'title': metadata.get('title'),
            'page': metadata.get('page')
        })

    def add_data_to_graph(self, graph_json, table_id):
        # This method will contain loops to iterate through nodes and relationships
        # from the LLM's output and generate the appropriate MERGE queries.
        # It will link FinancialMetric nodes to the current table_id.
        pass # Implementation would follow here
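
One way to fill in `add_data_to_graph` is to first translate the LLM's JSON into `(query, parameters)` pairs and then run each pair through `execute_query`. The sketch below covers only that translation step, so it runs without a database. The function name `build_merge_statements`, the JSON field names (`id`, `label`, `source`, `target`, `type`, `properties`), and the allow-lists are assumptions. Labels and relationship types are part of the query text rather than ordinary `$` parameters, which is why they are checked against an allow-list before being interpolated.

```python
from typing import Any, Dict, List, Tuple

ALLOWED_LABELS = {"Document", "Table", "FinancialMetric", "TimePeriod"}
ALLOWED_REL_TYPES = {"CONTAINS_TABLE", "REPORTS_ON", "HAS_VALUE_FOR_PERIOD"}

Statement = Tuple[str, Dict[str, Any]]

def build_merge_statements(graph_json: Dict[str, Any], table_id: str) -> List[Statement]:
    """Turn extracted nodes/relationships into idempotent MERGE statements."""
    statements: List[Statement] = []
    for node in graph_json.get("nodes", []):
        label = node["label"]
        if label not in ALLOWED_LABELS:
            raise ValueError(f"unexpected label: {label}")
        statements.append((
            f"MERGE (n:{label} {{id: $id}}) SET n += $props",
            {"id": node["id"], "props": node.get("properties", {})},
        ))
        # Link every metric back to the table it was extracted from.
        if label == "FinancialMetric":
            statements.append((
                "MATCH (t:Table {id: $table_id}), (m:FinancialMetric {id: $id}) "
                "MERGE (t)-[:REPORTS_ON]->(m)",
                {"table_id": table_id, "id": node["id"]},
            ))
    for rel in graph_json.get("relationships", []):
        rel_type = rel["type"]
        if rel_type not in ALLOWED_REL_TYPES:
            raise ValueError(f"unexpected relationship type: {rel_type}")
        statements.append((
            "MATCH (a {id: $source}), (b {id: $target}) "
            f"MERGE (a)-[r:{rel_type}]->(b) SET r += $props",
            {"source": rel["source"], "target": rel["target"],
             "props": rel.get("properties", {})},
        ))
    return statements
```

Inside the class, the method body could then loop over the returned pairs and call `self.execute_query(query, params)` for each one.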



---

### **Phase 2: Integrating Unstructured Text (The Next Step)**

Once Phase 1 is working reliably, integrating text is straightforward because the foundation is already built.

1.  **Add a Text Extraction Step**: In your main loop, after processing tables, you'll process the text elements from `docling_data.get('texts', [])`, the same key the `DocumentProcessor` code in this notebook reads.
2.  **Use a General Extraction Prompt**: Create a new prompt for unstructured text. This prompt will be more general, asking the LLM to identify any entities (`Person`, `Organization`, `Location`, `Product`) and their relationships.
3.  **Feed to the Same Constructor**: The nodes and relationships extracted from the text will be fed into the *same* `graph_constructor.add_data_to_graph()` method. Because `MERGE` matches on label and key properties, text-derived entities attach to existing nodes whenever those values are identical. For example, if the text mentions "Verizon" and a table already created a `(:Company {name: "Verizon"})` node, a `MERGE` on the same label and `name` reuses that node and enriches it with new relationships from the text. A variant such as "Verizon Communications Inc." would create a separate node unless names are normalized first.

This plan provides a clear, step-by-step path to building the robust, multi-source knowledge graph you envision.
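
`MERGE` only unifies nodes whose label and key property match exactly, so entity names usually need to be normalized before they reach the constructor. The following is a minimal sketch of that step. `canonical_entity_key` and the alias table are hypothetical, and a real pipeline would likely need a larger alias list or a dedicated entity-resolution step.

```python
import re
from typing import Dict

# Hypothetical alias table mapping known surface forms to one canonical key.
ALIASES: Dict[str, str] = {
    "verizon communications inc": "verizon",
    "verizon communications": "verizon",
    "vz": "verizon",
}

def canonical_entity_key(name: str) -> str:
    """Lowercase, drop punctuation, squeeze whitespace, then apply aliases."""
    cleaned = re.sub(r"[^a-z0-9 ]+", "", name.lower())
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    return ALIASES.get(cleaned, cleaned)

print(canonical_entity_key("Verizon Communications Inc."))  # verizon
print(canonical_entity_key("Verizon"))                      # verizon
```

Using the returned key as the `MERGE` property, and keeping the original spelling as a separate display property, lets table-derived and text-derived mentions land on the same node.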


---

##### 1) Ingest & Parse (Docling), preserve provenance

In [None]:
import os, re, json, uuid
from dataclasses import dataclass
from typing import List, Dict, Any, Optional

# We only need Docling for this refined pipeline
try:
    from docling.document_converter import DocumentConverter
    DOCLING = True
except ImportError:
    print("⚠️ Docling library not found. Please install it to proceed.")
    DocumentConverter = None
    DOCLING = False

@dataclass
class IngestConfig:
    # The root output directory
    out_dir: str = "output/parsed_pdfs"

# --- UTILITY FUNCTIONS ---
def _ensure_dir(p: str):
    """Ensures that a directory exists, creating it if necessary."""
    os.makedirs(p, exist_ok=True)

def _slug(s: str) -> str:
    """Creates a clean, URL-friendly slug from a file path."""
    basename = os.path.splitext(os.path.basename(s))[0]
    return re.sub(r"[^A-Za-z0-9]+", "-", basename).strip("-").lower()

# --- CORE EXTRACTION LOGIC ---
def extract_with_docling(pdf_path: str) -> Optional[Dict[str, Any]]:
    """
    Extracts structured JSON from a PDF using Docling.
    Returns the JSON data or None if extraction fails.
    """
    if not DOCLING:
        return None
    
    try:
        conv = DocumentConverter()
        res = conv.convert(pdf_path)
        doc = getattr(res, "document", None) or res
        
        # Prioritize dictionary methods, then fall back to JSON string methods
        for method_name in ("to_dict", "export_to_dict", "as_dict"):
            if hasattr(doc, method_name):
                return getattr(doc, method_name)()
        
        for method_name in ("to_json", "export_to_json"):
            if hasattr(doc, method_name):
                return json.loads(getattr(doc, method_name)())
                
    except Exception as e:
        print(f"  ❌ Error processing with Docling: {e}")
        return None
    
    return None

# --- REFINED INGESTION PIPELINE ---
def run_ingest_refined(pdf_path: str, root_input_dir: str, cfg: IngestConfig) -> Optional[Dict[str, Any]]:
    """
    A simplified pipeline that extracts Docling JSON and saves it in a mirrored directory structure.
    """
    doc_id = _slug(pdf_path)
    
    # Determine the relative path from the root input to the PDF's directory
    relative_dir = os.path.dirname(os.path.relpath(pdf_path, root_input_dir))
    
    # Create the mirrored output directory structure
    # e.g., output/parsed_pdfs/2025/1Q/download-financial-statements-pdf-5a9b6fa4/
    final_out_dir = os.path.join(cfg.out_dir, relative_dir, doc_id)
    
    # --- Caching Check ---
    summary_path = os.path.join(final_out_dir, f"{doc_id}.summary.json")
    if os.path.exists(summary_path):
        print(f"SKIP (already processed): {doc_id}")
        with open(summary_path, "r", encoding="utf-8") as f:
            return json.load(f)
    
    print(f"  -> Processing with Docling...")
    _ensure_dir(final_out_dir)
    
    # --- Extraction ---
    docling_json = extract_with_docling(pdf_path)
    
    if not docling_json:
        print(f"  ❌ Failed to extract Docling JSON for {doc_id}.")
        return None
        
    # --- Save Artifacts ---
    docl_json_path = os.path.join(final_out_dir, f"{doc_id}.docling.json")
    with open(docl_json_path, "w", encoding="utf-8") as f:
        json.dump(docling_json, f, ensure_ascii=False, indent=2)
    print(f"  ✅ Saved Docling JSON to: {docl_json_path}")

    # --- Create Final Summary ---
    summary = {
        "doc_id": doc_id,
        "pdf_path": pdf_path,
        "out_dir": final_out_dir,
        "artifacts": {
            "docling_json": docl_json_path,
        },
        "stats": {
            "docling_available": True
        }
    }
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)
        
    return summary
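
The mirrored output layout can be seen in isolation with `pathlib`. This sketch reproduces only the path arithmetic of `run_ingest_refined`. The helper name `mirrored_out_dir` and the example paths are hypothetical, and `PurePosixPath` is used so the result is the same on any operating system.

```python
from pathlib import PurePosixPath

def mirrored_out_dir(pdf_path: str, root_input_dir: str, out_dir: str, doc_id: str) -> PurePosixPath:
    """Mirror the PDF's directory (relative to the input root) under out_dir."""
    # relative_to raises ValueError if pdf_path is not under root_input_dir.
    relative_dir = PurePosixPath(pdf_path).relative_to(root_input_dir).parent
    return PurePosixPath(out_dir) / relative_dir / doc_id

print(mirrored_out_dir(
    "/data/pdfs/quarterly/2025/1Q/Report.pdf",
    "/data/pdfs",
    "/data/out",
    "report",
))
# -> /data/out/quarterly/2025/1Q/report
```

Keeping the year and quarter folders in the output path means two PDFs with the same file name in different quarters do not overwrite each other's artifacts.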

In [3]:
import os
import glob

# --- Configuration ---
ROOT = r"i:\My Drive\M. Tech AI ML\AIML SEM 4\Dissertation\Project"
# Define the base directory for all input PDFs. This is used to create the mirrored output structure.
ROOT_INPUT_DIR = os.path.join(ROOT, "downloaded_verizon_financial_pdfs")
# Define the single output directory for all parsed results.
OUT_DIR = os.path.join(ROOT, "output", "parsed_pdfs")

# --- Dynamic File Discovery ---
# The target directory to search for all quarterly reports.
QUARTERLY_REPORTS_DIR = os.path.join(ROOT_INPUT_DIR, "downloaded_verizon_quarterly_pdfs")

print(f"🔍 Searching for all PDF files in: {QUARTERLY_REPORTS_DIR}")
# Use glob to recursively find all files ending with .pdf
all_pdf_files = glob.glob(os.path.join(QUARTERLY_REPORTS_DIR, '**', '*.pdf'), recursive=True)
print(f"  -> Found {len(all_pdf_files)} total PDF files.")

# --- Filter by Year ---
# allowed_years = {'2023', '2024', '2025'}
allowed_years = {'2024', '2025'}
print(f"🎯 Filtering for years: {sorted(allowed_years)}")

filtered_pdf_files = []
for path in all_pdf_files:
    # Check if any part of the path is one of the allowed years
    path_parts = set(path.split(os.path.sep))
    if not allowed_years.isdisjoint(path_parts):
        filtered_pdf_files.append(path)

print(f"  -> Found {len(filtered_pdf_files)} files to process from the specified years.")


# --- Run the Ingestion Pipeline ---
# The config is now much simpler.
cfg = IngestConfig(out_dir=OUT_DIR)

all_results = []
# Loop through the dynamically discovered and filtered list of files.
for pdf_path in filtered_pdf_files:
    if not os.path.exists(pdf_path):
        print(f"⚠️  File not found, skipping: {pdf_path}")
        continue
        
    print(f"\nProcessing Ingest for: {os.path.basename(pdf_path)}")
    # Call the refined function with the root input directory
    result = run_ingest_refined(pdf_path, ROOT_INPUT_DIR, cfg)
    
    if result:
        print(f"  ✅ Ingest successful for: {result['doc_id']}")
        all_results.append(result)
    else:
        print(f"  ❌ Ingest failed for: {pdf_path}")

print(f"\n\n--- PIPELINE COMPLETE ---")
print(f"Successfully processed {len(all_results)} out of {len(filtered_pdf_files)} documents.")

🔍 Searching for all PDF files in: i:\My Drive\M. Tech AI ML\AIML SEM 4\Dissertation\Project\downloaded_verizon_financial_pdfs\downloaded_verizon_quarterly_pdfs
  -> Found 209 total PDF files.
🎯 Filtering for years: ['2024', '2025']
  -> Found 44 files to process from the specified years.

Processing Ingest for: Download Webcast Transcript PDF_0773ea78.pdf
SKIP (already processed): download-webcast-transcript-pdf-0773ea78
  ✅ Ingest successful for: download-webcast-transcript-pdf-0773ea78

Processing Ingest for: Download Formal Remarks PDF_e2835efd.pdf
SKIP (already processed): download-formal-remarks-pdf-e2835efd
  ✅ Ingest successful for: download-formal-remarks-pdf-e2835efd

Processing Ingest for: Download Webcast Presentation PDF_a1b3e2e1.pdf
SKIP (already processed): download-webcast-presentation-pdf-a1b3e2e1
  ✅ Ingest successful for: download-webcast-presentation-pdf-a1b3e2e1

Processing Ingest for: Download Infographic PDF_88d85bda.pdf
SKIP (already processed): download-infograp

In [4]:
# Final corrected TableReconstructionAgent with rate limit handling
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
import json
import os      # used by save_prepared_data and DocumentProcessor
import random  # jitter for the wait between chunks
import re
import time    # sleep between LLM calls
from typing import Dict, Any, List, Optional

def save_prepared_data(prepared_data: Dict[str, Any], ingest_summary: Dict[str, Any]) -> Optional[str]:
    """
    Saves the prepared document data (with reconstructed tables and text chunks)
    to a JSON file in the document's specific output directory.
    """
    doc_id = prepared_data.get("doc_id")
    out_dir = ingest_summary.get("out_dir")

    if not doc_id or not out_dir:
        print("❌ ERROR: Cannot save prepared data. Missing 'doc_id' or 'out_dir'.")
        return None

    try:
        # Ensure the output directory from the ingest summary exists
        os.makedirs(out_dir, exist_ok=True)
        
        # Define the output path for the prepared data JSON file
        output_path = os.path.join(out_dir, f"{doc_id}.prepared.json")
        
        # Write the dictionary to the JSON file
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(prepared_data, f, ensure_ascii=False, indent=2)
            
        print(f"  -> ✅ Saved prepared data to: {output_path}")
        return output_path

    except Exception as e:
        print(f"  -> ❌ ERROR: Failed to save prepared data for doc_id '{doc_id}'. Reason: {e}")
        return None

class DocumentProcessor:
    """
    Orchestrates the processing of a single document's parsed data.
    *** This version is hardened against empty 'prov' lists. ***
    """
    def __init__(self, table_agent: "TableReconstructionAgent"):
        self.table_agent = table_agent
        print("✅ DocumentProcessor initialized.")

    def process_document(self, ingest_summary: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        doc_id = ingest_summary.get("doc_id")
        docling_json_path = ingest_summary.get("artifacts", {}).get("docling_json")

        if not doc_id or not docling_json_path or not os.path.exists(docling_json_path):
            print(f"❌ ERROR: Invalid ingest summary for doc_id '{doc_id}'.")
            return None

        print(f"\n--- Preparing Document for Graph Extraction: {doc_id} ---")
        
        with open(docling_json_path, "r", encoding="utf-8") as f:
            docling_data = json.load(f)

        # 1. Process tables using 'prov' for metadata
        reconstructed_tables = []
        raw_tables = docling_data.get("tables", [])
        if raw_tables:
            print(f"  -> Found {len(raw_tables)} tables to process.")
            for i, raw_table in enumerate(raw_tables):
                page_no = "N/A"
                prov_list = raw_table.get("prov", [])
                # *** HARDENING STEP ***: Check if the list is not empty before accessing it.
                if prov_list and isinstance(prov_list[0], dict):
                    page_no = prov_list[0].get("page_no", "N/A")
                
                table_id = f"{doc_id}_p{page_no}_t{i}"
                
                reconstructed_result = self.table_agent.reconstruct_table(raw_table, i)
                reconstructed_result["table_id"] = table_id
                reconstructed_tables.append(reconstructed_result)
        else:
            print("  -> No tables found in this document.")

        # 2. Process text using 'prov' for metadata
        text_chunks_with_meta = []
        text_elements = docling_data.get("texts", [])
        if text_elements:
            for element in text_elements:
                if isinstance(element, dict) and "text" in element:
                    page_no = "N/A"
                    prov_list = element.get("prov", [])
                    # *** HARDENING STEP ***: Check if the list is not empty here as well.
                    if prov_list and isinstance(prov_list[0], dict):
                        page_no = prov_list[0].get("page_no", "N/A")
                    
                    text_chunks_with_meta.append({
                        "text": element["text"],
                        "page_number": page_no,
                        "original": element.get("orig"),
                        "label": element.get("label")
                    })
            print(f"  -> Extracted {len(text_chunks_with_meta)} text chunks with metadata.")
        else:
            print("  -> No text elements found in this document.")

        # 3. Return the final prepared data package
        prepared_data = {
            "doc_id": doc_id,
            "source_pdf": ingest_summary.get("pdf_path"),
            "reconstructed_tables": reconstructed_tables,
            "text_chunks": text_chunks_with_meta
        }
        
        print(f"  -> ✅ Document '{doc_id}' prepared successfully.")
        return prepared_data


class TableReconstructionAgent:
    """
    An agent that intelligently reconstructs tables from raw Docling JSON
    by iteratively processing chunks of data and preserving financial context.
    """
    def __init__(self, model_name: str = "gemma-3-27b-it"):
        self.model_name = model_name
        self.llm = ChatGoogleGenerativeAI(
            model=self.model_name, 
            temperature=0.0,
            max_output_tokens=8192,
            max_retries=1 
        )
        self.prompt = self._create_prompt()
        print(f"✅ Agent initialized with Gemini model: {self.model_name}")

    def _create_prompt(self):
        """
        Creates a prompt that works with models without system instruction support
        by combining system instructions into the human prompt.
        """
        system_instructions = """You are an expert financial data analyst reconstructing tables from raw data.  

Your task:
1. Process raw table cell data chunks iteratively
2. Build a clean, structured table in MARKDOWN format
3. Preserve financial context (units, time periods, etc.)
4. Maintain proper headers and data alignment

Return JSON with this structure:
{{
  "markdown_table": "| Header 1 | Header 2 |\\n|----------|----------|\\n| Data 1   | Data 2   |",
  "financial_context": {{
    "units": "millions USD",
    "time_periods": ["Q1 2025", "Q1 2024"],
    "currency": "USD",
    "notes": ["(dollars in millions, except per share amounts)"]
  }},
  "table_metadata": {{
    "title": "Condensed Consolidated Statements of Income",
    "page": 1,
    "sections": ["Operating Revenues", "Operating Expenses"],
    "total_rows": 28,
    "total_cols": 4
  }}
}}

Focus on:
- Clear markdown table structure
- Proper header identification
- Financial section groupings
- Unit and time period preservation
- Data type recognition (amounts, percentages, text)"""

        human_prompt = f"""{system_instructions}

Current table state: {{existing_table}}

Next raw data chunk: {{raw_chunk}}

Process this chunk and update the markdown table structure."""

        return ChatPromptTemplate.from_template(human_prompt)

    def _safe_json_loads(self, json_content: str) -> Dict[str, Any]:
        """Safely parse JSON, cleaning it if necessary."""
        print(f"🔍 DEBUG: Attempting to parse JSON content...")
        try:
            match = re.search(r'```json\s*(\{.*?\})\s*```', json_content, re.DOTALL)
            if match:
                print("🔍 DEBUG: Found JSON block inside markdown.")
                json_content = match.group(1)
            
            return json.loads(json_content)
        except json.JSONDecodeError as e:
            print(f"❌ ERROR: JSON parsing failed: {e}")
            print(f"Corrupted JSON content preview: {json_content[:500]}")
            return {"markdown_table": "PARSING_ERROR"}

    def _chunk_table(self, table_data: Dict[str, Any], chunk_size: int = 20) -> List[Dict[str, Any]]:
        """Splits the raw table cells into manageable chunks."""
        cells = table_data.get("data", {}).get("table_cells", [])
        if not cells:
            return []
        
        chunks = []
        for i in range(0, len(cells), chunk_size):
            chunks.append({"cells": cells[i:i + chunk_size]})
        return chunks

    def reconstruct_table(self, raw_table: Dict[str, Any], table_index: int) -> Dict[str, Any]:
        """
        Processes a single raw table iteratively to build a markdown representation.
        *** This version uses a proactive, fixed-wait strategy after each chunk. ***
        """
        print(f"\n--- Reconstructing Table # {table_index} (Markdown Format) ---")
        
        # --- METADATA ENHANCEMENT using 'prov' ---
        page_no = "N/A"
        prov_list = raw_table.get("prov", [])
        if prov_list and isinstance(prov_list[0], dict):
            page_no = prov_list[0].get("page_no", "N/A")

        source_metadata = raw_table.get("metadata", {})
        
        reconstructed_table = {
            "markdown_table": "",
            "financial_context": {},
            "table_metadata": {
                "page": page_no, # Prioritize page number from 'prov'
                "title": source_metadata.get("title")
            }
        }
        
        table_chunks = self._chunk_table(raw_table)
        if not table_chunks:
            print(f"  -> No cells found in table. Skipping.")
            return reconstructed_table

        print(f"  -> Divided into {len(table_chunks)} chunks for processing.")

        for i, chunk in enumerate(table_chunks, 1):
            print(f"    -> Processing chunk {i}/{len(table_chunks)}...")
            
            try:
                existing_table_str = json.dumps(reconstructed_table, indent=2)
                raw_chunk_str = json.dumps(chunk, indent=2)
                
                prompt_value = self.prompt.invoke({
                    "existing_table": existing_table_str,
                    "raw_chunk": raw_chunk_str
                })
                
                resp = self.llm.invoke(prompt_value)
                llm_output = self._safe_json_loads(resp.content.strip())
                
                if "markdown_table" in llm_output and llm_output["markdown_table"] != "PARSING_ERROR":
                    reconstructed_table = llm_output
                    # The LLM may omit "table_metadata"; create it so the writes below cannot raise KeyError.
                    table_meta = reconstructed_table.setdefault("table_metadata", {})
                    table_meta["page"] = page_no
                    if "title" not in table_meta:
                        table_meta["title"] = source_metadata.get("title")
                    
                    rows = len(reconstructed_table.get("markdown_table", "").split('\n'))
                    print(f"      -> ✅ Chunk merged. Table now has ~{rows} rows.")
                else:
                    print(f"      -> ⚠️ Skipping update due to invalid response from LLM.")
            
            except Exception as e:
                print(f"      -> ❌ Unhandled error on chunk {i}: {e}")
                print(f"      -> Moving to the next chunk.")

            # --- Proactive Wait Logic ---
            # Wait after every chunk attempt (success or failure) to avoid rate limits.
            # Do not wait after the very last chunk.
            if i < len(table_chunks):
                wait_time = 33 + random.uniform(0, 2)
                print(f"      -> ⏱️ Waiting for {wait_time:.2f} seconds before next chunk...")
                time.sleep(wait_time)
            
        print(f"  -> ✅ Table #{table_index} reconstruction completed!")
        return reconstructed_table

    def process_first_table_only(self, docling_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process only the first table in the Docling document for debugging."""
        tables = docling_data.get("tables", [])
        if not tables:
            print("❌ No tables found in docling_data")
            return {}
        
        first_table = tables[0]
        print(f"🔍 Found first table with {len(first_table.get('data', {}).get('table_cells', []))} cells and {first_table.get('data', {}).get('num_rows', 0)} rows.")
        
        reconstructed = self.reconstruct_table(first_table, 0)
        
        if reconstructed and reconstructed.get("markdown_table"):
            markdown_table = reconstructed.get("markdown_table", "")
            print("\n" + "="*50)
            print("📊 FINAL RECONSTRUCTED TABLE")
            print("="*50)
            print(markdown_table)
            print("="*50)
            print(f"\n📋 Final financial context: {reconstructed.get('financial_context', {})}")
            print(f"📋 Final metadata: {reconstructed.get('table_metadata', {})}")
            return reconstructed
        else:
            print("❌ Failed to reconstruct first table.")
            return {}
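
The fixed pause of roughly 33 to 35 seconds in `reconstruct_table` is simple, but it waits even when the API is not throttling. An alternative is to retry only on failure, with exponential backoff and jitter. The sketch below is a generic version of that idea, not the notebook's method. `call_with_backoff` and its parameters are hypothetical, and it retries on any exception because the exact rate-limit exception raised by the LLM client is not shown in the notebook.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 2.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on failure with exponentially growing waits."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of retries: surface the last error
            # 2s, 4s, 8s, ... capped at max_delay, plus up to 10% jitter.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(delay + random.uniform(0, delay * 0.1))
    raise AssertionError("unreachable")
```

Inside the chunk loop this could wrap the model call, for example `resp = call_with_backoff(lambda: self.llm.invoke(prompt_value))`. The `sleep` parameter exists so the function can be exercised without real waiting.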



In [5]:
all_results

[{'doc_id': 'download-webcast-transcript-pdf-0773ea78',
  'pdf_path': 'i:\\My Drive\\M. Tech AI ML\\AIML SEM 4\\Dissertation\\Project\\downloaded_verizon_financial_pdfs\\downloaded_verizon_quarterly_pdfs\\2024\\1Q\\Download Webcast Transcript PDF_0773ea78.pdf',
  'out_dir': 'i:\\My Drive\\M. Tech AI ML\\AIML SEM 4\\Dissertation\\Project\\output\\parsed_pdfs\\downloaded_verizon_quarterly_pdfs\\2024\\1Q\\download-webcast-transcript-pdf-0773ea78',
  'artifacts': {'docling_json': 'i:\\My Drive\\M. Tech AI ML\\AIML SEM 4\\Dissertation\\Project\\output\\parsed_pdfs\\downloaded_verizon_quarterly_pdfs\\2024\\1Q\\download-webcast-transcript-pdf-0773ea78\\download-webcast-transcript-pdf-0773ea78.docling.json'},
  'stats': {'docling_available': True}},
 {'doc_id': 'download-formal-remarks-pdf-e2835efd',
  'pdf_path': 'i:\\My Drive\\M. Tech AI ML\\AIML SEM 4\\Dissertation\\Project\\downloaded_verizon_financial_pdfs\\downloaded_verizon_quarterly_pdfs\\2024\\1Q\\Download Formal Remarks PDF_e2835efd.

In [None]:
# Test with rate limit handling
try:
    print("🔍 DEBUG: Loading Docling data...")
    DOC_JSON_PATH = r"I:\My Drive\M. Tech AI ML\AIML SEM 4\Dissertation\Project\output\parsed_pdfs\download-financial-statements-pdf-5a9b6fa4\download-financial-statements-pdf-5a9b6fa4.docling.json"
    
    with open(DOC_JSON_PATH, "r", encoding="utf-8") as f:
        docling_data = json.load(f)
    
    print("🔍 DEBUG: Initializing agent...")
    agent = TableReconstructionAgent()
    
    print("🔍 DEBUG: Processing first table...")
    first_table_result = agent.process_first_table_only(docling_data)
    
    if first_table_result and first_table_result.get("markdown_table"):
        print(f"\n✅ SUCCESS: First table processed successfully!")
        final_rows = len(first_table_result.get('markdown_table', '').split('\n'))
        print(f"📊 Final markdown table has {final_rows} lines.")
    else:
        print(f"\n❌ FAILED: Could not process first table")
        
except Exception as e:
    print(f"\n❌ Error during table reconstruction: {e}")
    import traceback
    traceback.print_exc()

🔍 DEBUG: Loading Docling data...
🔍 DEBUG: Initializing agent...
✅ Agent initialized with Gemini model: gemma-3-27b-it
🔍 DEBUG: Processing first table...
🔍 Found first table with 95 cells and 28 rows.

--- Reconstructing Table # 0 (Markdown Format) ---
  -> Divided into 5 chunks for processing.
    -> Processing chunk 1/5...
🔍 DEBUG: Attempting to parse JSON content...
🔍 DEBUG: Found JSON block inside markdown.
      -> ✅ Chunk merged. Table now has ~9 rows.
    -> ⏱️  Waiting for 40 seconds to avoid rate limits...
    -> Processing chunk 2/5...
🔍 DEBUG: Attempting to parse JSON content...
🔍 DEBUG: Found JSON block inside markdown.
      -> ✅ Chunk merged. Table now has ~14 rows.
    -> ⏱️  Waiting for 40 seconds to avoid rate limits...
🔍 DEBUG: Attempting to parse 

In [None]:
# --- Example Usage ---
# (Assuming 'all_results' from the previous ingestion cell is available)

# 1. Initialize the agent that the processor will use
# Note: This can be time-consuming if you run it for every document,
# so we initialize it once and pass it to the processor.
table_recon_agent = TableReconstructionAgent()

# 2. Initialize the processor
doc_processor = DocumentProcessor(table_agent=table_recon_agent)

# 3. Process the first document from our ingestion results as a test
if all_results:
    first_doc_summary = all_results[0]
    prepared_document_data = doc_processor.process_document(first_doc_summary)

    # Print a summary of the prepared data
    if prepared_document_data:
        print("\n--- Example Prepared Data Output ---")
        print(f"Document ID: {prepared_document_data['doc_id']}")
        print(f"Tables Reconstructed: {len(prepared_document_data['reconstructed_tables'])}")
        print(f"Text Chunks Found: {len(prepared_document_data['text_chunks'])}")
        
        # --- SAVE THE PREPARED DATA ---
        save_prepared_data(prepared_document_data, first_doc_summary)

        # Preview the first table's data
        if prepared_document_data['reconstructed_tables']:
            first_table = prepared_document_data['reconstructed_tables'][0]
            print("\nPreview of first table:")
            print(f"  Table ID: {first_table['table_id']}")
            print(f"  Markdown Preview: {first_table.get('markdown_table', 'N/A')[:100]}...")
else:
    print("\n⚠️ Skipping example usage because 'all_results' is empty. Please run the ingestion cell first.")

✅ Agent initialized with Gemini model: gemma-3-27b-it
✅ DocumentProcessor initialized.

--- Preparing Document for Graph Extraction: download-webcast-transcript-pdf-0773ea78 ---
  -> No tables found in this document.
  -> Extracted 339 text chunks with metadata.
  -> ✅ Document 'download-webcast-transcript-pdf-0773ea78' prepared successfully.

--- Example Prepared Data Output ---
Document ID: download-webcast-transcript-pdf-0773ea78
Tables Reconstructed: 0
Text Chunks Found: 339


In [51]:
# 1. Initialize the agent that the processor will use
table_recon_agent = TableReconstructionAgent()

# 2. Initialize the processor
doc_processor = DocumentProcessor(table_agent=table_recon_agent)

# 3. Define the specific document to find and process
TARGET_DOC_ID = "download-financial-statements-pdf-ad7baa45"
print(f"🎯 Searching for and processing specific document: {TARGET_DOC_ID}")

target_doc_summary = None
if all_results:
    # Find the specific document summary from the ingestion results
    for summary in all_results:
        if summary.get("doc_id") == TARGET_DOC_ID:
            target_doc_summary = summary
            print(f"🎯 Found document summary for ID: {TARGET_DOC_ID}")
            break # Found it, no need to search further

    if target_doc_summary:
        # Process the specific document we found
        prepared_document_data = doc_processor.process_document(target_doc_summary)

        # Print a summary of the prepared data
        if prepared_document_data:
            print("\n--- ✅ Processing Complete: Prepared Data Output ---")
            print(f"Document ID: {prepared_document_data['doc_id']}")
            print(f"Tables Reconstructed: {len(prepared_document_data['reconstructed_tables'])}")
            print(f"Text Chunks Found: {len(prepared_document_data['text_chunks'])}")
            
            save_prepared_data(prepared_document_data, target_doc_summary)
            
            # Preview the first table's data
            if prepared_document_data.get('reconstructed_tables'):
                first_table = prepared_document_data['reconstructed_tables'][0]
                print("\nPreview of first reconstructed table:")
                print(f"  Table ID: {first_table.get('table_id', 'N/A')}")
                print(f"  Page: {first_table.get('table_metadata', {}).get('page', 'N/A')}")
                print(f"  Markdown Preview: \n{first_table.get('markdown_table', 'N/A')[:400]}...")
            else:
                print("\n⚠️ No tables were reconstructed for this document.")
    else:
        print(f"\n❌ ERROR: Could not find document with ID '{TARGET_DOC_ID}' in 'all_results'.")
else:
    print("\n⚠️ Skipping example usage because 'all_results' is empty. Please run the ingestion cell first.")

✅ Agent initialized with Gemini model: gemma-3-27b-it
✅ DocumentProcessor initialized.
🎯 Searching for and processing specific document: download-financial-statements-pdf-ad7baa45
🎯 Found document summary for ID: download-financial-statements-pdf-ad7baa45

--- Preparing Document for Graph Extraction: download-financial-statements-pdf-ad7baa45 ---
  -> Found 10 tables to process.

--- Reconstructing Table # 0 (Markdown Format) ---
  -> Divided into 5 chunks for processing.
    -> Processing chunk 1/5...
🔍 DEBUG: Attempting to parse JSON content...
🔍 DEBUG: Found JSON block inside markdown.
      -> ✅ Chunk merged. Table now has ~8 rows.
      -> ⏱️ Waiting for 20.03 seconds before next chunk...
    -> Processing chunk 2/5...
🔍 DEBUG: Attempting to parse JSON content...
🔍 DEBUG: Found JSON block inside markdown.
      -> ✅ Chunk merged. Table now has ~13 rows.
      -> ⏱️ Waiting for 20.99 seconds before next chunk...
    -> Processing chunk 3/5...
🔍 DEBUG: Attempting to parse JSON conte
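
The waits between chunks in the log above are close to 20 seconds but differ slightly from call to call. One common way to get that behaviour is a fixed minimum interval plus random jitter. The sketch below is an assumption about the pattern, not the table agent's actual code; `RateLimiter` and its injectable `clock` and `sleep` parameters are hypothetical, added so the logic can be exercised without really sleeping.

```python
import random
import time
from typing import Callable, Optional, Tuple


class RateLimiter:
    """Enforce a minimum interval between calls, adding random jitter when a wait is needed."""

    def __init__(
        self,
        min_interval: float,
        jitter_range: Tuple[float, float] = (0.0, 1.0),
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.min_interval = min_interval
        self.jitter_range = jitter_range
        self._clock = clock
        self._sleep = sleep
        self._last_call: Optional[float] = None

    def wait(self) -> float:
        """Block until the next call is allowed; return the number of seconds slept."""
        slept = 0.0
        if self._last_call is not None:
            remaining = self.min_interval - (self._clock() - self._last_call)
            if remaining > 0:
                slept = remaining + random.uniform(*self.jitter_range)
                self._sleep(slept)
        self._last_call = self._clock()
        return slept


# Demo with a fake clock so nothing actually sleeps.
fake_now = [0.0]


def fake_sleep(seconds: float) -> None:
    fake_now[0] += seconds


limiter = RateLimiter(20.0, (0.0, 1.0), clock=lambda: fake_now[0], sleep=fake_sleep)
first_wait = limiter.wait()   # no previous call, so no wait
second_wait = limiter.wait()  # called immediately again: full interval plus jitter
print(first_wait, round(second_wait, 2))
```

Time spent on the request itself counts toward the interval, so a slow LLM call shortens the following wait.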

In [45]:
from IPython.display import display, Markdown

# Assuming 'prepared_document_data' is available from the previous cell run
TABLE_INDEX = 1  # 0-based, so this displays the second reconstructed table
tables = (prepared_document_data or {}).get('reconstructed_tables') or []
if len(tables) > TABLE_INDEX:
    table_markdown = tables[TABLE_INDEX].get('markdown_table', '*Markdown table not found.*')

    print("--- 📊 Displaying Reconstructed Markdown Table ---")
    # This will render the string as a visual markdown table
    display(Markdown(table_markdown))
else:
    print("⚠️ No reconstructed table available to display.")


--- 📊 Displaying Reconstructed Markdown Table ---


|               | 3/31/24  | 12/31/23 | $ Change | 
|----------------|----------|----------|----------|
| Unaudited      |          |          |          |
| Assets          |          |          |          |
| Current assets  |          |          |          |
| Cash and cash equivalents | $ 2,365  | $ 2,065  | $ 300   |
| Accounts receivable | 26,380   | 26,102   | 278     |
| Less Allowance for credit losses | 1,061    | 1,017    | 44      |
| Accounts receivable, net | 25,319   | 25,085   | 234     |
| Inventories     | 2,076    | 2,057    | 19      |
| Prepaid expenses and other | 8,197    | 7,607    | 590     |
| Total current assets | 37,957   | 36,814   | 1,143   |
| Property, plant and equipment | 322,266  | 320,108  | 2,158   |
| Less Accumulated depreciation | 322,266  | 320,108  | 2,158   |
| Property, plant and equipment, net | 214,403  | 211,798  | 2,605   |
| Investments in unconsolidated businesses | 941      | 953      | (12)    |
| Wireless licenses | 156,111  | 155,667  | 444     |
| Goodwill | 22,842   | 22,843   | (1)     |
| Other intangible assets, net | 10,835   | 11,057   | (222)   |
| Operating lease right-of-use assets | 24,351   | 24,726   | (375)   |
| Other assets | 19,258   | 19,885   | (627)   |
| Total assets | $ 380,158 | $ 380,255 | $ (97)   |
| Liabilities and Equity |          |          |          |
| Current liabilities |          |          |          |
| Debt maturing within one year | $ 15,594 | $ 12,973 | $ 2,621   |
| Accounts payable and accrued liabilities | 20,139   | 23,453   | (3,314)   |
| Current operating lease liabilities | 4,282    | 4,266    | 16      |
| Other current liabilities | 13,616   | 12,531   | 1,085   |
| Total current liabilities | 53,631   | 53,223   | 408     |
| Long-term debt | 136,104  | 137,701  | (1,597)   |
| Employee benefit obligations | 12,805   | 13,189   | (384)   |
| Deferred income taxes | 45,980   | 45,781   | 199     |
| Non-current operating lease liabilities | 19,654   | 20,002   | (348)   |
| Other liabilities | 16,258   | 16,560   | (302)   |
| Total long-term liabilities | 230,801  | 233,233  | (2,432)   |
| Equity |          |          |          |
| Common stock | 429      | 429      | -       |
| Additional paid in capital | 13,571   | 13,631   | (60)    |
| Retained earnings | 84,714   | 82,915   | 1,799   |
| Accumulated other comprehensive loss | (1,199)  | (1,380)  | 181     |
| Common stock in treasury, at cost | (3,602)  |          |          |
| Deferred compensation - employee stock ownership plans and other | 421      | 656      | (235)   |
| Noncontrolling interests | 1,392    | 1,369    | 23      |
| Total equity | 95,726   | 93,799   | 1,927   |
| Total liabilities and equity | $ 380,158 | $ 380,255 | $ (97)   |
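
In the output above, the `Less Accumulated depreciation` row carries exactly the same three values as the `Property, plant and equipment` row before it. A cheap automated check can surface that kind of repetition before the table is loaded into the graph. The following is a rough sketch of such a check; `parse_markdown_rows` and `find_repeated_rows` are hypothetical helpers, not part of the pipeline shown here, and the sample rows are copied from the table above.

```python
from typing import List, Tuple


def parse_markdown_rows(markdown: str) -> List[List[str]]:
    """Split a markdown table into rows of stripped cells, skipping the separator row."""
    rows = []
    for line in markdown.strip().splitlines():
        line = line.strip()
        if not line.startswith("|"):
            continue
        inner = line[1:-1] if line.endswith("|") else line[1:]
        cells = [c.strip() for c in inner.split("|")]
        if all(c and set(c) <= set("-:") for c in cells):
            continue  # separator row such as |---|---|
        rows.append(cells)
    return rows


def find_repeated_rows(rows: List[List[str]]) -> List[Tuple[str, str]]:
    """Return (previous_label, label) pairs where adjacent rows share identical non-empty values."""
    repeated = []
    for prev, cur in zip(rows, rows[1:]):
        values = cur[1:]
        # Rows with only empty value cells (section headings) are ignored.
        if any(values) and values == prev[1:]:
            repeated.append((prev[0], cur[0]))
    return repeated


sample = """
|  | 3/31/24 | 12/31/23 | $ Change |
|---|---|---|---|
| Total current assets | 37,957 | 36,814 | 1,143 |
| Property, plant and equipment | 322,266 | 320,108 | 2,158 |
| Less Accumulated depreciation | 322,266 | 320,108 | 2,158 |
| Property, plant and equipment, net | 214,403 | 211,798 | 2,605 |
"""
suspicious = find_repeated_rows(parse_markdown_rows(sample))
print(suspicious)
```

A flagged pair is only a hint that the row should be compared against the source PDF; identical adjacent rows can also be legitimate.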

In [None]:
save_prepared_data(prepared_document_data, target_doc_summary)

  -> ✅ Saved prepared data to: i:\My Drive\M. Tech AI ML\AIML SEM 4\Dissertation\Project\output\parsed_pdfs\downloaded_verizon_quarterly_pdfs\2024\1Q\download-webcast-transcript-pdf-0773ea78\download-financial-statements-pdf-ad7baa45.prepared.json


'i:\\My Drive\\M. Tech AI ML\\AIML SEM 4\\Dissertation\\Project\\output\\parsed_pdfs\\downloaded_verizon_quarterly_pdfs\\2024\\1Q\\download-webcast-transcript-pdf-0773ea78\\download-financial-statements-pdf-ad7baa45.prepared.json'

In [6]:
import os
from tqdm.notebook import tqdm # Use tqdm.notebook for a nice progress bar in Jupyter

# --- Batch Processing Setup ---
# Initialize agents once to be reused for all documents
table_recon_agent = TableReconstructionAgent()
doc_processor = DocumentProcessor(table_agent=table_recon_agent)

processed_count = 0
skipped_count = 0
failed_count = 0

print(f"\n--- Starting Batch Processing for {len(all_results)} Documents ---")

if all_results:
    # Wrap the all_results list with tqdm for a progress bar
    for summary in tqdm(all_results, desc="Processing Documents"):
        doc_id = summary.get("doc_id")
        out_dir = summary.get("out_dir")

        if not doc_id or not out_dir:
            print(f"⚠️ Skipping an item due to missing 'doc_id' or 'out_dir'.")
            failed_count += 1
            continue

        # --- Caching Check ---
        # Check if the prepared data file already exists
        prepared_file_path = os.path.join(out_dir, f"{doc_id}.prepared.json")
        if os.path.exists(prepared_file_path):
            # print(f"  -> SKIP (already processed): {doc_id}")
            skipped_count += 1
            continue
        
        # --- Process the Document ---
        print(f"\nProcessing document: {doc_id}")
        prepared_document_data = doc_processor.process_document(summary)

        # --- Save the Result ---
        if prepared_document_data:
            save_path = save_prepared_data(prepared_document_data, summary)
            if save_path:
                processed_count += 1
            else:
                print(f"  -> ❌ FAILED to save: {doc_id}")
                failed_count += 1
        else:
            print(f"  -> ❌ FAILED to process: {doc_id}")
            failed_count += 1

    print("\n\n--- ✅ BATCH PROCESSING COMPLETE ---")
    print(f"  - Successfully Processed: {processed_count}")
    print(f"  - Skipped (already done): {skipped_count}")
    print(f"  - Failed: {failed_count}")
    print("------------------------------------")

else:
    print("\n⚠️ Cannot start batch processing because 'all_results' is empty. Please run the ingestion cell first.")

✅ Agent initialized with Gemini model: gemma-3-27b-it
✅ DocumentProcessor initialized.

--- Starting Batch Processing for 44 Documents ---


Processing Documents:   0%|          | 0/44 [00:00<?, ?it/s]


Processing document: download-financial-operating-information-pdf-2f954363

--- Preparing Document for Graph Extraction: download-financial-operating-information-pdf-2f954363 ---
  -> Found 20 tables to process.

--- Reconstructing Table # 0 (Markdown Format) ---
  -> Divided into 1 chunks for processing.
    -> Processing chunk 1/1...
🔍 DEBUG: Attempting to parse JSON content...
🔍 DEBUG: Found JSON block inside markdown.
      -> ✅ Chunk merged. Table now has ~1 rows.
  -> ✅ Table #0 reconstruction completed!

--- Reconstructing Table # 1 (Markdown Format) ---
  -> Divided into 15 chunks for processing.
    -> Processing chunk 1/15...
🔍 DEBUG: Attempting to parse JSON content...
🔍 DEBUG: Found JSON block inside markdown.
      -> ✅ Chunk merged. Table now has ~6 rows.
      -> ⏱️ Waiting for 34.93 seconds before next chunk...
    -> Processing chunk 2/15...
🔍 DEBUG: Attempting to parse JSON content...
🔍 DEBUG: Found JSON block inside markdown.
      -> ✅ Chunk merged. Table now has ~

In [7]:
web_data_index = r"output/verizon_document_index.json"
web_data = r"output/verizon_production_web_documents.json"

---
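
The extraction cell below has the LLM return only names and values, and derives every identifier (`eid`, `sid`, `evid`) from content in code. The sketch here isolates that idea, following the same approach as the cell's `_slug` and `_statement_hash` helpers. The names `slug`, `entity_id` and `statement_id` and the `has_value` predicate are illustrative stand-ins; the figure is the cash balance from the table output above.

```python
import hashlib
import json
import re
from typing import Any, Dict


def slug(text: str) -> str:
    """Lowercase and collapse every run of non-alphanumeric characters into one hyphen."""
    return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")


def entity_id(entity_type: str, name: str) -> str:
    """Entity ID built from type and name only."""
    return f"{slug(entity_type)}:{slug(name)}"


def statement_id(subject_eid: str, predicate: str, obj: Dict[str, Any]) -> str:
    """Statement ID: a short SHA-256 digest of the canonical (sorted-key) JSON form."""
    canon = json.dumps(
        {"s": subject_eid, "p": predicate, "o": obj},
        sort_keys=True,
        ensure_ascii=False,
    )
    return "st:" + hashlib.sha256(canon.encode("utf-8")).hexdigest()[:16]


eid = entity_id("Metric", "Cash and cash equivalents")
# Same statement, with the object's keys supplied in a different order.
sid_a = statement_id(eid, "has_value", {"value": 2365, "dtype": "Money", "period": "3/31/24"})
sid_b = statement_id(eid, "has_value", {"period": "3/31/24", "dtype": "Money", "value": 2365})
print(eid)             # metric:cash-and-cash-equivalents
print(sid_a == sid_b)  # True: key order does not affect the ID
```

Because the IDs depend only on content, re-running extraction on the same chunk should yield the same IDs, which makes repeated loads into the graph easier to deduplicate.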

In [1]:
"""
Enhanced Verizon knowledge extraction pipeline (schema-integrated; simplified & fixed).
- LLM no longer produces IDs; we deterministically generate eid/sid/evid.
- Name→EID mapping + synthesis prevents "entities-only" drops.
- Predicate normalization removed.
- Validation skipped.
- Evidences attached to all statements in the same chunk (simple, robust).
- Default model: gemini-2.5-flash-lite.
- Verbose progress logs + token usage (best effort).
"""

from __future__ import annotations
import os, json, time, random, hashlib, re, traceback
from pathlib import Path
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Iterator, Tuple
from collections import defaultdict

# ---------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------
WEB_INDEX_PATH = Path("output/verizon_document_index.json")  # reserved
WEB_DOCS_PATH = Path("output/verizon_production_web_documents.json")
PDF_PREPARED_ROOT = Path(r"output/parsed_pdfs/downloaded_verizon_quarterly_pdfs")
OUTPUT_DIR = Path("output/extraction_results")

try:
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    print(f"[INFO] Output directory: {OUTPUT_DIR.absolute()}")
except Exception as e:
    print(f"[ERROR] Failed to create output directory: {e}")
    OUTPUT_DIR = Path.cwd() / "output" / "extraction_results"
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    print(f"[INFO] Using fallback output directory: {OUTPUT_DIR.absolute()}")

# RATE_LIMIT_SECONDS = 31.0
RATE_LIMIT_SECONDS = 15.0
JITTER_RANGE = (1.0, 5.0)
CHECKPOINT_EVERY_RECORDS = 25
CHECKPOINT_EVERY_SECONDS = 300
RESUME = True
SAVE_JSONL = True
MAX_FAILURES = 50
ENABLE_PROMPT_CACHE = True
ENABLE_TOKEN_USAGE = True
# VERBOSE = True  # top-level config
VERBOSE = False  # top-level config

# >>> how many sample statements/evidences to print per chunk
VERBOSE_SAMPLES = 2

# ---------------------------------------------------------------------
# Environment / API key
# ---------------------------------------------------------------------
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # optional

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise RuntimeError("GOOGLE_API_KEY not set in environment/.env")
print("[INFO] GOOGLE_API_KEY loaded (length only) ->", len(GOOGLE_API_KEY))

# Default Gemini model
GENAI_MODEL = os.getenv("GENAI_MODEL", "gemini-2.5-flash-lite")

# ---------------------------------------------------------------------
# Utilities
# ---------------------------------------------------------------------
def _sha(s: str, n: int = 16) -> str:
    return hashlib.sha256(s.encode("utf-8")).hexdigest()[:n]

def _slug(s: str) -> str:
    if not s:
        return ""
    s = re.sub(r"[^a-z0-9]+", "-", s.lower()).strip("-")
    return re.sub(r"-{2,}", "-", s)

def _numify(x: Any) -> Any:
    """Convert numeric-looking strings ("33,485", "-1.5") to int/float; return anything else unchanged."""
    if isinstance(x, (int, float)):
        return x
    if isinstance(x, str):
        s = x.strip().replace(",", "")
        if re.fullmatch(r"-?\d+\.\d+", s):
            return float(s)
        if re.fullmatch(r"-?\d+", s):
            return int(s)
    return x

def _statement_hash(subject_eid: str, predicate: str, obj: Dict[str, Any]) -> str:
    canon = json.dumps({"s": subject_eid, "p": predicate, "o": obj}, sort_keys=True, ensure_ascii=False)
    return f"st:{_sha(canon, 16)}"

def _evidence_hash(doc_id: str, chunk_id: str, span: Dict[str, Any], sid: Optional[str] = None) -> str:
    base = f"{doc_id}|{chunk_id}|{sid or ''}|{json.dumps(span, sort_keys=True, ensure_ascii=False)[:160]}"
    return f"ev:{_sha(base, 16)}"

# ---------------------------------------------------------------------
# LangChain setup
# ---------------------------------------------------------------------
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI

def build_base_llm(model_name: str, temperature: float = 0.0, top_p: float = 0.9, max_retries: int = 1, max_output_tokens: Optional[int] = 8024):
    return ChatGoogleGenerativeAI(
        model=model_name,
        temperature=temperature,
        top_p=top_p,
        max_retries=max_retries,
        max_output_tokens=max_output_tokens,  # guard verbosity
        # google_api_key=GOOGLE_API_KEY  # reads from env if omitted
    )

# ---------------------------------------------------------------------
# GraphExtractionAgent
# ---------------------------------------------------------------------
class GraphExtractionAgent:
    PROMPT_VERSION = "v5-no-llm-ids"

    def __init__(self,
                 model_name: str = GENAI_MODEL,
                 rate_limit: float = RATE_LIMIT_SECONDS,
                 jitter_range: Tuple[float, float] = JITTER_RANGE,
                 temperature: float = 0.0,
                 top_p: float = 0.9,
                 max_retries: int = 1,
                 enable_cache: bool = ENABLE_PROMPT_CACHE):
        self.model_name = model_name
        self.rate_limit = rate_limit
        self.jitter_range = jitter_range
        self.last_request_time: Optional[float] = None
        self.stats = defaultdict(int)
        self.enable_cache = enable_cache
        self._cache: Dict[str, Dict[str, Any]] = {}
        
        self.verbose = VERBOSE
        
        # >>> token running totals
        self.stats["prompt_tokens"] = 0
        self.stats["completion_tokens"] = 0
        self.stats["total_tokens"] = 0

        base_llm = build_base_llm(model_name, temperature=temperature, top_p=top_p, max_retries=max_retries)
        self.llm_table = self._create_table_prompt() | base_llm
        self.llm_text  = self._create_text_prompt()  | base_llm

    # ---- Prompts (escaped braces kept) ----
    def _create_table_prompt(self):
        return ChatPromptTemplate.from_template("""
            You convert a FINANCIAL MARKDOWN TABLE into a knowledge graph **without generating IDs**.

            Return ONLY a JSON object:
            {{
              "entities": [{{"name": "...", "type": "Organization|Person|Metric|TimePeriod|Product|Event|Location|Role|Thing"}}],
              "statements": [
                {{
                  "subject": "...",
                  "predicate": "...",
                  "object": (
                    {{ "is_entity": true,  "name": "...", "type": "..." }}
                    OR
                    {{ "is_entity": false, "value": <number|string>, "dtype": "Money|Percent|Number|Text", "unit": "...", "currency": "...", "period": "..." }}
                  ),
                  "confidence": 0.0..1.0
                }}
              ],
              "evidences": [
                {{ "span": {{ "row_label": "...", "column": "..." }} OR {{ "quote":"..." }}, "confidence": 0.0..1.0 }}
              ]
            }}

            Rules:
            1) Use column headers for period (e.g., "Q1 2025") when emitting numeric metric statements.
            2) Parse numbers (e.g., "33,485" -> 33485). Keep currency/units if shown.
            3) Provide concrete statements only; avoid vague claims.

            Context:
            doc_id={doc_id}
            chunk_id={chunk_id}
            page={page}
            table_id={table_id}
            source={source}

            Financial_Context (JSON):
            {financial_context}

            Markdown_Table:
            {markdown}
            """.strip())

    def _create_text_prompt(self):
        return ChatPromptTemplate.from_template("""
            Extract entities, statements, evidences (NO IDs; we will assign IDs).

            Return ONLY a JSON object with keys: entities, statements, evidences.

            entities:
            - array of objects like: {{ "name": "...", "type": "Organization|Person|Metric|TimePeriod|Product|Event|Location|Role|Thing" }}

            statements:
            - array of objects like:
              {{
                "subject": "...",
                "predicate": "...",
                "object": (
                  {{ "is_entity": true,  "name": "...", "type": "..." }}
                  OR
                  {{ "is_entity": false, "value": <number|string>, "dtype": "Money|Percent|Number|Text", "unit": "...", "currency": "...", "period": "..." }}
                ),
                "confidence": 0.0..1.0
              }}

            evidences:
            - array of objects like: {{ "span": {{ "quote": "..." }}, "confidence": 0.0..1.0 }}

            Context:
            doc_id={doc_id}
            chunk_id={chunk_id}
            page={page}
            label={label}
            source={source}

            Text:
            {text}
            """.strip())

    # ---- Rate limit ----
    def _enforce_rate_limit(self):
        """Sleep until at least `rate_limit` seconds (plus random jitter) have passed since the last request."""
        if self.last_request_time is None:
            self.last_request_time = time.time()
            return
        elapsed = time.time() - self.last_request_time
        wait = self.rate_limit - elapsed
        if wait > 0:
            self.stats["rate_limit_waits"] += 1
            # Add jitter so consecutive requests do not land on a fixed schedule
            jitter = random.uniform(*self.jitter_range)
            wait += jitter
            if self.verbose:
                print(f"[RATE] sleeping {wait:.1f}s (base+{jitter:.1f}s jitter)")
            time.sleep(wait)
        self.last_request_time = time.time()

    # ---- JSON parsing ----
    _TOP_OBJECT_RE = re.compile(r'\{.*\}', re.DOTALL)

    def _safe_json_loads(self, raw: str) -> Dict[str, Any]:
        txt = raw.strip()
        if txt.startswith("```"):
            txt = re.sub(r"^```(?:json)?", "", txt, flags=re.IGNORECASE).strip()
            if txt.endswith("```"):
                txt = txt[:-3].strip()
        m = self._TOP_OBJECT_RE.search(txt)
        candidate = m.group(0) if m else txt
        candidate = re.sub(r',\s*([\]}])', r'\1', candidate)
        try:
            data = json.loads(candidate)
            if isinstance(data, dict):
                return data
        except Exception:
            pass
        return {"entities": [], "statements": [], "evidences": []}

    # ---- Token accounting (best effort) ----
    def _record_usage(self, resp, prompt_vars: Dict[str, Any]) -> Tuple[int,int,int]:
        """Try to collect prompt/completion/total tokens; fallback to approximate."""
        pm = cm = tt = 0
        meta = getattr(resp, "response_metadata", None) or {}
        # Recent LangChain messages expose usage on `resp.usage_metadata`; older Gemini
        # integrations put it under response_metadata with varying key names.
        usage = (
            getattr(resp, "usage_metadata", None)
            or meta.get("usage_metadata")
            or meta.get("usageMetadata")
            or meta.get("token_usage")
            or meta.get("token_count")
            or {}
        )
        if isinstance(usage, dict):
            # Try common key variants
            for k in ("promptTokenCount", "prompt_tokens", "input_tokens", "prompt_tokens_count"):
                if k in usage: pm = int(usage[k]); break
            for k in ("candidatesTokenCount", "completion_tokens", "output_tokens"):
                if k in usage: cm = int(usage[k]); break
            for k in ("totalTokenCount", "total_tokens", "total"):
                if k in usage: tt = int(usage[k]); break
        if not tt and (pm or cm):
            tt = pm + cm
        if not (pm or cm or tt):
            # Fallback heuristic (roughly 4 chars per token)
            prompt_str = json.dumps(prompt_vars, ensure_ascii=False)
            pm = max(1, int(len(prompt_str) / 4))
            cm = max(1, int(len(getattr(resp, "content", "")) / 4))
            tt = pm + cm

        self.stats["prompt_tokens"] += pm
        self.stats["completion_tokens"] += cm
        self.stats["total_tokens"] += tt
        return pm, cm, tt

    # ---- Normalization (build IDs here) ----
    def _norm_literal_object(self, o: Dict[str, Any]) -> Dict[str, Any]:
        v = _numify(o.get("value"))
        dtype = o.get("dtype") or ("Number" if isinstance(v, (int, float)) else "Text")
        out = {"is_entity": False, "value": v, "dtype": dtype}
        for k in ("unit", "currency", "period"):
            if o.get(k) is not None:
                out[k] = o[k]
        return out

    def _normalize(self, raw: Dict[str, Any], doc_ctx: Dict[str, Any], meta: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
        # 1) Entities → deterministic eids + name map
        ents_in = raw.get("entities") or []
        entities: List[Dict[str, Any]] = []
        name_to_eid: Dict[str, str] = {}
        seen_eids = set()

        for e in ents_in:
            if not isinstance(e, dict):
                continue
            name = (e.get("name") or "").strip()
            if not name:
                continue
            etype = (e.get("type") or "Thing").strip()
            eid = f"{_slug(etype)}:{_slug(name)}"
            if eid in seen_eids:
                continue
            entities.append({"eid": eid, "name": name, "type": etype})
            name_to_eid[name.lower()] = eid
            seen_eids.add(eid)

        # 2) Statements → map names to eids; synthesize entities when needed
        stmts_in = raw.get("statements") or []
        statements: List[Dict[str, Any]] = []
        for s in stmts_in:
            if not isinstance(s, dict):
                continue
            subj_name = (s.get("subject") or s.get("subject_eid") or "").strip()
            pred = (s.get("predicate") or "").strip()
            obj = s.get("object")

            if not subj_name or not pred or obj is None:
                continue

            # subject: map or synthesize
            subj_eid = name_to_eid.get(subj_name.lower())
            if not subj_eid:
                subj_eid = f"thing:{_slug(subj_name)}"
                if subj_eid not in seen_eids:
                    entities.append({"eid": subj_eid, "name": subj_name, "type": "Thing"})
                    name_to_eid[subj_name.lower()] = subj_eid
                    seen_eids.add(subj_eid)

            # object: entity or literal
            if isinstance(obj, dict) and obj.get("is_entity"):
                obj_name = (obj.get("name") or "").strip()
                obj_type = (obj.get("type") or "Thing").strip()
                if obj_name:
                    obj_eid = name_to_eid.get(obj_name.lower())
                    if not obj_eid:
                        obj_eid = f"{_slug(obj_type)}:{_slug(obj_name)}"
                        if obj_eid not in seen_eids:
                            entities.append({"eid": obj_eid, "name": obj_name, "type": obj_type})
                            name_to_eid[obj_name.lower()] = obj_eid
                            seen_eids.add(obj_eid)
                    object_norm = {"is_entity": True, "eid": obj_eid}
                else:
                    object_norm = {"is_entity": False, "value": str(obj), "dtype": "Text"}
            elif isinstance(obj, dict) and (("value" in obj) or ("dtype" in obj) or ("period" in obj)):
                object_norm = self._norm_literal_object(obj)
            else:
                object_norm = {"is_entity": False, "value": str(obj), "dtype": "Text"}

            sid = _statement_hash(subj_eid, pred, object_norm)
            conf = float(s.get("confidence", 0.7))
            statements.append({
                "sid": sid,
                "subject_eid": subj_eid,
                "predicate": pred,           # NO normalization
                "object": object_norm,
                "confidence": conf,
                "model": self.model_name,
                "prompt_version": self.PROMPT_VERSION
            })

        # 3) Evidences → attach spans to all statements in the chunk
        evids_in = raw.get("evidences") or []
        doc_id = doc_ctx.get("doc_id")
        chunk_id = meta.get("chunk_id")
        page = meta.get("page")
        table_id = meta.get("table_id")
        label = meta.get("label") or ("table" if table_id else "text")

        evidences: List[Dict[str, Any]] = []
        if evids_in:
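            for ev in evids_in:
                if not isinstance(ev, dict):
                    continue  # skip malformed evidence items, as done for entities and statements
                span = ev.get("span") or ev.get("quote") or ""
                if isinstance(span, str):
                    span = {"quote": span[:300]}
                conf = float(ev.get("confidence", 0.8))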
                for s in statements:
                    evid = _evidence_hash(doc_id or "", chunk_id or "", span, s["sid"])  # <-- pass sid
                    evidences.append({
                        "evid": evid,
                        "sid": s["sid"],
                        "doc_id": doc_id,
                        "chunk_id": chunk_id,
                        "page": page,
                        "table_id": table_id,
                        "label": label,
                        "span": span,
                        "confidence": conf,
                        "model": self.model_name,
                        "prompt_version": self.PROMPT_VERSION
                    })
        else:
            for s in statements:
                span = {"note": "auto-evidence (no span provided)"}
                evid = _evidence_hash(doc_id or "", chunk_id or "", span, s['sid'])
                evidences.append({
                    "evid": evid,
                    "sid": s["sid"],
                    "doc_id": doc_id,
                    "chunk_id": chunk_id,
                    "page": page,
                    "table_id": table_id,
                    "label": label,
                    "span": span,
                    "confidence": 0.55,
                    "model": self.model_name,
                    "prompt_version": self.PROMPT_VERSION
                })

        return {"entities": entities, "statements": statements, "evidences": evidences}

    # ---- Cache key ----
    def _cache_key(self, kind: str, payload: Dict[str, Any]) -> str:
        base = json.dumps({"k": kind, "p": payload}, sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(base.encode()).hexdigest()

    # ---- Public APIs with verbose printing + token accounting ----
    def extract_table(self, *, table_markdown: str, doc_ctx: Dict[str, Any], meta: Dict[str, Any]) -> Dict[str, Any]:
        if not table_markdown.strip():
            return {"entities": [], "statements": [], "evidences": []}
        vars_payload = {
            "doc_id": doc_ctx.get("doc_id"),
            "chunk_id": meta.get("chunk_id"),
            "page": meta.get("page"),
            "table_id": meta.get("table_id"),
            "source": doc_ctx.get("source"),
            "financial_context": json.dumps(meta.get("financial_context") or {}, ensure_ascii=False),
            "markdown": table_markdown
        }
        ck = self._cache_key("table", vars_payload)
        if self.enable_cache and ck in self._cache:
            if self.verbose:
                print(f"[HIT] cache table doc={doc_ctx.get('doc_id')} chunk={meta.get('chunk_id')}")
            return self._cache[ck]

        if self.verbose:
            print(f"[RUN] table  doc={doc_ctx.get('doc_id')} chunk={meta.get('chunk_id')} page={meta.get('page')} tbl={meta.get('table_id')}")
        self._enforce_rate_limit()
        self.stats["requests"] += 1
        try:
            resp = self.llm_table.invoke(vars_payload)
            pm, cm, tt = self._record_usage(resp, vars_payload)
            raw = self._safe_json_loads(resp.content)
            norm = self._normalize(raw, doc_ctx, meta)
            self._tally(norm)

            if self.verbose:
                print(f"[OK ] table  doc={doc_ctx.get('doc_id')} chunk={meta.get('chunk_id')} "
                    f"ents={len(norm['entities'])} stmts={len(norm['statements'])} evs={len(norm['evidences'])} | tokens p={pm} c={cm} t={tt}")
                for s in norm["statements"][:VERBOSE_SAMPLES]:
                    print("      • stmt", json.dumps({"subj": s["subject_eid"], "pred": s["predicate"], "obj": s["object"]}, ensure_ascii=False))
                for ev in norm["evidences"][:VERBOSE_SAMPLES]:
                    span = ev.get("span", {})
                    span_str = json.dumps(span, ensure_ascii=False)
                    print("      • evid", span_str if len(span_str) < 160 else json.dumps({"preview": span_str[:160] + "..."}, ensure_ascii=False))

            if self.enable_cache:
                self._cache[ck] = norm
            return norm
        except Exception as e:
            self.stats["failures"] += 1
            # Always surface the failure; print the full traceback only in verbose mode.
            print(f"[ERROR] Table extraction error: {e}")
            if self.verbose:
                print(traceback.format_exc())
            return {"entities": [], "statements": [], "evidences": []}

    def extract_text(self, *, text: str, doc_ctx: Dict[str, Any], meta: Dict[str, Any]) -> Dict[str, Any]:
        if not text or len(text.split()) < 5:
            return {"entities": [], "statements": [], "evidences": []}
        vars_payload = {
            "doc_id": doc_ctx.get("doc_id"),
            "chunk_id": meta.get("chunk_id"),
            "page": meta.get("page"),
            "label": meta.get("label") or "text",
            "source": doc_ctx.get("source"),
            "text": text
        }
        ck = self._cache_key("text", vars_payload)
        if self.enable_cache and ck in self._cache:
            if self.verbose:
                print(f"[HIT] cache text  doc={doc_ctx.get('doc_id')} chunk={meta.get('chunk_id')}")
            return self._cache[ck]

        if self.verbose:
            print(f"[RUN] text   doc={doc_ctx.get('doc_id')} chunk={meta.get('chunk_id')} page={meta.get('page')} words={len(text.split())}")
        self._enforce_rate_limit()
        self.stats["requests"] += 1
        try:
            resp = self.llm_text.invoke(vars_payload)
            pm, cm, tt = self._record_usage(resp, vars_payload)
            raw = self._safe_json_loads(resp.content)
            norm = self._normalize(raw, doc_ctx, meta)
            self._tally(norm)

            if self.verbose:
                print(f"[OK ] text   doc={doc_ctx.get('doc_id')} chunk={meta.get('chunk_id')} "
                    f"ents={len(norm['entities'])} stmts={len(norm['statements'])} evs={len(norm['evidences'])} | tokens p={pm} c={cm} t={tt}")
                for s in norm["statements"][:VERBOSE_SAMPLES]:
                    print("      • stmt", json.dumps({"subj": s["subject_eid"], "pred": s["predicate"], "obj": s["object"]}, ensure_ascii=False))
                for ev in norm["evidences"][:VERBOSE_SAMPLES]:
                    span = ev.get("span", {})
                    span_str = json.dumps(span, ensure_ascii=False)
                    print("      • evid", span_str if len(span_str) < 160 else json.dumps({"preview": span_str[:160] + "..."}, ensure_ascii=False))

            if self.enable_cache:
                self._cache[ck] = norm
            return norm
        except Exception as e:
            self.stats["failures"] += 1
            # Always surface the failure; print the full traceback only in verbose mode.
            print(f"[ERROR] Text extraction error: {e}")
            if self.verbose:
                print(traceback.format_exc())
            return {"entities": [], "statements": [], "evidences": []}

    def _tally(self, norm: Dict[str, List[Dict[str, Any]]]):
        self.stats["success"] += 1
        self.stats["entities"] += len(norm["entities"])
        self.stats["statements"] += len(norm["statements"])
        self.stats["evidences"] += len(norm["evidences"])

    def get_statistics(self) -> Dict[str, Any]:
        total_req = max(1, self.stats.get("requests", 0))
        succ = self.stats.get("success", 0)
        return {
            **self.stats,
            "success_rate": succ / total_req,
            "avg_entities_per_success": self.stats.get("entities", 0) / max(1, succ),
            "avg_statements_per_success": self.stats.get("statements", 0) / max(1, succ),
            "avg_evidences_per_success": self.stats.get("evidences", 0) / max(1, succ),
        }

# ---------------------------------------------------------------------
# Data loading (web + prepared PDF)  [unchanged]
# ---------------------------------------------------------------------
def _safe_load_json(path: Path) -> Optional[dict]:
    try:
        with path.open("r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        print(f"[WARN] Failed reading {path}: {e}")
        return None

def load_raw_web_docs() -> Any:
    if not WEB_DOCS_PATH.exists():
        print(f"[INFO] Web docs file not found: {WEB_DOCS_PATH}")
        return None
    try:
        with WEB_DOCS_PATH.open("r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        print(f"[WARN] Failed to load web docs: {e}")
        return None

def _normalize_web_docs(raw: Any) -> List[Dict[str, Any]]:
    if raw is None:
        return []
    if isinstance(raw, list):
        return [r for r in raw if isinstance(r, dict)]
    if isinstance(raw, dict):
        if "documents" in raw and isinstance(raw["documents"], list):
            return [r for r in raw["documents"] if isinstance(r, dict)]
        if "data" in raw and isinstance(raw["data"], list):
            return [r for r in raw["data"] if isinstance(r, dict)]
        if all(isinstance(v, dict) for v in raw.values()):
            out = []
            for k, v in raw.items():
                if "doc_id" not in v and "id" not in v:
                    v = {**v, "doc_id": k}
                out.append(v)
            return out
    print("[WARN] Unrecognized web docs JSON structure; proceeding with empty list.")
    return []

def _derive_web_doc_id(meta: Dict[str, Any], index: int) -> str:
    for key in ("doc_id", "id", "slug"):
        if meta.get(key):
            return str(meta[key])
    url = meta.get("url")
    if url:
        return "web:" + _sha(url, 12)
    snippet = (meta.get("text") or meta.get("content") or "")[:80]
    return "web:" + _sha(f"{index}|{snippet}", 12)

def iter_web_chunks(max_chars: int = 10000) -> Iterator[Dict[str, Any]]:
    raw = load_raw_web_docs()
    records = _normalize_web_docs(raw)
    if not records:
        return
    for idx, meta in enumerate(records):
        if not isinstance(meta, dict):
            continue
        doc_id = _derive_web_doc_id(meta, idx)
        text = meta.get("text") or meta.get("content") or ""
        if not isinstance(text, str):
            if isinstance(text, list):
                text = " ".join(str(t) for t in text)
            else:
                text = str(text)
        if not text.strip():
            continue
        url = meta.get("url")
        start = 0
        chunk_idx = 0
        while start < len(text):
            piece = text[start:start+max_chars]
            yield {
                "source_type": "web",
                "doc_id": doc_id,
                "chunk_id": f"{doc_id}::chunk_{chunk_idx}",
                "page": None,
                "table_id": None,
                "content_type": "text",
                "text": piece,
                "markdown_table": None,
                "url": url,
                "meta": {k: v for k, v in meta.items() if k not in ("text", "content")},
                "year": meta.get("year"),
                "quarter": meta.get("quarter"),
            }
            start += max_chars
            chunk_idx += 1

def iter_prepared_pdf_files(years: Optional[List[str]] = None) -> Iterator[Path]:
    if not PDF_PREPARED_ROOT.exists():
        return
    for p in PDF_PREPARED_ROOT.rglob("*.prepared.json"):
        year = next((seg for seg in p.parts if seg.isdigit() and len(seg) == 4), None)
        if years and year and year not in years:
            continue
        yield p

def _extract_pdf_tables(d: dict) -> List[dict]:
    tables = d.get("reconstructed_tables") or d.get("tables") or []
    out = []
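    for idx, t in enumerate(tables):
        md = t.get("markdown") or t.get("markdown_table") or t.get("md")
        if not md:
            continue
        # Fall back to the positional index so tables without an explicit id
        # do not all share the same "None" table_id (and chunk_id) downstream.
        table_id = t.get("id") or t.get("table_id") or f"t{idx}"
        out.append({
            "table_id": table_id,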
            "page": t.get("page") or t.get("page_no") or t.get("page_number"),
            "markdown": md,
            "title": t.get("title"),
            "financial_context": t.get("financial_context"),
            "meta": {k: v for k, v in t.items() if k not in ("markdown","markdown_table","md")}
        })
    return out

def _extract_pdf_text(d: dict) -> List[dict]:
    chunks = d.get("text_chunks") or d.get("chunks") or []
    out = []
    for c in chunks:
        txt = c.get("text") or c.get("content") or ""
        if not txt.strip():
            continue
        out.append({
            "chunk_id": c.get("id") or c.get("chunk_id"),
            "page": c.get("page") or c.get("page_no") or c.get("page_number"),
            "text": txt,
            "meta": {k: v for k, v in c.items() if k not in ("text","content")}
        })
    return out

def iter_pdf_records(years: Optional[List[str]] = None,
                     include_tables: bool = True,
                     include_text: bool = True) -> Iterator[Dict[str, Any]]:
    for path in iter_prepared_pdf_files(years=years):
        data = _safe_load_json(path)
        if not data:
            continue
        meta = data.get("document_meta") or data.get("meta") or {}
        doc_id = data.get("doc_id") or f"pdfdoc:{_sha(str(path), 12)}"
        year = next((seg for seg in path.parts if seg.isdigit() and len(seg) == 4), None)
        quarter = None
        for seg in path.parts:
            if len(seg) == 2 and seg[0].isdigit() and seg[1].upper() == "Q":
                quarter = seg.upper()
        if include_tables:
            for t in _extract_pdf_tables(data):
                yield {
                    "source_type": "pdf",
                    "doc_id": doc_id,
                    "chunk_id": f"{doc_id}::table::{t['table_id']}",
                    "page": t["page"],
                    "table_id": t["table_id"],
                    "content_type": "table",
                    "text": None,
                    "markdown_table": t["markdown"],
                    "url": meta.get("source_url"),
                    "meta": {**meta, **t.get("meta", {}), "title": t.get("title")},
                    "financial_context": t.get("financial_context"),
                    "source_path": str(path),
                    "year": year,
                    "quarter": quarter
                }
        if include_text:
            for c in _extract_pdf_text(data):
                cid = c["chunk_id"] or f"{doc_id}::chunk::{_sha(str(path)+str(c['page'])+c['text'][:32], 10)}"
                yield {
                    "source_type": "pdf",
                    "doc_id": doc_id,
                    "chunk_id": cid,
                    "page": c["page"],
                    "table_id": None,
                    "content_type": "text",
                    "text": c["text"],
                    "markdown_table": None,
                    "url": meta.get("source_url"),
                    "meta": {**meta, **c.get("meta", {})},
                    "source_path": str(path),
                    "year": year,
                    "quarter": quarter
                }

def iter_all_records(years: Optional[List[str]] = None,
                     include_web: bool = True,
                     include_pdf_tables: bool = True,
                     include_pdf_text: bool = True) -> Iterator[Dict[str, Any]]:
    if include_web:
        yield from iter_web_chunks()
    yield from iter_pdf_records(years=years,
                                include_tables=include_pdf_tables,
                                include_text=include_pdf_text)

# ---------------------------------------------------------------------
# Aggregation / Merge
# ---------------------------------------------------------------------
@dataclass
class AggregateStore:
    entities: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    statements: Dict[str, Dict[str, Any]] = field(default_factory=dict)   # sid -> statement
    evidences: Dict[str, Dict[str, Any]] = field(default_factory=dict)    # evid -> evidence
    stmt_key_index: Dict[str, str] = field(default_factory=dict)          # canonical key -> sid

    def _statement_canonical_key(self, s: Dict[str, Any]) -> str:
        obj = json.dumps(s["object"], sort_keys=True, ensure_ascii=False)
        return f"{s['subject_eid']}|{s['predicate']}|{obj}"

    def merge_batch(self, batch: Dict[str, List[Dict[str, Any]]]):
        # entities
        for e in batch["entities"]:
            self.entities[e["eid"]] = e
        # statements with dedupe
        for s in batch["statements"]:
            skey = self._statement_canonical_key(s)
            existing_sid = self.stmt_key_index.get(skey)
            if existing_sid:
                existing = self.statements[existing_sid]
                existing["confidence"] = max(existing.get("confidence", 0), s.get("confidence", 0))
            else:
                self.statements[s["sid"]] = s
                self.stmt_key_index[skey] = s["sid"]
        # evidences
        for ev in batch["evidences"]:
            self.evidences[ev["evid"]] = ev

    def to_dict(self) -> Dict[str, Any]:
        sid_to_evids = defaultdict(list)
        for ev in self.evidences.values():
            sid_to_evids[ev["sid"]].append(ev["evid"])
        out_statements = []
        for s in self.statements.values():
            s_out = dict(s)
            s_out["evidence_ids"] = sid_to_evids.get(s["sid"], [])
            out_statements.append(s_out)
        return {
            "entities": list(self.entities.values()),
            "statements": out_statements,
            "evidences": list(self.evidences.values())
        }

    def save(self, path: Path):
        tmp = path.with_suffix(".tmp")
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(self.to_dict(), f, indent=2)
        tmp.replace(path)

    def save_jsonl(self, base_dir: Path):
        data = self.to_dict()
        for name, seq in data.items():
            p = base_dir / f"{name}.jsonl"
            tmp = p.with_suffix(".tmp")
            with tmp.open("w", encoding="utf-8") as f:
                for obj in seq:
                    f.write(json.dumps(obj, ensure_ascii=False) + "\n")
            tmp.replace(p)

def load_existing_store(path: Path) -> AggregateStore:
    if not path.exists():
        return AggregateStore()
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
        store = AggregateStore()
        for e in data.get("entities", []):
            store.entities[e["eid"]] = e
        for s in data.get("statements", []):
            store.statements[s["sid"]] = s
            store.stmt_key_index[store._statement_canonical_key(s)] = s["sid"]
        for ev in data.get("evidences", []):
            store.evidences[ev["evid"]] = ev
        return store
    except Exception as e:
        print(f"[WARN] Failed loading existing aggregate: {e}")
        return AggregateStore()

# ---------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------
def run_extraction(years: Optional[List[str]] = None,
                   include_web: bool = True,
                   include_pdf_tables: bool = True,
                   include_pdf_text: bool = True,
                   max_records: Optional[int] = None,
                   output_name: str = "knowledge_extraction_aggregate.json",
                   model_name: str = GENAI_MODEL) -> Dict[str, Any]:
    aggregate_path = OUTPUT_DIR / output_name
    store = load_existing_store(aggregate_path) if RESUME else AggregateStore()
    
    # Snapshot the store sizes before this run so the summary can report
    # how many entities/statements/evidences were added.
    pre_counts = {
        "entities": len(store.entities),
        "statements": len(store.statements),
        "evidences": len(store.evidences),
    }
    print("[INIT] store entities=%d statements=%d evidences=%d" %
        (pre_counts["entities"], pre_counts["statements"], pre_counts["evidences"]))

    extractor = GraphExtractionAgent(model_name=model_name)

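    # Optional token usage measurement via callback context (best effort).
    # NOTE: this import path may not exist in the installed LangChain version;
    # if it fails, only the agent's own token accounting is reported.
    token_handler = None
    if ENABLE_TOKEN_USAGE:
        try:
            from langchain.callbacks.token_usage import TokenUsageCallbackHandler
            token_handler = TokenUsageCallbackHandler()
            token_handler.__enter__()  # start tracking
            print("[INFO] TokenUsageCallbackHandler active")
        except Exception as e:
            token_handler = None
            print(f"[INFO] Token usage callback unavailable ({e}); using in-agent totals only")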

    failures = 0
    processed = 0
    start_time = time.time()
    last_checkpoint = start_time

    for rec in iter_all_records(years=years,
                                include_web=include_web,
                                include_pdf_tables=include_pdf_tables,
                                include_pdf_text=include_pdf_text):
        if max_records and processed >= max_records:
            break

        doc_ctx = {"doc_id": rec["doc_id"], "source": rec.get("url")}
        meta = {
            "chunk_id": rec["chunk_id"],
            "page": rec.get("page"),
            "table_id": rec.get("table_id"),
            "label": "table" if rec["content_type"] == "table" else "text",
            "financial_context": rec.get("financial_context")
        }

        try:
            if rec["content_type"] == "table" and rec.get("markdown_table"):
                batch = extractor.extract_table(table_markdown=rec["markdown_table"], doc_ctx=doc_ctx, meta=meta)
            else:
                batch = extractor.extract_text(text=rec.get("text") or "", doc_ctx=doc_ctx, meta=meta)
            store.merge_batch(batch)
        except Exception as e:
            failures += 1
            print(f"[ERROR] Extraction failure doc_id={rec['doc_id']} type={rec['content_type']}: {e}")

        # extract_table/extract_text catch LLM errors themselves and only bump
        # extractor.stats["failures"], so count those toward the abort threshold too.
        if failures + extractor.stats.get("failures", 0) >= MAX_FAILURES:
            print("[ABORT] Failure threshold reached.")
            break

        processed += 1
        now = time.time()
        if (processed % CHECKPOINT_EVERY_RECORDS == 0) or (now - last_checkpoint >= CHECKPOINT_EVERY_SECONDS):
            store.save(aggregate_path)
            if SAVE_JSONL:
                store.save_jsonl(OUTPUT_DIR)
            tokens_total = extractor.stats.get("total_tokens", 0)
            print(f"[CHECKPOINT] processed={processed} ents={len(store.entities)} stmts={len(store.statements)} "
                f"evs={len(store.evidences)} elapsed={int(now-start_time)}s tokens_total={tokens_total}")
            last_checkpoint = now

    # Final save
    store.save(aggregate_path)
    if SAVE_JSONL:
        store.save_jsonl(OUTPUT_DIR)

    stats_agent = extractor.get_statistics()
    summary = {
        "processed_records": processed,
        "entities": len(store.entities),
        "statements": len(store.statements),
        "evidences": len(store.evidences),
        "failures": failures,
        "elapsed_seconds": int(time.time() - start_time),
        "agent_stats": stats_agent
    }

    # Close token tracking
    if token_handler:
        try:
            token_handler.__exit__(None, None, None)
            summary["token_usage_callback"] = getattr(token_handler, "total_usage", {})
        except Exception:
            pass

    # >>> Always include our in-agent totals
    summary["token_usage_agent"] = {
        "prompt_tokens": stats_agent.get("prompt_tokens", 0),
        "completion_tokens": stats_agent.get("completion_tokens", 0),
        "total_tokens": stats_agent.get("total_tokens", 0),
    }

    # Compute the per-run delta before writing, so it is included in
    # run_summary.json and in the printed summary.
    post_counts = {
        "entities": len(store.entities),
        "statements": len(store.statements),
        "evidences": len(store.evidences),
    }
    summary["delta"] = {
        "entities_added": post_counts["entities"] - pre_counts["entities"],
        "statements_added": post_counts["statements"] - pre_counts["statements"],
        "evidences_added": post_counts["evidences"] - pre_counts["evidences"],
    }

    with (OUTPUT_DIR / "run_summary.json").open("w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)
    print("[DONE]", json.dumps(summary, indent=2))

    return summary


[INFO] Output directory: i:\My Drive\M. Tech AI ML\AIML SEM 4\Dissertation\Project\output\extraction_results
[INFO] GOOGLE_API_KEY loaded (length only) -> 39
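
The `AggregateStore.merge_batch` step in the cell above dedupes statements on a canonical `subject_eid|predicate|object` key and keeps the highest confidence seen. Here is a minimal standalone sketch of just that behavior, useful for checking the merge without calling the LLM. The sample statements, the `st:1` / `ent:vz` IDs, and the helper names `canonical_key` and `merge_statements` are made up for illustration and are not part of the pipeline.

```python
import json

def canonical_key(stmt):
    # Serialize the object with sorted keys so key order does not affect the result.
    obj = json.dumps(stmt["object"], sort_keys=True, ensure_ascii=False)
    return f"{stmt['subject_eid']}|{stmt['predicate']}|{obj}"

def merge_statements(batches):
    statements = {}   # sid -> statement
    key_index = {}    # canonical key -> sid
    for batch in batches:
        for s in batch:
            key = canonical_key(s)
            sid = key_index.get(key)
            if sid is not None:
                # Duplicate: keep the existing record, raise confidence if higher.
                kept = statements[sid]
                kept["confidence"] = max(kept.get("confidence", 0), s.get("confidence", 0))
            else:
                statements[s["sid"]] = dict(s)
                key_index[key] = s["sid"]
    return statements

# Hypothetical sample data: the same fact extracted from two chunks,
# with the object keys in a different order and different confidences.
batch_a = [{"sid": "st:1", "subject_eid": "ent:vz", "predicate": "reported_revenue",
            "object": {"value": 33485, "period": "Q1 2025"}, "confidence": 0.6}]
batch_b = [{"sid": "st:1", "subject_eid": "ent:vz", "predicate": "reported_revenue",
            "object": {"period": "Q1 2025", "value": 33485}, "confidence": 0.9}]

merged = merge_statements([batch_a, batch_b])
print(len(merged), merged["st:1"]["confidence"])
```

Because the key is built from a sorted-key JSON dump, two extractions of the same fact collapse into one statement even when the LLM emits the object fields in a different order.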


In [None]:
"""
Enhanced Verizon knowledge extraction pipeline (schema-integrated; simplified & fixed).
- LLM no longer produces IDs; we deterministically generate eid/sid/evid.
- Name→EID mapping + synthesis prevents "entities-only" drops.
- Predicate normalization removed.
- Validation skipped.
- Evidences attached to all statements in the same chunk (simple, robust).
- Default model: gemini-2.5-flash-lite.
- Verbose progress logs + token usage (best effort).
"""

from __future__ import annotations
import os, json, time, random, hashlib, re, traceback
from pathlib import Path
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Iterator, Tuple
from collections import defaultdict

# ---------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------
WEB_INDEX_PATH = Path("output/verizon_document_index.json")  # reserved
WEB_DOCS_PATH = Path("output/verizon_production_web_documents.json")
PDF_PREPARED_ROOT = Path(r"output/parsed_pdfs/downloaded_verizon_quarterly_pdfs")
OUTPUT_DIR = Path("output/extraction_results")

try:
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    print(f"[INFO] Output directory: {OUTPUT_DIR.absolute()}")
except Exception as e:
    print(f"[ERROR] Failed to create output directory: {e}")
    OUTPUT_DIR = Path.cwd() / "output" / "extraction_results"
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    print(f"[INFO] Using fallback output directory: {OUTPUT_DIR.absolute()}")

# RATE_LIMIT_SECONDS = 31.0
RATE_LIMIT_SECONDS = 15.0
JITTER_RANGE = (1.0, 5.0)
CHECKPOINT_EVERY_RECORDS = 25
CHECKPOINT_EVERY_SECONDS = 300
RESUME = True
SAVE_JSONL = True
MAX_FAILURES = 50
ENABLE_PROMPT_CACHE = True
ENABLE_TOKEN_USAGE = True
# VERBOSE = True  # top-level config
VERBOSE = False  # top-level config

# >>> how many sample statements/evidences to print per chunk
VERBOSE_SAMPLES = 2

# ---------------------------------------------------------------------
# Environment / API key
# ---------------------------------------------------------------------
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # optional

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise RuntimeError("GOOGLE_API_KEY not set in environment/.env")
print("[INFO] GOOGLE_API_KEY loaded (length only) ->", len(GOOGLE_API_KEY))

# Default Gemini model
GENAI_MODEL = os.getenv("GENAI_MODEL", "gemini-2.5-flash-lite")

# ---------------------------------------------------------------------
# Utilities
# ---------------------------------------------------------------------
def _sha(s: str, n: int = 16) -> str:
    return hashlib.sha256(s.encode("utf-8")).hexdigest()[:n]

def _slug(s: str) -> str:
    if not s:
        return ""
    s = re.sub(r"[^a-z0-9]+", "-", s.lower()).strip("-")
    return re.sub(r"-{2,}", "-", s)

def _numify(x: Any) -> Any:
    if isinstance(x, (int, float)):
        return x
    if isinstance(x, str):
        s = x.strip().replace(",", "")
        if re.match(r"^-?\d+\.\d+$", s):
            try:
                return float(s)
            except ValueError:
                return x
        if re.match(r"^-?\d+$", s):
            try:
                return int(s)
            except ValueError:
                return x
    return x

def _statement_hash(subject_eid: str, predicate: str, obj: Dict[str, Any]) -> str:
    canon = json.dumps({"s": subject_eid, "p": predicate, "o": obj}, sort_keys=True, ensure_ascii=False)
    return f"st:{_sha(canon, 16)}"

def _evidence_hash(doc_id: str, chunk_id: str, span: Dict[str, Any], sid: Optional[str] = None) -> str:
    base = f"{doc_id}|{chunk_id}|{sid or ''}|{json.dumps(span, sort_keys=True, ensure_ascii=False)[:160]}"
    return f"ev:{_sha(base, 16)}"

# ---------------------------------------------------------------------
# LangChain setup
# ---------------------------------------------------------------------
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI

def build_base_llm(model_name: str, temperature: float = 0.0, top_p: float = 0.9, max_retries: int = 1, max_output_tokens: Optional[int] = 8024):
    return ChatGoogleGenerativeAI(
        model=model_name,
        temperature=temperature,
        top_p=top_p,
        max_retries=max_retries,
        max_output_tokens=max_output_tokens,  # guard verbosity
        # google_api_key=GOOGLE_API_KEY  # reads from env if omitted
    )

# ---------------------------------------------------------------------
# GraphExtractionAgent
# ---------------------------------------------------------------------
class GraphExtractionAgent:
    PROMPT_VERSION = "v5-no-llm-ids"

    def __init__(self,
                 model_name: str = GENAI_MODEL,
                 rate_limit: float = RATE_LIMIT_SECONDS,
                 jitter_range: Tuple[float, float] = JITTER_RANGE,
                 temperature: float = 0.0,
                 top_p: float = 0.9,
                 max_retries: int = 1,
                 enable_cache: bool = ENABLE_PROMPT_CACHE):
        self.model_name = model_name
        self.rate_limit = rate_limit
        self.jitter_range = jitter_range
        self.last_request_time: Optional[float] = None
        self.stats = defaultdict(int)
        self.enable_cache = enable_cache
        self._cache: Dict[str, Dict[str, Any]] = {}
        
        self.verbose = VERBOSE
        
        # >>> token running totals
        self.stats["prompt_tokens"] = 0
        self.stats["completion_tokens"] = 0
        self.stats["total_tokens"] = 0

        base_llm = build_base_llm(model_name, temperature=temperature, top_p=top_p, max_retries=max_retries)
        self.llm_table = self._create_table_prompt() | base_llm
        self.llm_text  = self._create_text_prompt()  | base_llm

    # ---- Prompts (escaped braces kept) ----
    def _create_table_prompt(self):
        return ChatPromptTemplate.from_template("""
            You convert a FINANCIAL MARKDOWN TABLE into a knowledge graph **without generating IDs**.

            Return ONLY a JSON object:
            {{
              "entities": [{{"name": "...", "type": "Organization|Person|Metric|TimePeriod|Product|Event|Location|Role|Thing"}}],
              "statements": [
                {{
                  "subject": "...",
                  "predicate": "...",
                  "object": (
                    {{ "is_entity": true,  "name": "...", "type": "..." }}
                    OR
                    {{ "is_entity": false, "value": <number|string>, "dtype": "Money|Percent|Number|Text", "unit": "...", "currency": "...", "period": "..." }}
                  ),
                  "confidence": 0.0..1.0
                }}
              ],
              "evidences": [
                {{ "span": {{ "row_label": "...", "column": "..." }} OR {{ "quote":"..." }}, "confidence": 0.0..1.0 }}
              ]
            }}

            Rules:
            1) Use column headers for period (e.g., "Q1 2025") when emitting numeric metric statements.
            2) Parse numbers (e.g., "33,485" -> 33485). Keep currency/units if shown.
            3) Provide concrete statements only; avoid vague claims.

            Context:
            doc_id={doc_id}
            chunk_id={chunk_id}
            page={page}
            table_id={table_id}
            source={source}

            Financial_Context (JSON):
            {financial_context}

            Markdown_Table:
            {markdown}
            """.strip())

    def _create_text_prompt(self):
        return ChatPromptTemplate.from_template("""
            Extract entities, statements, evidences (NO IDs; we will assign IDs).

            Return ONLY a JSON object with keys: entities, statements, evidences.

            entities:
            - array of objects like: {{ "name": "...", "type": "Organization|Person|Metric|TimePeriod|Product|Event|Location|Role|Thing" }}

            statements:
            - array of objects like:
              {{
                "subject": "...",
                "predicate": "...",
                "object": (
                  {{ "is_entity": true,  "name": "...", "type": "..." }}
                  OR
                  {{ "is_entity": false, "value": <number|string>, "dtype": "Money|Percent|Number|Text", "unit": "...", "currency": "...", "period": "..." }}
                ),
                "confidence": 0.0..1.0
              }}

            evidences:
            - array of objects like: {{ "span": {{ "quote": "..." }}, "confidence": 0.0..1.0 }}

            Context:
            doc_id={doc_id}
            chunk_id={chunk_id}
            page={page}
            label={label}
            source={source}

            Text:
            {text}
            """.strip())

    # ---- Rate limit ----
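    def _enforce_rate_limit(self):
        """Sleep until at least `rate_limit` seconds have passed since the previous request, plus random jitter when a wait is needed."""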
        if self.last_request_time is None:
            self.last_request_time = time.time()
            return
        elapsed = time.time() - self.last_request_time
        wait = self.rate_limit - elapsed
        if wait > 0:
            self.stats["rate_limit_waits"] += 1
            # >>> show wait time
            jitter = random.uniform(*self.jitter_range)
            wait += jitter
            if self.verbose:
                print(f"[RATE] sleeping {wait:.1f}s (base+{jitter:.1f}s jitter)")
            time.sleep(wait)
        self.last_request_time = time.time()

    # ---- JSON parsing ----
    _TOP_OBJECT_RE = re.compile(r'\{.*\}', re.DOTALL)

    def _safe_json_loads(self, raw: str) -> Dict[str, Any]:
        txt = raw.strip()
        if txt.startswith("```"):
            txt = re.sub(r"^```(?:json)?", "", txt, flags=re.IGNORECASE).strip()
            if txt.endswith("```"):
                txt = txt[:-3].strip()
        m = self._TOP_OBJECT_RE.search(txt)
        candidate = m.group(0) if m else txt
        candidate = re.sub(r',\s*([\]}])', r'\1', candidate)
        try:
            data = json.loads(candidate)
            if isinstance(data, dict):
                return data
        except Exception:
            pass
        return {"entities": [], "statements": [], "evidences": []}

    # ---- Token accounting (best effort) ----
    def _record_usage(self, resp, prompt_vars: Dict[str, Any]) -> Tuple[int,int,int]:
        """Try to collect prompt/completion/total tokens; fallback to approximate."""
        pm = cm = tt = 0
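        meta = getattr(resp, "response_metadata", None) or {}
        # LangChain chat models usually attach usage to AIMessage.usage_metadata
        # (input_tokens/output_tokens/total_tokens); check that first, then fall
        # back to provider-specific variants inside response_metadata.
        usage = (
            getattr(resp, "usage_metadata", None)
            or meta.get("usage_metadata")
            or meta.get("usageMetadata")
            or meta.get("token_usage")
            or meta.get("token_count")
            or {}
        )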
        if isinstance(usage, dict):
            # Try common key variants
            for k in ("promptTokenCount", "prompt_tokens", "input_tokens", "prompt_tokens_count"):
                if k in usage: pm = int(usage[k]); break
            for k in ("candidatesTokenCount", "completion_tokens", "output_tokens"):
                if k in usage: cm = int(usage[k]); break
            for k in ("totalTokenCount", "total_tokens", "total"):
                if k in usage: tt = int(usage[k]); break
        if not tt and (pm or cm):
            tt = pm + cm
        if not (pm or cm or tt):
            # Fallback heuristic (roughly 4 chars per token)
            prompt_str = json.dumps(prompt_vars, ensure_ascii=False)
            pm = max(1, int(len(prompt_str) / 4))
            cm = max(1, int(len(getattr(resp, "content", "")) / 4))
            tt = pm + cm

        self.stats["prompt_tokens"] += pm
        self.stats["completion_tokens"] += cm
        self.stats["total_tokens"] += tt
        return pm, cm, tt

    # ---- Normalization (build IDs here) ----
    def _norm_literal_object(self, o: Dict[str, Any]) -> Dict[str, Any]:
        v = _numify(o.get("value"))
        dtype = o.get("dtype") or ("Number" if isinstance(v, (int, float)) else "Text")
        out = {"is_entity": False, "value": v, "dtype": dtype}
        for k in ("unit", "currency", "period"):
            if o.get(k) is not None:
                out[k] = o[k]
        return out

    def _normalize(self, raw: Dict[str, Any], doc_ctx: Dict[str, Any], meta: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
        # 1) Entities → deterministic eids + name map
        ents_in = raw.get("entities") or []
        entities: List[Dict[str, Any]] = []
        name_to_eid: Dict[str, str] = {}
        seen_eids = set()

        for e in ents_in:
            if not isinstance(e, dict):
                continue
            name = (e.get("name") or "").strip()
            if not name:
                continue
            etype = (e.get("type") or "Thing").strip()
            eid = f"{_slug(etype)}:{_slug(name)}"
            if eid in seen_eids:
                continue
            entities.append({"eid": eid, "name": name, "type": etype})
            name_to_eid[name.lower()] = eid
            seen_eids.add(eid)

        # 2) Statements → map names to eids; synthesize entities when needed
        stmts_in = raw.get("statements") or []
        statements: List[Dict[str, Any]] = []
        for s in stmts_in:
            if not isinstance(s, dict):
                continue
            subj_name = (s.get("subject") or s.get("subject_eid") or "").strip()
            pred = (s.get("predicate") or "").strip()
            obj = s.get("object")

            if not subj_name or not pred or obj is None:
                continue

            # subject: map or synthesize
            subj_eid = name_to_eid.get(subj_name.lower())
            if not subj_eid:
                subj_eid = f"thing:{_slug(subj_name)}"
                if subj_eid not in seen_eids:
                    entities.append({"eid": subj_eid, "name": subj_name, "type": "Thing"})
                    name_to_eid[subj_name.lower()] = subj_eid
                    seen_eids.add(subj_eid)

            # object: entity or literal
            if isinstance(obj, dict) and obj.get("is_entity"):
                obj_name = (obj.get("name") or "").strip()
                obj_type = (obj.get("type") or "Thing").strip()
                if obj_name:
                    obj_eid = name_to_eid.get(obj_name.lower())
                    if not obj_eid:
                        obj_eid = f"{_slug(obj_type)}:{_slug(obj_name)}"
                        if obj_eid not in seen_eids:
                            entities.append({"eid": obj_eid, "name": obj_name, "type": obj_type})
                            name_to_eid[obj_name.lower()] = obj_eid
                            seen_eids.add(obj_eid)
                    object_norm = {"is_entity": True, "eid": obj_eid}
                else:
                    object_norm = {"is_entity": False, "value": str(obj), "dtype": "Text"}
            elif isinstance(obj, dict) and (("value" in obj) or ("dtype" in obj) or ("period" in obj)):
                object_norm = self._norm_literal_object(obj)
            else:
                object_norm = {"is_entity": False, "value": str(obj), "dtype": "Text"}

            sid = _statement_hash(subj_eid, pred, object_norm)
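            # Models sometimes emit null or non-numeric confidence; fall back to the default
            try:
                conf = float(s.get("confidence", 0.7))
            except (TypeError, ValueError):
                conf = 0.7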
            statements.append({
                "sid": sid,
                "subject_eid": subj_eid,
                "predicate": pred,           # NO normalization
                "object": object_norm,
                "confidence": conf,
                "model": self.model_name,
                "prompt_version": self.PROMPT_VERSION
            })

        # 3) Evidences → attach spans to all statements in the chunk
        evids_in = raw.get("evidences") or []
        doc_id = doc_ctx.get("doc_id")
        chunk_id = meta.get("chunk_id")
        page = meta.get("page")
        table_id = meta.get("table_id")
        label = meta.get("label") or ("table" if table_id else "text")

        evidences: List[Dict[str, Any]] = []
        if evids_in:
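            for ev in evids_in:
                if not isinstance(ev, dict):
                    continue  # skip malformed evidence entries instead of failing the whole chunk
                span = ev.get("span") or ev.get("quote") or ""
                if isinstance(span, str):
                    span = {"quote": span[:300]}
                try:
                    conf = float(ev.get("confidence", 0.8))
                except (TypeError, ValueError):
                    conf = 0.8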
                for s in statements:
                    evid = _evidence_hash(doc_id or "", chunk_id or "", span, s["sid"])  # <-- pass sid
                    evidences.append({
                        "evid": evid,
                        "sid": s["sid"],
                        "doc_id": doc_id,
                        "chunk_id": chunk_id,
                        "page": page,
                        "table_id": table_id,
                        "label": label,
                        "span": span,
                        "confidence": conf,
                        "model": self.model_name,
                        "prompt_version": self.PROMPT_VERSION
                    })
        else:
            for s in statements:
                span = {"note": "auto-evidence (no span provided)"}
                evid = _evidence_hash(doc_id or "", chunk_id or "", span, s['sid'])
                evidences.append({
                    "evid": evid,
                    "sid": s["sid"],
                    "doc_id": doc_id,
                    "chunk_id": chunk_id,
                    "page": page,
                    "table_id": table_id,
                    "label": label,
                    "span": span,
                    "confidence": 0.55,
                    "model": self.model_name,
                    "prompt_version": self.PROMPT_VERSION
                })

        return {"entities": entities, "statements": statements, "evidences": evidences}

    # ---- Cache key ----
    def _cache_key(self, kind: str, payload: Dict[str, Any]) -> str:
        base = json.dumps({"k": kind, "p": payload}, sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(base.encode()).hexdigest()

    # ---- Public APIs with verbose printing + token accounting ----
    def extract_table(self, *, table_markdown: str, doc_ctx: Dict[str, Any], meta: Dict[str, Any]) -> Dict[str, Any]:
        if not table_markdown.strip():
            return {"entities": [], "statements": [], "evidences": []}
        vars_payload = {
            "doc_id": doc_ctx.get("doc_id"),
            "chunk_id": meta.get("chunk_id"),
            "page": meta.get("page"),
            "table_id": meta.get("table_id"),
            "source": doc_ctx.get("source"),
            "financial_context": json.dumps(meta.get("financial_context") or {}, ensure_ascii=False),
            "markdown": table_markdown
        }
        ck = self._cache_key("table", vars_payload)
        if self.enable_cache and ck in self._cache:
            if self.verbose:
                print(f"[HIT] cache table doc={doc_ctx.get('doc_id')} chunk={meta.get('chunk_id')}")
            return self._cache[ck]

        if self.verbose:
            print(f"[RUN] table  doc={doc_ctx.get('doc_id')} chunk={meta.get('chunk_id')} page={meta.get('page')} tbl={meta.get('table_id')}")
        self._enforce_rate_limit()
        self.stats["requests"] += 1
        try:
            resp = self.llm_table.invoke(vars_payload)
            pm, cm, tt = self._record_usage(resp, vars_payload)
            raw = self._safe_json_loads(resp.content)
            norm = self._normalize(raw, doc_ctx, meta)
            self._tally(norm)

            if self.verbose:
                print(f"[OK ] table  doc={doc_ctx.get('doc_id')} chunk={meta.get('chunk_id')} "
                    f"ents={len(norm['entities'])} stmts={len(norm['statements'])} evs={len(norm['evidences'])} | tokens p={pm} c={cm} t={tt}")
                for s in norm["statements"][:VERBOSE_SAMPLES]:
                    print("      • stmt", json.dumps({"subj": s["subject_eid"], "pred": s["predicate"], "obj": s["object"]}, ensure_ascii=False))
                for ev in norm["evidences"][:VERBOSE_SAMPLES]:
                    span = ev.get("span", {})
                    span_str = json.dumps(span, ensure_ascii=False)
                    print("      • evid", span_str if len(span_str) < 160 else json.dumps({"preview": span_str[:160] + "..."}, ensure_ascii=False))

            if self.enable_cache:
                self._cache[ck] = norm
            return norm
        except Exception as e:
            self.stats["failures"] += 1
            if self.verbose:
                print(f"[ERROR] Table extraction error: {e}\n{traceback.format_exc()}")
            return {"entities": [], "statements": [], "evidences": []}

    def extract_text(self, *, text: str, doc_ctx: Dict[str, Any], meta: Dict[str, Any]) -> Dict[str, Any]:
        if not text or len(text.split()) < 5:
            return {"entities": [], "statements": [], "evidences": []}
        vars_payload = {
            "doc_id": doc_ctx.get("doc_id"),
            "chunk_id": meta.get("chunk_id"),
            "page": meta.get("page"),
            "label": meta.get("label") or "text",
            "source": doc_ctx.get("source"),
            "text": text
        }
        ck = self._cache_key("text", vars_payload)
        if self.enable_cache and ck in self._cache:
            if self.verbose:
                print(f"[HIT] cache text  doc={doc_ctx.get('doc_id')} chunk={meta.get('chunk_id')}")
            return self._cache[ck]

        if self.verbose:
            print(f"[RUN] text   doc={doc_ctx.get('doc_id')} chunk={meta.get('chunk_id')} page={meta.get('page')} words={len(text.split())}")
        self._enforce_rate_limit()
        self.stats["requests"] += 1
        try:
            resp = self.llm_text.invoke(vars_payload)
            pm, cm, tt = self._record_usage(resp, vars_payload)
            raw = self._safe_json_loads(resp.content)
            norm = self._normalize(raw, doc_ctx, meta)
            self._tally(norm)

            if self.verbose:
                print(f"[OK ] text   doc={doc_ctx.get('doc_id')} chunk={meta.get('chunk_id')} "
                    f"ents={len(norm['entities'])} stmts={len(norm['statements'])} evs={len(norm['evidences'])} | tokens p={pm} c={cm} t={tt}")
                for s in norm["statements"][:VERBOSE_SAMPLES]:
                    print("      • stmt", json.dumps({"subj": s["subject_eid"], "pred": s["predicate"], "obj": s["object"]}, ensure_ascii=False))
                for ev in norm["evidences"][:VERBOSE_SAMPLES]:
                    span = ev.get("span", {})
                    span_str = json.dumps(span, ensure_ascii=False)
                    print("      • evid", span_str if len(span_str) < 160 else json.dumps({"preview": span_str[:160] + "..."}, ensure_ascii=False))

            if self.enable_cache:
                self._cache[ck] = norm
            return norm
        except Exception as e:
            self.stats["failures"] += 1
            if self.verbose:
                print(f"[ERROR] Text extraction error: {e}\n{traceback.format_exc()}")
            return {"entities": [], "statements": [], "evidences": []}

    def _tally(self, norm: Dict[str, List[Dict[str, Any]]]):
        self.stats["success"] += 1
        self.stats["entities"] += len(norm["entities"])
        self.stats["statements"] += len(norm["statements"])
        self.stats["evidences"] += len(norm["evidences"])

    def get_statistics(self) -> Dict[str, Any]:
        total_req = max(1, self.stats.get("requests", 0))
        succ = self.stats.get("success", 0)
        return {
            **self.stats,
            "success_rate": succ / total_req,
            "avg_entities_per_success": self.stats.get("entities", 0) / max(1, succ),
            "avg_statements_per_success": self.stats.get("statements", 0) / max(1, succ),
            "avg_evidences_per_success": self.stats.get("evidences", 0) / max(1, succ),
        }

# ---------------------------------------------------------------------
# Data loading (web + prepared PDF)  [unchanged]
# ---------------------------------------------------------------------
def _safe_load_json(path: Path) -> Optional[dict]:
    try:
        with path.open("r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        print(f"[WARN] Failed reading {path}: {e}")
        return None

def load_raw_web_docs() -> Any:
    if not WEB_DOCS_PATH.exists():
        print(f"[INFO] Web docs file not found: {WEB_DOCS_PATH}")
        return None
    try:
        with WEB_DOCS_PATH.open("r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        print(f"[WARN] Failed to load web docs: {e}")
        return None

def _normalize_web_docs(raw: Any) -> List[Dict[str, Any]]:
    if raw is None:
        return []
    if isinstance(raw, list):
        return [r for r in raw if isinstance(r, dict)]
    if isinstance(raw, dict):
        if "documents" in raw and isinstance(raw["documents"], list):
            return [r for r in raw["documents"] if isinstance(r, dict)]
        if "data" in raw and isinstance(raw["data"], list):
            return [r for r in raw["data"] if isinstance(r, dict)]
        if all(isinstance(v, dict) for v in raw.values()):
            out = []
            for k, v in raw.items():
                if "doc_id" not in v and "id" not in v:
                    v = {**v, "doc_id": k}
                out.append(v)
            return out
    print("[WARN] Unrecognized web docs JSON structure; proceeding with empty list.")
    return []

def _derive_web_doc_id(meta: Dict[str, Any], index: int) -> str:
    for key in ("doc_id", "id", "slug"):
        if meta.get(key):
            return str(meta[key])
    url = meta.get("url")
    if url:
        return "web:" + _sha(url, 12)
    snippet = (meta.get("text") or meta.get("content") or "")[:80]
    return "web:" + _sha(f"{index}|{snippet}", 12)

def iter_web_chunks(max_chars: int = 10000) -> Iterator[Dict[str, Any]]:
    raw = load_raw_web_docs()
    records = _normalize_web_docs(raw)
    if not records:
        return
    for idx, meta in enumerate(records):
        if not isinstance(meta, dict):
            continue
        doc_id = _derive_web_doc_id(meta, idx)
        text = meta.get("text") or meta.get("content") or ""
        if not isinstance(text, str):
            if isinstance(text, list):
                text = " ".join(str(t) for t in text)
            else:
                text = str(text)
        if not text.strip():
            continue
        url = meta.get("url")
        start = 0
        chunk_idx = 0
        while start < len(text):
            piece = text[start:start+max_chars]
            yield {
                "source_type": "web",
                "doc_id": doc_id,
                "chunk_id": f"{doc_id}::chunk_{chunk_idx}",
                "page": None,
                "table_id": None,
                "content_type": "text",
                "text": piece,
                "markdown_table": None,
                "url": url,
                "meta": {k: v for k, v in meta.items() if k not in ("text", "content")},
                "year": meta.get("year"),
                "quarter": meta.get("quarter"),
            }
            start += max_chars
            chunk_idx += 1

def iter_prepared_pdf_files(years: Optional[List[str]] = None) -> Iterator[Path]:
    if not PDF_PREPARED_ROOT.exists():
        return
    for p in PDF_PREPARED_ROOT.rglob("*.prepared.json"):
        year = next((seg for seg in p.parts if seg.isdigit() and len(seg) == 4), None)
        if years and year and year not in years:
            continue
        yield p

def _extract_pdf_tables(d: dict) -> List[dict]:
    tables = d.get("reconstructed_tables") or d.get("tables") or []
    out = []
    for t in tables:
        md = t.get("markdown") or t.get("markdown_table") or t.get("md")
        if not md:
            continue
        out.append({
            "table_id": t.get("id") or t.get("table_id"),
            "page": t.get("page") or t.get("page_no") or t.get("page_number"),
            "markdown": md,
            "title": t.get("title"),
            "financial_context": t.get("financial_context"),
            "meta": {k: v for k, v in t.items() if k not in ("markdown","markdown_table","md")}
        })
    return out

def _extract_pdf_text(d: dict) -> List[dict]:
    chunks = d.get("text_chunks") or d.get("chunks") or []
    out = []
    for c in chunks:
        txt = c.get("text") or c.get("content") or ""
        if not txt.strip():
            continue
        out.append({
            "chunk_id": c.get("id") or c.get("chunk_id"),
            "page": c.get("page") or c.get("page_no") or c.get("page_number"),
            "text": txt,
            "meta": {k: v for k, v in c.items() if k not in ("text","content")}
        })
    return out

def iter_pdf_records(years: Optional[List[str]] = None,
                     include_tables: bool = True,
                     include_text: bool = True) -> Iterator[Dict[str, Any]]:
    for path in iter_prepared_pdf_files(years=years):
        data = _safe_load_json(path)
        if not data:
            continue
        meta = data.get("document_meta") or data.get("meta") or {}
        doc_id = data.get("doc_id") or f"pdfdoc:{_sha(str(path), 12)}"
        year = next((seg for seg in path.parts if seg.isdigit() and len(seg) == 4), None)
        quarter = None
        for seg in path.parts:
            if len(seg) == 2 and seg[0].isdigit() and seg[1].upper() == "Q":
                quarter = seg.upper()
        if include_tables:
            for t in _extract_pdf_tables(data):
                yield {
                    "source_type": "pdf",
                    "doc_id": doc_id,
                    "chunk_id": f"{doc_id}::table::{t['table_id']}",
                    "page": t["page"],
                    "table_id": t["table_id"],
                    "content_type": "table",
                    "text": None,
                    "markdown_table": t["markdown"],
                    "url": meta.get("source_url"),
                    "meta": {**meta, **t.get("meta", {}), "title": t.get("title")},
                    "financial_context": t.get("financial_context"),
                    "source_path": str(path),
                    "year": year,
                    "quarter": quarter
                }
        if include_text:
            for c in _extract_pdf_text(data):
                cid = c["chunk_id"] or f"{doc_id}::chunk::{_sha(str(path)+str(c['page'])+c['text'][:32], 10)}"
                yield {
                    "source_type": "pdf",
                    "doc_id": doc_id,
                    "chunk_id": cid,
                    "page": c["page"],
                    "table_id": None,
                    "content_type": "text",
                    "text": c["text"],
                    "markdown_table": None,
                    "url": meta.get("source_url"),
                    "meta": {**meta, **c.get("meta", {})},
                    "source_path": str(path),
                    "year": year,
                    "quarter": quarter
                }

def iter_all_records(years: Optional[List[str]] = None,
                     include_web: bool = True,
                     include_pdf_tables: bool = True,
                     include_pdf_text: bool = True) -> Iterator[Dict[str, Any]]:
    if include_web:
        yield from iter_web_chunks()
    yield from iter_pdf_records(years=years,
                                include_tables=include_pdf_tables,
                                include_text=include_pdf_text)

# ---------------------------------------------------------------------
# Aggregation / Merge
# ---------------------------------------------------------------------
@dataclass
class AggregateStore:
    entities: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    statements: Dict[str, Dict[str, Any]] = field(default_factory=dict)   # sid -> statement
    evidences: Dict[str, Dict[str, Any]] = field(default_factory=dict)    # evid -> evidence
    stmt_key_index: Dict[str, str] = field(default_factory=dict)          # canonical key -> sid

    def _statement_canonical_key(self, s: Dict[str, Any]) -> str:
        obj = json.dumps(s["object"], sort_keys=True, ensure_ascii=False)
        return f"{s['subject_eid']}|{s['predicate']}|{obj}"

    def merge_batch(self, batch: Dict[str, List[Dict[str, Any]]]):
        # entities
        for e in batch["entities"]:
            self.entities[e["eid"]] = e
        # statements with dedupe
        for s in batch["statements"]:
            skey = self._statement_canonical_key(s)
            existing_sid = self.stmt_key_index.get(skey)
            if existing_sid:
                existing = self.statements[existing_sid]
                existing["confidence"] = max(existing.get("confidence", 0), s.get("confidence", 0))
            else:
                self.statements[s["sid"]] = s
                self.stmt_key_index[skey] = s["sid"]
        # evidences
        for ev in batch["evidences"]:
            self.evidences[ev["evid"]] = ev

    def to_dict(self) -> Dict[str, Any]:
        sid_to_evids = defaultdict(list)
        for ev in self.evidences.values():
            sid_to_evids[ev["sid"]].append(ev["evid"])
        out_statements = []
        for s in self.statements.values():
            s_out = dict(s)
            s_out["evidence_ids"] = sid_to_evids.get(s["sid"], [])
            out_statements.append(s_out)
        return {
            "entities": list(self.entities.values()),
            "statements": out_statements,
            "evidences": list(self.evidences.values())
        }

    def save(self, path: Path):
        tmp = path.with_suffix(".tmp")
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(self.to_dict(), f, indent=2)
        tmp.replace(path)

    def save_jsonl(self, base_dir: Path):
        data = self.to_dict()
        for name, seq in data.items():
            p = base_dir / f"{name}.jsonl"
            tmp = p.with_suffix(".tmp")
            with tmp.open("w", encoding="utf-8") as f:
                for obj in seq:
                    f.write(json.dumps(obj, ensure_ascii=False) + "\n")
            tmp.replace(p)

def load_existing_store(path: Path) -> AggregateStore:
    if not path.exists():
        return AggregateStore()
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
        store = AggregateStore()
        for e in data.get("entities", []):
            store.entities[e["eid"]] = e
        for s in data.get("statements", []):
            store.statements[s["sid"]] = s
            store.stmt_key_index[store._statement_canonical_key(s)] = s["sid"]
        for ev in data.get("evidences", []):
            store.evidences[ev["evid"]] = ev
        return store
    except Exception as e:
        print(f"[WARN] Failed loading existing aggregate: {e}")
        return AggregateStore()

# ---------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------
def run_extraction(years: Optional[List[str]] = None,
                   include_web: bool = True,
                   include_pdf_tables: bool = True,
                   include_pdf_text: bool = True,
                   max_records: Optional[int] = None,
                   output_name: str = "knowledge_extraction_aggregate.json",
                   model_name: str = GENAI_MODEL) -> Dict[str, Any]:
    aggregate_path = OUTPUT_DIR / output_name
    store = load_existing_store(aggregate_path) if RESUME else AggregateStore()
    
    # Snapshot the store size before this run so the summary can report a delta
    pre_counts = {
        "entities": len(store.entities),
        "statements": len(store.statements),
        "evidences": len(store.evidences),
    }
    print("[INIT] store entities=%d statements=%d evidences=%d" %
        (pre_counts["entities"], pre_counts["statements"], pre_counts["evidences"]))

    extractor = GraphExtractionAgent(model_name=model_name)

    # Optional token usage measurement via callback context (kept; may or may not populate)
    if ENABLE_TOKEN_USAGE:
        try:
            from langchain.callbacks.token_usage import TokenUsageCallbackHandler
            token_handler = TokenUsageCallbackHandler()
            token_handler.__enter__()  # start tracking
            print("[INFO] TokenUsageCallbackHandler active")
        except Exception:
            token_handler = None
    else:
        token_handler = None

    failures = 0
    processed = 0
    start_time = time.time()
    last_checkpoint = start_time

    for rec in iter_all_records(years=years,
                                include_web=include_web,
                                include_pdf_tables=include_pdf_tables,
                                include_pdf_text=include_pdf_text):
        if max_records and processed >= max_records:
            break

        doc_ctx = {"doc_id": rec["doc_id"], "source": rec.get("url")}
        meta = {
            "chunk_id": rec["chunk_id"],
            "page": rec.get("page"),
            "table_id": rec.get("table_id"),
            "label": "table" if rec["content_type"] == "table" else "text",
            "financial_context": rec.get("financial_context")
        }

        try:
            if rec["content_type"] == "table" and rec.get("markdown_table"):
                batch = extractor.extract_table(table_markdown=rec["markdown_table"], doc_ctx=doc_ctx, meta=meta)
            else:
                batch = extractor.extract_text(text=rec.get("text") or "", doc_ctx=doc_ctx, meta=meta)
            store.merge_batch(batch)
        except Exception as e:
            failures += 1
            print(f"[ERROR] Extraction failure doc_id={rec['doc_id']} type={rec['content_type']}: {e}")
            if failures >= MAX_FAILURES:
                print("[ABORT] Failure threshold reached.")
                break

        processed += 1
        now = time.time()
        if (processed % CHECKPOINT_EVERY_RECORDS == 0) or (now - last_checkpoint >= CHECKPOINT_EVERY_SECONDS):
            store.save(aggregate_path)
            if SAVE_JSONL:
                store.save_jsonl(OUTPUT_DIR)
            tokens_total = extractor.stats.get("total_tokens", 0)
            print(f"[CHECKPOINT] processed={processed} ents={len(store.entities)} stmts={len(store.statements)} "
                f"evs={len(store.evidences)} elapsed={int(now-start_time)}s tokens_total={tokens_total}")
            last_checkpoint = now

    # Final save
    store.save(aggregate_path)
    if SAVE_JSONL:
        store.save_jsonl(OUTPUT_DIR)

    stats_agent = extractor.get_statistics()
    summary = {
        "processed_records": processed,
        "entities": len(store.entities),
        "statements": len(store.statements),
        "evidences": len(store.evidences),
        "failures": failures,
        "elapsed_seconds": int(time.time() - start_time),
        "agent_stats": stats_agent
    }

    # Close token tracking
    if token_handler:
        try:
            token_handler.__exit__(None, None, None)
            summary["token_usage_callback"] = getattr(token_handler, "total_usage", {})
        except Exception:
            pass

    # >>> Always include our in-agent totals
    summary["token_usage_agent"] = {
        "prompt_tokens": stats_agent.get("prompt_tokens", 0),
        "completion_tokens": stats_agent.get("completion_tokens", 0),
        "total_tokens": stats_agent.get("total_tokens", 0),
    }

    # Delta versus the store state at startup. Computed before writing so that
    # it is included in run_summary.json and in the printed summary.
    post_counts = {
        "entities": len(store.entities),
        "statements": len(store.statements),
        "evidences": len(store.evidences),
    }
    summary["delta"] = {
        "entities_added": post_counts["entities"] - pre_counts["entities"],
        "statements_added": post_counts["statements"] - pre_counts["statements"],
        "evidences_added": post_counts["evidences"] - pre_counts["evidences"],
    }

    with (OUTPUT_DIR / "run_summary.json").open("w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)
    print("[DONE]", json.dumps(summary, indent=2))
    
    
    return summary
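
The tolerant parsing done by `_safe_json_loads` earlier in this cell (strip a Markdown fence, keep the outermost `{...}` span, drop trailing commas, fall back to an empty result) can be tried in isolation. The sketch below is a standalone copy for experimentation; the function name `tolerant_json_loads` and the sample payload are illustrative and not part of the pipeline.

```python
import json
import re
from typing import Any, Dict

FENCE = "`" * 3  # Markdown code-fence marker, built programmatically
_TOP_OBJECT_RE = re.compile(r"\{.*\}", re.DOTALL)


def _empty_result() -> Dict[str, Any]:
    # Fresh lists on every call so callers cannot mutate shared state
    return {"entities": [], "statements": [], "evidences": []}


def tolerant_json_loads(raw: str) -> Dict[str, Any]:
    """Parse model output that should be a JSON object but may be messy."""
    txt = raw.strip()
    # 1) Drop a leading fence (optionally tagged "json") and a trailing fence
    if txt.startswith(FENCE):
        txt = re.sub(r"^`{3}(?:json)?", "", txt, flags=re.IGNORECASE).strip()
        if txt.endswith(FENCE):
            txt = txt[:-3].strip()
    # 2) Keep the span from the first "{" to the last "}"
    m = _TOP_OBJECT_RE.search(txt)
    candidate = m.group(0) if m else txt
    # 3) Remove trailing commas before a closing bracket or brace
    candidate = re.sub(r",\s*([\]}])", r"\1", candidate)
    try:
        data = json.loads(candidate)
    except json.JSONDecodeError:
        return _empty_result()
    return data if isinstance(data, dict) else _empty_result()


# Illustrative payload: fenced, with two trailing commas
sample = (
    FENCE + "json\n"
    '{"entities": [{"name": "Verizon", "type": "Company",}], "statements": [],}\n'
    + FENCE
)
parsed = tolerant_json_loads(sample)
print(parsed["entities"])
```

One caveat of step 3: the trailing-comma regex also rewrites a `,}` or `,]` sequence that happens to sit inside a string value, so it trades a small risk of altering text for a higher parse success rate.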


In [5]:
run_extraction(
    years=["2024", "2025"],
    include_web=True,
    include_pdf_tables=True,
    include_pdf_text=True,
    max_records=None,               # set None for full run
    model_name=GENAI_MODEL        # "gemini-2.5-flash-lite" by default
)

[CHECKPOINT] processed=18 ents=38 stmts=28 evs=55 elapsed=307s tokens_total=28769
[CHECKPOINT] processed=25 ents=38 stmts=28 evs=75 elapsed=433s tokens_total=38577
[CHECKPOINT] processed=42 ents=38 stmts=28 evs=75 elapsed=746s tokens_total=71225
[CHECKPOINT] processed=50 ents=41 stmts=32 evs=79 elapsed=895s tokens_total=82337
[CHECKPOINT] processed=67 ents=52 stmts=48 evs=121 elapsed=1201s tokens_total=103240
[CHECKPOINT] processed=75 ents=67 stmts=69 evs=194 elapsed=1344s tokens_total=113423
[CHECKPOINT] processed=92 ents=83 stmts=89 evs=246 elapsed=1791s tokens_total=135120
[CHECKPOINT] processed=100 ents=88 stmts=104 evs=300 elapsed=1918s tokens_total=144800
[CHECKPOINT] processed=117 ents=114 stmts=145 evs=427 elapsed=2228s tokens_total=166308
[CHECKPOINT] processed=125 ents=116 stmts=148 evs=436 elapsed=2370s tokens_total=177399
[CHECKPOINT] processed=135 ents=134 stmts=165 evs=502 elapsed=2761s tokens_total=188372
[CHECKPOINT] processed=150 ents=135 stmts=170 evs=510 elapsed=3009

KeyboardInterrupt: 

In [2]:
run_extraction(
    years=["2025"],
    include_web=False,          # <-- skip web documents
    include_pdf_tables=True,
    include_pdf_text=True,
    max_records=None,
    model_name=GENAI_MODEL
)

[INIT] store entities=3156 statements=7292 evidences=61697
[CHECKPOINT] processed=25 ents=3156 stmts=7292 evs=61697 elapsed=283s tokens_total=4705
[CHECKPOINT] processed=45 ents=3156 stmts=7292 evs=61697 elapsed=586s tokens_total=10732
[CHECKPOINT] processed=50 ents=3156 stmts=7292 evs=61697 elapsed=677s tokens_total=13503
[CHECKPOINT] processed=69 ents=3156 stmts=7292 evs=61697 elapsed=982s tokens_total=24710
[CHECKPOINT] processed=75 ents=3156 stmts=7292 evs=61697 elapsed=1089s tokens_total=28352
[CHECKPOINT] processed=94 ents=3156 stmts=7292 evs=61697 elapsed=1401s tokens_total=38124
[CHECKPOINT] processed=100 ents=3156 stmts=7292 evs=61697 elapsed=1470s tokens_total=40697


KeyboardInterrupt: 

# KG rebuild with LLMGraphTransformer (table-aware, budgeted)

This section selects the 50–70 most useful web pages by scoring their titles and URLs, splits the content into chunks of about 9k tokens with a 400-token overlap, extracts triples from text and tables with LLMGraphTransformer, and writes them to Neo4j with provenance.
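
As a rough illustration of the chunking step, the sketch below splits text into windows of about 9k tokens with a 400-token overlap, using the same ~4 characters per token heuristic as the notebook. The helper name `chunk_by_token_budget` and its parameters are assumptions made for this example; the actual chunker in this section may differ.

```python
import math
from typing import List


def rough_token_count(text: str) -> int:
    # Heuristic: ~4 characters per token, at least 1 for non-empty text
    return max(1, math.ceil(len(text) / 4)) if text else 0


def chunk_by_token_budget(text: str,
                          max_tokens: int = 9000,
                          overlap_tokens: int = 400,
                          chars_per_token: int = 4) -> List[str]:
    """Split text into overlapping character windows sized by a token budget."""
    if overlap_tokens >= max_tokens:
        raise ValueError("overlap_tokens must be smaller than max_tokens")
    window = max_tokens * chars_per_token                  # characters per chunk
    step = (max_tokens - overlap_tokens) * chars_per_token  # advance per chunk
    chunks: List[str] = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + window])
        if start + window >= len(text):
            break  # the last window already reached the end of the text
        start += step
    return chunks


# Synthetic input so that overlap can be checked character by character
text = "".join(str(i % 10) for i in range(100_000))
chunks = chunk_by_token_budget(text)
print(len(chunks), [rough_token_count(c) for c in chunks])
```

Splitting on raw character offsets can cut a sentence or a table row in half; the overlap softens that for prose, but tables are usually better kept whole as their own chunks.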

In [None]:
# Config: limits, env, and paths
import os, json, re, math
from pathlib import Path
from typing import List, Dict, Any, Iterable, Tuple

# Limits
MAX_PAGES = 70  # cap at 70
MIN_PAGES = 50  # try to keep >=50 if available
MAX_TOKENS_PER_CHUNK = 9000  # ~9k tokens budget per batch
OVERLAP_TOKENS = 400  # conservative overlap for context continuity

# Data sources
ROOT = Path(r"i:\My Drive\M. Tech AI ML\AIML SEM 4\Dissertation\Project")
WEB_DOCS_JSON = ROOT/"output"/"verizon_production_web_documents.json"
DOC_LING_DIR = ROOT/"output"/"extraction_results"

# Neo4j conn (env-driven)
NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "password")
NEO4J_DATABASE = os.getenv("NEO4J_DATABASE", "neo4j")

# Model config
GENAI_MODEL = os.getenv("GENAI_MODEL", "gemini-1.5-pro")
EMBED_MODEL = os.getenv("EMBED_MODEL", "text-embedding-004")

# Rough token estimator (character-count heuristic; no tokenizer is loaded)
def rough_token_count(text: str) -> int:
    if not text:
        return 0
    # heuristic: ~4 chars/token, never less than 1 for non-empty text
    return max(1, math.ceil(len(text) / 4))



def select_useful_pages(docs: List[Dict[str, Any]],
                        keep_terms=("investor","financial","quarter","annual","10-k","10-q","earnings","webcast","presentation","report","capex","opex","revenue","cash flow","guidance","forecast","board","governance")) -> List[Dict[str, Any]]:
    """Score by title heuristics (title = last URL path segment); prefer investor/financial content and long-form pages."""
    scored = []
    for d in docs:
        url = d.get("url","") or ""
        content = d.get("content","") or ""
        # rstrip("/") keeps trailing-slash URLs from producing an empty title
        ttl = url.rstrip("/").split("/")[-1].replace("-"," ").lower()
        score = 0
        # title and path hints; hyphens are normalized in both the title and the
        # term, otherwise "10-k" and "10-q" could never match
        for t in keep_terms:
            if t.replace("-"," ") in ttl:
                score += 8
        if any(seg in url for seg in ["/investors/","quarterly-reports","financial","earnings","conference","webcast","annual"]):
            score += 10
        # content length as a proxy for substance
        score += min(10, len(content)//2000)
        # de-prioritize T&C, privacy, generic parenting pages
        if any(x in url for x in ["terms-conditions","privacy","parenting","international/privacy","our-company/our-culture"]):
            score -= 12
        scored.append((score, d))
    scored.sort(key=lambda x: x[0], reverse=True)
    # keep only pages with a positive score
    top = [d for score, d in scored if score > 0]
    return top[:MAX_PAGES]



def chunk_text_to_budget(text: str, max_tokens=MAX_TOKENS_PER_CHUNK, overlap=OVERLAP_TOKENS) -> List[str]:
    if not text:
        return []
    tokens = rough_token_count(text)
    if tokens <= max_tokens:
        return [text]
    # naive sliding window by character proportional to tokens
    chars_per_token = max(1, len(text)//tokens)
    max_chars = max_tokens * chars_per_token
    # keep the overlap strictly smaller than the window so the loop always advances
    overlap_chars = min(overlap * chars_per_token, max_chars - 1)
    chunks = []
    start = 0
    while start < len(text):
        end = min(len(text), start + max_chars)
        chunks.append(text[start:end])
        if end >= len(text):
            break
        start = end - overlap_chars
    return chunks


In [None]:
# Load web documents and select 50–70 useful pages by title/URL heuristics
with open(WEB_DOCS_JSON, "r", encoding="utf-8") as f:
    web_blob = json.load(f)
docs = web_blob.get("documents", [])
print(f"Total web docs: {len(docs)}")

selected = select_useful_pages(docs)
if len(selected) < MIN_PAGES:
    # fallback: relax filters and take top MIN_PAGES by content length
    docs_sorted = sorted(docs, key=lambda d: len(d.get("content","")), reverse=True)
    selected = docs_sorted[:MIN_PAGES]
print(f"Selected pages: {len(selected)} (target 50–70)")
for i, d in enumerate(selected[:5], 1):
    print(i, d.get("url",""), len(d.get("content","")))

In [None]:
# Extract triples from text and tables
from langchain_community.graphs.graph_document import GraphDocument
from langchain_community.graphs.neo4j_graph import Neo4jGraph
from langchain_experimental.graph_transformers import LLMGraphTransformer
from langchain_google_genai import ChatGoogleGenerativeAI
from neo4j import GraphDatabase
from docling.document_converter import DocumentConverter
# DLTable is used only as a type hint below. This import path may not exist in
# every Docling release, so fall back to Any instead of failing the whole cell.
try:
    from docling.models.document import Table as DLTable
except ImportError:
    DLTable = Any

# Initialize LLM and graph (pass the database so reads and writes target NEO4J_DATABASE)
llm = ChatGoogleGenerativeAI(model=GENAI_MODEL, temperature=0.2)
graph = Neo4jGraph(url=NEO4J_URI, username=NEO4J_USER, password=NEO4J_PASSWORD, database=NEO4J_DATABASE)

# Optional: rebuild graph. WARNING: this deletes every node and relationship in the
# database, not only Verizon data. Set REBUILD = False to append instead.
REBUILD = True
if REBUILD:
    graph.query("MATCH (n) DETACH DELETE n")
    print("Neo4j cleared.")

# Transformer configured to be table-aware by feeding serialized tables as text prompts per chunk
kg_transformer = LLMGraphTransformer(llm=llm)



def serialize_table(tbl: DLTable) -> str:
    # produce a simple markdown-like table string
    try:
        headers = [c.text or "" for c in (tbl.columns or [])]
        rows = []
        for r in (tbl.rows or []):
            rows.append([c.text or "" for c in r.cells])
        lines = []
        if headers:
            lines.append(" | ".join(headers))
            lines.append(" | ".join(["---"]*len(headers)))
        for r in rows:
            lines.append(" | ".join(r))
        return "\n".join(lines)
    except Exception:
        return ""



def iter_pages(selected_docs: List[Dict[str,Any]]):
    for d in selected_docs:
        yield d.get("url",""), d.get("content","")

converter = DocumentConverter()

all_graph_docs: List[GraphDocument] = []
batch_tokens = 0
batch_chunks: List[Tuple[str,str]] = []  # (url, chunk_text)



from langchain_core.documents import Document

def flush_batch(batch: List[Tuple[str,str]]):
    global all_graph_docs
    if not batch:
        return
    # For web HTML we already have plain text, so each chunk is attached as-is
    # (Docling tables are not merged in here).
    # Provenance (source URL) goes into the Document metadata; LLMGraphTransformer
    # keeps each input Document as GraphDocument.source, so it reaches Neo4j
    # when writing with include_source=True.
    inputs = [Document(page_content=text, metadata={"source_url": url}) for url, text in batch]
    # LLMGraphTransformer takes Document objects via convert_to_graph_documents
    gdocs = kg_transformer.convert_to_graph_documents(inputs)
    all_graph_docs.extend(gdocs)



for url, content in iter_pages(selected):
    # chunk by token budget
    chunks = chunk_text_to_budget(content, MAX_TOKENS_PER_CHUNK, OVERLAP_TOKENS)
    for ch in chunks:
        tks = rough_token_count(ch)
        if batch_tokens + tks > MAX_TOKENS_PER_CHUNK and batch_chunks:
            flush_batch(batch_chunks)
            batch_chunks = []
            batch_tokens = 0
        batch_chunks.append((url, ch))
        batch_tokens += tks

# flush remaining
flush_batch(batch_chunks)
print(f"Graph docs extracted: {len(all_graph_docs)}")

# Write to Neo4j with constraints
graph.add_graph_documents(all_graph_docs, baseEntityLabel=True, include_source=True)
print("Write complete.")
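
When graph documents are written with `include_source=True`, LangChain's Neo4j integration stores each source as a `Document` node linked to its extracted entities by `MENTIONS` relationships. A minimal sketch of a provenance lookup follows. It assumes the source URL ends up as a `source_url` property on the `Document` node (which requires that the URL was placed in the source document's metadata), and the helper name `provenance_query` is hypothetical.

```python
from typing import Any, Dict, Tuple

def provenance_query(source_url: str, limit: int = 25) -> Tuple[str, Dict[str, Any]]:
    """Build a parameterized Cypher query listing entities mentioned by one source."""
    cypher = (
        "MATCH (d:Document)-[:MENTIONS]->(e) "
        "WHERE d.source_url = $source_url "   # assumed property name
        "RETURN labels(e) AS labels, e.id AS id "
        "LIMIT $limit"
    )
    return cypher, {"source_url": source_url, "limit": int(limit)}

# Placeholder URL for illustration only
cypher, params = provenance_query("https://example.com/investors/report")
# With a live connection this could be run as: graph.query(cypher, params=params)
```

Passing the URL as a query parameter rather than formatting it into the string avoids quoting problems with unusual URLs.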

In [None]:
# Quick validation: counts and sample
from neo4j import GraphDatabase

driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
with driver.session(database=NEO4J_DATABASE) as s:
    c = s.run("MATCH (n) RETURN count(n) AS c").single()["c"]
    r = s.run("MATCH ()-[r]->() RETURN count(r) AS c").single()["c"]
    print("Nodes:", c, "Relationships:", r)
    # Return scalars from Cypher so each row holds plain values; .data() does not
    # hand back relationship objects, so reading .type on them is unreliable
    ex = s.run(
        "MATCH (a)-[r]->(b) "
        "RETURN coalesce(a.name, a.id, a.label) AS a, type(r) AS rel, "
        "coalesce(b.name, b.id, b.label) AS b LIMIT 3"
    ).data()
    for i, row in enumerate(ex, 1):
        print(i, row['rel'], '::', row['a'], '->', row['b'])
driver.close()

In [None]:
# Optional: Ingest tables from Docling JSON outputs (PDF parsing)
import glob

def load_docling_tables(docling_path: str) -> List[Tuple[str, str]]:
    """Return list of (provenance, serialized_table_text)."""
    with open(docling_path, "r", encoding="utf-8") as f:
        blob = json.load(f)
    results = []
    # assumed docling JSON schema: top-level "tables", or pages -> blocks (type: table),
    # depending on version
    tables = []
    if isinstance(blob, dict):
        # newer schema
        if "tables" in blob:
            tables = blob["tables"]
        elif "pages" in blob:
            for p in blob["pages"]:
                for b in p.get("blocks", []):
                    if b.get("type") == "table":
                        tables.append(b)
    for idx, t in enumerate(tables):
        # Attempt to read cells
        md = []
        headers = [c.get("text","") for c in t.get("header", [])] if isinstance(t.get("header"), list) else []
        if headers:
            md.append(" | ".join(headers))
            md.append(" | ".join(["---"]*len(headers)))
        for row in t.get("rows", []):
            cells = [c.get("text","") for c in row]
            md.append(" | ".join(cells))
        # fall back to a pre-rendered markdown field when no cells were read
        if not md and "markdown" in t:
            md = [t["markdown"]]
        txt = "\n".join(md)
        if txt.strip():
            prov = f"docling:{Path(docling_path).name}#table{idx}"
            results.append((prov, txt))
    return results



# Collect docling files
docling_files = glob.glob(str((ROOT/"output").joinpath("*.docling.json")))
print("Docling JSON files:", len(docling_files))

table_graph_docs: List[GraphDocument] = []
batch_tokens = 0
batch_chunks: List[Tuple[str,str]] = []



from langchain_core.documents import Document

def flush_table_batch(batch: List[Tuple[str,str]]):
    global table_graph_docs
    if not batch:
        return
    # Provenance goes into the Document metadata; LLMGraphTransformer keeps each
    # input Document as GraphDocument.source
    inputs = [
        Document(page_content=txt, metadata={"source_url": prov, "source_type": "table"})
        for prov, txt in batch
    ]
    # LLMGraphTransformer takes Document objects via convert_to_graph_documents
    gdocs = kg_transformer.convert_to_graph_documents(inputs)
    table_graph_docs.extend(gdocs)



for fp in docling_files:
    pairs = load_docling_tables(fp)
    for prov, txt in pairs:
        tks = rough_token_count(txt)
        if batch_tokens + tks > MAX_TOKENS_PER_CHUNK and batch_chunks:
            flush_table_batch(batch_chunks)
            batch_chunks = []
            batch_tokens = 0
        batch_chunks.append((prov, txt))
        batch_tokens += tks

flush_table_batch(batch_chunks)
print(f"Table graph docs extracted: {len(table_graph_docs)}")

if table_graph_docs:
    graph.add_graph_documents(table_graph_docs, baseEntityLabel=True, include_source=True)
    print("Table triples written to Neo4j.")
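
Pipe-delimited table serialization of this kind is sensitive to ragged rows and to `|` characters inside cells. The sketch below pads short rows and escapes pipes before joining. The helper `rows_to_markdown` is hypothetical, is not part of Docling or LangChain, and the sample cells are placeholders.

```python
from typing import List, Optional, Sequence

def rows_to_markdown(headers: Sequence[Optional[str]],
                     rows: Sequence[Sequence[Optional[str]]]) -> str:
    """Serialize a header row and data rows as a pipe-delimited table string."""
    # Width of the widest row, so shorter rows can be padded to match
    width = max([len(headers)] + [len(r) for r in rows])

    def clean(cell: Optional[str]) -> str:
        # Escape pipes and flatten newlines so one cell stays on one line
        text = "" if cell is None else str(cell)
        return text.replace("|", "\\|").replace("\n", " ").strip()

    def pad(cells: Sequence[Optional[str]]) -> List[str]:
        cleaned = [clean(c) for c in cells]
        return cleaned + [""] * (width - len(cleaned))

    lines: List[str] = []
    if headers:
        lines.append(" | ".join(pad(headers)))
        lines.append(" | ".join(["---"] * width))
    for r in rows:
        lines.append(" | ".join(pad(r)))
    return "\n".join(lines)

table_text = rows_to_markdown(["Metric", "Value"], [["a|b", "1"], ["c"]])
```

Padding keeps every row at the same column count, which may make it easier for the LLM to align values with their headers.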