# Behavioral Data Analysis for Potato Annotations

This notebook demonstrates how to analyze behavioral tracking data collected by Potato's interaction tracking system.

## Overview

Potato tracks:
- **Interactions**: Clicks, focus changes, keyboard shortcuts, navigation
- **AI Assistance**: Request/response timing, acceptance rates
- **Annotation Changes**: All modifications with timestamps and sources
- **Timing Data**: Session duration, focus time per element, scroll depth

This data enables:
- Quality assessment of annotations
- Identification of problematic annotators
- Analysis of AI assistance effectiveness
- Optimization of annotation workflows

In [None]:
import json
import pandas as pd
import numpy as np
from pathlib import Path
from collections import defaultdict, Counter
from datetime import datetime

# Optional: for visualization
# HAS_PLOTS records whether both plotting libraries imported successfully;
# the plotting cells below are guarded by it so the rest of the notebook
# still runs without matplotlib/seaborn.
try:
    import matplotlib.pyplot as plt
    import seaborn as sns
    HAS_PLOTS = True
    # Global plot styling applied once for every figure in the notebook
    sns.set_style("whitegrid")
    plt.rcParams['figure.figsize'] = [10, 6]
except ImportError:
    HAS_PLOTS = False
    print("matplotlib/seaborn not available - skipping visualizations")

## 1. Loading Behavioral Data

Behavioral data can be loaded from:
1. Individual user state files (`annotation_output/<user>/user_state.json`)
2. Example data files (like the one in this directory)

In [None]:
def load_behavioral_data_from_file(filepath: str) -> dict:
    """Load behavioral data from a JSON file.

    Args:
        filepath: Path to the JSON file to read.

    Returns:
        The parsed JSON content.
    """
    # Read as UTF-8 explicitly instead of relying on the platform's locale
    # encoding, so non-ASCII user ids or labels load the same everywhere.
    with open(filepath, encoding='utf-8') as f:
        return json.load(f)

def load_behavioral_data_from_annotation_output(output_dir: str) -> dict:
    """
    Load behavioral data from Potato annotation output directory.

    Every sub-directory of ``output_dir`` is checked for a
    ``user_state.json`` file. Sub-directories without that file, and users
    whose state holds no behavioral data, are skipped.

    Args:
        output_dir: Path to annotation_output directory

    Returns:
        Dictionary mapping user_id -> instance_id -> behavioral_data
    """
    data = {}
    output_path = Path(output_dir)

    for user_dir in output_path.iterdir():
        if not user_dir.is_dir():
            continue

        state_file = user_dir / 'user_state.json'
        if not state_file.exists():
            continue

        # Read as UTF-8 explicitly rather than the platform's locale encoding
        with open(state_file, encoding='utf-8') as f:
            user_state = json.load(f)

        # Fall back to the directory name when the state carries no user_id
        user_id = user_state.get('user_id', user_dir.name)
        behavioral = user_state.get('instance_id_to_behavioral_data', {})
        if behavioral:
            data[user_id] = behavioral

    return data

# Load example data
# `behavioral_data` is the notebook-wide input: every analysis cell below reads it.
# NOTE(review): the file is presumably shaped user_id -> instance_id -> behavioral
# data, as the loops below iterate it that way — confirm against the example file.
behavioral_data = load_behavioral_data_from_file('example_behavioral_data.json')
print(f"Loaded data for {len(behavioral_data)} users")
for user_id, instances in behavioral_data.items():
    print(f"  {user_id}: {len(instances)} instances")

## 2. Basic Statistics

Let's calculate some basic statistics about annotation behavior.

In [None]:
def calculate_user_stats(behavioral_data: dict) -> pd.DataFrame:
    """
    Calculate per-user statistics from behavioral data.

    Args:
        behavioral_data: Mapping of user_id -> instance_id -> behavioral data.

    Returns DataFrame with one row per user and columns:
    - user_id, total_instances,
    - total_time_sec, avg_time_sec, min_time_sec, max_time_sec,
    - total_interactions, avg_interactions, total_changes, avg_changes,
    - ai_requests, ai_accepts, ai_accept_rate (None when the user made
      no AI requests)

    The columns are present even when ``behavioral_data`` is empty, so
    callers can index them without checking for an empty result first.
    """
    columns = [
        'user_id', 'total_instances',
        'total_time_sec', 'avg_time_sec', 'min_time_sec', 'max_time_sec',
        'total_interactions', 'avg_interactions',
        'total_changes', 'avg_changes',
        'ai_requests', 'ai_accepts', 'ai_accept_rate',
    ]
    rows = []

    for user_id, instances in behavioral_data.items():
        times = []
        interactions = []
        changes = []
        ai_requests = 0
        ai_accepts = 0

        for bd in instances.values():
            # Stored in milliseconds; reported in seconds
            times.append(bd.get('total_time_ms', 0) / 1000)
            interactions.append(len(bd.get('interactions', [])))
            changes.append(len(bd.get('annotation_changes', [])))

            # An AI event counts as accepted when 'suggestion_accepted' is truthy
            for ai in bd.get('ai_usage', []):
                ai_requests += 1
                if ai.get('suggestion_accepted'):
                    ai_accepts += 1

        rows.append({
            'user_id': user_id,
            'total_instances': len(instances),
            'total_time_sec': sum(times),
            'avg_time_sec': np.mean(times) if times else 0,
            'min_time_sec': min(times) if times else 0,
            'max_time_sec': max(times) if times else 0,
            'total_interactions': sum(interactions),
            'avg_interactions': np.mean(interactions) if interactions else 0,
            'total_changes': sum(changes),
            'avg_changes': np.mean(changes) if changes else 0,
            'ai_requests': ai_requests,
            'ai_accepts': ai_accepts,
            'ai_accept_rate': ai_accepts / ai_requests if ai_requests > 0 else None
        })

    # Explicit columns keep the schema stable for an empty result
    return pd.DataFrame(rows, columns=columns)

# Per-user summary table; also reused by the export cell at the end of the notebook
user_stats = calculate_user_stats(behavioral_data)
# Bare expression as the last line of the cell so the notebook displays the table
user_stats

## 3. Annotation Time Analysis

Analyze how long annotators spend on each instance.

In [None]:
def get_all_annotation_times(behavioral_data: dict) -> pd.DataFrame:
    """Extract annotation times for all instances.

    Args:
        behavioral_data: Mapping of user_id -> instance_id -> behavioral data.

    Returns:
        DataFrame with one row per (user, instance) and columns
        user_id, instance_id, time_sec, interactions, changes, scroll_depth.
        The columns are present even when there are no rows.
    """
    columns = ['user_id', 'instance_id', 'time_sec', 'interactions', 'changes', 'scroll_depth']

    rows = [
        {
            'user_id': user_id,
            'instance_id': instance_id,
            # Stored in milliseconds; reported in seconds
            'time_sec': bd.get('total_time_ms', 0) / 1000,
            'interactions': len(bd.get('interactions', [])),
            'changes': len(bd.get('annotation_changes', [])),
            'scroll_depth': bd.get('scroll_depth_max', 0),
        }
        for user_id, instances in behavioral_data.items()
        for instance_id, bd in instances.items()
    ]

    # Explicit columns keep the schema stable for an empty result
    return pd.DataFrame(rows, columns=columns)

# One row per (user, instance); reused by the plotting and export cells below
times_df = get_all_annotation_times(behavioral_data)
print("Annotation Time Statistics:")
# describe() prints count, mean, std, min, quartiles and max of the times in seconds
print(times_df['time_sec'].describe())

In [None]:
# Two side-by-side views of annotation time: overall distribution and per-user spread.
# Skipped entirely when the plotting libraries are unavailable.
if HAS_PLOTS:
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # Histogram of annotation times
    axes[0].hist(times_df['time_sec'], bins=20, edgecolor='black', alpha=0.7)
    axes[0].set_xlabel('Time (seconds)')
    axes[0].set_ylabel('Count')
    axes[0].set_title('Distribution of Annotation Times')
    # Dashed vertical line marks the mean across all instances
    axes[0].axvline(times_df['time_sec'].mean(), color='red', linestyle='--', label=f'Mean: {times_df["time_sec"].mean():.1f}s')
    axes[0].legend()
    
    # Box plot by user
    times_df.boxplot(column='time_sec', by='user_id', ax=axes[1])
    axes[1].set_xlabel('User ID')
    axes[1].set_ylabel('Time (seconds)')
    axes[1].set_title('Annotation Time by User')
    # Clear the figure-level title that pandas adds for grouped box plots
    plt.suptitle('')
    
    plt.tight_layout()
    plt.show()

## 4. AI Assistance Analysis

Analyze how annotators use AI assistance.

In [None]:
def analyze_ai_usage(behavioral_data: dict) -> dict:
    """
    Analyze AI assistance usage patterns.

    An AI event counts as accepted when its 'suggestion_accepted' value is
    truthy, the same rule ``calculate_user_stats`` applies. An explicit
    ``False`` or empty value is therefore a rejection.

    Args:
        behavioral_data: Mapping of user_id -> instance_id -> behavioral data.

    Returns:
        Dictionary with AI usage statistics:
        - total_requests, total_accepts, total_rejects, accept_rate
        - avg_decision_time_ms (None when no decision times were recorded)
        - by_user: {'accepted': {user_id: count},
                    'decision_time_ms': {user_id: mean}}
        - events_df: DataFrame with one row per AI event
        - message: only present when no AI usage data was found

        All statistic keys are present even when there are no AI events.
    """
    event_columns = [
        'user_id', 'instance_id', 'schema', 'suggestions',
        'accepted', 'decision_time_ms', 'response_latency',
    ]
    ai_events = []
    accepted_flags = []

    for user_id, instances in behavioral_data.items():
        for instance_id, bd in instances.items():
            for ai in bd.get('ai_usage', []):
                request_ts = ai.get('request_timestamp')
                response_ts = ai.get('response_timestamp')
                accepted = ai.get('suggestion_accepted')

                # Latency is only meaningful when both timestamps were recorded
                if response_ts and request_ts is not None:
                    latency = response_ts - request_ts
                else:
                    latency = None

                accepted_flags.append(bool(accepted))
                ai_events.append({
                    'user_id': user_id,
                    'instance_id': instance_id,
                    'schema': ai.get('schema_name'),
                    'suggestions': ai.get('suggestions_shown', []),
                    'accepted': accepted,
                    'decision_time_ms': ai.get('time_to_decision_ms'),
                    'response_latency': latency
                })

    if not ai_events:
        # Same keys as the populated result so callers can index them safely
        return {
            'total_requests': 0,
            'total_accepts': 0,
            'total_rejects': 0,
            'accept_rate': 0,
            'avg_decision_time_ms': None,
            'by_user': {},
            'events_df': pd.DataFrame(columns=event_columns),
            'message': 'No AI usage data found'
        }

    df = pd.DataFrame(ai_events, columns=event_columns)

    # Flags were computed from the raw values, so pandas' None/NaN coercion
    # cannot change what counts as accepted.
    accepted_mask = pd.Series(accepted_flags, index=df.index, dtype=bool)
    # Coerce to numeric so missing decision times become NaN and the
    # per-user mean still works when a column holds only None values.
    decision_ms = pd.to_numeric(df['decision_time_ms'], errors='coerce')

    total_requests = len(df)
    total_accepts = int(accepted_mask.sum())
    total_rejects = total_requests - total_accepts

    decision_times = decision_ms.dropna()

    by_user = (
        pd.DataFrame({
            'user_id': df['user_id'],
            'accepted': accepted_mask,
            'decision_time_ms': decision_ms,
        })
        .groupby('user_id')
        .agg({'accepted': 'sum', 'decision_time_ms': 'mean'})
        .to_dict()
    )

    return {
        'total_requests': total_requests,
        'total_accepts': total_accepts,
        'total_rejects': total_rejects,
        'accept_rate': total_accepts / total_requests if total_requests > 0 else 0,
        'avg_decision_time_ms': float(decision_times.mean()) if len(decision_times) > 0 else None,
        'by_user': by_user,
        'events_df': df
    }

ai_analysis = analyze_ai_usage(behavioral_data)
print("AI Assistance Analysis:")
print(f"  Total requests: {ai_analysis['total_requests']}")
if ai_analysis['total_requests'] == 0:
    # Without AI events the result may carry only 'total_requests' and
    # 'message', so the accept/reject keys must not be indexed here.
    print(f"  {ai_analysis.get('message', 'No AI usage data found')}")
else:
    print(f"  Accepts: {ai_analysis['total_accepts']}")
    print(f"  Rejects: {ai_analysis['total_rejects']}")
    print(f"  Accept rate: {ai_analysis['accept_rate']:.1%}")
    if ai_analysis.get('avg_decision_time_ms'):
        print(f"  Avg decision time: {ai_analysis['avg_decision_time_ms']:.0f}ms")

## 5. Interaction Pattern Analysis

Analyze what elements users interact with and how.

In [None]:
def analyze_interactions(behavioral_data: dict) -> pd.DataFrame:
    """Analyze interaction patterns.

    Flattens every interaction event into one row.

    Args:
        behavioral_data: Mapping of user_id -> instance_id -> behavioral data.

    Returns:
        DataFrame with columns user_id, instance_id, event_type, target,
        timestamp. The columns are present even when there are no events.
    """
    columns = ['user_id', 'instance_id', 'event_type', 'target', 'timestamp']

    interactions = [
        {
            'user_id': user_id,
            'instance_id': instance_id,
            'event_type': event.get('event_type'),
            'target': event.get('target'),
            'timestamp': event.get('timestamp'),
        }
        for user_id, instances in behavioral_data.items()
        for instance_id, bd in instances.items()
        for event in bd.get('interactions', [])
    ]

    # Explicit columns keep the schema stable for an empty result
    return pd.DataFrame(interactions, columns=columns)

# One row per interaction event; reused by the plotting and export cells below
interactions_df = analyze_interactions(behavioral_data)

# Frequency of each event type across all users and instances
print("\nInteraction Event Types:")
print(interactions_df['event_type'].value_counts())

# Ten most frequent interaction targets
print("\nMost Common Targets:")
print(interactions_df['target'].value_counts().head(10))

In [None]:
# Bar charts of interaction counts by event type and by target category.
# Skipped entirely when the plotting libraries are unavailable.
if HAS_PLOTS:
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # Event types
    event_counts = interactions_df['event_type'].value_counts()
    event_counts.plot(kind='bar', ax=axes[0], color='steelblue', edgecolor='black')
    axes[0].set_title('Interaction Event Types')
    axes[0].set_xlabel('Event Type')
    axes[0].set_ylabel('Count')
    axes[0].tick_params(axis='x', rotation=45)
    
    # Target categories
    # The category is the part of the target before the first ':'; targets
    # without a ':' are used whole. This adds a 'target_category' column to
    # interactions_df in place, so it also appears in the CSV exported later.
    interactions_df['target_category'] = interactions_df['target'].apply(
        lambda x: x.split(':')[0] if ':' in str(x) else str(x)
    )
    target_counts = interactions_df['target_category'].value_counts()
    target_counts.plot(kind='bar', ax=axes[1], color='coral', edgecolor='black')
    axes[1].set_title('Interaction Target Categories')
    axes[1].set_xlabel('Target Category')
    axes[1].set_ylabel('Count')
    axes[1].tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    plt.show()

## 6. Quality Detection: Finding Suspicious Annotators

Identify annotators who may be providing low-quality annotations.

In [None]:
def detect_suspicious_behavior(behavioral_data: dict,
                               min_time_threshold: float = 5.0,
                               min_interactions_threshold: int = 3,
                               min_scroll_threshold: float = 25.0) -> pd.DataFrame:
    """
    Identify annotators with potentially low-quality behavior.

    Flags:
    - Very fast annotation times (< min_time_threshold seconds)
    - Very few interactions per instance (< min_interactions_threshold)
    - Little scroll activity (scroll depth < min_scroll_threshold)
    - No annotation changes (just clicking through)

    For each user, the share of instances raising each flag is combined into
    a weighted suspicion score in [0, 1] (weights 0.3 / 0.2 / 0.2 / 0.3).
    Users scoring above 0.5 are flagged 'SUSPICIOUS'. Users with no
    instances are left out of the result.

    Args:
        behavioral_data: Mapping of user_id -> instance_id -> behavioral data.
        min_time_threshold: Instances faster than this many seconds count as fast.
        min_interactions_threshold: Instances with fewer interactions count as low-interaction.
        min_scroll_threshold: Instances whose 'scroll_depth_max' is below this count as unscrolled.

    Returns:
        DataFrame with suspicious behavior indicators per user, sorted by
        suspicion_score in descending order. The columns are present even
        when no user has any instances.
    """
    columns = [
        'user_id', 'total_instances',
        'fast_annotation_rate', 'low_interaction_rate',
        'no_scroll_rate', 'no_change_rate',
        'suspicion_score', 'flag',
    ]
    rows = []

    for user_id, instances in behavioral_data.items():
        total = len(instances)
        if total == 0:
            # Rates are undefined without instances
            continue

        fast_count = 0
        low_interaction_count = 0
        no_scroll_count = 0
        no_change_count = 0

        for bd in instances.values():
            time_sec = bd.get('total_time_ms', 0) / 1000
            interactions = len(bd.get('interactions', []))
            changes = len(bd.get('annotation_changes', []))
            scroll = bd.get('scroll_depth_max', 0)

            if time_sec < min_time_threshold:
                fast_count += 1
            if interactions < min_interactions_threshold:
                low_interaction_count += 1
            if scroll < min_scroll_threshold:
                no_scroll_count += 1
            if changes == 0:
                no_change_count += 1

        fast_rate = fast_count / total
        low_rate = low_interaction_count / total
        no_scroll_rate = no_scroll_count / total
        no_change_rate = no_change_count / total

        # Calculate suspicion score (0-1); the weights sum to 1.0
        suspicion_score = (
            fast_rate * 0.3 +
            low_rate * 0.2 +
            no_scroll_rate * 0.2 +
            no_change_rate * 0.3
        )

        rows.append({
            'user_id': user_id,
            'total_instances': total,
            'fast_annotation_rate': fast_rate,
            'low_interaction_rate': low_rate,
            'no_scroll_rate': no_scroll_rate,
            'no_change_rate': no_change_rate,
            'suspicion_score': suspicion_score,
            'flag': 'SUSPICIOUS' if suspicion_score > 0.5 else 'OK'
        })

    # Explicit columns let sort_values find 'suspicion_score' when rows is empty
    return pd.DataFrame(rows, columns=columns).sort_values('suspicion_score', ascending=False)

# Run the detector with its default thresholds; reused by the plotting and export cells
suspicious_df = detect_suspicious_behavior(behavioral_data)
print("Annotator Quality Analysis:")
# Bare expression as the last line of the cell so the notebook displays the table
suspicious_df

In [None]:
# Grouped bar chart: one group per user, one bar per quality indicator.
# Skipped entirely when the plotting libraries are unavailable.
if HAS_PLOTS:
    fig, ax = plt.subplots(figsize=(10, 6))
    
    # Create grouped bar chart
    metrics = ['fast_annotation_rate', 'low_interaction_rate', 'no_scroll_rate', 'no_change_rate']
    x = np.arange(len(suspicious_df))
    width = 0.2
    
    # Each metric's bars are shifted right by one bar width so the four sit side by side
    for i, metric in enumerate(metrics):
        ax.bar(x + i * width, suspicious_df[metric], width, label=metric.replace('_', ' ').title())
    
    ax.set_xlabel('User')
    ax.set_ylabel('Rate')
    ax.set_title('Quality Indicators by User')
    # Offset of 1.5 bar widths puts each tick at the middle of its four-bar group
    ax.set_xticks(x + width * 1.5)
    ax.set_xticklabels(suspicious_df['user_id'])
    ax.legend()
    ax.set_ylim(0, 1)
    
    plt.tight_layout()
    plt.show()

## 7. Focus Time Analysis

Analyze which interface elements hold annotators' focus, and for how long.

In [None]:
def analyze_focus_time(behavioral_data: dict) -> dict:
    """Sum the recorded focus time per element, overall and per user.

    Returns:
        Dictionary with two entries:
        - 'totals': element -> focus time summed over every user and instance
        - 'by_user': user_id -> element -> focus time summed over that
          user's instances (users with no focus entries are absent)
    """
    overall = {}
    per_user = {}

    for user_id, instances in behavioral_data.items():
        for bd in instances.values():
            focus_map = bd.get('focus_time_by_element', {})
            for element, time_ms in focus_map.items():
                overall[element] = overall.get(element, 0) + time_ms
                # The user's bucket is only created once a focus entry is seen
                user_bucket = per_user.setdefault(user_id, {})
                user_bucket[element] = user_bucket.get(element, 0) + time_ms

    return {'totals': overall, 'by_user': per_user}

focus_analysis = analyze_focus_time(behavioral_data)
print("\nTotal Focus Time by Element (ms):")
# Sort by negated total so the elements with the most focus time come first
for element, time_ms in sorted(focus_analysis['totals'].items(), key=lambda x: -x[1]):
    print(f"  {element}: {time_ms}ms ({time_ms/1000:.1f}s)")

## 8. Annotation Change Patterns

Analyze how annotators revise their annotations: which actions they take, where each change comes from, and how often they deselect a label.

In [None]:
def analyze_annotation_changes(behavioral_data: dict) -> pd.DataFrame:
    """Analyze annotation change patterns.

    Flattens every recorded annotation change into one row.

    Args:
        behavioral_data: Mapping of user_id -> instance_id -> behavioral data.

    Returns:
        DataFrame with columns user_id, instance_id, schema, label, action,
        source, timestamp. 'source' defaults to 'user' when a change does
        not record one. The columns are present even when there are no changes.
    """
    columns = ['user_id', 'instance_id', 'schema', 'label', 'action', 'source', 'timestamp']

    changes = [
        {
            'user_id': user_id,
            'instance_id': instance_id,
            'schema': change.get('schema_name'),
            'label': change.get('label_name'),
            'action': change.get('action'),
            'source': change.get('source', 'user'),
            'timestamp': change.get('timestamp'),
        }
        for user_id, instances in behavioral_data.items()
        for instance_id, bd in instances.items()
        for change in bd.get('annotation_changes', [])
    ]

    # Explicit columns keep the schema stable for an empty result
    return pd.DataFrame(changes, columns=columns)

# One row per annotation change; reused by the export cell below
changes_df = analyze_annotation_changes(behavioral_data)

print("\nAnnotation Change Actions:")
print(changes_df['action'].value_counts())

print("\nAnnotation Change Sources:")
print(changes_df['source'].value_counts())

# Mind changes: approximated by the number of 'deselect' actions per
# (user, instance). A boolean column summed per group gives the same counts
# as groupby(...).apply(...) without applying a function over the grouping
# columns, which newer pandas versions deprecate.
mind_changes = (
    changes_df.assign(is_deselect=changes_df['action'] == 'deselect')
    .groupby(['user_id', 'instance_id'])['is_deselect']
    .sum()
    .reset_index(name='mind_changes')
)

print("\nMind Changes (deselections) by User:")
print(mind_changes.groupby('user_id')['mind_changes'].sum())

## 9. Generating Reports

Create summary reports for stakeholders.

In [None]:
def generate_summary_report(behavioral_data: dict) -> str:
    """Generate a text summary report of behavioral data.

    The report has four sections: an overview, AI assistance usage, quality
    indicators, and a per-user summary.

    "Average Time per Instance" is total annotation time divided by the
    total number of instances, so users with many instances weigh
    proportionally more than users with few.

    Args:
        behavioral_data: Mapping of user_id -> instance_id -> behavioral data.

    Returns:
        The report as a single newline-joined string.
    """
    user_stats = calculate_user_stats(behavioral_data)
    ai_stats = analyze_ai_usage(behavioral_data)
    suspicious = detect_suspicious_behavior(behavioral_data)

    report = []
    report.append("=" * 60)
    report.append("BEHAVIORAL DATA ANALYSIS REPORT")
    report.append("=" * 60)
    report.append("")

    # Overview
    total_instances = user_stats['total_instances'].sum()
    total_time_sec = user_stats['total_time_sec'].sum()
    avg_time_per_instance = total_time_sec / total_instances if total_instances > 0 else 0.0

    report.append("OVERVIEW")
    report.append("-" * 40)
    report.append(f"Total Annotators: {len(user_stats)}")
    report.append(f"Total Instances Annotated: {total_instances}")
    report.append(f"Total Annotation Time: {total_time_sec / 60:.1f} minutes")
    report.append(f"Average Time per Instance: {avg_time_per_instance:.1f} seconds")
    report.append("")

    # AI Usage
    # The statistics may be missing when no AI usage was recorded, so read
    # them with defaults instead of indexing.
    report.append("AI ASSISTANCE USAGE")
    report.append("-" * 40)
    report.append(f"Total AI Requests: {ai_stats.get('total_requests', 0)}")
    report.append(f"Suggestions Accepted: {ai_stats.get('total_accepts', 0)}")
    report.append(f"Accept Rate: {ai_stats.get('accept_rate', 0):.1%}")
    avg_decision_time_ms = ai_stats.get('avg_decision_time_ms')
    if avg_decision_time_ms is not None:
        report.append(f"Avg Decision Time: {avg_decision_time_ms:.0f}ms")
    report.append("")

    # Quality
    report.append("QUALITY INDICATORS")
    report.append("-" * 40)
    suspicious_users = suspicious[suspicious['flag'] == 'SUSPICIOUS']
    report.append(f"Flagged Annotators: {len(suspicious_users)} of {len(suspicious)}")
    if len(suspicious_users) > 0:
        flagged_ids = ', '.join(str(uid) for uid in suspicious_users['user_id'])
        report.append(f"Flagged Users: {flagged_ids}")
    report.append("")

    # Per-user summary
    # One lookup table instead of filtering the DataFrame for every user.
    # Users without instances get no quality row, hence the default below.
    flag_by_user = dict(zip(suspicious['user_id'], suspicious['flag']))

    report.append("PER-USER SUMMARY")
    report.append("-" * 40)
    for _, row in user_stats.iterrows():
        flag = flag_by_user.get(row['user_id'], 'NO DATA')
        report.append(f"\n{row['user_id']} [{flag}]:")
        report.append(f"  Instances: {row['total_instances']}")
        report.append(f"  Avg Time: {row['avg_time_sec']:.1f}s")
        report.append(f"  Avg Interactions: {row['avg_interactions']:.1f}")
        if row['ai_requests'] > 0:
            report.append(f"  AI Accept Rate: {row['ai_accept_rate']:.1%}")

    return "\n".join(report)

# Build the text report for the loaded data and print it
print(generate_summary_report(behavioral_data))

## 10. Export Results

Save analysis results for further processing.

In [None]:
# Export to CSV
# Uses the DataFrames built by earlier cells, so those cells must have run first.
# Paths are relative, so files land in the current working directory;
# index=False leaves the DataFrame index out of the files.
user_stats.to_csv('user_statistics.csv', index=False)
suspicious_df.to_csv('quality_analysis.csv', index=False)
times_df.to_csv('annotation_times.csv', index=False)
interactions_df.to_csv('interactions.csv', index=False)
changes_df.to_csv('annotation_changes.csv', index=False)

print("Exported files:")
print("  - user_statistics.csv")
print("  - quality_analysis.csv")
print("  - annotation_times.csv")
print("  - interactions.csv")
print("  - annotation_changes.csv")

## Next Steps

1. **Integration with Admin Dashboard**: View real-time analytics at `/admin`
2. **Custom Thresholds**: Adjust detection thresholds based on your task
3. **Longitudinal Analysis**: Track behavior changes over time
4. **Correlation Analysis**: Compare behavioral data with annotation quality scores

For more information, see:
- [Behavioral Tracking Documentation](../../docs/behavioral_tracking.md)
- [Admin Dashboard Documentation](../../docs/admin_dashboard.md)
- [Quality Control Documentation](../../docs/quality_control.md)