In [None]:
from collections import namedtuple
from dataclasses import dataclass, replace
import pickle
from IPython.display import display
import ipywidgets as widgets
import cvxopt
import numpy as np
from matplotlib import pyplot
from donotation import do
import statemonad
import polymat, polymat.typing
import sosopt, sosopt.typing

In [None]:
# Initialize state object
state = polymat.init_state()

In [None]:
@dataclass
class ModelParam:
    w_n: float
    r: float
    l: float


model = ModelParam(
    w_n=2*np.pi*50,
    l=0.16,
    r=0.02,
)

In [None]:
# polynomial degrees
####################
u_degrees = (0, 1)
V_degrees = (0, 1, 2)
B_degrees = (1, 2)


# variables
###########

# define state variables
variable_names = ('i_d', 'i_q', r'v_{f,d}', r'v_{f,q}', 'i_{ref,d}', 'i_{ref,q}', 'v_d', 'v_q')
state_variables = tuple(polymat.define_variable(name) for name in variable_names)
i_d, i_q, vf_d, vf_q, i_ref_d, i_ref_q, v_d, v_q = state_variables
x = polymat.v_stack(state_variables)

# define input variables
variable_names = ("u_1", "u_2", "u_3", "u_4")
input_variables = tuple(polymat.define_variable(name) for name in variable_names)
u1, u2, u3, u4 = input_variables
u = polymat.from_(input_variables)

n_states = len(state_variables)
n_inputs = len(input_variables)


# system model
##############

# The dynamical system model is given by x_dot := f(x) + G(x) @ u
# i_dot = - Z_c i - v + u

pu_norm = model.w_n/model.l

f = polymat.from_((
    (-(model.r*i_d - model.l*i_q + v_d) * pu_norm,),
    (-(model.r*i_q + model.l*i_d + v_q) * pu_norm,),
    (0,),
    (0,),
    (0,),
    (0,),
    (0,),
    (0,),
))

G = polymat.from_((
    (pu_norm, 0, 0, 0), 
    (0, pu_norm, 0, 0), 
    (0, 0, 1, 0),
    (0, 0, 0, 1),
    (0, 0, 0, 0),
    (0, 0, 0, 0),
    (0, 0, 0, 0),
    (0, 0, 0, 0),
))


# state feedback controller
###########################

# The state feedback controller is defined as the reational function u(x) := p(x) / s(x)

# maximum system input norm
u_max = 2

p_monom = x.combinations(degrees=u_degrees)
p = sosopt.define_polynomial(
    name="p", 
    monomials=p_monom, 
    polynomial_variables=x,
    shape=(n_inputs, 1),
)

G_at_p = (G @ p).cache()

x_dot = (f + G_at_p).cache()


# nominal controller
####################

u_n = polymat.from_((
    # DPC
    (model.r*i_ref_d - model.l*i_ref_q + vf_d,),
    (model.r*i_ref_q + model.l*i_ref_d + vf_q,),
    (100 * (v_d - vf_d),),
    (100 * (v_q - vf_q),),
))

u_n_modified = u_n + polymat.from_((
    (0.2 * (i_ref_d - i_d),),
    (0.2 * (i_ref_q - i_q),),
    (1000 * (v_d - vf_d),),
    (1000 * (v_q - vf_q),),
))

x_n_dot = f + G @ u_n_modified


# state constraints
###################

i_max = 1.24
vf_max = 2

# Define the state constraints as the intersection of zero-sublevel sets of two polynomials, w1 and w2
w1 = (i_d/i_max)**2 + (i_q/i_max)**2 + (vf_d/vf_max)**2 + (vf_q/vf_max)**2 - 1

In [None]:
# Control Lyapunov and Barrier Functions
########################################

V_monom = x.combinations(degrees=V_degrees)
V = sosopt.define_polynomial(
    name="V", 
    monomials=V_monom, 
    polynomial_variables=x,
)
dV = V.diff(x).T.cache()

B_monom = x.combinations(degrees=B_degrees)
B_var = sosopt.define_polynomial(
    name="B", 
    monomials=B_monom, 
    polynomial_variables=x,
)
B = B_var - 1
dB = B.diff(x).T.cache()

# margins used for solving the bilinear problem using an alternating algorithm
clf_epsilon = sosopt.define_variable(name="clf_epsilon")
cbf_epsilon = sosopt.define_variable(name="cbf_epsilon")


# Region Of Interest (ROI)
##########################

i_ref_max = 1.18
v_max = 0.1

# Increase the zero sublevel set of the region of interest by decreasing epsilon from 1 to 0
delta_roi = sosopt.define_variable(name="delta_roi")

# The region of interest is defined by the zero sublevel set of the following polynomials
# It is selected to contain the allowable set X_a.
roi1 = (
    - ((i_d/i_max)**2 + (i_q/i_max)**2 + (vf_d/vf_max)**2 + (vf_q/vf_max)**2 - 1)
).cache()
roi2 = (
    - ((i_ref_d/i_ref_max)**2 + (i_ref_q/i_ref_max)**2 - 1)
).cache()
roi3 = (
    - ((v_d/v_max)**2 + (v_q/v_max)**2 - 1)
).cache()

roi_dict = {'roi1': roi1, 'roi2': roi2, 'roi3': roi3}

roiV_dict = {name: roi - delta_roi for name, roi in roi_dict.items()}
roiB_dict = {name: roi - delta_roi for name, roi in roi_dict.items()}


# Dissipation rate
##################

# dissipation_rate = 0.01 * (V + 1)
dissipation_rate = 0

In [None]:
@do()
def define_constraints():

    # Control Lyapunov Function (CLF) condition
    clf_condition = yield from sosopt.sos_constraint_putinar(
        name="clf",
        greater_than_zero=-(dV.T @ x_dot) - dissipation_rate + clf_epsilon,
        domain=sosopt.set_(
            greater_than_zero=roiV_dict | {"V": V},
        ),
    )

    # Forward invariance condition of the nominal region w.r.t. the nominal controller.
    clf_unom_condition = yield from sosopt.sos_constraint_putinar(
        name="unom",
        greater_than_zero=-(dV.T @ x_n_dot) - dissipation_rate,
        domain=sosopt.set_(
            greater_than_zero=roiV_dict,
            equal_zero={"V": V},
        ),
    )

    # Control Lyapunov Function (CBF) condition
    cbf_condition = yield from sosopt.sos_constraint_putinar(
        name="cbf",
        greater_than_zero=-(dB.T @ x_dot) + cbf_epsilon,
        domain=sosopt.set_(
            greater_than_zero=roiB_dict,
            equal_zero={"B": B},
        ),
    )

    # These conditions ensure that the safety set contains nominal region
    b_contains_v = yield from sosopt.sos_constraint(
        name="bv",
        greater_than_zero=V - B,
    )

    # These conditions ensure that the safety set is contained in allowable region
    w1_contains_b = yield from sosopt.sos_constraint_putinar(
        name="w1b",
        greater_than_zero=B,
        domain=sosopt.set_(
            greater_than_zero={"w": w1},
        ),
    )

    # This positivity condition ensures that the region of interest grows only as much as necessary
    delta_roi_min = yield from sosopt.sos_constraint(
        name="delta_roi_min",
        greater_than_zero=delta_roi,
    )

    # Limit CLF/CBF condition margins for numerical stability
    min_margin = -0.01
    clf_epsilon_min = yield from sosopt.sos_constraint(
        name="clf_epsilon_min",
        greater_than_zero=clf_epsilon - min_margin,
    )
    
    cbf_epsilon_min = yield from sosopt.sos_constraint(
        name="cbf_epsilon_min",
        greater_than_zero=cbf_epsilon - min_margin,
    )

    # Input constraints
    u12 = polymat.from_((u1, u2, 0, 0))
    u0 = polymat.from_((-1.0, 0, 0, 0))
    u_lim = yield from sosopt.sos_constraint_putinar(
        name="ulim",
        greater_than_zero=u_max**2 - (p - u0).T @ u12,
        domain=sosopt.set_(
            greater_than_zero={"w": u_max**2 - u12.T @ u12} | roiB_dict,
        ),
    )

    constraints = (
        clf_condition,
        clf_unom_condition,
        cbf_condition,

        w1_contains_b,
        b_contains_v,
        
        delta_roi_min,
        
        clf_epsilon_min,
        cbf_epsilon_min,

        u_lim,
    )
    return statemonad.from_({c.name: c for c in constraints})

state, constraints = define_constraints().apply(state)

In [None]:
@do()
def get_initial_symbol_values():
    init_multiplier_value = 1
    
    # initialize the algorithm with an initial guess
    init_values = (
        (p, u_n),
        (constraints['clf'].multipliers["V"], init_multiplier_value),
        (constraints['unom'].multipliers["V"], init_multiplier_value),
        (constraints['cbf'].multipliers["B"], init_multiplier_value),
        (delta_roi, 1),
    )

    def to_tuple():
        for expr, value_expr in init_values:
            if isinstance(value_expr, (float, int)):
                value_expr = polymat.from_vector(value_expr)

            @do()
            def to_symbol_data_tuple(expr, symbol, monomials=None):
                data = yield from polymat.to_tuple(
                    expr.linear_in(variables=x, monomials=monomials)
                )
                return statemonad.from_((symbol, data[0]))                
            
            match expr:
                case sosopt.typing.PolynomialVariable(monomials=monomials):
                    for (row, col), param in expr.iterate_coefficients():
                        yield to_symbol_data_tuple(
                            expr=value_expr[row, col],
                            symbol=param.symbol,
                            monomials=monomials,
                        )

                case polymat.typing.VariableExpression() as var_expr:
                    yield to_symbol_data_tuple(
                        expr=value_expr,
                        symbol=var_expr.symbol,
                    )

    values_tuple = yield from statemonad.zip(to_tuple())
    return statemonad.from_(dict(values_tuple))

# If True, load the initialization from a pickle file
load_from_file = False

if not load_from_file:
    state, inital_symbol_values = get_initial_symbol_values().apply(state)

else:
    file_name = '2_symbol_values.p'
    with open(file_name, 'rb') as file:   
        inital_symbol_values = pickle.load(file)

In [None]:
# data collected at each iteration of the algorithm
###################################################

@dataclass
class IterationData:
    state: polymat.typing.State
    symbol_values: dict
    solver_data: sosopt.typing.SolverData | None

iter_data = IterationData(state=state, symbol_values=inital_symbol_values, solver_data=None)

In [None]:
# define steps/alternations of the algorithm
############################################

@dataclass
class Step:
    lin_cost: polymat.typing.MatrixExpression
    quad_cost: polymat.typing.MatrixExpression | None
    substitutions: tuple
    override_symbol_values: dict

def init_step(lin_cost, substitutions, quad_cost=None, override_symbol_values={}):
    return Step(
        lin_cost=lin_cost, quad_cost=quad_cost, substitutions=substitutions, 
        override_symbol_values=override_symbol_values,
    )

# Set CLF/CBF margins to zero when they are not being optimizied
reset_epsilon_to_zero = {
    clf_epsilon.symbol: (0,),
    cbf_epsilon.symbol: (0,),
}

step_1_substitutions = (
    p,
    constraints['clf'].multipliers["V"],
    constraints['unom'].multipliers["V"],
    constraints['cbf'].multipliers["B"],
)

step_2_substitutions = (
    V, 
    B_var,
)

step_1_margin = init_step(
    lin_cost=clf_epsilon + cbf_epsilon,
    # lin_cost=cbf_epsilon,
    substitutions=step_1_substitutions + (
        delta_roi,
    )
)

step_1_roi = init_step(
    lin_cost=delta_roi,
    substitutions=step_1_substitutions + (
        *[constraints['clf'].multipliers[name] for name in roi_dict],
        *[constraints['unom'].multipliers[name] for name in roi_dict],
        *[constraints['cbf'].multipliers[name] for name in roi_dict],
        *[constraints['ulim'].multipliers[name] for name in roi_dict],
        clf_epsilon, 
        cbf_epsilon,
    ),
    override_symbol_values=reset_epsilon_to_zero,
)

step_1_vol = init_step(
    lin_cost=B.to_gram_matrix(x).trace() + V.to_gram_matrix(x).trace(),
    substitutions=step_1_substitutions + (
        delta_roi,
        clf_epsilon, 
        cbf_epsilon,
    ),
    override_symbol_values=reset_epsilon_to_zero,
)

step_2_margin = init_step(
    lin_cost=clf_epsilon + cbf_epsilon,
    # lin_cost=cbf_epsilon,
    substitutions=step_2_substitutions + (
        delta_roi,
    )
)

step_2_roi = init_step(
    lin_cost=delta_roi,
    substitutions=step_2_substitutions + (
        constraints['cbf'].multipliers["B"],
        *[constraints['clf'].multipliers[name] for name in roi_dict],
        *[constraints['unom'].multipliers[name] for name in roi_dict],
        *[constraints['cbf'].multipliers[name] for name in roi_dict],
        *[constraints['ulim'].multipliers[name] for name in roi_dict],
        clf_epsilon, 
        cbf_epsilon,
    ),
    override_symbol_values=reset_epsilon_to_zero,
)

In [None]:
# select solver
solver = sosopt.cvx_opt_solver
# solver = sosopt.mosek_solver

# CVXOPT solver options
#######################

cvxopt.solvers.options['show_progress'] = False
# cvxopt.solvers.options['show_progress'] = True

cvxopt.solvers.options['maxiters'] = 100
# cvxopt.solvers.options['reltol'] = 1e-6

@do()
def solve_problem(step: Step, iter_data: IterationData):
    problem = sosopt.sos_problem(
        lin_cost=step.lin_cost,
        quad_cost=step.quad_cost,
        constraints=constraints.values(),
        solver=solver,
    )

    # overwrite values
    symbol_values = iter_data.symbol_values | step.override_symbol_values

    substitutions = {
        symbol: symbol_values[symbol] for param in step.substitutions for symbol in param.iterate_symbols()
    }
    
    # filter values that need to be substituted
    problem = problem.eval(substitutions)

    # # print the decision variables for each constraint
    # for primitive in problem.constraint_primitives:
    #     print(f'{primitive.name=}, {primitive.decision_variable_symbols=}')
    
    # solve SOS problem
    sos_result = yield from problem.solve()
    solver_data = sos_result.solver_data

    if not solver_data.is_successful:
        raise Exception(f'No Solution has been found. Solver terminated with {solver_data.status}')
    
    print(f'{solver_data.status=}, {solver_data.iterations=}, {solver_data.cost=}')
    
    # update iteration data
    n_iter_data = replace(
        iter_data, 
        symbol_values=symbol_values | sos_result.symbol_values, 
        solver_data=solver_data,
    )
    
    # epsilon_roi = (cbf_epsilon, delta_roi)
    epsilon_roi = (clf_epsilon, cbf_epsilon, delta_roi)
    print(f'epsilon = {tuple(n_iter_data.symbol_values[e.symbol] for e in epsilon_roi)}')

    return statemonad.from_(n_iter_data)

In [None]:
np.set_printoptions(threshold=np.inf, linewidth=200)

for idx in range(40):
    print(f'iteration: {idx}')

    if 1e-6 < iter_data.symbol_values[delta_roi.symbol][0]:
        state, iter_data = solve_problem(step_1_margin, iter_data).apply(state)
        state, iter_data = solve_problem(step_1_roi, iter_data).apply(state)
        state, iter_data = solve_problem(step_2_margin, iter_data).apply(state)
        state, iter_data = solve_problem(step_2_roi, iter_data).apply(state)
        
    else:
        break

In [None]:
# maximize a surrogate volume of the safe set

# the primal cost of the previous iteration
previous_cost = None

if not (
    1e-6 < iter_data.symbol_values[delta_roi.symbol][0]
):
    for idx in range(5):
        print(f'iteration: {idx}')
    
        state, iter_data = solve_problem(step_2_margin, iter_data).apply(state)
        state, iter_data = solve_problem(step_1_vol, iter_data).apply(state)

        if previous_cost is not None:
            if (previous_cost - iter_data.solver_data.cost) < 0.01 * iter_data.solver_data.cost:
                break
        previous_cost = iter_data.solver_data.cost

In [None]:
def save_symbol_values(arg):
    file_name = '2_symbol_values.p'
    
    with open(file_name, 'wb') as file:   
        pickle.dump(iter_data.symbol_values, file)

# Create a button to ensure that the file is not overritten by accident.
button_download = widgets.Button(description = 'Save symbol values')   
button_download.on_click(save_symbol_values)
display(button_download)

In [None]:
@do()
def plot_result(symbol_values):

    # Helper function to project the 3 dimensional state onto 2 dimensions
    def map_to_xy(x, y):
        # return np.array((x, y) + (0,) * (n_states - 2)).reshape(-1, 1)
        return np.array((x, y) + (0, 0, -0.9, 0, 0, 0)).reshape(-1, 1)
    
    pyplot.close()
    fig = pyplot.figure(figsize=(8, 8))
    ax = fig.subplots()

    # Create stream plot
    ####################
    
    ax.add_patch(pyplot.Circle((0, 0), 1.3, fill=False, color='r', linestyle='dashed'))

    # # Create stream plot
    # ####################

    f_array = yield from polymat.to_array(f, x)
    G_array = yield from polymat.to_array(G, x)
    p_array = yield from polymat.to_array(p.eval(symbol_values), x)
    un_array = yield from polymat.to_array(u_n_modified, x)
    
    def get_x_dot(x):
        x = np.array(x).reshape(-1, 1)
        u = p_array(x)
        # u = un_array(x)
        xdot = f_array(x) + G_array(x) @ u
        return np.squeeze(xdot)

    ticksX = np.arange(-1.4, 1.4, 0.04)
    ticksY = np.arange(-1.4, 1.4, 0.04)
    n_row, n_col = len(ticksY), len(ticksX)
    X = np.matlib.repmat(ticksX, n_row, 1)
    Y = np.matlib.repmat(ticksY.reshape(-1, 1), 1, n_col)

    stream_U = np.zeros((n_row, n_col))
    stream_V = np.zeros((n_row, n_col))
    def create_stream_data():
        for row, (x_row, y_row) in enumerate(zip(X, Y)):
            for col, (x, y_val) in enumerate(zip(x_row, y_row)):
                u, v, *_ = get_x_dot(map_to_xy(x, y_val))
                stream_U[row, col] = u
                stream_V[row, col] = v

    create_stream_data()

    ax.streamplot(X, Y, stream_U, stream_V, density=[0.5, 0.7])

    # Plot Sublevel sets
    ####################
    
    ticks = np.arange(-2.1, 2.1, 0.04)
    X = np.matlib.repmat(ticks, len(ticks), 1)
    Y = X.T

    V_array = yield from polymat.to_array(V.eval(symbol_values), x)
    ZV = np.vectorize(lambda x, y: V_array(map_to_xy(x, y)))(X, Y)
    ax.contour(X, Y, ZV, [0.0], linewidths=2, colors=['#17202A'])

    B_array = yield from polymat.to_array(B.eval(symbol_values), x)
    ZB = np.vectorize(lambda x, y: B_array(map_to_xy(x, y)))(X, Y)
    ax.contour(X, Y, ZB, [0.0], linewidths=2, colors=['#17202A'])

    ax.set_xlim(-1.4, 1.4)
    ax.set_ylim(-1.4, 1.4)

    ax.set_xlabel(r'${\tilde i}_{d}$ [p.u.]')
    ax.set_ylabel(r'${\tilde i}_{q}$ [p.u.]')
    
    pyplot.show()

    return statemonad.from_(fig)

state, fig = plot_result(iter_data.symbol_values).apply(state)

In [None]:
def save_arrays(arg):
    @do()
    def gen_arrays(symbol_values):
        symbol_values = iter_data.symbol_values

        s_array = yield from polymat.to_array(s.eval(symbol_values), x)
        
        V_array = yield from polymat.to_array(V.eval(symbol_values), x)
        B_array = yield from polymat.to_array(B.eval(symbol_values), x)

        dV_array = yield from polymat.to_array(dV.eval(symbol_values), x)
        dB_array = yield from polymat.to_array(dB.eval(symbol_values), x)

        gV_array = yield from polymat.to_array(constraints['clf'].multipliers['V'].eval(symbol_values), x)
        gB_array = yield from polymat.to_array(constraints['cbf'].multipliers['B'].eval(symbol_values), x)

        arrays = {
            'V': V_array,
            'B': B_array,
            'dV': dV_array,
            'dB': dB1_array,
            'gV': gV_array,
            'gB': gB1_array,
        }

        return statemonad.from_(arrays)

    _, arrays = gen_arrays(iter_data.symbol_values).apply(state)

    file_name = '2_arrays.p'

    with open(file_name, 'wb') as file:   
        pickle.dump(arrays, file)

# Create a button to ensure that the file is not overritten by accident.
button_download = widgets.Button(description = 'Save arrays')   
button_download.on_click(save_arrays)
display(button_download)