# Learning Bisimulation Quotients


This notebook comprises the full implementation of the algorithms presented in the dissertation **Learning Bisimulation Quotients**. The dissertation submitted in Trinity Term 2023 at the University of Oxford, for the degree **Master of Science in Advanced Computer Science**. 

The notebook contains the implementation of our CEGIS approach for the synthesis of bisimulation quotients, the extraction of subquotients and the extraction of polytopic regions for which the obtained abstractions are exact.


### Imports

In [None]:
from z3 import *
import numpy as np
import networkx as nx
from utils import decimal, extract_quotient

import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap 

import time

#### SMT Variables

In [None]:
x = Real("x")
y = Real("y")

x1 = Real("x1")
y1 = Real("y1")

x2 = Real("x2")
y2 = Real("y2")

p = Int("p")
q = Int("q")

### Model Definitions

In [None]:
# # Dynamics Piece-Wise constant
# def successor(x):
#     x[0] = If(x[1] > 1, x[0] - 1., x[0])
#     x[1] = If(x[1] <= 1, x[1] + 1., x[1])
#     return x

# Dynamics constant
def successor(x): 
    x[0] = x[0] - 1.
    return x

# # Dynamics 4-state HA cycle
# def successor(x):
#     res = list(x)
#     res[0] = If(And(x[0] >= 0, x[1] >= 0), x[0] - 1., If(And(x[0] < 0, x[1] < 0), x[0] + 1., x[0]))
#     res[1] = If(And(x[0] < 0, x[1] >= 0), x[1] - 1., If(And(x[0] >= 0, x[1] < 0), x[1] + 1., x[1]))

#     return res

# # Dynamics 90 degree rotation
# def successor(x): 
#     rot_mat = np.array([[0, -1],
#                         [1, 0]])
#     x = np.matmul(rot_mat, x)
#     return x

# # Dynamics 45 degree rotation
# def simpl(a):
#     dec = simplify(a).as_decimal(10)
#     if dec[-1] == "?":
#         return RealVal(float(dec[:-1]))
#     else: 
#         return RealVal(float(dec))
    
# def successor(x): 
#     pi_4 = decimal(Sqrt(2) / 2)
#     rot_mat = np.array([[pi_4, -pi_4],
#                         [pi_4, pi_4]])
#     x = np.matmul(rot_mat, x)
#     return x

# # Dynamics Piece-Wise constant
# def successor(x):
#     x[0] = If(x[1] <= 0, x[0] + 1., x[0] - 1.)
#     return x

# # Discrete Timed Automaton, without reset and 
# def successor(x):
#     x[0] = x[0] + 1
#     x[1] = x[1] + 1
#     return x


num_params = 5 # Decision boundary parameters (right side of poly-hedral constraint)
num_coefficients = 6 # Polyhedral Coefficients

# Model
partitions = [IntVal(i) for i in range(num_params)]

# Domain (State Space)
lower_x = 0
upper_x = 3
lower_y = 0
upper_y = 3
def domain(x):
    return And(x[0] <= upper_x, x[0] >= lower_x, x[1] >= lower_y, x[1] <= upper_y)


## Learner

#### Binary Decision Trees

In [None]:
class BDTLeave:
    def __init__(self, outcome):
        self.outcome = outcome

    def formula(self):
        return self.outcome

    def robust_formula(self):
        return self.outcome
    
    def pred(self, x, robust=False):
        return self.outcome.as_long()
    
class BDTNode:
    def __init__(self, condition, param, res_l, res_r, eps):
        self.condition = condition
        self.param = param
        self.res_l = res_l
        self.res_r = res_r
        self.eps = eps
    
    def formula(self):
        return If(self.condition <= self.param, self.res_l.formula(), self.res_r.formula())
    
    def robust_formula(self):
        return If(self.condition <= self.param - self.eps, self.res_l.robust_formula(), 
                  If(self.condition >= self.param + self.eps, self.res_r.robust_formula(), IntVal(-1)))

class BDTNodePoly:
    def __init__(self, coefficients: list, variables: list, param, res_l, res_r, eps):
        self.coeffs = coefficients
        self.vars = variables
        self.param = param
        self.res_l = res_l
        self.res_r = res_r

        self.condition = np.dot(coefficients, variables)

        norm = simplify(Sqrt(np.sum(np.square(self.coeffs))))
        norm = norm.approx() if type(norm) == AlgebraicNumRef else norm

        self.eps = eps * norm

    def formula(self):
        return If(self.condition <= self.param, self.res_l.formula(), self.res_r.formula())
    
    def robust_formula(self):
        return If(self.condition <= self.param - self.eps, self.res_l.robust_formula(), 
                  If(self.condition >= self.param + self.eps, self.res_r.robust_formula(), IntVal(-1)))
    
    def pred(self, x, robust = False):
        if not robust:
            if decimal(np.dot(self.coeffs, x)) <= decimal(self.param):
                return self.res_l.pred(x)
            else:
                return self.res_r.pred(x)
        else:
            if decimal(np.dot(self.coeffs, x)) <= decimal(self.param) - decimal(self.eps):
                return self.res_l.pred(x, robust)
            elif decimal(np.dot(self.coeffs, x)) >= decimal(self.param) + decimal(self.eps):
                return self.res_r.pred(x, robust)
            else:
                return -1
            
class BDTNodePolyEq:
    def __init__(self, coefficients: list, variables: list, param, res_l, res_r, eps):
        self.coeffs = coefficients
        self.vars = variables
        self.param = param
        self.res_l = res_l
        self.res_r = res_r

        self.condition = np.dot(coefficients, variables)

        norm = simplify(Sqrt(np.sum(np.square(self.coeffs))))
        norm = norm.approx() if type(norm) == AlgebraicNumRef else norm

        self.eps = eps * norm

    def formula(self):
        return If(And(self.vars[0] == RealVal(0), self.vars[1] == RealVal(0)) , self.res_l.formula(), self.res_r.formula())
    
    def robust_formula(self):
        return If(self.condition <= self.param - self.eps, self.res_l.robust_formula(), 
                  If(self.condition >= self.param + self.eps, self.res_r.robust_formula(), IntVal(-1)))
    
    def pred(self, x, robust = False):
        if not robust:
            if decimal(x[0]) == 0. and decimal(x[1]) == 0.:
                return self.res_l.pred(x)
            else:
                return self.res_r.pred(x)
        else:
            if decimal(np.dot(self.coeffs, x)) <= decimal(self.param) - decimal(self.eps):
                return self.res_l.pred(x, robust)
            elif decimal(np.dot(self.coeffs, x)) >= decimal(self.param) + decimal(self.eps):
                return self.res_r.pred(x, robust)
            else:
                return -1

#### Learner Parameters

In [None]:
adjacency_params = [[Bool("a_%s_%s" % (i, j)) for j in range(len(partitions))] for i in range(len(partitions))]

model_params = [Real("p_%s" % i) for i in range(num_params)] + [Real("a_%s" % i) for i in range(num_coefficients)]

robustness_vector = [Real("eps_%i" % i) for i in range(num_params + 4)] # + 4 for the 4 domain bounds

#### Learner Models (**BDT Templates**)

In [None]:
###
# Left-Right Systems
###

###
# Single Class observation left, 8 Classes for right side
###
def build_b3(params, x, robustness_vector, robust = False):
        b = BDTNodePoly([RealVal(1),RealVal(0)],x, RealVal(0), 
                        BDTLeave(partitions[0]), 
                        BDTNodePoly([params[num_params],params[num_params+1]],x ,params[0], 
                                BDTNodePoly([params[num_params + 2], params[num_params + 3]],x , params[1], 
                                        BDTNodePoly([params[num_params + 6], params[num_params + 7]],x, params[3],
                                                BDTLeave(partitions[8]), BDTLeave(partitions[1]), robustness_vector[3]),
                                        BDTNodePoly([params[num_params + 8], params[num_params + 9]],x, params[4],
                                                BDTLeave(partitions[2]), BDTLeave(partitions[3]), robustness_vector[4]),
                                        robustness_vector[1]), 
                                BDTNodePoly([params[num_params + 4], params[num_params + 5]],x, params[2],
                                        BDTNodePoly([params[num_params + 10], params[num_params + 11]],x, params[5],
                                                BDTLeave(partitions[4]), BDTLeave(partitions[5]), robustness_vector[5]),
                                        BDTNodePoly([params[num_params + 12], params[num_params + 13]],x, params[6],
                                                BDTLeave(partitions[6]), BDTLeave(partitions[7]), robustness_vector[6]),
                                        robustness_vector[2]),
                                robustness_vector[7]),
                robustness_vector[0])
        if not robust:
               return b.formula(), b
        else:
                return b.robust_formula(), b
        
###
# 2 Classes both sides
###
def build_b4(params, x, robustness_vector, robust = False):
        b = BDTNodePoly([RealVal(1),RealVal(0)],x, RealVal(0), 
                BDTNodePoly([params[num_params+1], params[num_params+2]], x, params[0],
                                BDTLeave(partitions[1]), BDTLeave(partitions[2]), robustness_vector[1]),
                BDTNodePoly([params[num_params+3], params[num_params+4]], x, params[1],
                                BDTLeave(partitions[3]), BDTLeave(partitions[4]), robustness_vector[2]),
                robustness_vector[0])
        if not robust:
               return b.formula(), b
        else:
                return b.robust_formula(), b

###
# 6 Classes on both sides
###
def build_b5(params, x, robustness_vector, robust = False):
        b = BDTNodePoly([RealVal(1),RealVal(0)],x, RealVal(0),  
                        BDTNodePoly([params[num_params + 2], params[num_params + 3]],x , params[1], 
                                BDTNodePoly([params[num_params + 6], params[num_params + 7]],x, params[3],
                                        BDTLeave(partitions[0]), BDTLeave(partitions[1]), robustness_vector[3]),
                                BDTNodePoly([params[num_params + 8], params[num_params + 9]],x, params[4],
                                        BDTNodePoly([params[num_params + 14], params[num_params + 15]],x, params[7],
                                                BDTLeave(partitions[2]), BDTLeave(partitions[3]), robustness_vector[7]),
                                        BDTNodePoly([params[num_params + 16], params[num_params + 17]],x, params[8],
                                                BDTLeave(partitions[8]), BDTLeave(partitions[9]), robustness_vector[8])
                                        ,robustness_vector[4]),
                                robustness_vector[1]), 
                        BDTNodePoly([params[num_params + 4], params[num_params + 5]],x, params[2],
                                BDTNodePoly([params[num_params + 10], params[num_params + 11]],x, params[5],
                                        BDTLeave(partitions[4]), BDTLeave(partitions[5]), robustness_vector[5]),
                                BDTNodePoly([params[num_params + 12], params[num_params + 13]],x, params[6],
                                        BDTNodePoly([params[num_params + 18], params[num_params + 19]],x, params[9],
                                                BDTLeave(partitions[6]), BDTLeave(partitions[7]), robustness_vector[9]),
                                        BDTNodePoly([params[num_params + 20], params[num_params + 21]],x, params[10],
                                                BDTLeave(partitions[10]), BDTLeave(partitions[11]), robustness_vector[10])
                                        , robustness_vector[6]),
                                robustness_vector[2]),
                        robustness_vector[7])
        if not robust:
               return b.formula(), b
        else:
                return b.robust_formula(), b

        
###
# Single Class observation left, 4 Classes for right side
###
def build_b7(params, x, robustness_vector, robust = False):
        b = BDTNodePoly([RealVal(1),RealVal(0)],x, RealVal(0), 
                        BDTLeave(partitions[0]), 
                        BDTNodePoly([params[num_params],params[num_params+1]],x ,params[0], 
                                BDTNodePoly([params[num_params + 2], params[num_params + 3]],x , params[1], 
                                        BDTLeave(partitions[1]), BDTLeave(partitions[2]),    
                                        robustness_vector[1]), 
                                BDTNodePoly([params[num_params + 4], params[num_params + 5]],x, params[2],
                                        BDTLeave(partitions[3]), BDTLeave(partitions[4]),
                                        robustness_vector[2]),
                                robustness_vector[3]),
                robustness_vector[0])
        if not robust:
               return b.formula(), b
        else:
                return b.robust_formula(), b
        
###
# 4 Classes on both sides 
###
def build_b8(params, x, robustness_vector, robust = False):
        b = BDTNodePoly([RealVal(1),RealVal(0)],x, RealVal(0), 
                        BDTNodePoly([params[num_params + 2], params[num_params + 3]],x , params[0], 
                                        BDTNodePoly([params[num_params + 6], params[num_params + 7]],x, params[2],
                                                BDTLeave(partitions[0]), BDTLeave(partitions[1]), robustness_vector[3]),
                                        BDTNodePoly([params[num_params + 8], params[num_params + 9]],x, params[3],
                                                BDTLeave(partitions[2]), BDTLeave(partitions[3]), robustness_vector[4]),
                                        robustness_vector[1]),
                        BDTNodePoly([params[num_params + 4], params[num_params + 5]],x, params[1],
                                BDTNodePoly([params[num_params + 10], params[num_params + 11]],x, params[4],
                                        BDTLeave(partitions[4]), BDTLeave(partitions[5]), robustness_vector[5]),
                                BDTNodePoly([params[num_params + 12], params[num_params + 13]],x, params[5],
                                        BDTLeave(partitions[6]), BDTLeave(partitions[7]), robustness_vector[6]),
                                robustness_vector[2]),
                robustness_vector[0])
        if not robust:
               return b.formula(), b
        else:
                return b.robust_formula(), b
        

###
#  Radial Systems   
###

###
# 4 Classes on both sides and (0,0) as fixed class
###
def build_b6(params, x, robustness_vector, robust = False):
        b = BDTNodePoly([RealVal(1),RealVal(0)],x, RealVal(0), 
                        BDTNodePolyEq([RealVal(1),RealVal(1)],x, RealVal(0), 
                                BDTLeave(partitions[0]), 
                                BDTNodePoly([params[num_params + 2], params[num_params + 3]],x , params[1], 
                                        BDTNodePoly([params[num_params + 6], params[num_params + 7]],x, params[3],
                                                BDTLeave(partitions[8]), BDTLeave(partitions[1]), robustness_vector[3]),
                                        BDTNodePoly([params[num_params + 8], params[num_params + 9]],x, params[4],
                                                BDTLeave(partitions[2]), BDTLeave(partitions[3]), robustness_vector[4]),
                                        robustness_vector[1]),
                                robustness_vector[7]),
                        BDTNodePoly([params[num_params + 4], params[num_params + 5]],x, params[2],
                                BDTNodePoly([params[num_params + 10], params[num_params + 11]],x, params[5],
                                        BDTLeave(partitions[4]), BDTLeave(partitions[5]), robustness_vector[5]),
                                BDTNodePoly([params[num_params + 12], params[num_params + 13]],x, params[6],
                                        BDTLeave(partitions[6]), BDTLeave(partitions[7]), robustness_vector[6]),
                                robustness_vector[2]),
                robustness_vector[0])
        if not robust:
               return b.formula(), b
        else:
                return b.robust_formula(), b
        


###
# Timed Automata-like systems
###

def build_b_ta(params, x, robustness_vector, robust = False):
        b = BDTNodePoly([RealVal(-1),RealVal(0)],x, RealVal(-3), 
                        BDTLeave(partitions[0]), 
                        BDTNodePoly([RealVal(0),RealVal(-1)],x, RealVal(-3), 
                                BDTLeave(partitions[0]), 
                                BDTNodePoly([params[num_params],params[num_params+1]],x ,params[0], 
                                        BDTNodePoly([params[num_params + 2], params[num_params + 3]],x , params[1], 
                                                BDTLeave(partitions[1]), BDTLeave(partitions[2]),    
                                                robustness_vector[1]), 
                                        BDTNodePoly([params[num_params + 4], params[num_params + 5]],x, params[2],
                                                BDTLeave(partitions[3]), BDTLeave(partitions[4]),
                                                robustness_vector[2]),
                                        robustness_vector[3]),
                                robustness_vector[4]),
                robustness_vector[0])
        if not robust:
               return b.formula(), b
        else:
                return b.robust_formula(), b
        
def build_b_ta2(params, x, robustness_vector, robust = False):
        b = BDTNodePoly([RealVal(-1),RealVal(0)],x, RealVal(-3), 
                        BDTLeave(partitions[0]), 
                        BDTNodePoly([RealVal(0),RealVal(-1)],x, RealVal(-3), 
                                BDTLeave(partitions[0]), 
                                BDTNodePoly([params[num_params],params[num_params+1]],x ,params[0], 
                                        BDTNodePoly([params[num_params + 2], params[num_params + 3]],x , params[1], 
                                                BDTNodePoly([params[num_params + 6], params[num_params + 7]],x, params[3],
                                                        BDTLeave(partitions[1]), BDTLeave(partitions[2]), robustness_vector[4]),
                                                BDTNodePoly([params[num_params + 8], params[num_params + 9]],x, params[4],
                                                        BDTLeave(partitions[3]), BDTLeave(partitions[4]), robustness_vector[5]),
                                        robustness_vector[2]), 
                                        BDTNodePoly([params[num_params + 4], params[num_params + 5]],x, params[2],
                                                BDTNodePoly([params[num_params + 10], params[num_params + 11]],x, params[5],
                                                        BDTLeave(partitions[5]), BDTLeave(partitions[6]), robustness_vector[6]),
                                                BDTNodePoly([params[num_params + 12], params[num_params + 13]],x, params[6],
                                                        BDTLeave(partitions[7]), BDTLeave(partitions[8]), robustness_vector[7]),
                                        robustness_vector[3]),
                                robustness_vector[7]),
                        robustness_vector[1]),
                robustness_vector[0])
        if not robust:
               return b.formula(), b
        else:
                return b.robust_formula(), b
        
def build_b_ta3(params, x, robustness_vector, robust = False):
        b = BDTNodePoly([RealVal(-1),RealVal(0)],x, RealVal(-1), 
                        BDTLeave(partitions[0]), 
                        BDTNodePoly([RealVal(1),RealVal(-1)],x, RealVal(0), 
                                BDTLeave(partitions[1]), 
                                BDTNodePoly([params[num_params],params[num_params+1]],x ,params[0], 
                                        BDTNodePoly([params[num_params+2],params[num_params+3]],x ,params[1], 
                                                BDTNodePoly([params[num_params + 3], params[num_params + 4]],x , params[3], 
                                                        BDTNodePoly([params[num_params + 5], params[num_params + 6]],x, params[4],
                                                                BDTLeave(partitions[2]), BDTLeave(partitions[3]), robustness_vector[2]),
                                                        BDTNodePoly([params[num_params + 7], params[num_params + 8]],x, params[5],
                                                                BDTLeave(partitions[4]), BDTLeave(partitions[5]), robustness_vector[3]),
                                                robustness_vector[4]), 
                                                BDTNodePoly([params[num_params + 9], params[num_params + 10]],x, params[6],
                                                        BDTNodePoly([params[num_params + 11], params[num_params + 12]],x, params[7],
                                                                BDTLeave(partitions[6]), BDTLeave(partitions[7]), robustness_vector[5]),
                                                        BDTNodePoly([params[num_params + 13], params[num_params + 14]],x, params[8],
                                                                BDTLeave(partitions[8]), BDTLeave(partitions[9]), robustness_vector[6]),
                                                robustness_vector[7]),
                                        robustness_vector[8]), 
                                        BDTNodePoly([params[num_params + 15],params[num_params + 16]],x ,params[9], 
                                                BDTNodePoly([params[num_params + 18], params[num_params + 19]],x , params[10], 
                                                        BDTNodePoly([params[num_params + 20], params[num_params + 21]],x, params[11],
                                                                BDTLeave(partitions[10]), BDTLeave(partitions[11]), robustness_vector[9]),
                                                        BDTNodePoly([params[num_params + 22], params[num_params + 23]],x, params[12],
                                                                BDTLeave(partitions[12]), BDTLeave(partitions[13]), robustness_vector[10]),
                                                robustness_vector[11]), 
                                                BDTNodePoly([params[num_params + 24], params[num_params + 25]],x, params[13],
                                                        BDTNodePoly([params[num_params + 26], params[num_params + 27]],x, params[14],
                                                                BDTLeave(partitions[14]), BDTLeave(partitions[15]), robustness_vector[12]),
                                                        BDTNodePoly([params[num_params + 28], params[num_params + 29]],x, params[15],
                                                                BDTLeave(partitions[16]), BDTLeave(partitions[17]), robustness_vector[13]),
                                                robustness_vector[14]),
                                        robustness_vector[15]),
                                robustness_vector[16]),
                        robustness_vector[1]),
                robustness_vector[0])
        if not robust:
               return b.formula(), b
        else:
                return b.robust_formula(), b



#### Classification (**Partitions**)

In [None]:
def classify(params, x, robustness_vector):
    f, _ = build_b7(params, x, robustness_vector)
    return f

def classify_robust(params, x, robustness_vector):
    f, _ = build_b7(params, x, robustness_vector, robust = True)
    return f

classify(model_params, [x,y], robustness_vector)
print(classify(model_params, [x,y], robustness_vector))

#### Classification (**Adjacency**)

In [None]:
def generate_adjacency(p,q, i , j, params, mx):
    if i == mx and j == mx:
        return If(And(p == i, q == j),If(params[i][j],1,0),0)
    elif i <= mx and j < mx:
        return If(And(p == i, q == j), If(params[i][j],1,0), generate_adjacency(p,q,i,j+1,params,mx))
    elif i < mx and j == mx:
        return If(And(p == i, q == j), If(params[i][j],1,0), generate_adjacency(p,q,i+1,0,params,mx))


def adjacency(params, p, q): 
    return generate_adjacency(p,q,0,0,params,len(partitions) - 1)


#### Extract Learner from SMT

In [None]:
def extract_learner(s : Solver):
    print("Checking Constraints")
    print("Num Constraints:", len(s.assertions()))
    print(s.check())
    print("Checked Constraints")

    if s.check() == sat:
        m = s.model()
        adj = [ [ m.evaluate(adjacency_params[i][j]) for j in range(len(partitions)) ] for i in range(len(partitions)) ]
        eps = [m.evaluate(robustness_vector[i]) for i in range(num_params + 4)]
        params = [m.evaluate(param) for param in model_params]

        for i in range(len(params)):
            if not(type(params[i]) == ArithRef):
                print(model_params[i], decimal(params[i]))
            else:
                print(model_params[i], params[i])

        for i in range(len(robustness_vector)):
            if not(type(eps[i]) == ArithRef):
                print(robustness_vector[i], decimal(eps[i]))
            else:
                print(robustness_vector[i], robustness_vector[i])
      
        return params, adj, eps
    else:
        print("failed to solve")
        return None, None, None

#### Generate Initial Samples

In [None]:
def generate_samples(num: int):
    samples = (np.random.uniform([lower_x,lower_y], [upper_x,upper_y], (num, 2)))
    samples = np.array([[RealVal(i) for i in x] for x in samples])
    return samples

#### Conditions on Counterexamples

Add bisimulation conditions on counterexamples to solver

In [None]:
def cond_1_ctxs(solver, model_params, adj_params, eps, ctxs):
    for [x,y] in ctxs:
        solver.add(
            ForAll([p,q], Implies(And(classify(model_params, [x,y], eps) == p, 
                                            classify(model_params, successor([x,y]), eps) == q, domain([x,y])), 
                                adjacency(adj_params, p, q) == 1))
        )
            
def cond_2_ctxs(solver, model_params, adj_params, eps, ctxs):
    for [x,y] in ctxs:
        solver.add(
            ForAll([p,q], Implies(adjacency(adj_params, p ,q) == 1, 
                            Implies(And(classify(model_params,[x,y], eps) == p, domain([x,y])), classify(model_params,successor([x,y]),eps) == q)))
        )

In [None]:
def learn_ctxs(s, ctxs, new_ctxs):
    if new_ctxs == None: return ctxs
    ctxs = np.concatenate((ctxs, new_ctxs), axis = 0)
    cond_1_ctxs(s, model_params, adjacency_params,robustness_vector, new_ctxs)
    cond_2_ctxs(s, model_params, adjacency_params,robustness_vector, new_ctxs)
    return ctxs

Generate Solver and initialize with conditions on initial samples

In [None]:
s = Solver()
ctxs = generate_samples(20)

cond_1_ctxs(s, model_params, adjacency_params,robustness_vector ,ctxs)
cond_2_ctxs(s, model_params, adjacency_params,robustness_vector ,ctxs)
    
#mp, ap, eps = extract_learner(s) # First iteration is done in the CEGIS loop or here

## Verifier

Verify bisimulation conditions on learned partition and transition relation

In [None]:
def eval(m, x):
    return m.evaluate(x) if not (m.evaluate(x) == x) else 0

def ver_cond_1(model_params, adjacency_params, eps):
    s = Solver()
    s.add(Exists([p,q],(And(And(classify(model_params, [x,y], eps) == p, 
                        classify(model_params, successor([x,y]), eps) == q, domain([x,y])),
                    Not(adjacency(adjacency_params, p, q) == 1)))))
    if s.check() == sat:
        m = s.model()
        ctx = [[eval(m,x), eval(m,y)]]
        return ctx
    else:
        return None


def ver_cond_2(model_params, adjacency_params, eps):
    s = Solver()
    s.add(Exists([p,q],(And(adjacency(adjacency_params, p ,q) == 1, 
                        classify(model_params,[x,y], eps) == p, 
                        Not(classify(model_params,successor([x,y]), eps) == q),
                        domain([x,y])))))
    if s.check() == sat:
        m = s.model()
        ctx = [[eval(m,x), eval(m,y)]]
        return ctx
    else:
        return None

#### Augment Counterexamples

In [None]:
# Sample n-samples uniformly in a given region around a found counter example
def augment_ctxs(ctxs, num = 2, eps = 0.1):
    if ctxs == None: return None
    samples = []
    for ctx in ctxs:
        low0 = max(lower_x, decimal(ctx[0]) - eps)
        up0 = min(upper_x, decimal(ctx[0]) + eps)
        low1 = max(lower_x, decimal(ctx[1]) - eps)
        up1 = min(upper_x, decimal(ctx[1]) + eps)
        diff_samples = np.random.uniform([low0, low1],[up0, up1], (num, 2))
        samples += [ctx] + [[RealVal(x) for x in sample] for sample in diff_samples]
    return samples

## CEGIS

Run Learner and Verifier in a CEGIS loop. Verifier feeds Learner with counterexamples and Learner adjusts its parameters to adapt to the new data.

In [None]:
def cegis(s : Solver, num_it, ctxs):
    mp, ap, eps = extract_learner(s)
    verified = True
    for _ in range(num_it):
        s.push()
        ctxs_cond_1 = augment_ctxs(ver_cond_1(mp, ap, eps))

        if ctxs_cond_1 != None: 
            ctxs = learn_ctxs(s, ctxs, ctxs_cond_1)
            verified = False

        ctxs_cond_2 = augment_ctxs(ver_cond_2(mp, ap, eps))

        if ctxs_cond_2 != None: 
            ctxs = learn_ctxs(s, ctxs, ctxs_cond_2)
            verified = False

        if verified: break
        verified = True
        print("Extracting Learner Parameters")
        mp, ap, eps = extract_learner(s)
    return mp, ap, eps, ctxs

#### Run CEGIS

In [None]:
mp, ap, eps, ctxs = cegis(s,8, ctxs)

## Approximation Quantification

Extract decision boundary epsilons from Learner

In [None]:
# Condition 1 - Concrete transitions imply abstract transitions
def eps_cond_1(solver, model_params, adj_params, eps):
    solver.add(
        ForAll([p,q], Implies(Exists([x,y], And(classify_robust(model_params, [x,y], eps) == p, 
                                            classify_robust(model_params, successor([x,y]), eps) == q, 
                                            x >= 0 + eps[-1], x <= 4 - eps[-2], y <= 4 - eps[-3], y >= 0 + eps[-4], Not(p == -1), Not(q == -1))), 
                        adjacency(adj_params, p, q) == 1)
        )
    )

# Condition 2 - Abstract transitions imply concrete transitions
def eps_cond_2(solver, model_params, adj_params, eps):
    solver.add(
        ForAll([p,q], Implies(adjacency(adj_params, p ,q) == 1, 
                ForAll([x,y], Or(Implies(classify_robust(model_params,[x,y], eps) == p, classify_robust(model_params,successor([x,y]), eps) == q), 
                                 x < 0 + eps[-1], x > 4 - eps[-2], y < 0 + eps[-3], y > 4 - eps[-4])))
        )
    )

### Check Epsilon

In [None]:
def verify_eps(mp, ap, upper_bound):
    e = Solver()
    eps_cond_1(e, mp, ap, robustness_vector)
    eps_cond_2(e, mp, ap, robustness_vector)

    for eps in robustness_vector:
        e.add(eps >= 0.0)
        e.add(eps <= upper_bound)
    
    _, _, eps = extract_learner(e)

    return eps


In [None]:
ver_eps = verify_eps(mp, ap, .085)

#### Synthesize Epsilon

In [None]:
def synthesize_eps(mp, ap, upper_bound, depth = 10):
    l = 0.
    u = upper_bound
    
    eps_min = None

    for _ in range(depth):
        m = (l + u) / 2.
        eps = verify_eps(mp, ap, m)
        print("M:",m)
        if (l == u):
            break

        if eps != None:
            eps_min = eps
            u = m
        else:
            l = m
    return eps_min

In [None]:
ver_eps = [0.01 + i * 0.005 for i in range(len(robustness_vector))]

### Quotient Extraction

In [None]:
nx.draw_networkx(extract_quotient(ap))

## Partition Visualization

In [None]:
def predictor(a, robust = False):
    result = []
    _, tree = build_b7(mp, [x,y], (ver_eps if ver_eps != None else robustness_vector))
    a = a.tolist()
    counter = 0
    for i in a:
        res = tree.pred([i[0],i[1]], robust)
        result.append(res)
        counter += 1
    return np.array(result)

In [None]:
def transform_ctx(ctxs, s):
    res = []
    for ctx in ctxs:
        new = []
        for comp in ctx:
            dec = simplify(comp).as_decimal(10)
            if dec[-1] == "?":
                new.append(float(dec[:-1]))
            else:
                new.append(float(dec))
        res.append(np.array(new))
    return np.array(res)

### Visualize

In [None]:
def visualize(acc = 0.01, robust = False):
    x_min, x_max = -1, 3
    y_min, y_max = -1, 3
    xx, yy = np.meshgrid(np.arange(x_min, x_max, acc), np.arange(y_min, y_max, acc))

    print(len(np.around(np.c_[xx.ravel(), yy.ravel()], decimals = 3)))
    Z = predictor(np.around(np.c_[xx.ravel(), yy.ravel()], decimals = 3), robust=robust)
    Z = Z.reshape(xx.shape)

    # Create a colormap for the decision regions
    if robust :
        cmap = ListedColormap(['#A4AFBF', '#AAFFFF','#FFAAAA','#BBAAFF','#AAAAFF','#DDBBFF','#AADDFF','#CCDDFF','#CCCCFF','#FFCCFF'])
    else:
        cmap = ListedColormap(['#FFAAAA', '#AAFFFF', '#AAAAFF','#BBAAFF','#AADDFF','#DDBBFF','#CCDDFF','#CCCCFF','#FFCCFF','#FBBCFF','#FBBCCC','#CCBCCC','#FBBAAA','#AAACCC','#DDBDDC','#DDBFFF','#FFFDDC','#DFFFFC'])

    # Plot the decision regions
    plt.figure()
    plt.pcolormesh(xx,yy,Z, cmap=cmap)

    # Plot the training data points
    examples = transform_ctx(ctxs, s)
    plt.scatter(examples[:,0],examples[:,1])
    plt.xlabel('x_0') 
    plt.ylabel('x_1')
    plt.axis('square')
    plt.title('Decision Regions of Decision Tree Classifier')
    annotaed = []

    example_classes = predictor(examples)
    for i in range(len(examples)):
        ex = examples[i]
        c = example_classes[i]
        if c not in annotaed:
            plt.annotate(c, ex)
            annotaed.append(c)

    plt.savefig('./figures/vis_{}.png'.format(time.time()), dpi = 800, bbox_inches='tight')
    # plt.show()

In [None]:
visualize(acc = 0.04)

In [None]:
visualize(robust = True, acc = 0.04)

## Weak Bisimulation


#### Stutter Steps

In [None]:
def successor(x):
    y = [i for i in x]
    y[0] = If(x[0] - x[1] <= 0, x[0], x[0] + 0.15)
    y[1] = If(x[0] - x[1] <= 0.2, If(x[0] - x[1] <= 0, x[1], x[1] + 0.2), x[1] + 0.3)
    return y

In [None]:
def exists_stutt(k, x, p, q, model_params, robustness_vector):
    if k == 1:
        return classify(model_params,successor(x), robustness_vector) == q
    else:
        return Or(classify(model_params,successor(x), robustness_vector) == q, 
                  And(classify(model_params,successor(x), robustness_vector) == p, exists_stutt(k-1,successor(x),p,q, model_params, robustness_vector)))

#### Stuttering Bisimulation Condition for Learner

In [None]:
def cond_2_ctxs_stutt(solver, model_params, adj_params, eps, ctxs):
    for [x,y] in ctxs:
        solver.add(
            ForAll([p,q], Implies(And(adjacency(adj_params, p ,q) == 1, Not(p == q)), 
                            Implies(And(classify(model_params,[x,y], eps) == p, domain([x,y])), 
                                    exists_stutt(2, [x,y], p, q, model_params, eps))))
        )

In [None]:
def learn_ctxs_stutt(s, ctxs, new_ctxs):
    if new_ctxs == None: return ctxs
    ctxs = np.concatenate((ctxs, new_ctxs), axis = 0)
    cond_1_ctxs(s, model_params, adjacency_params,robustness_vector, new_ctxs)
    cond_2_ctxs_stutt(s, model_params, adjacency_params,robustness_vector, new_ctxs)
    return ctxs

#### Stuttering Bisimulation Verification

In [None]:
def ver_cond_2_stutt(model_params, adjacency_params, eps):
    s = Solver()
    s.add(Exists([p,q],(And(adjacency(adjacency_params, p ,q) == 1, 
                        classify(model_params,[x,y], eps) == p, 
                        Not(exists_stutt(2, [x,y], p, q, model_params, eps)),
                        domain([x,y])))))
    if s.check() == sat:
        m = s.model()
        ctx = [[eval(m,x), eval(m,y)]]
        return ctx
    else:
        return None

In [None]:
s = Solver()
ctxs = generate_samples(10)

cond_1_ctxs(s, model_params, adjacency_params,robustness_vector ,ctxs)
cond_2_ctxs_stutt(s, model_params, adjacency_params,robustness_vector ,ctxs)
    
mp, ap, eps = extract_learner(s)

## CEGIS for Stuttering

In [None]:
mps = []
aps = []
ctxs_log = []
def cegis_stutter(s : Solver, num_it, ctxs):
    mp, ap, eps = extract_learner(s)
    verified = True
    for _ in range(num_it):
        s.push()
        ctxs_cond_1 = augment_ctxs(ver_cond_1(mp, ap, eps))

        if ctxs_cond_1 != None: 
            ctxs = learn_ctxs_stutt(s, ctxs, ctxs_cond_1)
            verified = False

        ctxs_cond_2 = augment_ctxs(ver_cond_2_stutt(mp, ap, eps))

        if ctxs_cond_2 != None: 
            ctxs = learn_ctxs_stutt(s, ctxs, ctxs_cond_2)
            verified = False

        if verified: break
        verified = True
        print("Extracting Learner Parameters")
        mp, ap, eps = extract_learner(s)
        
    mps.append(mp)
    aps.append(aps)
    ctxs_log.append(ctxs)
    return mp, ap, eps, ctxs

## Approximation Quantification for Stuttering

In [None]:
def exists_stutt_robust(k, x, p, q, model_params, robustness_vector):
    if k == 1:
        return classify_robust(model_params,successor(x), robustness_vector) == q
    else:
        return Or(classify_robust(model_params,successor(x), robustness_vector) == q, 
                  And(classify_robust(model_params,successor(x), robustness_vector) == p, exists_stutt_robust(k-1,successor(x),p,q, model_params, robustness_vector)))

In [None]:
# Condition 2 - Abstract transitions imply concrete transitions
def eps_cond_2_stutt(solver, model_params, adj_params, eps):
    solver.add(
        ForAll([p,q], Implies(And(adjacency(adj_params, p ,q) == 1, Not(p == q)), 
                ForAll([x,y], Or(Implies(classify_robust(model_params,[x,y], eps) == p, exists_stutt_robust(2, [x,y], p,q, model_params, eps)), 
                                 x < -1 + eps[-1], x > 4 - eps[-2], y < 4 + eps[-3], y > -1 - eps[-4])))
        )
    )

In [None]:
def verify_eps_stutt(mp, ap, upper_bound):
    e = Solver()
    eps_cond_1(e, mp, ap, ver_eps)
    eps_cond_2_stutt(e, mp, ap, ver_eps)

    for eps in robustness_vector:
        e.add(eps >= 0.0)
        e.add(eps <= upper_bound)
    _, _, eps = extract_learner(e)

    return eps

In [None]:
ver_eps = verify_eps_stutt(mp, ap, .3)

### Visualization

In [None]:
visualize(robust = True)

In [None]:
visualize()

In [None]:
nx.draw_networkx(extract_quotient(ap))