In [None]:
# Comodo import
from comodo.drakeSimulator.drakeSimulator import DrakeSimulator
from comodo.mujocoSimulator.mujocoSimulator import MujocoSimulator
from comodo.robotModel.robotModel import RobotModel
from comodo.robotModel.createUrdf import createUrdf
from comodo.centroidalMPC.centroidalMPC import CentroidalMPC
from comodo.centroidalMPC.mpcParameterTuning import MPCParameterTuning
from comodo.TSIDController.TSIDParameterTuning import TSIDParameterTuning
from comodo.TSIDController.TSIDController import TSIDController

# optimisation import
import pygmo as pg

# General  import 
import xml.etree.ElementTree as ET
import numpy as np
import tempfile
import urllib.request
import logging
import time
import os
from contextlib import redirect_stderr, redirect_stdout

In [None]:
# set the logging level
logging.getLogger().setLevel(logging.INFO)

In [None]:
# Getting stickbot urdf file and convert it to string 
urdf_robot_file = tempfile.NamedTemporaryFile(mode="w+")
url = 'https://raw.githubusercontent.com/icub-tech-iit/ergocub-gazebo-simulations/master/models/stickBot/model.urdf'
urllib.request.urlretrieve(url, urdf_robot_file.name)
# Load the URDF file
tree = ET.parse(urdf_robot_file.name)
root = tree.getroot()

# Convert the XML tree to a string
robot_urdf_string_original = ET.tostring(root)

create_urdf_instance = createUrdf(
    original_urdf_path=urdf_robot_file.name, save_gazebo_plugin=False
)

In [None]:
# Define parametric links and controlled joints  
legs_link_names = ["hip_3", "lower_leg"]
joint_name_list = [
    "r_shoulder_pitch",
    "r_shoulder_roll",
    "r_shoulder_yaw",
    "r_elbow",
    "l_shoulder_pitch",
    "l_shoulder_roll",
    "l_shoulder_yaw",
    "l_elbow",
    "r_hip_pitch",
    "r_hip_roll",
    "r_hip_yaw",
    "r_knee",
    "r_ankle_pitch",
    "r_ankle_roll",
    "l_hip_pitch",
    "l_hip_roll",
    "l_hip_yaw",
    "l_knee",
    "l_ankle_pitch",
    "l_ankle_roll",
]

In [None]:
# Define the robot modifications
modifications = {}
for item in legs_link_names:
    left_leg_item = "l_" + item
    right_leg_item = "r_" + item
    modifications.update({left_leg_item: 1.2})
    modifications.update({right_leg_item: 1.2})
# Motors Parameters 
Im_arms = 1e-3*np.ones(4) # from 0-4
Im_legs = 1e-3*np.ones(6) # from 5-10
kv_arms = 0.001*np.ones(4) # from 11-14
kv_legs = 0.001*np.ones(6) #from 20

Im = np.concatenate((Im_arms, Im_arms, Im_legs, Im_legs))
kv = np.concatenate((kv_arms, kv_arms, kv_legs, kv_legs))

In [None]:
# Modify the robot model and initialize
create_urdf_instance.modify_lengths(modifications)
urdf_robot_string = create_urdf_instance.write_urdf_to_file()
create_urdf_instance.reset_modifications()

In [None]:
def cost_function(x_k, simulator="drake", urdf_robot_string=urdf_robot_string, visualise_flag = False):
    with redirect_stderr(open(os.devnull, "w")):
        robot_model_init = RobotModel(urdf_robot_string, "stickBot", joint_name_list)
        s_des, xyz_rpy, H_b = robot_model_init.compute_desired_position_walking()
    # Define simulator and initial position
    if simulator == "drake":
        sim_instance = DrakeSimulator()
        sim_instance.set_visualize_robot_flag(visualise_flag)
        sim_instance.load_model(robot_model_init, s=s_des, xyz_rpy=xyz_rpy, kv_motors=kv, Im=Im)

    elif simulator == "mujoco":
        sim_instance = MujocoSimulator()
        sim_instance.load_model(robot_model_init, s=s_des, xyz_rpy=xyz_rpy, kv_motors=kv, Im=Im)
        sim_instance.set_visualize_robot_flag(visualise_flag)
    
    # sim_instance.load_model(robot_model_init, s=s_des, xyz_rpy=xyz_rpy, kv_motors=kv, Im=Im)
    s, ds, tau = sim_instance.get_state()
    t = sim_instance.get_simulation_time()
    H_b = sim_instance.get_base()
    w_b = sim_instance.get_base_velocity()
    # Controller Parameters
    tsid_parameter = TSIDParameterTuning()
    mpc_parameters = MPCParameterTuning()
    if x_k is not None:
        tsid_parameter.set_from_x_k(x_k)

    # TSID Instance
    TSID_controller_instance = TSIDController(frequency=0.01, robot_model=robot_model_init)
    TSID_controller_instance.define_tasks(tsid_parameter)
    TSID_controller_instance.set_state_with_base(s, ds, H_b, w_b, t)

    # MPC Instance
    step_lenght = 0.1
    mpc = CentroidalMPC(robot_model=robot_model_init, step_length=step_lenght)
    mpc.intialize_mpc(mpc_parameters=mpc_parameters)

    # Set desired quantities
    mpc.configure(s_init=s_des, H_b_init=H_b)
    TSID_controller_instance.compute_com_position()
    mpc.define_test_com_traj(TSID_controller_instance.COM.toNumPy())

    # TODO: Set initial robot state and plan trajectories 
    sim_instance.step(1)

    # Reading the state 
    s, ds, tau = sim_instance.get_state()
    try:
        H_b = sim_instance.get_base()
    except RuntimeError as e:
        return sim_instance.get_simulation_time()
    w_b = sim_instance.get_base_velocity()
    t = sim_instance.get_simulation_time()

    # MPC
    mpc.set_state_with_base(s=s, s_dot=ds, H_b=H_b, w_b=w_b, t=t)
    mpc.initialize_centroidal_integrator(s=s, s_dot=ds, H_b=H_b, w_b=w_b, t=t)
    mpc_output = mpc.plan_trajectory()

    # Set loop variables 
    TIME_TH = 20.0

    # Define number of steps
    n_step = int(
        TSID_controller_instance.frequency / sim_instance.get_simulation_frequency()
    )
    n_step_mpc_tsid = int(mpc.get_frequency_seconds() / TSID_controller_instance.frequency)

    counter = 0
    mpc_success = True
    energy_tot = 0.0
    succeded_controller = True

    # Simulation-control loop
    if simulator == "drake" and visualise_flag:
        sim_instance.meshcat.StartRecording()

    while t < TIME_TH:
        
        # Reading robot state from simulator
        s, ds, tau = sim_instance.get_state()
        energy_i = np.linalg.norm(tau)
        try:
            H_b = sim_instance.get_base()
        except RuntimeError as e:
            return sim_instance.get_simulation_time()

        w_b = sim_instance.get_base_velocity()
        t = sim_instance.get_simulation_time()

        # Update TSID
        TSID_controller_instance.set_state_with_base(s=s, s_dot=ds, H_b=H_b, w_b=w_b, t=t)

        # MPC plan 
        if counter == 0:
            mpc.set_state_with_base(s=s, s_dot=ds, H_b=H_b, w_b=w_b, t=t)
            mpc.update_references()
            mpc_success = mpc.plan_trajectory()
            mpc.contact_planner.advance_swing_foot_planner()
            if not (mpc_success):
                logging.error("MPC failed")
                break

        # Reading new references
        com, dcom, forces_left, forces_right = mpc.get_references()
        left_foot, right_foot = mpc.contact_planner.get_references_swing_foot_planner()

        # Update references TSID 
        TSID_controller_instance.update_task_references_mpc(
            com=com,
            dcom=dcom,
            ddcom=np.zeros(3),
            left_foot_desired=left_foot,
            right_foot_desired=right_foot,
            s_desired=np.array(s_des),
            wrenches_left=forces_left,
            wrenches_right=forces_right,
        )

        # Run control 
        succeded_controller = TSID_controller_instance.run()

        if not (succeded_controller):
            logging.error("Controller failed")
            return sim_instance.get_simulation_time()

        tau = TSID_controller_instance.get_torque()

        # Step the simulator
        sim_instance.set_input(tau)
        # sim_instance.step_with_motors(n_step=n_step, torque=tau)
        try:
            sim_instance.step(n_step=n_step)
        except RuntimeError as e:
            logging.error(e)
            return sim_instance.get_simulation_time()

        counter = counter + 1

        if counter == n_step_mpc_tsid:
            counter = 0
    
    if simulator == "drake" and visualise_flag:
        sim_instance.meshcat.StopRecording()
        sim_instance.meshcat.PublishRecording()


    return sim_instance.get_simulation_time()

In [None]:
class GainTuningProblem:
    # @staticmethod
    def fitness(self, x):
        with redirect_stdout(open(os.devnull, "w")):
            result = -cost_function(x, simulator="mujoco")
        return [result]

    def get_bounds(self):
        return ([0] * 28, [1000] * 28)

In [None]:
%%time

# [Warning] 
# This cell can take a long time to run.
# Choose the number of workers, population and generations based on the machine you are using.
num_workers = 1
population = 5
generations = 2

prob = pg.problem(GainTuningProblem())
algo = pg.algorithm(pg.cmaes(gen=generations, force_bounds=True))
algo.set_verbosity(1)
archi = pg.archipelago(
    n=num_workers,
    algo=algo,
    prob=prob,
    pop_size=population,
)
archi.evolve()
print(archi)
ret_flag = archi.wait_check()
res = archi.get_champions_f()
x = archi.get_champions_x()
print(archi, ret_flag)

dict_log = {}
dict_log["res"] = res
dict_log["x"] = x
dict_log["num_workers"] = num_workers
dict_log["population"] = population
dict_log["generations"] = generations
dict_log["ret_flag"] = ret_flag

print(dict_log)

In [None]:
%%time
# extract the best solution
# best_x_k = np.array(x[np.argmin(res)])

# or load a known solution
# best_x_k = np.array([684.25282492, 648.05228734, 569.96763936, 350.65899653,
#        949.98434623, 515.09198331, 845.83335962,  17.95951342,
#        314.5705753 , 676.403354  , 440.72680377, 467.56525625,
#        681.69608946,  97.4551466 , 715.55867788, 416.80935819,
#        470.72979393, 680.49815841, 876.67111456, 654.65448001,
#        329.59585922,  65.34583639, 975.68181074, 342.1368993 ,
#        237.81553041, 606.40982421,  32.24741521,  10.83276846])

# or use default values
best_x_k = None

# run the cost function
cost_function(best_x_k, simulator="drake", visualise_flag = True)