In [1]:
import numpy as np
import pysindy as ps

from scipy.integrate import solve_ivp
from pysindy.utils import linear_damped_SHO

import matplotlib.pyplot as plt

In [2]:
integrator_keywords = {}
integrator_keywords['rtol'] = 1e-12
integrator_keywords['method'] = 'LSODA'
integrator_keywords['atol'] = 1e-12

In [3]:
# training data
dt = 0.01
t_train = np.arange(0, 25, dt)
t_train_span = (t_train[0], t_train[-1])
x0_train = [2, 0]
x_train = solve_ivp(linear_damped_SHO, t_train_span,
                    x0_train, t_eval=t_train, **integrator_keywords).y.T

In [4]:
# differentiate
def gen_diff(x_data, t_data, order=2, diff=None):
    diff = ps.differentiation.FiniteDifference(order=order) if diff is None else diff
    x_dot = diff._differentiate(x_data, t_data)
    return x_dot

In [5]:
def gen_Theta(data, lib="poly", order=3):
    if isinstance(lib, str):
        if lib=="poly":
            from sklearn.preprocessing import PolynomialFeatures

            return PolynomialFeatures(order).fit_transform(data)[:,1:]
        elif lib=="trig":
            from pysindy.feature_library import FourierLibrary

            return np.array(FourierLibrary(order).fit(data).transform(data))
    else:
        from pysindy.feature_library import CustomLibrary

        return np.array(CustomLibrary(library_functions=lib).fit(data).transform(data))

In [6]:
import numpy as np

def calculate_regression(theta: np.ndarray, uprime: np.ndarray,
                         threshold: float, max_iterations: int) -> np.ndarray:
    """Finds a xi matrix that fits theta * xi = uprime, using the sequential
    thresholded least-squares algorithm, which is a regression algorithm that
    promotes sparsity.

    The authors of the SINDy paper designed this algorithm as an alternative
    to LASSO, because they found LASSO to be algorithmically unstable, and
    computationally expensive for very large data sets.
    """
    # Solve theta * xi = uprime in the least-squares sense.
    xi = np.linalg.lstsq(theta, uprime, rcond=None)[0]
    n = xi.shape[1]

    print(xi)

    # Add sparsity.
    for i in range(max_iterations):
        small_indices = np.abs(xi) < threshold
        xi[small_indices] = 0
        for j in range(n):
            big_indices = np.logical_not(small_indices[:, j])
            xi[big_indices, j] = np.linalg.lstsq(theta[:, big_indices],
                                                 uprime[:, j],
                                                 rcond=None)[0]

    return xi

In [7]:
x_dot_train = gen_diff(x_train, t_train, order=3)
Theta = gen_Theta(x_train, order=1)
xi = calculate_regression(Theta, x_dot_train, 0.01, 1)

[[-0.10000129 -1.99999973]
 [ 1.99999972 -0.10000131]]


In [8]:
xi
Theta

array([[ 2.        ,  0.        ],
       [ 1.99760141, -0.03995736],
       [ 1.99440741, -0.07981887],
       ...,
       [ 0.15601786,  0.0526537 ],
       [ 0.1568827 ,  0.04947352],
       [ 0.15768297,  0.04627988]])

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim

class SINDy(nn.Module):
    def __init__(self, input_dim, output_dim, sparsity):
        super(SINDy, self).__init__()
        self.linear = nn.Linear(input_dim, output_dim)
        self.sparse_loss = nn.L1Loss()
        self.sparsity = sparsity
        
    def forward(self, x):
        return self.linear(x)
    
    def lasso_loss(self, parameters):
        l1_penalty = torch.norm(parameters, 1)
        return self.sparsity * l1_penalty
    
    def fit(self, X, y, lr=0.01, epochs=100):
        optimizer = optim.Adam(self.parameters(), lr=lr)
        for epoch in range(epochs):
            optimizer.zero_grad()
            output = self(X)
            loss = self.sparse_loss(output, y) + self.lasso_loss(self.linear.weight)
            loss.backward()
            optimizer.step()
            if epoch % 10 == 0:
                print(f'Epoch [{epoch}/{epochs}], Loss: {loss.item()}')

# Example usage
# Assuming X is your input data (features) and y is your target data
X = torch.tensor(x_train, dtype=torch.float32)
y = torch.tensor(x_train, dtype=torch.float32)

input_dim = X.shape[1]  # Number of input features
output_dim = y.shape[1]  # Number of output features
sparsity = 0.1  # Sparsity parameter

model = SINDy(input_dim, output_dim, sparsity)
model.fit(X, y)

coefficients = model.linear.weight.detach().numpy()
print("Coefficients:")
print(coefficients)


Epoch [0/100], Loss: 0.890059769153595
Epoch [10/100], Loss: 0.776807963848114
Epoch [20/100], Loss: 0.6694894433021545
Epoch [30/100], Loss: 0.5678475499153137
Epoch [40/100], Loss: 0.4791262745857239
Epoch [50/100], Loss: 0.3905414044857025
Epoch [60/100], Loss: 0.33241647481918335
Epoch [70/100], Loss: 0.3263525664806366
Epoch [80/100], Loss: 0.31071481108665466
Epoch [90/100], Loss: 0.30293238162994385
Coefficients:
[[3.0561677e-01 1.3608538e-04]
 [1.5463101e-03 9.9218392e-01]]


In [10]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class SINDy(nn.Module):
    def __init__(self, input_dim, output_dim, library_functions):
        super(SINDy, self).__init__()
        self.library_functions = library_functions
        self.output_dim = output_dim
        self.coefficients = nn.Parameter(torch.randn(len(library_functions), output_dim))

    def forward(self, x):
        return torch.matmul(x, self.coefficients)

def generate_library_functions(X, degree):
    # Generate library functions up to a certain degree
    library_functions = []
    for d in range(1, degree + 1):
        for i in range(X.shape[1]):
            library_functions.append(np.power(X[:, i], d))
    library_functions = np.column_stack(library_functions)
    return library_functions

def lasso_loss(parameters, sparsity):
    l1_penalty = torch.norm(parameters, 1)
    return sparsity * l1_penalty

def train_sindy(X, y, degree, sparsity, lr=0.01, epochs=100):
    library_functions = generate_library_functions(X, degree)
    input_dim = library_functions.shape[1]
    output_dim = y.shape[1]

    model = SINDy(input_dim, output_dim, library_functions)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        optimizer.zero_grad()
        output = model(torch.tensor(library_functions, dtype=torch.float32))
        loss = torch.mean((output - torch.tensor(y, dtype=torch.float32)) ** 2) + lasso_loss(model.coefficients, sparsity)
        loss.backward()
        optimizer.step()
        if epoch % 10 == 0:
            print(f'Epoch [{epoch}/{epochs}], Loss: {loss.item()}')

    return model.coefficients.detach().numpy()

# Example usage
# Assuming X is your input data (features) and y is your target data
X = np.array(x_train)
y = np.array(x_train)

degree = 2  # Maximum degree of the polynomial terms
sparsity = 0.1  # Sparsity parameter
coefficients = train_sindy(X, y, degree, sparsity)
print("Coefficients:")
print(coefficients)


RuntimeError: mat1 and mat2 shapes cannot be multiplied (2500x4 and 2500x2)

In [None]:
import torch
from torch import nn

class LassoRegression(torch.nn.Module):
    def __init__(self, input_size, output_size, alpha):
        super(LassoRegression, self).__init__()
        self.linear = torch.nn.Linear(input_size, output_size)
        self.alpha = alpha
        
    def forward(self, x):
        return self.linear(x)
    
    def l1_penalty(self):
        l1_crit = torch.nn.L1Loss(reduction='sum')
        reg_loss = 0
        for param in self.parameters():
            reg_loss += l1_crit(param, torch.zeros_like(param))
        return self.alpha * reg_loss

# Sample data
X = torch.randn(100, 10)  # 100 samples, 10 features
Y = torch.randn(100, 1)    # 100 samples, 1 output

# Model, criterion, and optimizer
model = LassoRegression(input_size=10, output_size=1, alpha=0.01)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# Training loop
num_epochs = 1000
for epoch in range(num_epochs):
    # Forward pass
    outputs = model(X)
    # Loss
    loss = criterion(outputs, Y) + model.l1_penalty()
    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    if (epoch+1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}')

# After training
print('Final Lasso regression coefficients:')
for name, param in model.named_parameters():
    if 'weight' in name:
        print(name, param.data)


AttributeError: partially initialized module 'torch._dynamo' has no attribute 'external_utils' (most likely due to a circular import)