Load the "MainProcess.xes" file

In [None]:
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner
from pm4py.visualization.petri_net import visualizer as pn_visualizer
import pandas as pd

# Folder that holds the cleaned .xes event logs
xes_directory = os.path.join(os.getcwd(), "20130794", "Cleaned Event Log")

# Folder for generated models / visuals (created if it does not exist yet)
output_directory = os.path.join(os.getcwd(), "output")
os.makedirs(output_directory, exist_ok=True)

# Import the main process log and print a few summary statistics about it
filename = "MainProcess.xes"
file_path = os.path.join(xes_directory, filename)
print(f"Processing {filename}")

try:
    log = xes_importer.apply(file_path)
    print("\n\n")
    print(f"Imported {filename} with {len(log)} traces.")

    # Total number of events, summed over all traces
    print(f"Number of events: {sum(map(len, log))}")

    # Distinct activity labels; events without a "concept:name" are ignored
    activities = {
        event["concept:name"]
        for trace in log
        for event in trace
        if "concept:name" in event
    }
    print(f"Number of unique activities: {len(activities)}")
    print(f"Unique activities: {activities}")

    # Case IDs are read from the trace-level "concept:name" attribute
    case_ids = [
        trace.attributes["concept:name"]
        for trace in log
        if "concept:name" in trace.attributes
    ]
    print(f"Number of cases: {len(case_ids)}")
    print(f"First 5 case IDs: {case_ids[:5]}")
    print("\n\n")

except Exception as e:
    # Best-effort cell: report the problem instead of stopping the notebook
    print(f"Error processing {filename}: {e}")


Get all the attributes included in this file ("MainProcess.xes")

In [None]:
try:
    # ---- Trace-level attributes ----
    print("\n\nAttributes of traces:\n")

    # Union of the attribute keys found on any trace
    trace_attributes = set()
    for trace in log:
        trace_attributes.update(trace.attributes.keys())

    # Type name per attribute, taken from the first trace that carries it
    trace_attr_info = [
        {
            "Attribute": attr,
            "Type": next(
                (
                    type(trace.attributes[attr]).__name__
                    for trace in log
                    if attr in trace.attributes
                ),
                "unknown",
            ),
        }
        for attr in trace_attributes
    ]

    trace_attr_df = pd.DataFrame(trace_attr_info)
    display(trace_attr_df)
    # Export the trace attribute table to the "tables" subfolder
    tables_dir = os.path.join(os.getcwd(), "tables")
    os.makedirs(tables_dir, exist_ok=True)
    trace_attr_df.to_excel(
        os.path.join(tables_dir, "main_trace_attribute_info.xlsx"), index=False
    )

    # ---- Event-level attributes ----
    print("\n\nAttributes of events:\n")

    # Union of the attribute keys found on any event of any trace
    event_attributes = set()
    for trace in log:
        for event in trace:
            event_attributes.update(event.keys())

    # Type name per attribute, taken from the first event that carries it
    event_attr_info = [
        {
            "Attribute": attr,
            "Type": next(
                (
                    type(event[attr]).__name__
                    for trace in log
                    for event in trace
                    if attr in event
                ),
                "unknown",
            ),
        }
        for attr in event_attributes
    ]

    event_attr_df = pd.DataFrame(event_attr_info)
    display(event_attr_df)
    # Export the event attribute table to the "tables" subfolder
    tables_dir = os.path.join(os.getcwd(), "tables")
    os.makedirs(tables_dir, exist_ok=True)
    event_attr_df.to_excel(
        os.path.join(tables_dir, "main_event_attribute_info.xlsx"), index=False
    )
except Exception as e:
    print(f"Error processing the log: {e}")


List all the resources of the log in a table

In [None]:
try:
    # Set pandas display options for full width
    pd.set_option('display.max_colwidth', None)
    pd.set_option('display.width', 0)
    pd.set_option('display.max_columns', None)

    # Collect every per-resource fact in ONE pass over the log. Scanning the
    # complete log again for each resource (count, activities, first
    # SubProcessID, first parameters) is quadratic on large logs.
    resources = set()
    event_counts = {}
    activities_by_resource = {}
    first_subprocess_ids = {}
    first_parameters = {}
    for trace in log:
        for event in trace:
            if "org:resource" not in event:
                continue
            resource = event["org:resource"]
            resources.add(resource)
            event_counts[resource] = event_counts.get(resource, 0) + 1
            # Guarded lookup: an event without a name must not abort the table
            if "concept:name" in event:
                activities_by_resource.setdefault(resource, set()).add(event["concept:name"])
            # setdefault keeps the value of the FIRST event that carries the key
            if "SubProcessID" in event:
                first_subprocess_ids.setdefault(resource, event["SubProcessID"])
            if "parameters" in event:
                first_parameters.setdefault(resource, event["parameters"])

    resource_info = []
    for resource in resources:
        activities_performed = activities_by_resource.get(resource, set())
        parameters_dict = first_parameters.get(resource)
        # Element [0] of every entry in parameters_dict['children'] is used as the key
        if parameters_dict and 'children' in parameters_dict:
            parameter_keys = [child[0] for child in parameters_dict['children']]
        else:
            parameter_keys = []
        resource_info.append({
            "Resource": resource,
            "Event Count": event_counts[resource],
            "Unique Activities": len(activities_performed),
            "Activities": ", ".join(sorted(activities_performed)),
            "First Subprocess ID": first_subprocess_ids.get(resource, "N/A"),
            "Parameter Keys": parameter_keys
        })

    resource_df = pd.DataFrame(resource_info)
    display(resource_df)

    # Save the resource DataFrame to an Excel file in a "tables" subfolder.
    # List-valued cells are not a supported Excel cell type, so the parameter
    # keys are joined into text for the export only; resource_df keeps lists.
    export_df = resource_df.copy()
    if "Parameter Keys" in export_df.columns:  # absent when no resource was found
        export_df["Parameter Keys"] = export_df["Parameter Keys"].map(
            lambda keys: ", ".join(map(str, keys))
        )
    tables_dir = os.path.join(os.getcwd(), "tables")
    os.makedirs(tables_dir, exist_ok=True)
    export_df.to_excel(os.path.join(tables_dir, "resources_info.xlsx"), index=False)

except Exception as e:
    print(f"Error processing the log: {e}")

Load a single subevent log file

In [None]:
# Import one sub-process log (0a0a7c16-85d9-48be-a7d5-32931240c337.xes) and summarise it
filename = "0a0a7c16-85d9-48be-a7d5-32931240c337.xes"
file_path = os.path.join(xes_directory, filename)
print(f"Processing {filename}")

try:
    subevent_log = xes_importer.apply(file_path)
    print("\n\n")
    print(f"Imported {filename} with {len(subevent_log)} traces.")

    # Total number of events, summed over all traces
    print(f"Number of events: {sum(map(len, subevent_log))}")

    # Distinct activity labels; events without a "concept:name" are ignored
    activities = {
        event["concept:name"]
        for trace in subevent_log
        for event in trace
        if "concept:name" in event
    }
    print(f"Number of unique activities: {len(activities)}")
    print(f"Unique activities: {activities}")

    # Case IDs are read from the trace-level "concept:name" attribute
    case_ids = [
        trace.attributes["concept:name"]
        for trace in subevent_log
        if "concept:name" in trace.attributes
    ]
    print(f"Number of cases: {len(case_ids)}")
    print(f"First 5 case IDs: {case_ids[:5]}")
    print("\n\n")

except Exception as e:
    # Best-effort cell: report the problem instead of stopping the notebook
    print(f"Error processing {filename}: {e}")

List all trace and event attributes of the sub-event log file loaded above

In [None]:
try:
    # Collect every attribute key that occurs on any trace or any event of
    # the sub-event log.
    all_attributes = set()
    for trace in subevent_log:
        all_attributes.update(trace.attributes.keys())
        for event in trace:
            all_attributes.update(event.keys())

    # Determine each attribute's type from its FIRST occurrence anywhere in
    # the log (trace attributes are checked before event attributes).
    # Looking only at the first trace / first event reports "unknown" for
    # attributes that appear later and fails on an empty log.
    attr_info = []
    for attr in all_attributes:
        attr_type = next(
            (
                type(trace.attributes[attr]).__name__
                for trace in subevent_log
                if attr in trace.attributes
            ),
            None,
        )
        if attr_type is None:
            attr_type = next(
                (
                    type(event[attr]).__name__
                    for trace in subevent_log
                    for event in trace
                    if attr in event
                ),
                "unknown",
            )
        attr_info.append({"Attribute": attr, "Type": attr_type})

    # Display as a pandas DataFrame
    attr_df = pd.DataFrame(attr_info)
    display(attr_df)
    # Save the attribute DataFrame to an Excel file in a "tables" subfolder
    tables_dir = os.path.join(os.getcwd(), "tables")
    os.makedirs(tables_dir, exist_ok=True)
    attr_df.to_excel(os.path.join(tables_dir, "sub_attribute_info.xlsx"), index=False)

except Exception as e:
    print(f"Error processing the subevent_log: {e}")

Get all the sensor data included per resource

In [None]:
import os
import xml.etree.ElementTree as ET
import pandas as pd
from pm4py.objects.log.importer.xes import importer as xes_importer
from IPython.display import display

# Helper: turn the datastream list(s) of one XES <event> element into nested dicts
def parse_datastream_from_event_xml(event_xml):
    """Extract the ``stream:datastream`` content of a single XES event element.

    Args:
        event_xml: ElementTree element of one ``<event>``.

    Returns:
        ``None`` when the event has no ``stream:datastream`` list. Otherwise a
        dict ``{"children": {point_name: point}}`` where every ``point`` holds
        the five ``stream:*`` metadata attributes of the point element (``None``
        when absent) plus a ``"children"`` dict mapping each child's ``key``
        attribute to its ``value`` attribute. Points from several datastream
        lists are merged into the same mapping.
    """
    xes_ns = {"xes": "http://code.deckfour.org/xes"}
    # Point metadata attributes are looked up in this namespace
    stream_prefix = "{https://cpee.org/datastream/datastream.xesext}"
    meta_fields = (
        "system",
        "system_type",
        "observation",
        "procedure_type",
        "interaction_type",
    )

    streams = event_xml.findall("xes:list[@key='stream:datastream']", xes_ns)
    if not streams:
        return None

    points = {}
    for stream in streams:
        # Every nested <list> directly below the datastream is one sensor point
        for position, point in enumerate(stream.findall("xes:list", xes_ns)):
            entry = {
                f"stream:{field}": point.attrib.get(stream_prefix + field)
                for field in meta_fields
            }
            # Children such as <date key="stream:timestamp" value="..."/>;
            # elements without a (non-empty) key are skipped
            entry["children"] = {
                child.attrib["key"]: child.attrib.get("value")
                for child in point
                if child.attrib.get("key")
            }

            # Name the point after its own key, or by position if it has none
            name = point.attrib.get("key", f"stream:point_{position}")
            if name in points:
                # Name already taken: append the first free numeric suffix
                suffix = 1
                while f"{name}_{suffix}" in points:
                    suffix += 1
                name = f"{name}_{suffix}"

            points[name] = entry

    return {"children": points}


# Helper: copy the datastreams found in the raw XES file onto the pm4py events (in memory)
def attach_sensor_data_to_pm4py_log(pm4py_log, xes_file_path):
    """Attach parsed ``stream:datastream`` dicts to the events of ``pm4py_log``.

    The XES file is parsed again with ElementTree and its traces/events are
    paired with the pm4py traces/events purely by position. When the counts
    differ, a warning is printed and only the common prefix is processed.

    Args:
        pm4py_log: Log whose events are updated in place.
        xes_file_path: Path of the XES file to read the datastreams from.

    Returns:
        The same ``pm4py_log`` object that was passed in.
    """
    ns = {"xes": "http://code.deckfour.org/xes"}
    xml_traces = ET.parse(xes_file_path).getroot().findall(".//xes:trace", ns)

    if len(xml_traces) != len(pm4py_log):
        print(f"Warning: number of XML traces ({len(xml_traces)}) != number of pm4py traces ({len(pm4py_log)}). "
              "We'll attach for the minimum of both and skip extras.")

    # zip() stops at the shorter sequence, i.e. at the minimum of both lengths
    for t_idx, (xml_trace, pm_trace) in enumerate(zip(xml_traces, pm4py_log)):
        xml_events = xml_trace.findall("xes:event", ns)
        pm_events = list(pm_trace)

        if len(xml_events) != len(pm_events):
            # Only a warning: the events that do line up are still attached
            print(f"Warning: trace {t_idx} has {len(xml_events)} XML events vs {len(pm_events)} pm4py events. "
                  "Attaching up to the minimum matched events in order.")

        for xml_event, pm_event in zip(xml_events, pm_events):
            datastream_dict = parse_datastream_from_event_xml(xml_event)
            if datastream_dict:
                # Store the plain Python dict directly on the pm4py event
                pm_event["stream:datastream"] = datastream_dict

    return pm4py_log

def get_sensor_data_from_subfile(subprocess_log):
    """Flatten the sensor points of every event in a log into a list of rows.

    Only events carrying a ``"stream:datastream"`` entry contribute. Each
    sensor point found under the datastream's ``"children"`` becomes one row.

    Args:
        subprocess_log: Iterable of traces; each trace is an iterable of
            dict-like events.

    Returns:
        A list of dicts, one per sensor point over ALL events, in log order.
        The list is empty when no event carries sensor data.
    """
    # Created once, before the loops: creating it per event would drop the
    # rows of all earlier events and leave the name unbound when no event
    # has a datastream at all.
    sensor_info = []

    for trace in subprocess_log:
        for event in trace:
            if "stream:datastream" not in event:
                continue
            sensor_data = event["stream:datastream"]
            resource_name = event.get("org:resource", "Unknown Resource")

            # sensor_data is a dict with a 'children' key containing sensor points
            if "children" not in sensor_data:
                continue
            for sensor_name, sensor_point in sensor_data["children"].items():
                # Skip entries that do not have the expected point structure
                if not (isinstance(sensor_point, dict) and "children" in sensor_point):
                    continue
                sensor_details = sensor_point["children"]
                sensor_info.append({
                    "Sensor Name": sensor_name,
                    "System": sensor_point.get("stream:system"),
                    "System Type": sensor_point.get("stream:system_type"),
                    "Observation": sensor_point.get("stream:observation"),
                    "Procedure Type": sensor_point.get("stream:procedure_type"),
                    "Interaction Type": sensor_point.get("stream:interaction_type"),
                    "Timestamp": sensor_details.get("stream:timestamp"),
                    "Value": sensor_details.get("stream:value"),
                    # NOTE: the misspelled key is kept on purpose; it is the
                    # column name of the tables produced from these rows.
                    "Resouce": resource_name
                })

    return sensor_info


# -------------------------
# Main loop: collect sensor data from every sub-process log
# -------------------------
# xes_directory must already be defined (it is set in the first cell).

# Intended as a resource -> DataFrame mapping; it starts out empty here
sensor_dict = {}

# All .xes files of the log directory except the main process log
xes_files = [
    name for name in os.listdir(xes_directory)
    if name != "MainProcess.xes" and name.endswith('.xes')
]

# Combined sensor table; stays None until a file yields sensor data
sensor_df = None

import concurrent.futures

def process_xes_file(filename):
    """Build the per-system sensor summary of one XES file.

    The file is imported, its datastreams are attached to the events and
    flattened into rows. The rows are then reduced to one row per ``System``
    with a ``Count`` column holding the number of rows of that system.

    Args:
        filename: Name of a file inside ``xes_directory``.

    Returns:
        A DataFrame with one row per system, or ``None`` when the file is
        missing, any step fails (a message is printed), or the file contains
        no sensor data.
    """
    try:
        print(f"Found XES file: {filename}")
        xes_path = os.path.join(xes_directory, filename)
        if not os.path.isfile(xes_path):
            print(f"Error: File not found: {xes_path}")
            return None

        # Step 1: import the log
        try:
            subprocess_log = xes_importer.apply(xes_path)
        except Exception as e:
            print(f"Error importing XES file {filename}: {e}")
            return None

        # Step 2: attach the raw datastreams to the imported events
        try:
            attach_sensor_data_to_pm4py_log(subprocess_log, xes_path)
        except Exception as e:
            print(f"Error attaching sensor data for {filename}: {e}")
            return None

        # Step 3: flatten the datastreams into rows
        try:
            sensor_info = get_sensor_data_from_subfile(subprocess_log)
        except Exception as e:
            print(f"Error extracting sensor data from {filename}: {e}")
            return None

        if not sensor_info:
            return None

        # Step 4: one row per system, remembering how many rows each system had
        try:
            sub_sensor_df = pd.DataFrame(sensor_info)
            occurrences = sub_sensor_df.groupby("System")["System"].transform("count")
            sub_sensor_df["Count"] = occurrences
            return sub_sensor_df.groupby("System", as_index=False).first()
        except Exception as e:
            print(f"Error creating DataFrame for {filename}: {e}")
            return None
    except Exception as e:
        print(f"Unexpected error processing {filename}: {e}")
        return None

sensor_df = None
# Use ThreadPoolExecutor instead of ProcessPoolExecutor for Jupyter compatibility
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(process_xes_file, xes_files))

# process_xes_file returns None for files that failed or had no sensor data
sub_sensor_frames = [frame for frame in results if frame is not None]
if sub_sensor_frames:
    # Concatenate once: growing the table pairwise inside a loop copies the
    # accumulated rows again for every additional file.
    sensor_df = pd.concat(sub_sensor_frames, ignore_index=True)

if sensor_df is not None:
    # Merge the per-file summaries: keep the first value of every descriptive
    # column per system and add up the per-file counts.
    aggregations = {
        col: "first" for col in sensor_df.columns if col not in ("System", "Count")
    }
    aggregations["Count"] = "sum"
    sensor_df = sensor_df.groupby("System", as_index=False).agg(aggregations)
    display(sensor_df)
    # Save the combined sensor DataFrame to an Excel file in a "tables" subfolder
    tables_dir = os.path.join(os.getcwd(), "tables")
    os.makedirs(tables_dir, exist_ok=True)
    sensor_df.to_excel(os.path.join(tables_dir, "combined_sensor_data.xlsx"), index=False)


Display and export the sensor data of each resource stored in the sensor dict

In [None]:
print(len(sensor_dict))

# NOTE(review): no cell visible here adds entries to sensor_dict, so this loop
# may never run - confirm where the mapping is supposed to be filled.
# The loop variable is deliberately NOT called sensor_df: that name holds the
# combined sensor table built in the previous cell and must not be rebound.
for resource, resource_sensor_df in sensor_dict.items():
    print(f"\nSensor data for Resource: {resource}\n")
    resource_sensor_df = resource_sensor_df.drop(columns=["Interaction Type", "Timestamp"], errors="ignore")
    # Keep the first row of every system (iloc[0] also keeps missing values)
    resource_sensor_df = resource_sensor_df.groupby("System").agg(lambda x: x.iloc[0]).reset_index()
    display(resource_sensor_df)
    # Save the sensor DataFrame to an Excel file in a "tables" subfolder
    tables_dir = os.path.join(os.getcwd(), "tables")
    os.makedirs(tables_dir, exist_ok=True)
    resource_sensor_df.to_excel(os.path.join(tables_dir, f"sensor_data_{resource}.xlsx"), index=False)