In [1]:
from matplotlib import animation, patches
from IPython.display import display_html
from matplotlib import pyplot as plt
from itertools import product
from math import sin, cos
from tqdm import tqdm
import pandas as pd
import numpy as np
from z3 import *

pd.options.display.float_format = '{:,.2f}'.format
%config InlineBackend.figure_format = "svg"
%matplotlib notebook

### Constantes globais

In [2]:
V_LOW, V_HIGH = 1, 10 # velocidades dos modos baixo e alto, em m
THETA = 15 # angulo de viragem entre modos
TAU = 10 # tempo minimo entre transicoes timed
ALL_ROUTES = [i*THETA for i in range(int(360/THETA))] # todos os ângulos possíveis
ALL_VELS = [V_LOW, V_HIGH] # todas as velocidades possiveis

GAUSS_DP = 500 # desvio padrão da gaussiana de inicialização das posições em m
SAFE_DIST = 100 # distância de segurança entre barcos em m

Mode, (M_LOW, M_HIGH) = EnumSort("Mode", ("V_LOW", "V_HIGH"))

### Funções auxiliares

In [3]:
deg_to_rad = lambda a: a * np.pi / 180
rad_to_deg = lambda a: a * 180 / np.pi
z3tofloat = lambda v: float(v.numerator_as_long())/float(v.denominator_as_long())
Abs = lambda x: If(x>=0, x, -x)
val_angle = lambda a: If(a<0, 360+a, If(a>=360, a-360, a))
vel_to_mode = lambda v: M_LOW if (v == V_LOW) else M_HIGH
mode_to_vel = lambda m: V_LOW if (m == M_LOW) else V_HIGH

In [4]:
def model_to_dfs(m):
    m_ = {str(elem): m[elem] for elem in m}
    sortedkeys = sorted(m_, key=str.lower)

    prop_names = sorted(list(set([str(elem)[:-1] for elem in sortedkeys])))
    df_dict = {p: [] for p in prop_names}

    for col in df_dict:
        for elem in sortedkeys:
            if col in elem:
                if m_[elem].sort() == RealSort():
                    value = z3tofloat(m_[elem])
                else:
                    value = m_[elem]
                df_dict[col].append(value)

    df_dict = {i: {elem: df_dict[elem] for elem in df_dict if f"b{i}" in elem} for i in range(3)}

    dfs = [pd.DataFrame(df_dict[i]) for i in range(len(df_dict))]
        
    return dfs
    
def display_side_by_side(*args):
    html_str=''
    for df in args:
        html_str+=df.to_html()
    display_html(html_str.replace('table','table style="display:inline"'),raw=True)

## Funções de declaração, inicialização e transição

In [5]:
def declare(t, fixed_dist=None, num_boats=3):
    # Iterar os barcos
    trace = {}
    for i in range(num_boats):
        trace[i] = {}
        
        # Variáveis de modo
        trace[i]["v"] = Const(f"b{i}_v{t}", Mode)
        trace[i]["a"] = Int(f"b{i}_a{t}")
        
        # Variáveis de estado
        trace[i]["x"] = Real(f"b{i}_x{t}")
        trace[i]["y"] = Real(f"b{i}_y{t}")
        trace[i]["t"] = Real(f"b{i}_t{t}")
        
        # Flag de transicao
        trace[i]["f"] = Bool(f"b{i}_f{t}")
        
    # Distancia de seguranca
    if fixed_dist == None:
        trace["d"] = Int(f"d{t}")
    else:
        trace["d"] = fixed_dist
        
    return trace
        
        
def init(tr, random_values):
    # Gerar ângulos iniciais aleatórios
    random_angles = ALL_ROUTES
    
    # Iterar os barcos
    r = []
    for i in range(len(tr)-1):
        
        # Condições do modo VERIFICACAO MODO VELOCIDADE INIT
        r.append(tr[i]["v"] == M_HIGH)
        r.append(tr[i]["a"] == random_values[i][0])
        # r.append(tr[i]["a"] == int((210 + 120*i)%360))
        
        # Condições do estado
        r.append(tr[i]["x"] == random_values[i][1])
        r.append(tr[i]["y"] == random_values[i][2])
        # r.append(tr[i]["x"] == 60 * cos(deg_to_rad(30 + 120*i)))
        # r.append(tr[i]["y"] == 60 * sin(deg_to_rad(30 + 120*i)))
        r.append(tr[i]["t"] == 0)
        
        # Condicao da flag
        r.append(tr[i]["f"] == False)
        
    # Condicao da distancia de seguranca
    if type(tr["d"]) != int:
        r.append(And(tr["d"] > 0, tr["d"] < 100))
        
    r = And(r)
    
    return r
        
    
def timed(prev, curr, fixed_step=False):
    modes = list(product(*[ALL_VELS, ALL_ROUTES]))
    
    # Iterar cada um dos barcos
    r = []
    for i in range(len(prev)-1):
        
        # Condição do tempo
        if fixed_step:
            r.append(curr[i]["t"] - prev[i]["t"] == TAU)
        else:
            r.append(curr[i]["t"] - prev[i]["t"] > TAU)
        
        # Condição da rota e posição
        routes_cond = []
        for j in range(len(modes)):
            route_cond = []
            
            # Condição dos modos
            v, a = modes[j]
            route_cond.append(And(prev[i]["a"] == a, prev[i]["v"] == vel_to_mode(v)))
            route_cond.append(And(curr[i]["a"] == a, curr[i]["v"] == vel_to_mode(v)))
            
            # Incremento de posição
            dx = v * cos(deg_to_rad(a)) * (curr[i]["t"] - prev[i]["t"])
            dy = v * sin(deg_to_rad(a)) * (curr[i]["t"] - prev[i]["t"])
            
            # Condição da posição
            route_cond.append(curr[i]["x"] == prev[i]["x"] + dx)
            route_cond.append(curr[i]["y"] == prev[i]["y"] + dy)
            
            # Fazer o And de todas as condições
            routes_cond.append(And(route_cond))
            
        # Caso esteja com V_HIGH e a uma distancia de perigo
        r.append(Not(And(prev[i]["v"] == M_HIGH, Not(switch_safe(prev, i)))))
        
        # Caso esteja com V_LOW e a uma distancia segura
        r.append(Not(And(prev[i]["v"] == M_LOW, switch_safe(prev, i))))
        
        # Adicionar uma de todas as rotas possíveis
        r.append(Or(routes_cond))
        
    r = And(r)
        
    return r


def switch_safe(tr, boat_id, dist=None):
    r = []
    
    if dist == None:
        dist = tr["d"]
    
    # Iterar cada um dos barcos
    for i in range(len(tr)-1):
        if i != boat_id:
            # Condição das distâncias
            x = Abs(tr[i]["x"]-tr[boat_id]["x"]) > dist
            y = Abs(tr[i]["y"]-tr[boat_id]["y"]) > dist
            
            t_conds = []
            all_vels = list(product(*[ALL_VELS, ALL_VELS]))
            for v1, v2 in all_vels:
                
                # Condição do tempo
                m1, m2 = vel_to_mode(v1), vel_to_mode(v2)
                t1 = And(m1 == tr[boat_id]["v"], m2 == tr[i]["v"])
                t2 = Abs(tr[i]["t"]-tr[boat_id]["t"])*(v1+v2)/2 > dist
                
                t_conds.append(And(t1, t2))
            
            t = Or(t_conds)
            
            # Verificar se uma das distâncias é superior à distância de segurança
            r.append(Or(x, y, t))
            
    r = And(r)
            
    return r


def untimed(prev, curr):
    r = []
    
    # Iterar cada um dos barcos
    for i in range(len(prev)-1):
        
        # Condições da posição e tempo
        r.append(curr[i]["x"] == prev[i]["x"])
        r.append(curr[i]["y"] == prev[i]["y"])
        r.append(curr[i]["t"] == prev[i]["t"])

        # barco V_LOW transita para V_LOW
        low_low = []
        low_low.append(And(prev[i]["v"] == M_LOW, curr[i]["v"] == M_LOW))
        low_low.append(Not(switch_safe(prev, i)))
        low_low.append(Or(curr[i]["a"] == val_angle(prev[i]["a"]+THETA), curr[i]["a"] == val_angle(prev[i]["a"]-THETA)))
        low_low = And(low_low)

        # barco V_LOW transita para barco V_HIGH
        low_high = []
        low_high.append(And(prev[i]["v"] == M_LOW, curr[i]["v"] == M_HIGH))
        low_high.append(switch_safe(prev, i))
        low_high.append(curr[i]["a"] == prev[i]["a"])
        low_high = And(low_high)

        # barco V_HIGH transita para barco V_LOW
        high_low = []
        high_low.append(And(prev[i]["v"] == M_HIGH, curr[i]["v"] == M_LOW))
        high_low.append(Not(switch_safe(prev, i)))
        high_low.append(Or(curr[i]["a"] == val_angle(prev[i]["a"]+THETA), curr[i]["a"] == val_angle(prev[i]["a"]-THETA)))
        high_low = And(high_low)

        # barco V_HIGH transita para barco V_HIGH
        high_high = []
        high_high.append(And(prev[i]["v"] == M_HIGH, curr[i]["v"] == M_HIGH))
        high_high.append(switch_safe(prev, i))
        high_high.append(curr[i]["a"] == prev[i]["a"])
        high_high = And(high_high)

        # Adicionar uma destas possíveis transições
        r.append(And(Or(low_low, low_high, high_low, high_high), Not(prev[i]["f"])))
        
    # O estado não pode ficar igual
    same = []
    for i in range(len(prev)-1):
        same.append(prev[i]["v"] == curr[i]["v"])
        same.append(prev[i]["a"] == curr[i]["a"])
        same.append(prev[i]["x"] == curr[i]["x"])
        same.append(prev[i]["y"] == curr[i]["y"])
    same = Not(And(same))

    # Todas as condições da transição devem ser cumpridas
    r = And(And(r), same)
    
    return r


def trans(prev, curr, fixed_step=False):
    # Condições timed e untimed
    untimed_cond = untimed(prev, curr)
    timed_cond = timed(prev, curr, fixed_step)
    
    # Condições de sincronismo
    eq_cond = And([curr[i]["t"] == curr[i+1]["t"] for i in range(len(curr)-2)])
    
    # Condicao da manutencao da distancia de seguranca
    d_cond = prev["d"] == curr["d"]
    
    # Condicao da evolucao da flag
    f_cond = []
    for i in range(len(prev)-1):
        v_cond = prev[i]["v"] != curr[i]["v"]
        a_cond = prev[i]["a"] != curr[i]["a"]
        f_cond.append(If(Or(v_cond, a_cond), curr[i]["f"], Not(curr[i]["f"])))
    f_cond = And(f_cond)
    
    r = And(Or(untimed_cond, timed_cond), eq_cond, d_cond, f_cond)
    
    return r

In [6]:
def get_random_values(num_boats=3):
    angs = [ALL_ROUTES[np.random.randint(len(ALL_ROUTES))] for i in range(num_boats)]
    xs = [GAUSS_DP * np.random.randn() for _ in range(num_boats)]
    ys = [GAUSS_DP * np.random.randn() for _ in range(num_boats)]
    r = [(angs[i], xs[i], ys[i]) for i in range(num_boats)]
    
    return r

### Geracao do traco

In [7]:
def gen_trace(declare, init, trans, k, fixed_step=False, fixed_dist=None):
    solver = Solver()
    trace = {i: declare(i, fixed_dist) for i in range(k)}
    solver.add(init(trace[0], get_random_values()))
    
    for i in range(k-1):
        solver.add(trans(trace[i], trace[i+1], fixed_step))
        
    if solver.check() == sat:
        m = solver.model()
        
        for elem in m:
            if str(elem) == "b0_d":
                print(f"Safety distance = {m[elem]}\n")
        
        r = model_to_dfs(m)
        display_side_by_side(*r)
    else:
        r = None
        
    return r

m = gen_trace(declare, init, trans, 10, True, SAFE_DIST)

Unnamed: 0,b0_a,b0_f,b0_t,b0_v,b0_x,b0_y
0,60,False,0.0,V_HIGH,529.83,-701.42
1,60,False,10.0,V_HIGH,579.83,-614.81
2,60,False,20.0,V_HIGH,629.83,-528.21
3,60,False,30.0,V_HIGH,679.83,-441.61
4,60,False,40.0,V_HIGH,729.83,-355.01
5,60,False,50.0,V_HIGH,779.83,-268.4
6,60,False,60.0,V_HIGH,829.83,-181.8
7,60,False,70.0,V_HIGH,879.83,-95.2
8,60,False,80.0,V_HIGH,929.83,-8.6
9,60,False,90.0,V_HIGH,979.83,78.01

Unnamed: 0,b1_a,b1_f,b1_t,b1_v,b1_x,b1_y
0,210,False,0.0,V_HIGH,-91.18,515.32
1,210,False,10.0,V_HIGH,-177.78,465.32
2,210,False,20.0,V_HIGH,-264.38,415.32
3,210,False,30.0,V_HIGH,-350.99,365.32
4,210,False,40.0,V_HIGH,-437.59,315.32
5,210,False,50.0,V_HIGH,-524.19,265.32
6,210,False,60.0,V_HIGH,-610.79,215.32
7,210,False,70.0,V_HIGH,-697.4,165.32
8,210,False,80.0,V_HIGH,-784.0,115.32
9,210,False,90.0,V_HIGH,-870.6,65.32

Unnamed: 0,b2_a,b2_f,b2_t,b2_v,b2_x,b2_y
0,210,False,0.0,V_HIGH,569.26,-196.73
1,210,False,10.0,V_HIGH,482.66,-246.73
2,210,False,20.0,V_HIGH,396.06,-296.73
3,210,False,30.0,V_HIGH,309.45,-346.73
4,210,False,40.0,V_HIGH,222.85,-396.73
5,210,False,50.0,V_HIGH,136.25,-446.73
6,210,False,60.0,V_HIGH,49.65,-496.73
7,210,False,70.0,V_HIGH,-36.96,-546.73
8,210,False,80.0,V_HIGH,-123.56,-596.73
9,210,False,90.0,V_HIGH,-210.16,-646.73


### Verificar se os barcos viajam sempre sem colisoes

In [8]:
def no_collisions(trace):
    
    safe = []
    for i in range(len(trace)-1):
        safe.append(switch_safe(trace, i, 20))
        
    return And(safe)

def bmc_always(inv, K, fixed_dist=None):
    
    random_values = get_random_values()
    
    for k in tqdm(range(1,K+1), total=K, desc="Checking Traces"):
        solver = Solver()
        trace = {i: declare(i, fixed_dist) for i in range(k)}
        solver.add(init(trace[0], random_values))

        for i in range(1,k):
            solver.add(trans(trace[i-1], trace[i]))
        
        solver.add(Not(inv(trace[k-1])))
        
        if solver.check() == sat:
            print("Counter Example:")
            
            m = solver.model()
            r = model_to_dfs(m)
            display_side_by_side(*r)
            
            return
        
    print (f"Property is valid up to traces of length {K}")


bmc_always(no_collisions, 10, 1000)

Checking Traces:  40%|████      | 4/10 [00:01<00:02,  2.49it/s]

Counter Example:


Unnamed: 0,b0_a,b0_f,b0_t,b0_v,b0_x,b0_y
0,60,False,0.0,V_HIGH,133.16,-470.9
1,45,True,0.0,V_LOW,133.16,-470.9
2,45,False,1905.75,V_LOW,1480.73,876.67
3,60,True,1905.75,V_LOW,1480.73,876.67
4,60,False,2186.29,V_LOW,1621.0,1119.62

Unnamed: 0,b1_a,b1_f,b1_t,b1_v,b1_x,b1_y
0,120,False,0.0,V_HIGH,-484.9,675.96
1,135,True,0.0,V_LOW,-484.9,675.96
2,135,False,1905.75,V_LOW,-1832.47,2023.53
3,135,True,1905.75,V_HIGH,-1832.47,2023.53
4,135,False,2186.29,V_HIGH,-3816.13,4007.19

Unnamed: 0,b2_a,b2_f,b2_t,b2_v,b2_x,b2_y
0,15,False,0.0,V_HIGH,-247.8,-30.62
1,30,True,0.0,V_LOW,-247.8,-30.62
2,30,False,1905.75,V_LOW,1402.63,922.26
3,45,True,1905.75,V_LOW,1402.63,922.26
4,45,False,2186.29,V_LOW,1601.0,1120.62


Checking Traces:  40%|████      | 4/10 [00:05<00:08,  1.38s/it]


### Verificar se podem nao ocorrer colisoes

In [9]:
def bmc_always(k, fixed_dist=None):
    solver = Solver()
    trace = {i: declare(i, fixed_dist) for i in range(k)}
    solver.add(init(trace[0], get_random_values()))
    
    # Condicoes de transicao
    for i in range(k-1):
        solver.add(trans(trace[i], trace[i+1]))
        
    # Condicao de nao colisao
    for i in range(k):
        solver.add(no_collisions(trace[i]))
        
    if solver.check() == sat:
        m = solver.model()
        
        print("No collision solution")
        
        for elem in m:
            if str(elem) == "b0_d":
                print(f"Safety distance = {m[elem]}")
        
        r = model_to_dfs(m)
        display_side_by_side(*r)
    else:
        r = None
        
    return r

m = bmc_always(10)

No collision solution


Unnamed: 0,b0_a,b0_f,b0_t,b0_v,b0_x,b0_y
0,105,False,0.0,V_HIGH,-895.81,-146.02
1,105,False,10.25,V_HIGH,-922.34,-47.01
2,105,False,20.5,V_HIGH,-948.87,52.0
3,105,False,30.75,V_HIGH,-975.4,151.0
4,105,False,41.0,V_HIGH,-1001.93,250.01
5,105,False,51.25,V_HIGH,-1028.46,349.02
6,105,False,91.15,V_HIGH,-1131.73,734.45
7,105,False,101.4,V_HIGH,-1158.26,833.46
8,105,False,111.65,V_HIGH,-1184.79,932.47
9,105,False,121.9,V_HIGH,-1211.32,1031.48

Unnamed: 0,b1_a,b1_f,b1_t,b1_v,b1_x,b1_y
0,300,False,0.0,V_HIGH,302.13,-376.93
1,300,False,10.25,V_HIGH,353.38,-465.69
2,300,False,20.5,V_HIGH,404.63,-554.46
3,300,False,30.75,V_HIGH,455.88,-643.23
4,300,False,41.0,V_HIGH,507.13,-732.0
5,300,False,51.25,V_HIGH,558.38,-820.76
6,300,False,91.15,V_HIGH,757.9,-1166.34
7,300,False,101.4,V_HIGH,809.15,-1255.11
8,300,False,111.65,V_HIGH,860.4,-1343.87
9,300,False,121.9,V_HIGH,911.65,-1432.64

Unnamed: 0,b2_a,b2_f,b2_t,b2_v,b2_x,b2_y
0,30,False,0.0,V_HIGH,406.21,258.44
1,30,False,10.25,V_HIGH,494.98,309.69
2,30,False,20.5,V_HIGH,583.75,360.94
3,30,False,30.75,V_HIGH,672.52,412.19
4,30,False,41.0,V_HIGH,761.29,463.44
5,30,False,51.25,V_HIGH,850.05,514.69
6,30,False,91.15,V_HIGH,1195.63,714.2
7,30,False,101.4,V_HIGH,1284.39,765.45
8,30,False,111.65,V_HIGH,1373.16,816.7
9,30,False,121.9,V_HIGH,1461.93,867.95
