In [1]:
# Imports
import numpy as np
import copy as cp

# Mujoco
import mujoco
import mujoco_viewer

# Controller functions
from control.controllers.mppi_locomotion import MPPI

from utils.tasks import get_task
from utils.transforms import batch_world_to_local_velocity

# Visualization
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
import matplotlib.pyplot as plt

import csv
import os

ModuleNotFoundError: No module named 'sklearn'

In [2]:
import os
os.getcwd()


'/Users/anoushka/Projects/whole-body-mppi-improvements/legged_mppi'

## Choose paramters
#### Task:
* `'stand'` or `'stand_rugged_terr'`
* `'walk_straight'` or `'walk_straight_rugged_terr'`
* `'walk_octagon'` or `'walk_octagon_rugged_terr'`
* `'big_box'` or `'big_box_rugged_terr'`
* `'stairs'`

In [2]:
q_noise_std = 0.03  
v_noise_std = 0.2 

In [3]:
task = 'walk_straight_rugged_terr'
warm_start = True
imperfect_state_estimation = True

In [4]:
# Task
task = 'stairs'
task_data = get_task(task)
model_path = task_data['sim_path']

In [5]:
# Model visualizer
model_sim = mujoco.MjModel.from_xml_path(model_path)
dt_sim = 0.01
model_sim.opt.timestep = dt_sim
data_sim = mujoco.MjData(model_sim)
viewer = mujoco_viewer.MujocoViewer(model_sim, data_sim, 'offscreen')

In [None]:
# Reset robot (keyframes are defined in the xml)
mujoco.mj_resetDataKeyframe(model_sim, data_sim, 0) # stand position
mujoco.mj_forward(model_sim, data_sim)
q_init = cp.deepcopy(data_sim.qpos) # save reference pose
v_init = cp.deepcopy(data_sim.qvel) # save reference pose
print("Configuration: {}".format(q_init)) # save reference pose

In [None]:
img = viewer.read_pixels()
plt.imshow(img)

In [None]:
# Initialize controller
controller = MPPI(task=task)
controller.internal_ref = True
controller.reset_planner()

In [9]:
q_curr = cp.deepcopy(data_sim.qpos) # save reference pose
v_curr = cp.deepcopy(data_sim.qvel) # save reference pose
x = np.concatenate([q_curr, v_curr])

In [10]:
# Set simulation time
tfinal = 8 # 14 for stairs, 30 for walk_octagon
tvec = np.linspace(0,tfinal,int(np.ceil(tfinal/dt_sim))+1)

In [11]:
mujoco.mj_resetDataKeyframe(model_sim, data_sim, 1)
mujoco.mj_forward(model_sim, data_sim)

In [None]:
viewer.cam.distance = 4.2
viewer.cam.lookat[:] = [2.2, 0, 0.27]
controller.n_knots

In [None]:
img = viewer.read_pixels()
plt.imshow(img)

In [None]:
%%time
# Run simulation
anim_imgs = []
sim_inputs = []
x_states = []
error_history = []
max_cost_history = []
min_cost_history = []
cost_norm_history = []
num_iter_per_goal = np.zeros(len(controller.goal_pos))

for ticks, ti in enumerate(tvec):
    if imperfect_state_estimation:
        q_curr = cp.deepcopy(data_sim.qpos) + np.random.normal(0, q_noise_std, size=data_sim.qpos.shape)
        v_curr = cp.deepcopy(data_sim.qvel) + np.random.normal(0, v_noise_std, size=data_sim.qvel.shape)
    else:
        q_curr = cp.deepcopy(data_sim.qpos)
        v_curr = cp.deepcopy(data_sim.qvel)
    x = np.concatenate([q_curr, v_curr])
    
    if ticks%1 == 0:
        u_joints, cost_info = controller.update(x)  
        max_cost_history.append(cost_info['max_cost'])
        min_cost_history.append(cost_info['min_cost'])
        cost_norm_history.append(np.linalg.norm(cost_info['max_cost'] - cost_info['min_cost']))

    data_sim.ctrl[:] = u_joints
    mujoco.mj_step(model_sim, data_sim)
    mujoco.mj_forward(model_sim, data_sim)

    error = np.linalg.norm(np.array(controller.body_ref[:3]) - np.array(data_sim.qpos[:3]))
    error_history.append(error)
    
    viewer.add_marker(
        pos=controller.body_ref[:3]*1,         # Position of the marker
        size=[0.15, 0.15, 0.15],     # Size of the sphere
        rgba=[1, 0, 1, 1],           # Color of the sphere (red)
        type=mujoco.mjtGeom.mjGEOM_SPHERE, # Specify that this is a sphere
        label=""
    )

    if error < controller.goal_thresh[controller.goal_index]:
        controller.next_goal()
    
    num_iter_per_goal[controller.goal_index] += 1
    
    img = viewer.read_pixels()
    if ticks % 2 == 0:
        anim_imgs.append(img)
    sim_inputs.append(u_joints)
    x_states.append(x)

In [None]:
plt.bar(np.arange(len(num_iter_per_goal)), num_iter_per_goal)
plt.title("Iteration for each goal")

In [None]:
plt.plot(min_cost_history, label="min_cost")
plt.plot(max_cost_history, label='max_cost')
plt.plot(cost_norm_history, label= 'cost_diff')
plt.title("Cost over time")
plt.legend()

In [None]:
plt.plot(error_history)
plt.title("Error Over Time")

In [None]:
x_states_np = np.array(x_states)
orientations = x_states_np[:,3:7]
velocities = x_states_np[:,19:22]
velocities_trans = batch_world_to_local_velocity(orientations, velocities)

plt.plot(velocities_trans, label=["vx", "vy", "vz"])
plt.title("Body velocities in world frame")
plt.legend()

In [None]:
angular_velocities = x_states_np[:,22:25]
plt.plot(angular_velocities, label=["wx", "wy", "wz"])
plt.title("Angular velocities")
plt.legend()

In [None]:
plt.plot(orientations, label=["w", "x", "y", "z"])
plt.title("Orientation")
plt.legend()

In [None]:
plt.plot(x_states_np[:,7:10], label=["hip","thigh","calf"])
plt.title("Joint angles FR leg")

In [None]:
plt.plot(x_states_np[:,10:13], label=["hip","thigh","calf"])
plt.title("Joint angles FL leg")

In [None]:
plt.plot(x_states_np[:,13:16], label=["hip","thigh","calf"])
plt.title("Joint angles RR leg")

In [None]:
plt.plot(x_states_np[:,16:19], label=["hip","thigh","calf"])
plt.title("Joint angles RL leg")

In [23]:
import csv

# Save the time-series data
time_series_filename = f"simulation_timeseries_{rugged_terr}_{warm_start}_{imperfect_state_estimation}.csv"
with open(time_series_filename, mode='w', newline='') as file:
    writer = csv.writer(file)
    # Write the header
    writer.writerow(['time_step', 'error_history', 'max_cost_history', 'min_cost_history', 'cost_norm_history', 'rugged_terr', 'warm_start', 'imperfect_state'])
    # Write each row of time-series data
    for i in range(len(error_history)):
        writer.writerow([
            i,
            error_history[i],
            max_cost_history[i],
            min_cost_history[i],
            cost_norm_history[i],
            rugged_terr,
            warm_start,
            imperfect_state_estimation
        ])

# Save the goal iteration data
goal_filename = f"simulation_goal_data_{rugged_terr}_{warm_start}_{imperfect_state_estimation}.csv"
with open(goal_filename, mode='w', newline='') as file:
    writer = csv.writer(file)
    # Write the header
    writer.writerow(['goal_index', 'num_iter_per_goal'])
    # Write each row of goal data
    for i, value in enumerate(num_iter_per_goal):
        writer.writerow([i, value])

print(f"Time-series data saved to {time_series_filename}")
print(f"Goal data saved to {goal_filename}")


In [None]:
import csv
import matplotlib.pyplot as plt
import os
import numpy as np

# List CSV files with descriptive names and their corresponding condition keys
csv_files = [
    ("simulation_goal_data_False_False_False.csv", "Flat, Perfect State, Cold Start"),
    ("simulation_goal_data_False_True_False.csv", "Flat, Perfect State, Warm Start"),
    ("simulation_goal_data_False_False_True.csv", "Flat, Imperfect State, Cold Start"),
    ("simulation_goal_data_False_True_True.csv", "Flat, Imperfect State, Warm Start"),
    ("simulation_goal_data_True_False_False.csv", "Bumpy, Perfect State, Cold Start"),
    ("simulation_goal_data_True_True_False.csv", "Bumpy, Perfect State, Warm Start"),
    ("simulation_goal_data_True_False_True.csv", "Bumpy, Imperfect State, Cold Start"),
    ("simulation_goal_data_True_True_True.csv", "Bumpy, Imperfect State, Warm Start"),
]

# Prepare data for plotting
goal_data = {
    "Flat, Perfect State": {"Cold Start": [], "Warm Start": []},
    "Flat, Imperfect State": {"Cold Start": [], "Warm Start": []},
    "Bumpy, Perfect State": {"Cold Start": [], "Warm Start": []},
    "Bumpy, Imperfect State": {"Cold Start": [], "Warm Start": []},
}

# Read data from files
for file, label in csv_files:
    if os.path.exists(file):  # Check if the file exists
        with open(file, mode="r") as f:
            reader = csv.DictReader(f)
            num_iter = [float(row["num_iter_per_goal"]) for row in reader]

            # Determine the condition and start type
            parts = label.split(", ")
            condition = ", ".join(parts[:2])  # First two parts are the condition
            start_type = parts[2]  # Third part is the start type

            if num_iter[0] == 0:
                num_iter.pop(0)
            goal_data[condition][start_type] = num_iter
    else:
        print(f"File {file} not found!")

print(goal_data)
# Plot the data
plt.figure(figsize=(12, 6))
bar_width = 0.35
x = np.arange(len(goal_data))  # Number of conditions

for i, (condition, starts) in enumerate(goal_data.items()):
    cold_start = starts["Cold Start"][:2]  # Goals 1 and 2
    warm_start = starts["Warm Start"][:2]

    # Plot cold start bars
    plt.bar(
        x[i] - bar_width / 2, cold_start, bar_width,
        color="#d9f0f6",label="Cold Start" if i == 0 else ""
    )

    # Plot warm start bars with hatching
    plt.bar(
        x[i] + bar_width / 2, warm_start, bar_width,
        color="#91c3e5", hatch="//", label="Warm Start" if i == 0 else ""
    )

# Add labels, title, and legend
plt.title("Comparison of Warm Start vs Cold Start Across Conditions")
plt.xlabel("Conditions")
plt.ylabel("Number of Iterations")
plt.xticks(x, goal_data.keys(), rotation=45, ha="right")
plt.legend()
plt.tight_layout()
plt.show()


In [None]:
plt.plot(max_cost_history)
plt.title("max_cost_history")
plt.legend()

In [None]:
plt.plot(min_cost_history)
plt.title("min_cost_history")
plt.legend()

In [None]:
plt.plot(cost_norm_history)
plt.title("cost_norm_history")
plt.legend()

In [None]:
plt.plot(error_history)
plt.title("Error")
plt.legend()

In [None]:
plt.plot(sim_inputs_np[:,0:3], label=["hip", "thigh", "calf"])
plt.title("FR")
plt.legend()

In [None]:
plt.plot(sim_inputs_np[:,3:6], label=["hip", "thigh", "calf"])
plt.title("FL")
plt.legend()

In [None]:
plt.plot(sim_inputs_np[:,6:9], label=["hip", "thigh", "calf"])
plt.title("RR")
plt.legend()

In [None]:
plt.plot(sim_inputs_np[:,9:], label=["hip", "thigh", "calf"])
plt.title("RL")
plt.legend()

## Animation

In [None]:
# Get the resolution of the images
image_height, image_width = anim_imgs[0].shape[:2]

# Set the figure size to match the image resolution
fig, ax = plt.subplots(figsize=(image_width / 500, image_height / 500), dpi=100)
skip_frames = 5
interval = dt_sim*1000*skip_frames
# Remove the white border by setting margins and padding to zero
fig.subplots_adjust(left=0, right=1, top=1, bottom=0)
ax.set_position([0, 0, 1, 1])

def animate(i):
    ax.clear()
    ax.imshow(anim_imgs[i * skip_frames])  # Display the image, skipping frames
    ax.axis('off')

# Create animation, considering the reduced frame rate due to skipped frames
ani = FuncAnimation(fig, animate, frames=len(anim_imgs) // skip_frames, interval=interval)  # 50 ms for 20 Hz

# Display the animation
HTML(ani.to_jshtml())

In [None]:
from PIL import Image
import numpy as np
import cv2

# Get the resolution of the images
image_height, image_width = anim_imgs[0].shape[:2]

# List to store frames as PIL images
gif_frames = []

skip_frames = 10  # Adjust as needed

# Convert frames to PIL Image format
for i in range(0, len(anim_imgs), skip_frames):
    frame = anim_imgs[i]
    # Convert the frame to 8-bit unsigned integer if needed (OpenCV compatibility)
    if frame.dtype != np.uint8:
        frame = (frame * 255).astype(np.uint8)
    
    # Convert BGR to RGB (Pillow expects RGB format)
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(frame)
    
    # Append to gif frames list
    gif_frames.append(pil_image)

# Save the frames as a GIF
output_filename = 'stairs_quintic_k_5.gif'
gif_frames[0].save(
    output_filename,
    save_all=True,
    append_images=gif_frames[1:],
    duration=1000 / 20,  # Frame duration in milliseconds (20 fps)
    loop=0  # 0 means the GIF will loop indefinitely
)

print(f"GIF saved as {output_filename}")
