In [1]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams
from control.matlab import *
import slycot
from scipy import signal,io
import os
from scipy.linalg import fractional_matrix_power
from dyn_utils import hankel_matrix

# Python control toolbox available at https://python-control.readthedocs.io/

plt.rcParams['figure.figsize'] = [8, 8]
plt.rcParams.update({'font.size': 18})


In [2]:
os.listdir('../../data')

['a.csv',
 'c.csv',
 'b.csv',
 '.DS_Store',
 'd.csv',
 'my_face.jpg',
 'video_data.npy',
 'testSys.mat']

In [3]:
q = 2   # Number of inputs
p = 2   # Number of outputs
n = 100 # State dimension
r = 10 # Reduced model order

# This file is missing:
#testSys_mat = io.loadmat(os.path.join('..','..','data','testSys_ABCD.mat'))
#A = testSys_mat['A']
#B = testSys_mat['B']
#C = testSys_mat['C']
#D = testSys_mat['D']

# I made csv files instead
data_dir = '../../data'
A = np.genfromtxt(os.path.join(data_dir, 'a.csv'), delimiter=',')
B = np.genfromtxt(os.path.join(data_dir, 'b.csv'), delimiter=',')
C = np.genfromtxt(os.path.join(data_dir, 'c.csv'), delimiter=',')
D = np.genfromtxt(os.path.join(data_dir, 'd.csv'), delimiter=',')

sysFull = ss(A,B,C,D,1)

In [4]:
yFull = np.zeros((r*5+2,p,q))
tspan = np.arange(0,(r*5+2),1)
m = len(tspan)

for qi in range(q):
    yFull[:,:,qi],t = impulse(sysFull,T=tspan,input=qi)


YY = np.transpose(yFull,axes=(1,2,0)) # reorder to size p x q x m

In [5]:
## ERA and OKID Function Definitions

def ERA(YY,m,n,nin,nout,r):
    Dr = np.zeros((nout,nin))
    Y = np.zeros((nout,nin,YY.shape[2]-1))
    for i in range(nout):
        for j in range(nin):
            Dr[i,j] = YY[i,j,0]
            Y[i,j,:] = YY[i,j,1:]
            
    assert len(Y[:,1,1]) == nout
    assert len(Y[1,:,1]) == nin
    assert len(Y[1,1,:]) >= m+n
    
    H = np.zeros((nout*m,nin*n))
    H2 = np.zeros((nout*m,nin*n))
    
    for i in range(m):
        for j in range(n):
            for Q in range(nout):
                for P in range(nin):
                    H[nout*i+Q,nin*j+P] = Y[Q,P,i+j]
                    H2[nout*i+Q,nin*j+P] = Y[Q,P,i+j+1]
                    
    U,S,VT = np.linalg.svd(H,full_matrices=0)
    V = VT.T
    Sigma = np.diag(S[:r])
    Ur = U[:,:r]
    Vr = V[:,:r]
    Ar = fractional_matrix_power(Sigma,-0.5) @ Ur.T @ H2 @ Vr @ fractional_matrix_power(Sigma,-0.5)
    Br = fractional_matrix_power(Sigma,-0.5) @ Ur.T @ H[:,:nin]
    Cr = H[:nout,:] @ Vr @ fractional_matrix_power(Sigma,-0.5)
    HSVs = S
    
    return Ar, Br, Cr, Dr, HSVs


def ERA_vectorized(YY, m, n, nin, nout, r):
    """Compute a reduced order linear dynamical system model
    from impulse response data using the Eigensystem Realization
    Algorithm (ERA).
    
    Arguments:
        YY : Impulse response data shape (nout, nin, T)
        nin, nout : number of inputs and outputs
        m, n : dimensions of Hankel matrix
        r: dimensions of reduced model
    """

    Dr = YY[:,:,0]
    Y = YY[:,:,1:]
    assert Dr.shape == (nout, nin)
    assert Y.shape[2] >= m+n
    
    H = np.zeros((nout*m, nin*n))
    H2 = np.zeros((nout*m, nin*n))
    for i in range(m):
        for j in range(n):
            H[nout*i:nout*(i+1), nin*j:nin*(j+1)] = Y[:, :, i+j]
            H2[nout*i:nout*(i+1), nin*j:nin*(j+1)] = Y[:, :, i+j+1]
    
    U, S, VT = np.linalg.svd(H,full_matrices=0)
    V = VT.T
    Sigma = np.diag(S[:r])
    Ur = U[:, :r]
    Vr = V[:, :r]
    Ar = fractional_matrix_power(Sigma, -0.5) @ Ur.T @ H2 @ Vr @ \
         fractional_matrix_power(Sigma, -0.5)
    Br = fractional_matrix_power(Sigma, -0.5) @ Ur.T @ H[:,:nin]
    Cr = H[:nout, :] @ Vr @ fractional_matrix_power(Sigma, -0.5)
    HSVs = S
    
    return Ar, Br, Cr, Dr, HSVs


def OKID(y,u,r):
    # inputs:  y (sampled output), u (sampled input), r (effective system order)
    # outputs: H (Markov parameters), M (Observer gain)
    
    PP = y.shape[0] # number of outputs
    MM = y.shape[1] # number of output samples
    QQ = u.shape[0] # number of inputs
    lu = u.shape[1] # number of input samples
    
    assert MM == lu
    
    LL = r*5
    
    # Form data matrices y and V
    V = np.zeros((QQ+(QQ+PP)*LL,MM))
    for i in range(MM):
        V[:QQ,i] = u[:QQ,i]
        
    for i in range(1,LL+1):
        for j in range(MM-i):
            vtemp = np.concatenate((u[:,j],y[:,j]))
            V[QQ+(i-1)*(QQ+PP):QQ+i*(QQ+PP),i+j] = vtemp
    
    # Solve for observer Markov parameters Ybar
    Ybar = y @ np.linalg.pinv(V,rcond=10**(-3))
    
    # Isolate system Markov parameters H, and observer gain M
    D = Ybar[:,:QQ] # feed-through term (or D matrix) is the first term
    
    Y = np.zeros((PP,QQ,LL))
    Ybar1 = np.zeros((PP,QQ,LL))
    Ybar2 = np.zeros((PP,QQ,LL))
    
    for i in range(LL):
        Ybar1[:,:,i] = Ybar[:,QQ+(QQ+PP)*i : QQ+(QQ+PP)*i+QQ]
        Ybar2[:,:,i] = Ybar[:,QQ+(QQ+PP)*i+QQ : QQ+(QQ+PP)*(i+1)]
    
    Y[:,:,0] = Ybar1[:,:,0] + Ybar2[:,:,0] @ D
    for k in range(1,LL):
        Y[:,:,k] = Ybar1[:,:,k] + Ybar2[:,:,k] @ D
        for i in range(k-1):
            Y[:,:,k] += Ybar2[:,:,i] @ Y[:,:,k-i-1]
            
    H = np.zeros((D.shape[0],D.shape[1],LL+1))
    H[:,:,0] = D
    
    for k in range(1,LL+1):
        H[:,:,k] = Y[:,:,k-1]
        
    return H

In [6]:
## Compute ERA from impulse response
mco = int(np.floor((yFull.shape[0]-1)/2)) # m_c = m_o = (m-1)/2
Ar, Br, Cr, Dr, HSVs = ERA(YY,mco,mco,q,p,r)
Ar2, Br2, Cr2, Dr2, HSVs2 = ERA_vectorized(YY,mco,mco,q,p,r)

assert np.array_equal(Ar, Ar2)
assert np.array_equal(Br, Br2)
assert np.array_equal(Cr, Cr2)
assert np.array_equal(Dr, Dr2)
assert np.array_equal(HSVs, HSVs2)

sysERA = ss(Ar,Br,Cr,Dr,1)

In [7]:
m = n = mco
nin = q
nout = p
Dr = YY[:,:,0]
Y = YY[:,:,1:]
assert Dr.shape == (nout, nin)
assert Y.shape[2] >= m+n

h = hankel_matrix(Y.T, m, n+1)
H = h[:,:-2]
H2 = h[:,2:]

assert H.shape == (50, 50)
assert H2.shape == (50, 50)

In [8]:
np.array_equal(H, h[:,:-2])
np.array_equal(H2, h[:,2:])

True

In [9]:
H[:5, :3]

array([[ 7.71088095,  5.25967538,  1.66223156],
       [-6.17798632,  4.71536273,  2.10706513],
       [ 1.66223156, -1.71574197,  4.26277725],
       [ 2.10706513,  3.5877729 , -4.80887737],
       [ 4.26277725,  5.93954291,  1.3357039 ]])

In [10]:
H2[:5, :3]

array([[ 1.66223156, -1.71574197,  4.26277725],
       [ 2.10706513,  3.5877729 , -4.80887737],
       [ 4.26277725,  5.93954291,  1.3357039 ],
       [-4.80887737,  2.97997009,  2.51509526],
       [ 1.3357039 ,  1.24350083,  2.23509732]])

In [11]:
h[:5, :3]

array([[ 7.71088095,  5.25967538,  1.66223156],
       [-6.17798632,  4.71536273,  2.10706513],
       [ 1.66223156, -1.71574197,  4.26277725],
       [ 2.10706513,  3.5877729 , -4.80887737],
       [ 4.26277725,  5.93954291,  1.3357039 ]])

In [12]:
# Some time-series data
q = 3  # Number of inputs
p = 2  # Number of outputs
nt = 10  # Number of timesteps
a = np.arange(nt*p*q).reshape(nt,p,q)
print(a.shape)
y = a.transpose()
assert y.shape == (q, p, nt)
print(y.shape)
print(y[:,:,0])
print(y[:,:,1])
print(y[:,:,2])

(10, 2, 3)
(3, 2, 10)
[[0 3]
 [1 4]
 [2 5]]
[[ 6  9]
 [ 7 10]
 [ 8 11]]
[[12 15]
 [13 16]
 [14 17]]


In [13]:
# Desired Hankel matrix
m = n = 5  # dimensions
assert nt >= m + n - 1
h = np.zeros((q*m, p*n), dtype=int)
for i in range(m):
    for j in range(n):
        h[q*i:q*(i+1), p*j:p*(j+1)] = y[:, :, i+j]
print(h)

[[ 0  3  6  9 12 15 18 21 24 27]
 [ 1  4  7 10 13 16 19 22 25 28]
 [ 2  5  8 11 14 17 20 23 26 29]
 [ 6  9 12 15 18 21 24 27 30 33]
 [ 7 10 13 16 19 22 25 28 31 34]
 [ 8 11 14 17 20 23 26 29 32 35]
 [12 15 18 21 24 27 30 33 36 39]
 [13 16 19 22 25 28 31 34 37 40]
 [14 17 20 23 26 29 32 35 38 41]
 [18 21 24 27 30 33 36 39 42 45]
 [19 22 25 28 31 34 37 40 43 46]
 [20 23 26 29 32 35 38 41 44 47]
 [24 27 30 33 36 39 42 45 48 51]
 [25 28 31 34 37 40 43 46 49 52]
 [26 29 32 35 38 41 44 47 50 53]]


In [14]:
# Alternative method using stacking
b = np.hstack([y[:,:,i] for i in range(y.shape[2])])
assert b.shape == (q, p*nt)
h = np.vstack([b[:, i*p:i*p+n*p] for i in range(m)])
print(h)

[[ 0  3  6  9 12 15 18 21 24 27]
 [ 1  4  7 10 13 16 19 22 25 28]
 [ 2  5  8 11 14 17 20 23 26 29]
 [ 6  9 12 15 18 21 24 27 30 33]
 [ 7 10 13 16 19 22 25 28 31 34]
 [ 8 11 14 17 20 23 26 29 32 35]
 [12 15 18 21 24 27 30 33 36 39]
 [13 16 19 22 25 28 31 34 37 40]
 [14 17 20 23 26 29 32 35 38 41]
 [18 21 24 27 30 33 36 39 42 45]
 [19 22 25 28 31 34 37 40 43 46]
 [20 23 26 29 32 35 38 41 44 47]
 [24 27 30 33 36 39 42 45 48 51]
 [25 28 31 34 37 40 43 46 49 52]
 [26 29 32 35 38 41 44 47 50 53]]


In [15]:
from numpy.lib.stride_tricks import as_strided

def hankel_using_loops(y, m, n):
    """Compute the Hankel matrix from impulse response data.
    
    Arguments
        y : Impulse response data as array of shape (nout, nin, T).
        m, n : Dimensions of Hankel matrix.
    
    Returns
        h : Hankel matrix as m x n array.
    """
    nt = y.shape[2]
    assert nt >= m + n - 1
    q, p = y.shape[0:2]  # number of inputs, outputs
    h = np.zeros((q*m, p*n), dtype=int)
    for i in range(m):
        for j in range(n):
            h[q*i:q*(i+1), p*j:p*(j+1)] = y[:, :, i+j]
    return h

def hankel_using_strides(y, m, n):
    nt = y.shape[0]
    assert nt >= m + n - 1
    s0, s1, s2 = a.strides
    return as_strided(a, shape=(m,q,n,p), 
                      strides=(s0,s2,s0,s1)).reshape(m*q,n*p)

assert np.array_equal(
    hankel_using_loops(y, m, n),
    hankel_using_strides(a, m, n)
)

In [16]:
hankel_using_loops(y, 3, 3)

array([[ 0,  3,  6,  9, 12, 15],
       [ 1,  4,  7, 10, 13, 16],
       [ 2,  5,  8, 11, 14, 17],
       [ 6,  9, 12, 15, 18, 21],
       [ 7, 10, 13, 16, 19, 22],
       [ 8, 11, 14, 17, 20, 23],
       [12, 15, 18, 21, 24, 27],
       [13, 16, 19, 22, 25, 28],
       [14, 17, 20, 23, 26, 29]])

In [17]:
hankel_using_strides(a, 3, 3)

array([[ 0,  3,  6,  9, 12, 15],
       [ 1,  4,  7, 10, 13, 16],
       [ 2,  5,  8, 11, 14, 17],
       [ 6,  9, 12, 15, 18, 21],
       [ 7, 10, 13, 16, 19, 22],
       [ 8, 11, 14, 17, 20, 23],
       [12, 15, 18, 21, 24, 27],
       [13, 16, 19, 22, 25, 28],
       [14, 17, 20, 23, 26, 29]])

In [18]:
%timeit hankel_using_loops(y, m, n)

47.1 µs ± 790 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [19]:
%timeit hankel_using_strides(a, m, n)

18.1 µs ± 2.05 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [20]:
4110/27.7

148.3754512635379

In [21]:
def hankel_matrix(y, m, n):
    """Compute the Hankel matrix from input-output data.
    
    Arguments
        y : Impulse response data as array of shape (T, nout, nin).
        m, n : Dimensions of Hankel matrix.
    
    Returns
        h : Hankel matrix as m x n array.
    """
    nt = y.shape[0]
    assert nt >= m + n - 1
    if len(y.shape) == 1:
        y = y.reshape(nt, 1, 1)
        p = q = 1  # SISO system
    elif len(y.shape) == 3:
        p, q = y.shape[1:3]  # MIMO system
    else:
        raise ValueError("y must be 1 or 3 dimensional")
    s0, s1, s2 = y.strides
    return as_strided(y, shape=(m, q, n, p), 
                      strides=(s0, s2, s0, s1)).reshape(m*q, n*p)

y = np.arange(5)
h = hankel_matrix(y, 3, 3)
h_true = np.array([
    [0, 1, 2],
    [1, 2, 3],
    [2, 3, 4]
])
assert np.array_equal(h, h_true)

y = np.arange(5).astype(float)
h = hankel_matrix(y, 4, 2)
h_true = np.array([
    [0., 1.],
    [1., 2.],
    [2., 3.],
    [3., 4.]
])
assert np.array_equal(h, h_true)

y = np.arange(20).reshape(5,2,2)
h = hankel_matrix(y, 3, 3)
h_true = np.array([
    [ 0,  2,  4,  6,  8, 10],
    [ 1,  3,  5,  7,  9, 11],
    [ 4,  6,  8, 10, 12, 14],
    [ 5,  7,  9, 11, 13, 15],
    [ 8, 10, 12, 14, 16, 18],
    [ 9, 11, 13, 15, 17, 19]
])
assert np.array_equal(h, h_true)

y = np.arange(30).reshape(5,3,2)
h = hankel_matrix(y, 2, 4)
h_true = np.array([
    [ 0,  2,  4,  6,  8, 10, 12, 14, 16, 18, 20, 22],
    [ 1,  3,  5,  7,  9, 11, 13, 15, 17, 19, 21, 23],
    [ 6,  8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28],
    [ 7,  9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29]
])
assert np.array_equal(h, h_true)

y = np.arange(30).reshape(5,3,2)  # Excess data
h = hankel_matrix(y, 2, 3)
h_true = np.array([
    [ 0,  2,  4,  6,  8, 10, 12, 14, 16],
    [ 1,  3,  5,  7,  9, 11, 13, 15, 17],
    [ 6,  8, 10, 12, 14, 16, 18, 20, 22],
    [ 7,  9, 11, 13, 15, 17, 19, 21, 23]
])
assert np.array_equal(h, h_true)

In [22]:
y = np.arange(30).reshape(5,3,2)
h = hankel_matrix(y, 2, 3)
h

array([[ 0,  2,  4,  6,  8, 10, 12, 14, 16],
       [ 1,  3,  5,  7,  9, 11, 13, 15, 17],
       [ 6,  8, 10, 12, 14, 16, 18, 20, 22],
       [ 7,  9, 11, 13, 15, 17, 19, 21, 23]])

In [23]:
y = np.arange(30).reshape(5,3,2)
y

array([[[ 0,  1],
        [ 2,  3],
        [ 4,  5]],

       [[ 6,  7],
        [ 8,  9],
        [10, 11]],

       [[12, 13],
        [14, 15],
        [16, 17]],

       [[18, 19],
        [20, 21],
        [22, 23]],

       [[24, 25],
        [26, 27],
        [28, 29]]])

In [24]:
y = np.arange(100)
%timeit hankel_matrix(y, 50, 50)

14.9 µs ± 222 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [25]:
def hankel_matrix_with_delay(x, n_rows):
    """Hankel matrix of eigen-time-delay coordinates.
    """
    assert n_rows > 1
    h = np.stack([np.roll(x, -i) for i in range(n_rows)])
    return h[:, :-n_rows+1]

h_test = hankel_matrix_with_delay(np.array(range(5)), 3)
h_true = np.array([
    [0, 1, 2],
    [1, 2, 3],
    [2, 3, 4]
])
assert np.array_equal(h_test, h_true)

In [26]:
y = np.arange(100)
%timeit hankel_matrix_with_delay(y, 50)

1.46 ms ± 18.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [27]:
%timeit hankel_matrix(y, 50, 50)

15.3 µs ± 563 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [28]:
y.shape

(100,)

In [29]:
## Compute random input simulation for OKID
uRandom = np.random.randn(q,200) # Random forcing input
yRandom = lsim(sysFull,uRandom,range(200))[0].T # Output

ValueError: shapes (100,2) and (3,) not aligned: 2 (dim 1) != 3 (dim 0)

In [None]:
## Compute OKID and then ERA
H = OKID(yRandom,uRandom,r)
mco = int(np.floor((H.shape[2]-1)/2)) # m_c = m_o
Ar,Br,Cr,Dr,HSVs = ERA(H,mco,mco,q,p,r)
sysERAOKID = ss(Ar,Br,Cr,Dr,1)

In [None]:
## Plot impulse responses for all methods

y1 = np.zeros((200,p,q))
y2 = np.zeros((100,p,q))
y3 = np.zeros((100,p,q))

for qi in range(q):
    y1[:,:,qi],t1 = impulse(sysFull,np.arange(200),input=qi)
    y2[:,:,qi],t2 = impulse(sysERA,np.arange(100),input=qi)
    y3[:,:,qi],t3 = impulse(sysERAOKID,np.arange(100),input=qi)

fig, axs = plt.subplots(2,2, figsize=(9,5))
axs = axs.reshape(-1)

axs[0].step(t1,y1[:,0,0],linewidth=2)
axs[0].step(t2,y2[:,0,0],linewidth=1.2)
axs[0].step(t3,y3[:,0,0],linewidth=1)
axs[0].set_ylabel('$y_1$')
axs[0].set_title('$u_1$')
axs[0].grid()

axs[1].step(t1,y1[:,0,1],linewidth=2)
axs[1].step(t2,y2[:,0,1],linewidth=1.2)
axs[1].step(t3,y3[:,0,1],linewidth=1)
axs[1].set_title('$u_2$')
axs[1].grid()

axs[2].step(t1,y1[:,1,0],linewidth=2)
axs[2].step(t2,y2[:,1,0],linewidth=1.2)
axs[2].step(t3,y3[:,1,0],linewidth=1)
axs[2].set_ylabel('$y_2$')
axs[2].grid()

axs[3].step(t1,y1[:,1,1],linewidth=2,label='Full model, n=100')
axs[3].step(t2,y2[:,1,1],linewidth=1.2,label='ERA, r={}'.format(r))
axs[3].step(t3,y3[:,1,1],linewidth=1,label='ERA/OKID, r={}'.format(r))
axs[3].legend(prop={'size': 12})
axs[3].grid()

for ax in axs:
    ax.set_xlim(0,60)

# Put a legend to the right of the current axis
axs[3].legend(loc='center left', bbox_to_anchor=(1, 0.5))

plt.show()

In [None]:
## Plot input/output pair for OKID
fig,axs = plt.subplots(2)

axs[0].set_title('Inputs')
axs[0].step(range(uRandom.shape[1]),uRandom[0,:],label='u1')
axs[0].step(range(uRandom.shape[1]),uRandom[1,:],label='u2')
axs[0].set_xlabel('t')
axs[0].set_ylabel('u')

axs[1].set_title('Outputs')
axs[1].step(range(yRandom.shape[1]),uRandom[0,:],label='y1')
axs[1].step(range(yRandom.shape[1]),uRandom[1,:],label='y2')
axs[1].set_xlabel('t')
axs[1].set_ylabel('y')

for ax in axs:
    ax.legend(prop={'size': 12})


plt.show()

In [None]:
uRandom.shape