# Reflex RL - Modulating reflex module gains with RL

In [None]:
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

import matplotlib.pyplot as plt

import ReflexInterface_RL
import numpy as np
from scipy.interpolate import interp1d

import gymnasium
import argparse
from datetime import datetime

from gymnasium.envs.registration import register

from stable_baselines3 import PPO
from stable_baselines3.common.noise import NormalActionNoise
from stable_baselines3.common.callbacks import CheckpointCallback

from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.env_checker import check_env
#from stable_baselines3.common.utils import get_device

import os
import skvideo.io
import copy

from base64 import b64encode
from IPython.display import HTML

def show_video(video_path, video_width = 500):
    video_file = open(video_path, "r+b").read()
    video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
    return HTML(f"""<video autoplay width={video_width} controls><source src="{video_url}"></video>""")

register(
    id="MyoReflex_RL-v0",
    entry_point="ReflexInterface_RL:ReflexEnv",
    max_episode_steps=2000,
)

In [None]:
# Run clean
# from scipy.interpolate import interp1d

def detect_strides(vgrf, threshold=0.2, min_stride_duration=0.05, dt=0.01):
    """
    Detect strides using vGRF-based FSM.
    """
    heel_strikes = []
    strides = []

    state = "SWING"
    stance_start = None

    for i, force in enumerate(vgrf):
        new_state = "STANCE" if force > threshold else "SWING"

        if new_state == "STANCE" and state == "SWING":
            heel_strikes.append(i)
            if stance_start is not None:
                stride_duration = (i - stance_start) * dt
                if stride_duration >= min_stride_duration:
                    strides.append((stance_start, i))
            stance_start = i

        state = new_state

    return heel_strikes, strides


# Run clean
warnings.filterwarnings("ignore", category=DeprecationWarning)

param_filename = 'myorfl_Kine_2D_1_25_2024Nov25_1718_22mus_DelayKine_Best'
params = np.loadtxt(f"reflex_param/{param_filename}.txt")

eval_env = gymnasium.make('MyoReflex_RL-v0', init_pose='walk', dt=0.01, mode='2D', tgt_field_ver=0, reflex_params=params, episode_limit=2000, 
                          stim_mode='reflex', tgt_vel_mode='eval', delta_mode='delayed', delta_control_mode='sym')
obs, _ = eval_env.reset()
test_model = PPO.load("PPO_outputV8\logs\PPO_4000000_steps", env=eval_env)

frames = []
timesteps = []
pelvisV = []

reward_list = []
reward_dict = {
    'alive_rew': [],
    'truncated': [],
    'effort': [],
    'v_tgt': [],
    'distance': [],
    'timesteps': []
}

# Initialize kinematic data storage
joint_array = []
vgrf = []

# Run simulation and collect rewards and kinematic data
for timestep in np.arange(2000):
    frame = eval_env.MyoEnv.sim.renderer.render_offscreen(camera_id=1)
    frames.append(frame)

    pelvisV_curr = eval_env.SENSOR_DATA['body']['pelvis_vel']
    pelvisV.append(pelvisV_curr)
    
    action, _states = test_model.predict(obs, deterministic=True)
    obs, rewards, is_done, info, _ = eval_env.step(action)
    reward_list.append(rewards)

    # Extract individual rewards from debug_reward_dict
    diag_rew_dict = eval_env.debug_reward_dict

    # Multiply rewards by their weights and store
    for key in reward_dict.keys():
        if key != 'timesteps':
            reward_value = diag_rew_dict.get(key, 0)  # Default to 0 if key is missing
            weight = eval_env.reward_wt.get(key, 1)  # Default to 1 if weight is missing
            reward_dict[key].append(reward_value * weight)
    reward_dict['timesteps'].append(timestep)

    # Collect joint kinematics and vGRF
    joint_array.append([
        eval_env.MyoEnv.sim.data.joint('hip_flexion_r').qpos[0],
        eval_env.MyoEnv.sim.data.joint('hip_flexion_r').qvel[0],
        eval_env.MyoEnv.sim.data.joint('knee_angle_r').qpos[0],
        eval_env.MyoEnv.sim.data.joint('knee_angle_r').qvel[0],
        eval_env.MyoEnv.sim.data.joint('ankle_angle_r').qpos[0],
        eval_env.MyoEnv.sim.data.joint('ankle_angle_r').qvel[0],
    ])
    vgrf.append(eval_env.SENSOR_DATA['r_leg']['load_ipsi'])

    if is_done:
        break

# Convert lists to arrays for processing
joint_array = np.array(joint_array)
vgrf = np.array(vgrf)

# Detect strides
heel_strikes, stride_segments = detect_strides(vgrf)

# Interpolate strides
interpolated_strides = {k: [] for k in ["hip_angle", "hip_velocity", "knee_angle", "knee_velocity", "ankle_angle", "ankle_velocity"]}
for start_idx, end_idx in stride_segments:
    stride_time = np.linspace(0, 1, end_idx - start_idx)
    for joint_idx, joint_name in enumerate(interpolated_strides.keys()):
        f = interp1d(stride_time, joint_array[start_idx:end_idx, joint_idx], kind="linear", fill_value="extrapolate")
        interpolated_strides[joint_name].append(f(np.linspace(0, 1, 101)))

# Average strides
for joint_name in interpolated_strides:
    if interpolated_strides[joint_name]:
        interpolated_strides[joint_name] = np.mean(interpolated_strides[joint_name], axis=0).tolist()
    else:
        interpolated_strides[joint_name] = [0.0] * 101  # Default to zero for missing data

# Final processed kinematic data is stored in `interpolated_strides`

param_filename = '2D_PPO_Test'
skvideo.io.vwrite(f"{param_filename}.mp4",
                  np.asarray(frames), inputdict={"-r": "30"}, outputdict={"-r": "30", "-pix_fmt": "yuv420p"})
show_video(f"{param_filename}.mp4")

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

def plot_kinematics_with_reference(interpolated_strides, reference_file):
    """
    Plots joint kinematics for one stride with overlaid reference data.

    Parameters:
    - interpolated_strides: Dictionary containing interpolated kinematic data for one stride.
    - reference_file: Path to the CSV file containing reference kinematics data.
    """

    # Load and process reference data
    ref_data = pd.read_csv(reference_file, header=None).to_numpy()
    rad_to_deg = 180 / np.pi  # Conversion factor from radians to degrees
    # rad_to_deg = 1 

    # Extract relevant reference columns (Right side: 4, 5, 6)
    reference_kinematics = {
        "hip_angle": (ref_data[:, 1] * rad_to_deg)/10,
        "knee_angle": (ref_data[:, 2] * rad_to_deg)/10,
        "ankle_angle": (ref_data[:, 3] * rad_to_deg)/10
    }

    # Define joint names for plotting
    joint_names = ["Hip Angle", "Hip Velocity", "Knee Angle", "Knee Velocity", "Ankle Angle", "Ankle Velocity"]
    joint_keys = ["hip_angle", "hip_velocity", "knee_angle", "knee_velocity", "ankle_angle", "ankle_velocity"]

    # Create subplots for joint kinematics
    fig, axes = plt.subplots(len(joint_keys), 1, figsize=(10, 15), sharex=True)
    fig.suptitle("Joint Kinematics for One Stride with Reference Data", fontsize=16)

    # Plot data for each joint
    for idx, (joint_name, joint_key) in enumerate(zip(joint_names, joint_keys)):
        ax = axes[idx]
        ax.set_title(joint_name)
        ax.set_xlabel("Gait Cycle (%)")
        ax.set_ylabel("Angle (deg)" if "Angle" in joint_name else "Velocity (deg/s)")

        # Plot interpolated stride data
        stride_data = interpolated_strides.get(joint_key, None)
        if stride_data is not None and len(stride_data) > 0:
            ax.plot(
                np.linspace(0, 100, len(stride_data)),  # Gait cycle percentage
                stride_data,
                label="Model Data",
                color="blue",
                alpha=0.8
            )

        # Overlay reference data (only for angles, not velocities)
        if "Angle" in joint_name and joint_key in reference_kinematics:
            ref_joint_data = reference_kinematics[joint_key.replace("_velocity", "_angle")]
            ax.plot(
                np.linspace(0, 100, len(ref_joint_data)),  # Gait cycle percentage
                ref_joint_data,
                label="Reference Data",
                color="red",
                linestyle="--",
                alpha=0.8
            )

        ax.grid(True)
        ax.legend()

    plt.tight_layout(rect=[0, 0.03, 1, 0.97])  # Adjust layout to include title
    plt.show()


# Call the function with interpolated strides and reference data
reference_file = "ref_kinematics_radians_mod.csv"
plot_kinematics_with_reference(interpolated_strides, reference_file)

In [None]:
def plot_reference_kinematics(reference_file):
    """
    Quickly plots the reference joint kinematics data for visualization.

    Parameters:
    - reference_file: Path to the CSV file containing reference kinematics data.
    """

    # Load and process reference data
    ref_data = pd.read_csv(reference_file, header=None).to_numpy()
    rad_to_deg = 180 / np.pi  # Conversion factor from radians to degrees

    # Extract relevant reference columns (Right side: 4, 5, 6)
    reference_kinematics = {
        "Hip Angle": -1*(ref_data[:, 1] * rad_to_deg)+180, # + = flexion
        "Knee Angle": -1*(ref_data[:, 2] * rad_to_deg)+180,  # + = flexion
        "Ankle Angle": -1*(ref_data[:, 3] * rad_to_deg)+90 # + = flexion
    }
    # Create a figure with subplots for joint kinematics
    fig, axes = plt.subplots(len(reference_kinematics), 1, figsize=(10, 10), sharex=True)
    fig.suptitle("Reference Joint Kinematics", fontsize=16)

    # Plot data for each joint
    for idx, (joint_name, joint_data) in enumerate(reference_kinematics.items()):
        ax = axes[idx]
        ax.set_title(joint_name)
        ax.set_xlabel("Gait Cycle (%)")
        ax.set_ylabel("Angle (deg)")

        # Plot the reference data
        ax.plot(
            np.linspace(0, 100, len(joint_data)),  # Gait cycle percentage
            joint_data,
            label="Reference Data",
            color="black",
            alpha=0.8
        )

        ax.grid(True)
        ax.legend()

    # plt.tight_layout(rect=[0, 0.03, 1, 0.97])  # Adjust layout to include title
    plt.show()


# Call the function with the reference file
reference_file = "ref_kinematics_radians_mod.csv"
plot_reference_kinematics(reference_file)

In [None]:
# No reference data

def plot_kinematics_single_stride(interpolated_strides):
    """
    Generates subplots for joint kinematics (one stride) from interpolated kinematics data.

    Parameters:
    - interpolated_kinematics: A dictionary containing interpolated kinematic data for one stride.
    """

    # Define joint names for plotting
    # joint_names = ["Hip Angle", "Hip Velocity", "Knee Angle", "Knee Velocity", "Ankle Angle", "Ankle Velocity"]
    # joint_keys = ["hip_angle", "hip_velocity", "knee_angle", "knee_velocity", "ankle_angle", "ankle_velocity"]

    joint_names = ["Hip Angle", "Knee Angle",  "Ankle Angle"]
    joint_keys = ["hip_angle", "knee_angle", "ankle_angle"]

    # Conversion factor: radians to degrees
    rad_to_deg = 180 / np.pi

    # Create a figure with subplots for joint kinematics
    fig, axes = plt.subplots(len(joint_keys), 1, figsize=(10, 15), sharex=True)
    fig.suptitle("Joint Kinematics for One Stride", fontsize=16)

    # Plot data for each joint
    for idx, (joint_name, joint_key) in enumerate(zip(joint_names, joint_keys)):
        ax = axes[idx]
        ax.set_title(joint_name)
        ax.set_xlabel("Gait Cycle (%)")
        ax.set_ylabel("Angle (deg)" if "Angle" in joint_name else "Velocity (deg/s)")

        # Get joint data for the specific key
        joint_data = interpolated_strides.get(joint_key, None)
        if joint_data is None or len(joint_data) == 0:
            print(f"Skipping empty or missing data for {joint_name}")
            continue

        # Convert radians to degrees for angles and velocities
        if "Angle" in joint_name or "Velocity" in joint_name:
            joint_data = np.array(joint_data) * rad_to_deg

        # Plot the data
        ax.plot(
            np.linspace(0, 100, len(joint_data)),  # Gait cycle in percentage
            joint_data,
            label=joint_name,
            alpha=0.8,
            color="red"
        )

        ax.grid(True)

    plt.tight_layout(rect=[0, 0.03, 1, 0.97])  # Adjust layout to include title
    plt.show()


# Call the function with your interpolated kinematics data
plot_kinematics_single_stride(interpolated_strides)

In [None]:
import json
import numpy as np
import matplotlib.pyplot as plt

def plot_kinematics_with_cma_reference(interpolated_strides, reference_file):
    """
    Plots joint kinematics for one stride from simulation and overlays it with averaged reference data
    from a JSON file.

    Parameters:
    - interpolated_strides: Dictionary containing interpolated kinematic data for one stride.
    - reference_file: Path to the JSON file containing reference kinematics data.
    """

    # Load and process reference data
    with open(reference_file, "r") as json_file:
        stride_data = json.load(json_file)

    # Define joint names for plotting
    joint_names = ["Hip Angle", "Knee Angle", "Ankle Angle"]
    joint_keys = ["hip_angle", "knee_angle", "ankle_angle"]

    # Conversion factor: radians to degrees
    rad_to_deg = 180 / np.pi

    # Specify the reference condition to use
    reference_condition = "No Exo"  # Adjust this as needed
    if reference_condition not in stride_data:
        print(f"Condition '{reference_condition}' not found in JSON data.")
        return

    reference_trials = stride_data[reference_condition]

    # Create subplots for joint kinematics
    fig, axes = plt.subplots(len(joint_keys), 1, figsize=(10, 10), sharex=True)
    # fig.suptitle("Joint Kinematics Comparison: Simulation vs. CMA Reference", fontsize=16)

    # Plot data for each joint
    for idx, (joint_name, joint_key) in enumerate(zip(joint_names, joint_keys)):
        ax = axes[idx]
        ax.set_title(joint_name)
        ax.set_xlabel("Gait Cycle (%)")
        ax.set_ylabel("Angle (deg)")

        # Plot interpolated stride data (simulated data)
        stride_data = interpolated_strides.get(joint_key, None)
        if stride_data is not None and len(stride_data) > 0:
            ax.plot(
                np.linspace(0, 100, len(stride_data)),  # Gait cycle percentage
                np.array(stride_data) * rad_to_deg,  # Convert to degrees
                label="PPO Data",
                color="red",
                alpha=0.8
            )

        # Collect all trials for the current joint
        joint_averages = []
        for trial_name, trial in reference_trials.items():
            joint_data = trial.get(joint_key, None)
            if joint_data is not None and len(joint_data) > 0:
                joint_averages.append(np.array(joint_data) * rad_to_deg)

        # Compute and plot the average if data exists
        if len(joint_averages) > 0:
            avg_reference_data = np.mean(joint_averages, axis=0)
            ax.plot(
                np.linspace(0, 100, len(avg_reference_data)),  # Gait cycle percentage
                avg_reference_data,
                label="CMA Data",
                color="black",
                linestyle="--",
                alpha=0.8
            )

        ax.grid(True)
        ax.legend()

    # plt.tight_layout(rect=[0, 0.03, 1, 0.97])  # Adjust layout to include title
    plt.show()


# Call the function with your interpolated strides and JSON reference file
reference_file = "processed_data_mod.json"
plot_kinematics_with_cma_reference(interpolated_strides, reference_file)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

def plot_combined_kinematics(interpolated_strides, reference_file):
    """
    Combines the simulation kinematics (one stride) with reference kinematics data
    in a single plot for comparison.

    Parameters:
    - interpolated_strides: Dictionary containing interpolated kinematic data for one stride.
    - reference_file: Path to the CSV file containing reference kinematics data.
    """

    # Load and process reference data
    ref_data = pd.read_csv(reference_file, header=None).to_numpy()
    rad_to_deg = 180 / np.pi  # Conversion factor from radians to degrees

    # Extract relevant reference columns (Right side: 4, 5, 6)
    reference_kinematics = {
        "hip_angle": -1*((ref_data[:, 1] * rad_to_deg)-170),
        "knee_angle": (ref_data[:, 2] * rad_to_deg)-175,
        "ankle_angle": -1*((ref_data[:, 3] * rad_to_deg)-85)
    }

    # # Extract relevant reference columns (Right side: 4, 5, 6)
    # reference_kinematics = {
    #     "hip_angle": -1*((ref_data[:, 1] * rad_to_deg)),
    #     "knee_angle": (ref_data[:, 2] * rad_to_deg),
    #     "ankle_angle": -1*((ref_data[:, 3] * rad_to_deg))
    # }

    # Define joint names and keys
    joint_names = ["Hip Angle", "Knee Angle", "Ankle Angle"]
    joint_keys = ["hip_angle", "knee_angle", "ankle_angle"]

    # Create a figure with subplots for joint kinematics
    fig, axes = plt.subplots(len(joint_keys), 1, figsize=(10, 10), sharex=True)
    # fig.suptitle("Joint Kinematics Comparison: Simulation vs. Reference", fontsize=16)

    # Plot data for each joint
    for idx, (joint_name, joint_key) in enumerate(zip(joint_names, joint_keys)):
        ax = axes[idx]
        ax.set_title(joint_name)
        ax.set_xlabel("Gait Cycle (%)")
        ax.set_ylabel("Angle (deg)")

        # Plot reference data
        if joint_key in reference_kinematics:
            ax.plot(
                np.linspace(0, 100, len(reference_kinematics[joint_key])),  # Gait cycle percentage
                reference_kinematics[joint_key],
                label="Reference Data",
                color="black",
                linestyle="--",
                alpha=0.8
            )

        # Plot interpolated stride data (simulated data)
        stride_data = interpolated_strides.get(joint_key, None)
        if stride_data is not None and len(stride_data) > 0:
            ax.plot(
                np.linspace(0, 100, len(stride_data)),  # Gait cycle percentage
                np.array(stride_data) * rad_to_deg,  # Convert to degrees
                label="Simulation Data",
                color="red",
                alpha=0.8
            )

        ax.grid(True)
        ax.legend()

    # plt.tight_layout(rect=[0, 0.03, 1, 0.97])  # Adjust layout to include title
    plt.show()


# Call the function with your interpolated kinematics data and reference file
plot_combined_kinematics(interpolated_strides, "ref_kinematics_radians_mod.csv")

In [None]:
dt = 0.01

first_values = [array[0] for array in pelvisV]
indices = np.arange(len(first_values))*dt

window_size = 200
moving_avg = np.convolve(first_values, np.ones(window_size)/window_size, mode='valid')

plt.figure(figsize=(10, 6))
plt.plot(indices, first_values, label='First Value', alpha=0.6)
plt.plot(indices[window_size-1:], moving_avg, label=f'Moving Average ({window_size} indices)', color='orange', linewidth=2)
plt.xlabel('Time (s)')
plt.ylabel('Pelvis Velocity (m/s)')
plt.title('Pelvis Velocity')
plt.grid(True)
plt.show()

In [None]:
# Plot cumulative rewards over time
plt.figure(figsize=(10, 6))
plt.plot(reward_dict['timesteps'], reward_list, label="Cumulative Reward")
plt.xlabel("Timesteps")
plt.ylabel("Cumulative Reward")
plt.title("Cumulative Reward over Time")
plt.legend()
plt.grid(True)
plt.show()

# Save the plot if needed
plt.savefig("PPO_cumulative_rewards_plot.png")

In [None]:
# Plot individual rewards over time
plt.figure(figsize=(10, 6))

for key in reward_dict.keys():
    if key != 'timesteps':  # Skip timesteps key
        plt.plot(reward_dict['timesteps'], reward_dict[key], label=f"{key} (weighted)")

plt.xlabel("Timesteps")
plt.ylabel("Weighted Reward Value")
plt.title("Weighted Individual Rewards over Time")
plt.legend()
plt.grid(True)
plt.show()

# Save the plot if needed
plt.savefig("PPO_weighted_individual_rewards_plot.png")