In [None]:
import joblib
import numpy as np

import mujoco

from hydrax.algs import CEM, MPPI, PredictiveSampling, Evosax, xNES, CMAES, GaussianSmoothing, xMPPI, nMPPI
from hydrax.simulation.deterministic import run_interactive
from hydrax.tasks.cart_pole import CartPole
from hydrax.tasks.cart_pole_unconstrained import CartPoleUnconstrained
from hydrax.tasks.pusht_unconstrained import PushTUnconstrained
from hydrax.tasks.cube import CubeRotation
from hydrax.tasks.pusht import PushT
from hydrax.tasks.humanoid_standup import HumanoidStandup
from hydrax.tasks.humanoid_mocap import HumanoidMocap
from hydrax.tasks.walker import Walker
from hydrax.simulation.traj_opt import traj_opt_helper

In [None]:
# Parameters
num_trails = 6
max_iterations = 100
num_samples = 2048
sigma = 0.3
temperature = 0.1
spline = "zero"
horizon = 1.0
num_knots = 100

# # CubeRotation
# task = CubeRotation()
# mj_model = task.mj_model
# mj_data = mujoco.MjData(mj_model)

# CartPole
# task = CartPoleUnconstrained()
# mj_model = task.mj_model
# mj_data = mujoco.MjData(mj_model)

# HumanoidStandup
# task = HumanoidStandup()
# mj_model = task.mj_model
# # Set the initial state so the robot falls and needs to stand back up
# mj_data = mujoco.MjData(mj_model)
# mj_data.qpos[:] = mj_model.keyframe("stand").qpos
# mj_data.qpos[:3] = [0, 0, 0.1]
# mj_data.qpos[3:7] = [0.7, 0.0, -0.7, 0.0]


# # Define the task (cost and dynamics)
# task = HumanoidMocap(reference_filename="Lafan1/mocap/UnitreeG1/walk1_subject2.npz") # Humanoid balancing!
# # Define the model used for simulation
# mj_model = task.mj_model
# mj_data = mujoco.MjData(mj_model)
# mj_data.qpos[:] = task.reference[0]
# initial_knots = task.reference[: num_knots, 7:]



# # PushT
task = PushTUnconstrained()
mj_model = task.mj_model
mj_data = mujoco.MjData(mj_model)
mj_data.qpos = [0.1, 0.1, 1.3, 0.0, 0.0]

In [None]:
nmppi = nMPPI(
    task,
    num_samples = num_samples,
    temperature= temperature,
    sigma = sigma,
    lr_cov = 0.5,
    lr_mean = 1.0,
    plan_horizon= horizon,
    spline_type=spline,
    num_knots= num_knots
)

to = traj_opt_helper(nmppi, mj_model, mj_data)
to.trails(max_iteration=max_iterations, num_trails = num_trails)

In [None]:

xmppi = xMPPI(
    task,
    num_samples = num_samples,
    temperature= temperature,
    sigma = sigma,
    lr_sigma = 0.5,
    lr_B = 0.5,
    plan_horizon= horizon,
    spline_type=spline,
    num_knots= num_knots
)
to = traj_opt_helper(xmppi, mj_model, mj_data)
to.trails(max_iteration=max_iterations, num_trails = num_trails)


In [None]:

ps = PredictiveSampling(
    task,
    num_samples = num_samples,
    noise_level=sigma,
    plan_horizon= horizon,
    spline_type= spline,
    num_knots= num_knots,
)

to = traj_opt_helper(ps, mj_model, mj_data)
to.trails(max_iteration=max_iterations, num_trails = num_trails)



In [None]:

xnes = xNES(
    task,
    num_samples = num_samples,
    temperature= temperature,
    sigma = sigma,
    lr_sigma = 0.5,
    lr_B = 0.5,
    plan_horizon= horizon,
    spline_type=spline,
    num_knots= num_knots
)
to = traj_opt_helper(xnes, mj_model, mj_data)
to.trails(max_iteration=max_iterations, num_trails = num_trails)


In [None]:

cem = CEM(
    task,
    num_samples = num_samples,
    num_elites = 10,
    sigma_start= sigma,
    sigma_min = 0.1,
    plan_horizon=horizon,
    spline_type=spline,
    num_knots=num_knots,
)

# cem = CEM(
#     task,
#     num_samples=num_samples,
#     num_elites=20,
#     sigma_start=0.2,
#     sigma_min=0.05,
#     explore_fraction=0.5,
#     plan_horizon=horizon,
#     spline_type="cubic",
#     num_knots= num_knots,
# )

to = traj_opt_helper(cem, mj_model, mj_data)
to.trails(max_iteration=max_iterations, num_trails = num_trails)

In [None]:

cmaes = CMAES(
    task,
    num_samples = num_samples,
    sigma = sigma,
    plan_horizon= horizon,
    spline_type=spline,
    num_knots=num_knots,
)

to = traj_opt_helper(cmaes, mj_model, mj_data)
to.trails(max_iteration=max_iterations, num_trails = num_trails)


In [None]:
mppi = MPPI(
    task,
    num_samples = num_samples,
    temperature = temperature,
    noise_level= sigma,
    plan_horizon= horizon,
    spline_type=spline,
    num_knots=num_knots
)
mj_model = task.mj_model

to = traj_opt_helper(mppi, mj_model, mj_data)
to.trails(max_iteration=max_iterations, num_trails = num_trails)

In [None]:
gs = GaussianSmoothing(
    task,
    num_samples = num_samples,
    temperature = temperature,
    sigma = sigma,
    plan_horizon= horizon,
    spline_type= spline,
    num_knots=num_knots
)
mj_model = task.mj_model

to = traj_opt_helper(gs, mj_model, mj_data)
to.trails(max_iteration=max_iterations, num_trails = num_trails)


# Load costs

In [None]:
path = to.get_path()

PS_costs = joblib.load(path + "PredictiveSampling_costs_trails_average.pkl")
xNES_costs = joblib.load(path + "xNES_costs_trails_average.pkl")
MPPI_costs = joblib.load(path + "MPPI_costs_trails_average.pkl")
xMPPI_costs = joblib.load(path + "xMPPI_costs_trails_average.pkl")
CMAES_costs = joblib.load(path + "CMAES_costs_trails_average.pkl")
CEM_costs = joblib.load(path + "CEM_costs_trails_average.pkl")
GS_costs = joblib.load(path + "GaussianSmoothing_costs_trails_average.pkl")
nMPPI_costs = joblib.load(path + "nMPPI_costs_trails_average.pkl")

## Plot the costs

In [None]:
import matplotlib.pyplot as plt

task_name = task.__class__.__name__
# print(xNES_costs)
# # Plot the costs
plt.figure(figsize=(10, 6))

xNES_costs = np.array(xNES_costs).T
MPPI_costs = np.array(MPPI_costs).T
xMPPI_costs = np.array(xMPPI_costs).T
CMAES_costs = np.array(CMAES_costs).T
CEM_costs = np.array(CEM_costs).T
GS_costs = np.array(GS_costs).T
PS_costs = np.array(PS_costs).T
nMPPI_costs = np.array(nMPPI_costs).T

plt.plot(xNES_costs,   label="xNES")
plt.plot(MPPI_costs,   label="MPPI")
plt.plot(xMPPI_costs,   label="xMPPI")
plt.plot(nMPPI_costs,   label="nMPPI")
plt.plot(CMAES_costs,  label="CMA-ES")
plt.plot(CEM_costs,  label="CEM")
plt.plot(GS_costs,  label="Randomized smoothing")
plt.plot(PS_costs,  label="Predictive sampling")


plt.title( task_name + " Task")
plt.xlabel("Iteration")
plt.ylabel("Cost")

# single legend call:
plt.legend(loc="best")
plt.tight_layout()
plt.show()

## Visualization of trajectory rollouts

In [None]:
# to.visualize_rollout(task, controller=ps)
# to.visualize_rollout(task, controller=mppi)

to.visualize_rollout(task, controller=xmppi)
# to.visualize_rollout(task, controller=xnes)

In [None]:
# to.visualize_rollout(task, controller=gs)

In [None]:
to.visualize_rollout(task, controller=cem)