In [None]:
import numpy as np
import sys 
sys.path.append(r'../../Python Script/')
from sympy import symbols, simplify, derive_by_array
from scipy.integrate import solve_ivp
from xLSINDy import *
from sympy.physics.mechanics import *
from sympy import *
import sympy
import torch
import HLsearch as HL
import matplotlib.pyplot as plt

In [None]:
# System parameters
# L= 1
# mp, mc = 0.5, 1
# k1, k2 = 0.5, 0.5
# g = 9.81

In [None]:
states_dim = 4
states = ()
states_dot = ()
for i in range(states_dim):
    if(i<states_dim//2):
        states = states + (symbols('x{}'.format(i)),)
        states_dot = states_dot + (symbols('x{}_t'.format(i)),)
    else:
        states = states + (symbols('x{}_t'.format(i-states_dim//2)),)
        states_dot = states_dot + (symbols('x{}_tt'.format(i-states_dim//2)),)
print('states are:',states)
print('states derivatives are: ', states_dot)

#Turn from sympy to str
states_sym = states
states_dot_sym = states_dot
states = list(str(descr) for descr in states)
states_dot = list(str(descr) for descr in states_dot)

In [None]:
#For friction force
x0 = Symbol(states[0], real=True)
x1 = Symbol(states[1], real=True)
x0_t = Symbol(states[2],real=True)
x1_t = Symbol(states[3],real=True)
q = sympy.Array([x0, x1])
qdot = sympy.Array([x0_t, x1_t])

#True Rayleigh Dissipation function
dummy = Symbol('a', real = True)
R = dummy #0.5*k1*x0_t**2 + 0.5*k2*(x1_t - x0_t)**2 #+ k1*Abs(x0_t) + k2*Abs(x1_t - x0_t)

#friction force
f_forcing = sympy.Matrix(derive_by_array(R, qdot)) 

In [None]:
#for lagrangian
x0 = dynamicsymbols(states[0], real=True)
x1 = dynamicsymbols(states[1], real=True)
x0_t = dynamicsymbols(states[0],1, real=True)
x1_t = dynamicsymbols(states[1],1, real=True)
F = symbols('F')
mc = symbols('mc')
mp = symbols('mp')
L = symbols('L')
g = symbols('g')

#True Lagrangian
L = 0.5*(mc+mp)*x1_t**2 + 0.5*mp*L**2*x0_t**2 + mp*L*x0_t*x1_t*cos(x0) + mp*g*L*cos(x0)
# Lagrange's method
LM = LagrangesMethod(L, [x0,x1])
LM.form_lagranges_equations()
i_forcing = LM.forcing #internal forcing and gravity
e_forcing = sympy.Matrix([0, F]) #external generalized force

In [None]:
# Substituting dynamic symbols

i_forcing = i_forcing.subs(x0_t, states_sym[2])
i_forcing = i_forcing.subs(x1_t, states_sym[3])
i_forcing = i_forcing.subs(x0, states_sym[0])
i_forcing = i_forcing.subs(x1, states_sym[1])

M = LM.mass_matrix
M = M.subs(x0, states_sym[0])
M = M.subs(x1, states_sym[1])

In [None]:
# Generating equation of motion
t_forcing = i_forcing + e_forcing - f_forcing
eom = M.inv()*sympy.Matrix(t_forcing)

In [None]:
eom = simplify(eom)

In [None]:
''' Please copy the string shown to the definition of equation in the function of double pendulum'''
for i in range(len(eom)):
    print('Equation ' + str(i) +': ' + str(eom[i]))
    print('\n')

In [None]:
import time

g=9.81
mp=0.5

mc=1
L=1

def torque(t,omega):
    return 0*np.cos(omega*t), 1*np.cos(omega*t)


def cartpole(t,y,omega):
    from numpy import sin, cos, sign
    x0,x1,x0_t,x1_t = y
    _, F = torque(t, omega)
    x0_tt = -(1.0*g*(mc + mp)*sin(x0) + 1.0*(F + L*mp*x0_t**2*sin(x0))*cos(x0))/(L*(mc + mp*sin(x0)**2))
    x1_tt =  1.0*(F + L*mp*x0_t**2*sin(x0) + g*mp*sin(x0)*cos(x0))/(mc + mp*sin(x0)**2)
    return x0_t,x1_t,x0_tt,x1_tt


def generate_data(func, time, init_values, omega):
    sol = solve_ivp(func,[time[0],time[-1]],init_values,t_eval=time, method='LSODA', rtol=1e-10,atol=1e-10, args=[omega])
    return sol.y.T, np.array([func(time[i],sol.y.T[i,:], omega = omega) for i in range(sol.y.T.shape[0])],dtype=np.float64)

In [None]:
#Saving Directory
rootdir = "../../Cart Pendulum/Data/Active/"

num_sample = 100
create_data = False
training = True
save = False
noiselevel = 1e-1

In [None]:
#Create training data
if(create_data):
    print("Creating Data . . .")
    num_sample = 100
    X, Xdot = [], []
    Tau = []
    Omega = []
    for i in range(num_sample):
        t = np.arange(0,5,0.01)
        theta = np.random.uniform(-np.pi, np.pi)
        thetadot = np.random.uniform(0,0)
        omega = np.random.uniform(np.pi/2, np.pi)
        
        _, F = torque(t, omega)
        tau = np.array([_, F]).T    
        y0=np.array([theta, 0, thetadot, 0])
        x,xdot = generate_data(cartpole,t,y0, omega)
        
        #Omega.append(omega)
        Tau.append(tau)
        X.append(x)
        Xdot.append(xdot)

    X = np.vstack(X)
    Xdot = np.vstack(Xdot)
    Tau = np.vstack(Tau)
    if(save==True):
        np.save(rootdir + "X.npy", X)
        np.save(rootdir + "Xdot.npy",Xdot)
        np.save(rootdir + "Tau.npy", Tau)
else:
    X = np.load(rootdir + "X.npy")
    Xdot = np.load(rootdir + "Xdot.npy")
    Tau = np.load(rootdir + "Tau.npy")

In [None]:
#adding noise
mu, sigma = 0, noiselevel
noise = np.random.normal(mu, sigma, X.shape[0])
for i in range(X.shape[1]):
    X[:,i] = X[:,i]+noise
    Xdot[:,i] = Xdot[:,i]+noise

In [None]:
states_dim = 4
states = ()
states_dot = ()
for i in range(states_dim):
    if(i<states_dim//2):
        states = states + (symbols('x{}'.format(i)),)
        states_dot = states_dot + (symbols('x{}_t'.format(i)),)
    else:
        states = states + (symbols('x{}_t'.format(i-states_dim//2)),)
        states_dot = states_dot + (symbols('x{}_tt'.format(i-states_dim//2)),)
print('states are:',states)
print('states derivatives are: ', states_dot)

In [None]:
#Turn from sympy to str
states_sym = states
states_dot_sym = states_dot
states = list(str(descr) for descr in states)
states_dot = list(str(descr) for descr in states_dot)

In [None]:
#Separating states of pendulum and cart
pendulum_states = []
cartpole_states = []
for i in range(states_dim):
    if(i%2==0):
        pendulum_states.append(states[i])
    else:
        cartpole_states.append(states[i])

In [None]:
#build function expression for the library in str
pend_terms = HL.buildFunctionExpressions(1,states_dim//2,pendulum_states,use_sine=True)
cartpole_terms = HL.buildFunctionExpressions(1,states_dim//2,cartpole_states,use_sine=False)

#Assuming we get a prior knowledge about a single pendulum equations
temp = pend_terms[1:] + cartpole_terms
expr = HL.buildFunctionExpressions(3,len(temp),temp)

In [None]:
### Boundaries for debugging with only the correct terms ###

In [None]:
#Creating library tensor
Zeta, Eta, Delta = LagrangianLibraryTensor(X,Xdot,expr,states,states_dot, scaling=False)

In [None]:
## Case I, input torque provided##
expr = np.array(expr)


#non-penalty index from prev knowledge
i1 = np.where(expr == 'x0_t**2')[0][0]
i2 = np.where(expr == 'cos(x0)')[0][0]

nonpenaltyidx = [i1,i2]

expr = expr.tolist()

In [None]:
#Moving to Cuda
device = 'cuda:0'

Zeta = Zeta.to(device)
Eta = Eta.to(device)
Delta = Delta.to(device)

#computing upsilon
UpsilonR = Upsilonforward(Zeta, Eta, Delta, Xdot, device)

In [None]:
xi_L = torch.ones(len(expr), device=device).data.uniform_(-10,10)
prevxi_L = xi_L.clone().detach()

In [None]:
def loss(pred, targ):
    loss = torch.mean((pred - targ)**2) 
    return loss 

In [None]:
def clip(w, alpha):
    clipped = torch.minimum(w,alpha)
    clipped = torch.maximum(clipped,-alpha)
    return clipped

def proxL1norm(w_hat, alpha, nonpenaltyidx):
    if(torch.is_tensor(alpha)==False):
        alpha = torch.tensor(alpha)
    w = w_hat - clip(w_hat,alpha)
    for idx in nonpenaltyidx:
        w[idx] = w_hat[idx]
    return w

In [None]:
def training_loop(coef, prevcoef, UpsilonR, Tau, xdot, bs, lr, lam, momentum=True):
    loss_list = []
    tl = xdot.shape[0]
    n = xdot.shape[1]

    if(torch.is_tensor(xdot)==False):
        xdot = torch.from_numpy(xdot).to(device).float()
    if(torch.is_tensor(Tau)==False):
        Tau = torch.from_numpy(Tau).to(device).float()

    v = coef.clone().detach().requires_grad_(True)
    prev = v
    
    for i in range(tl//bs):
                
        #computing acceleration with momentum
        if(momentum==True):
            vhat = (v + ((i-1)/(i+2))*(v - prev)).clone().detach().requires_grad_(True)
        else:
            vhat = v.requires_grad_(True).clone().detach().requires_grad_(True)
   
        prev = v

        #Computing loss
        upsilonR = UpsilonR[:,:,i*bs:(i+1)*bs]
        tau = Tau[i*bs:(i+1)*bs]


        #forward
        pred = torch.einsum('jkl,k->jl', upsilonR, vhat)
        targ = tau.T
        
        lossval = loss(pred, targ)
        
        #Backpropagation
        lossval.backward()

        with torch.no_grad():
            v = vhat - lr*vhat.grad
            v = (proxL1norm(v,lr*lam,nonpenaltyidx))
            
            # Manually zero the gradients after updating weights
            vhat.grad = None
        
        
    
        
        loss_list.append(lossval.item())
    print("Average loss : " , torch.tensor(loss_list).mean().item())
    return v, prevcoef, torch.tensor(loss_list).mean().item()

In [None]:
Epoch = 500
i = 0
lr = 5e-6
lam = 0
temp = 200
while(i<=Epoch):
    print("Epoch "+str(i) + "/" + str(Epoch))
    print("Learning rate : ", lr)
    xi_L, prevxi_L, lossitem= training_loop(xi_L,prevxi_L,UpsilonR,Tau,Xdot,128,lr=lr,lam=lam)
    temp = lossitem
    i+=1

In [None]:
## Thresholding
threshold = 1e-2
surv_index = ((torch.abs(xi_L) >= threshold)).nonzero(as_tuple=True)[0].detach().cpu().numpy()
expr = np.array(expr)[surv_index].tolist()

xi_L =xi_L[surv_index].clone().detach().requires_grad_(True)
prevxi_L = xi_L.clone().detach()

## obtaining analytical model
xi_Lcpu = np.around(xi_L.detach().cpu().numpy(),decimals=2)
L = HL.generateExpression(xi_Lcpu,expr,threshold=1e-3)
print("Result stage 1: ", simplify(L))

In [None]:
Zeta, Eta, Delta = LagrangianLibraryTensor(X,Xdot,expr,states,states_dot, scaling=False)

expr = np.array(expr)
i1 = np.where(expr == 'x0_t**2')[0][0]
i2 = np.where(expr == 'cos(x0)')[0][0]

nonpenaltyidx = [i1,i2]


Zeta = Zeta.to(device)
Eta = Eta.to(device)
Delta = Delta.to(device)

#computing upsilon
UpsilonR = Upsilonforward(Zeta, Eta, Delta, Xdot, device)

In [None]:
### Debugging computation
expr = np.array(expr)
i0 = np.where(expr == 'x0_t**2')[0][0]
i1 = np.where(expr == 'x1_t**2')[0][0]
i2 = np.where(expr == 'cos(x0)')[0][0]
i3 = np.where(expr == 'x0_t*cos(x0)*x1_t')[0][0]

expr = [expr[i0],expr[i1],expr[i2],expr[i3]]


In [None]:
#Creating library tensor
Zeta, Eta, Delta = LagrangianLibraryTensor(X,Xdot,expr,states,states_dot, scaling=False)

In [None]:
#define the true coefficient
xi_True = torch.ones(len(expr))
xi_True[0] = 0.11#0.25
xi_True[1] = 0.36#0.75
xi_True[2] = 2.03#4.905
xi_True[3] = 0.24#0.5

In [None]:
#Moving to Cuda
device = 'cuda:0'

Zeta = Zeta.to(device)
Eta = Eta.to(device)
Delta = Delta.to(device)

xi_True = xi_True.to(device)

In [None]:
#compute tau prediction
xdot = torch.from_numpy(Xdot).to(device).float()
UpsilonL = Upsilonforward(Zeta, Eta, Delta, xdot, device)
TauPred = torch.einsum('jkl,k->jl', UpsilonL, xi_True).detach().cpu().numpy().T

TauEL = ELforward(xi_True, Zeta, Eta, Delta, xdot, device).detach().cpu().numpy().T

In [None]:
#plot the figures
left_boundary = 0
right_boundary = 5000
t = np.arange(left_boundary,right_boundary)
plt.plot(t,Tau[left_boundary:right_boundary,0])
plt.plot(t,TauPred[left_boundary:right_boundary,0])
plt.plot(t,TauEL[left_boundary:right_boundary,0])
plt.show()

plt.plot(t,Tau[left_boundary:right_boundary,1])
plt.plot(t,TauPred[left_boundary:right_boundary,1])
plt.plot(t,TauEL[left_boundary:right_boundary,1])
plt.show()

In [None]:
plt.plot(t, Xdot[left_boundary:right_boundary,2])
plt.show()

In [None]:
### Debugging for training with only known terms
expr = np.array(expr)
i0 = np.where(expr == 'x0_t**2')[0][0]
i1 = np.where(expr == 'x1_t**2')[0][0]
i2 = np.where(expr == 'cos(x0)')[0][0]
i3 = np.where(expr == 'x0_t*cos(x0)*x1_t')[0][0]

expr = [expr[i0],expr[i1],expr[i2],expr[i3]]

#non-penalty index from prev knowledge
expr = np.array(expr)
i4 = np.where(expr == 'x0_t**2')[0][0]
i5 = np.where(expr == 'cos(x0)')[0][0]
nonpenaltyidx = [i4,i5]

In [None]:
#Creating library tensor
Zeta, Eta, Delta = LagrangianLibraryTensor(X,Xdot,expr,states,states_dot, scaling=False)

In [None]:
#Moving to Cuda
device = 'cuda:0'

Zeta = Zeta.to(device)
Eta = Eta.to(device)
Delta = Delta.to(device)

In [None]:
#initialize coefficient
xi_L = torch.ones(len(expr), device=device).data.uniform_(-10,10)
prevxi_L = xi_L.clone().detach()

In [None]:
#compute Upsilon
xdot = torch.from_numpy(Xdot).to(device).float()
UpsilonR = Upsilonforward(Zeta, Eta, Delta, Xdot, device)