# Requirements and Imports

Requirements installation

In [None]:
# Install the third-party packages used by this notebook into the kernel's environment.
# pydot parses the DOT-formatted Declare automata.
%pip install pydot
# NOTE(review): cupy is not referenced by the cells visible here — confirm it is still required.
%pip install cupy-cuda11x
# pm4py provides the XES importer/exporter used for the event logs.
%pip install pm4py

Collecting cupy-cuda11x
  Downloading cupy_cuda11x-13.3.0-cp310-cp310-manylinux2014_x86_64.whl.metadata (2.7 kB)
Downloading cupy_cuda11x-13.3.0-cp310-cp310-manylinux2014_x86_64.whl (96.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m96.6/96.6 MB[0m [31m7.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: cupy-cuda11x
Successfully installed cupy-cuda11x-13.3.0
Collecting pm4py
  Downloading pm4py-2.7.12.1-py3-none-any.whl.metadata (4.2 kB)
Collecting deprecation (from pm4py)
  Downloading deprecation-2.1.0-py2.py3-none-any.whl.metadata (4.6 kB)
Collecting intervaltree (from pm4py)
  Downloading intervaltree-3.1.0.tar.gz (32 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting sortedcontainers<3.0,>=2.0 (from intervaltree->pm4py)
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Downloading pm4py-2.7.12.1-py3-none-any.whl (2.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m 

We upload the DOT-formatted Declare file and mount Google Drive for file access

In [None]:
# Drive mounting
# Prepares the Colab workspace: clears old files, asks for the Declare model upload and mounts Drive.

from google.colab import files
from google.colab import drive


# Folder whose previous contents are cleared below (same path write_pddl_problem writes to).
directory = '/content/problems'

# Remove the problem files left over from a previous run.
!rm {directory}/*

# NOTE(review): this removes every file in the current working directory — confirm this is intended.
!rm *

print("Please Upload the dot formatted declare file")
# Presumably returns a mapping keyed by uploaded file name (the loop below iterates its keys).
uploaded = files.upload()

for file_name in uploaded.keys():
    print(f'Uploaded file: {file_name}')


# Mount Google Drive under /content/drive so the XES log path configured later can be read.
drive.mount('/content/drive')


We make the required imports


In [None]:
import pydot
import random
from datetime import datetime, timedelta
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.objects.log.exporter.xes import exporter as xes_exporter
import os
import shutil

# Pddl Translator

We set the required global parameters

In [None]:
# Global parameters read by the translator and noise-injection cells of this notebook.

def trace_length_from_filename(path):
    """
    Function to extract the trace length encoded in a log file name.

    The length is the integer found between the first "_" and the first "."
    of the file name (e.g. "length_20.xes_atomized_1.xes" -> 20). Only the
    file name is inspected, so "_" or "." characters in the directories of
    the path do not interfere.

    Parameters:
        path (file name): Path of the xes log file.

    Returns:
        (integer): The trace length encoded in the file name.

    Raises:
        ValueError: If the file name does not follow the "<prefix>_<number>.<rest>" pattern.
    """
    file_name = os.path.basename(path)
    return int(file_name[(file_name.find("_") + 1):file_name.find(".")])

# let the user define this section
xes_file = "drive/MyDrive/TESI/SRC/length_20.xes_atomized_1.xes"
# constraint files in dot format
dot_files = ["declare1.dot"]
domain_name = "traceAlignment"
problem_limit = 4 # if you want to parse problems until a bound is reached
problem_automata = 1 # number of automata for the dot files if more than one
problem_automata_list = [1,2,5,9] # if you want to sample traces
problem_automata_debug_list = [0] # if you want to debug traces in this list
problem_automata_noise_list = [1,2,5,9] # if you want to sample the traces where to inject noise
activities = ["Handle Case", "Call Outbound", "Inbound Call", "Handle Email", "Inbound Email"] # let the user define its own activities
lifecycles = ["assign","start","complete"] # let the user define its own lifecycles
lifecycle_states = ["init_state","assigned_state","started_state","completed_state","sink_state"]
resources = ["Susi", "John", "Eric"] # let the user define its own resources
max_amount = 2 # upper bound of elements to add and delete if we add noise probabilistically
# log traces tags
trace_id_tag = "concept:name"
event_id_tag = "concept:name"
activity_tag = "Activity"
lifecycle_tag = "lifecycle:transition"
org_tag = "org:resource"
resource_tag = "Resource"
timestamp_tag = "time:timestamp"
# list of moves
move_1 = "swap"
move_2 = "delete"
move_3 = "add"
move_4 = "modify"
move_5 = "pass"
# list of weights for each move
move_1_w = 0.1
move_2_w = 0.1
move_3_w = 0.4
move_4_w = 0.2
move_5_w = 0.2
# list of amount of move for each choice
amount_move_1 = amount_move_2 = amount_move_3 = amount_move_4 = 2 # let the user define them
# parsed from the file name only, so the directories of the path cannot break the parsing
avg_trace_length = trace_length_from_filename(xes_file) # let the user define it
# share of the non-"pass" moves; rounding first keeps float noise (e.g. 28.999...) from truncating one percent too low
noise_percentage = int(round((move_1_w + move_2_w + move_3_w + move_4_w) * 100, 6)) # let the user define it

Function to write the pddl problem to a .pddl file

In [None]:
def write_pddl_problem(output_file, problem_name, initial_state, objects, goal_state, directory='/content/problems'):
    """
    Function to write the pddl problem with content settled by parameters in input.

    The file is written as <directory>/<output_file>; the directory is created
    when missing. The domain name is read from the global `domain_name`.

    Parameters:
        output_file (file name) : The output file name.
        problem_name (file name): The pddl problem name.
        initial_state (list): The lines representing the initial state of the pddl problem.
        objects (dict): A dictionary mapping each object type to the list of object names of that type.
        goal_state (list): The lines representing the goal state of the pddl problem.
        directory (default = '/content/problems') (directory name): The folder where the file is written.

    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists test
    os.makedirs(directory, exist_ok=True)
    file_path = os.path.join(directory, output_file)

    with open(file_path, 'w') as f:
        f.write(f"(define (problem {problem_name})\n")
        f.write(f"\t(:domain {domain_name})\n\n")

        # Objects section: one "<names> - <type>" line per object type
        f.write("\t(:objects\n")
        for obj_type, obj_list in objects.items():
            f.write(f"\t\t{' '.join(obj_list)} - {obj_type}\n")
        f.write("\t)\n\n")

        # Initial state section
        f.write("\t(:init\n")
        for predicate in initial_state:
            f.write(f"\t\t{predicate}\n")
        f.write("\t)\n\n")

        # Goal state section: conjunction of all the goal predicates
        f.write("\t(:goal\n")
        f.write("\t\t(and\n")
        for predicate in goal_state:
            f.write(f"\t\t\t{predicate}\n")
        f.write("\t\t)\n")
        f.write("\t)\n")
        f.write("\t(:metric minimize (total-cost))\n")
        f.write(")\n")
    print("Writed file : ",file_path)

Function to generate automata from a .dot file

In [None]:
# Lifecycle automaton shared by every activity, as
# (index of source lifecycle state, index of lifecycle, index of target lifecycle state)
# into `lifecycle_states` / `lifecycles`. The order fixes the order of the emitted predicates.
_LIFECYCLE_TRANSITIONS = (
    (0, 0, 1), (0, 1, 4), (0, 2, 4),
    (1, 1, 2), (1, 0, 4), (1, 2, 4),
    (2, 2, 3), (2, 0, 4), (2, 1, 4),
    (3, 0, 0), (3, 1, 4), (3, 2, 4),
)


def _strip_lifecycle_suffix(label):
    """Remove from a single label the first of the three configured "_<lifecycle>" markers it contains."""
    for lifecycle in lifecycles[:3]:
        marker = f"_{lifecycle}"
        if marker in label:
            return label.replace(marker, "")
    return label


def _register_activity(act, initial_state, objects, goal_state, found_activities):
    """Emit the lifecycle predicates, goal and objects of `act` the first time it is met."""
    if act in found_activities:
        return
    initial_state.append(f"(cur_lifecycle_state {act} {lifecycle_states[0]})")
    initial_state.append(f"(final_lifecycle_state {act} {lifecycle_states[3]})")
    for lifecycle_index in (0, 1, 2):
        initial_state.append(f"(lifecycle_activity_of {act} {act}_{lifecycles[lifecycle_index]})")
    for from_index, lifecycle_index, to_index in _LIFECYCLE_TRANSITIONS:
        initial_state.append(f"(lifecycle {lifecycle_states[from_index]} {act}_{lifecycles[lifecycle_index]} {lifecycle_states[to_index]})")
    goal_state.append(f"(cur_lifecycle_state {act} {lifecycle_states[3]})")
    objects["activity"].append(act)
    for lifecycle_index in (0, 1, 2):
        objects["lifecycle_activity"].append(f"{act}_{lifecycles[lifecycle_index]}")
    found_activities.append(act)


def generate_formula(initial_state,objects,goal_state,found_activities):
    """
    Function to translate the first dot formatted automaton into pddl predicates and objects.

    Only dot_files[0] is translated (and only when problem_automata >= 1).
    Every node except "initial" becomes an automaton state named s<node>;
    nodes drawn as "doublecircle" are final states and goal states. Every
    labelled edge registers the activities found in its label (a label can
    hold several names separated by a literal "\\n") and, when it is not a
    self loop, adds one automaton transition per name.

    Parameters:
        initial_state (list): The lines of the initial state, extended in place.
        objects (dict): The objects of the pddl problem; the "activity", "lifecycle_activity"
            and "automaton_state" lists are extended in place.
        goal_state (list): The lines of the goal state, extended in place.
        found_activities (list): The activities already registered, extended in place.

    """
    if problem_automata < 1:
        return

    # Parse the .dot file and extract graph structure
    graph = pydot.graph_from_dot_file(dot_files[0])[0]

    states = []
    transitions = []

    # Extract states from graph
    for node in graph.get_nodes():
        state_name = node.get_name().strip('"')
        # nodes declared without a shape attribute report None: treat them as non final
        state_shape = (node.get_shape() or "").strip('"')
        if not state_name or state_name == "initial":
            continue
        states.append(f"s{state_name}")
        if state_shape == "doublecircle":
            initial_state.append(f"(final_state s{state_name})")
            goal_state.append(f"(cur_state s{state_name})")

    # Extract activities and transitions from graph
    for edge in graph.get_edges():
        raw_label = edge.get_label()  # Get the label of the edge (arc)
        if raw_label is None:
            continue
        source = edge.get_source().strip('"')
        target = edge.get_destination().strip('"')
        # the dot label keeps its quotes and separates several names with a literal "\n"
        edge_labels = raw_label.replace('"','').split('\\n')

        # the lifecycle marker is stripped label by label, so an edge mixing
        # different lifecycles still yields clean activity names
        for edge_label in edge_labels:
            activity = _strip_lifecycle_suffix(edge_label)
            if activity:  # an empty name would produce malformed predicates
                _register_activity(activity, initial_state, objects, goal_state, found_activities)

        if not (source and target) or source == target:
            continue  # self loops are not translated into transitions
        for edge_label in edge_labels:
            if not edge_label:
                continue
            if source != "initial":
                transitions.append((source, target, edge_label))
            else:
                # a labelled edge leaving "initial" marks its target as current state
                initial_state.append(f"(cur_state s{target})")

    # Extract from declare model to formula automata
    if states:
        # the first state listed in the graph is taken as the current state
        initial_state.append(f"(cur_state {states[0]})")
    objects["automaton_state"].extend(states)
    for source, target, label in transitions:
        initial_state.append(f"(automaton s{source} {label} s{target})")

# Synthetic Log Creation

In this section we generate a noisy log from the original one

We start with the noise-injection utility functions

In [None]:
def inject_noise_move_1(noisy_trace, noisy_event, noisy_index, prob = False, chose_amount = 1):
    """
    Noise injection for move_1: swap choice.
    The event at noisy_index trades places with the event found `amount`
    positions further in the trace, wrapping around at the end of the trace.

    Parameters:
        noisy_trace (pm4py.Trace) : The trace modified in place.
        noisy_event (pm4py.Event): The event written to the partner position.
        noisy_index (integer): The index of noisy_event in the trace.
        prob (default = False) (bool): When set, the distance is drawn at random between 1 and the trace length.
        chose_amount (default = 1) (integer): The distance used when prob is not set.

    """
    # distance between the two events to swap
    distance = random.randint(1, len(noisy_trace)) if prob else chose_amount

    # position of the partner event, wrapping around the trace
    partner_index = (noisy_index + distance) % len(noisy_trace)

    # exchange the two events
    noisy_trace[noisy_index], noisy_trace[partner_index] = noisy_trace[partner_index], noisy_event

def inject_noise_move_2(noisy_trace, noisy_trace_index, trace_events_to_delete, noisy_event, noisy_index, prob = False, chose_amount = 1):
    """
    Noise injection for move_2: delete choice.
    The event is not removed from the trace here: it is only recorded in
    trace_events_to_delete, and only when the amount is 1 or 2.

    Parameters:
        noisy_trace (pm4py.Trace) : The original log trace (not read here).
        noisy_trace_index (integer): The original log trace index, key of trace_events_to_delete.
        trace_events_to_delete (dict): For each trace index, the list of events marked as deleted.
        noisy_event (pm4py.Event): The event to mark as deleted.
        noisy_index (integer): The original log trace event index (not read here).
        prob (default = False) (bool): When set, the amount is drawn at random between 0 and 1.
        chose_amount (default = 1) (integer): The amount used when prob is not set.

    """
    # amount : drawn at random in probabilistic mode, given by the caller otherwise
    amount = random.randint(0, 1) if prob else chose_amount

    # mark the event as deleted
    if amount in (1, 2):
        trace_events_to_delete[noisy_trace_index].append(noisy_event)



def inject_noise_move_3(noisy_trace, noisy_trace_index, noisy_event, noisy_index, trace_events_to_skip, prob = False, chose_amount = 1):
    """
    Noise injection for move_3: add choice.
    Inserts `amount` new events right after position noisy_index. Each new
    event is a dictionary with a random activity, a random lifecycle and a
    random timestamp string between March 1, 2022 and July 31, 2023.

    Parameters:
        noisy_trace (pm4py.Trace) : The trace modified in place.
        noisy_trace_index (integer): The original log trace index, key of trace_events_to_skip.
        noisy_event (pm4py.Event): The original log trace event (not read here).
        noisy_index (integer): The index after which the new events are inserted.
        trace_events_to_skip (dict): For each trace index, the list of events added by the noise.
        prob (default = False) (bool): When set, the amount is drawn at random between 1 and max_amount.
        chose_amount (default = 1) (integer): The amount used when prob is not set.

    """
    # amount : number of events to add
    how_many = random.randint(1, max_amount) if prob else chose_amount

    # time window of the random timestamps
    window_start = datetime(2022, 3, 1, 0, 0, 0)  # March 1, 2022
    window_end = datetime(2023, 7, 31, 23, 59, 59)  # July 31, 2023
    window_seconds = int((window_end - window_start).total_seconds())

    for _ in range(how_many):
        # sample parameters
        picked_activity = random.choices(activities, k = 1)[0]
        picked_lifecycle = random.choices(lifecycles, k = 1)[0]

        # random instant of the window, rendered with millisecond precision
        offset = random.randint(0, window_seconds)
        moment = window_start + timedelta(seconds = offset)
        stamp = moment.strftime('%Y-%m-%dT%H:%M:%S.%f')[:23] + 'Z'

        # the event to add is represented by a plain dictionary
        added_event = {
            event_id_tag: picked_activity,
            lifecycle_tag: picked_lifecycle,
            timestamp_tag: stamp,
        }
        noisy_trace.insert(noisy_index + 1, added_event)
        trace_events_to_skip[noisy_trace_index].append(added_event)

def inject_noise_move_4(noisy_trace, noisy_event, prob = False, chose_amount = 1):
    """
    Noise injection for move_4: modify choice.
    The amount tells how many attributes of the event are overwritten with
    random values: from 1 the activity, from 2 also the lifecycle. The
    higher levels (3: resource, 4: timestamp) currently change nothing.

    Parameters:
        noisy_trace (pm4py.Trace) : The original log trace (not read here).
        noisy_event (pm4py.Event): The event modified in place.
        prob (default = False) (bool): When set, the amount is drawn at random between 1 and 5.
        chose_amount (default = 1) (integer): The amount used when prob is not set.

    """
    # amount : number of attributes to modify
    depth = random.randint(1, 5) if prob else chose_amount

    if depth >= 1:
        # level 1: replace the activity of the event
        noisy_event[event_id_tag] = random.choices(activities, k = 1)[0]
        if depth >= 2:
            # level 2: replace the lifecycle of the event too
            noisy_event[lifecycle_tag] = random.choices(lifecycles, k = 1)[0]
        # levels 3 (resource) and 4 (timestamp) are intentionally left as no-ops

We proceed with the main function to inject noise

In [None]:
def _deterministic_move_amounts(weights, trace_length):
    """Split trace_length events among the moves proportionally to weights (amounts sum to trace_length)."""
    total_weight = round(sum(weights), 10)
    # scaling each weight to make the sum equal to trace_length (integer division)
    amounts = [int((weight * trace_length) // total_weight) for weight in weights]
    # distributing what the integer division left out, one unit per move starting from the first
    leftover = trace_length - sum(amounts)
    for k in range(leftover):
        amounts[k % len(amounts)] += 1
    return amounts


def inject_noise(xes_file, trace_events_to_delete, prob = False, chose_amount = 1):
    """
    Function to preprocess the xes log and inject noise for each trace.
    The injection is made by several parameters.
    The possible default moves are :
      1) swap --> swap two events in the trace
      2) delete --> mark an event of the trace as deleted
      3) add --> add events in the trace
      4) modify --> modify an event in the trace
      5) pass --> this move does not modify the trace

    In probabilistic mode (prob set) one move is drawn for each event with
    the {move}_w weights and the amount of the move is drawn by the move
    itself. In deterministic mode the {move}_w weights must sum to 1: the
    events of the trace are split among the moves proportionally to the
    weights, every event gets a move drawn uniformly among the moves that
    still have events left, and the amount of each move is the global
    amount_move_{n}.

    Only the traces whose 1-based position is in problem_automata_noise_list
    are modified. Events added by the noise or marked as deleted are not
    processed again. When the first trace is processed in deterministic mode
    the global noise_percentage is recomputed from its amounts.

    Parameters:
        xes_file (file name): A xes formatted log file path name.
        trace_events_to_delete (dict): Filled in place with, for each trace index, the list of events marked as deleted.
        prob (default = False) (bool): Flag to decide whether to injects noise in a probabilistic way.
        chose_amount (default = 1) (integer): Kept for interface compatibility; not read, the amounts come from amount_move_{n}.

    Returns:
        noisy_log (pm4py.Log): The log with the noise injected, or None if an unknown move is drawn.
    """
    # the notebook-level setting is refreshed below from the first trace
    global noise_percentage

    # list of choices for the rand moves
    moves = [move_1, move_2, move_3, move_4, move_5]

    # list containing weights,
    # random.choices interprets them as relative probabilities so they don't need to sum to 1
    weights = [move_1_w, move_2_w, move_3_w, move_4_w, move_5_w]

    if not prob:
        # the deterministic split of the events needs weights forming a distribution
        assert round(sum(weights), 10) == 1

    # store the events added because of noise to the trace to avoid processing them again
    trace_events_to_skip = {}

    # import log
    noisy_log = xes_importer.apply(xes_file)

    # loop for all traces in the log
    for noisy_trace_index, noisy_trace in enumerate(noisy_log):

        trace_events_to_delete[noisy_trace_index] = []
        trace_events_to_skip[noisy_trace_index] = []

        # events left for each move (deterministic mode only)
        amounts = [] if prob else _deterministic_move_amounts(weights, len(noisy_trace))

        # traces are numbered from 1 in the sampling lists
        trace_number = noisy_trace_index + 1
        # NOTE(review): the emptiness test reads problem_automata_list while the membership test
        # reads problem_automata_noise_list — confirm the former is not meant to be the noise list
        if not (trace_number in problem_automata_noise_list or not problem_automata_list):
            continue

        print("New trace: ",noisy_trace.attributes[trace_id_tag],"Trace Length: ",len(noisy_trace),"Amounts: ",amounts)
        if noisy_trace_index == 0 and not prob:
            noise_percentage = int((sum(amounts[:4]) / avg_trace_length) * 100)
            print("Noise Percentage: ",noise_percentage)

        deleted_events = trace_events_to_delete[noisy_trace_index]
        added_events = trace_events_to_skip[noisy_trace_index]

        # the trace can grow while it is iterated (add move): the added events are skipped
        for noisy_index, noisy_event in enumerate(noisy_trace):
            if noisy_event in deleted_events or noisy_event in added_events:
                continue

            if prob:
                # Perform a random choice biased with weights
                biased_choice = random.choices(moves, weights=weights, k=1)[0]
            else:
                # only the moves that still have events left can be drawn, so that
                # no move is applied more often than its computed amount
                curr_moves = [move for move, left in zip(moves, amounts) if left > 0]
                if len(curr_moves) < 1 :
                    print("Trace completed, go to the next trace",noisy_event)
                    break

                biased_choice = random.choice(curr_moves)

            # logging the choice
            if biased_choice == move_1:
                print("You chose case 1 SWAP")
                # inject noise amount
                if not prob:
                    amounts[0] = amounts[0] - 1
                    inject_noise_move_1(noisy_trace=noisy_trace, noisy_event=noisy_event, noisy_index=noisy_index, chose_amount=amount_move_1)
                else:
                    inject_noise_move_1(noisy_trace=noisy_trace, noisy_event=noisy_event, noisy_index=noisy_index, prob=prob)
            elif biased_choice == move_2:
                print("You chose case 2 DELETE")
                # inject noise amount
                if not prob:
                    amounts[1] = amounts[1] - 1
                    inject_noise_move_2(noisy_trace=noisy_trace, noisy_trace_index=noisy_trace_index, trace_events_to_delete=trace_events_to_delete, noisy_event=noisy_event, noisy_index=noisy_index, chose_amount=amount_move_2)
                else:
                    inject_noise_move_2(noisy_trace=noisy_trace, noisy_trace_index=noisy_trace_index, trace_events_to_delete=trace_events_to_delete, noisy_event=noisy_event, noisy_index=noisy_index, prob = prob)
            elif biased_choice == move_3:
                print("You chose case 3 ADD")
                # inject noise amount
                if not prob:
                    amounts[2] = amounts[2] - 1
                    inject_noise_move_3(noisy_trace=noisy_trace, noisy_trace_index=noisy_trace_index, noisy_event=noisy_event, noisy_index=noisy_index, trace_events_to_skip=trace_events_to_skip, chose_amount=amount_move_3)
                else:
                    inject_noise_move_3(noisy_trace=noisy_trace, noisy_trace_index=noisy_trace_index, noisy_event=noisy_event, noisy_index=noisy_index, trace_events_to_skip=trace_events_to_skip, prob=prob)
            elif biased_choice == move_4:
                print("You chose case 4 MODIFY")
                # inject noise amount
                if not prob:
                    amounts[3] = amounts[3] - 1
                    inject_noise_move_4(noisy_trace=noisy_trace, noisy_event=noisy_event, chose_amount=amount_move_4)
                else:
                    inject_noise_move_4(noisy_trace=noisy_trace, noisy_event=noisy_event, prob=prob)
            elif biased_choice == move_5:
                print("You chose case 5 PASS")
                if not prob:
                    amounts[4] = amounts[4] - 1
            else:
                print("Error: Invalid case", biased_choice)
                return None

    return noisy_log

# Execution

In this section we have the main functions to execute the overall program

In [None]:
def debug_function(log):
    """
    Function to print the traces of the log given as parameter, only for the
    indexes in problem_automata_debug_list.

    For each selected trace it prints the number of events and the trace id,
    then the name, lifecycle, timestamp and activity of every event.

    Parameters:
        log (pm4py.Log): A log imported.
    """
    # all the traces are fetched before printing anything
    selected_traces = tuple(log[position] for position in problem_automata_debug_list)
    for current_trace in selected_traces:
        print(f"Number of events: {len(current_trace)!s} \n")
        print("<Trace Start> --------------------------------------------\n")
        print(f"\t<Trace ID>: {current_trace.attributes[trace_id_tag]!s} \n")
        for current_event in current_trace:
            print("<Event Start> --------------------------------------------\n")
            print(f"\t<Event>: {current_event[event_id_tag]!s} \n")
            print(f"\t\t<Lifecycle>: {current_event[lifecycle_tag]!s} \n")
            print(f"\t\t<Timestamp>: {current_event[timestamp_tag]!s} \n")
            # the activity is read from the same tag as the event name
            print(f"\t\t<Activity>: {current_event[event_id_tag]!s} \n")
            print("<Event End> --------------------------------------------\n")
        print("<Trace End> --------------------------------------------\n")

def generate(noise = False, debug = False, prob = False, chose_amount = 1):
    """
    Main function to generate PDDL problems from:
    1) The XES formatted log imported with pm4py.
    2) The dot formatted declare model.

    The function can inject noise in the traces and can print (debug) the log traces.
    When noise is requested, the noisy log is also exported to
    "{avg_trace_length}_{noise_percentage}%_noisy_log.xes" in the working directory.

    Parameters:
        noise (bool): A flag to decide whether to add noise to the traces.
        debug (bool): A flag to decide whether to only print the traces instead of generating problems.
        prob (default = False) (bool): Flag to decide whether to inject noise in a probabilistic way.
        chose_amount (default = 1) (integer): To decide the amount of the move for each choice.

    Returns:
        "Completed" when the problems were written, "Debug" when only the debug
        dump was produced, or None if the noise injection failed.
    """

    # events to skip for each trace, keyed by trace index; handed to inject_noise,
    # which presumably records the skipped events there (its body is not visible here)
    traces_events_to_delete = {}

    # Parse the XES log and check if the noise must be injected
    if not noise:
        log = xes_importer.apply(xes_file)
    else:
        log = inject_noise(xes_file=xes_file, trace_events_to_delete=traces_events_to_delete, prob=prob, chose_amount=chose_amount)
        # a None log signals a failed injection: stop BEFORE handing it to the exporter
        if log is None:
            return None
        # export the noisy log to XES format
        xes_exporter.apply(log, f"{avg_trace_length}_{noise_percentage}%_noisy_log.xes")

    if debug:
        debug_function(log)
        return "Debug"

    # Extract activities and lifecycle from xes to lifecycle and trace automata
    for trace_index, trace in enumerate(log):
        # problem_automata_list holds 1-based trace positions
        if trace_index + 1 not in problem_automata_list:
            continue

        # clean structures
        print("New trace",trace.attributes[trace_id_tag])
        trace_id = trace.attributes[trace_id_tag].replace(" ", "_")
        message = "_Noisy" if noise else ""
        problem_name = f"trace_Alignment_Problem_{trace_id}{message}.pddl"
        objects = {
            "trace_state": [],
            "automaton_state": [],
            "activity": [],
            "lifecycle_activity": []
        }
        initial_state = []
        goal_state = []
        found_activities = []
        generate_formula(initial_state,objects,goal_state,found_activities)
        i = 0
        cur_activity = "init"
        initial_state.append(f"(cur_state t{i})")

        # a trace without a recorded skip list simply has nothing to skip
        events_to_skip = traces_events_to_delete.setdefault(trace_index, [])

        for event in trace:
            if event in events_to_skip:
                continue

            activity = event[event_id_tag].replace(" ","_")
            if cur_activity != activity:
                # new activity found
                print("Old activity: ",cur_activity,", New activity: ",activity)
                cur_activity = activity
                # setting lifecycle for the new activity not encountered before
                if cur_activity not in found_activities:
                    # extra trace step using lifecycles[0], added before the event's own step
                    objects["trace_state"].append(f"t{i}")
                    initial_state.append(f"(trace t{i} {cur_activity}_{lifecycles[0]} t{i+1})")
                    i = i + 1
                    # lifecycle automaton of the activity: starts in lifecycle_states[0]
                    # and must end in lifecycle_states[3]; lifecycle_states[4] is the
                    # target of every out-of-order transition (presumably an error state)
                    initial_state.append(f"(cur_lifecycle_state {cur_activity} {lifecycle_states[0]})")
                    initial_state.append(f"(final_lifecycle_state {cur_activity} {lifecycle_states[3]})")
                    initial_state.append(f"(lifecycle_activity_of {cur_activity} {cur_activity}_{lifecycles[0]})")
                    initial_state.append(f"(lifecycle {lifecycle_states[0]} {cur_activity}_{lifecycles[0]} {lifecycle_states[1]})")
                    initial_state.append(f"(lifecycle {lifecycle_states[0]} {cur_activity}_{lifecycles[1]} {lifecycle_states[4]})")
                    initial_state.append(f"(lifecycle {lifecycle_states[0]} {cur_activity}_{lifecycles[2]} {lifecycle_states[4]})")
                    goal_state.append(f"(cur_lifecycle_state {cur_activity} {lifecycle_states[3]})")
                    objects["activity"].append(cur_activity)
                    objects["lifecycle_activity"].append(f"{cur_activity}_{lifecycles[0]}")
                    objects["lifecycle_activity"].append(f"{cur_activity}_{lifecycles[1]}")
                    objects["lifecycle_activity"].append(f"{cur_activity}_{lifecycles[2]}")
                    initial_state.append(f"(lifecycle_activity_of {cur_activity} {cur_activity}_{lifecycles[1]})")
                    initial_state.append(f"(lifecycle {lifecycle_states[1]} {cur_activity}_{lifecycles[1]} {lifecycle_states[2]})")
                    initial_state.append(f"(lifecycle {lifecycle_states[1]} {cur_activity}_{lifecycles[0]} {lifecycle_states[4]})")
                    initial_state.append(f"(lifecycle {lifecycle_states[1]} {cur_activity}_{lifecycles[2]} {lifecycle_states[4]})")
                    initial_state.append(f"(lifecycle_activity_of {cur_activity} {cur_activity}_{lifecycles[2]})")
                    initial_state.append(f"(lifecycle {lifecycle_states[2]} {cur_activity}_{lifecycles[2]} {lifecycle_states[3]})")
                    initial_state.append(f"(lifecycle {lifecycle_states[2]} {cur_activity}_{lifecycles[0]} {lifecycle_states[4]})")
                    initial_state.append(f"(lifecycle {lifecycle_states[2]} {cur_activity}_{lifecycles[1]} {lifecycle_states[4]})")
                    initial_state.append(f"(lifecycle {lifecycle_states[3]} {cur_activity}_{lifecycles[0]} {lifecycle_states[0]})")
                    initial_state.append(f"(lifecycle {lifecycle_states[3]} {cur_activity}_{lifecycles[1]} {lifecycle_states[4]})")
                    initial_state.append(f"(lifecycle {lifecycle_states[3]} {cur_activity}_{lifecycles[2]} {lifecycle_states[4]})")
                    found_activities.append(cur_activity)

            # lifecycle activity name
            cur_lifecycle = event[lifecycle_tag]

            # adding transition state, re-append here if the log does not have assign tag
            objects["trace_state"].append(f"t{i}")
            initial_state.append(f"(trace t{i} {cur_activity}_{cur_lifecycle} t{i+1})")
            i = i + 1

        # the last trace state is both the final state and the goal
        objects["trace_state"].append(f"t{i}")
        initial_state.append(f"(final_state t{i})")
        goal_state.append(f"(cur_state t{i})")
        # Create PDDL problem file
        write_pddl_problem(problem_name,problem_name, initial_state, objects, goal_state)
    return "Completed"

if __name__ == "__main__":
    # Interactive entry point: asks for the debug/noise options and runs generate()

    print("\n-----WELCOME TO THE PDDL TRANSALTOR-----\n")

    debug_option = input("You want to add Debug?: 1 --> (yes) 0 --> (no)")

    noise_option = input("You want to add Noise?: 1 --> (yes) 0 --> (no)")

    # Interpret the noise answer once, with the same rule used for every option,
    # so answers such as " 1" or "01" cannot enable noise while skipping the
    # noise-type question (non-numeric answers still raise ValueError)
    noise_enabled = int(noise_option) == 1

    noise_type_option = "0"

    amount_option = "1"

    if noise_enabled:
        noise_type_option = input("You want to add Noise probabilistically or deterministically?: 1 --> (probabilistically) 0 --> (deterministically)")
        # uncomment if you want a single amount for all moves
        #if noise_type_option == "0":
        #        amount_option = input("Input the selected amount (integer):")

    result = generate(noise=noise_enabled,debug=(int(debug_option) == 1),prob=(int(noise_type_option) == 1),chose_amount=int(amount_option))

    # generate() returns exactly one of these outcomes
    if result == "Completed":
        print("Success: problems generated")
    elif result == "Debug":
        print("Success: debug completed")
    elif result is None:
        print("Error: something went wrong")

    print("\n-----THANK YOU FOR USING PDDL TRANSALTOR-----\n")


# Problems Exports

In this section we export the previously generated problems as a zip archive and download it.

In [None]:
# Directory containing the generated PDDL problem files
current_dir = '/content/problems'

# Staging directory whose content is zipped
temp_dir = '/content/temp_zip'
# Start from an empty staging directory so files left over from a previous
# run of this cell do not end up in the new archive
shutil.rmtree(temp_dir, ignore_errors=True)
os.makedirs(temp_dir, exist_ok=True)

files_in_dir = os.listdir(current_dir)

# Copy only regular files (sub-directories are ignored)
for file_name in files_in_dir:
    file_path = os.path.join(current_dir, file_name)
    if os.path.isfile(file_path):
        shutil.copy(file_path, temp_dir)

shutil.make_archive('/content/problems', 'zip', temp_dir)

files.download('/content/problems.zip')

# generate() exports the noisy log as "<avg_trace_length>_<noise_percentage>%_noisy_log.xes"
# (relative to the working directory, presumably /content), while other cells write
# "noisy_log.xes"; download whichever noisy logs exist instead of a single
# hard-coded name that may be missing.
content_dir = '/content'
noisy_log_names = sorted(
    name for name in os.listdir(content_dir)
    if name.endswith('noisy_log.xes') and os.path.isfile(os.path.join(content_dir, name))
)
if not noisy_log_names:
    print("No noisy log found in", content_dir)
for noisy_log_name in noisy_log_names:
    files.download(os.path.join(content_dir, noisy_log_name))


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# New Noise Setting

In [None]:
from google.colab import files

from google.colab import drive


# Directory whose files are removed below (the export cell reads the generated problems from this same path)
directory = '/content/problems'

# Clean dir: remove every file inside the problems directory
!rm {directory}/*

# Clean files: remove every non-hidden file in the current working directory
# NOTE(review): this is destructive — it also deletes previously exported logs
# and uploaded models if they live in the working directory (presumably /content; confirm)
!rm *

# Upload .dot files (opens the Colab upload dialog and waits for the user)
print("Please Upload the dot formatted declare file")
uploaded = files.upload()

# Echo the name of every uploaded file
for file_name in uploaded.keys():
    print(f'Uploaded file: {file_name}')

# Mount Google Drive to get the log file (large size)
drive.mount('/content/drive')

In [None]:
import pm4py
import random
from datetime import datetime, timedelta
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.objects.log.exporter.xes import exporter as xes_exporter

# Configuration for the constraint-based noise injection: candidate activity
# alphabets, the textual constraints to violate and the noise level.
active_5_activities = {"A", "B", "C", "D", "E"}  # activity names of the 5-activity alphabet
active_10_activities = {"A", "B", "C", "D", "E", "F", "G", "H", "I", "L"}  # activity names of the 10-activity alphabet

# Active constraints for 5 activities
# Each entry is "<Constraint type>: <sentence>"; the noise functions parse the
# type and the single upper-case activity letters out of this text.
# Note: activity F is mentioned here although it is not part of active_5_activities.
active_constraints_5_activities = [
    "Existence: Activity A must appear at least once.",
    "Not Existence: Activity F must not appear.",
    "Precedence: Activity A must precede Activity D.",
    "Response: Activity B must follow Activity A.",
    "Not Chain Succession: Activity C and D cannot follow each other directly.",
    "Not Chain Succession: Activity E and A cannot follow each other directly.",
    "Precedence: Activity B must precede Activity E."
]

# Active constraints for 10 activities (same textual format as above)
active_constraints_10_activities = [
    "Existence: Activity A must appear at least once.",
    "Existence: Activity B must appear at least once.",
    "Precedence: Activity A must precede Activity D.",
    "Precedence: Activity B must precede Activity E.",
    "Response: Activity C must follow Activity B.",
    "Not Existence: Activity F must not appear.",
    "Not Chain Succession: Activities H and I cannot follow each other directly."
]

# Set noise percentage (e.g., 50% of constraints should be violated)
noise_percentage = 50  # Adjust as needed

# Choose which activity set and constraints to use
active_activities = active_10_activities  # Choose for 10 activities
active_constraints = active_constraints_10_activities  # Choose for 10 activities
# NOTE(review): trace_length and alphabet are not used by the code visible in this
# section; alphabet is 5 while the 10-activity set is selected — confirm the intended value
trace_length = 10
alphabet = 5

# Tags for event data (standard XES attribute keys)
event_id_tag = "concept:name"
lifecycle_tag = "lifecycle:transition"
timestamp_tag = "time:timestamp"

# Load the log to be perturbed from the working directory
file_path = "log.xes"
log = xes_importer.apply(file_path)

In [None]:
import pm4py
import random
from datetime import datetime, timedelta
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.objects.log.exporter.xes import exporter as xes_exporter

# Configuration for the constraint-based noise injection: candidate activity
# alphabets, the textual constraints to violate and the noise level.
# NOTE(review): these definitions repeat the ones of the previous cell — keep the two in sync
active_5_activities = {"A", "B", "C", "D", "E"}  # activity names of the 5-activity alphabet
active_10_activities = {"A", "B", "C", "D", "E", "F", "G", "H", "I", "L"}  # activity names of the 10-activity alphabet

# Active constraints for 5 activities
# Each entry is "<Constraint type>: <sentence>"; the noise functions below parse
# the type and the single upper-case activity letters out of this text.
active_constraints_5_activities = [
    "Existence: Activity A must appear at least once.",
    "Not Existence: Activity F must not appear.",
    "Precedence: Activity A must precede Activity D.",
    "Response: Activity B must follow Activity A.",
    "Not Chain Succession: Activity C and D cannot follow each other directly.",
    "Not Chain Succession: Activity E and A cannot follow each other directly.",
    "Precedence: Activity B must precede Activity E."
]

# Active constraints for 10 activities (same textual format as above)
active_constraints_10_activities = [
    "Existence: Activity A must appear at least once.",
    "Existence: Activity B must appear at least once.",
    "Precedence: Activity A must precede Activity D.",
    "Precedence: Activity B must precede Activity E.",
    "Response: Activity C must follow Activity B.",
    "Not Existence: Activity F must not appear.",
    "Not Chain Succession: Activities H and I cannot follow each other directly."
]

# Set noise percentage (e.g., 50% of constraints should be violated)
noise_percentage = 50  # Adjust as needed

# Choose which activity set and constraints to use
active_activities = active_10_activities  # Choose for 10 activities
active_constraints = active_constraints_10_activities  # Choose for 10 activities

# Tags for event data (standard XES attribute keys)
event_id_tag = "concept:name"
lifecycle_tag = "lifecycle:transition"
timestamp_tag = "time:timestamp"

# Function to add an event to a trace at a random position
def add_event(trace, event):
    """
    Return a copy of the trace with one synthetic event inserted at a random position.

    Parameters:
        trace: Sequence of events; it is sliced, not modified.
        event (str): Activity name given to the new event.

    Returns:
        The copied trace containing the extra event, which carries a random
        lifecycle ("assign", "start" or "complete") and a random timestamp
        between 2022-03-01 and 2023-07-31.
    """
    noisy_trace = trace[:]
    # any slot is allowed, including the one past the last event
    insert_at = random.randint(0, len(noisy_trace))
    chosen_lifecycle = random.choice(["assign", "start", "complete"])

    window_start = datetime(2022, 3, 1)
    window_end = datetime(2023, 7, 31)
    window_seconds = int((window_end - window_start).total_seconds())
    moment = window_start + timedelta(seconds=random.randint(0, window_seconds))
    # millisecond precision with a trailing "Z"
    # NOTE(review): the timestamp is stored as a formatted string, not as a
    # datetime object — confirm the exporter treats it as intended
    stamp = moment.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

    noisy_trace.insert(insert_at, {
        event_id_tag: event,
        lifecycle_tag: chosen_lifecycle,
        timestamp_tag: stamp
    })
    return noisy_trace

# Function to delete an event from a trace by event name
def delete_event(trace, event):
    """
    Return a new list with every event of the trace whose activity name differs from `event`.

    All occurrences of the named activity are dropped; the input is not modified.
    """
    kept_events = []
    for entry in trace:
        if entry[event_id_tag] != event:
            kept_events.append(entry)
    return kept_events

# Function to swap two events in a trace
def swap_events(trace, event_1, event_2):
    """
    Exchange the positions of two activities inside a trace.

    When an activity occurs several times, its last occurrence is the one
    that is swapped. If either activity is missing, the original trace is
    returned untouched; otherwise a swapped copy is returned.
    """
    # later occurrences overwrite earlier ones, leaving the last position
    last_position = {}
    for position, entry in enumerate(trace):
        last_position[entry[event_id_tag]] = position

    if event_1 not in last_position or event_2 not in last_position:
        return trace

    swapped = trace[:]
    left = last_position[event_1]
    right = last_position[event_2]
    swapped[left], swapped[right] = swapped[right], swapped[left]
    return swapped

# Function to handle violation of the "Not Existence" constraint
def violate_not_existence(trace, constraint):
    """
    Violate a "Not Existence" constraint by making the forbidden activity appear.

    The activity name is the second word after the colon of the constraint
    text. If the trace already contains it, the trace is returned as is;
    otherwise a new event with that name is added through add_event.
    """
    description = constraint.split(":")[1]
    forbidden = description.strip().split()[1]

    for entry in trace:
        if entry[event_id_tag] == forbidden:
            # already violated, nothing to add
            return trace
    return add_event(trace, forbidden)

# Function to handle violation of the "Not Chain Succession" constraint
def violate_chain_succession(trace, constraint):
    """
    Violate a "Not Chain Succession" constraint by making its two activities adjacent.

    The two activities are the upper-case words of the constraint text. The
    event right after the first occurrence of the first activity is replaced
    by an event of the second activity (random "start"/"complete" lifecycle,
    timestamp of the replaced event).

    Parameters:
        trace: Sequence of events; it is never modified in place.
        constraint (str): Constraint text naming exactly two activities.

    Returns:
        A modified copy of the trace, or the original trace when the
        constraint does not name two activities or the first activity has
        no successor to replace.
    """
    # strip punctuation so a name that ends a sentence ("D.") is still recognised
    words = (token.strip(".,;:") for token in constraint.split())
    activities = [word for word in words if word.isupper()]
    if len(activities) != 2:
        return trace

    first, second = activities
    for i in range(len(trace) - 1):
        if trace[i][event_id_tag] == first:
            # work on a copy, like the other noise helpers, so the caller's
            # trace (possibly part of the imported log) stays untouched
            new_trace = trace[:]
            new_trace[i + 1] = {
                event_id_tag: second,
                lifecycle_tag: random.choice(["start", "complete"]),
                timestamp_tag: trace[i + 1][timestamp_tag]
            }
            return new_trace
    return trace

# Function to violate specific constraints
def violate_constraint(trace, constraint):
    """
    Apply the noise that violates one textual constraint.

    The constraint type is recognised from its text:
      - "Existence"            -> every event of the activity is deleted;
      - "Precedence"           -> the two activities are swapped;
      - "Response"             -> the responding activity is deleted;
      - "Not Existence"        -> the forbidden activity is added;
      - "Not Chain Succession" -> the two activities are made adjacent.

    Parameters:
        trace: Sequence of events.
        constraint (str): Text such as "Precedence: Activity A must precede Activity D.".

    Returns:
        The trace produced by the matching helper, or the unchanged trace
        when the type is unknown or the text does not name two activities
        where two are required.
    """
    # Activity names are the upper-case words of the sentence. Punctuation is
    # stripped first: the second activity usually ends the sentence ("D."),
    # and with the period attached it would never match an event name.
    words = (token.strip(".,;:") for token in constraint.split())
    activities = [word for word in words if word.isupper()]

    if "Existence" in constraint and "Not" not in constraint:
        activity = constraint.split(":")[1].strip().split()[1]
        return delete_event(trace, activity)
    elif "Precedence" in constraint:
        if len(activities) == 2:
            return swap_events(trace, activities[0], activities[1])
    elif "Response" in constraint:
        if len(activities) == 2:
            # "X must follow Y": removing the trigger Y would satisfy the
            # constraint vacuously, so the response X (named first) is removed
            return delete_event(trace, activities[0])
    elif "Not Existence" in constraint:
        return violate_not_existence(trace, constraint)
    elif "Not Chain Succession" in constraint:
        return violate_chain_succession(trace, constraint)
    return trace

# Function to inject noise into a trace based on constraints and noise percentage
def inject_noise(trace, active_constraints, noise_percentage):
    """
    Violate a random subset of the constraints on one trace.

    NOTE(review): this definition rebinds the name inject_noise, which
    generate() calls with the keyword arguments xes_file,
    trace_events_to_delete, prob and chose_amount; once this cell has run,
    generate(noise=True) reaches this version instead — consider renaming
    one of the two.

    Parameters:
        trace: Sequence of events.
        active_constraints (list[str]): Constraint texts to choose from.
        noise_percentage (int | float): Share of the constraints to violate;
            values outside 0-100 are treated as 0 or 100.

    Returns:
        The trace after the selected constraints have been violated in turn.
    """
    total = len(active_constraints)
    requested = int(total * (noise_percentage / 100))
    # random.sample raises ValueError for a negative or too large sample size
    num_constraints_to_violate = max(0, min(total, requested))
    violated_constraints = random.sample(active_constraints, num_constraints_to_violate)

    for constraint in violated_constraints:
        trace = violate_constraint(trace, constraint)

    return trace

# Inject noise into the entire log
def inject_noise_into_log(log, active_constraints, noise_percentage):
    """
    Run inject_noise on every trace of the log.

    Returns:
        A list with one noisy trace per input trace, in the original order.
    """
    return [
        inject_noise(current_trace, active_constraints, noise_percentage)
        for current_trace in log
    ]

if __name__ == "__main__":
    # Load the log, perturb every trace and export the result as "noisy_log.xes"
    file_path = "log.xes"
    log = xes_importer.apply(file_path)

    # Inject noise into the log
    noisy_log = inject_noise_into_log(log, active_constraints, noise_percentage)

    # Create a PM4Py event log object, carrying over the log-level metadata
    # of the source log so the export keeps its extensions and classifiers
    event_log = pm4py.objects.log.obj.EventLog(
        attributes=log.attributes,
        extensions=log.extensions,
        classifiers=log.classifiers,
        omni_present=log.omni_present,
    )
    # inject_noise_into_log returns one noisy trace per original trace, in
    # order; pair them to keep each trace's attributes (e.g. its id), which
    # would be lost when building a Trace from the bare list of events
    for original_trace, noisy_trace in zip(log, noisy_log):
        event_log.append(
            pm4py.objects.log.obj.Trace(noisy_trace, attributes=original_trace.attributes)
        )

    # Save the noisy log to a new file
    xes_exporter.apply(event_log, "noisy_log.xes")


parsing log, completed traces ::   0%|          | 0/1000 [00:00<?, ?it/s]

exporting log, completed traces ::   0%|          | 0/1000 [00:00<?, ?it/s]