### Setup and Import Section ###

In [None]:
# replicateExp.ipynb
import os
import sys # For robust Colab detection

# --- Setup Section ---
# Prepares the runtime environment. The repository checkout below happens only on
# Colab; the Java and pip installation steps further down run in both branches.

# Detect Google Colab: the check is true only when the 'google.colab' package has
# already been loaded into this interpreter (i.e. it appears in sys.modules).
IS_COLAB = 'google.colab' in sys.modules

if IS_COLAB:
    print("Running on Google Colab. Setting up environment...")
    # Name of your GitHub repository - IMPORTANT: MAKE SURE THIS MATCHES YOUR REPO NAME
    repo_name = "REMEMBER" # Also used as the local checkout directory name
    # Clone URL built from the repository name above
    repo_url = f"https://github.com/bistrulli/{repo_name}.git"

    # Clone only when no file or directory with that name exists yet, so re-running
    # the cell does not clone twice.
    if not os.path.exists(repo_name):
        print(f"Cloning repository '{repo_name}' from {repo_url}...")
        !git clone {repo_url}
        # The clone's exit status is not inspected; success is inferred from the
        # checkout directory having appeared.
        if os.path.exists(repo_name):
            print(f"Repository '{repo_name}' cloned successfully.")
        else:
            print(f"Error: Cloning failed or repository directory not found as '{repo_name}'.")
            # List the current directory contents to help debugging
            print("Current directory contents:")
            !ls -a
    else:
        print(f"Repository directory '{repo_name}' already exists. Skipping clone.")

    # Enter the repository so that later relative paths (requirements.txt and the
    # os.getcwd()-based paths used by later cells) resolve inside the checkout.
    if os.path.isdir(repo_name):
        os.chdir(repo_name)
        print(f"Changed directory to: {os.getcwd()}")
    else:
        # Execution continues from the original directory; nothing is raised here.
        print(f"Warning: Could not change to directory '{repo_name}'. It does not exist or is not a directory.")
        print(f"Current directory remains: {os.getcwd()}")
else:
    # NOTE(review): argparse is imported here but not used anywhere in the visible code.
    import argparse
    print("Not running on Google Colab (or Colab detection failed). Assuming local environment.")
    # You might want to add any local-specific setup here if needed
# --- End GitHub Setup Section ---

# Install OpenJDK 17
# NOTE: everything from here on is outside the if/else above, so it also runs on a
# local machine. The commands use sudo and apt-get, so they presumably only work on
# a Debian/Ubuntu-like system where the user has sudo rights.
print("Updating package list...")
!sudo apt-get update -qq > /dev/null
print("Installing OpenJDK 17...")
!sudo apt-get install -y openjdk-17-jdk -qq > /dev/null
print("Setting JAVA_HOME environment variable...")
# Redundant re-import: os is already imported at the top of this cell.
import os
# Hard-coded, amd64-specific install location.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
# Verify Java version
print("Verifying Java installation...")
!java -version
# This message is printed unconditionally; the outcome of the commands above is not checked.
print("Java 17 setup complete.")

# requirements.txt is resolved relative to the current working directory
# (the repository checkout on Colab when the chdir above succeeded).
print("Installing required packages...")
!pip install -r requirements.txt
print("Package installation attempt complete.")

# Attempt to import pm4py to verify installation, especially on Colab.
# A failure is only reported with a message; it is not re-raised, so the cell
# still finishes without an exception.
try:
    import pm4py
    print("pm4py imported successfully.")
except ImportError:
    print("Error: pm4py (or one of its dependencies) could not be imported after installation.")


: 

In [1]:

from vlmcProcessMining import *
import numpy as np
import os
import subprocess
import sys
import json
import re
from scipy.io import savemat
from pathlib import Path
import pandas as pd
from sklearn.model_selection import train_test_split
import pathlib
import shutil
import tqdm
import time
import glob
import warnings
import xml.parsers.expat
import warnings
from pm4py.objects.log.importer.xes import importer as xes_importer
import pm4py
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.objects.log.exporter.xes import exporter as xes_exporter
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.objects.petri_net.importer import importer as pnml_importer
from pm4py.algo.simulation.playout.petri_net import algorithm as simulator
from pm4py.statistics.variants.log import get as variants_module
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.objects.log.util import dataframe_utils
import gzip
import tempfile
import atexit

import matplotlib.pyplot as plt
from scipy import stats


# Folder layout used by the rest of the notebook. Every path is anchored at the
# directory the notebook is executed from.
ldir = Path.cwd() / "likelyhood"            # per-log likelihood / uEMSC outputs
dataDir = Path.cwd() / "data"

# Input splits (expected to exist already; they are not created here).
trainLogDir = dataDir / "2-splitlogs"
testLogDir = dataDir / "2-splitlogstest"

# Output folders, created on demand so later cells can write into them.
vlmcDir = dataDir / "VLMC"
convertedDir = dataDir / "converted"
vlmcDir.mkdir(parents=True, exist_ok=True)
convertedDir.mkdir(parents=True, exist_ok=True)

# Log names whose results are plotted at the end of the notebook.
lognames = ['Road_Traffic_Fine_Management_Process']

def getVLMCName(inputFile):
    """Derive the base name used for VLMC artefacts from a log file path.

    For a gzip-compressed file the ``.gz`` suffix and the extension in front of
    it are both dropped (``log.xes.gz`` -> ``log``); for any other file only the
    last extension is dropped (``log.xes`` -> ``log``). Dots that are part of
    the remaining name are preserved (``a.xes.gz0.xes.gz`` -> ``a.xes.gz0``).

    Args:
        inputFile (str | os.PathLike): Path or file name of the event log.

    Returns:
        str: The derived base name. Never empty for a ``.gz`` file whose stem
        has no inner extension (``foo.gz`` -> ``foo``).
    """
    inputFile = Path(inputFile)
    stem = inputFile.stem
    if inputFile.suffix != '.gz':
        return stem

    # Strip the inner extension (e.g. ".xes") that precedes ".gz".
    base, dot, _ = stem.rpartition('.')
    # If there is no inner extension (or nothing in front of it), keep the stem
    # rather than returning an empty name that would yield paths like ".vlmc".
    return base if dot and base else stem
    

: 

### Mine The VLMC From an Event Log ###

In [2]:
def createVLMC(inputFile):
    """Mine a VLMC from an event log and report how long the mining took.

    The log is first passed to ``readInputFile``; the model name is then derived
    with ``getVLMCName`` and ``mineProcess`` is invoked on the files named after
    it under ``data/converted`` and ``data/VLMC`` of the current working
    directory.

    Args:
        inputFile: Path of the event log to mine.

    Returns:
        float: Wall-clock seconds spent in ``mineProcess`` (the preceding
        ``readInputFile`` call is not included in the measurement).
    """
    readInputFile(inputFile)
    modelName = getVLMCName(inputFile)

    started = time.time()
    workDir = os.getcwd()
    vlmcPrefix = f"{workDir}/data/VLMC/{modelName}"
    mineProcess(
        ecfFile=f"{vlmcPrefix}.ecf",
        infile=f"{workDir}/data/converted/{modelName}.txt",
        vlmcfile=f"{vlmcPrefix}.vlmc",
        nsim="1",
        ntime="1",
        alfa="1",
    )
    elapsed = time.time() - started

    print(f"VLMC Mined In {elapsed}")
    return elapsed

### Compute Likelihood With VLMC ###

In [3]:
def getVLMCLikelyhood(vlmcName=None,traceFile=None):
    """Run ``getLikelyhood`` for a set of traces against a mined VLMC.

    Args:
        vlmcName (str): Base name of the model; selects the ``.ecf``, ``.txt``
            and ``.vlmc`` files under ``data/VLMC`` and ``data/converted`` of
            the current working directory.
        traceFile (pathlib.Path): File with the traces to score. The part of its
            name before the first dot names the working directory created under
            ``ldir``.

    Returns:
        float: Wall-clock seconds spent, including path setup.
    """
    started = time.time()

    here = os.getcwd()
    modelPrefix = f"{here}/data/VLMC/{vlmcName}"
    # Working directory handed to getLikelyhood, one per log (name before the first dot).
    workDir = ldir / pathlib.Path(traceFile.name.split(".")[0])
    workDir.mkdir(parents=True, exist_ok=True)

    getLikelyhood(
        ecfFile=str(pathlib.Path(f"{modelPrefix}.ecf").absolute()),
        # dataset used to learn the VLMC
        infile=str(pathlib.Path(f"{here}/data/converted/{vlmcName}.txt").absolute()),
        # input VLMC to use
        vlmc=str(pathlib.Path(f"{modelPrefix}.vlmc").absolute()),
        # output VLMC
        vlmcfile=str(pathlib.Path(f"{modelPrefix}2.vlmc").absolute()),
        # traces whose likelihood is computed
        traces=str(traceFile),
        cwd=str(workDir),
        outFile="out.mat",
        nsim="1",
        ntime="1",
        alfa="1",
    )

    return time.time() - started

### Compute uEMSC With VLMC Likelihood ###

In [5]:
def uEMSCVLMC(inputLan=None,vlmcName=None):
    """Compute the uEMSC score from previously written likelihood files.

    Both likelihood files are looked up in the directory under ``ldir`` named
    after the part of ``inputLan``'s file name before the first dot (the
    directory is created if missing):

    * trace likelihoods: ``inputLan``'s name with its last extension replaced
      by ``.lik``;
    * model likelihoods: ``<vlmcName>.vlmc.lik``.

    Args:
        inputLan (pathlib.Path): Language file of the traces being evaluated.
        vlmcName (str): Base name of the VLMC model.

    Returns:
        tuple: ``(uEMSC, seconds)`` - the value returned by ``computeMuEMSC``
        and the wall-clock time spent in this function.
    """
    started = time.time()

    nameParts = inputLan.name.split(".")
    workDir = ldir / pathlib.Path(nameParts[0])
    workDir.mkdir(parents=True, exist_ok=True)

    traceLikFile = workDir / pathlib.Path(".".join(nameParts[:-1]) + ".lik")
    modelLikFile = workDir / pathlib.Path(vlmcName + ".vlmc.lik")

    score = computeMuEMSC(traceLik=traceLikFile, modelLik=modelLikFile)
    return score, time.time() - started

### Collect VLMC Results

In [8]:
def collectVLMCRes(resDir):
    """Collect uEMSC results from ``.uemsc`` files into a DataFrame.

    Searches ``resDir`` recursively for ``*.uemsc`` files. For each file:

    * ``logname`` is the part of the file name before the first dot;
    * ``split`` is the first run of digits that directly follows ``gz`` in the
      file name (e.g. ``...xes.gz3_test.xes.gz.uemsc`` -> ``3``);
    * ``uemsc`` is the file content loaded with ``numpy.loadtxt``. A file
      holding a single number yields a plain Python ``float`` instead of a 0-d
      array, so the column gets a numeric dtype.

    Files are processed in sorted path order so that the row order is
    reproducible across runs and platforms.

    Args:
        resDir (str | pathlib.Path): Directory to search.

    Returns:
        pandas.DataFrame: Columns ``logname``, ``split`` and ``uemsc``; empty
        (but with these columns) when no file is found.

    Raises:
        IndexError: If a ``.uemsc`` file name contains no ``gz<digits>`` part.
    """
    rows = []
    for filePath in sorted(Path(resDir).rglob("*.uemsc")):
        logName = filePath.name.split(".")[0]
        splitIdx = int(re.findall(r'gz(\d+)', filePath.name)[0])

        uemsc = np.loadtxt(filePath)
        if uemsc.ndim == 0:
            # Unwrap the 0-d array so pandas stores a float, not an ndarray object.
            uemsc = uemsc.item()

        rows.append([logName, splitIdx, uemsc])

    return pd.DataFrame(rows, columns=["logname", "split", "uemsc"])

In [9]:
def collectVLMCTime(resDir):
    """Gather the timing measurements stored in ``.time`` files.

    ``resDir`` is searched recursively for ``*.time`` files. Each file
    contributes one row built from:

    * ``logname`` - the file name up to its first dot;
    * ``split`` - the first run of digits immediately following ``gz`` in the
      file name;
    * ``mctime``, ``liktime``, ``uemsctime`` - the first three values that
      ``numpy.loadtxt`` reads from the file, in that order.

    Args:
        resDir (pathlib.Path): Directory to search for ``.time`` files.

    Returns:
        pandas.DataFrame: One row per file with columns ``logname``, ``split``,
        ``mctime``, ``liktime`` and ``uemsctime``.

    Raises:
        AttributeError: If ``resDir`` has no ``rglob`` method (e.g. it is None).
        IndexError: If a file name has no ``gz<digits>`` part or a file holds
            fewer than three values.
    """
    columns = ["logname", "split", "mctime", "liktime", "uemsctime"]
    rows = []

    for found in resDir.rglob("*.time"):
        timeFile = Path(found)
        fileName = timeFile.name

        logName = fileName.split(".")[0]
        splitIdx = int(re.findall(r'gz(\d+)', fileName)[0])

        # Expected layout: [mining time, likelihood time, uEMSC time].
        timings = np.loadtxt(timeFile)
        rows.append([logName, splitIdx] + [timings[k] for k in range(3)])

    return pd.DataFrame(rows, columns=columns)

### Road Traffic Fines Management Example

In [6]:
def computeVLMCSCC():
    """Run the end-to-end VLMC conformance pipeline on the split logs.

    Steps:

    1. Pick the first training log (in sorted path order) under ``trainLogDir``
       and mine a VLMC from it with ``createVLMC``.
    2. For every test log under ``testLogDir``: pass it to ``readInputFile``,
       compute its likelihood under the mined model (``getVLMCLikelyhood``) and
       the resulting uEMSC (``uEMSCVLMC``).
    3. Write, next to the likelihood files under ``ldir/<log name>/``, a
       ``<test file name>.uemsc`` file with the score and a
       ``<test file name>.time`` file with the mining, likelihood and uEMSC
       times (in that order).

    The glob patterns use character classes: ``[0]`` / ``[0-1]`` match a single
    digit, so they select names such as ``X.xes.gz0.xes.gz`` and
    ``X.xes.gz1_test.xes.gz`` (not a literal ``[0]``).

    Returns:
        None. Results are written to disk only.

    Raises:
        FileNotFoundError: If no training log matches under ``trainLogDir``.
    """
    print("########################################################")
    print("# This function orchestrates an end-to-end conformance checking pipeline using the Road Traffic Fine Management Process Dataset")
    print("# It involves mining a VLMC model from a training log, then evaluating test logs against this model")
    print("# by computing their likelihood and uEMSC scores.") 
    print("# For the sake of the brevity, we will use only a single training log and two test log, however the code")
    print("# can be easily adapted to use different training log and is already designed to work with multiple test logs.")
    print("########################################################")

    # rglob yields files in arbitrary order; sort so that the selected training
    # log and the processing order of the test logs are reproducible.
    train_logs = sorted(trainLogDir.rglob("*.xes.gz[0].xes.gz"))
    test_logs = sorted(testLogDir.rglob("*.xes.gz[0-1]_test.xes.gz"))
    if not train_logs:
        raise FileNotFoundError(
            f"No training log matching '*.xes.gz[0].xes.gz' found under {trainLogDir}"
        )

    # Mine the VLMC from the first training log and record the mining time.
    inputTrace = train_logs[0]
    mctime = createVLMC(inputFile=inputTrace)
    # Same naming rule createVLMC applied when it wrote the model files.
    vlmcName = getVLMCName(inputTrace)

    for testTrace in test_logs:
        testname = getVLMCName(testTrace)
        # All artefacts of one log live in ldir/<name before the first dot>.
        outDir = ldir / Path(testTrace.name.split(".")[0])

        # Convert the test log; presumably this is also what produces the
        # "<testname>_trace.lan" language file used below - confirm in
        # vlmcProcessMining.readInputFile.
        readInputFile(inputFile=testTrace)
        testLan = outDir / Path(f"{testname}_trace.lan")

        # Likelihood of the test language under the mined VLMC, then the uEMSC score.
        liktime = getVLMCLikelyhood(vlmcName=vlmcName, traceFile=testLan)
        uEMSC, uemsctime = uEMSCVLMC(inputLan=testLan, vlmcName=vlmcName)

        # Persist the score and the three timings for later collection.
        np.savetxt(str(outDir / Path(f"{testTrace.name}.uemsc")), [uEMSC])
        np.savetxt(str(outDir / Path(f"{testTrace.name}.time")), [mctime, liktime, uemsctime])


### Compute Stochastic Conformance Checking via VLMC    

In [None]:
# Run the full pipeline (results are written below ldir) ...
computeVLMCSCC()
# ... then load every written .uemsc file back into a DataFrame for plotting.
vlmcRes = collectVLMCRes(ldir)

### Plot Results

In [None]:
# Default font size for every figure produced below.
plt.rcParams.update({'font.size': 18})

# Box styling: the median line is given zero width so that it is not drawn, and
# the mean is shown instead as a solid orange line.
medianprops = dict(linestyle=None, linewidth=0, color='firebrick')
meanprops = dict(linestyle="-", linewidth=1.5, color='orange')

# One figure per log name, with one box per evaluated technique (only VLMC here).
for lname in lognames:
    # uEMSC values of all test splits of this log, coerced to a float array so
    # the plot also works when the column was stored with object dtype.
    x = [vlmcRes.loc[vlmcRes["logname"] == lname, "uemsc"].to_numpy(dtype=float)]
    names = ["VLMC"]

    fig, ax = plt.subplots(figsize=(10, 4))
    ax.boxplot(x, tick_labels=names, showmeans=True, meanline=True,
               medianprops=medianprops, meanprops=meanprops)

    ax.set_ylabel("uEMSC")
    ax.grid()
    # Rotated tick labels stay readable if longer technique names are added.
    ax.tick_params(axis="x", labelrotation=30)
    ax.set_title("uEMSC distributions on all the tested test logs")