# Snowflake Agent Testing at Scale

This notebook tests your agent by looping through sample questions and capturing responses using the Snowflake REST API.

**In this sample notebook, the agent details are as follows:**
- Agent Name: SERVICENOW_ANALYTICS_AGENT
- Database: SNOWFLAKE_INTELLIGENCE
- Schema: AGENTS

**Note:** Agents must be called via the REST API endpoint: `/api/v2/databases/{DB}/schemas/{SCHEMA}/agents/{AGENT}:run`


## Setup and Imports


In [None]:
# Import required libraries
import _snowflake
import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.context import get_active_session
import pandas as pd
import json
from datetime import datetime

# Obtain the active session (automatically available in Snowflake notebooks)
session = get_active_session()

# Location of the agent under test: database, schema and agent name.
# Update AGENT_NAME (and the database/schema if needed) to target your agent.
AGENT_DATABASE = "SNOWFLAKE_INTELLIGENCE"
AGENT_SCHEMA = "AGENTS"
AGENT_NAME = "SERVICENOW_ANALYTICS_AGENT"

print("✓ Session established")
print("Agent: " + ".".join((AGENT_DATABASE, AGENT_SCHEMA, AGENT_NAME)))


## Define Sample Questions

Create a list of test questions to send to the agent.


In [None]:
# Test prompts; each one is sent to the agent as a separate request
sample_questions = [
    "What are the top 5 incident categories by count?",
    "Show me the average resolution time for high priority incidents",
    "How many incidents were opened last month?",
    "What is the current backlog of unresolved incidents?",
    "Which teams have the highest incident volume?",
]

# Echo the numbered list so the test set is visible in the cell output
print(f"Total test questions: {len(sample_questions)}")
for position, question_text in enumerate(sample_questions, start=1):
    print(f"{position}. {question_text}")


## Call Agent Using REST API

Snowflake agents are called via the REST API using `_snowflake.send_snow_api_request()`.
The endpoint format is: `/api/v2/databases/{DB}/schemas/{SCHEMA}/agents/{AGENT}:run`


In [None]:
# Function to send a message to the agent
def send_agent_message(prompt: str, timeout_ms: int = 30000) -> list:
    """Send one user prompt to the configured agent and return the parsed reply.

    The request is POSTed to the agent's ``:run`` REST endpoint, whose path is
    built from the module-level ``AGENT_DATABASE``, ``AGENT_SCHEMA`` and
    ``AGENT_NAME`` values.

    Args:
        prompt: Text of the user message to send.
        timeout_ms: Request timeout in milliseconds. Defaults to 30000, the
            value that used to be hard-coded here; pass a larger value for
            questions the agent needs longer to answer.

    Returns:
        The JSON-decoded response body. The rest of this notebook treats it
        as a list of streaming-event dictionaries.

    Raises:
        Exception: If the API reports a status code of 400 or above. The
            message includes the status and the full response object.
        json.JSONDecodeError: If the response content is not valid JSON.
    """
    endpoint = (
        f"/api/v2/databases/{AGENT_DATABASE}"
        f"/schemas/{AGENT_SCHEMA}/agents/{AGENT_NAME}:run"
    )

    # A single-turn conversation: one user message holding one text item.
    request_body = {
        "messages": [
            {
                "role": "user",
                "content": [{"type": "text", "text": prompt}],
            }
        ]
    }

    # The three empty dicts are positional placeholders. The original code
    # labelled them path_params, query_params and headers.
    # NOTE(review): confirm those meanings against the `_snowflake` API,
    # whose signature is not visible from this notebook.
    resp = _snowflake.send_snow_api_request(
        "POST",
        endpoint,
        {},
        {},
        request_body,
        {},
        timeout_ms,
    )

    # Fail loudly on any 4xx/5xx so callers never parse an error payload.
    if resp["status"] >= 400:
        raise Exception(
            f"Failed request with status {resp['status']}: {resp}"
        )
    return json.loads(resp["content"])

# Smoke test: send the first sample question and dump the raw payload
test_question = sample_questions[0]

try:
    print(f"Testing agent with question: {test_question}\n")
    response = send_agent_message(test_question)

    print("✓ Successfully called agent via REST API\n\nResponse structure:")
    print(json.dumps(response, indent=2))
except Exception as exc:
    # Report the failure in the cell output instead of raising
    print(f"✗ Error calling agent: {exc}")


## Understanding Agent Response Format

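The REST call returns a list of streaming events. The final consolidated message is carried by the event whose `event` field is `response`; its content may include text, SQL, suggestions, and more. The helper below extracts the text, thinking, tool-use, tool-result, and chart items from that final event.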


In [None]:
# Helpers that turn the agent's streaming events into readable text
def _describe_content_item(item: dict) -> list:
    """Return the display strings (possibly none) for one content item."""
    kind = item.get("type")

    if kind == "text":
        return [item.get("text", "")]

    if kind == "thinking":
        thought = item.get("thinking", {}).get("text", "")
        return [f"[Thinking: {thought}]"] if thought else []

    if kind == "tool_use":
        tool = item.get("tool_use", {})
        label = tool.get("name", "unknown")
        category = tool.get("type", "unknown")
        return [f"[Used tool: {label} ({category})]"]

    if kind == "tool_result":
        # Only JSON entries are inspected; they may carry SQL and/or text.
        pieces = []
        for entry in item.get("tool_result", {}).get("content", []):
            if entry.get("type") != "json":
                continue
            payload = entry.get("json", {})
            if "sql" in payload:
                pieces.append(f"\n[SQL Query]\n{payload['sql']}")
            if "text" in payload:
                pieces.append(payload["text"])
        return pieces

    if kind == "chart":
        return ["[Chart generated - see visualization]"]

    # Any other item type contributes nothing to the output.
    return []


def extract_agent_response(response_events: list) -> str:
    """
    Build a readable string from the agent's streaming events.

    Only the first event whose ``event`` field equals ``'response'`` is
    used; its ``data`` holds the consolidated content items, which are
    rendered one by one and joined with blank lines.

    Args:
        response_events: List of streaming event dictionaries from the agent

    Returns:
        The formatted response, or a short diagnostic message when no final
        response event exists, the content is empty, or parsing fails.
    """
    try:
        # First 'response' event wins; None when there is no such event
        final_response = next(
            (
                evt.get('data', {})
                for evt in response_events
                if evt.get('event') == 'response'
            ),
            None,
        )

        if not final_response:
            return f"No final response found in events. Got {len(response_events)} events."

        rendered = []
        for item in final_response.get("content", []):
            rendered.extend(_describe_content_item(item))

        if not rendered:
            return "No response content found"
        return "\n\n".join(rendered)

    except Exception as exc:
        # Any malformed input is reported as text rather than raised
        return f"Error parsing response: {exc}\nResponse type: {type(response_events)}"

# Send the first sample question again and show the extracted answer
test_question = sample_questions[0]

try:
    response = send_agent_message(test_question)
    extracted = extract_agent_response(response)

    print(
        f"Question: {test_question}\n",
        "Extracted Response:",
        "=" * 80,
        extracted,
        "=" * 80,
        sep="\n",
    )

    # Report how many streaming events the call returned
    print(f"\nReceived {len(response)} streaming events")
except Exception as exc:
    print(f"Error: {exc}")


## Quick Test: Final Result Only

Test the agent and see only the final clean result without verbose logging.


In [None]:
# Minimal round trip: print the question, then just the extracted answer
test_question = sample_questions[0]

print(f"Question: {test_question}", "=" * 80, sep="\n")

try:
    # Query the agent, then reduce the events to the final readable text
    response = send_agent_message(test_question)
    extracted = extract_agent_response(response)
    print(extracted)
except Exception as exc:
    print(f"Error: {exc}")


## Understanding Streaming Response Events

The agent API returns an array of streaming events that show the agent's progress:
- `response.thinking.delta` - Agent's reasoning process (streamed)
- `response.tool_use` - Tools the agent is using
- `response.tool_result` - Results from tool execution
- `response.text.delta` - Response text (streamed)
- `response.chart` - Chart specifications (if generated)
- `response` - Final consolidated response
- `done` - End of stream


In [None]:
# Helper that summarises a list of streaming events
def analyze_streaming_events(events: list) -> dict:
    """
    Summarise the streaming events returned for one agent call.

    Every event is counted by its ``event`` type. In addition, tool-use
    events contribute a "name (type)" label, text deltas are concatenated,
    JSON tool results contribute their ``sql`` value when present, and chart
    events are counted.

    Args:
        events: List of event dictionaries from agent response

    Returns:
        Dictionary with the keys ``total_events``, ``event_types``,
        ``tools_used``, ``full_text``, ``sql_queries`` and
        ``charts_generated``.
    """
    counts_by_type = {}
    tool_labels = []
    text_chunks = []
    sql_statements = []
    chart_specs = []

    for evt in events:
        kind = evt.get('event', 'unknown')
        counts_by_type[kind] = 1 + counts_by_type.get(kind, 0)

        payload = evt.get('data', {})

        if kind == 'response.tool_use':
            # Label each tool as "name (type)"
            label = payload.get('name', 'unknown')
            category = payload.get('type', 'unknown')
            tool_labels.append(f"{label} ({category})")

        elif kind == 'response.text.delta':
            # Streamed text fragments, stitched together at the end
            text_chunks.append(payload.get('text', ''))

        elif kind == 'response.tool_result':
            # Collect SQL from JSON entries only
            for entry in payload.get('content', []):
                if entry.get('type') != 'json':
                    continue
                body = entry.get('json', {})
                if 'sql' in body:
                    sql_statements.append(body['sql'])

        elif kind == 'response.chart':
            chart_specs.append(payload.get('chart_spec', ''))

    summary = {}
    summary['total_events'] = len(events)
    summary['event_types'] = counts_by_type
    summary['tools_used'] = tool_labels
    summary['full_text'] = ''.join(text_chunks)
    summary['sql_queries'] = sql_statements
    summary['charts_generated'] = len(chart_specs)
    return summary

# Run one question through the agent and summarise the streamed events
test_question = sample_questions[0]

try:
    print(f"Question: {test_question}\n")
    response = send_agent_message(test_question)

    # Condense the raw event list into counts, tools, text and SQL
    analysis = analyze_streaming_events(response)

    print("Event Analysis:", "=" * 80, sep="\n")
    print(f"Total Events: {analysis['total_events']}")
    print("\nEvent Type Counts:")
    for kind, occurrences in sorted(analysis['event_types'].items()):
        print(f"  {kind}: {occurrences}")

    if analysis['tools_used']:
        print(f"\nTools Used: {', '.join(analysis['tools_used'])}")
    else:
        print("\nTools Used: None")
    print(f"SQL Queries Generated: {len(analysis['sql_queries'])}")
    print(f"Charts Generated: {analysis['charts_generated']}")

    if analysis['full_text']:
        print("\nFull Response Text:", "-" * 80, analysis['full_text'], sep="\n")

    if analysis['sql_queries']:
        print("\nSQL Queries:", "-" * 80, sep="\n")
        for number, statement in enumerate(analysis['sql_queries'], start=1):
            print(f"\nQuery {number}:")
            # Long statements are cut to 500 characters for display
            print(statement if len(statement) <= 500 else statement[:500] + "...")
except Exception as exc:
    print(f"Error: {exc}")


## Batch Testing: Loop Through All Questions

Now let's loop through all sample questions and capture responses with full metadata.


In [None]:
# Wrapper that queries the agent and records the outcome plus timing metadata
def call_agent(question):
    """
    Ask the agent one question and package the outcome as a flat record.

    Failures are captured in the record (status "error") instead of being
    raised, so a batch run can continue past a bad question.

    Args:
        question: The question to ask the agent

    Returns:
        dict with the keys question, response, raw_response_json,
        num_events, status, duration_seconds, timestamp and error
    """
    started_at = datetime.now()

    # Pre-populate every key with None, in the order the record exposes them
    record = dict.fromkeys(
        (
            "question",
            "response",
            "raw_response_json",
            "num_events",
            "status",
            "duration_seconds",
            "timestamp",
            "error",
        )
    )
    record["question"] = question
    record["timestamp"] = started_at.isoformat()

    try:
        events = send_agent_message(question)
        answer = extract_agent_response(events)

        # Timing stops once the readable answer has been extracted
        elapsed = (datetime.now() - started_at).total_seconds()

        record.update(
            response=answer,
            raw_response_json=json.dumps(events, default=str),  # stored as a JSON string
            num_events=len(events) if isinstance(events, list) else 0,
            status="success",
            duration_seconds=elapsed,
        )
    except Exception as exc:
        elapsed = (datetime.now() - started_at).total_seconds()
        record.update(
            num_events=0,
            status="error",
            duration_seconds=elapsed,
            error=str(exc),
        )

    return record

# Printed once the definition above has executed without error.
print("✓ Agent calling function defined successfully")


In [None]:
# Send every sample question to the agent and collect one record per question
print(f"Starting batch test with {len(sample_questions)} questions...\n", "=" * 80, sep="\n")

results = []

for i, question in enumerate(sample_questions, start=1):
    print(f"\n[{i}/{len(sample_questions)}] Testing question: {question}")

    # call_agent never raises; failures come back as status "error"
    result = call_agent(question)
    results.append(result)

    if result["status"] != "success":
        print(f"✗ Error: {result['error']}")
    else:
        print(f"✓ Success ({result['duration_seconds']:.2f}s)")
        response_str = str(result['response'])
        # Show at most the first 200 characters of the answer
        preview = response_str if len(response_str) <= 200 else response_str[:200] + "..."
        print(f"Response preview: {preview}")

    print("-" * 80)

print("\n\nBatch testing complete!")
print(f"Total questions: {len(results)}")
print(f"Successful: {sum(r['status'] == 'success' for r in results)}")
print(f"Failed: {sum(r['status'] == 'error' for r in results)}")


## Analyze Results

Convert results to a DataFrame for easier analysis.


In [None]:
# Load the batch records into a pandas DataFrame for analysis
results_df = pd.DataFrame(results)

print("Results Summary:", "=" * 80, sep="\n")

# Compact text table; raw_response_json is left out because it is very large
display_cols = ['question', 'status', 'duration_seconds', 'num_events']
print(results_df[display_cols].to_string(index=False))

print("\n" + "=" * 80)
print("Note: Full raw responses stored in 'raw_response_json' field")
print("      Use json.loads(results_df.iloc[i]['raw_response_json']) to access")

# Final expression: the notebook renders this frame (JSON column omitted)
results_df[
    [
        'question',
        'response',
        'status',
        'duration_seconds',
        'num_events',
        'timestamp',
        'error',
    ]
]


### Accessing Raw Response Data

If you need to access the full raw streaming events for a specific result:


In [None]:
# Example: access the raw response for the first successful result.
# The row is chosen by status, not by position: a failed first question has
# no raw response, and would otherwise hide the successful rows after it.
first_success_json = None
if len(results_df) > 0:
    success_rows = results_df[results_df['status'] == 'success']
    if len(success_rows) > 0:
        first_success_json = success_rows.iloc[0]['raw_response_json']

if first_success_json:
    # Parse the JSON string back to a Python object
    first_raw_response = json.loads(first_success_json)

    print(f"First successful result has {len(first_raw_response)} streaming events")
    print("\nEvent types in first successful result:")

    # Count how many events of each type the stream contained
    event_types = {}
    for event in first_raw_response:
        event_type = event.get('event', 'unknown')
        event_types[event_type] = event_types.get(event_type, 0) + 1

    for event_type, count in sorted(event_types.items()):
        print(f"  {event_type}: {count}")

    # Uncomment to see full raw response structure
    # print("\nFull raw response:")
    # print(json.dumps(first_raw_response, indent=2, default=str)[:1000] + "...")
else:
    print("No successful results available yet")


In [None]:
# Summarise success rate and response times across the batch
successful_results = results_df[results_df['status'] == 'success']

if len(successful_results) == 0:
    print("No successful results to analyze.")
    print("\nPlease check the error messages above and adjust the agent calling method.")
else:
    print("Performance Statistics:", "=" * 80, sep="\n")
    print(f"Total Queries: {len(results_df)}")
    print(f"Successful: {len(successful_results)}")
    print(f"Failed: {len(results_df) - len(successful_results)}")
    print(f"Success Rate: {len(successful_results)/len(results_df)*100:.1f}%")

    # Timing figures are computed over successful calls only
    print("\nResponse Time Statistics:")
    print(f"Average: {successful_results['duration_seconds'].mean():.2f}s")
    print(f"Median: {successful_results['duration_seconds'].median():.2f}s")
    print(f"Min: {successful_results['duration_seconds'].min():.2f}s")
    print(f"Max: {successful_results['duration_seconds'].max():.2f}s")


## Save Results to Snowflake Table

Optionally save the results to a Snowflake table for later analysis.


In [None]:
# Optional: save the batch results to a Snowflake table.
# To enable, remove the leading "# " from each line of the template below
# and set target_table to a database/schema of your own.
#
# target_table = "YOUR_DB.YOUR_SCHEMA.AGENT_TEST_RESULTS"
#
# try:
#     # Convert results to a Snowpark DataFrame
#     results_snowpark_df = session.create_dataframe(results)
#
#     # Write to the table in append mode
#     results_snowpark_df.write.mode("append").save_as_table(target_table)
#
#     print(f"✓ Results saved to {target_table}")
# except Exception as e:
#     print(f"✗ Error saving results: {str(e)}")

print("To save results, uncomment the code above and set your target table name.")
