In [None]:
from math import floor
import pickle
import pandas as pd
import seaborn as sns
import numpy as np
import os
import matplotlib.pyplot as plt
from cortado_core.process_tree_utils.miscellaneous import get_height, get_number_nodes
from pm4py.objects.petri_net.importer.importer import deserialize as deserialize_pn

In [None]:
# Plot styling applied to every figure created after this cell.
sns.set_theme(font="Arial", style="whitegrid", rc={"figure.figsize": (5, 4)})


# Sub-folder of ./results whose result files are evaluated in this notebook.
folder_name = "rtfm"
folder_path = f"./results/{folder_name}"
# os.listdir returns entries in arbitrary order; sorting keeps the displayed
# list and any later "first matching file" lookup reproducible across runs.
pickle_files = sorted(
    file for file in os.listdir(folder_path) if file.endswith("final.pickle")
)
pickle_files

In [None]:
def _tree_measure(iteration, measure):
    """Apply `measure` to the iteration's process tree; None if it has no tree."""
    tree = iteration["output_tree"]
    return None if tree is None else measure(tree)


def _petri_net_size(iteration):
    """Return #places + #transitions of the iteration's Petri net; None if absent."""
    serialized_net = iteration["output_model"]
    if serialized_net is None:
        return None
    # Deserialize once and reuse the net for both counts.
    net = deserialize_pn(serialized_net)[0]
    return len(net.places) + len(net.transitions)


def _build_metric_frame(axes_frame, run_series):
    """Join the shared x-axis columns with one column per run and fill the gaps.

    Missing values (the front padding of shorter runs) are replaced by the value
    of the "inductive_miner__" column in the same row.
    """
    frame = pd.concat([axes_frame, *run_series], axis=1)
    baseline = frame["inductive_miner__"]
    for column in frame.columns:
        frame[column] = frame[column].fillna(baseline)
    return frame


# The Inductive Miner run defines the length of the shared x-axes; all other
# runs are padded to its number of iterations further below.
baseline_files = [name for name in pickle_files if "inductive_miner" in name]
if not baseline_files:
    raise FileNotFoundError(
        f"No 'inductive_miner' result file ending in 'final.pickle' found in {folder_path}"
    )

# NOTE: pickle.load can execute arbitrary code stored in the file; only load
# result files that come from a trusted source.
with open(os.path.join(folder_path, baseline_files[0]), "rb") as file:
    data = pickle.load(file)

variants_percentages = [100 * i / len(data) for i in range(len(data))]
variant_frequencies = np.array(
    [iteration["added_variant_frequency"] for iteration in data]
)
trace_percentages = (variant_frequencies / variant_frequencies.sum() * 100).cumsum()

axes_df = pd.DataFrame(
    {
        "% processed variants": variants_percentages,
        "% processed traces": trace_percentages,
    }
)

# How each metric is read from a single iteration record.
metric_extractors = {
    "fitness": lambda iteration: iteration["fitness"],
    "precision": lambda iteration: iteration["precision"],
    "f-measure": lambda iteration: iteration["f-measure"],
    "height": lambda iteration: _tree_measure(iteration, get_height),
    "nodes": lambda iteration: _tree_measure(iteration, get_number_nodes),
    "pn_size": _petri_net_size,
}
run_series = {metric: [] for metric in metric_extractors}

for file_name in pickle_files:
    # Expected layout: <log>__<approach>__<initial method>__<ending>.
    # Split the bare file name so "__" in the folder path cannot shift the parts.
    name_parts = file_name.split("__")
    if len(name_parts) != 4:
        # Without this guard the labels of the previous file would be reused
        # (or be undefined for the first file), mislabelling the column.
        print(f"Skipping {file_name!r}: file name does not have 4 '__'-separated parts")
        continue
    log_name, approach, initial_method, ending = name_parts
    initial_method = initial_method.split(".")[-1]
    column_name = f"{approach}__{initial_method}"

    # NOTE: see the pickle warning above; the same applies to every result file.
    with open(os.path.join(folder_path, file_name), "rb") as file:
        data = pickle.load(file)

    # Shorter runs are padded at the front so their last iteration lines up with
    # the last row of the shared axes; the padding is filled in afterwards.
    nan_values = [None] * (len(variants_percentages) - len(data))
    for metric, extract in metric_extractors.items():
        values = nan_values + [extract(iteration) for iteration in data]
        run_series[metric].append(pd.Series(values, name=column_name))

fitness_df = _build_metric_frame(axes_df, run_series["fitness"])
precision_df = _build_metric_frame(axes_df, run_series["precision"])
f_measure_df = _build_metric_frame(axes_df, run_series["f-measure"])
height_df = _build_metric_frame(axes_df, run_series["height"])
nodes_df = _build_metric_frame(axes_df, run_series["nodes"])
pn_size_df = _build_metric_frame(axes_df, run_series["pn_size"])

dfs = [fitness_df, precision_df, f_measure_df, height_df, nodes_df, pn_size_df]

In [None]:
def plot_results(
    df, initial_method, y_label, x_axis="% processed variants", y_bottom_lim=0
):
    """Draw one line per approach for a quality metric on the current axes.

    Args:
        df: Frame containing the ``x_axis`` column plus one
            ``<approach>__<initial method>`` column per run.
        initial_method: Suffix selecting which runs are plotted.
        y_label: Text for the y-axis label.
        x_axis: Name of the column plotted on the x-axis.
        y_bottom_lim: Lower bound of the y-axis; the upper bound is fixed at 1.
    """
    # Source column -> legend label; the dict order is the drawing order.
    legend_by_column = {
        f"incremental_lca_False__{initial_method}": "LCA-IPDA (w/o LCA lowering)",
        f"incremental_lca_True__{initial_method}": "LCA-IPDA (w LCA lowering)",
        f"naive__{initial_method}": "Naive IPDA",
        "inductive_miner__": "Inductive Miner (IM)",
        f"model_repair__{initial_method}": "Model Repair",
    }
    plot_frame = df.filter(items=[*legend_by_column, x_axis]).rename(
        columns=legend_by_column
    )

    axes = plt.gca()
    for legend_label in legend_by_column.values():
        sns.lineplot(
            plot_frame,
            x=x_axis,
            y=legend_label,
            dashes=False,
            label=legend_label,
            alpha=0.8,
            ax=axes,
        )
    axes.set_xlabel(x_axis)
    axes.set_ylabel(y_label)
    axes.set_xlim(-1, 101)
    axes.set_ylim(y_bottom_lim, 1)

In [None]:
def plot_tree_measure(df, initial_method, y_label, x_axis="% processed variants"):
    """Draw one line per tree-producing approach on the current axes.

    Unlike ``plot_results`` no "Model Repair" line is drawn, and only the lower
    y-limit is fixed (at 1).

    Args:
        df: Frame containing the ``x_axis`` column plus one
            ``<approach>__<initial method>`` column per run.
        initial_method: Suffix selecting which runs are plotted.
        y_label: Text for the y-axis label.
        x_axis: Name of the column plotted on the x-axis.
    """
    # Source column -> legend label; the dict order is the drawing order.
    legend_by_column = {
        f"incremental_lca_False__{initial_method}": "LCA-IPDA (w/o LCA lowering)",
        f"incremental_lca_True__{initial_method}": "LCA-IPDA (w LCA lowering)",
        f"naive__{initial_method}": "Naive IPDA",
        "inductive_miner__": "Inductive Miner (IM)",
    }
    plot_frame = df.filter(items=[*legend_by_column, x_axis]).rename(
        columns=legend_by_column
    )

    axes = plt.gca()
    for legend_label in legend_by_column.values():
        sns.lineplot(
            plot_frame,
            x=x_axis,
            y=legend_label,
            dashes=False,
            label=legend_label,
            alpha=0.8,
            ax=axes,
        )
    axes.set_xlabel(x_axis)
    axes.set_ylabel(y_label)
    axes.set_xlim(-1, 101)
    axes.set_ylim(bottom=1)

In [None]:
def plot_pn_measure(df, initial_method, y_label, x_axis="% processed variants"):
    """Draw one line per approach for a Petri net measure on the current axes.

    Plots the same five approaches as ``plot_results`` but fixes only the lower
    y-limit (at 1).

    Args:
        df: Frame containing the ``x_axis`` column plus one
            ``<approach>__<initial method>`` column per run.
        initial_method: Suffix selecting which runs are plotted.
        y_label: Text for the y-axis label.
        x_axis: Name of the column plotted on the x-axis.
    """
    # Source column -> legend label; the dict order is the drawing order.
    legend_by_column = {
        f"incremental_lca_False__{initial_method}": "LCA-IPDA (w/o LCA lowering)",
        f"incremental_lca_True__{initial_method}": "LCA-IPDA (w LCA lowering)",
        f"naive__{initial_method}": "Naive IPDA",
        "inductive_miner__": "Inductive Miner (IM)",
        f"model_repair__{initial_method}": "Model Repair",
    }
    plot_frame = df.filter(items=[*legend_by_column, x_axis]).rename(
        columns=legend_by_column
    )

    axes = plt.gca()
    for legend_label in legend_by_column.values():
        sns.lineplot(
            plot_frame,
            x=x_axis,
            y=legend_label,
            dashes=False,
            label=legend_label,
            alpha=0.8,
            ax=axes,
        )
    axes.set_xlabel(x_axis)
    axes.set_ylabel(y_label)
    axes.set_xlim(-1, 101)
    axes.set_ylim(bottom=1)

In [None]:
# Initial-model strategy -> value of "% processed variants" from which its
# curves are drawn (insertion order is the plotting order).
initial_methods = dict(
    [
        ("TOP_1", 0),
        ("TOP_1_PERCENT", 1),
        ("TOP_2_PERCENT", 2),
        ("TOP_5_PERCENT", 5),
        ("TOP_10_PERCENT", 10),
    ]
)


def create_plots(x_axis="% processed variants"):
    """Render and save all metric figures for every initial-model strategy.

    For each entry of ``initial_methods`` six figures are drawn (precision,
    fitness, F-measure, tree height, number of tree nodes, Petri net size) and
    saved as ``figures/<folder_name>/<method>_<metric>_<axis>.png``, where
    ``<axis>`` is the last word of ``x_axis``.

    Args:
        x_axis: Column plotted on the x-axis ("% processed variants" or
            "% processed traces"). Rows are always selected by
            "% processed variants", independent of this choice.
    """
    axis_columns = ["% processed variants", "% processed traces"]

    def lower_y_limit(frame):
        """Smallest metric value in ``frame``, rounded down to one decimal."""
        return floor(frame.drop(axis_columns, axis=1).min(axis=None) * 10) / 10

    # (file tag, frame, plot function, y-label, extra positional arguments).
    # The y-floors use the full frames, so they are identical for every
    # strategy and are computed once up front.
    figure_specs = [
        ("precision", precision_df, plot_results, "precision", (lower_y_limit(precision_df),)),
        ("fitness", fitness_df, plot_results, "fitness", (lower_y_limit(fitness_df),)),
        ("f-measure", f_measure_df, plot_results, "F-measure", (lower_y_limit(f_measure_df),)),
        ("heights", height_df, plot_tree_measure, "tree height", ()),
        ("nodes", nodes_df, plot_tree_measure, "no. of nodes", ()),
        ("pn_size", pn_size_df, plot_pn_measure, "Petri net size", ()),
    ]

    # savefig does not create missing directories.
    os.makedirs(f"figures/{folder_name}", exist_ok=True)
    axis_suffix = x_axis.split()[-1]

    for initial_method, starting_percent in initial_methods.items():
        mask = f_measure_df["% processed variants"] >= starting_percent
        # Also keep the last row before the starting percentage so each line
        # starts at that row. Work positionally and only when such a row exists:
        # for a start of 0 the previous "label" would be -1, and assigning to
        # mask[-1] adds a stray entry instead of doing nothing.
        selected = mask.to_numpy()
        if selected.any():
            first_selected = int(selected.argmax())
            if first_selected > 0:
                mask.iloc[first_selected - 1] = True

        for file_tag, frame, plot_fn, y_label, extra_args in figure_specs:
            plt.figure()
            plot_fn(frame[mask], initial_method, y_label, x_axis, *extra_args)
            plt.savefig(
                f"figures/{folder_name}/{initial_method.lower()}_{file_tag}_{axis_suffix}.png",
                bbox_inches="tight",
            )

In [None]:
# Produce the full figure set once per x-axis variant.
for axis_column in ("% processed variants", "% processed traces"):
    create_plots(axis_column)