In [None]:
from collections import namedtuple
from dataclasses import dataclass, replace
import pickle
from IPython.display import display
import ipywidgets as widgets
import cvxopt
import numpy as np
from matplotlib import pyplot
from scipy.integrate import odeint
import quadprog
from donotation import do
import statemonad
import polymat, polymat.typing
import sosopt, sosopt.typing

In [None]:
cvxopt.solvers.options['show_progress'] = False

# Initialize state object
shared_state = [polymat.init_state()]

# 5. Simulations

## Introduction

In this notebook, we constuct the advanced safety filter from the previously computed Control Lyapunov Function (CLF) and two Control Barrier Functions (CBFs), and then conduct simulations to evaluate its performance.

## Dynamical Model

The following subsection defines the dynamical system model using polynomial matrices.

In [None]:
@dataclass
class ModelParam:
    s_n: float
    w_n: float

    v_grid_phph: float
    r_grid_si: float
    l_grid_si: float

    r_tr: float
    x_tr: float

    c_dc_si: float
    r_dc_si: float
    v_dc_n: float

    def __post_init__(self):
        self.v_n = self.v_grid_phph / np.sqrt(3)
        self.i_n = self.s_n / self.v_n
        self.z_n = self.v_n**2 / self.s_n
        self.l_n = self.z_n / self.w_n
        self.z_dc_n = self.v_dc_n**2 / self.s_n
        self.c_dc_n = 1 / (self.z_dc_n * self.w_n)

        self.r_grid = self.r_grid_si / self.z_n
        self.x_grid = self.l_grid_si / self.l_n
        self.c_dc = self.c_dc_si / self.c_dc_n
        self.r_dc = self.r_dc_si / self.z_dc_n

        self.g_dc = 1 / self.r_dc

        self.l = self.x_tr + self.x_grid


model = ModelParam(
    s_n=20e6 / 3,
    w_n=2 * np.pi * 50,
    v_grid_phph=130e3,
    r_grid_si=1e-3,
    l_grid_si=0.01,
    r_tr=2 * 3e-3,
    x_tr=2 * 9e-2,
    c_dc_si=2 * 10e-3,
    r_dc_si=1000,
    v_dc_n=2400,
)

# steady state input offset
u0 = np.array(((1, -model.l*model.g_dc),)).T

In [None]:
# polynomial degrees
####################

u_degrees = (0, 1, 2, 3)
V_degrees = (0, 1, 2, 3, 4)
B_degrees = (1, 2, 3, 4)


# variables
###########

# define state variables
variable_names = ("v_dc", "i_d", "i_q", "i_{q,ref}")
state_variables = tuple(polymat.define_variable(name) for name in variable_names)
v_dc, i_d, i_q, i_qref = state_variables
x = polymat.v_stack(state_variables)

# define input variables
variable_names = ("u_1", "u_2")
input_variables = tuple(polymat.define_variable(name) for name in variable_names)
u1, u2 = input_variables
u = polymat.from_(input_variables)

n_states = len(state_variables)
n_inputs = len(input_variables)


# system model
##############

# The dynamical system model is given by x_dot := G @ x + f @ u

scale = polymat.from_((
    (1 / model.c_dc, 1 / model.l, 1 / model.l, 0),
)).diag() * model.w_n

f = scale @ polymat.from_((
    (-model.g_dc*v_dc - i_d + model.g_dc*model.l*i_q,),
    (v_dc + model.l*i_q,),
    (-model.g_dc*model.l*v_dc - model.l*i_d,),
    (0,),
))

G = scale @ polymat.from_((
    (model.g_dc-i_d, -i_q), 
    (1+v_dc, 0), 
    (0, 1+v_dc),
    (0, 0),
))


# state feedback controller
###########################

# maximum system input norm
u_max = 1.3


# nominal controller
####################

u_n = polymat.from_(np.array((
    ((0.1*v_dc - i_d),),
    ((i_qref - i_q),),
)))


# retrieve arrays
#################

shared_state[0], f_array = polymat.to_array(f, x).apply(shared_state[0])
shared_state[0], G_array = polymat.to_array(G, x).apply(shared_state[0])
shared_state[0], u_n_array = polymat.to_array(u_n, x).apply(shared_state[0])

with open('2_arrays.p', 'rb') as file:   
    arrays_2 = pickle.load(file)

with open('3_arrays.p', 'rb') as file:   
    arrays_3 = pickle.load(file)

# with open('4_arrays.p', 'rb') as file:   
#     arrays_4 = pickle.load(file)

## Advanced Safety Filter

In [None]:
def solve_qcqp(C, b, u_nom, qcqp):
    if qcqp:
        shared_state[0], solver_args = sosopt.solve_args(
            indices=u,
            lin_cost=-2 * polymat.from_(u_nom.T) @ u,
            quad_cost=u,
            l_data=(
                polymat.from_(C) @ u - b,
            ),
            q_data=(
                polymat.v_stack((1.3, u + u0)),
            ),
        ).apply(shared_state[0])
        
        solver_data = sosopt.cvx_opt_solver.solve(solver_args)
        return solver_data.solution.reshape(-1, 1)

    else:
        return quadprog.solve_qp(
            G=np.eye(2),
            a=u_nom.reshape(-1),
            C=C.T,
            b=b.reshape(-1),
            meq=0,
        )[0].reshape(-1, 1)


def define_advanced_safety_filter(array):
    size = array['B1'].n_param
    
    def func(x):        
        fx = f_array(x)[:size,:]
        Gx = G_array(x)[:size,:]

        x_ = x[:size,:]
        
        V = array['V'](x_)
        B1 = array['B1'](x_)
        B2 = array['B2'](x_)

        dV = array['dV'](x_)
        dB1 = array['dB1'](x_)
        dB2 = array['dB2'](x_)

        gV = array['gV'](x_)
        gB1 = array['gB1'](x_)
        gB2 = array['gB2'](x_)

        s = array['s'](x_)
        
        C = np.vstack((
            -dV.T @ Gx,
            -dB1.T @ Gx, 
            -dB2.T @ Gx,
        ))
        b = np.vstack((
            dV.T @ fx + gV * V / s,
            dB1.T @ fx + gB1 * B1 / s, 
            dB2.T @ fx + gB2 * B2 / s,
        ))
        
        u = solve_qcqp(
            C=C,
            b=b,
            u_nom=u_n_array(x),
            qcqp=enforce_ulim_using_qcqp,
        )

        return u

    return func


@do()
def define_benchmark_safety_filter(array):  
    size = array['B1'].n_param
    
    def func(x):
        fx = f_array(x)[:size,:]
        Gx = G_array(x)[:size,:]

        x_ = x[:size,:]

        B1 = array['B1'](x_)
        B2 = array['B2'](x_)

        dB1 = array['dB1'](x_)
        dB2 = array['dB2'](x_)
        
        C = np.vstack((
            -dB1.T @ Gx, 
            -dB2.T @ Gx,
        ))
        b = np.vstack((
            dB1.T @ fx + 10 * B1,
            dB2.T @ fx + 10 * B2,
        ))
        
        u = solve_qcqp(
            C=C,
            b=b,
            u_nom=u_n_array(x),
            qcqp=False,
        )

        return u

    return func

## Simulation Setup

In [None]:
def project_input_to_constraint(u):
    v1 = u[0, 0] + u0[0, 0]
    v2 = u[1, 0] + u0[1, 0]
    v_abs = np.sqrt(v1**2 + v2**2)
    if u_max < v_abs:        
        u1 = v1 * u_max / v_abs - u0[0, 0]
        u2 = v2 * u_max / v_abs - u0[1, 0]

        return np.array((u1, u2)).reshape(-1, 1) 
    else:
        return u


@dataclass
class SimResult:
    v_dc: np.ndarray
    i_d: np.ndarray
    i_q: np.ndarray
    u: np.ndarray


def simulate(time, x0, controller):
    t_data = []
    u_data = []
    
    def get_x_dot(state, time):
        [v_dc, i_d, i_q, i_q_ref] = state           
        x = np.array(((v_dc, i_d, i_q, i_q_ref),)).T
    
        u = project_input_to_constraint(controller(x))
    
        t_data.append(time)
        u_data.append(np.sqrt((u[0, 0] + u0[0, 0])**2 + (u[1, 0] + u0[1, 0])**2))
        
        fx = f_array(x)
        Gx = G_array(x)
    
        xdot = fx + Gx @ u
    
        return np.squeeze(xdot)
    
    sol = odeint(
        get_x_dot, 
        x0, 
        time,
    )

    # interpolate collected time input samples with the provided time vector
    u_interp = np.interp(time, t_data, u_data)
    
    result = SimResult(
        v_dc = sol[:, 0:1],
        i_d = sol[:, 1:2],
        i_q = sol[:, 2:3],
        u = u_interp,
    )

    return result


def plot_simulation_results(time, result_n, result_b, result_s):
    pyplot.close('all')
    fig = pyplot.figure(figsize=(8, 6))
    axs = fig.subplots(3, 1, sharex=True)
    
    color_n = '#1f77b4'
    color_b = '#ff7f0e'
    color_s = '#2ca02c'
    
    name_n = '$u_{n}(x)$'
    name_b = '$u_{b}(x)$'
    name_s = '$u_{s}(x)$'
    
    axs[0].plot(time, result_n.v_dc, color_n, label=name_n)
    axs[0].plot(time, result_b.v_dc, color_b, label=name_b)
    axs[0].plot(time, result_s.v_dc, color_s, label=name_s)
    axs[0].legend()
    axs[0].plot(time, 0.2*np.ones_like(time), 'r', linestyle='dashed')
    axs[0].plot(time, -0.8*np.ones_like(time), 'r', linestyle='dashed')
    axs[0].set_ylabel(r'${\tilde v}_{dc}$ [p.u.]')
    
    norm = lambda r: np.sqrt(r.i_d**2 + r.i_q**2)
    
    axs[1].plot(time, norm(result_n), color_n, label=name_n)
    axs[1].plot(time, norm(result_b), color_b, label=name_b)
    axs[1].plot(time, norm(result_s), color_s, label=name_s)
    axs[1].plot(time, 1.3*np.ones_like(time), 'r', linestyle='dashed')
    axs[1].plot(time, -1.3*np.ones_like(time), 'r', linestyle='dashed')
    axs[1].legend()
    axs[1].set_ylabel(r'$\| \tilde i \|$ [p.u.]')
    
    axs[2].plot(time, result_n.u, color_n, label=name_n)
    axs[2].plot(time, result_b.u, color_b, label=name_b)
    axs[2].plot(time, result_s.u, color_s, label=name_s)
    axs[2].plot(time, 1.3*np.ones_like(time), 'r', linestyle='dashed')
    axs[2].plot(time, -1.3*np.ones_like(time), 'r', linestyle='dashed')
    axs[2].legend()
    axs[2].set_ylabel(r'$\| u \|$ [p.u.]')
    axs[2].set_xlabel('time [s]')

    return axs

## Perform Simulations

### Simulation: State constraint compliance 

In [None]:
# Fig. 7

# simulation time
t_sim = 0.005

# initial state
x0 = (0.16, -0.9, 0.9, 0)

dt_sim = 1e-6
n_samples = int(t_sim/dt_sim)
time = np.linspace(0, t_sim, n_samples)

enforce_ulim_using_qcqp = False
# enforce_ulim_using_qcqp = True

u_b = define_benchmark_safety_filter(arrays_2)
u_s = define_advanced_safety_filter(arrays_3)

print('simulate with u=u_n')
result_n = simulate(time=time, x0=x0, controller=u_n_array)
print('simulate with u=u_b')
result_b = simulate(time=time, x0=x0, controller=u_b)
print('simulate with u=u_s')
result_s = simulate(time=time, x0=x0, controller=u_s)

print('plot results')
axs = plot_simulation_results(time, result_n, result_b, result_s)

axs[0].set_ylim((0.14, 0.21))
axs[1].set_ylim((-0.1, 1.4))
axs[2].set_ylim((0.7, 1.4))

### Simulation: Finite-time convergence to the nominal region

In [None]:
# Fig. 8

# simulation time
t_sim = 0.1

# initial state
x0 = (-0.2, -0, 0, 0)

dt_sim = 1e-6
n_samples = int(t_sim/dt_sim)
time = np.linspace(0, t_sim, n_samples)

enforce_ulim_using_qcqp = False
# enforce_ulim_using_qcqp = True

u_b = define_benchmark_safety_filter(arrays_2)
u_s = define_advanced_safety_filter(arrays_3)

print('simulate with u=u_n')
result_n = simulate(time=time, x0=x0, controller=u_n_array)
print('simulate with u=u_b')
result_b = simulate(time=time, x0=x0, controller=u_b)
print('simulate with u=u_s')
result_s = simulate(time=time, x0=x0, controller=u_s)

print('plot results')
axs = plot_simulation_results(time, result_n, result_b, result_s)

axs[0].set_ylim((-0.3, 0.1))
axs[1].set_ylim((-0.1, 0.3))
axs[2].set_ylim((0.9, 1.4))

### Simulation: Quadrature current reference tracking

In [None]:
# Fig. 9

# simulation time
t_sim = 0.25

# initial state
x0 = (0.16, -0.5, -0.3, 1)

dt_sim = 1e-6
n_samples = int(t_sim/dt_sim)
time = np.linspace(0, t_sim, n_samples)

enforce_ulim_using_qcqp = False
# enforce_ulim_using_qcqp = True

u_b = define_benchmark_safety_filter(arrays_2)
u_s = define_advanced_safety_filter(arrays_4)

print('simulate with u=u_n')
result_n = simulate(time=time, x0=x0, controller=u_n_array)
print('simulate with u=u_b')
result_b = simulate(time=time, x0=x0, controller=u_b)
print('simulate with u=u_s')
result_s = simulate(time=time, x0=x0, controller=u_s)

print('plot results')
axs = plot_simulation_results(time, result_n, result_b, result_s)

axs[0].set_ylim((-0.3, 0.21))
axs[1].set_ylim((0, 1.4))
axs[2].set_ylim((0.7, 1.4))