## Import packages

In [1]:
from mushroom_rl.core import Core, Agent
from loco_mujoco import LocoEnv
import mujoco
import numpy as np
from numpy import linalg as LA
from operator import itemgetter 
from scipy.interpolate import CubicSpline
import matplotlib.pyplot as plt

## Load Agent from checkpoint
Sometimes the agent would fumble or the heel strike detection is not working perfectly, so we set a cycle_length_cutoff that sets the minimum length of a complete cycle. If a cycle is shorter than this, it will get discarded

In [2]:
dataset_env = LocoEnv.make('HumanoidTorque.walk', dataset_type='perfect', random_start=False)
trained_env = LocoEnv.make("HumanoidTorque.walk")

agent = Agent.load("./logs/loco_mujoco_evalution_2024-09-29_21-32-22/env_id___HumanoidTorque.walk/0/agent_epoch_180_J_150.328129.msh")

In [3]:
dataset_heel_strikes, dataset_grfs = dataset_env.play_trajectory(n_episodes=5, get_dataset=True, record=False, render=False)

In [4]:
dataset = np.load('../loco_mujoco/datasets/humanoids/perfect/humanoid_torque_walk/perfect_expert_dataset_det.npz')
dataset = (dataset['states'][:5000], dataset['actions'][:5000])

In [5]:
obs_keys = dict(
    q_hip_flexion_l=9,
    q_knee_angle_l=12,
    q_ankle_angle_l=13,
    dq_hip_flexion_l=28,
    dq_knee_angle_l=31,
    dq_ankle_angle_l=32,
)
act_keys = dict(
    mot_hip_flexion_l=8,
    mot_knee_angle_l=11,
    mot_ankle_angle_l=12,
)

In [6]:
processed_dataset = []
for i in range(5000):
    data = []
    for j in obs_keys.values():
        data.append(dataset[0][i][j])
    for k in act_keys.values():
        data.append(dataset[1][i][k])
    data.extend(dataset_grfs[i])

    processed_dataset.append(np.array(data))

processed_dataset = np.array(processed_dataset)
print(processed_dataset.shape)

(5000, 12)


In [7]:
def trained_get_heelstrike_and_grf(sample):
    global trained_step
    global trained_l_foot_contact
    global trained_heel_strikes
    global trained_grfs

    # heel strike
    contact_group = []
    for coni in range(trained_env._data.ncon):
        con = trained_env._data.contact[coni]
        geom1 = mujoco.mj_id2name(trained_env._model, mujoco.mjtObj.mjOBJ_GEOM, con.geom1)
        geom2 = mujoco.mj_id2name(trained_env._model, mujoco.mjtObj.mjOBJ_GEOM, con.geom2)
        contact_group.append(set([geom1, geom2]))

    if set(['floor', 'foot_box_l']) in contact_group:
        if not trained_l_foot_contact:
            trained_l_foot_contact = True
            trained_heel_strikes.append(trained_step)
    else:
        trained_l_foot_contact = False

    # grf
    trained_l_grf = trained_env._get_ground_forces()[-3:]
    trained_grfs.append(trained_l_grf)
        
    trained_step +=1

In [8]:
trained_step = 0
trained_l_foot_contact = False
trained_heel_strikes = []
trained_grfs = []
core = Core(agent, trained_env, callback_step=trained_get_heelstrike_and_grf)
trained_dataset = core.evaluate(n_episodes=5, render=False, record=False)

                                             

In [9]:
processed_trained_dataset = []
# format: obs(6), act(3), grf(3)
for i in range(len(trained_dataset)):
    data = []
    for j in obs_keys.values():
        data.append(trained_dataset[i][0][j])
    for k in act_keys.values():
        data.append(trained_dataset[i][1][k])
    data.extend(trained_grfs[i])
    
    processed_trained_dataset.append(np.array(data))

processed_trained_dataset = np.array(processed_trained_dataset)
print(processed_trained_dataset.shape)

(793, 12)


In [10]:
cycle_length_cutoff = 90

In [11]:
trained_cycles = []
trained_cycle_lengths = np.diff(trained_heel_strikes)
for i, trained_cycle_length in enumerate(trained_cycle_lengths):
    if trained_cycle_length > cycle_length_cutoff:
        trained_cycles.append(processed_trained_dataset[trained_heel_strikes[i]:trained_heel_strikes[i+1]])
print(f'Number of recorded trained_cycle: {len(trained_cycle_lengths)}')
print(f'Cycle length cutff: {cycle_length_cutoff}')
print(f'Number of effective trained_cycle: {len(trained_cycles)}')

Number of recorded trained_cycle: 45
Cycle length cutff: 90
Number of effective trained_cycle: 0


In [12]:
n_sample = 20
rng = np.random.default_rng()
idxs = np.arange(len(trained_cycles))
rng.shuffle(idxs, axis=0)

trained_sampled_cycles = itemgetter(*idxs[:n_sample])(trained_cycles)
trained_sampled_cycles_lengths = [len(cycle) for cycle in trained_sampled_cycles]
trained_mean_length = round(np.mean(trained_sampled_cycles_lengths))
assert len(trained_sampled_cycles) == n_sample
print(f'Number of sampled cycle: {n_sample}')
print(f'Env Mean length: {trained_mean_length}')

TypeError: itemgetter expected 1 argument, got 0