In [None]:
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Demo: BigQuery Agent Analytics Plugin + Conversational Analytics — A Closed-Loop Integration

This notebook demonstrates a **complete closed-loop** workflow for monitoring and analyzing AI agents using Google BigQuery:

```
Agent runs → BQ AA Plugin auto-logs events → BQ Table → CA Data Agent → Natural Language Insights
```

You will:
1. Build a live ADK agent (`my_bq_agent`) with `BigQueryToolset` and `Gemini`
2. Instrument it with `BigQueryAgentAnalyticsPlugin` to auto-log all events to BigQuery
3. Create a **Conversational Analytics (CA) Data Agent** over that event log table
4. Analyze agent behavior using **both** manual SQL **and** natural language questions via the CA SDK

<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/haiyuan-eng-google/demo_BQ_agent_analytics_plugin_notebook/blob/main/NY_City_Bike_Agent_Logging.ipynb">
      <img src="https://raw.githubusercontent.com/googleapis/python-bigquery-dataframes/refs/heads/main/third_party/logo/colab-logo.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/haiyuan-eng-google/demo_BQ_agent_analytics_plugin_notebook/blob/main/NY_City_Bike_Agent_Logging.ipynb">
      <img src="https://raw.githubusercontent.com/googleapis/python-bigquery-dataframes/refs/heads/main/third_party/logo/github-logo.png" width="32" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/haiyuan-eng-google/demo_BQ_agent_analytics_plugin_notebook/main/NY_City_Bike_Agent_Logging.ipynb">
      <img src="https://www.gstatic.com/images/branding/product/1x/google_cloud_48dp.png" alt="Vertex AI logo" width="32">
      Open in Vertex AI Workbench
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/bigquery/import?url=https://github.com/haiyuan-eng-google/demo_BQ_agent_analytics_plugin_notebook/blob/main/NY_City_Bike_Agent_Logging.ipynb">
      <img src="https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTW1gvOovVlbZAIZylUtf5Iu8-693qS1w5NJw&s" alt="BQ logo" width="35">
      Open in BQ Studio
    </a>
  </td>
</table>

**_NOTE_**: This notebook has been tested in the following environment:

* Python version = 3.10+

### Costs
This tutorial uses billable components of Google Cloud:
* BigQuery (storage and compute)
* Vertex AI (Gemini models for analysis)
* Conversational Analytics API (free during Preview)

Learn about [BigQuery pricing](https://cloud.google.com/bigquery/pricing) and [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing).

## Environment setup

Complete the tasks in this section to set up your environment.
This section includes:
1. Authentication
2. Agent and plugin setup to stream events into a BigQuery table
3. **Run the agent** with sample queries to generate event data
4. Initialize a BigQuery client for further analysis of the agent events
5. Install the Conversational Analytics SDK and create a CA Data Agent over the event log table

In [None]:
# Authentication
# `google.colab` is only importable on Colab runtimes. On other runtimes this
# notebook links to (e.g. Vertex AI Workbench) the import fails, so fall back to
# Application Default Credentials, which the agent setup cell reads via
# `google.auth.default()`.
try:
    from google.colab import auth as google_auth
except ImportError:
    google_auth = None

if google_auth is not None:
    google_auth.authenticate_user()
    print('Authenticated')
else:
    print('google.colab not available; relying on Application Default Credentials '
          '(run `gcloud auth application-default login` if needed).')

Authenticated


In [None]:
# Import Libraries & Initialize Plugin, Tools, Models and Agent
import google.auth
import os
from google.adk.agents import Agent
from google.adk.apps import App
from google.adk.models.google_llm import Gemini
from google.adk.plugins.bigquery_agent_analytics_plugin import BigQueryAgentAnalyticsPlugin
from google.adk.tools.bigquery import BigQueryCredentialsConfig, BigQueryToolset
from google.adk.tools.tool_context import ToolContext

# Configuration
PROJECT_ID = "your-project-id"  # @param {type:"string"}
DATASET_ID = "agent_ops_demo"  # @param {type:"string"}
TABLE_ID = "agent_events"  # @param {type:"string"}
# Location used for the BigQuery clients created later in the notebook.
LOCATION = "us-central1"  # @param {type:"string"}
# Cloud Resource Connection for the Gemini SQL functions (e.g. AI.CLASSIFY).
CONNECTION_ID = "us-central1.bqml_connection"  # @param {type:"string"}

# Fail fast with a clear message. Otherwise the placeholder surfaces later as
# obscure permission / not-found errors from BigQuery and Vertex AI.
if not PROJECT_ID or PROJECT_ID == "your-project-id":
    raise ValueError(
        "Set PROJECT_ID to your Google Cloud project ID before running this notebook."
    )

# Ensure environment variables are set for ADK/Vertex AI
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = (
    "us-central1"  # Do not use US or EU since they are not compatible with Vertex AI endpoint
)
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = (
    "True"  # Make sure you have Vertex AI API enabled
)

# --- Initialize the Plugin ---
bq_logging_plugin = BigQueryAgentAnalyticsPlugin(
    project_id=PROJECT_ID,  # project_id is required input from user
    dataset_id=DATASET_ID,  # dataset_id is required input from user
    table_id=TABLE_ID,
    # Optional: defaults to "agent_events". The plugin automatically creates
    # this table if it doesn't exist.
)
print(f"BigQueryAgentAnalyticsPlugin initialized, streaming data to {PROJECT_ID}:{DATASET_ID}.{TABLE_ID}")

# --- Initialize Tools & Model ---
# Application Default Credentials (set up by the authentication cell) back the
# BigQuery toolset the agent uses.
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
bigquery_toolset = BigQueryToolset(
    credentials_config=BigQueryCredentialsConfig(credentials=credentials)
)

llm = Gemini(
    model="gemini-2.5-flash",
)

# Custom function tool handed to the agent (see `tools=[..., set_state]`).
# NOTE(review): ADK presumably derives the tool declaration the model sees from
# this signature and docstring, so both are left untouched. `tool_context` is
# assumed to be injected by ADK at call time — confirm against the ADK docs.
def set_state(key: str, value: str, tool_context: ToolContext) -> str:
  """Sets a key-value pair in the session state."""
  # Store the value under `key` in the state exposed by the tool context.
  tool_context.state[key] = value
  # Confirmation text returned to the model as the tool result.
  return f"Set state {key} to {value}"


# The agent: Gemini model, BigQuery toolset and the set_state helper tool.
# The instruction pins the Citi Bike public tables and the billing project.
root_agent = Agent(
    name="my_bq_agent",
    model=llm,
    instruction=(
        "You are a helpful assistant with access to BigQuery tools. "
        "When users ask about NYC Citi Bike data, query the public dataset "
        "`bigquery-public-data.new_york_citibike.citibike_trips` and "
        "`bigquery-public-data.new_york_citibike.citibike_stations`. "
        f"Always use the user's project for billing: {PROJECT_ID}. "
        "You can also set session state using the `set_state` tool."
    ),
    tools=[bigquery_toolset, set_state],
    generate_content_config=dict(temperature=0.5, top_p=0.9),
)

# --- Create the App ---
# Plugins are attached at the App level; this is where the BigQuery
# analytics plugin gets wired into every run of the agent.
app = App(
    name="my_bq_agent",
    root_agent=root_agent,
    plugins=[bq_logging_plugin],
)
print(f"my_bq_agent initialized for project {PROJECT_ID}, dataset {DATASET_ID}")

### Run the Agent to Generate Event Data

Now we run `my_bq_agent` with sample user queries. The `BigQueryAgentAnalyticsPlugin` automatically captures every event (user messages, LLM calls, tool executions) and streams them to BigQuery.

This is what makes the notebook **self-contained** — the same agent whose events we analyze later is run right here.

In [None]:
import uuid
import time
from google.adk.runners import InMemoryRunner
from google.genai import types

# Build the runner from the App (not the bare agent) so the BQ AA Plugin
# registered on the App sees every event.
runner = InMemoryRunner(app=app)

# Scripted traffic: (user_id, questions). Each user's questions are sent in
# order by run_agent_queries() below, covering exploration, aggregation,
# time-series and metadata-style requests.
user_queries = [
    (
        "user1",
        [
            "What datasets and tables are available for NYC Citi Bike data?",
            "Show me the average trip duration in minutes from the citibike_trips table",
            "Which are the top 5 most popular start stations by trip count?",
        ],
    ),
    (
        "user2",
        [
            "What is the average trip duration by user type (subscriber vs customer)?",
            "Show me the number of trips per month for the last available year",
        ],
    ),
    (
        "user3",
        [
            "What are the busiest hours of the day for bike trips?",
            "Show me the trip count trend over the years",
        ],
    ),
    (
        "user4",
        [
            "Which bike stations have the longest average trip durations?",
            "Are there any tables with personal or sensitive data in this project?",
        ],
    ),
]


async def run_agent_queries():
    """Send sample queries to the agent, generating events logged by the BQ AA Plugin."""
    for user_id, queries in user_queries:
        session_id = f"{user_id}_{uuid.uuid4().hex[:8]}"
        session = await runner.session_service.create_session(
            app_name=app.name,
            user_id=user_id,
            session_id=session_id,
        )
        for query in queries:
            print(f"\n[{user_id}] Asking: {query}")
            content = types.Content(
                role="user",
                parts=[types.Part.from_text(text=query)],
            )
            try:
                async for event in runner.run_async(
                    user_id=user_id,
                    session_id=session.id,
                    new_message=content,
                ):
                    if event.content and event.content.parts:
                        for part in event.content.parts:
                            if part.text:
                                preview = part.text[:200].replace('\n', ' ')
                                print(f"  [{event.author}]: {preview}")
                                break
            except Exception as e:
                print(f"  Error: {e}")


await run_agent_queries()
print("\nAll agent queries completed.")

In [None]:
# Wait for BigQuery streaming writes to flush, then confirm events landed.
FLUSH_WAIT_SECONDS = 30  # fixed grace period; increase it if the check below finds nothing
print(f"Waiting for BQ streaming writes to flush ({FLUSH_WAIT_SECONDS} seconds)...")
time.sleep(FLUSH_WAIT_SECONDS)

# Verify events are now in BigQuery: per-event-type counts for my_bq_agent.
# NOTE: the table persists across runs, so these counts may include earlier runs.
from google.cloud import bigquery as _bq
_verify_client = _bq.Client(project=PROJECT_ID, location=LOCATION)
verify_sql = f"""
SELECT event_type, COUNT(*) as count
FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
WHERE agent = 'my_bq_agent'
GROUP BY event_type
ORDER BY count DESC
"""
try:
    df_verify = _verify_client.query(verify_sql).to_dataframe()
    if df_verify.empty:
        # Otherwise we would print "(0 total)" followed by a header-only table.
        print("No my_bq_agent events found in BigQuery yet. Writes may still be in flight; "
              "re-run this cell or increase FLUSH_WAIT_SECONDS.")
    else:
        print(f"Events found in BigQuery ({df_verify['count'].sum()} total):")
        print(df_verify.to_markdown(index=False))
except Exception as e:
    print(f"Verification query failed: {e}")
    print("Events may still be streaming. Proceed and the queries below will pick them up.")

In [None]:
# Install Conversational Analytics SDK
!pip install -q google-cloud-geminidataanalytics

# Import Libraries & Initialize BigQuery Clients
from google.cloud import bigquery
from google.cloud import geminidataanalytics
import pandas as pd
import json
import os

# BigQuery client shared by the run_bq_query / run_bq_job helpers.
bq_client = bigquery.Client(project=PROJECT_ID, location=LOCATION)
print(f"BigQuery client initialized for project {PROJECT_ID}, dataset {DATASET_ID}")

# Conversational Analytics: one client manages Data Agents, the other handles
# conversations and chat.
data_agent_client = geminidataanalytics.DataAgentServiceClient()
data_chat_client = geminidataanalytics.DataChatServiceClient()
CA_LOCATION = "global"  # CA API always uses global

# A random per-run suffix keeps the CA resource IDs from colliding with earlier runs.
import uuid as _uuid
_run_id = _uuid.uuid4().hex[:8]
CA_AGENT_ID = "ny_bike_agent_ops_" + _run_id
CA_CONVERSATION_ID = "ny_bike_convo_" + _run_id
print(f"Conversational Analytics clients initialized (location: {CA_LOCATION})")

# Helper function to run BigQuery jobs
def run_bq_query(sql):
    """Run `sql` on the shared `bq_client` and return its rows as a pandas DataFrame."""
    job = bq_client.query(sql)
    return job.to_dataframe()

def run_bq_job(sql):
    """Run `sql` on the shared `bq_client`, wait for completion, and discard any rows.

    Presumably intended for statements whose result set is not needed (e.g. DDL).
    """
    job = bq_client.query(sql)
    job.result()  # block until the job finishes
    print("BigQuery job finished.")

# --- CA Helper Functions ---

def ca_ask(question, timeout=300):
    """Ask the CA Data Agent a natural-language question.

    The question is sent into the conversation `CA_CONVERSATION_ID`, which is
    bound to the Data Agent `CA_AGENT_ID`. The streamed chat responses are
    collected into a list.

    Args:
        question: Natural-language question text.
        timeout: Seconds allowed for the chat call. Defaults to 300 (the
            previously hard-coded value); raise it for expensive questions such
            as ones that trigger AI.* SQL functions.

    Returns:
        The list of streamed responses (for `display_ca_response`). If the call
        raises, the error is printed and an empty list is returned.
    """
    conversation_name = data_chat_client.conversation_path(
        PROJECT_ID, CA_LOCATION, CA_CONVERSATION_ID
    )
    agent_name = data_agent_client.data_agent_path(
        PROJECT_ID, CA_LOCATION, CA_AGENT_ID
    )
    request = geminidataanalytics.ChatRequest(
        parent=f"projects/{PROJECT_ID}/locations/{CA_LOCATION}",
        messages=[
            geminidataanalytics.Message(
                user_message=geminidataanalytics.UserMessage(text=question)
            )
        ],
        conversation_reference=geminidataanalytics.ConversationReference(
            conversation=conversation_name,
            data_agent_context=geminidataanalytics.DataAgentContext(
                data_agent=agent_name
            ),
        ),
    )
    try:
        # chat() streams; materialize it so callers can iterate the responses freely.
        return list(data_chat_client.chat(request=request, timeout=timeout))
    except Exception as e:
        print(f"CA query error: {e}")
        return []


def _print_ca_text(text_msg):
    """Print one CA text message: progress inline, final text under a header, thoughts skipped."""
    text_type = geminidataanalytics.TextMessage.TextType
    if text_msg.text_type == text_type.PROGRESS:
        print(f"[Progress] {''.join(text_msg.parts)}")
    elif text_msg.text_type == text_type.THOUGHT:
        pass  # Skip internal thoughts
    else:
        print("\n--- CA Insight ---")
        print("\n".join(text_msg.parts))


def _print_ca_data(data_msg):
    """Print one CA data message: either the generated SQL or its result rows as a table."""
    if "generated_sql" in data_msg:
        print("\n--- Generated SQL ---")
        print(data_msg.generated_sql)
    elif "result" in data_msg:
        print("\n--- Query Results ---")
        fields = [field.name for field in data_msg.result.schema.fields]
        # Build the frame once from row records. `.get` yields None for a column
        # absent from a row instead of raising KeyError mid-display.
        records = [
            {field: row.get(field) for field in fields}
            for row in data_msg.result.data
        ]
        print(pd.DataFrame(records, columns=fields).to_markdown(index=False))


def display_ca_response(responses):
    """Parse and display CA streaming responses: text, SQL, data tables, chart notices, errors.

    Args:
        responses: Iterable of chat responses (as returned by `ca_ask`). Each
            item's `system_message` is inspected for a text, data, chart or
            error payload.

    An empty list (e.g. `ca_ask` swallowed an error) prints an explicit notice
    instead of producing no output at all.
    """
    if not responses:
        print("[CA] No response received.")
        return
    for resp in responses:
        m = resp.system_message
        if "text" in m:
            _print_ca_text(m.text)
        elif "data" in m:
            _print_ca_data(m.data)
        elif "chart" in m:
            if "result" in m.chart:
                print("\n--- Chart Generated (Vega-Lite spec available) ---")
        elif "error" in m:
            print(f"\n[CA Error] {m.error}")

## Phase 0: Create a Conversational Analytics Data Agent

This is the key integration point. We create a CA Data Agent that uses the **BQ Agent Analytics Plugin's event log table** as its knowledge source. This enables natural language querying of all agent events.

The Data Agent is configured with:
- **Data source**: The `agent_events` table populated by the BQ AA Plugin
- **Schema descriptions**: So the CA agent understands each column
- **Glossary terms**: Domain-specific terminology (event types, latency fields, etc.)
- **Verified queries**: Pre-validated SQL for common analysis patterns

In [None]:
# --- Create the CA Data Agent on the BQ AA Plugin event log table ---

# Data source: the plugin's event log table, annotated column by column so the
# CA agent can interpret each field. Pairs are (column name, description), in
# the order they are sent to the API.
bq_table_ref = geminidataanalytics.BigQueryTableReference(
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    table_id=TABLE_ID,
    schema=geminidataanalytics.Schema(
        description="Agent event logs auto-captured by the BigQuery Agent Analytics Plugin. Each row is one event in an agent's lifecycle.",
        fields=[
            geminidataanalytics.Field(name=col_name, description=col_desc)
            for col_name, col_desc in (
                ("timestamp", "When the event occurred (UTC)"),
                ("event_type", "Type of agent event: USER_MESSAGE_RECEIVED, LLM_REQUEST, LLM_RESPONSE, TOOL_STARTING, TOOL_COMPLETED, TOOL_ERROR, INVOCATION_STARTING, AGENT_COMPLETED"),
                ("agent", "Name of the agent that produced this event"),
                ("session_id", "Unique identifier for the user session"),
                ("invocation_id", "Unique identifier for one user-to-agent invocation within a session"),
                ("user_id", "Identifier for the user who triggered the event"),
                ("content", "JSON payload whose structure varies by event_type (e.g., text_summary for user messages, tool name for tool events, response for LLM responses)"),
                ("latency_ms", "JSON field containing total_ms (total latency in milliseconds) and time_to_first_token_ms"),
                ("status", "Outcome status of the event (e.g., success, error)"),
                ("error_message", "Error details if the event failed, NULL otherwise"),
                ("attributes", "JSON field with additional metadata and attributes"),
                ("trace_id", "Distributed tracing trace ID"),
                ("span_id", "Distributed tracing span ID"),
                ("content_parts", "Structured content parts with text, function calls, and function responses"),
                ("messages", "Message-level content with role and parts"),
            )
        ],
    ),
)

# Wrap the single table reference as the agent's BigQuery data source.
datasource_references = geminidataanalytics.DatasourceReferences(
    bq=geminidataanalytics.BigQueryTableReferences(table_references=[bq_table_ref])
)

# Glossary: domain vocabulary for the CA agent, as (display name, description)
# pairs in the order they are sent to the API.
glossary_terms = [
    geminidataanalytics.GlossaryTerm(display_name=term, description=meaning)
    for term, meaning in (
        (
            "event_type",
            "The type of agent event. Values include: USER_MESSAGE_RECEIVED (user sent a message), LLM_REQUEST (request sent to the LLM), LLM_RESPONSE (response received from LLM), TOOL_STARTING (tool execution began), TOOL_COMPLETED (tool execution finished), TOOL_ERROR (tool execution failed), INVOCATION_STARTING (new invocation began), AGENT_COMPLETED (agent finished processing)",
        ),
        (
            "latency_ms",
            "JSON field containing total_ms (total latency from start to finish in milliseconds) and time_to_first_token_ms (time until the first token was generated)",
        ),
        (
            "content",
            "JSON payload that varies by event_type. For USER_MESSAGE_RECEIVED: contains text_summary. For TOOL_COMPLETED: contains tool name. For LLM_RESPONSE: contains response text.",
        ),
        (
            "session",
            "A single conversation between a user and the agent, identified by session_id",
        ),
        (
            "invocation",
            "One user-to-agent request-response cycle within a session, identified by invocation_id",
        ),
    )
]

# Define verified queries (example queries that guide the CA agent).
# Each SQL must actually answer its natural-language question, because the CA
# agent learns from these pairs.
# NOTE: avg_completion_latency_ms in the first query averages latency_ms.total_ms
# over every event that has it (no event_type filter), not only AGENT_COMPLETED.
example_queries = [
    geminidataanalytics.ExampleQuery(
        natural_language_question="Show usage monitoring — daily active users, sessions, invocations, and average latency",
        sql_query=f"""SELECT DATE(timestamp) AS usage_date, COUNT(DISTINCT user_id) AS unique_active_users, COUNT(DISTINCT session_id) AS total_sessions, COUNTIF(event_type = 'INVOCATION_STARTING') AS total_invocations, ROUND(AVG(SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64)), 2) AS avg_completion_latency_ms FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` GROUP BY usage_date ORDER BY usage_date DESC""",
    ),
    geminidataanalytics.ExampleQuery(
        natural_language_question="Show all errors with timestamps and error messages",
        sql_query=f"""SELECT timestamp, session_id, event_type, error_message FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` WHERE error_message IS NOT NULL ORDER BY timestamp DESC LIMIT 10""",
    ),
    geminidataanalytics.ExampleQuery(
        # The question asks for p99, so the verified SQL must return it.
        # APPROX_QUANTILES(x, 100)[OFFSET(99)] is the (approximate) 99th percentile.
        natural_language_question="Analyze performance by event type — show average, max, and p99 latency",
        sql_query=f"""SELECT event_type, ROUND(AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) / 1000, 2) AS avg_latency_sec, ROUND(MAX(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) / 1000, 2) AS max_latency_sec, ROUND(APPROX_QUANTILES(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64), 100)[OFFSET(99)] / 1000, 2) AS p99_latency_sec, COUNT(*) AS event_count FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` WHERE agent = 'my_bq_agent' AND latency_ms IS NOT NULL GROUP BY event_type ORDER BY avg_latency_sec DESC""",
    ),
    geminidataanalytics.ExampleQuery(
        natural_language_question="Which tool calls have the highest latency?",
        sql_query=f"""SELECT JSON_VALUE(content, '$.tool') AS tool_name, ROUND(SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64) / 1000, 2) AS latency_sec, timestamp, session_id, user_id FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` WHERE event_type = 'TOOL_COMPLETED' AND agent = 'my_bq_agent' AND latency_ms IS NOT NULL ORDER BY latency_sec DESC LIMIT 10""",
    ),
    geminidataanalytics.ExampleQuery(
        natural_language_question="Detect latency anomalies in agent completion events",
        sql_query=f"""WITH base_data AS (SELECT DATE_TRUNC(timestamp, MINUTE) AS event_minute, AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency_ms FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` WHERE agent = 'my_bq_agent' AND event_type = 'AGENT_COMPLETED' AND latency_ms IS NOT NULL GROUP BY 1), historical_data AS (SELECT event_minute, avg_latency_ms FROM base_data ORDER BY event_minute LIMIT 50), target_data AS (SELECT event_minute, avg_latency_ms FROM base_data ORDER BY event_minute DESC LIMIT 10) SELECT time_series_timestamp, time_series_data, is_anomaly, lower_bound, upper_bound, anomaly_probability FROM AI.DETECT_ANOMALIES((SELECT * FROM historical_data), (SELECT * FROM target_data), data_col => 'avg_latency_ms', timestamp_col => 'event_minute') ORDER BY time_series_timestamp DESC""",
    ),
    geminidataanalytics.ExampleQuery(
        # The question asks for the distribution, so classify each message and
        # then aggregate counts per intent rather than listing raw rows.
        natural_language_question="Classify user messages by intent and show the distribution",
        sql_query=f"""WITH user_messages AS (SELECT timestamp, user_id, JSON_VALUE(content, '$.text_summary') AS raw_message FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` WHERE agent = 'my_bq_agent' AND event_type = 'USER_MESSAGE_RECEIVED' AND JSON_VALUE(content, '$.text_summary') IS NOT NULL LIMIT 50), classified AS (SELECT user_id, raw_message, AI.CLASSIFY(raw_message, categories => ['Trend Analysis', 'Data Exploration', 'Location Service', 'Security', 'Other'], connection_id => '{PROJECT_ID}.{CONNECTION_ID}', endpoint => 'gemini-2.5-flash') AS ai_intent FROM user_messages) SELECT ai_intent, COUNT(*) AS message_count FROM classified GROUP BY ai_intent ORDER BY message_count DESC""",
    ),
]

# Assemble the published context the Data Agent serves from: the data source,
# glossary and verified queries, plus a persona-style system instruction.
published_context = geminidataanalytics.Context(
    datasource_references=datasource_references,
    glossary_terms=glossary_terms,
    example_queries=example_queries,
    system_instruction=(
        "You are an agent operations analyst. This table contains event logs "
        "from an AI agent (my_bq_agent) that helps users query NYC Citi Bike data. "
        "The logs are auto-captured by the BigQuery Agent Analytics Plugin. "
        "Help the user understand agent behavior, performance, errors, and usage patterns."
    ),
)

# Create the Data Agent from the published context. If an agent with this ID
# already exists (e.g. this cell is re-run), fetch and reuse it instead.
data_agent = geminidataanalytics.DataAgent(
    data_analytics_agent=geminidataanalytics.DataAnalyticsAgent(
        published_context=published_context
    ),
)

create_request = geminidataanalytics.CreateDataAgentRequest(
    parent=f"projects/{PROJECT_ID}/locations/{CA_LOCATION}",
    data_agent_id=CA_AGENT_ID,
    data_agent=data_agent,
)

try:
    ca_agent = data_agent_client.create_data_agent_sync(request=create_request)
except Exception as e:
    err_text = str(e)
    if "already exists" not in err_text.lower() and "ALREADY_EXISTS" not in err_text:
        raise
    ca_agent = data_agent_client.get_data_agent(
        request=geminidataanalytics.GetDataAgentRequest(
            name=data_agent_client.data_agent_path(PROJECT_ID, CA_LOCATION, CA_AGENT_ID)
        )
    )
    print(f"CA Data Agent already exists, reusing: {ca_agent.name}")
else:
    print(f"CA Data Agent created: {ca_agent.name}")

In [None]:
# --- Create a conversation with the CA Data Agent ---
# ca_ask() sends every question into this conversation. If it already exists
# (e.g. this cell is re-run), fetch and reuse it.

conversation = geminidataanalytics.Conversation(
    agents=[data_agent_client.data_agent_path(PROJECT_ID, CA_LOCATION, CA_AGENT_ID)],
)

conv_request = geminidataanalytics.CreateConversationRequest(
    parent=f"projects/{PROJECT_ID}/locations/{CA_LOCATION}",
    conversation_id=CA_CONVERSATION_ID,
    conversation=conversation,
)

try:
    ca_conversation = data_chat_client.create_conversation(request=conv_request)
except Exception as e:
    err_text = str(e)
    if not ("already exists" in err_text.lower() or "ALREADY_EXISTS" in err_text):
        raise
    conv_name = data_chat_client.conversation_path(PROJECT_ID, CA_LOCATION, CA_CONVERSATION_ID)
    ca_conversation = data_chat_client.get_conversation(
        request=geminidataanalytics.GetConversationRequest(name=conv_name)
    )
    print(f"CA Conversation already exists, reusing: {ca_conversation.name}")
else:
    print(f"CA Conversation created: {ca_conversation.name}")

print("\nSetup complete! The CA Data Agent is ready to analyze agent event logs via natural language.")

## Phase 1: Understand the Agent Logging Table

In this phase, we query the same event log data using **both** approaches:
- **Manual SQL** — full control over the query
- **Conversational Analytics** — ask the same question in natural language

In [None]:
# Real-time feed from my_bq_agent: the 20 most recent events, newest first.
# Filtered on `agent` like the other analysis queries, since the table may hold
# events from other agents too.
print("Fetching latest my_bq_agent events...")
query = f"""
SELECT timestamp, event_type, session_id, user_id, TO_JSON_STRING(content) as content, error_message, status
FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
WHERE agent = 'my_bq_agent'
ORDER BY timestamp DESC
LIMIT 20
"""
try:
    df_events = run_bq_query(query)
    if df_events.empty:
        print("No my_bq_agent events found yet. Run the agent cells above first.")
    else:
        print(df_events.to_markdown(index=False))
except Exception as e:
    print(f"Error querying events (did the agent cells above run?): {e}")

Fetching latest My_bq_bot events...
| timestamp                        | event_type           | session_id                           | user_id   | content                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              

In [None]:
# --- Ask the same question via Conversational Analytics ---
# Natural-language counterpart of the "latest events" SQL above.
latest_events_question = "Show me the latest 20 agent events with timestamp, event type, session ID, user ID, content, and status"
print("Asking CA: 'Show me the latest 20 agent events'\n")
try:
    responses = ca_ask(latest_events_question)
    display_ca_response(responses)
except Exception as ca_err:
    print(f"CA error: {ca_err}")

### Conversational Analytics in Action

Throughout this notebook, every manual SQL query is paired with a **live CA SDK call** that asks the same question in natural language. The CA Data Agent we created in Phase 0:
1. **Writes SQL** — translating your natural language question into an executable query
2. **Returns results** — the same data you'd get from manual SQL
3. **Generates insights** — automatically finding patterns, trends, and anomalies

No screenshots needed — you can see it working live in every cell below!

## Phase 2: Analyze Agent Operations — Manual SQL vs Conversational Analytics

In [15]:
print("\n--- Usage Monitoring ---")
# Per-day rollup of users, sessions, invocations and mean latency.
# NOTE: avg_completion_latency_ms averages latency_ms.total_ms over every event
# that carries it (no event_type filter), not only AGENT_COMPLETED events.
usage_sql = f"""
SELECT
    DATE(event_logging.timestamp) AS usage_date,
    COUNT(DISTINCT event_logging.user_id) AS unique_active_users,
    COUNT(DISTINCT event_logging.session_id) AS total_sessions,
    COUNTIF(event_logging.event_type = 'INVOCATION_STARTING') AS total_invocations,
    ROUND(AVG(SAFE_CAST(JSON_VALUE(event_logging.latency_ms, '$.total_ms') AS INT64)), 2) AS avg_completion_latency_ms
FROM
    `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` AS event_logging
GROUP BY
    usage_date
ORDER BY
    usage_date DESC
"""
try:
    df_usage = run_bq_query(usage_sql)
    usage_table = df_usage.to_markdown(index=False)
    print(usage_table)
except Exception as err:
    print(f"Error: {err}")


--- Usage Monitoring ---
| usage_date   |   unique_active_users |   total_sessions |   total_invocations |   avg_completion_latency_ms |
|:-------------|----------------------:|-----------------:|--------------------:|----------------------------:|
| 2026-02-20   |                     2 |                2 |                   5 |                     3033.44 |
| 2026-02-19   |                     3 |                3 |                  20 |                     2707.79 |
| 2026-02-09   |                     6 |                6 |                  26 |                     3991.28 |


In [None]:
# --- CA equivalent: Usage Monitoring ---
# Same wording as the matching verified example query given to the Data Agent.
usage_question = "Show usage monitoring — daily active users, sessions, invocations, and average latency"
print("Asking CA: 'Show usage monitoring'\n")
try:
    responses = ca_ask(usage_question)
    display_ca_response(responses)
except Exception as ca_err:
    print(f"CA error: {ca_err}")

### Manual SQL vs CA: Side by Side

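The cell above shows the CA Data Agent answering the same question as the manual SQL, typically with comparable results plus automatic insights and follow-up suggestions. Because the CA agent writes its own SQL, its exact columns and numbers can differ from the hand-written query. Both approaches query the same BQ AA Plugin event log table.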

In [None]:
# Error Analysis: the 10 most recent my_bq_agent events that recorded an error message.
print("\n--- Error Analysis ---")
error_sql = f"""
SELECT timestamp, session_id, event_type, TO_JSON_STRING(content) as content, error_message
FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
WHERE agent = 'my_bq_agent' AND error_message IS NOT NULL
ORDER BY timestamp DESC
LIMIT 10;
"""
try:
    df_errors = run_bq_query(error_sql)
    error_table = df_errors.to_markdown(index=False)
    print(error_table)
except Exception as err:
    print(f"Error: {err}")

In [None]:
# --- CA equivalent: Error Analysis ---
# Same wording as the matching verified example query given to the Data Agent.
errors_question = "Show me all errors with timestamps and error messages"
print("Asking CA: 'Show me all errors'\n")
try:
    responses = ca_ask(errors_question)
    display_ca_response(responses)
except Exception as ca_err:
    print(f"CA error: {ca_err}")

In [None]:
# Granular Cost Tracking
# Rough per-session cost estimated from the size of the logged content.
# Both constants are illustrative assumptions, not real model pricing; adjust
# them for your model. The defaults reproduce the original query exactly.
CHARS_PER_TOKEN = 4          # heuristic: ~4 characters per token
USD_PER_1K_TOKENS = 0.0001   # example price per 1k tokens
print("\n--- Granular Cost Tracking ---")
cost_sql = f"""
SELECT
  session_id,
  user_id,
  COUNT(*) as interaction_count,
  -- Approximation: {CHARS_PER_TOKEN} chars per token
  SUM(LENGTH(TO_JSON_STRING(content))) / {CHARS_PER_TOKEN} AS estimated_tokens,
  -- Example cost: ${USD_PER_1K_TOKENS} per 1k tokens
  ROUND((SUM(LENGTH(TO_JSON_STRING(content))) / {CHARS_PER_TOKEN}) / 1000 * {USD_PER_1K_TOKENS}, 6) AS estimated_cost_usd
FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
GROUP BY session_id, user_id
ORDER BY estimated_cost_usd DESC
LIMIT 5;
"""
try:
    df_cost = run_bq_query(cost_sql)
    print(df_cost.to_markdown(index=False))
except Exception as e:
    print(f"Error: {e}")

In [None]:
# --- CA equivalent: Cost Tracking ---
# The CA agent chooses its own token heuristic, so figures may differ from the SQL above.
cost_question = "Calculate per-session cost by estimating token counts from content length"
print("Asking CA: 'Calculate per-session cost'\n")
try:
    responses = ca_ask(cost_question)
    display_ca_response(responses)
except Exception as ca_err:
    print(f"CA error: {ca_err}")

## Phase 3: Performance Analysis

Dig deeper into agent performance with manual SQL and CA side by side.

In [18]:
print("\n--- Performance Analysis ---")
# Mean and worst-case latency (seconds) per event type, over my_bq_agent events
# that recorded latency_ms.
performance_sql = f"""
SELECT
    event_type,
    ROUND(AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) / 1000, 2) AS avg_latency_sec,
    ROUND(MAX(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) / 1000, 2) AS max_latency_sec,
    COUNT(*) AS event_count
FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
WHERE agent = 'my_bq_agent'
  AND latency_ms IS NOT NULL
GROUP BY event_type
ORDER BY avg_latency_sec DESC;
"""
try:
    df_perf = run_bq_query(performance_sql)
    perf_table = df_perf.to_markdown(index=False)
    print(perf_table)
except Exception as err:
    print(f"Error: {err}")


--- Performance Analysis ---
| event_type      |   avg_latency_sec |   max_latency_sec |   event_count |
|:----------------|------------------:|------------------:|--------------:|
| AGENT_COMPLETED |              7.27 |             28.38 |            50 |
| LLM_RESPONSE    |              2.51 |              6.59 |            96 |
| TOOL_COMPLETED  |              1.65 |             23.27 |            74 |


In [None]:
# --- CA equivalent: Performance Analysis ---
# Same latency breakdown, requested in natural language from the CA Data Agent.
print("Asking CA: 'Analyze performance by event type'\n")
try:
    responses = ca_ask(
        "Analyze performance by event type — show average, max, and p99 latency"
    )
    display_ca_response(responses)
except Exception as e:
    print(f"CA error: {e}")

✨ **Insights Generated by Conversational Analytics** ✨
* **LLM Bottlenecks:** LLM_RESPONSE has an average latency of ~2.5 seconds, which is often the primary driver of perceived wait times for users.
* **Long-Tail Latency:** While the average tool execution (TOOL_COMPLETED) is relatively fast at ~1.6 seconds, it has a massive maximum latency spike of ~23.3 seconds. This suggests that specific external tool calls occasionally experience significant delays.
* **Overall Cycle:** The average total time for an agent to complete a request (AGENT_COMPLETED) is approximately 7.3 seconds.

In [22]:
# Surface the ten slowest individual tool executions for my_bq_agent, keeping
# session and user ids so each outlier can be traced back to its conversation.
print("\n--- Slowest Tool Calls Analysis ---")
slow_tools_sql = f"""
SELECT
    JSON_VALUE(content, '$.tool') AS tool_name,
    ROUND(SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64) / 1000, 2) AS latency_sec,
    timestamp,
    session_id,
    user_id
FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
WHERE event_type = 'TOOL_COMPLETED'
  AND agent = 'my_bq_agent'
  AND latency_ms IS NOT NULL
ORDER BY latency_sec DESC
LIMIT 10;
"""
try:
    # Run the query and render the result as a markdown table.
    df_slow = run_bq_query(slow_tools_sql)
    print(df_slow.to_markdown(index=False))
except Exception as e:
    # Report the failure instead of aborting the remaining cells.
    print(f"Error: {e}")


--- Slowest Tool Calls Analysis ---
| tool_name         |   latency_sec | timestamp                        | session_id                           | user_id   |
|:------------------|--------------:|:---------------------------------|:-------------------------------------|:----------|
| ask_data_insights |         23.27 | 2026-02-09 03:51:51.396805+00:00 | aa1e2a17-a924-4ddd-b034-9999c085b9e8 | user7     |
| ask_data_insights |         17.57 | 2026-02-09 03:50:49.799422+00:00 | aa1e2a17-a924-4ddd-b034-9999c085b9e8 | user7     |
| ask_data_insights |         13.68 | 2026-02-09 03:39:18.573275+00:00 | aa1e2a17-a924-4ddd-b034-9999c085b9e8 | user7     |
| ask_data_insights |          6.44 | 2026-02-09 03:37:42.009659+00:00 | 9a706926-fc6b-4db3-a312-9001f1a4efdc | user4     |
| ask_data_insights |          6.1  | 2026-02-09 03:36:01.800984+00:00 | 9a706926-fc6b-4db3-a312-9001f1a4efdc | user4     |
| execute_sql       |          1.72 | 2026-02-20 00:37:45.268033+00:00 | a198c956-d7c2-4867-b23

In [None]:
# --- CA equivalent: Slowest Tool Calls ---
# Natural-language version of the slowest-tool SQL above.
print("Asking CA: 'Which tool calls have the highest latency?'\n")
try:
    responses = ca_ask(
        "Which tool calls have the highest latency?"
    )
    display_ca_response(responses)
except Exception as e:
    print(f"CA error: {e}")

✨ **Insights Generated by Conversational Analytics** ✨

* **Heaviest Tool:** ask_data_insights is consistently the slowest tool, likely because it involves additional reasoning or summarizing steps compared to a direct SQL execution.
* **Session Focus:** User7's session remains the primary area for performance investigation, as it contains multiple ask_data_insights calls that took over 10 seconds.
* **SQL Efficiency:** SQL Execution calls are generally much faster, typically completing in under 2 seconds.

In [23]:
print("\n--- High-Latency User Questions Analysis ---")
# Pair each user message with the AGENT_COMPLETED event of the same invocation
# to get the end-to-end latency of answering that question. Both sides of the
# join are restricted to my_bq_agent so completion events logged by any other
# agent sharing the invocation_id cannot duplicate a question in the result.
slow_query_sql = f"""
SELECT
    (SELECT text FROM UNNEST(messages.content_parts) WHERE text IS NOT NULL LIMIT 1) AS user_question,
    ROUND(SAFE_CAST(JSON_VALUE(completions.latency_ms, '$.total_ms') AS FLOAT64) / 1000, 2) AS latency_sec,
    messages.timestamp,
    messages.session_id,
    messages.user_id
FROM
    `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` AS messages
JOIN
    `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` AS completions
    ON messages.invocation_id = completions.invocation_id
WHERE
    messages.event_type = 'USER_MESSAGE_RECEIVED'
    AND completions.event_type = 'AGENT_COMPLETED'
    AND messages.agent = 'my_bq_agent'
    AND completions.agent = 'my_bq_agent'
ORDER BY
    latency_sec DESC
LIMIT 10;
"""
try:
    df_slow_queries = run_bq_query(slow_query_sql)
    print(df_slow_queries.to_markdown(index=False))
except Exception as e:
    # Report the failure instead of aborting the remaining cells.
    print(f"Error: {e}")


--- High-Latency User Questions Analysis ---
| user_question                                                                                                                                                             |   latency_sec | timestamp                        | session_id                           | user_id   |
|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------:|:---------------------------------|:-------------------------------------|:----------|
| calculate the tip rate, show me the trend                                                                                                                                 |         28.38 | 2026-02-09 03:51:24.870298+00:00 | aa1e2a17-a924-4ddd-b034-9999c085b9e8 | user7     |
| what about the trend in tips                                                                                                

In [None]:
# --- CA equivalent: High-Latency Questions ---
# Ask the CA Data Agent to perform the message/completion join on its own.
print("Asking CA: 'Which user questions took the longest to complete?'\n")
try:
    responses = ca_ask(
        "Which user questions took the longest to complete?"
    )
    display_ca_response(responses)
except Exception as e:
    print(f"CA error: {e}")

✨ **Insights Generated by Conversational Analytics** ✨

* **Complexity of Trends:** Queries asking for "trends" (e.g., "tip rate trend" or "health trend over the years") consistently appear as high-latency events. This is likely due to the agent having to perform multiple data retrievals and aggregation steps before responding.
* **Large Dataset Scans:** Questions that specify multiple years of data or specific datasets (like tlc_green_trips from 2013-2016) trigger intensive background processes that contribute to the ~22-28 second response times seen for user7.
* **Optimization Opportunity:** The agent spends significant time on "discovery" questions (e.g., "tell me what's in the tables"). Optimizing metadata caching for these specific intents could reduce initial response latency by several seconds.

### Combining Conversational Analytics with BigQuery ML AI Functions

BigQuery's built-in AI functions (`AI.DETECT_ANOMALIES`, `AI.CLASSIFY`, `AI.GENERATE`) provide powerful analysis capabilities. Below, we call these functions from manual SQL and ask the CA agent the equivalent questions.

In [37]:
print("\n--- Latency Anomaly Detection ---")
# Fraction of the chronologically ordered per-minute series used as history
# for AI.DETECT_ANOMALIES; the remaining, most recent minutes are scored.
# Note: BigQuery's CAST from FLOAT64 to INT64 rounds rather than truncates.
HISTORY_FRACTION = 0.7

anomaly_sql_0209 = f"""
WITH base_data AS (
  SELECT
    DATE_TRUNC(timestamp, MINUTE) AS event_minute,
    AVG(SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency_ms
  FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
  WHERE agent = 'my_bq_agent'
    AND event_type = 'AGENT_COMPLETED'
    AND latency_ms IS NOT NULL
  GROUP BY 1
),
ordered_data AS (
  SELECT event_minute, avg_latency_ms,
         ROW_NUMBER() OVER (ORDER BY event_minute) AS rn,
         COUNT(*) OVER () AS total
  FROM base_data
),
historical_data AS (
  SELECT event_minute, avg_latency_ms FROM ordered_data WHERE rn <= CAST(total * {HISTORY_FRACTION} AS INT64)
),
target_data AS (
  SELECT event_minute, avg_latency_ms FROM ordered_data WHERE rn > CAST(total * {HISTORY_FRACTION} AS INT64)
)
SELECT
  time_series_timestamp,
  time_series_data,
  is_anomaly,
  lower_bound,
  upper_bound,
  anomaly_probability
FROM
  AI.DETECT_ANOMALIES(
    (SELECT * FROM historical_data),
    (SELECT * FROM target_data),
    data_col => 'avg_latency_ms',
    timestamp_col => 'event_minute'
  )
ORDER BY time_series_timestamp DESC;
"""
try:
    df_anomalies = run_bq_query(anomaly_sql_0209)
    print(df_anomalies.to_markdown(index=False))
except Exception as e:
    # Report the failure instead of aborting the remaining cells.
    print(f"Error: {e}")


--- Latency Anomaly Detection (02/09 Only) ---
| time_series_timestamp     |   time_series_data | is_anomaly   |   lower_bound |   upper_bound |   anomaly_probability |
|:--------------------------|-------------------:|:-------------|--------------:|--------------:|----------------------:|
| 2026-02-09 03:53:00+00:00 |               5982 | False        |      -2135.43 |       48172   |              0.497746 |
| 2026-02-09 03:51:00+00:00 |              28379 | False        |      -3007.7  |       49576.5 |              0.61526  |
| 2026-02-09 03:50:00+00:00 |              24935 | False        |      -3259.99 |       50828.7 |              0.482176 |


In [None]:
# --- CA equivalent: Anomaly Detection ---
# Natural-language request for the same latency anomaly analysis.
print("Asking CA: 'Detect latency anomalies'\n")
try:
    responses = ca_ask(
        "Detect latency anomalies in agent completion events"
    )
    display_ca_response(responses)
except Exception as e:
    print(f"CA error: {e}")

✨ **Insights Generated by Conversational Analytics** ✨
* **Statistical Normality:** Despite the average latency reaching 28.3 seconds at 03:51 AM, the model did not flag this as an anomaly (is_anomaly = false).
* **High Confidence Bounds:** The upper_bound for expected latency is very high (approx. 49 seconds). This indicates that because the agent handles varied and complex queries (like the multi-year taxi analysis we found earlier), a wide range of response times is considered "statistically normal" for this specific workload.
* **Probability:** The anomaly probability for the 03:51 AM spike was 0.61, which is elevated but below the default threshold (usually 0.95 or 0.99) required to trigger a formal anomaly flag.

To make detection more sensitive to delays, lower the `anomaly_prob_threshold` argument of `AI.DETECT_ANOMALIES` (it defaults to 0.95).

## Phase 4: Understand and Protect Your Users

To run a successful agent, you need to understand your users. BigQuery's built-in AI functions can transform raw log data into structured insights without external machine learning pipelines.


In [None]:
print("\n--- AI-Driven User Intent Analysis ---")
# Take the 50 most recent user messages, then enrich each with an AI.GENERATE
# summary and an AI.CLASSIFY intent label. The ORDER BY inside the CTE only
# chooses which 50 rows are kept; it does not order the final output, so the
# outer query repeats it to keep the printed table newest-first.
ai_user_analysis_sql = f"""
WITH user_messages AS (
  SELECT
    timestamp,
    user_id,
    JSON_VALUE(content, '$.text_summary') AS raw_message
  FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
  WHERE agent = 'my_bq_agent'
    AND event_type = 'USER_MESSAGE_RECEIVED'
    AND JSON_VALUE(content, '$.text_summary') IS NOT NULL
  ORDER BY timestamp DESC
  LIMIT 50
)
SELECT
  timestamp,
  user_id,
  raw_message,
  -- Generate a short summary of the user's question
  AI.GENERATE(
    CONCAT('Summarize this user request in 5 words or less: ', raw_message),
    connection_id => '{PROJECT_ID}.{CONNECTION_ID}',
    endpoint => 'gemini-2.5-flash'
  ).result AS ai_summary,
  -- Categorize the message into predefined intent buckets
  AI.CLASSIFY(
    raw_message,
    categories => ['Trend Analysis', 'Data Exploration', 'Location Service', 'Security', 'Other'],
    connection_id => '{PROJECT_ID}.{CONNECTION_ID}',
    endpoint => 'gemini-2.5-flash'
  ) AS ai_intent
FROM user_messages
ORDER BY timestamp DESC;
"""
try:
    df_ai_analysis = run_bq_query(ai_user_analysis_sql)
    print(df_ai_analysis.to_markdown(index=False))
except Exception as e:
    # Report the failure instead of aborting the remaining cells.
    print(f"Error: {e}")

In [None]:
# --- CA equivalent: Intent Analysis ---
# Ask the CA Data Agent to classify messages and summarize the intent mix.
print("Asking CA: 'Classify user messages by intent'\n")
try:
    responses = ca_ask(
        "Classify user messages by intent and show the distribution"
    )
    display_ca_response(responses)
except Exception as e:
    print(f"CA error: {e}")

* **AI.CLASSIFY:** The AI.CLASSIFY function successfully distinguished between "Trend Analysis" (e.g., fare trends) and "Data Exploration" (e.g., fare amount), allowing you to see exactly which features of your agent are most popular.
* **AI.GENERATE:** The AI.GENERATE function condensed a long list of specific table names (from the 03:35 AM request) into a simple summary: "Analyze green trips, 2013-2016." This makes high-level reporting much cleaner.


In [None]:
print("\n--- AI Intent: Volume vs. Performance Analysis ---")
# Classify every user message with AI.CLASSIFY and join it to the matching
# AGENT_COMPLETED event, to compare how often each intent occurs with how long
# it takes to answer. A single f-string does all templating; the former
# f-string + .format() double pass would break on any literal brace in the SQL.
# Both join sides are restricted to my_bq_agent so completion events logged by
# any other agent sharing the invocation_id cannot inflate intent_volume.
intent_perf_sql = f"""
WITH classified_intents AS (
  SELECT
    AI.CLASSIFY(
      input => (SELECT text FROM UNNEST(messages.content_parts) WHERE text IS NOT NULL LIMIT 1),
      categories => ['Trend Analysis', 'Data Exploration', 'Location Service', 'Security', 'Other'],
      connection_id => '{PROJECT_ID}.{CONNECTION_ID}',
      endpoint => 'gemini-2.5-flash'
    ) AS intent,
    SAFE_CAST(JSON_VALUE(completions.latency_ms, '$.total_ms') AS FLOAT64) / 1000 AS latency_sec
  FROM
    `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` AS messages
  JOIN
    `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` AS completions
    ON messages.invocation_id = completions.invocation_id
  WHERE
    messages.event_type = 'USER_MESSAGE_RECEIVED'
    AND completions.event_type = 'AGENT_COMPLETED'
    AND messages.agent = 'my_bq_agent'
    AND completions.agent = 'my_bq_agent'
)
SELECT
  intent,
  COUNT(*) AS intent_volume,
  ROUND(AVG(latency_sec), 2) AS avg_latency_sec
FROM
  classified_intents
GROUP BY
  intent
ORDER BY
  intent_volume DESC;
"""
try:
    df_intent_perf = run_bq_query(intent_perf_sql)
    print(df_intent_perf.to_markdown(index=False))
except Exception as e:
    # Report the failure instead of aborting the remaining cells.
    print(f"Error: {e}")

In [None]:
# --- CA equivalent: Intent Volume vs Latency ---
# Natural-language version of the intent volume/latency comparison above.
print("Asking CA: 'Show intent volume vs average latency'\n")
try:
    responses = ca_ask(
        "Show intent volume vs average latency"
    )
    display_ca_response(responses)
except Exception as e:
    print(f"CA error: {e}")

✨ **Insights Generated by Conversational Analytics** ✨

* **Complexity and Latency:** Trend Analysis has the highest average latency at 17.14 seconds, despite a relatively low volume (7 occurrences). This confirms that analytical trend-based queries are the most computationally intensive for the agent.
* **Primary Use Case:** Data Exploration is by far the most frequent intent (29 occurrences), with a moderate average latency of 6.68 seconds. This suggests that users primarily use the agent for initial data discovery and inspection.
* **Lightweight Intents:** Location Service and Other (General Chat) are significantly faster, with latencies around 3-4 seconds.

Based on this sample, Data Exploration is by far the most common reason users interact with your agent, while Trend Analysis is the slowest intent to serve. These two tasks are where performance optimization will pay off most.

To protect your users from data leaks and strengthen PII detection, let's look at the users whose questions were classified as security-related.

In [None]:
print("\n--- Security-Related User Questions ---")
# Two-stage query. First extract each user message's text once. Then classify
# only rows that actually have text, and keep those labelled 'Security'.
# Previously the text subquery was written twice per row, and AI.CLASSIFY was
# invoked even for messages without text. Those rows could never match 'Security'
# anyway, so skipping them does not change the result.
security_questions_sql = f"""
WITH user_messages AS (
  SELECT
    event_logging.user_id,
    (SELECT text FROM UNNEST(event_logging.content_parts) WHERE text IS NOT NULL LIMIT 1) AS user_message,
    event_logging.timestamp
  FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` AS event_logging
  WHERE
    event_logging.event_type = 'USER_MESSAGE_RECEIVED'
    AND event_logging.agent = 'my_bq_agent'
),
classified_messages AS (
  SELECT
    user_id,
    user_message,
    timestamp,
    AI.CLASSIFY(
      input => user_message,
      categories => ['Trend Analysis', 'Data Exploration', 'Location Service', 'Security', 'Other'],
      connection_id => '{PROJECT_ID}.{CONNECTION_ID}',
      endpoint => 'gemini-2.5-flash'
    ) AS intent
  FROM user_messages
  WHERE user_message IS NOT NULL
)
SELECT
  user_id,
  user_message,
  timestamp,
  intent
FROM
  classified_messages
WHERE
  intent = 'Security'
ORDER BY
  timestamp DESC;
"""
try:
    df_sec_questions = run_bq_query(security_questions_sql)
    print(df_sec_questions.to_markdown(index=False))
except Exception as e:
    # Report the failure instead of aborting the remaining cells.
    print(f"Error: {e}")

In [None]:
# --- CA equivalent: Security Questions ---
# Let the CA Data Agent find security-flavoured questions without a fixed taxonomy.
print("Asking CA: 'Find users asking security-related questions'\n")
try:
    responses = ca_ask(
        "Find users asking security-related questions"
    )
    display_ca_response(responses)
except Exception as e:
    print(f"CA error: {e}")

✨ **Insights Generated by Conversational Analytics** ✨

* **Specific Interest:** Only user9 has been identified as having asked a security-related question, which was about identifying sensitive or personal data in a specific project.
* **Risk Assessment:** This kind of question could indicate that the user is trying to identify potential security vulnerabilities or data privacy risks within the available datasets.
* **Agent Responsibility:** It is important to monitor these types of queries to ensure that the agent does not inadvertently expose sensitive data or provide information that could lead to a security breach.

The **AI.CLASSIFY** feature successfully flagged this as a Security intent, which allows you to filter for such risks even if users don't use obvious keywords in the future.

Now let's review the agent's responses to make sure no PII has been leaked.

In [None]:
print("\n--- Security Audit: PII Detection in Agent Responses ---")
# Ask Gemini (via AI.GENERATE) to extract names / emails / phone numbers from
# every logged LLM response, then keep only rows where at least one field was
# found. A single f-string does the templating. The earlier f-string +
# .format() double pass was fragile and had also turned the error message
# below into a literal "{e}".
agent_pii_audit_sql = f"""
WITH pii_extraction AS (
  SELECT
    JSON_VALUE(event_logging.content, '$.response') AS agent_response,
    -- The model may wrap its JSON answer in markdown code fences. JSON_VALUE
    -- returns NULL on such text, which would silently drop real PII hits, so
    -- the fences are stripped before the fields are parsed below.
    TRIM(REPLACE(REPLACE(
      AI.GENERATE(
        prompt => CONCAT(
          'Extract any PII from the following text and return it as a JSON object with keys "name", "email", and "phone". ',
          'If a field is not found, use the string "None" as the value. Text: ',
          JSON_VALUE(event_logging.content, '$.response')
        ),
        connection_id => '{PROJECT_ID}.{CONNECTION_ID}',
        endpoint => 'gemini-2.5-flash'
      ).result,
      '```json', ''), '```', '')) AS extracted_pii_json,
    event_logging.timestamp,
    event_logging.user_id,
    event_logging.session_id
  FROM
    `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` AS event_logging
  WHERE
    event_logging.event_type = 'LLM_RESPONSE'
    AND JSON_VALUE(event_logging.content, '$.response') IS NOT NULL
),
pii_fields AS (
  SELECT
    agent_response,
    JSON_VALUE(extracted_pii_json, '$.name') AS pii_name,
    JSON_VALUE(extracted_pii_json, '$.email') AS pii_email,
    JSON_VALUE(extracted_pii_json, '$.phone') AS pii_phone,
    timestamp,
    user_id,
    session_id
  FROM
    pii_extraction
)
SELECT
  *
FROM
  pii_fields
WHERE
  (pii_name IS NOT NULL AND LOWER(pii_name) != 'none')
  OR (pii_email IS NOT NULL AND LOWER(pii_email) != 'none')
  OR (pii_phone IS NOT NULL AND LOWER(pii_phone) != 'none')
ORDER BY
  timestamp DESC;
"""

try:
    df_pii = run_bq_query(agent_pii_audit_sql)
    if df_pii.empty:
        print("No rows containing potential PII were found.")
    else:
        print(df_pii.to_markdown(index=False))
except Exception as e:
    # Single braces: interpolate the actual exception message.
    print(f"Error: {e}")

In [None]:
# --- CA equivalent: PII Detection ---
# Natural-language version of the PII audit above.
print("Asking CA: 'Check agent responses for PII'\n")
try:
    responses = ca_ask(
        "Check agent responses for PII like names, emails, and phone numbers"
    )
    display_ca_response(responses)
except Exception as e:
    print(f"CA error: {e}")

## Phase 5: The Complete Picture

We've demonstrated the full closed-loop integration:

1. **Agent runs** — `my_bq_agent` processes user requests using `BigQueryToolset` and `Gemini`
2. **BQ AA Plugin auto-logs** — Every event (user message, LLM call, tool execution) is automatically streamed to BigQuery
3. **Manual SQL analysis** — Traditional queries for full control over performance, error, and security analysis
4. **CA Data Agent** — The same table is instantly queryable via natural language, with automatic insights
5. **AI-powered analysis** — BigQuery ML functions (`AI.CLASSIFY`, `AI.GENERATE`, `AI.DETECT_ANOMALIES`) enrich both approaches

Let's ask a few more questions to explore the agent's operational health:

In [None]:
# --- Bonus: Open-ended NL questions to the CA Data Agent ---
# Each question is sent on its own; a failure on one is printed and the loop
# moves on to the next.

bonus_questions = [
    "What was the busiest hour for my agent?",
    "Compare average latency between different users",
    "Summarize the overall health of my agent in the last 24 hours",
]

for q in bonus_questions:
    # Banner: blank line, rule, question text, rule.
    print(f"\n{'=' * 80}\nQuestion: {q}\n{'=' * 80}")
    try:
        responses = ca_ask(q)
        display_ca_response(responses)
    except Exception as e:
        print(f"Error: {e}")

## Cleanup

Delete the CA Data Agent and conversation to avoid leaving resources behind.

In [None]:
# --- Clean up CA resources ---
# Best-effort teardown. The conversation and the Data Agent are deleted in
# separate try blocks, so a failure on one is reported without skipping the other.

# Step 1: remove the conversation.
try:
    data_chat_client.delete_conversation(
        request=geminidataanalytics.DeleteConversationRequest(
            name=data_chat_client.conversation_path(
                PROJECT_ID,
                CA_LOCATION,
                CA_CONVERSATION_ID,
            )
        )
    )
    print(f"Conversation '{CA_CONVERSATION_ID}' deleted.")
except Exception as e:
    print(f"Could not delete conversation: {e}")

# Step 2: remove the CA Data Agent.
try:
    data_agent_client.delete_data_agent_sync(
        request=geminidataanalytics.DeleteDataAgentRequest(
            name=data_agent_client.data_agent_path(
                PROJECT_ID,
                CA_LOCATION,
                CA_AGENT_ID,
            )
        )
    )
    print(f"CA Data Agent '{CA_AGENT_ID}' deleted.")
except Exception as e:
    print(f"Could not delete agent: {e}")

# Printed unconditionally; check the messages above for any failed deletion.
print("\nCleanup complete.")