In [None]:
import pm4py
from tokenreplay import TokenReplay
from comparison import ModelComparator
from practical.ProcessMining.group1.shared.utils import event_log_to_dataframe, read_txt_test_logs
from practical.ProcessMining.group1.shared import utils
from practical.ProcessMining.group1.shared.visualizer import Visualizer

In [None]:
def get_logs(file_path):
    """Load the CSV event log at ``file_path`` via the shared ``utils.import_csv`` helper.

    Args:
        file_path: Path of the CSV file, forwarded unchanged to ``utils.import_csv``.

    Returns:
        Whatever ``utils.import_csv`` returns for that file.
    """
    imported_log = utils.import_csv(file_path)
    return imported_log

In [None]:
def compare_real_logs(
    file_path="../shared/example_files/DomesticDeclarations_cleansed.csv",
    noise_threshold=0.7,
):
    """Discover four Petri nets from one CSV event log, compare them and render each.

    The log is loaded with ``get_logs``, converted with ``event_log_to_dataframe``
    and normalised with ``pm4py.format_dataframe``. Four models are then discovered
    with pm4py (alpha, heuristics, inductive, and inductive with a noise threshold),
    each is wrapped in a ``TokenReplay`` and the set is handed to ``ModelComparator``.
    The comparator's values and the result of ``comparator.run`` are printed, and
    every model is passed to ``get_visualizer``.

    Args:
        file_path: Path of the CSV event log. Defaults to the DomesticDeclarations
            example file that was previously hard-coded.
        noise_threshold: ``noise_threshold`` passed to the second inductive
            discovery (label ``"IMi"``). Defaults to the previously hard-coded 0.7.

    Returns:
        None. Results are printed; rendering is delegated to ``get_visualizer``.
    """
    raw_log = get_logs(file_path)
    log_df = event_log_to_dataframe(raw_log)
    log_df = pm4py.format_dataframe(
        log_df, case_id='case_id', activity_key='activity', timestamp_key='timestamp'
    )

    # (label, (net, initial_marking, final_marking)) per discovery algorithm.
    # All discoveries run before any replay is built, in the same order as before.
    discovered = [
        ("AM", pm4py.discover_petri_net_alpha(log_df)),
        ("HM", pm4py.discover_petri_net_heuristics(log_df)),
        ("IM", pm4py.discover_petri_net_inductive(log_df)),
        ("IMi", pm4py.discover_petri_net_inductive(log_df, noise_threshold=noise_threshold)),
    ]

    replays = [
        TokenReplay(log_df, net, initial_marking, final_marking, label)
        for label, (net, initial_marking, final_marking) in discovered
    ]

    comparator = ModelComparator(replays)

    models_values = comparator.get_models_values()
    print('models_values ======', models_values)

    # The header previously announced "for log:" without naming any log.
    print("===== Pareto Efficient Models for log:", file_path, "=====")
    # NOTE(review): 'f' and 's' are presumably fitness and simplicity — confirm
    # against ModelComparator.run.
    results = comparator.run(x_dimension='f', y_dimension='s')
    print(results)
    print("=============================\n")

    # NOTE(review): get_visualizer saves under one fixed name by default, so each
    # call here presumably overwrites the previous output — confirm against
    # Visualizer.save.
    for replay, (_, (net, initial_marking, final_marking)) in zip(replays, discovered):
        get_visualizer(replay, net, initial_marking, final_marking)

In [None]:
def compare_all_logs(
    file_path="../shared/example_files/simple_event_logs_modified.txt",
    noise_threshold=0.7,
):
    """Run the four-model comparison on every log read from a text file of test logs.

    ``read_txt_test_logs`` yields a mapping of log key to log. Each log is converted
    with ``event_log_to_dataframe``, normalised with ``pm4py.format_dataframe`` and
    mined with pm4py's alpha, heuristics and inductive discovery (the latter both
    with and without a noise threshold). The resulting nets are wrapped in
    ``TokenReplay`` objects and compared with ``ModelComparator``; the comparator's
    values and the result of ``comparator.run`` are printed per log.

    Args:
        file_path: Path of the text file passed to ``read_txt_test_logs``.
            Defaults to the previously hard-coded example file.
        noise_threshold: ``noise_threshold`` passed to the second inductive
            discovery (label ``"IMi"``). Defaults to the previously hard-coded 0.7.

    Returns:
        None. All results are printed.
    """
    logs = read_txt_test_logs(file_path)

    for log_key, log in logs.items():
        log_df = event_log_to_dataframe(log)
        log_df = pm4py.format_dataframe(
            log_df, case_id='case_id', activity_key='activity', timestamp_key='timestamp'
        )

        # (label, (net, initial_marking, final_marking)) per discovery algorithm.
        # All discoveries run before any replay is built, in the same order as before.
        discovered = [
            ("AM", pm4py.discover_petri_net_alpha(log_df)),
            ("HM", pm4py.discover_petri_net_heuristics(log_df)),
            ("IM", pm4py.discover_petri_net_inductive(log_df)),
            ("IMi", pm4py.discover_petri_net_inductive(log_df, noise_threshold=noise_threshold)),
        ]

        replays = [
            TokenReplay(log_df, net, initial_marking, final_marking, label)
            for label, (net, initial_marking, final_marking) in discovered
        ]

        comparator = ModelComparator(replays)

        models_values = comparator.get_models_values()
        print('models_values ======', models_values)

        print("===== Pareto Efficient Models for log:", log_key, "=====")
        # NOTE(review): 'f' and 's' are presumably fitness and simplicity — confirm
        # against ModelComparator.run.
        results = comparator.run(x_dimension='f', y_dimension='s')
        print(results)
        print("=============================\n")

In [None]:
def get_visualizer(token_replay, net, initial_marking, final_marking, output_name='graph_fitness'):
    """Build a Petri-net graph annotated with replay tokens and save it.

    Prints the net's places, the tokens returned by ``token_replay.get_tokens()``
    and the graph built by ``Visualizer.build_petri_net``, then hands the graph to
    ``Visualizer.save``.

    Args:
        token_replay: Object exposing ``get_tokens()``; its result is passed to the
            visualizer together with the net.
        net: Petri net; its ``places`` attribute is printed and the net is rendered.
        initial_marking: Initial marking forwarded to ``build_petri_net``.
        final_marking: Final marking forwarded to ``build_petri_net``.
        output_name: Name passed as the second argument to ``Visualizer.save``.
            Defaults to ``'graph_fitness'``, the value that was previously
            hard-coded, so existing callers behave as before. Pass a distinct name
            per model when rendering several models in a row.

    Returns:
        None.
    """
    vizard = Visualizer()
    print('places:', net.places)
    tokens = token_replay.get_tokens()
    print('tokens ==========', tokens)
    graph_fitness = vizard.build_petri_net(net, initial_marking, final_marking, tokens)
    print('graph_fitness ===', graph_fitness)
    vizard.save(graph_fitness, output_name)

In [None]:
# Run the four-miner comparison on every log in the bundled text file of test logs.
compare_all_logs()

In [None]:
# Run the comparison on the DomesticDeclarations CSV log and render each discovered net.
compare_real_logs()