In [33]:
import os
import fileinput
import numpy as np
import argparse
import math
import matplotlib.pyplot as plt
import coremltools as ct 

print("Initiating Tensorflow...")
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.models import Sequential
# from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report


Initiating Tensorflow...


Global variables

In [34]:
# Stroke types in order
STROKE_TYPE = ["Serve",
            "Forehand",
            "Backhand",
            "NoStroke"]

# Stroke type indices
SERVE       = 0
FOREHAND    = 1
BACKHAND    = 2
NOSTROKE    = 3

# Video Keypoint locations
SERVE_OUTPUT_LOC    = f"videos/{STROKE_TYPE[SERVE]}/output/"
FOREHAND_OUTPUT_LOC = f"videos/{STROKE_TYPE[FOREHAND]}/output/"
BACKHAND_OUTPUT_LOC = f"videos/{STROKE_TYPE[BACKHAND]}/output/"
NOSTROKE_OUTPUT_LOC = f"videos/{STROKE_TYPE[NOSTROKE]}/output/"

# Directory to save graphed distances for each stroke based on keypoints
PLT_FILE_PATH       = "images/"

# Frames in a video
FRAMES_PER_SAMPLE   = 58

# Required Mediapipe pose keypoints
# Reference: https://github.com/google/mediapipe/blob/master/docs/solutions/pose.md
LEFT_SHOULDER   = 11
RIGHT_SHOULDER  = 12
LEFT_HAND       = 15
RIGHT_HAND      = 16
LEFT_HIP        = 23 
RIGHT_HIP       = 24
LEFT_ANKLE      = 27
RIGHT_ANKLE     = 28

DISTANCES_TYPE = ["RH to C", 
                "LH to C", 
                "RH to RA", 
                "LH to LA",
                "RH to LH",
                "RH to LHIP",
                "LH to RHIP",
                "MHIP-X to RH-X",
                "MHIP-X to LH-X"]

# Each keypoint is of the form [ID, X, Y]
ID      = 0
X_COORD = 1
Y_COORD = 2

# Model Parameters
RANDOM_SEED         = 42
TEST_PERCENT        = 0.2   # Percent for Testing

# Outputs from LSTM
# Also used for hidden count
LSTM_OP_SPACE       = 100

DROPOUT_FACTOR      = 0.2
MODEL_EPOCH_COUNT   = 50
MODEL_BATCH_SIZE    = 32
SHOULD_GEN_GRAPH    = False  # Generate graphs for each frame?
MODEL_NAME          = "models/tf_simple_lstm_model"

Helper functions

In [35]:
# Each type has dimensions: video_num, frame_num, key_num
# Read the text file
def getRows(location):
    # Get all files in the folder
    file_list = []
    for root, dirs, files in os.walk(location):
        for file in files:
            file_path = os.path.join(root, file)
            file_list.append(file_path)
    
    videos = []
    # Iterate over each file and read its lines
    for file_path in file_list:     # Parse all the videos
        print(f"File: {file_path}")
        frames = []
        for line in fileinput.input(file_path): # Parse all the frames
            i = 0
            row = line.split()
            row = [int(value) for value in row]
            keypoints = []
            while i < len(row): # Parse all the keypoints
                keypoint = []
                keypoint.append(row[i])
                # print(row[i])
                keypoint.append(row[i+1])
                keypoint.append(row[i+2])
                # print(str(keypoint) + '\n')
                keypoints.append(keypoint) 
                i += 3
        
            frames.append(keypoints)
        fileinput.close()
        videos.append(frames)
    return videos

In [36]:
# Points p0 and p1 are in the form [0:pointNum 1:xCoord 2:yCoord]
def distance(p0, p1):
    return math.sqrt(
        (p1[X_COORD] - p0[X_COORD]) ** 2 + 
        (p1[Y_COORD] - p0[Y_COORD]) ** 2)

In [37]:
# Function to create a directory if it doesn't exist
# os.listdir()
def create_dir(directory):
    if not os.path.exists(directory):
        print(f"Directory Created!: {directory}")
        os.mkdir(directory)

This where the model building happens

In [46]:

def main():
    STROKE_COUNT = len(STROKE_TYPE)
    DISTANCES_COUNT  = len(DISTANCES_TYPE)

    # construct the argument parser and parse the arguments
    # ap = argparse.ArgumentParser()
    # ap.add_argument("-g", "--graphs", type=int, default=0,
    #     help="Set as 0 if you dont want to generate graphs for the videos")
    # ap.add_argument("-o", "--optimize", type=int, default=0,
    #     help="Set as 0 if you dont want to optimize the model for mobile devices")
    # args = vars(ap.parse_args())

    # SHOULD_GEN_GRAPH = args["graphs"]
    # SHOULD_MOBILE_OPTIMIZE = args["optimize"]


    print("Reading Keypoint files...")

    # Input keypoints for all videos (30 serves, 30 forehands, 30 backhands and 60 no stroke play) with 30 FPS Recordings
    serves      = getRows(SERVE_OUTPUT_LOC)
    forehands   = getRows(FOREHAND_OUTPUT_LOC)
    backhands   = getRows(BACKHAND_OUTPUT_LOC)
    nostrokes   = getRows(NOSTROKE_OUTPUT_LOC)

    dataset = []
    dataset.append(serves)
    dataset.append(forehands)
    dataset.append(backhands)
    dataset.append(nostrokes)

    # Color the distances
    cmap = plt.get_cmap("rainbow", DISTANCES_COUNT)  
    # VIOLET        - RH to C
    # BLUE          - LH to C
    # LIGHT BLUE    - RH to RA
    # CYAN          - LH to LA
    # LIGHT GREEN   - RH to LH
    # GREEN         - RH to LHIP
    # LIGHT ORANGE  - LH to RHIP
    # ORANGE        - MHIP-X to RH-X
    # RED           - MHIP-X to LH-X

    input_set = []
    label_per_video = []
    print("Calculating Distances...\n")
    # For each set of keypoints in a video in all videos
    for i in range(len(dataset)):
        j = 0
        for video in dataset[i]:
            sequence = []
            k = 0
            for frame in video:
                distances = []
                # Calculate the 9 distances as follows:
                chest_point = [-1, 
                            (frame[LEFT_SHOULDER][X_COORD]+
                            frame[RIGHT_SHOULDER][X_COORD])/2,  
                            (frame[LEFT_SHOULDER][Y_COORD]+
                            frame[RIGHT_SHOULDER][Y_COORD])/2]
                hip_center = [-1, 
                            (frame[LEFT_HIP][X_COORD]+
                            frame[RIGHT_HIP][X_COORD])/2,  
                            (frame[LEFT_HIP][Y_COORD]+
                            frame[RIGHT_HIP][Y_COORD])/2]
                # torso length is the distance between shoulder mid point and hip mid
                torso_length = distance(chest_point, hip_center)
                if torso_length == 0:
                    print(f"\n\nTorso Length 0 for {STROKE_TYPE[i]}_video_{j+1} Frame: {k+1}?\nChest Point: {chest_point}, Hip Center: {hip_center}\nDefaulting Torso Length to 1")
                    print(f"Left Shoulder: {frame[LEFT_SHOULDER]}")
                    print(f"Right Shoulder: {frame[RIGHT_SHOULDER]}")
                    print(f"Left Hip: {frame[LEFT_HIP]}")
                    print(f"Right Hip: {frame[RIGHT_HIP]}")
                    torso_length = 1
                # Normalize all the distances based on torso length and append them to the distance array
                # Dominant hand to chest 
                distances.append(distance(
                                    frame[RIGHT_HAND], chest_point)/torso_length)
                # Non-dominant hand to chest
                distances.append(distance(
                                    frame[LEFT_HAND], chest_point)/torso_length)
                # Dominant hand to dominant side foot
                distances.append(distance(
                                    frame[RIGHT_HAND], frame[RIGHT_ANKLE])/torso_length)
                # Non-dominant hand to non-dominant hand side foot
                distances.append(distance(frame[LEFT_HAND], frame[LEFT_ANKLE])/torso_length)
                # Hand to hand 
                distances.append(distance(frame[LEFT_HAND], frame[RIGHT_HAND])/torso_length)
                # Dominant hand to nondominant side hip 
                distances.append(distance(frame[RIGHT_HAND], frame[LEFT_HIP])/torso_length)
                # Non-Dominant hand to dominant side hip
                distances.append(distance(frame[LEFT_HAND], frame[RIGHT_HIP])/torso_length)
                # Body (Hip Center) to dominant hand x-axis distance
                distances.append(abs(
                                hip_center[X_COORD]-frame[RIGHT_HAND][X_COORD])/torso_length)
                # Body (Hip Center) to non-dominant hand x-axis distance 
                distances.append(abs(
                                hip_center[X_COORD]-frame[LEFT_HAND][X_COORD])/torso_length)
                # Store in a data set array
                sequence.append(distances)
                k+=1
            # print(f"Generate Graphs: {SHOULD_GEN_GRAPH}")
            if SHOULD_GEN_GRAPH != 0:  
                fig, ax = plt.subplots()
                arr = np.array(sequence)
                # Iterate over each distance column and plot with a different color
                for l in range(DISTANCES_COUNT):
                    ax.plot(arr[:,l], color=cmap(l), label=f"{DISTANCES_TYPE[l]}")    

                # Set the title and labels
                ax.set_title(f"Distances for {STROKE_TYPE[i]}, Video {j+1}")
                ax.set_xlabel("Frames")
                ax.set_ylabel("Distance")

                # Add a legend outside the graph
                ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
                # Create images directory
                create_dir(PLT_FILE_PATH)
                # Save graph path
                filename = f"{PLT_FILE_PATH}{STROKE_TYPE[i]}_video_{j+1}_distances.png"
                print(f"Saved graph: {filename}")
                plt.savefig(filename, bbox_inches='tight')
                # Close the figure to free up resources
                plt.close(fig)
            
            input_set.append(sequence)
            j+=1
            # Label the video respectively as (0: Serve, 1: Forehand, 2: Backhand, 3: No Stroke) 
            label_per_video.append(i)

    SAMPLE_COUNT = len(input_set) # Store total video count
    # print("Inputs:\n", SAMPLE_COUNT)
    # print("\n\nLabels:\n",label_per_video)

    # Reshape the data matrix: Assuming you have a data matrix called data with shape (SAMPLE_COUNT, FRAMES_PER_SAMPLE, DISTANCES_COUNT), 
    # where the first dimension represents the number of videos, the second dimension represents the 
    # number of frames per video, and the third dimension represents the number of distances per frame. 
    # You can reshape this data matrix to have the shape (total_samples, timesteps, features).
    reshaped_data = np.array(input_set).reshape((-1, FRAMES_PER_SAMPLE, DISTANCES_COUNT))
    print(f"Reshaped Data: {reshaped_data.shape}")

    # Assuming 'labels' is a list or array containing the action labels for each video
    one_hot_labels = to_categorical(label_per_video)
    print(f"\nOne hot Labels:\n{one_hot_labels}")

    print("Splitting Dataset...")
    # Divide the data set samples into 80:20 for train:test

    # Reference: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html
    X_train, X_test, Y_train, Y_test = train_test_split(reshaped_data, one_hot_labels, test_size=TEST_PERCENT, random_state=RANDOM_SEED)
    X_train, X_val, Y_train, Y_val = train_test_split(X_train, Y_train, test_size=TEST_PERCENT, random_state=RANDOM_SEED)
    # print(f"Len of X_train: {len(X_train)}\nLen of X_test: {len(X_test)}\nLen of Y_train: {len(Y_train)}\nLen of Y_test: {len(Y_test)}\nLen of X_val: {len(X_val)}\nLen of Y_val: {len(Y_val)}")

    print("Creating Simple LSTM Model...")
    # Training Phase
    # Create a TF Sequential Model
    model = Sequential()
    # Add an LSTM Layer with 100 output space and input shape as FRAMES_PER_SAMPLExDISTANCES_COUNT (ref: https://sci-hub.se/https://doi.org/10.1109/CITISIA50690.2020.9371776)
    model.add(LSTM(LSTM_OP_SPACE, input_shape=(FRAMES_PER_SAMPLE, DISTANCES_COUNT)))  # Adjust the number of units as needed
    # Add a Dropout Layer with 100 output space
    model.add(Dropout(DROPOUT_FACTOR))
    # Add a Dense Layer with 100 output space
    model.add(Dense(LSTM_OP_SPACE))
    # Add a Dense Layer with STROKE_COUNT output space
    model.add(Dense(STROKE_COUNT, activation='softmax'))
    # Compile the model
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    # Fit the model with the current dataset with the xvalues and the labelvalues
    model.fit(X_train, Y_train, validation_data=(X_val, Y_val), epochs=MODEL_EPOCH_COUNT, batch_size=MODEL_BATCH_SIZE)
    # print(f"\nTensorflow Model:\n{model}")
    print(f"\nModel Summary:\n{model.summary()}")


    # The code commented below has not been tested to Optimize the model for phones
    """
    if SHOULD_MOBILE_OPTIMIZE != 0:
        from tensorflow.lite.python.util import run_graph_optimizations
        from tensorflow.lite.python.convert import convert

        # Apply model quantization
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        tflite_model = converter.convert()

        # Apply model compression techniques
        # e.g. prune or quantize the model further

        # Save the optimized model
        with open('optimized_model.tflite', 'wb') as f:
            f.write(tflite_model)

        # Load and run the optimized model on mobile devices
        interpreter = tf.lite.Interpreter(model_path='optimized_model.tflite')
        interpreter.allocate_tensors()

        # Prepare input data for inference
        input_data = data.astype(np.float32)

        # Run inference on the input data
        interpreter.set_tensor(interpreter.get_input_details()[0]['index'], input_data)
        interpreter.invoke()
        output_data = interpreter.get_tensor(interpreter.get_output_details()[0]['index'])

        # Evaluate the model accuracy
        predicted_labels = np.argmax(output_data, axis=1)
        accuracy = np.mean(predicted_labels == labels)

        print(f"Accuracy: {accuracy}")
    """

    # Testing Phase
    # Call Predict function on the test array and get the label
    # Test all the videos for testing and calculate the factors "Precision", "Recall", "F1 Score"
        # Check this article for formulae: https://towardsdatascience.com/a-look-at-precision-recall-and-f1-score-36b5fd0dd3ec

    # Perform inference on testing data
    Y_pred = model.predict(X_test)
    y_test = np.argmax(Y_test, axis=1)
    print(f"\ny_test:\n{y_test}")
    # Convert predicted probabilities to class labels (if needed)
    predicted_labels = np.argmax(Y_pred, axis=1)
    print(f"predicted_labels:\n{predicted_labels}")

    report = classification_report(y_test, predicted_labels)
    print(f"\nModel Report:\n{report}")

    # Save the model as SavedModel Format
    model.save(f"{MODEL_NAME}.h5")

    # To Save the model as HDF5
    # model.save('your_model.h5')

    print(f"Model saved as {MODEL_NAME}...")

    # Define the flexible input shape (min: 1 x 58 x 9, max: 2 x 58 x 9)
    input_shape = ct.Shape(shape=(ct.RangeDim(1, 2), 58, 9))

    # Convert the Keras model to a CoreML model
    coreml_model = ct.convert(
        model,
        inputs=[ct.TensorType(shape=input_shape)]
    )

    # Save the CoreML model to a file
    coreml_model.save('coremodel.mlpackage')

In [47]:
if __name__ == "__main__":
    main()

Reading Keypoint files...
File: videos/Forehand/output/SHOT_NO49.mp4.points
File: videos/Forehand/output/0021.mp4.points
File: videos/Forehand/output/0016.mp4.points
File: videos/Forehand/output/SHOT_NO37.mp4.points
File: videos/Forehand/output/SHOT_NO9.mp4.points
File: videos/Forehand/output/SHOT_NO11.mp4.points
File: videos/Forehand/output/SHOT_NO26.mp4.points
File: videos/Forehand/output/SHOT_NO44.mp4.points
File: videos/Forehand/output/0007.mp4.points
File: videos/Forehand/output/0030.mp4.points
File: videos/Forehand/output/SHOT_NO4.mp4.points
File: videos/Forehand/output/SHOT_NO23.mp4.points
File: videos/Forehand/output/SHOT_NO14.mp4.points
File: videos/Forehand/output/SHOT_NO41.mp4.points
File: videos/Forehand/output/0029.mp4.points
File: videos/Forehand/output/SHOT_NO1.mp4.points
File: videos/Forehand/output/0002.mp4.points
File: videos/Forehand/output/0013.mp4.points
File: videos/Forehand/output/0024.mp4.points
File: videos/Forehand/output/SHOT_NO19.mp4.points
File: videos/Fore

  saving_api.save_model(
When both 'convert_to' and 'minimum_deployment_target' not specified, 'convert_to' is set to "mlprogram" and 'minimum_deployment_target' is set to ct.target.iOS15 (which is same as ct.target.macOS12). Note: the model will not run on systems older than iOS15/macOS12/watchOS8/tvOS15. In order to make your model run on older system, please set the 'minimum_deployment_target' to iOS14/iOS13. Details please see the link: https://apple.github.io/coremltools/docs-guides/source/target-conversion-formats.html
Running TensorFlow Graph Passes: 100%|██████████| 6/6 [00:01<00:00,  4.26 passes/s]
Converting TF Frontend ==> MIL Ops:   0%|          | 0/51 [00:00<?, ? ops/s]Saving value type of int64 into a builtin type of int32, might lose precision!
Converting TF Frontend ==> MIL Ops: 100%|██████████| 14/14 [00:00<00:00, 110376.42 ops/s]
Input ls elem type unknown. Override with <class 'coremltools.converters.mil.mil.types.type_tensor.tensor.<locals>.tensor'>
Converting TF Fr