In [19]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

In [20]:

def load_and_preprocess_data():
    try:
        bl_df = pd.read_csv('s03_standing_ab_twists_bl.csv')
        br_df = pd.read_csv('s03_standing_ab_twists_br.csv')
        fr_df = pd.read_csv('s03_standing_ab_twists_fr.csv')
        fl_df = pd.read_csv('s03_standing_ab_twists_fl.csv')

        combined_df = pd.concat([bl_df, br_df, fr_df, fl_df], axis=1)

        combined_df = combined_df.loc[:,~combined_df.columns.duplicated()]

        combined_df = combined_df.fillna(method='ffill')  

        # Normalize the coordinates
        coord_columns = [col for col in combined_df.columns if col.endswith(('_x', '_y', '_z'))]

        if coord_columns:
            scaler = StandardScaler()
            combined_df[coord_columns] = scaler.fit_transform(combined_df[coord_columns])
        else:
            print("Warning: No coordinate columns found. Skipping normalization.")

        return combined_df
    except FileNotFoundError as e:
        print(f"Error: Unable to load one or more CSV files. {str(e)}")
        return None

In [21]:

def calculate_angle(v1, v2):
    """Calculate angle between two vectors."""
    dot_product = np.dot(v1, v2)
    magnitudes = np.linalg.norm(v1) * np.linalg.norm(v2)
    return np.arccos(dot_product / magnitudes)

def engineer_features(df):
    required_columns = ['shoulder_x', 'shoulder_y', 'shoulder_z', 
                        'hip_x', 'hip_y', 'hip_z', 
                        'knee_x', 'knee_y', 'knee_z']
    
    if not all(col in df.columns for col in required_columns):
        print("Warning: Not all required columns found for feature engineering.")
        print("Available columns:", df.columns.tolist())
        return df

    # Calculate angles
    shoulder = df[['shoulder_x', 'shoulder_y', 'shoulder_z']].values
    hip = df[['hip_x', 'hip_y', 'hip_z']].values
    knee = df[['knee_x', 'knee_y', 'knee_z']].values

    torso_angle = np.apply_along_axis(calculate_angle, 1, shoulder - hip, knee - hip)
    df['torso_angle'] = torso_angle

    # Calculate velocities
    df['shoulder_velocity'] = np.linalg.norm(np.diff(shoulder, axis=0, prepend=shoulder[:1]), axis=1)
    df['hip_velocity'] = np.linalg.norm(np.diff(hip, axis=0, prepend=hip[:1]), axis=1)

    # Calculate accelerations
    df['shoulder_acceleration'] = np.diff(df['shoulder_velocity'], prepend=df['shoulder_velocity'].iloc[0])
    df['hip_acceleration'] = np.diff(df['hip_velocity'], prepend=df['hip_velocity'].iloc[0])

    return df

In [22]:

def train_and_evaluate_model(df):
    # Check if 'label' column exists
    if 'label' not in df.columns:
        print("Error: 'label' column not found in the dataframe.")
        print("Available columns:", df.columns.tolist())
        return None

    # Prepare features and target
    X = df.drop('label', axis=1)
    y = df['label']

    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train a Random Forest model
    rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
    rf_model.fit(X_train, y_train)

    # Make predictions
    y_pred = rf_model.predict(X_test)

    
    print("Model Evaluation:")
    print(classification_report(y_test, y_pred))

    return rf_model

In [23]:

def needs_adjustment(model, new_data, confidence_threshold=0.7):
    prediction = model.predict_proba(new_data)[0]
    # Check if model output has probabilities for both classes
    if len(prediction) == 2:
        return prediction[1] < confidence_threshold
    else:
        return prediction[0] < confidence_threshold

In [24]:

# Load and preprocess data
print("Loading and preprocessing data...")
df = load_and_preprocess_data()

if df is None:
    print("Failed to load data. Please check your CSV files and try again.")
else:
    # Engineer features
    print("Engineering features...")
    df = engineer_features(df)

    # Display info about the processed dataframe
    print("\nProcessed dataframe info:")
    print(df.info())
    print("\nFirst few rows of the processed dataframe:")
    print(df.head())
    print("\nColumns in the processed dataframe:")
    print(df.columns.tolist())

    # Train and evaluate the model
    print("\nTraining and evaluating the model...")
    model = train_and_evaluate_model(df)

    if model is not None:
        print("\nExample prediction:")
        new_data = df.drop('label', axis=1).iloc[0].values.reshape(1, -1)  
        if needs_adjustment(model, new_data):
            print("Form needs adjustment")
        else:
            print("Form is correct")
    else:
        print("Model training failed. Please check the data and try again.")

Loading and preprocessing data...
Engineering features...
Available columns: ['label', 'timestamp', 'x_0', 'y_0', 'z_0', 'visibility_0', 'x_1', 'y_1', 'z_1', 'visibility_1', 'x_2', 'y_2', 'z_2', 'visibility_2', 'x_3', 'y_3', 'z_3', 'visibility_3', 'x_4', 'y_4', 'z_4', 'visibility_4', 'x_5', 'y_5', 'z_5', 'visibility_5', 'x_6', 'y_6', 'z_6', 'visibility_6', 'x_7', 'y_7', 'z_7', 'visibility_7', 'x_8', 'y_8', 'z_8', 'visibility_8', 'x_9', 'y_9', 'z_9', 'visibility_9', 'x_10', 'y_10', 'z_10', 'visibility_10', 'x_11', 'y_11', 'z_11', 'visibility_11', 'x_12', 'y_12', 'z_12', 'visibility_12', 'x_13', 'y_13', 'z_13', 'visibility_13', 'x_14', 'y_14', 'z_14', 'visibility_14', 'x_15', 'y_15', 'z_15', 'visibility_15', 'x_16', 'y_16', 'z_16', 'visibility_16', 'x_17', 'y_17', 'z_17', 'visibility_17', 'x_18', 'y_18', 'z_18', 'visibility_18', 'x_19', 'y_19', 'z_19', 'visibility_19', 'x_20', 'y_20', 'z_20', 'visibility_20', 'x_21', 'y_21', 'z_21', 'visibility_21', 'x_22', 'y_22', 'z_22', 'visibility_22

  combined_df = combined_df.fillna(method='ffill')


In [25]:
# Rep Counting Function
def count_reps(model, df, movement_threshold=0.5):
    """Counts the number of reps completed with correct form."""
    rep_count = 0
    in_rep = False  
    
    
    for i in range(len(df)):
        # Get the current frame's data
        frame_data = df.drop('label', axis=1).iloc[i].values.reshape(1, -1)
        
        # Check if the form needs adjustment
        if not needs_adjustment(model, frame_data):
            #Torso angle
            torso_angle = df.iloc[i]['torso_angle']
            
            if torso_angle > movement_threshold:
                if not in_rep:
                    in_rep = True  
            elif in_rep:
                in_rep = False
                rep_count += 1 
        else:
            in_rep = False
    
    return rep_count




