# Parallel execution tree expansion with the simulator API

Questo notebook mostra come usare le API REST del simulatore per generare un execution tree in parallelo. L'obiettivo è mantenere l'albero condiviso tra i thread, delegando la creazione dei nodi al simulatore e rispettando il vincolo che ogni thread che genera figli con una natura attiva continui immediatamente l'espansione di quella sottostruttura.


In [None]:
import json
import sys
sys.path.append('simulator/src')
sys.path.append('src')
import os
import threading
import queue
import copy
from collections import deque
from typing import Any, Dict, Iterable, List, Optional, Tuple
import requests
import graphviz
import requests
from IPython.display import display
from src.utils.env import IMPACTS_NAMES
from dot import wrapper_execution_tree_to_dot, get_path_to_current_node


URL = "127.0.0.1"
SIMULATOR_PORT = 8001
SOLVER_PORT = 8000

# Default base URL of the simulator used by Worker; it can be overridden with the
# SIMULATOR_SERVER environment variable. The default is derived from URL and
# SIMULATOR_PORT so it cannot drift away from the URLs the cells build by hand.
SIMULATOR_SERVER = os.getenv("SIMULATOR_SERVER", f"http://{URL}:{SIMULATOR_PORT}/")
HEADERS = {"Content-Type": "application/json"}

In [None]:
from IPython.core.display import SVG

# Load the BPMN definition (JSON) used by the following cells.
with open("bpmn_fig8_bound_135_15.json", "r", encoding="utf-8") as f:
    bpmn_file = f.read()

bpmn_definition = json.loads(bpmn_file)
impacts_names = bpmn_definition.get(IMPACTS_NAMES, [])

try:
    # Ask the solver for the DOT rendering of the BPMN and display it as SVG.
    resp = requests.get(f"http://{URL}:{SOLVER_PORT}/create_bpmn", json={'bpmn': bpmn_definition},  headers=HEADERS)
    resp.raise_for_status()
    display(SVG(graphviz.Source(resp.json()['bpmn_dot']).pipe(format="svg")))

    # Ask the solver for the parse tree that is later sent to the simulator.
    resp = requests.get(f"http://{URL}:{SOLVER_PORT}/create_parse_tree", json={'bpmn': bpmn_definition},  headers=HEADERS)
    resp.raise_for_status()
    parse_tree = resp.json()['parse_tree']

except requests.exceptions.HTTPError:
    # An error body is not guaranteed to be JSON (e.g. an HTML error page):
    # fall back to the raw text instead of masking the HTTP error.
    try:
        detail = resp.json()
    except ValueError:
        detail = resp.text
    print(f"HTTP Error ({resp.status_code}):", detail)
    # Re-raise: without `parse_tree` the cells that use it would otherwise
    # fail later with a less informative NameError.
    raise

In [None]:
# Show the BPMN definition loaded from the JSON file (notebook cell output).
bpmn_definition

In [None]:
if 'normalize_parse_tree' not in globals():
    def normalize_parse_tree(data):
        """Build a copy of a parse tree whose node types are normalized.

        ``data`` is either a JSON string (decoded with ``json.loads``) or an
        already-decoded structure (deep-copied, so the caller's object is never
        modified). Every dict reachable through ``sx_child``/``dx_child``, the
        values of a ``transitions`` dict, the items of a ``children`` list and
        the items of any list met along the way gets its string ``type`` passed
        through ``str.capitalize`` (e.g. ``"TASK"`` -> ``"Task"``). Other keys
        are not descended into. The normalized structure is returned.
        """
        tree_data = json.loads(data) if isinstance(data, str) else copy.deepcopy(data)

        def _nested_containers(node):
            # Gather the dict/list values below `node` that must be visited too.
            if isinstance(node, list):
                found = list(node)
            else:
                found = [node.get('sx_child'), node.get('dx_child')]
                transitions = node.get('transitions')
                if isinstance(transitions, dict):
                    found.extend(transitions.values())
                children = node.get('children')
                if isinstance(children, list):
                    found.extend(children)
            return [item for item in found if isinstance(item, (dict, list))]

        def _normalize(node):
            if isinstance(node, dict) and isinstance(node.get('type'), str):
                node['type'] = node['type'].capitalize()
            if isinstance(node, (dict, list)):
                for nested in _nested_containers(node):
                    _normalize(nested)
            return node

        return _normalize(tree_data)



## BPMN


In [None]:
# Bootstrap the simulator from the parse tree: the response carries the BPMN,
# the Petri net (also as DOT) and the initial execution tree.
request_json = { "bpmn": parse_tree }

response = requests.post(f"http://{URL}:{SIMULATOR_PORT}/execute", headers=HEADERS, json=request_json, timeout=60)
# Fail fast on HTTP errors instead of a KeyError on the missing fields below.
response.raise_for_status()
response_json = response.json()
# Mirrors the check in Worker.execute: the simulator can report failures
# through an "error" key in an otherwise successful reply.
if "error" in response_json:
    raise RuntimeError(f"Simulator error: {response_json['error']}")

bpmn = response_json['bpmn']
petri_net = response_json['petri_net']
petri_net_dot = response_json['petri_net_dot']
execution_tree = response_json['execution_tree']


In [None]:
# Show the initial execution tree returned by the simulator bootstrap cell.
execution_tree

In [None]:

# Render the BPMN with the regions that get_active_region_by_pn computes for a
# sample Petri-net marking.
from dot import get_active_region_by_pn, wrap_to_dot
# Hand-written sample marking: one token in place "21" and one in place "7".
# NOTE(review): these place IDs presumably belong to the Petri net built from
# bpmn_fig8_bound_135_15.json — confirm before reusing with another model.
marking = {"21": {"token": 1}, "7": {"token": 1}}

active_region_by_pn = get_active_region_by_pn(petri_net, marking)
# DOT source for the BPMN, passed to graphviz below.
dot = wrap_to_dot(bpmn, impacts_names, active_region_by_pn)
graph = graphviz.Source(dot, format="svg")

display(graph)

## Worker

La classe `Worker` usa l'endpoint `/execute` del simulatore, che accetta un payload JSON e restituisce strutture pronte da reinserire nell'albero condiviso.


In [None]:
class Worker:
    """HTTP client for the simulator's ``/execute`` endpoint.

    Every call goes through a single ``requests.Session`` (created here unless
    the caller supplies one), so connections can be reused between calls.
    """

    def __init__(self, base_url: str = SIMULATOR_SERVER, session: Optional[requests.Session] = None) -> None:
        # Strip trailing slashes and add exactly one back, so endpoint names can
        # simply be appended to the base URL.
        self.base_url = f"{base_url.rstrip('/')}/"
        self.session = session if session else requests.Session()

    def execute(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` as JSON to ``<base_url>execute`` (60 s timeout) and return the decoded reply.

        Raises:
            requests.HTTPError: the server answered with an error status.
            RuntimeError: the JSON reply contains an ``"error"`` key.
        """
        endpoint = f"{self.base_url}execute"
        reply = self.session.post(endpoint, headers=HEADERS, json=payload, timeout=60)
        reply.raise_for_status()
        body = reply.json()
        if "error" not in body:
            return body
        raise RuntimeError(f"Worker error: {body['error']}")

    def bootstrap(self, parse_tree_payload: Dict[str, Any]) -> Dict[str, Any]:
        """Start a simulation from a parse tree alone (sent as ``{"bpmn": parse_tree_payload}``)."""
        return self.execute({"bpmn": parse_tree_payload})

    def expand_node(
        self,
        shared_state: Dict[str, Any],
        node_id: str,
        *,
        choices: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Expand ``node_id`` by calling the simulator ``/execute`` endpoint.

        The server works on ``execution_tree.current_node``, so the request
        carries a deep copy of ``shared_state["execution_tree"]`` whose
        ``current_node`` is ``node_id``; ``shared_state`` itself is not modified.
        ``choices`` is forwarded as a list whenever it is not ``None``
        (an empty list included). Returns the decoded simulator reply.
        """
        tree_for_request = copy.deepcopy(shared_state["execution_tree"])
        tree_for_request["current_node"] = node_id

        payload = {
            "bpmn": shared_state["bpmn"],
            "petri_net": shared_state["petri_net"],
            "execution_tree": tree_for_request,
        }
        if choices is not None:
            payload["choices"] = list(choices)
        return self.execute(payload)

In [None]:
class SimulatorClient(Worker):
    """Simulator client: inherits every method from ``Worker`` and adds nothing."""


## Utilità per navigare l'albero

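Funzioni di supporto per contare i nodi, elencare i percorsi delle foglie, recuperare un nodo per percorso o per ID, riconoscere i nodi finali e clonare l'albero. Nessuna di queste funzioni acquisisce lock: l'eventuale sincronizzazione spetta al chiamante.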


In [None]:
def iter_leaf_nodes(node: Dict[str, Any], path: Tuple[str, ...] = ()) -> Iterable[Tuple[str, ...]]:
    """Yield the path to every leaf (node without children), depth-first, left to right.

    Each path is ``path`` extended with the child indices (as strings) leading
    from ``node`` to the leaf; with the default empty ``path`` it can be
    resolved with ``get_node_at_path(node, leaf_path)``.

    An explicit stack replaces nested ``yield from`` generators, so deep trees
    neither hit the recursion limit nor pay O(depth) for every yielded path.
    """
    pending = [(node, path)]
    while pending:
        current, current_path = pending.pop()
        children = current.get("children", [])
        if not children:
            yield current_path
            continue
        # Push in reverse so the leftmost child is popped (visited) first.
        for idx, child in reversed(list(enumerate(children))):
            pending.append((child, current_path + (str(idx),)))


def get_node_at_path(root: Dict[str, Any], path: Tuple[str, ...]) -> Dict[str, Any]:
    """Walk ``path`` down from ``root`` and return the node it designates.

    Every element of ``path`` is a child index written as a string (the format
    produced by ``iter_leaf_nodes``); an empty path yields ``root`` itself.
    A node without ``children`` counts as having none, so stepping past it
    raises ``IndexError``.
    """
    current = root
    for index_text in path:
        current = current.get("children", [])[int(index_text)]
    return current


def get_node_by_id(root: Dict[str, Any], node_id: str) -> Optional[Dict[str, Any]]:
    """Find a node by its ``id`` in the tree rooted at ``root``.

    The search is depth-first in pre-order (parent before children, children
    left to right) and returns the first match, or ``None`` when no node
    carries ``node_id``. It uses an explicit stack, so very deep execution trees
    cannot exceed Python's recursion limit. A match is returned whatever its
    truthiness, unlike a plain ``if result:`` check.
    """
    stack = [root]
    while stack:
        node = stack.pop()
        if node.get("id") == node_id:
            return node
        # Reversed push so the leftmost child is examined first (pre-order).
        stack.extend(reversed(node.get("children", [])))
    return None


def is_final_node(node: Dict[str, Any], final_marking: Dict[str, Dict[str, float]]) -> bool:
    """Tell whether ``node``'s marking matches ``final_marking``.

    Only the places listed in ``final_marking`` are compared (a place missing
    from either side counts as 0 tokens); tokens in other places are ignored.
    A node with no/empty ``snapshot.marking``, or an empty ``final_marking``,
    is never considered final.
    """
    node_marking = node.get("snapshot", {}).get("marking", {})
    if not (node_marking and final_marking):
        return False
    return all(
        node_marking.get(place, {}).get("token", 0) == expected.get("token", 0)
        for place, expected in final_marking.items()
    )


def count_nodes(node: Dict[str, Any]) -> int:
    """Return how many nodes the subtree rooted at ``node`` holds, ``node`` included."""
    return 1 + sum(count_nodes(child) for child in node.get("children", []))


def clone_with_children(node: Dict[str, Any]) -> Dict[str, Any]:
    """Return a deep copy of the tree in the 'children' format used by the server.

    Every non-``children`` value is deep-copied; ``children`` is rebuilt
    recursively and always placed last, as a (possibly empty) list. A falsy
    ``node`` (``None`` or ``{}``) yields ``{}``.
    """
    if not node:
        return {}

    duplicate = {}
    for key, value in node.items():
        if key == "children":
            continue
        duplicate[key] = copy.deepcopy(value)
    duplicate["children"] = list(map(clone_with_children, node.get("children", [])))
    return duplicate

## Scheduler parallelo

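`ParallelExecutionTreeBuilder` coordina un pool di thread che prelevano da una coda condivisa gli ID dei nodi da espandere. Ogni nodo viene espanso al più una volta; dopo l'espansione i suoi figli non finali vengono accodati e possono essere presi in carico da qualunque worker (il vincolo di far espandere immediatamente allo stesso worker i figli con nature attive non è quindi imposto dal codice). Ogni nodo dell'albero possiede un proprio lock, mentre letture e aggiornamenti dell'albero condiviso sono serializzati da un lock globale (`_tree_lock`).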


In [None]:
class ParallelExecutionTreeBuilder:
    """Expand an execution tree by driving the simulator in parallel.

    ``max_workers`` threads pull node IDs from a shared queue. Each node ID is
    expanded at most once: the worker sends a snapshot of the shared state to
    the simulator (``client.expand_node``), grafts the expanded node found in
    the response back into the shared tree, and then queues the node's
    non-final children so any worker can pick them up.

    Locking:
      * ``_locks_guard`` protects ``_node_locks`` and ``_expanded_nodes``; it is
        never held while another lock is acquired.
      * ``_tree_lock`` protects every read and write of ``_shared_tree``. It is
        a plain, non-reentrant ``threading.Lock``: methods that take it
        (``_schedule_children``, ``_merge_response``) must never be called
        while it is already held.
      * ``_lock_for(node_id)`` gives one lock per node ID, taken before
        ``_tree_lock``.

    NOTE(review): grafting assumes the simulator only changes the subtree of
    the node being expanded and assigns node IDs that do not collide across
    concurrent requests — confirm against the simulator implementation.
    """

    def __init__(self, client: SimulatorClient, max_workers: int = 1) -> None:
        if max_workers < 1:
            raise ValueError("max_workers must be >= 1")
        self.client = client
        self.max_workers = max_workers
        self._queue: "queue.Queue[str]" = queue.Queue()  # Queue of node IDs to expand
        self._node_locks: Dict[str, threading.Lock] = {}
        self._locks_guard = threading.Lock()
        self._stop_event = threading.Event()
        self._expanded_nodes: set = set()  # Node IDs already claimed by a worker
        self._tree_lock = threading.Lock()  # Global lock for tree reads/updates
        self._shared_tree: Dict[str, Any] = {}  # Populated by build()

    def _lock_for(self, node_id: str) -> threading.Lock:
        """Return the lock dedicated to ``node_id``, creating it on first use."""
        with self._locks_guard:
            lock = self._node_locks.get(node_id)
            if lock is None:
                lock = threading.Lock()
                self._node_locks[node_id] = lock
            return lock

    def _enqueue_initial_frontier(self, tree_root: Dict[str, Any]) -> None:
        """Seed the queue with the root node ID (``"0"`` when the root has no ``id``)."""
        root_id = tree_root.get("id", "0")
        self._queue.put(root_id)

    def _schedule_children(self, node_id: str) -> None:
        """Queue every non-final, not-yet-expanded child of ``node_id``.

        Acquires ``_tree_lock`` itself, so the caller must NOT hold it.
        """
        with self._tree_lock:
            node = get_node_by_id(self._shared_tree["execution_tree"]["root"], node_id)
            if not node:
                return
            children = node.get("children", [])
            if not children:
                return
            final_marking = self._shared_tree["petri_net"]["final_marking"]
            candidates = [
                child.get("id")
                for child in children
                if child.get("id") and not is_final_node(child, final_marking)
            ]
        with self._locks_guard:
            pending = [child_id for child_id in candidates if child_id not in self._expanded_nodes]
        for child_id in pending:
            self._queue.put(child_id)

    def _snapshot_locked(self) -> Dict[str, Any]:
        """Return request-ready shared state with a private copy of the tree.

        The caller must hold ``_tree_lock``: other workers mutate the shared tree
        in place (see ``_merge_response``), so it must not be copied concurrently.
        ``bpmn``/``petri_net`` are only ever replaced, never mutated, so they are
        shared by reference.
        """
        return {
            "bpmn": self._shared_tree["bpmn"],
            "petri_net": self._shared_tree["petri_net"],
            "execution_tree": copy.deepcopy(self._shared_tree["execution_tree"]),
        }

    def _merge_response(self, node_id: str, response: Dict[str, Any]) -> None:
        """Fold the simulator response for ``node_id`` into the shared state.

        Only the expanded node (with its new subtree) is taken from the
        response tree, so expansions finished concurrently by other workers are
        not overwritten. If the node cannot be located in the shared tree or in
        the response tree, the whole execution tree is replaced (the previous
        behavior). Acquires ``_tree_lock`` itself.
        """
        with self._tree_lock:
            for key in ("petri_net", "petri_net_dot", "bpmn"):
                if key in response:
                    self._shared_tree[key] = response[key]

            new_tree = response.get("execution_tree")
            if new_tree is None:
                return
            shared_tree = self._shared_tree["execution_tree"]
            target = get_node_by_id(shared_tree["root"], node_id)
            source = get_node_by_id(new_tree.get("root") or {}, node_id)
            if target is None or source is None:
                self._shared_tree["execution_tree"] = new_tree
                return

            # Graft in place: the node had no children before this expansion and
            # only this worker may expand it, so nothing else is lost.
            target.clear()
            target.update(source)
            # Keep tree-level metadata (e.g. current_node) in sync with the latest reply.
            for key, value in new_tree.items():
                if key != "root":
                    shared_tree[key] = value

    def _expand_node(self, node_id: str) -> None:
        """Expand ``node_id`` through the simulator (at most once) and queue its children.

        Errors are printed, not raised, so one failing node does not stop its
        worker; the node stays marked as expanded and is not retried.
        """
        if self._stop_event.is_set():
            return

        # Claim the node: duplicates of the same ID in the queue become no-ops.
        with self._locks_guard:
            if node_id in self._expanded_nodes:
                return
            self._expanded_nodes.add(node_id)

        with self._lock_for(node_id):
            try:
                with self._tree_lock:
                    node = get_node_by_id(self._shared_tree["execution_tree"]["root"], node_id)
                    already_has_children = bool(node and node.get("children"))
                    snapshot = None if already_has_children else self._snapshot_locked()

                if already_has_children:
                    # Must run after _tree_lock is released: _schedule_children
                    # acquires it and the lock is not reentrant (deadlock otherwise).
                    self._schedule_children(node_id)
                    return

                # Expand with default choices - per the original design the server
                # creates ALL children for natures.
                response = self.client.expand_node(snapshot, node_id, choices=[])
                self._merge_response(node_id, response)
                self._schedule_children(node_id)
            except Exception as e:
                print(f"Error expanding node {node_id}: {e}")

    def _worker(self) -> None:
        """Thread body: expand queued node IDs until the stop event is set."""
        while not self._stop_event.is_set():
            try:
                node_id = self._queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                self._expand_node(node_id)
            finally:
                # Always acknowledge the item so _queue.join() in build() can return.
                self._queue.task_done()

    def build(self, parse_tree_payload: Dict[str, Any]) -> Dict[str, Any]:
        """Bootstrap the simulator and expand the tree until no work is left.

        Returns the shared state dict with keys ``bpmn``, ``petri_net``,
        ``petri_net_dot`` and ``execution_tree``.
        """
        bootstrap = self.client.bootstrap(parse_tree_payload)

        # Reset per-run state so the builder can be reused. A stop event left set
        # by a previous build() would make the new workers exit at once, leaving
        # the root in the queue and _queue.join() blocked forever.
        self._stop_event.clear()
        with self._locks_guard:
            self._expanded_nodes = set()
            self._node_locks = {}
        with self._tree_lock:
            self._shared_tree = {
                "bpmn": bootstrap["bpmn"],
                "petri_net": bootstrap["petri_net"],
                "petri_net_dot": bootstrap["petri_net_dot"],
                "execution_tree": bootstrap["execution_tree"],
            }
        self._enqueue_initial_frontier(self._shared_tree["execution_tree"]["root"])

        workers: List[threading.Thread] = []
        for idx in range(self.max_workers):
            t = threading.Thread(target=self._worker, name=f"tree-worker-{idx}", daemon=True)
            t.start()
            workers.append(t)

        # Returns once every queued node ID has been acknowledged with task_done().
        self._queue.join()
        self._stop_event.set()

        for t in workers:
            t.join()

        return self._shared_tree


## Algoritmo di espansione completa

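Questo algoritmo visita l'albero in ampiezza (BFS) con un solo thread ed espande ogni nodo non finale tramite le API del simulatore. Prima usa l'espansione predefinita; se questa produce un solo figlio mentre sono abilitate più transizioni, richiede esplicitamente alcune di quelle transizioni. Il processo continua fino a saturare l'intero execution tree.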


In [None]:
def build_complete_execution_tree(
    client: SimulatorClient,
    parse_tree_payload: Dict[str, Any],
    *,
    max_choice_attempts: int = 5,
) -> Dict[str, Any]:
    """Expand the WHOLE execution tree, trying the enabled transitions.

    Single-threaded, breadth-first counterpart of ``ParallelExecutionTreeBuilder``.
    For each non-final node without children the simulator is first asked for
    a default expansion (``choices=[]``). If that yields exactly one child while
    several transitions are enabled in the node's marking, up to
    ``max_choice_attempts`` of those transitions are requested one by one, and
    every distinct child collected is re-attached under the node.

    Args:
        client: simulator client used for the bootstrap and every expansion.
        parse_tree_payload: parse tree passed to ``client.bootstrap``.
        max_choice_attempts: maximum number of enabled transitions tried
            explicitly when the default expansion produced a single child.

    Returns:
        The shared state (the bootstrap reply, with ``bpmn``, ``petri_net``,
        ``petri_net_dot`` and ``execution_tree`` replaced by the latest
        expansion replies and the merged children).
    """
    shared_state = client.bootstrap(parse_tree_payload)
    final_marking = shared_state["petri_net"]["final_marking"]

    expanded_nodes = set()
    frontier: deque[str] = deque([shared_state["execution_tree"]["root"]["id"]])
    iteration = 0

    def get_enabled_transitions(node_marking, petri_net):
        """Return the names of the transitions enabled by ``node_marking``.

        A transition is enabled when every place in its ``inputs`` holds more
        than 0 tokens. ``petri_net["transitions"]`` may be a name -> data mapping
        or a list of dicts (named by their ``name`` key, else ``t_<index>``).
        """
        transitions = petri_net.get("transitions", [])
        if isinstance(transitions, dict):
            trans_items = transitions.items()
        else:
            trans_items = [(t.get("name", f"t_{i}"), t) for i, t in enumerate(transitions)]
        return [
            trans_name
            for trans_name, trans_data in trans_items
            if all(node_marking.get(place, {}).get("token", 0) > 0 for place in trans_data.get("inputs", []))
        ]

    def apply_response(response):
        """Copy the state entries returned by an expansion call into shared_state."""
        for key in ("bpmn", "petri_net", "petri_net_dot", "execution_tree"):
            if key in response:
                shared_state[key] = response[key]

    def current_children(node_id):
        """Return the children of ``node_id`` in the latest shared tree (empty list if none)."""
        node = get_node_by_id(shared_state["execution_tree"]["root"], node_id)
        return (node.get("children") or []) if node else []

    while frontier:
        iteration += 1
        node_id = frontier.popleft()

        if node_id in expanded_nodes:
            continue

        print(f"[{iteration}] Node: {node_id}")

        node = get_node_by_id(shared_state["execution_tree"]["root"], node_id)
        if not node:
            print("  ✗ Not found")
            continue

        if is_final_node(node, final_marking):
            print("  ✓ Final node")
            expanded_nodes.add(node_id)
            continue

        # Already expanded by the simulator: only enqueue its children.
        if node.get("children"):
            children = node["children"]
            print(f"  → Has {len(children)} children")
            for child in children:
                if child["id"] not in expanded_nodes:
                    frontier.append(child["id"])
            expanded_nodes.add(node_id)
            continue

        # Expansion: find the transitions enabled by the node's marking.
        marking = node.get("snapshot", {}).get("marking", {})
        enabled = get_enabled_transitions(marking, shared_state["petri_net"])

        if not enabled:
            print("  ✗ No transitions")
            expanded_nodes.add(node_id)
            continue

        print(f"  → {len(enabled)} transitions enabled")

        # child_id -> child node, collected across every expansion attempt.
        all_children_map = {}

        # 1) Default expansion: the simulator picks the choices itself.
        try:
            print("    • Trying default expansion")
            apply_response(client.expand_node(shared_state, node_id, choices=[]))
            for child in current_children(node_id):
                all_children_map[child["id"]] = child
                print(f"      ✓ Child: {child['id']}")
        except Exception as e:
            print(f"      ✗ Error: {e}")

        # 2) One child but several enabled transitions: request some of them
        #    explicitly to discover branches the default expansion did not create.
        if len(all_children_map) == 1 and len(enabled) > 1:
            print("    • Only 1 child so far, trying specific transitions...")
            for trans_name in enabled[:max_choice_attempts]:
                try:
                    print(f"      • Trying {trans_name}")
                    apply_response(client.expand_node(shared_state, node_id, choices=[trans_name]))
                    for child in current_children(node_id):
                        if child["id"] not in all_children_map:
                            all_children_map[child["id"]] = child
                            print(f"        ✓ NEW child: {child['id']}")
                        else:
                            print(f"        = Same child: {child['id']}")
                except Exception as e:
                    print(f"        ✗ Error: {e}")

        # 3) Each reply replaces the whole tree, so the latest one may lack
        #    children found by earlier attempts: re-attach all of them.
        if len(all_children_map) > 1:
            print(f"    → Merging {len(all_children_map)} children")
            node_to_update = get_node_by_id(shared_state["execution_tree"]["root"], node_id)
            if node_to_update:
                node_to_update["children"] = list(all_children_map.values())

        frontier.extend(child["id"] for child in all_children_map.values())
        expanded_nodes.add(node_id)

    print(f"\n✓ Complete: {iteration} iterations, {len(expanded_nodes)} nodes")
    print(f"  Total tree nodes: {count_nodes(shared_state['execution_tree']['root'])}")

    return shared_state


## Esecuzione della costruzione parallela

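L'esempio seguente costruisce l'albero prima con quattro thread e poi con l'algoritmo di espansione completa, stampando il numero totale di nodi generati da ciascun metodo. Infine visualizza l'albero prodotto dall'algoritmo completo, evidenziando il percorso fino al nodo corrente.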


In [None]:
# The parse tree returned by the solver is what the simulator receives below.
parse_tree_payload = parse_tree
# Copy with normalized node types, for any local utilities that need it
# (the builders below are given the original parse_tree_payload).
parse_tree_dict = normalize_parse_tree(parse_tree_payload)
# Same expression as in the BPMN-loading cell.
impacts_names = bpmn_definition.get(IMPACTS_NAMES, [])



In [None]:
# Build the tree twice — with the multi-threaded builder (4 workers) and with the
# single-threaded complete algorithm — printing the node count of each result.
client = SimulatorClient()
builder = ParallelExecutionTreeBuilder(client, max_workers=4)

shared_state_parallel = builder.build(parse_tree_payload)
parallel_root = shared_state_parallel["execution_tree"]["root"]
print("Numero totale di nodi generati (parallelo):", count_nodes(parallel_root))

shared_state_complete = build_complete_execution_tree(client, parse_tree_payload)
complete_root = shared_state_complete["execution_tree"]["root"]
print("Numero totale di nodi generati (algoritmo completo):", count_nodes(complete_root))

# Render the tree from the complete algorithm, passing the path to the
# simulator's current node as the highlight path.
# NOTE: this rebinds the global `execution_tree` set by the bootstrap cell.
execution_tree = shared_state_complete["execution_tree"]
current_node_id = execution_tree.get("current_node")
converted_root = clone_with_children(execution_tree.get("root", {}))
highlight_path = get_path_to_current_node(converted_root, current_node_id) or []
dot_source = wrapper_execution_tree_to_dot(converted_root, impacts_names, highlight_path)
graph = graphviz.Source(dot_source, format="svg")
display(graph)
