# Imports

In [3]:
import msk_modelling_python as msk
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
import opensim as osim
from xml.etree import ElementTree as ET
import c3d
import shutil
import subprocess
import sys
from IPython.display import clear_output
import xml.dom.minidom 
import scipy.signal as sig
import neurokit2 as nk

current_dir = os.getcwd()
print(current_dir)
one_dir_up = os.path.dirname(current_dir)
print(one_dir_up)

c:\Git\opensim_tutorial\tutorials\repeated_sprinting\code
c:\Git\opensim_tutorial\tutorials\repeated_sprinting


# Rename files

In [None]:
folder = r"C:\Git\opensim_tutorial\tutorials\repeated_sprinting\Simulations\PC013\trial3_r1\Results_SO_and_MA"
file_mapping = {
    '_MuscleAnalysis': 'MuscleAnalysis'
}
# loop through all files in the folder and subfolders
for root, dirs, files in os.walk(folder):
    for file in files:
        if any(substring in file for substring in file_mapping.keys()):
            print(file)
            new_file = file.replace(list(file_mapping.keys())[0], list(file_mapping.values())[0])
            print(os.path.join(root, file))
            
            os.rename(os.path.join(root, file), os.path.join(root, new_file))
            print(f"Renamed {file} to {new_file}")

# Create classes and functions

In [12]:

class File:
    def __init__(self, path):
        self.path = path
        self.name = os.path.basename(path)
        self.extension = os.path.splitext(path)[1]
        
        if not os.path.isfile(path):
            print(f"\033[93mFile not found: {path}\033[0m")
            return        
        
        try:
            endheader_line = self.find_file_endheader_line(path)
        except:
            print(f"Error finding endheader line for file: {path}")
            endheader_line = 0
        # Read file based on extension
        try:
            if self.extension == '.csv':
                self.data = msk.pd.read_csv(path)
            elif self.extension == '.json':
                self.data = msk.bops.import_json_file(path)
            elif self.extension == '.xml':
                self.data = msk.bops.XMLTools.load(path)
            else:
                try:
                    self.data = msk.pd.read_csv(path, sep="\t", skiprows=endheader_line)
                except:
                    self.data = None
                    
            # add time range for the data
            try:
                self.time_range = [self.data['time'].iloc[0], self.data['time'].iloc[-1]]
                try:
                    self.time_range = [self.data['Time'].iloc[0], self.data['Time'].iloc[-1]]
                except:
                    pass
            except:
                self.time_range = None
        
        except Exception as e:
            print(f"Error reading file: {path}")
            print(e)
            self.data = None
            self.time_range = None
    def find_file_endheader_line(self, path):
        with open(path, 'r') as file:
            for i, line in enumerate(file):
                if 'endheader' in line:
                    return i + 1
        return 0    
    
class Trial:
    '''
    Class to store trial information and file paths, and export files to OpenSim format
    
    Inputs: trial_path (str) - path to the trial folder
    
    Attributes:
    path (str) - path to the trial folder
    name (str) - name of the trial folder
    og_c3d (str) - path to the original c3d file
    c3d (str) - path to the c3d file in the trial folder
    markers (str) - path to the marker trc file
    grf (str) - path to the ground reaction force mot file
    ...
    
    Methods: use dir(Trial) to see all methods
    
    '''
    def __init__(self, trial_path):        
        self.path = trial_path
        self.name = os.path.basename(self.path)
        self.subject = os.path.basename(os.path.dirname(self.path))
        self.c3d = os.path.join(os.path.dirname(self.path), self.name + '.c3d')
        self.markers = File(os.path.join(self.path,'markers_experimental.trc'))
        self.grf = File(os.path.join(self.path,'Visual3d_SIMM_grf.mot'))
        self.emg_csv = File(os.path.join(self.path,'processed_emg_signals.csv'))
        self.emg = File(os.path.join(self.path,'processed_emg_signals.mot'))
        self.ik = File(os.path.join(self.path,'Visual3d_SIMM_input.mot'))
        self.id = File(os.path.join(self.path,'inverse_dynamics.sto'))
        self.so_force = File(os.path.join(self.path,'Results_SO_and_MA', f'{self.subject}_StaticOptimization_force.sto'))
        self.so_activation = File(os.path.join(self.path, 'Results_SO_and_MA', f'{self.subject}_StaticOptimization_activation.sto'))
        self.jra = File(os.path.join(self.path,'joint_reacton_loads.sto'))
        
        # load muscle analysis files
        self.ma_targets = ['_MomentArm_', '_Length.sto']
        self.ma_files = []
        try:
            files = os.listdir(os.path.join(self.path, 'Results_SO_and_MA'))
            for file in files:
                if file.__contains__(self.ma_targets[0]) or file.__contains__(self.ma_targets[1]):
                    self.ma_files.append(File(os.path.join(self.path, 'Results_SO_and_MA', file)))
        except:
            self.ma_files = None
                    
        # settings files
        self.grf_xml = File(os.path.join(self.path,'GRF_Setup.xml'))
        self.actuators_so = File(os.path.join(self.path,'actuators_SO.xml'))
        
        self.settings_json = File(os.path.join(self.path,'settings.json'))
                             
    def check_files(self):
        '''
        Output: True if all files exist, False if any file is missing
        '''
        files = self.__dict__.values()
        all_files_exist = True
        for file in files:
            try:
                if not os.path.isfile(file):
                    print('File not found: ' + file)
                    all_files_exist = False
            except:
                pass
        return all_files_exist
    
    def header_mot(self,df,name):

            num_rows = len(df)
            num_cols = len(df.columns) 
            inital_time = df['Time'].iloc[0]
            final_time = df['Time'].iloc[-1]
            df_range = f'{inital_time}  {final_time}'


            return f'name {name}\n datacolumns {num_cols}\n datarows {num_rows}\n range {df_range} \n endheader'
        
    def csv_to_mot(self):
        
        emg_data = msk.bops.pd.read_csv(self.emg_csv)

        fs = int(1/(emg_data['time'][1] - emg_data['time'][0]))

        time = emg_data['time']

        # start time from new time point
        start_time = time.iloc[0]
        end_time = time.iloc[-1] - time.iloc[0] + start_time

        num_samples = len(emg_data)
        #num_samples = int((end_time - start_time) / (1/fs))
        new_time = np.linspace(start_time, end_time, num_samples)

        emg_data['time'] = new_time

        # Define a new file path 
        new_file_path = os.path.join(self.emg_csv.replace('.csv', '.mot'))

        # Save the modified DataFrame
        emg_data.to_csv(new_file_path, index=False)  # index=False prevents adding an extra index column

        # save to mot
        header = self.header_mot(emg_data, "processed_emg_signals")

        mot_path = new_file_path.replace('.csv','.mot')
        with open(mot_path, 'w') as f:
            f.write(header + '\n')  
            # print column names 
            f.write('\t'.join(map(str, emg_data.columns)) + '\n')
            for index, row in emg_data.iterrows():
                f.write('\t'.join(map(str, row.values)) + '\n')  
        
        print(f"File saved: {mot_path}")

    def create_settings_json(self, overwrite=False):
        if os.path.isfile(self.settings_json) and not overwrite:
            print('settings.json already exists')
            return
        
        settings_dict = self.__dict__
        msk.bops.save_json_file(settings_dict, self.settings_json)
        print('trial settings.json created in ' + self.path)
    
    def exportC3D(self):
        msk.bops.c3d_osim_export(self.og_c3d) 

    def change_grf_xml_path(self):

        try:
            self.tree = ET.parse(self.grf_xml.path)
            self.root = self.tree.getroot()
            self.root.find('.//datafile').text = self.grf.path
            
            self.tree.write(self.grf_xml.path)
            
            print(f"GRF file path updated in {self.grf_xml.path}")
        except Exception as e:
            print(f"Error loading XML file: {e}")
            return None

    def save_json_file(self, data, jsonFilePath):
        data = data.__dict__

        with open(jsonFilePath, 'w') as f:
            msk.bops.json.dump(data, f, indent=4)

        json_data = msk.bops.import_json_file(jsonFilePath)
        return json_data
    
    def to_json(self):
        msk.bops.save_json_file(self.__dict__, jsonFilePath = self.settings_json)
        print('settings.json created in ' + self.settings_json)
    
    def run_IK(osim_modelPath, trc_file, resultsDir):
        '''
        Function to run Inverse Kinematics using the OpenSim API.
        
        Inputs:
                osim_modelPath(str): path to the OpenSim model file
                trc_file(str): path to the TRC file
                resultsDir(str): path to the directory where the results will be saved
        '''

        # Load the TRC file
        import pdb; pdb.set_trace()
        tuple_data = msk.bops.import_trc_file(trc_file)
        df = pd.DataFrame.from_records(tuple_data, columns=[x[0] for x in tuple_data])
        column_names = [x[0] for x in tuple_data]
        if len(set(column_names)) != len(column_names):
            print("Error: Duplicate column names found.")
        # Load the model
        osimModel = osim.Model(osim_modelPath)                              
        state = osimModel.initSystem()

        # Define the time range for the analysis
        
        initialTime = msk.TRCData.getIndependentColumn()
        finalTime = msk.TRCData.getLastTime()

        # Create the inverse kinematics tool
        ikTool = osim.InverseKinematicsTool()
        ikTool.setModel(osimModel)
        ikTool.setStartTime(initialTime)
        ikTool.setEndTime(finalTime)
        ikTool.setMarkerDataFileName(trc_file)
        ikTool.setResultsDir(resultsDir)
        ikTool.set_accuracy(1e-6)
        ikTool.setOutputMotionFileName(os.path.join(resultsDir, "ik.mot"))

        # print setup
        ikTool.printToXML(os.path.join(resultsDir, "ik_setup.xml"))         

        # Run inverse kinematics
        print("running ik...")                                             
        ikTool.run()

    def run_inverse_kinematics(model_file, marker_file, output_motion_file):
        # Load model and create an InverseKinematicsTool
        model = osim.Model(model_file)
        ik_tool = osim.InverseKinematicsTool()

        # Set the model for the InverseKinematicsTool
        ik_tool.setModel(model)

        # Set the marker data file for the InverseKinematicsTool
        ik_tool.setMarkerDataFileName(marker_file)

        # Specify output motion file
        ik_tool.setOutputMotionFileName(output_motion_file)

        # Save setup file
        ik_tool.printToXML('setup_ik.xml')

        # Run Inverse Kinematics
        ik_tool.run()

    def run_ID(self, osim_modelPath, coordinates_file, external_loads_file, output_file, LowpassCutoffFrequency=6, run_tool=True):
        
        try: 
            model = osim.Model(osim_modelPath)
        except Exception as e:
            print(f"Error loading model: {osim_modelPath}")
            print(e)
            return
        
        results_folder = os.path.dirname(output_file)
        
        # Setup for excluding muscles from ID
        exclude = osim.ArrayStr()
        exclude.append("Muscles")
        # Setup for setting time range
        IKData = osim.Storage(coordinates_file)

        # Create inverse dynamics tool, set parameters and run
        id_tool = osim.InverseDynamicsTool()
        id_tool.setModel(model)
        id_tool.setCoordinatesFileName(coordinates_file)
        id_tool.setExternalLoadsFileName(external_loads_file)
        id_tool.setOutputGenForceFileName(output_file)
        id_tool.setLowpassCutoffFrequency(LowpassCutoffFrequency)
        id_tool.setStartTime(IKData.getFirstTime())
        id_tool.setEndTime(IKData.getLastTime())
        id_tool.setExcludedForces(exclude)
        id_tool.setResultsDir(results_folder)
        id_tool.printToXML(os.path.join(results_folder, "setup_ID.xml"))
        
        if run_tool:
            id_tool.run()
    
    def export_analog(self, c3dFilePath=None):
        if not c3dFilePath:
            print('C3D file path not provided')
            return
        
        reader = c3d.Reader(open(c3dFilePath, 'rb'))

        # get analog labels, trimmed and replace '.' with '_'
        analog_labels = reader.analog_labels
        analog_labels = [label.strip() for label in analog_labels]
        analog_labels = [label.replace('.', '_') for label in analog_labels]

        # get analog labels, trimmed and replace '.' with '_'
        fs = reader.analog_rate

        # add time to dataframe
        first_frame = reader.first_frame / fs
        final_time = (reader.first_frame + reader.frame_count-1) / fs
        time = msk.np.arange(first_frame / fs, final_time, 1 / fs)  
        num_frames = len(time)
        df = msk.pd.DataFrame(index=range(num_frames),columns=analog_labels)
        df['time'] = time

        # move time to first column
        cols = df.columns.tolist()
        cols = cols[-1:] + cols[:-1]
        df = df[cols] 

        # loop through frames and add analog data to dataframe
        for i_frame, points, analog in reader.read_frames():
            
            # get row number and print loading bar
            i_row = i_frame - reader.first_frame
            # msk.ut.print_loading_bar(i_row/num_frames)
            
            # convert analog data to list
            analog_list  = analog.data.tolist()
            
            # loop through analog channels and add to dataframe
            for i_channel in range(len(analog_list)):
                channel_name = analog_labels[i_channel]
                
                # add channel to dataframe
                df.loc[i_row, channel_name] = analog[i_channel][0]
                
        # save emg data to csv
        df.to_csv(self.emg_csv)
        
        # save to mot
        #self.csv_to_mot()
    
class openSim:
    def __init__(self, legs = ['r', 'l'], subjects =['PC002','PC003','PC006', 'PC013', 'TD006', 'TD013', 'TD017', 'TD021', 'TD023', 'TD026'], trials_to_load = ['trial1','trial2','trial3','normal1', 'normal2', 'normal3', 'crouch1', 'crouch2', 'crouch3'], trial_number = 1):
        try:
            self.code_path = os.path.dirname(__file__)
        except:
            self.code_path = os.getcwd()
        
        self.simulations_path = os.path.join(os.path.dirname(self.code_path), 'Simulations')
        self.subjects = {}
        
        for subject in subjects:
            self.subjects[subject] = {}
            self.subjects[subject]['model'] = os.path.join(self.simulations_path, subject, subject + '_scaled.osim')
            for leg in legs: 
                for trial in trials_to_load:            
                    self.trial_path = os.path.join(self.simulations_path, subject, f'{trial}_{leg}{trial_number}')
                    try:
                        trial = Trial(self.trial_path)
                        self.subjects[subject][trial.name] = trial 
                    except Exception as e:
                        self.subjects[subject][trial] =  None
                        # print(f"Error loading trial: {self.trial_path}")
                        # print(e)
            
        
        self.ik_columns = ["hip_flexion_leg", "hip_adduction_leg", "hip_rotation_leg", "knee_angle_leg", "ankle_angle_leg"]
        self.id_columns = ["hip_flexion_leg" + "_moment", "hip_adduction_leg" + "_moment", "hip_rotation_leg" + "_moment", "knee_angle_leg" + "_moment", "ankle_angle_leg" + "_moment"]
        self.force_columns = ["add_long_leg", "rect_fem_leg", "med_gas_leg", "semiten_leg","tib_ant_leg"]


        self.titles = ["Hip Flexion", "Hip Adduction", "Hip Rotation", "Knee Flexion", "Ankle Plantarflexion"]
        self.titles_muscles = ["Adductor Longus", "Rectus Femoris", "Medial Gastrocnemius", "Semitendinosus", "Tibialis Anterior"]

    # Time Normalisation Function 
    def time_normalised_df(self, df, fs=None):
        if not isinstance(df, msk.pd.DataFrame):
            raise Exception('Input must be a pandas DataFrame')
        
        if not fs:
            try:
                fs = 1 / (df['time'][1] - df['time'][0])  # Ensure correct time column
            except KeyError:
                raise Exception('Input DataFrame must contain a column named "time"')
            
        normalised_df = msk.pd.DataFrame(columns=df.columns)

        for column in df.columns:
            if column == 'time':  # Skip time column
                continue	
            normalised_df[column] = msk.np.zeros(101)

            currentData = df[column].dropna()  # Remove NaN values

            timeTrial = msk.np.linspace(0, len(currentData) / fs, len(currentData))  # Original time points
            Tnorm = msk.np.linspace(0, timeTrial[-1], 101)  # Normalize to 101 points

            normalised_df[column] = msk.np.interp(Tnorm, timeTrial, currentData)  # Interpolate

        return normalised_df

    def plot_single_trial(self, show = False):
        #Read .mot files
        with open(self.mot_file, "r") as file:
            lines = file.readlines()

        # Find the line where actual data starts (usually after 'endheader')
        for i, line in enumerate(lines):
            if "endheader" in line:
                start_row = i + 1  # Data starts after this line
                break
        else:
            start_row = 0  # If 'endheader' is not found, assume no header

        # Load data using Pandas
        self.df_ik = msk.pd.read_csv(self.mot_file, delim_whitespace=True, start_row=start_row)
        self.df_id = msk.pd.read_csv(self.id_file, sep="\t", start_row=6)
        self.df_force = msk.pd.read_csv(self.force_file, sep="\t", start_row=14)

        # Apply normalisation to both IK (angles) and ID (moments) data
        self.df_ik_normalized = self.time_normalised_df(df=self.df_ik)
        self.df_id_normalized = self.time_normalised_df(df=self.df_id)
        self.df_force_normalized = self.time_normalised_df(df=self.df_force)

        # Ensure time is normalized to 101 points
        time_normalized = msk.np.linspace(0, 100, 101)  
 
        # select the specified columns         
        self.ik_data = self.df_ik_normalized[self.ik_columns]
        self.id_data = self.df_id_normalized[self.id_columns]
        self.force_data = self.df_force_normalized[self.force_columns]
            
        # Define the layout 
        fig, axes = plt.subplots(2, 5, figsize=(15, 4)) 

        #Plot IK (angles)
        for i, col in enumerate(self.ik_columns):
            ax = axes[0,i]
            ax.plot(time_normalized, self.ik_data[col], color='red')  # Main curve
            ax.set_title(self.titles[i])
            if i == 0:
                ax.set_ylabel("Angle (deg)")
            ax.grid(True)

        #Plot ID (moments)
        for i, col in enumerate(self.id_columns):
            ax = axes[1,i]
            ax.plot(time_normalized, self.id_data[col], color='blue')  # Main curve
            ax.set_title(self.titles[i])
            if i == 0:
                ax.set_ylabel("Moment (Nm)")
            ax.set_xlabel("% Gait Cycle")
            ax.grid(True)

        plt.tight_layout()


        # PLOT MUSCLE FORCES 
        fig, axes = plt.subplots(nrows=1, ncols=5, figsize=(15, 4), sharex=True)

        for i, col in enumerate(self.force_columns):
            ax = axes[i]
            ax.plot(time_normalized, self.force_data[col], color='green')
            ax.set_title(self.titles_muscles[i])
            if i == 0:
                ax.set_ylabel("Force (N)")
            ax.set_xlabel("% Gait Cycle")
            ax.grid(True)

        plt.tight_layout()
        
        if show:
            plt.show()

    def plot_multiple_trials(self, show=False):
        self.df_ik_list = []  # Store loaded DataFrames
        
        for subject in self.subjects:
            for trial in self.subjects[subject]:
                trial_obj = self.subjects[subject][trial]
                if trial_obj:
                    self.df_ik_list.append(trial_obj.ik.data)
                    
        for file in self.mot_files:  # Loop through each file
            with open(file, "r") as f:
                lines = f.readlines()

            # Load data using Pandas
            df = msk.pd.read_csv(file, delim_whitespace=True, skiprows=5)
            self.df_ik_list.append(df)

        # Normalize all loaded IK data
        self.df_ik_normalized_list = []  # Store normalized DataFrames

        for df in self.df_ik_list:  # Loop through each loaded DataFrame
            df_normalized = self.time_normalised_df(df=df)  # Apply normalization
            self.df_ik_normalized_list.append(df_normalized)  # Store normalized DataFrame

        # Ensure time is normalized to 101 points
        time_normalized = msk.np.linspace(0, 100, 101)

        # Select the specified columns from normalized data
        self.ik_data_list = []  # Store DataFrames with only the required columns

        for df_normalized in self.df_ik_normalized_list:  # Loop through each normalized DataFrame
            if set(self.ik_columns).issubset(df_normalized.columns):  # Check if columns exist
                self.ik_data_list.append(df_normalized[self.ik_columns])  # Select only specified columns
            else:
                print("Warning: Some specified columns are missing in a file.")

        # Plot mean and sd
        # Check if IK data exists
        if not self.ik_data_list:
            print("No IK data available to plot!")
        else:
            # Convert list of DataFrames to a single NumPy array
            combined_df = np.array([df.values for df in self.ik_data_list])  # Shape: (num_trials, num_timepoints, num_columns)

            # Check if data is properly structured
            if combined_df.shape[0] < 2:
                print("Not enough trials to calculate mean and standard deviation!")
            else:
                # Compute Mean and Standard Deviation
                mean_values = np.mean(combined_df, axis=0)
                std_values = np.std(combined_df, axis=0)

                # Normalize time from 0 to 100% Gait Cycle
                time_values = np.linspace(0, 100, combined_df.shape[1])

                # Create a shared figure for all subplots
                fig, axes = plt.subplots(nrows=1, ncols=len(self.ik_columns), figsize=(20, 5), sharex=True)

                if len(self.ik_columns) == 1:
                    axes = [axes]  # If only one column, ensure it's iterable

                for i, col in enumerate(self.ik_columns):
                    ax = axes[i]

                    # Plot mean line
                    ax.plot(time_values, mean_values[:, i], color='red', label="Mean", linewidth=2)

                    # Shade the standard deviation range
                    ax.fill_between(time_values, mean_values[:, i] - std_values[:, i],
                                    mean_values[:, i] + std_values[:, i], color='red', alpha=0.2, label="SD Range")

                    # Formatting
                    ax.set_title(col)
                    ax.set_xlabel("Gait Cycle (%)")
                    ax.set_xlim(0, 100)  # X-axis from 0% to 100% of the gait cycle
                    ax.grid(True)

                    # Set Y-label only for the first subplot
                    if i == 0:
                        ax.set_ylabel("Angle (Degrees)")
                        ax.legend()


                plt.tight_layout()

                if show:
                    plt.show()

def export_c3d(c3dFilePath):
    analog_file_path = os.path.join(os.path.dirname(c3dFilePath),'analog.csv')
    
    # if the file already exists, return the file
    if os.path.isfile(analog_file_path):
        df = msk.pd.read_csv(analog_file_path)
        return df
    
    print('Exporting analog data to csv ...')
    
    # read c3d file
    reader = c3d.Reader(open(c3dFilePath, 'rb'))

    # get analog labels, trimmed and replace '.' with '_'
    analog_labels = reader.analog_labels
    analog_labels = [label.strip() for label in analog_labels]
    analog_labels = [label.replace('.', '_') for label in analog_labels]

    # get analog labels, trimmed and replace '.' with '_'
    first_frame = reader.first_frame
    num_frames = reader.frame_count
    fs = reader.analog_rate

    # add time to dataframe
    initial_time = first_frame / fs
    final_time = (first_frame + num_frames-1) / fs
    time = np.arange(first_frame / fs, final_time, 1 / fs) 

    df = msk.pd.DataFrame(index=range(num_frames),columns=analog_labels)
    df['time'] = time
    
    # move time to first column
    cols = df.columns.tolist()
    cols = cols[-1:] + cols[:-1]
    df = df[cols]    
    
    # loop through frames and add analog data to dataframe
    for i_frame, points, analog in reader.read_frames():
        
        # get row number and print loading bar
        i_row = i_frame - reader.first_frame
        # msk.ut.print_loading_bar(i_row/num_frames)
        
        # convert analog data to list
        analog_list  = analog.data.tolist()
        
        # loop through analog channels and add to dataframe
        for i_channel in range(len(analog_list)):
            channel_name = analog_labels[i_channel]
            
            # add channel to dataframe
            df.loc[i_row, channel_name] = analog[i_channel][0]
    
    # save emg data to csv   
    df.to_csv(analog_file_path)
    print('analog.csv exported to ' + analog_file_path)  
    
    return df

def export_analog(c3dFilePath=None, columns_to_mot='all'):
    if not c3dFilePath:
        print('C3D file path not provided')
        return
    
    reader = c3d.Reader(open(c3dFilePath, 'rb'))

    # get analog labels, trimmed and replace '.' with '_'
    analog_labels = reader.analog_labels
    analog_labels = [label.strip() for label in analog_labels]
    analog_labels = [label.replace('.', '_') for label in analog_labels]
    
    # remove those not in columns_to_mot (fix: use column names to filter and get indices)
    if columns_to_mot != 'all':
        indices = [i for i, label in enumerate(analog_labels) if label in columns_to_mot]
        analog_labels = [analog_labels[i] for i in indices]
    else:
        indices = list(range(len(analog_labels)))
        columns_to_mot = analog_labels

    # get analog labels, trimmed and replace '.' with '_'
    fs = reader.analog_rate

    # add time to dataframe
    first_time = reader.first_frame / fs
    final_time = (reader.first_frame + reader.frame_count-1) / fs
    time = msk.np.arange(first_time / fs, final_time, 1 / fs)  
    num_frames = len(time)
    df = msk.pd.DataFrame(index=range(num_frames), columns=analog_labels)
    df['time'] = time

    # move time to first column
    cols = df.columns.tolist()
    cols = cols[-1:] + cols[:-1]
    df = df[cols] 

    # loop through frames and add analog data to dataframe
    for i_frame, points, analog in reader.read_frames():
        
        # get row number and print loading bar
        i_row = i_frame - reader.first_frame
        # msk.ut.print_loading_bar(i_row/num_frames)
        
        # loop through selected analog channels and add to dataframe (fix: iterate over filtered indices)
        for idx, i_channel in enumerate(indices):
            channel_name = analog_labels[idx]
            df.loc[i_row, channel_name] = analog[i_channel][0]
    
    # remove rows with NaN values
    df = df.dropna()
    
    # save emg data to csv
    analog_csv_path = c3dFilePath.replace('.c3d', '_analog.csv')
    df.to_csv(analog_csv_path, index=False)
    
    # save to mot
    # self.csv_to_mot()
    
    return analog_csv_path

def header_mot(df,name):

        num_rows = len(df)
        num_cols = len(df.columns) 
        inital_time = df['time'].iloc[0]
        final_time = df['time'].iloc[-1]
        df_range = f'{inital_time}  {final_time}'


        return f'name {name}\nnRows={num_rows}\nnColumns={num_cols}\n \nendheader'

def csv_to_mot(emg_csv):
    
    emg_data = msk.bops.pd.read_csv(emg_csv)
    
    time = emg_data['time']

    # start time from new time point
    start_time = time.iloc[0]
    end_time = time.iloc[-1]

    num_samples = len(emg_data)
    new_time = np.linspace(start_time, end_time, num_samples)

    emg_data['time'] = new_time

    # Define a new file path 
    new_file_path = os.path.join(emg_csv.replace('.csv', '.mot'))

    # Save the modified DataFrame
    emg_data.to_csv(new_file_path, index=False)  # index=False prevents adding an extra index column

    # save to mot
    header = header_mot(emg_data, "processed_emg_signals")

    mot_path = new_file_path.replace('.csv','.mot')
    with open(mot_path, 'w') as f:
        f.write(header + '\n')  
        # print column names 
        f.write('\t'.join(map(str, emg_data.columns)) + '\n')
        for index, row in emg_data.iterrows():
            f.write('\t'.join(map(str, row.values)) + '\n')  
    
    print(f"File saved: {mot_path}")
    
    return mot_path

def time_normalised_df(df, fs=None):
    if not isinstance(df, msk.pd.DataFrame):
        raise Exception('Input must be a pandas DataFrame')
    
    if not fs:
        try:
            fs = 1 / (df['time'][1] - df['time'][0])  # Ensure correct time column
        except KeyError:
            raise Exception('Input DataFrame must contain a column named "time"')
        
    normalised_df = msk.pd.DataFrame(columns=df.columns)

    for column in df.columns:
        normalised_df[column] = msk.np.zeros(101)

        currentData = df[column].dropna()  # Remove NaN values

        timeTrial = msk.np.linspace(0, len(currentData) / fs, len(currentData))  # Original time points
        Tnorm = msk.np.linspace(0, timeTrial[-1], 101)  # Normalize to 101 points

        normalised_df[column] = msk.np.interp(Tnorm, timeTrial, currentData)  # Interpolate

    return normalised_df

def save_pretty_xml(tree, save_path):
            """Saves the XML tree to a file with proper indentation."""
            # Convert to string and format with proper indents
            rough_string = ET.tostring(tree.getroot(), 'utf-8')
            reparsed = xml.dom.minidom.parseString(rough_string)
            pretty_xml = reparsed.toprettyxml(indent="   ")

            # Write to file
            with open(save_path, 'w') as file:
                file.write(pretty_xml)

def filter_emg(signal, sample_rate=1000, low_pass_cutoff=6):
    """
    Processes EMG signal: clean, rectify, and filter to get the envelope.
    """
    cleaned_signal = nk.emg_clean(signal, sampling_rate=sample_rate, method='biosppy')
    rectified_signal = np.abs(cleaned_signal)
    low_pass = low_pass_cutoff / (sample_rate / 2)
    b, a = sig.butter(4, low_pass, btype='lowpass')
    emg_envelope = sig.filtfilt(b, a, rectified_signal)
    return emg_envelope

def filter_emg_signals(csv_file, muscles, sample_rate=1000):
    df = msk.pd.read_csv(csv_file)
    filtered_data = {'time': df['time']}  # Add time column
    
    for muscle in muscles:
        if muscle in df.columns:
            filtered_data[muscle] = filter_emg(df[muscle].values, sample_rate)
    
    df_filtered = msk.pd.DataFrame(filtered_data)
    
    filtered_emg_path = csv_file.replace('.csv', '_filtered_emg.csv')
    df_filtered.to_csv(filtered_emg_path, index=False)
    
    return filtered_emg_path

def amplitude_normalise(processed_emg_path):
    emg_data = pd.read_csv(processed_emg_path)
    emg_data_normalised = emg_data.copy()
    emg_data_normalised = emg_data_normalised.drop(columns=['time'])
    
    # normalise data
    emg_data_normalised = emg_data_normalised / emg_data_normalised.max().max()
    
    # add time col back and move to first column
    emg_data_normalised['time'] = emg_data['time']
    cols = emg_data_normalised.columns.tolist()
    cols = cols[-1:] + cols[:-1]
    emg_data_normalised = emg_data_normalised[cols]
    
    # save path 
    normalised_emg_path = processed_emg_path.replace('.csv', '_normalised.csv')
    emg_data_normalised.to_csv(normalised_emg_path, index=False)	
    
    return normalised_emg_path
	

# Name coordinates for the respective leg

In [None]:
ik_columns = ["hip_flexion_leg", "hip_adduction_leg", "hip_rotation_leg", "knee_angle_leg", "ankle_angle_leg"]
ik_columns = [col.replace('_leg', '_r') for col in ik_columns]
ik_columns

# Load Participant Data

In [6]:
subject_list = ['PC002','PC003','PC006', 'PC013', 'TD006', 'TD013', 'TD017', 'TD021', 'TD023', 'TD026']
trial_list = ['trial1','trial2','trial3','normal1', 'normal2', 'normal3', 'crouch1', 'crouch2', 'crouch3']
os_analysis = openSim(legs=['r','l'], subjects=subject_list, trials_to_load=trial_list, trial_number=1)

# clear output
clear_output()

# Convert c3d files

In [8]:
c3dFilePath = msk.ui.select_file()

subject = os.path.basename(os.path.dirname(c3dFilePath))
trial_folder = os.path.basename(c3dFilePath).replace('.c3d','')

print(f"Subject: {subject}")
print(f"Trial: {trial_folder}")
legs = ['r', 'l']

for leg in legs:
    leg_folder_name = trial_folder + f'_{leg}1'
    leg_folder = os.path.join(os.path.dirname(c3dFilePath), leg_folder_name)
    if not os.path.exists(leg_folder):
        os.mkdir(leg_folder)
        print(f"Folder created: {leg_folder}")
    else:
        print(f"Folder already exists: {leg_folder}")
        
    if leg == 'r':
        columns_to_mot = ['RGLTMED','RRF','RADDLONG','RBF','RTA','RGM']
    else:
        columns_to_mot = ['LGLTMED','LRF','LADDLONG','LBF','LTA','LGM'] 
    
    
    # Export analog data to csv
    analog_csv_path = export_analog(c3dFilePath, columns_to_mot=columns_to_mot)
    
    # Process EMG signals
    filtered_emg_path = filter_emg_signals(analog_csv_path, columns_to_mot)

    # Normalise amplitude of EMG signals
    normalised_emg_path = amplitude_normalise(filtered_emg_path)
    
    # convert to mot
    emg_mot_path = csv_to_mot(normalised_emg_path)
    
    # move the file to the leg folder
    want_it = True
    if want_it:
        # move the file to the folder
        new_analog_path = os.path.join(os_analysis.subjects[subject][leg_folder_name].path, 'analog_emg_signals.csv')
        new_emg_mot_path = os.path.join(os_analysis.subjects[subject][leg_folder_name].path, 'processed_emg_signals.mot')

        try:
            shutil.move(analog_csv_path, new_analog_path)
            shutil.move(emg_mot_path, new_emg_mot_path)
        except Exception as e:
            print(f"Error moving file: {e}")
     

Subject: PC003
Trial: trial1
Folder already exists: C:/Git/opensim_tutorial/tutorials/repeated_sprinting/Simulations/PC003\trial1_r1
File saved: C:/Git/opensim_tutorial/tutorials/repeated_sprinting/Simulations/PC003/trial1_analog_filtered_emg_normalised.mot
Folder already exists: C:/Git/opensim_tutorial/tutorials/repeated_sprinting/Simulations/PC003\trial1_l1
File saved: C:/Git/opensim_tutorial/tutorials/repeated_sprinting/Simulations/PC003/trial1_analog_filtered_emg_normalised.mot


In [13]:
analog_csv_path = export_analog(c3dFilePath, columns_to_mot=columns_to_mot)
print(analog_csv_path)

C:/Git/opensim_tutorial/tutorials/repeated_sprinting/Simulations/PC003/trial1_analog.csv


# Start all files from Zero 

In [None]:
trial_name = trial_folder + '_r1'
os_analysis.subjects[subject][trial_name].emg_csv.data

In [None]:
subject = 'PC002'
trial_name = 'trial2_l1'
data = os_analysis.subjects[subject][trial_name].emg_csv.data
if data is None:
    print(f"EMG data for subject {subject}, trial {trial_name} is not loaded correctly.")
else:
    print("EMG data loaded successfully.")
data['time'] = data['time'] - data['time'].iloc[0]
data.to_csv(os_analysis.subjects[subject][trial_name].emg_csv, index=False)

csv_to_mot(os_analysis.subjects[subject][trial_name].emg_csv)

# Check times in all files for simulation


In [21]:
subject_list = ['PC002','PC003','PC006', 'PC013', 'TD006', 'TD013', 'TD017', 'TD021', 'TD023', 'TD026']
trial_list = ['trial1','trial2','trial3','normal1', 'normal2', 'normal3', 'crouch1', 'crouch2', 'crouch3']
os_analysis = openSim(legs=['r','l'], subjects=subject_list, trials_to_load=trial_list, trial_number=1)

# clear output
clear_output()

In [23]:
subject = 'PC003'
trial_name = 'trial1_l1'

# Define relative paths to the files
id_file = os_analysis.subjects[subject][trial_name].id.path
ik_file = os_analysis.subjects[subject][trial_name].ik.path
emg_file = os_analysis.subjects[subject][trial_name].emg.path

# Load the files into Pandas DataFrames
id_df = pd.read_csv(id_file, sep="\t", skiprows=6)  
ik_df = pd.read_csv(ik_file, sep="\t", skiprows=5)
emg_df = pd.read_csv(emg_file, sep="\t", skiprows=5) 

# Extract the time columns
id_time = id_df['time'] if 'time' in id_df.columns else id_df.iloc[:, 0]
ik_time = ik_df['time'] if 'time' in ik_df.columns else ik_df.iloc[:, 0]
emg_time = emg_df['time'] if 'time' in emg_df.columns else emg_df.iloc[:, 0]

print(id_time[0], id_time[len(id_time)-1])
print(ik_time[0], ik_time[len(ik_time)-1])
print(emg_time[0], emg_time[len(emg_time)-1])

print(str(emg_file))


3.18 3.6
3.18 3.6
0.1578 7.047800000000006
c:\Git\opensim_tutorial\tutorials\repeated_sprinting\Simulations\PC003\trial1_l1\processed_emg_signals.mot


In [None]:
# Convert time columns to float, handling any non-numeric values
def convert_to_numeric(series):
    return pd.to_numeric(series, errors='coerce').dropna()  # Convert and drop NaNs

id_time = convert_to_numeric(id_time)
ik_time = convert_to_numeric(ik_time)
emg_time = convert_to_numeric(emg_time)

# Combine time values into a set to remove duplicates
unique_times = sorted(set(id_time).union(set(ik_time), set(emg_time)))

# Convert to numpy array
unique_time_array = np.array(unique_times)

# Create a base DataFrame with unique_time_array as the time column
new_df = pd.DataFrame({'time': unique_time_array})

# Merge with id_df, ik_df, and emg_df using left join on 'time' column
new_id_df = new_df.merge(id_df, on='time', how='left')
new_ik_df = new_df.merge(ik_df, on='time', how='left')
new_emg_df = new_df.merge(emg_df, on='time', how='left')

# Fill NaN values with 0
def fill_nan_values(*dfs):
    return [df.fillna(0) for df in dfs]

new_id_df, new_ik_df, new_emg_df = fill_nan_values(new_id_df, new_ik_df, new_emg_df)

# Save the synchronized DataFrames
def save_files(base_path, id_df, ik_df, emg_df):
    id_path = os.path.join(base_path, "inverse_dynamics_synced.sto")
    ik_path = os.path.join(base_path, "Visual3d_SIMM_input_synced.mot")
    emg_path = os.path.join(base_path, "processed_emg_signals_synced.csv")
    
    # Save EMG DataFrame as CSV
    emg_df.to_csv(emg_path, index=False)
    
    # Save ID DataFrame as .sto
    with open(id_path, 'w') as f:
        f.write("inverse dynamics data\nversion=1\nnRows={}\nnColumns={}\ninDegrees=yes\nendheader\n".format(len(id_df), len(id_df.columns)))
        id_df.to_csv(f, sep='\t', index=False)
    
    # Save IK DataFrame as .mot
    with open(ik_path, 'w') as f:
        f.write("Coordinates\nversion=1\nnRows={}\nnColumns={}\ninDegrees=yes\nendheader\n".format(len(ik_df), len(ik_df.columns)))
        ik_df.to_csv(f, sep='\t', index=False)
    
    print(f"Files saved:\n{id_path}\n{ik_path}\n{emg_path}")

# Define the base path where the files will be saved
base_path = os.path.join(one_dir_up, "Simulations", "PC002", "trial2_r1")

# Save the synchronized DataFrames
save_files(base_path, new_id_df, new_ik_df, new_emg_df)




# Run ID

In [None]:
for subject in subject_list:
    trial_list = os_analysis.subjects[subject].keys()
    for trial in trial_list:
        try:
            os_analysis.subjects[subject][trial].change_grf_xml_path()
            os_analysis.subjects[subject][trial].run_ID(osim_modelPath=os_analysis.subjects[subject]['model'], 
                                                            coordinates_file=os_analysis.subjects[subject][trial].ik.path, 
                                                            output_file=os_analysis.subjects[subject][trial].id.path, 
                                                            external_loads_file=os_analysis.subjects[subject][trial].grf_xml.path, 
                                                            LowpassCutoffFrequency=6, 
                                                            run_tool=True)
            
            print(f"ID ran successfully for {subject} - {trial}")
        except Exception as e:
            print(f"Error running ID for {subject} - {trial}")
            print(e)



# Run MA

# RunSO

In [None]:

def run_SO(model_path, coordinates_file, actuators_file_path, run_tool=True):
    '''
    Function to run Static Optimization using the OpenSim API.
    
    Inputs:
            modelpath(str): path to the OpenSim model file
            trialpath(str): path to the trial folder
            actuators_file_path(str): path to the actuators file
            
    '''
    
    trialpath = os.path.dirname(coordinates_file)   
    # create directories
    results_directory = os.path.relpath(trialpath, trialpath)
    coordinates_file =  os.path.relpath(trialpath, coordinates_file)
    modelpath_relative = os.path.relpath(model_path, trialpath)

    # create a local copy of the actuator file path and update name
    actuators_file_path = os.path.relpath(actuators_file_path, trialpath)

    # start model
    OsimModel = msk.osim.Model(modelpath_relative)

    # Get mot data to determine time range
    motData = msk.osim.Storage(coordinates_file)

    # Get initial and intial time
    initial_time = motData.getFirstTime()
    final_time = motData.getLastTime()

    # Static Optimization
    so = msk.osim.StaticOptimization()
    so.setName('StaticOptimization')
    so.setModel(OsimModel)

    # Set other parameters as needed
    so.setStartTime(initial_time)
    so.setEndTime(final_time)
    so.setMaxIterations(25)

    analyzeTool_SO = msk.classes.osimSetup.create_analysis_tool(coordinates_file,modelpath_relative,results_directory)
    analyzeTool_SO.getAnalysisSet().cloneAndAppend(so)
    analyzeTool_SO.getForceSetFiles().append(actuators_file_path)
    analyzeTool_SO.setReplaceForceSet(False)
    OsimModel.addAnalysis(so)

    analyzeTool_SO.printToXML(".\setup_so.xml")

    analyzeTool_SO = msk.osim.AnalyzeTool(".\setup_so.xml")

    trial = os.path.basename(trialpath)
    print(f"so for {trial}")

    # run
    if run_tool:
        analyzeTool_SO.run()



In [None]:
run_SO(model_path=os_analysis.subjects[subject]['model'], 
        coordinates_file=os_analysis.subjects[subject][trial].ik.path, 
        actuators_file_path=os_analysis.subjects[subject][trial].id.path, 
        external_loads_file=os_analysis.subjects[subject][trial].grf_xml.path, 
        LowpassCutoffFrequency=6, 
        run_tool=True)

# Print trial to Json

In [None]:
jsonFilePath = os.path.join(os_analysis.simulations_path, 'PC002', 'trial2_r1', 'settings.json')
trial = Trial(os.path.join(os_analysis.simulations_path, 'PC002', 'trial2_r1'))
trial.__dict__['jra'].__dict__

data = trial.__dict__
print(trial.name)


In [None]:

run_SO(model_path=os_analysis.subjects['PC002']['model'],
        coordinates_file = os_analysis.subjects['PC002']['trial2'].ik.path,
        actuators_file_path = os_analysis.subjects['PC002']['trial2'].so_force.path)

# CEINMS calibration run

In [None]:
import msk_modelling_python as msk
import subprocess
import os

In [None]:
subject = 'PC002'
trial_name = 'trial2_r1'
subject_list = ['PC002','PC003','PC006', 'PC013', 'TD006', 'TD013', 'TD017', 'TD021', 'TD023', 'TD026']
trial_list = ['trial1','trial2','trial3','normal1', 'normal2', 'normal3', 'crouch1', 'crouch2', 'crouch3']
os_analysis = openSim(legs=['r','l'], subjects=subject_list, trials_to_load=trial_list, trial_number=1)
#check times
print(os_analysis.subjects[subject][trial_name].ik.time_range)
print(os_analysis.subjects[subject][trial_name].id.time_range)
print(os_analysis.subjects[subject][trial_name].emg.time_range)


In [None]:
code_path = os.getcwd()
xml_setup_file = os.path.join(os.path.dirname(code_path), 'Simulations', subject, trial_name, 'ceinms', 'calibrationSetup.xml')
ceinms_install_path = os.path.join(msk.__path__[0], 'src', 'ceinms2', 'src')
command = " ".join([ceinms_install_path + "\CEINMScalibrate.exe -S", xml_setup_file])
print(str(command))
#print(os.getcwd())
result = subprocess.run(command, capture_output=True, text=True, check=True)