In [1]:
# Configure your Gemini API key
import os

try:
    # On Kaggle the key is read from the notebook's Secrets store and exported
    # so that later cells can pick it up from the environment.
    from kaggle_secrets import UserSecretsClient
    GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
except Exception as e:
    # Not on Kaggle (import failed) or the secret could not be read.
    # Fall back to a key that is already exported in the environment.
    # If local, set manually before running this cell:
    # os.environ["GOOGLE_API_KEY"] = "YOUR_API_KEY_HERE"
    GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
    if not GOOGLE_API_KEY:
        print(" Set GOOGLE_API_KEY in environment or Kaggle Secrets.", e)

# Only report readiness when a key is actually present.
if os.environ.get("GOOGLE_API_KEY"):
    print(" API setup ready (if key is configured).")


 API setup ready (if key is configured).


In [2]:
# Imports
from typing import Any, Dict, List

import logging
import sqlite3
from dataclasses import dataclass

from google.adk.agents import Agent, LlmAgent
from google.adk.apps.app import App, EventsCompactionConfig
from google.adk.models.google_llm import Gemini
from google.adk.sessions import DatabaseSessionService
from google.adk.runners import Runner
from google.adk.tools.tool_context import ToolContext
from google.genai import types

logging.basicConfig(level=logging.INFO)
print("ADK and libraries imported.")


ADK and libraries imported.


In [3]:
# Global config
# APP_NAME is the name given to the App object built later in the notebook.
APP_NAME = "smartdocs_app"
# USER_ID is the single user id used for every session call in run_session.
USER_ID = "demo_user"
# MODEL_NAME is shared by the direct call_gemini helper and the agent definitions.
MODEL_NAME = "gemini-2.5-flash-lite"

# Retry options handed to every Gemini(...) model wrapper as retry_options.
# NOTE(review): presumably attempts/exp_base/initial_delay describe an
# exponential backoff on the listed HTTP status codes — confirm against the
# google-genai HttpRetryOptions documentation.
retry_config = types.HttpRetryOptions(
    attempts=5,
    exp_base=7,
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],
)

# Session service with SQLite persistence
# The URL is relative, so the database file is resolved against the working
# directory of the kernel.
db_url = "sqlite:///smartdocs_sessions.db"
session_service = DatabaseSessionService(db_url=db_url)

print(" Session service initialized with SQLite.")


 Session service initialized with SQLite.


In [4]:
from google import genai

# Create a single client for the whole notebook
genai_client = genai.Client()

def call_gemini(prompt: str) -> str:
    """
    Send `prompt` to Gemini (model MODEL_NAME) and return the reply as plain text.

    The text is looked up in a way that tolerates different google-genai
    response shapes: first the `output_text` / `text` convenience attributes,
    then the text parts of the first candidate.

    Args:
        prompt: The full prompt to send as the request contents.

    Returns:
        The stripped response text. May be an empty string when the first
        candidate exists but carries no text parts.

    Raises:
        ValueError: If the response has no candidates / content to read text
            from (for example when the request produced no output).
    """
    response = genai_client.models.generate_content(
        model=MODEL_NAME,
        contents=prompt,
    )

    # 1) Convenience accessors: `output_text` first, then `text`.
    for attr_name in ("output_text", "text"):
        value = getattr(response, attr_name, None)
        if value:
            return value.strip()

    # 2) Fallback: gather text from candidates[0].content.parts.
    #    Every level is optional, so walk it defensively instead of letting an
    #    IndexError / TypeError escape with no context.
    candidates = getattr(response, "candidates", None) or []
    content = getattr(candidates[0], "content", None) if candidates else None
    if content is None:
        logging.error("Unexpected response format from Gemini: %r", response)
        raise ValueError(
            "call_gemini: Gemini response contained no candidates/content to read text from."
        )

    parts = getattr(content, "parts", None) or []
    texts = [part_text for part_text in (getattr(p, "text", "") for p in parts) if part_text]
    return "".join(texts).strip()


In [5]:
import asyncio

async def run_session(
    runner_instance: Runner,
    user_queries: List[str] | str,
    session_name: str = "default",
):
    """
    Send one or more user queries through `runner_instance` inside a named
    session and print the text of each event that comes back.

    Args:
        runner_instance: Runner whose `run_async` is used; its `app_name`
            selects the app the session belongs to.
        user_queries: A single query string or a list of query strings; they
            are sent in order within the same session.
        session_name: Session id to create, or to reuse if creation fails.

    Raises:
        RuntimeError: If the session can neither be created nor fetched.
    """
    print(f"\n### Session: {session_name}")
    app_name = runner_instance.app_name

    # Create or get session
    try:
        session = await session_service.create_session(
            app_name=app_name, user_id=USER_ID, session_id=session_name
        )
    except Exception:
        # `except Exception` (not a bare `except`) so that task cancellation
        # and KeyboardInterrupt still propagate instead of being swallowed.
        session = await session_service.get_session(
            app_name=app_name, user_id=USER_ID, session_id=session_name
        )

    if session is None:
        # Fail here with a clear message rather than on `session.id` below.
        raise RuntimeError(
            f"Could not create or load session '{session_name}' for app '{app_name}'."
        )

    if isinstance(user_queries, str):
        user_queries = [user_queries]

    for query in user_queries:
        print(f"\nUser > {query}")
        query_content = types.Content(role="user", parts=[types.Part(text=query)])

        async for event in runner_instance.run_async(
            user_id=USER_ID, session_id=session.id, new_message=query_content
        ):
            if event.content and event.content.parts:
                # Only the first part's text is shown; events whose first part
                # has no text (or the literal "None") are skipped.
                text = event.content.parts[0].text
                if text and text != "None":
                    print(f"{MODEL_NAME} > {text}")


In [6]:
import asyncio

# NOTE(review): this cell redefines run_session, which the previous cell
# already defines; the later definition is the one in effect. Consider
# deleting one of the two cells.
async def run_session(
    runner_instance: Runner,
    user_queries: List[str] | str,
    session_name: str = "default",
):
    """
    Send one or more user queries through `runner_instance` inside a named
    session and print the text of each event that comes back.

    Args:
        runner_instance: Runner whose `run_async` is used; its `app_name`
            selects the app the session belongs to.
        user_queries: A single query string or a list of query strings; they
            are sent in order within the same session.
        session_name: Session id to create, or to reuse if creation fails.

    Raises:
        RuntimeError: If the session can neither be created nor fetched.
    """
    print(f"\n### Session: {session_name}")
    app_name = runner_instance.app_name

    # Create or get session
    try:
        session = await session_service.create_session(
            app_name=app_name, user_id=USER_ID, session_id=session_name
        )
    except Exception:
        # `except Exception` (not a bare `except`) so that task cancellation
        # and KeyboardInterrupt still propagate instead of being swallowed.
        session = await session_service.get_session(
            app_name=app_name, user_id=USER_ID, session_id=session_name
        )

    if session is None:
        # Fail here with a clear message rather than on `session.id` below.
        raise RuntimeError(
            f"Could not create or load session '{session_name}' for app '{app_name}'."
        )

    if isinstance(user_queries, str):
        user_queries = [user_queries]

    for query in user_queries:
        print(f"\nUser > {query}")
        query_content = types.Content(role="user", parts=[types.Part(text=query)])

        async for event in runner_instance.run_async(
            user_id=USER_ID, session_id=session.id, new_message=query_content
        ):
            if event.content and event.content.parts:
                # Only the first part's text is shown; events whose first part
                # has no text (or the literal "None") are skipped.
                text = event.content.parts[0].text
                if text and text != "None":
                    print(f"{MODEL_NAME} > {text}")


In [7]:
# Storage setup: create the SQLite table that processed documents are saved to
def init_smartdocs_db(db_path: str = "smartdocs.db") -> None:
    """
    Create the `documents` table if it does not exist yet.

    Safe to call repeatedly: the statement uses CREATE TABLE IF NOT EXISTS.

    Args:
        db_path: Path of the SQLite database file. Defaults to "smartdocs.db"
            in the current working directory.
    """
    # Local import keeps this cell self-contained.
    from contextlib import closing

    # `with conn:` only commits / rolls back the transaction; it does not close
    # the connection, so `closing(...)` is needed to release the file handle.
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS documents (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    doc_type TEXT,
                    summary TEXT,
                    fields_json TEXT,
                    suggested_action TEXT
                )
                """
            )

# Create the documents table up front; save_document_record inserts into it
# without creating it first.
init_smartdocs_db()
print("smartdocs.db initialized.")


smartdocs.db initialized.


In [8]:
import json

def save_document_record(
    tool_context: ToolContext,
    doc_type: str,
    summary: str,
    fields: Dict[str, Any],
    suggested_action: str,
) -> Dict[str, Any]:
    """Custom tool that saves a processed document to SQLite.

    Inserts one row into the `documents` table of smartdocs.db. The table must
    already exist.

    Args:
        tool_context: Tool context supplied by the caller; not read by this
            function.
        doc_type: Document type label to store.
        summary: Summary text to store.
        fields: Extracted fields; stored as a JSON string (non-ASCII kept as-is).
        suggested_action: Recommended action label to store.

    Returns:
        {"status": "success", "doc_id": <row id of the inserted record>}
    """
    # Local import keeps this cell self-contained.
    from contextlib import closing

    logging.info("save_document_record called.")
    fields_json = json.dumps(fields, ensure_ascii=False)

    # `with conn:` commits on success / rolls back on error but does not close
    # the connection; `closing(...)` makes sure the handle is released.
    with closing(sqlite3.connect("smartdocs.db")) as conn:
        with conn:
            cursor = conn.execute(
                """
                INSERT INTO documents (doc_type, summary, fields_json, suggested_action)
                VALUES (?, ?, ?, ?)
                """,
                (doc_type, summary, fields_json, suggested_action),
            )
            doc_id = cursor.lastrowid

    logging.info("Document saved with id=%s", doc_id)
    return {"status": "success", "doc_id": doc_id}


In [9]:
def lookup_vendor_info(tool_context: ToolContext, vendor_name: str) -> Dict[str, Any]:
    """Stub tool that simulates a vendor lookup and returns canned details."""
    logging.info(f" lookup_vendor_info called for vendor={vendor_name}")
    # Hard-coded demo payload; only the vendor name is taken from the input.
    vendor_record: Dict[str, Any] = dict(
        vendor_name=vendor_name,
        risk_level="low",
        country="India",
        notes="Stub vendor info. In real life, this could query an API.",
    )
    return vendor_record


In [10]:
# Classifier agent.
# `description` is kept to a short capability summary; the task prompt goes in
# `instruction`, which is the field meant to steer the agent's own behavior.
type_id_agent = LlmAgent(
    model=Gemini(model=MODEL_NAME, retry_options=retry_config),
    name="type_id_agent",
    description="Classifies a business document into one of the supported document types.",
    instruction=(
        "Classify the business document type. "
        "Allowed types: invoice, purchase_order, contract, resume, other. "
        "Return ONLY a JSON object with keys: doc_type, confidence, reasoning."
    ),
)


In [11]:
# Field-extraction agent.
# `description` is kept to a short capability summary; the task prompt goes in
# `instruction`. The invoice field is named `invoice_date` to match the prompt
# used by process_document.
extraction_agent = LlmAgent(
    model=Gemini(model=MODEL_NAME, retry_options=retry_config),
    name="extraction_agent",
    description="Extracts structured fields from a business document of a known type.",
    instruction=(
        "Given a business document and its type, extract structured fields. "
        "For invoices/purchase orders: vendor, customer, invoice_date, due_date, currency, total_amount, line_items. "
        "For contracts: parties, start_date, end_date, key_clauses, jurisdiction. "
        "For resumes: name, email, phone, skills, experience_summary. "
        "Return ONLY a valid JSON object with fields appropriate for the doc type."
    ),
)


In [12]:
# Summarization agent.
# `description` is kept to a short capability summary; the task prompt goes in
# `instruction`. The sentence range is written with a plain hyphen (the
# original text contained a mis-encoded dash).
summary_agent = LlmAgent(
    model=Gemini(model=MODEL_NAME, retry_options=retry_config),
    name="summary_agent",
    description="Writes a concise business summary of a document.",
    instruction=(
        "Write a concise business summary (3-5 sentences) of the given document. "
        "Highlight who is involved, what the document is about, and any key numbers or dates."
    ),
)


In [13]:
# Action-recommendation agent.
# `description` is kept to a short capability summary; the task prompt goes in
# `instruction`. The agent is given the two custom tools defined earlier in the
# notebook.
action_agent = LlmAgent(
    model=Gemini(model=MODEL_NAME, retry_options=retry_config),
    name="action_agent",
    description="Recommends the next action for a processed business document.",
    instruction=(
        "Given the document type, extracted fields, and summary, recommend the next action. "
        "Choose from: approve, escalate_to_finance, escalate_to_legal, request_more_info, archive. "
        "Return a JSON object with keys: suggested_action, reasoning."
    ),
    tools=[save_document_record, lookup_vendor_info],
)


In [14]:
@dataclass
class ProcessedDocument:
    """Result of running one document through the process_document pipeline."""

    # Document text exactly as it was passed in.
    raw_text: str
    # Document type label taken from the classification step.
    doc_type: str
    # Extracted fields parsed from the model's JSON reply.
    fields: Dict[str, Any]
    # Free-text summary returned by the model.
    summary: str
    # Recommended action label taken from the action step.
    suggested_action: str
    # Row id reported by save_document_record; None until a record is saved.
    doc_id: int | None = None


In [15]:
import json
import re

def safe_json_loads(text: str) -> dict:
    """
    Parse a JSON object out of a model response that may contain extra text,
    markdown fences, etc.

    Strategy, in order:
      1. Parse the whole (stripped) text.
      2. If the text starts with a ``` fence, strip the fences and parse again.
      3. Scan for the first position where a complete JSON object can be
         decoded and return that object, ignoring anything after it.

    Args:
        text: Raw model output.

    Returns:
        The parsed JSON object as a dict.

    Raises:
        ValueError: If `text` is None, if the text is valid JSON but not an
            object (e.g. a list or a number), or if no JSON object can be
            found in it.
    """
    if text is None:
        raise ValueError("Empty response from model, cannot parse JSON.")

    text = text.strip()

    # 1) + 2) Whole-document candidates: the text itself and, when fenced, the
    #         text with the leading/trailing code fences removed.
    whole_candidates = [text]
    if text.startswith("```"):
        unfenced = re.sub(r"^```[a-zA-Z0-9]*", "", text).strip()
        unfenced = re.sub(r"```$", "", unfenced).strip()
        whole_candidates.append(unfenced)

    for candidate in whole_candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
        # Valid JSON of the wrong shape: callers rely on dict methods, so
        # report it here instead of failing later on `.get(...)`.
        raise ValueError(
            "safe_json_loads: expected a JSON object but got "
            f"{type(parsed).__name__}."
        )

    # 3) Find the first decodable {...} object. raw_decode stops at the end of
    #    the object, so trailing prose or a second object does not break it,
    #    and a stray '{' in the prose is skipped.
    decoder = json.JSONDecoder()
    brace_at = text.find("{")
    while brace_at != -1:
        try:
            found, _ = decoder.raw_decode(text[brace_at:])
        except json.JSONDecodeError:
            brace_at = text.find("{", brace_at + 1)
        else:
            return found

    # 4) If still failing, print the text to debug and raise
    print("Could not parse JSON from text:\n", text)
    raise ValueError("safe_json_loads: unable to parse JSON from model output.")


In [16]:
from dataclasses import dataclass
from typing import Any, Dict
import json
import re
import logging

from google.adk.tools.tool_context import ToolContext  # you already imported this earlier


# ---------- Helper: safer JSON parsing ----------

def safe_json_loads(text: str) -> dict:
    """
    Parse a JSON object out of a model response that may contain extra text
    or markdown fences like ```json ... ```.

    Strategy, in order:
      1. Parse the whole (stripped) text.
      2. If the text starts with a ``` fence, strip the fences and parse again.
      3. Scan for the first position where a complete JSON object can be
         decoded and return that object, ignoring anything after it.

    Args:
        text: Raw model output.

    Returns:
        The parsed JSON object as a dict.

    Raises:
        ValueError: If `text` is None, if the text is valid JSON but not an
            object (e.g. a list or a number), or if no JSON object can be
            found in it.
    """
    if text is None:
        raise ValueError("Empty response from model, cannot parse JSON.")

    text = text.strip()

    # 1) + 2) Whole-document candidates: the text itself and, when fenced, the
    #         text with the leading/trailing code fences removed.
    whole_candidates = [text]
    if text.startswith("```"):
        text_no_fence = re.sub(r"^```[a-zA-Z0-9]*", "", text).strip()
        text_no_fence = re.sub(r"```$", "", text_no_fence).strip()
        whole_candidates.append(text_no_fence)

    for candidate in whole_candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
        # Valid JSON of the wrong shape: callers rely on dict methods, so
        # report it here instead of failing later on `.get(...)`.
        raise ValueError(
            "safe_json_loads: expected a JSON object but got "
            f"{type(parsed).__name__}."
        )

    # 3) Find the first decodable {...} object. raw_decode stops at the end of
    #    the object, so trailing prose or a second object does not break it,
    #    and a stray '{' in the prose is skipped.
    decoder = json.JSONDecoder()
    brace_at = text.find("{")
    while brace_at != -1:
        try:
            found, _ = decoder.raw_decode(text[brace_at:])
        except json.JSONDecodeError:
            brace_at = text.find("{", brace_at + 1)
        else:
            return found

    # 4) Give up with a helpful message
    print("safe_json_loads: could not parse JSON from this text:\n", text)
    raise ValueError("safe_json_loads: unable to parse JSON from model output.")


# ---------- Data class for the pipeline result ----------

# NOTE(review): ProcessedDocument is also defined, with the same fields, in an
# earlier cell; this later definition is the one in effect after the cell runs.
@dataclass
class ProcessedDocument:
    """Result returned by process_document for a single input document."""

    # Document text exactly as it was passed in.
    raw_text: str
    # Document type label taken from the classification step.
    doc_type: str
    # Extracted fields parsed from the model's JSON reply.
    fields: Dict[str, Any]
    # Free-text summary returned by the model.
    summary: str
    # Recommended action label taken from the action step.
    suggested_action: str
    # Row id reported by save_document_record; None until a record is saved.
    doc_id: int | None = None


# ---------- Main pipeline using Gemini via call_gemini ----------

# Labels the prompts below ask the model to choose from.
_ALLOWED_DOC_TYPES = ("invoice", "purchase_order", "contract", "resume", "other")
_ALLOWED_ACTIONS = (
    "approve",
    "escalate_to_finance",
    "escalate_to_legal",
    "request_more_info",
    "archive",
)


def _ask_gemini_for_json(prompt: str, label: str) -> Dict[str, Any]:
    """Send `prompt` via call_gemini, log the raw reply, and parse it as a JSON object."""
    reply = call_gemini(prompt)
    logging.info("%s agent raw response: %s", label, reply)
    parsed = safe_json_loads(reply)
    if not isinstance(parsed, dict):
        # Guard for parsers that hand back lists/scalars: the pipeline needs a mapping.
        raise ValueError(f"{label} agent did not return a JSON object.")
    return parsed


def _pick_allowed(value: Any, allowed: tuple, default: str) -> str:
    """Normalize a model-supplied label and fall back to `default` if it is not allowed."""
    label = "" if value is None else str(value).strip().lower().replace(" ", "_").replace("-", "_")
    if label in allowed:
        return label
    logging.warning("Unexpected label %r from model; using %r instead.", value, default)
    return default


def process_document(raw_text: str) -> ProcessedDocument:
    """
    High-level pipeline that calls Gemini multiple times with different prompts
    to simulate multiple 'agents': type classifier, extractor, summarizer,
    and action suggester. Finally saves the result via save_document_record().

    Args:
        raw_text: Full text of the document to process.

    Returns:
        A ProcessedDocument holding the input text, the (normalized) document
        type, the extracted fields, the summary, the (normalized) suggested
        action, and the id of the saved record.

    Raises:
        ValueError: If `raw_text` is empty/blank, or if a step that must return
            JSON produces output that cannot be parsed into a JSON object.
    """
    if not raw_text or not raw_text.strip():
        # Avoid four model calls and a stored record for an empty document.
        raise ValueError("process_document: raw_text is empty.")

    # 1) Type Identification "agent"
    type_prompt = (
        "You are a classifier for business documents.\n\n"
        f"Document:\n{raw_text}\n\n"
        "Classify the document type. "
        "Allowed types: invoice, purchase_order, contract, resume, other.\n"
        "Respond with ONLY a JSON object like this:\n"
        "{\"doc_type\": \"invoice\", \"confidence\": 0.95, \"reasoning\": \"...\"}"
    )
    type_data = _ask_gemini_for_json(type_prompt, "Type")
    # Models sometimes answer "Invoice" or an unlisted type; normalize so that
    # downstream comparisons and storage see one of the allowed labels.
    doc_type = _pick_allowed(type_data.get("doc_type"), _ALLOWED_DOC_TYPES, "other")

    # 2) Extraction "agent"
    extract_prompt = (
        f"Document type: {doc_type}\n\n"
        f"Document:\n{raw_text}\n\n"
        "Extract structured information as JSON ONLY.\n"
        "For invoices/purchase orders, include: "
        "vendor, customer, invoice_date, due_date, currency, total_amount, line_items.\n"
        "For contracts, include: parties, start_date, end_date, key_clauses, jurisdiction.\n"
        "For resumes, include: name, email, phone, skills, experience_summary.\n"
        "Return ONLY a valid JSON object."
    )
    fields = _ask_gemini_for_json(extract_prompt, "Extraction")

    # 3) Summary "agent" (free text, no JSON parsing)
    summary_prompt = (
        "Write a short business summary (3-5 sentences) of this document. "
        "Highlight who is involved, what the document is about, and any key "
        "amounts or dates.\n\n"
        f"{raw_text}"
    )
    summary = call_gemini(summary_prompt)

    # 4) Action Suggestion "agent"
    action_prompt = (
        "You are a decision support assistant for business documents.\n\n"
        f"Document type: {doc_type}\n"
        f"Extracted fields (JSON): {json.dumps(fields, ensure_ascii=False)}\n"
        f"Summary: {summary}\n\n"
        "Choose ONE action from this list:\n"
        " - approve\n"
        " - escalate_to_finance\n"
        " - escalate_to_legal\n"
        " - request_more_info\n"
        " - archive\n\n"
        "Respond with ONLY JSON like:\n"
        "{\"suggested_action\": \"approve\", \"reasoning\": \"...\"}"
    )
    action_data = _ask_gemini_for_json(action_prompt, "Action")
    suggested_action = _pick_allowed(
        action_data.get("suggested_action"), _ALLOWED_ACTIONS, "archive"
    )

    # 5) Save via the custom tool. It is called directly here (not through an
    #    agent), so no tool context is available; None is passed instead.
    save_result = save_document_record(
        tool_context=None,
        doc_type=doc_type,
        summary=summary,
        fields=fields,
        suggested_action=suggested_action,
    )
    doc_id = save_result.get("doc_id")

    return ProcessedDocument(
        raw_text=raw_text,
        doc_type=doc_type,
        fields=fields,
        summary=summary,
        suggested_action=suggested_action,
        doc_id=doc_id,
    )


In [17]:
# Build the ADK App around a single root agent and attach it to a Runner that
# uses the session service created in the config cell.
smartdocs_app = App(
    name=APP_NAME,
    # The root agent is configured with a model, a name and a description only.
    root_agent=Agent(
        model=Gemini(model=MODEL_NAME, retry_options=retry_config),
        name="smartdocs_root",
        description="Root agent for SmartDocs (used mainly for interactive chat demos).",
    ),
    # NOTE(review): presumably compacts the event history every 3 invocations
    # with an overlap of 1 — confirm against the EventsCompactionConfig
    # documentation. The recorded output below this cell echoes this line, which
    # looks like the tail of a warning; verify whether the feature is experimental.
    events_compaction_config=EventsCompactionConfig(
        compaction_interval=3,
        overlap_size=1,
    ),
)

# The runner combines the app with the session service.
smartdocs_runner = Runner(
    app=smartdocs_app,
    session_service=session_service,
)

print("SmartDocs app and runner initialized with compaction.")


SmartDocs app and runner initialized with compaction.


  events_compaction_config=EventsCompactionConfig(


In [18]:
# Small hand-written invoice used as the demo input (also reused by the
# evaluation cell further down).
sample_invoice = """
INVOICE
Vendor: ACME Pvt Ltd
Customer: CMB Solutions
Invoice Date: 2025-10-01
Due Date: 2025-10-15
Currency: INR
Total Amount: 45,000
Items:
 - Cloud hosting services - 30,000
 - Support & maintenance - 15,000
"""

# Runs the full pipeline: several Gemini calls plus one insert into smartdocs.db,
# so each execution of this cell stores a new record.
processed = process_document(sample_invoice)

# Show every part of the result, including the id of the stored record.
print("Doc Type:", processed.doc_type)
print("Suggested Action:", processed.suggested_action)
print("Summary:\n", processed.summary)
print("Fields:\n", json.dumps(processed.fields, indent=2, ensure_ascii=False))
print("Stored as doc_id:", processed.doc_id)


Doc Type: invoice
Suggested Action: approve
Summary:
 This document is an invoice from ACME Pvt Ltd to CMB Solutions for cloud hosting services and support & maintenance. The total amount due is INR 45,000, with a due date of October 15, 2025. This transaction occurred on October 1, 2025.
Fields:
 {
  "vendor": "ACME Pvt Ltd",
  "customer": "CMB Solutions",
  "invoice_date": "2025-10-01",
  "due_date": "2025-10-15",
  "currency": "INR",
  "total_amount": 45000,
  "line_items": [
    {
      "description": "Cloud hosting services",
      "amount": 30000
    },
    {
      "description": "Support & maintenance",
      "amount": 15000
    }
  ]
}
Stored as doc_id: 1


In [19]:
# Labelled examples for the evaluation below. Each entry has a display name,
# the document type the classifier is expected to return, and the raw text.
test_docs = [
    {
        "name": "Invoice 1",
        "expected_type": "invoice",
        "text": sample_invoice,
    },
    {
        "name": "Resume 1",
        "expected_type": "resume",
        "text": "Curriculum Vitae\nName: John Doe\nEmail: john@example.com\nExperience: 3 years as data analyst...",
    },
    # Add more examples for POs, contracts, etc.
]

def evaluate_smartdocs(test_docs, process_fn=None):
    """
    Run each labelled document through the pipeline and report how often the
    predicted document type matches the expected one.

    Args:
        test_docs: Iterable-with-length of dicts with keys "name",
            "expected_type" and "text".
        process_fn: Callable taking the document text and returning an object
            with `doc_type`, `fields` and `suggested_action` attributes.
            Defaults to process_document; mainly useful for testing with a stub.

    Returns:
        The type-classification accuracy as a float in [0, 1] (0.0 when
        `test_docs` is empty). A document whose processing raises is reported
        and counted as incorrect instead of aborting the whole run.
    """
    if process_fn is None:
        # Looked up at call time so the default follows the notebook's pipeline.
        process_fn = process_document

    correct_type = 0
    total = len(test_docs)

    for doc in test_docs:
        print(f"\n=== Evaluating {doc['name']} ===")
        expected = doc["expected_type"]

        # No await here - the pipeline is synchronous.
        try:
            result = process_fn(doc["text"])
        except Exception as exc:
            # One bad model reply should not hide the results of the other docs.
            print("Expected type:", expected, "| Processing failed:", exc)
            continue

        predicted = result.doc_type
        print("Expected type:", expected, "| Predicted:", predicted)
        if predicted == expected:
            correct_type += 1

        # Check if some key fields exist
        print("Extracted fields keys:", list(result.fields.keys()))
        print("Suggested action:", result.suggested_action)

    accuracy = correct_type / total if total else 0.0
    print(f"\nOverall type classification accuracy: {accuracy:.2f}")
    return accuracy

# The evaluation is a plain synchronous function, so it is called directly
# (no await needed).
evaluate_smartdocs(test_docs)



=== Evaluating Invoice 1 ===
Expected type: invoice | Predicted: invoice
Extracted fields keys: ['vendor', 'customer', 'invoice_date', 'due_date', 'currency', 'total_amount', 'line_items']
Suggested action: approve

=== Evaluating Resume 1 ===
Expected type: resume | Predicted: resume
Extracted fields keys: ['name', 'email', 'experience_summary']
Suggested action: approve

Overall type classification accuracy: 1.00
