In [None]:
# notebook settings
%load_ext autoreload
%autoreload 2
# %matplotlib notebook

# external imports
import os
import numpy as np
import sympy as sp
import matplotlib.pyplot as plt

# internal imports
from pympc.control.hybrid_benchmark.mixed_logical_dynamical_system import MixedLogicalDynamicalSystem
from pympc.control.hybrid_benchmark.controller_mld import HybridModelPredictiveController
from pympc.plot import plot_input_sequence, plot_state_trajectory, plot_output_trajectory

In [None]:
# system numeric parameters
mc = 1.   # mass cart
mp = 1.   # mass pole
l  = 1.   # length pole
d  = .5   # distance walls from origin
k  = 100. # stiffness contacts
nu = 30.  # damping contacts
g  = 10.  # gravity acceleration
h  = .05  # integration time step

# state bounds
x_max = np.array([d, np.pi/8., 2., 1.])
x_min = - x_max

# input bounds
uc_max = np.array([2.])
uc_min = - uc_max

In [None]:
# symbolic states
x = sp.Matrix(sp.symbols('q t qd td'))

# symbolic continuous inputs
uc = sp.Matrix([sp.symbols('u')])

# symbolic binary inputs
ub = sp.zeros(0,1)

# symbolic forces (left, right)
sc = sp.Matrix(sp.symbols('f_l f_r'))

# symbolic auxiliary binaries (left, right)
sb = sp.Matrix(sp.symbols('el_l dam_l el_r dam_r'))

# dynamics
x2_dot = x[1]*g*mp/mc + uc[0]/mc
x3_dot = x[1]*g*(mc+mp)/(l*mc) + uc[0]/(l*mc) + (sc[1] - sc[0])/(l*mp)
x_next = sp.Matrix([
    x[0] + h*x[2],
    x[1] + h*x[3],
    x[2] + h*x2_dot,
    x[3] + h*x3_dot
])

In [None]:
# enforce state constraints
state_upper_bound = x - x_max.reshape(x_max.size, 1)
state_lower_bound = x_min.reshape(x_min.size, 1) - x

# enforce input constraints
input_upper_bound = uc - uc_max.reshape(uc_max.size, 1)
input_lower_bound = uc_min.reshape(uc_min.size, 1) - uc

In [None]:
f = { # force
    'l': sc[0],
    'r': sc[1]
}
el = { # inidicator for penetation
    'l': sb[0],
    'r': sb[2]
}
dam = { # inidicator for pulling
    'l': sb[1],
    'r': sb[3]
}
p = { # penetration
    'l': - d - x[0] + l*x[1],
    'r': x[0] - l*x[1] - d
}
p_dot = { # relative velocity
    'l': - x[2] + l*x[3],
    'r': x[2] - l*x[3]
}
p_min = { # min penetration
    'l': - d - x_max[0] + l*x_min[1],
    'r': x_min[0] - l*x_max[1] - d
}
p_max = { # max penetration
    'l': - d - x_min[0] + l*x_max[1],
    'r': x_max[0] - l*x_min[1] - d
}
p_dot_min = { # min relative velocity
    'l': - x_max[2] + l*x_min[3],
    'r': x_min[2] - l*x_max[3]
}
p_dot_max = { # max relative velocity
    'l': - x_min[2] + l*x_max[3],
    'r': x_max[2] - l*x_min[3]
}
f_min = { # min force
    'l': k*p_min['l'] + nu*p_dot_min['l'],
    'r': k*p_min['r'] + nu*p_dot_min['r']
}
f_max = { # max force
    'l': k*p_max['l'] + nu*p_dot_max['l'],
    'r': k*p_max['r'] + nu*p_dot_max['r']
}

In [None]:
# contact constraints big-M method
contacts = []
for lr in ['l', 'r']:
    
    # el = 1 if p >= 0, el = 0 if otherwise
    contacts.append(sp.Matrix([p_min[lr] * (1. - el[lr]) - p[lr]]))
    contacts.append(sp.Matrix([p[lr] - p_max[lr] * el[lr]]))
    
    # dam = 1 if k p + nu p_dot >= 0, dam = 0 if otherwise
    contacts.append(sp.Matrix([f_min[lr] * (1. - dam[lr]) - k*p[lr] - nu*p_dot[lr]]))
    contacts.append(sp.Matrix([k*p[lr] + nu*p_dot[lr] - f_max[lr] * dam[lr]]))
    
    # el = 0 or dam = 0 implies f = 0
    contacts.append(sp.Matrix([- f[lr]]))
    contacts.append(sp.Matrix([f[lr] - f_max[lr]*el[lr]]))
    contacts.append(sp.Matrix([f[lr] - f_max[lr]*dam[lr]]))
    
    # el = dam = 1 implies f = k p + nu p_dot
    contacts.append(sp.Matrix([k*p[lr] + nu*p_dot[lr] + nu*p_dot_max[lr]*(el[lr]-1.) - f[lr]]))
    contacts.append(sp.Matrix([f[lr] - k*p[lr] - nu*p_dot[lr] - f_min[lr]*(dam[lr]-1.)]))

In [None]:
# gather constraints
constraints = sp.Matrix([
    state_upper_bound,
    state_lower_bound,
    input_upper_bound,
    input_lower_bound,
    sp.Matrix(contacts)
])

In [None]:
# construct MLD system
variables = {'x':x, 'uc':uc, 'ub':ub, 'sc':sc, 'sb':sb}
MLD = MixedLogicalDynamicalSystem.from_symbolic(variables, x_next, constraints)

In [None]:
# controller parameters
N = 30
weight_matrices = {}
weight_matrices['Q'] = 10.*np.eye(MLD.nx)*h
weight_matrices['R'] = 1.*np.eye(MLD.nuc)*h
weight_matrices['P'] = weight_matrices['Q']#*100.

# hybrid controller
controller = HybridModelPredictiveController(MLD, N, weight_matrices)

In [None]:
x0 = np.array([0., 0., 1., 0.])

# solve in closed loop

In [None]:
# solve in open loop
cost_ff, sol_ff, n_nodes_ff, leaves_ff = controller.feedforward(x0)

In [None]:
# solve in closed loop
N_sim = 50
u_sim = []
x_sim = [x0]
warm_start = None
for t in range(N_sim):
    print('Time step ' + str(t) + '.\r'),
    cost, sol, n_nodes, leaves = controller.feedforward(
        x_sim[-1],
        warm_start=warm_start,
        printing_period=None
    )
    u_sim.append(sol['primal']['uc'][0])
    x_sim.append(sol['primal']['x'][1])
    stage_variables = {
        'x_0' : sol['primal']['x'][0],
        'uc_0' : sol['primal']['uc'][0],
        'ub_0' : sol['primal']['ub'][0],
        'sc_0' : sol['primal']['sc'][0],
        'sb_0' : sol['primal']['sb'][0],
        'e_0' : np.zeros(MLD.nx)
    }
    warm_start = controller.construct_warm_start(leaves, stage_variables)

In [None]:
fig, axes = plt.subplots(2, sharex=True, figsize=(6,3.7))
t = range(N_sim+1)
u_ff = sol_ff['primal']['uc']
axes[0].step(t, [uc_min[0]]*(N_sim+1), label=r'Force bound', color='#d62728', linestyle='-.')
axes[0].step(range(N+1), [u_ff[0]] + u_ff, label=r'Force on cart (open loop)', color='#ff7f0e', linestyle='--')
axes[0].step(t, [u_sim[0]] + u_sim, label=r'Force on cart (closed loop)', color='#1f77b4')
handles, labels = axes[0].get_legend_handles_labels()
order = [2,1,0]
axes[0].legend([handles[i] for i in order],[labels[i] for i in order],loc='lower center')
axes[0].set_ylabel(r'$u_{1}$')
axes[0].set_xlim((0,N_sim))

C = np.array([[1., -l, 0., 0.]])
y_sim = [C.dot(x) for x in x_sim]
y_ff = [C.dot(x) for x in sol_ff['primal']['x']]
axes[1].step(t,[d]*(N_sim+1), label=r'Right wall', color='#2ca02c', linestyle='-.')
axes[1].plot(range(N+1), y_ff, label=r'Position tip pole (open loop)', color='#ff7f0e', linestyle='--')
axes[1].plot(t,y_sim, label=r'Position tip pole (closed loop)', color='#1f77b4')
axes[1].set_ylabel(r'$x_{1}-lx_{2}$')
axes[1].set_xlabel(r'Time step $\tau$')
handles, labels = axes[1].get_legend_handles_labels()
order = [2,1,0]
axes[1].legend([handles[i] for i in order],[labels[i] for i in order],loc='lower center')

# axes[1].legend(loc='lower center', ncol=2)
axes[0].grid(True, color=np.ones(3)*.85)
axes[1].grid(True, color=np.ones(3)*.85)

axes[0].set_xticks([i*5 for i in range(11)])
# axes[0].set_yticks([i*1.-2. for i in range(5)])
axes[1].set_yticks([i*.1 for i in range(7)])

fig.tight_layout()
# plt.savefig('simulation.pdf', bbox_inches='tight')

# Nominal case

In [None]:
N_sim = 50
nodes = []
nodes_ws = []
len_ws = []
x_sim = [x0]
ws = None

for t in range(N_sim):

    print 'Time step %d: ' % t,

    # solve without warm start
    cost, sol, n_nodes, leaves = controller.feedforward(
        x_sim[-1],
        printing_period=None
    )
    nodes.append(n_nodes)
    print n_nodes,

    # solve with warm start
    cost_ws, sol_ws, n_nodes_ws, leaves_ws = controller.feedforward(
        x_sim[-1],
        printing_period=None,
        warm_start=ws
    )
    nodes_ws.append(n_nodes_ws)
    print n_nodes_ws,

    # generate warm start
    stage_variables = {
        'x_0' : sol['primal']['x'][0],
        'uc_0' : sol['primal']['uc'][0],
        'ub_0' : sol['primal']['ub'][0],
        'sc_0' : sol['primal']['sc'][0],
        'sb_0' : sol['primal']['sb'][0],
        'e_0' : np.zeros(MLD.nx)
    }
    ws = controller.construct_warm_start(leaves_ws, stage_variables)
    len_ws.append(len(ws))
    print len(ws)

    # update state
    x_sim.append(sol['primal']['x'][1])

np.save('nodes', nodes)
np.save('nodes_ws', nodes_ws)
np.save('len_ws', len_ws)
np.save('errors', errors)

# statistical analysis

In [None]:
e_sd = 0.001# 0.001, 0.003, 0.01
N_sim = 50
N_samples = 100
i_0 = 0

if i_0 == 0:
    nodes = []
    nodes_ws = []
    len_ws = []
    errors = []
else:
    nodes = list(np.load('nodes_sd_%.3f.npy'%e_sd))
    nodes_ws = list(np.load('nodes_ws_sd_%.3f.npy'%e_sd))
    len_ws = list(np.load('len_ws_sd_%.3f.npy'%e_sd))
    errors =list( np.load('errors_sd_%.3f.npy'%e_sd))

with open('cart_pole_with_damped_walls_mld_sd_%.3f.log'%e_sd, 'w') as f:

    f.write('Error standard deviation %.3f\n\n'%e_sd) 
    f.flush()

    i = i_0
    
    while len(nodes) < N_samples:
        np.random.seed(i)

        f.write('\nRollout %d\n'%len(nodes))
        f.write('Iteration n.%d\n\n'%i)
        f.flush()
        rollout_success = True

        nodes_i = []
        nodes_ws_i = []
        len_ws_i = []
        errors_i = []

        x_sim = [x0]
        ws = None

        for t in range(N_sim):
            f.write('Time step %d '%t)
            f.flush()

            # solve without warm start
            try:
                cost, sol, n_nodes, leaves = controller.feedforward(
                    x_sim[-1],
                    printing_period=None
                )
            except:
                rollout_success = False
                f.write('\nUnseccessful rollout')
                break
            nodes_i.append(str(n_nodes) + ' ')
            f.write(str(n_nodes) + ' ')
            f.flush()

            # solve with warm start
            try:
                cost_ws, sol_ws, n_nodes_ws, leaves_ws = controller.feedforward(
                    x_sim[-1],
                    warm_start=ws,
                    printing_period=None
                )
            except:
                rollout_success = False
                f.write('\nUnseccessful rollout')
                break
            nodes_ws_i.append(n_nodes_ws)
            f.write(str(n_nodes_ws) + ' ')
            f.flush()

            # check results
            if cost is None:
                break
            assert np.isclose(cost, cost_ws)

            # generate random error of specified norm
            e_t = e_sd*np.multiply(np.random.randn(MLD.nx), x_max)
            errors_i.append(e_t)

            #generate warm start
            stage_variables = {
                'x_0' : sol['primal']['x'][0],
                'uc_0' : sol['primal']['uc'][0],
                'ub_0' : sol['primal']['ub'][0],
                'sc_0' : sol['primal']['sc'][0],
                'sb_0' : sol['primal']['sb'][0],
                'e_0' : e_t
            }
            ws = controller.construct_warm_start(leaves_ws, stage_variables)
            len_ws_i.append(len(ws))
            f.write(str(len(ws)) + ' ')
            f.flush()

            # update state
            x_sim.append(sol['primal']['x'][1] + e_t)
            f.write('%.3f'%np.linalg.norm(e_t) + ' ' + str(e_t) + '\n')
            f.flush()

        if rollout_success:
            nodes.append(nodes_i)
            nodes_ws.append(nodes_ws_i)
            len_ws.append(len_ws_i)
            errors.append(errors_i)
            np.save('nodes_sd_%.3f'%e_sd, nodes)
            np.save('nodes_ws_sd_%.3f'%e_sd, nodes_ws)
            np.save('len_ws_sd_%.3f'%e_sd, len_ws)
            np.save('errors_sd_%.3f'%e_sd, errors)
        else:
            controller = HybridModelPredictiveController(MLD, N, weight_matrices)

        i += 1