---
description: A complementary approach to error analysis: discover execution patterns, detect bottlenecks, and uncover inefficiencies in your AI agents using process mining.
category: Evaluation
sidebarTitle: Process Mining
---

# Process Mining for Agent Evaluation with Langfuse

This cookbook demonstrates how to apply process mining techniques to your AI agent traces exported from Langfuse. By analyzing actual execution patterns at scale, you can discover bottlenecks, identify success and failure patterns, and generate actionable insights that traditional evaluation methods miss.

> **What is Process Mining?** Process mining is a family of techniques that extract knowledge from event logs to discover, monitor, and improve real processes. When applied to AI agents, it reveals the actual execution paths your agents take in production, not just what you designed but what is really happening.

> **What is Langfuse?** [Langfuse](https://langfuse.com) is an open-source LLM observability platform that captures traces of your AI applications. Each trace contains a sequence of observations (tool calls, LLM generations, spans) that can be analyzed as process events.

> **What is PM4Py?** [PM4Py](https://pm4py.fit.fraunhofer.de/) is an open-source process mining library in Python that provides algorithms for process discovery, conformance checking, and performance analysis.

## Why Process Mining for Agent Evaluation?

Traditional LLM evaluation tests against known scenarios, which catches regressions but misses unknown failure patterns and emergent behaviors. Process mining complements this by discovering what's actually happening in production:

| Traditional Evaluation | Process Mining |
|----------------------|----------------|
| Tests known scenarios | Discovers unknown patterns |
| Validates expected behavior | Reveals actual behavior |
| Catches regressions | Identifies bottlenecks |
| Manual test case creation | Generates test cases from reality |

By the end of this cookbook, you'll be able to:

1. **Convert** exported Langfuse JSON traces to process mining format
2. **Discover** actual execution patterns and their frequencies
3. **Analyze** which patterns lead to success vs failure
4. **Detect** performance bottlenecks and inefficiencies
5. **Generate** actionable insights for agent improvement

## Step 1: Install Dependencies

In [57]:
%pip install pm4py pandas plotly kaleido --upgrade

Note: you may need to restart the kernel to use updated packages.


## Step 2: Import Libraries and Configure Parameters

In [58]:
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np

# PM4Py
import pm4py
from pm4py.objects.log.util import dataframe_utils
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.visualization.dfg import visualizer as dfg_visualizer

# Visualization
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Analysis Parameters
MIN_VARIANT_FREQUENCY = 3            # Minimum occurrences for a pattern
HIGH_SUCCESS_THRESHOLD = 0.85        # Success rate for "high performing"
BOTTLENECK_THRESHOLD_PCT = 20        # % of total time to flag as bottleneck

print("✅ Libraries loaded successfully")

✅ Libraries loaded successfully


## Step 3: Convert Langfuse JSON to Event Log

Export your traces from Langfuse as JSON, then use this converter to transform them into the event log format needed for process mining.

**How to export from Langfuse:**
1. Go to your Langfuse project → Traces
2. Filter to the traces produced by the agent you want to analyze
3. Export as JSON
4. Save the file and provide the path below

In [59]:
def extract_case_id(trace: Dict[str, Any]) -> str:
    """Extract case ID from trace metadata or ID."""
    # metadata may be present but null in an export, so fall back to an empty dict
    metadata = trace.get("metadata") or {}
    if "ticket_id" in metadata:
        return metadata["ticket_id"]
    name = trace.get("name", "")
    if "process_ticket_" in name:
        return name.replace("process_ticket_", "")
    return trace.get("id", "UNKNOWN")


def parse_observation_name(obs_name: str) -> tuple:
    """Parse observation name into activity and tool name."""
    if obs_name.startswith("iteration_") or obs_name.startswith("process_ticket"):
        return None, None
    
    activity_map = {
        "classify_ticket": "Classify Ticket",
        "search_knowledge_base": "Search Knowledge Base",
        "check_account_status": "Check Account Status",
        "check_order_history": "Check Order History",
        "generate_response": "Generate Response",
        "escalate_to_human": "Escalate to Human",
        "claude_api_call": None,
    }
    
    tool_name = obs_name
    activity = activity_map.get(obs_name)
    
    if activity is None and obs_name not in ["claude_api_call"]:
        words = obs_name.split("_")
        activity = " ".join(word.capitalize() for word in words)
    
    return activity, tool_name


def extract_events_from_trace(trace: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract events from a single Langfuse trace."""
    events = []
    case_id = extract_case_id(trace)
    
    trace_timestamp = trace.get("timestamp")
    if isinstance(trace_timestamp, str):
        try:
            trace_timestamp = datetime.fromisoformat(trace_timestamp.replace("Z", "+00:00"))
        except ValueError:
            # Unparseable timestamp: fall back to the current time
            trace_timestamp = datetime.now()
    elif trace_timestamp is None:
        trace_timestamp = datetime.now()
    
    # Add start event
    events.append({
        "case_id": case_id,
        "activity": "Ticket Incoming",
        "tool_name": "case_start",
        "timestamp": trace_timestamp.strftime("%Y-%m-%d %H:%M:%S"),
        "sequence": 0,
        "status": "success"
    })
    
    sequence = 1
    observations = trace.get("observations", [])
    
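    def get_start_time(obs):
        """Sort key: parsed start time, or datetime.min when missing or unparseable."""
        start = obs.get("startTime") or obs.get("start_time")
        if isinstance(start, str):
            try:
                start = datetime.fromisoformat(start.replace("Z", "+00:00"))
            except ValueError:
                return datetime.min
        if isinstance(start, datetime):
            # Drop tzinfo so timezone-aware values stay comparable with the naive datetime.min fallback
            return start.replace(tzinfo=None)
        return datetime.min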
    
    observations = sorted(observations, key=get_start_time)
    
    for obs in observations:
        obs_name = obs.get("name", "")
        activity, tool_name = parse_observation_name(obs_name)
        
        if activity is None:
            continue
        
        start_time = obs.get("startTime") or obs.get("start_time")
        if start_time:
            try:
                if isinstance(start_time, str):
                    timestamp = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
                else:
                    timestamp = start_time
            except ValueError:
                timestamp = trace_timestamp
        else:
            timestamp = trace_timestamp
        
        # statusMessage may be present but null, so coerce it to an empty string
        status_message = obs.get("statusMessage") or ""
        level = obs.get("level", "DEFAULT")
        status = "error" if level == "ERROR" or "error" in status_message.lower() else "success"
        
        events.append({
            "case_id": case_id,
            "activity": activity,
            "tool_name": tool_name,
            "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            "sequence": sequence,
            "status": status
        })
        sequence += 1
    
    # Add end event
    end_timestamp = trace.get("endTime") or trace.get("end_time")
    if end_timestamp:
        try:
            if isinstance(end_timestamp, str):
                end_timestamp = datetime.fromisoformat(end_timestamp.replace("Z", "+00:00"))
        except ValueError:
            end_timestamp = datetime.now()
    else:
        end_timestamp = datetime.now()
    
    events.append({
        "case_id": case_id,
        "activity": "Ticket Closed",
        "tool_name": "case_end",
        "timestamp": end_timestamp.strftime("%Y-%m-%d %H:%M:%S"),
        "sequence": sequence,
        "status": "success"
    })
    
    return events


def load_langfuse_json(file_path: str) -> List[Dict[str, Any]]:
    """Load traces from Langfuse JSON export."""
    print(f"📂 Loading: {file_path}")
    
    with open(file_path, 'r') as f:
        data = json.load(f)
    
    if isinstance(data, list):
        traces = data
    elif isinstance(data, dict):
        if "traces" in data:
            traces = data["traces"]
        elif "data" in data:
            traces = data["data"]
        else:
            traces = [data]
    else:
        traces = []
    
    print(f"✅ Loaded {len(traces)} trace(s)")
    return traces


def convert_to_event_log(traces: List[Dict[str, Any]]) -> tuple:
    """Convert Langfuse traces to PM4Py event log."""
    all_events = []
    
    for trace in traces:
        events = extract_events_from_trace(trace)
        all_events.extend(events)
    
    df = pd.DataFrame(all_events)
    
    if df.empty:
        print("❌ No events extracted")
        return None, df
    
    # Rename to PM4Py standard columns
    df = df.rename(columns={
        'case_id': 'case:concept:name',
        'activity': 'concept:name'
    })
    
    df['time:timestamp'] = pd.to_datetime(df['timestamp'])
    df = df.sort_values(['case:concept:name', 'sequence'])
    df['duration_ms'] = df.groupby('case:concept:name')['time:timestamp'].diff().dt.total_seconds() * 1000
    df['tokens_total'] = 0
    df['success'] = df['status'].apply(lambda x: 1.0 if x == 'success' else 0.0)
    
    df_pm4py = dataframe_utils.convert_timestamp_columns_in_df(df.copy())
    event_log = log_converter.apply(df_pm4py, parameters={
        log_converter.Variants.TO_EVENT_LOG.value.Parameters.CASE_ID_KEY: 'case:concept:name'
    })
    
    print(f"\n📊 Event Log Summary:")
    print(f"   Traces: {df['case:concept:name'].nunique()}")
    print(f"   Events: {len(df)}")
    print(f"   Activities: {df['concept:name'].nunique()}")
    
    return event_log, df
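
To make the expected input concrete, here is a minimal sketch of the trace shape the converter reads. The key names (`id`, `metadata`, `timestamp`, `observations`, `name`, `startTime`, `level`) are the ones accessed in the code above; the values are invented, and the assumption that your export nests observations under each trace should be checked against your own file. The `flatten` and `parse_ts` helpers are illustrative stand-ins, not part of the cookbook.

```python
from datetime import datetime

# Hypothetical trace, shaped after the keys the converter above reads.
sample_trace = {
    "id": "trace-001",
    "metadata": {"ticket_id": "T-1001"},
    "timestamp": "2025-01-15T10:00:00Z",
    "observations": [
        {"name": "search_knowledge_base", "startTime": "2025-01-15T10:00:05Z", "level": "DEFAULT"},
        {"name": "classify_ticket", "startTime": "2025-01-15T10:00:01Z", "level": "DEFAULT"},
        {"name": "claude_api_call", "startTime": "2025-01-15T10:00:02Z", "level": "DEFAULT"},
    ],
}


def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 string; 'Z' is rewritten because older Pythons reject it."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def flatten(trace: dict, skip: frozenset = frozenset({"claude_api_call"})) -> list:
    """Return one (case_id, activity, time) row per kept observation, in time order."""
    case_id = trace["metadata"]["ticket_id"]
    kept = [o for o in trace["observations"] if o["name"] not in skip]
    kept.sort(key=lambda o: parse_ts(o["startTime"]))
    return [
        (case_id, o["name"].replace("_", " ").title(), parse_ts(o["startTime"]).strftime("%H:%M:%S"))
        for o in kept
    ]


rows = flatten(sample_trace)
for row in rows:
    print(row)
```

Observations arrive in arbitrary order, so sorting by start time before assigning sequence numbers is what makes the resulting event log reflect the real execution order.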

## Step 4: Load Your Data

Choose one of the options below based on your data source.

In [None]:
# ============================================================
# OPTION 1: Load from Langfuse JSON export
# ============================================================
# json_file = "path/to/your/langfuse_export.json"
# traces = load_langfuse_json(json_file)
# event_log, df = convert_to_event_log(traces)

# ============================================================
# OPTION 2: Load from pre-converted CSV
# ============================================================
csv_file = "examples/sample_traces.csv"

df_raw = pd.read_csv(csv_file)
df = df_raw.rename(columns={'case_id': 'case:concept:name', 'activity': 'concept:name'})
df['time:timestamp'] = pd.to_datetime(df['timestamp'])
df = df.sort_values(['case:concept:name', 'sequence'])
df['duration_ms'] = df.groupby('case:concept:name')['time:timestamp'].diff().dt.total_seconds() * 1000
df['tokens_total'] = 0
df['success'] = df['status'].apply(lambda x: 1.0 if x == 'success' else 0.0)

df_pm4py = dataframe_utils.convert_timestamp_columns_in_df(df.copy())
event_log = log_converter.apply(df_pm4py, parameters={
    log_converter.Variants.TO_EVENT_LOG.value.Parameters.CASE_ID_KEY: 'case:concept:name'
})

print(f"✅ Loaded {df['case:concept:name'].nunique()} traces with {len(df)} events")
print(f"   Activities: {df['concept:name'].nunique()}")

✅ Loaded 22 traces with 168 events
   Activities: 9
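
If you use Option 2 with your own file, the loading code above reads the columns `case_id`, `activity`, `timestamp`, `sequence`, and `status`. The sketch below checks a CSV for those columns before loading it; the sample rows are invented and the `REQUIRED_COLUMNS` name is an assumption for illustration.

```python
import csv
import io

# Invented rows; only the column names come from the loading code above.
SAMPLE_CSV = """case_id,activity,tool_name,timestamp,sequence,status
T-1001,Ticket Incoming,case_start,2025-01-15 10:00:00,0,success
T-1001,Classify Ticket,classify_ticket,2025-01-15 10:01:00,1,success
T-1001,Ticket Closed,case_end,2025-01-15 10:02:00,2,success
"""

REQUIRED_COLUMNS = {"case_id", "activity", "timestamp", "sequence", "status"}

reader = csv.DictReader(io.StringIO(SAMPLE_CSV))
missing = REQUIRED_COLUMNS - set(reader.fieldnames or [])
rows = list(reader)

print("missing columns:", sorted(missing))
print("events:", len(rows))
```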


## Step 5: Discover Execution Patterns

Use process discovery to visualize the actual execution flows your agent takes. The Directly-Follows Graph (DFG) shows which activities follow each other and how frequently.
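
Conceptually, a DFG is a count of adjacent activity pairs within each case. The following sketch computes those counts with the standard library only, using two invented cases; it illustrates the idea and is not how PM4Py implements discovery.

```python
from collections import Counter

# Invented cases: each list is one case's activities in execution order.
cases = {
    "T-1": ["Ticket Incoming", "Classify Ticket", "Generate Response", "Ticket Closed"],
    "T-2": ["Ticket Incoming", "Classify Ticket", "Escalate to Human", "Ticket Closed"],
}

dfg_counts = Counter()
for activities in cases.values():
    # zip pairs each activity with its immediate successor
    dfg_counts.update(zip(activities, activities[1:]))

for (src, dst), count in dfg_counts.most_common():
    print(f"{src} -> {dst}: {count}x")
```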

In [61]:
dfg, start_activities, end_activities = pm4py.discover_dfg(event_log)

print("🔍 Process Discovery Results:")
print(f"\n📍 Start Activities:")
for activity, count in sorted(start_activities.items(), key=lambda x: x[1], reverse=True):
    print(f"   {activity}: {count}x")

print(f"\n🏁 End Activities:")
for activity, count in sorted(end_activities.items(), key=lambda x: x[1], reverse=True):
    print(f"   {activity}: {count}x")

print(f"\n🔗 Most Common Transitions:")
for (from_act, to_act), count in sorted(dfg.items(), key=lambda x: x[1], reverse=True)[:10]:
    print(f"   {from_act} → {to_act}: {count}x")

🔍 Process Discovery Results:

📍 Start Activities:
   Ticket Incoming: 22x

🏁 End Activities:
   Ticket Closed: 22x

🔗 Most Common Transitions:
   Ticket Incoming → Classify Ticket: 22x
   Classify Ticket → Check Account Status: 21x
   Check Account Status → Search Knowledge Base: 21x
   Search Knowledge Base → Generate Response: 19x
   Search Knowledge Base → Escalate to Human: 14x
   Escalate to Human → Ticket Closed: 14x
   Generate Response → User Follow-up: 12x
   User Follow-up → Search Knowledge Base: 12x
   Generate Response → Ticket Closed: 8x
   Search Knowledge Base → Check Order Status: 1x


In [62]:
# Visualize the process flow
gviz = dfg_visualizer.apply(
    dfg, log=event_log,
    variant=dfg_visualizer.Variants.FREQUENCY,
    parameters={
        dfg_visualizer.Variants.FREQUENCY.value.Parameters.FORMAT: "png",
        dfg_visualizer.Variants.FREQUENCY.value.Parameters.START_ACTIVITIES: start_activities,
        dfg_visualizer.Variants.FREQUENCY.value.Parameters.END_ACTIVITIES: end_activities
    }
)
dfg_visualizer.save(gviz, "process_flow.png")
print("✅ Process flow saved as 'process_flow.png'")
print("   Thicker arrows = more frequent transitions")

✅ Process flow saved as 'process_flow.png'
   Thicker arrows = more frequent transitions


## Step 6: Analyze Success and Failure Patterns

Identify which execution paths (variants) lead to successful outcomes vs failures.
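
A variant is the ordered tuple of activities in a case, so cases that follow the same path share a variant. The sketch below groups three invented cases by variant and computes frequency and success rate per group; treating a case as successful only when every step succeeded is an assumption made for this illustration.

```python
from collections import defaultdict

# Invented cases: (activities in order, did every step succeed?)
cases = {
    "T-1": (("Classify Ticket", "Generate Response"), True),
    "T-2": (("Classify Ticket", "Generate Response"), False),
    "T-3": (("Classify Ticket", "Escalate to Human"), True),
}

outcomes_by_variant = defaultdict(list)
for activities, succeeded in cases.values():
    outcomes_by_variant[activities].append(succeeded)

summary = {
    variant: {"frequency": len(outcomes), "success_rate": sum(outcomes) / len(outcomes)}
    for variant, outcomes in outcomes_by_variant.items()
}

for variant, stats in summary.items():
    print(" -> ".join(variant), stats)
```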

In [63]:
def analyze_variants(event_log, df, min_frequency=MIN_VARIANT_FREQUENCY):
    """Analyze execution variants and their success rates."""
    variants = pm4py.get_variants_as_tuples(event_log)
    
    variant_stats = []
    for variant_tuple, cases in variants.items():
        case_ids = [case.attributes['concept:name'] for case in cases]
        variant_df = df[df['case:concept:name'].isin(case_ids)]
        
        frequency = len(case_ids)
        frequency_pct = frequency / df['case:concept:name'].nunique() * 100
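        # A case counts as successful only if every event in it succeeded;
        # the first event is always the synthetic start event, so it says nothing about the outcome
        success_values = variant_df.groupby('case:concept:name')['success'].min()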
        success_rate = success_values.mean() if not success_values.isna().all() else None
        avg_duration = variant_df.groupby('case:concept:name')['duration_ms'].sum().mean()
        
        variant_stats.append({
            'variant': ' → '.join(variant_tuple),
            'variant_tuple': variant_tuple,
            'frequency': frequency,
            'frequency_pct': frequency_pct,
            'success_rate': success_rate,
            'avg_duration_ms': avg_duration,
            'num_steps': len(variant_tuple),
            'case_ids': case_ids
        })
    
    variants_df = pd.DataFrame(variant_stats).sort_values('frequency', ascending=False)
    variants_df_filtered = variants_df[variants_df['frequency'] >= min_frequency]
    
    return variants_df, variants_df_filtered

variants_df, variants_df_filtered = analyze_variants(event_log, df)
print(f"🔍 Found {len(variants_df)} unique execution patterns ({len(variants_df_filtered)} with ≥{MIN_VARIANT_FREQUENCY} occurrences)")

🔍 Found 5 unique execution patterns (3 with ≥3 occurrences)


In [64]:
# Separate high and low performers
variants_with_success = variants_df_filtered[variants_df_filtered['success_rate'].notna()]
high_performers = variants_with_success[variants_with_success['success_rate'] >= HIGH_SUCCESS_THRESHOLD].sort_values('frequency', ascending=False)
low_performers = variants_with_success[variants_with_success['success_rate'] < 0.5].sort_values('frequency', ascending=False)

print(f"✅ HIGH-PERFORMING PATTERNS (Success ≥ {HIGH_SUCCESS_THRESHOLD*100:.0f}%):")
print("=" * 100)
if len(high_performers) > 0:
    for _, row in high_performers.head(5).iterrows():
        print(f"\n{row['frequency']}x ({row['frequency_pct']:.1f}%) | Success: {row['success_rate']*100:.1f}% | Steps: {row['num_steps']}")
        print(f"   {row['variant'][:120]}{'...' if len(row['variant']) > 120 else ''}")
else:
    print("   No high-performing patterns found.")

print(f"\n\n❌ LOW-PERFORMING PATTERNS (Success < 50%):")
print("=" * 100)
if len(low_performers) > 0:
    for _, row in low_performers.head(5).iterrows():
        print(f"\n{row['frequency']}x ({row['frequency_pct']:.1f}%) | Success: {row['success_rate']*100:.1f}% | Steps: {row['num_steps']}")
        print(f"   {row['variant'][:120]}{'...' if len(row['variant']) > 120 else ''}")
else:
    print("   ✅ No low-performing patterns found.")

✅ HIGH-PERFORMING PATTERNS (Success ≥ 85%):

10x (45.5%) | Success: 100.0% | Steps: 6
   Ticket Incoming → Classify Ticket → Check Account Status → Search Knowledge Base → Escalate to Human → Ticket Closed

6x (27.3%) | Success: 100.0% | Steps: 6
   Ticket Incoming → Classify Ticket → Check Account Status → Search Knowledge Base → Generate Response → Ticket Closed

4x (18.2%) | Success: 100.0% | Steps: 15
   Ticket Incoming → Classify Ticket → Check Account Status → Search Knowledge Base → Generate Response → User Follow-up → ...


❌ LOW-PERFORMING PATTERNS (Success < 50%):
   ✅ No low-performing patterns found.


In [65]:
# Visualize pattern analysis
fig = make_subplots(
    rows=1, cols=2,
    subplot_titles=('Pattern Frequency', 'Success Rate by Pattern')
)

top_variants = variants_df_filtered.head(10)
labels = [f"Pattern {i+1}" for i in range(len(top_variants))]

fig.add_trace(go.Bar(x=labels, y=top_variants['frequency'], name='Frequency'), row=1, col=1)
fig.add_trace(go.Bar(x=labels, y=top_variants['success_rate']*100, name='Success %'), row=1, col=2)

fig.update_layout(height=400, showlegend=False, title_text="Execution Pattern Analysis")
fig.show()

## Step 7: Detect Performance Bottlenecks

Identify which activities consume the most time and may be slowing down your agent.
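
In this cookbook, `duration_ms` is the gap between consecutive event timestamps within a case, so each activity is charged the time elapsed since the previous event started. The sketch below reproduces that calculation with the standard library on one invented case; `THRESHOLD_PCT` mirrors `BOTTLENECK_THRESHOLD_PCT`, and the timestamps are made up.

```python
from collections import defaultdict
from datetime import datetime

THRESHOLD_PCT = 20  # mirrors BOTTLENECK_THRESHOLD_PCT

# Invented single case: (activity, start timestamp)
events = [
    ("Ticket Incoming", "2025-01-15 10:00:00"),
    ("Classify Ticket", "2025-01-15 10:01:00"),
    ("Generate Response", "2025-01-15 10:02:00"),
    ("User Follow-up", "2025-01-15 10:10:00"),
]

parsed = [(name, datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")) for name, ts in events]

total_ms = defaultdict(float)
for (_, prev_ts), (name, ts) in zip(parsed, parsed[1:]):
    # The gap since the previous event is charged to the current activity
    total_ms[name] += (ts - prev_ts).total_seconds() * 1000

grand_total = sum(total_ms.values())
share_pct = {name: round(ms / grand_total * 100, 1) for name, ms in total_ms.items()}
bottlenecks = [name for name, pct in share_pct.items() if pct >= THRESHOLD_PCT]

print(share_pct)
print(bottlenecks)
```

Because the gap is attributed to the later event, a large share for a step such as a user reply likely reflects waiting time rather than agent compute, which is worth keeping in mind when reading the results.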

In [66]:
def analyze_bottlenecks(df, threshold_pct=BOTTLENECK_THRESHOLD_PCT):
    """Identify performance bottlenecks by activity."""
    activity_stats = df.groupby('concept:name').agg({
        'duration_ms': ['mean', 'sum', 'count'],
        'tokens_total': ['mean', 'sum'],
        'success': 'mean'
    }).round(2)
    
    activity_stats.columns = [
        'avg_duration_ms', 'total_duration_ms', 'call_count',
        'avg_tokens', 'total_tokens', 'success_rate'
    ]
    
    total_time = activity_stats['total_duration_ms'].sum()
    activity_stats['pct_of_total_time'] = (activity_stats['total_duration_ms'] / total_time * 100).round(1)
    activity_stats = activity_stats.sort_values('total_duration_ms', ascending=False)
    activity_stats['is_bottleneck'] = activity_stats['pct_of_total_time'] >= threshold_pct
    
    return activity_stats

activity_stats = analyze_bottlenecks(df)

print("⏱️ ACTIVITY PERFORMANCE ANALYSIS")
print("=" * 100)
print(f"{'Activity':<35} {'Calls':<8} {'Avg (ms)':<12} {'% Time':<10} {'Success':<10}")
print("=" * 100)

for activity, row in activity_stats.head(10).iterrows():
    flag = "🚨" if row['is_bottleneck'] else "  "
    success_str = f"{row['success_rate']*100:.1f}%" if pd.notna(row['success_rate']) else "N/A"
    print(f"{flag} {activity:<33} {int(row['call_count']):<8} {row['avg_duration_ms']:<12.0f} {row['pct_of_total_time']:<10.1f} {success_str:<10}")

# The flag is set with >= in analyze_bottlenecks, so the legend uses ≥ as well
print(f"\n🚨 = Bottleneck (≥{BOTTLENECK_THRESHOLD_PCT}% of total time)")

⏱️ ACTIVITY PERFORMANCE ANALYSIS
Activity                            Calls    Avg (ms)     % Time     Success   
🚨 User Follow-up                    12       815000       54.9       100.0%    
   Search Knowledge Base             34       60000        11.4       100.0%    
   Classify Ticket                   22       60000        7.4        100.0%    
   Ticket Closed                     22       60000        7.4        100.0%    
   Check Account Status              21       60000        7.1        100.0%    
   Generate Response                 20       60000        6.7        100.0%    
   Escalate to Human                 14       60000        4.7        100.0%    
   Check Order Status                1        60000        0.3        100.0%    
   Ticket Incoming                   0        nan          0.0        100.0%    

🚨 = Bottleneck (≥20% of total time)


In [67]:
# Visualize bottlenecks
fig = make_subplots(
    rows=1, cols=2,
    subplot_titles=('Time Distribution by Activity', 'Average Duration by Activity'),
    specs=[[{"type": "pie"}, {"type": "bar"}]]
)

top_activities = activity_stats.head(8)
fig.add_trace(go.Pie(labels=top_activities.index, values=top_activities['total_duration_ms']), row=1, col=1)
fig.add_trace(go.Bar(x=top_activities.index, y=top_activities['avg_duration_ms']), row=1, col=2)

fig.update_layout(height=400, showlegend=False, title_text="Bottleneck Analysis")
fig.update_xaxes(tickangle=45)
fig.show()

## Step 8: Detect Inefficiencies

Find waste patterns like repeated tool calls, loops, and cycles.
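
The repeated-call, two-step, and three-step checks used in this step are the same test at different window sizes: an activity recurs after `k` steps, and the `k` activities in the window are all distinct. A minimal generalization, with a hypothetical `find_loops` helper and an invented trace, might look like this:

```python
def find_loops(activities, k):
    """Return windows where the activity at i recurs at i+k and the k steps from i are distinct."""
    loops = []
    for i in range(len(activities) - k):
        window = activities[i:i + k]
        if activities[i] == activities[i + k] and len(set(window)) == k:
            loops.append(tuple(activities[i:i + k + 1]))
    return loops


# Invented case in which the agent revisits the knowledge base after a follow-up.
trace = ["Search", "Respond", "Follow-up", "Search", "Respond", "Closed"]

print(find_loops(trace, 1))  # consecutive repeats (A -> A)
print(find_loops(trace, 2))  # two-step loops (A -> B -> A)
print(find_loops(trace, 3))  # three-step loops (A -> B -> C -> A)
```

Note that a sliding window reports one pass around a cycle once per starting position, so instance counts overstate the number of distinct loops.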

In [68]:
def detect_waste(df):
    """Detect various forms of waste and inefficiency."""
    waste_report = {'repeated_calls': [], 'loops': [], 'three_step_loops': [], 'repeating_cycles': [], 'long_chains': []}

    for trace_id in df['case:concept:name'].unique():
        activities = df[df['case:concept:name'] == trace_id].sort_values('time:timestamp')['concept:name'].tolist()

        # Consecutive repeated calls (A → A)
        for i in range(len(activities) - 1):
            if activities[i] == activities[i+1]:
                waste_report['repeated_calls'].append({'trace_id': trace_id, 'activity': activities[i]})

        # Two-step loops (A → B → A)
        for i in range(len(activities) - 2):
            if activities[i] == activities[i+2] and activities[i] != activities[i+1]:
                waste_report['loops'].append({'trace_id': trace_id, 'pattern': f"{activities[i]} → {activities[i+1]} → {activities[i+2]}"})

        # Three-step loops (A → B → C → A) - cycle returns to start after 3 distinct activities
        for i in range(len(activities) - 3):
            if (activities[i] == activities[i+3] and
                len(set(activities[i:i+3])) == 3):  # All 3 activities are distinct
                waste_report['three_step_loops'].append({
                    'trace_id': trace_id,
                    'pattern': f"{activities[i]} → {activities[i+1]} → {activities[i+2]} → {activities[i+3]}"
                })

        # Repeating cycles (A → B → C → A → B → C) - exact sequence repetition
        i = 0
        while i <= len(activities) - 6:
            if tuple(activities[i:i+3]) == tuple(activities[i+3:i+6]) and len(set(activities[i:i+3])) > 1:
                waste_report['repeating_cycles'].append({'trace_id': trace_id, 'cycle': ' → '.join(activities[i:i+3])})
                i += 6
            else:
                i += 1

        # Long chains (>15 steps)
        if len(activities) > 15:
            waste_report['long_chains'].append({'trace_id': trace_id, 'length': len(activities)})

    return waste_report

waste_report = detect_waste(df)

print("🗑️ WASTE & INEFFICIENCY DETECTION")
print("=" * 80)

print(f"\n⚠️ Repeated Calls (A→A): {len(waste_report['repeated_calls'])} instances")
if waste_report['repeated_calls']:
    for activity, count in pd.DataFrame(waste_report['repeated_calls'])['activity'].value_counts().head(3).items():
        print(f"   {activity}: {count}x")

print(f"\n🔄 Two-Step Loops (A→B→A): {len(waste_report['loops'])} instances")
if waste_report['loops']:
    for pattern, count in pd.DataFrame(waste_report['loops'])['pattern'].value_counts().head(3).items():
        print(f"   {pattern}: {count}x")

print(f"\n🔄 Three-Step Loops (A→B→C→A): {len(waste_report['three_step_loops'])} instances")
if waste_report['three_step_loops']:
    for pattern, count in pd.DataFrame(waste_report['three_step_loops'])['pattern'].value_counts().head(3).items():
        print(f"   {pattern}: {count}x")

print(f"\n🔁 Repeating Cycles: {len(waste_report['repeating_cycles'])} instances")
if waste_report['repeating_cycles']:
    for cycle, count in pd.DataFrame(waste_report['repeating_cycles'])['cycle'].value_counts().head(3).items():
        print(f"   {cycle}: {count}x")

print(f"\n📏 Long Chains (>15 steps): {len(waste_report['long_chains'])} traces")

🗑️ WASTE & INEFFICIENCY DETECTION

⚠️ Repeated Calls (A→A): 0 instances

🔄 Two-Step Loops (A→B→A): 0 instances

🔄 Three-Step Loops (A→B→C→A): 28 instances
   Search Knowledge Base → Generate Response → User Follow-up → Search Knowledge Base: 12x
   Generate Response → User Follow-up → Search Knowledge Base → Generate Response: 8x
   User Follow-up → Search Knowledge Base → Generate Response → User Follow-up: 8x

🔁 Repeating Cycles: 4 instances
   Search Knowledge Base → Generate Response → User Follow-up: 4x

📏 Long Chains (>15 steps): 0 traces


## Step 9: Generate Actionable Insights

Synthesize findings into prioritized recommendations.

In [69]:
def generate_insights(activity_stats, waste_report, high_performers, low_performers):
    """Generate actionable insights from process mining analysis."""
    insights = []

    # Bottlenecks
    for activity, row in activity_stats[activity_stats['is_bottleneck']].iterrows():
        insights.append({
            'category': '⏱️ Bottleneck',
            'priority': 'HIGH',
            'finding': f"'{activity}' consumes {row['pct_of_total_time']:.1f}% of execution time",
            'recommendation': f"Optimize '{activity}' - avg {row['avg_duration_ms']:.0f}ms across {int(row['call_count'])} calls"
        })

    # Low success activities
    for activity, row in activity_stats[(activity_stats['success_rate'].notna()) & (activity_stats['success_rate'] < 0.7)].iterrows():
        insights.append({
            'category': '❌ Low Success',
            'priority': 'HIGH',
            'finding': f"'{activity}' has {row['success_rate']*100:.1f}% success rate",
            'recommendation': f"Investigate failure causes in '{activity}'"
        })

    # Three-step loops (A→B→C→A)
    if waste_report.get('three_step_loops'):
        for pattern, count in pd.DataFrame(waste_report['three_step_loops'])['pattern'].value_counts().head(2).items():
            insights.append({
                'category': '🔄 3-Step Loop',
                'priority': 'HIGH',
                'finding': f"Three-step loop '{pattern}' occurs {count}x",
                'recommendation': "Add loop detection or state tracking to prevent re-entry"
            })

    # Repeating cycles
    if waste_report.get('repeating_cycles'):
        for cycle, count in pd.DataFrame(waste_report['repeating_cycles'])['cycle'].value_counts().head(2).items():
            insights.append({
                'category': '🔁 Cycle',
                'priority': 'HIGH',
                'finding': f"Repeating cycle '{cycle}' occurs {count}x",
                'recommendation': "Add cycle detection or max-iteration limits"
            })

    # Repeated calls
    if waste_report['repeated_calls']:
        for activity, count in pd.DataFrame(waste_report['repeated_calls'])['activity'].value_counts().head(2).items():
            insights.append({
                'category': '🗑️ Waste',
                'priority': 'MEDIUM',
                'finding': f"'{activity}' called consecutively {count}x",
                'recommendation': f"Add caching or deduplication for '{activity}'"
            })

    # Low performing patterns
    for _, row in low_performers.head(2).iterrows():
        insights.append({
            'category': '🔍 Investigate',
            'priority': 'HIGH',
            'finding': f"Pattern with {row['success_rate']*100:.1f}% success ({row['frequency']}x)",
            'recommendation': f"Debug: {row['variant'][:80]}..."
        })

    # High performing patterns
    for _, row in high_performers.head(2).iterrows():
        insights.append({
            'category': '✨ Success Pattern',
            'priority': 'LOW',
            'finding': f"Pattern with {row['success_rate']*100:.1f}% success ({row['frequency']}x)",
            'recommendation': f"Promote: {row['variant'][:80]}..."
        })

    return insights

insights = generate_insights(activity_stats, waste_report, high_performers, low_performers)

print("💡 ACTIONABLE INSIGHTS")
print("=" * 100)

priority_order = {'HIGH': 0, 'MEDIUM': 1, 'LOW': 2}
for i, insight in enumerate(sorted(insights, key=lambda x: priority_order[x['priority']]), 1):
    emoji = "🔴" if insight['priority'] == 'HIGH' else "🟡" if insight['priority'] == 'MEDIUM' else "🟢"
    print(f"\n{i}. {insight['category']} {emoji}")
    print(f"   Finding: {insight['finding']}")
    print(f"   → {insight['recommendation']}")

# Export insights
pd.DataFrame(insights).to_csv('process_mining_insights.csv', index=False)
print("\n✅ Insights exported to 'process_mining_insights.csv'")

💡 ACTIONABLE INSIGHTS

1. ⏱️ Bottleneck 🔴
   Finding: 'User Follow-up' consumes 54.9% of execution time
   → Optimize 'User Follow-up' - avg 815000ms across 12 calls

2. 🔄 3-Step Loop 🔴
   Finding: Three-step loop 'Search Knowledge Base → Generate Response → User Follow-up → Search Knowledge Base' occurs 12x
   → Add loop detection or state tracking to prevent re-entry

3. 🔄 3-Step Loop 🔴
   Finding: Three-step loop 'Generate Response → User Follow-up → Search Knowledge Base → Generate Response' occurs 8x
   → Add loop detection or state tracking to prevent re-entry

4. 🔁 Cycle 🔴
   Finding: Repeating cycle 'Search Knowledge Base → Generate Response → User Follow-up' occurs 4x
   → Add cycle detection or max-iteration limits

5. ✨ Success Pattern 🟢
   Finding: Pattern with 100.0% success (10x)
   → Promote: Ticket Incoming → Classify Ticket → Check Account Status → Search Knowledge Base...

6. ✨ Success Pattern 🟢
   

## Step 10: Executive Summary

In [70]:
print("=" * 100)
print(" " * 35 + "EXECUTIVE SUMMARY")
print("=" * 100)

print(f"\n📊 DATASET OVERVIEW")
print(f"   Traces Analyzed: {df['case:concept:name'].nunique()}")
print(f"   Total Events: {len(df)}")
print(f"   Unique Activities: {df['concept:name'].nunique()}")

# A case counts as successful only if every event in it succeeded
overall_success = df.groupby('case:concept:name')['success'].min()
if not overall_success.isna().all():
    print(f"   Overall Success Rate: {overall_success.mean()*100:.1f}%")

print(f"\n🔍 EXECUTION PATTERNS")
print(f"   Unique Patterns: {len(variants_df)}")
print(f"   Common Patterns (≥{MIN_VARIANT_FREQUENCY}x): {len(variants_df_filtered)}")
if len(variants_df) > 0:
    print(f"   Most Common: {variants_df.iloc[0]['variant'][:60]}...")
    print(f"      → {variants_df.iloc[0]['frequency']}x ({variants_df.iloc[0]['frequency_pct']:.1f}%)")

print(f"\n⏱️ BOTTLENECKS")
bottlenecks = activity_stats[activity_stats['is_bottleneck']]
print(f"   Critical Bottlenecks: {len(bottlenecks)}")
if len(bottlenecks) > 0:
    top = bottlenecks.iloc[0]
    print(f"   Top: '{bottlenecks.index[0]}' ({top['pct_of_total_time']:.1f}% of time)")

print(f"\n✅ SUCCESS PATTERNS")
print(f"   High Performers (≥{HIGH_SUCCESS_THRESHOLD*100:.0f}%): {len(high_performers)}")
print(f"   Low Performers (<50%): {len(low_performers)}")

print(f"\n🗑️ INEFFICIENCIES")
print(f"   Repeated Calls: {len(waste_report['repeated_calls'])}")
print(f"   Loops: {len(waste_report['loops'])}")
print(f"   Repeating Cycles: {len(waste_report['repeating_cycles'])}")
print(f"   Long Chains: {len(waste_report['long_chains'])}")

print(f"\n🎯 PRIORITY ACTIONS")
high_priority = [i for i in insights if i['priority'] == 'HIGH']
for i, insight in enumerate(high_priority[:3], 1):
    print(f"   {i}. {insight['category']} - {insight['finding'][:60]}")

print("\n" + "=" * 100)
print("\n📚 OUTPUTS:")
print("   • process_flow.png - Visual process map")
print("   • process_mining_insights.csv - Detailed recommendations")
print("=" * 100)

                                   EXECUTIVE SUMMARY

📊 DATASET OVERVIEW
   Traces Analyzed: 22
   Total Events: 168
   Unique Activities: 9
   Overall Success Rate: 100.0%

🔍 EXECUTION PATTERNS
   Unique Patterns: 5
   Common Patterns (≥3x): 3
   Most Common: Ticket Incoming → Classify Ticket → Check Account Status → S...
      → 10x (45.5%)

⏱️ BOTTLENECKS
   Critical Bottlenecks: 1
   Top: 'User Follow-up' (54.9% of time)

✅ SUCCESS PATTERNS
   High Performers (≥85%): 3
   Low Performers (<50%): 0

🗑️ INEFFICIENCIES
   Repeated Calls: 0
   Loops: 0
   Repeating Cycles: 4
   Long Chains: 0

🎯 PRIORITY ACTIONS
   1. ⏱️ Bottleneck - 'User Follow-up' consumes 54.9% of execution time
   2. 🔄 3-Step Loop - Three-step loop 'Search Knowledge Base → Generate Response →
   3. 🔄 3-Step Loop - Three-step loop 'Generate Response → User Follow-up → Search


📚 OUTPUTS:
   • process_flow.png - Visual process map
   • process_mining_insights.c

## Conclusion

This process mining analysis revealed:

1. **Execution Patterns** - The actual paths your agent takes in production
2. **Success vs Failure Patterns** - Which execution paths lead to good outcomes
3. **Bottlenecks** - Activities consuming disproportionate time
4. **Inefficiencies** - Loops, cycles, and repeated calls indicating problems

### Next Steps

1. Address high-priority bottlenecks and inefficiencies
2. Investigate low-performing patterns to understand failure causes
3. Use high-performing patterns as templates for agent improvement
4. Schedule this analysis to run regularly (weekly/monthly)
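
One way to use high-performing patterns as templates is to keep them as an allow-list and flag new traces that take an unfamiliar path. The sketch below is an illustration under that assumption: `KNOWN_GOOD_VARIANTS` and `check_trace` are hypothetical names, and the two variants are the most frequent high performers from the analysis above.

```python
# Hypothetical allow-list: variants that the analysis flagged as high performing.
KNOWN_GOOD_VARIANTS = {
    ("Ticket Incoming", "Classify Ticket", "Check Account Status",
     "Search Knowledge Base", "Generate Response", "Ticket Closed"),
    ("Ticket Incoming", "Classify Ticket", "Check Account Status",
     "Search Knowledge Base", "Escalate to Human", "Ticket Closed"),
}


def check_trace(activities):
    """Return 'known' if the path matches a known-good variant, otherwise 'review'."""
    return "known" if tuple(activities) in KNOWN_GOOD_VARIANTS else "review"


new_trace = ["Ticket Incoming", "Classify Ticket", "Search Knowledge Base", "Ticket Closed"]
print(check_trace(new_trace))
```

A trace marked for review is not necessarily wrong; it is simply a path the previous analysis did not see often enough to vouch for.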

### Additional Resources

- [PM4Py Documentation](https://processintelligence.solutions/pm4py)
- [Langfuse Documentation](https://langfuse.com/docs)