# MPC Project 3 v2 Submission

The first code chunks will introduce all approaches for convex problems mentioned in the report in a fashion similar to the exercises.

Below that there is a wrapper function that can be used to run specific setups multiple times.

Finally, at the bottom one can find our implementation for Stochastic MPC with non-convex problems.

The group's members are:
* Benjamin Michalak, 371905
* Benno Kutschank, 374572
* Jannis Grönberg, 356198

### Imports and settings

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
from matplotlib.lines import Line2D
from uuid import uuid4
import random
import os
from casadi import *
from scipy.special import comb
import pickle
import imageio  # can be ignored if not installed, is only used for creating nice looking gifs
import scipy.io

# seed for reproducible results
np.random.seed(1) 

# font size like in the exercises
mpl.rcParams['font.size'] = 16

### System definition

In [None]:
# inputs
x = SX.sym("x", 2, 1)
u = SX.sym("u", 3, 1)
d = SX.sym("d", 3, 1)
w = SX.sym("w", 2, 1)

alpha = SX.sym('alpha', 2, 1)

# system
A = np.array([[ alpha[0] * 0.8511,  0],
               [0, alpha[1] * 1]])
B = np.array([[ 0.0035,  0, 0],
               [0, -5, 0]])
E = 1e-3 * np.array([[ 22.217,  1.7912, 42.212],
               [0, 0, 0]])
D = np.array([[ -1,  1, 1],
               [1, 1, 1]])
G = np.array([[ 0,  0.5, 0],
               [0, 0.5, 0]])

x_next = A@x + B@u + E@d + w
system = Function("system", [x,u,d,w,alpha], [x_next])

m_cons = D@u + G@d
mixed_constraint = Function("mixed_constraint", [u,d], [m_cons])

# constraints
lb_x = np.array([[20.0],
                 [0.0]])
ub_x = np.array([[23.0],
                 [200000.0]])
lb_u = np.array([[-2000.0],
                 [-2000.0],
                 [-2000.0]])
ub_u = np.array([[2000.0],
                 [1000.0],
                 [2000.0]])
lb_m = np.array([[0.0],
                 [0.0]])
ub_m = np.array([[np.inf],
                 [np.inf]])

# cost function and its parameters
gamma = 0.5
E_ref = 50000

cost = u[2] + gamma * (x[1] - E_ref)**2
stage_cost_fcn = Function("cost_fcn", [x,u] , [cost])

# electricity price used for deriving grid expenses
grid_price = 0.3137

### External disturbances

Assumes that the external_disturbances.mat file is present in the working directory.

In [None]:
try:    
    mat = scipy.io.loadmat('./external_disturbances.mat')
    d_T = mat['external_temperature']
    d_sr = mat['solar_radiation']
    d_int = mat['internal_gains']
    d_vals = np.concatenate([d_T, d_sr, d_int], axis=1)
    print(d_vals)
    N_sim = d_vals.shape[0]
except Exception as e:
    print(e)    

In [None]:
N_sim  # should evaluate to 145

### Uncontrolled system

Simulates the uncontrolled system with disturbances for some $\alpha$ value.

In [None]:
x_0 = np.array([22, 100000]).reshape((2,1))
u_k = np.array([[0,0,0]])  # constant u
w_k = [np.array([random.uniform(-0.1,0.1),random.uniform(-100,100)]).reshape((2,1)) for k in range(N_sim)]
nominal_alpha = np.array([0.95,0.9]).reshape((2,1))

res_x = [x_0]

for i in range(N_sim):
    d_k = np.array(d_vals[i,:]).reshape((3,1))
    x_next = system(x_0,u_k,d_k,w_k[i],nominal_alpha)
    res_x.append(x_next)
    x_0 = x_next

res_x = np.concatenate(res_x,axis=1)
    

# plot
fig, ax = plt.subplots(figsize=(10,6))

lines = ax.plot(res_x.T[:,0],color="blue")
ax.set_ylabel('Temperature in °C',color="blue");
ax.set_xlabel('Time steps');
ax = ax.twinx()
lines = ax.plot(res_x.T[:,1], color="orange")
ax.set_ylabel('SoC',color="orange");
plt.title("Uncontrolled system");

### Nominal MPC

Implements a controller while disregarding additive disturbances and assuming medium values for $\alpha$.

In [None]:
# setup

N_pred = 15  # prediction horizon

# variables for CasADi
X = SX.sym("X", (N_pred+1) * 2, 1)
U = SX.sym("U", (N_pred) * 3, 1)
D = SX.sym("D", (N_pred) * 3, 1)
W = SX.sym("W", (N_pred) * 2, 1)

In [None]:
# prediction step definition, copied and modified from exercises

J = 0
lb_X = []
ub_X = []
lb_U = []
ub_U = []
g    = [] 
lb_g = []
ub_g = []

for k in range(N_pred):
    
    x_k      = X[k*2:(k+1)*2,:]
    x_k_next = X[(k+1)*2:(k+2)*2,:]
    u_k      = U[k*3:(k+1)*3,:]
    d_k      = D[k*3:(k+1)*3,:]
    w_k      = W[k*2:(k+1)*2,:]
    
    J += stage_cost_fcn(x_k, u_k)
    
    x_k_next_calc = system(x_k,u_k,d_k,w_k,alpha)
    m_cons_k = mixed_constraint(u_k,d_k)
    
    g.append(vertcat(x_k_next - x_k_next_calc, m_cons_k))
    lb_g.append(vertcat(np.zeros((2,1)), lb_m))
    ub_g.append(vertcat(np.zeros((2,1)), ub_m))
    
    lb_X.append(lb_x)
    ub_X.append(ub_x)
    lb_U.append(lb_u)
    ub_U.append(ub_u)
    
x_terminal = X[N_pred*2:(N_pred+1)*2,:]
J += stage_cost_fcn(x_terminal, [0,0,0])  # last step has no control input
lb_X.append(lb_x)
ub_X.append(ub_x)

In [None]:
# CasADi solver
b_d = [np.array(d_vals[l,:]).reshape((3,1)) for l in range(N_pred)]
w_test = [np.array([0,0]).reshape((2,1)) for k in range(N_pred)]

lbx = vertcat(*lb_X, *lb_U)
ubx = vertcat(*ub_X, *ub_U)
x = vertcat(X,U)
g = vertcat(*g)
lbg = vertcat(*lb_g)
ubg = vertcat(*ub_g)
param_vektor = vertcat(D,alpha,W)

prob = {'f':J,'x':x,'p':param_vektor,'g':g}
solver = nlpsol('solver','ipopt',prob)

In [None]:
# MPC loop

x_0 = np.array([22, 200000]).reshape(2,1)
x_0_orig = x_0
res_x = [x_0]
res_u = []

# generate a "true" alpha value, the optimizer will use the nominal one from above
alpha_system = np.array([random.uniform(0.90,1),random.uniform(0.8,1)]).reshape((2,1))

for i in range(N_sim): 
    
    # uncertainties used in the optimizer
    w_i = [np.array([0,0]).reshape((2,1)) for k in range(N_pred)]
    alpha_i = nominal_alpha  # defined above, 0.95,0.9
    
    # true uncertainties
    alpha_k = alpha_system
    w_k = [random.uniform(-0.1,0.1),random.uniform(-100,100)]
    
    # external disturbances
    b_d = [np.array(d_vals[min(k+i,N_sim-1),:]).reshape((3,1)) for k in range(N_pred)]
    
    
    d_param = vertcat(*b_d,*alpha_i,*w_i)
    lbx[:2]=x_0
    ubx[:2]=x_0

    # solve using the nominal values
    res = solver(lbx=lbx,ubx=ubx,lbg=lbg,ubg=ubg,p=d_param);

    u_k = np.array(res['x'][(N_pred+1)*2:(N_pred+1)*2+3,:]).reshape((3,1))
    
    d_k = b_d[0]

    res_u.append(u_k)

    # simulate the system using the true values
    x_next = system(x_0,u_k,d_k,w_k,alpha_k)
    res_x.append(x_next)
    x_0 = x_next
    
# Make an array from the list of arrays:
res_x = np.concatenate(res_x,axis=1)
res_u = np.concatenate(res_u, axis=1)

In [None]:
# Plot
fig, ax = plt.subplots(3,1, figsize=(20,16), sharex=True)
ax[0].plot(res_x.T[:,0],color="blue")
ax[0].axhline(23,color="blue",linestyle="dashed")
ax[0].axhline(20,color="blue",linestyle="dashed")
ax[0].set_ylabel('Temperature in °C',color="blue")
ax[0].set_xlim(-0.1,i+1+5)
ax2 = ax[0].twinx()
ax2.plot(res_x.T[:,1],color="orange")
ax2.set_ylabel("SoC",color="orange")
ax2.set_ylim(0,200000)
ax2.axhline(E_ref,color="orange",linestyle="dashed")
ax[0].set_ylim(18,25)
lines = ax[1].plot(res_u.T)
ax[1].set_ylabel('Control inputs')
ax[1].set_xlabel('Time step')
custom_lines = [Line2D([0], [0], color="blue", lw=2),
            Line2D([0], [0], color="orange", lw=2),
            Line2D([0], [0], color="green", lw=2)]
ax[1].legend(custom_lines, ['HVAC', 'Battery', 'Grid'],loc='center left', bbox_to_anchor=(1, 0.5))
ax[1].axhline(ub_u[0],color="blue",linestyle="dashed")
ax[1].axhline(lb_u[0],color="blue",linestyle="dashed")
ax[1].axhline(ub_u[1],color="orange",linestyle="dashed")
ax[1].axhline(lb_u[1],color="orange",linestyle="dashed")
ax[1].axhline(ub_u[2],color="green",linestyle="dotted")
ax[1].axhline(lb_u[2],color="green",linestyle="dotted")

ax[0].plot(0,x_0_orig[0], 'o', color='black')
ax2.plot(0,x_0_orig[1], 'o', color='black')

ax[2].plot(d_vals[:,0],color="blue")
ax[2].plot(d_vals[:,2],color="green")
ax[2].set_xlabel('Time')
ax3 = ax[2].twinx()
ax3.plot(d_vals[:,1],color="orange")
ax[2].legend(custom_lines, ['External Temperature', 'Internal Gains', 'Solar Radiation'],loc='center left', bbox_to_anchor=(1.05, 0.5))

fig.align_ylabels()
fig.tight_layout()
plt.show()

### Stochastic MPC

In this section, we implement Stochastic MPC for convex problems.

#### Plotting wrapper

As the we also plot the state predictions, the plotting function gets quite large. Thus we put into its own wrapping function here.

In [None]:
def plot_system(i,x,u,X_stoch,U_stoch,num_plotted,open_loop,x_0_orig,E_ref):

    res_x = x
    res_u = u
    
    #Plot
    fig, ax = plt.subplots(3,1, figsize=(20,16), sharex=True)
    ax2 = ax[0].twinx()
    for j in range(num_plotted):
        X_orig = np.array([np.array(objecto).reshape((2,)) for objecto in res_x[:-1]])
        ax[0].plot(X_orig[:,0],color="blue")   
        ax2.plot(X_orig[:,1],color="orange")
        X_plot = X_stoch[j*(N_pred+1):(j+1)*(N_pred+1)]
        X_plot = np.vstack((X_orig,X_plot[1:]))
        ax[0].plot(X_plot[:,0],color="blue",alpha = 0.15)   
        ax2.plot(X_plot[:,1],color="orange",alpha=0.15)

    ax2.set_ylabel("SoC",color="orange")
    ax2.set_ylim(0,200000)
    ax2.axhline(E_ref,color="orange",linestyle="dashed")
    ax[0].axhline(23,color="blue",linestyle="dashed")
    ax[0].axhline(20,color="blue",linestyle="dashed")    
    ax[0].set_xlim(-0.1,i+1+N_pred)
    ax[0].set_ylabel('Temperature in °C',color="blue")
    ax[0].set_ylim(18,25)

    if not open_loop:
        for i in range(num_plotted):
            U_orig = np.array([np.array(objecto).reshape((3,)) for objecto in res_u])
            U_plot = np.vstack((U_orig, U_stoch[1+(N_pred-1)*i:1+(N_pred-1)*(i+1)]))
            lines = ax[1].plot(U_orig[:,0],color="blue")
            lines = ax[1].plot(U_orig[:,1],color="orange")
            lines = ax[1].plot(U_orig[:,2],color="green")
            lines = ax[1].plot(U_plot[:,0],color="blue",alpha = 0.15)
            lines = ax[1].plot(U_plot[:,1],color="orange",alpha = 0.15)
            lines = ax[1].plot(U_plot[:,2],color="green",alpha = 0.15)
    else:
        res_u_np = np.concatenate(res_u, axis=1)
        lines = ax[1].plot(res_u_np.T)
    ax[1].set_ylabel('Control inputs')
   
    custom_lines = [Line2D([0], [0], color="blue", lw=2),
                Line2D([0], [0], color="orange", lw=2),
                Line2D([0], [0], color="green", lw=2)]
    ax[1].legend(custom_lines, ['HVAC', 'Battery', 'Grid'],loc='center left', bbox_to_anchor=(1, 0.5))
    ax[1].axhline(ub_u[0],color="blue",linestyle="dashed")
    ax[1].axhline(lb_u[0],color="blue",linestyle="dashed")
    ax[1].axhline(ub_u[1],color="orange",linestyle="dashed")
    ax[1].axhline(lb_u[1],color="orange",linestyle="dashed")
    ax[1].axhline(ub_u[2],color="green",linestyle="dotted")
    ax[1].axhline(lb_u[2],color="green",linestyle="dotted")

    ax[0].plot(0,x_0_orig[0], 'o', color='black')
    ax2.plot(0,x_0_orig[1], 'o', color='black')

    ax[2].plot(d_vals[:,0],color="blue")
    ax[2].plot(d_vals[:,2],color="green")
    ax[2].set_xlabel('Time')
    ax3 = ax[2].twinx()
    ax3.plot(d_vals[:,1],color="orange")
    ax[2].legend(custom_lines, ['External Temperature', 'Internal Gains', 'Solar Radiation'],loc='center left', bbox_to_anchor=(1.05, 0.5))

    fig.align_ylabels()
    fig.tight_layout()
    return fig

#### Setup

In [None]:
# Stochastic MPC
epsilon = 0.2
beta = 1e-6
N_pred = 4
gamma = 0.5
E_ref = 50000
N_free_var = 3 * N_pred
N_stochastic = int(np.ceil(2./epsilon * np.log(1./beta) + 2.* N_free_var + 2.* N_free_var/epsilon * np.log(2./epsilon)))
N_stochastic

Switch for open and closed loop approaches

In [None]:
open_loop = True  # determines which of the two approaches from the report is used

In [None]:
# variables for CasADi
X = SX.sym("X", N_stochastic * (N_pred+1) * 2, 1)
U = SX.sym("U", (N_pred) * 3, 1)
U_closed_loop = SX.sym("U_closed_loop", 3 + (N_pred-1) * 3 * N_stochastic, 1)
D = SX.sym("D", (N_pred) * 3, 1)
W = SX.sym("W", N_stochastic * (N_pred) * 2, 1)
Alpha = SX.sym("Alpha", N_stochastic * 2, 1)

In [None]:
if open_loop:
    J = 0
    lb_X = []
    ub_X = []
    lb_U = [lb_u for value in range(N_pred)]
    ub_U = [ub_u for value in range(N_pred)]
    g    = [] 
    lb_g = []
    ub_g = []
    x_0 = np.array([22, 200000]).reshape(2,1)


    for i in range(N_stochastic):

        alpha_i = Alpha[2*i:2*i+2]

        for k in range(N_pred):

            x_k      = X[i*(N_pred+1)*2+k*2:i*(N_pred+1)*2+(k+1)*2,:]
            x_k_next = X[i*(N_pred+1)*2+(k+1)*2:i*(N_pred+1)*2+(k+2)*2,:]
            u_k      = U[k*3:(k+1)*3,:]
            d_k      = D[k*3:(k+1)*3,:]
            w_k      = W[k*2:(k+1)*2,:]


            J += stage_cost_fcn(x_k, u_k)

            x_k_next_calc = system(x_k,u_k,d_k,w_k,alpha_i)
            m_cons_k = mixed_constraint(u_k,d_k)

            g.append(vertcat(x_k_next - x_k_next_calc, m_cons_k))
            lb_g.append(vertcat(np.zeros((2,1)), lb_m))
            ub_g.append(vertcat(np.zeros((2,1)), ub_m))
            if k == 0:
                lb_X.append(x_0)
                ub_X.append(x_0)
            else:
                lb_X.append(lb_x)
                ub_X.append(ub_x)

        x_terminal = X[i*(N_pred+1)+N_pred*2:i*(N_pred+1)+(N_pred+1)*2,:]
        J += stage_cost_fcn(x_terminal, [0,0,0])
        lb_X.append(lb_x)
        ub_X.append(ub_x)
else:
    J = 0
    lb_X = []
    ub_X = []
    lb_U = [lb_u for value in range(1 + (N_pred-1) * N_stochastic)]
    ub_U = [ub_u for value in range(1 + (N_pred-1) * N_stochastic)]
    g    = [] 
    lb_g = []
    ub_g = []
    x_0 = np.array([22, 200000]).reshape(2,1)

    for i in range(N_stochastic):

        alpha_i = Alpha[2*i:2*i+2]
        for k in range(N_pred):

            if k == 0:
                lb_X.append(x_0)
                ub_X.append(x_0)
                u_k = U_closed_loop[:3]
            else:
                lb_X.append(lb_x)
                ub_X.append(ub_x)
                u_k      = U_closed_loop[i*(N_pred-1)*3+k*3:i*(N_pred-1)*3+(k+1)*3,:]


            x_k      = X[i*(N_pred+1)*2+k*2:i*(N_pred+1)*2+(k+1)*2,:]
            x_k_next = X[i*(N_pred+1)*2+(k+1)*2:i*(N_pred+1)*2+(k+2)*2,:]

            d_k      = D[k*3:(k+1)*3,:]
            w_k      = W[2*i*N_pred + k*2:2*i*N_pred+(k+1)*2,:]

            J += stage_cost_fcn(x_k, u_k)

            x_k_next_calc = system(x_k,u_k,d_k,w_k,alpha_i)
            m_cons_k = mixed_constraint(u_k,d_k)

            g.append(vertcat(x_k_next - x_k_next_calc, m_cons_k))
            lb_g.append(vertcat(np.zeros((2,1)), lb_m))
            ub_g.append(vertcat(np.zeros((2,1)), ub_m))


        x_terminal = X[i*2*(N_pred+1)+N_pred*2:i*2*(N_pred+1)+(N_pred+1)*2,:]
        J += stage_cost_fcn(x_terminal, [0,0,0])
        lb_X.append(lb_x)
        ub_X.append(ub_x)

In [None]:
# CasADi solver
lbx = vertcat(*lb_X, *lb_U)
ubx = vertcat(*ub_X, *ub_U)
if open_loop:
    x = vertcat(X,U)
else:
    x = vertcat(X,U_closed_loop)
g = vertcat(*g)
lbg = vertcat(*lb_g)
ubg = vertcat(*ub_g)
param_vektor = vertcat(D,W,Alpha)

options = {"ipopt.print_level":0,"verbose_init":0}
prob = {'f':J,'x':x,'p':param_vektor,'g':g}
solver = nlpsol('solver','ipopt',prob,options)

Warning: the code below will plot the states, control inputs and disturbances at every time step
Can be turned off by commenting the code highlighted below.

In [None]:
# MPC loop
x_0 = np.array([22, 200000]).reshape(2,1)
x_0_orig = x_0
# generates random "true" uncertainties
# open loop only works with alpha_1>=.95
# closed loop works for >=.9
alpha_system = np.array([random.uniform(0.95,1),random.uniform(0.8,1)]).reshape((2,1))
w_system = [np.array([np.random.uniform(-0.1,0.1),np.random.uniform(-100,100)]).reshape((2,1)) for k in range(N_sim)]

res_x = [x_0]
res_u = []

N_sim = 10

for i in range(N_sim): 
    
    b_d = [np.array(d_vals[min(k+i,N_sim-1),:]).reshape((3,1)) for k in range(N_pred)]
    
    # uncertainties for optimizer
    w_i = [np.array([np.random.uniform(-0.1,0.1),np.random.uniform(-100,100)]).reshape((2,1)) for k in range(N_pred*N_stochastic)]
    alpha_i = [np.array([np.random.uniform(0.95,1),np.random.uniform(0.8,1)]).reshape((2,1)) for k in range(N_stochastic)]
    
    # true uncertainties
    alpha_k = alpha_system
    w_k = w_system[i]
    
    d_param = vertcat(*b_d,*w_i,*alpha_i)
    
    for l in range(N_stochastic):
        lbx[l*(N_pred+1)*2:2+l*(N_pred+1)*2]=x_0
        ubx[l*(N_pred+1)*2:2+l*(N_pred+1)*2]=x_0

    res = solver(lbx=lbx,ubx=ubx,lbg=lbg,ubg=ubg,p=d_param);

    u_k = np.array(res['x'][N_stochastic*(N_pred+1)*2:N_stochastic*(N_pred+1)*2+3,:]).reshape((3,1))
    
    d_k = b_d[0]
    res_u.append(u_k)

    # simulate the system
    x_next = system(x_0,u_k,d_k,w_k,alpha_k)
    res_x.append(x_next)
    
    print("Found feasible solution?")
    print(solver.stats()["success"])
    
    x_0 = x_next 
    
    # state predictions
    X_stoch = res['x'][:N_stochastic*(N_pred+1)*2].full().reshape(N_stochastic*(N_pred+1), 2)
    
    if not open_loop:
        # control predictions for closed loop
        U_stoch = res['x'][N_stochastic*(N_pred+1)*2:N_stochastic*(N_pred+1)*2+3+(N_pred-1)*3*N_stochastic].full().reshape(1+(N_pred-1)*N_stochastic, 3)
    else:
        U_stoch = None
    
    
    
    num_plotted = 10  # number of predictions plotted, can be max N_stochastic, eats more performance as it gets higher

    # plot, comment out for better performance
    fig = plot_system(i,res_x,res_u,X_stoch,U_stoch,num_plotted,open_loop,x_0_orig,E_ref)
    plt.show()
    
# Make an array from the list of arrays:
res_x = np.concatenate(res_x, axis=1)
res_u = np.concatenate(res_u, axis=1)

### Wrapper for multiple runs

This wrapper can be used to run the MPC loop with specific settings multiple times. A detailed explanation of the function parameters is given further below.

In [None]:
# folder setup
try:
    os.makedirs("./plots")
except Exception as e:
    print(e)
try:
    os.makedirs("./pickle")
except Exception as e:
    print(e)

In [None]:
def run_mpc(x_0=[22,200000],N_stochastic=400,N_pred=3, N_sim=145,
            gamma=0.5,E_ref=100000,open_loop=True,
            lb_alpha=[0.9,0.8],ub_alpha=[1,1],lb_w=[-0.1,-100],
           ub_w=[0.1,100],plot=True,nominal=False):
    
    if nominal:
        N_stochastic = 1
    
    x = SX.sym("x", 2, 1)
    u = SX.sym("u", 3, 1)
    d = SX.sym("d", 3, 1)
    w = SX.sym("w", 2, 1)
    cost = u[2] + gamma * (x[1] - E_ref)**2
    stage_cost_fcn = Function("cost_fcn", [x,u] , [cost])
    
    X = SX.sym("X", N_stochastic * (N_pred+1) * 2, 1)
    U = SX.sym("U", (N_pred) * 3, 1)
    U_closed_loop = SX.sym("U_closed_loop", 3 + (N_pred-1) * 3 * N_stochastic, 1)
    D = SX.sym("D", (N_pred) * 3, 1)
    W = SX.sym("W", N_stochastic * (N_pred) * 2, 1)
    Alpha = SX.sym("Alpha", N_stochastic * 2, 1)

    run_id = str(uuid4())
    if plot:
        os.makedirs("./plots/"+run_id)
    
    if open_loop:
        J = 0
        lb_X = []
        ub_X = []
        lb_U = [lb_u for value in range(N_pred)]
        ub_U = [ub_u for value in range(N_pred)]
        g    = [] 
        lb_g = []
        ub_g = []


        for i in range(N_stochastic):

            alpha_i = Alpha[2*i:2*i+2]

            for k in range(N_pred):

                x_k      = X[i*(N_pred+1)*2+k*2:i*(N_pred+1)*2+(k+1)*2,:]
                x_k_next = X[i*(N_pred+1)*2+(k+1)*2:i*(N_pred+1)*2+(k+2)*2,:]
                u_k      = U[k*3:(k+1)*3,:]
                d_k      = D[k*3:(k+1)*3,:]
                w_k      = W[k*2:(k+1)*2,:]


                J += stage_cost_fcn(x_k, u_k)

                x_k_next_calc = system(x_k,u_k,d_k,w_k,alpha_i)
                m_cons_k = mixed_constraint(u_k,d_k)

                g.append(vertcat(x_k_next - x_k_next_calc, m_cons_k))
                lb_g.append(vertcat(np.zeros((2,1)), lb_m))
                ub_g.append(vertcat(np.zeros((2,1)), ub_m))
                lb_X.append(lb_x)
                ub_X.append(ub_x)

            x_terminal = X[i*(N_pred+1)+N_pred*2:i*(N_pred+1)+(N_pred+1)*2,:]
            J += stage_cost_fcn(x_terminal, [0,0,0])
            lb_X.append(lb_x)
            ub_X.append(ub_x)
    else:
        J = 0
        lb_X = []
        ub_X = []
        lb_U = [lb_u for value in range(1 + (N_pred-1) * N_stochastic)]
        ub_U = [ub_u for value in range(1 + (N_pred-1) * N_stochastic)]
        g    = [] 
        lb_g = []
        ub_g = []

        for i in range(N_stochastic):

            alpha_i = Alpha[2*i:2*i+2]
            for k in range(N_pred):

                if k == 0:
                    u_k = U_closed_loop[:3]
                else:
                    u_k      = U_closed_loop[i*(N_pred-1)*3+k*3:i*(N_pred-1)*3+(k+1)*3,:]

                    
                lb_X.append(lb_x)
                ub_X.append(ub_x)

                x_k      = X[i*(N_pred+1)*2+k*2:i*(N_pred+1)*2+(k+1)*2,:]
                x_k_next = X[i*(N_pred+1)*2+(k+1)*2:i*(N_pred+1)*2+(k+2)*2,:]

                d_k      = D[k*3:(k+1)*3,:]
                w_k      = W[2*i*N_pred + k*2:2*i*N_pred+(k+1)*2,:]

                J += stage_cost_fcn(x_k, u_k)

                x_k_next_calc = system(x_k,u_k,d_k,w_k,alpha_i)
                m_cons_k = mixed_constraint(u_k,d_k)

                g.append(vertcat(x_k_next - x_k_next_calc, m_cons_k))
                lb_g.append(vertcat(np.zeros((2,1)), lb_m))
                ub_g.append(vertcat(np.zeros((2,1)), ub_m))


            x_terminal = X[i*2*(N_pred+1)+N_pred*2:i*2*(N_pred+1)+(N_pred+1)*2,:]
            J += stage_cost_fcn(x_terminal, [0,0,0])
            lb_X.append(lb_x)
            ub_X.append(ub_x)
        
    # CasADi solver
    lbx = vertcat(*lb_X, *lb_U)
    ubx = vertcat(*ub_X, *ub_U)
    if open_loop:
        x = vertcat(X,U)
    else:
        x = vertcat(X,U_closed_loop)
    g = vertcat(*g)
    lbg = vertcat(*lb_g)
    ubg = vertcat(*ub_g)
    param_vektor = vertcat(D,W,Alpha)

    options = {"ipopt.print_level":0,"verbose_init":0}
    prob = {'f':J,'x':x,'p':param_vektor,'g':g}
    solver = nlpsol('solver','ipopt',prob,options)
    
    solution_found = True
    
    # MPC loop
    x_0 = np.array(x_0).reshape(2,1)
    x_0_orig = x_0
    alpha_fest = np.array([np.random.uniform(lb_alpha[0],ub_alpha[0]),np.random.uniform(lb_alpha[1],ub_alpha[1])]).reshape((2,1))
    w_i_fest = [np.array([np.random.uniform(-0.1,0.1),np.random.uniform(-100,100)]).reshape((2,1)) for k in range(N_sim)]
    res_x = [x_0]
    res_u = []


    for i in range(N_sim): 
        b_d = [np.array(d_vals[min(k+i,N_sim-1),:]).reshape((3,1)) for k in range(N_pred)]
        if not nominal:
            w_i = [np.array([np.random.uniform(lb_w[0],ub_w[0]),np.random.uniform(lb_w[1],ub_w[1])]).reshape((2,1)) for k in range(N_pred*N_stochastic)]
        else:
            w_i = [np.array([0,0]).reshape((2,1)) for k in range(N_pred*N_stochastic)]
        alpha_i = [np.array([np.random.uniform(lb_alpha[0],ub_alpha[0]),np.random.uniform(lb_alpha[1],ub_alpha[1])]).reshape((2,1)) for k in range(N_stochastic)]
        d_param = vertcat(*b_d,*w_i,*alpha_i)
        
        for l in range(N_stochastic):
            lbx[l*(N_pred+1)*2:2+l*(N_pred+1)*2]=x_0
            ubx[l*(N_pred+1)*2:2+l*(N_pred+1)*2]=x_0
            

        res = solver(lbx=lbx,ubx=ubx,lbg=lbg,ubg=ubg,p=d_param);
        
        u_k = np.array(res['x'][N_stochastic*(N_pred+1)*2:N_stochastic*(N_pred+1)*2+3,:]).reshape((3,1))

        d_k = b_d[0]
        res_u.append(u_k)

        # simulate the system
        x_next = system(x_0,u_k,d_k,w_i_fest[i],alpha_fest)
        res_x.append(x_next)
        print("Feasible solution found?")
        print(solver.stats()["success"])
        
        solution_found = solution_found and solver.stats()["success"]

        x_0 = x_next 

        X_stoch = res['x'][:N_stochastic*(N_pred+1)*2].full().reshape(N_stochastic*(N_pred+1), 2)
        if not open_loop:
            U_stoch = res['x'][N_stochastic*(N_pred+1)*2:N_stochastic*(N_pred+1)*2+3+(N_pred-1)*3*N_stochastic].full().reshape(1+(N_pred-1)*N_stochastic, 3)
        else:
            U_stoch = None


        num_plotted = 20 
        # plot
        if plot:
            fig = plot_system(i,res_x,res_u,X_stoch,U_stoch,num_plotted,open_loop,x_0_orig,E_ref)

            plt.savefig("./plots/"+run_id+"/"+str(i))
            plt.close()

    # Make an array from the list of arrays:
    res_x = np.concatenate(res_x,axis=1)
    res_u = np.concatenate(res_u, axis=1)

    return {'solution_found_all':solution_found,'x':res_x,'u':res_u,'grid_expenses':sum(res_u[:,1])*N_sim/2. *1./1000. *grid_price,'alpha':alpha_fest,'run_id':run_id,'open_loop':open_loop}

The following is an example for how to use this wrapper:

In [None]:
opt_dict_open_loop = {'x_0':[22,200000],  # x_0 is the initial state
               'N_stochastic':N_stochastic, #  N_stochastic is the number of samples for Stochastic MPC
               'N_pred':N_pred,  # N_pred is the prediction horizon
                'N_sim':N_sim,  # N_sim is the number of time steps to calculate
               'gamma':0.5,  # gamma is the penalty parameter for the reference SoC
               'E_ref':50000,  # reference SoC
               'open_loop':True,  # determines whether the open or closed loop appraoch is used
               'lb_alpha':[0.95,0.8],  # lower bounds for alpha
               'ub_alpha':[1,1],  # upper bounds for alpha
               'lb_w':[-0.1,-100],  # lower bounds for w_k
               'ub_w':[0.1,100],  # upper bounds for w_k
                      "plot":False,  # if true will create plots for every single timestep and save them in the ./plots folder under the run's uuid
                      "nominal":False}  # if true will use no disturbances in calculating a controller

# will run MPC loop with the above configuration 5 times, saving the results as pickle files in the ./pickle folder if so desired
sol_array = []
for i in range(5):
    run_id = str(uuid4())+"-open_loop_gamma_pointfive"
    run_obj = run_mpc(**opt_dict_open_loop)
    #pickle.dump(run_obj,open(os.path.join('./pickle',run_id),'wb'))
    sol_array.append(run_obj)

The following code chunk goes through every folder within the ./plots folder and creates gifs (like the ones we used in the final screencast) from the saved plot images. This can take a while (roughly 4 minutes for 6 folder of 145 images each).

In [None]:
img_names = [str(value)+".png" for value in range(N_sim)]
for directory in os.listdir("./plots"):
    images = []
    for image in img_names:
        images.append(imageio.imread("./plots/"+directory+"/"+image))
    imageio.mimsave("./plots/"+directory+"/simulation.gif",images,duration=0.05)

The following code chunk reads in the pickle files created above for use in the 2D states plots. It relies on a particular naming convention for the pickle files which should be followed in order to use this seamlessly.

In [None]:
resultate = {"open":{"zero":[],"pointfive":[],"five":[]},"closed":{"zero":[],"pointfive":[],"five":[]}}
for pickle_file in os.listdir("./pickle/"):
    if not "alt" in pickle_file and not "neu" in pickle_file:
        
        key1 = pickle_file.split("-")[5].split("_")[0]
        key2 = pickle_file.split("-")[5].split("_")[3]
        with open(os.path.join('./pickle/',pickle_file),"rb") as infile:
            temp = pickle.load(infile)
            resultate[key1][key2].append(temp)

This code chunk creates 2D states plots as seen in our report and screencast from the data from above.

In [None]:
fig, ax = plt.subplots(1, figsize=(20,16))
mpl.rcParams.update({'font.size': 22})
plt.xlim(18,25)
plt.ylim(-500,210000)
ax.axvline(ub_x[0],color="blue",linestyle="dashdot")
ax.axvline(lb_x[0],color="blue",linestyle="dashdot")
ax.axhline(ub_x[1],color="orange",linestyle="dashdot")
ax.axhline(lb_x[1],color="orange",linestyle="dashdot")
ax.axhline(50000,color="orange",linestyle="dashed")

for ele in resultate["open"]["pointfive"]:  # replace open and pointfive as needed
    falsch = False
    falsch_array = []
    for i,ele2 in enumerate(ele['x'][0]):
        if ele2>23 or ele2<20:
            falsch_array.append(i)
            falsch = True
    ax.scatter(ele['x'][0].T,ele['x'][1].T,color="green" if not falsch else "yellow", alpha = 0.1 if not falsch else 0.3)
    for j in falsch_array:
        ax.scatter(ele['x'][0][j],ele['x'][1][j],color="red")
        
for ele in resultate["open"]["five"]:
    falsch = False
    falsch_array = []
    for i,ele2 in enumerate(ele['x'][0]):
        if ele2>23 or ele2<20:
            falsch_array.append(i)
            falsch = True
    ax.scatter(ele['x'][0].T,ele['x'][1].T,color="green" if not falsch else "yellow", alpha = 0.1 if not falsch else 0.3)
    for j in falsch_array:
        ax.scatter(ele['x'][0][j],ele['x'][1][j],color="red")

ax.scatter(22,200000,color="black")
plt.xlabel("Temperature in °C")
plt.ylabel("SoC")

custom_lines = [Line2D([0], [0], marker='o', color='w',
                          markerfacecolor='black', markersize=8),
                Line2D([0], [0], marker='o', color='w',
                          markerfacecolor='white', markersize=8),
                Line2D([0], [0], marker='o', color='w',
                          markerfacecolor='green', markersize=8),
                Line2D([0], [0], marker='o', color='w',
                          markerfacecolor='yellow', markersize=8),
                Line2D([0], [0], marker='o', color='w',
                          markerfacecolor='red', markersize=8),
                Line2D([0], [0], color="blue",linestyle="-.", lw=2),
                Line2D([0], [0], color="orange",linestyle="-.", lw=2),
               Line2D([0], [0], color="orange",linestyle="dashed", lw=2)]
ax.legend(custom_lines, ['initial state','scenarios with','   no constraints violated', "   at least one constraint violated",'invalid states', "Temperature bounds","Soc bounds","reference SoC"],loc='center left', bbox_to_anchor=(1, 0.5))

plt.tight_layout()
plt.show()

The following code chunk analyzes for which percentage of scenarios within the data at least one temperature state was out of bounds.

In [None]:
total = 0
invalid = 0
for ele in resultate["closed"]["pointfive"]:  # replace closed and pointfive as needed
    total +=1
    for ele2 in ele['x'][0]:
        if ele2>23 or ele2<20:
            invalid += 1
            break
            
invalid/total

## Stochastic MPC for non-convex problems

This implements the greedy algorithm as described by Campi, Garatti & Ramponi. We also provide a MPC loop which uses this algorithm to come up with a posteriori epsilon values. Note that this expects the open loop version of the solver.

In [None]:
beta = 1e-6
sample_size = N_stochastic

In [None]:
def calculate_epsilon(x_0, u_k_old, b_d, alpha_n, w_n, sample_size, beta, tolerance = 1):
    
    uncertainties = [(alpha_n[idx], w_n[idx*N_pred:(idx+1)*N_pred]) for idx in range(len(alpha_n))]
    uncertainties_tmp = uncertainties.copy()
    
    idx = 0
    while idx < len(uncertainties):
        uncertainties_tmp.pop(idx)
        n_scenarios = len(uncertainties_tmp)
        
        # rechnen
        X_loc = SX.sym("X_loc", n_scenarios * (N_pred+1) * 2, 1)
        U_loc = SX.sym("U_loc", (N_pred) * 3, 1)
        D_loc = SX.sym("D_loc", (N_pred) * 3, 1)
        W_loc = SX.sym("W_loc", n_scenarios * (N_pred) * 2, 1)
        Alpha_loc = SX.sym("Alpha_loc", n_scenarios * 2, 1)
        
        J = 0
        lb_X = []
        ub_X = []
        lb_U = [lb_u for value in range(N_pred)]
        ub_U = [ub_u for value in range(N_pred)]
        g    = [] 
        lb_g = []
        ub_g = []
        
        for i in range(n_scenarios):

            alpha_i = Alpha_loc[2*i:2*i+2]

            for k in range(N_pred):

                x_k      = X_loc[i*(N_pred+1)*2+k*2:i*(N_pred+1)*2+(k+1)*2,:]
                x_k_next = X_loc[i*(N_pred+1)*2+(k+1)*2:i*(N_pred+1)*2+(k+2)*2,:]
                u_k      = U_loc[k*3:(k+1)*3,:]
                d_k      = D_loc[k*3:(k+1)*3,:]
                w_k      = W_loc[k*2:(k+1)*2,:]


                J += stage_cost_fcn(x_k, u_k)

                x_k_next_calc = system(x_k,u_k,d_k,w_k,alpha_i)
                m_cons_k = mixed_constraint(u_k,d_k)

                g.append(vertcat(x_k_next - x_k_next_calc, m_cons_k))
                lb_g.append(vertcat(np.zeros((2,1)), lb_m))
                ub_g.append(vertcat(np.zeros((2,1)), ub_m))
                if k == 0:
                    lb_X.append(x_0)
                    ub_X.append(x_0)
                else:
                    lb_X.append(lb_x)
                    ub_X.append(ub_x)

            x_terminal = X_loc[i*(N_pred+1)+N_pred*2:i*(N_pred+1)+(N_pred+1)*2,:]
            J += stage_cost_fcn(x_terminal, [0,0,0])
            lb_X.append(lb_x)
            ub_X.append(ub_x)
            
        lbx = vertcat(*lb_X, *lb_U)
        ubx = vertcat(*ub_X, *ub_U)
        x_loc = vertcat(X_loc,U_loc)
        g = vertcat(*g)
        lbg = vertcat(*lb_g)
        ubg = vertcat(*ub_g)
        param_vektor = vertcat(D_loc,W_loc,Alpha_loc)
        
        w_n = np.concatenate([ele[1] for ele in uncertainties_tmp],axis=1)
        alpha_n = [ele[0] for ele in uncertainties_tmp]
        
        d_param = vertcat(*b_d,*w_n,*alpha_n)

        options = {"ipopt.print_level":0,"verbose_init":0}
        prob = {'f':J,'x':x_loc,'p':param_vektor,'g':g}
        solver = nlpsol('solver','ipopt',prob,options)
        
        res = solver(lbx=lbx,ubx=ubx,lbg=lbg,ubg=ubg,p=d_param);
        sol = np.array(res['x'][n_scenarios*(N_pred+1)*2:n_scenarios*(N_pred+1)*2+3,:]).reshape((3,1))
        
        # uses a small tolerance to determine whether a solution counts as equal or not
        if abs(sol[0]-u_k_old[0])<tolerance and abs(sol[1]-u_k_old[1])<tolerance and abs(sol[2]-u_k_old[2])<tolerance:
            uncertainties.pop(idx)
            print("Removed a sample.")
        else:
            idx += 1
        uncertainties_tmp = uncertainties.copy()
     
    s_n = len(uncertainties)
    print("minimal support sub-sample of size:")
    print(s_n)
    if s_n == sample_size:
        prob = 1
    else:
        prob = 1 - (beta / comb(sample_size,s_n))**(1/(sample_size - s_n))   
    print("A posteriori epsilon:")
    print(prob)

In [None]:
# MPC loop
x_0 = np.array([22, 200000]).reshape(2,1)
x_0_orig = x_0
alpha_fest = np.array([0.95,1])
res_x = [x_0]
res_u = []

N_sim = 5

for i in range(N_sim): 
    b_d = [np.array(d_vals[min(k+i,N_sim-1),:]).reshape((3,1)) for k in range(N_pred)]
    w_i = [np.array([np.random.uniform(-0.1,0.1),np.random.uniform(-100,100)]).reshape((2,1)) for k in range(N_pred*N_stochastic)]
    alpha_i = [np.array([np.random.uniform(0.95,1),np.random.uniform(0.9,1)]).reshape((2,1)) for k in range(N_stochastic)]
    d_param = vertcat(*b_d,*w_i,*alpha_i)
    for l in range(N_stochastic):
        lbx[l*(N_pred+1)*2:2+l*(N_pred+1)*2]=x_0
        ubx[l*(N_pred+1)*2:2+l*(N_pred+1)*2]=x_0

    res = solver(lbx=lbx,ubx=ubx,lbg=lbg,ubg=ubg,p=d_param);  # solver expects open loop here
    u_k = np.array(res['x'][N_stochastic*(N_pred+1)*2:N_stochastic*(N_pred+1)*2+3,:]).reshape((3,1))
    
    calculate_epsilon(x_0, u_k, b_d,alpha_i,w_i,N_stochastic,sample_size)
    
    d_k = b_d[0]
    res_u.append(u_k)

    # simulate the system
    x_next = system(x_0,u_k,d_k,w_i[0],alpha_fest)
    res_x.append(x_next)
    
    x_0 = x_next 
    
# Make an array from the list of arrays:
res_x = np.concatenate(res_x,axis=1)
res_u = np.concatenate(res_u, axis=1)