In [1]:
import logging
import os

# Clean up any previous logs
for log_file in ["logger.log", "web.log", "tunnel.log"]:
    if os.path.exists(log_file):
        os.remove(log_file)
        print(f"üßπ Cleaned up {log_file}")

# Configure logging with DEBUG log level.
logging.basicConfig(
    filename="logger.log",
    level=logging.DEBUG,
    format="%(filename)s:%(lineno)s %(levelname)s:%(message)s",
)

print("‚úÖ Logging configured")

‚úÖ Logging configured


In [2]:
import os
from kaggle_secrets import UserSecretsClient

try:
    GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
    print("‚úÖ Gemini API key setup complete.")
except Exception as e:
    print(f"üîë Error: {e}")



‚úÖ Gemini API key setup complete.


In [3]:
from IPython.core.display import display, HTML
from jupyter_server.serverapp import list_running_servers


# Gets the proxied URL in the Kaggle Notebooks environment
def get_adk_proxy_url():
    PROXY_HOST = "https://kkb-production.jupyter-proxy.kaggle.net"
    ADK_PORT = "8000"

    servers = list(list_running_servers())
    if not servers:
        raise Exception("No running Jupyter servers found.")

    baseURL = servers[0]["base_url"]

    try:
        path_parts = baseURL.split("/")
        kernel = path_parts[2]
        token = path_parts[3]
    except IndexError:
        raise Exception(f"Could not parse kernel/token from base URL: {baseURL}")

    url_prefix = f"/k/{kernel}/{token}/proxy/proxy/{ADK_PORT}"
    url = f"{PROXY_HOST}{url_prefix}"

    styled_html = f"""
    <div style="padding: 15px; border: 2px solid #f0ad4e; border-radius: 8px; background-color: #fef9f0; margin: 20px 0;">
        <div style="font-family: sans-serif; margin-bottom: 12px; color: #333; font-size: 1.1em;">
            <strong>‚ö†Ô∏è IMPORTANT: Action Required</strong>
        </div>
        <div style="font-family: sans-serif; margin-bottom: 15px; color: #333; line-height: 1.5;">
            The ADK web UI is <strong>not running yet</strong>. You must start it in the next cell.
            <ol style="margin-top: 10px; padding-left: 20px;">
                <li style="margin-bottom: 5px;"><strong>Run the next cell</strong> (the one with <code>!adk web ...</code>) to start the ADK web UI.</li>
                <li style="margin-bottom: 5px;">Wait for that cell to show it is "Running" (it will not "complete").</li>
                <li>Once it's running, <strong>return to this button</strong> and click it to open the UI.</li>
            </ol>
            <em style="font-size: 0.9em; color: #555;">(If you click the button before running the next cell, you will get a 500 error.)</em>
        </div>
        <a href='{url}' target='_blank' style="
            display: inline-block; background-color: #1a73e8; color: white; padding: 10px 20px;
            text-decoration: none; border-radius: 25px; font-family: sans-serif; font-weight: 500;
            box-shadow: 0 2px 5px rgba(0,0,0,0.2); transition: all 0.2s ease;">
            Open ADK Web UI (after running cell below) ‚Üó
        </a>
    </div>
    """

    display(HTML(styled_html))

    return url_prefix


print("‚úÖ Helper functions defined.")

‚úÖ Helper functions defined.


In [4]:
!adk create analysis-agent --model gemini-2.5-flash-lite --api_key $GOOGLE_API_KEY

[32m
Agent created in /kaggle/working/analysis-agent:
- .env
- __init__.py
- agent.py
[0m


In [5]:

import os
from kaggle_secrets import UserSecretsClient

try:
    GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
    print("‚úÖ Gemini API key setup complete.")
except Exception as e:
    print(f"üîë Error: {e}")


# %% [code] {"execution":{"iopub.status.busy":"2025-11-26T08:29:42.406581Z","iopub.execute_input":"2025-11-26T08:29:42.407530Z","iopub.status.idle":"2025-11-26T08:30:13.555313Z","shell.execute_reply.started":"2025-11-26T08:29:42.407496Z","shell.execute_reply":"2025-11-26T08:30:13.554087Z"}}
import json
import requests
import subprocess
import time
import uuid

from google.adk.agents import LlmAgent
from google.adk.agents.remote_a2a_agent import (
    RemoteA2aAgent,
    AGENT_CARD_WELL_KNOWN_PATH,
)

from google.adk.a2a.utils.agent_to_a2a import to_a2a
from google.adk.models.google_llm import Gemini
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

# Hide additional warnings in the notebook
import warnings

warnings.filterwarnings("ignore")

print("‚úÖ ADK components imported successfully.")

# %% [code] {"execution":{"iopub.status.busy":"2025-11-26T08:30:15.533832Z","iopub.execute_input":"2025-11-26T08:30:15.534495Z","iopub.status.idle":"2025-11-26T08:30:15.540761Z","shell.execute_reply.started":"2025-11-26T08:30:15.534461Z","shell.execute_reply":"2025-11-26T08:30:15.539619Z"}}
from google.genai import types
retry_config = types.HttpRetryOptions(
    attempts=5,
    exp_base=7,
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],
)

# %% [code] {"execution":{"iopub.status.busy":"2025-11-26T08:30:17.303013Z","iopub.execute_input":"2025-11-26T08:30:17.303394Z","iopub.status.idle":"2025-11-26T08:30:17.318249Z","shell.execute_reply.started":"2025-11-26T08:30:17.303368Z","shell.execute_reply":"2025-11-26T08:30:17.317173Z"}}
# ============================================================
# 2. DATA PROCESSING TOOLS
# ============================================================
import pandas as pd
def load_data(csv_path: str = "/kaggle/input/m-pesa-data/transanction.csv") -> pd.DataFrame:
    df = pd.read_csv(csv_path)
    numeric_cols = ["Paid In", "Withdrawn", "Balance"]
    df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors='coerce')
    df.dropna(subset=["Balance"], inplace=True)
    return df

def clean_and_prepare_data(df: pd.DataFrame) -> pd.DataFrame:
    df["Completion Time"] = pd.to_datetime(df['Completion Time'], errors='coerce')
    numeric_cols = ["Paid In", "Withdrawn", "Balance"]
    df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors="coerce")
    df.dropna(subset=["Balance"], inplace=True)
    return df

def sort_and_prepare_time_series(df: pd.DataFrame) -> pd.DataFrame:
    df = df.sort_values(by="Completion Time")
    return df.set_index("Completion Time")

def categorize_transaction(details: str) -> str:
    if not isinstance(details, str):
        return "Uncategorized"
    dl = details.lower()
    if "pay bill" in dl:
        return "Bills"
    elif "mpesa" in dl or "customer payment" in dl:
        return "Income"
    elif "withdraw" in dl or "purchase" in dl:
        return "Expenses"
    else:
        return "Other"

def analyze_mpesa_data() -> dict:
    df = load_data()
    df = clean_and_prepare_data(df)
    df = sort_and_prepare_time_series(df)
    df["Category"] = df["Details"].apply(categorize_transaction)

    balance_stats = {
        "start_balance": float(df["Balance"].iloc[0]),
        "end_balance": float(df["Balance"].iloc[-1]),
        "min_balance": float(df["Balance"].min()),
        "max_balance": float(df["Balance"].max()),
        "avg_balance": float(df["Balance"].mean()),
    }

    category_spending = (
        df[df["Withdrawn"] > 0]
        .groupby("Category")["Withdrawn"]
        .sum()
        .sort_values(ascending=False)
        .to_dict()
    )

    income_total = float(df["Paid In"].sum())
    withdrawal_total = float(df["Withdrawn"].sum())

    return {
        "balance_stats": balance_stats,
        "category_spending": category_spending,
        "income_total": income_total,
        "withdrawal_total": withdrawal_total,
    }

# ============================================================
# AGENT DEFINITION
# ============================================================

# The corrected code that is run via uvicorn
data_analyzer_agent = LlmAgent(
    name="DataAnalyzerAgent",
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    instruction="""
You are the Data Analyzer Agent...

Your mandatory task flow is:
1.  use the tool '**analyze_mpesa_data**' (which handles data retrieval/cleaning) to perform the full structured financial analysis. üëà **CONSENT STEP REMOVED**
2. give the user summary of the analysis
...
""",
    tools=[analyze_mpesa_data],
)

# %% [code] {"execution":{"iopub.status.busy":"2025-11-26T08:30:18.739260Z","iopub.execute_input":"2025-11-26T08:30:18.739560Z","iopub.status.idle":"2025-11-26T08:30:48.783034Z","shell.execute_reply.started":"2025-11-26T08:30:18.739538Z","shell.execute_reply":"2025-11-26T08:30:48.781654Z"}}
data_analyzer_agent_code = '''
import os
from google.adk.agents import LlmAgent
from google.adk.a2a.utils.agent_to_a2a import to_a2a
from google.adk.models.google_llm import Gemini
from google.genai import types

retry_config = types.HttpRetryOptions(
    attempts=5,  # Maximum retry attempts
    exp_base=7,  # Delay multiplier
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],  # Retry on these HTTP errors
)
# ============================================================
# 2. DATA PROCESSING TOOLS
# ============================================================
import pandas as pd
def load_data(csv_path: str = "/kaggle/input/m-pesa-data/transanction.csv") -> pd.DataFrame:
    df = pd.read_csv(csv_path)
    numeric_cols = ["Paid In", "Withdrawn", "Balance"]
    df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors='coerce')
    df.dropna(subset=["Balance"], inplace=True)
    return df

def clean_and_prepare_data(df: pd.DataFrame) -> pd.DataFrame:
    df["Completion Time"] = pd.to_datetime(df['Completion Time'], errors='coerce')
    numeric_cols = ["Paid In", "Withdrawn", "Balance"]
    df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors="coerce")
    df.dropna(subset=["Balance"], inplace=True)
    return df

def sort_and_prepare_time_series(df: pd.DataFrame) -> pd.DataFrame:
    df = df.sort_values(by="Completion Time")
    return df.set_index("Completion Time")

def categorize_transaction(details: str) -> str:
    if not isinstance(details, str):
        return "Uncategorized"
    dl = details.lower()
    if "pay bill" in dl:
        return "Bills"
    elif "mpesa" in dl or "customer payment" in dl:
        return "Income"
    elif "withdraw" in dl or "purchase" in dl:
        return "Expenses"
    else:
        return "Other"

def analyze_mpesa_data() -> dict:
    df = load_data()
    df = clean_and_prepare_data(df)
    df = sort_and_prepare_time_series(df)
    df["Category"] = df["Details"].apply(categorize_transaction)

    balance_stats = {
        "start_balance": float(df["Balance"].iloc[0]),
        "end_balance": float(df["Balance"].iloc[-1]),
        "min_balance": float(df["Balance"].min()),
        "max_balance": float(df["Balance"].max()),
        "avg_balance": float(df["Balance"].mean()),
    }

    category_spending = (
        df[df["Withdrawn"] > 0]
        .groupby("Category")["Withdrawn"]
        .sum()
        .sort_values(ascending=False)
        .to_dict()
    )

    income_total = float(df["Paid In"].sum())
    withdrawal_total = float(df["Withdrawn"].sum())

    return {
        "balance_stats": balance_stats,
        "category_spending": category_spending,
        "income_total": income_total,
        "withdrawal_total": withdrawal_total,
    }

# ============================================================
# AGENT DEFINITION
# ============================================================

# The corrected code that is run via uvicorn
data_analyzer_agent = LlmAgent(
    name="DataAnalyzerAgent",
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    instruction="""
You are the Data Analyzer Agent...

Your mandatory task flow is:
1.  use the tool '**analyze_mpesa_data**' (which handles data retrieval/cleaning) to perform the full structured financial analysis.
2. give the user summary of the analysis
""",
    tools=[analyze_mpesa_data],
)
app = to_a2a(data_analyzer_agent, port=8002)
'''
# --- 3. Write the agent code to a temporary file ---
SERVER_FILE = "/tmp/data_analyzer_server.py"
with open(SERVER_FILE, "w") as f:
    f.write(data_analyzer_agent_code)
print(f"üìù Data Analyzer agent code saved to {SERVER_FILE}")

# --- 4. Start uvicorn server in background ---
SERVER_PORT = "8002"
server_process = subprocess.Popen(
    [
        "uvicorn",
        "data_analyzer_server:app",
        "--host",
        "localhost",
        "--port",
        SERVER_PORT,
    ],
    cwd="/tmp",
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    env={**os.environ},
)

print(f"üöÄ Starting Data Analyzer Agent server on port {SERVER_PORT}...")
print("   Waiting for server to be ready...")

# Wait for server to start (poll until it responds)
max_attempts = 30
for attempt in range(max_attempts):
    try:
        response = requests.get(
            f"http://localhost:{SERVER_PORT}/.well-known/agent-card.json", timeout=1
        )
        if response.status_code == 200:
            print(f"\n‚úÖ Data Analyzer Agent server is running!")
            print(f"   Server URL: http://localhost:{SERVER_PORT}")
            print(f"   Agent card: http://localhost:{SERVER_PORT}/.well-known/agent-card.json")
            break
    except requests.exceptions.RequestException:
        time.sleep(5)
        print(".", end="", flush=True)
else:
    print("\n‚ö†Ô∏è  Server may not be ready yet. Check manually if needed.")

# Store the process so we can stop it later
globals()["data_analyzer_server_process"] = server_process

# %% [code] {"execution":{"iopub.status.busy":"2025-11-26T08:30:48.784840Z","iopub.execute_input":"2025-11-26T08:30:48.785264Z","iopub.status.idle":"2025-11-26T08:30:48.798262Z","shell.execute_reply.started":"2025-11-26T08:30:48.785229Z","shell.execute_reply":"2025-11-26T08:30:48.797311Z"}}
import shutil
import os

# --- Define Source and Destination Paths ---
SOURCE_PATH = "/kaggle/input/m-pesa-data/transanction.csv"
DESTINATION_PATH = "/tmp/mpesa.csv"

# --- Copy the file ---
try:
    # Check if the source file exists before attempting to copy
    if os.path.exists(SOURCE_PATH):
        # shutil.copy will copy the file contents and permission mode
        shutil.copy(SOURCE_PATH, DESTINATION_PATH)
        print(f"‚úÖ Successfully copied data from:")
        print(f"   Source: {SOURCE_PATH}")
        print(f"   To Destination: {DESTINATION_PATH}")
        print("\nThe remote Data Analyzer Agent can now access the file at /tmp/mpesa.csv.")
    else:
        print(f"‚ö†Ô∏è Error: Source file not found at {SOURCE_PATH}")
        print("Please ensure the Kaggle dataset is correctly attached and the path is exact.")

except Exception as e:
    print(f"‚ùå An unexpected error occurred during file copy: {e}")

# --- Verification (Optional) ---
# You can add a quick check to ensure the file is there and has content
if os.path.exists(DESTINATION_PATH):
    file_size = os.path.getsize(DESTINATION_PATH)
    print(f"\nFile verification: {DESTINATION_PATH} size is {file_size} bytes.")
else:
    print("\nFile verification failed: The destination file does not exist.")

# %% [code] {"execution":{"iopub.status.busy":"2025-11-26T08:30:48.799359Z","iopub.execute_input":"2025-11-26T08:30:48.799597Z","iopub.status.idle":"2025-11-26T08:30:48.832216Z","shell.execute_reply.started":"2025-11-26T08:30:48.799576Z","shell.execute_reply":"2025-11-26T08:30:48.831162Z"}}

try:
    response = requests.get(
        "http://localhost:8002/.well-known/agent-card.json", timeout=5
    )

    if response.status_code == 200:
        agent_card = response.json()
        print("üìã mpesa statement analysis agent:")
        print(json.dumps(agent_card, indent=2))

        print("\n‚ú® Key Information:")
        print(f"   Name: {agent_card.get('name')}")
        print(f"   Description: {agent_card.get('description')}")
        print(f"   URL: {agent_card.get('url')}")
        print(f"   Skills: {len(agent_card.get('skills', []))} capabilities exposed")
    else:
        print(f"‚ùå Failed to fetch agent card: {response.status_code}")

except requests.exceptions.RequestException as e:
    print(f"‚ùå Error fetching agent card: {e}")
    print("   Make sure the Product Catalog Agent server is running (previous cell)")


# %% [code] {"execution":{"iopub.status.busy":"2025-11-26T08:30:50.990991Z","iopub.execute_input":"2025-11-26T08:30:50.991663Z","iopub.status.idle":"2025-11-26T08:30:50.998586Z","shell.execute_reply.started":"2025-11-26T08:30:50.991631Z","shell.execute_reply":"2025-11-26T08:30:50.997532Z"}}
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent

# The port where the Data Analyzer Agent is running
SERVER_PORT = "8002"
# The standard A2A path for the agent's metadata
AGENT_CARD_WELL_KNOWN_PATH = "/.well-known/agent-card.json"

# Create a RemoteA2aAgent that connects to our Data Analyzer Agent
remote_mpesa_analyzer_agent = RemoteA2aAgent(
    name="DataAnalyzerAgent", # Use the exact name defined in the remote server code
    description="Remote agent for financial analysis of M-Pesa transaction data.",
    # Point to the agent card URL (localhost:8002)
    agent_card=f"http://localhost:{SERVER_PORT}{AGENT_CARD_WELL_KNOWN_PATH}",
)

print("‚úÖ Remote M-Pesa Data Analyzer Agent proxy created!")
print(f"   Connected to: http://localhost:{SERVER_PORT}")
print(f"   Agent card: http://localhost:{SERVER_PORT}{AGENT_CARD_WELL_KNOWN_PATH}")
print("   A client agent can now use this proxy like a local sub-agent!")

# %% [code] {"execution":{"iopub.status.busy":"2025-11-26T08:33:23.247504Z","iopub.execute_input":"2025-11-26T08:33:23.247887Z","iopub.status.idle":"2025-11-26T08:33:23.254774Z","shell.execute_reply.started":"2025-11-26T08:33:23.247856Z","shell.execute_reply":"2025-11-26T08:33:23.253500Z"}}

root_agent = LlmAgent(
    name="InsightsAgent",
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    # Add the remote proxy as a sub-agent
    sub_agents=[remote_mpesa_analyzer_agent],
    instruction="""
You are the **Lead Financial Insights and Advisor Agent**. Your primary goal is to perform a comprehensive SME's and individual financial assessment based on M-Pesa data.

**Workflow:**
    When a user asks for a financial assessment or loan advice use the subagent remote_mpesa_analyzer_agent to get the user data analysis
    Based on this data, provide a detailed financial assessment covering:
    * **Financial Health:** Comment on the overall income/withdrawal totals and balance fluctuations.
    * **Spending Profile:** Explain the major expense categories (e.g., Bills, Expenses) and comment on spending consistency.
    * **Creditworthiness:** Highlight key risks and opportunities that affect loan eligibility (e.g., low minimum balance, consistent income).
    
    You must provide a detailed analysis,advice including loan recommendation
    """,
    output_key="final_insights"
)

print("\n‚úÖ Insights Agent instruction corrected! Ready for re-test.")


‚úÖ Gemini API key setup complete.
‚úÖ ADK components imported successfully.
üìù Data Analyzer agent code saved to /tmp/data_analyzer_server.py
üöÄ Starting Data Analyzer Agent server on port 8002...
   Waiting for server to be ready...
.....
‚úÖ Data Analyzer Agent server is running!
   Server URL: http://localhost:8002
   Agent card: http://localhost:8002/.well-known/agent-card.json
‚úÖ Successfully copied data from:
   Source: /kaggle/input/m-pesa-data/transanction.csv
   To Destination: /tmp/mpesa.csv

The remote Data Analyzer Agent can now access the file at /tmp/mpesa.csv.

File verification: /tmp/mpesa.csv size is 182382 bytes.
üìã mpesa statement analysis agent:
{
  "capabilities": {},
  "defaultInputModes": [
    "text/plain"
  ],
  "defaultOutputModes": [
    "text/plain"
  ],
  "description": "An ADK Agent",
  "name": "DataAnalyzerAgent",
  "preferredTransport": "JSONRPC",
  "protocolVersion": "0.3.0",
  "skills": [
    {
      "description": "\nI am the Data Analyzer Age

In [6]:
async def test_mpesa_a2a_analysis(user_query: str):
    """
    Test the A2A communication between Insights Agent and Data Analyzer Agent.
    This function:
    1. Creates a new session for this conversation.
    2. Sends the query to the Insights Agent.
    3. Insights Agent calls the remote Data Analyzer Agent via A2A.
    4. Displays the final financial assessment response.

    Args:
        user_query: The question to ask the Insights Agent.
    """
    # Setup session management (required by ADK)
    session_service = InMemorySessionService()

    # Session identifiers
    app_name = "fintech_app"
    user_id = "demo_applicant"
    # Use unique session ID for each test to avoid conflicts
    session_id = f"demo_session_{uuid.uuid4().hex[:8]}"

    # CRITICAL: Create session BEFORE running agent
    session = await session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id
    )

    # Create runner for the Insights Agent (the client)
    runner = Runner(
        agent=root_agent, app_name=app_name, session_service=session_service
    )

    # Create the user message
    test_content = types.Content(parts=[types.Part(text=user_query)])

    # Display query
    print(f"\nüë§ Applicant Query: {user_query}")
    print(f"\nüß† Insights Agent Response:")
    print("-" * 70)

    # Run the agent asynchronously (handles streaming responses and A2A communication)
    async for event in runner.run_async(
        user_id=user_id, session_id=session_id, new_message=test_content
    ):
        # Print final response only (skip intermediate events)
        if event.is_final_response() and event.content:
            for part in event.content.parts:
                if hasattr(part, "text"):
                    print(part.text)

    print("-" * 70)

# Run the test
print("üß™ Testing A2A Communication for M-Pesa Analysis...\n")

# A typical query that would trigger the financial analysis
analysis_query = "Can you provide a financial assessment based on my M-Pesa transaction data?"

await test_mpesa_a2a_analysis(analysis_query)

# Note: In a live environment, you would need to stop the server process
# globals()["data_analyzer_server_process"].terminate()
# print("üõë Stopped Data Analyzer Agent server.")

üß™ Testing A2A Communication for M-Pesa Analysis...


üë§ Applicant Query: Can you provide a financial assessment based on my M-Pesa transaction data?

üß† Insights Agent Response:
----------------------------------------------------------------------
The analysis of your M-Pesa transactions reveals the following:

**Account Balance:**
*   Your average balance throughout the period was approximately 155.92.
*   The balance at the start of the period was 61.79, and it ended at 0.38.
*   The highest balance recorded was 960.38, and the lowest was 0.35.

**Income and Withdrawals:**
*   Your total income during this period was 52,769.59.
*   Your total withdrawals amounted to -55,391.65.

Currently, there is no data available for spending by category.
----------------------------------------------------------------------
