In [None]:
import pandas as pd
import pm4py
import networkx as nx
import matplotlib.pyplot as plt
import seaborn as sns
from mlxtend.preprocessing import TransactionEncoder
from mlxtend.frequent_patterns import apriori, association_rules
from pm4py.objects.log.importer.xes import importer
from sklearn.model_selection import train_test_split
import graphviz
from pm4py.algo.evaluation.replay_fitness import algorithm as replay_fitness
from pm4py.algo.evaluation.precision import algorithm as precision_evaluator
from pm4py.algo.evaluation.generalization import algorithm as generalization_evaluator
from pm4py.algo.evaluation.simplicity import algorithm as simplicity_evaluator
from pm4py.algo.conformance.tokenreplay import algorithm as token_replay

# Data Reading And Cleaning

### Loading and checking if the data is ready for use

In [None]:
# Read the event log CSV file
csv_file_path = 'event_log.csv'  # Replace with your CSV file path
df = pd.read_csv(csv_file_path)

# Parse BOTH timestamp columns. Comparing a parsed datetime column against the
# unparsed completion_time strings is not a reliable timestamp comparison.
df['start_time'] = pd.to_datetime(df['start_time'])
df['completion_time'] = pd.to_datetime(df['completion_time'])

# Find events where start_time equals completion_time
events_same_start_end = df[df['start_time'] == df['completion_time']]

# Distinct event labels that have at least one zero-duration occurrence
event_names_same_start_end = events_same_start_end['event_label'].unique().tolist()

# Display the event names where start and end times are the same
print("Event names where start and end times are the same:")
print(event_names_same_start_end)

In [None]:
# Reload the original CSV file into a fresh DataFrame, discarding the
# datetime conversion applied to `df` in the previous cell
df = pd.read_csv('event_log.csv')

In [None]:
# Summarise the columns, their dtypes and non-null counts
df.info()

In [None]:
# Check for null cells in the entire DataFrame
null_cells = df.isnull()
# `True in null_cells` would only look at the column LABELS of the boolean
# frame, so inspect the cell values instead.
print(null_cells.values.any())

In [None]:
# Preview the first 10 rows of the raw event log
df.head(10)

### Preparing the dataframe

We will create two dataframes: a more accurate one (the resource value is merged into the event label) and a more general one (the resource column is simply removed).

the first general dataframe : only removing columns

In [None]:
# General view of the log: drop the columns that are not used by this variant
columns_to_drop = ['resource', 'task_id', 'diagnosis', 'completion_time']
df_general = df.drop(columns=columns_to_drop)
df_general.head(10)

the second, more accurate dataframe: wherever `resource` holds a value other than 'None', that value replaces the `event_label` value

In [None]:
# Define the function to modify the dataframe in place
def transform_event_label_and_resource(df):
    """Replace the event label with the resource wherever a resource is set.

    Rows whose ``resource`` is a real value (neither the string ``'None'`` nor
    a missing value) get ``event_label`` overwritten with that resource, and
    their ``resource`` is reset to ``'None'``. All other rows are left
    untouched.

    Args:
        df: DataFrame with ``event_label`` and ``resource`` columns. It is
            modified in place.

    Returns:
        The same DataFrame object that was passed in.
    """
    # A missing value (NaN) is "no resource" as well: a 'None' cell may be read
    # back from CSV as NaN, and copying NaN into event_label would erase the
    # activity name.
    has_resource = df['resource'].notna() & (df['resource'] != 'None')

    # Vectorised equivalent of the row-by-row update
    df['event_label'] = df['event_label'].mask(has_resource, df['resource'])
    df['resource'] = df['resource'].mask(has_resource, 'None')
    return df

# Apply the transformation to a COPY: the function modifies its argument in
# place, and mutating the raw `df` would make this cell (and the cells that
# derive other frames from `df`) give different results when re-run.
df_accurate = transform_event_label_and_resource(df.copy())

# Display the transformed dataframe
df_accurate.head(10)

In [None]:
# Drop the resource column together with the other columns this variant
# does not use
df_accurate = df_accurate.drop(
    ['resource', 'task_id', 'diagnosis', 'completion_time'],
    axis='columns',
)
df_accurate.head(10)

### Sorting the data to get a better look at it

sorting the general dataframe

In [None]:
# Order the events by case, then by start time within each case
# (ascending is the default for every sort key)
general_sort_keys = ['case_id', 'start_time']
sorted_df_general = df_general.sort_values(by=general_sort_keys)
sorted_df_general.head(10)

sorting the more accurate dataframe

In [None]:
# Order the events by case, then by start time within each case
# (ascending is the default for every sort key)
accurate_sort_keys = ['case_id', 'start_time']
sorted_df_accurate = df_accurate.sort_values(by=accurate_sort_keys)
sorted_df_accurate.head(10)

### Convert the data from CSV to XES

Saving both dataframes to CSV files

In [None]:
# Save the sorted accurate dataframe to CSV (without the index column)
sorted_df_accurate.to_csv('accurate_event_log.csv', index=False)

# Save the sorted general dataframe to CSV (without the index column)
sorted_df_general.to_csv('general_event_log.csv', index=False)

Convert from CSV format to XES format

for the general dataframe:

In [None]:
# Rename the CSV columns to the attribute keys used for the XES conversion
general_xes_columns = {
    'case_id': 'case:concept:name',
    'event_label': 'concept:name',
    'start_time': 'time:timestamp',
}
sorted_df_general.rename(columns=general_xes_columns, inplace=True)
sorted_df_general['time:timestamp'] = pd.to_datetime(sorted_df_general['time:timestamp'])

# Build the event log from the prepared dataframe
event_log = pm4py.format_dataframe(
    sorted_df_general,
    case_id='case:concept:name',
    activity_key='concept:name',
    timestamp_key='time:timestamp',
)
xes_event_log = pm4py.convert_to_event_log(event_log)

# Write the event log to disk in XES format
pm4py.write_xes(xes_event_log, 'general_event_log.xes')

for the accurate dataframe:

In [None]:
# Rename the CSV columns to the attribute keys used for the XES conversion
accurate_xes_columns = {
    'case_id': 'case:concept:name',
    'event_label': 'concept:name',
    'start_time': 'time:timestamp',
}
sorted_df_accurate.rename(columns=accurate_xes_columns, inplace=True)
sorted_df_accurate['time:timestamp'] = pd.to_datetime(sorted_df_accurate['time:timestamp'])

# Build the event log from the prepared dataframe
event_log = pm4py.format_dataframe(
    sorted_df_accurate,
    case_id='case:concept:name',
    activity_key='concept:name',
    timestamp_key='time:timestamp',
)
xes_event_log = pm4py.convert_to_event_log(event_log)

# Write the event log to disk in XES format
pm4py.write_xes(xes_event_log, 'accurate_event_log.xes')

# Data Organizing

##### First, we want to know which activities the cases in our data start and end with

for the general dataframe:

In [None]:
log = importer.apply('general_event_log.xes')

# Start activities with the number of cases that begin with each of them
log_start = pm4py.get_start_activities(log)

# Convert dictionary to DataFrame. A dedicated name is used so the
# notebook-wide `df` (the raw event log) is not overwritten by this table.
general_start_df = pd.DataFrame(list(log_start.items()), columns=['Activity', 'Count'])

# Specify the file path for CSV
general_start_csv_path = 'general_log_start_activities.csv'

# Export to CSV
general_start_df.to_csv(general_start_csv_path, index=False)

In [None]:
# Destination of the end-activity table
csv_file_path_end = 'general_log_end_activities.csv'

# Load the general log and collect its end activities with their counts
log = importer.apply('general_event_log.xes')
log_end = pm4py.get_end_activities(log)

# One row per end activity, written out without the index column
end_df = pd.DataFrame(
    {'Activity': list(log_end.keys()), 'Count': list(log_end.values())},
    columns=['Activity', 'Count'],
)
end_df.to_csv(csv_file_path_end, index=False)

for the accurate dataframe:

In [None]:
log = importer.apply('accurate_event_log.xes')

# Start activities with the number of cases that begin with each of them
log_start = pm4py.get_start_activities(log)

# Convert dictionary to DataFrame. A dedicated name is used so the
# notebook-wide `df` (the raw event log) is not overwritten by this table.
accurate_start_df = pd.DataFrame(list(log_start.items()), columns=['Activity', 'Count'])

# Specify the file path for CSV
accurate_start_csv_path = 'accurate_log_start_activities.csv'

# Export to CSV
accurate_start_df.to_csv(accurate_start_csv_path, index=False)

In [None]:
# Destination of the end-activity table
csv_file_path_end = 'accurate_log_end_activities.csv'

# Load the accurate log and collect its end activities with their counts
log = importer.apply('accurate_event_log.xes')
log_end = pm4py.get_end_activities(log)

# One row per end activity, written out without the index column
end_df = pd.DataFrame(
    {'Activity': list(log_end.keys()), 'Count': list(log_end.values())},
    columns=['Activity', 'Count'],
)
end_df.to_csv(csv_file_path_end, index=False)

##### Next, we assign a number to each event and extract information about the processes (traces) that appear in the data

for the general dataframe:

In [None]:
# Output locations
csv_file_path_trace_counts = 'general_trace_counts.csv'
csv_file_path_case_traces = 'general_case_traces.csv'
csv_file_path_general_meanings = 'general_meanings.csv'

# Number the distinct event labels 1, 2, 3, ... in order of first appearance
event_label_to_number = {
    label: position
    for position, label in enumerate(sorted_df_general['concept:name'].unique(), start=1)
}

# Attach the number of each event as a new 'number' column
sorted_df_general['number'] = sorted_df_general['concept:name'].map(event_label_to_number)

# One space-separated string of event numbers per case
traces = (
    sorted_df_general
    .groupby('case:concept:name')['number']
    .apply(lambda numbers: ' '.join(str(number) for number in numbers))
    .reset_index()
)

# How many cases share each distinct trace
trace_counts = traces['number'].value_counts().reset_index()
trace_counts.columns = ['trace', 'count']
trace_counts.to_csv(csv_file_path_trace_counts, index=False)

# Per-case traces
traces.columns = ['case_id', 'trace']
traces.to_csv(csv_file_path_case_traces, index=False)

# Legend translating each number back to its event label, number column first
event_meanings = pd.DataFrame(list(event_label_to_number.items()), columns=['event', 'number'])
event_meanings = event_meanings[['number', 'event']]
event_meanings.to_csv(csv_file_path_general_meanings, index=False)

for the accurate dataframe:

In [None]:
# Output locations
csv_file_path_trace_counts = 'accurate_trace_counts.csv'
csv_file_path_case_traces = 'accurate_case_traces.csv'
csv_file_path_accurate_meanings = 'accurate_meanings.csv'

# Number the distinct event labels 1, 2, 3, ... in order of first appearance
event_label_to_number = {
    label: position
    for position, label in enumerate(sorted_df_accurate['concept:name'].unique(), start=1)
}

# Attach the number of each event as a new 'number' column
sorted_df_accurate['number'] = sorted_df_accurate['concept:name'].map(event_label_to_number)

# One space-separated string of event numbers per case
traces = (
    sorted_df_accurate
    .groupby('case:concept:name')['number']
    .apply(lambda numbers: ' '.join(str(number) for number in numbers))
    .reset_index()
)

# How many cases share each distinct trace
trace_counts = traces['number'].value_counts().reset_index()
trace_counts.columns = ['trace', 'count']
trace_counts.to_csv(csv_file_path_trace_counts, index=False)

# Per-case traces
traces.columns = ['case_id', 'trace']
traces.to_csv(csv_file_path_case_traces, index=False)

# Legend translating each number back to its event label, number column first
event_meanings = pd.DataFrame(list(event_label_to_number.items()), columns=['event', 'number'])
event_meanings = event_meanings[['number', 'event']]
event_meanings.to_csv(csv_file_path_accurate_meanings, index=False)

# Algorithms And Models

### Splitting the dataframes

for the general dataframe:

In [None]:
# Split at CASE level: a row-level split would scatter the events of a single
# case over both sets, so the training and test logs would contain fragments
# of the same traces instead of complete, disjoint cases.
general_case_ids = sorted_df_general['case:concept:name'].unique()
general_train_cases, general_test_cases = train_test_split(general_case_ids, test_size=0.4, random_state=42)

# .copy() gives independent frames; boolean selection keeps the sorted order
general_train = sorted_df_general[sorted_df_general['case:concept:name'].isin(general_train_cases)].copy()
general_test = sorted_df_general[sorted_df_general['case:concept:name'].isin(general_test_cases)].copy()

# Convert the training dataframe to an XES event log
general_event_log_train = pm4py.format_dataframe(general_train, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
general_xes_event_log_train = pm4py.convert_to_event_log(general_event_log_train)

# Save the training XES event log to a file
pm4py.write_xes(general_xes_event_log_train, 'general_event_log_train.xes')

# Convert the test dataframe to an XES event log
general_event_log_test = pm4py.format_dataframe(general_test, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
general_xes_event_log_test = pm4py.convert_to_event_log(general_event_log_test)

# Save the test XES event log to a file
pm4py.write_xes(general_xes_event_log_test, 'general_event_log_test.xes')

# Save the training and testing sets to separate CSV files
general_train.to_csv('general_train_event_log.csv', index=False)
general_test.to_csv('general_test_event_log.csv', index=False)

for the accurate dataframe:

In [None]:
# Split at CASE level: a row-level split would scatter the events of a single
# case over both sets, so the training and test logs would contain fragments
# of the same traces instead of complete, disjoint cases.
accurate_case_ids = sorted_df_accurate['case:concept:name'].unique()
accurate_train_cases, accurate_test_cases = train_test_split(accurate_case_ids, test_size=0.4, random_state=42)

# .copy() gives independent frames; boolean selection keeps the sorted order
accurate_train = sorted_df_accurate[sorted_df_accurate['case:concept:name'].isin(accurate_train_cases)].copy()
accurate_test = sorted_df_accurate[sorted_df_accurate['case:concept:name'].isin(accurate_test_cases)].copy()

# Convert the training dataframe to an XES event log
accurate_event_log_train = pm4py.format_dataframe(accurate_train, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
accurate_xes_event_log_train = pm4py.convert_to_event_log(accurate_event_log_train)

# Save the training XES event log to a file
pm4py.write_xes(accurate_xes_event_log_train, 'accurate_event_log_train.xes')

# Convert the test dataframe to an XES event log
accurate_event_log_test = pm4py.format_dataframe(accurate_test, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
accurate_xes_event_log_test = pm4py.convert_to_event_log(accurate_event_log_test)

# Save the test XES event log to a file
pm4py.write_xes(accurate_xes_event_log_test, 'accurate_event_log_test.xes')

# Save the training and testing sets to separate CSV files
accurate_train.to_csv('accurate_train_event_log.csv', index=False)
accurate_test.to_csv('accurate_test_event_log.csv', index=False)

### Algorithms

##### Alpha algorithm:

for the general dataframe:

In [None]:
# Load the event log
general_alpha_xes_event_log_train = pm4py.read_xes('general_event_log_train.xes')

# Apply the Alpha Miner algorithm to discover a Petri net.
# `alpha_miner` is never imported in this notebook, so the simplified pm4py
# interface is used, matching the inductive and heuristics cells.
general_alpha_net, general_alpha_initial_marking, general_alpha_final_marking = pm4py.discover_petri_net_alpha(general_alpha_xes_event_log_train)

# Save the Petri net to a PNML file
pm4py.write_pnml(general_alpha_net, general_alpha_initial_marking, general_alpha_final_marking, 'general_alpha_mined_petri_net.pnml')

# Visualize the Petri net
pm4py.view_petri_net(general_alpha_net, general_alpha_initial_marking, general_alpha_final_marking)

for the accurate dataframe:

In [None]:
# Load the event log
accurate_alpha_xes_event_log_train = pm4py.read_xes('accurate_event_log_train.xes')

# Apply the Alpha Miner algorithm to discover a Petri net.
# `alpha_miner` is never imported in this notebook, so the simplified pm4py
# interface is used, matching the inductive and heuristics cells.
accurate_alpha_net, accurate_alpha_initial_marking, accurate_alpha_final_marking = pm4py.discover_petri_net_alpha(accurate_alpha_xes_event_log_train)

# Save the Petri net to a PNML file
pm4py.write_pnml(accurate_alpha_net, accurate_alpha_initial_marking, accurate_alpha_final_marking, 'accurate_alpha_mined_petri_net.pnml')

# Visualize the Petri net
pm4py.view_petri_net(accurate_alpha_net, accurate_alpha_initial_marking, accurate_alpha_final_marking)

##### Inductive algorithm:

for the general dataframe:

In [None]:
# Load the general training log
general_log_inductive = pm4py.read_xes('general_event_log_train.xes')
# Discover a Petri net (net, initial marking, final marking) with the Inductive Miner
general_inductive_net, general_inductive_initial_marking, general_inductive_final_marking = pm4py.discover_petri_net_inductive(general_log_inductive)
# Save the Petri net to a PNML file, then display it
pm4py.write_pnml(general_inductive_net, general_inductive_initial_marking, general_inductive_final_marking, 'general_inductive_mined_petri_net.pnml')
pm4py.view_petri_net(general_inductive_net, general_inductive_initial_marking, general_inductive_final_marking)

for the accurate dataframe:

In [None]:
# Load the accurate training log
accurate_log_inductive = pm4py.read_xes('accurate_event_log_train.xes')
# Discover a Petri net (net, initial marking, final marking) with the Inductive Miner
accurate_inductive_net, accurate_inductive_initial_marking, accurate_inductive_final_marking = pm4py.discover_petri_net_inductive(accurate_log_inductive)
# Save the Petri net to a PNML file, then display it
pm4py.write_pnml(accurate_inductive_net, accurate_inductive_initial_marking, accurate_inductive_final_marking, 'accurate_inductive_mined_petri_net.pnml')
pm4py.view_petri_net(accurate_inductive_net, accurate_inductive_initial_marking, accurate_inductive_final_marking)

##### Heuristic algorithm:

for the general dataframe:

In [None]:
# Load the general training event log
general_heuristic_xes_event_log_train = pm4py.read_xes('general_event_log_train.xes')

# Apply the Heuristic Miner algorithm to discover a Petri net
# (returned as net, initial marking, final marking)
general_heuristic_net, general_heuristic_initial_marking, general_heuristic_final_marking = pm4py.discover_petri_net_heuristics(general_heuristic_xes_event_log_train)

# Save the Petri net to a PNML file, then display it
pm4py.write_pnml(general_heuristic_net, general_heuristic_initial_marking, general_heuristic_final_marking, 'general_heuristic_mined_petri_net.pnml')
pm4py.view_petri_net(general_heuristic_net, general_heuristic_initial_marking, general_heuristic_final_marking)

for the accurate dataframe:

In [None]:
# Load the accurate training event log
accurate_heuristic_xes_event_log_train = pm4py.read_xes('accurate_event_log_train.xes')

# Apply the Heuristic Miner algorithm to discover a Petri net
# (returned as net, initial marking, final marking)
accurate_heuristic_net, accurate_heuristic_initial_marking, accurate_heuristic_final_marking = pm4py.discover_petri_net_heuristics(accurate_heuristic_xes_event_log_train)

# Save the Petri net to a PNML file, then display it
pm4py.write_pnml(accurate_heuristic_net, accurate_heuristic_initial_marking, accurate_heuristic_final_marking, 'accurate_heuristic_mined_petri_net.pnml')
pm4py.view_petri_net(accurate_heuristic_net, accurate_heuristic_initial_marking, accurate_heuristic_final_marking)

# Conformance Checking

##### Metrics for evaluating model accuracy

evaluate function:

In [None]:
# Define a function to evaluate performance using the test log
def evaluate_performance(log, net, initial_marking, final_marking):
    """Evaluate a Petri net against an event log on four quality dimensions.

    Args:
        log: Event log to evaluate against (the test log in this notebook).
        net: Petri net to evaluate.
        initial_marking: Initial marking of ``net``.
        final_marking: Final marking of ``net``.

    Returns:
        dict with the keys ``"fitness"``, ``"precision"``,
        ``"generalization"`` and ``"simplicity"``.

    Raises:
        KeyError: If the fitness result contains no average-fitness entry.
    """
    # Fitness. The name of the average-fitness entry depends on the replay
    # variant pm4py selects for the net, so 'averageFitness' alone can raise
    # KeyError; accept either spelling.
    fitness_result = replay_fitness.apply(log, net, initial_marking, final_marking)
    for fitness_key in ('average_trace_fitness', 'averageFitness'):
        if fitness_key in fitness_result:
            fitness_value = fitness_result[fitness_key]
            break
    else:
        raise KeyError(
            "no average fitness entry in replay fitness result: "
            f"{sorted(fitness_result)}"
        )

    # Precision
    precision_value = precision_evaluator.apply(log, net, initial_marking, final_marking)

    # Generalization
    generalization_value = generalization_evaluator.apply(log, net, initial_marking, final_marking)

    # Simplicity (depends on the net only, not on the log)
    simplicity_value = simplicity_evaluator.apply(net)

    return {
        "fitness": fitness_value,
        "precision": precision_value,
        "generalization": generalization_value,
        "simplicity": simplicity_value
    }

for alpha algorithm

for the general dataframe:

In [None]:
# Score the alpha-mined net (general log) against the general test log
general_alpha_performance = evaluate_performance(general_xes_event_log_test, general_alpha_net, general_alpha_initial_marking, general_alpha_final_marking)
print(general_alpha_performance)

for the accurate dataframe:

In [None]:
# Score the alpha-mined net (accurate log) against the accurate test log
accurate_alpha_performance = evaluate_performance(accurate_xes_event_log_test, accurate_alpha_net, accurate_alpha_initial_marking, accurate_alpha_final_marking)
print(accurate_alpha_performance)

for inductive algorithm

for the general dataframe:

In [None]:
# Score the inductive-mined net (general log) against the general test log
general_inductive_performance = evaluate_performance(general_xes_event_log_test, general_inductive_net, general_inductive_initial_marking, general_inductive_final_marking)
print(general_inductive_performance)

for the accurate dataframe:

In [None]:
# Score the inductive-mined net (accurate log) against the accurate test log
accurate_inductive_performance = evaluate_performance(accurate_xes_event_log_test, accurate_inductive_net, accurate_inductive_initial_marking, accurate_inductive_final_marking)
print(accurate_inductive_performance)

for heuristic algorithm

for the general dataframe:

In [None]:
# Score the heuristics-mined net (general log) against the general test log
general_heuristic_performance = evaluate_performance(general_xes_event_log_test, general_heuristic_net, general_heuristic_initial_marking, general_heuristic_final_marking)
print(general_heuristic_performance)

for the accurate dataframe:

In [None]:
# Score the heuristics-mined net (accurate log) against the accurate test log
accurate_heuristic_performance = evaluate_performance(accurate_xes_event_log_test, accurate_heuristic_net, accurate_heuristic_initial_marking, accurate_heuristic_final_marking)
print(accurate_heuristic_performance)

##### Replay method

for alpha algorithm

for the general dataframe:

In [None]:
# Perform token-based replay conformance checking
replay_result = token_replay.apply(general_xes_event_log_test, general_alpha_net, general_alpha_initial_marking, general_alpha_final_marking)

# Collect the per-trace token counters into a DataFrame. A dedicated name is
# used so the notebook-wide `df` is not overwritten.
replay_columns = ['missing_tokens', 'remaining_tokens', 'produced_tokens', 'consumed_tokens']
general_alpha_replay_df = pd.DataFrame(
    [
        {**{column: case[column] for column in replay_columns}, 'fit_traces': case['trace_is_fit']}
        for case in replay_result
    ],
    columns=replay_columns + ['fit_traces'],
)

# Calculate the total missing, remaining, produced, and consumed tokens
total_missing_tokens = general_alpha_replay_df['missing_tokens'].sum()
total_extra_tokens = general_alpha_replay_df['remaining_tokens'].sum()
total_produced_tokens = general_alpha_replay_df['produced_tokens'].sum()
total_consumed_tokens = general_alpha_replay_df['consumed_tokens'].sum()

# Print the summary
print(f"Total Missing Tokens: {total_missing_tokens}")
print(f"Total Remaining Tokens: {total_extra_tokens}")
print(f"Total Produced Tokens: {total_produced_tokens}")
print(f"Total Consumed Tokens: {total_consumed_tokens}")

# Token-replay fitness relates MISSING tokens to CONSUMED tokens and REMAINING
# tokens to PRODUCED tokens: 0.5 * (1 - m / c) + 0.5 * (1 - r / p).
if total_consumed_tokens > 0 and total_produced_tokens > 0:
    fitness = 0.5 * (1 - total_missing_tokens / total_consumed_tokens) + 0.5 * (1 - total_extra_tokens / total_produced_tokens)
    print(f"Fitness: {fitness:.4f}")
else:
    # Nothing was replayed, so the ratio is undefined
    fitness = float('nan')
    print("Fitness: undefined (no tokens were produced or consumed)")

for the accurate dataframe:

In [None]:
# Perform token-based replay conformance checking
replay_result = token_replay.apply(accurate_xes_event_log_test, accurate_alpha_net, accurate_alpha_initial_marking, accurate_alpha_final_marking)

# Collect the per-trace token counters into a DataFrame. A dedicated name is
# used so the notebook-wide `df` is not overwritten.
replay_columns = ['missing_tokens', 'remaining_tokens', 'produced_tokens', 'consumed_tokens']
accurate_alpha_replay_df = pd.DataFrame(
    [
        {**{column: case[column] for column in replay_columns}, 'fit_traces': case['trace_is_fit']}
        for case in replay_result
    ],
    columns=replay_columns + ['fit_traces'],
)

# Calculate the total missing, remaining, produced, and consumed tokens
total_missing_tokens = accurate_alpha_replay_df['missing_tokens'].sum()
total_extra_tokens = accurate_alpha_replay_df['remaining_tokens'].sum()
total_produced_tokens = accurate_alpha_replay_df['produced_tokens'].sum()
total_consumed_tokens = accurate_alpha_replay_df['consumed_tokens'].sum()

# Print the summary
print(f"Total Missing Tokens: {total_missing_tokens}")
print(f"Total Remaining Tokens: {total_extra_tokens}")
print(f"Total Produced Tokens: {total_produced_tokens}")
print(f"Total Consumed Tokens: {total_consumed_tokens}")

# Token-replay fitness relates MISSING tokens to CONSUMED tokens and REMAINING
# tokens to PRODUCED tokens: 0.5 * (1 - m / c) + 0.5 * (1 - r / p).
if total_consumed_tokens > 0 and total_produced_tokens > 0:
    fitness = 0.5 * (1 - total_missing_tokens / total_consumed_tokens) + 0.5 * (1 - total_extra_tokens / total_produced_tokens)
    print(f"Fitness: {fitness:.4f}")
else:
    # Nothing was replayed, so the ratio is undefined
    fitness = float('nan')
    print("Fitness: undefined (no tokens were produced or consumed)")

for inductive algorithm

for the general dataframe:

In [None]:
# Perform token-based replay conformance checking
replay_result = token_replay.apply(general_xes_event_log_test, general_inductive_net, general_inductive_initial_marking, general_inductive_final_marking)

# Collect the per-trace token counters into a DataFrame. A dedicated name is
# used so the notebook-wide `df` is not overwritten.
replay_columns = ['missing_tokens', 'remaining_tokens', 'produced_tokens', 'consumed_tokens']
general_inductive_replay_df = pd.DataFrame(
    [
        {**{column: case[column] for column in replay_columns}, 'fit_traces': case['trace_is_fit']}
        for case in replay_result
    ],
    columns=replay_columns + ['fit_traces'],
)

# Calculate the total missing, remaining, produced, and consumed tokens
total_missing_tokens = general_inductive_replay_df['missing_tokens'].sum()
total_extra_tokens = general_inductive_replay_df['remaining_tokens'].sum()
total_produced_tokens = general_inductive_replay_df['produced_tokens'].sum()
total_consumed_tokens = general_inductive_replay_df['consumed_tokens'].sum()

# Print the summary
print(f"Total Missing Tokens: {total_missing_tokens}")
print(f"Total Remaining Tokens: {total_extra_tokens}")
print(f"Total Produced Tokens: {total_produced_tokens}")
print(f"Total Consumed Tokens: {total_consumed_tokens}")

# Token-replay fitness relates MISSING tokens to CONSUMED tokens and REMAINING
# tokens to PRODUCED tokens: 0.5 * (1 - m / c) + 0.5 * (1 - r / p).
if total_consumed_tokens > 0 and total_produced_tokens > 0:
    fitness = 0.5 * (1 - total_missing_tokens / total_consumed_tokens) + 0.5 * (1 - total_extra_tokens / total_produced_tokens)
    print(f"Fitness: {fitness:.4f}")
else:
    # Nothing was replayed, so the ratio is undefined
    fitness = float('nan')
    print("Fitness: undefined (no tokens were produced or consumed)")

for the accurate dataframe:

In [None]:
# Perform token-based replay conformance checking
replay_result = token_replay.apply(accurate_xes_event_log_test, accurate_inductive_net, accurate_inductive_initial_marking, accurate_inductive_final_marking)

# Collect the per-trace token counters into a DataFrame. A dedicated name is
# used so the notebook-wide `df` is not overwritten.
replay_columns = ['missing_tokens', 'remaining_tokens', 'produced_tokens', 'consumed_tokens']
accurate_inductive_replay_df = pd.DataFrame(
    [
        {**{column: case[column] for column in replay_columns}, 'fit_traces': case['trace_is_fit']}
        for case in replay_result
    ],
    columns=replay_columns + ['fit_traces'],
)

# Calculate the total missing, remaining, produced, and consumed tokens
total_missing_tokens = accurate_inductive_replay_df['missing_tokens'].sum()
total_extra_tokens = accurate_inductive_replay_df['remaining_tokens'].sum()
total_produced_tokens = accurate_inductive_replay_df['produced_tokens'].sum()
total_consumed_tokens = accurate_inductive_replay_df['consumed_tokens'].sum()

# Print the summary
print(f"Total Missing Tokens: {total_missing_tokens}")
print(f"Total Remaining Tokens: {total_extra_tokens}")
print(f"Total Produced Tokens: {total_produced_tokens}")
print(f"Total Consumed Tokens: {total_consumed_tokens}")

# Token-replay fitness relates MISSING tokens to CONSUMED tokens and REMAINING
# tokens to PRODUCED tokens: 0.5 * (1 - m / c) + 0.5 * (1 - r / p).
if total_consumed_tokens > 0 and total_produced_tokens > 0:
    fitness = 0.5 * (1 - total_missing_tokens / total_consumed_tokens) + 0.5 * (1 - total_extra_tokens / total_produced_tokens)
    print(f"Fitness: {fitness:.4f}")
else:
    # Nothing was replayed, so the ratio is undefined
    fitness = float('nan')
    print("Fitness: undefined (no tokens were produced or consumed)")

for heuristic algorithm

for the general dataframe:

In [None]:
# Perform token-based replay conformance checking
replay_result = token_replay.apply(general_xes_event_log_test, general_heuristic_net, general_heuristic_initial_marking, general_heuristic_final_marking)

# Collect the per-trace token counters into a DataFrame. A dedicated name is
# used so the notebook-wide `df` is not overwritten.
replay_columns = ['missing_tokens', 'remaining_tokens', 'produced_tokens', 'consumed_tokens']
general_heuristic_replay_df = pd.DataFrame(
    [
        {**{column: case[column] for column in replay_columns}, 'fit_traces': case['trace_is_fit']}
        for case in replay_result
    ],
    columns=replay_columns + ['fit_traces'],
)

# Calculate the total missing, remaining, produced, and consumed tokens
total_missing_tokens = general_heuristic_replay_df['missing_tokens'].sum()
total_extra_tokens = general_heuristic_replay_df['remaining_tokens'].sum()
total_produced_tokens = general_heuristic_replay_df['produced_tokens'].sum()
total_consumed_tokens = general_heuristic_replay_df['consumed_tokens'].sum()

# Print the summary
print(f"Total Missing Tokens: {total_missing_tokens}")
print(f"Total Remaining Tokens: {total_extra_tokens}")
print(f"Total Produced Tokens: {total_produced_tokens}")
print(f"Total Consumed Tokens: {total_consumed_tokens}")

# Token-replay fitness relates MISSING tokens to CONSUMED tokens and REMAINING
# tokens to PRODUCED tokens: 0.5 * (1 - m / c) + 0.5 * (1 - r / p).
if total_consumed_tokens > 0 and total_produced_tokens > 0:
    fitness = 0.5 * (1 - total_missing_tokens / total_consumed_tokens) + 0.5 * (1 - total_extra_tokens / total_produced_tokens)
    print(f"Fitness: {fitness:.4f}")
else:
    # Nothing was replayed, so the ratio is undefined
    fitness = float('nan')
    print("Fitness: undefined (no tokens were produced or consumed)")

for the accurate dataframe:

In [59]:
# Perform token-based replay conformance checking
replay_result = token_replay.apply(accurate_xes_event_log_test, accurate_heuristic_net, accurate_heuristic_initial_marking, accurate_heuristic_final_marking)

# Collect the per-trace token counters into a DataFrame. A dedicated name is
# used so the notebook-wide `df` is not overwritten.
replay_columns = ['missing_tokens', 'remaining_tokens', 'produced_tokens', 'consumed_tokens']
accurate_heuristic_replay_df = pd.DataFrame(
    [
        {**{column: case[column] for column in replay_columns}, 'fit_traces': case['trace_is_fit']}
        for case in replay_result
    ],
    columns=replay_columns + ['fit_traces'],
)

# Calculate the total missing, remaining, produced, and consumed tokens
total_missing_tokens = accurate_heuristic_replay_df['missing_tokens'].sum()
total_extra_tokens = accurate_heuristic_replay_df['remaining_tokens'].sum()
total_produced_tokens = accurate_heuristic_replay_df['produced_tokens'].sum()
total_consumed_tokens = accurate_heuristic_replay_df['consumed_tokens'].sum()

# Print the summary
print(f"Total Missing Tokens: {total_missing_tokens}")
print(f"Total Remaining Tokens: {total_extra_tokens}")
print(f"Total Produced Tokens: {total_produced_tokens}")
print(f"Total Consumed Tokens: {total_consumed_tokens}")

# Token-replay fitness relates MISSING tokens to CONSUMED tokens and REMAINING
# tokens to PRODUCED tokens: 0.5 * (1 - m / c) + 0.5 * (1 - r / p).
if total_consumed_tokens > 0 and total_produced_tokens > 0:
    fitness = 0.5 * (1 - total_missing_tokens / total_consumed_tokens) + 0.5 * (1 - total_extra_tokens / total_produced_tokens)
    print(f"Fitness: {fitness:.4f}")
else:
    # Nothing was replayed, so the ratio is undefined
    fitness = float('nan')
    print("Fitness: undefined (no tokens were produced or consumed)")

# To delete

In [None]:
# Read the trace counts from CSV.
# The notebook never writes 'trace_counts.csv'; it writes
# 'general_trace_counts.csv' and 'accurate_trace_counts.csv'. The general log
# is read here so the cell works on a fresh run.
# NOTE(review): switch to 'accurate_trace_counts.csv' if that log was intended.
trace_counts = pd.read_csv('general_trace_counts.csv')

def remove_consecutive_duplicates(trace):
    """Collapse runs of identical consecutive tokens in a trace string.

    Args:
        trace: Whitespace-separated tokens, e.g. ``'1 1 2 2 1'``.

    Returns:
        The tokens joined by single spaces with every run of equal neighbours
        reduced to one token (``'1 2 1'`` for the example above). An empty or
        whitespace-only trace yields ``''`` instead of raising IndexError.
    """
    # Local import keeps the function self-contained
    from itertools import groupby

    # groupby() yields one key per run of equal consecutive tokens
    return ' '.join(token for token, _run in groupby(trace.split()))

# Collapse consecutive repeats inside every trace
trace_counts['trace'] = trace_counts['trace'].apply(remove_consecutive_duplicates)

# Merge traces that became identical, add up their counts and put the most
# frequent ones first
deduplicated_trace_counts = (
    trace_counts
    .groupby('trace')['count']
    .sum()
    .reset_index()
    .sort_values(by='count', ascending=False)
)

# Write the deduplicated, sorted counts to CSV
csv_file_path_deduplicated = 'trace_counts.csv'
deduplicated_trace_counts.to_csv(csv_file_path_deduplicated, index=False)

# Keep the 10 most frequent traces for plotting
top_traces = deduplicated_trace_counts.head(10)

# Bar chart of the most frequent traces
fig, ax = plt.subplots(figsize=(10, 7))
ax.bar(top_traces['trace'], top_traces['count'], color='skyblue')
ax.set_xlabel('Trace')
ax.set_ylabel('Count')
ax.set_title('Most Frequent Trace Counts')
# Rotate the x tick labels so long traces stay readable
plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
# Leave room for the rotated labels
fig.tight_layout()

# Save the chart to disk, then display it
bar_chart_path = 'top_trace_counts_bar_chart.png'
fig.savefig(bar_chart_path)
plt.show()


In [None]:
# Count the number of events per case.
# `sorted_df` is not defined anywhere in this notebook (NameError on a fresh
# run), so the general log is used.
# NOTE(review): use sorted_df_accurate instead if that log was intended.
case_event_counts = sorted_df_general['case:concept:name'].value_counts()

# Plot the distribution of case lengths
plt.figure(figsize=(10, 6))
sns.histplot(case_event_counts, kde=True, color='purple')
plt.title('Distribution of Events per Case')
plt.xlabel('Number of Events per Case')
plt.ylabel('Frequency')
plt.show()

# Display summary statistics of the events-per-case counts
print(case_event_counts.describe())

In [None]:
# Select the first five case ids to visualize.
# `sorted_df` is not defined anywhere in this notebook (NameError on a fresh
# run), so the general log is used.
# NOTE(review): use sorted_df_accurate instead if that log was intended.
sample_case_ids = sorted_df_general['case:concept:name'].unique()[:5]
sample_cases = sorted_df_general[sorted_df_general['case:concept:name'].isin(sample_case_ids)]

# Plot the event sequences for the selected cases
plt.figure(figsize=(15, 8))
sns.lineplot(x='time:timestamp', y='concept:name', hue='case:concept:name', data=sample_cases, marker='o')
plt.title('Event Sequences for Sample Cases')
plt.xlabel('Timestamp')
plt.ylabel('Event Name')
plt.legend(title='Case ID')
plt.show()

In [None]:
# Prepare the data for itemset mining: one list of event names per case.
# `sorted_df` is not defined anywhere in this notebook (NameError on a fresh
# run), so the general log is used.
# NOTE(review): use sorted_df_accurate instead if that log was intended.
cases = sorted_df_general.groupby('case:concept:name')['concept:name'].apply(list).values

# One-hot encode the cases (one boolean column per event name). A dedicated
# name is used so the notebook-wide `df` is not overwritten.
te = TransactionEncoder()
te_ary = te.fit(cases).transform(cases)
encoded_cases_df = pd.DataFrame(te_ary, columns=te.columns_)

# Apply the apriori algorithm to find itemsets with support of at least 0.3
frequent_itemsets = apriori(encoded_cases_df, min_support=0.3, use_colnames=True)

# Generate association rules with lift of at least 1
rules = association_rules(frequent_itemsets, metric="lift", min_threshold=1)

# Display the results
print(frequent_itemsets)
print(rules)