In [None]:
import os

import numpy as np
import tqdm.notebook as tqdmnbk

import matplotlib as mplb
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

import scipy as sp
import scipy.linalg as splinalg

import ssm
import filtr

%matplotlib inline

## 1. Model

The 3-degree-of-freedom system shown in the figure below consists of 3 masses $m_i=10^3$, for $i=1, 2, 3$, which are connected among them through springs $k_2=k_3=10^4$ and dampers $c_2=c_3=2.5\cdot10^2$. The first mass is fixed with a damper $c_1=2.5\cdot10^2$ and a spring which consists of a linear term $k_1=10^4$ and a cubic one $k_{\mathrm{c}}=10^{12}$, so that the elastic restoring force is equal to $f_1(t) = k_1\,u_1(t)+k_{\mathrm{c}}\,u_1^3(t)$.

<img src="3-dof-system.png" width="500"/>

The system is subjected to a white noise excitation force $F_2(t)$ and the displacement response is calculated for the cases where i) a linear and ii) a non-linear restoring force is assumed for $m_1$.

### 1.1 Definition of system matrix functions

In [None]:
def getStiffnessMatrix():
    
    """
    Get the stiffness matrix of the linear system.
    
    Returns 
    -------
    K: ndarray
        The system stiffness matrix.
    """
    
    K = np.array([
        [k1+k2,   -k2,   0],
        [             -k2, k2+k3, -k3],
        [               0,   -k3,  k3]
    ])
    
    return K


def getDampingMatrix():
    
    """
    Get the damping matrix.
    
    Returns 
    -------
    C: ndarray
        The system damping matrix.
    """
    
    C = np.array([
        [c1+c2,   -c2,   0],
        [  -c2, c2+c3, -c3],
        [    0,   -c3,  c3]
    ])
    
    return C


def getRestoringForceVector(x, kc):

    """
    Get the restoring force vector of the non-linear system.

    Parameters
    ----------
    x: ndarray
        ...
    kc: float
        The non-linear spring stiffness values.
    
    Returns 
    -------
    R: ndarray
        The restoring force vector.
    """

    k_linear = getStiffnessMatrix()
    R = k_linear.dot(x[:3])
    R[0] = R[0]+kc*x[0]**3
    
    return R

### 1.2 Specification of model parameters

In [None]:
# Mass matrix parameters
m1, m2, m3 = 1e3, 1e3, 1e3

# Damping matrix parameters
c1, c2, c3 = 0.25e3, 0.25e3, 0.25e3

# Stiffness matrix parameters
k1, k2, k3 = 1e4, 1e4, 1e4
kc = 1e12

# Evaluate system matrices
M = np.diag([m1, m2, m3])
C = getDampingMatrix()
K = getStiffnessMatrix()

# Perform eigenvalue analysis
values, vectors = splinalg.eigh(K, b=M)
frequencies = np.sqrt(values)/(2*np.pi)

# Print dynamic properties
sys.stdout.write('--------------------------\n')
sys.stdout.write('Natural frequencies [Hz]\n')
sys.stdout.write('--------------------------\n')

for j, frequency in enumerate(frequencies):
    sys.stdout.write(' Mode {}:   {:.2f}\n'.format(j+1, frequency))

sys.stdout.write('--------------------------\n')

# Plot mode shapes

plt.figure(figsize=(12, 4))
plt.title('Mode shapes')

for j in range(vectors.shape[1]):
    plt.plot(np.arange(1, vectors.shape[1]+1), vectors[:, j]/np.max(np.abs(vectors[:, j])))
    
legend = ['Mode {}'.format(str(item+1)) for item in range(vectors.shape[1])]
plt.legend(legend)

plt.xticks([1, 2, 3])
plt.ylim([-1.1, 1.1])
plt.xlabel('Mass')

plt.tight_layout()

### 1.3 Construction of input signal

In [None]:
# Specify the input selection matrix
Sp = np.zeros((3, 1))
Sp[2, 0] = 1

# Define time vector
time = np.arange(0, 600, 0.005)
force = np.zeros(time.shape)
magnitude = 1

# Specify frequence range of the excitation [Hz]
frange = np.array([0.1, 4])

# Specify resolution of the spectrum [Hz]
resolution = 0.001
phase = 0

for f in np.arange(frange[0], frange[1], resolution):
    force += magnitude*np.random.rand(1)[0]*np.sin(2*np.pi*f*time+phase)
    phase = np.random.rand(1)[0]*np.pi
    
L, k, x0 = 2, 1, 0
stepf = L/(1+np.exp(-k*(time-x0)))-1
force = stepf*force


plt.figure(figsize=(15, 4))
plt.title('Input force')
gridWidth = 6
gridspec.GridSpec(1,gridWidth)

# Plot force time history
plt.subplot2grid((1,gridWidth), (0,0), colspan=gridWidth-2, rowspan=1)
plt.plot(time, force, 'g')

plt.xlabel('Time [s]')
plt.ylabel('Force [N]')
plt.xlim([0, time[-1]])


# Plot zoom window
zoomWidth = [100, 110]
zoomWindow = np.array([
    [zoomWidth[0], zoomWidth[1], zoomWidth[1], zoomWidth[0], zoomWidth[0]],
    [min(force), min(force), max(force), max(force), min(force)]])
plt.plot(zoomWindow[0, :], zoomWindow[1, :], 'k')


# Plot zoomed time history
plt.subplot2grid((1,gridWidth), (0,gridWidth-2), colspan=2, rowspan=1)
plt.plot(time, force, 'g')

plt.xlabel('Time [s]')
plt.xlim(zoomWidth)

frame1 = plt.gca()
frame1.axes.yaxis.set_ticklabels([])

plt.tight_layout()

### 1.4 Definition of State-Space functions

In [None]:
inverse_mass = np.linalg.inv(M)

def getLinearSystem(x, t, p, *parameters):
    
    """
    Description
    """
    
    K = getStiffnessMatrix()
    a = inverse_mass.dot(Sp.dot(p)-K.dot(x[:3])-C.dot(x[3:]))
    x = np.hstack((x[3:], a))
 
    return x

def getOutputLinear(x, t, p, *parameters):
    
    """
    Description
    """
    
    xdot = getLinearSystem(x, t, p, *parameters)
    output = np.hstack((x[:3], x[3:], xdot[3:]))
    
    return output


def getNonlinearSystem(x, t, p, *parameters):
    
    """
    Description
    """
    
    R = getRestoringForceVector(x, kc)
    a = inverse_mass.dot(Sp.dot(p)-R-C.dot(x[3:]))
    x = np.hstack((x[3:], a))
 
    return x
    

def getOutputNonlinear(x, t, p, *parameters):
    
    """
    Description
    """
    
    xdot = getNonlinearSystem(x, t, p, *parameters)
    output = np.hstack((x, xdot[3:]))
    
    return output


def addNoise(signal, level):

    """
    Add Gaussian white noise to signals.

    Parameters
    ----------
    signal: ndarray
        An (n x s) array containing the signals to be corrupted, where n is
        the number of samples and s is the number of signals.
    level: float, positive
        The level of noise to be added, as percentage of the signal standard
        deviation.

    Returns
    -------
    signal: ndarray
        An array containing the noisy signals.
    """

    mean = np.mean(signal, axis=0)
    std = np.sqrt(np.sum((signal-mean)**2/signal.shape[0], axis=0))

    noisy_signal = signal+level*std*np.random.normal(size=signal.shape)
    noisy_signal = noisy_signal.squeeze()

    return noisy_signal

### 1.5 Simulation of state-space model

In [None]:
nsamples = 20000
t = time[:nsamples]
p = np.array([force[:nsamples]])

n_states = 6
n_inputs = 1
n_outputs = 9

model_linear = ssm.Nonlinear(getLinearSystem, getOutputLinear, n_states, n_inputs, n_outputs)
model_nonlinear = ssm.Nonlinear(getNonlinearSystem, getOutputNonlinear, n_states, n_inputs, n_outputs)

model_linear_discrete = model_linear.toDiscrete(t[1]-t[0])
model_nonlinear_discrete = model_nonlinear.toDiscrete(t[1]-t[0])

y_linear, x_linear, t = model_linear.simulate(t, p)
y_nonlinear, x_nonlinear, t = model_nonlinear.simulate(t, p)

### 1.6 System response plots

In [None]:
def plotResponse(responseA, responseB, legend, label, units):

    plt.figure(figsize=(16, 4))

    for j in range(3):
        plt.subplot('13{}'.format(j+1))
        plt.plot(t[:responseA.shape[1]], responseA[j, :])
        plt.plot(t[:responseB.shape[1]], responseB[j, :], 'r')
        plt.xlabel('Time [s]', fontsize=13)
        plt.ylabel('${}_{}$ [{}]'.format(label, j+1, units), fontsize=13)
        plt.legend(legend)

    plt.tight_layout()

legend = ['Linear', 'Nonlinear']
plotResponse(y_linear[:3, :], y_nonlinear[:3, :], legend, 'u', 'm')
plotResponse(y_linear[3:6, :], y_nonlinear[3:6, :], legend, '\dot{u}', 'm/s')
plotResponse(y_linear[6:, :], y_nonlinear[6:, :], legend, '\ddot{u}', 'm/s$^2$')

### 1.7 Response difference plots

In [None]:
def plotResponseDifference(difference, label, units):

    plt.figure(figsize=(16, 4))

    for j in range(3):
        plt.subplot('13{}'.format(j+1))
        plt.plot(t, difference[j, :], 'b')
        plt.xlabel('Time [s]', fontsize=13)
        plt.ylabel('${}_{}$ [{}]'.format(label, j+1, units), fontsize=13)
        
    plt.tight_layout()
    

plotResponseDifference(y_linear[:3, :]-y_nonlinear[:3, :], 'u', 'm')
plotResponseDifference(y_linear[3:6, :]-y_nonlinear[3:6, :], '\dot{u}', 'm/s')
plotResponseDifference(y_linear[6:, :]-y_nonlinear[6:, :], '\ddot{u}', 'm/s$^2$')

## 1. State estimation

### Select measurement signals

In [None]:
nsamples = 1000

displacement = [0]
velocity = []
acceleration = [0, 1, 2]

displacement_signals = y_nonlinear[:3, :][displacement, :]
velocity_signals = y_nonlinear[3:6, :][velocity, :]
acceleration_signals = y_nonlinear[6:, :][acceleration, :]

measurement_signals = np.vstack((displacement_signals, velocity_signals, acceleration_signals))
nmeasurements = measurement_signals.shape[0]
time = t[:nsamples]

# Add noise to measurements

noisy_measurement_signals = addNoise(measurement_signals.T, 0.02).T

def getNonlinearSystem(x, t, p, *parameters):
    
    K = getStiffnessMatrix()
    R = getRestoringForceVector(x, kc)
    a = inverse_mass.dot(Sp.dot(p)-R-C.dot(x[3:]))
    xdot = np.hstack((x[3:], a))
 
    return xdot

def getOutput(x, t, p, *parameters):
    
    xdot = getNonlinearSystem(x, t, p, *parameters)
    output = np.hstack((x[:3][displacement], x[3:][velocity], xdot[3:][acceleration]))
    
    return output

nonlinear_ss = ssm.Nonlinear(getNonlinearSystem, getOutput, 6, 1, nmeasurements)
nonlinear_ss_discrete = nonlinear_ss.toDiscrete(t[1]-t[0])

### 1.1 Unscented Kalman Filter (UKF)

In [None]:
ukf = filtr.UKF(nonlinear_ss_discrete)

stateSize = ukf.model.stateSize
outputSize = ukf.model.outputSize

pnoise = np.eye(stateSize)*1e-12
pnoise = np.diag([1e-9, 1e-9, 1e-9, 1e-14, 1e-14, 1e-14])
ukf.setProcessNoise(pnoise)

mnoise = np.eye(outputSize)*1e-6
mnoise = np.diag([1e-5, 1e-8, 1e-8, 1e-8])
ukf.setMeasurementNoise(mnoise)

mean = np.zeros(stateSize)
covariance = np.eye(stateSize)*1e-20
ukf.setInitialState(mean, covariance)

# Initialize state time history array
ukf_state = np.zeros((ukf.model.stateSize, t.size))
ukf_covariance = np.zeros((ukf.model.stateSize, t.size))

for k in tqdmnbk.tqdm(range(len(time))):
    ukf.predict(p[:, k])
    ukf.correct(noisy_measurement_signals[:, k], p[:, k])
    
    ukf_state[:, k] = ukf.state.mean

In [None]:
legend = ['Response', 'UKF']
plotResponse(y_nonlinear[:3, :nsamples], ukf_state[:3, :nsamples], legend, 'u', 'm')
plotResponse(y_nonlinear[3:6, :nsamples], ukf_state[3:6, :nsamples], legend, '\dot{u}', 'm/s')

### 1.2 Particle Filter (PF)

In [None]:
pf = filtr.PF(nonlinear_ss_discrete, 40)
pf.setResamplingThreshold(5)

stateSize = pf.model.stateSize
outputSize = pf.model.outputSize

pnoise = np.eye(stateSize)*1e-12
pnoise = np.diag([1e-9, 1e-9, 1e-9, 1e-14, 1e-14, 1e-14])
pf.setProcessNoise(pnoise)

mnoise = np.eye(outputSize)*1e-6
mnoise = np.diag([1e-5, 1e-8, 1e-8, 1e-8])
pf.setMeasurementNoise(mnoise)

mean = np.zeros(stateSize)
covariance = np.eye(stateSize)*1e-20
pf.setInitialState(mean, covariance)

# Initialize state time history array
pf_state = np.zeros((stateSize, t.size))
pf_covariance = np.zeros((stateSize, t.size))

for k in tqdmnbk.tqdm(range(len(time))):
    pf.predict(p[:, k])
    pf.correct(noisy_measurement_signals[:, k], p[:, k])
    
    pf_state[:, k] = pf.state.mean

In [None]:
legend = ['Response', 'PF']
plotResponse(y_nonlinear[:3, :nsamples], pf_state[:3, :nsamples], legend, 'u', 'm')
plotResponse(y_nonlinear[3:6, :nsamples], pf_state[3:6, :nsamples], legend, '\dot{u}', 'm/s')

### 1.3 Particle Filter with Mutation (MPF)

In [None]:
mpf = filtr.MPF(nonlinear_ss_discrete, 20)
mpf.setResamplingThreshold(5)

stateSize = mpf.model.stateSize
outputSize = mpf.model.outputSize

pnoise = np.eye(stateSize)*1e-12
pnoise = np.diag([1e-9, 1e-9, 1e-9, 1e-14, 1e-14, 1e-14])
mpf.setProcessNoise(pnoise)

mnoise = np.eye(outputSize)*1e-6
mnoise = np.diag([1e-5, 1e-8, 1e-8, 1e-8])
mpf.setMeasurementNoise(mnoise)

mean = np.zeros(stateSize)
covariance = np.eye(stateSize)*1e-20
mpf.setInitialState(mean, covariance)

# Initialize state time history array
mpf_state = np.zeros((stateSize, t.size))
mpf_covariance = np.zeros((stateSize, t.size))

for k in tqdmnbk.tqdm(range(len(time))):
    mpf.predict(p[:, k])
    mpf.correct(noisy_measurement_signals[:, k], p[:, k])
    
    mpf_state[:, k] = mpf.state.mean

In [None]:
legend = ['Response', 'MPF']
plotResponse(y_nonlinear[:3, :nsamples], mpf_state[:3, :nsamples], legend, 'u', 'm')
plotResponse(y_nonlinear[3:6, :nsamples], mpf_state[3:6, :nsamples], legend, '\dot{u}', 'm/s')

## 2. State-parameter estimation

### Select measurement signals

In [None]:
nsamples = 1000

displacement = [0]
velocity = []
acceleration = [0, 1, 2]

displacement_signals = y_nonlinear[:3, :][displacement, :]
velocity_signals = y_nonlinear[3:6, :][velocity, :]
acceleration_signals = y_nonlinear[6:, :][acceleration, :]

measurement_signals = np.vstack((displacement_signals, velocity_signals, acceleration_signals))
nmeasurements = measurement_signals.shape[0]
time = t[:nsamples]

# Add noise to measurements

noisy_measurement_signals = addNoise(measurement_signals.T, 0.02).T


def getNonlinearSystem(x, t, p, *parameters):
    
    K = getStiffnessMatrix()
    R = getRestoringForceVector(x, x[-1]*1e12)
    a = inverse_mass.dot(Sp.dot(p)-R-C.dot(x[3:6]))
    x = np.hstack((x[3:6], a, np.zeros(1)))
 
    return x

def getOutput(x, t, p, *parameters):
    
    xdot = getNonlinearSystem(x, t, p, *parameters)
    output = np.hstack((x[:3][displacement], x[3:6][velocity], xdot[3:][acceleration]))
    
    return output

nonlinear_ss = ssm.Nonlinear(getNonlinearSystem, getOutput, 7, 1, nmeasurements)
nonlinear_ss_discrete = nonlinear_ss.toDiscrete(t[1]-t[0])

### 2.1 Unscented Kalman Filter (UKF)

In [None]:
ukf = filtr.UKF(nonlinear_ss_discrete)

stateSize = ukf.model.stateSize
outputSize = ukf.model.outputSize

pnoise = np.diag([1e-10, 1e-9, 1e-9, 1e-14, 1e-14, 1e-14, 1e-25])
ukf.setProcessNoise(pnoise)

mnoise = np.diag([1e-10, 1e-8, 1e-8, 1e-8])
ukf.setMeasurementNoise(mnoise)

mean = np.zeros(stateSize)
mean[-1] = 2
covariance = np.eye(stateSize)*1e-10
covariance[-1, -1] = 1e-2
ukf.setInitialState(mean, covariance)

# Initialize state time history array
ukf_state = np.zeros((ukf.model.stateSize, nsamples))
ukf_covariance = np.zeros((ukf.model.stateSize, nsamples))

for k in tqdmnbk.tqdm(range(len(time))):
    ukf.predict(p[:, k])
    ukf.correct(noisy_measurement_signals[:, k], p[:, k])
    
    ukf_state[:, k] = ukf.state.mean

In [None]:
legend = ['Response', 'UKF']
plotResponse(y_nonlinear[:3, :nsamples], ukf_state[:3, :nsamples], legend, 'u', 'm')
plotResponse(y_nonlinear[3:6, :nsamples], ukf_state[3:6, :nsamples], legend, '\dot{u}', 'm/s')

plt.figure(figsize=(12, 3))
plt.plot(ukf_state[-1, :])

### 2.2 Particle Filter (PF)

In [None]:
nparticles = 1000
pf = filtr.PF(nonlinear_ss_discrete, nparticles)
pf.setResamplingThreshold(0.2*nparticles)

pnoise = np.diag([1e-11, 1e-9, 1e-9, 1e-11, 1e-9, 1e-9, 1e-25])
pf.setProcessNoise(pnoise)

mnoise = np.diag([1e-15, 1e-4, 1e-8, 1e-8])*1e5 # 2nd term was 1e-6
pf.setMeasurementNoise(mnoise)

mean = np.zeros(pf.model.stateSize)
mean[-1] = 2.5
covariance = np.diag([1e-20, 1e-19, 1e-19, 1e-19, 1e-19, 1e-19, 1e-10])*1e8
pf.setInitialState(mean, covariance)

# Initialize state time history array
pf_state = np.zeros((pf.model.stateSize, nsamples))
pf_covariance = np.zeros((pf.model.stateSize, nsamples))

for k in tqdmnbk.tqdm(range(len(time))):    
    pf.predict(p[:, k])
    pf.correct(noisy_measurement_signals[:, k], p[:, k])
    
    pf_state[:, k] = pf.state.mean

In [None]:
legend = ['Response', 'PF']
plotResponse(y_nonlinear[:3, :nsamples], pf_state[:3, :nsamples], legend, 'u', 'm')
plotResponse(y_nonlinear[3:6, :nsamples], pf_state[3:6, :nsamples], legend, '\dot{u}', 'm/s')

plt.figure(figsize=(12, 3))
plt.plot(pf_state[-1, :])

### 2.3 Particle Filter with Mutation (MPF)

In [None]:
nparticles = 400
mpf = filtr.MPF(nonlinear_ss_discrete, nparticles)
mpf.setResamplingThreshold(0.3*nparticles)
mpf.setMutationParameters(pr=0.05, pm=0.25, states=[6], radius=np.array([0.01]))

pnoise = np.diag([1e-11, 1e-9, 1e-9, 1e-11, 1e-9, 1e-9, 1e-25])
mpf.setProcessNoise(pnoise)

mnoise = np.diag([1e-15, 1e-4, 1e-8, 1e-8])*1e6
mpf.setMeasurementNoise(mnoise)

mean = np.zeros(mpf.model.stateSize)
mean[-1] = 2
covariance = np.diag([1e-20, 1e-19, 1e-19, 1e-19, 1e-19, 1e-19, 1e-6])*1e8
mpf.setInitialState(mean, covariance)

# Initialize state time history array
mpf_state = np.zeros((mpf.model.stateSize, nsamples))
mpf_covariance = np.zeros((mpf.model.stateSize, nsamples))

for k in tqdmnbk.tqdm(range(len(time))):
    mpf.predict(p[:, k])
    mpf.correct(noisy_measurement_signals[:, k], p[:, k])
    
    mpf_state[:, k] = mpf.state.mean

In [None]:
legend = ['Response', 'MPF']
plotResponse(y_nonlinear[:3, :nsamples], mpf_state[:3, :nsamples], legend, 'u', 'm')
plotResponse(y_nonlinear[3:6, :nsamples], mpf_state[3:6, :nsamples], legend, '\dot{u}', 'm/s')

plt.figure(figsize=(12, 3))
plt.plot(mpf_state[-1, :])

### 2.4 Rao-Blackwellised Particle Filter (RBPF)

In [None]:
def getNonlinearSystem(x, t, p, theta):
    
    K = getStiffnessMatrix()
    R = getRestoringForceVector(x, theta[0]*1e12)
    a = inverse_mass.dot(Sp.dot(p)-R-C.dot(x[3:6]))
    x = np.hstack((x[3:6], a))
 
    return x

def getOutput(x, t, p, theta):
    
    xdot = getNonlinearSystem(x, t, p, theta)
    output = np.hstack((x[:3][displacement], x[3:6][velocity], xdot[3:][acceleration]))
    
    return output

nonlinear_ss = ssm.Nonlinear(getNonlinearSystem, getOutput, 6, 1, nmeasurements)
nonlinear_ss_discrete = nonlinear_ss.toDiscrete(t[1]-t[0])

In [None]:
nparticles = 20
nparticles_latent = 70

pfs = [filtr.UKF(nonlinear_ss_discrete) for j in range(nparticles_latent)]
rbpf = filtr.RBPF(pfs, 0.95, 1)
rbpf.setResamplingThreshold(0.05*nparticles_latent)

# Initialize marginalized state
rbpf.setLatentState(np.array([1.5]), np.eye(1)*8)
rbpf.state.mean[0] = 1

pnoise = np.diag([1e-11, 1e-9, 1e-9, 1e-11, 1e-9, 1e-9])*1e-5
pnoise = np.diag([1e-11, 1e-11, 1e-11, 1e-2, 1e-2, 1e-2])*1e-5
for j in range(len(pfs)):
    pfs[j].setProcessNoise(pnoise)

mnoise = np.diag([1e-15, 1e-4, 1e-8, 1e-8])*1e-4
mnoise = np.diag([1e-15, 1e-3, 1e-5, 1e-3])*1e1
for j in range(len(pfs)):
    pfs[j].setMeasurementNoise(mnoise)

mean = np.zeros(pfs[0].model.stateSize)
covariance = np.diag([1e-20, 1e-19, 1e-19, 1e-19, 1e-19, 1e-19])*1e12
for j in range(len(pfs)):
    pfs[j].setInitialState(mean, covariance)

# Initialize state time history array
rbpf_marginalized = np.zeros((pfs[0].model.stateSize, nsamples))
rbpf_covariance = np.zeros((pfs[0].model.stateSize, nsamples))
rbpf_state = np.zeros((1, nsamples))

for k in tqdmnbk.tqdm(range(len(time))):
    rbpf.predict(p[:, k])
    rbpf.correct(noisy_measurement_signals[:, k], p[:, k])
    
    rbpf_marginalized[:, k] = rbpf.marginalized.mean
    rbpf_state[:, k] = rbpf.state.mean

In [None]:
legend = ['Response', 'RBPF']
plotResponse(y_nonlinear[:3, :nsamples], rbpf_marginalized[:3, :nsamples], legend, 'u', 'm')
plotResponse(y_nonlinear[3:6, :nsamples], rbpf_marginalized[3:6, :nsamples], legend, '\dot{u}', 'm/s')

plt.figure(figsize=(12, 3))
plt.plot(rbpf_state[0, :])

### 3. Input, state and parameter estimation

#### 3.1 Dual Kalman Filter - Unscented Kalman Filter (DKF-UKF)

In [None]:
nsamples = 3000

displacement = [0, 1]
velocity = []
acceleration = [0, 1, 2]

displacement_signals = y_nonlinear[:3, :][displacement, :]
velocity_signals = y_nonlinear[3:6, :][velocity, :]
acceleration_signals = y_nonlinear[6:, :][acceleration, :]

measurement_signals = np.vstack((displacement_signals, velocity_signals, acceleration_signals))
nmeasurements = measurement_signals.shape[0]
time = t[:nsamples]

# Add noise to measurements

noisy_measurement_signals = addNoise(measurement_signals.T, 0.02).T


def getNonlinearSystem(x, t, p, *parameters):
    
    K = getStiffnessMatrix()
    R = getRestoringForceVector(x, x[-1]*1e12)
    a = inverse_mass.dot(Sp.dot(p)-R-C.dot(x[3:6]))
    x = np.hstack((x[3:6], a, np.zeros(1)))
 
    return x

def getOutput(x, t, p, *parameters):
    
    xdot = getNonlinearSystem(x, t, p, *parameters)
    output = np.hstack((x[:3][displacement], x[3:6][velocity], xdot[3:][acceleration]))
    
    return output

nonlinear_ss = ssm.Nonlinear(getNonlinearSystem, getOutput, 7, 1, nmeasurements)
nonlinear_ss_discrete = nonlinear_ss.toDiscrete(t[1]-t[0])

In [None]:
D = np.zeros((nmeasurements, 1))
D[-2, 0] = 1/m2

dkfukf = filtr.DKFUKF(nonlinear_ss_discrete, D)

pnoise = np.diag([1e-10, 1e-9, 1e-9, 1e-8, 1e-8, 1e-8, 1e-25])
dkfukf.setProcessNoise(pnoise)

mnoise = np.diag([1e-10, 1e-8, 1e-8, 1e-8])
mnoise = np.diag([1e-10, 1e-5, 1e1, 1e-6])*1e-4
mnoise = np.diag([1e-7, 1e-5, 1e-5, 1e1, 1e-6])*1e-4
dkfukf.setMeasurementNoise(mnoise)

dkfukf.setInputNoise(np.array([[0.2e-2]]))
dkfukf.setInputNoise(np.array([[0.2e1]])) # Initial data set
dkfukf.setInputNoise(np.array([[0.2e2]])) # Data set 2
dkfukf.setInputNoise(np.array([[0.2e3]]))

mean = np.zeros(dkfukf.model.stateSize)
mean[-1] = 2
covariance = np.eye(dkfukf.model.stateSize)*1e-5
covariance[-1, -1] = 1e-2
dkfukf.setInitialState(mean, covariance)

mean = np.array([force[0]])# np.zeros(dkfukf.model.inputSize)
covariance = np.eye(dkfukf.model.inputSize)*1e10
dkfukf.setInitialInput(mean, covariance)

# Initialize state time history array
dkfukf_state = np.zeros((dkfukf.model.stateSize, nsamples))
dkfukf_input = np.zeros((dkfukf.model.inputSize, nsamples))

for k in tqdmnbk.tqdm(range(len(time))):
    dkfukf.predict()
    dkfukf.correct(noisy_measurement_signals[:, k])
    
    dkfukf_state[:, k] = dkfukf.state.mean
    dkfukf_input[:, k] = dkfukf.input.mean

In [None]:
legend = ['Response', 'DKF-UKF']
plotResponse(y_nonlinear[:3, :nsamples], dkfukf_state[:3, :nsamples], legend, 'u', 'm')
plotResponse(y_nonlinear[3:6, :nsamples], dkfukf_state[3:6, :nsamples], legend, '\dot{u}', 'm/s')

plt.figure(figsize=(12, 3))
plt.plot(dkfukf_state[-1, :])