# I. Input Statistics

### Text statistics

In [None]:
!pip install PyPDF2 nltk


In [None]:
import nltk
nltk.download('punkt')
nltk.download('punkt_tab')

In [None]:
text = """
1. Member States shall require that verification of the identity of the customer and the beneficial owner take place
before the establishment of a business relationship or the carrying out of the transaction.
2. By way of derogation from paragraph 1, Member States may allow verification of the identity of the customer and the beneficial owner to be completed during the establishment of a business relationship if necessary so as not to interrupt the normal conduct of business and where there is little risk of money laundering or terrorist financing. In such situations, those procedures shall be completed as soon as practicable after initial contact.
3. By way of derogation from paragraph 1, Member States may allow the opening of an account with a credit
institution or financial institution, including accounts that permit transactions in transferable securities, provided that there are adequate safeguards in place to ensure that transactions are not carried out by the customer or on its behalf until full compliance with the customer due diligence requirements laid down in points (a) and (b) of the first subparagraph of Article 13(1) is obtained.
4. Member States shall require that, where an obliged entity is unable to comply with the customer due diligence
requirements laid down in point (a), (b) or (c) of the first subparagraph of Article 13(1), it shall not carry out a
transaction through a bank account, establish a business relationship or carry out the transaction, and shall terminate the business relationship and consider making a suspicious transaction report to the FIU in relation to the customer in accordance with Article 33. Member States shall not apply the first subparagraph to notaries, other independent legal professionals, auditors, external accountants and tax advisors only to the strict extent that those persons ascertain the legal position of their client, or perform the task of defending or representing that client in, or concerning, judicial proceedings, including providing advice on instituting or avoiding such proceedings.
5. Member States shall require that obliged entities apply the customer due diligence measures not only to all new
customers but also at appropriate times to existing customers on a risk-sensitive basis, including at times when the
relevant circumstances of a customer change.
"""

# Segment the excerpt into sentences and tokenize each sentence on its own
# to obtain the per-sentence token counts.
sentences = nltk.sent_tokenize(text)
sentence_token_counts = list(map(len, map(nltk.word_tokenize, sentences)))

# Word-level tokenization of the complete text gives the overall total.
tokens = nltk.word_tokenize(text)
total_tokens = len(tokens)

# Mean sentence length = all tokens divided by the number of sentences.
average_sentence_length_tokens = total_tokens / len(sentences)

# One statistic per output line.
print(
    f"Total tokens: {total_tokens}",
    f"Total sentences: {len(sentences)}",
    f"Average sentence length (tokens): {average_sentence_length_tokens}",
    f"Sentence token counts: {sentence_token_counts}",
    sep="\n",
)


In [None]:
import nltk
import PyPDF2

# Function to extract text from a PDF file
def extract_text_from_pdf(pdf_path):
    """Return the text of every page of the PDF at *pdf_path*, concatenated in page order."""
    with open(pdf_path, "rb") as pdf_stream:
        # Pages are extracted inside the ``with`` block so the reader still
        # has an open file to read from.
        page_texts = [page.extract_text() for page in PyPDF2.PdfReader(pdf_stream).pages]
    return "".join(page_texts)

# Path to your PDF file
pdf_path = "2_blood_donor_selection.pdf"  # Replace with your PDF file path

# Extract text from the PDF
text = extract_text_from_pdf(pdf_path)

# Tokenize the text
tokens = nltk.word_tokenize(text)

# Count the total number of tokens
total_tokens = len(tokens)

# Split the text into sentences and count tokens for each sentence
sentences = nltk.sent_tokenize(text)
sentence_token_counts = [len(nltk.word_tokenize(sentence)) for sentence in sentences]

# Calculate average sentence length (in tokens).
# A PDF from which no text could be extracted produces zero sentences;
# report 0.0 in that case instead of failing with ZeroDivisionError.
average_sentence_length_tokens = total_tokens / len(sentences) if sentences else 0.0

# Output the results
print(f"Total tokens: {total_tokens}")
print(f"Total sentences: {len(sentences)}")
print(f"Average sentence length (tokens): {average_sentence_length_tokens}")
print(f"Sentence token counts: {sentence_token_counts}")

### Model statistics

In [None]:
import xml.etree.ElementTree as ET
import os

def analyze_bpmn(file_path):
    """
    Parses a .bpmn file to count nodes (activities and events),
    gateways, edges (sequence flows), and roles (lanes, falling back to
    participants when no lane is present).

    Elements are looked up in the namespace of the document's root element.
    When the root element carries no namespace, the same tag names are
    matched without a namespace.

    Args:
        file_path (str): The full path to the .bpmn file.

    Returns:
        dict: A dictionary containing the counts of each element type.
              Returns None if the file is not found, is not valid XML, or
              any other error occurs while reading it.
    """
    if not os.path.exists(file_path):
        print(f"Error: File not found at '{file_path}'")
        return None

    try:
        tree = ET.parse(file_path)
        root = tree.getroot()

        # ElementTree spells a namespaced tag as "{uri}local". Only strip a
        # namespace when one is really there: slicing an un-namespaced tag
        # such as "definitions" would produce a bogus URI and zero counts.
        if root.tag.startswith('{'):
            qualifier = '{' + root.tag[1:].partition('}')[0] + '}'
        else:
            qualifier = ''

        def count(tag_names):
            """Total number of descendants of root whose tag is in tag_names."""
            return sum(len(root.findall(f".//{qualifier}{tag}")) for tag in tag_names)

        activity_types = [
            'task', 'sendTask', 'receiveTask', 'userTask', 'manualTask',
            'businessRuleTask', 'serviceTask', 'scriptTask', 'callActivity', 'subProcess'
        ]
        event_types = [
            'startEvent', 'endEvent', 'intermediateThrowEvent',
            'intermediateCatchEvent', 'boundaryEvent'
        ]

        gateway_types = [
            'exclusiveGateway', 'parallelGateway', 'inclusiveGateway',
            'eventBasedGateway', 'complexGateway'
        ]

        edge_types = ['sequenceFlow']

        # NOTE(review): 'pool' is kept for backward compatibility; pools are
        # counted through 'participant' in the fallback below.
        role_types = ['pool', 'lane']

        total_nodes = count(activity_types) + count(event_types)
        total_gateways = count(gateway_types)
        total_edges = count(edge_types)

        total_roles = count(role_types)
        if total_roles == 0:
            # No lanes at all: count the participants instead.
            total_roles = count(['participant'])

        counts = {
            "Total Nodes (Activities + Events)": total_nodes,
            "Total Gateways": total_gateways,
            "Total Edges (Sequence Flows)": total_edges,
            "Total Roles (Lanes/Participants)": total_roles
        }

        return counts

    except ET.ParseError:
        print(f"Error: Could not parse '{file_path}'. Make sure it is a valid BPMN/XML file.")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None

def main():
    """Analyze the hard-coded BPMN file and print one line per element count."""
    file_path = "/content/finance_customer_due_diligence.bpmn"
    analysis_results = analyze_bpmn(file_path)

    # analyze_bpmn has already reported the problem when it returns None.
    if not analysis_results:
        return

    print("\n--- BPMN Analysis Results ---")
    for label, count in analysis_results.items():
        print(f"{label}: {count}")
    print("---------------------------\n")


if __name__ == "__main__":
    main()


# II. Semantic EVAL


### 1. Completeness

In [None]:
!pip install sentence-transformers torch pandas openpyxl

In [None]:
# Step one: check for completeness
import xml.etree.ElementTree as ET
from collections import defaultdict
from sentence_transformers import SentenceTransformer, util
import pandas as pd
import os

NAMESPACES = {'bpmn': 'http://www.omg.org/spec/BPMN/20100524/MODEL'}


def get_elements_by_tag(root, tag_name):
    """
    Collects every descendant of ``root`` whose tag is ``tag_name`` in the
    BPMN model namespace.

    Args:
        root (ET.Element): The root of the XML tree.
        tag_name (str): Local BPMN tag name (e.g., 'task', 'userTask').

    Returns:
        list: The matching XML elements.
    """
    xpath_query = f'.//bpmn:{tag_name}'
    matching_elements = root.findall(xpath_query, namespaces=NAMESPACES)
    return matching_elements

def get_element_names(elements):
    """
    Returns the non-empty 'name' attributes of the given BPMN elements,
    in input order. Elements with a missing or empty name are skipped.

    Args:
        elements (list): A list of XML elements.

    Returns:
        list: A list of names (strings).
    """
    collected = []
    for element in elements:
        label = element.get('name')
        if label:
            collected.append(label)
    return collected

def get_actors(root):
    """
    Returns the names of all actors in the BPMN model: participants
    (pools) first, followed by lanes.

    Args:
        root (ET.Element): The root of the XML tree.

    Returns:
        list: A list of actor names.
    """
    actor_elements = []
    for actor_tag in ('participant', 'lane'):
        actor_elements += get_elements_by_tag(root, actor_tag)
    return get_element_names(actor_elements)

def get_activities(root):
    """
    Returns the names of all activities in the BPMN model (tasks of every
    kind, call activities and sub-processes), grouped by tag in the order
    the tags are listed below.

    Args:
        root (ET.Element): The root of the XML tree.

    Returns:
        list: A list of activity names.
    """
    activity_tags = (
        'task', 'userTask', 'serviceTask', 'sendTask', 'receiveTask',
        'manualTask', 'businessRuleTask', 'scriptTask', 'callActivity', 'subProcess',
    )
    activity_elements = [
        element
        for tag in activity_tags
        for element in get_elements_by_tag(root, tag)
    ]
    return get_element_names(activity_elements)

def get_events(root):
    """
    Returns the names of all events in the BPMN model (start, end,
    intermediate throw/catch and boundary events).

    Args:
        root (ET.Element): The root of the XML tree.

    Returns:
        list: A list of event names.
    """
    event_names = []
    for event_tag in ('startEvent', 'endEvent', 'intermediateThrowEvent',
                      'intermediateCatchEvent', 'boundaryEvent'):
        # Filtering per tag keeps the same overall order as filtering once
        # at the end.
        event_names += get_element_names(get_elements_by_tag(root, event_tag))
    return event_names

def get_gateways_count(root):
    """
    Counts parallel (AND) and exclusive (XOR) gateways. Gateways are
    counted rather than name-matched because they are often unnamed.

    Args:
        root (ET.Element): The root of the XML tree.

    Returns:
        dict: A dictionary with counts for 'AND' and 'XOR' gateways.
    """
    tag_by_kind = {'AND': 'parallelGateway', 'XOR': 'exclusiveGateway'}
    return {
        kind: len(get_elements_by_tag(root, tag))
        for kind, tag in tag_by_kind.items()
    }

def get_data_objects(root):
    """
    Returns the names of the data object references in the BPMN model.

    Args:
        root (ET.Element): The root of the XML tree.

    Returns:
        list: A list of data object names.
    """
    return get_element_names(get_elements_by_tag(root, 'dataObjectReference'))

def get_conditions(root):
    """
    Extracts conditions, defined as the names/labels of the
    XOR (exclusive) gateways that represent a decision split, i.e. named
    gateways with more than one outgoing sequence flow.

    Args:
        root (ET.Element): The root of the XML tree.

    Returns:
        list: A list of condition texts (gateway labels), in document order
              of the gateways.
    """
    # Count outgoing flows per source node in a single pass over the tree
    # instead of re-scanning every sequence flow for each gateway.
    outgoing_counts = {}
    for flow in root.findall('.//bpmn:sequenceFlow', NAMESPACES):
        source_id = flow.get('sourceRef')
        outgoing_counts[source_id] = outgoing_counts.get(source_id, 0) + 1

    conditions = []
    for gateway in root.findall('.//bpmn:exclusiveGateway', NAMESPACES):
        condition_text = gateway.get('name')
        # Only a gateway that actually splits the flow counts as a decision.
        if condition_text and outgoing_counts.get(gateway.get('id'), 0) > 1:
            conditions.append(condition_text)

    return conditions


def calculate_recall_for_semantic_elements(gold_standard_names, generated_names, similarity_threshold, model):
    """
    Computes recall of gold-standard names against generated names using
    embedding cosine similarity, with a greedy one-to-one assignment.

    Gold names are processed in input order. Each one takes the
    highest-scoring generated name that is not already assigned, and the
    pair counts as a match only if that score reaches the threshold.

    Returns:
        tuple: A tuple containing (recall, matches_count, total_gold, matches_list, unmatched_gold, unmatched_generated).
    """
    # Degenerate inputs are answered without touching the model.
    if not gold_standard_names:
        return 1.0, 0, 0, [], [], generated_names
    if not generated_names:
        return 0.0, 0, len(gold_standard_names), [], gold_standard_names, []

    similarity = util.pytorch_cos_sim(
        model.encode(gold_standard_names, convert_to_tensor=True),
        model.encode(generated_names, convert_to_tensor=True),
    )

    gold_total = len(gold_standard_names)
    generated_total = len(generated_names)

    pairs = []
    open_gold = set(range(gold_total))
    open_generated = set(range(generated_total))
    taken_generated = set()

    for gold_pos in range(gold_total):
        # Best still-available generated candidate for this gold name.
        top_score, top_pos = -1, -1
        for candidate_pos in range(generated_total):
            if candidate_pos in taken_generated:
                continue
            candidate_score = similarity[gold_pos][candidate_pos]
            if candidate_score > top_score:
                top_score, top_pos = candidate_score, candidate_pos

        if not (top_score >= similarity_threshold):
            continue

        pairs.append(
            (gold_standard_names[gold_pos], generated_names[top_pos], top_score.item())
        )
        if top_pos != -1:
            taken_generated.add(top_pos)
        open_gold.discard(gold_pos)
        open_generated.discard(top_pos)

    matched_total = len(pairs)
    recall = matched_total / gold_total if gold_total > 0 else 1.0

    leftover_gold = [gold_standard_names[pos] for pos in open_gold]
    leftover_generated = [generated_names[pos] for pos in open_generated]

    return recall, matched_total, gold_total, pairs, leftover_gold, leftover_generated


def calculate_recall_for_counts(gold_standard_count, generated_count):
    """
    Computes recall for elements compared by count only (like gateways).

    The number of matches is capped at the gold-standard count, so extra
    generated elements never push recall above 1.0. With nothing to find
    in the gold standard, recall is reported as 1.0.

    Returns:
        tuple: A tuple containing (recall, matches, total_count).
    """
    if gold_standard_count != 0:
        covered = min(gold_standard_count, generated_count)
        return covered / gold_standard_count, covered, gold_standard_count
    return 1.0, 0, 0

def format_matches_for_excel(matches_list, unmatched_gold, unmatched_gen):
    """Renders matches and leftovers as one multi-line string for an Excel cell.

    Matches are listed by descending score; unmatched names are listed
    alphabetically. Sections with no entries are omitted.
    """
    lines = []

    if matches_list:
        ranked = sorted(matches_list, key=lambda match: match[2], reverse=True)
        lines.append("MATCHED:")
        lines.extend(
            f"  - G: '{gs}' <-> M: '{gen}' ({score:.2f})" for gs, gen, score in ranked
        )

    # The leading "\n" in these headings becomes a blank separator line once
    # everything is joined.
    leftover_sections = (
        ("\nMISSING FROM MODEL:", unmatched_gold),
        ("\nEXTRA IN MODEL:", unmatched_gen),
    )
    for heading, names in leftover_sections:
        if names:
            lines.append(heading)
            lines.extend(f"  - {name}" for name in sorted(names))

    return "\n".join(lines)


def run_completeness_check(gold_standard_file, generated_file, thresholds, model):
    """
    Compares one generated BPMN model against its gold standard.

    Named element categories are scored by semantic recall, AND/XOR
    gateways by count. For every category the result holds a
    '<Category> Recall', '<Category> Absolute' and '<Category> Details'
    entry. Returns None (after printing the reason) if either file is
    missing or cannot be parsed.
    """
    results = {
        'Gold Standard': os.path.basename(gold_standard_file),
        'Generated Model': os.path.basename(generated_file),
    }

    try:
        gold_root = ET.parse(gold_standard_file).getroot()
    except (FileNotFoundError, ET.ParseError) as e:
        print(f"Error reading gold standard file '{gold_standard_file}': {e}")
        return None

    try:
        generated_root = ET.parse(generated_file).getroot()
    except (FileNotFoundError, ET.ParseError) as e:
        print(f"Error reading generated file '{generated_file}': {e}")
        return None

    # Category label -> function extracting that category's names from a tree.
    extractors = (
        ('Actors', get_actors),
        ('Activities', get_activities),
        ('Events', get_events),
        ('Data Objects', get_data_objects),
        ('Conditions', get_conditions),
    )

    for category, extract in extractors:
        # Threshold keys are the lower-cased, underscore-joined category names.
        threshold = thresholds.get(category.lower().replace(' ', '_'), 0.75)
        recall, matches, total, matched, missing, extra = calculate_recall_for_semantic_elements(
            extract(gold_root), extract(generated_root), threshold, model
        )
        results.update({
            f'{category} Recall': recall,
            f'{category} Absolute': f"{matches}/{total}",
            f'{category} Details': format_matches_for_excel(matched, missing, extra),
        })

    gold_gateways = get_gateways_count(gold_root)
    gen_gateways = get_gateways_count(generated_root)

    recall_and, matches_and, total_and = calculate_recall_for_counts(gold_gateways['AND'], gen_gateways['AND'])
    results.update({
        'AND Gateways Recall': recall_and,
        'AND Gateways Absolute': f"{matches_and}/{total_and}",
        'AND Gateways Details': f"Gold: {gold_gateways['AND']}, Generated: {gen_gateways['AND']}",
    })

    recall_xor, matches_xor, total_xor = calculate_recall_for_counts(gold_gateways['XOR'], gen_gateways['XOR'])
    results.update({
        'XOR Gateways Recall': recall_xor,
        'XOR Gateways Absolute': f"{matches_xor}/{total_xor}",
        'XOR Gateways Details': f"Gold: {gold_gateways['XOR']}, Generated: {gen_gateways['XOR']}",
    })

    return results


if __name__ == "__main__":

    # File pairs to compare: (gold_standard_file, generated_file).
    # Each gold standard is paired with three generated variants (_01.._03).
    comparison_files = [
        ("/content/smart_meter_refactored.bpmn", "/content/smart_meter_01.bpmn"),
        ("/content/smart_meter_refactored.bpmn", "/content/smart_meter_02.bpmn"),
        ("/content/smart_meter_refactored.bpmn", "/content/smart_meter_03.bpmn"),
        ("/content/gdpr_refactored.bpmn", "/content/gdpr_01.bpmn"),
        ("/content/gdpr_refactored.bpmn", "/content/gdpr_02.bpmn"),
        ("/content/gdpr_refactored.bpmn", "/content/gdpr_03.bpmn"),
        ("/content/blood_donor_selection.bpmn", "/content/blood_donor_01.bpmn"),
        ("/content/blood_donor_selection.bpmn", "/content/blood_donor_02.bpmn"),
        ("/content/blood_donor_selection.bpmn", "/content/blood_donor_03.bpmn"),
        ("/content/health_data.bpmn", "/content/health_data_01.bpmn"),
        ("/content/health_data.bpmn", "/content/health_data_02.bpmn"),
        ("/content/health_data.bpmn", "/content/health_data_03.bpmn"),
        ("/content/finance_customer_due_diligence.bpmn", "/content/CDD_01.bpmn"),
        ("/content/finance_customer_due_diligence.bpmn", "/content/CDD_02.bpmn"),
        ("/content/finance_customer_due_diligence.bpmn", "/content/CDD_03.bpmn")
    ]

    output_excel_file = "bpmn_completeness_report_CARB.xlsx"

    # Minimum cosine similarity for two names to count as a match, per
    # element category. run_completeness_check looks these up by the
    # lower-cased category name with spaces replaced by underscores and
    # falls back to 0.75 for a missing key.
    similarity_thresholds = {
        'actors': 0.70,
        'activities': 0.50,
        'events': 0.55,
        'data_objects': 0.60,
        'conditions': 0.60
    }

    # The embedding model is loaded once and shared by every comparison.
    print("Loading semantic similarity model (this may take a moment)...")
    model = SentenceTransformer('all-MiniLM-L6-v2')
    print("Model loaded.")

    all_results = []
    print("\nStarting batch processing...")
    for gold_file, generated_file in comparison_files:
        print(f"  - Comparing '{os.path.basename(gold_file)}' with '{os.path.basename(generated_file)}'")
        result_data = run_completeness_check(gold_file, generated_file, similarity_thresholds, model)
        # A failed comparison returns None and is left out of the report.
        if result_data:
            all_results.append(result_data)

    if all_results:
        df = pd.DataFrame(all_results)

        # Report layout: file names, then the recall/absolute pair of every
        # category, then all free-text detail columns at the end.
        column_order = [
            'Gold Standard', 'Generated Model',
            'Actors Recall', 'Actors Absolute',
            'Activities Recall', 'Activities Absolute',
            'Events Recall', 'Events Absolute',
            'Data Objects Recall', 'Data Objects Absolute',
            'Conditions Recall', 'Conditions Absolute',
            'AND Gateways Recall', 'AND Gateways Absolute',
            'XOR Gateways Recall', 'XOR Gateways Absolute',
            'Actors Details', 'Activities Details', 'Events Details',
            'Data Objects Details', 'Conditions Details', 'AND Gateways Details', 'XOR Gateways Details'
        ]
        df = df.reindex(columns=column_order)

        # An export failure is reported but does not abort the cell.
        try:
            df.to_excel(output_excel_file, index=False, engine='openpyxl')
            print(f"\nSuccessfully exported {len(all_results)} results to '{output_excel_file}'")
        except Exception as e:
            print(f"\nError exporting to Excel: {e}")
    else:
        print("\nNo results to export.")

### 2. Correctness

In [None]:
!pip install sentence-transformers torch pandas openpyxl requests

In [None]:
# step two check for correctness
import xml.etree.ElementTree as ET
from collections import defaultdict
from sentence_transformers import SentenceTransformer, util
import pandas as pd
import os
import requests
import json
import time

NAMESPACES = {'bpmn': 'http://www.omg.org/spec/BPMN/20100524/MODEL'}

class BPMNModel:
    """
    Represents a parsed BPMN model with efficient lookups for elements,
    lanes, and participants.

    Lookup tables built by the constructor:
        root: root element of the parsed XML tree.
        id_map: element id -> element, for every element with an 'id'.
        parent_map: child element -> parent element.
        outgoing_flows_by_source: 'sourceRef' value -> list of the sequence
            flows leaving that node, in document order.
        elements_by_name: element name -> list of elements with that name.
        element_to_lane_name_map: flow node id -> name of its lane.
        process_to_participant_name_map: process id -> participant name.
    """

    # Tag groups are declared once so the activity list and the flow-node
    # list cannot drift apart. Order matters: results are grouped by tag in
    # this order.
    _ACTIVITY_TAGS = (
        'task', 'userTask', 'serviceTask', 'sendTask', 'receiveTask',
        'manualTask', 'businessRuleTask', 'scriptTask', 'callActivity', 'subProcess',
    )
    _EVENT_TAGS = (
        'startEvent', 'endEvent', 'intermediateThrowEvent', 'intermediateCatchEvent',
        'boundaryEvent',
    )
    _GATEWAY_TAGS = (
        'parallelGateway', 'exclusiveGateway', 'inclusiveGateway',
        'eventBasedGateway', 'complexGateway',
    )

    def __init__(self, file_path):
        """
        Parses the BPMN file and builds the lookup tables.

        Args:
            file_path (str): Path to the .bpmn file.

        Raises:
            IOError: If the file is not well-formed XML; the underlying
                ET.ParseError is chained as the cause. Errors raised while
                opening the file are not caught here.
        """
        try:
            self.root = ET.parse(file_path).getroot()
        except ET.ParseError as e:
            raise IOError(f"Failed to parse XML file: {file_path}. Error: {e}") from e

        # Create maps for efficient lookups
        self.id_map = {elem.get('id'): elem for elem in self.root.iter() if elem.get('id')}
        self.parent_map = {c: p for p in self.root.iter() for c in p}
        self.outgoing_flows_by_source = self._index_outgoing_flows()
        self.elements_by_name = self._get_all_elements_by_name()
        self.element_to_lane_name_map = self._map_elements_to_lanes()
        self.process_to_participant_name_map = self._map_processes_to_participants()

    def _index_outgoing_flows(self):
        """Groups all sequence flows by their 'sourceRef', in document order."""
        index = defaultdict(list)
        for flow in self.root.findall('.//bpmn:sequenceFlow', NAMESPACES):
            source_id = flow.get('sourceRef')
            if source_id is not None:
                index[source_id].append(flow)
        return dict(index)

    def _outgoing_flows(self, element):
        """
        Returns the sequence flows whose 'sourceRef' is the element's id.

        A dictionary lookup is used instead of an XPath predicate built from
        the id: it avoids re-scanning the tree for every node and cannot be
        broken by ids that contain quote characters.
        """
        elem_id = element.get('id')
        if elem_id is None:
            return []
        return self.outgoing_flows_by_source.get(elem_id, [])

    def _find_by_tags(self, tags):
        """Returns all elements with one of the given BPMN tags, grouped by tag order."""
        found = []
        for tag in tags:
            found.extend(self.root.findall(f'.//bpmn:{tag}', NAMESPACES))
        return found

    def _get_all_elements_by_name(self):
        """Creates a dictionary mapping element names to a list of elements."""
        mapping = defaultdict(list)
        for elem in self.id_map.values():
            name = elem.get('name')
            if name:
                mapping[name].append(elem)
        return mapping

    def _map_elements_to_lanes(self):
        """Maps flow element IDs to the name of the lane they belong to."""
        mapping = {}
        for lane in self.root.findall('.//bpmn:lane', NAMESPACES):
            lane_name = lane.get('name')
            if lane_name:
                # Later lanes overwrite earlier ones for the same node id.
                for node_ref in lane.findall('.//bpmn:flowNodeRef', NAMESPACES):
                    if node_ref.text:
                        mapping[node_ref.text] = lane_name
        return mapping

    def _map_processes_to_participants(self):
        """Maps process IDs to the name of the participant (pool) they belong to."""
        mapping = {}
        for participant in self.root.findall('.//bpmn:participant', NAMESPACES):
            participant_name = participant.get('name')
            process_ref = participant.get('processRef')
            if participant_name and process_ref:
                mapping[process_ref] = participant_name
        return mapping

    def get_all_activities(self):
        """Returns a list of all activity elements in the model."""
        return self._find_by_tags(self._ACTIVITY_TAGS)

    def get_activity_actor_pairs(self):
        """
        Extracts all (activity_name, actor_name) pairs from the model.
        Activities without a name or without a resolvable actor are skipped.
        """
        pairs = []
        for activity in self.get_all_activities():
            activity_name = activity.get('name')
            actor_name = self.get_actor_name_for_element(activity)
            if activity_name and actor_name:
                pairs.append((activity_name, actor_name))
        return pairs

    def get_all_flow_nodes(self):
        """Returns a list of all flow node elements (tasks, events, gateways)."""
        return self._find_by_tags(self._ACTIVITY_TAGS + self._EVENT_TAGS + self._GATEWAY_TAGS)

    def get_control_flow_pairs(self):
        """
        Extracts all control flow pairs (source_name, target_name) from the model.
        Only flows whose source and target are both named are reported.
        """
        pairs = []
        for node in self.get_all_flow_nodes():
            source_name = node.get('name')
            if not source_name:
                continue

            for flow in self._outgoing_flows(node):
                # id_map only holds non-empty ids, so a missing or empty
                # targetRef simply yields None here.
                target_elem = self.id_map.get(flow.get('targetRef'))
                if target_elem is None:
                    continue
                target_name = target_elem.get('name')
                if target_name:
                    pairs.append((source_name, target_name))
        return pairs

    def get_decision_splits(self):
        """
        Extracts all decision splits (XOR gateway name and its conditional labels).
        This is the new method for the revised decision logic check.
        """
        splits = []
        # Find all exclusive gateways that have a name.
        gateways = self.root.findall('.//bpmn:exclusiveGateway[@name]', NAMESPACES)
        for gw in gateways:
            labels = self.get_outgoing_conditional_flow_labels(gw)
            # A decision split must have at least two outgoing conditional flows.
            if len(labels) > 1:
                splits.append({'name': gw.get('name'), 'labels': labels})
        return splits

    def get_element_by_name(self, name):
        """Retrieves the first element matching a given name, or None."""
        return self.elements_by_name.get(name, [None])[0]

    def get_actor_name_for_element(self, element):
        """
        Finds the actor (Lane or Participant name) for a given element.

        The lane that references the element wins. Otherwise the nearest
        enclosing process is looked up in the participant map. Returns None
        when neither yields a name.
        """
        if element is None:
            return None

        elem_id = element.get('id')
        if elem_id in self.element_to_lane_name_map:
            return self.element_to_lane_name_map[elem_id]

        # Walk up to the first ancestor whose tag ends with 'process' and
        # stop there, whether or not it is mapped to a participant.
        parent = self.parent_map.get(element)
        while parent is not None:
            if parent.tag.endswith('process'):
                return self.process_to_participant_name_map.get(parent.get('id'))
            parent = self.parent_map.get(parent)
        return None

    def get_outgoing_conditional_flow_labels(self, element):
        """Gets the sorted names (labels) of the named outgoing flows of a gateway."""
        if element is None:
            return []
        return sorted(
            flow.get('name') for flow in self._outgoing_flows(element) if flow.get('name')
        )

def get_semantic_matches(list_a, list_b, threshold, model):
    """Greedily pairs each string of list_a with its most similar unused string of list_b.

    Items of list_a are handled in order. A pair is kept only when its
    cosine similarity reaches the threshold, and each item of list_b is
    used at most once. Returns a list of dicts with the keys 'gold', 'gen'
    and 'score'; the list is empty when either input is empty.
    """
    if not (list_a and list_b):
        return []

    similarity = util.pytorch_cos_sim(
        model.encode(list_a, convert_to_tensor=True),
        model.encode(list_b, convert_to_tensor=True),
    )

    pairs = []
    used_b = set()
    b_total = len(list_b)

    for a_pos, a_text in enumerate(list_a):
        # Highest-scoring candidate in list_b that is still free.
        top_score, top_b = -1, -1
        for b_pos in range(b_total):
            if b_pos in used_b:
                continue
            candidate_score = similarity[a_pos][b_pos]
            if candidate_score > top_score:
                top_score, top_b = candidate_score, b_pos

        if not (top_score >= threshold):
            continue

        pairs.append({'gold': a_text, 'gen': list_b[top_b], 'score': top_score.item()})
        if top_b != -1:
            used_b.add(top_b)
    return pairs

def ask_llm(prompt, api_key):
    """Sends a prompt to the Gemini API and returns the response text.

    The request is attempted up to three times. HTTP 5xx responses and
    requests exceptions (including the HTTPError raised by
    raise_for_status) lead to another attempt after a back-off of
    2**attempt seconds, as long as attempts remain.

    Args:
        prompt (str): Text sent as the single content part of the request.
        api_key (str): Gemini API key, sent in the 'x-goog-api-key' header.

    Returns:
        str: The text of the first part of the first candidate, or a string
             starting with "Error:" if the response has no such part or all
             attempts failed.
    """
    api_url = "https://generativelanguage.googleapis.com/v1/models/gemini-2.5-pro:generateContent"

    # The key is sent as a header rather than in the query string: requests
    # includes the URL in its HTTP error messages, and those are printed
    # below, which would expose the key in the notebook output.
    headers = {"Content-Type": "application/json", "x-goog-api-key": api_key}
    payload = {"contents": [{"parts": [{"text": prompt}]}]}

    max_retries = 3
    for attempt in range(max_retries):
        retries_left = attempt < max_retries - 1
        try:
            response = requests.post(api_url, headers=headers, json=payload, timeout=120)

            if 500 <= response.status_code < 600:
                if not retries_left:
                    # Nothing follows the last attempt, so do not sleep.
                    print(f"API Request Warning: Received status {response.status_code}. No retries left.")
                    break
                print(f"API Request Warning: Received status {response.status_code}. Retrying in {2**attempt}s...")
                time.sleep(2**attempt)
                continue

            response.raise_for_status()
            response_json = response.json()

            if 'candidates' in response_json and len(response_json['candidates']) > 0:
                content = response_json['candidates'][0].get('content', {})
                if 'parts' in content and len(content['parts']) > 0:
                    return content['parts'][0].get('text', '')
            return "Error: Could not parse LLM response."

        except requests.exceptions.RequestException as e:
            print(f"API Request Error: {e}. Attempt {attempt + 1} of {max_retries}.")
            if retries_left:
                time.sleep(2**attempt)
            else:
                return "Error: API request failed after multiple retries."
    return "Error: API request failed after multiple retries."

def describe_model_for_llm(model: BPMNModel):
    """Builds a plain-text outline of a BPMN model for use in an LLM prompt.

    The outline lists every participant with the named direct children of
    its referenced process, followed by every sequence flow as
    "From <source> to <target>", with the flow's own name appended as its
    condition when it has one.
    """
    def display_name(element_id):
        # Unknown ids and elements without a 'name' attribute read as 'Unnamed'.
        element = model.id_map.get(element_id)
        return 'Unnamed' if element is None else element.get('name', 'Unnamed')

    lines = ["BPMN Model Structure:"]

    for participant in model.root.findall('.//bpmn:participant', NAMESPACES):
        pool_name = participant.get('name')
        lines.append(f"\n- Participant (Pool): '{pool_name}'")

        process_id = participant.get('processRef')
        process = model.root.find(f".//bpmn:process[@id='{process_id}']", NAMESPACES)
        if process is None:
            continue

        for child in process:
            child_name = child.get('name')
            if child_name:
                # Drop the "{namespace}" prefix from the tag.
                child_kind = child.tag.split('}')[-1]
                lines.append(f"  - Contains {child_kind}: '{child_name}'")

    lines.append("\n- Sequence Flows (Connections):")
    for flow in model.root.findall('.//bpmn:sequenceFlow', NAMESPACES):
        origin = display_name(flow.get('sourceRef'))
        destination = display_name(flow.get('targetRef'))
        entry = f"  - From '{origin}' to '{destination}'"

        condition = flow.get('name')
        if condition:
            entry += f" (Condition: '{condition}')"
        lines.append(entry)

    return "\n".join(lines)

def check_actor_assignment(gold_model, gen_model, model, thresholds):
    """
    Score how many gold (activity, actor) pairs reappear in the generated model.

    Each gold pair is greedily matched, in gold order, to at most one unused
    generated pair. A candidate qualifies only if the activity similarity
    reaches ``thresholds['activities']`` AND the actor similarity reaches
    ``thresholds['actors']``; among qualifying candidates the one with the
    highest mean of both similarities wins.

    Args:
        gold_model: Object whose ``get_activity_actor_pairs()`` returns the
            reference (activity name, actor name) pairs.
        gen_model: Same interface, for the generated model.
        model: Embedding model; ``model.encode(names)`` must yield one
            embedding per name, usable with ``util.pytorch_cos_sim``.
        thresholds: Mapping with the keys ``'activities'`` and ``'actors'``.

    Returns:
        tuple: ``(score, absolute, details)`` where ``score`` is the matched
        fraction of gold pairs (or ``"NA"``/``1.0`` when the gold standard has
        no pairs), ``absolute`` is ``"<matched>/<gold total>"`` and ``details``
        is a newline-joined list of MATCH, MISSING and EXTRA lines.
    """
    gold_pairs = gold_model.get_activity_actor_pairs()
    gen_pairs = gen_model.get_activity_actor_pairs()
    total_gold_pairs = len(gold_pairs)

    if total_gold_pairs == 0:
        # NOTE(review): a generated model with pairs scores 1.0 against an
        # empty gold standard; kept as-is, confirm this is the intended rule.
        return "NA" if not gen_pairs else 1.0, "0/0", "Not applicable: No (activity, actor) pairs in gold standard."

    if not gen_pairs:
        return 0.0, f"0/{total_gold_pairs}", "No (activity, actor) pairs found in the generated model."

    # Encode every distinct name once; pairs frequently share names.
    all_activity_names = list({p[0] for p in gold_pairs} | {p[0] for p in gen_pairs})
    all_actor_names = list({p[1] for p in gold_pairs} | {p[1] for p in gen_pairs})
    activity_embeddings = dict(zip(all_activity_names, model.encode(all_activity_names)))
    actor_embeddings = dict(zip(all_actor_names, model.encode(all_actor_names)))

    def get_similarity(name1, name2, embedding_dict):
        return util.pytorch_cos_sim(embedding_dict[name1], embedding_dict[name2]).item()

    activity_threshold = thresholds['activities']
    actor_threshold = thresholds['actors']

    # Matches are tracked by index on BOTH sides. Looking matched gold pairs up
    # by searching the formatted detail strings would hide an unmatched gold
    # pair whenever an identical pair elsewhere in the list was matched.
    matched_gold_indices = set()
    matched_gen_indices = set()
    details = []

    for gold_idx, (gold_activity, gold_actor) in enumerate(gold_pairs):
        best_pair_score = -1
        best_gen_idx = -1

        for gen_idx, (gen_activity, gen_actor) in enumerate(gen_pairs):
            if gen_idx in matched_gen_indices:
                continue

            sim_activity = get_similarity(gold_activity, gen_activity, activity_embeddings)
            sim_actor = get_similarity(gold_actor, gen_actor, actor_embeddings)

            if sim_activity >= activity_threshold and sim_actor >= actor_threshold:
                current_pair_score = (sim_activity + sim_actor) / 2.0
                if current_pair_score > best_pair_score:
                    best_pair_score = current_pair_score
                    best_gen_idx = gen_idx

        if best_gen_idx != -1:
            matched_gold_indices.add(gold_idx)
            matched_gen_indices.add(best_gen_idx)
            matched_gen_pair = gen_pairs[best_gen_idx]
            details.append(f"MATCH: Gold ('{gold_activity}' by '{gold_actor}') with Gen ('{matched_gen_pair[0]}' by '{matched_gen_pair[1]}') (Score: {best_pair_score:.2f})")

    correctly_assigned = len(matched_gold_indices)
    score = correctly_assigned / total_gold_pairs

    unmatched_details = []
    for gold_idx, (gold_act, gold_acr) in enumerate(gold_pairs):
        if gold_idx not in matched_gold_indices:
            unmatched_details.append(f"MISSING: Gold pair ('{gold_act}' by '{gold_acr}') not found in generated model.")

    for idx, (gen_act, gen_acr) in enumerate(gen_pairs):
        if idx not in matched_gen_indices:
            unmatched_details.append(f"EXTRA: Generated pair ('{gen_act}' by '{gen_acr}') has no match in gold standard.")

    return score, f"{correctly_assigned}/{total_gold_pairs}", "\n".join(details + unmatched_details)


def check_control_flow(gold_model, gen_model, model, threshold):
    """
    Score how many gold control-flow pairs reappear in the generated model.

    A control-flow pair is a (source name, target name) tuple as returned by
    ``get_control_flow_pairs()``. Each gold pair is greedily matched, in gold
    order, to at most one unused generated pair whose source AND target names
    both reach ``threshold`` in cosine similarity; among those, the candidate
    with the highest mean similarity wins.

    Args:
        gold_model: Object whose ``get_control_flow_pairs()`` returns the
            reference pairs.
        gen_model: Same interface, for the generated model.
        model: Embedding model; ``model.encode(names)`` must yield one
            embedding per name, usable with ``util.pytorch_cos_sim``.
        threshold: Minimum similarity required for source and for target.

    Returns:
        tuple: ``(score, absolute, details)`` where ``score`` is the matched
        fraction of gold pairs (or ``"NA"``/``1.0`` when the gold standard has
        no pairs), ``absolute`` is ``"<matched>/<gold total>"`` and ``details``
        is a newline-joined list of MATCH, MISSING and EXTRA lines.
    """
    gold_pairs = gold_model.get_control_flow_pairs()
    gen_pairs = gen_model.get_control_flow_pairs()
    total_gold_pairs = len(gold_pairs)

    if total_gold_pairs == 0:
        # NOTE(review): a generated model with pairs scores 1.0 against an
        # empty gold standard; kept as-is, confirm this is the intended rule.
        return "NA" if not gen_pairs else 1.0, "0/0", "Not applicable: No control flow pairs in gold standard."

    if not gen_pairs:
        return 0.0, f"0/{total_gold_pairs}", "No control flow pairs found in the generated model."

    # Encode every distinct node name once.
    all_names = list({name for pair in gold_pairs + gen_pairs for name in pair})
    name_to_embedding = dict(zip(all_names, model.encode(all_names)))

    def get_similarity(name1, name2):
        return util.pytorch_cos_sim(name_to_embedding[name1], name_to_embedding[name2]).item()

    # Matches are tracked by index on BOTH sides. Looking matched gold pairs up
    # by searching the formatted detail strings would hide an unmatched gold
    # pair whenever an identical pair elsewhere in the list was matched.
    matched_gold_indices = set()
    matched_gen_indices = set()
    details = []

    for gold_idx, (gs_name, gt_name) in enumerate(gold_pairs):
        best_pair_score = -1
        best_gen_idx = -1

        for gen_idx, (ms_name, mt_name) in enumerate(gen_pairs):
            if gen_idx in matched_gen_indices:
                continue

            sim_source = get_similarity(gs_name, ms_name)
            sim_target = get_similarity(gt_name, mt_name)

            if sim_source >= threshold and sim_target >= threshold:
                current_pair_score = (sim_source + sim_target) / 2.0
                if current_pair_score > best_pair_score:
                    best_pair_score = current_pair_score
                    best_gen_idx = gen_idx

        if best_gen_idx != -1:
            matched_gold_indices.add(gold_idx)
            matched_gen_indices.add(best_gen_idx)
            matched_gen_pair = gen_pairs[best_gen_idx]
            details.append(f"MATCH: Gold ('{gs_name}' -> '{gt_name}') with Gen ('{matched_gen_pair[0]}' -> '{matched_gen_pair[1]}') (Score: {best_pair_score:.2f})")

    correct_pairs = len(matched_gold_indices)
    score = correct_pairs / total_gold_pairs

    unmatched_details = []
    for gold_idx, (gs, gt) in enumerate(gold_pairs):
        if gold_idx not in matched_gold_indices:
            unmatched_details.append(f"MISSING: Gold pair ('{gs}' -> '{gt}') not found in generated model.")

    for idx, (ms, mt) in enumerate(gen_pairs):
        if idx not in matched_gen_indices:
            unmatched_details.append(f"EXTRA: Generated pair ('{ms}' -> '{mt}') has no match in the gold standard.")

    return score, f"{correct_pairs}/{total_gold_pairs}", "\n".join(details + unmatched_details)


def check_decision_logic(gold_model, gen_model, model, threshold):
    """
    Score how many gold decision splits reappear in the generated model.

    A split is a dict with a gateway ``'name'`` and a list of outgoing
    ``'labels'``. Each gold split is greedily matched, in gold order, to at
    most one unused generated split whose gateway name reaches ``threshold``
    in cosine similarity. Label recall does not gate a match; it only ranks
    the qualifying candidates (mean of gateway similarity and label recall).

    Args:
        gold_model: Object whose ``get_decision_splits()`` returns the
            reference splits.
        gen_model: Same interface, for the generated model.
        model: Embedding model; ``model.encode(names)`` must yield one
            embedding per name, usable with ``util.pytorch_cos_sim``.
        threshold: Minimum similarity for gateway names and for label matches.

    Returns:
        tuple: ``(score, absolute, details)`` where ``score`` is the matched
        fraction of gold splits (or ``"NA"``/``1.0`` when the gold standard has
        no splits), ``absolute`` is ``"<matched>/<gold total>"`` and
        ``details`` is a newline-joined list of MATCH, MISSING and EXTRA lines.
    """
    gold_splits = gold_model.get_decision_splits()
    gen_splits = gen_model.get_decision_splits()
    total_gold_splits = len(gold_splits)

    if total_gold_splits == 0:
        # NOTE(review): a generated model with splits scores 1.0 against an
        # empty gold standard; kept as-is, confirm this is the intended rule.
        return "NA" if not gen_splits else 1.0, "0/0", "Not applicable: No decision splits in gold standard."

    if not gen_splits:
        return 0.0, f"0/{total_gold_splits}", "No decision splits found in the generated model."

    # Encode every distinct gateway name and label once.
    all_gw_names = list({s['name'] for s in gold_splits} | {s['name'] for s in gen_splits})
    all_labels = list(
        {label for s in gold_splits for label in s['labels']}
        | {label for s in gen_splits for label in s['labels']}
    )
    gw_embeddings = dict(zip(all_gw_names, model.encode(all_gw_names)))
    label_embeddings = dict(zip(all_labels, model.encode(all_labels)))

    def get_gw_similarity(name1, name2):
        return util.pytorch_cos_sim(gw_embeddings[name1], gw_embeddings[name2]).item()

    def get_label_set_recall(gold_labels, gen_labels):
        """Return the fraction of gold labels matched one-to-one by a generated label."""
        if not gold_labels:
            return 1.0
        if not gen_labels:
            return 0.0

        label_matches = 0
        used_gen_indices = set()
        for gl in gold_labels:
            best_score = -1
            best_idx = -1
            for i, ml in enumerate(gen_labels):
                if i in used_gen_indices:
                    continue
                score = util.pytorch_cos_sim(label_embeddings[gl], label_embeddings[ml]).item()
                if score > best_score:
                    best_score = score
                    best_idx = i

            # Only count a match when an actual candidate was found.
            if best_idx != -1 and best_score >= threshold:
                label_matches += 1
                used_gen_indices.add(best_idx)

        return label_matches / len(gold_labels)

    # Matches are tracked by index on BOTH sides. Searching the MATCH lines for
    # the quoted gold name would also hit lines where that name appears as the
    # generated split, hiding gold splits that were never matched.
    matched_gold_indices = set()
    matched_gen_indices = set()
    details = []

    for gold_idx, gold_split in enumerate(gold_splits):
        best_overall_score = -1
        best_gen_idx = -1

        for gen_idx, gen_split in enumerate(gen_splits):
            if gen_idx in matched_gen_indices:
                continue

            gw_sim = get_gw_similarity(gold_split['name'], gen_split['name'])

            if gw_sim >= threshold:
                label_recall = get_label_set_recall(gold_split['labels'], gen_split['labels'])
                overall_score = (gw_sim + label_recall) / 2.0

                if overall_score > best_overall_score:
                    best_overall_score = overall_score
                    best_gen_idx = gen_idx

        if best_gen_idx != -1:
            matched_gold_indices.add(gold_idx)
            matched_gen_indices.add(best_gen_idx)
            matched_gen_split = gen_splits[best_gen_idx]
            details.append(f"MATCH: Gold Split '{gold_split['name']}' with Gen Split '{matched_gen_split['name']}' (Score: {best_overall_score:.2f})")

    correct_splits = len(matched_gold_indices)
    score = correct_splits / total_gold_splits

    unmatched_details = []
    for gold_idx, gold_split in enumerate(gold_splits):
        if gold_idx not in matched_gold_indices:
            unmatched_details.append(f"MISSING: Gold split '{gold_split['name']}' not found in generated model.")

    for idx, gen_split in enumerate(gen_splits):
        if idx not in matched_gen_indices:
            unmatched_details.append(f"EXTRA: Generated split '{gen_split['name']}' has no match in gold standard.")

    return score, f"{correct_splits}/{total_gold_splits}", "\n".join(details + unmatched_details)


def check_conditional_logic_llm(source_text, gen_model, api_key):
    """Use an LLM to evaluate whether business rules from the text are in the model.

    The model is rendered with ``describe_model_for_llm`` and sent, together
    with the source text, to ``ask_llm``. The reply is expected to be a JSON
    array of objects with the keys "identified_rule", "evaluation" and
    "justification" (optionally wrapped in a Markdown code fence).

    Args:
        source_text: Natural-language text the model was generated from.
        gen_model: Generated BPMN model, passed to ``describe_model_for_llm``.
        api_key: API key forwarded to ``ask_llm``.

    Returns:
        tuple: ``(score, absolute, details)``. ``score`` is the fraction of
        identified rules evaluated as "Correct", or ``"NA"`` when no rule was
        identified or the reply could not be parsed. ``absolute`` is
        ``"<correct>/<identified>"`` and ``details`` has one line per rule
        (or the parse error plus the raw reply).
    """
    model_description = describe_model_for_llm(gen_model)

    prompt = f"""
    You are an expert BPMN 2.0 analyst. Your task is to analyze a source text, identify all business rules within it, and then evaluate if each rule is correctly implemented in a given BPMN model.

    **Source Text:**
    ```
    {source_text}
    ```

    **Description of the Generated BPMN Model:**
    ```
    {model_description}
    ```

    **Analysis Task:**
    1.  Read the entire Source Text and identify every sentence that constitutes a business rule, a constraint, or a condition (e.g., sentences containing "must", "if", "can", "within", etc.).
    2.  For each rule you identify, evaluate its implementation in the provided BPMN Model Description.
    3.  Consider if conditional clauses use gateways, if temporal constraints use timer events, and if deontic logic (must, can, must not) is correctly modeled.

    **Output Format:**
    Respond with a single JSON array only. Each object in the array should represent one rule you identified and have three keys: "identified_rule" (string), "evaluation" (string: "Correct" or "Incorrect"), and "justification" (string).
    """

    response_text = ask_llm(prompt, api_key)
    details = []
    correct_implementations = 0

    try:
        # The LLM often wraps its JSON in a Markdown code fence; strip it.
        clean_response = response_text.strip().replace("```json", "").replace("```", "")
        llm_eval_list = json.loads(clean_response)

        if not isinstance(llm_eval_list, list):
            raise json.JSONDecodeError("Response is not a list.", clean_response, 0)
    except (json.JSONDecodeError, AttributeError, TypeError) as e:
        details.append(f"Error parsing LLM response: {e}\nRaw Response: {response_text}")
        llm_eval_list = []

    total_rules_identified = len(llm_eval_list)

    for eval_item in llm_eval_list:
        # A single malformed entry must not discard the evaluation of the
        # remaining rules; record it as an error and carry on.
        if not isinstance(eval_item, dict):
            details.append(f"Rule: 'N/A' -> Error. Justification: Malformed entry in LLM response: {eval_item!r}")
            continue

        rule = eval_item.get("identified_rule", "N/A")
        evaluation = eval_item.get("evaluation", "Error")
        justification = eval_item.get("justification", "No justification provided.")

        # Tolerate stray whitespace and non-string values in the verdict.
        if str(evaluation).strip().lower() == "correct":
            correct_implementations += 1

        details.append(f"Rule: '{rule}' -> {evaluation}. Justification: {justification}")

    if total_rules_identified == 0:
        score = "NA"  # If no rules are identified, it's not applicable.
    else:
        score = correct_implementations / total_rules_identified

    return score, f"{correct_implementations}/{total_rules_identified}", "\n".join(details)


def run_correctness_check(gold_file, gen_file, thresholds, model, source_text, api_key):
    """Run the four correctness checks for one gold/generated model pair.

    Args:
        gold_file: Path of the gold-standard BPMN file.
        gen_file: Path of the generated BPMN file.
        thresholds: Mapping with the keys 'activities', 'actors' and
            'conditions' holding similarity thresholds.
        model: Embedding model handed to the similarity-based checks.
        source_text: Source text handed to the LLM-based check.
        api_key: API key handed to the LLM-based check.

    Returns:
        dict | None: One report row (file names plus Score/Absolute/Details
        for each check), or None when a model could not be loaded (IOError).
    """
    results = {
        'Gold Standard': os.path.basename(gold_file),
        'Generated Model': os.path.basename(gen_file),
    }

    try:
        gold_model = BPMNModel(gold_file)
        gen_model = BPMNModel(gen_file)
    except IOError as e:
        print(f"  - SKIPPING: Could not process file. Error: {e}")
        return None

    # Every check returns (score, absolute, details); unpack each result
    # straight into its three report columns.
    (
        results['(i) Actor Assignment Score'],
        results['(i) Actor Assignment Absolute'],
        results['(i) Actor Assignment Details'],
    ) = check_actor_assignment(gold_model, gen_model, model, thresholds)

    (
        results['(ii) Control Flow Score'],
        results['(ii) Control Flow Absolute'],
        results['(ii) Control Flow Details'],
    ) = check_control_flow(gold_model, gen_model, model, thresholds['activities'])

    (
        results['(iii) Decision Logic Score'],
        results['(iii) Decision Logic Absolute'],
        results['(iii) Decision Logic Details'],
    ) = check_decision_logic(gold_model, gen_model, model, thresholds['conditions'])

    (
        results['(iv) Conditional Logic (LLM) Score'],
        results['(iv) Conditional Logic (LLM) Absolute'],
        results['(iv) Conditional Logic (LLM) Details'],
    ) = check_conditional_logic_llm(source_text, gen_model, api_key)

    return results

if __name__ == "__main__":
    # One entry per use case: gold-standard model, source text, and the
    # generated models that are each compared against that gold standard.
    use_cases = [
        ("/content/smart_meter_refactored.bpmn", "/content/1_smart_meter_uc3_5.txt",
         ("/content/smart_meter_01.bpmn", "/content/smart_meter_02.bpmn", "/content/smart_meter_03.bpmn")),
        ("/content/gdpr_refactored.bpmn", "/content/2_gdpr_article33_34.txt",
         ("/content/gdpr_01.bpmn", "/content/gdpr_02.bpmn", "/content/gdpr_03.bpmn")),
        ("/content/blood_donor_selection.bpmn", "/content/3_pdfExtractor_blood_donor_selection.txt",
         ("/content/blood_donor_01.bpmn", "/content/blood_donor_02.bpmn", "/content/blood_donor_03.bpmn")),
        ("/content/health_data.bpmn", "/content/4_health_data_article32.txt",
         ("/content/health_data_01.bpmn", "/content/health_data_02.bpmn", "/content/health_data_03.bpmn")),
        ("/content/finance_customer_due_diligence.bpmn", "/content/5_CDD.txt",
         ("/content/CDD_01.bpmn", "/content/CDD_02.bpmn", "/content/CDD_03.bpmn")),
    ]
    comparisons = [
        {"gold": gold_path, "gen": gen_path, "source_text_file": text_path}
        for gold_path, text_path, gen_paths in use_cases
        for gen_path in gen_paths
    ]

    output_excel_file = "bpmn_correctness_report_CARB.xlsx"

    # Read the key from Colab secrets when running in Colab; anywhere else
    # (or when the secret is empty) fall back to the placeholder value.
    try:
        from google.colab import userdata
        gemini_api_key = userdata.get('Google_API_Key')
        if not gemini_api_key:
            print("\nWARNING: Could not retrieve 'Google_API_Key' from Colab secrets.")
            gemini_api_key = "YOUR_API_KEY_HERE"
    except ImportError:
        gemini_api_key = "YOUR_API_KEY_HERE"

    similarity_thresholds = {'activities': 0.50, 'conditions': 0.50, 'actors': 0.70}

    print("Loading semantic similarity model...")
    model = SentenceTransformer('all-MiniLM-L6-v2')
    print("Model loaded.")

    if not gemini_api_key or gemini_api_key == "YOUR_API_KEY_HERE":
        print("\nWARNING: Gemini API key is not set. LLM-based checks will fail.")
        print("Please edit the script or set up your environment correctly.\n")

    all_results = []
    print("\nStarting correctness check...")
    for item in comparisons:
        gold_file, gen_file, source_text_file = item["gold"], item["gen"], item["source_text_file"]

        # A comparison without readable source text is skipped, not fatal.
        try:
            with open(source_text_file, 'r', encoding='utf-8') as f:
                source_text = f.read()
        except FileNotFoundError:
            print(f"  - SKIPPING: Source text file not found: {source_text_file}")
            continue
        except Exception as e:
            print(f"  - SKIPPING: Error reading source text file {source_text_file}: {e}")
            continue

        print(f"  - Comparing '{os.path.basename(gold_file)}' with '{os.path.basename(gen_file)}'")
        result_data = run_correctness_check(
            gold_file, gen_file, similarity_thresholds, model, source_text, gemini_api_key
        )
        if result_data:
            all_results.append(result_data)

    if not all_results:
        print("\nNo results to export.")
    else:
        # Scores and absolute counts first, the long detail columns last.
        column_order = [
            'Gold Standard', 'Generated Model',
            '(i) Actor Assignment Score', '(i) Actor Assignment Absolute',
            '(ii) Control Flow Score', '(ii) Control Flow Absolute',
            '(iii) Decision Logic Score', '(iii) Decision Logic Absolute',
            '(iv) Conditional Logic (LLM) Score', '(iv) Conditional Logic (LLM) Absolute',
            '(i) Actor Assignment Details', '(ii) Control Flow Details',
            '(iii) Decision Logic Details', '(iv) Conditional Logic (LLM) Details'
        ]
        df = pd.DataFrame(all_results).reindex(columns=column_order)

        try:
            df.to_excel(output_excel_file, index=False, engine='openpyxl')
            print(f"\nSuccessfully exported {len(all_results)} results to '{output_excel_file}'")
        except Exception as e:
            print(f"\nError exporting to Excel: {e}")


### 3. Traceability

In [None]:
!pip install sentence-transformers torch pandas openpyxl

In [None]:
import xml.etree.ElementTree as ET
from collections import defaultdict
from sentence_transformers import SentenceTransformer, util
import pandas as pd
import os


NAMESPACES = {'bpmn': 'http://www.omg.org/spec/BPMN/20100524/MODEL'}


def get_elements_by_tag(root, tag_name):
    """
    Collect every descendant of ``root`` carrying the given BPMN tag.

    Args:
        root (ET.Element): Element whose subtree is searched.
        tag_name (str): Local BPMN tag name without prefix (e.g., 'task', 'lane').

    Returns:
        list: Matching XML elements as returned by ``findall``.
    """
    # The 'bpmn' prefix is resolved through the module-level NAMESPACES map.
    query = f'.//bpmn:{tag_name}'
    matches = root.findall(query, NAMESPACES)
    return matches

def get_element_names(elements):
    """
    Return the non-empty 'name' attribute values of the given elements.

    Args:
        elements (list): Objects exposing ``get('name')`` (XML elements).

    Returns:
        list: The names in input order; elements whose name is missing or
        empty are left out.
    """
    return [label for label in (item.get('name') for item in elements) if label]

def get_actors(root):
    """
    Return the names of all actors in the BPMN model.

    Actors are the model's participants (pools) and lanes; participants come
    first in the result, lanes second.

    Args:
        root (ET.Element): The root of the XML tree.

    Returns:
        list: Names of all named participants and lanes.
    """
    pools = get_elements_by_tag(root, 'participant')
    swimlanes = get_elements_by_tag(root, 'lane')
    return get_element_names([*pools, *swimlanes])


def get_activities(root):
    """
    Return the names of all activities in the BPMN model.

    Activities are the task variants, call activities and sub-processes
    listed below; results are grouped by tag in the listed order.

    Args:
        root (ET.Element): The root of the XML tree.

    Returns:
        list: Names of all named activity elements.
    """
    activity_tags = (
        'task', 'userTask', 'serviceTask', 'sendTask', 'receiveTask',
        'manualTask', 'businessRuleTask', 'scriptTask', 'callActivity', 'subProcess',
    )
    found = [
        element
        for tag in activity_tags
        for element in get_elements_by_tag(root, tag)
    ]
    return get_element_names(found)



def calculate_precision_for_semantic_elements(gold_standard_names, generated_names, similarity_threshold, model):
    """
    Compute the precision of generated names against gold names by semantic similarity.

    Gold names are processed in order; each one claims the still-unclaimed
    generated name with the highest cosine similarity, provided that
    similarity reaches ``similarity_threshold``. Precision is the number of
    such matches divided by the number of generated names.

    Returns:
        tuple: (precision, matches_count, total_generated, matches_list,
        unmatched_gold, unmatched_generated), where ``matches_list`` holds
        (gold name, generated name, similarity) tuples.
    """
    # Degenerate inputs are answered without touching the embedding model.
    if not generated_names:
        return 1.0, 0, 0, [], gold_standard_names, []
    if not gold_standard_names:
        return 0.0, 0, len(generated_names), [], [], generated_names

    # similarity[i][j] compares gold name i with generated name j.
    similarity = util.pytorch_cos_sim(
        model.encode(gold_standard_names, convert_to_tensor=True),
        model.encode(generated_names, convert_to_tensor=True),
    )

    matches_list = []
    claimed_gold = set()
    claimed_generated = set()

    for gold_pos, gold_name in enumerate(gold_standard_names):
        top_score, top_pos = -1, -1
        for gen_pos, candidate in enumerate(similarity[gold_pos]):
            if gen_pos in claimed_generated:
                continue
            if candidate > top_score:
                top_score, top_pos = candidate, gen_pos

        if top_score >= similarity_threshold:
            matches_list.append((gold_name, generated_names[top_pos], top_score.item()))
            claimed_generated.add(top_pos)
            claimed_gold.add(gold_pos)

    matches_count = len(matches_list)
    total_generated = len(generated_names)
    precision = matches_count / total_generated if total_generated > 0 else 1.0

    unmatched_gold = [
        name for pos, name in enumerate(gold_standard_names) if pos not in claimed_gold
    ]
    unmatched_generated = [
        name for pos, name in enumerate(generated_names) if pos not in claimed_generated
    ]

    return precision, matches_count, total_generated, matches_list, unmatched_gold, unmatched_generated


def format_matches_for_excel(matches_list, unmatched_gold, unmatched_gen):
    """Build the multi-line text shown in one Excel details cell.

    Sections appear in the order matched / extra / missing; an empty input
    list omits its section entirely. Matches are ordered by descending
    similarity, unmatched names alphabetically.
    """
    lines = []

    if matches_list:
        lines.append("MATCHED (Correctly Generated):")
        ranked = sorted(matches_list, key=lambda match: match[2], reverse=True)
        lines.extend(f"  - G: '{gs}' <-> M: '{gen}' ({score:.2f})" for gs, gen, score in ranked)

    # Both unmatched sections share one layout: a header, then sorted names.
    for header, names in (
        ("\nEXTRA IN MODEL (Incorrectly Generated):", unmatched_gen),
        ("\nMISSING FROM MODEL (Not relevant for precision but good to know):", unmatched_gold),
    ):
        if names:
            lines.append(header)
            lines.extend(f"  - {name}" for name in sorted(names))

    return "\n".join(lines)


def run_traceability_check(gold_standard_file, generated_file, thresholds, model):
    """
    Run the traceability (precision) check for one gold/generated file pair.

    Args:
        gold_standard_file: Path of the gold-standard BPMN file.
        generated_file: Path of the generated BPMN file.
        thresholds: Mapping from lower-cased category name ('actors',
            'activities') to similarity threshold; 0.75 is used for a
            category without an entry.
        model: Embedding model handed to the precision calculation.

    Returns:
        dict | None: One report row with Precision / Absolute / Details per
        category, or None when either file is missing or not well-formed XML.
    """
    results = {
        'Gold Standard': os.path.basename(gold_standard_file),
        'Generated Model': os.path.basename(generated_file),
    }

    # The gold file is parsed first; a failure there returns before the
    # generated file is touched.
    try:
        gold_root = ET.parse(gold_standard_file).getroot()
    except (FileNotFoundError, ET.ParseError) as e:
        print(f"Error reading gold standard file '{gold_standard_file}': {e}")
        return None

    try:
        generated_root = ET.parse(generated_file).getroot()
    except (FileNotFoundError, ET.ParseError) as e:
        print(f"Error reading generated file '{generated_file}': {e}")
        return None

    # Extract all name lists up front, then score category by category.
    extracted = [
        (category, extract(gold_root), extract(generated_root))
        for category, extract in (('Actors', get_actors), ('Activities', get_activities))
    ]

    for category, gold_list, gen_list in extracted:
        thresh = thresholds.get(category.lower().replace(' ', '_'), 0.75)
        outcome = calculate_precision_for_semantic_elements(gold_list, gen_list, thresh, model)
        precision, matches, total_generated, matches_list, unmatched_gold, unmatched_gen = outcome

        results[f'{category} Precision'] = precision
        results[f'{category} Absolute (Precision)'] = f"{matches}/{total_generated}"
        results[f'{category} Details'] = format_matches_for_excel(matches_list, unmatched_gold, unmatched_gen)

    return results


if __name__ == "__main__":

    # (gold-standard file, generated file) pairs: every generated variant is
    # compared against the gold standard of its use case.
    comparison_files = [
        (gold_path, generated_path)
        for gold_path, generated_paths in (
            ("/content/smart_meter_refactored.bpmn",
             ("/content/smart_meter_01.bpmn", "/content/smart_meter_02.bpmn", "/content/smart_meter_03.bpmn")),
            ("/content/gdpr_refactored.bpmn",
             ("/content/gdpr_01.bpmn", "/content/gdpr_02.bpmn", "/content/gdpr_03.bpmn")),
            ("/content/blood_donor_selection.bpmn",
             ("/content/blood_donor_01.bpmn", "/content/blood_donor_02.bpmn", "/content/blood_donor_03.bpmn")),
            ("/content/health_data.bpmn",
             ("/content/health_data_01.bpmn", "/content/health_data_02.bpmn", "/content/health_data_03.bpmn")),
            ("/content/finance_customer_due_diligence.bpmn",
             ("/content/CDD_01.bpmn", "/content/CDD_02.bpmn", "/content/CDD_03.bpmn")),
        )
        for generated_path in generated_paths
    ]

    output_excel_file = "bpmn_traceability_report_CARB.xlsx"

    similarity_thresholds = {'actors': 0.70, 'activities': 0.5}

    print("Loading semantic similarity model (this may take a moment)...")
    model = SentenceTransformer('all-MiniLM-L6-v2')
    print("Model loaded.")

    all_results = []
    print("\nStarting traceability (precision) check...")
    for gold_file, generated_file in comparison_files:
        print(f"  - Comparing '{os.path.basename(gold_file)}' with '{os.path.basename(generated_file)}'")
        result_data = run_traceability_check(gold_file, generated_file, similarity_thresholds, model)
        # Pairs whose files could not be read yield None and are left out.
        if result_data:
            all_results.append(result_data)

    if not all_results:
        print("\nNo results to export.")
    else:
        # Precision figures first, the long detail columns last.
        column_order = [
            'Gold Standard', 'Generated Model',
            'Actors Precision', 'Actors Absolute (Precision)',
            'Activities Precision', 'Activities Absolute (Precision)',
            'Actors Details', 'Activities Details'
        ]
        df = pd.DataFrame(all_results).reindex(columns=column_order)

        try:
            df.to_excel(output_excel_file, index=False, engine='openpyxl')
            print(f"\nSuccessfully exported {len(all_results)} results to '{output_excel_file}'")
        except Exception as e:
            print(f"\nError exporting to Excel: {e}")

# III. Layout Eval

In [None]:
!pip install pandas openpyxl requests

In [None]:
import pandas as pd
import os
import requests
import json
import time
import base64

def encode_image_to_base64(image_path):
    """Return the file's bytes as a Base64 text string.

    Returns None (after printing the error) when the file cannot be read.
    """
    try:
        with open(image_path, "rb") as image_file:
            raw_bytes = image_file.read()
    except IOError as e:
        print(f"Error reading image file {image_path}: {e}")
        return None

    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')

def ask_llm_for_layout_analysis(image_base64, api_key, timeout=300):
    """Ask the Gemini API to review the layout of a BPMN diagram image.

    Args:
        image_base64: Base64-encoded image data; sent with MIME type
            "image/png".
        api_key: Gemini API key. It is sent in the ``x-goog-api-key`` header
            rather than the URL so that it cannot leak into error messages,
            which are printed and returned to the caller.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        str: The text of the first candidate's first part (expected to be the
        JSON document requested by the prompt), or a JSON object string with
        an "error" key when the request fails or the reply has no usable part.

    Note:
        Sleeps one second after every call, successful or not, to space out
        consecutive requests.
    """
    api_url = "https://generativelanguage.googleapis.com/v1/models/gemini-2.5-pro:generateContent"

    headers = {
        "Content-Type": "application/json",
        "x-goog-api-key": str(api_key),
    }

    prompt = """
    You are a meticulous BPMN 2.0 layout quality analyst. Your task is to analyze the provided BPMN diagram image and identify all layout issues.

    **Analysis Criteria:**
    Please check for the all three of the following categories of issues:
    1.  **Text Overlaps:** Any text label that overlaps with another label or a BPMN element's (e.g. pools) boundary.
    2.  **Element Overlaps:** Any BPMN shapes (tasks, events, gateways) that overlap each other. A task must be fully contained within its lane. Sequence flows (solid lines) must not cross over tasks or other elements. Note: Message flows (dashed lines) are permitted to cross over pools.
    3.  **Non-Orthogonal Edges:** Any sequence flow (solid line with a solid arrowhead) that is not horizontal or vertical. Diagonal or curved sequence flows are considered layout issues.

    **Output Format:**
    Respond with a single JSON object only. Do not include any other text or markdown formatting.
    The JSON object should have two keys:
    - "overall_score": A single float from 0.0 (very poor) to 1.0 (perfect), representing the overall layout quality.
    - "identified_issues": A list of objects. Each object must have two keys: "issue_description" (a clear, concise description of a single layout problem) and "severity" (a string: "High", "Medium", or "Low"). If there are no issues, return an empty list.

    **Example Response:**
    {
      "overall_score": 0.65,
      "identified_issues": [
        {
          "issue_description": "The label for 'Validate Order' task overlaps with the task boundary.",
          "severity": "Medium"
        },
        {
          "issue_description": "The sequence flow from 'Approve Order' to 'Ship Order' is drawn diagonally.",
          "severity": "High"
        }
      ]
    }
    """

    payload = {
        "contents": [{
            "parts": [
                {"text": prompt},
                {
                    "inline_data": {
                        "mime_type": "image/png",
                        "data": image_base64
                    }
                }
            ]
        }]
    }

    try:
        # Without a timeout a stalled connection would block the batch forever.
        response = requests.post(api_url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()

        response_json = response.json()

        if 'candidates' in response_json and len(response_json['candidates']) > 0:
            content = response_json['candidates'][0].get('content', {})
            if 'parts' in content and len(content['parts']) > 0:
                return content['parts'][0].get('text', '')
        return '{"error": "Could not parse LLM response."}'

    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # response.json() does not raise a RequestException subclass.
        print(f"API Request Error: {e}")
        # json.dumps escapes quotes/backslashes in the message, so the caller
        # always receives parseable JSON.
        return json.dumps({"error": f"API request failed: {e}"})
    finally:
        time.sleep(1)


def run_layout_check(image_path, api_key):
    """
    Run the LLM layout check for a single diagram image.

    The image is encoded with ``encode_image_to_base64``, passed to
    ``ask_llm_for_layout_analysis``, and the JSON reply is flattened into a
    single report row.

    Args:
        image_path (str): Path to the image file to analyse.
        api_key (str): API key forwarded to ``ask_llm_for_layout_analysis``.

    Returns:
        dict: A row with the keys 'Model Image' (base name of ``image_path``),
        'Overall Score', 'Issues Count' and 'Issues Details'. If the image
        cannot be encoded or the reply cannot be interpreted, 'Overall Score'
        stays None, 'Issues Count' stays 0 and 'Issues Details' holds the
        error message (followed by the raw reply for parsing failures).
        This function does not raise for malformed replies.
    """
    results = {
        'Model Image': os.path.basename(image_path),
        'Overall Score': None,
        'Issues Count': 0,
        'Issues Details': 'N/A'
    }

    image_base64 = encode_image_to_base64(image_path)
    if not image_base64:
        results['Issues Details'] = "Error: Could not read or encode image file."
        return results

    response_text = ask_llm_for_layout_analysis(image_base64, api_key)

    try:
        # Strip markdown code-fence markers in case the reply is wrapped in them.
        clean_response = response_text.strip().replace("```json", "").replace("```", "")
        llm_eval = json.loads(clean_response)

        if not isinstance(llm_eval, dict):
            raise ValueError(
                f"expected a JSON object, got {type(llm_eval).__name__}"
            )
        if "error" in llm_eval:
            raise ValueError(llm_eval["error"])

        # A JSON null decodes to None; treat it like a missing/empty list
        # instead of letting len(None) raise an uncaught TypeError.
        issues = llm_eval.get('identified_issues') or []
        if not isinstance(issues, list):
            raise ValueError(
                f"'identified_issues' must be a list, got {type(issues).__name__}"
            )

        if issues:
            details = "\n".join(
                f"- {item.get('issue_description', 'N/A')} (Severity: {item.get('severity', 'N/A')})"
                for item in issues
            )
        else:
            details = "No layout issues identified."

    except (json.JSONDecodeError, AttributeError, TypeError, ValueError) as e:
        results['Issues Details'] = f"Error parsing LLM response: {e}\nRaw Response: {response_text}"
        return results

    # Commit the parsed values only after the whole reply validated, so an
    # error row never carries a score from a half-processed reply.
    results['Overall Score'] = llm_eval.get('overall_score', 0.0)
    results['Issues Count'] = len(issues)
    results['Issues Details'] = details
    return results

if __name__ == "__main__":

    # Diagram images to analyse; entries that do not exist on disk are
    # reported and skipped in the loop below.
    image_files_to_check = [
        "/content/smart_meter_01.png",
        "/content/gdpr_01.png",
        "/content/blood_donor_01.png",
        "/content/health_data_01.png",
        "/content/CDD_01.png"
    ]
    output_excel_file = "bpmn_layout_report_CARB.xlsx"

    # Resolve the API key from the Colab secret store when available;
    # otherwise fall back to a placeholder value.
    gemini_api_key = None
    try:
        from google.colab import userdata
        gemini_api_key = userdata.get('Google_API_Key')
        if not gemini_api_key:
            print("\nWARNING: Could not retrieve 'Google_API_Key' from Colab secrets.")
            print("Please ensure the secret is created and has the correct name.\n")
            gemini_api_key = "YOUR_API_KEY_HERE" # Fallback
    except ImportError:
        print("\nINFO: Not running in a Google Colab environment. Using placeholder for API key.")
        gemini_api_key = "YOUR_API_KEY_HERE"

    if gemini_api_key == "YOUR_API_KEY_HERE":
        print("\nWARNING: Gemini API key is not set. The script will not be able to run.")
        print("Please edit the script or set up your environment correctly.\n")

    # Collect one result row per image that exists.
    all_results = []
    print("\nStarting BPMN layout analysis...")
    for image_file in image_files_to_check:
        if os.path.exists(image_file):
            print(f"  - Analyzing '{os.path.basename(image_file)}'...")
            result_data = run_layout_check(image_file, gemini_api_key)
            if result_data:
                all_results.append(result_data)
        else:
            print(f"  - Skipping '{image_file}' (File not found).")

    if not all_results:
        print("\nNo results to export.")
    else:
        # Fix the column order of the exported sheet.
        column_order = [
            'Model Image', 'Overall Score', 'Issues Count', 'Issues Details'
        ]
        df = pd.DataFrame(all_results).reindex(columns=column_order)

        try:
            df.to_excel(output_excel_file, index=False, engine='openpyxl')
            print(f"\nSuccessfully exported {len(all_results)} results to '{output_excel_file}'")
        except Exception as e:
            print(f"\nError exporting to Excel: {e}")

# IV. Various Helpers

[A] Extracting text from a PDF file

In [None]:
!pip install PyPDF2

In [None]:
import PyPDF2
import os

def extract_text_from_pdf(pdf_path):
    """
    Extracts the text of every page of a PDF file.

    Pages for which no text is extracted are skipped. The text of each
    remaining page is followed by a "--- PAGE BREAK ---" marker line.

    Args:
        pdf_path (str): The full path to the PDF file.

    Returns:
        str: The concatenated page texts. Failures are not raised; they are
        reported through the return value instead: a message starting with
        "Error:" when the file does not exist, or with
        "An error occurred while processing the PDF:" for any other problem.
    """
    if not os.path.exists(pdf_path):
        return f"Error: The file '{pdf_path}' was not found."

    page_break = "\n--- PAGE BREAK ---\n"

    try:
        with open(pdf_path, 'rb') as pdf_file:
            pdf_reader = PyPDF2.PdfReader(pdf_file)

            # Iterate the pages directly and join once instead of indexing
            # by position and growing a string with repeated +=.
            page_texts = (page.extract_text() for page in pdf_reader.pages)
            return "".join(text + page_break for text in page_texts if text)

    except Exception as e:
        # Deliberately broad: callers expect an error string, never a raise.
        return f"An error occurred while processing the PDF: {e}"

def save_text_to_file(text, output_filename="extracted_text.txt"):
    """
    Write text to a UTF-8 encoded file, replacing any existing content.

    A confirmation is printed on success. Any exception raised while opening
    or writing the file is caught and reported with a printed message, so
    nothing is raised to the caller.

    Args:
        text (str): The content to write.
        output_filename (str): Path of the file to create or overwrite.
    """
    try:
        with open(output_filename, mode='w', encoding='utf-8') as out_file:
            out_file.write(text)
        print(f"Successfully saved extracted text to '{output_filename}'")
    except Exception as err:
        print(f"Could not save text to file: {err}")


if __name__ == "__main__":
    pdf_file_path = "2_blood_donor_selection.pdf"

    extracted_text = extract_text_from_pdf(pdf_file_path)

    # extract_text_from_pdf reports failures as a returned message rather
    # than an exception. Match those messages by prefix only: a substring
    # test would misclassify any document whose own text happens to contain
    # "Error:" or "An error occurred" and print it instead of saving it.
    error_prefixes = (
        "Error: The file '",
        "An error occurred while processing the PDF: ",
    )

    if extracted_text.startswith(error_prefixes):
        print(extracted_text)
    else:
        save_text_to_file(extracted_text)