# Notebook for Running Biomechanics Simulations with Reboot Motion Data

__[CoLab Notebook Link](https://githubtocolab.com/RebootMotion/reboot-toolkit/blob/main/examples_biomechanics/RebootMotionBiomechanicsSimulation.ipynb)__

Run the cells in order, making sure to enter AWS credentials in the cell when prompted

In [None]:
#@title Install Python Package

!pip install git+https://github.com/RebootMotion/reboot-toolkit.git@v2.10.3#egg=reboot_toolkit > /dev/null
!echo "Done Installing Reboot Toolkit"

In [None]:
#@title Install MuJoCo Viewer

!pip install -q mjc_viewer
print("Done installing Mujoco Viewer")

In [None]:
#@title Import Python Libraries

import IPython
import matplotlib.pyplot as plt
import mujoco
import numpy as np
import os
import pandas as pd
from random import choice

from IPython.display import display
from tqdm import tqdm

import reboot_toolkit as rtk

from reboot_toolkit import S3Metadata, MocapType, MovementType, Handedness, FileType, PlayerMetadata

In [None]:
#@title AWS Credentials

# Upload your Organization's .env file to the local file system, per https://pypi.org/project/python-dotenv/
# OR input your credentials string generated by the Reboot Dashboard

boto3_session = rtk.setup_aws()

In [None]:
#@title User Input - No code changes required below this section, just enter information in forms

# Update the below info to match your desired analysis information
# Common changes you might want to make:

# To analyze both Hawk-Eye HFR data from the Stats API,
# and also Hawk-Eye Action files (e.g. from the DSP),
#  set mocap_types=[MocapType.HAWKEYE_HFR, MocapType.HAWKEYE]

# To analyze baseball-hitting,
# set movement_type=MovementType.BASEBALL_HITTING

# To analyze right-handed players,
# set handedness=Handedness.RIGHT

# To analyze data from the momentum and energy files,
# set file_type=FileType.MOMENTUM_ENERGY

# See https://docs.rebootmotion.com/ for all available file types and the data in each
mocap_types = [MocapType.HAWKEYE_HFR, MocapType.HAWKEYE]
movement_type = MovementType.BASEBALL_PITCHING
handedness = Handedness.RIGHT
file_type = FileType.INVERSE_KINEMATICS

# Update the label to whatever you'd like to be displayed in the visuals
primary_segment_label = 'Primary Segment'
comparison_segment_label = 'Comparison Segment'

# Use this bool to add columns of data, like pitch_type and start_speed, from the stats API
add_stats_api = False  # True or False

if add_stats_api:
    print("Will add data from the Stats API like velo and pitch type")
    
else:
    print("Will NOT add data from the Stats API like velo and pitch type (set to True above if needed)")

In [None]:
#@title The Function for Dynamically Simulating a Single Play with MuJoCo

from mjc_viewer import Serializer, Trajectory

def mujoco_sim(mj_model, positions_df, dom_hand, do_render=False):
    
    # set the suffix to be used for all resulting simulation columns
    col_suffix = "invdyn"
    
    # create the mujoco data element for the simulation
    mj_data = mujoco.MjData(mj_model)
    
    # for lefties, revert the post-processing negation that was done based on handedness
    if dom_hand.lower().startswith('l'):
        positions_df['pelvis_rot'] = -(positions_df['pelvis_rot'] - 180.)
        positions_df['pelvis_side'] = -positions_df['pelvis_side']

        positions_df['torso_rot'] = -positions_df['torso_rot']
        positions_df['torso_side'] = -positions_df['torso_side']
    
    # set the simulation time step as the median time step in the play (all should be uniform)
    dt = positions_df['time'].diff().median()
    mj_model.opt.timestep = dt
    
    # reorder the IK data frame to make it easier to set the values in the simulation
    positions_df = rtk.reorder_joint_angle_df_like_model(mj_model, positions_df.copy())
    angle_cols = [
        col for col in list(positions_df) if not col.endswith('translation')
    ]
    positions_df[angle_cols] = positions_df[angle_cols].apply(np.radians)
    
    # compute the gradient of the positions and velocities
    # (the dynamic state of the model includes both positions and velocities)
    velocities_df = positions_df.copy().apply(np.gradient, raw=True) / dt
    accel_df = velocities_df.copy().apply(np.gradient, raw=True) / dt
    
    sim_results = []

    if do_render:
        # create a Serializer and Trajectory instance
        serializer = Serializer(mj_model)
        trajectory = Trajectory(mj_data)

    else:
        trajectory = None
        serializer = None
    
    # Below is the most straightforward simulation loop with MuJoCo using "mj_step"
    # Here is documentation for the general simulation framework:
    # https://mujoco.readthedocs.io/en/latest/programming/simulation.html
    for i, row_pos in positions_df.iterrows():
        
        row_vel = velocities_df.iloc[i]
        row_acc = accel_df.iloc[i]
        
        mj_data.qpos = row_pos.to_numpy()
        
        mj_data.qvel = row_vel.to_numpy()

        mj_data.qacc - row_acc.to_numpy()
        
        mujoco.mj_step(mj_model, mj_data)
        
        mujoco.mj_inverse(mj_model, mj_data)

        if trajectory is not None:
            # store results in trajectory instance
            trajectory.step()
        
        # here you can save any simulation values you'd like...
        # see here for options: https://mujoco.readthedocs.io/en/stable/APIreference/APItypes.html#mjdata
        sim_results.append(mj_data.qfrc_inverse.copy())
        
    sim_results_df = pd.DataFrame(
        data=sim_results, columns=[f"{jn}_{col_suffix}" for jn in list(positions_df)]
    )
    
    if trajectory is not None:
        return sim_results_df, serializer, trajectory

    return sim_results_df

In [None]:
#@title The Function for Running a Biomechanics Simulation for Each Play in a Data Segment

def simulate_segment_df(model_xml_str, segment_data_df, dom_hand):
    
    model = mujoco.MjModel.from_xml_string(model_xml_str)

    sim_dfs = []
    
    print('Running a simulation for each play in the data segment...')
    for org_movement_id in tqdm(segment_data_df['org_movement_id'].unique()):
        
        ik_df = segment_data_df.loc[
            segment_data_df['org_movement_id'] == org_movement_id
        ].copy().reset_index(drop=True)
        
        current_sim_df = mujoco_sim(model, ik_df, dom_hand)
        
        sim_dfs.append(current_sim_df)
    
    sim_df = pd.concat(sim_dfs, ignore_index=True)
    
    return pd.concat([segment_data_df, sim_df], axis=1)

In [None]:
#@title Set S3 File Info

s3_metadata = S3Metadata(
    org_id=os.environ['ORG_ID'],
    mocap_types=mocap_types,
    movement_type=movement_type,
    handedness=handedness,
    file_type=file_type,
)

s3_df = rtk.download_s3_summary_df(s3_metadata)

In [None]:
#@title Optional Look Up Player ID by Name

name_to_look_up = "Jacob deGrom"

rtk.find_player_matches(s3_df, name_to_look_up, match_threshold=50., max_results=5)

In [None]:
#@title Display the Interface for Selecting the Primary Data Segment to Analyze

# Run this cell to display the dropdown menus and reset all options to NULL
primary_segment_widget = rtk.create_interactive_widget(s3_df)
display(primary_segment_widget)

In [None]:
#@title Set Primary Analysis Segment Info

primary_segment_data = primary_segment_widget.children[1].result
primary_analysis_segment = PlayerMetadata(
    org_player_ids=primary_segment_data["org_player_ids"],
    session_dates=primary_segment_data["session_dates"],
    session_nums=primary_segment_data["session_nums"],
    session_date_start=primary_segment_data["session_date_start"],
    session_date_end=primary_segment_data["session_date_end"],
    year=primary_segment_data["year"],
    org_movement_id=None, # set the play GUID for the skeleton animation; None defaults to the first play
    s3_metadata=s3_metadata,
)

primary_segment_summary_df = rtk.filter_s3_summary_df(primary_analysis_segment, s3_df)

primary_segment_data_df = rtk.load_games_to_df_from_s3_paths(
    primary_segment_summary_df['s3_path_delivery'].tolist(), add_ik_joints=True, add_elbow_var_val=True
)

if add_stats_api:
    print('Adding Stats API data (like pitch speed) to the data df...')
    primary_segment_data_df = rtk.decorate_primary_segment_df_with_stats_api(primary_segment_data_df)
    print("Available Pitch Types:")
    print(primary_segment_data_df['pitch_type'].unique())

# Common Issue:
# If no data files are returned here,
# check that the segment selection widget and the S3 File Info above are set correctly,
# also if the cells were updated after running once, check that they were run again with any new selections.

In [None]:
#@title Optional: After adding the Stats API data, uncomment below to filter the data

# # FILTER BY PITCH TYPES
# pitch_types = {'Four-Seam Fastball', 'Curveball'}  # list the pitch types you want to include
# primary_segment_data_df = primary_segment_data_df.loc[
#     primary_segment_data_df['pitch_type'].isin(pitch_types)
# ].copy().reset_index(drop=True)

# # FILTER BY A VELO RANGE
# velo_lo = 90
# velo_hi = 100
# primary_segment_data_df = primary_segment_data_df[
#     (primary_segment_data_df["start_speed"] > velo_lo) & (primary_segment_data_df["start_speed"] < velo_hi)
# ].copy().reset_index(drop=True)

# # Uncomment to print number of rows returned by filters
# print('Num available rows:', len(primary_segment_data_df))

In [None]:
#@title Run a Dynamic Simulation for Every Play in the Primary Data Segment

# Set the mass for the player in the primary segment (use kilograms)
primary_player_mass = 81.6

# Retrieve the scaled model (both in length and mass) from AWS
primary_model_xml_str = rtk.scale_human_xml(
    primary_segment_data_df, primary_player_mass, movement_type, boto3_session=boto3_session
)

# Simulate all the plays in the primary segment with the retrieved model
primary_sim_data_df = simulate_segment_df(primary_model_xml_str, primary_segment_data_df, handedness.value)

primary_segment_dict = rtk.load_data_into_analysis_dict(
    primary_analysis_segment, primary_sim_data_df, segment_label=primary_segment_label
)

In [None]:
#@title Visualize the Simulation for a Specific Play

#Set a name for the simulation html report
simulation_name = "Simulation.html"

# Either set a specific play ID, or simulate a random play ID
# play_id_of_interest = 'xxxxx-xxxxx-xxxxxx-xxxxxx'
play_id_of_interest = choice(primary_segment_data_df['org_movement_id'])

# Select the IK data for the specific play
ik_df_of_interest = primary_segment_data_df.loc[
    primary_segment_data_df['org_movement_id'] == play_id_of_interest
].copy().reset_index(drop=True)

primary_model = mujoco.MjModel.from_xml_string(primary_model_xml_str)

results_df, primary_serializer, primary_trajectory = mujoco_sim(
    primary_model, ik_df_of_interest, handedness.value, do_render=True
)

# Create the simulation report
simulation_html = primary_serializer.render(primary_trajectory)

# Save report to the local file system
with open(simulation_name, "w") as f:
    f.write(simulation_html)

# Display the simulation in the notebook
IPython.display.HTML(simulation_html)

In [None]:
#@title Display the Interface for Selecting the Comparison Data Segment to Analyze

comparison_segment_widget = rtk.create_interactive_widget(s3_df)
display(comparison_segment_widget)

In [None]:
#@title Set Comparison Analysis Segment Inputs

comparison_s3_metadata = s3_metadata
comparison_segment_data = comparison_segment_widget.children[1].result

comparison_analysis_segment = PlayerMetadata(
    org_player_ids=comparison_segment_data["org_player_ids"],
    session_dates=comparison_segment_data["session_dates"],
    session_nums=comparison_segment_data["session_nums"],
    session_date_start=comparison_segment_data["session_date_start"],
    session_date_end=comparison_segment_data["session_date_end"],
    year=comparison_segment_data["year"],
    org_movement_id=None, # set the play GUID for the skeleton animation; None defaults to the first play
    s3_metadata=comparison_s3_metadata,
)

comparison_segment_summary_df = rtk.filter_s3_summary_df(comparison_analysis_segment, s3_df)

comparison_segment_data_df = rtk.load_games_to_df_from_s3_paths(
    comparison_segment_summary_df['s3_path_delivery'].tolist(), add_ik_joints=True, add_elbow_var_val=True
)

if add_stats_api:
    print('Adding Stats API data (like pitch speed) to the data df...')
    comparison_segment_data_df = rtk.decorate_primary_segment_df_with_stats_api(comparison_segment_data_df)
    print("Available Pitch Types:")
    print(comparison_segment_data_df['pitch_type'].unique())

In [None]:
#@title Optional: After adding the Stats API data, uncomment below to filter the data

# # FILTER BY PITCH TYPES
# pitch_types = {'Four-Seam Fastball', 'Curveball'}  # list the pitch types you want to include
# comparison_segment_data_df = comparison_segment_data_df.loc[
#     comparison_segment_data_df['pitch_type'].isin(pitch_types)
# ].copy().reset_index(drop=True)

# # FILTER BY A VELO RANGE
# velo_lo = 90
# velo_hi = 100
# comparison_segment_data_df = comparison_segment_data_df[
#     (comparison_segment_data_df["start_speed"] >= velo_lo) & (comparison_segment_data_df["start_speed"] <= velo_hi)
# ].copy().reset_index(drop=True)

# # Uncomment to print number of rows returned by filters
# print('Num available rows:', len(comparison_segment_data_df))

In [None]:
#@title Run a Dynamic Simulation for Every Play in the Comparison Data Segment

# Set the mass for the player in the comparison segment (use kilograms)
comparison_player_mass = 81.6

comparison_model_xml_str = rtk.scale_human_xml(
    comparison_segment_data_df, comparison_player_mass, movement_type, boto3_session=boto3_session
)

comparison_sim_data_df = simulate_segment_df(comparison_model_xml_str, comparison_segment_data_df, handedness.value)

comparison_segment_dict = rtk.load_data_into_analysis_dict(
    comparison_analysis_segment, comparison_sim_data_df, segment_label=comparison_segment_label
)

In [None]:
#@title Optional - Create Simple Comparison Plots

# If you only ran the primary segment data, 
# uncomment here to run the below code with just primary segment dict
# analysis_dicts = [primary_segment_dict]
analysis_dicts = [primary_segment_dict, comparison_segment_dict]

# Available time options for the x_column include: 'time_from_max_hand', 'norm_time', 'rel_frame', 'time'
x_column = 'time_from_max_hand'
plot_x_domain = [-0.2, 0.05]

# Available inverse dynamics variables include one force / torque for each model joint 
# (including a static elbow var val joint). For linear joints that allow translation rather than rotation, 
# for example the x, y and z translation of the pelvis relative to the world, the output will be force in units of Newtons.
# For joints that allow rotation, for example shoulder internal / external rotation, 
# the output will be torque in units of Newton-meters. Available joints are here: https://docs.rebootmotion.com/definitions#inverse-kinematics

# Example rotational joints where the output is torque...
y_columns = ['right_shoulder_rot_invdyn', 'right_elbow_var_invdyn', 'right_elbow_invdyn']

# Example linear joints where the output is force...
# Note that because these joints connect the pelvis to the world, 
# these outputs represent the net ground reaction force on the skeleton applied to the center of the pelvis
y_columns = y_columns + ['x_translation_invdyn', 'y_translation_invdyn', 'z_translation_invdyn']

# Set the y-axis label to whatever is appropriate for the y-columns above
y_axis_label = "Torque (Nm) or Force (N)"

plot_y_range_torque = [-150, 50]
plot_y_range_force = [-1000, 500]

# Update to the number of standard deviations you want to shade in the plot relative to the mean
stand_devs_to_shade = 1.0

mpl_figs = []

for y_column in y_columns:

    mpl_fig = plt.figure()

    for segment_dict in analysis_dicts:

        y = segment_dict['df_mean'][y_column]

        y_lo = segment_dict['df_mean'][y_column] - (stand_devs_to_shade * segment_dict['df_std'][y_column])
        y_hi = segment_dict['df_mean'][y_column] + (stand_devs_to_shade * segment_dict['df_std'][y_column])

        plt.fill_between(segment_dict['df_mean'][x_column], y_lo, y_hi, alpha=0.4)

        plt.plot(segment_dict['df_mean'][x_column], y, label=segment_dict['segment_label'])

    plt.xlabel(x_column)
    plt.xlim(plot_x_domain)
    
    plt.ylabel(y_axis_label)
    if 'translation' in y_column:
        plt.ylim(plot_y_range_force)
        
    else:
        plt.ylim(plot_y_range_torque)
    
    plt.title(y_column)

    plt.legend()

    plt.grid()

    plt.show()

    mpl_figs.append(mpl_fig)

In [None]:
#@title Save Plots to a PDF

from matplotlib.backends.backend_pdf import PdfPages

pdf_file_name = 'analysis.pdf'

pdf_analysis = PdfPages(pdf_file_name)

for mpl_fig in mpl_figs:

    pdf_analysis.savefig(mpl_fig)

pdf_analysis.close()

print('Saved plots to', pdf_file_name)

In [None]:
#@title Save Maxes and Mins to CSV

# Save max values and min values to a CSV for the same y_columns explored above
# To save all columns, use the "all" option below, or create your own list
cols_to_save_in_csv = y_columns
# cols_to_save_in_csv = "all"
# cols_to_save_in_csv = ['y_translation_invdyn']

# Note that the same x domain used in the plots above
# will limit the range of data used here for finding the maxes and mins
for analysis_dict in analysis_dicts:
    if cols_to_save_in_csv == "all":
        print('Saving all columns for', analysis_dict['segment_label'])
        desired_cols = list(analysis_dict['df_mean'])
    
    else:
        desired_cols = cols_to_save_in_csv

    df_mean = analysis_dict['df_mean'].loc[
        (analysis_dict['df_mean'][x_column] >= plot_x_domain[0]) 
        & (analysis_dict['df_mean'][x_column] <= plot_x_domain[-1])
    ][desired_cols]
  
    df_mean.max().to_csv(f"{analysis_dict['segment_label'].replace(' ', '_')}_maxes.csv")
    df_mean.min().to_csv(f"{analysis_dict['segment_label'].replace(' ', '_')}_mins.csv")

    print('Done saving for', analysis_dict['segment_label'], '- check the local file system!')