In [1]:
%pip install stable-baselines3[extra]
%pip install gym --upgrade
%pip install imitation


Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
%pip install tensorflow


Note: you may need to restart the kernel to use updated packages.


In [3]:
import tensorflow as tf
print(tf.config.list_physical_devices('GPU'))

[]


In [4]:
import os
import numpy as np
import gym
from gym import Env
from gym.spaces import Discrete, Box
from gym.envs.registration import register
from sklearn.model_selection import train_test_split
from stable_baselines3 import PPO
from stable_baselines3.ppo import MlpPolicy
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.env_util import make_vec_env
from imitation.data import rollout
from imitation.algorithms import bc

In [5]:
data_folder = os.path.join('data_copy')

In [6]:
# Define a mapping of body parts to column ranges
body_parts_mapping = {
    'Head': (0, 1),
    'SpineShoulders': (3, 4),
    'SpineMid': (6, 7),
    'LeftShoulder': (9, 10),
    'RightShoulder': (12, 13),
    'LeftElbow': (15, 16),
    'RightElbow': (18, 19),
    'LeftWrist': (21, 22),
    'RightWrist': (24, 25),
    'SpineBase': (27, 28),
    'LeftHip': (30, 31),
    'RightHip': (33, 34),
    'LeftKnee': (36, 37),
    'RightKnee': (39, 40),
    'LeftAnkle': (42, 43),
    'RightAnkle': (45, 46),
}

In [7]:
# Function to preprocess data
def preprocess_data(data_folder):
    scaled_joint_positions = []  # Store scaled joint positions
    actions = []

    exercise_folders = os.listdir(data_folder)
    exercise_folders.remove('guide')
    exercise_folders.remove('plot') 

    # Define the output folder path
    output_folder = os.path.join("States and Actions")
    os.makedirs(output_folder, exist_ok=True)

    for exercise_folder in exercise_folders:

        exercise_path = os.path.join(data_folder, exercise_folder)
        exercise_path_list = os.listdir(exercise_path)
        
        for typedata_folder in exercise_path_list:
        
            typedata_path = os.path.join(exercise_path, typedata_folder)
            typedata_path_list = os.listdir(typedata_path)

            for typepatient_folder in typedata_path_list:

                typepatient_path = os.path.join(typedata_path, typepatient_folder)
                typepatient_path_list = os.listdir(typepatient_path)

                for patient_folder in typepatient_path_list:

                    patient_path = os.path.join(typepatient_path, patient_folder)
                    patient_path_list = os.listdir(patient_path)

                    for trial_file in patient_path_list:
                        if not trial_file.startswith('.'):  # Skip files/directories starting with a period
                            file_path = os.path.join(patient_path, trial_file)
                            joints_data = read_txt_file(file_path)
        
                            # Extract relevant columns based on body parts mapping
                            extracted_data = extract_body_parts(joints_data)
                            
                            # Print metadata
                            print('-----------------------------------------------')
                            print('TypePatient -', typepatient_folder)
                            print('Exercise -', exercise_folder)
                            print('Typedata -', typedata_folder)
                            print('Patient -', patient_folder)
                            print('Trial -', trial_file)
                            
                            # Initialize list to store scaled coordinates for each instance
                            scaled_instance_coords_list = []
                            
                            # Save the joint positions
                            joint_positions_output_folder = os.path.join(output_folder, typepatient_folder, exercise_folder, typedata_folder, patient_folder)
                            os.makedirs(joint_positions_output_folder, exist_ok=True)
                            joint_positions_file = os.path.join(joint_positions_output_folder, f"{trial_file}_joint_positions.txt")
                            with open(joint_positions_file, 'w') as file:
                                for instance in extracted_data:
                                    #file.write(f"{instance}:\n")
                                    scaled_instance_coords = []
                                    for joint, coords in instance.items():
                                        # Multiply each coordinate by ...
                                        scaled_coords = [round(coord * 100) for coord in coords]
                                        scaled_instance_coords.extend(scaled_coords)
                                        # Write the joint coordinates without brackets and with commas
                                        file.write(','.join(map(str, scaled_coords)))
                                        file.write(",")  # Add a comma between joint coordinates     
                                    file.write("\n")  # Move to the next line after writing all joint coordinates for an instance
                            
                                    # Append scaled joint positions to the list
                                    scaled_instance_coords_list.append(scaled_instance_coords)
                            
                            # Append scaled joint positions to the list
                            scaled_joint_positions.extend(scaled_instance_coords_list)

                            scaled_instance_coords_list = np.array(scaled_instance_coords_list)
                            print(scaled_instance_coords_list.shape)

                            # Process each instance and generate actions
                            instance_actions = generate_actions(scaled_instance_coords_list)
                            actions.extend(instance_actions)
                            
                            # Save the actions as text file
                            actions_output_folder = os.path.join(output_folder, typepatient_folder, exercise_folder, typedata_folder, patient_folder)
                            os.makedirs(actions_output_folder, exist_ok=True)
                            actions_file = os.path.join(actions_output_folder, f"{trial_file}_actions.txt")
                            with open(actions_file, 'w') as file:
                                for action in instance_actions:
                                    file.write(f"{action[0]} {action[1]}\n")

                            #scaled_joint_positions.append(extracted_data)
    
     # Split data into training and testing sets
    train_data, test_data = train_test_split(scaled_joint_positions, test_size=0.2, random_state=42)
    train_actions, test_actions = train_test_split( actions, test_size=0.2, random_state=42)
    
    print('The End')

    
    return train_data, test_data, train_actions, test_actions

# Function to read data from a text file
def read_txt_file(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()
        joints_data = [list(map(float, line.split(';'))) for line in lines]
    return joints_data

# Function to extract relevant body parts from joint data
def extract_body_parts(joints_data):
    num_instances = len(joints_data)
    extracted_data = []

    # Iterate over instances
    for instance_idx in range(num_instances):
        instance_data = {}
        spine_base_coords = None  # Initialize spine base coordinates for each instance
        # Iterate over body parts
        for body_part, (start_col, end_col) in body_parts_mapping.items():
            body_part_data = joints_data[instance_idx][start_col:end_col + 1]
            instance_data[body_part] = body_part_data
            if body_part == 'SpineBase':
                spine_base_coords = body_part_data
        # Calculate relative positions of all other joints with respect to spine base coordinates
        for body_part, coords in instance_data.items():
                relative_coords = [coord - spine_base_coord for coord, spine_base_coord in zip(coords, spine_base_coords)]
                instance_data[body_part] = relative_coords
        extracted_data.append(instance_data)

    return extracted_data



def generate_grid(joint_coords, spine_base_coords):
    # Ensure joint_coords is a list, even if it contains only one coordinate pair
    if isinstance(joint_coords[0], int):
        joint_coords = [joint_coords]
    
    # Determine the minimal x and y values of all joints
    min_x = min(coord[0] for coord in joint_coords)
    min_y = min(coord[1] for coord in joint_coords)
    
    # Determine the grid size based on the minimal x and y values
    grid_size_x = max(int((max(coord[0] for coord in joint_coords) - min_x) * 10) + 1, 1)
    grid_size_y = max(int((max(coord[1] for coord in joint_coords) - min_y) * 10) + 1, 1)
    
    print(f"Grid size: {grid_size_x} x {grid_size_y}")

    # Determine grid position of the joint coordinates centered around SpineBase
    grid_x = int((joint_coords[0][0] - spine_base_coords[0] + 0.5) * 10)
    grid_y = int((joint_coords[0][1] - spine_base_coords[1] + 0.5) * 10)
    
    return grid_x, grid_y


# Function to generate actions based on joint positions
def generate_actions(scaled_instance_coords_list):
    actions = []
    current_x_left_wrist = []
    next_x_left_wrist = []
    current_y_left_wrist = []
    next_y_left_wrist = []
    current_x_right_wrist = []
    next_x_right_wrist = []
    current_y_right_wrist = []
    next_y_right_wrist = []

    print(len(scaled_instance_coords_list))
    
    for i in range(len(scaled_instance_coords_list) - 1):
        current_instance = scaled_instance_coords_list[i]
        next_instance = scaled_instance_coords_list[i + 1]

        

        # Extract wrist grid positions for current and next instances
        current_x_left_wrist = current_instance[14]
        next_x_left_wrist = next_instance[14]
        current_y_left_wrist = current_instance[15]
        next_y_left_wrist = next_instance[15]

        current_x_right_wrist = current_instance[16]
        next_x_right_wrist = next_instance[16]
        current_y_right_wrist = current_instance[17]
        next_y_right_wrist = current_instance[17]

        instance_actions = generate_instance_actions(current_x_left_wrist, next_x_left_wrist, current_y_left_wrist, next_y_left_wrist, current_x_right_wrist, next_x_right_wrist, current_y_right_wrist, next_y_right_wrist)
        actions.extend(instance_actions)
    return actions

def generate_instance_actions(current_x_left_wrist, next_x_left_wrist, current_y_left_wrist, next_y_left_wrist, current_x_right_wrist, next_x_right_wrist, current_y_right_wrist, next_y_right_wrist):
    actions = []
    # Generate actions for every other instance
    # Compare wrist positions to determine movement direction for left wrist
    left_movement = compare_positions((current_x_left_wrist, current_y_left_wrist), (next_x_left_wrist, next_y_left_wrist))
    # Compare wrist positions to determine movement direction for right wrist
    right_movement = compare_positions((current_x_right_wrist, current_y_right_wrist), (next_x_right_wrist, next_y_right_wrist))

    # Map grid position comparison to action
    left_action = f"{left_movement}" if left_movement != "Hold" else "Hold"
    right_action = f"{right_movement}" if right_movement != "Hold" else "Hold"

    actions.append((left_action, right_action))
    return actions

# Function to compare grid positions and determine movement direction
def compare_positions(current_grid, next_grid):
    # Calculate the change in coordinates
    delta_x = next_grid[0] - current_grid[0]
    delta_y = next_grid[1] - current_grid[1]
    
    # Determine the movement direction based on the change in coordinates
    if delta_x == 0 and delta_y == 0:
        return "Hold"
    elif delta_x > 0:
        if delta_y > 0:
            return "UpRight"
        elif delta_y < 0:
            return "DownRight"
        else:
            return "Right"
    elif delta_x < 0:
        if delta_y > 0:
            return "UpLeft"
        elif delta_y < 0:
            return "DownLeft"
        else:
            return "Left"
    else:
        if delta_y > 0:
            return "Up"
        elif delta_y < 0:
            return "Down"
    return "no movement detected"  # Default action if no movement detected


In [8]:
# Step 2: Load the Preprocessed Data
train_data, test_data, train_actions, test_actions = preprocess_data(data_folder)

-----------------------------------------------
TypePatient - affected
Exercise - E1
Typedata - capture
Patient - P01
Trial - E1_filt_captured_P01_affected_1.txt
(30, 32)
30
-----------------------------------------------
TypePatient - affected
Exercise - E1
Typedata - capture
Patient - P01
Trial - E1_filt_captured_P01_affected_10.txt
(26, 32)
26
-----------------------------------------------
TypePatient - affected
Exercise - E1
Typedata - capture
Patient - P01
Trial - E1_filt_captured_P01_affected_2.txt
(28, 32)
28
-----------------------------------------------
TypePatient - affected
Exercise - E1
Typedata - capture
Patient - P01
Trial - E1_filt_captured_P01_affected_3.txt
(42, 32)
42
-----------------------------------------------
TypePatient - affected
Exercise - E1
Typedata - capture
Patient - P01
Trial - E1_filt_captured_P01_affected_4.txt
(47, 32)
47
-----------------------------------------------
TypePatient - affected
Exercise - E1
Typedata - capture
Patient - P01
Trial - E1_

In [14]:
import gym
from stable_baselines3.common.vec_env import DummyVecEnv
from imitation.util.util import make_vec_env
from imitation.data.wrappers import RolloutInfoWrapper
import numpy as np

class CustomEnv(gym.Env):
    def __init__(self, data, expected_actions, num_states=10, num_actions=2):
        super(CustomEnv, self).__init__()
        self.data = data
        self.num_states = num_states
        self.num_actions = num_actions
        self.expected_actions = expected_actions
        self.state = 0  # Start at beginning
        self.action_space = gym.spaces.Discrete(self.num_actions)
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(self.data.shape[1],))

    def step(self, action):
        assert self.action_space.contains(action), "Invalid action!"
        is_done = False
        reward = -1.0  # Default reward when is_done is False
        expected_action = self.expected_actions[self.state]

        # Compare action and expected_action correctly
        if np.array_equal(action, expected_action):  # action matches the expected action
            reward = 1.0
        if action == 0:  # action 0 means move to the next state
            if self.state < self.num_states - 1:
                self.state += 1
            else:
                is_done = True  # End of states, the episode is done
        elif action == 1:  # action 1 means stay in the current state
            pass
        return self.data[self.state], reward, is_done, {}


    def reset(self):
        self.state = 0  # Reset to the start state
        return self.data[self.state]
# Register the environment
gym.register(
    id="custom/CustomEnv-v0",
    entry_point='__main__:CustomEnv',  # This should be the path to the class, e.g. `module:CustomEnv`
    kwargs={'data': np.array(train_data), 'expected_actions': train_actions}
)

# Create a single environment for training an expert with SB3
env = gym.make("custom/CustomEnv-v0")



  logger.warn(f"Overriding environment {new_spec.id} already in registry.")


In [21]:
from stable_baselines3 import PPO
from stable_baselines3.ppo import MlpPolicy
from stable_baselines3.common.evaluation import evaluate_policy
from gymnasium.wrappers import TimeLimit

expert = PPO(
    policy=MlpPolicy,
    env=env,
    seed=0,
    batch_size=64,
    ent_coef=0.0,
    learning_rate=0.0003,
    n_epochs=1,
    n_steps=64,
)

reward, _ = evaluate_policy(expert, env, 1)
print(f"Reward before training: {reward}")

KeyboardInterrupt: 