# Exploratory Researching in Decision Mining

The paper "Decision Mining with Daikon" proposes a method for discovering branching conditions in business process execution logs by combining invariant detection (Daikon system) and decision tree learning techniques. The approach addresses limitations found in traditional decision mining by identifying complex conditions involving multiple variables and arithmetic operations. It outlines specific algorithms for generating observation instances from event logs and constructing conjunctive and disjunctive branching conditions. The techniques are validated through a series of tests, showcasing their ability to accurately discover various types of conditions. This methodology aims to enhance automated process discovery by providing more detailed insights into the conditions under which different paths in a process are taken.

To replicate the approach in Python, you would:

    1. Generate observation instances from event logs.
    2. Use Daikon to discover invariants from these instances.
    3. Apply decision tree learning to derive branching conditions based on the invariants.
    4. Test and refine the discovered conditions for accuracy.

In [238]:
import pandas as pd
import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import mutual_info_score
import json
import os
import sys
from sklearn.tree import DecisionTreeClassifier, export_text
import subprocess

# Extract decls and dtrace files

In [239]:
# def infer_data_type(value):
#     """
#     Infers the data type of the given value for the Daikon .decls file.
#     """
#     if isinstance(value, int):
#         return 'int'
#     elif isinstance(value, float):
#         return 'double'  # Daikon uses 'double' for floating-point numbers
#     elif isinstance(value, str):
#         return 'java.lang.String'
#     # Add more type checks as necessary for your data
#     return 'unknown'  # Fallback type


#   def create_decls_file(self, filename="variables.decls"):
#     """
#     Generates a Daikon .decls file based on the observation instances.
#     Adds the required 'decl-version 2.0' at the beginning of the file.
#     """
#     self.decls = root_path + filename
#     observation_instances = json.load(open(root_path + self.observation_instances + '.json', 'r'))
#     with open(self.decls, 'w') as file:
#         # Add the declaration version at the beginning
#         file.write("decl-version 2.0\n\n")
        
#         for task in observation_instances.keys():
#             file.write(f"ppt ..{task}:::ENTER\n")
#             file.write("ppt-type enter\n")
#             all_vars = {}
#             for instance in observation_instances[task]:
#                 for var, val in instance.items():
#                     inferred_type = infer_data_type(val)
#                     # Record the most common type if there are multiple types for the same variable
#                     all_vars[var] = inferred_type if var not in all_vars else all_vars[var]

#             for var, var_type in all_vars.items():
#                 file.write(f"variable {var}\n")
#                 file.write("var-kind variable\n")
#                 file.write(f"dec-type {var_type}\n")
#                 file.write(f"rep-type {var_type}\n\n")
#             file.write(f"ppt ..{task}:::EXIT\n")
#             file.write("ppt-type exit\n\n")

          
            
#   def format_for_daikon(self):
#     """
#     Convert observation instances to a string format suitable for Daikon.
#     This will vary based on your data structure and needs.
#     """
#     observation_instances = json.load(open(root_path + self.observation_instances + '.json', 'r'))
#     formatted_data = ""
#     for task, instances in observation_instances.items():
#         for instance in instances:
#             formatted_data += f"{task}:::ENTER\n"
#             for var, val in instance.items():
#                 formatted_data += f"{var} == {val}\n"
#             formatted_data += f"{task}:::EXIT\n"
#     self.dtrace = self.root_path + self.log_filename +'.dtrace'
#     json.dumeval_invariant(P, formatted_data, open(self.dtrace, 'w'), indent=2)
#     # return formatted_data  

# ComplexDecisionMining

In [240]:
log_filename = 'log_events'
root_path = 'D:\\01.Tesis\\DM\\'

def load_ui_log(filepath):
    # Cargar el registro de eventos desde un archivo CSV
    return pd.read_csv(filepath)

log_df = load_ui_log(root_path + log_filename + '.csv')

In [241]:
class ComplexDecisionMining():
  def __init__(self, root_path, log_filename):
    self.root_path = root_path
    self.log_filename = log_filename
    self.log_df = load_ui_log(root_path + log_filename + '.csv')

def generate_observation_instances(log_df, output_filename='observation_instances'):
    # TODO: self.observation_instances = output_filename
    # Define your event log, tasks, and variables based on the dataframe
    L = []  # List of traces
    T = set(log_df['activity'])  # Set of tasks
    V = set(log_df.columns)# - {'trace', 'activity'}  # Set of variables

    # Group the log by trace and convert each group into a list of (activity, phi) tuples
    for _, group in log_df.groupby('trace'):
        trace = []
        for _, row in group.iterrows():
            activity = row['activity']
            # Create the phi dictionary, ignoring NaN values
            phi = {var: row[var] for var in V if pd.notna(row[var])}
            trace.append((activity, phi))
        L.append(trace)

    # Now apply the original algorithm
    # Step 1: Initialize I as a dictionary where each key is a task and the value is an empty set
    I = {t: [] for t in T}

    # Step 2: Iterate over each trace in the event log
    for trace in L:
        # Step 3: Initialize M as a dictionary where each key is a variable and the value is None
        M = {v: None for v in V}
        
        # Step 4: Iterate over each event in the trace
        for (task, phi) in trace:
            # Step 5: Add the current state (M) to the set of observation instances for the current task
            I[task].append(M.copy())
            
            # Step 6: Update M with the current state of variables for this event
            for v, value in phi.items():
                M[v] = value

    # I now contains the observation instances for each task
    # Here we print the observation instances for the first task as an example
    # print(f"Observation instances for the first activity ({list(T)[0]}):")
    # print(I[list(T)[0]])
    
    activities = T
    # For each task, convert the observation instances to a dataframe and save it to a file
    for task, instances in I.items():
        df = pd.DataFrame(instances)
        df.to_csv(f"{root_path}{task}_observation_instances_process_info.csv", index=False)
        # Eliminar las columnas 'trace', 'activity' y 'variant' del dataframe
        df = df.droeval_invariant(P, columns=['trace', 'activity', 'variant'])
        df.to_csv(f"{root_path}{task}_observation_instances.csv", index=False)

    # Step 9: Return I containing the observation instances for each task
    # Store the result in a json file
    # json.dump(I, open(root_path + self.observation_instances +'.json', 'w'), indent=2)
    return activities


In [242]:
activities = generate_observation_instances(log_df)

In [243]:
def discover_invariants(root_path, filename):
    # Definir el directorio de Daikon y Java según tus instalaciones
    DAIKONDIR = "D:\\01.Tesis\\DM\\daikon-5.8.18"
    JAVA_HOME = "C:\\Program Files\\Java\\jdk-20"

    # Establecer las variables de entorno necesarias para Daikon
    os.environ['DAIKONDIR'] = DAIKONDIR
    os.environ['JAVA_HOME'] = JAVA_HOME
    os.environ['PATH'] = f"{DAIKONDIR}\\scripts;{DAIKONDIR}\\utils\\plume-scripts;{JAVA_HOME}\\bin;{os.environ['PATH']}"
    os.environ['DAIKON_CLASSPATH'] = f"{DAIKONDIR}\\daikon.jar;{JAVA_HOME}\\lib\\tools.jar;{JAVA_HOME}\\jre\\lib\\rt.jar"

    # Ejecutar Burdock como módulo de Python para generar los archivos .decls y .dtrace
    # Asegúrate de reemplazar 'python' con la ruta al intérprete de Python de tu entorno virtual si es necesario
    python_interpreter_path = sys.executable
    subprocess.run(["burdock", root_path + filename + ".csv"], check=True)
    print(f"Archivos .decls y .dtrace para {filename} generados con éxito")
    # Ejecutar Daikon para generar las invariantes
    # Aquí, 'nombre_del_log' debería ser el mismo que se utilizó para generar los archivos .decls y .dtrace
    cmd_daikon = f"java -cp %DAIKONDIR%/daikon.jar daikon.Daikon {root_path+filename}.dtrace {root_path+filename}.decls > {root_path + filename}.inv.txt" # > {root_path + log_filename}.inv.txt"
    subprocess.run(cmd_daikon, shell=True, check=True)

    # cmd_daikon = f"java -cp %DAIKONDIR%/daikon.jar daikon.PrintInvariants {root_path + filename}.inv.gz > {root_path + filename}.inv.txt" 
    # subprocess.run(cmd_daikon, shell=True, check=True)
    return f"{root_path + filename}.inv.txt"

In [244]:

for t in activities:
    discover_invariants(root_path, t+"_observation_instances")

Archivos .decls y .dtrace para G_observation_instances generados con éxito
Archivos .decls y .dtrace para E_observation_instances generados con éxito
Archivos .decls y .dtrace para D_observation_instances generados con éxito
Archivos .decls y .dtrace para A_observation_instances generados con éxito
Archivos .decls y .dtrace para C_observation_instances generados con éxito
Archivos .decls y .dtrace para B_observation_instances generados con éxito
Archivos .decls y .dtrace para F_observation_instances generados con éxito


In [251]:
# Función para leer y parsear las invariantes desde el archivo generado por discover_invariants
def read_invariants(file_path, obtain_predicate=False):
    """
    Lee el archivo de invariantes generado por discover_invariants y
    devuelve una lista de invariantes en formato de cadena.
    """
    invariants = []
    with open(file_path, 'r') as file:
        lines = file.readlines()
        start_reading = False
        for line in lines:
            if not 'data:::POINT' in line and start_reading:
                if line.strip() and not line.startswith('Exiting'):
                    # Eliminar espacios en blanco y saltos de línea
                    line = line.strip()
                    line = line.replace(' ', '')
                    # Desde que aparezca '==' in line or '<=' in line or '>=' in line or '!=' in line hasta el final de line, almacenar el contenido en aux
                    operators = ['==', '<=', '>=', '!=']
                    
                    aux = [line.find(op) for op in operators if line.find(op) != -1]

                    if len(aux) > 0:
                        first_operator_pos = min(aux)
                        second_part = line[first_operator_pos+2:]
                        first_part = line[:first_operator_pos]


                        # Si aux contiene NaN, se sustituye por aux por None en line
                        if 'NaN' in second_part:
                            line = line[:first_operator_pos+2] + 'None'


                        if obtain_predicate:
                            for part in [first_part, second_part]:
                                if not ('.' in part):
                                    line = line.replace(part, 'row["'+part+'"]')
                        
                    # Eliminar espacios en blanco
                    invariants.append(line)
            
            if '=========' in line:
                start_reading = True
                
    return invariants

# Test the function
read_invariants(root_path + 'E_observation_instances.inv.txt', True)

['row["length"]==None',
 'row["installement"]==None',
 'row["age"]>=1.0',
 'row["amount"]==None',
 'row["salary"]>=1.0',
 'row["length"]!=row["age"]',
 'row["length"]!=row["salary"]',
 'row["installement"]!=row["age"]',
 'row["installement"]!=row["salary"]',
 'row["age"]!=row["amount"]',
 'row["amount"]!=row["salary"]']

# Branching conditions

In [252]:
from itertools import combinations


# def get_unique_variants(log_df):
#     # Devuelve un diccionario donde las claves son variantes y los valores son las secuencias de actividades
#     variants = {}
#     for variant, group in log_df.groupby('variant'):
#         activities = list(group['activity'].unique())
#         variants[variant] = activities
#     return variants

# def find_branching_points(variants):
#     # Identifica los puntos de ramificación entre todas las combinaciones de variantes
#     branching_points = set()
#     for (variant1, activities1), (variant2, activities2) in combinations(variants.items(), 2):
#         for i, (activity1, activity2) in enumerate(zip(activities1, activities2)):
#             if activity1 != activity2:
#                 # Se ha encontrado un punto de ramificación
#                 branching_points.add((activities1[i-1] if i else 'Start', activity1, activity2))
#                 break  # No se necesitan comparar más actividades después del primer punto de divergencia
#     return branching_points


def get_activities_by_trace(log_df):
    # Agrupa el registro de eventos por 'trace' y 'variant', y recopila las actividades para cada grupo
    grouped = log_df.groupby(['trace', 'variant'])['activity'].apply(list).reset_index()
    # Convierte el dataframe agrupado en un diccionario
    trace_variants = {}
    for _, row in grouped.iterrows():
        trace, variant, activities = row['trace'], row['variant'], row['activity']
        if variant not in trace_variants:
            trace_variants[variant] = []
        trace_variants[variant].append(activities)
    return trace_variants

def find_binary_branching_points(variants):
    print(f"Variants of the process: {variants.keys()}")
    # Identifica los puntos de ramificación binarios entre todas las combinaciones de variantes
    binary_branching_points = set()
    for (variant1, activities1), (variant2, activities2) in combinations(variants.items(), 2):
        # print(f"Variants combained: {variant1} and {variant2}")
        # Encuentra la primera divergencia en las secuencias de actividades
        min_length = min(len(activities1), len(activities2))
        for i in range(min_length):
            index = 0
            while index < len(activities1[i]) and index < len(activities2[i]):
                # set1 = set(activities1[i][:index])
                # set2 = set(activities2[i][:index])
                if activities1[i][index] != activities2[i][index]: 
                    # Agrega el punto de ramificación binario como una tupla (t1, t2)
                    aux = tuple(sorted((activities1[i][index], activities2[i][index])))
                    binary_branching_points.add(aux)
                index += 1
            
    
    
    return binary_branching_points

# Uso de las funciones
variants = get_activities_by_trace(log_df)
branching_points = find_binary_branching_points(variants)
print(branching_points)


Variants of the process: dict_keys(['b2', 'b3', 'b1'])
{('B', 'C'), ('D', 'E'), ('F', 'G')}


# Human in the loop

In [256]:
branching_points = branching_points - {('B', 'C')}
branching_points


{('D', 'E'), ('F', 'G')}

# Test 1: Entropy

In [261]:
# def entropy(class_probabilities):
#     """Calculates the entropy from a list of class probabilities."""
#     return -np.sum(p * np.log2(p) for p in class_probabilities if p > 0)

# def information_gain(I1, I2, IP1, IP2, InP1, InP2):
#     """Calculates the information gain of a predicate P based on the sets of observation instances."""
#     total_size = len(I1) + len(I2)
#     p_size = len(IP1) + len(IP2)
#     np_size = len(InP1) + len(InP2)
    
#     p_entropy = entropy([len(IP1) / p_size if p_size else 0, len(IP2) / p_size if p_size else 0])
#     np_entropy = entropy([len(InP1) / np_size if np_size else 0, len(InP2) / np_size if np_size else 0])
    
#     total_entropy = entropy([len(I1) / total_size, len(I2) / total_size])
    
#     return total_entropy - p_size / total_size * p_entropy - np_size / total_size * np_entropy


# def build_conjunctive_expr(I1: pd.DataFrame, I2: pd.DataFrame, predicates):
#     """Builds the conjunctive expression that maximizes the information gain."""
#     max_ig = -1
#     best_predicate = None
#     for pred in predicates:
#         # Compute subsets where P holds and does not hold
#         IP1 = [instance for i, instance in I1.iterrows() if eval_invariant(pred, instance)]
#         IP2 = [instance for i, instance in I2.iterrows() if eval_invariant(pred, instance)]
#         InP1 = [instance for i, instance in I1.iterrows() if not eval_invariant(pred, instance)]
#         InP2 = [instance for i, instance in I2.iterrows() if not eval_invariant(pred, instance)]
        
#         # Calculate the information gain for P
#         ig = information_gain(I1, I2, IP1, IP2, InP1, InP2)
#         if ig > max_ig:
#             max_ig, best_predicate = ig, pred
    
#     return best_predicate


In [311]:
def entropy(I_size, I_prime_size):
    """
    Calculates the entropy of two sets of observation instances.

    Args:
        I_size: Number of instances in the first set.
        I_prime_size: Number of instances in the second set.

    Returns:
        The entropy of the two sets combined.
    """
    total = I_size + I_prime_size
    if total == 0:  # Avoid division by zero
        return 0
    
    p_I = I_size / total if total > 0 else 0
    p_I_prime = I_prime_size / total if total > 0 else 0

    # Calculate entropy
    entropy_I = -p_I * np.log2(p_I) if p_I > 0 else 0
    entropy_I_prime = -p_I_prime * np.log2(p_I_prime) if p_I_prime > 0 else 0
    
    return entropy_I + entropy_I_prime


def eval_invariant(line, row: pd.Series) -> bool:
    if line:
        if isinstance(line, list):
            line = ' and '.join(line)
        print(f"Line: {line}")
        res = eval(line)
    else:
        res = False
    return res


def calculate_information_gain(I: pd.DataFrame, I_prime: pd.DataFrame, P):
    """
    Calculates the information gain of a predicate P with respect to two sets of instances.

    Args:
        I: List of instances from the first set.
        I_prime: List of instances from the second set.
        P: Predicate function that partitions I and I_prime.

    Returns:
        The information gain of partitioning the instances by predicate P.
    """
    # Partition the instances based on P
    IP = [inst for i, inst in I.iterrows() if eval_invariant(P, inst)] + [inst for i, inst in I_prime.iterrows() if eval_invariant(P, inst)]
    I_not_P = [inst for i, inst in I.iterrows() if not eval_invariant(P, inst)] + [inst for i, inst in I_prime.iterrows() if not eval_invariant(P, inst)]
    
    # Calculate the initial entropy
    initial_entropy = entropy(len(I), len(I_prime))
    
    # Calculate the entropy after partitioning
    entropy_after_partition = ((len(IP) / (len(I) + len(I_prime))) * entropy(len([inst for i, inst in I.iterrows() if eval_invariant(P, inst)]), len([inst for i, inst in I_prime.iterrows() if eval_invariant(P, inst)]))
                              + (len(I_not_P) / (len(I) + len(I_prime))) * entropy(len([inst for i, inst in I.iterrows() if not eval_invariant(P, inst)]), len([inst for i, inst in I_prime.iterrows() if not eval_invariant(P, inst)])))
    
    # Calculate and return the information gain
    return initial_entropy - entropy_after_partition



In [312]:
def build_conjunctive_expr(I1, I2, P):
    """
    Builds a conjunctive expression that maximizes the information gain.

    Args:
        I1: First set of observation instances.
        I2: Second set of observation instances.
        P: A conjunctive expression represented as a list of predicates.
        calculate_information_gain: Function to calculate the information gain.

    Returns:
        A conjunction of a subset of the atoms in P that maximizes the information gain.
    """
    # Check if P is undefined
    if not P:
        return "⊥"  # Represents undefined or no invariants discovered by Daikon
    
    # Convert P into a set of predicates
    S = set(P)

    # Initialize the result P with an empty set
    P_new = []

    # Iterate until there are no predicates left to consider
    while S:
        # Pick the predicate from S that maximizes the information gain when added to P_new
        best_q, best_ig = None, float('-inf')
        for q in S:
            # Compute the information gain of adding q to P_new
            ig = calculate_information_gain(I1, I2, P_new + [q])
            if ig > best_ig:
                best_q, best_ig = q, ig
        
        # If adding the best predicate increases the information gain, add it to P_new
        if best_q and best_ig > calculate_information_gain(I1, I2, P_new):
            P_new.append(best_q)
            S.remove(best_q)
        else:
            # If no improvement can be made, break the loop
            break
    
    # Return the conjunction of predicates in P_new
    return ' AND '.join(P_new) if P_new else "⊥"


In [313]:
observation_instance = 'C_observation_instances'
invariants = read_invariants(root_path + observation_instance + '.inv.txt', True)
build_conjunctive_expr(load_ui_log(root_path + observation_instance + '.csv'), load_ui_log(root_path + 'B_observation_instances.csv'), invariants)

Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["length"]==None
Line: row["l

'⊥'

# Test 2

In [None]:
# Función para convertir invariantes en condiciones y calcular la ganancia de información
def convert_invariants_to_conditions(dataframe, invariants):
    """
    Toma las invariantes y las convierte en condiciones booleanas aplicables al DataFrame.
    Luego calcula la ganancia de información para cada condición.
    """
    conditions = {}
    for invariant in invariants:
        # Aquí deberías convertir cada invariante a una expresión que pueda ser evaluada en tu DataFrame.
        # Esto es un ejemplo, necesitarás adaptar esta parte a tu lógica específica y formato de invariantes.
        condition = invariant.replace('!=', '!=')  # Simplificado, deberías expandir esto
        # Evaluar la condición en el DataFrame y almacenar el resultado como una nueva columna
        try:
            condition_result = eval(f'dataframe.eval("{condition}")')
            dataframe[f'cond_{invariant}'] = condition_result
            # Calcular la ganancia de información para la condición
            ig = mutual_info_score(dataframe[f'cond_{invariant}'], dataframe['outcome_column'])
            conditions[invariant] = ig
        except Exception as e:
            print(f"Error al procesar la invariante {invariant}: {e}")
    return conditions

# Función para seleccionar las condiciones más informativas
def select_conditions(conditions, threshold=0.01):
    """
    Selecciona y devuelve las condiciones con una ganancia de información superior al umbral.
    """
    selected_conditions = {cond: ig for cond, ig in conditions.items() if ig > threshold}
    return selected_conditions


In [None]:
t = 'A'
# 1. Carga tus datos en un DataFrame
dataframe = load_ui_log(obj.root_path + t+'_observation_instances.csv')

# 2. Usa discover_invariants para encontrar invariantes
output_path = discover_invariants(obj.root_path, t+'_observation_instances')

# 3. Lee las invariantes desde el archivo generado
invariants = read_invariants(output_path)

# 4. Convierte invariantes a condiciones y calcula la ganancia de información
conditions = convert_invariants_to_conditions(dataframe, invariants)

# 5. Selecciona las condiciones más informativas basadas en un umbral de ganancia de información
selected_conditions = select_conditions(conditions)

# Ahora tienes las condiciones más informativas que puedes usar para entender mejor tu proceso de negocio.
print(selected_conditions)

Archivos .decls y .dtrace para A_observation_instances generados con éxito
Error al procesar la invariante length >= 1.0: 'outcome_column'
Error al procesar la invariante installement >= 1.0: 'outcome_column'
Error al procesar la invariante length != salary: 'outcome_column'
Error al procesar la invariante salary != age: 'outcome_column'
Error al procesar la invariante salary != amount: 'outcome_column'
Error al procesar la invariante salary != installement: 'outcome_column'
Error al procesar la invariante age != installement: 'outcome_column'
{}


In [None]:

def derive_branching_conditions(self, observation_instances_file='observation_instances.json'):
      # Cargar instancias de observación
      with open(self.root_path + observation_instances_file, 'r') as file:
          observation_instances = json.load(file)
      
      # Convertir instancias de observación a DataFrame para el aprendizaje
      data = []
      labels = []
      for task, instances in observation_instances.items():
          for instance in instances:
              # Aquí, 'task' actúa como etiqueta y 'instance' son las características
              labels.append(task)
              data.append(instance)
      
      df = pd.DataFrame(data)
      df.fillna(0, inplace=True)  # Tratar valores NaN
      
      # Entrenar un árbol de decisión
      clf = DecisionTreeClassifier(random_state=0)
      clf.fit(df, labels)
      
      # Convertir el árbol de decisión a reglas de texto
      tree_rules = export_text(clf, feature_names=list(df.columns))
      print(tree_rules)