## Lógica Computacional: 25/26
---
## TP4

$Grupo$ $05$ 

*   Vasco Ferreira Leite (A108399)
*   Gustavo da Silva Faria (A108575)
*   Afonso Henrique Cerqueira Leal (A108472)
---

Neste notebook modela-se um sistema híbrido de controlo de tráfego marítimo num canal estreito, seguindo o enunciado do Trabalho Prático 4. O objetivo é verificar propriedades de segurança (suficiente e forte) usando *Bounded Model Checking* com o solver SMT Z3.

## Importação de bibliotecas

In [None]:
from z3 import *
import time

## Configuração global do sistema

Definem-se constantes físicas, passo temporal, zonas do canal e mapas de adjacência para os dois navios (A → B e B → A).

In [None]:
from z3 import *
import time

SIGMA = 0.5
DT = 0.25       
LIMIT_Z = 1.0   

ZONAS_ACEL_A = {11, 13, 2, 4}
ZONAS_DECEL_A = {1, 3, 12, 14}
ZONAS_ACEL_B = {1, 3, 12, 14}
ZONAS_DECEL_B = {11, 13, 2, 4}

ADJ_A = {
    11: [5, 7], 13: [9, 7],
    5: [1], 7: [1, 3], 9: [3],
    1: [0], 3: [0],        
    0: [2, 4],             
    2: [6, 8], 4: [10, 8],
    6: [12], 8: [12, 14], 10: [14],
    12: [99], 14: [99], 99: [99]
}

ADJ_B = {
    12: [6, 8], 14: [10, 8],
    6: [2], 8: [2, 4], 10: [4],
    2: [0], 4: [0],        
    0: [1, 3],             
    1: [5, 7], 3: [9, 7],
    5: [11], 7: [11, 13], 9: [13],
    11: [98], 13: [98], 98: [98]
}

## Definição do estado do sistema

Cada estado inclui o sector discreto, posição contínua, velocidade e uma flag que indica se o navio foi forçado a esperar.

In [None]:
def declare_state(i):
    """Cria variáveis simbólicas para o passo i."""
    s = {}
    # Navio A
    s['sA'] = Int(f'sA_{i}')
    s['zA'] = Real(f'zA_{i}')
    s['vA'] = Real(f'vA_{i}')
    s['waitA'] = Bool(f'waitA_{i}') 

    # Navio B
    s['sB'] = Int(f'sB_{i}')
    s['zB'] = Real(f'zB_{i}')
    s['vB'] = Real(f'vB_{i}')
    s['waitB'] = Bool(f'waitB_{i}') 
    
    return s

def init(s):
    """Estado Inicial (t=0)."""
    return And(
        # A começa no topo esquerdo
        s['sA'] == 11, s['zA'] == 0.0, s['vA'] == 0.6, s['waitA'] == False,
        # B começa no fundo direito
        s['sB'] == 14, s['zB'] == 0.0, s['vB'] == 0.0, s['waitB'] == False
    )



## Dinâmica e lógica de transição

Modela-se a evolução contínua (movimento dentro do sector) e discreta (transição entre sectores), incluindo a lógica de exclusão mútua que abstrai o semáforo.

In [None]:
def get_gamma_A(s):
    return If(s == 0, 0.2,
           If(Or([s == z for z in ZONAS_ACEL_A]), 1.0,
           If(Or([s == z for z in ZONAS_DECEL_A]), 0.0,
           0.5)))

def get_gamma_B(s):
    return If(s == 0, 0.2,
           If(Or([s == z for z in ZONAS_ACEL_B]), 1.0,
           If(Or([s == z for z in ZONAS_DECEL_B]), 0.0,
           0.5)))

def physics_v(v, gamma):
    return v + (gamma - SIGMA * v) * DT

def trans_navio(s_curr, z_curr, v_curr, s_next, z_next, v_next, wait_next, 
                adj, get_gamma, other_s_curr, other_s_next):
    """
    Gera a lógica de transição para UM navio.
    IMPORTANTE: Recebe 'other_s_next' para resolver conflitos de entrada simultânea.
    """
    
    cond_flow = z_curr < LIMIT_Z
    gamma = get_gamma(s_curr)
    
    logic_flow = And(
        cond_flow,
        s_next == s_curr,
        z_next == z_curr + v_curr * DT,
        v_next == physics_v(v_curr, gamma),
        wait_next == False
    )

    cond_jump = z_curr >= LIMIT_Z
    
    possible_jumps = []
    
    dests = []

    for src, targets in adj.items():
        
        choices = []
        for dst in targets:
            
            can_enter = And(other_s_curr != dst, other_s_next != dst)
            
            enter = And(
                can_enter,
                s_next == dst,
                z_next == 0.0,
                v_next == v_curr,
                wait_next == False
            )
            choices.append(enter)
            
        wait_logic = And(
            s_next == s_curr,
            z_next == z_curr, 
            v_next == If(v_curr > 0, v_curr - (SIGMA * v_curr)*DT, 0.0),
            wait_next == True 
        )
        choices.append(wait_logic)
        
        possible_jumps.append(Implies(s_curr == src, Or(choices)))

    finished = Or(s_curr == 99, s_curr == 98)
    logic_finish = And(
        finished, 
        s_next == s_curr, 
        v_next == 0.0, 
        wait_next == False
    )

    logic_jump = And(cond_jump, If(finished, logic_finish, And(possible_jumps)))

    return If(cond_flow, logic_flow, logic_jump)


def trans(curr, nxt):
    """Transição Global do Sistema"""
    
    t_A = trans_navio(
        curr['sA'], curr['zA'], curr['vA'],
        nxt['sA'], nxt['zA'], nxt['vA'], nxt['waitA'],
        ADJ_A, get_gamma_A, curr['sB'], nxt['sB']
    )
    
    t_B = trans_navio(
        curr['sB'], curr['zB'], curr['vB'],
        nxt['sB'], nxt['zB'], nxt['vB'], nxt['waitB'],
        ADJ_B, get_gamma_B, curr['sA'], nxt['sA']
    )
    
    return And(t_A, t_B)



## Verificação por Bounded Model Checking (BMC)

Define-se a propriedade de insegurança e verifica-se se existe um contra-exemplo até um limite finito k.

In [None]:
def run_bmc(k, check_type="sufficient"):
    print(f"\n--- Iniciando BMC (k={k}) | Modo: {check_type.upper()} SAFETY ---")
    solver = Solver()
    
    states = [declare_state(i) for i in range(k + 1)]
    
    solver.add(init(states[0]))
    
    for i in range(k):
        solver.add(trans(states[i], states[i+1]))
        
    unsafe_prop = False
    
    if check_type == "sufficient":
        collision = Or([
            And(states[i]['sA'] == states[i]['sB'], 
                states[i]['sA'] != 99, states[i]['sA'] != 98)
            for i in range(k + 1)
        ])
        unsafe_prop = collision
        
    elif check_type == "strong":
        waited = Or([
            Or(states[i]['waitA'], states[i]['waitB'])
            for i in range(1, k + 1)
        ])
        unsafe_prop = waited

    solver.add(unsafe_prop)
    
    start_time = time.time()
    result = solver.check()
    duration = time.time() - start_time
    
    if result == sat:
        print(f"Resultado: UNSAFE (Falha encontrada em {duration:.2f}s)")
        print(f"Contra-exemplo encontrado para '{check_type} safety':")
        m = solver.model()
        for i in range(k + 1):
            sa = m[states[i]['sA']]
            sb = m[states[i]['sB']]
            wa = is_true(m[states[i]['waitA']])
            wb = is_true(m[states[i]['waitB']])
            
            wa_str = " [WAIT]" if wa else ""
            wb_str = " [WAIT]" if wb else ""
            
            if i == 0 or \
               str(sa) != str(m[states[i-1]['sA']]) or \
               str(sb) != str(m[states[i-1]['sB']]) or wa or wb:
                print(f"Step {i:02}: A no s{sa}{wa_str} | B no s{sb}{wb_str}")
        return False
    else:
        print(f"Resultado: SAFE (Nenhuma falha até k={k} em {duration:.2f}s)")
        return True



## Execução dos testes

Executa-se o BMC para segurança suficiente e segurança forte.

In [None]:
if __name__ == "__main__":
    run_bmc(k=30, check_type="sufficient")
    run_bmc(k=30, check_type="strong")