In [1]:
# Import numerical process libraries
import numpy as np
import pandas as pd
from scipy.linalg import svd, lstsq

# Import plotly for graphing
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

import sys
print(sys.executable)

c:\Users\brent\AppData\Local\Programs\Python\Python311\python.exe


Example SSI from data invented by chatGPT.

X = [ lat acc; 
    long acc; 
    yaw rate; 
    v_x]

u = [steer; brake; throttle]


In [2]:
# Open and read file
df= pd.read_csv("vehicle_data.csv")
veh_data = df.drop(columns=['time'], axis=1)

headers = veh_data.columns
print(headers)
del(headers)

np.shape(veh_data)

Index(['yaw_rate', 'lateral_acc', 'long_acc', 'forward_speed',
       'steering_angle', 'brake_pressure', 'throttle'],
      dtype='object')


(1000, 7)

In [3]:
def hank_matrix(data, p):
    """
    Construct a Hankel Matrix

    Args:
        data: time series data that will be the basis of new Hankel matrix
        p: Past window to apply to matrix
    """
    
    # Convert data to a NumPy float64 array
    veh_data = np.array(data, dtype=np.float64)
    
    # Define the size of matrix
    T,n = veh_data.shape
    cols = T - p + 1
    
    # Create blank matrix
    Z = np.zeros((p * n, cols))
    
    # Populate matrix from data series
    for i in range(cols):
        # For each column, stack p consecutive data vectors
        for j in range(p):
            Z[j*n:(j+1)*n, i] = veh_data[i+j,:]
    
    return Z

In [None]:
fig = make_subplots(rows=4,cols=1,
    shared_xaxes=True,
    vertical_spacing=0.1, specs=[[{"secondary_y": False}], [{"secondary_y": True}], [{"secondary_y": True}], [{"secondary_y": True}]])

#show the steering trace
fig.add_trace(go.Scatter(x=df['time'],y=df['steering_angle'], mode = 'lines', 
                            name='Steering (°)'), row =1, col=1)

#show the long input traces
fig.add_trace(go.Scatter(x=df['time'],y=df['brake_pressure'], mode = 'lines',
                            name='Brake'), row =2, col=1, secondary_y=False)
fig.add_trace(go.Scatter(x=df['time'],y=df['throttle'], mode = 'lines', 
                            name='Throttle', ), row =2, col=1, secondary_y=True)

#show the acceleration traces
fig.add_trace(go.Scatter(x=df['time'],y=df['lateral_acc'], mode = 'lines',
                         name='Lateral acc [G]'), row =3, col=1, secondary_y=False)
fig.add_trace(go.Scatter(x=df['time'],y=df['long_acc'], mode = 'lines',
                         name='Long Acc [G]'), row =3, col=1, secondary_y=False)


#show the velocity traces
fig.add_trace(go.Scatter(x=df['time'],y=df['forward_speed'], mode = 'lines',
                         name='speed[m/s]'), row =4, col=1, secondary_y=False)
fig.add_trace(go.Scatter(x=df['time'],y=df['yaw_rate'], mode = 'lines',
                         name='Yaw'), row =4, col=1, secondary_y=True)


fig.show()

del fig

In [10]:
def subspace_id(U, Y, p):
    """
    Implements Subspace System Identification (SSI) to estimate system matrices.
    - U: (T, m) Input data
    - Y: (T, n) Output data
    - p: Past window size
    Returns estimated A, B, C matrices.
    """
    # Step 1: Construct Hankel Matrices
    H_U = hank_matrix(U, p)  # Input Hankel
    H_Y = hank_matrix(Y, p)  # Output Hankel

    # Step 2: Perform SVD on Output Hankel Matrix
    U_svd, S_svd, Vh_svd = svd(H_Y, full_matrices=False)

    # Step 3: Estimate System Order (rank selection)
    system_order = np.sum(S_svd > 1e-2)  # Select rank based on singular values

    # Step 4: Compute Extended Observability Matrix
    O_p = U_svd[:, :system_order] @ np.diag(np.sqrt(S_svd[:system_order]))

    # Step 5: Extract A and C from Observability
    O_p1 = O_p[:-Y.shape[1], :]
    O_p2 = O_p[Y.shape[1]:, :]
    
    A = lstsq(O_p1, O_p2)[0]  # A matrix (state transition)
    C = O_p[:Y.shape[1], :]  # C matrix (output matrix)

    # Step 6: Solve for B using least squares
    X = lstsq(O_p, H_Y[:O_p.shape[0], :])[0]  # Estimate states
    B = lstsq(X[:, :-1].T @ X[:, :-1], X[:, :-1].T @ U[:-1, :])[0]  # Solve B

    return A, B, C


In [8]:
U = veh_data.iloc[:,4:]
Y = veh_data.iloc[:,:4]

In [12]:
# Define past horizon window size (p)
p = 100  # Typically chosen based on expected system order

# Run Subspace System Identification
A_est, B_est, C_est = subspace_id(U, Y, p)

# Print Results
print("Estimated A matrix (State Transition):\n", A_est)
print("\nEstimated B matrix (Input Influence):\n", B_est)
print("\nEstimated C matrix (Output Mapping):\n", C_est)

InvalidIndexError: (slice(None, -1, None), slice(None, None, None))