## *import libraries*

In [1]:
import functions as f

import os
import numpy as np
import pandas as pd
from tabulate import tabulate
from sklearn.metrics import pairwise_distances_argmin_min

import pm4py
from pm4py.algo.evaluation import algorithm
from pm4py.visualization.petri_net import visualizer as pn_visualizer
from pm4py.algo.conformance.tokenreplay import algorithm as token_replay
from pm4py.objects.petri_net.importer.variants import pnml as pnml_importer
from pm4py.algo.conformance.alignments.petri_net import algorithm as alignments

import warnings
warnings.filterwarnings('ignore')

## *Parameter Setting*

In [2]:

params = {
    'encoding': 'freq',   # freq, dur
    'export_net': False,    # True, False
}

## *Read Event Log*

In [3]:
def UI_read_log(file_path, labels_path):
    cluster_labels_df = pd.read_csv(label_file_path) 
    total_pred = cluster_labels_df.shape[0]
    trace_id = np.arange(1, total_pred+1).tolist()
    cluster_labels_df['case:concept:name'] = trace_id
    cluster_labels_df['case:concept:name'] = cluster_labels_df['case:concept:name'].astype(str)

    event_log = f.read_log(file_path)
    event_log['time:timestamp'] = pd.to_datetime(event_log['time:timestamp'], errors='coerce')
    event_log['time:timestamp'].ffill(inplace=True)

    return event_log, cluster_labels_df

In [4]:
def process_segments(event_log, cluster_labels_df):
    # Given lists
    seg_pred = list(cluster_labels_df['prediction'])
    if all(val == -1 for val in seg_pred):
        seg_pred = [0] * len(seg_pred)
        # seg_pred = list(range(len(seg_pred)))

    cluster_labels_df['prediction'] = seg_pred   
    
    bounds = list(cluster_labels_df['bound'])
    # Initialize the third list with default values (-1)
    max_bound = max(bounds)+1
    segments = [-1] * max_bound  # Length of third list is max bound
    
    # Filling the third list based on bounds and corresponding labels
    start_idx = 0
    trace_id = 1
    for i, bound in enumerate(bounds):
        label = seg_pred[i]  # Get the corresponding label
        segments[start_idx:bound+1] = [trace_id] * (bound+1 - start_idx)  # Assign label to the range
        # third_list[start_idx:bound+1] = trace_id  # Assign label to the range
        start_idx = bound+1  # Update the start index for the next range
        trace_id = trace_id+1
        
    # Output the third list
    # print(segments)
    
    event_log['case:concept:name'] = segments
    event_log['case:concept:name'] = event_log['case:concept:name'].astype(str)
    return event_log, cluster_labels_df

In [5]:
def remove_noise_samples(encoded_log):
    features_log = f.clustering_preprocessing(encoded_log)
    if 'DBSCAN_Cluster' in encoded_log.columns:
        cluster_column = 'DBSCAN_Cluster'
    else:
        cluster_column = 'cluster_label'

    features_log[cluster_column] = encoded_log[cluster_column]
    
    # Find noise indices
    noise_indices = encoded_log[encoded_log[cluster_column] == -1].index

    if len(noise_indices) >= 1:
    
        # Find the nearest cluster for each noise point
        nearest_cluster_indices, _ = pairwise_distances_argmin_min(
            features_log.iloc[noise_indices, :-1],  # Exclude the cluster label column
            features_log[features_log[cluster_column] != -1].iloc[:, :-1]  # Exclude noise points
        )
        
        # Merge noise samples into the nearest cluster
        encoded_log.loc[noise_indices, cluster_column] = encoded_log.loc[
            encoded_log[cluster_column] != -1, cluster_column
        ].iloc[nearest_cluster_indices].values
        
        # Check if there are still noise points
        remaining_noise_indices = encoded_log[encoded_log[cluster_column] == -1].index
        if len(remaining_noise_indices) > 0:
            # print(f"\nThere are still {len(remaining_noise_indices)} noise points remaining.\n")
            pass
        else:
            # print("\nAll noise points have been assigned to the nearest cluster.\n")
            pass

        # print("\n", encoded_log[cluster_column].value_counts(), "\n")
    return encoded_log

In [6]:
def get_acitvity_log(session_log, encoded_log):
    cluster_column = 'cluster_label'
    clusters = encoded_log[cluster_column].unique()
    cluster_map = {c: f"routine_{c+1}" for c in clusters}
    
    merged_log = pd.merge(session_log, encoded_log, on=['case:concept:name', 'Session'], how='inner')
    merged_log = merged_log[['case:concept:name', 'Session', 'time:timestamp', 'concept:name', cluster_column]]
    
    merged_log[cluster_column] = merged_log[cluster_column].replace(cluster_map)
    return merged_log, cluster_map

## *Util Functions (Evaluation Scores)* 

In [7]:
# Extract transitions (activities) from both models to calculate Jaccard Coefficient
def extract_transitions(net):
    """ Extracts the transitions (activities) from the given Petri net model. """
    return {t.label for t in net.transitions if t.label is not None}


def get_fScore(fitness, precision):
    if fitness + precision == 0:
        return 0
    f_score = (2*fitness*precision)/(fitness+precision)
    return f_score


def get_JC(log, net, im, fm, routine_label):
    gt_routine_activities = extract_transitions(net)
    routine_activities = set(log['concept:name'])
    # print(gt_routine_activities, "\n")
    # print(routine_activities, "\n")
    # print("="*30)

    # Calculate the Jaccard Coefficient (Intersection over Union)
    intersection = routine_activities.intersection(gt_routine_activities)
    union = routine_activities.union(gt_routine_activities)
    jaccard_coefficient = len(intersection) / len(union)

    return jaccard_coefficient


def token_base_evaluation(log, net, im, fm):
    replayed_traces = token_replay.apply(log, net, im, fm)
    
    # Calculate Support (fraction of transitions supported by the log)
    activated_transitions = set()
    for trace in replayed_traces:
        for trans in trace['activated_transitions']:
            if trans is not None:  # Ensure the transition is valid
                activated_transitions.add(trans)
    total_transitions = len(net.transitions)
    support = len(activated_transitions) / total_transitions

    # Calculate Coverage
    covered_traces = sum([1 for trace in replayed_traces if trace['activated_transitions']])
    total_traces = len(log)
    coverage = covered_traces / total_traces

    return support, coverage
    

def alignment_base_evaluation(log, net, im, fm):
    # Apply alignment-based conformance checking
    aligned_traces = alignments.apply_log(log, net, im, fm)
    
    # Calculate Support (fraction of transitions supported by the log)
    activated_transitions = set()
    for trace in aligned_traces:
        for step in trace['alignment']:
            # Check if the step corresponds to a 'model move' (indicating a supported transition)
            if step[0] == step[1] and step[1] is not None:
                activated_transitions.add(step[1])
    total_transitions = len(net.transitions)
    support = len(activated_transitions) / total_transitions
    
    # Calculate Coverage (fraction of traces covered by at least one activated transition)
    covered_traces = sum(1 for trace in aligned_traces if any(step[0] == step[1] and step[1] is not None for step in trace['alignment']))
    total_traces = len(log)
    coverage = covered_traces / total_traces

    return support, coverage


def evaluate_routines(log, net, im, fm, token_base, routine_label):
    gt_routine_activities = extract_transitions(net)
    routine_activities = set(log['concept:name'])

    # case_id = log['case:concept:name'].unique()[0]
    # routine_length = len(log[log['case:concept:name'] == case_id])
    log_traces = len(log['case:concept:name'].unique())

    # Calculate the Jaccard Coefficient (Intersection over Union)
    intersection = routine_activities.intersection(gt_routine_activities)
    union = routine_activities.union(gt_routine_activities)
    jaccard_coefficient = len(intersection) / len(union)

    log = pm4py.convert_to_event_log(log)
    support, coverage = token_base_evaluation(log, net, im, fm) if token_base else alignment_base_evaluation(log, net, im, fm)

    # Calculate fitness, precision, generalization, and F-score
    q_o = algorithm.apply(log, net, im, fm)
    fitness = round(q_o['fitness']['average_trace_fitness'],3)
    prec = round(q_o['precision'],3)
    gen = round(q_o['generalization'],3)
    simp = round(q_o['simplicity'],3)
    f_score = get_fScore(fitness, prec)

    # Print metrics
    # print("\nEvaluation Scores:")
    # print("=====================")
    # print("Fitness: ", fitness)
    # print("Precision: ", prec)
    # print("Generalization: ", gen)
    # print("Simplicity: ", simp)
    
    # print(f"\nCoverage: {coverage:.2f}")
    # print(f"Support: {support:.2f}")
    # print(f"Jaccard Coefficient: {jaccard_coefficient:.2f}")

    return [routine_label, log_traces, len(routine_activities), fitness, prec, gen, simp, f_score, coverage, support, jaccard_coefficient]

# *Routine by Routine Evaluation*

In [8]:
def UI_logs_evaluate(activity_log, cluster_map):
    key_value = 1
    for key, routine_label in cluster_map.items():
        # print(f"Start Evaluating the {routine_label}:")
        routine_log = activity_log[activity_log[cluster_column] == routine_label]
        JC_scores = {}
        for file_name in os.listdir(cpn_ground_truth_dir):
            model_name = file_name.split('.')[0]
            cpn_ground_truth_path = os.path.join(cpn_ground_truth_dir, file_name)
            gt_net, gt_im, gt_fm = pnml_importer.import_net(cpn_ground_truth_path)
            JC = get_JC(routine_log, gt_net, gt_im, gt_fm, routine_label=routine_label)
            JC_scores[file_name] = JC
            # print(f"JC Score of {routine_label} is {JC} with {model_name}")
            
        # Find the key with the maximum value
        max_key = max(JC_scores, key=JC_scores.get)
        max_value = JC_scores[max_key]
    
        # Output the result
        # print(f"\nMaximum JC Score of {routine_label} is {max_value}")
    
        cpn_ground_truth_path = os.path.join(cpn_ground_truth_dir, max_key)
        gt_net, gt_im, gt_fm = pnml_importer.import_net(cpn_ground_truth_path)
        scores = evaluate_routines(routine_log, gt_net, gt_im, gt_fm, token_base=False, routine_label=routine_label)
        result_dic[key_value] = scores
        key_value += 1
        # print("\n\n\n")
    return result_dic

## *Summary of Evaluation Scores*

In [9]:
# ✅ Function to calculate F-score given fitness and precision
def calculate_f_score(fitness, precision):
    if fitness + precision == 0:
        return 0
    return 2 * (fitness * precision) / (fitness + precision)

# ✅ Function to calculate simple and weighted averages for a single result dictionary
def UI_logs_summary(result_dic):
    # Column names including Coverage, Support, and JC
    columns = ["Metrics", "Traces", "Trace Length", "Fitness", "Precision", 
               "Generalization", "Simplicity", "F-Score", "Coverage", "Support", "JC"]

    # Initialize lists for averages
    simple_average_values = ["Simple Average"]
    weighted_average_values = ["Weighted Average"]

    total_traces = sum(result_dic[key][1] for key in result_dic if isinstance(key, int))
    simple_average_values.append(total_traces)
    weighted_average_values.append(total_traces)

    mean_fitness = mean_precision = weighted_fitness = weighted_precision = 0

    # Iterate through metrics starting from index 2 ("Trace Length")
    for i in range(2, len(result_dic[1])):  
        values = [result_dic[key][i] for key in result_dic if isinstance(key, int)]

        simple_avg = sum(values) / len(values)
        weighted_avg = sum(result_dic[key][i] * result_dic[key][1] for key in result_dic if isinstance(key, int)) / total_traces

        simple_avg = round(simple_avg, 3)
        weighted_avg = round(weighted_avg, 3)
        
        if i == 3:  # Fitness column
            mean_fitness = simple_avg
            weighted_fitness = weighted_avg
        elif i == 4:  # Precision column
            mean_precision = simple_avg
            weighted_precision = weighted_avg

        simple_average_values.append(simple_avg)
        weighted_average_values.append(weighted_avg)

    # 🔢 Calculate and update the F-Score
    f_score_simple_avg = calculate_f_score(mean_fitness, mean_precision)
    f_score_weighted_avg = calculate_f_score(weighted_fitness, weighted_precision)

    # Update F-Score at the correct index (7th metric after 'Metrics' & 'Traces')
    simple_average_values[7] = round(f_score_simple_avg, 3)
    weighted_average_values[7] = round(f_score_weighted_avg, 3)

    # ➕ Insert average rows into result_dic
    next_key_simple = max([key for key in result_dic if isinstance(key, int)], default=0) + 1
    next_key_weighted = next_key_simple + 1

    result_dic[next_key_simple] = simple_average_values
    result_dic[next_key_weighted] = weighted_average_values

    # 📝 Convert to DataFrame and print
    result_df = pd.DataFrame.from_dict(result_dic, orient='index', columns=columns)
    result_df = result_df.set_index('Metrics')

    # print("\n✅ Per-Log Summary:")
    # print(tabulate(result_df, headers='keys', tablefmt='psql'))

    return result_dic, simple_average_values  # Return simple average row for aggregation

# 🚀 Function to handle cross-validation and final aggregation
def cross_validation_summary(cross_val_results):
    """Performs cross-validation, collects simple averages, and calculates final averages."""
    iter_results = {}
    simple_averages_list = []

    # 📊 Collect simple averages from each iteration
    for i, result_dic in enumerate(cross_val_results, start=1):
        _, simple_avg_row = UI_logs_summary(result_dic)
        iter_results[f"CV {i}"] = simple_avg_row[1:]  # Exclude 'Simple Average' label
        simple_averages_list.append(simple_avg_row[1:])

    # 🔎 Convert iteration results into DataFrame
    iter_columns = ["Traces", "Trace Length", "Fitness", "Precision", "Gen", 
                    "Simp", "F-Score", "Coverage", "Support", "JC"]
    iter_df = pd.DataFrame(iter_results, index=iter_columns).T

    print("\n✅ Cross-Validation Iteration Results:")
    print(tabulate(iter_df, headers='keys', tablefmt='psql'))

    # 🧮 Final Simple and Weighted Averages
    final_simple_avg = iter_df.mean().tolist()
    total_traces = iter_df["Traces"].sum()
    weighted_avg = [(iter_df[col] * iter_df["Traces"]).sum() / total_traces for col in iter_df.columns]

    # 📢 Final Aggregated Results
    final_df = pd.DataFrame(
        [final_simple_avg, weighted_avg],
        index=["Simple Average", "Weighted Average"],
        columns=iter_columns
    )

    print("\n🚀 Final Aggregated Results:")
    print(tabulate(final_df, headers='keys', tablefmt='outline'))

    return iter_df, final_df


## *main*

In [10]:
# Noise = 0.4

# for log_number in range(1, 10):
#     if log_number==7:
#         continue
#     cross_val_results = []
#     for variant in range(1, 11):
#         cpn_ground_truth_dir = f"GT_Models/log{log_number}/"
#         file_path = f"Transformed_Logs_and_Results/Our/Transformed_Log_With_Noise_{Noise}/log{log_number}/noisy_transform_log{log_number}_{variant}.xes"
#         label_file_path = f"Transformed_Logs_and_Results/arebmann/Transformed_Log_With_Noise_{Noise}/Discovered_Routines/log{log_number}/log{log_number}_{variant}_pred.csv"

#         result_dic = {'Metrics':['Routine', "Traces", "Length", 'Fitness','Precision','Generalization','Simplicity', 'F_Score', 'coverage', 'Support', 'JC'],}
        
#         event_log, cluster_labels_df = UI_read_log(file_path, label_file_path)
#         event_log, cluster_labels_df = process_segments(event_log, cluster_labels_df)
        
#         session_log = f.create_session(event_log)
#         encoded_log = f.freq_encoding(session_log)
        
#         cluster_column = "cluster_label"
#         encoded_log = pd.merge(encoded_log, cluster_labels_df, on=['case:concept:name'], how='inner')
#         encoded_log.rename(columns={'prediction': cluster_column}, inplace=True)
        
#         encoded_log = remove_noise_samples(encoded_log)
#         activity_log, cluster_map = get_acitvity_log(session_log, encoded_log)
        
#         result_dic = UI_logs_evaluate(activity_log, cluster_map)
        
#         cross_val_results.append(result_dic)
    
#     # Run cross-validation summary
#     iteration_results, final_averages = cross_validation_summary(cross_val_results)
    
#     results_df = pd.concat([iteration_results, final_averages])
    
#     outputfile = f"Transformed_Logs_and_Results/arebmann/Transformed_Log_With_Noise_{Noise}/Results"
#     if not os.path.exists(outputfile):
#         os.makedirs(outputfile)
#     output_file_name = f"log{log_number}_reuslts.csv"
#     results_df.to_csv(os.path.join(outputfile, output_file_name))

## *For Noise Free Logs*

In [None]:
Noise = 0

for log_number in range(7, 8):
    cpn_ground_truth_dir = f"GT_Models/log{log_number}/"
    file_path = f"Transformed_Logs_and_Results/Our/Transformed_Log_Without_Noise/transform_log{log_number}.xes"
    label_file_path = f"Transformed_Logs_and_Results/arebmann/Transformed_Log_With_Noise_{Noise}/Discovered_Routines/log{log_number}/log{log_number}_1_pred.csv"

    result_dic = {'Metrics':['Routine', "Traces", "Length", 'Fitness','Precision','Generalization','Simplicity', 'F_Score', 'coverage', 'Support', 'JC'],}
    
    event_log, cluster_labels_df = UI_read_log(file_path, label_file_path)
    event_log, cluster_labels_df = process_segments(event_log, cluster_labels_df)
    
    session_log = f.create_session(event_log)
    encoded_log = f.freq_encoding(session_log)
    
    cluster_column = "cluster_label"
    encoded_log = pd.merge(encoded_log, cluster_labels_df, on=['case:concept:name'], how='inner')
    encoded_log.rename(columns={'prediction': cluster_column}, inplace=True)
    
    encoded_log = remove_noise_samples(encoded_log)
    activity_log, cluster_map = get_acitvity_log(session_log, encoded_log)
    
    result_dic = UI_logs_evaluate(activity_log, cluster_map)
    result_dic, simple_average_values = UI_logs_summary(result_dic)

    result_df = pd.DataFrame(result_dic)
    result_df = result_df.set_index('Metrics')
    results_df = result_df.T
    
    outputfile = f"Transformed_Logs_and_Results/arebmann/Transformed_Log_With_Noise_{Noise}/Results"
    if not os.path.exists(outputfile):
        os.makedirs(outputfile)
    output_file_name = f"log{log_number}_reuslts.csv"
    results_df.to_csv(os.path.join(outputfile, output_file_name), index=False)

parsing log, completed traces :: 100%|████████████████████████████████████████████| 1500/1500 [00:05<00:00, 284.32it/s]
aligning log, completed variants ::   0%|                                                        | 0/3 [00:00<?, ?it/s]