# Final Product of Swim Pose Recognition & Feedback Program

##### Massive thanks to my mentor Derrick Trinh and my advisor Professor Chang Choo of San Jose State University for their guidance

In [None]:
# importing utility modules
import numpy as np
import tensorflow as tf

import math
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
 
# importing machine learning models for prediction
from sklearn.ensemble import RandomForestRegressor
import xgboost as xgb
from sklearn.linear_model import LinearRegression


from sklearn.ensemble import VotingClassifier

import torch
from torchvision import transforms

from utils.datasets import letterbox
from utils.general import non_max_suppression_kpt
from utils.plots import output_to_keypoint, plot_skeleton_kpts

import matplotlib.pyplot as plt
import cv2
import statistics

## Modified Pose Estimation Algorithm (YOLOv7 Pose Estimation Base)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

def load_model():
    model = torch.load('yolov7-w6-pose.pt', map_location=device)['model']
    # Put in inference mode
    model.float().eval()

    if torch.cuda.is_available():
        # half() turns predictions into float16 tensors
        # which significantly lowers inference time
        model.half().to(device)
    return model

model = load_model()

In [None]:
def run_inference(image):
    # Resize and pad image
    image = letterbox(image, 960, stride=64, auto=True)[0] # shape: (567, 960, 3)
    # Apply transforms
    image = transforms.ToTensor()(image) # torch.Size([3, 567, 960])
    if torch.cuda.is_available():
        image = image.half().to(device)
    # Turn image into batch
    image = image.unsqueeze(0) # torch.Size([1, 3, 567, 960])
    with torch.no_grad():
        output, _ = model(image)
    return output, image

In [None]:
def draw_keypoints(output, image):
    global fc, c, t, nK, lK, na, nc, e, v
    output = non_max_suppression_kpt(output, 
                                     0.03, # Confidence Threshold
                                     0.2, # IoU Threshold
                                     nc=model.yaml['nc'], # Number of Classes
                                     nkpt=model.yaml['nkpt'], # Number of Keypoints
                                     kpt_label=True)
    #0.03, 0.2
    with torch.no_grad():
        output = output_to_keypoint(output)
        #print(f'Frame Number: {fc}; Data Size: {output.shape}')
    try:
        t = output[0] #retrieves only first skeleton data
        t = t[-36:] #cuts all head keypoints
        #append all nose x coords
        nK.append(t[0])
        #appends all hip x coords
        lK.append(t[18]) #array t is unsorted, the x values is every other starting at index 0 and y values every other starting at 1
        #sets coordinate point as (0,0) if the algorithm is less than 20% confident in its prediction of said joint
        for i in range(0, len(t), 3):
            g = t[i:i+3]
            if g[2] <= 0.20:
                t[i] = 0
                t[i+1] = 0
                v+=1
                e+=1
        t = [x for i, x in enumerate(t) if (i+1)%3 != 0]
        na.append(t)
    except:
        c += 1
    nimg = image[0].permute(1, 2, 0) * 255
    nimg = nimg.cpu().numpy().astype(np.uint8)
    nimg = cv2.cvtColor(nimg, cv2.COLOR_RGB2BGR)
    for idx in range(output.shape[0]):
        plot_skeleton_kpts(nimg, output[idx, 7:].T, 3)
    return nimg

In [None]:
def rotate(coordinates, ang):
    #coordinates = np.array(coords)
    #angles = np.random.uniform(low=-max_angle, high=max_angle)
    angles = np.deg2rad(ang)
    center = np.mean(coordinates, axis=(0, 1))  # Compute the center of rotation

    rotation_matrix = np.array([[np.cos(angles), -np.sin(angles)],
                                [np.sin(angles), np.cos(angles)]])

    rotated_coordinates = np.zeros_like(coordinates)

    for i in range(coordinates.shape[0]):
        for j in range(coordinates.shape[1]):
            # Translate coordinates to the center of rotation
            translated_coord = coordinates[i, j] - center

            # Apply rotation to the translated coordinates
            rotated_coord = np.dot(rotation_matrix, translated_coord.T).T

            # Translate back to the original position
            rotated_coordinates[i, j] = rotated_coord + center

    rotated_coordinates = rotated_coordinates.tolist()
    return rotated_coordinates

In [None]:
nc = 0
fa = []
e=0
v=0
fc = 1
na = []
actualKp = []
c=0
lK = []
nK = []
la = []
def swimPose_estimate(filename, savepath):
    global fc, c, t, nK, lK, na, fa, e, v
    
    cap = cv2.VideoCapture(filename)
    totalFrames = math.floor(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))/32)
    print(f'TF: {totalFrames}')
    i = 0
    fa = []
    e=0
    
    while i < totalFrames:
        na = []
        fc = 0
        nK = []
        lK = []
        c=0
        poseH(filename, "none", i*32)
        print(f'Original data: {c} empty frames')
        cap.release()
        cv2.destroyAllWindows()
        #Perm Rotations
        fc = 0
        c=0
        v=0
        if statistics.median(nK) > statistics.median(lK):
            na = []
            v=0
            #print("needs counterclockwise rotation")
            poseH(filename, "cc", i*32)
            #print("transformation completed")
            print(f'Counterclockwise rotation; {c} empty frames')
            if c <= 5:
                z = np.array(na)
                z = np.reshape(z, (z.shape[0], 12, 2))
                fa.extend(rotate(z, (-90)))
                #print(v)
            else:
                print("too many missing frames, batch discarded")
        else:
            na = []
            #print("needs clockwise rotation")
            poseH(filename, "c", i*32)
            #print("transformation completed")
            print(f'Clockwise rotation; {c} empty frames')
            if c <= 5:
                z = np.array(na)
                z = np.reshape(z, (z.shape[0], 12, 2))
                fa.extend(rotate(z, 90))
                #print(v)
            else:
                print("too many missing frames, batch discarded")
            
        i += 1
        print(f'batch {i} complete')
        
    print("=======================================================")
    print("-----------Skeleton Data Extraction Complete-----------")
    print("=======================================================")

    x = np.array(fa)
    x = np.reshape(x, (x.shape[0], 12, 2))
    print(f'Array shape: {x.shape}')
    np.save(savepath, x)
    print(f'Data saved to: {savepath}')
    print(f'{e} total coordinates voided')

    print("=======================================================")
    cv2.destroyAllWindows()

In [None]:
def poseH(filename, rotation, currentFrame):
    global fc, c, t, nK, lK, na
    cap = cv2.VideoCapture(filename)
    # VideoWriter for saving the video
    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    out = cv2.VideoWriter('Free_Skel.mp4', fourcc, 30.0, (int(cap.get(3)), int(cap.get(4))))
    cap.set(cv2.CAP_PROP_POS_FRAMES, currentFrame)
    while fc < 32 and cap.isOpened():
        (ret, frame) = cap.read()
        if ret == True:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            if rotation == "cc":
                frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
            elif rotation == "c":
                frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
            elif rotation == "none":
                pass
            output, frame = run_inference(frame)
            frame = draw_keypoints(output, frame)
            fc += 1
            frame = cv2.resize(frame, (int(cap.get(3)), int(cap.get(4))))
            if rotation == "cc" or rotation == "c":
                frame = cv2.resize(frame,(720,1280),fx=0,fy=0, interpolation = cv2.INTER_CUBIC)
            else:
                frame = cv2.resize(frame,(1280,720),fx=0,fy=0, interpolation = cv2.INTER_CUBIC)
            out.write(frame)
            cv2.imshow('Pose estimation', frame)
        else:
            break
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    out.release()


## Load Desired Video From Computer

In [None]:
%%time
video = "C:/Users/jonso/OneDrive/Desktop/Testing Data 2.mp4"
path = 'skeleton_data.npy'
swimPose_estimate(video, path)
#counterclockwise

## Model Loading
This program utilizes a decision tree, with 12 total individual models. The axisBC series consists of 5 separately trained ResNET models. The shortAxis series consists of 5 separately trained Convolutional Neural Networks (CNNs) models. The longAxis series consists of 2 separately trained Dense Neural Networks (DNNs). 

In [None]:
longAxisBC2 = tf.keras.models.load_model('testModel76')
longAxisBC3 = tf.keras.models.load_model('testModel77')

shortAxisBC0 = tf.keras.models.load_model('testModel81')
shortAxisBC1 = tf.keras.models.load_model('testModel82')
shortAxisBC2 = tf.keras.models.load_model('testModel83')
shortAxisBC3 = tf.keras.models.load_model('testModel84')
shortAxisBC4 = tf.keras.models.load_model('testModel85')

axisBC0 = tf.keras.models.load_model('testModel90')
axisBC1 = tf.keras.models.load_model('testModel91')
axisBC2 = tf.keras.models.load_model('testModel92')
axisBC3 = tf.keras.models.load_model('testModel93')
axisBC4 = tf.keras.models.load_model('testModel94')

## Decision Tree Build

In [None]:
def longAxisBC(data):
    m0 = longAxisBC2.predict(np.reshape(data, (1, 32, 12, 3)))
    m1 = longAxisBC3.predict(np.reshape(data, (1, 32, 12, 3)))
    conf = [m0, m1]
    conf_array = np.array(conf).reshape(-1, 2)
    # Calculate the average of values at index 0 and index 1
    conf_array = [np.mean(conf_array[:, 0]), np.mean(conf_array[:, 1])]  # Average of index 0 values
    return np.argmax(conf_array)

def shortAxisBC(data):
    m0 = shortAxisBC0.predict(np.reshape(data, (1, 32, 12, 3)))
    m1 = shortAxisBC1.predict(np.reshape(data, (1, 32, 12, 3)))
    m2 = shortAxisBC2.predict(np.reshape(data, (1, 32, 12, 3)))
    m3 = shortAxisBC3.predict(np.reshape(data, (1, 32, 12, 3)))
    m4 = shortAxisBC4.predict(np.reshape(data, (1, 32, 12, 3)))
    conf = [m0, m1, m2, m3, m4]
    num_models = len(conf)  # Number of models

    # Initialize a dictionary to store accumulated class probabilities
    conf_array = np.array(conf).reshape(-1, 2)

    # Calculate the average of values at index 0 and index 1
    conf_array = [np.mean(conf_array[:, 0]), np.mean(conf_array[:, 1])]  # Average of index 0 values
    return np.argmax(conf_array)
def bothAxisBC(data):
    x1 = np.argmax(shortAxisBC0.predict(np.reshape(data, (1, 32, 12, 3))), axis=1)
    x2 = np.argmax(shortAxisBC1.predict(np.reshape(data, (1, 32, 12, 3))), axis=1)
    x3 = np.argmax(shortAxisBC2.predict(np.reshape(data, (1, 32, 12, 3))), axis=1)
    x4 = np.argmax(shortAxisBC3.predict(np.reshape(data, (1, 32, 12, 3))), axis=1)
    x5 = np.argmax(shortAxisBC4.predict(np.reshape(data, (1, 32, 12, 3))), axis=1)
    array = [x1, x2, x3, x4, x5]
    c=0
    count += 1
    for i in array:
        if i == [0]:
            c+=1
        else:
            pass
    if c >=3: #takes the majority vote of the model voting ensemble, returning 0 if Freestyle/Backstroke; returns 1 if Butterfly/Breastroke
        return 0
    else:
        return 1

In [None]:
y_check = []
for data in x_test:
    if bothAxisBC(data) == 0:
        if longAxisBC(data) == 0:
            #Type Check; Free 0 Back 1
            y_check.append(0)
        else:
            y_check.append(1)
    else:
        if shortAxisBC(data) == 0:
            y_check.append(3)
        else:
            y_check.append(4)

## Stroke Feedback Classes
Each of the 4 swim strokes are divided into classes. Within each class are the respective functions taking in keypoint joint coordinates as input, and outputting 0 if there is no error detected and 1 if there is error detected. Depending on the error type and depending on the number of frames with an error or no error present, the batch will be labeled to have error present.

In [None]:
def angle3pt(a, b, c):
    """Counterclockwise angle in degrees by turning from a to c around b
        Returns a float between 0.0 and 360.0"""
    ang = math.degrees(
        math.atan2(c[1]-b[1], c[0]-b[0]) - math.atan2(a[1]-b[1], a[0]-b[0]))
    return ang + 360 if ang < 0 else ang

In [None]:
class freestyle:
    def elbowDrop(data):
        return 0 if not ((60 < angle3pt(data[10], data[8], data[6]) < 300) or (90 < angle3pt(data[11], data[9], data[7]) < 270)) else 1
    def kneeAngle(data):
        return 0 if ((70 < angle3pt(data[6], data[8], data[10]) < 290) or (90 < angle3pt(data[11], data[9], data[7]) < 270)) else 1
    def sinkHip(data):
        return 0 if not ((130 < angle3pt(data[0], data[6], data[8] < 230)) or (130 < angle3pt(data[1], data[7], data[9]) < 230)) else 1

In [None]:
class backstroke:
    def kneeAngle(data):
        return 0 if ((70 < angle3pt(data[6], data[8], data[10]) < 290) or (90 < angle3pt(data[11], data[9], data[7]) < 270)) else 1
    def sinkHip(data):
        return 0 if not ((130 < angle3pt(data[0], data[6], data[8] < 230)) or (130 < angle3pt(data[1], data[7], data[9]) < 230)) else 1
    def straightArm(data):
        return 1 if ((170 < angle3pt(data[0], data[2], data[4]) < 190) or (170 < angle3pt(data[1], data[3], data[5]) < 190)) else  0 

In [None]:
class butterfly:
    def elbowDrop(data):
        return 0 if not ((60 < angle3pt(data[10], data[8], data[6]) < 300) or (90 < angle3pt(data[11], data[9], data[7]) < 270)) else 1
    def kickAngle(data):
        return 0 if ((70 < angle3pt(data[6], data[8], data[10]) < 290) or (90 < angle3pt(data[11], data[9], data[7]) < 270)) else 1
    def chestDown(data):
        return 0 if min(data[0][1], data[1][1]) < min(data[6][1], data[7][1]) else 1
    def legsTogether(data):
        return 0 if not ((30 < angle3pt(data[8], data[6], data[9]) < 330) or (30 < angle3pt(data[8], data[7], data[9]) < 330)) else 1

In [None]:
class breastroke:
    def noKick(data):
        return 0 if ((70 < angle3pt(data[6], data[8], data[10]) < 290) or (90 < angle3pt(data[11], data[9], data[7]) < 270)) else 1

In [None]:
class underwater:
    def legsTogether(data):
        return 0 if not ((30 < angle3pt(data[8], data[6], data[9]) < 330) or (30 < angle3pt(data[8], data[7], data[9]) < 330)) else 1

In [None]:
class dive:
    def hips(data):
        print(data[6][1])
        return 0 if min(data[5][1], data[6][1]) <= min(data[0][1], data[1][1]) else 1
    def kneeAngle(data):
        return 0 if (90 < angle3pt(data[10], data[8], data[6]) < 270) or (90 < angle3pt(data[11], data[9], data[7]) < 270) else 1