# Timoshenko Beam Control

In [1]:
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn

from torchdyn.core import NeuralODE
from torchdyn.datasets import *
from torchdyn.numerics import odeint, Euler, HyperEuler

In [2]:
%load_ext autoreload
%autoreload 2
import sys; sys.path.append(2*'../') # go n dirs back
from src import *
from dicts import *
from timoshenko_utils import *

In [3]:
# Select the compute device; change this according to your configuration.
# GPU alternative (uncomment to use it when CUDA is available):
# device = torch.device('cuda:1') if torch.cuda.is_available() else torch.device('cpu')
device = torch.device('cpu')

### Load Timoshenko Beam

In [4]:
# Read the discretized system matrices and the initial state from local files,
# cast them to float32 and place them on `device`.
# NOTE(review): torch.load unpickles its input, so only trusted files belong here.
A = torch.load('A_sys').to(device=device, dtype=torch.float32)
B = torch.load('B_sys').to(device=device, dtype=torch.float32)
# Prepend a leading axis of size 1 to the stored initial state.
x0 = torch.load('x0').unsqueeze(0).to(device=device, dtype=torch.float32)
x_dim = x0.size(1)
u_dim = 2

# Initial controller; later cells swap `f.u` for other controllers.
u = BoxConstrainedController(x_dim, u_dim, h_dim=512, num_layers=3)
u = u.to(device)

# Controlled beam system built from the loaded matrices and the controller.
f = TimoshenkoBeam(A, B, u)
f = f.to(device)

### Test model

In [5]:
class HyperNet(nn.Module):
    """Hypernetwork wrapper for controlled systems.

    The wrapped network receives the current state concatenated (along the
    last axis) with the values cached on the notebook-level system ``f`` as
    ``f.cur_f`` and ``f.cur_u``.

    Input: current x, f and u from the controlled system
    Output: p-th order residuals
    """
    def __init__(self, net):
        super().__init__()
        self.net = net

    def forward(self, t, x):
        # `t` is part of the signature but is not used here.
        pieces = (x, f.cur_f, f.cur_u)
        stacked = torch.cat(pieces, dim=-1)
        return self.net(stacked)
# Leftover settings, kept commented out (presumably the ones used when the
# hypersolver loaded below was built — TODO confirm):
# hdim = 256
# snake_activation = Snake(hdim)
# Load the pretrained hypersolver object from disk.
# NOTE(review): torch.load unpickles the file, so it should only be pointed at trusted
# checkpoints; it presumably also needs the classes of the saved object to be defined
# in this session before loading — confirm.
hypersolver = torch.load('saved_models/hypersolver_0.005_256_snake_rw_layers_2_new.pt')

In [6]:
# Bounds of the uniform distribution from which the constant control is drawn.
u_low = torch.Tensor([-1, -1]).to(device)
u_high = torch.Tensor([1, 1]).to(device)
u_dist = torch.distributions.Uniform(u_low, u_high)
f.u = RandConstController()

# Draw one control sample and store it on the controller so it is common to all experiments.
# NOTE(review): no random seed is set in this notebook, so the draw differs between runs.
f.u.u0 = u_dist.sample((1,))

# Time grid: `steps` points from t0 to tf, spaced dt apart.
t0, tf, dt = 0, 3, 0.005
# round() instead of int(): int() truncates, so a quotient that lands just below
# an integer (e.g. 0.3/0.1 == 2.9999999999999996) would silently drop one step.
steps = round((tf - t0)/dt) + 1
t = torch.linspace(t0, tf, steps)

### Train with `odeint` and `PyTorch Lightning`


In [8]:
import pytorch_lightning as pl
from numpy import pi
import time

def dummy_trainloader():
    """Build a placeholder DataLoader holding a single (input, target) sample.

    The loader yields exactly one batch of size 1. The tensors are zero-filled
    so the contents are deterministic; `torch.Tensor(1)` would instead allocate
    uninitialized memory.

    Returns:
        torch.utils.data.DataLoader: loader over a one-element TensorDataset.
    """
    placeholder = torch.utils.data.TensorDataset(torch.zeros(1), torch.zeros(1))
    return torch.utils.data.DataLoader(placeholder, batch_size=1)

# Placeholder loader returned by Learner.train_dataloader.
trainloader = dummy_trainloader()
# Re-read the initial state (leading axis of size 1 added) as float32 on `device`.
x0 = torch.load('x0').unsqueeze(0).to(device=device, dtype=torch.float32)
x_dim = x0.size(1)
u_dim = 2

class Learner(pl.LightningModule):
    """Lightning module that trains the controller ``model.u`` through ``odeint``.

    Each training step integrates ``model`` from the notebook-level initial
    state ``x0`` over ``span`` with the chosen solver and minimizes a control
    loss on the resulting trajectory plus a small penalty on the control.

    Args:
        model: system to integrate; must expose the controller as ``model.u``.
        span: tensor of integration times, moved to ``device``.
        solver: solver passed to ``odeint`` (a name such as ``'rk4'`` or a
            solver object such as the loaded hypersolver).
        lr: learning rate of the Adam optimizer.

    Attributes:
        times: wall-clock seconds elapsed between consecutive
            ``training_step`` calls. The first entry is a ~0 placeholder
            recorded on the very first call; callers drop it (``times[1:]``).
    """
    def __init__(self,
                 model:nn.Module,
                 span,
                 solver='rk4',
                 lr=1e-3):
        super().__init__()
        self.model = model.to(device)
        self.t = span.to(device)
        self.solver = solver
        self.lr=lr
        self.z0 = x0  # initial state comes from the notebook-level `x0`
        self.model.nfe = 0
        self.flag = 0  # becomes truthy once the timer has been started
        self.current_time = 0
        self.times = []

    def forward(self):
        """Integrate the model from ``z0`` over ``t`` and return the second output of ``odeint``."""
        _, zT = odeint(self.model, self.z0, self.t,
                    solver=self.solver)
        return zT

    def training_step(self, batch, batch_idx):
        """Record the time since the previous step, then compute the loss.

        ``batch`` and ``batch_idx`` are required by Lightning but are not used.

        Returns:
            dict with the scalar tensor under key ``'loss'``.
        """
        # Read the clock once so no time is lost between the measurement and
        # the restart of the interval.
        now = time.time()
        if not self.flag:
            # First call: start the timer; the entry appended below is ~0.
            self.current_time = now
            self.flag = 1
        self.times.append(now - self.current_time)
        self.current_time = now

        # forward pass
        self.model.nfe = 0
        zT = self()

        # Mean 2-norm (over the last axis) of the 'sig_t' and 'sig_r' entries,
        # taken at index 0 of the second axis of the trajectory.
        control_loss = torch.norm(zT[:,0,dofs_dict['sig_t']], p=2, dim=-1).mean()
        control_loss = control_loss + torch.norm(zT[:,0,dofs_dict['sig_r']], p=2, dim=-1).mean()
        # Small penalty on the mean absolute controller output along the trajectory.
        reg_loss = 1e-3*self.model.u(0., zT).abs().mean()
        loss = control_loss + reg_loss
        return {'loss': loss}

    def configure_optimizers(self):
        """Adam over the controller parameters only."""
        return torch.optim.Adam(self.model.u.parameters(), lr=self.lr)

    def train_dataloader(self):
        """Return the notebook-level placeholder ``trainloader``."""
        return trainloader

### Parameters

In [9]:
# Time span
t0, dt, tf = 0, 0.005, 3
steps = int((tf - t0)/dt) + 1
t = torch.linspace(t0, tf, steps)

# Training hyperparameters
lr = 1e-3
epochs = 1000
h_dim = 1024
num_layers = 3

## Run experiments

In [None]:
# Experiment: train the controller with the loaded hypersolver as ODE solver.
# Solver
solver = hypersolver

# Create the output folders up front so that a missing directory cannot make
# the save step fail after the whole training run has finished.
os.makedirs('saved_models', exist_ok=True)
os.makedirs('results', exist_ok=True)

# Controller (fresh instance so every experiment starts from new weights)
u = BoxConstrainedController(x_dim, u_dim, h_dim=h_dim, num_layers=num_layers, constrained=True).to(device)
f.u = u

# Train
learn = Learner(f, t, solver=solver, lr=lr).to(device)
trainer = pl.Trainer(max_epochs=epochs, gradient_clip_val=.3) #, logger=logger
trainer.fit(learn)

# Save the trained controller and the per-step timings
torch.save(f.u, 'saved_models/u_hypersolver.pt')
training_times = learn.times[1:] # exclude the first dummy time

exp = 'hypersolver'
with open("results/"+exp+"_times.txt", "w") as output:
    output.write(str(training_times))
    
print(r'Mean runtime : {} ± {} s'.format(round(np.mean(training_times), 4), round(np.std(training_times), 4)))
print(r'Total runtime : {} s'.format(round(np.sum(training_times), 4)))

GPU available: True, used: False
TPU available: False, using: 0 TPU cores

  | Name   | Type           | Params
------------------------------------------
0 | model  | TimoshenkoBeam | 2 M   
1 | solver | HyperEuler     | 189 K 


HBox(children=(HTML(value='Training'), FloatProgress(value=1.0, bar_style='info', layout=Layout(flex='2'), max…

In [None]:
# Experiment: train the controller with the 'rk4' solver.
# Solver
solver = 'rk4'

# Create the output folders up front so that a missing directory cannot make
# the save step fail after the whole training run has finished.
os.makedirs('saved_models', exist_ok=True)
os.makedirs('results', exist_ok=True)

# Controller (fresh instance so every experiment starts from new weights)
u = BoxConstrainedController(x_dim, u_dim, h_dim=h_dim, num_layers=num_layers, constrained=True).to(device)
f.u = u

# Train
learn = Learner(f, t, solver=solver, lr=lr).to(device)
trainer = pl.Trainer(max_epochs=epochs, gradient_clip_val=.3) #, logger=logger
trainer.fit(learn)

# Save the trained controller and the per-step timings
torch.save(f.u, 'saved_models/u_rk4.pt')
training_times = learn.times[1:] # exclude the first dummy time

exp = 'rk4'
with open("results/"+exp+"_times.txt", "w") as output:
    output.write(str(training_times))
    
print(r'Mean runtime : {} ± {} s'.format(round(np.mean(training_times), 4), round(np.std(training_times), 4)))
print(r'Total runtime : {} s'.format(round(np.sum(training_times), 4)))

In [None]:
# Experiment: train the controller with the 'midpoint' solver.
# Solver
solver = 'midpoint'

# Create the output folders up front so that a missing directory cannot make
# the save step fail after the whole training run has finished.
os.makedirs('saved_models', exist_ok=True)
os.makedirs('results', exist_ok=True)

# Controller (fresh instance so every experiment starts from new weights)
u = BoxConstrainedController(x_dim, u_dim, h_dim=h_dim, num_layers=num_layers, constrained=True).to(device)
f.u = u

# Train
learn = Learner(f, t, solver=solver, lr=lr).to(device)
trainer = pl.Trainer(max_epochs=epochs, gradient_clip_val=.3) #, logger=logger
trainer.fit(learn)

# Save the trained controller and the per-step timings
torch.save(f.u, 'saved_models/u_midpoint.pt')
training_times = learn.times[1:] # exclude the first dummy time

exp = 'midpoint'
with open("results/"+exp+"_times.txt", "w") as output:
    output.write(str(training_times))
    
print(r'Mean runtime : {} ± {} s'.format(round(np.mean(training_times), 4), round(np.std(training_times), 4)))
print(r'Total runtime : {} s'.format(round(np.sum(training_times), 4)))

In [None]:
# Experiment: train the controller with the 'euler' solver.
# Solver
solver = 'euler'

# Create the output folders up front so that a missing directory cannot make
# the save step fail after the whole training run has finished.
os.makedirs('saved_models', exist_ok=True)
os.makedirs('results', exist_ok=True)

# Controller (fresh instance so every experiment starts from new weights)
u = BoxConstrainedController(x_dim, u_dim, h_dim=h_dim, num_layers=num_layers, constrained=True).to(device)
f.u = u

# Train
learn = Learner(f, t, solver=solver, lr=lr).to(device)
trainer = pl.Trainer(max_epochs=epochs, gradient_clip_val=.3) #, logger=logger
trainer.fit(learn)

# Save the trained controller and the per-step timings
torch.save(f.u, 'saved_models/u_euler.pt')
training_times = learn.times[1:] # exclude the first dummy time

exp = 'euler'
with open("results/"+exp+"_times.txt", "w") as output:
    output.write(str(training_times))
    
print(r'Mean runtime : {} ± {} s'.format(round(np.mean(training_times), 4), round(np.std(training_times), 4)))
print(r'Total runtime : {} s'.format(round(np.sum(training_times), 4)))

### Test controller with nominal trajectory

In [None]:
# Evaluation time grid (also used by the following cell).
t = torch.linspace(0, 5, 500+1)

def plot_test_controller(f, x0, 
                         span=torch.linspace(0, 5, 500+1), 
                         title='Euler trajectories and control policy'):
    """Integrate the controlled system and plot states and control over time.

    The system is integrated with the 'tsit5' solver (atol = rtol = 1e-5)
    over `span`, and three stacked panels are drawn: the 'v_t'/'v_r'
    entries, the 'sig_t'/'sig_r' entries, and the controller output, all
    taken at index 0 of the second axis of the trajectory.

    Args:
        f: controlled system exposing its controller as ``f.u``.
        x0: initial state.
        span: integration/plotting times. Previously this argument was
            ignored and the notebook-level `t` was used instead.
        title: figure title. Previously this argument was ignored.
    """
    _, xT = odeint(f.to(device), x0.to(device), span, solver='tsit5', atol=1e-5, rtol=1e-5)
    xT = xT.detach().cpu()
    # Controller output evaluated on the whole trajectory.
    uT = f.u(0, xT.to(device))

    v_t = xT[:,0,dofs_dict['v_t']]
    v_r = xT[:,0,dofs_dict['v_r']]
    sig_t = xT[:,0,dofs_dict['sig_t']]
    sig_r = xT[:,0,dofs_dict['sig_r']]

    times = span.cpu()
    fig, axs = plt.subplots(3, 1, figsize=(10, 5))
    axs[0].plot(times, v_t, ':k');
    axs[0].plot(times, v_r, 'b');
    axs[1].plot(times, sig_t, ':k');
    axs[1].plot(times, sig_r, 'b');
    axs[2].plot(times, uT[:,0,:].detach().cpu(), ':b');
    fig.suptitle(title)

plot_test_controller(f, x0, span=t)

In [None]:
# Recompute the closed-loop trajectory here: the `xT` computed inside
# plot_test_controller is local to that function, so no `xT` exists at cell
# level on a fresh kernel.
_, xT = odeint(f.to(device), x0.to(device), t, solver='tsit5', atol=1e-5, rtol=1e-5)
xT = xT.detach()

# Entries of the trajectory, taken at index 0 of the second axis.
v_t = xT[:,0,dofs_dict['v_t']].cpu()
v_r = xT[:,0,dofs_dict['v_r']].cpu()
sig_t = xT[:,0,dofs_dict['sig_t']].cpu()
sig_r = xT[:,0,dofs_dict['sig_r']].cpu()

# Abscissae for each group of entries (presumably spatial coordinates — TODO confirm).
x_v_t = x_dict['v_t'].cpu()
x_v_r = x_dict['v_r'].cpu()
x_sig_t = x_dict['sig_t'].cpu()
x_sig_r = x_dict['sig_r'].cpu()


fig, axs = plt.subplots(2, 2)
# Last point of the trajectory ...
axs[0,0].scatter(x_v_t, v_t[-1])
axs[0,1].scatter(x_v_r, v_r[-1])
axs[1,0].scatter(x_sig_t, sig_t[-1])
axs[1,1].scatter(x_sig_r, sig_r[-1])
# ... overlaid with the first point.
axs[0,0].scatter(x_v_t, v_t[0])
axs[0,1].scatter(x_v_r, v_r[0])
axs[1,0].scatter(x_sig_t, sig_t[0])
axs[1,1].scatter(x_sig_r, sig_r[0])

axs[1,0].set_ylim([-.1, .1])
axs[1,1].set_ylim([-.1, .1])

## Complexity and FLOPS calculation

In [None]:
from ptflops import get_model_complexity_info

bs = 1 # batch size we used in training
def get_macs(net:nn.Module):
    """Return the MAC count reported by ptflops for `net`, as an int.

    The input size is read from the second dimension of the network's first
    parameter (the input width when that parameter is an ``nn.Linear``
    weight), and the shape handed to ``get_model_complexity_info`` is
    ``(bs, in_features)``.

    The CUDA device context is entered only when CUDA is available, so the
    function also works on CPU-only machines.

    Args:
        net: network to analyse; its first parameter must have at least two dimensions.

    Returns:
        int: the MAC count returned by ``get_model_complexity_info``.
    """
    from contextlib import nullcontext  # local import: only needed here

    # Only the first parameter's shape is needed; no need to collect them all.
    in_features = next(net.parameters()).shape[1]
    device_ctx = torch.cuda.device(0) if torch.cuda.is_available() else nullcontext()
    with device_ctx:
        macs, _ = get_model_complexity_info(net, (bs, in_features), as_strings=False)
    return int(macs)

# Stand-in networks built only so their multiply-accumulate counts can be measured.
controller_test = nn.Sequential(*[
    nn.Linear(160, 1024),
    nn.Softplus(),
    nn.Linear(1024, 1024),
    nn.Softplus(),
    nn.Linear(1024, 1024),
    nn.Tanh(),
    nn.Linear(1024, 2),
])

hypersolver_test = nn.Sequential(*[
    nn.Linear(322, 256),
    nn.Softplus(),
    nn.Linear(256, 256),
    nn.Softplus(),
    nn.Linear(256, 160),
])
hypersolver_test = hypersolver_test.to(device)

# Count the hypersolver first, then the controller.
hs_macs = get_macs(hypersolver_test)
u_macs = get_macs(controller_test)

print('Controller MACs per NFE:', u_macs, '\nHypersolver MACs per NFE:', hs_macs)