In [2]:
############ HEPSYCODE ############
from collections import defaultdict
import os

# Define dependencies for ADD and SET events
add_dependencies = {
    "StructuredNode ports ADD": ["BehaviorSpecification nodes ADD"],
    "Stimulus ports ADD": ["BehaviorSpecification nodes ADD"],
    "Display ports ADD": ["BehaviorSpecification nodes ADD"],
    "Port pChannels ADD": ["BehaviorSpecification nodes ADD"],  # Will handle special case below
    "StructuredNode processes ADD": ["BehaviorSpecification nodes ADD"],
    "StructuredNode nChannels ADD": ["BehaviorSpecification nodes ADD", "StructuredNode processes ADD", "StructuredNode processes ADD"],
}

set_dependencies = {
    "Stimulus name SET": ["Stimulus ports ADD"],
    "Display name SET": ["Display ports ADD"],
    "StructuredNode name SET": ["BehaviorSpecification nodes ADD"],
    "Channel pFrom SET": ["Port pChannels ADD"],
    "Channel pTo SET": ["Port pChannels ADD"],
    "Process name SET": ["StructuredNode processes ADD"],
    "Channel nFrom SET": ["StructuredNode nChannels ADD"],
    "Channel nTo SET": ["StructuredNode nChannels ADD"],
    "Port portExtension SET": ["StructuredNode ports ADD"],
    "Process processExtension SET": ["StructuredNode processes ADD"],
    "Channel name SET": ["StructuredNode nChannels ADD"],
    "Channel queueSize SET": ["StructuredNode nChannels ADD"],
    "Channel rendezVous SET": ["StructuredNode nChannels ADD"],
    "Channel message SET": ["StructuredNode nChannels ADD"],
    "Message name SET": ["StructuredNode nChannels ADD"],
    "Message entry ADD": ["StructuredNode nChannels ADD"],
    "Entry name SET": ["Message entry ADD"],
    "Entry type SET": ["Message entry ADD"],
    "Port name SET": ["StructuredNode ports ADD"],
}

# Parse trace from file and check dependencies
def check_dependencies(file_path):
    with open(file_path, 'r') as file:
        trace = [line.strip() for line in file]

    events_seen = defaultdict(int)
    violations = []

    for line in trace:
        event = line.strip()

        # Special case for Port pChannels ADD
        if event == "Port pChannels ADD":
            if not ((events_seen["Display ports ADD"] > 0 and events_seen["BehaviorSpecification nodes ADD"] > 0) or \
                    (events_seen["Stimulus ports ADD"] > 0 and events_seen["BehaviorSpecification nodes ADD"] > 0)):
                violations.append((event, "Display ports ADD and BehaviorSpecification nodes ADD OR Stimulus ports ADD and BehaviorSpecification nodes ADD"))
            continue

        # Check ADD event dependencies
        if event in add_dependencies:
            for dep in add_dependencies[event]:
                if events_seen[dep] == 0:
                    violations.append((event, dep))

        # Check SET event dependencies
        if event in set_dependencies:
            for dep in set_dependencies[event]:
                if events_seen[dep] == 0:
                    violations.append((event, dep))

        # Record the event as seen
        events_seen[event] += 1

    return violations

# Count violations
def count_violations(file_path):
    violations = check_dependencies(file_path)
    return len(violations)

# Process all files in a specific folder
def process_folder(folder_path, file_extension):
    all_violations = {}
    for file_name in os.listdir(folder_path):
        if file_name.endswith(file_extension):
            file_path = os.path.join(folder_path, file_name)
            violations_count = count_violations(file_path)
            all_violations[file_name] = violations_count

    return all_violations

# Usage
folder_path = ".\HEPSYCODE\XES-MORGAN-LLM-MISTRAL"  # Replace with your folder path
file_extension = ".xes"  # Specify the file format

violations_summary = process_folder(folder_path, file_extension)
for file_name, violations_count in violations_summary.items():
    print(f"File: {file_name} -> Number of dependency violations: {violations_count}")

File: 2024-02-13 16.44 00%20-%20DigitalCam%20Nominal-representations.aird.xes -> Number of dependency violations: 0
File: 2024-02-13 16.59 00%20-%20DigitalCam%20Parallel-representations.aird.xes -> Number of dependency violations: 0
File: 2024-02-13 17.34 01%20-%20FIRFIRGCD-representations.aird.xes -> Number of dependency violations: 0
File: 2024-02-14 14.40 02%20-%20RT%20App-representations.aird.xes -> Number of dependency violations: 0
File: 2024-02-14 14.50 03%20-%20RT%20App%20Paper-representations.aird.xes -> Number of dependency violations: 0
File: 2024-02-14 15.04 04%20-%20RT%20App%20Paper%202-representations.aird.xes -> Number of dependency violations: 0
File: 2024-02-14 15.40 05%20-%20Hepsy%20Example-representations.aird.xes -> Number of dependency violations: 0
File: 2024-02-14 15.51 06%20-%20Sobel-representations.aird.xes -> Number of dependency violations: 0
File: 2024-02-14 15.52 07%20-%20Roberts-representations.aird.xes -> Number of dependency violations: 0
File: 2024-02-1

In [4]:
############ CAEX ############
import re
import os
import csv
from collections import defaultdict

# Function to parse the trace data and extract events
def parse_trace(trace):
    events = []
    for line in trace.strip().split("\n"):
        match = re.match(r"event\s+(\S+)\s+(\S+)\s+(\S+)", line)
        if match:
            event_type, element, action = match.groups()
            events.append((event_type, element, action))
    return events

# Function to check temporal order of events with dependencies
def check_order_with_dependencies(events):
    add_seen = defaultdict(set)  # Tracks elements that have been added
    errors = []

    # Define specific dependencies
    add_dependencies = {
        "internalLink": "externalInterface",
    }

    set_dependencies = {
        "name": "InternalElement",
        "role": "supportedRoleClass",
        "baseClass": "ExternalInterface",
        "partnerSideA": "InternalLink",
        "partnerSideB": "InternalLink",
    }

    for event_type, element, action in events:
        if action == "ADD":
            add_seen[element].add(event_type)

            # Check if ADD has an unresolved dependency
            if element in add_dependencies:
                dependency = add_dependencies[element]
                if not any(dep == dependency for dep in add_seen):
                    errors.append(f"Error: {event_type} {element} ADD requires prior {dependency} ADD")

        elif action == "SET":
            # Ensure that a related ADD event has occurred before SET
            dependency = set_dependencies.get(event_type)
            if dependency and dependency not in add_seen:
                errors.append(f"Error: {event_type} {element} SET requires prior {dependency} ADD")

    return errors

# Function to process all XES files in a specified folder and save results to a CSV
def process_folder(folder_path, output_csv):
    xes_files = [f for f in os.listdir(folder_path) if f.endswith(".xes")]
    results = []

    for xes_file in xes_files:
        file_path = os.path.join(folder_path, xes_file)
        print(f"Processing file: {xes_file}")

        with open(file_path, "r") as file:
            trace_data = file.read()

        events = parse_trace(trace_data)
        errors = check_order_with_dependencies(events)

        violation_count = len(errors)
        results.append({"file": xes_file, "violations": violation_count})

        if errors:
            print(f"Temporal order not met in {xes_file}. Total violations: {violation_count}")
            for error in errors:
                print(error)
        else:
            print(f"All temporal orders are satisfied in {xes_file}.")

    # Write results to CSV
    try:
        with open(output_csv, "w", newline="") as csvfile:
            fieldnames = ["file", "violations"]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

            writer.writeheader()
            for result in results:
                writer.writerow(result)
    except PermissionError:
        print(f"Error: Unable to write to file '{output_csv}'. Please specify a valid file path.")

# Usage
folder_path = ".\CAEX\XES-MORGAN-LLM-MISTRAL"  # Replace with your folder path
output_csv = "results_dependencies_MISTRAL.csv"  # Replace with your folder path
process_folder(folder_path, output_csv)    

Processing file: 2023-02-07 08.04 Test-representations.airdM.xes
All temporal orders are satisfied in 2023-02-07 08.04 Test-representations.airdM.xes.
Processing file: 2023-02-07 09.06 Test-representations.airdM.xes
All temporal orders are satisfied in 2023-02-07 09.06 Test-representations.airdM.xes.
Processing file: 2023-02-07 09.17 Test-representations.airdM.xes
All temporal orders are satisfied in 2023-02-07 09.17 Test-representations.airdM.xes.
Processing file: 2023-02-07 09.27 Test-representations.airdM.xes
All temporal orders are satisfied in 2023-02-07 09.27 Test-representations.airdM.xes.
Processing file: 2023-02-07 09.48 Test-representations.airdM.xes
All temporal orders are satisfied in 2023-02-07 09.48 Test-representations.airdM.xes.
Processing file: 2023-02-07 10.55 Test-representations.airdM.xes
All temporal orders are satisfied in 2023-02-07 10.55 Test-representations.airdM.xes.
Processing file: 2023-02-07 11.07 Test-representations.airdM.xes
All temporal orders are satis