In [4]:
import pandas as pd
import pm4py
from pathlib import Path

In [5]:
# Names of the event logs to summarise; each is expected as `<name>.xes`.
datasets = [
    'BPI2017',
    'BPI2019_1',
    'Hospital_Billing',
    'Sepsis',
    'RTFMP',
]

# Directory of the raw logs.
raw_path = Path('../data/raw')

# Directory of the processed logs.
processed_path = Path('../data/interim/processed_logs')

In [6]:
def _summarize_log(log):
    """
    Compute the summary statistics of a single event log.

    Parameters:
    -----------
    log : object returned by ``pm4py.read_xes``
        Event log to summarise.

    Returns:
    --------
    dict
        Mapping from statistic label to its value for ``log``.
    """
    timestamps = pm4py.get_event_attribute_values(log, 'time:timestamp')
    activities = pm4py.get_event_attribute_values(log, 'concept:name')
    # Variant discovery is costly on large logs: run it once and reuse the result
    # for both the case count and the variant count.
    variants = pm4py.get_variants(log)
    dfg, _, _ = pm4py.discover_dfg(log)

    return {
        # Whole days between the earliest and the latest event timestamp.
        'time length': (max(timestamps) - min(timestamps)).days,
        # Summing the per-variant values gives the total number of cases.
        '# cases': sum(variants.values()),
        '# variants': len(variants),
        # NOTE(review): len(log) is the event count only if read_xes yields one
        # row per event (a DataFrame) -- confirm for the installed pm4py version.
        '# events': len(log),
        '# activities': len(activities),
        '# DFs': len(dfg),
        '# DFs occurrences': sum(dfg.values())
    }


def get_log_stats(datasets, raw_dir='../data/raw',
                  processed_dir='../data/interim/processed_logs'):
    """
    Collect summary statistics for the raw and processed version of each log.

    For every dataset name the file ``<name>.xes`` is read from ``raw_dir`` and
    from ``processed_dir`` and both logs are summarised with the same metrics.

    Parameters:
    -----------
    datasets : iterable of str
        Dataset names (file names without the ``.xes`` extension).
    raw_dir : str or Path
        Directory containing the raw XES logs.
    processed_dir : str or Path
        Directory containing the processed XES logs.

    Returns:
    --------
    dict
        ``{dataset: {'raw': stats, 'processed': stats}}`` where ``stats`` maps
        'time length', '# cases', '# variants', '# events', '# activities',
        '# DFs' and '# DFs occurrences' to their values.
    """
    log_dirs = {'raw': Path(raw_dir), 'processed': Path(processed_dir)}

    log_stats = {}

    for dataset in datasets:
        log_stats[dataset] = {}

        # Load and summarise one log at a time so the raw and the processed
        # version of a large dataset never have to be in memory together.
        for log_type, log_dir in log_dirs.items():
            log = pm4py.read_xes(str(log_dir / f'{dataset}.xes'))
            log_stats[dataset][log_type] = _summarize_log(log)

    return log_stats

In [7]:
# Compute the raw vs. processed statistics for every configured dataset.
# Each log that is read prints its own "parsing log" progress bar below.
log_stats = get_log_stats(datasets)

parsing log, completed traces ::   0%|          | 0/42995 [00:00<?, ?it/s]

parsing log, completed traces ::   0%|          | 0/40229 [00:00<?, ?it/s]

parsing log, completed traces ::   0%|          | 0/221010 [00:00<?, ?it/s]

parsing log, completed traces ::   0%|          | 0/197521 [00:00<?, ?it/s]

parsing log, completed traces ::   0%|          | 0/90604 [00:00<?, ?it/s]

parsing log, completed traces ::   0%|          | 0/78828 [00:00<?, ?it/s]

parsing log, completed traces ::   0%|          | 0/1050 [00:00<?, ?it/s]

parsing log, completed traces ::   0%|          | 0/999 [00:00<?, ?it/s]

parsing log, completed traces ::   0%|          | 0/138260 [00:00<?, ?it/s]

parsing log, completed traces ::   0%|          | 0/112368 [00:00<?, ?it/s]

In [8]:
def save_stats_to_csv(stats_dict, output_path='event_log_stats.csv'):
    """
    Convert the nested dictionary of event log statistics to CSV format and save it.

    Each ``(dataset, version)`` pair becomes one row; the statistics of that
    pair become the remaining columns. Missing parent directories of
    ``output_path`` are created before writing.

    Parameters:
    -----------
    stats_dict : dict
        Nested dictionary ``{dataset: {version: {statistic: value}}}``.
        Entries whose statistics contain an ``'error'`` key are skipped.
    output_path : str or Path
        Path where the CSV file will be saved

    Returns:
    --------
    pandas.DataFrame
        The flattened table that was written, with ``dataset`` and ``version``
        as the first two columns (also when there are no rows).
    """
    id_cols = ['dataset', 'version']

    # Flatten the nested dictionary into one record per (dataset, version).
    records = []
    for dataset, versions in stats_dict.items():
        for version, stats in versions.items():
            # Skip entries with errors
            if 'error' in stats:
                continue

            record = {'dataset': dataset, 'version': version}
            record.update(stats)
            records.append(record)

    if records:
        df = pd.DataFrame(records)
    else:
        # Keep the identifying columns so the CSV still gets a header line and
        # can be read back even when nothing was recorded.
        df = pd.DataFrame(columns=id_cols)

    # Identifying columns first, statistics afterwards in their original order.
    other_cols = [col for col in df.columns if col not in id_cols]
    df = df[id_cols + other_cols]

    # to_csv does not create directories, so make sure the target folder exists.
    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)

    df.to_csv(target, index=False)
    print(f"Statistics saved to {output_path}")

    return df

In [9]:
# Flatten the collected statistics into a table and write it to the CSV report;
# the resulting DataFrame is kept in `stats_df`.
stats_df = save_stats_to_csv(log_stats, output_path='../logs/data_preprocess/event_log_statistics.csv')

Statistics saved to ../logs/data_preprocess/event_log_statistics.csv
