In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
# Cell 1 — Install and set up API key (run once)
!pip install -q google-adk google-generativeai

# Kaggle secrets helper (optional - recommended on Kaggle)
import os
try:
    from kaggle_secrets import UserSecretsClient
    GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
    print("✅ Loaded GOOGLE_API_KEY from Kaggle secrets.")
except Exception as e:
    print("⚠️ Could not load Kaggle secret. Make sure 'GOOGLE_API_KEY' is set in secrets or set env var GOOGLE_API_KEY.")
    # Optionally check env var:
    if os.environ.get("GOOGLE_API_KEY"):
        print("✅ GOOGLE_API_KEY present as env var.")
    else:
        print("❗ You must set GOOGLE_API_KEY before calling Gemini-based tools.")


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m319.9/319.9 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
bigframes 2.12.0 requires google-cloud-bigquery-storage<3.0.0,>=2.30.0, which is not installed.
google-cloud-translate 3.12.1 requires protobuf!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev,>=3.19.5, but you have protobuf 5.29.5 which is incompatible.
ray 2.51.1 requires click!=8.3.0,>=7.0, but you have click 8.3.0 which is incompatible.
bigframes 2.12.0 requires rich<14,>=12.4.4, but you have rich 14.2.0 which is incompatible.
pydrive2 1.21.3 requires cryptography<44, but you have cryptography 46.0.3 which is incompatible.
pydrive2 1.21.3 requires pyOpenSSL<=24.2.1,>=19.1.0, but you have pyopenssl 25.3.0 which is incompatible.
gcsfs 2025.3.

In [17]:
# Cell 2 — Imports & ADK setup
import os, json, uuid, base64
from pathlib import Path
from datetime import datetime
from typing import Dict, Any

# ADK/Gemini imports
from google.genai import types
from google.adk.agents import LlmAgent
from google.adk.models.google_llm import Gemini
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.function_tool import FunctionTool
from google.adk.apps.app import App, ResumabilityConfig
from google.adk.runners import InMemoryRunner

# Create data directory
Path("data").mkdir(exist_ok=True)

print("✅ ADK imports ready.")


✅ ADK imports ready.


In [3]:
# Cell 3 — Retry config and model factory helper
retry_config = types.HttpRetryOptions(
    attempts=5,
    exp_base=7,
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],
)

def make_gemini_model(model_name="gemini-2.5-flash-lite"):
    return Gemini(model=model_name, retry_options=retry_config)

print("✅ Retry config and Gemini helper prepared.")


✅ Retry config and Gemini helper prepared.


In [4]:
# Cell 4 — Example alerts (you can replace with your sample_alerts.json)
sample_alerts = [
    {"alert_id":"I0001","timestamp":"2025-11-25T08:12:00Z","service":"service-auth","message":"High CPU usage on service-auth (92%) and repeated timeouts","metric_context":"cpu_usage:92%","severity":"critical"},
    {"alert_id":"I0002","timestamp":"2025-11-25T08:13:40Z","service":"service-cart","message":"Error rate spike for service-cart (7% errors)","metric_context":"error_rate:7%","severity":"high"},
    {"alert_id":"I0003","timestamp":"2025-11-25T08:14:00Z","service":"database","message":"Critical: DB primary node disk below 5%","metric_context":"disk_free:3%","severity":"critical"}
]
Path("data/sample_alerts.json").write_text(json.dumps(sample_alerts, indent=2))
print("✅ Wrote demo sample_alerts.json (3 alerts). Update data/sample_alerts.json if you want more.")


✅ Wrote demo sample_alerts.json (3 alerts). Update data/sample_alerts.json if you want more.


In [5]:
# Cell 5 — Ingest tool (FunctionTool)
def ingest_alerts(path: str = "data/sample_alerts.json") -> Dict[str, Any]:
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"{path} not found.")
    alerts = json.loads(p.read_text())
    # Normalize timestamps
    for a in alerts:
        a.setdefault("metric_context", "")
        a.setdefault("log_snippet", "")
        a.setdefault("severity", "info")
    return {"count": len(alerts), "alerts": alerts}

ingest_tool = FunctionTool(func=ingest_alerts)
print("✅ Ingest tool ready.")


✅ Ingest tool ready.


In [6]:
# Cell 6 — Correlator tool (simple service-based grouping + text similarity)
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

def correlate_alerts_tool(alerts: list, threshold: float = 0.65) -> Dict[str, Any]:
    if not alerts:
        return {"groups": []}
    # Create search texts
    texts = [ (a.get("message","") + " " + a.get("metric_context","")) for a in alerts ]
    tfidf = TfidfVectorizer().fit_transform(texts)
    sim = cosine_similarity(tfidf)
    n = len(alerts)
    groups = [-1]*n
    gid = 0
    for i in range(n):
        if groups[i] != -1:
            continue
        groups[i] = gid
        for j in range(i+1, n):
            if groups[j] == -1 and sim[i,j] >= threshold:
                groups[j] = gid
        gid += 1
    # attach group ids
    for idx,a in enumerate(alerts):
        a["group_id"] = f"G{groups[idx]}"
    return {"groups": list(set([a["group_id"] for a in alerts])), "alerts": alerts}

correlator_tool = FunctionTool(func=correlate_alerts_tool)
print("✅ Correlator tool ready.")


✅ Correlator tool ready.


In [7]:
# Cell 7 — Rule-based priority classifier tool
def classify_priority_tool(alerts: list) -> Dict[str, Any]:
    def rule_prior(a):
        msg = (a.get("message","") + " " + a.get("log_snippet","")).lower()
        mc = a.get("metric_context","").lower()
        if "disk below" in msg or "data loss" in msg or "service down" in msg or "critical" in a.get("severity","").lower():
            return "P0"
        if "error rate" in mc or "timeout" in msg or "latency" in mc:
            return "P1"
        if "cannot" in msg or "failed" in msg:
            return "P2"
        return "P3"
    for a in alerts:
        a["pred_priority"] = rule_prior(a)
    return {"alerts": alerts}

classifier_tool = FunctionTool(func=classify_priority_tool)
print("✅ Classifier tool ready.")


✅ Classifier tool ready.


In [8]:
# Cell 8 — LLM-based summarizer and action recommender (calls Gemini)
# This tool will call the model to generate a JSON summary + recommended actions for an alert.
# Ensure GOOGLE_API_KEY is set before running this tool.

from google.genai import types as gen_types
from google.genai import client as gen_client  # used by ADK model under the hood if needed

def llm_summarize_and_recommend_tool(alert: dict, model_name: str = "gemini-2.5-flash-lite") -> Dict[str, Any]:
    """
    Use a direct GenerativeModel call to produce structured JSON for an individual alert.
    We create a compact prompt and expect JSON output (summary, priority_reason, actions[]).
    """
    # Basic guard: if no API key, return a template (so notebook can run w/o key)
    if not os.environ.get("GOOGLE_API_KEY"):
        # Fallback deterministic response (for offline/demo runs)
        return {
            "alert_id": alert.get("alert_id"),
            "summary": f"[{alert.get('service')}] {alert.get('message')}",
            "priority_reason": alert.get("pred_priority","P3"),
            "actions": [
                "Collect logs and relevant metrics",
                "Notify on-call engineer",
            ],
        }

    # Build prompt
    system = """
You are an enterprise incident assistant. Given an alert JSON, produce a compact JSON with:
- summary (1-2 sentences)
- priority_reason (one-sentence justification)
- actions (list of 2-5 concise next steps)
Return JSON only.
"""
    user_prompt = f"ALERT_JSON: {json.dumps(alert)}\n\nRespond with JSON."

    # Use the Gemini model via ADK's model object
    model = make_gemini_model(model_name)
    # The ADK LlmAgent usually orchestrates model calls; here we call model.generate_content via the ADK model wrapper.
    # Using the adk model object interface:
    response = model.generate_content(system_prompt=system, user_prompt=user_prompt)
    # response.text expected to be text string; try parse JSON
    try:
        text = response.text
        parsed = json.loads(text)
        return parsed
    except Exception as e:
        # fallback if model output not strict JSON
        return {
            "alert_id": alert.get("alert_id"),
            "summary": f"[{alert.get('service')}] {alert.get('message')}",
            "priority_reason": alert.get("pred_priority","P3"),
            "actions": [
                "Collect logs and relevant metrics",
                "Notify on-call engineer",
            ],
            "model_raw": getattr(response, "text", str(response))
        }

llm_tool = FunctionTool(func=llm_summarize_and_recommend_tool)
print("✅ LLM summarizer/recommender tool ready (uses Gemini when API key present).")


✅ LLM summarizer/recommender tool ready (uses Gemini when API key present).


In [9]:
# Cell 9 — Mitigation tool that requires human approval for high-risk automated actions
from google.adk.tools.tool_context import ToolContext

AUTO_MITIGATION_THRESHOLD = 0  # We'll require approval for any mitigation that would "restart DB" or similar

def mitigation_tool(action: str, target: str, tool_context: ToolContext) -> Dict[str, Any]:
    """
    action: e.g., "restart_service", "free_disk_space", "scale_up"
    target: resource identifier, e.g., "db-1" or "service-auth"
    This tool can request confirmation via tool_context.request_confirmation and will pause/resume.
    """
    # If this is first call and approval not provided, request confirmation
    # Using a simple policy: require confirmation if action contains 'restart' or 'shutdown' or 'scale_up' or target is production DB
    high_risk = any(k in action.lower() for k in ["restart", "shutdown", "scale"]) or "db" in target.lower()

    if not high_risk:
        # Auto-execute safe actions
        return {"status": "executed", "action": action, "target": target, "message": "Auto-executed safe mitigation."}

    # If high risk and tool_context has no confirmation yet -> request confirmation (pause)
    if not tool_context.tool_confirmation:
        tool_context.request_confirmation(
            hint=f"⚠️ Confirm {action} on {target} (high-risk).",
            payload={"action": action, "target": target}
        )
        return {"status": "pending", "message": f"Mitigation '{action}' on {target} requires approval."}

    # If resuming: check confirmation
    if tool_context.tool_confirmation.confirmed:
        # Here we'd perform the mitigation (mock)
        return {"status": "executed", "action": action, "target": target, "message": "Mitigation executed after approval."}
    else:
        return {"status": "rejected", "message": "Mitigation rejected by human."}

mitigation_function_tool = FunctionTool(func=mitigation_tool)
print("✅ Mitigation tool (pausable) ready.")


✅ Mitigation tool (pausable) ready.


In [10]:
# Cell 10 — Create LlmAgent that knows about all tools
triage_agent = LlmAgent(
    name="triage_agent",
    model=make_gemini_model("gemini-2.5-flash-lite"),
    instruction="""
You are an enterprise incident triage assistant. Use the provided tools to:
1) Ingest alerts (ingest_alerts)
2) Correlate related alerts (correlate_alerts)
3) Classify priorities (classify_priority)
4) For each alert, ask the LLM tool to summarize & recommend actions (llm_summarize_and_recommend)
5) If a recommended mitigation is high risk, call mitigation_tool (which may pause for approval)
Respond concisely and always return structured outputs as JSON where applicable.
""",
    tools=[ingest_tool, correlator_tool, classifier_tool, llm_tool, mitigation_function_tool],
)
print("✅ Triage agent created with tools.")


✅ Triage agent created with tools.


In [11]:
# Cell 11 — App & Runner for resumable flows
triage_app = App(name="triage_app", root_agent=triage_agent, resumability_config=ResumabilityConfig(is_resumable=True))
session_service = InMemorySessionService()
triage_runner = Runner(app=triage_app, session_service=session_service)

print("✅ App, session service and runner ready for resumable triage workflows.")


✅ App, session service and runner ready for resumable triage workflows.


  triage_app = App(name="triage_app", root_agent=triage_agent, resumability_config=ResumabilityConfig(is_resumable=True))


In [12]:
# Cell 12 — Helper utilities for running/resuming workflows
def find_confirmation_event(events):
    """Return approval info if present in events (search for adk_request_confirmation)."""
    for event in events:
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.function_call and part.function_call.name == "adk_request_confirmation":
                    return {"approval_id": part.function_call.id, "invocation_id": event.invocation_id}
    return None

def make_confirmation_content(approval_id: str, confirmed: bool):
    """Construct types.Content that wraps a FunctionResponse confirming or rejecting."""
    fr = types.FunctionResponse(id=approval_id, name="adk_request_confirmation", response={"confirmed": confirmed})
    return types.Content(role="user", parts=[types.Part(function_response=fr)])

def print_agent_text(events):
    for event in events:
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.text:
                    print("Agent >", part.text)


In [13]:
# Cell 13 — Orchestrator to run triage flow for the dataset and handle approvals (async)
import asyncio
from google.genai import types as genai_types

async def run_trige_session(auto_approve: bool = True):
    # create unique session id
    session_id = f"triage_{uuid.uuid4().hex[:8]}"
    await session_service.create_session(app_name="triage_app", user_id="demo_user", session_id=session_id)

    # initial user message: ask agent to process sample alerts from data/sample_alerts.json
    user_msg = types.Content(role="user", parts=[types.Part(text="Process the alerts in data/sample_alerts.json and triage them. Create tickets for critical incidents and propose mitigations.")])

    events = []
    async for event in triage_runner.run_async(user_id="demo_user", session_id=session_id, new_message=user_msg):
        events.append(event)

    # Check if agent asked for any confirmation
    approval_info = find_confirmation_event(events)
    if approval_info:
        print("⏸️ Agent requested human approval for a mitigation.")
        print(f"Simulating human decision: {'APPROVE' if auto_approve else 'REJECT'}")
        confirm_content = make_confirmation_content(approval_info["approval_id"], confirmed=auto_approve)
        # Resume the agent: need to pass the same invocation_id to resume
        async for ev in triage_runner.run_async(user_id="demo_user", session_id=session_id, new_message=confirm_content, invocation_id=approval_info["invocation_id"]):
            # print textual outputs
            if ev.content and ev.content.parts:
                for part in ev.content.parts:
                    if part.text:
                        print("Agent >", part.text)
    else:
        # No approvals required — print initial textual outputs
        print_agent_text(events)

    print("✅ Triage session completed.")

# Run the async function synchronously in the notebook
await run_trige_session(auto_approve=True)


ClientError: 400 INVALID_ARGUMENT. {'error': {'code': 400, 'message': '* GenerateContentRequest.tools[0].function_declarations[1].parameters.properties[alerts].items: missing field.\n* GenerateContentRequest.tools[0].function_declarations[2].parameters.properties[alerts].items: missing field.\n', 'status': 'INVALID_ARGUMENT'}}

In [18]:
# Cell 14 — After a run, check data/tickets_created.json (agent tools may create tickets as they run)
# If you implemented a ticket creation inside the agent's workflow (FunctionTool), it may have written to this file.
tickets_path = Path("data/tickets_created.json")
if tickets_path.exists():
    print("Tickets created:")
    print(tickets_path.read_text())
else:
    print("No tickets_created.json found. If your agent didn't create tickets, check the agent output printed above.")

# Create a simple evaluation placeholder CSV combining alerts -> priority predicted
# (If classifier tool attached predictions to alerts and agent persisted them, we can read them from a file; otherwise we can re-call the classifier here)

# Quick local evaluation: use classifier tool on sample alerts to produce predicted priorities
from google.adk.tools.function_tool import FunctionCallContext
# call classifier directly for demo (not via agent)
alerts = json.loads(Path("data/sample_alerts.json").read_text())
res = classify_priority_tool(alerts)  # direct function call as defined earlier
df = []
for a in res["alerts"]:
    df.append({"alert_id": a["alert_id"], "service": a["service"], "pred_priority": a["pred_priority"]})
import pandas as pd
df = pd.DataFrame(df)
df.to_csv("data/triage_predictions.csv", index=False)
print("Wrote data/triage_predictions.csv")
display(df)


No tickets_created.json found. If your agent didn't create tickets, check the agent output printed above.


ImportError: cannot import name 'FunctionCallContext' from 'google.adk.tools.function_tool' (/usr/local/lib/python3.11/dist-packages/google/adk/tools/function_tool.py)

In [19]:
# Cell 15 — Run the triage session twice: once auto-approve True, once False (demonstration)
print("\n--- Demo run: auto_approve = True ---")
await run_trige_session(auto_approve=True)

print("\n--- Demo run: auto_approve = False ---")
await run_trige_session(auto_approve=False)



--- Demo run: auto_approve = True ---


ClientError: 400 INVALID_ARGUMENT. {'error': {'code': 400, 'message': '* GenerateContentRequest.tools[0].function_declarations[1].parameters.properties[alerts].items: missing field.\n* GenerateContentRequest.tools[0].function_declarations[2].parameters.properties[alerts].items: missing field.\n', 'status': 'INVALID_ARGUMENT'}}