<a href="https://colab.research.google.com/github/modellatore/Codes/blob/main/Umassi_LNN_double_pendulum.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install torchdiffeq

In [None]:
import numpy as np
from scipy.integrate import odeint
from scipy.integrate import solve_ivp
import matplotlib.pyplot as plt 
import time

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from functools import partial
from torch.autograd.functional import jacobian, hessian
from torchdiffeq import odeint as tor_odeint

In [None]:
def E(x, g=9.81, m1=1, m2=1, l1=1, l2=1):
    try:
        q, qt = torch.split(x, 2)
        cos = torch.cos
    except: 
        q, qt = np.split(x, 2)
        cos = np.cos
    t1, t2 = q[0], q[1]     #theta 1 and 2
    tt1, tt2 = qt[0], qt[1] #time derivatives of t1 and t2

    T = 0.5*m1*l1**2* tt1**2 * 0.5*m2*((l1*tt1)**2 + (l2*tt2)**2+2*l1*l2* tt1*tt2 *cos(t1-t2))
    V = -(m1+m2)*g*l1* cos(t1) - m2*g*l2* cos(t2)
    return T + V

def get_qdtt(q, qt, g=9.81, m1=1, m2=1, l1=1, l2=1):                                                #given generalized coord & vel it gives back accelerations

    qdtt = np.zeros_like(q)
    cos = np.cos
    sin = np.sin
    
    t1, t2 = q[:,0], q[:,1]       #theta 1 and 2
    tt1, tt2 = qt[:,0], qt[:,1]   #velocities of t1 and t2
    
    a = -g*(2*m1+m2)*sin(t1) - m2*g*sin(t1-2*t2) -2*m2*sin(t1-t2) * (l2*tt2**2 + l1*tt1**2*cos(t1-t2)) 
    b = l1 * (2*m1+m2-m2*cos(2*t1-2*t2))
    qdtt[:, 0] =  a / b  # acceleration of theta1 
    
    c = 2*sin(t1-t2) * (l1*tt1**2*(m1+m2) + g*(m1+m2)*cos(t1) + l2*m2*tt2**2*cos(t1-t2))
    qdtt[:, 1] = c / b   # acceleration of theta2

    return qdtt  

def get_xt_anal(x, t):
    d = np.zeros_like(x)
    d[:, :2] = x[:, 2:]                           #take last 2 coloumns of x and put them in the first 2 of d (so qt)
    d[:, 2:] = get_qdtt(x[:, :2], x[:, 2:])       #compute the accelerations (from q, qt)
    return d                                      # d = qt (input), qtt analytical

def anal_solve_ode(q0, qt0, t,):

    x0 = np.append(q0, qt0)

    def f_anal(x, t):
        d = np.zeros_like(x)
        d[:2] = x[2:]                                                                    #qt same as input
        d[2:] = np.squeeze(get_qdtt(np.expand_dims(x[:2], axis=0), np.expand_dims(x[2:], axis=0))) #qtt analytical
        return d 
    return odeint(f_anal, x0, t)                                  #integrate for q, qt
    
def q2xy(ql, l1=1, l2=1):         #from q, qt to cartesian coordinates and velocities 
    
    try: 
        xy = np.zeros_like(ql)
        sin = np.sin
        cos = np.cos
    except: 
        xy = torch.zeros_like(ql)
        sin = torch.sin
        cos = torch.cos
        
    t1, t2 = ql[:,0], ql[:,1]     #theta 1 and 2
    tt1, tt2 = ql[:,2], ql[:,3]  #time derivatives of t1 and t2

    xy[:, 0] = l1*sin(t1) + l2*sin(t2)                       #x
    xy[:, 1] = l1*cos(t1) + l2*cos(t2)                       #y
    xy[:, 2] = l1*tt1*cos(t1) + l2*tt2*cos(t2)               #vx
    xy[:, 3] = l1*tt1*sin(t1) + l2*tt2*sin(t2)               #vy
    return xy


In [None]:
##################################### M O D E L ####################################################

class LNN(nn.Module):
    def __init__(self):
        super(LNN, self).__init__()
        self.fc1 = nn.Linear(4, 16)
        self.fc2 = nn.Linear(16, 16)
        self.fc3 = nn.Linear(16, 16)
        self.fc4 = nn.Linear(16, 1)
        self.sp = nn.Softplus(beta=1)   #beta = 1 is steeper, so derivative = 1 is approached faster 

        self.apply(self._init_weights)

    def lagrangian(self, x): 
        x = self.sp(self.fc1(x))
        x = self.sp(self.fc2(x))
        x = self.sp(self.fc3(x))
        x = self.fc4(x)
        return x   
    
    def _init_weights(self, module):
      if isinstance(module, nn.Linear):                     #normalize weights according to paper
          self.fc1.weight.data.normal_(mean=0.0, std=2.2/4)  #1st layer is 2.2/sqrt(n_1)
          self.fc2.weight.data.normal_(mean=0.0, std=0.58/4) #i-th layer is 0.58i/sqrt(n_i)
          self.fc3.weight.data.normal_(mean=0.0, std=0.58*2/4)
          self.fc4.weight.data.normal_(mean=0.0, std=4)    #last layer is sqrt(n)
          self.fc1.bias.data.zero_()
          self.fc2.bias.data.zero_()
          self.fc3.bias.data.zero_()
          self.fc4.bias.data.zero_()
    
    def forward(self, x):  #   r, th, rt, tht 
        n = x.shape[1]//2  #   2   
        xv = torch.autograd.Variable(x, requires_grad=True)                  
        xv_tup = tuple([xi for xi in x]) 
        tqt = xv[:, n:]   #   rt, tht  

        jacpar = partial(jacobian,  self.lagrangian, create_graph=True)
        hesspar = partial(hessian,  self.lagrangian, create_graph=True)
      
        A = tuple(map(hesspar, xv_tup))    #   
        B = tuple(map(jacpar, xv_tup))
        #                                                 dqt dqt         dq          dqt dq  @ qt
        #                                                 [2:, 2:]       [:2, 0]     [2:, :2] @ rt, tht        
        multi = lambda Ai, Bi, tqti, n:  torch.pinverse(Ai[n:, n:]) @ (Bi[:n, 0] - Ai[n:, :n] @ tqti) 
        multi_par = partial(multi, n=n)
        tqtt_tup = tuple(map(multi_par, A, B, tqt))
        tqtt = torch.cat([tqtti[None] for tqtti in tqtt_tup])

        xt = torch.cat([tqt, tqtt], axis=1)
        xt.retain_grad()
        return xt                        #qt (same as input), qtt 

    def t_forward(self, t, x):
        return self.forward(x)

tr_params = sum(p.numel() for p in LNN().parameters() if p.requires_grad)
print('Trainable parameters= %i' %tr_params)

def loss(pred, targ):
    return torch.mean((pred - targ)**2)

def nn_solve_ode(model, x0, t):

    x0 = x0.detach().numpy()
    def f(x, t):
        x_tor = torch.tensor(np.expand_dims(x, 0), requires_grad=True).float()
        return np.squeeze(model(x_tor).detach().numpy(), axis=0)
    return odeint(f, x0, t)


In [None]:
############################ P A R A M E T E R S #############################################################

eps = 200           #epochs
t = 3              #duration of motion
tl = 4             #length of samples
N = tl              #len of training sample (if shorter, less motion)

samples = 512        #samples to train on
batches = [2]      #different batch sizes for training
lrs = [1e-3]        #different learning rates for optimizer

#StepLR decays the learn rate
ef, gm = 10 , 0.5  #every st epochs, lr= lr*gm, so lr_final= lr* gm^ef
st = eps//ef

b_len, lr_len = len(batches), len(lrs)
print('Sample size= %i' %int(samples*tl))

In [None]:
############################# T R A I N # S E T # A N D # P L O T S ##########################################

t_train = torch.tensor(np.linspace(0, t, tl)).float()
ini_cond = torch.rand(samples, 4)  #randomized initial conditions

x_train = torch.empty((0, 4))
xt_train = torch.empty((0, 4))

for x0 in ini_cond:
    x = torch.tensor(anal_solve_ode(x0[:2], x0[2:], t_train)) #q, qt analytical
    xt = torch.tensor(get_xt_anal(x, t_train)).float()    #qt, qtt anal 

    x_train = torch.cat((x_train, x))    #chain it all   
    xt_train = torch.cat((xt_train, xt))         

    
ind = torch.randperm(x_train.shape[0])
x_train = x_train[ind, :]     #shuffle
xt_train = xt_train[ind, :]

In [None]:
############################# T R A I N I N G #################################################################

model = [LNN()]*len(batches)
model = np.expand_dims(model, 1)
a = model
for i in range(lr_len-1):
    model = np.concatenate((model,a),axis=1)   #model is now a matrix containing LNN() in all entries
                                               #model[i,j] is then trained with i-th batch size and j-th learning rate

loss_list = np.zeros((b_len, lr_len,eps))
times = []

loss_list = np.zeros((b_len, lr_len,eps))
times = []

for i in range(b_len):
    start = time.time()
    
    for j in range(lr_len):
        #optimizer = optim.SGD(model[i,j].parameters(), lr=lrs[j], momentum=0.9)
        optimizer = optim.Adam(model[i,j].parameters(), lr=lrs[j])
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=st, gamma=gm)  #decaying learn rate

        for e in range(eps):
            running_loss = 0.

            for k in range((tl*samples) // batches[i]):
                optimizer.zero_grad()

                xi = x_train[k*batches[i]:(k+1)*batches[i]]
                xti = xt_train[k*batches[i]:(k+1)*batches[i]]   #qt, qtt 4 comp
                
                xt_pred = model[i,j](xi.float())      #qt, qtt
                loss_val = loss(xt_pred[:,2:], xti[:,2:])
                loss_val.backward()
                optimizer.step()
                running_loss += loss_val.item()
                    
            loss_list[i,j,e] = running_loss / N / samples 
            scheduler.step()
            running_loss = 0.0

    end = time.time()
    times.append((end-start) / lr_len)

In [None]:
############################ S A V I N G ########################################################

r = int(time.time() % 10000)
nmod = b_len * lr_len

np.save('DP2 %i' %r, model)
#model= np.load('9 models 9545.npy', allow_pickle=True)


In [None]:
###################################### T E S T ##########################################################

q0 = torch.rand(1, 4) #          initial conditions
l = 256              #          length
tx = 5               #          duration
t_n = torch.tensor(np.linspace(0, tx, l)).float()

x_anal = torch.tensor(anal_solve_ode(q0[:2], q0[2:], t_n))      #q, qt analytical       
qtt_anal = torch.tensor(get_xt_anal(x_anal, t_n))      #qt(same as input), qtt anal
traj_x = q2xy(x_anal)
E_x = [E(x) for x in x_anal]

x_nn = np.zeros((b_len, lr_len, l, 4))
qtt_nn = np.zeros((b_len, lr_len, l, 4))
traj_nn = np.zeros((b_len, lr_len, l, 4))
E_nn = np.zeros((b_len, lr_len, l))
for i in range(b_len):
    for j in range(lr_len):
        qtt_nn[i,j,:,:] = torch.tensor(model[i,j](x_anal.float()).detach().numpy())
        x_nn[i,j,:,:] = nn_solve_ode(model[i,j], x_anal[0], t_n)
        traj_nn[i,j,:,:] = q2xy(x_nn[i,j])  
        E_nn[i,j,:] = [E(x) for x in x_nn[i,j]]

losses = np.zeros((b_len, lr_len, 2))
tresh = 400
for i in range(b_len):
    for j in range(lr_len):
        losses[i,j,0]=loss(qtt_anal[:,2], qtt_nn[i,j,:,2])
        losses[i,j,1]=loss(qtt_anal[:,3], qtt_nn[i,j,:,3])

In [None]:
############################# A C C E L E R A T I O N S #########################################
fig = plt.figure(figsize=(16,12)) 

plt.subplot(2,1,1)
plt.plot(t_n, qtt_anal[:, 2], 'black', label='Truth')
for i in range(b_len):
    for j in range(lr_len):
        if losses[i,j,0] < tresh: 
            plt.plot(t_n, qtt_nn[i,j,:,2], '-', label='lr %.0e, bs %i' %(lrs[j], batches[i]))
plt.xlabel('Time')
plt.ylabel('Theta1_tt')
plt.title('Theta1_tt, %i epochs, N=%i, samples=%i' %(eps, N, samples))
plt.legend()

plt.subplot(2,1,2)
plt.plot(t_n, qtt_anal[:, 3], 'black', label='Truth')
for i in range(b_len):
    for j in range(lr_len):
        if losses[i,j,1] < tresh: 
            plt.plot(t_n, qtt_nn[i,j,:,3], '-', label='lr %.0e, bs %i' %(lrs[j], batches[i]))
plt.xlabel('Time')
plt.ylabel('Theta2_tt')
plt.title('Theta2_tt, %i epochs, N=%i, samples=%i' %(eps, N, samples))
plt.legend()

plt.show()


In [None]:
############################# P O S I T I O N S ##############################################

fig = plt.figure(figsize=(16,12)) 

plt.subplot(2,1,1)
plt.plot(t_n, x_anal[:, 0], 'black', label='Truth')
for i in range(b_len):
    for j in range(lr_len):
        if losses[i,j,0] < tresh: 
            plt.plot(t_n, x_nn[i,j,:,0], '-', label='lr %.0e, bs %i' %(lrs[j], batches[i]))
plt.xlabel('Time')
plt.ylabel('Theta1_tt')
plt.title('Theta1, %i epochs, N=%i, samples=%i' %(eps, N, samples))
plt.legend()

plt.subplot(2,1,2)
plt.plot(t_n, x_anal[:, 1], 'black', label='Truth')
for i in range(b_len):
    for j in range(lr_len):
        if losses[i,j,1] < tresh: 
            plt.plot(t_n, x_nn[i,j,:,1], '-', label='lr %.0e, bs %i' %(lrs[j], batches[i]))
plt.xlabel('Time')
plt.ylabel('Theta2_tt')
plt.title('Theta2, %i epochs, N=%i, samples=%i' %(eps, N, samples))
plt.legend()


plt.show()

In [None]:
############################# T R A J E C T O R I E S #########################################

a, b = 0, l  #shows array[a:b] in plots (l is their normal length)

fig = plt.figure(figsize=(16,16)) 

plt.subplot(2,1,1)
plt.plot(traj_x[a:b, 0], traj_x[a:b, 1], 'black', label='Truth')
for i in range(b_len):
    for j in range(lr_len):
        if losses[i,j,0] < tresh: 
           plt.plot(traj_nn[i,j,a:b, 0], traj_nn[i,j,a:b, 1], '-', label='lr %.0e, bs %i' %(lrs[j], batches[i]))
plt.xlabel('x(t)')
plt.ylabel('y(t)')
plt.title('Trajectory y vs x, %i epochs, N=%i, samples=%i' %(eps, N, samples))
plt.legend()

plt.subplot(2,1,2)
plt.plot(t_n, np.zeros(l), 'black', label='Truth')

for i in range(b_len):
    for j in range(lr_len):
        if losses[i,j,0] < tresh: 
           r = ((traj_nn[i,j,a:b, 0]-traj_x[a:b,0])**2 + (traj_nn[i,j,a:b, 1]-traj_x[a:b,1])**2)**0.5
           plt.plot(t_n[a:b], r, '-', label='lr %.0e, bs %i' %(lrs[j], batches[i]))
plt.xlabel('Time')
plt.ylabel('sqrt dx^2+dy^2')
plt.title('Euclidean distance from truth, %i epochs, N=%i, samples=%i' %(eps, N, samples))
plt.legend()

plt.show()


In [None]:
############################# E N E R G I E S #########################################

fig = plt.figure(figsize=(16,16)) 

plt.subplot(2,1,1)
plt.plot(t_n, E_x, 'black', label='Truth')
for i in range(b_len):
    for j in range(lr_len):
        if losses[i,j,1] < tresh: 
            plt.plot(t_n[a:b], E_nn[i,j,a:b], '-', label='lr %.0e, bs %i' %(lrs[j], batches[i]))
plt.xlabel('Time')
plt.ylabel('Energy')
plt.title('Energy vs time, %i epochs, N=%i, samples=%i' %(eps, N, samples))
plt.legend()

############################# L O S S E S #########################################
ae, be = 0, eps

plt.subplot(2,1,2)
for i in range(b_len):
    for j in range(lr_len):
            plt.plot(np.arange(1+ae, be+1, 1), loss_list[i,j,ae:be], '-', label='lr %.0e, bs %i' %(lrs[j], batches[i]))
plt.xlabel('Epoch')
plt.ylabel('MSE(truth-pred)')
plt.title('Loss vs epochs, %i epochs, N=%i, samples=%i' %(eps, N, samples))
plt.legend()
plt.show()