# Probabilistic Computation Tree Logic

In [2]:
from scipy.optimize import linprog
import numpy as np
import pandas as pd
from mdp import run as run_mdp

In [51]:
# Já definida em implementation_cours, aqui está incrementada para states com actions
def find_states(df, target_state):
    """
    Identifies states leading to a target state with certainty (S_sure), states that may lead to the target state (S_may),
    and states that can never reach the target state (S_never) in a Markov chain represented by a DataFrame.

    Parameters:
    - df (pandas.DataFrame): DataFrame containing the transition probabilities with 'Origin' as states and other columns as possible next states.
    - target_state (str): The state of interest to trace back from.

    Returns:
    - tuple of lists: A tuple containing three lists representing states that are sure to reach the target (S_sure),
                      states that may reach the target (S_may), and states that can never reach the target (S_never).
    """
    # Identify all possible states from the 'Origin' column
    states = df['Origin'].tolist()
    
    # Initialize sets for S_sure, S_may, and S_never
    S_sure = set()
    S_may = set()
    S_never = set(states)  # Initially, consider all states as S_never
    S_sure.add(target_state)
    S_never.remove(target_state)
    
    # To identify S_may and adjust S_never, consider reverse transitions.
    # This involves more complex logic for indirect paths; here we simplify to only direct transitions.
    visited = set()
    to_visit = [target_state]
    while to_visit:
        current_state = to_visit.pop()
        visited.add(current_state)
        for origin in states:
            if len(df.loc[df['Origin'] == origin]) == 1: # If there's only one possible action or NA
                if df.loc[df['Origin'] == origin, current_state].values[0] > 0:
                    if df.loc[df['Origin'] == origin, current_state].values[0] < 1.0:
                        S_may.add(origin)
                        if origin in S_never:
                            S_never.remove(origin)
                        if origin not in visited:
                            to_visit.append(origin)
                    else:
                        S_sure.add(origin)
                        if origin in S_never:
                            S_never.remove(origin)
            else:
                if df.loc[df['Origin'] == origin, current_state].values.sum() > 0:
                    if 1.0 not in df.loc[df['Origin'] == origin, current_state].values:
                        S_may.add(origin)
                        if origin in S_never:
                            S_never.remove(origin)
                        if origin not in visited:
                            to_visit.append(origin)
                    else:
                        S_sure.add(origin)
                        if origin in S_never:
                            S_never.remove(origin)
    
    return list(S_sure), list(S_may), list(S_never)

In [52]:
printer = run_mdp(path = "mdp_examples\\correct_ex..mdp", return_printer=True)

ANTLR runtime and generated code versions disagree: 4.11.1!=4.13.1
ANTLR runtime and generated code versions disagree: 4.11.1!=4.13.1
Initialy declared states: ['S0', 'S1', 'S2']
Initialy declared actions: ['a', 'b', 'c']
Transition from S0 with no action and targets ['S1', 'S2'] with weights [5, 5]
Transition from S1 with action b and targets ['S1', 'S0'] with weights [2, 8]
Transition from S1 with action a and targets ['S2', 'S0', 'S1', 'S3'] with weights [1, 3, 6, 2]
Transition from S2 with action c and targets ['S0', 'S1', 'S3'] with weights [5, 5, 10]
Transition from S2 with action d and targets ['S0', 'S3'] with weights [5, 7]
Transition from S3 with action e and targets ['S1', 'S2'] with weights [2, 2]

( 0 ) - Undeclared state S3 targeted in transition: S1 with action a, declared automaticaly
( 1 ) - Undeclared action in transition: S2 with action d, declared automaticaly
( 2 ) - Undeclared action in transition: S3 with action e, declared automaticaly
( 3 ) - State S0 reward wa

In [53]:
df = printer.transactions_prob

S_sure, S_may, S_never = find_states(df, 'S1')

S_sure, sorted(S_may), sorted(S_never)

print("S_sure:\n", S_sure)
print("S_may:\n", S_may)
print("S_never:\n", S_never)

S_sure:
 ['S1']
S_may:
 ['S1', 'S2', 'S0', 'S3']
S_never:
 []


# Solving system for markov chains (no actions)

In [54]:
def solve_system(df, S_may, S_sure):
    """
    Solves the system y = Ay + b for a given set of states.

    This function computes the transition probability matrix A and vector b based on
    the states specified in S_may and S_sure. Then, it solves the system of linear
    equations to find y.

    Parameters:
    - df (pandas.DataFrame): DataFrame containing the transition probabilities.
    - S_may (list): List of states that may eventually lead to the target state (S_sure).
    - S_sure (list): List of target states which can be reached with certainty from the selected state.

    Returns:
    - A (numpy.ndarray): Transition probability matrix for states in S_may.
    - b (numpy.ndarray): Vector containing probabilities of transitioning to states in S_sure from S_may.
    - y (numpy.ndarray): Solution to the system of equations y = Ay + b.

    Raises:
    - np.linalg.LinAlgError: If the system of equations cannot be solved due to matrix singularity.
    """

    # Ensure S_may and S_sure are lists
    if not isinstance(S_may, list):
        S_may = list(S_may)
    if not isinstance(S_sure, list):
        S_sure = list(S_sure)

    # Create matrix A
    A = df.loc[df['Origin'].isin(S_may), S_may].values

    # Create vector b
    b = np.sum(df.loc[df['Origin'].isin(S_may), S_sure].values, axis=1)

    # Solve the system y = Ay + b
    I = np.eye(len(S_may))  # Identity matrix
    try:
        y = np.linalg.solve(I - A, b)
    except np.linalg.LinAlgError as e:
        raise np.linalg.LinAlgError(f"Error solving the system: {e}")

    return A, b, y


In [55]:
printer = run_mdp(path = "mdp_examples\\lancer_de_pieces.mdp", return_printer=True)

ANTLR runtime and generated code versions disagree: 4.11.1!=4.13.1
ANTLR runtime and generated code versions disagree: 4.11.1!=4.13.1
Initialy declared states: ['I', 'S0', 'S1', 'S2', 'S3', 'S4', 'S5', 'F1', 'F2', 'F3', 'F4', 'F5', 'F6']
Initialy declared actions: ['NA']
Transition from I with no action and targets ['S0', 'S1'] with weights [1, 1]
Transition from S0 with no action and targets ['S2', 'S3'] with weights [1, 1]
Transition from S1 with no action and targets ['S4', 'S5'] with weights [1, 1]
Transition from S2 with no action and targets ['F1', 'S0'] with weights [1, 1]
Transition from S3 with no action and targets ['F2', 'F3'] with weights [1, 1]
Transition from S4 with no action and targets ['S1', 'F4'] with weights [1, 1]
Transition from S5 with no action and targets ['F5', 'F6'] with weights [1, 1]

( 0 ) - State I reward wasn't assigned, using zero as reward
( 1 ) - State S0 reward wasn't assigned, using zero as reward
( 2 ) - State S1 reward wasn't assigned, using zero 

In [56]:
df = printer.transactions_prob

S_sure, S_may, S_never = find_states(df, 'S1')
S_sure, sorted(S_may), sorted(S_never)

print("S_sure:\n", S_sure)
print("S_may:\n", S_may)
print("S_never:\n", S_never)
A, b, y = solve_system(df, S_may, S_sure)

print("Matriz A:\n", A)
print("Vetor b:\n", b)
print("Solução y:\n", y)

S_sure:
 ['S1']
S_may:
 ['S1', 'I', 'S4']
S_never:
 ['S5', 'S0', 'S3', 'S2']
Matriz A:
 [[0.5 0.  0. ]
 [0.  0.  0.5]
 [0.5 0.  0. ]]
Vetor b:
 [0.5 0.  0.5]
Solução y:
 [1.  0.5 1. ]


# Solving system for a MDP (Markovian Decision Process, avec actions)

In [21]:
# Coeficientes da função objetivo: probabilidades de alcançar W (minimizar)
c = [0.5, 0, 1.0, 0]  # Probabilidades de S0->W e S2->W

# Restrições: garantir a seleção de ao menos uma ação para cada estado relevante
# Como são ações binárias, cada ação é ou não escolhida (0 ou 1)
A = [
    [-1, -1, 0, 0],  # Para garantir que pelo menos uma ação seja selecionada de S0
    [0, 0, -1, -1]   # Para garantir que pelo menos uma ação seja selecionada de S2
]
b = [-1, -1]  # Pelo menos uma ação deve ser escolhida de cada

# Limites para as variáveis de decisão
x_bounds = [(0, 1) for _ in range(len(c))]  # Ações binárias

# Solução do problema de otimização
result = linprog(c, A_ub=A, b_ub=b, bounds=x_bounds, method='highs')

print("Resultado da Otimização:", result)


Resultado da Otimização:         message: Optimization terminated successfully. (HiGHS Status 7: Optimal)
        success: True
         status: 0
            fun: 0.0
              x: [ 0.000e+00  1.000e+00  0.000e+00  1.000e+00]
            nit: 0
          lower:  residual: [ 0.000e+00  1.000e+00  0.000e+00  1.000e+00]
                 marginals: [ 5.000e-01  0.000e+00  1.000e+00  0.000e+00]
          upper:  residual: [ 1.000e+00  0.000e+00  1.000e+00  0.000e+00]
                 marginals: [ 0.000e+00  0.000e+00  0.000e+00  0.000e+00]
          eqlin:  residual: []
                 marginals: []
        ineqlin:  residual: [ 0.000e+00  0.000e+00]
                 marginals: [-0.000e+00 -0.000e+00]
 mip_node_count: 0
 mip_dual_bound: 0.0
        mip_gap: 0.0


In [105]:
def solve_system(df, S_may, S_sure):
    # Initialize lists for inequality and equality constraints
    A_ub, b_ub, A_eq, b_eq = [], [], [], []
    
    # Iterate over source states in S_may
    for source_state in S_may:
        # Check if there is only one transition from the source state
        if len(df.loc[df['Origin'] == source_state]) == 1:
            # If only one transition, add it as an equality constraint
            A_eq.append(df.loc[df['Origin'] == source_state, S_may].values)
            b_eq.append(np.sum(df.loc[df['Origin'] == source_state, S_sure].values, axis=1))
        else:
            # If multiple transitions, add them as inequality constraints
            for i in range(len(df.loc[df['Origin'] == source_state, S_may].values)):
                A_ub.append(df.loc[df['Origin'] == source_state, S_may].values[i])
                b_ub.append(1 - np.sum(df.loc[df['Origin'] == source_state, S_sure].values, axis=1)[i])
    
    c = np.zeros(len(S_may))
    
    A_ub, b_ub, A_eq, b_eq = [None if not v else v for v in [A_ub, b_ub, A_eq, b_eq]]
    A_eq, B_eq = [np.vstack(m) if m is not None else None for m in [A_eq, A_ub]]
    
    # Solve the linear programming problem
    print(f"A_ub: {A_ub}\n")
    print(f"b_ub: {b_ub}\n")
    print(f"A_eq: {A_eq}\n")
    print(f"b_eb: {b_eq}\n")
    print(f"c: {c}")

    res = linprog(c=c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq)

    return res

# Não foi testado ainda! Verificar numericamente abaixo

In [109]:
printer = run_mdp(path = "mdp_examples\\lancer_de_pieces.mdp", return_printer=True)

ANTLR runtime and generated code versions disagree: 4.11.1!=4.13.1
ANTLR runtime and generated code versions disagree: 4.11.1!=4.13.1
Initialy declared states: ['I', 'S0', 'S1', 'S2', 'S3', 'S4', 'S5', 'F1', 'F2', 'F3', 'F4', 'F5', 'F6']
Initialy declared actions: ['NA']
Transition from I with no action and targets ['S0', 'S1'] with weights [1, 1]
Transition from S0 with no action and targets ['S2', 'S3'] with weights [1, 1]
Transition from S1 with no action and targets ['S4', 'S5'] with weights [1, 1]
Transition from S2 with no action and targets ['F1', 'S0'] with weights [1, 1]
Transition from S3 with no action and targets ['F2', 'F3'] with weights [1, 1]
Transition from S4 with no action and targets ['S1', 'F4'] with weights [1, 1]
Transition from S5 with no action and targets ['F5', 'F6'] with weights [1, 1]

( 0 ) - State I reward wasn't assigned, using zero as reward
( 1 ) - State S0 reward wasn't assigned, using zero as reward
( 2 ) - State S1 reward wasn't assigned, using zero 

In [108]:
df = printer.transactions_prob

S_sure, S_may, S_never = find_states(df, 'S1')
S_sure, sorted(S_may), sorted(S_never)

print("S_sure:\n", S_sure)
print("S_may:\n", S_may)
print("S_never:\n", S_never)

solve_system(df, S_may, S_sure)

S_sure:
 ['S1']
S_may:
 ['S1', 'I', 'S4']
S_never:
 ['S5', 'S0', 'S3', 'S2']
A_ub: None

b_ub: None

A_eq: [[0.  0.  0.5]
 [0.5 0.  0. ]
 [0.5 0.  0. ]]

b_eb: [array([0.]), array([0.5]), array([0.5])]

c: [0. 0. 0.]


        message: Optimization terminated successfully. (HiGHS Status 7: Optimal)
        success: True
         status: 0
            fun: 0.0
              x: [ 1.000e+00  0.000e+00  0.000e+00]
            nit: 0
          lower:  residual: [ 1.000e+00  0.000e+00  0.000e+00]
                 marginals: [ 0.000e+00  0.000e+00  0.000e+00]
          upper:  residual: [       inf        inf        inf]
                 marginals: [ 0.000e+00  0.000e+00  0.000e+00]
          eqlin:  residual: [ 0.000e+00  0.000e+00  0.000e+00]
                 marginals: [-0.000e+00 -0.000e+00 -0.000e+00]
        ineqlin:  residual: []
                 marginals: []
 mip_node_count: 0
 mip_dual_bound: 0.0
        mip_gap: 0.0