# Sparse identification of nonlinear dynamical systems

### Import packages

In [1]:
from copy import copy
from utils import *
import numpy as np
from sklearn.metrics import mean_squared_error
import random
import torch
import os

In [72]:
def fix_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    os.environ['PYTHONHASHSEED'] = str(seed)

fix_seed(42)


### Setup simmulation parameters

In [73]:
L = 4
M = 2
G = 9.8
simulation_duration = 6

dt = .0001
rk_integrator_args = {}
rk_integrator_args['rtol'] = 1e-13
rk_integrator_args['method'] = 'RK45'
rk_integrator_args['atol'] = 1e-10

odeint_integrator_args = {}
odeint_integrator_args['rtol'] = 1e-13
odeint_integrator_args['atol'] = 1e-10
odeint_integrator_args['full_output'] = 1

### Generate data

In [74]:
t_train = np.arange(0, simulation_duration, dt)
initial_pendulum_config = np.array([np.deg2rad(80), 0])
x_train_theta ,train_joints_over_time, x_train = compute_thetas_over_time(simulation_duration, dt, initial_pendulum_config, rk_integrator_args, G, L)

t_test = np.arange(0, simulation_duration, dt)
test_initial_pendulum_config = np.array([np.deg2rad(65),0])
x_test_theta ,test_joints_over_time,x_test = compute_thetas_over_time(simulation_duration, dt, test_initial_pendulum_config, rk_integrator_args, G, L)

### Sparse regression model

**STLSQ** is an iterative algorithm used to find sparse solutions to regression problems.

#### Goal

Given data matrix $ X \in \mathbb{R}^{n \times d}$ and target(s) $ Y \in \mathbb{R}^{n \times m} $, we want to find a sparse coefficient matrix $ \Xi \in \mathbb{R}^{d \times m} $ such that:

#### Steps

1. Initial Fit (Ridge Regression):

   Solve for $ \Xi $ using a regularized least squares problem:

   
   $\Xi = (X^T X + \alpha I)^{-1} X^T Y$

   where $\alpha$ is a small regularization parameter to stabilize the inversion.

2. Thresholding Step:

   Set all coefficients in $ \Xi $ with magnitude less than a threshold $ \theta $ to zero:

   $\Xi_{ij} = 0 \quad \text{if} \quad |\Xi_{ij}| < \theta$

3. **Refitting:**

   For the remaining non-zero coefficients, refit the model by solving least squares only on the selected terms.

4. **Repeat:**

   Repeat the thresholding and refitting process for a fixed number of iterations or until convergence.

In [75]:
class STLSQ:
    def __init__(self, threshold=0.2, max_iter=35, alpha=1e-5, verbose=False):
        self.threshold = threshold
        self.max_iter = max_iter
        self.alpha = alpha
        self.verbose = verbose
        self.coef_ = None

    def fit(self, X, Y):
        """
        Fit the STLSQ model to single or multiple outputs.

        Parameters:
            X : ndarray, shape (n_samples, n_features)
            Y : ndarray, shape (n_samples,) or (n_samples, n_targets)

        Returns:
            self : fitted model
        """
        if Y.ndim == 1:
            Y = Y.reshape(-1, 1)

        _, n_features = X.shape
        n_outputs = Y.shape[1]
        self.coef_ = np.zeros((n_features, n_outputs))

        for i in range(n_outputs):
            y = Y[:, i]
            Xi = np.linalg.lstsq(X.T @ X + self.alpha * np.eye(n_features), X.T @ y, rcond=None)[0]

            for it in range(self.max_iter):
                small_inds = np.abs(Xi) < self.threshold
                if self.verbose:
                    print(f"Output {i}, Iter {it}: {np.sum(small_inds)} coefficients zeroed")

                Xi[small_inds] = 0
                big_inds = ~small_inds

                if np.count_nonzero(big_inds) == 0:
                    if self.verbose:
                        print(f"All coefficients zeroed for output {i}")
                    break

                Xi[big_inds] = np.linalg.lstsq(
                    X[:, big_inds].T @ X[:, big_inds] + self.alpha * np.eye(np.sum(big_inds)),
                    X[:, big_inds].T @ y, rcond=None
                )[0]

            self.coef_[:, i] = Xi

        return self

    def predict(self, X):
        if self.coef_ is None:
            raise ValueError("Model has not been fitted yet.")
        return X @ self.coef_


### Sparse identification of nonlinear dynamical systems model

In [76]:
class SindyModel:
    def __init__(self, transform_fn, alpha=0.001, l0_penalty=1, fit_intercept=False, max_iter=10000):
        self.transform_fn = transform_fn
        self.alpha = alpha
        self.fit_intercept = fit_intercept
        self.max_iter = max_iter
        self.models = []
        self.X_raw = None
        self.X_transformed = None
        self.Y = None
        self.l0_penalty = l0_penalty

    def tv_denoise(self, signal, lambda_, max_iter=100, tol=1e-6):
        """Total Variation Denoising."""
        n = len(signal)
        u = np.copy(signal)
        px = np.zeros_like(signal)
        tau = 0.125

        for _ in range(max_iter):
            u_old = u.copy()

            grad_u = np.roll(u, -1) - u
            grad_u[-1] = 0

            px += tau * grad_u
            px = px / np.maximum(1.0, np.abs(px))

            div_p = px - np.roll(px, 1)
            div_p[0] = px[0]

            u = signal - lambda_ * div_p

            if np.linalg.norm(u - u_old) < tol:
                break
        return u

    def prepare_data(self, time, variables, denoise=False, lambda_tv=0.1):
        dt = time[1] - time[0]
        variables = [np.asarray(v) for v in variables]
        
        if denoise:
            variables_denoised = [self.tv_denoise(v, lambda_tv) for v in variables]
        else:
            variables_denoised = variables

        derivatives = [finite_difference(v, dt) for v in variables_denoised]

        self.Y = np.stack(derivatives, axis=-1)
        self.X_raw = np.stack(variables_denoised, axis=-1)[1:]
        self.X_transformed = np.array([self.transform_fn(row) for row in self.X_raw])

    def fit(self):
        n_outputs = self.Y.shape[1]
        if isinstance(self.alpha, (int, float)):
            alphas = [self.alpha] * n_outputs
        elif isinstance(self.alpha, (list, tuple, np.ndarray)):
            assert len(self.alpha) == n_outputs, "Length of alpha list must match number of outputs"
            alphas = self.alpha
        else:
            raise ValueError("alpha must be a float or a list of floats")

        self.models = []
        for i in range(n_outputs):
            model = STLSQ(threshold=0.2, max_iter=35, alpha=self.alpha).fit(self.X_transformed, self.Y[:, i])
            self.models.append(model)

    def predict(self, X_raw):
        X_trans = np.array([self.transform_fn(row) for row in X_raw])
        predictions = np.column_stack([model.predict(X_trans) for model in self.models])
        return predictions

    def coefficients(self):
        return np.array([model.coef_ for model in self.models])

### List of candidate functions

In [77]:
t = np.arange(0, simulation_duration, dt)

def custom_features(x):
    x1, y1 = x
    return [
        x1,
        y1,
        x1**2,
        y1**2,
        x1 * y1,
        np.sin(x1),
        np.cos(x1),
        np.sin(2 * x1),
        np.cos(2 * x1),
        np.sin(y1),
        np.cos(y1),
        np.sin(2 * y1),
        np.cos(2 * y1),
        x1 * y1,
        x1**2 * y1,
        x1 * y1**2,
        np.sin(x1) * np.cos(y1),
        np.cos(x1) * np.sin(y1),
        np.sin(x1 + y1),
        np.cos(x1 - y1),
    ]

In [78]:
def select_alpha_with_tracking(x_train_data, x_test_data, alphas, verbose=False, denoise=False):
    results = []
    x_test_c = copy(x_test_data)
    derivatives = np.array([
        pendulum_motion(t, state, G, L)
        for t, state in zip(t_test, x_test_data.T)
    ])

    for alpha in alphas:
        model = SindyModel(transform_fn=custom_features, alpha=alpha)
        model.prepare_data(t, copy(x_train_data), denoise = denoise)
        model.fit()

        print("Learned Coefficient Matrix (columns = d_theta/dt, d_omega/dt):")
        preds = model.predict(model.X_raw)

        mse = mean_squared_error(model.Y, preds)

        preds_test = model.predict(x_test_c.T)
        mse_der = mean_squared_error(derivatives, preds_test)

        nnz = np.count_nonzero(np.abs(model.coefficients()) > 0.05)

        if verbose:
            print(f"Alpha: {alpha:.5e} | MSE: {mse:.5e} | Non-zero Coeffs: {nnz}")

        results.append({
            "alpha": alpha,
            "mse": mse,
            "nnz": nnz,
            "model": model,
            "mse_der": mse_der
        })

    return results

In [79]:
def plot_alpha_results(results):
    alphas = [r["alpha"] for r in results]
    mses = [r["mse"] for r in results]
    mses_der = [r["mse_der"] for r in results]

    nnzs = [r["nnz"] for r in results]

    fig, ax1 = plt.subplots()

    color = 'tab:red'
    ax1.set_xlabel('Alpha (λ)')
    ax1.set_xscale('log')
    ax1.set_ylabel('MSE', color=color)
    ax1.plot(alphas, mses, color=color, marker='o', label='MSE')
    ax1.tick_params(axis='y', labelcolor=color)

    ax2 = ax1.twinx()
    color = 'tab:blue'
    ax2.set_ylabel('Non-Zero Coefficients', color=color)
    ax2.plot(alphas, nnzs, color=color, marker='x', label='Sparsity')
    ax2.tick_params(axis='y', labelcolor=color)

    ax3 = ax1.twinx()
    ax3.spines['right'].set_position(('outward', 60))
    color = 'tab:green'
    ax3.set_ylabel('MSE Derivative', color=color)
    ax3.plot(alphas, mses_der, color=color, marker='^', label='MSE Derivative')
    ax3.tick_params(axis='y', labelcolor=color)

    plt.title("Sparsity vs Error Trade-off Across Alphas")
    fig.tight_layout()
    plt.show()

In [80]:
model = SindyModel(transform_fn=custom_features, alpha=0.1, l0_penalty=0.1)
model.prepare_data(t, x_train)
model.fit()

In [81]:
model.coefficients().T

array([[[ 0.        ,  0.        ],
        [ 0.99999226,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        , -2.45000494],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ],
        [ 0.        ,  0.        ]]])

In [82]:
def simulate(model, initial_state, t_array, method="euler"):
    states = [initial_state]
    state = copy(initial_state)
    info = {"method": method, "steps": len(t_array)}
    state = state.reshape(1, -1)
    for i in range(1, len(t_array)):
        t = t_array[i-1]
        dt = t_array[i] - t_array[i-1]
        derivative = model.predict(state)

        if method == "euler":
            state = state + dt * derivative
        else:
            raise ValueError(f"Unsupported method: {method}")

        states.append(state.copy()[0])
    return np.array(states), info


In [87]:
def learned_pendulum_dynamics(y, t, model):
    """Use learned differential equation"""
    y_tensor = torch.tensor(y, dtype=torch.float32).unsqueeze(0)
    ddot_theta_tensor = model.predict(y_tensor)
    return [y[1], ddot_theta_tensor[0][1]]

In [88]:

from scipy.integrate import odeint
informed_simulation = odeint(learned_pendulum_dynamics, test_initial_pendulum_config, t, args=(model, ))

  np.sin(x1),
  np.cos(x1),
  np.sin(2 * x1),
  np.cos(2 * x1),
  np.sin(y1),
  np.cos(y1),
  np.sin(2 * y1),
  np.cos(2 * y1),
  np.sin(x1) * np.cos(y1),
  np.cos(x1) * np.sin(y1),
  np.sin(x1 + y1),
  np.cos(x1 - y1),


In [89]:
informed_simulation = informed_simulation.T
informed_simulation[0] = loop_around_angles(informed_simulation[0])
joints_over_time_informed = compute_joints_position(informed_simulation[0], L)

In [93]:
animate_pendulum_versus(test_joints_over_time, joints_over_time_informed,"our_informed_single_pendulum.mp4", interval=0.003, fps=30)

In [91]:
draw_state_diagrams(x_test_theta, informed_simulation[0], t_test, "our_informed_single_simulation.png")

In [92]:
plot_progressive_erros(x_test_theta, informed_simulation[0], t_test, "smoothed_noisy_errors.png")

### Impact of noisy data

In [94]:
noise_magnitude = 0.001
x_train_noise = x_train + np.random.normal(0, 1, x_train.shape) * noise_magnitude

noise_magnitude = 0.001
x_test_noise = x_test + np.random.normal(0, 1, x_test.shape) * noise_magnitude

In [95]:
model = SindyModel(transform_fn=custom_features, alpha=1, l0_penalty=0.1)
model.prepare_data(t, x_train_noise)
model.fit()

In [96]:
model.coefficients().T

array([[[  90.97020927,   32.05706109],
        [ -31.15258862,  198.26331651],
        [  39.99292622,  -21.39530603],
        [  10.70860878,  -16.27162165],
        [  -0.30886338,    0.        ],
        [-132.01520386,   33.31264526],
        [ -76.04976338,   76.85879811],
        [ -23.19246874,    4.30342556],
        [   9.5414803 ,    1.17373575],
        [  27.46299437, -167.29628296],
        [ -57.23016121,   31.79607744],
        [   8.73870999,  -54.98165577],
        [   1.4568133 ,   -2.63074149],
        [  -0.30886338,    0.        ],
        [ -10.91159862,   67.64033592],
        [  22.1288349 ,  -27.85808593],
        [ -10.17077049,   51.70955833],
        [  30.7952163 , -185.95250994],
        [  20.62444581, -134.24295161],
        [   1.01598981,    0.        ]]])