# Analysis of motor synergies, Todorov's paper (panel A)

In [None]:
from definitions import ROOT_DIR
import os
import numpy as np
import pandas as pd
from functions_notebook import PCvsVar, plot_cumvar
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
import json
import joblib
from matplotlib.cm import get_cmap

## Load the rollouts from all tasks

In [None]:
# Root folder that holds the recorded rollouts of every task
data_dir = os.path.join(ROOT_DIR, "data", "rollouts")

# Control tasks: task name -> HDF5 file stored in the "control" subfolder
control_tasks_dict = dict(
    hand_pose="hand_pose_1000_episodes_lattice.h5",
    hand_reach="hand_reach_1000_episodes_lattice.h5",
    reorient="reorient_1000_episodes_lattice.h5",
    pen="pen_1000_episodes_lattice.h5",
)
df_dict = {
    task_name: pd.read_hdf(os.path.join(data_dir, "control", file_name))
    for task_name, file_name in control_tasks_dict.items()
}

# Baoding rollouts live in their own folder (named after the final model)
baoding_data_path = os.path.join(
    data_dir,
    "final_model_500_episodes_activations_info_small_variations_ccw",
    "data.hdf",
)
baoding_df = pd.read_hdf(baoding_data_path)
df_dict["baoding"] = baoding_df

# "early_baoding" rollouts come from the "step_12" folder
# (presumably an earlier training stage of the same task -- TODO confirm)
early_baoding_data_path = os.path.join(
    data_dir,
    "step_12_500_episodes_activations_info_ccw",
    "data.hdf",
)
early_baoding_df = pd.read_hdf(early_baoding_data_path)
df_dict["early_baoding"] = early_baoding_df

In [None]:
def get_episode_vel(episode_pos):
    """Return the step-to-step change of the positions of one episode.

    Args:
        episode_pos: 2-D array with one row per time step.

    Returns:
        Array with the same shape as ``episode_pos``. The first row is zero
        and row ``t`` holds ``episode_pos[t] - episode_pos[t - 1]``.

    Note:
        The difference is expressed per simulation step; it is NOT rescaled
        by the simulation rate (40 sim steps per second).
    """
    # The first step has no predecessor, so its velocity is set to zero
    first_step = np.zeros_like(episode_pos[:1, :])
    step_changes = np.diff(episode_pos, axis=0)
    # episode_vel = signal.savgol_filter(episode_pos, window_length=3, polyorder=1, deriv=1, axis=0)
    return np.concatenate([first_step, step_changes], axis=0)

def get_pos_vel_act(df):
    """Extract positions, velocities and muscle activations from a rollout table.

    Args:
        df: DataFrame with the columns "episode", "observation" and
            "muscle_act", and optionally "task".

    Returns:
        Tuple ``(pos, vel, muscle_act)`` of vertically stacked arrays:
        ``pos`` holds the first 23 entries of every observation, ``vel`` the
        per-episode step differences of ``pos`` (see ``get_episode_vel``) and
        ``muscle_act`` the stacked "muscle_act" column.

    Note:
        ``pos`` and ``vel`` are ordered by group (episode, and task when
        present), whereas ``muscle_act`` follows the row order of ``df``.
        The rows are therefore only aligned when ``df`` is already sorted by
        the grouping keys.
    """
    # Episodes are told apart by the task as well, when that column exists
    group_keys = ["episode", "task"] if "task" in df.keys() else ["episode"]
    grouped_obs = df.groupby(group_keys)["observation"]
    pos_list = grouped_obs.agg(lambda x: np.vstack(x)[:, :23]).tolist()

    # Velocities are computed within each episode, never across episodes
    vel_list = [get_episode_vel(episode_pos) for episode_pos in pos_list]

    return np.vstack(pos_list), np.vstack(vel_list), np.vstack(df.muscle_act)

# For every task, keep the stacked positions, velocities and muscle activations
task_pos_vel_act_dict = {}
for key, value in df_dict.items():
    pos, vel, muscle_act = get_pos_vel_act(value)
    task_pos_vel_act_dict[key] = dict(pos=pos, vel=vel, muscle_act=muscle_act)

In [None]:
# Plot active muscles as a function of the activity threshold
threshold_vec = np.linspace(0, 1, 100)

# For every task and threshold: number of muscles per row whose activation is
# strictly above the threshold, averaged over all rows
active_muscles_dict = {}
for task, pos_vel_act_dict in task_pos_vel_act_dict.items():
    muscle_act = pos_vel_act_dict["muscle_act"]
    active_muscles_dict[task] = [
        np.mean((muscle_act > threshold).sum(axis=1))
        for threshold in threshold_vec
    ]

# Make sure the output folder exists, otherwise savefig fails after the whole
# computation has already been done
figure_dir = os.path.join(ROOT_DIR, "data", "figures", "rebuttal")
os.makedirs(figure_dir, exist_ok=True)

plt.figure(figsize=(10, 4))
for task, active_muscle_list in active_muscles_dict.items():
    plt.plot(threshold_vec, active_muscle_list, label=task)
plt.title("Average active muscles")
plt.xlabel("Activation threshold")
plt.ylabel("Number of muscles")
plt.legend()
# The file name carries the .png extension, like the other saved figures
plt.savefig(os.path.join(figure_dir, "activation_vs_threshold.png"), format="png", dpi=600, bbox_inches="tight")
plt.show()



In [None]:
# PCA of the hand pose, velocity and muscle activations for all tasks
num_joints = 23
num_muscles = 39

# Number of principal components requested for each kind of data
n_comp_per_data_type = {
    "pos": num_joints,
    "vel": num_joints,
    "muscle_act": num_muscles,
}

# exp_var_dict[data_type][task] holds whatever PCvsVar returns for that data
# (presumably the explained variance of each component -- TODO confirm)
exp_var_dict = {data_type: {} for data_type in n_comp_per_data_type}
for task, pos_vel_act_dict in task_pos_vel_act_dict.items():
    for data_type, n_components in n_comp_per_data_type.items():
        exp_var_dict[data_type][task] = PCvsVar(pos_vel_act_dict[data_type], n_comp=n_components)


In [None]:
def get_dof_count(exp_var, threshold=0.85):
    """Count the leading components needed to exceed a cumulative threshold.

    Args:
        exp_var: Per-component values (e.g. explained variance ratios). They
            are accumulated with ``np.cumsum`` in the order given.
        threshold: Level that the cumulative sum must strictly exceed.

    Returns:
        int: 1-based position of the first component at which the cumulative
        sum is strictly greater than ``threshold``. When the cumulative sum
        never exceeds the threshold (for instance because rounding keeps the
        total just below it, or the spectrum is truncated), all components
        are needed and their total number is returned instead of silently
        falling through with ``None``.
    """
    cum_exp_var = np.cumsum(exp_var)
    above = np.flatnonzero(cum_exp_var > threshold)
    if above.size == 0:
        # Threshold never exceeded: every available component is required
        return int(cum_exp_var.size)
    # Plain Python int, so the result stays JSON-serialisable
    return int(above[0]) + 1

In [None]:
# Cumulative-variance levels at which the number of components is counted
levels = [0.85, 0.95]

# dof_count_dict[data_type][task] maps each level to its component count and
# "avg" to the mean of those counts
dof_count_dict = {}
for data_type, task_var_dict in exp_var_dict.items():
    dof_count_dict[data_type] = {}
    for task, exp_var in task_var_dict.items():
        dof_per_level = {level: get_dof_count(exp_var, level) for level in levels}
        # The average is taken before "avg" itself is added to the dict
        dof_per_level["avg"] = np.mean(list(dof_per_level.values()))
        dof_count_dict[data_type][task] = dof_per_level

print(json.dumps(dof_count_dict, indent=4))

In [None]:
# Extracted from Todorov's paper
# Layout: data type -> condition -> level ("0.85" / "0.95") -> count, where
# "avg" is the mean of the two counts (presumably the number of principal
# components needed to reach that fraction of variance -- TODO confirm)
experimental_dof = {
    "pos": {
        "control": {"0.85": 7, "0.95": 10, "avg": 8.5},
        "baoding": {"0.85": 3, "0.95": 7, "avg": 5},
    },
    "vel": {
        "control": {"0.85": 8, "0.95": 12, "avg": 10},
        "baoding": {"0.85": 4, "0.95": 8, "avg": 6},
    },
}

## Comparison between the control manifolds of different tasks

In [None]:
# Fit one PCA per task on its muscle activations and store it on disk
n_comp = num_muscles

# Make sure the output folder exists, otherwise joblib.dump fails only after
# the first PCA has already been fitted
pca_dir = os.path.join(ROOT_DIR, "data", "pca")
os.makedirs(pca_dir, exist_ok=True)

for task, pos_vel_act_dict in task_pos_vel_act_dict.items():
    muscle_act = pos_vel_act_dict["muscle_act"]
    pca = PCA(n_components=n_comp).fit(muscle_act)
    out_path = os.path.join(pca_dir, f"pca_muscle_act_{task}.joblib")
    joblib.dump(pca, out_path)

In [None]:
def ev(X, X_approx, model_mean):
    """Return the fraction of the variation of ``X`` captured by ``X_approx``.

    Computed as ``1 - SS_res / SS_tot``, where ``SS_res`` is the summed
    squared difference between ``X`` and ``X_approx`` and ``SS_tot`` is the
    summed squared difference between ``X`` and ``model_mean``.

    Args:
        X: Original data.
        X_approx: Approximation of ``X`` with the same shape.
        model_mean: Reference mean, broadcast against ``X``.

    Returns:
        The explained-variance score (1 for a perfect approximation).
    """
    residual = X - X_approx
    centered = X - model_mean
    residual_ss = np.sum(residual**2)
    total_ss = np.sum(centered**2)
    return 1 - residual_ss / total_ss

def plot_explained_variance_ratio(exp_var, task_name, color, ax=None, fig=None):
    """Draw one cumulative explained-variance curve as a step plot.

    Args:
        exp_var: Sequence of values, one per number of principal components;
            entry ``k`` is plotted at x = ``k + 1``.
        task_name: Legend label of the curve.
        color: Colour of the curve.
        ax: Axes to draw on. A new figure is created when ``ax`` or ``fig``
            is ``None``.
        fig: Figure that owns ``ax``.

    Returns:
        Tuple ``(fig, ax)`` with the figure and axes that were drawn on.

    Note:
        Every call also adds the dashed 85% / 95% reference lines and their
        text labels to the axes.
    """
    if ax is None or fig is None:
        fig, ax = plt.subplots()
    n_pcs = range(1, len(exp_var) + 1)
    ax.step(n_pcs, exp_var, where='mid', linewidth=3, color=color, label=task_name)
    ax.set_xlabel('Number of PCs',fontsize=21)
    ax.set_ylabel('Cum. explained variance',fontsize=21)
    # Use the legend of the axes that was passed in: plt.legend() acts on the
    # *current* axes, which is not necessarily `ax`
    ax.legend(fontsize=14,loc='best')
    ax.tick_params(axis='both', labelsize=20)
    # Reference levels, with their labels placed just below each line
    ax.axhline(y=0.95, color='black', linestyle='--', alpha=0.5)
    ax.axhline(y=0.85, color='black', linestyle='--', alpha=0.5)
    ax.text(18, 0.9, '95%', color = 'black', fontsize=18)
    ax.text(18, 0.8, '85%', color = 'black', fontsize=18)
    return fig, ax
    
# Load the precomputed PCAs
pca_dict = {}
for task in task_pos_vel_act_dict.keys():
    pca = joblib.load(os.path.join(ROOT_DIR, "data", "pca", f"pca_muscle_act_{task}.joblib"))
    pca_dict[task] = pca

# One colour per task, sampled from the plasma colormap
cmap = get_cmap("plasma")
task_list = ["baoding", "early_baoding", "hand_pose", "hand_reach", "pen", "reorient"]
task_colors = {
    task: cmap((idx + 1) / (len(task_list) + 2))
    for idx, task in enumerate(task_list)
}

# Make sure the output folder exists, otherwise savefig fails after the
# curves of the first figure have already been computed
cum_var_dir = os.path.join(ROOT_DIR, "data", "figures", "rebuttal", "cum_var")
os.makedirs(cum_var_dir, exist_ok=True)

# Plot the cumulative variance projected on the different PCA spaces
for base_task in task_list:
    fig, ax = plt.subplots()
    ax.set_title(f"Explained variance ratio for {base_task}", fontsize=21)
    # The activations of the base task do not depend on the target PCA,
    # so they are looked up once per figure
    muscle_act = task_pos_vel_act_dict[base_task]["muscle_act"]
    for target_task, pca in pca_dict.items():
        muscle_act_projected = pca.transform(muscle_act)
        # Size the truncation loop from the projection itself rather than
        # from a global constant
        n_components = muscle_act_projected.shape[1]

        # Start from the reconstruction with all PCs ...
        muscle_act_approx = pca.inverse_transform(muscle_act_projected)
        exp_var_ratio_list = [ev(muscle_act, muscle_act_approx, pca.mean_)]
        # ... then zero the trailing PCs one more at a time, down to a
        # single remaining PC
        for n_dropped in range(1, n_components):
            muscle_act_projected[:, -n_dropped:] = 0
            muscle_act_approx = pca.inverse_transform(muscle_act_projected)
            exp_var_ratio_list.append(ev(muscle_act, muscle_act_approx, pca.mean_))
        # Values were collected from all PCs down to one; flip them so that
        # entry k corresponds to k + 1 PCs
        exp_var_ratio_list.reverse()

        plot_explained_variance_ratio(exp_var_ratio_list, task_name=target_task, color=task_colors[target_task], ax=ax, fig=fig)
    fig.savefig(os.path.join(cum_var_dir, f"cum_var_{base_task}_onto_all.png"), format="png", dpi=600, bbox_inches="tight")
    fig.show()


In [None]:
# Display the task -> colour mapping used for the curves
task_colors

In [None]:
# def plot_compare_explained_variance(exp_var_1, exp_var_2, exp_var_1_label=None, exp_var_2_label=None):
#     assert len(exp_var_1) == len(exp_var_2)
#     fig, ax = plt.subplots()
#     ax.step(range(1, len(exp_var_1) + 1), np.cumsum(exp_var_1), where='mid',label=exp_var_1_label, linewidth=3, color="dodgerblue")
#     ax.step(range(1, len(exp_var_1) + 1), np.cumsum(exp_var_2), where='mid',label=exp_var_2_label, linewidth=3, color="orangered")
#     ax.set_xlabel('Number of PCs',fontsize=21)
#     ax.set_ylabel('Cum. explained variance',fontsize=21)
#     plt.legend(fontsize=21,loc='best')
#     ax.tick_params(axis='both', labelsize=20)
#     ax.axhline(y=0.95, color='black', linestyle='--', alpha=0.5)
#     ax.axhline(y=0.85, color='black', linestyle='--', alpha=0.5)
#     ax.text(18, 0.9, '95%', color = 'black', fontsize=18)
#     ax.text(18, 0.8, '85%', color = 'black', fontsize=18)
#     return fig, ax
    
# fig, ax = plot_compare_explained_variance(exp_var_pos_baoding, exp_var_pos_control, "Baoding", "Control (hand pose)")
# # fig.savefig(os.path.join(ROOT_DIR, "data", "figures", "panel_1", "pca_pos.png"), format="png", dpi=600, bbox_inches="tight")
# fig.show()

# fig, ax = plot_compare_explained_variance(exp_var_vel_baoding, exp_var_vel_control, "Baoding", "Control (hand pose)")
# # fig.savefig(os.path.join(ROOT_DIR, "data", "figures", "panel_1", "pca_vel.png"), format="png", dpi=600, bbox_inches="tight")

# fig, ax = plot_compare_explained_variance(exp_var_muscle_baoding, exp_var_muscle_control, "Baoding", "Control (hand pose)")
# # fig.savefig(os.path.join(ROOT_DIR, "data", "figures", "panel_1", "pca_muscle_act.png"), format="png", dpi=600, bbox_inches="tight")


Interestingly, while the poses are embedded in a lower-dimensional space for the Baoding balls task, this is not the case for the muscle activations. In fact, we can hypothesize that the presence of objects and variable environment conditions forces the policy to be more robust, thus preventing the emergence of overly stereotypical muscle activations.