## Section A: Imports

In [None]:
!pip install pm4py optuna

In [None]:
import random
import time
import numpy as np
import pm4py
import os
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from pm4py.objects.conversion.process_tree import converter as pt_converter
from pm4py.algo.filtering.log.variants import variants_filter
from pm4py.algo.filtering.log.attributes import attributes_filter
from pm4py.algo.filtering.log.start_activities import start_activities_filter
from pm4py.algo.filtering.log.end_activities import end_activities_filter
from pm4py.algo.conformance.tokenreplay import algorithm as token_replay
from pm4py.algo.evaluation.generalization import algorithm as generalization_algorithm
from pm4py.algo.evaluation.simplicity import algorithm as simplicity_algorithm
from pm4py.algo.discovery.alpha import algorithm as alpha_miner
from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner
from pm4py.algo.discovery.heuristics import variants as heuristics_miner_variants
from pm4py.objects.conversion.heuristics_net import converter as hn_converter
from pm4py.algo.discovery.inductive import variants
import optuna


# Seed Python's and NumPy's global random generators for reproducibility.
# SEED is also reused later to seed the Optuna TPE sampler.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

In [None]:
# Colab only: mount Google Drive at /content/drive, where the event log is read from.
from google.colab import drive
drive.mount('/content/drive')

## Section B: Load Data and Define Evaluation Functions

In [None]:
# Location of the XES event log on the mounted Google Drive.
EVENT_LOG_PATH = "/content/drive/MyDrive/Masters & PhD/PhD/05_Courses/01_AutoML_Course/Project 2/PermitLog.xes"
# Module-level event log; the discovery and evaluation code below reads this name.
log = pm4py.read_xes(EVENT_LOG_PATH)

In [None]:
def filter_log_by_start_and_end(log, start_activity, end_activity):
    """Drop noisy traces, keeping those with the expected first and last activity.

    Args:
        log: Event log handed to the pm4py start-activities filter.
        start_activity: The only admitted start activity.
        end_activity: The only admitted end activity.

    Returns:
        The result of the start-activities filter followed by the
        end-activities filter.
    """
    # Filter on the start activity first; the end filter runs on that result.
    starts_ok = start_activities_filter.apply(log, {start_activity})
    return end_activities_filter.apply(starts_ok, {end_activity})

# Rebind the module-level log to the traces that start with the permit
# submission and end with the handled payment.
log = filter_log_by_start_and_end(log, "Permit SUBMITTED by EMPLOYEE", "Payment Handled")

In [None]:
def discover_model(algorithm, params):
    """Discover a Petri net from the module-level ``log`` with the chosen miner.

    Args:
        algorithm: One of ``"alpha"``, ``"heuristic"`` or ``"inductive"``.
        params: Dict of hyperparameters (``None`` is treated as empty).
            Keys read here: ``"dependency_thresh"`` and ``"and_thresh"``
            (heuristic, both default 0.5) and ``"variant"`` (inductive,
            default ``"im"``). Every other key is ignored.

    Returns:
        Tuple ``(net, im, fm)`` as produced by the pm4py miner/converter:
        the Petri net together with its initial and final markings.

    Raises:
        ValueError: If ``algorithm`` is not one of the supported names.
    """
    params = params or {}

    if algorithm == "alpha":
        from pm4py.algo.discovery.alpha import algorithm as alpha_miner
        # NOTE(review): no entry of `params` is forwarded to the alpha miner.
        net, im, fm = alpha_miner.apply(log)
        return net, im, fm
    elif algorithm == "heuristic":
        from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner
        from pm4py.objects.conversion.heuristics_net import converter as hn_converter
        dep_thresh = params.get("dependency_thresh", 0.5)
        and_thresh = params.get("and_thresh", 0.5)
        # NOTE(review): params["noise_thresh"] is not forwarded to the miner;
        # only the two thresholds below influence the heuristics net.
        heu_net = heuristics_miner.apply_heu(
            log,
            parameters={
                heuristics_miner.Variants.CLASSIC.value.Parameters.DEPENDENCY_THRESH: dep_thresh,
                heuristics_miner.Variants.CLASSIC.value.Parameters.AND_MEASURE_THRESH: and_thresh
            }
        )
        net, im, fm = hn_converter.apply(heu_net)
        return net, im, fm
    elif algorithm == "inductive":
        from pm4py.algo.discovery.inductive import algorithm as inductive_miner
        from pm4py.algo.discovery.inductive import variants
        variant_name = params.get("variant", "im")
        # Map the variant string to the pm4py object passed as `variant=`.
        if variant_name == "im":
            variant = variants.im
        elif variant_name == "imd":
            variant = variants.imd
        else:
            # Reached for "imf" and for any unrecognized variant name.
            variant = variants.imf

        # NOTE(review): params["noise_thresh"] and
        # params["activity_frequency_filter"] are not forwarded to the miner.
        process_tree = inductive_miner.apply(log, variant=variant)
        net, im, fm = pt_converter.apply(process_tree, variant=pt_converter.Variants.TO_PETRI_NET)
        return net, im, fm
    else:
        raise ValueError("Unknown algorithm specified.")

In [None]:
def evaluate_model(net, im, fm, log):
    """Score a discovered model on fitness, generalization and simplicity.

    Fitness is the sum of the per-trace ``"trace_fitness"`` values from token
    replay divided by the number of replay results; results lacking that key
    add nothing to the sum but still count in the divisor. When replay does
    not return a non-empty list, fitness is 0.0.

    Args:
        net: Petri net to evaluate.
        im: Marking passed to pm4py as the initial marking.
        fm: Marking passed to pm4py as the final marking.
        log: Event log the model is replayed and evaluated against.

    Returns:
        Tuple ``(fitness, generalization, simplicity)``.
    """
    replayed = token_replay.apply(log, net, im, fm)

    fitness = 0.0
    if isinstance(replayed, list) and len(replayed) > 0:
        per_trace = [entry["trace_fitness"] for entry in replayed if "trace_fitness" in entry]
        fitness = sum(per_trace) / len(replayed)

    # Tuple elements evaluate left to right: generalization, then simplicity.
    return (
        fitness,
        generalization_algorithm.apply(log, net, im, fm),
        simplicity_algorithm.apply(net),
    )

## Section C: Create Logger to store the results while running the HPO

In [None]:
import logging

# Configure the root logger to emit timestamped INFO records to both a log
# file and the console.
logging.basicConfig(
    level=logging.INFO,  # Set the log level
    format="%(asctime)s - %(message)s",  # Log format includes timestamp
    handlers=[
        logging.FileHandler("multi_optuna_objective_optimization.log"),
        logging.StreamHandler(),  # Log to console
    ],
    # basicConfig() does nothing when the root logger already has handlers,
    # which notebook kernels commonly pre-install; force=True (Python 3.8+)
    # replaces them so the configuration above actually takes effect.
    force=True,
)

## Section D: Define Search Space with Optuna
#### Set the `mode` parameter of the objective to `'single'` or `'multi'`. With `'single'`, the study optimizes fitness only. With `'multi'`, it optimizes the sum of fitness, generalization, and simplicity.

In [None]:
# Candidate discovery algorithms the Optuna objective chooses between.
algos = ["alpha", "heuristic", "inductive"]

# Reference timestamp for each trial's "elapsed_time"; taken when this cell runs.
global_start_time = time.time()

def objective(trial, mode='multi'): # You need to define the mode here for single object or multi-objective
    """Optuna objective: sample a miner configuration, then discover and score a model.

    Args:
        trial: Optuna trial used to sample the algorithm and its
            hyperparameters and to store the metrics as user attributes.
        mode: ``'single'`` returns fitness only; any other value returns
            ``fitness + generalization + simplicity``.

    Returns:
        The scalar score of this trial (see ``mode``). A configuration whose
        discovery or evaluation raises is scored with all three metrics at 0.0.
    """
    algorithm = trial.suggest_categorical("algorithm", algos)

    # NOTE(review): discover_model() reads only "dependency_thresh",
    # "and_thresh" and "variant"; the remaining sampled values are recorded in
    # the trial but do not influence the discovered model.
    if algorithm == "alpha":
        remove_loops = trial.suggest_categorical("alpha_remove_loops", [True, False])
        ignore_noise = trial.suggest_categorical("alpha_ignore_noise", [True, False])
        params = {
            "remove_loops": remove_loops,
            "ignore_noise": ignore_noise
        }
    elif algorithm == "heuristic":
        dependency_thresh = trial.suggest_float("heuristic_dep", 0.0, 0.9)
        and_thresh = trial.suggest_float("heuristic_and", 0.0, 0.9)
        noise_thresh = trial.suggest_float("heuristic_noise", 0.0, 0.5)
        params = {
            "dependency_thresh": dependency_thresh,
            "and_thresh": and_thresh,
            "noise_thresh": noise_thresh
        }
    else:  # inductive
        variant = trial.suggest_categorical("inductive_variant", ["im", "imd", "imf"])
        noise_thresh = trial.suggest_float("inductive_noise", 0.0, 0.5)
        activity_freq = trial.suggest_float("inductive_activity_frequency", 0.0, 1.0)
        params = {
            "noise_thresh": noise_thresh,
            "variant": variant,
            "activity_frequency_filter": activity_freq
        }

    try:
        net, im, fm = discover_model(algorithm, params)
        fitness, generalization, simplicity = evaluate_model(net, im, fm, log)
    except Exception:
        # Best effort: a failing configuration scores zero instead of aborting
        # the study, but the traceback is logged so failures stay visible.
        logging.exception(
            "Trial %s failed (algorithm=%s, params=%s); scoring all metrics as 0.0",
            trial.number, algorithm, params,
        )
        fitness, generalization, simplicity = 0.0, 0.0, 0.0

    # Elapsed time is measured against the module-level global_start_time.
    elapsed_time = time.time() - global_start_time

    # Print elapsed time and the three metrics
    print(f"Trial {trial.number}, Elapsed Time: {elapsed_time:.2f}s, "
          f"Fitness: {fitness:.4f}, Generalization: {generalization:.4f}, Simplicity: {simplicity:.4f}")

    # Store these values as trial user attributes for record
    trial.set_user_attr("fitness", fitness)
    trial.set_user_attr("generalization", generalization)
    trial.set_user_attr("simplicity", simplicity)
    trial.set_user_attr("elapsed_time", elapsed_time)

    # Single mode scores fitness alone; otherwise the three metrics are summed.
    if mode == 'single':
        return fitness
    else:
        return fitness+generalization+simplicity

## Section E: Run the Optimization with a Time Budget

In [None]:
# Maximize the objective's score with a seeded TPE sampler.
study = optuna.create_study(direction="maximize", sampler=optuna.samplers.TPESampler(seed=SEED))
time_budget_seconds = 60 * 60 # 1 Hour is the maximum time budget

# Restart the clock so each trial's "elapsed_time" is measured from the start
# of this optimization run rather than from when the objective was defined.
global_start_time = time.time()
study.optimize(objective, timeout=time_budget_seconds)

# Report the best trial. logging applies %-formatting to its arguments, so each
# value needs its own placeholder in the message.
logging.info("Best trial:")
best_trial = study.best_trial
logging.info("  Value: %s", best_trial.value)
logging.info("  Params: ")
for key, value in best_trial.params.items():
    logging.info("    %s: %s", key, value)
logging.info("  Fitness: %s", best_trial.user_attrs.get("fitness", "N/A"))
logging.info("  Generalization: %s", best_trial.user_attrs.get("generalization", "N/A"))
logging.info("  Simplicity: %s", best_trial.user_attrs.get("simplicity", "N/A"))
logging.info("  Elapsed Time: %ss", best_trial.user_attrs.get("elapsed_time", "N/A"))