<a href="https://colab.research.google.com/github/DanJoWi/NLE_Project/blob/main/nonlin_est_ukf.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

from scipy.integrate import odeint
import scipy.optimize

import sympy as sp
import pandas as pd




In [None]:
import sys
import requests
import importlib

def import_local_or_github(package_name, function_name=None, directory=None, giturl=None):
    # Import functions directly from github
    # Important: note that we use raw.githubusercontent.com, not github.com

    try: # to find the file locally
        if directory is not None:
            if directory not in sys.path:
                sys.path.append(directory)

        package = importlib.import_module(package_name)
        if function_name is not None:
            function = getattr(package, function_name)
            return function
        else:
            return package

    except: # get the file from github
        if giturl is None:
            giturl = 'https://raw.githubusercontent.com/florisvb/Nonlinear_and_Data_Driven_Estimation/main/Utility/' + str(package_name) + '.py'

        r = requests.get(giturl)
        print('Fetching from: ')
        print(r)

        # Store the file to the colab working directory
        with open(package_name+'.py', 'w') as f:
            f.write(r.text)
        f.close()

        # import the function we want from that file
        package = importlib.import_module(package_name)
        if function_name is not None:
            function = getattr(package , function_name)
            return function
        else:
            return package

plot_tme = import_local_or_github('plot_utility', 'plot_tme', directory='../Utility')
unscented_kalman_filter = import_local_or_github('unscented_kalman_filter', directory='../Utility')

Fetching from: 
<Response [200]>
Fetching from: 
<Response [200]>


In [None]:
try:
    import casadi
except:
    !pip install casadi
    import casadi

try:
    import do_mpc
except:
    !pip install do_mpc
    import do_mpc

try:
    import pybounds
except:
    #!pip install pybounds
    !pip install git+https://github.com/vanbreugel-lab/pybounds
    import pybounds

Collecting casadi
  Downloading casadi-3.7.2-cp312-none-manylinux2014_x86_64.whl.metadata (2.2 kB)
Downloading casadi-3.7.2-cp312-none-manylinux2014_x86_64.whl (75.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m75.6/75.6 MB[0m [31m9.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: casadi
Successfully installed casadi-3.7.2
Collecting do_mpc
  Downloading do_mpc-5.1.1-py3-none-any.whl.metadata (3.5 kB)
Downloading do_mpc-5.1.1-py3-none-any.whl (164 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m164.1/164.1 kB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: do_mpc
Successfully installed do_mpc-5.1.1




Collecting git+https://github.com/vanbreugel-lab/pybounds
  Cloning https://github.com/vanbreugel-lab/pybounds to /tmp/pip-req-build-rk099sie
  Running command git clone --filter=blob:none --quiet https://github.com/vanbreugel-lab/pybounds /tmp/pip-req-build-rk099sie
  Resolved https://github.com/vanbreugel-lab/pybounds to commit b847eb839f75308ed8ba402f0e3356f6148d465e
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pybounds
  Building wheel for pybounds (setup.py) ... [?25l[?25hdone
  Created wheel for pybounds: filename=pybounds-0.0.13-py3-none-any.whl size=19568 sha256=f168568d8623d5a236dd896fc4c3a89c0df37919a753908016f0e1f57964b104
  Stored in directory: /tmp/pip-ephem-wheel-cache-16gujljx/wheels/cb/f8/fb/ff1887f9a2f35c3edad7b1acb7da69437c1fd8d885a800578d
Successfully built pybounds
Installing collected packages: pybounds
Successfully installed pybounds-0.0.13


# New Section

**SYSTEM DEF**

In [None]:
def f(x_vec, u_vec=None, return_state_names=False):
    if return_state_names:
        return ['x', 'x_dot', 'mu', 'fl', 'a']

    # Extract state variables
    x = x_vec[0]      #position
    x_dot = x_vec[1]  #velocity
    mu = x_vec[2]      #damping ratio (unknown param)
    fl = x_vec[3]    #linear natural frequency
    a = x_vec[4]

    # Extract control inputs
   # if u_vec is not None and len(u_vec) > 0:
   #     j1 = u_vec[0]
   # else:
   #     j1 = 0.0

    j1 = 0.0 # Default value if u_vec is None or empty
    if u_vec is not None:
        if np.isscalar(u_vec): # Check if u_vec is a scalar
            j1 = u_vec
        elif hasattr(u_vec, '__len__') and len(u_vec) > 0: # Check if it's array-like and not empty
            j1 = u_vec[0]

    # f0 component: drift dynamics (no controls)
    f0_contribution = np.array([ x_dot,
                                 -mu*x_dot-fl*x-a*x**3,
                                 0,
                                 0,
                                 0])

    # f1 component: multiplied by control j1
    f1_contribution = j1 * np.array([0,
                                     1,
                                     0,
                                     0,
                                     0])

    # combined dynamics
    x_dot_vec = f0_contribution + f1_contribution

    return x_dot_vec

In [None]:
def h(x_vec, u_vec, return_measurement_names=False):
    if return_measurement_names:
        return ['x_dot']


    # Extract state variables
    x = x_vec[0]      #position
    x_dot = x_vec[1]  #velocity
    mu = x_vec[2]      #damping ratio (unknown param)
    fl= x_vec[3]    #linear natural frequency
    a = x_vec[4]

    # Extract control inputs
    if u_vec is not None and len(u_vec) > 0:
        j1 = u_vec[0]
    else:
        j1 = 0.0

    # Measurements
    y_vec = [x_dot]

    # Return measurement
    return y_vec

In [None]:
state_names = ['x', 'x_dot', 'mu', 'fl', 'a']
input_names = ['j1']
measurement_names = ['x_dot']

In [None]:
dt = 0.01  # [s]

In [None]:
simulator = pybounds.Simulator(f, h, dt=dt, state_names=state_names,
                               input_names=input_names, measurement_names=measurement_names, mpc_horizon=10)


# New Section

**SETPOINTS**

In [None]:
# First define the set-point(s) to follow
t_f = 16.0
tsim = np.arange(0, t_f, step=dt)
NA = np.zeros_like(tsim)
ONES = np.ones_like(tsim)
tpi = 2*np.pi
b = 0.7*tpi
m = (1*tpi-b)/t_f

setpoint = {'x': 1*np.sin(m*(tsim)**2+b*(tsim)), #3*np.sin((m*4+b)*tsim), #3*np.sin((m*tsim+b)*tsim),
            'x_dot': NA,
            'mu': 1*ONES,
            'fl': 1*ONES,
            'a': 1*ONES
           }

In [None]:
# Update the simulator set-point
simulator.update_dict(setpoint, name='setpoint')

In [None]:
# Define MPC cost function: penalize the squared error between the setpoint for g and the true g
cost_x = (simulator.model.x['x'] - simulator.model.tvp['x_set']) ** 2
#cost_z = (simulator.model.x['mu'] - simulator.model.tvp['mu_set']) ** 2
cost = cost_x #+ cost_z #+ cost_k

In [None]:
# Set cost function
simulator.mpc.set_objective(mterm=cost, lterm=cost)  # objective function

# Set input penalty: make this small for accurate state tracking
simulator.mpc.set_rterm(j1=1e-4)

In [None]:
simulator.mpc.bounds['lower', '_x', 'mu'] = 0
simulator.mpc.bounds['upper', '_x', 'mu'] = 10

simulator.mpc.bounds['lower', '_x', 'fl'] = 0
simulator.mpc.bounds['upper', '_x', 'fl'] = 10

simulator.mpc.bounds['lower', '_x', 'a'] = 0
simulator.mpc.bounds['upper', '_x', 'a'] = 10

simulator.mpc.bounds['lower', '_x', 'x'] = -10
simulator.mpc.bounds['upper', '_x', 'x'] = 10

#simulator.mpc.bounds['lower', '_u', 'j1'] = -1
#simulator.mpc.bounds['upper', '_u', 'j1'] = 1

# New Section

**SIMULATOR**

In [None]:
# Run simulation using MPC
t_sim, x_sim, u_sim, y_sim = simulator.simulate(x0=None, u=None, mpc=True, return_full_output=True)

In [None]:
measurement_noise_stds = {'x_dot': 0.01
                         }

In [None]:
y_noisy = {key: y_sim[key] + np.random.normal(0, measurement_noise_stds[key], len(y_sim[key])) for key in y_sim.keys()}

In [None]:
y_noisy_df = pd.DataFrame(y_noisy)
u_sim_df = pd.DataFrame(u_sim)
x_sim_df = pd.DataFrame(x_sim)

# Kalman filter parameters and initilization

In [None]:
x0 = np.zeros(len(x_sim))
x = np.array([0, 0, 20, 20, 20])
u0 = np.zeros(1)
P0 = np.diag([1, 1, 100, 100, 100])
P0

array([[  1,   0,   0,   0,   0],
       [  0,   1,   0,   0,   0],
       [  0,   0, 100,   0,   0],
       [  0,   0,   0, 100,   0],
       [  0,   0,   0,   0, 100]])

In [None]:
print(x0)
print(x)

[0. 0. 0. 0. 0.]
[ 0  0 20 20 20]


In [None]:
R = np.diag( list(measurement_noise_stds.values()) )**2
Q = np.diag([1e-4]*len(x0))

In [None]:
dt = np.mean(np.diff(t_sim))

# Unscented Kalman Filter

In [None]:
UKF = unscented_kalman_filter.UKF(f, h, x, u0, P0, Q, R,
                                 dynamics_type='continuous', discretization_timestep=dt,
                                 alpha=0.5)

In [None]:
UKF.estimate(y_noisy_df, u_sim_df)

In [None]:
UKF.history.keys()

In [None]:
# State estimate
x_est = pd.DataFrame(np.vstack(UKF.history['X']), columns=f(None,None,return_state_names=True))

In [None]:
# Covariance diagonals
P_diags = np.vstack([np.diag(UKF.history['P'][i]) for i in range(len(UKF.history['P']))])
P_diags = pd.DataFrame(P_diags, columns=f(None,None,return_state_names=True))

In [None]:
fig = plt.figure()
ax = fig.add_subplot(111)

state = 'mu'

plot_tme(t_sim, x_sim[state], None, x_est[state], label_var=state, ax=ax)

plus3sigma = x_est[state] + 3*np.sqrt(P_diags[state])
minus3sigma = x_est[state] - 3*np.sqrt(P_diags[state])

ax.fill_between(t_sim, plus3sigma, minus3sigma, facecolor='red', edgecolor='none', alpha=0.2)
ax.set_ylim(1, 3)

fig.savefig('mu_est_inout.png')

In [None]:
fig = plt.figure()
ax = fig.add_subplot(111)

state = 'fl'

plot_tme(t_sim, x_sim[state], None, x_est[state], label_var=state, ax=ax)

plus3sigma = x_est[state] + 3*np.sqrt(P_diags[state])
minus3sigma = x_est[state] - 3*np.sqrt(P_diags[state])

ax.fill_between(t_sim, plus3sigma, minus3sigma, facecolor='red', edgecolor='none', alpha=0.2)
ax.set_ylim(0, 2)

fig.savefig('lambda_est_inout.png')

In [None]:
fig = plt.figure()
ax = fig.add_subplot(111)

state = 'a'

plot_tme(t_sim, x_sim[state], None, x_est[state], label_var=state, ax=ax)
#plot_tme(t_sim[800:1600], x_sim[state][800:1600], None, x_est[state][800:1600], label_var=state, ax=ax)

plus3sigma = x_est[state] + 3*np.sqrt(P_diags[state])
minus3sigma = x_est[state] - 3*np.sqrt(P_diags[state])
ax.fill_between(t_sim, plus3sigma, minus3sigma, facecolor='red', edgecolor='none', alpha=0.2)
# Set y-axis limits
#ax.set_ylim(4, 6)

fig.savefig('alpha_est_inout.png')