In [1]:
import pandas as pd
import pm4py
import os
from pm4py.algo.discovery.alpha import algorithm as alpha_miner
from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from pm4py.visualization.petri_net import visualizer as pn_visualizer
from pm4py.algo.evaluation.replay_fitness import algorithm as replay_fitness_evaluator
from pm4py.algo.evaluation.precision import algorithm as precision_evaluator
from pm4py.algo.evaluation.generalization import algorithm as generalization_evaluator
from pm4py.algo.evaluation.simplicity import algorithm as simplicity_evaluator
from pm4py.objects.conversion.process_tree import converter as pt_converter

import matplotlib.pyplot as plt

In [2]:
def prepare_dataset(file_path, approach='daily', session_gap_hours=4):
    """
    Prepare dataset using either daily or session-based approach.
    
    Parameters:
    file_path (str): Path to the input file
    approach (str): Either 'daily' or 'session'
    session_gap_hours (int): Hours gap to define a new session (only for session approach)
    
    Returns:
    pandas.DataFrame: Processed DataFrame with required columns
    """
    with open(file_path, "r") as file:
        lines = file.readlines()

    df = pd.DataFrame([line.split() for line in lines], 
                     columns=["Date", "Time", "org:resource", "lifecycle:transition", "concept:name"])
    
    # Convert to datetime and sort chronologically
    df["time:timestamp"] = pd.to_datetime(df["Date"] + " " + df["Time"], format='ISO8601')
    df = df.sort_values('time:timestamp')
    
    if approach == 'daily':
        # Daily approach: use calendar date as case identifier
        df['case:concept:name'] = df['time:timestamp'].dt.date.astype(str)
    
    elif approach == 'session':
        # Session approach: identify sessions based on time gaps and sleep activity
        time_diff = df['time:timestamp'].diff()
        new_session = (
            (time_diff > pd.Timedelta(hours=session_gap_hours)) | 
            ((df['concept:name'] == 'Sleep') & (df['lifecycle:transition'] == 'ON'))
        )
        df['case:concept:name'] = new_session.cumsum().astype(str)
    
    else:
        raise ValueError("approach must be either 'daily' or 'session'")
    
    # Keep required columns in correct order and drop unused ones
    df = df[[
        'case:concept:name',
        'time:timestamp',
        'concept:name',
        'org:resource',
        'lifecycle:transition'
    ]]
    
    # Count distinct values for each attribute
    # distinct_counts = df.nunique()
    # print("Attributes counts:")
    # display(distinct_counts.to_frame().T)
    
    return df

In [3]:
def apply_process_mining(log):

    alpha_net, alpha_initial_marking, alpha_final_marking = alpha_miner.apply(log)
    heuristic_net, heu_initial_marking, heu_final_marking = heuristics_miner.apply(log)
    inductive_tree = inductive_miner.apply(log)
    inductive_net, ind_initial_marking, ind_final_marking = pt_converter.apply(inductive_tree)

    models = {
        'Alpha': (alpha_net, alpha_initial_marking, alpha_final_marking),
        'Heuristic': (heuristic_net, heu_initial_marking, heu_final_marking),
        'Inductive': (inductive_net, ind_initial_marking, ind_final_marking)
    }
    
    return models

In [4]:
def calculate_metrics(log, models):
    metrics = {}
    
    for name, (net, initial_marking, final_marking) in models.items():
        fitness = replay_fitness_evaluator.apply(log, net, initial_marking, final_marking)
        precision = precision_evaluator.apply(log, net, initial_marking, final_marking)
        generalization = generalization_evaluator.apply(log, net, initial_marking, final_marking)
        simplicity = simplicity_evaluator.apply(net)
        
        metrics[name] = {
            'Fitness': fitness['average_trace_fitness'],
            'Precision': precision,
            'Generalization': generalization,
            'Simplicity': simplicity
        }
    
    return metrics

In [5]:
def plot_metrics_comparison(metrics, dataset_approach, figures_folder):
    # Convert metrics to DataFrame
    metrics_df = pd.DataFrame(metrics).T
    
    # Create figure and axis
    _, ax = plt.subplots(figsize=(12, 6))
    
    # Plot bars
    metrics_df.plot(kind='bar', width=0.8, ax=ax)
    
    # Add value labels on top of each bar
    for container in ax.containers:
        ax.bar_label(container, fmt='%.3f', padding=3)
    
    # Customize plot
    plt.title(f'Metrics Comparison - {dataset_approach} approach')
    plt.xlabel('Miners')
    plt.ylabel('Score')
    plt.legend(title='Metrics', bbox_to_anchor=(1.05, 1), loc='upper left')
    
    # Adjust layout to prevent label cutoff
    plt.subplots_adjust(right=0.85, bottom=0.15)
    
    # Save plot
    output_path = os.path.join(figures_folder, f"metrics_comparison_{dataset_approach}.png")
    plt.savefig(output_path, bbox_inches='tight', dpi=300)
    plt.close()
    
    # Print numerical values in console
    # print("\nNumerical Metrics:")
    # print(metrics_df.round(3).to_string())
    return metrics_df

In [6]:
def visualize_petri_nets(models, dataset_approach, figures_folder):
    os.makedirs(figures_folder, exist_ok=True)
    for name, (net, initial_marking, final_marking) in models.items():
        gviz = pn_visualizer.apply(net, initial_marking, final_marking)
        output_path = os.path.join(figures_folder, f"petri_net_{name.lower()}_{dataset_approach}.png")
        pn_visualizer.save(gviz, output_path)

In [7]:
def analyze_user_habits(log, dataset_approach, processed_folder):
    # Extract activity patterns
    activities_by_day = {}
    for trace in log:
        day = trace.attributes['concept:name']
        activities = [(event['concept:name'], event['lifecycle:transition']) for event in sorted(trace, key=lambda x: x['time:timestamp'])]
        activities_by_day[day] = activities
    
    # Analyze common patterns
    common_sequences = {}
    
    for day, activities in activities_by_day.items():
        current_activity = None
        sequence_parts = []

        for activity, state in activities:
            # Only add new activities when they start (ON state)
            if current_activity != activity and state == 'ON':
                sequence_parts.append(activity)
                current_activity = activity
        
        # Only create sequence if there are activities
        if sequence_parts:
            sequence = ' -> '.join(sequence_parts)
            common_sequences[sequence] = common_sequences.get(sequence, 0) + 1

    # Sort by frequency
    sorted_sequences = dict(sorted(common_sequences.items(),  key=lambda x: x[1], reverse=True))

    output_file = os.path.join(processed_folder, f"user_habits_analysis_{dataset_approach}.txt")

    with open(output_file, 'w', encoding='utf-8') as f:
        for sequence, count in sorted(sorted_sequences.items(), key=lambda x: x[1], reverse=True):
            f.write(f'Frequency: {count}\nSequence: {sequence}\n\n')
    
    return sorted_sequences

In [8]:
figures_folder = os.path.join('resources', 'figures')

In [9]:
# Load dataset

input_file = os.path.join('data', 'raw', 'tm001.txt')

dataset_approaches = ["daily", "session"]

for ds_approach in dataset_approaches:

    df = prepare_dataset(input_file, approach=ds_approach)
    df_stratified = df.groupby('concept:name').apply(lambda x: x.sample(5)).reset_index(drop=True)

    # Convert to event log
    log = pm4py.convert_to_event_log(df_stratified)

    # Apply process mining
    models = apply_process_mining(log)

    # Calculate metrics
    metrics = calculate_metrics(log, models)

    # Plot comparisons
    plot_metrics_comparison(metrics, ds_approach, figures_folder)

    # Visualize Petri nets
    visualize_petri_nets(models, ds_approach, figures_folder)

    # Analyze user habits
    user_habits_path = os.path.join('data', 'processed')
    habits = analyze_user_habits(log, ds_approach, user_habits_path)

  df_stratified = df.groupby('concept:name').apply(lambda x: x.sample(5)).reset_index(drop=True)


aligning log, completed variants ::   0%|          | 0/54 [00:00<?, ?it/s]

computing precision with alignments, completed variants ::   0%|          | 0/88 [00:00<?, ?it/s]

replaying log with TBR, completed traces ::   0%|          | 0/54 [00:00<?, ?it/s]

aligning log, completed variants ::   0%|          | 0/54 [00:00<?, ?it/s]

computing precision with alignments, completed variants ::   0%|          | 0/88 [00:00<?, ?it/s]

replaying log with TBR, completed traces ::   0%|          | 0/54 [00:00<?, ?it/s]

aligning log, completed variants ::   0%|          | 0/54 [00:00<?, ?it/s]

computing precision with alignments, completed variants ::   0%|          | 0/88 [00:00<?, ?it/s]

replaying log with TBR, completed traces ::   0%|          | 0/54 [00:00<?, ?it/s]

  df_stratified = df.groupby('concept:name').apply(lambda x: x.sample(5)).reset_index(drop=True)


aligning log, completed variants ::   0%|          | 0/45 [00:00<?, ?it/s]

computing precision with alignments, completed variants ::   0%|          | 0/87 [00:00<?, ?it/s]

replaying log with TBR, completed traces ::   0%|          | 0/45 [00:00<?, ?it/s]

aligning log, completed variants ::   0%|          | 0/45 [00:00<?, ?it/s]

computing precision with alignments, completed variants ::   0%|          | 0/87 [00:00<?, ?it/s]

replaying log with TBR, completed traces ::   0%|          | 0/45 [00:00<?, ?it/s]

aligning log, completed variants ::   0%|          | 0/45 [00:00<?, ?it/s]

computing precision with alignments, completed variants ::   0%|          | 0/87 [00:00<?, ?it/s]

replaying log with TBR, completed traces ::   0%|          | 0/45 [00:00<?, ?it/s]