In [None]:
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Demo Plan: BigQuery for Agent Ops - Unified Platform

<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/haiyuan-eng-google/demo_BQ_agent_analytics_plugin_notebook/blob/main/Demo_Plan_BigQuery_for_Agent_Ops_Unified_Platform_Public.ipynb">
      <img src="https://raw.githubusercontent.com/googleapis/python-bigquery-dataframes/refs/heads/main/third_party/logo/colab-logo.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/haiyuan-eng-google/demo_BQ_agent_analytics_plugin_notebook/blob/main/Demo_Plan_BigQuery_for_Agent_Ops_Unified_Platform_Public.ipynb">
      <img src="https://raw.githubusercontent.com/googleapis/python-bigquery-dataframes/refs/heads/main/third_party/logo/github-logo.png" width="32" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/haiyuan-eng-google/demo_BQ_agent_analytics_plugin_notebook/main/Demo_Plan_BigQuery_for_Agent_Ops_Unified_Platform_Public.ipynb">
      <img src="https://www.gstatic.com/images/branding/product/1x/google_cloud_48dp.png" alt="Vertex AI logo" width="32">
      Open in Vertex AI Workbench
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/bigquery/import?url=https://github.com/haiyuan-eng-google/demo_BQ_agent_analytics_plugin_notebook/blob/main/Demo_Plan_BigQuery_for_Agent_Ops_Unified_Platform_Public.ipynb">
      <img src="https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTW1gvOovVlbZAIZylUtf5Iu8-693qS1w5NJw&s" alt="BQ logo" width="35">
      Open in BQ Studio
    </a>
  </td>
</table>

## Install Dependencies

In [None]:
!pip install -q google-adk bigquery-agent-analytics google-cloud-bigquery nest-asyncio

## Authenticate & Configure

In [None]:
import os

# Colab authentication
# If the google.colab package is importable, run its interactive user
# authentication; on ImportError just report that default credentials are used.
try:
    from google.colab import auth
    auth.authenticate_user()
    print("Colab authentication successful.")
except ImportError:
    print("Not running in Colab — using default credentials.")

# ---------- Configuration ----------
# The first four settings read an environment variable and fall back to the
# literal on the right when the variable is unset.
# NOTE(review): the PROJECT_ID fallback is a hard-coded project ID — export
# GOOGLE_CLOUD_PROJECT (or edit the default) before running this elsewhere.
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "test-project-0728-467323")
DATASET_ID = os.environ.get("BQ_DATASET", "agent_analytics")
TABLE_ID = os.environ.get("BQ_TABLE", "agent_events_v2")
MODEL_NAME = os.environ.get("MODEL_NAME", "gemini-3-flash-preview")
# LOCATION is handed to the BigQuery logging plugin and to the SDK client.
LOCATION = "US"
# APP_NAME and USER_ID are used for the runner and for every demo session.
APP_NAME = "e2e_notebook_demo"
USER_ID = "demo_user"

# Environment variables set for the libraries used below.
# NOTE(review): presumably "true" routes model calls through Vertex AI and
# "global" is the Vertex AI location (a different setting from the BigQuery
# LOCATION above) — confirm against the google-genai documentation.
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "true"
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = "global"

# Enable async in Jupyter
# Later cells call run_until_complete() on asyncio.get_event_loop(); the
# nest_asyncio patch is applied once here, before any of them run.
import nest_asyncio
nest_asyncio.apply()

# Echo the effective settings so the run is self-describing.
print(f"Project  : {PROJECT_ID}")
print(f"Dataset  : {DATASET_ID}")
print(f"Table    : {TABLE_ID}")
print(f"Model    : {MODEL_NAME}")

---

## Phase 1: Run Agent & Log Traces to BigQuery

We define a **travel planner agent** with four deterministic tools, run three conversations, and log every event to BigQuery via the `BigQueryAgentAnalyticsPlugin`.

In [None]:
import hashlib
import random
from typing import Any


async def search_flights(
    origin: str,
    destination: str,
    date: str,
    max_results: int = 5,
) -> dict[str, Any]:
    """Return synthetic but reproducible flight options for a route and date.

    The generator is seeded from an MD5 digest of origin + destination + date,
    so the same query always produces the same flights.

    Args:
        origin: Departure city or airport code.
        destination: Arrival city or airport code.
        date: Travel date in YYYY-MM-DD format.
        max_results: Maximum number of results to return (never more than 5).

    Returns:
        Dictionary with the echoed query, the number of results, and the
        list of flight records.
    """
    route_key = f"{origin}{destination}{date}"
    seed = int(hashlib.md5(route_key.encode()).hexdigest()[:8], 16)
    rng = random.Random(seed)

    carriers = [
        "United Airlines", "Delta Air Lines", "American Airlines",
        "JetBlue Airways", "Southwest Airlines", "Alaska Airlines",
    ]
    quarter_hours = ["00", "15", "30", "45"]
    cabins = ["Economy", "Premium Economy", "Business"]
    stop_choices = [0, 0, 0, 1, 1, 2]

    def _make_flight(offset: int) -> dict[str, Any]:
        # The draws below must stay in exactly this order: every value comes
        # from the same seeded stream, so reordering changes the results.
        departs_hour = rng.randint(6, 20)
        hours_in_air = rng.randint(2, 14)
        carrier = rng.choice(carriers)
        departs_minute = rng.choice(quarter_hours)
        arrives_minute = rng.choice(quarter_hours)
        fare = round(rng.uniform(150, 1200), 2)
        cabin = rng.choice(cabins)
        stops = rng.choice(stop_choices)
        # Arrival hour wraps around midnight (modulo 24).
        arrives_hour = (departs_hour + hours_in_air) % 24
        return {
            "flight_id": f"FL-{seed + offset:06d}",
            "airline": carrier,
            "origin": origin,
            "destination": destination,
            "date": date,
            "departure_time": f"{departs_hour:02d}:{departs_minute}",
            "arrival_time": f"{arrives_hour:02d}:{arrives_minute}",
            "duration_hours": hours_in_air,
            "price_usd": fare,
            "class": cabin,
            "stops": stops,
        }

    flights = [_make_flight(k) for k in range(min(max_results, 5))]
    return {
        "query": {"origin": origin, "destination": destination, "date": date},
        "results_count": len(flights),
        "flights": flights,
    }


async def search_hotels(
    city: str,
    check_in: str,
    check_out: str,
    max_results: int = 5,
) -> dict[str, Any]:
    """Return synthetic but reproducible hotel options for a city.

    The generator is seeded from an MD5 digest of city + check_in; check_out
    is echoed back in the results but does not influence the generated values.

    Args:
        city: City name to search hotels in.
        check_in: Check-in date (YYYY-MM-DD).
        check_out: Check-out date (YYYY-MM-DD).
        max_results: Maximum number of results to return (never more than 5).

    Returns:
        Dictionary with the echoed query, the number of results, and the
        list of hotel records.
    """
    digest = hashlib.md5(f"{city}{check_in}".encode()).hexdigest()
    seed = int(digest[:8], 16)
    rng = random.Random(seed)

    name_templates = (
        "Grand {} Hotel", "{} Plaza",
        "The {} Marriott", "Hilton {} Downtown",
        "Hyatt Regency {}", "Four Seasons {}",
        "Holiday Inn {}",
    )
    amenity_pool = [
        "WiFi", "Pool", "Gym", "Spa", "Restaurant",
        "Bar", "Room Service", "Parking",
        "Airport Shuttle", "Business Center",
    ]

    hotels = []
    for slot in range(min(max_results, 5)):
        # Keep this draw order: rating, price, amenity count, amenity sample,
        # distance. All values share one seeded stream.
        stars = round(rng.uniform(3.5, 5.0), 1)
        nightly_rate = round(rng.uniform(80, 500), 2)
        amenity_count = rng.randint(3, 7)
        perks = rng.sample(amenity_pool, k=amenity_count)
        km_from_center = round(rng.uniform(0.2, 8.0), 1)
        template = name_templates[slot % len(name_templates)]
        hotels.append({
            "hotel_id": f"HT-{seed + slot:06d}",
            "name": template.format(city),
            "city": city,
            "check_in": check_in,
            "check_out": check_out,
            "rating": stars,
            "price_per_night_usd": nightly_rate,
            "amenities": perks,
            "distance_to_center_km": km_from_center,
        })

    query = {"city": city, "check_in": check_in, "check_out": check_out}
    return {
        "query": query,
        "results_count": len(hotels),
        "hotels": hotels,
    }


async def get_weather_forecast(
    city: str,
    date: str,
) -> dict[str, Any]:
    """Get a synthetic but reproducible weather forecast for a city and date.

    The generator is seeded from an MD5 digest of city + date, so the same
    query always yields the same forecast. The low temperature is capped at
    the high temperature so the forecast is never self-contradictory.

    Args:
        city: City name.
        date: Date in YYYY-MM-DD format.

    Returns:
        Dictionary with weather forecast data, where
        ``temperature_low_c <= temperature_high_c`` always holds.
    """
    seed = int(hashlib.md5(f"{city}{date}".encode()).hexdigest()[:8], 16)
    rng = random.Random(seed)
    conditions = [
        "Sunny", "Partly Cloudy", "Cloudy", "Light Rain",
        "Rain", "Thunderstorms", "Clear", "Overcast",
    ]
    # Draw every field in a fixed order from the seeded stream so that all
    # values except a capped low are unchanged for a given (city, date).
    high_c = rng.randint(15, 35)
    low_c = rng.randint(5, 20)
    condition = rng.choice(conditions)
    humidity_pct = rng.randint(30, 90)
    wind_speed_kmh = rng.randint(5, 40)
    precipitation_chance_pct = rng.randint(0, 80)
    uv_index = rng.randint(1, 11)
    return {
        "city": city,
        "date": date,
        "temperature_high_c": high_c,
        # The two ranges overlap (15..35 vs 5..20); without the cap the
        # reported low could be higher than the reported high.
        "temperature_low_c": min(low_c, high_c),
        "condition": condition,
        "humidity_pct": humidity_pct,
        "wind_speed_kmh": wind_speed_kmh,
        "precipitation_chance_pct": precipitation_chance_pct,
        "uv_index": uv_index,
    }


async def calculate_trip_budget(
    flights: float,
    hotels: float,
    daily_expenses: float,
    num_days: int,
) -> dict[str, Any]:
    """Add up the trip's cost components and apply a 12% tax-and-fees charge.

    Args:
        flights: Total flight cost in USD.
        hotels: Total hotel cost in USD.
        daily_expenses: Estimated daily expenses (food, transport, etc.).
        num_days: Number of trip days.

    Returns:
        Dictionary with an itemised breakdown, the pre-tax subtotal, and the
        grand total, all rounded to two decimals.
    """
    expenses_for_trip = daily_expenses * num_days
    pre_tax = flights + hotels + expenses_for_trip
    # Tax is computed on the unrounded subtotal and rounded on its own before
    # being added back.
    surcharge = round(pre_tax * 0.12, 2)
    grand_total = round(pre_tax + surcharge, 2)

    breakdown = {
        "flights": round(flights, 2),
        "hotels": round(hotels, 2),
        "daily_expenses_total": round(expenses_for_trip, 2),
        "daily_expenses_per_day": round(daily_expenses, 2),
        "num_days": num_days,
        "tax_and_fees": surcharge,
    }
    return {
        "breakdown": breakdown,
        "subtotal_usd": round(pre_tax, 2),
        "total_usd": grand_total,
        "currency": "USD",
    }


print("Tool functions defined: search_flights, search_hotels, get_weather_forecast, calculate_trip_budget")

In [None]:
from google.adk.agents import LlmAgent
from google.genai import types

# Instruction text given to the travel planner agent (passed as `instruction`
# in build_agent). The trailing backslash after the opening quotes keeps the
# string from starting with a newline.
TRAVEL_PLANNER_INSTRUCTION = """\
You are a helpful travel planning assistant. You help users plan trips by
searching for flights, hotels, checking weather forecasts, and calculating
budgets.

Guidelines:
- Always search for flights and hotels when the user asks to plan a trip.
- Check the weather at the destination when relevant.
- Provide a budget estimate when enough cost information is available.
- Be concise but informative in your responses.
- Present results in a clear, organized format.
- When multiple tools are needed, call them as appropriate and then
  synthesize the results into a cohesive plan.
"""


def build_agent() -> LlmAgent:
    """Create the `travel_planner` LLM agent with its four tool functions.

    Returns:
        An LlmAgent using MODEL_NAME, the travel planner instruction text,
        and a generation config with temperature 1.0.
    """
    planner_tools = [
        search_flights,
        search_hotels,
        get_weather_forecast,
        calculate_trip_budget,
    ]
    generation_settings = types.GenerateContentConfig(temperature=1.0)
    planner = LlmAgent(
        name="travel_planner",
        model=MODEL_NAME,
        instruction=TRAVEL_PLANNER_INSTRUCTION,
        tools=planner_tools,
        generate_content_config=generation_settings,
    )
    return planner


print("Agent builder ready.")

In [None]:
import asyncio
import uuid

from google.adk.plugins.bigquery_agent_analytics_plugin import (
    BigQueryAgentAnalyticsPlugin,
    BigQueryLoggerConfig,
)
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

# Build agent, runner, and plugin
agent = build_agent()
# Session state for the demo is held by an in-memory session service.
session_service = InMemorySessionService()

# Analytics plugin pointed at PROJECT_ID / DATASET_ID / TABLE_ID in LOCATION.
# NOTE(review): batch_size=1 and batch_flush_interval=1.0 presumably make
# each event get written promptly for the demo — confirm against the
# BigQueryLoggerConfig documentation.
plugin = BigQueryAgentAnalyticsPlugin(
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    config=BigQueryLoggerConfig(
        table_id=TABLE_ID,
        batch_size=1,
        batch_flush_interval=1.0,
    ),
    location=LOCATION,
)

# The runner ties the agent, the session service and the plugin together.
runner = Runner(
    agent=agent,
    app_name=APP_NAME,
    session_service=session_service,
    plugins=[plugin],
)

# Define three conversations
# Each entry has a human-readable "label" and an ordered list of user
# "messages"; one message is sent per turn. Later cells address the resulting
# sessions by position (0 = SF->NY, 1 = LA->Tokyo, 2 = Chicago->Paris).
conversations = [
    {
        "label": "Simple trip (SF -> NY)",
        "messages": [
            (
                "Plan a weekend trip from San Francisco to New York"
                " departing 2025-04-12 and returning 2025-04-14."
                " Search flights for April 12 and hotels checking"
                " in April 12, checking out April 14."
            ),
        ],
    },
    {
        "label": "Complex trip (LA -> Tokyo)",
        "messages": [
            (
                "I want to plan a 5-day vacation to Tokyo from"
                " 2025-05-01 to 2025-05-06. Search flights from"
                " Los Angeles departing 2025-05-01, find hotels in"
                " Tokyo checking in 2025-05-01 and checking out"
                " 2025-05-06, check the weather for 2025-05-02,"
                " and calculate the budget with the flight and"
                " hotel prices you find plus $150/day expenses"
                " for 5 days."
            ),
        ],
    },
    {
        "label": "Multi-turn (Chicago -> Paris)",
        "messages": [
            "What's the weather like in Paris on 2025-04-20?",
            "Find me flights from Chicago to Paris on 2025-04-20.",
            (
                "Now find hotels in Paris checking in 2025-04-20"
                " and checking out 2025-04-25."
            ),
        ],
    },
]


async def run_conversation(messages, label=""):
    """Play a scripted conversation through the runner, one message per turn.

    A fresh session with a random `e2e-` ID is created first. For every turn
    the user message, any tool calls seen in the event stream, and up to the
    first 1000 characters of the agent's text are printed.

    Args:
        messages: Ordered user messages to send.
        label: Optional tag shown in the session banner.

    Returns:
        The ID of the session the conversation ran in.
    """
    session_id = "e2e-" + uuid.uuid4().hex[:12]
    await session_service.create_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=session_id,
    )

    banner = "=" * 60
    print(f"\n{banner}")
    print(f"  Session: {session_id}  [{label}]")
    print(banner)

    turn_no = 0
    for message in messages:
        turn_no += 1
        print(f"\n[Turn {turn_no}] User: {message}")
        print("-" * 48)

        outgoing = types.Content(role="user", parts=[types.Part(text=message)])
        text_chunks = []
        async for event in runner.run_async(
            user_id=USER_ID,
            session_id=session_id,
            new_message=outgoing,
        ):
            content = event.content
            if not (content and content.parts):
                continue
            for part in content.parts:
                # Text takes precedence: a part with text is collected and
                # its function_call (if any) is not reported.
                piece = getattr(part, "text", None)
                if piece:
                    text_chunks.append(piece)
                    continue
                call = getattr(part, "function_call", None)
                if call:
                    print(f"  -> Tool call: {call.name}")

        if not text_chunks:
            continue
        reply = "\n".join(text_chunks)
        print(f"\n[Agent]: {reply[:1000]}")
        if len(reply) > 1000:
            print(f"  ... (truncated, {len(reply)} chars total)")

    return session_id


# Play every scripted conversation in order and remember which session each
# one ran in; later cells look sessions up by their position in this list.
session_ids = []
for conv in conversations:
    pending = run_conversation(conv["messages"], label=conv["label"])
    sid = asyncio.get_event_loop().run_until_complete(pending)
    session_ids.append(sid)

print(f"\n\nSession IDs: {session_ids}")

In [None]:
import time

# Flush plugin to ensure all events are written
print("Flushing traces to BigQuery ...")
# Best effort: a failing flush is reported as a warning and the notebook
# carries on.
try:
    asyncio.get_event_loop().run_until_complete(plugin.flush())
except Exception as exc:
    print(f"Flush warning: {exc}")

# Fixed pause before querying.
# NOTE(review): 15 seconds is a heuristic wait; increase it if the trace
# lookups in the following cells come back empty.
settle_seconds = 15
print(f"Waiting {settle_seconds}s for BigQuery data to settle ...")
time.sleep(settle_seconds)
print("Done.")

---

## Phase 2: Trace Retrieval & Visualization

Now that traces are in BigQuery, we use the **SDK Client** to fetch them. Each `Trace` contains a hierarchical span tree that can be rendered as a DAG. We can also inspect tool calls, the final response, and any error spans.

In [None]:
from bigquery_agent_analytics import Client, TraceFilter

# SDK client over the same project, dataset, table and location that the
# logging plugin was configured with.
# NOTE(review): `endpoint` receives MODEL_NAME — presumably the model used by
# the SDK's LLM-backed features; confirm against the Client signature.
client = Client(
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    table_id=TABLE_ID,
    location=LOCATION,
    endpoint=MODEL_NAME,
)
print("SDK Client initialised.")

In [None]:
# Retrieve and render each trace.
# `traces` is kept index-aligned with `session_ids`: exactly one entry is
# appended per session (the trace, or None when it could not be fetched).
# Later cells rely on that alignment, e.g. index 1 being the Tokyo session.
traces = []
for sid in session_ids:
    try:
        trace = client.get_trace(sid)
    except Exception as exc:
        print(f"Could not retrieve trace {sid}: {exc}")
        traces.append(None)
        continue

    traces.append(trace)
    print(f"\n{'=' * 60}")
    print(f"  Trace for session: {sid}")
    print(f"{'=' * 60}")
    # Rendering is guarded separately so a display failure cannot add a
    # second entry for this session or discard a trace that was fetched.
    try:
        _ = trace.render()  # render() prints and returns the tree
    except Exception as exc:
        print(f"Could not render trace {sid}: {exc}")

In [None]:
# Summarise each retrieved trace: tools called, final answer, error spans.
for i, trace in enumerate(traces):
    if trace is not None:
        print(f"\n--- Session {i+1}: {trace.session_id} ---")

        called_tools = trace.tool_calls
        print(f"  Tool calls: {len(called_tools)}")
        for tc in called_tools:
            print(f"    - {tc.get('tool_name', '?')}")

        # Show a placeholder when the trace has no final response, and only
        # the first 300 characters otherwise.
        final = trace.final_response
        if not final:
            final = "(none)"
        print(f"  Final response: {final[:300]}")

        errors = trace.error_spans
        print(f"  Error spans: {len(errors)}")
        for es in errors:
            print(f"    - {es.event_type}: {es.error_message}")

In [None]:
# Fetch all demo traces in one call, filtered to this run's session IDs.
session_filter = TraceFilter(session_ids=session_ids)
all_traces = client.list_traces(session_filter)

print(f"Listed {len(all_traces)} traces:")
for t in all_traces:
    span_count = len(t.spans)
    tool_count = len(t.tool_calls)
    print(f"  - {t.session_id}  spans={span_count}  tools={tool_count}")

---

## Phase 3: Code-Based Evaluation

The `CodeEvaluator` runs deterministic metrics over session aggregates — no LLM needed. Pre-built evaluators cover latency, turn count, error rate, token efficiency, and cost. You can also define custom metrics.

In [None]:
from bigquery_agent_analytics import CodeEvaluator

# Restrict every evaluation to the sessions produced by this notebook run.
# `trace_filter` is reused by the evaluation cells that follow.
trace_filter = TraceFilter(session_ids=session_ids)

# (display name, evaluator) pairs built from the CodeEvaluator presets, each
# with the pass threshold used for this demo.
presets = [
    ("latency", CodeEvaluator.latency(threshold_ms=30000)),
    ("turn_count", CodeEvaluator.turn_count(max_turns=10)),
    ("error_rate", CodeEvaluator.error_rate(max_error_rate=0.1)),
    ("token_efficiency", CodeEvaluator.token_efficiency(max_tokens=100000)),
    ("cost_per_session", CodeEvaluator.cost_per_session(max_cost_usd=1.0)),
]

# Run the presets one by one. client.evaluate is invoked through
# asyncio.to_thread and awaited on the notebook's event loop. A failure in
# one preset (including while printing its summary) is reported and the loop
# moves on to the next.
for name, evaluator in presets:
    try:
        report = asyncio.get_event_loop().run_until_complete(
            asyncio.to_thread(
                client.evaluate,
                evaluator=evaluator,
                filters=trace_filter,
            )
        )
        print(f"\n[{name}]")
        print(report.summary())
    except Exception as exc:
        print(f"\n[{name}] Failed: {exc}")

In [None]:
# Custom metric: response length scoring
def response_length_score(session_summary: dict) -> float:
    """Rate a session by how many output tokens it produced.

    Args:
        session_summary: Session aggregate; only "output_tokens" is read, and
            a missing or falsy value counts as 0.

    Returns:
        1.0 inside the ideal 200-2000 token band, a linear ramp
        (tokens / 200) below it, and a linear fall-off above it that is
        floored at 0.0.
    """
    token_count = session_summary.get("output_tokens") or 0

    within_ideal_band = 200 <= token_count <= 2000
    if within_ideal_band:
        return 1.0
    if token_count < 200:
        return token_count / 200.0
    # Above the band: lose one full point per 5000 extra tokens, never
    # dropping below zero.
    overshoot = token_count - 2000
    return max(0.0, 1.0 - overshoot / 5000.0)


# Wrap the custom scoring function in a CodeEvaluator named "custom_metrics";
# the metric is registered as "response_length" with a threshold of 0.5.
custom_eval = (
    CodeEvaluator("custom_metrics")
    .add_metric("response_length", response_length_score, threshold=0.5)
)

# Evaluate the demo sessions (selected by trace_filter) with the custom
# metric; any failure is printed instead of raised.
try:
    report = asyncio.get_event_loop().run_until_complete(
        asyncio.to_thread(
            client.evaluate,
            evaluator=custom_eval,
            filters=trace_filter,
        )
    )
    print("[custom: response_length]")
    print(report.summary())
except Exception as exc:
    print(f"Custom evaluator failed: {exc}")

---

## Phase 4: LLM-as-Judge Evaluation

Semantic evaluation using an LLM to judge agent quality. The SDK supports a 3-tier fallback: BigQuery `AI.GENERATE` → `ML.GENERATE_TEXT` → Gemini API. Pre-built judges evaluate **correctness**, **hallucination** (faithfulness), and **sentiment**.

In [None]:
from bigquery_agent_analytics import LLMAsJudge

# Correctness evaluation
# Pre-built correctness judge with a threshold of 0.6.
judge_correctness = LLMAsJudge.correctness(threshold=0.6)
# Run the judge over the demo sessions, then print the summary followed by
# each session's scores, pass flag and (when present) the first 200
# characters of the judge's feedback. Any failure is printed, not raised.
try:
    report = asyncio.get_event_loop().run_until_complete(
        asyncio.to_thread(
            client.evaluate,
            evaluator=judge_correctness,
            filters=trace_filter,
        )
    )
    print("[LLM Judge: Correctness]")
    print(report.summary())
    print("\nPer-session details:")
    for ss in report.session_scores:
        print(f"  {ss.session_id}: scores={ss.scores} "
              f"passed={ss.passed}")
        if ss.llm_feedback:
            print(f"    Feedback: {ss.llm_feedback[:200]}")
except Exception as exc:
    print(f"Correctness judge failed: {exc}")

In [None]:
# Hallucination (faithfulness) evaluation
# Pre-built hallucination judge with its default settings.
judge_hallucination = LLMAsJudge.hallucination()
# Run the judge over the demo sessions and print its summary; any failure is
# printed instead of raised.
try:
    report = asyncio.get_event_loop().run_until_complete(
        asyncio.to_thread(
            client.evaluate,
            evaluator=judge_hallucination,
            filters=trace_filter,
        )
    )
    print("[LLM Judge: Hallucination/Faithfulness]")
    print(report.summary())
except Exception as exc:
    print(f"Hallucination judge failed: {exc}")

In [None]:
# Sentiment evaluation
# Pre-built sentiment judge with its default settings.
judge_sentiment = LLMAsJudge.sentiment()
# Run the judge over the demo sessions and print its summary; any failure is
# printed instead of raised.
try:
    report = asyncio.get_event_loop().run_until_complete(
        asyncio.to_thread(
            client.evaluate,
            evaluator=judge_sentiment,
            filters=trace_filter,
        )
    )
    print("[LLM Judge: Sentiment]")
    print(report.summary())
except Exception as exc:
    print(f"Sentiment judge failed: {exc}")

---

## Phase 5: Trajectory Matching

Compare actual agent tool-call sequences against **golden trajectories**. Three match types:

| `MatchType` | Description |
|---|---|
| `EXACT` | Tool calls must match exactly (order & count) |
| `IN_ORDER` | Expected tools appear in order, extra tools allowed between |
| `ANY_ORDER` | All expected tools present, any order |


In [None]:
from bigquery_agent_analytics import BigQueryTraceEvaluator
from bigquery_agent_analytics.trace_evaluator import MatchType

# Trajectory evaluator over the same project / dataset / table as the client.
# NOTE(review): unlike Client, no location is passed here — confirm that the
# evaluator's default location matches LOCATION.
trace_evaluator = BigQueryTraceEvaluator(
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    table_id=TABLE_ID,
)
print("BigQueryTraceEvaluator ready.")

In [None]:
import json

# Tokyo trip (session index 1) — IN_ORDER match
# The complex trip should call all 4 tools in sequence.
# Each golden step names only the tool; no arguments are specified.
golden_tokyo = [
    {"tool_name": "search_flights"},
    {"tool_name": "search_hotels"},
    {"tool_name": "get_weather_forecast"},
    {"tool_name": "calculate_trip_budget"},
]

# Compare the recorded Tokyo session against the golden trajectory and print
# the status, scores and (when present) the details as indented JSON. Any
# failure is printed instead of raised.
try:
    result = asyncio.get_event_loop().run_until_complete(
        trace_evaluator.evaluate_session(
            session_id=session_ids[1],
            golden_trajectory=golden_tokyo,
            match_type=MatchType.IN_ORDER,
        )
    )
    print("[Trajectory: Tokyo trip — IN_ORDER]")
    print(f"  Session : {result.session_id}")
    print(f"  Status  : {result.eval_status}")
    print(f"  Scores  : {result.scores}")
    if result.details:
        print(f"  Details : {json.dumps(result.details, indent=2)}")
except Exception as exc:
    print(f"Trajectory evaluation failed: {exc}")

In [None]:
# Compare match types: EXACT vs ANY_ORDER on the same session
# Same Tokyo session and golden trajectory as the IN_ORDER check; only the
# match type changes. A failure for one match type does not stop the other.
for match_type in [MatchType.EXACT, MatchType.ANY_ORDER]:
    try:
        result = asyncio.get_event_loop().run_until_complete(
            trace_evaluator.evaluate_session(
                session_id=session_ids[1],
                golden_trajectory=golden_tokyo,
                match_type=match_type,
            )
        )
        print(f"\n[Trajectory: Tokyo — {match_type.value}]")
        print(f"  Status: {result.eval_status}  "
              f"Scores: {result.scores}")
    except Exception as exc:
        print(f"  {match_type.value} failed: {exc}")

In [None]:
# Batch evaluation across all sessions
# One entry per demo session, pairing the session ID with the tool sequence
# that session is expected to have produced.
eval_dataset = [
    {
        "session_id": session_ids[0],
        "expected_trajectory": [
            {"tool_name": "search_flights"},
            {"tool_name": "search_hotels"},
        ],
    },
    {
        "session_id": session_ids[1],
        "expected_trajectory": golden_tokyo,
    },
    {
        "session_id": session_ids[2],
        "expected_trajectory": [
            {"tool_name": "get_weather_forecast"},
            {"tool_name": "search_flights"},
            {"tool_name": "search_hotels"},
        ],
    },
]

# Evaluate all three sessions in one call with IN_ORDER matching and print
# one line per result; any failure is printed instead of raised.
try:
    batch_results = asyncio.get_event_loop().run_until_complete(
        trace_evaluator.evaluate_batch(
            eval_dataset=eval_dataset,
            match_type=MatchType.IN_ORDER,
        )
    )
    print("[Batch Trajectory Evaluation — IN_ORDER]")
    for r in batch_results:
        print(f"  {r.session_id}: {r.eval_status}  "
              f"scores={r.scores}")
except Exception as exc:
    print(f"Batch evaluation failed: {exc}")

---

## Phase 6: Grader Pipeline

Compose multiple evaluators (code + LLM) into a single **GraderPipeline** with configurable voting strategies: `WeightedStrategy`, `BinaryStrategy`, or `MajorityStrategy`.

In [None]:
from bigquery_agent_analytics import (
    GraderPipeline,
    WeightedStrategy,
    BinaryStrategy,
    MajorityStrategy,
)

# Build a weighted pipeline with code + LLM graders
# Two code graders (latency, error rate) at weight 1.0 each and one LLM
# correctness grader at weight 2.0, combined by a WeightedStrategy with a
# threshold of 0.6.
pipeline = (
    GraderPipeline(WeightedStrategy(threshold=0.6))
    .add_code_grader(
        CodeEvaluator.latency(threshold_ms=30000),
        weight=1.0,
    )
    .add_code_grader(
        CodeEvaluator.error_rate(max_error_rate=0.1),
        weight=1.0,
    )
    .add_llm_grader(
        LLMAsJudge.correctness(threshold=0.6),
        weight=2.0,
    )
)
print("GraderPipeline built (weighted: code=1.0 + code=1.0 + llm=2.0).")

In [None]:
# Construct session_summary from trace metadata and evaluate
# We use the Tokyo trip trace (index 1) as an example.
import io, contextlib

# Position of the Tokyo trip in `traces`; the next cell reuses trace_idx,
# session_summary, trace_text and final_response.
trace_idx = 1
if traces[trace_idx] is not None:
    trace = traces[trace_idx]
    # Hand-built session aggregate derived from the trace's spans.
    session_summary = {
        "session_id": trace.session_id,
        "total_events": len(trace.spans),
        "tool_calls": len(trace.tool_calls),
        "tool_errors": len(trace.error_spans),
        # NOTE(review): counts spans of type "llm_request" OR "llm_response";
        # if every LLM call logs both, each call is counted twice — confirm
        # the intended meaning and the exact event_type spelling.
        "llm_calls": sum(
            1 for s in trace.spans
            if s.event_type in ("llm_request", "llm_response")
        ),
        # Total latency spread over the span count (at least 1); 0.0 when the
        # total is missing or zero.
        "avg_latency_ms": (
            trace.total_latency_ms / max(len(trace.spans), 1)
            if trace.total_latency_ms
            else 0.0
        ),
        # Spans without a latency count as 0; an empty trace yields 0.
        "max_latency_ms": max(
            (s.latency_ms or 0 for s in trace.spans), default=0
        ),
        "total_latency_ms": trace.total_latency_ms or 0.0,
        # NOTE(review): turns are counted as spans whose event_type equals
        # "user_message" — verify this matches the logged event type names.
        "turn_count": sum(
            1 for s in trace.spans if s.event_type == "user_message"
        ),
        "has_error": len(trace.error_spans) > 0,
        # Token totals are summed over span attributes; missing or None
        # values count as 0.
        "input_tokens": sum(
            s.attributes.get("input_tokens", 0) or 0
            for s in trace.spans
        ),
        "output_tokens": sum(
            s.attributes.get("output_tokens", 0) or 0
            for s in trace.spans
        ),
        "total_tokens": sum(
            s.attributes.get("total_tokens", 0) or 0
            for s in trace.spans
        ),
    }

    # Get trace text (suppress render's print) and final response
    # If render() does not return a string, use whatever it printed instead.
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        trace_text = trace.render(format="tree")
    if not isinstance(trace_text, str):
        trace_text = buf.getvalue()
    final_response = trace.final_response or ""

    # Run every grader in the pipeline and combine them into one verdict.
    verdict = asyncio.get_event_loop().run_until_complete(
        pipeline.evaluate(
            session_summary=session_summary,
            trace_text=trace_text,
            final_response=final_response,
        )
    )

    print(f"[GraderPipeline — Weighted]")
    print(f"  Final score : {verdict.final_score:.3f}")
    print(f"  Passed      : {verdict.passed}")
    print(f"  Strategy    : {verdict.strategy_name}")
    print(f"  Grader breakdown:")
    for gr in verdict.grader_results:
        print(f"    - {gr.grader_name}: scores={gr.scores} "
              f"passed={gr.passed}")
else:
    print("Trace not available — skipping pipeline evaluation.")

In [None]:
# Demo alternative strategies: Binary and Majority
# Reuses trace_idx, session_summary, trace_text and final_response from the
# previous cell; the guard is the same condition under which that cell
# defined them.
if traces[trace_idx] is not None:
    for strategy_cls, strategy_name in [
        (BinaryStrategy, "Binary (all must pass)"),
        (MajorityStrategy, "Majority"),
    ]:
        # Same three graders as the weighted pipeline, added without explicit
        # weights, combined by a default-constructed strategy.
        alt_pipeline = (
            GraderPipeline(strategy_cls())
            .add_code_grader(
                CodeEvaluator.latency(threshold_ms=30000),
            )
            .add_code_grader(
                CodeEvaluator.error_rate(max_error_rate=0.1),
            )
            .add_llm_grader(
                LLMAsJudge.correctness(threshold=0.6),
            )
        )
        v = asyncio.get_event_loop().run_until_complete(
            alt_pipeline.evaluate(
                session_summary=session_summary,
                trace_text=trace_text,
                final_response=final_response,
            )
        )
        print(f"\n[GraderPipeline — {strategy_name}]")
        print(f"  Final score: {v.final_score:.3f}  "
              f"Passed: {v.passed}")

---

## Phase 7: Eval Suite & Validator

The **EvalSuite** manages evaluation task definitions, supports capability-to-regression graduation, and exports to eval datasets. The **EvalValidator** performs sanity checks (ambiguity, balance, threshold consistency, duplicates, saturation).

In [None]:
from bigquery_agent_analytics import (
    EvalSuite,
    EvalTaskDef,
    EvalCategory,
    EvalValidator,
)

# Suite that collects the evaluation task definitions for the travel agent.
suite = EvalSuite(name="travel_agent_evals")

# Add tasks from Phase 1 sessions
# Each task links a recorded session to its expected tool trajectory, a
# category, per-metric thresholds and free-form tags.
suite.add_task(EvalTaskDef(
    task_id="simple_trip_sf_ny",
    session_id=session_ids[0],
    description="Simple SF->NY weekend trip — should call flights + hotels.",
    category=EvalCategory.CAPABILITY,
    expected_trajectory=[
        {"tool_name": "search_flights"},
        {"tool_name": "search_hotels"},
    ],
    thresholds={"trajectory_match": 0.8, "latency": 0.7},
    tags=["simple", "domestic"],
))

suite.add_task(EvalTaskDef(
    task_id="complex_trip_tokyo",
    session_id=session_ids[1],
    description="Complex LA->Tokyo 5-day trip — all 4 tools expected.",
    category=EvalCategory.CAPABILITY,
    expected_trajectory=golden_tokyo,
    thresholds={"trajectory_match": 0.9, "latency": 0.6},
    tags=["complex", "international"],
))

# The multi-turn task is the only one registered in the REGRESSION category.
suite.add_task(EvalTaskDef(
    task_id="multiturn_paris",
    session_id=session_ids[2],
    description="Multi-turn Chicago->Paris — weather, flights, hotels across 3 turns.",
    category=EvalCategory.REGRESSION,
    expected_trajectory=[
        {"tool_name": "get_weather_forecast"},
        {"tool_name": "search_flights"},
        {"tool_name": "search_hotels"},
    ],
    thresholds={"trajectory_match": 0.8},
    tags=["multi-turn", "international"],
))

# List what was registered: category, task ID and description per task.
print(f"EvalSuite '{suite.name}' — {len(suite.get_tasks())} tasks added.")
for t in suite.get_tasks():
    print(f"  [{t.category.value}] {t.task_id}: {t.description}")

In [None]:
# Suite health check
# Hand-written pass/fail history (five runs per task) used as sample input;
# these values are not derived from the sessions recorded above.
pass_history = {
    "simple_trip_sf_ny": [True, True, True, True, True],
    "complex_trip_tokyo": [True, False, True, True, True],
    "multiturn_paris": [True, True, True, True, True],
}

# Print the health report fields, followed by any warnings it carries.
health = suite.check_health(pass_history=pass_history)
print("[Suite Health]")
print(f"  Total tasks      : {health.total_tasks}")
print(f"  Capability tasks : {health.capability_tasks}")
print(f"  Regression tasks : {health.regression_tasks}")
print(f"  Positive cases   : {health.positive_cases}")
print(f"  Negative cases   : {health.negative_cases}")
print(f"  Balance ratio    : {health.balance_ratio:.2f}")
print(f"  Saturated tasks  : {health.saturated_task_ids}")
if health.warnings:
    print(f"  Warnings:")
    for w in health.warnings:
        print(f"    - {w}")

In [None]:
# Validate suite
# Runs the validator over the suite with the same sample pass history and
# prints each warning's severity, check name, task ID and message.
# NOTE(review): the result is bound to the name `warnings`, which would hide
# the standard-library module of that name if a later cell imports it.
warnings = EvalValidator.validate_suite(
    suite, pass_history=pass_history
)
print(f"[EvalValidator] {len(warnings)} warnings:")
for w in warnings:
    print(f"  [{w.severity}] {w.check_name} "
          f"(task={w.task_id}): {w.message}")
if not warnings:
    print("  No warnings — suite looks healthy!")

In [None]:
# Export to eval dataset
# Each exported entry is read as a dict with a "session_id" key and an
# optional "expected_trajectory" list (treated as empty when absent).
eval_ds = suite.to_eval_dataset()
print(f"Exported {len(eval_ds)} tasks to eval dataset format:")
for entry in eval_ds:
    print(f"  session_id={entry['session_id']}  "
          f"trajectory_len="
          f"{len(entry.get('expected_trajectory', []))}")

---

## Phase 8: Multi-Trial Evaluation

LLM agents are non-deterministic. The **TrialRunner** repeats evaluation N times to compute:
- **pass@k** — probability that at least one trial passes
- **pass^k** — probability that all k trials pass
- **per_trial_pass_rate** — fraction of trials that passed
- **mean_scores** and **score_std_dev** — statistics across trials

In [None]:
from bigquery_agent_analytics import TrialRunner

# Build a runner around the existing trace evaluator (num_trials=3, concurrency=3).
trial_runner = TrialRunner(evaluator=trace_evaluator, num_trials=3, concurrency=3)

# Evaluate the second session against the Tokyo golden trajectory and print the
# aggregate metrics followed by each trial's outcome. Any failure, including one
# raised while printing, is reported as a single message instead of a traceback.
try:
    event_loop = asyncio.get_event_loop()
    trial_report = event_loop.run_until_complete(
        trial_runner.run_trials(
            session_id=session_ids[1],
            golden_trajectory=golden_tokyo,
            match_type=MatchType.IN_ORDER,
            use_llm_judge=True,
        )
    )

    print("[Multi-Trial Report — Tokyo trip, 3 trials]")
    # The three rate metrics share the same three-decimal format.
    rate_fields = (
        ("  pass@k             : ", "pass_at_k"),
        ("  pass^k             : ", "pass_pow_k"),
        ("  per_trial_pass_rate: ", "per_trial_pass_rate"),
    )
    for prefix, attr_name in rate_fields:
        print(f"{prefix}{getattr(trial_report, attr_name):.3f}")
    print(f"  mean_scores        : {trial_report.mean_scores}")
    print(f"  score_std_dev      : {trial_report.score_std_dev}")

    print("\n  Per-trial results:")
    for trial in trial_report.trial_results:
        print(f"    Trial {trial.trial_index}: passed={trial.passed} scores={trial.scores}")
except Exception as exc:
    print(f"Multi-trial evaluation failed: {exc}")

---

## Phase 9: Insights Report

The **Insights** pipeline is a multi-stage AI-powered analysis:
1. Session filtering and metadata extraction
2. Per-session facet extraction (goals, outcomes, friction)
3. Cross-session aggregation
4. Multi-prompt analysis (7 specialised prompts)
5. Executive summary generation

In [None]:
from bigquery_agent_analytics import InsightsConfig

# Generate the insights report for the demo sessions. client.insights is invoked
# through asyncio.to_thread and the event loop is driven until it finishes.
# Failures are reported as one message rather than a traceback.
try:
    event_loop = asyncio.get_event_loop()
    insights_filter = TraceFilter(session_ids=session_ids)
    insights_config = InsightsConfig(
        max_sessions=10,
        min_events_per_session=3,
        min_turns_per_session=1,
    )
    insights_report = event_loop.run_until_complete(
        asyncio.to_thread(
            client.insights,
            filters=insights_filter,
            config=insights_config,
        )
    )
    print("[Insights Report]")
    print(insights_report.summary())
except Exception as exc:
    print(f"Insights generation failed: {exc}")

In [None]:
# Executive summary
# `insights_report` only exists if the insights cell ran successfully; a
# NameError here means it was never assigned.
try:
    summary_text = insights_report.executive_summary
    if not summary_text:
        print("No executive summary generated.")
    else:
        print("[Executive Summary]")
        print(summary_text)
except NameError:
    print("Insights report not available — run previous cell first.")

In [None]:
# Analysis sections
# Print each analysis section under its title, showing at most the first
# `preview_limit` characters and flagging longer sections as truncated.
try:
    preview_limit = 2000
    for report_section in insights_report.analysis_sections:
        print(f"\n## {report_section.title}")
        body_text = report_section.content
        print(body_text[:preview_limit])
        if len(body_text) > preview_limit:
            print("  ... (truncated)")
except NameError:
    print("Insights report not available — run previous cells first.")

In [None]:
# Per-session facets
# For every session facet, print the session id, then each optional field that
# has a truthy value, then the two fields that are always shown.
try:
    print("[Session Facets]")
    optional_fields = (
        ("    Goal categories  : ", "goal_categories"),
        ("    Outcome          : ", "outcome"),
        ("    Satisfaction     : ", "satisfaction"),
        ("    Key topics       : ", "key_topics"),
    )
    for session_facet in insights_report.session_facets:
        print(f"\n  Session: {session_facet.session_id}")
        for prefix, attr_name in optional_fields:
            field_value = getattr(session_facet, attr_name)
            if field_value:
                print(f"{prefix}{field_value}")
        print(f"    Effectiveness    : {session_facet.agent_effectiveness}")
        print(f"    Primary success  : {session_facet.primary_success}")
except NameError:
    print("Insights report not available — run previous cells first.")

---

## Phase 10: Deep Analysis & Drift Detection

**Deep analysis** performs question distribution analysis — grouping user queries into semantic categories. **Drift detection** compares production questions against a golden dataset to measure coverage.

In [None]:
from bigquery_agent_analytics import AnalysisConfig

# Run the "frequently_asked" deep analysis over the demo sessions and print the
# summary, the total question count, and up to three examples per category.
# Failures are reported as one message rather than a traceback.
try:
    event_loop = asyncio.get_event_loop()
    faq_filter = TraceFilter(session_ids=session_ids)
    faq_config = AnalysisConfig(mode="frequently_asked")
    question_dist = event_loop.run_until_complete(
        asyncio.to_thread(
            client.deep_analysis,
            filters=faq_filter,
            configuration=faq_config,
        )
    )

    print("[Deep Analysis — Frequently Asked Questions]")
    print(question_dist.summary())
    print(f"\nTotal questions: {question_dist.total_questions}")
    for category in question_dist.categories:
        print(
            f"\n  Category: {category.name} "
            f"(count={category.count}, {category.percentage:.1f}%)"
        )
        for example in category.examples[:3]:
            print(f"    - {example}")
except Exception as exc:
    print(f"Deep analysis failed: {exc}")

In [None]:
# Drift detection requires a golden dataset table in BigQuery, so the call is
# left commented out and this cell only prints a reminder when run.
# Below shows the API pattern — uncomment it and replace the placeholder
# `golden_dataset` value with your own golden table.

# from bigquery_agent_analytics import DriftReport
#
# drift_report = asyncio.get_event_loop().run_until_complete(
#     asyncio.to_thread(
#         client.drift_detection,
#         golden_dataset="your_project.your_dataset.golden_questions",
#         filters=TraceFilter(session_ids=session_ids),
#     )
# )
# print("[Drift Detection]")
# print(drift_report.summary())
# print(f"  Coverage: {drift_report.coverage_percentage:.1f}%")
# print(f"  Uncovered questions: {drift_report.uncovered_questions}")
# print(f"  New questions: {drift_report.new_questions}")

print("Drift detection requires a golden dataset table — see commented code above.")

---

## Summary

This notebook demonstrated the full **BigQuery Agent Analytics SDK** lifecycle:

| Phase | Feature | Description |
|---|---|---|
| 1 | **Agent Execution** | Ran a travel planner agent with 4 tools, logged traces to BigQuery |
| 2 | **Trace Retrieval** | Fetched traces, rendered hierarchical DAGs, inspected tool calls |
| 3 | **Code Evaluation** | Latency, turn count, error rate, token efficiency, cost, custom metrics |
| 4 | **LLM-as-Judge** | Correctness, hallucination/faithfulness, sentiment scoring |
| 5 | **Trajectory Matching** | EXACT, IN_ORDER, ANY_ORDER matching against golden trajectories |
| 6 | **Grader Pipeline** | Composed evaluators with Weighted, Binary, and Majority strategies |
| 7 | **Eval Suite** | Task management, health checks, validation, dataset export |
| 8 | **Multi-Trial** | N-trial evaluation with pass@k, pass^k, and score statistics |
| 9 | **Insights** | AI-powered multi-stage analysis with executive summaries |
| 10 | **Deep Analysis** | Question distribution analysis and drift detection patterns |

### Key Takeaways

- **Production-ready logging**: The `BigQueryAgentAnalyticsPlugin` integrates directly with ADK's Runner to capture every agent event.
- **Multi-level evaluation**: From deterministic code metrics to semantic LLM judges to trajectory matching — evaluate agents at every level.
- **Composable grading**: The `GraderPipeline` lets you combine evaluators with flexible voting strategies for nuanced pass/fail decisions.
- **Suite management**: `EvalSuite` + `EvalValidator` support capability-to-regression graduation and health monitoring.
- **Non-determinism handling**: `TrialRunner` repeats evaluations to compute robust pass@k/pass^k metrics.
- **AI-powered insights**: The insights pipeline and deep analysis provide actionable intelligence about agent behavior at scale.

In [None]:
# Cleanup
# Shut the analytics plugin down, waiting up to 10 seconds. This is best-effort:
# a failure must not stop the notebook, but it is reported rather than silently
# swallowed so the reader knows the shutdown did not complete cleanly.
try:
    asyncio.get_event_loop().run_until_complete(
        plugin.shutdown(timeout=10.0)
    )
except Exception as exc:
    print(f"Plugin shutdown failed: {exc}")

# Final recap: which sessions were produced and where their traces were logged.
print("\nDemo complete!")
print(f"Sessions: {session_ids}")
print(f"Traces logged to: {PROJECT_ID}.{DATASET_ID}.{TABLE_ID}")