<center>
    <p style="text-align:center">
    <img alt="arize logo" src="https://storage.googleapis.com/arize-assets/arize-logo-white.jpg" width="300"/>
        <br>
        <a href="https://docs.arize.com/arize/">Docs</a>
        |
        <a href="https://github.com/Arize-ai/client_python">GitHub</a>
        |
        <a href="https://arize-ai.slack.com/join/shared_invite/zt-11t1vbu4x-xkBIHmOREQnYnYDH1GDfCg">Slack Community</a>
    </p>
</center>

# Session-Level Evaluations

This notebook demonstrates how to evaluate the effectiveness of AI agent interactions at the session level, where a session consists of multiple traces (individual interactions) between a user and the system.

## Conceptual Overview

Session-level evaluations assess:
- Coherence across multiple interactions
- Context retention between interactions
- Overall goal achievement across an entire conversation
- Appropriate progression through complex multi-step tasks
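
To make the session, trace, and span hierarchy concrete, here is a minimal sketch on a toy DataFrame. The column names mirror the ones used later in this notebook (`attributes.session.id`, `context.trace_id`, `context.span_id`); the rows themselves are made up for illustration.

```python
import pandas as pd

# Toy span table: two sessions, each made of two traces (made-up data).
spans = pd.DataFrame(
    {
        "attributes.session.id": ["s1", "s1", "s1", "s2", "s2"],
        "context.trace_id": ["t1", "t1", "t2", "t3", "t4"],
        "context.span_id": ["a", "b", "c", "d", "e"],
    }
)

# A session-level evaluation produces one verdict per session, so the unit
# of analysis is the group of traces that share a session ID.
traces_per_session = (
    spans.groupby("attributes.session.id")["context.trace_id"].nunique().to_dict()
)
print(traces_per_session)
```

Each session here would receive a single label, regardless of how many traces or spans it contains.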

## Setup

Configure your environment variables and import dependencies. You'll need to set up your Arize API key and import necessary libraries for data processing and evaluation.

In [None]:
%pip install arize arize-phoenix pandas openai nest_asyncio arize-phoenix-evals

In [None]:
# See https://docs.arize.com/phoenix/evaluation/evaluation-models
# for a full list of supported models
from phoenix.evals import OpenAIModel, llm_classify

import os
import pandas as pd

from datetime import datetime, timedelta

from arize.exporter import ArizeExportClient
from arize.utils.types import Environments

In [None]:
from getpass import getpass

api_key = getpass('Enter your Arize API key: ')
os.environ['ARIZE_API_KEY'] = api_key

space_id = getpass('Enter your Arize Space ID: ')
os.environ['ARIZE_SPACE_ID'] = space_id

model_id = getpass('Enter your Arize Model ID (Project Name): ')
os.environ['ARIZE_MODEL_ID'] = model_id

openai_key = getpass('Enter your OpenAI API key: ')
os.environ['OPENAI_API_KEY'] = openai_key

## Data Extraction

Pull trace data from Arize and prepare it for analysis.

> **Note**: Modify the `space_id`, `model_id`, and date range to match your deployment.

In [None]:
client = ArizeExportClient(api_key=os.environ['ARIZE_API_KEY'])

print('#### Exporting your primary dataset into a dataframe.')

primary_df = client.export_model_to_df(
    space_id=os.environ['ARIZE_SPACE_ID'],
    model_id=os.environ['ARIZE_MODEL_ID'],
    environment=Environments.TRACING,
    start_time=datetime.now() - timedelta(days=7),
    end_time=datetime.now(),
    # Optionally specify columns to improve query performance
    # columns=['context.span_id', 'attributes.llm.input']
)

## Evaluation Prompt Design

The evaluation uses a carefully designed prompt template that instructs the LLM how to evaluate session-level effectiveness and coherence. You can customize this template to fit your specific evaluation criteria.

The session evaluation prompt focuses on:

- Coherence assessment: Does the agent maintain a consistent understanding across interactions?
- Context utilization: Does the agent effectively use information from previous interactions?
- Goal progression: Does the conversation move logically toward resolving the user's needs?
- Response appropriateness: Are the agent's responses suitable given the conversation history?

The evaluation looks at overall conversation quality and effectiveness throughout the session.



### Prompt Variables

| Variable | Description | Source |
|----------|-------------|--------|
| `{user_inputs}` | The user inputs across all traces in the session | Extracted from trace data |
| `{output_messages}` | The AI's responses across all traces in the session | Extracted from trace data |

### Customizing the Prompt

You may want to adjust the evaluation criteria or output format based on your specific use case:

- Add domain-specific criteria relevant to your agent's purpose
- Modify success criteria based on your application's goals
- Include additional session metadata as context
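
As a rough illustration of adding session metadata, the sketch below renders a shortened stand-in template with one extra placeholder, `{trace_count}`. It uses plain `str.format` to show the substitution; the assumption here is that the evaluator fills each placeholder from a same-named DataFrame column, so any new placeholder needs a matching column. `CUSTOM_PROMPT` and the sample row are hypothetical.

```python
# A shortened stand-in for the real template, with one extra placeholder.
CUSTOM_PROMPT = """
Total traces in session: {trace_count}

User Inputs:
{user_inputs}

Output Messages:
{output_messages}

Respond with `correct` or `incorrect`.
"""

# One made-up session row; the keys match the placeholder names.
row = {
    "trace_count": 2,
    "user_inputs": [{"1": ["Find flights to Paris"]}, {"2": ["Book the cheapest one"]}],
    "output_messages": [{"1": ["Here are three options."]}, {"2": ["Your flight is booked."]}],
}

rendered = CUSTOM_PROMPT.format(**row)
print(rendered)
```

Printing the rendered prompt for one session is a cheap way to check what the evaluator will actually see before running a full evaluation.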

In [4]:
# Define the session level evaluation prompt
SESSION_CORRECTNESS_PROMPT = """
You are a helpful AI bot that evaluates the effectiveness and correctness of an AI agent's session.

A session consists of multiple traces (interactions) between a user and an AI system. I will provide you with:
1. The user inputs that initiated each trace in the session, in chronological order
2. The AI's output messages for each trace in the session, in chronological order

An effective and correct session:
- Shows consistent understanding of user intentions across traces
- Maintains context and coherence between interactions
- Successfully achieves the overall user goals
- Builds upon previous interactions in the conversation
- Avoids unnecessary repetition or confusion

##

User Inputs:
{user_inputs}

Output Messages:
{output_messages}

##

Evaluate the session based on the given criteria:
- Assess whether the agent maintains coherence throughout the session
- Analyze if the session progresses logically toward resolving user requests
- Check if the agent effectively uses context from previous interactions

Your response must be a single string, either `correct` or `incorrect`, and must not include any additional text.

- Respond with `correct` if the session effectively accomplishes user goals with appropriate responses and coherence.
- Respond with `incorrect` if the session shows confusion, inappropriate responses, or fails to accomplish user goals.
"""


## Data Preparation

These functions filter and transform session data into the format needed for evaluation.

**Core concepts:**
- **Session identification**: Finding complete user sessions to evaluate
- **Trace ordering**: Arranging traces chronologically within sessions
- **Message extraction**: Gathering user inputs and system responses across the session

The `filter_sessions_by_trace_criteria` function is particularly important as it allows you to:
1. Select relevant sessions that contain traces matching your criteria
2. Retrieve the complete session context for evaluation

This approach ensures we evaluate the full conversation flow rather than isolated interactions.
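
The two steps above can be sketched on a toy DataFrame. This is a simplified illustration of the idea, not the full function defined in this notebook; the data and the `searchrouter` criterion are made up.

```python
import pandas as pd

# Made-up spans: only session s1 contains a "searchrouter" span.
spans = pd.DataFrame(
    {
        "attributes.session.id": ["s1", "s1", "s2", "s2"],
        "context.trace_id": ["t1", "t2", "t3", "t4"],
        "name": ["searchrouter", "chat", "chat", "chat"],
    }
)

# Step 1: find sessions with at least one span matching the criterion.
matches = spans[spans["name"].str.contains("searchrouter", case=False, na=False)]
matching_sessions = set(matches["attributes.session.id"])

# Step 2: return every span from those sessions, not just the matching ones.
full_sessions = spans[spans["attributes.session.id"].isin(matching_sessions)]
print(full_sessions)
```

Note that trace `t2` is kept even though it does not match the criterion, because it belongs to a matching session.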

In [5]:
from typing import Any, Dict, Optional


def filter_sessions_by_trace_criteria(
    df: pd.DataFrame,
    trace_filters: Optional[Dict[str, Dict[str, Any]]] = None,
    span_filters: Optional[Dict[str, Dict[str, Any]]] = None,
) -> pd.DataFrame:
    """Filter to find sessions that contain traces meeting the specified criteria.

    Args:
        df: DataFrame with trace data
        trace_filters: Dictionary of column names and filtering criteria for traces
                      Format: {"column_name": {"operator": value}}
                      Supported operators: ">=", "<=", "==", "!=", "contains", "notna", "isna"
        span_filters: Dictionary of column names and filtering criteria for spans
                     Format: {"column_name": {"operator": value}}
                     Same supported operators as trace_filters

    Returns:
        DataFrame with all spans from sessions that contain at least one matching trace
    """
    # Avoid mutable default arguments; treat missing filters as "no filter"
    trace_filters = trace_filters or {}
    span_filters = span_filters or {}

    # Work on a copy so the caller's dataframe is never modified
    filtered_df = df.copy()

    # Apply trace-level filters
    for column, criteria in trace_filters.items():
        if column not in filtered_df.columns:
            print(f"Warning: Column '{column}' not found in dataframe")
            continue

        for operator, value in criteria.items():
            filtered_df = _apply_filter(filtered_df, column, operator, value)

    # Apply span-level filters
    for column, criteria in span_filters.items():
        if column not in filtered_df.columns:
            print(f"Warning: Column '{column}' not found in dataframe")
            continue

        for operator, value in criteria.items():
            filtered_df = _apply_filter(filtered_df, column, operator, value)

    # Get the session IDs that contain matching traces
    if "attributes.session.id" not in filtered_df.columns:
        print("Warning: 'attributes.session.id' column not found in dataframe")
        return filtered_df

    matching_session_ids = set(filtered_df["attributes.session.id"].unique())
    print(
        f"Found {len(matching_session_ids)} sessions containing traces that match criteria"
    )

    if not matching_session_ids:
        print("No matching sessions found")
        return pd.DataFrame()

    # Return ALL spans from matching sessions (not just the filtered ones)
    # This ensures we have complete sessions for evaluation
    result_df = df[df["attributes.session.id"].isin(matching_session_ids)].copy()

    # Get counts for reporting
    session_count = len(matching_session_ids)
    trace_count = len(result_df["context.trace_id"].unique())
    span_count = len(result_df)

    print(
        f"Final result: {span_count} spans from {trace_count} traces in {session_count} sessions"
    )
    return result_df


def _apply_filter(df, column, operator, value):
    """Helper function to apply a single filter operation"""
    if operator == ">=":
        return df[df[column] >= value]
    elif operator == "<=":
        return df[df[column] <= value]
    elif operator == "==":
        # Special handling for None/null comparison
        if value is None:
            return df[df[column].isnull()]
        else:
            return df[df[column] == value]
    elif operator == "!=":
        # Special handling for None/null comparison
        if value is None:
            return df[~df[column].isnull()]
        else:
            return df[df[column] != value]
    elif operator == "contains":
        return df[df[column].str.contains(value, case=False, na=False)]
    elif operator == "isna":
        return df[df[column].isna()]
    elif operator == "notna":
        return df[df[column].notna()]
    else:
        print(f"Warning: Unsupported operator '{operator}' - skipping")
        return df

In [6]:
def prepare_session_data_for_evaluation(
    df,
    extract_cols={
        "output_messages": "attributes.llm.output_messages",
        "user_input": "attributes.input.value",
    },
    filter_empty=True,
    max_chars_per_value=5000,  # Maximum characters per individual value
    max_chars_per_session=100000,  # Maximum characters per entire session
    truncation_strategy="end",  # "start", "end", or "middle"
):
    """
    Prepare session data for evaluation by grouping by session_id and organizing trace data chronologically.
    Includes truncation to avoid context limits.

    Args:
        df: DataFrame containing trace data
        extract_cols: Dict mapping {output_key: source_column} to extract from each row
        filter_empty: Whether to filter out empty values (default: True)
        max_chars_per_value: Maximum characters per individual text value
        max_chars_per_session: Maximum characters for entire session data
        truncation_strategy: How to truncate ("start", "end", or "middle")

    Returns:
        DataFrame with processed session data ready for evaluation
    """

    def truncate_text(text, max_chars, strategy="end"):
        """Truncate text based on strategy"""
        if not text or len(text) <= max_chars:
            return text

        if strategy == "start":
            return "..." + text[-(max_chars-3):]
        elif strategy == "middle":
            half = (max_chars - 3) // 2
            return text[:half] + "..." + text[-half:]
        else:  # "end"
            return text[:max_chars-3] + "..."

    def estimate_session_size(session_dict):
        """Estimate total character count for a session"""
        total_chars = 0
        for key, value in session_dict.items():
            if isinstance(value, list):
                for item in value:
                    if isinstance(item, dict):
                        for sub_item in item.values():
                            if isinstance(sub_item, list):
                                for text in sub_item:
                                    if isinstance(text, str):
                                        total_chars += len(text)
                            elif isinstance(sub_item, str):
                                total_chars += len(sub_item)
            elif isinstance(value, str):
                total_chars += len(value)
        return total_chars

    # Ensure required columns exist
    required_cols = ["attributes.session.id", "context.trace_id", "start_time"]
    for col in required_cols:
        if col not in df.columns:
            print(f"Error: Required column '{col}' not found in dataframe")
            return pd.DataFrame()

    # Group by session_id
    session_groups = df.groupby("attributes.session.id")

    # Prepare results list
    results = []

    for session_id, session_data in session_groups:
        # Initialize a dict to store session data
        session_dict = {"attributes.session.id": session_id}

        # Count traces in this session
        trace_ids = session_data["context.trace_id"].unique()
        session_dict["trace_count"] = len(trace_ids)

        # Order traces chronologically based on the start_time of their first span
        trace_start_times = {}
        for trace_id in trace_ids:
            trace_data = session_data[session_data["context.trace_id"] == trace_id]
            trace_start_times[trace_id] = trace_data["start_time"].min()

        # Sort trace IDs by their start times
        ordered_trace_ids = sorted(
            trace_start_times.keys(), key=lambda x: trace_start_times[x]
        )

        # Extract data for each column type across all traces in chronological order
        for output_key, source_col in extract_cols.items():
            all_trace_data = []

            for trace_id in ordered_trace_ids:
                trace_data = session_data[session_data["context.trace_id"] == trace_id]
                # Sort spans within trace by start_time
                trace_data = trace_data.sort_values("start_time")

                # Aggregate values for this trace
                trace_values = []
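                for _, row in trace_data.iterrows():
                    value = row.get(source_col)
                    # NaN is truthy, so check for it explicitly; containers
                    # (lists, arrays) count as empty when they have no elements
                    if value is None or (isinstance(value, float) and pd.isna(value)):
                        is_empty = True
                    elif hasattr(value, "__len__"):
                        is_empty = len(value) == 0
                    else:
                        is_empty = not value
                    if not filter_empty or not is_empty:
                        # Truncate individual values
                        if isinstance(value, str):
                            value = truncate_text(value, max_chars_per_value, truncation_strategy)
                        trace_values.append(value)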

                if trace_values:
                    # Use the trace index in the ordered list as the key
                    trace_index = ordered_trace_ids.index(trace_id) + 1
                    all_trace_data.append({str(trace_index): trace_values})

            # Use the output_key directly without adding "session_" prefix
            session_dict[output_key] = all_trace_data

        # Check if session exceeds max size and truncate if necessary
        current_size = estimate_session_size(session_dict)
        if current_size > max_chars_per_session:
            print(f"Warning: Session {session_id} exceeds max size ({current_size} chars). Truncating...")

            # Truncate by reducing the number of traces if we have many
            if len(ordered_trace_ids) > 10:  # If more than 10 traces, keep first 5 and last 5
                kept_traces = ordered_trace_ids[:5] + ordered_trace_ids[-5:]
                for output_key in extract_cols.keys():
                    if output_key in session_dict:
                        # Filter to keep only the selected traces
                        filtered_data = []
                        for trace_data in session_dict[output_key]:
                            trace_idx = list(trace_data.keys())[0]
                            if int(trace_idx) <= 5 or int(trace_idx) > len(ordered_trace_ids) - 5:
                                filtered_data.append(trace_data)
                        session_dict[output_key] = filtered_data

                # Update trace count
                session_dict["trace_count"] = len(kept_traces)

            # If still too large, truncate individual values more aggressively
            current_size = estimate_session_size(session_dict)
            if current_size > max_chars_per_session:
                aggressive_limit = max_chars_per_value // 2
                for output_key in extract_cols.keys():
                    if output_key in session_dict:
                        for trace_data in session_dict[output_key]:
                            for trace_idx, values in trace_data.items():
                                if isinstance(values, list):
                                    trace_data[trace_idx] = [
                                        truncate_text(str(v), aggressive_limit, truncation_strategy)
                                        if isinstance(v, str) else v
                                        for v in values
                                    ]

        # Add to results
        results.append(session_dict)

    # Convert to DataFrame
    return pd.DataFrame(results)

In [7]:
def extract_tool_calls(output_messages):
    """Collect the name and arguments of every tool call in a list of output messages."""
    if not output_messages:
        return []

    tool_calls = []
    for message in output_messages:
        if "message.tool_calls" in message:
            for tool_call in message["message.tool_calls"]:
                tool_calls.append({
                    "name": tool_call["tool_call.function.name"],
                    "arguments": tool_call["tool_call.function.arguments"]
                })
    return tool_calls


## Evaluation Configuration



### Filter Data

Customize these parameters to match your specific evaluation needs:

| Parameter | Description | Example |
|-----------|-------------|---------|
| trace_filters | Criteria for selecting traces within sessions | `{"name": {"contains": "searchrouter"}}` |
| span_filters | Criteria for selecting spans within traces | `{"parent_id": {"==": None}}` |

Both filters select sessions: a session is kept if at least one of its spans satisfies every criterion. For example, `{"parent_id": {"==": None}}` matches root spans (spans with no parent). The function then returns all spans from the matching sessions, so each session is evaluated with its full context.

> **Note**: Update `trace_filters` and `span_filters` to match your specific evaluation criteria.
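
The `{"==": None}` filter is handled as a null check in `_apply_filter` rather than a literal equality test. The sketch below, on made-up data, shows why: in pandas, comparing a column to `None` with `==` does not match missing values, whereas `isnull()` does.

```python
import pandas as pd

# Made-up spans: "a" and "c" are root spans (no parent).
spans = pd.DataFrame(
    {
        "context.span_id": ["a", "b", "c"],
        "parent_id": [None, "a", None],
    }
)

# Element-wise equality never matches missing values...
naive_matches = spans[spans["parent_id"] == None]  # noqa: E711

# ...so a null check is needed to find root spans.
root_spans = spans[spans["parent_id"].isnull()]

print(len(naive_matches), len(root_spans))
```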

In [None]:
display(primary_df)

In [None]:
# Get the number of root spans (spans with no parent) in the primary dataframe
primary_df.loc[primary_df['parent_id'].isnull()].shape

In [None]:
# Select sessions that contain at least one root span (a span with no parent).
# Note: the result includes ALL spans from the matching sessions, not only root spans.
eval_traces = filter_sessions_by_trace_criteria(
    df=primary_df,
    # trace_filters={},
    span_filters={"parent_id": {"==": None}},
)

In [17]:
sample_eval_traces = eval_traces.head(20)

### Prepare the data for the evaluation
This step groups spans by session ID, orders each session's traces chronologically, and collects the columns named in `extract_cols` under the keys the prompt template expects (`user_inputs` and `output_messages`).
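
To show the shape of the prepared data, here is a simplified sketch that builds the `user_inputs` value for a single made-up session. It follows the same idea as `prepare_session_data_for_evaluation` (order traces by their earliest span, then collect non-empty values per trace) but omits truncation and other details.

```python
import pandas as pd

# Made-up spans for one session; rows are deliberately out of order.
spans = pd.DataFrame(
    {
        "attributes.session.id": ["s1", "s1", "s1"],
        "context.trace_id": ["t2", "t1", "t1"],
        "start_time": pd.to_datetime(
            ["2025-01-01 10:05", "2025-01-01 10:00", "2025-01-01 10:01"]
        ),
        "attributes.input.value": ["And tomorrow?", "Weather today?", ""],
    }
)

# Order traces by the start time of their earliest span.
trace_order = spans.groupby("context.trace_id")["start_time"].min().sort_values().index

user_inputs = []
for position, trace_id in enumerate(trace_order, start=1):
    trace_spans = spans[spans["context.trace_id"] == trace_id].sort_values("start_time")
    # Drop empty strings so they do not clutter the prompt.
    values = [v for v in trace_spans["attributes.input.value"] if v]
    if values:
        user_inputs.append({str(position): values})

print(user_inputs)
```

Each list element maps a 1-based trace position to the values found in that trace, which is the structure the prompt receives.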

In [18]:
sessions_df = prepare_session_data_for_evaluation(
    df=eval_traces,
    # Keys must match the prompt template placeholders; add more columns as needed
    extract_cols={
        "output_messages": "attributes.output.value",
        "user_inputs": "attributes.input.value",
    },
)

In [None]:
sessions_df.head()

In [None]:
# Sample up to 2 sessions that have more than 5 traces for closer inspection
long_sessions = sessions_df.loc[sessions_df['trace_count'] > 5]
sample_data = long_sessions.sample(min(2, len(long_sessions)))

In [None]:
sample_data.head()

## Running the Evaluation

After preparing your sessions and configuring the evaluation parameters, you can execute the LLM-based evaluation:

In [22]:
import nest_asyncio

nest_asyncio.apply()

In [23]:

model = OpenAIModel(
    api_key=os.environ['OPENAI_API_KEY'],
    model="gpt-4o-mini",
    temperature=0.0,
)

In [None]:
rails = ["correct", "incorrect"]
eval_results = llm_classify(
    dataframe=sessions_df,
    template=SESSION_CORRECTNESS_PROMPT,
    model=model,
    rails=rails,
    provide_explanation=True,
    verbose=False,
    concurrency=20,
)

## Analyzing Results

The evaluation results contain:
- **label**: Overall session assessment (`correct` or `incorrect`)
- **explanation**: The evaluator's reasoning for the assessment
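
A quick way to summarize results with these two columns is sketched below. The `eval_results` frame here is made-up stand-in data with the same `label` and `explanation` columns, not real evaluator output.

```python
import pandas as pd

# Made-up results in the shape described above (one row per session).
eval_results = pd.DataFrame(
    {
        "label": ["correct", "incorrect", "correct", "correct"],
        "explanation": [
            "Goal reached with consistent context.",
            "Agent repeated a question the user had already answered.",
            "Each step built on the previous one.",
            "User request resolved without confusion.",
        ],
    }
)

# Share of sessions per label.
label_share = eval_results["label"].value_counts(normalize=True)
print(label_share)

# Pull the failing sessions to review their explanations by hand.
failures = eval_results[eval_results["label"] == "incorrect"]
print(failures["explanation"].tolist())
```

Reading the explanations for `incorrect` sessions is usually the fastest way to see whether the prompt criteria match what you intend to measure.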
   

In [26]:
# Uncomment to display full cell contents (e.g. long explanations)
# pd.set_option('display.max_columns', None)   # Show all columns
# pd.set_option('display.max_rows', None)      # Show all rows
# pd.set_option('display.max_colwidth', None)  # Show full column width
# pd.set_option('display.width', None)         # Auto-detect terminal width
# pd.set_option('display.max_seq_items', None) # Show all items in sequences

# Reset the options above back to their defaults when done
pd.reset_option('display.max_columns')
pd.reset_option('display.max_rows')
pd.reset_option('display.max_colwidth')
pd.reset_option('display.width')
pd.reset_option('display.max_seq_items')

In [None]:
eval_results.head()

The evaluation results can then be merged with your original data for analysis or to log back to Arize:

In [28]:
# Join the evaluation results to the session data on the shared row index
merged_df = pd.merge(sessions_df, eval_results, left_index=True, right_index=True)

merged_df.rename(
    columns={
        "label": "session_eval.SessionCorrectness.label",
        "explanation": "session_eval.SessionCorrectness.explanation",
    },
    inplace=True,
)

# Get one root span per session: the root span of the session's earliest trace.
# Sorting by start_time first makes keep="first" pick the earliest root span.
root_spans = (
    primary_df[primary_df["parent_id"].isnull()]
    .sort_values("start_time")[
        ["attributes.session.id", "context.trace_id", "context.span_id"]
    ]
    .drop_duplicates(subset=["attributes.session.id"], keep="first")
)

# Merge with merged_df to get the root span_id
final_df = pd.merge(merged_df, root_spans, on="attributes.session.id", how="left")

In [29]:
final_eval_df = final_df[
    [
        "attributes.session.id",
        "context.span_id",
        "session_eval.SessionCorrectness.label",
        "session_eval.SessionCorrectness.explanation",
    ]
]


In [None]:
final_eval_df.head()

In [None]:
from arize.pandas.logger import Client


# Initialize the Arize client with your space ID and API key
arize_client = Client(space_id=os.environ['ARIZE_SPACE_ID'], api_key=os.environ['ARIZE_API_KEY'])


# Set the evals_df to have the correct span ID to log it to Arize
final_eval_df = final_eval_df.set_index(final_df["context.span_id"])

# Use Arize client to log evaluations
response = arize_client.log_evaluations_sync(
    dataframe=final_eval_df,
    model_id=os.environ['ARIZE_MODEL_ID'],
)

See your results in Arize

<img src="https://storage.googleapis.com/arize-phoenix-assets/assets/docs/notebooks/session-level-evals/Screenshot%202025-06-22%20at%205.48.37%E2%80%AFPM.png" width="800"/>