In [None]:
import sys
sys.path.append('sources')
from etl.cpi import load_cpi_file, analize_cpi
from cpi_to_mdp.cpitospin import create_cpi_visualization, create_spin_visualization

# Choose the test
All the test available are in folder 'CPIs'

In [None]:
process_name = "parallel-loop" #"" #choice-task-init-reverse

cpi_dict = load_cpi_file(process_name)
if cpi_dict:
	spin_model = analize_cpi(cpi_dict)

In [None]:
cpi_viz = create_cpi_visualization(cpi_dict)
display(cpi_viz)
cpi_output = process_name + '_cpi'
cpi_viz.render(cpi_output, cleanup=True)

# FROM CPI TO SPIN

## Process Visualization

The CPI dictionary can be visualized as a directed graph to better understand its structure. In this visualization:

- **Task nodes** show duration and impact values (cost, time, quality)
- **Nature nodes** display their probability values (e.g., "p=0.7")
- **Sequence nodes** connect components with "head" and "tail" edges
- **Parallel nodes** show concurrent branches with "first" and "second" edges
- **Choice nodes** represent decision points with "true" and "false" branches

Each node type is represented as a box, with edges showing the relationships between components. This hierarchical representation helps understand the process flow and decision points in the system.

In [None]:
spin_viz = create_spin_visualization(spin_model)
display(spin_viz)
spin_output = process_name + '_spin'
spin_viz.render(spin_output, cleanup=True)

# FROM SPIN TO PRISM

In [None]:
prism_model = spin_model.generate_prism_model()
lines = prism_model.split('\n')

output_file =  "models/" + process_name + '.nm'
with open(output_file, 'w') as f:
	f.write(prism_model)

print("PRISM model generated successfully!")
print(f"\nFirst 20 lines of PRISM model:")
print("-" * 40)
for i, line in enumerate(lines[:20]):
	print(f"{i + 1:2d}: {line}")

if len(lines) > 20:
	print(f"... ({len(lines) - 20} more lines)")


## RUN PRISM ANALYSIS

In [None]:
from prism import run_prism_analysis

run_prism_analysis(process_name, create_mdp=True)

# FROM PRISM TO MDP

Since PRISM models are based on an extended form of MDPs, we now provide the compact version of the MDP generated by PRISM, which corresponds to the equivalent SPIN model. This enhances clarity and facilitates easier comparison between the two representations.

CASE 1: deterministic, 2 states directly connected

CASE 2: time passage 2 states connected with label="{{∅}}" fillcolor=lightsalmon

Case 3: choice: red transition with internal writing some choice I take (1+) and saying true/false --> written in the next state

case 4 natural: as choice but green transition and label on the probability arcs

Case 5: nature and contemporary choice: double transition first choice and then nature

In [None]:
from etl.dot_visualization import is_passing_time, places_label, add_impacts, add_empty_transition, add_choice, \
    add_nature, add_loop
from etl.prism_model import load_prism_model, find_next_state, find_exclusive_gateways, decisions_combinations
from graphviz import Source


def manage_xor_splits(idx, idx_next, probability, choices, natures, loops):
    lines = []

    if len(choices) > 0 and len(loops) == 0 and len(natures) == 0:
        for decision_combination in decisions_combinations(choices):
            lines.extend(add_choice(decision_combination, idx, idx_next))

    elif len(choices) == 0 and len(loops) == 0  and len(natures) > 0:
        for decision_combination in decisions_combinations(natures):
            lines.extend(add_nature(decision_combination, idx, idx_next, probability))

    elif len(choices) == 0 and len(natures) == 0 and len(loops) > 0:
        for decision_combination in decisions_combinations(loops):
            lines.extend(add_loop(decision_combination, idx, idx_next, probability))

    else:
        for decision_combination in decisions_combinations(choices):
            idx_c = ''.join(list(decision_combination.keys()) + list(decision_combination.values()))
            decision_combination_str = '\n'.join(f'{k}{v}' for k, v in decision_combination.items())
            lines.append(f'{idx_c} [label="{{{decision_combination_str}}}" , style="filled", fillcolor="lightcoral", shape="ellipse" ];')

            if f'{idx} -> {idx_c};' not in lines:
                lines.append(f'{idx} -> {idx_c};')

            for decision_combinations_nat in decisions_combinations(natures):
                for decision_combinations_loop in decisions_combinations(loops):
                    merged_decision_combination = {**decision_combinations_nat, **decision_combinations_loop}
                    idx_nat_loop = ''.join(list(merged_decision_combination.keys()) + list(merged_decision_combination.values()))
                    decision_combination_str = '\n'.join(f'{k}{v}' for k, v in merged_decision_combination.items())

                    lines.append(f'{idx_c}{idx_nat_loop} [label="{{{decision_combination_str}}}" , style="filled", fillcolor="lightgreen", shape="ellipse" ];')
                    lines.append(f'{idx_c} -> {idx_c}{idx_nat_loop}[label = "{probability}"];')
                    lines.append(f'{idx_c}{idx_nat_loop} -> {idx_next};')

    return lines

def is_loop_place(places):
    for place in places:
        print("place:", place, type(place))
        if "loop" in place:
            return True
    return False

def create_states_mdp(states, trans_dict, rewards_dict, save =False):
    lines = ['digraph LTS {', 'node [label="", shape="box"];']

    for idx, places in states.items():
        lines.append(places_label(idx, places)) # add node states

        next_states = find_next_state(idx, trans_dict, states)

        if len(next_states) == 1: # case 1 or case 2
            idx_next, _, label = next_states[0]
            next_places = states[idx_next]
            if label:
                lines.extend(add_impacts(idx, label, rewards_dict))
            if is_passing_time(places, next_places):# Case 2
                lines.extend(add_empty_transition(idx, idx_next))
            else:# Case 1
                lines.append(f'"{idx}" -> "{idx_next}";')
        else:
            for t in next_states:
                idx_next, probability, label = t
                next_places = states[idx_next]
                choices, natures, loops = find_exclusive_gateways(places, next_places)
                print(f'choices: {choices}\n natures: {natures}\n loops: {loops}', 'label:', label)
                if len(choices) > 0 or len(loops) > 0 or len(natures) > 0:
                    lines.extend(manage_xor_splits(idx, idx_next, probability, choices, natures, loops))
                else: # Ricongiungimento diretto
                    print("idx_next", idx_next, "next_places", next_places)
                    lines.append(f'"{idx}" -> "{idx_next}";')


    lines.append('}')
    compress_dot = "\n".join(lines)
    if save:
        Source(compress_dot).render(filename=f"models/{process_name}_cleaned", format='svg', cleanup=True)
    return compress_dot


states, transitions, rewards = load_prism_model(process_name)

compressed_dot = create_states_mdp(states, transitions, rewards)

#print(compressed_dot)
Source(compressed_dot)