In [4]:
import sys
import os
import pandas as pd
import pm4py
from pm4py.objects.conversion.log import converter as xes_converter
from pm4py.algo.filtering.log.attributes import attributes_filter
from pm4py.objects.log.obj import EventLog, Trace, Event
import math
import csv
from CminSampler import CminSampler
import constants
import os
import glob
import sys
import matplotlib.pyplot as plt
import format_logistic as fl

In [5]:
def read_dfs(df_path: str) -> list:
    '''Reads every csv file in a directory into a dataframe tagged with its route.

    File names are expected to look like ``<prefix>_<src>_<dest>.csv``: the
    second underscore-separated token of the file name is used as the source
    location and the third token (up to its first dot) as the destination.

    Args:
        df_path: directory that holds the csv files.

    Returns:
        One dataframe per csv file, in sorted file-name order. Each dataframe
        gets two extra columns, named by ``constants.src_dest_attributes[0]``
        and ``constants.src_dest_attributes[1]``, holding the source and
        destination parsed from the file name.

    Raises:
        FileNotFoundError: if ``df_path`` does not exist.
        NotADirectoryError: if ``df_path`` is not a directory.
        ValueError: if a csv file name has no source/destination tokens.
    '''
    if not os.path.exists(df_path):
        raise FileNotFoundError(f"csv directory not found: {df_path!r}")
    if not os.path.isdir(df_path):
        raise NotADirectoryError(f"not a directory: {df_path!r}")

    # Glob inside the directory instead of chdir-ing into it: changing the
    # process working directory would leak a file descriptor and silently
    # redirect every later relative path used by the notebook.
    # glob.escape keeps special characters in the directory name literal;
    # sorting makes the result order reproducible across runs and platforms.
    pattern = os.path.join(glob.escape(df_path), "*.csv")
    csv_files = sorted(glob.glob(pattern))

    dataframes = []

    # loop over the list of csv files
    for csv_file in csv_files:

        # get src and dest loc from the file name; basename handles both
        # '/' and the platform-specific path separator
        file_name = os.path.basename(csv_file)
        src_dest_loc = file_name.split('_')[1:]
        if len(src_dest_loc) < 2:
            raise ValueError(
                f"cannot parse source/destination from file name {file_name!r}; "
                "expected '<prefix>_<src>_<dest>.csv'"
            )
        src_loc = src_dest_loc[0]
        dest_loc = src_dest_loc[1].split('.')[0]

        # read the csv file
        df = pd.read_csv(csv_file)
        # add source and destination location
        df[constants.src_dest_attributes[0]] = src_loc
        df[constants.src_dest_attributes[1]] = dest_loc
        dataframes.append(df)

    return dataframes

In [6]:
def convert_to_logs(dataframes: []):
    '''Builds one pm4py event log per dataframe, preserving the input order.'''

    # Each dataframe is converted independently with the TO_EVENT_LOG variant.
    return [
        xes_converter.apply(frame, variant=xes_converter.Variants.TO_EVENT_LOG)
        for frame in dataframes
    ]

In [7]:
def _sample_by_fraction(dataframe, total_cases, fraction):
    '''Returns the rows of `dataframe` belonging to the cases CminSampler picks for `fraction` of `total_cases`.'''
    no_of_cases_to_sample = math.ceil(total_cases * fraction)
    sampler = CminSampler(no_of_cases_to_sample)
    sampler.load_df(dataframe, constants.attr_case, constants.attr_event)
    sampled_cases = sampler.sample()
    # filter the dataframe only retaining the sampled cases
    return dataframe[dataframe[constants.attr_case].isin(sampled_cases)]


def _equipment_coverage(dataframe, sampled_df):
    '''Returns the share of distinct `constants.attr_event` values of `dataframe` that also occur in `sampled_df`.'''
    all_unique_equipments = set(dataframe[constants.attr_event].to_list())
    if not all_unique_equipments:
        # nothing to cover, so the sample trivially covers everything
        return 1.0
    sampled_unique_equipments = set(sampled_df[constants.attr_event].to_list())
    return len(sampled_unique_equipments) / len(all_unique_equipments)


def sample_cases(dataframes: list, no_of_cases: list,
                 case_percentages=(0.10, 0.20, 0.30, 0.40, 0.50),
                 min_coverage=0.80) -> list:
    '''Samples cases from each dataframe, growing the sample until coverage is good enough.

    For every dataframe the percentages in `case_percentages` are tried in
    order. A sample is accepted as soon as the distinct values of
    ``constants.attr_event`` it contains reach `min_coverage` of those in the
    full dataframe. The last percentage is a fallback: its sample is accepted
    regardless of coverage. Dataframes with at most one case are returned
    unsampled.

    Args:
        dataframes: dataframes to sample from.
        no_of_cases: number of cases in each dataframe, aligned with
            `dataframes` by position.
        case_percentages: increasing fractions of cases to try.
        min_coverage: minimum accepted equipment coverage for every
            percentage except the last one.

    Returns:
        A list with one sampled dataframe per input dataframe, in input order.

    Raises:
        ValueError: if `dataframes` and `no_of_cases` differ in length, or if
            `case_percentages` is empty.
    '''
    case_percentages = tuple(case_percentages)
    if len(dataframes) != len(no_of_cases):
        raise ValueError(
            f"got {len(dataframes)} dataframes but {len(no_of_cases)} case counts"
        )
    if not case_percentages:
        raise ValueError("case_percentages must contain at least one percentage")

    sampled_dfs = [None] * len(dataframes)  # filled in as each dataframe is sampled
    pending = set(range(len(dataframes)))   # indices of dataframes not sampled yet
    final_step = len(case_percentages) - 1

    # Percentages stay in the outer loop so the sampler is invoked in the same
    # order as before (relevant if the sampler keeps shared random state).
    for step, case_percentage in enumerate(case_percentages):
        for idx, dataframe in enumerate(dataframes):
            if idx not in pending:
                continue

            # zero or one case: sampling not needed (and coverage undefined for empty data)
            if no_of_cases[idx] <= 1:
                sampled_dfs[idx] = dataframe
                pending.discard(idx)
                continue

            sampled_df = _sample_by_fraction(dataframe, no_of_cases[idx], case_percentage)

            # Accept when coverage is sufficient; otherwise leave the dataframe
            # pending so the next, larger percentage is tried. The final
            # percentage is always accepted so every dataframe gets a sample.
            if step == final_step or _equipment_coverage(dataframe, sampled_df) >= min_coverage:
                sampled_dfs[idx] = sampled_df
                pending.discard(idx)

    return sampled_dfs

In [None]:
# directory that contains all the csv data; change according to the name of
# the directory where the files are saved
datafiles = './2022_csv_rectified_logs'
# read the dataframes and convert to logs
# (previously `lc.datafiles`: no `lc` is imported or defined in the visible
# cells, so the path defined just above is used instead)
dataframes = read_dfs(datafiles)
logs = convert_to_logs(dataframes)
# no of cases in each log
no_of_cases = [len(log) for log in logs]
# sample the dataframes
sampled_dfs = sample_cases(dataframes, no_of_cases)
# concatenate the sampled dataframes into one
sampled_df = pd.concat(sampled_dfs)
# convert the dataframe into logistic format
sampled_df = fl.format_dataframe(sampled_df)
# save the concatenated sampled dataframe (relative to the working directory
# in effect when this line runs)
sampled_df.to_csv('logistic_sampled.csv') 