In [102]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import scipy


In [103]:
## B-spline class

class Bspline():
    
    def __init__(self, order, knots):
        
        self.knots = knots
        self.order = order
        self.degree = order - 1
        self.num_params = len(knots) + order
        
    def bspline_basis(self, x, k, i, t):
        """
        B-spline basis function value B(j,n) at x.
        
        Input arguments:
         x:
            value where the basis function is to be evaluated
         n:
            B-spline order (2 for linear, 1 for quadratic, etc.)
         i:
            interval index, 0 =< j < numel(t)-k-1

         t:
            knot vector

         Output arguments:
         y:
        
        """
        if k == 0:

            return 1.0 if t[i] <= x < t[i+1] else 0.0

        if t[i+k] == t[i]:

            c1 = 0.0 #indeterminate forms 0/0 are deemed to be zero

        else:
            
            c1 = (x - t[i])/(t[i+k] - t[i]) * self.bspline_basis(x, k-1, i, t)

        if t[i+k+1] == t[i+1]:

            c2 = 0.0 #indeterminate forms 0/0 are deemed to be zero

        else:

            c2 = (t[i+k+1] - x)/(t[i+k+1] - t[i+1]) * self.bspline_basis(x, k-1, i+1, t)
        
        return c1 + c2
    
    def basis_matrix(self, x):
        """
        B-spline basis function value matrix B(n) for x.
        
        Input arguments:
         n:
            B-spline order (2 for linear, 3 for quadratic, etc.)
         t:
            knot vector
         x (optional):
            an m-dimensional vector of values where the basis function is to be
            evaluated
        
        Output arguments:
         B:
            a matrix of m rows and numel(t)-n columns
        """
        n = self.order
        t = self.knots
        k = n-1
        
        B = np.zeros((len(x),len(t) - n))
        
        for i in range(len(t) - n):
            for j in range(len(x)):
                B[j,i] = self.bspline_basis(x[j], k, i, t)
            
        return B        
        
    def fit(self, x_data, y_data):
        
        Bmat =  self.basis_matrix(x_data)
        
        #Solve least squares y = B * c
        self.coeffs = np.linalg.lstsq(Bmat, y_data, rcond=None)[0]
    
    def predict(self, x_pred, coeffs):
        
        Bmat =  self.basis_matrix(x_pred)
        
        y_pred = Bmat @ coeffs
        
        return y_pred

In [106]:
import sklearn.preprocessing
class DynamicsBasis(sklearn.preprocessing.PolynomialFeatures):
    
    def __init__(self, degree, X):  
        super().__init__(degree = degree)
        self.dim = X.shape[1]
        self.m_samples = X.shape[0]
        self.degree = degree
        self.X = X
        self.phi_matrix = self.fit_transform(self.X)
        self.num_params = self.phi_matrix.shape[1]
        self.basis_names = self.get_feature_names()
        
    def phi_matrix(self, X):
        
        self.phi_matrix = self.fit_transform(X)
        
        return self.phi_matrix
        
    def psi_matrix(self, X, t):
        
        Dt = t[1] - t[0]
        
        T = Dt * np.tril(np.ones(self.m_samples))
        
        self.phi_matrix = self.fit_transform(X)
        
        return T @ self.phi_matrix
    
#     def big_psi_matrix(self, t):
        
#         psi_mat = self.psi_matrix(t)
        
#         return np.kron(np.eye(self.dim), psi_mat)
        
        

In [123]:
class SRSplineIdentification():
    
    def __init__(self, X, t, poly_degree, spline_order):
        
        self.Y_data = X.flatten()
        self.t = t
        self.m_samples = X.shape[0]
        self.dim = X.shape[1]
        knots = np.concatenate([t[0]*np.ones(3), t, t[-1]*np.ones(3)])
        self.bspline = Bspline(spline_order, knots)#Initialize bspline object
        
        self.spline_matrix = np.kron(np.eye(self.dim), self.bspline.basis_matrix(t))
        self.t_half = 0.5 * (t[1:-1] + t[0:-2])
        self.spline_matrix_half = np.kron(np.eye(self.dim), self.bspline.basis_matrix(self.t_half))
        self.dyn_basis = DynamicsBasis(poly_degree, X)#Initialize dynamics basis object
        
        self.parameters = self.initialize_parameters()
        
        self.eps = 1e-4
        self.alpha = 1e-1
        self.lambd = 1e-3

        
    def initialize_parameters(self):
        
        self.xi = tf.constant(np.random.rand(self.dyn_basis.num_params, self.dim).flatten(), dtype = tf.float32)
        self.theta = tf.constant(np.random.rand(self.bspline.num_params, self.dim).flatten(), dtype = tf.float32)
        
        return tf.concat([self.xi, self.theta], axis = 0)
    
    def spline_predict(self, mode):
        
        if mode == 0:
            return tf.matmul(self.spline_matrix, self.theta)
        else:
            print(self.spline_matrix_half)
            return tf.matmul(self.spline_matrix_half, self.theta)
        
    def spline_residual(self):
        
        Y_pred = self.spline_predict(0)
                        
        return self.Y_data - Y_pred
        
    def dynamics_residual(self):
        
        Y_tilde = self.Y_data[1:-1]
        
        Y_pred = self.spline_predict(1)
        
        psi_mat = self.dyn_basis.psi_matrix(self.t_half, Y_pred)
        
        D_big = np.kron(np.eye(self.dim), psi_mat)
                
        return Y_tilde - tf.matmul(D_big,self.xi)
        
    def l1_residual(self):
        
        W = tf.linalg.diag(1 / tf.abs(self.xi) + self.eps * tf.ones(len(xi)))
        L = tf.sqrt(W);
        
        return  tf.matmul(L, self.xi)
    
    def residual_vec(self):
        
        return tf.concat([self.dynamics_residual(), tf.sqrt(self.alpha) * self.spline_residual(),
                          tf.sqrt(self.lambd) * self.l1_residual()], axis = 0)
        
    def loss(self):
        
        return 0.5 * tf.reduce_sum(tf.square(self.residual_vec()))
        
    def get_jacobian(self):
        
        with tf.GradientTape(persistent = True) as tape:
            
            tape.watch(self.parameters)
            
            res_vec = self.residual_vec()
            
        jac = tape.jacobian(res_vec, self.parameters)
        
        del tape
        
        return jac            
        
    def get_gradient(self):
        
        with tf.GradientTape(persistent = True) as tape:
            
            tape.watch(self.parameters)
            
            loss_value = self.loss()
            
        grad = tape.gradient(loss_value, self.parameters)
        
        del tape
        
        return grad
        
    def train_step(self):
        pass
    
    def train(self):
        pass
        


In [124]:
X = np.random.rand(100,2)
t = np.linspace(0,1,100)
poly_deg = 3
spline_order = 4

model = SRSplineIdentification(X, t, poly_deg, spline_order)

In [125]:
model.get_gradient()

[[0.125      0.59375    0.26041667 ... 0.         0.         0.        ]
 [0.         0.03125    0.46875    ... 0.         0.         0.        ]
 [0.         0.         0.02083333 ... 0.         0.         0.        ]
 ...
 [0.         0.         0.         ... 0.         0.         0.        ]
 [0.         0.         0.         ... 0.02083333 0.         0.        ]
 [0.         0.         0.         ... 0.46875    0.03125    0.        ]]


InvalidArgumentError: cannot compute MatMul as input #1(zero-based) was expected to be a double tensor but is a float tensor [Op:MatMul]