In [None]:
# Built-in imports
import os
import sys
import math
import random
from pathlib import Path

# 3rd party imports
import json
import pandas as pd
import pm4py
from pm4py.objects.log.obj import EventStream, EventLog
from pm4py.objects.conversion.process_tree import converter as pt_converter
from pm4py.algo.evaluation.replay_fitness import algorithm as fitness_alg
from pm4py.algo.evaluation.precision import algorithm as precision_alg
from pm4py.algo.evaluation.simplicity import algorithm as simplicity_alg
from pm4py.algo.evaluation.generalization import algorithm as generalization_alg

# DPIM imports (from the DPIM repo)
sys.path.insert(0, os.path.abspath("./DPIM")) # Fixes an issue with importing

from DPIM.main import DPIM
from DPIM.utils import eventLog_parsing

In [None]:
def get_event_log_hyperparameters(
    results_path: str = "./DPIM/evaluation_results/Evaluation_Results_DPIM.xlsx",
    logs_dir: str = "./Evaluation Logs",
) -> dict:
    """
    Collect the DPIM hyperparameters for every ``*.xes.gz`` event log in ``logs_dir``.

    The hyperparameters are read from an Excel sheet with a two-row header. The two
    header levels are joined with ``_`` so that the columns used here are
    ``"Event Logs_Event Log"``, ``"Hyperparameters_Lower"``, ``"Hyperparameters_Upper"``
    and ``"Hyperparameters_Fitness"``. Rows without an event-log name are ignored.

    Args:
        results_path: Path of the Excel workbook holding the evaluation results.
        logs_dir: Directory that contains the compressed event logs.

    Returns:
        Dict mapping the log name (file name without ``.xes.gz``) to a dict with the
        keys ``lower_bound`` (int), ``upper_bound`` (int) and ``fitness_threshold`` (float).

    Raises:
        KeyError: If a log file in ``logs_dir`` has no row in the spreadsheet.
    """
    log_suffix = ".xes.gz"
    name_column = "Event Logs_Event Log"

    df = pd.read_excel(results_path, header=[0, 1])  # two-row header

    # Flatten the two-level header into "<top>_<sub>" column names.
    df.columns = [f"{a}_{b}".strip("_") for a, b in df.columns]
    df = df.dropna(subset=[name_column]).reset_index(drop=True)

    event_log_hyperparameters: dict = {}

    # Sorted so the resulting dict has a stable order regardless of the file system.
    for file in sorted(os.listdir(logs_dir)):
        # Filter on the exact suffix that is stripped below; any other ".gz" file
        # would keep its extension in the name and could never match a spreadsheet row.
        if not file.endswith(log_suffix):
            continue

        log_name: str = Path(file).name.removesuffix(log_suffix)
        matches = df.loc[df[name_column] == log_name]
        if matches.empty:
            # Fail with the offending name instead of an opaque IndexError from .iloc[0].
            raise KeyError(
                f"No hyperparameters found for event log '{log_name}' in '{results_path}'"
            )

        row = matches.iloc[0]
        event_log_hyperparameters[log_name] = {
            "lower_bound": int(row["Hyperparameters_Lower"]),
            "upper_bound": int(row["Hyperparameters_Upper"]),
            "fitness_threshold": float(row["Hyperparameters_Fitness"]),
        }

    return event_log_hyperparameters

In [None]:
def dpim_tree_from_log(log, dpim_cfg: dict):
    """
    Discover a process tree for a PM4Py EventLog with the DPIM miner.

    Args:
        log: PM4Py EventLog object, passed to the DPIM parser and to ``create_tree``.
        dpim_cfg: Configuration dict. Recognised keys (all optional):
          - no_dp: bool; when truthy DP is switched off and epsilon=100000 is used
          - epsilon: float privacy parameter (default 1.0)
          - fitness_threshold: float minimum fitness (default 0.95)
          - lower: int | None, lower bound for DFR selection
          - upper: int | None, upper bound for DFR selection

    Returns:
        Whatever ``DPIM.create_tree`` returns on success (a pm4py ProcessTree
        according to the original documentation of this function).

    Raises:
        ValueError: If DP is enabled and epsilon is not strictly positive.
        RuntimeError: If ``create_tree`` returns ``False`` or ``None``.
    """
    # Read the configuration, falling back to the defaults listed above.
    dp_disabled = bool(dpim_cfg.get("no_dp", False))
    requested_epsilon = float(dpim_cfg.get("epsilon", 1.0))
    min_fitness = float(dpim_cfg.get("fitness_threshold", 0.95))
    lo = dpim_cfg.get("lower", None)
    hi = dpim_cfg.get("upper", None)

    # Epsilon only matters when DP is on; reject non-positive values in that case.
    if requested_epsilon <= 0 and not dp_disabled:
        raise ValueError("epsilon must be positive when DP is enabled")

    # Same preprocessing step as DPIM's main.py (per the original comment).
    parser = eventLog_parsing.xesFile()
    permutations, traceList, num_acts = parser.createPermutations_XES(event_log=log)

    # Missing bounds default to [num_acts, num_acts^2 - 1].
    largest_bound = (num_acts ** 2) - 1
    lo = num_acts if lo is None else lo
    hi = largest_bound if hi is None else hi

    # Out-of-range or inverted bounds are reset to those defaults; the lower bound
    # is repaired first, so the upper-bound check sees the repaired value.
    if lo < num_acts or lo >= hi or lo > largest_bound:
        lo = num_acts
    if hi > largest_bound or hi <= lo or hi < num_acts:
        hi = largest_bound

    miner = DPIM()
    # NOTE(review): the attribute is spelled `fit_trehsold`; presumably this matches
    # the DPIM class - confirm before renaming.
    miner.fit_trehsold = min_fitness
    miner.lower_bound = lo
    miner.upper_bound = hi
    miner.DP = not dp_disabled

    # With DP switched off a very large epsilon is handed to the miner.
    effective_epsilon = 100_000.0 if dp_disabled else requested_epsilon

    tree = miner.create_tree(
        permutations=permutations,
        traceList=traceList,
        epsilon=effective_epsilon,
        event_log=log,
    )

    # `False` and `None` are distinct failure signals and get distinct messages.
    if tree is False:
        raise RuntimeError(
            "DPIM rejection sampling did not find a tree meeting the fitness threshold. "
            "Try lowering fitness_threshold or increasing epsilon."
        )
    if tree is None:
        raise RuntimeError(
            "DPIM tree construction failed (likely due to recursion limit). "
            "The model may be too complex for the selected edges."
        )
    return tree

In [None]:
def compute_metrics(log, mode: str = "pm4py_im", dpim_cfg: dict | None = None) -> list[float]:
    """
    This function takes a log for which a process tree is discovered and metrics are computed.
    The log is a required parameter and the rest are optional. If only a log is provided the standard IM
    via `pm4py` is used. Otherwise, the DPIM process tree can be build with either privacy or no privacy. 
    
    :param log: The Event Log for which metrics like fitness, precision, etc are computed.
    :param mode: 'pm4py_im' (baseline inductive miner via PM4Py), 'dpim' (DPIM process tree; configure via dpim_cfg)
    :param dpim_cfg: Contains values like epsilon, fitness threshold, lower- and upper bound.
    :return: Returns a list of metrics in this order - fitness, precision, simplicity, generalization.
    :rtype: list[float]
    """
    
    # 1) Discover a process tree depending on mode
    if mode == "pm4py_im":
        tree = pm4py.discover_process_tree_inductive(log)
    elif mode == "dpim":
        if dpim_cfg is None:
            dpim_cfg = {}
        tree = dpim_tree_from_log(log, dpim_cfg)
    else:
        raise ValueError(f"Unknown mode='{mode}'. Use 'pm4py_im' or 'dpim'.")
    
    # 2) Convert tree -> Petri net
    net, im, fm = pm4py.convert_to_petri_net(tree)
    
    # 3) Compute metrics
   
    # Fitness should be calculated when privacy is selected, otherwise no (see xlsx results in DPIM)

    """ 
    This applies for both IM via pm4py (mode=="pm4py") and
    in the case where DPIM is used without privacy (mode=="dpim" and no_dp==True")
    """
    fitness = 1
    
    # Differentially Private Inductive Miner with privacy (epsilon != 100_000)
    if dpim_cfg is not None and dpim_cfg["no_dp"] is False:
        fitness = pm4py.fitness_token_based_replay(log, net, im, fm)['log_fitness']
        
    precision = pm4py.precision_token_based_replay(log, net, im, fm)  
    simplicity = simplicity_alg.apply(net)                    
    generalization = generalization_alg.apply(log, net, im, fm)

    return fitness, precision, simplicity, generalization

In [None]:
"""
Iterate over all Event Logs used for evaluation in the DPIM paper and compute metrics for all variations:

IM using pm4py, DPIM with no privacy, DPIM with epsilon=3, DPIM with epsilon=1, DPIM with epsilon=0.1
"""

hyperparameters: dict = get_event_log_hyperparameters()
evaluation_results: list[dict] = []
failed_logs: list[str] = []  # files whose evaluation raised; reported after the loop

# Settings of this evaluation run (loop-invariant, so defined once).
sampling_percent: float = 0.1  # Take 10% percent of each event log randomly
sampling_seed: int = 42  # fixed seed so the same traces are drawn on every run
epsilon_values = [3., 1., 0.1]  # The values explicitly used in the evaluation results in the paper
metric_names = ("fitness", "precision", "simplicity", "generalization")


def _rounded_metrics(metric_values) -> dict:
    """Turn the 4 values returned by compute_metrics into a dict rounded to 4 decimals."""
    return {name: round(value, 4) for name, value in zip(metric_names, metric_values)}


directory: str = "./Evaluation Logs"
# Sorted for a stable processing order; filtered on the suffix that is stripped below.
log_files = sorted(f for f in os.listdir(directory) if f.endswith(".xes.gz"))

for file in log_files:
    filename: str = os.path.join(directory, file)
    log_name: str = Path(file).name.removesuffix(".xes.gz")

    print(f"============== PROCESSING {file} ==============")

    try:
        # Look the hyperparameters up first so that a log missing from the
        # spreadsheet fails before the expensive load and discovery.
        params = hyperparameters[log_name]

        # NOTE(review): the bounds are scaled by the trace-sampling fraction, but
        # dpim_tree_from_log compares them against the number of activities
        # (num_acts .. num_acts^2 - 1) and resets out-of-range values - confirm
        # that scaling them is intended.
        shared_cfg = {
            "lower": math.ceil(params["lower_bound"] * sampling_percent),
            "upper": math.ceil(params["upper_bound"] * sampling_percent),
            "fitness_threshold": params["fitness_threshold"],
        }

        # Load the full log (*.xes.gz)
        # NOTE(review): the sampling below treats `log` as a sequence of traces;
        # presumably read_xes returns a legacy EventLog in the pinned pm4py version - confirm.
        log = pm4py.read_xes(filename)

        # Sample the data, since it's too big and it takes long time to compute.
        # A dedicated, per-log seeded generator makes the sample reproducible and
        # independent of the processing order and of other users of `random`.
        sample_size: int = math.ceil(len(log) * sampling_percent)
        sampler = random.Random(f"{sampling_seed}:{log_name}")
        sampled_traces = sampler.sample(log, sample_size)
        event_stream = EventStream(sampled_traces)
        sampled_log = EventLog(event_stream)

        # --- PM4Py Inductive Miner ---
        pm4py_results = _rounded_metrics(compute_metrics(sampled_log))

        # --- DPIM (no privacy) --- (Tests correctness compared to IM)
        no_dp_results = _rounded_metrics(
            compute_metrics(sampled_log, mode="dpim", dpim_cfg={**shared_cfg, "no_dp": True})
        )

        # --- DPIM with privacy --- (Tests privacy gain and utility loss)
        epsilon_values_results = {}
        for eps in epsilon_values:
            epsilon_values_results[f"eps_{eps:g}"] = _rounded_metrics(
                compute_metrics(
                    sampled_log,
                    mode="dpim",
                    dpim_cfg={**shared_cfg, "no_dp": False, "epsilon": eps},
                )
            )

        evaluation_results.append({
            "file": file,
            "pm4py_im": pm4py_results,
            "dpim_no_dp": no_dp_results,
            "dpim_dp": epsilon_values_results,
        })

        print(f"============== METRICS FOR {file} computed ==============")
    except KeyboardInterrupt:
        print("\n✗ PROGRAM WAS INTERRUPTED ...")
        break
    except Exception as e:
        # One failing log must not silently drop every log that comes after it:
        # record the failure and carry on with the next file.
        print(f"\n✗ FAILED to process {file}: {e}")
        failed_logs.append(file)
        continue

if failed_logs:
    print(f"{len(failed_logs)} of {len(log_files)} logs failed: {failed_logs}")

In [None]:
# Save results
# Serialize before opening the file: if a value turns out not to be JSON-serializable,
# the error is raised here and an existing evaluation_results.json is left untouched
# instead of being truncated half-way through the dump.
results_json: str = json.dumps(evaluation_results, indent=2)
with open("evaluation_results.json", "w", encoding="utf-8") as f:
    f.write(results_json)