neccessary imports, setting the compute device, setting seed, loading the network and scalers

In [None]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import os 
import random
import torch 
import pickle
import optuna 

## user inputs ## 
# names and directories 
model_name = "forward_MLP.pth" 
scaler_model_dir_name = "network_data"  
input_scaler_filename = "input_scaler.pkl" 
state_scaler_filename = "state_scaler.pkl"
# neural network configuration  
lag_input = 0 
lag_state = 1
num_hidden_layers = 0
hidden_units = 30 
input_flat_size = 6 + (lag_state*6) + 3 + (lag_input*3) 
output_size = 6 
# optimization parameters 
x_intial = np.array([3.3065416622541037e-06, 0, -0.19036912150652113, 6.0826336879046396e-06, 0, -0.3907576704717413])
X = np.vstack([x_intial, x_intial]) 
u_initial = np.array([0,0,0]) 
U = u_initial
delta_umax = 12 
umax = 12 
tmax = 3  
dt = 0.1 # sampling time
z_g = -1 # structure height 
g = 9.8 # gravity acceleration
des_land_pos = [0.15, 0.15] # desired landing pose 
Q = 1 # landing pose weight term  
n_trials = 1000 # number of trials

# selecting the compute device 
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") 
print(f"using compute device: {device}") 
if device.type == "cuda":
    print(f"GPU: {torch.cuda.get_device_name(0)}")   

# setting seeds 
def set_all_seeds(seed: int = 42):
    """
    sets the seeds for python, numpy, and pytoch (CPU & GPU) 
    """
    # python random module
    random.seed(seed)
    # numpy
    np.random.seed(seed)
    # pytorch
    torch.manual_seed(seed)
    # pytorch (GPU)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
    print(f"Seeds set to {seed}") 

# defining the model 
class MLP_model(torch.nn.Module): 
    def __init__(self, input_flat_size:int, hidden_units:int, output_size:int, num_hidden_layers:int) :
        super().__init__()
        self.input_flat_size = input_flat_size 
        self.hidden_units = hidden_units 
        self.output_size = output_size 
        self.num_hidden_layers = num_hidden_layers 

        hidden_layers = [] 

        in_dimension = self.input_flat_size 

        self.input_layer = torch.nn.Linear(in_features=in_dimension, out_features=self.hidden_units) 
        
        for i in range(self.num_hidden_layers) : 
            hidden_layers.append(torch.nn.Linear(in_features=self.hidden_units, out_features=self.hidden_units)) 
            hidden_layers.append(torch.nn.ReLU()) 

        self.backbone = torch.nn.Sequential(*hidden_layers) 
        
        self.output_layer = torch.nn.Linear(in_features=self.hidden_units, out_features=self.output_size) 
 
        self.relu = torch.nn.ReLU()    

    def forward(self,x): 
        out = self.input_layer(x) 
        out = self.relu(out)
        out = self.backbone(out)  
        out = self.output_layer(out) 
        return out  

# instantiating an NN object 
forward_model = MLP_model(input_flat_size=input_flat_size, hidden_units=hidden_units, output_size=output_size, num_hidden_layers=num_hidden_layers) 

# loading the statedicts 
script_path = os.path.dirname(os.path.abspath(__file__)) if "__file__" in globals() else os.getcwd() 
model_data_path = os.path.join(script_path, scaler_model_dir_name) 
forward_model.load_state_dict(torch.load(os.path.join(model_data_path, model_name), weights_only=True)) 
forward_model = forward_model.to(device=device) 
forward_model.eval()  

# loading the scalers 
with open(os.path.join(model_data_path, input_scaler_filename), "rb") as file : 
    input_scaler = pickle.load(file) 
with open(os.path.join(model_data_path, state_scaler_filename), "rb") as file : 
    state_scaler = pickle.load(file)

using compute device: cpu


defining the objective function

In [None]:
def objective(trial) : 
    # defining the decision variables 
    u1_step = trial.suggest_float("act1_step", 0, umax) # actuator 1 
    u2_step = trial.suggest_float("act2_step", 0, umax) # actuator 2
    u3_step = trial.suggest_float("act3_step", 0, umax) # actuator 3 
    u_step = np.array([u1_step, u2_step, u3_step]) # shape (,3)
    # smoothstep and release time  
    t_smoothstep = trial.suggest_float("t_smoothstep", 0, tmax) 
    t_release = trial.suggest_float("t_release", 0, t_smoothstep) 
    
    # generating the input array based on the smoothstep and sampling time  
    ramp_steps = round(t_smoothstep/dt) 
    release_steps = round(t_release/dt) 
    smoothstep_factor = np.zeros((ramp_steps,3)) # shape (ramp_steps, 3)
    for i in range(ramp_steps) : 
        x_val = (i-0)/((ramp_steps-1)-0)
        smoothstep_factor[i,:] = x_val * x_val * (3 - 2 * x_val) 
    u_array = smoothstep_factor * u_step # shape (ramp_steps, 3) but factored (broadcasted)

    # scaling the input array, initial states and conversion to torch tensor  
    u_array_scaled = input_scaler.transform(u_array) 
    u_array_torch = torch.from_numpy(u_array_scaled).type(torch.float32) 
    X_init = state_scaler.transform(X) 
    X_init_scaled = torch.from_numpy(X_init).type(torch.float32) 

    # rollout (dynamics simulation) 
    max_lag = max(lag_input, lag_state)
    # initial buffer 
    current_state = X_init_scaled[max_lag,:] 
    if lag_state == 0 : 
        past_state = X_init_scaled[max_lag:max_lag] 
    else : 
        past_state = X_init_scaled[max_lag-lag_state:max_lag,:] 
        past_state = torch.flatten(input=past_state) 
    current_input = u_array_torch[max_lag,:] 
    if lag_input == 0 : 
        past_input = u_array_torch[max_lag:max_lag,:]
    else : 
        past_input = u_array_torch[max_lag-lag_input:max_lag,:] 
        past_input = torch.flatten(input=past_input) 

    if past_state.size(dim=0) == 0 and past_input.size(dim=0) == 0 : 
        joined_features = torch.concatenate((current_state, current_input), dim=0) 
    elif past_state.size(dim=0) != 0 and past_input.size(dim=0) == 0 : 
        joined_features = torch.concatenate((current_state, past_state, current_input), dim=0)
    elif past_state.size(dim=0) == 0 and past_input.size(dim=0) != 0 : 
        joined_features = torch.concatenate((current_state, current_input, past_input), dim=0) 
    else : 
        joined_features = torch.concatenate((current_state, past_state, current_input, past_input), dim=0) 

    preds = []  

    X_buffer = [X_init_scaled]

    with torch.inference_mode(): 
        for i in range(max_lag+1, len(u_array_torch)) : 
            pred = forward_model(joined_features.unsqueeze(0)) 
            pred = pred.squeeze(0) 

            preds.append(pred) 
            X_buffer.append(pred)

            # updating the buffer 
            current_state = pred
            if lag_state == 0 : 
                past_state = X_buffer[i:i] 
            else : 
                past_state = X_buffer[i-lag_state:i,:] 
                past_state = torch.flatten(input=past_state) 
            current_input = u_array_torch[i,:] 
            if lag_input == 0 : 
                past_input = u_array_torch[i:i,:]
            else : 
                past_input = u_array_torch[i-lag_input:i,:] 
                past_input = torch.flatten(input=past_input) 

            if past_state.size(dim=0) == 0 and past_input.size(dim=0) == 0 : 
                joined_features = torch.concatenate((current_state, current_input), dim=0) 
            elif past_state.size(dim=0) != 0 and past_input.size(dim=0) == 0 : 
                joined_features = torch.concatenate((current_state, past_state, current_input), dim=0)
            elif past_state.size(dim=0) == 0 and past_input.size(dim=0) != 0 : 
                joined_features = torch.concatenate((current_state, current_input, past_input), dim=0) 
            else : 
                joined_features = torch.concatenate((current_state, past_state, current_input, past_input), dim=0)
    
    preds_tensor = torch.stack(preds, dim=0) 
    preds_np = preds_tensor.numpy() 

    # unscaling the predictions (states corrresponding to the inputs)  
    preds_np = state_scaler.inverse_transform(preds_np) 

    # calculating the veloctiy  
    diff = np.diff(preds_np, axis=0) 
    velocities = np.vstack([np.zeros((1, 6)), diff / dt]) 

    # predicting the landing positions 
    delta_z = preds_np[:, -1] - z_g 
    sqrt_term = velocities[:,-1]**2 + (2 * g * delta_z)
    t_flight = (velocities[:,-1] + np.sqrt(sqrt_term)) / g 
    x_landing = preds_np[:,3] + velocities[:,3] * t_flight
    y_landing = preds_np[:,4] + velocities[:,4] * t_flight  

    # actual landing based on the release time 
    act_landing_x = x_landing[release_steps] 
    act_landing_y = y_landing[release_steps] 
    land_pos = np.array([act_landing_x, act_landing_y]) 

    # distance to the goal 
    dist = np.linalg.norm(des_land_pos - land_pos) 

    cost = Q * dist 
    
    return cost

bayesian optimization settings and initiation

In [None]:
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=n_trials, show_progress_bar=True) 