# Discrete SOL

In [1]:
from __future__ import division
import numpy as np
from vpython import *

import matplotlib.pyplot as plt

from math import atan2
import os
from shutil import copy
# from functools import partial

try:
    import tkinter as tk
except:
    import tk
from pathlib import Path

import scipy as sp

<IPython.core.display.Javascript object>

In [2]:
class SimResults():
    def __init__(self,t,Lib,DB,SysID,Ctrl,output_dir_path,select={'states':1,'value':1,'P':1,'error':1}, demand_flag = False):
        self.t=t
        len_t=len(t)
        self.Lib=Lib
        self.DB=DB
        self.SysID=SysID
        self.Ctrl=Ctrl
        self.x_s_history=np.zeros((self.Lib.n,len_t))
        self.u_history=np.zeros((self.Lib.m,len_t))
        self.P_history=np.zeros((len_t,self.Lib._Phi_dim,self.Lib._Phi_dim))
        self.V_history=np.zeros((len_t))
        self.error_history=np.zeros((len_t))
        self.select=select
        self.output_dir_path=output_dir_path
        self.demand_flag = demand_flag
        self.demand = np.zeros((len_t))
        self.pallet=['r','g','b','m','#E67E22','#1F618D']
    def record(self,i,x_s,u,P,V,error, demand = 0):
        self.x_s_history[:,i]=x_s
        self.u_history[:,i]=u
        self.P_history[i]=P
        self.V_history[i]=V
        self.error_history[i]=error
        self.demand[i] = demand
    def graph(self,j,i):
        px=self.Lib._Phi_dim
        if self.select['P']:
            fig = plt.figure()
            for ii in range(px):
                for jj in range(px):
                    plt.plot(self.t[:i],self.P_history[:i,ii,jj],'g')
            plt.savefig(self.output_dir_path+'/fig_P{}.png'.format(j))
            plt.close(fig)
            plt.show()
        if self.select['states']:
            fig1 = plt.figure()
            legend_1 = []
            for im in range(self.Lib.m):
                plt.plot(self.t[:i], self.u_history[im,:i],   self.pallet[im%len(self.pallet)]+'--')
                legend_1.append("u"+str(im))
            for ii in range(self.Lib.n):
                plt.plot(self.t[:i], self.x_s_history[ii,:i], self.pallet[ii%len(self.pallet)])
                legend_1.append("x"+str(ii))
            plt.legend(legend_1, loc=1)
            plt.xlabel('t (sec)')
            plt.ylabel('States and Control')
            plt.ylim((-10, 10))
            plt.grid(color='k', linestyle=':', linewidth=1)
            plt.savefig(self.output_dir_path+'/fig_states_control{}.png'.format(j))
            plt.close(fig1)
            plt.show()
            fig0, axs = plt.subplots(3, 1)
            b1,=axs[0].plot(self.t[:i],self.V_history[:i],'b')
            axs[0].set_ylabel('Value')
            axs[0].grid(color='k', linestyle=':', linewidth=1)
            for ii in range(px):
                for jj in range(px):
                    axs[1].plot(self.t[:i],self.P_history[:i,ii,jj],'g')
            axs[1].set_ylabel('Parameters')
            axs[1].grid(color='k', linestyle=':', linewidth=1)
            b1,=axs[2].plot(self.t[:i],self.error_history[:i],'r')
            axs[2].set_ylabel('Error')
            axs[2].set_ylim([0, 1])
            axs[2].grid(color='k', linestyle=':', linewidth=1)
            plt.tight_layout()
            plt.savefig(self.output_dir_path+'/fig_states_control_Value_Param_Error{}.png'.format(j))
            plt.close(fig0)
            plt.show()
        if self.demand_flag:
            fig4 = plt.figure()
            plt.plot(self.t[:i],self.demand[:i],'y')
            plt.savefig(self.output_dir_path+'/fig_demand{}.png'.format(j))
            plt.close(fig4)

        if self.select['error']:
            fig2 = plt.figure()
            plt.plot(self.t[:i],self.error_history[:i],'g')
            plt.ylim((0,200))
            plt.savefig(self.output_dir_path+'/fig_error{}.png'.format(j))
            plt.close(fig2)
            plt.show()
        if self.select['value']:
            fig3 = plt.figure()
            plt.plot(self.t[:i],self.V_history[:i],'b')
            plt.tight_layout()
            plt.savefig(self.output_dir_path+'/fig_value{}.png'.format(j))
            plt.close(fig3)
            plt.show()
    def printout(self,j):
        print('Episode {}:'.format(j+1))
        if self.DB.db_overflow:
            print('Number of samples in database : ',self.DB.db_dim)
        else:
            print('Number of samples in database : ',self.DB.db_index)
        chosen_basis_label=self.Lib._Phi_lbl
        for ii in range(self.Lib.n):
            handle_str='x_dot({}) = '.format(ii+1)
            for jj in range(self.DB.Theta_dim):
                if self.SysID.Weights[ii,jj]!=0:
                    if jj<self.Lib._Phi_dim:
                        handle_str=handle_str+(' {:7.3f}*{} '.format(self.SysID.Weights[ii,jj],chosen_basis_label[jj]))
                    elif jj>=self.Lib._Phi_dim:
                        handle_str=handle_str+(' {:7.3f}*{}*u{} '.format(self.SysID.Weights[ii,jj],chosen_basis_label[jj%self.Lib._Phi_dim],jj//self.Lib._Phi_dim))
            print(handle_str)
        #print: obtained value function
        handle_str='V(x) = '
        for ii in range(self.Lib._Phi_dim):
            for jj in range(ii+1):
                if (self.Ctrl.P[ii,jj]!=0):
                    if (ii==jj):
                        handle_str=handle_str+'{:7.3f}*{}^2'.format(self.Ctrl.P[ii,jj],chosen_basis_label[jj])
                    else:
                        handle_str=handle_str+'{:7.3f}*{}*{}'.format(2*self.Ctrl.P[ii,jj],chosen_basis_label[ii],chosen_basis_label[jj])
        print(handle_str)
        print("% of non-zero elements in P: {:4.1f} %".format(100*np.count_nonzero(self.Ctrl.P)/(self.Lib._Phi_dim**2)))

class Objective():
    def __init__(self,Q,R,gamma):
        self.gamma=gamma
        self.Q=Q
        self.R=R
    def stage_cost(self,x,u):
        return np.matmul(np.matmul(x,self.Q),x)+np.matmul(np.matmul(u,self.R),u)
    
class Discrete_controller():
    def __init__(self, h, Objective, Lib, dare_flag = True):        
        self.Objective=Objective
        self.R = self.Objective.R
        self.Lib=Lib
        self.Qb=np.zeros((self.Lib._Phi_dim,self.Lib._Phi_dim))
        const = int('1' in Lib.chosen_bases)
        self.Qb[const:self.Lib.n+const,const:self.Lib.n+const]=self.Objective.Q
        self.P=self.Qb
        self.h=h
        self.dare_flag = dare_flag
        self.omega = np.zeros( (self.Lib.n, self.Lib._Phi_dim) )
        self.omega[0:self.Lib.n, 1:self.Lib.n+1 ] = np.eye(self.Lib.n)

    def precompute_u( self,x,Wt ):
        self.Phi_x  = self.Lib._Phi_(x)
        self.pPhi_x = self.Lib._pPhi_(x)
        W=Wt[:,:self.Lib._Phi_dim]
        w_phi = np.zeros( (self.Lib.n, self.Lib.m) )
        for im in range(self.Lib.m):
            w_phi[:,im] = np.matmul( Wt[:,self.Lib._Phi_dim*(im+1):self.Lib._Phi_dim*(im+2)], self.Phi_x )
        self.B = np.matmul( self.pPhi_x, w_phi)
        self.A = np.eye(self.Lib._Phi_dim) + np.matmul(self.pPhi_x, (W - self.omega))
        self.G = np.matmul( self.B.T, self.P )
        self.k_star = -np.matmul( np.linalg.pinv( self.R+np.matmul(self.G, self.B) ), np.matmul(self.G, self.A) )  # (R + BPB)^-1 BPA

    def update_P(self, x, Wt, sparsify = False):
        self.precompute_u(x, Wt)
        if self.dare_flag:
            self.P = sp.linalg.solve_discrete_are(self.A, self.B, self.Qb, self.R)
        else:
            self.P = (self.A.T).dot(self.P).dot(self.A) + self.Qb + (self.A.T).dot(self.G.T).dot(self.k_star)
        if (sparsify):
            absPk=np.absolute(self.P)
            maxP=np.amax(absPk)
            small_index = absPk<(0.001*maxP)
            self.P[small_index]=0    
    
    def calculate(self, x, Wt, u_lim):
        self.precompute_u(x, Wt)
        u = np.matmul( self.k_star, self.Phi_x)
        u = np.clip(u, -u_lim,u_lim)
        return u
    def value(self):
        return np.matmul(np.matmul(self.Phi_x,self.P),self.Phi_x)

class new_SysID():
    def __init__(self,select_ID_algorithm,Database,Weights,Lib, lam = 0.01):
        self.ID_alg=select_ID_algorithm
        self.DB=Database
        self.Weights=Weights
        self.Lib=Lib
        self.Theta=np.zeros((self.DB.Theta_dim))
        self.P_rls = np.zeros((self.Lib.n,self.DB.Theta_dim,self.DB.Theta_dim))
        self.lam = lam
        for i in range(self.Lib.n):
            self.P_rls[i]=np.eye(self.DB.Theta_dim)*1000

    def update(self,x,X_next,u):
        if self.ID_alg['SINDy']:
            lam= self.lam
            if self.DB.db_overflow:
                self.Weights=(self.SINDy(self.DB.db_X_next,self.DB.db_Theta,lam))
            else:
                self.Weights=(self.SINDy(self.DB.db_X_next[:,:self.DB.db_index],self.DB.db_Theta[:,:self.DB.db_index],lam))
        elif self.ID_alg['RLS']:
            _Phi_=self.Lib._Phi_(x)
            self.Theta[:self.Lib._Phi_dim]=_Phi_
            for im in range(self.Lib.m):
                self.Theta[self.Lib._Phi_dim*(im+1):self.Lib._Phi_dim*(im+2)]=_Phi_*u[im]
            #for i in range(self.Lib.n):
            self.Weights,self.P_rls[0] = self.identification_RLS(self.Weights, self.P_rls[0], X_next, self.Theta)
        elif self.ID_alg['LS']:
            Phi, X_dot = self.DB.read()
            self.Weights = np.dot( X_dot, np.linalg.pinv(Phi) )
        elif self.ID_alg['GD']:
            lam = self.lam
            prediction = self.evaluate( x, u )
            self.Weights -= lam*np.outer( (prediction-X_next), self.Theta )
        return self.Weights
    
    def _reg_(self, a, b):
        return np.linalg.lstsq(a.T, b.T,rcond=None)[0].T

    def SINDy(self, X_next, Theta, lam=0.01):
        W = self._reg_(Theta,X_next) # initial guess: Least-squares
        for k in range(10):
            smallinds = np.absolute(W)<lam
            W[smallinds]=0
            for ind in range(X_next.shape[0]):
                biginds =np.invert(smallinds[ind,...])
                W[ind,biginds]= self._reg_(Theta[np.where(biginds)[0].T,...],X_next[ind,...])
        return W
    
    def identification_RLS(self,W,S,X_next,Theta,forget_factor=0.9):
        g=np.matmul(S,Theta)/(forget_factor+np.matmul(np.matmul(Theta.T,S),Theta)) 
        S=(1/forget_factor)*(S-np.matmul(g.reshape(S.shape[0],1),np.matmul(Theta.T,S).reshape(1,S.shape[0])) )
        W=W+np.matmul( (X_next-np.matmul(W,Theta)).reshape(len(X_next),1) ,np.matmul(Theta.T, S).reshape(1,len(Theta)))
        return [W,S]

    def evaluate(self,x,u):
        _Phi_=self.Lib._Phi_(x)
        self.Theta[:self.Lib._Phi_dim]=_Phi_
        for im in range(self.Lib.m):
            self.Theta[self.Lib._Phi_dim*(im+1):self.Lib._Phi_dim*(im+2)]=_Phi_*u[im]
        return np.matmul(self.Weights,self.Theta)
    
    def save(self):
        if self.ID_alg['RLS']:
            if (self.DB.save):
                np.save(self.DB.output_dir_path+'/Weights.npy', self.Weights)
                np.save(self.DB.output_dir_path+'/P_rls.npy', self.P_rls)

class Library():
    def __init__(self,chosen_bases,measure_dim,m, x_ref):
        self.chosen_bases=chosen_bases
        self.n=measure_dim
        self.m=m
        self.x_ref = x_ref
        #library of bases
        self.lib={'1':lambda x:[1],\
                 'x':lambda x:x-self.x_ref,\
                 'sqrtx':lambda x:np.sqrt( np.abs(x)),\
                 'x^2':lambda x: x**2,\
                 'x^3':lambda x: x**3,\
                 'sinx':lambda x:np.sin(x),\
                 '(sinx)^2':lambda x:np.sin(x)**2,\
                 'cosx':lambda x:np.cos(x),\
                 '(cosx)^2':lambda x:np.cos(x)**2,\
                 'xx':lambda x:self.build_product(x)}
        #library of the corresponding gradients
        self.plib={'1':lambda x:x*0,\
                  'x':lambda x:np.diag(x**0),\
                  'sqrtx':lambda x:np.diag(0.5/np.sqrt( np.abs(x))),\
                  'x^2':lambda x: np.diag(2*x),\
                  'x^3':lambda x: np.diag(3*(x**2)),\
                  'sinx':lambda x:np.diag(np.cos(x)),\
                  '(sinx)^2':lambda x:np.diag(np.multiply(2*np.sin(x),np.cos(x))),\
                  'cosx':lambda x:np.diag(-np.sin(x)),\
                  '(cosx)^2':lambda x:np.diag(np.multiply(-2*np.cos(x),np.sin(x))),\
                  'xx':lambda x:self.build_pproduct(x)}
        #library of the corresponding labels
        self.lib_labels={'1':'1',\
                        'x':self.build_lbl('x'),\
                        'sqrtx':self.build_lbl('sqrtx'),\
                        'x^2':self.build_lbl('x^2'),\
                        'x^3':self.build_lbl('x^3'),\
                        'sinx':self.build_lbl('sinx'),\
                        '(sinx)^2':self.build_lbl('(sinx)^2'),\
                        'cosx':self.build_lbl('cosx'),\
                        '(cosx)^2':self.build_lbl('(cosx)^2'),\
                        'xx':self.build_lbl_product('xx')}
        self.lib_dims={'1':1,\
                        'x':self.n,\
                        'sqrtx':self.n,\
                        'x^2':self.n,\
                        'x^3':self.n,\
                        'sinx':self.n,\
                        '(sinx)^2':self.n,\
                        'cosx':self.n,\
                        '(cosx)^2':self.n,\
                        'xx':(self.n**2-self.n)/2}

        self._Phi_lbl=[]
        for i in self.chosen_bases:
            self._Phi_lbl.extend(self.lib_labels[i])
        self._Phi_dim=len(self._Phi_lbl)
        #reserve the memeory required to evaluate Phi
        self._Phi_res=np.zeros((self._Phi_dim))
        #reserve the memeory required to evaluate pPhi
        self._pPhi_res=np.zeros((self._Phi_dim,self.n))

    def build_product(self,x):
        function=np.zeros((int((self.n**2-self.n)/2)))
        ind=0
        for i in range(self.n):
            for j in range(i+1,self.n):
                  function[ind]=x[i]*x[j]
                  ind+=1
        return function
    def build_pproduct(self,x):
        g=np.zeros((int((self.n**2-self.n)/2),self.n))
        ind=0
        for i in range(self.n):
            for j in range(i+1,self.n):
                g[ind][i]=x[j]
                g[ind][j]=x[i]
                ind+=1
        return g
    def build_lbl(self,func_name):
        lbl=[]
        for i in range(self.n):
            index=func_name.find('x')
            lbl.append(func_name[:index+1]+'({})'.format(i+1)+func_name[index+1:])
        return lbl
    def build_lbl_product(self,func_name):
        lbl=[]
        for i in range(self.n):
            for j in range(i+1,self.n):
                index1=func_name.find('x')
                index2=func_name.find('x',index1+1)
                lbl.append(func_name[:index1+1]+'({})'.format(i+1)+func_name[index1+1:index2+1]+'({})'.format(j+1)+func_name[index2+1:])
        return lbl
    def _Phi_(self,x):
        i=0
        for key in self.chosen_bases:
            temp=int(self.lib_dims[key])
            self._Phi_res[i:i+temp]=self.lib[key](x)
            i+=temp
        return self._Phi_res
    def _pPhi_(self,x):
        i=0
        for key in self.chosen_bases:
            temp=int(self.lib_dims[key])
            self._pPhi_res[i:i+temp,:]=self.plib[key](x)
            i+=temp
        return self._pPhi_res

class Database():
    def __init__(self,db_dim,Theta_dim,Lib,output_dir_path = False, load=True,save=True):
        self.db_dim=db_dim
        if output_dir_path:
            self.output_dir_path=output_dir_path
        else:
            self.output_dir_path = 'inside'
        self.Lib=Lib
        #self.n=self.Lib.n
        self._Phi_dim=self.Lib._Phi_dim
        self.Theta_dim=Theta_dim
        self.save=save
        self.load=load
        if load & os.path.exists(self.output_dir_path+'/db_Theta.npy'):
            self.db_Theta=np.load(self.output_dir_path+'/db_Theta.npy')
            self.db_X_next=np.load(self.output_dir_path+'/db_X_next.npy')
            self.db_overflow=np.load(self.output_dir_path+'/db_overflow.npy')
            #print(self.db_X_next[1,:10].T)
            print('Theta_dict:',self.db_Theta)
            self.db_index=np.load(output_dir_path+'/db_index.npy')
            self.db_overflow=np.load(output_dir_path+'/db_overflow.npy')
        else:
            self.Lib=Lib
            self.db_Theta=np.zeros((Theta_dim,self.db_dim))
            self.db_X_next=np.zeros((self.Lib.n,self.db_dim))
            self.db_overflow=False
            self.db_index=0
    def add(self,x,X_next,u):
        self.db_X_next[:,self.db_index]=X_next
        _Phi_=self.Lib._Phi_(x)
        self.db_Theta[:self._Phi_dim,self.db_index]=_Phi_
        for im in range(self.Lib.m):
            self.db_Theta[self._Phi_dim*(im+1):self._Phi_dim*(im+2),self.db_index]=_Phi_*u[im]
        self.db_index+=1
        if self.db_index>(self.db_dim-1):
            self.db_overflow=True
            self.db_index=0
    def read(self):
        if self.db_overflow:
            db=[self.db_Theta,self.db_X_next]
        else:
            db=[self.db_Theta[:,:self.db_index],self.db_X_next[:,:self.db_index]]
        return db
    def DB_save(self):
        if self.save:
            np.save(self.output_dir_path+'/db_Theta.npy', self.db_Theta)
            np.save(self.output_dir_path+'/db_X_next.npy', self.db_X_next)
            np.save(self.output_dir_path+'/db_index.npy', self.db_index)
            np.save(self.output_dir_path+'/db_overflow.npy', self.db_overflow)

# Control Model

In [16]:
def intialize(Model,SaveFolder,Q,R,h,X_ref, t, num_of_episode = 1, gamma = 0.9,graphs = ['states'], chosen_bases=['1','x'], ID_alg = 'SINDy'):
    select_output_graphs={'states':0,'value':0,'P':0,'error':0}
    for i in graphs:
        select_output_graphs[i] = 1
    select_ID_algorithm={'SINDy':0,'RLS':0, 'LS':0, 'GD':0} #options={'SINDy','RLS','LS', 'GD'}
    select_ID_algorithm[ ID_alg ] = 1

    n = Model.dim_n; m = Model.dim_m; measure_dim = n
    Lib=Library(chosen_bases,measure_dim,m, X_ref)
    p=Lib._Phi_dim
    Theta_dim=(m+1)*p
    print('Set of Bases:',Lib._Phi_lbl)
    db_dim=2000
    Obj=Objective(Q,R,gamma)
    SysID_Weights=np.ones((measure_dim,Theta_dim))*0.001
    DB=Database(db_dim=db_dim,Theta_dim=Theta_dim,Lib=Lib,load=0,save=0, output_dir_path=SaveFolder)
    SysID=new_SysID(select_ID_algorithm,DB,SysID_Weights,Lib)
    Controller=Discrete_controller(h,Objective=Obj,Lib=Lib)   # Control_full_R
    Sim=SimResults(np.repeat(t,num_of_episode),Lib,DB,SysID,Controller,select=select_output_graphs,output_dir_path=SaveFolder)
    return (n,m,measure_dim,Sim,Controller,SysID, DB)

class linear_model():
    def __init__(self, h, A = None, B = None, C = None, D = None):
        if( A is None ):
            self.dim_m = 1
            self.dim_n = 2
            self.A = np.array( [[1,h],[0,1]] )
            self.B = np.array( [[0, 1]] ).T
            self.A = np.array( [1] )
            self.B = np.array( [1] )
            self.dim_n = 1
            self.C = np.eye(self.dim_n)
            self.D = np.zeros( (self.dim_n, self.dim_m) )
        else:
            self.A = A; self.B = B, self.C = C, self.D = D
            self.dim_m = B.shape[1]; self.dim_n = A.shape[0]
        self.h = h
        self.x      =   np.zeros((self.dim_n))
        self.x_dot  =   np.zeros((self.dim_n))

    def next_x(self, u):
        self.x = self.A.dot(self.x) + np.squeeze(self.B.dot(u))
    
    def Read_sensor(self):
        return self.C.dot(self.x)

In [17]:
t_end=24;       h=0.01;    t=np.arange(0,t_end+h,h)
num_of_episode=4

Model = linear_model(h)
x_ref = np.array( [5, 1.5] )
X_Start = np.array( [7, 0] ) 
x_ref = np.array( [1] )
X_Start = np.array( [2] )

script_path=str(Path.cwd())
SaveFolder=script_path+'\Discrete_control'
Q = np.eye(Model.dim_n);      R = np.eye(Model.dim_m)

n,m,measure_dim,Sim,Controller,SysID,DB = intialize(Model, SaveFolder, Q, R, h, x_ref, t, num_of_episode)
X_buf=np.zeros((measure_dim,5))
U_buf=np.zeros((m,3))

u_lim = 200000
t_ID = 0

Set of Bases: ['1', 'x(1)']


In [18]:
x_history = np.zeros( (measure_dim,t.size))
Model.x = X_Start
error = np.zeros( t.size )
x_next = Model.Read_sensor()
for i in range(t.size):
    x_history[:,i] = x_next
    u = np.array([[np.sin(t[i])]])
    Model.next_x(u)
    x_next = Model.Read_sensor()
    x_next_hat = SysID.evaluate( x_history[:,i], u )
    error[i]=np.linalg.norm(x_next-x_next_hat)
    if ( np.abs(error[i])>0.5*np.mean(np.abs(error[:i])) ):
        DB.add( x_history[:,i] ,x_next , u) 
        SysID_Weights=SysID.update( x_history[:,i] ,x_next , u)
    
DB.DB_save()
SysID.save()
# Sim.graph(j,i)
Sim.printout(0)

Episode 1:
Number of samples in database :  4
x_dot(1) =    1.000*1    1.000*x(1)    1.000*1*u1 
V(x) =   1.000*x(1)^2
% of non-zero elements in P: 25.0 %


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


In [19]:
Controller.dare_flag = 0
for i in range(100):
    Controller.update_P(Model.x, SysID_Weights)
    print(Controller.k_star)

ValueError: Input must be 1- or 2-d.

In [25]:
np.diag([1])

array([[1]])

In [26]:
Controller.Lib._pPhi_([2])

ValueError: could not broadcast input array from shape (0,) into shape (1,1)

In [96]:
print(Controller.A)
print(Controller.B)
print(Controller.Qb)
print(Controller.R)
Controller.k_star

[[1.    0.    0.   ]
 [5.015 1.    0.01 ]
 [1.5   0.    1.   ]]
[[0.]
 [0.]
 [1.]]
[[0. 0. 0.]
 [0. 1. 0.]
 [0. 0. 1.]]
[[1.]]


array([[-129.48836797,   -0.49141105,   -0.62596832]])

In [97]:
Controller.P

array([[7.84498458e+06, 2.07205844e+04, 3.36694212e+02],
       [2.07205844e+04, 8.21243030e+01, 1.30265408e+00],
       [3.36694212e+02, 1.30265408e+00, 1.63899486e+00]])