# Flat Toy Example: Colored Petri Nets for Health Behavior Data

This notebook demonstrates how to mine a colored Petri net from the toy event log, focusing on concurrent events and using color to represent contextual variables (stress and location).

## 1. Setup

In [1]:
import pandas as pd
import numpy as np
import pm4py
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.objects.petri_net.utils import petri_utils
from pm4py.visualization.petri_net import visualizer as pn_visualizer
from pm4py.objects.petri_net.obj import PetriNet, Marking, InhibitorNet
from pm4py.objects.petri_net.utils import petri_utils
from pm4py.objects.petri_net import properties
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Set style for better visualizations
plt.style.use('ggplot')  # Using a built-in style instead of 'seaborn'
sns.set_theme()  # This will apply seaborn's default styling
sns.set_palette('husl')  # Set the color palette

## 2. Load and Preprocess the Event Log

In [2]:
# Load the event log
event_log_df = pd.read_json('2d_toy_event_log.json')

# Convert timestamps
event_log_df['timestamp'] = pd.to_datetime(event_log_df['timestamp'])
    
# Create stress level categories
event_log_df['stress_level'] = pd.cut(event_log_df['stress'], 
                               bins=[0, 3, 6, 10], 
                               labels=['low', 'medium', 'high'])
    
    # Create activity state categories
event_log_df['activity_state'] = 'sedentary'  # Default state
event_log_df.loc[event_log_df['event_type'] == 'physical_activity', 'activity_state'] = event_log_df['bout_type'].str.lower()
    
# Create notification state
event_log_df['notification_state'] = 'no_notification'
event_log_df.loc[event_log_df['event_type'] == 'notification', 'notification_state'] = event_log_df['action'].str.lower()

# Display the first few rows of the event log with new columns
print("\nFirst few rows of the event log with new columns:")
display(event_log_df.head())


First few rows of the event log with new columns:


Unnamed: 0,timestamp,valence,arousal,stress,location,event_type,lifecycle,bout_type,action,location_type,day,stress_qual,stress_level,activity_state,notification_state
0,2025-03-20 15:39:28.698000+00:00,7.0,5.0,6.0,,self-report,,,,,1747958000000.0,high,medium,sedentary,no_notification
1,2025-03-21 11:12:32.086000+00:00,5.0,5.0,5.0,,self-report,,,,,1747988000000.0,high,medium,sedentary,no_notification
2,2025-03-24 12:16:29.428000+00:00,5.0,5.0,5.0,,self-report,,,,,1747730000000.0,high,medium,sedentary,no_notification
3,2025-03-25 00:21:36.166000+00:00,6.0,3.0,3.0,,self-report,,,,,1747731000000.0,low,low,sedentary,no_notification
4,2025-03-25 03:22:13.885000+00:00,6.0,5.0,3.0,,self-report,,,,,1747644000000.0,low,low,sedentary,no_notification


## 3. Data-Driven Process Discovery with Colored Petri Nets


In [3]:
# Prepare the event log with detailed event types
from pm4py.objects.log.util import dataframe_utils
from pm4py.objects.conversion.log import converter as log_converter

# Create a detailed event log for process discovery
discovery_log = event_log_df.copy()

# Create detailed activity types
def create_detailed_activity(row):
    if row['event_type'] == 'physical_activity':
        if row['lifecycle'] == 'START':
            return f"start_{row['bout_type'].lower()}"
        else:  # END
            return f"end_{row['bout_type'].lower()}"
    elif row['event_type'] == 'notification':
        if row['action'] == 'received':
            return 'notification_received'
        elif row['action'] == 'read':
            return 'notification_read'
    elif row['event_type'] == 'self-report':
        # Create stress level events based on the stress value
        if row['stress'] <= 3:
            return 'report_stress_low'
        elif row['stress'] <= 6:
            return 'report_stress_medium'
        else:
            return 'report_stress_high'
    return row['event_type']

# Apply the detailed activity mapping
discovery_log['activity'] = discovery_log.apply(create_detailed_activity, axis=1)

# Add required PM4Py columns
discovery_log['case:concept:name'] = discovery_log['day']  # Use existing case_id as case identifier
discovery_log['concept:name'] = discovery_log['activity']  # Use our detailed activity as the event name
discovery_log['time:timestamp'] = discovery_log['timestamp']  # Use existing timestamp

# Convert to PM4Py event log format
event_log = log_converter.apply(discovery_log)

# Print activity statistics
print("Unique activities in the event log:")
print(discovery_log['activity'].value_counts())
discovery_log.head()

Unique activities in the event log:
activity
location_change               678
notification                  158
end_light_pa                  104
start_light_pa                104
report_stress_medium           50
start_moderate-vigorous_pa     28
end_moderate-vigorous_pa       28
report_stress_low              18
report_stress_high              8
Name: count, dtype: int64


Unnamed: 0,timestamp,valence,arousal,stress,location,event_type,lifecycle,bout_type,action,location_type,day,stress_qual,stress_level,activity_state,notification_state,activity,case:concept:name,concept:name,time:timestamp
0,2025-03-20 15:39:28.698000+00:00,7.0,5.0,6.0,,self-report,,,,,1747958000000.0,high,medium,sedentary,no_notification,report_stress_medium,1747958000000.0,report_stress_medium,2025-03-20 15:39:28.698000+00:00
1,2025-03-21 11:12:32.086000+00:00,5.0,5.0,5.0,,self-report,,,,,1747988000000.0,high,medium,sedentary,no_notification,report_stress_medium,1747988000000.0,report_stress_medium,2025-03-21 11:12:32.086000+00:00
2,2025-03-24 12:16:29.428000+00:00,5.0,5.0,5.0,,self-report,,,,,1747730000000.0,high,medium,sedentary,no_notification,report_stress_medium,1747730000000.0,report_stress_medium,2025-03-24 12:16:29.428000+00:00
3,2025-03-25 00:21:36.166000+00:00,6.0,3.0,3.0,,self-report,,,,,1747731000000.0,low,low,sedentary,no_notification,report_stress_low,1747731000000.0,report_stress_low,2025-03-25 00:21:36.166000+00:00
4,2025-03-25 03:22:13.885000+00:00,6.0,5.0,3.0,,self-report,,,,,1747644000000.0,low,low,sedentary,no_notification,report_stress_low,1747644000000.0,report_stress_low,2025-03-25 03:22:13.885000+00:00


#### 3.1 Discover Basic Process Structure

Let's use the Inductive Miner algorithm to discover the basic process structure. This algorithm is good at handling:
- Concurrency
- Loops
- Optional activities
- Complex process patterns

In [4]:
# Discover process structure using Inductive Miner
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from pm4py.visualization.process_tree import visualizer as pt_visualizer

# Discover process tree using the standard inductive miner
process_tree = inductive_miner.apply(event_log)

# Visualize the process tree with simplified parameters
parameters = {
    "format": "png"
}
gviz = pt_visualizer.apply(process_tree, parameters=parameters)
pt_visualizer.save(gviz, "discovered_process_tree.png")

# Convert to Petri net
from pm4py.objects.conversion.process_tree import converter as pt_converter
net, initial_marking, final_marking = pt_converter.apply(process_tree)

### 3.2 Enhance Discovered Model with Colors

Now we'll enhance the discovered Petri net with colors to represent:
- Physical activity states (sedentary, light, moderate-vigorous)
- Stress levels (low, medium, high)
- Location context (home, work, gym, etc.)

In [5]:
# Define color mappings for different event types
activity_colors = {
    # Physical activity events
    'start_light_pa': 'green',
    'end_light_pa': 'green',
    'start_moderate-vigorous_pa': 'red',
    'end_moderate-vigorous_pa': 'red',
    
    # Notification events
    'notification_received': 'yellow',
    'notification_read': 'orange',
    
    # Stress report events
    'report_stress_low': 'lightgreen',
    'report_stress_medium': 'orange',
    'report_stress_high': 'red',
    
    # Location states
    'at_home': 'blue',
    'at_work': 'purple',
    'at_gym': 'green',
    'at_other': 'gray',
    'at_invalid': 'red',
    'at_in_transit': 'yellow'
}

# Add color properties to places and transitions
for place in net.places:
    # Color places based on their state type
    for state, color in activity_colors.items():
        if state in place.name.lower():
            place.properties['color'] = color
            break
    else:
        place.properties['color'] = 'gray'  # default color

for transition in net.transitions:
    # Color transitions based on the activity they represent
    if transition.name in activity_colors:
        transition.properties['color'] = activity_colors[transition.name]
    else:
        transition.properties['color'] = 'gray'  # default color

# Visualize the colored Petri net
parameters = {
    pn_visualizer.Variants.WO_DECORATION.value.Parameters.FORMAT: "png",
    pn_visualizer.Variants.WO_DECORATION.value.Parameters.DEBUG: False,
    pn_visualizer.Variants.WO_DECORATION.value.Parameters.RANKDIR: "TB"
}
gviz = pn_visualizer.apply(net, initial_marking, final_marking, parameters=parameters)
pn_visualizer.save(gviz, "discovered_colored_petri_net.png")

''

### 3.3 Analyze Discovered Patterns

Let's analyze the discovered patterns to understand:
1. Common activity sequences
2. Relationship between physical activity and stress reporting
3. Location influence on activities

In [6]:
# Analyze patterns in the event log
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
from pm4py.visualization.dfg import visualizer as dfg_visualizer

# Discover and visualize Directly-Follows Graph
dfg = dfg_discovery.apply(event_log)

parameters = {
    dfg_visualizer.Variants.FREQUENCY.value.Parameters.FORMAT: "png",
    dfg_visualizer.Variants.FREQUENCY.value.Parameters.START_ACTIVITIES: True,
    dfg_visualizer.Variants.FREQUENCY.value.Parameters.END_ACTIVITIES: True
}
gviz = dfg_visualizer.apply(dfg, log=event_log, variant=dfg_visualizer.Variants.FREQUENCY, parameters=parameters)
dfg_visualizer.save(gviz, "activity_sequences.png")

# Analyze notification and stress patterns
def analyze_notification_stress_patterns(df):
    """Analyze patterns between notifications, physical activity, and stress reports"""
    patterns = []
    
    # Make sure we have a case_id column
    if 'case_id' not in df.columns:
        print("Warning: No case_id column found. Using index as case identifier.")
        df['case_id'] = df.index
    
    # Group by case and analyze sequences
    for case_id in df['case_id'].unique():
        case_events = df[df['case_id'] == case_id].sort_values('timestamp')
        
        # Track notification states
        notification_state = 'no_notification'
        
        for i, event in case_events.iterrows():
            # Handle notification events
            if event['event_type'] == 'notification':
                if pd.notna(event['action']):  # Check if action is not NaN
                    if event['action'] == 'received':
                        notification_state = 'received'
                    elif event['action'] == 'read':
                        notification_state = 'read'
            
            # Handle stress report events
            elif event['event_type'] == 'self-report':
                # Look for preceding activities within 30 minutes
                preceding_events = case_events[
                    (case_events['timestamp'] < event['timestamp']) &
                    (case_events['timestamp'] >= event['timestamp'] - pd.Timedelta(minutes=30))
                ]
                
                # Only add pattern if we have valid stress level
                if pd.notna(event['stress_level']):
                    pattern = {
                        'stress_level': event['stress_level'],
                        'notification_state': notification_state,
                        'preceding_activities': preceding_events['activity'].tolist() if 'activity' in preceding_events.columns else [],
                        'location': event['location'] if pd.notna(event['location']) else 'unknown'
                    }
                    patterns.append(pattern)
    
    # Convert to DataFrame and handle empty patterns
    if patterns:
        return pd.DataFrame(patterns)
    else:
        print("Warning: No patterns found in the data")
        return pd.DataFrame(columns=['stress_level', 'notification_state', 'preceding_activities', 'location'])

# Analyze and visualize patterns
patterns_df = analyze_notification_stress_patterns(event_log_df)

if not patterns_df.empty:
    print("\nNotification-Stress Patterns:")
    print(patterns_df.groupby(['notification_state', 'stress_level']).size().unstack(fill_value=0))

    # Visualize the relationships
    plt.figure(figsize=(12, 6))
    sns.boxplot(data=patterns_df, x='notification_state', y='stress_level')
    plt.title('Stress Levels by Notification State')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig('notification_stress_patterns.png')
else:
    print("No patterns to visualize")

TypeError: 'bool' object is not iterable

In [None]:
# Discover process structure using Inductive Miner
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from pm4py.visualization.process_tree import visualizer as pt_visualizer

# Discover process tree using the standard inductive miner
process_tree = inductive_miner.apply(event_log)

# Visualize the process tree with simplified parameters
parameters = {
    "format": "png"
}
gviz = pt_visualizer.apply(process_tree, parameters=parameters)
pt_visualizer.save(gviz, "discovered_process_tree.png")

# Convert to Petri net
from pm4py.objects.conversion.process_tree import converter as pt_converter
net, initial_marking, final_marking = pt_converter.apply(process_tree)

### 3.4 Compare Discovered Model with Domain Knowledge

Let's compare the discovered model with our domain knowledge to:
1. Validate our assumptions about process structure
2. Identify any unexpected patterns
3. Understand the limitations of the discovered model

In [None]:
# Compare discovered model with expected behavior
discovered_transitions = set(t.name for t in net.transitions)
expected_transitions = set([
    # Physical activity transitions
    'start_light_pa', 'end_light_pa',
    'start_moderate-vigorous_pa', 'end_moderate-vigorous_pa',
    
    # Notification transitions
    'notification_received', 'notification_read',
    
    # Stress report transitions
    'report_stress_low', 'report_stress_medium', 'report_stress_high'
])

print("\nTransition Analysis:")
print("Expected transitions:", expected_transitions)
print("Discovered transitions:", discovered_transitions)
print("\nMissing transitions:", expected_transitions - discovered_transitions)
print("Unexpected transitions:", discovered_transitions - expected_transitions)

# Analyze model quality
from pm4py.algo.evaluation.replay_fitness import algorithm as replay_fitness
from pm4py.algo.analysis.woflan import algorithm as woflan

fitness = replay_fitness.apply(event_log, net, initial_marking, final_marking)
is_sound = woflan.apply(net, initial_marking, final_marking)

print("\nModel Quality Metrics:")
print(f"Fitness: {fitness}")
print(f"Soundness: {'Sound' if is_sound else 'Not Sound'}")