In [4]:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import os 
import cv2
import json
from random import shuffle
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, Input
from tensorflow.keras import activations
from tensorflow.keras.callbacks import TensorBoard
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import time
from test import validate_config

In [None]:
# Directories (relative to the working directory) that hold the raw images of each class.
GOOD_POSE_DIR = 'good_pose'
BAD_POSE_DIR = 'bad_pose'
# Side length in pixels: images and video frames are resized to IMG_SIZE x IMG_SIZE,
# and the same size is used for the pose network's input blob.
IMG_SIZE = 256

In [None]:
def label_image(image):
    """Derive the binary class label from an image file name.

    The text before the first underscore of the name is treated as the
    class tag; only the tag 'good' maps to the positive class.

    Arguments:
        image {string} -- Image file name.

    Returns:
        integer -- 1 when the tag is 'good', otherwise 0.
    """
    tag, _, _ = image.partition('_')
    return 1 if tag == 'good' else 0

In [None]:
def process_data(form):
    """Load every image of one class, resize it and save the set as a .npy file.

    Each sample is stored as a row ``[image, label]`` of a 2-D object array, so
    the saved file has to be read back with ``np.load(..., allow_pickle=True)``.

    Arguments:
        form {string} -- 'good' processes GOOD_POSE_DIR; any other value
            processes BAD_POSE_DIR.
    """
    # Resolve the class-specific locations once instead of re-branching in the loop.
    if form == 'good':
        source_dir = GOOD_POSE_DIR
        # The backslash is escaped explicitly: same file name as before,
        # but without relying on the invalid '\g' escape sequence.
        target_file = 'data\\good_pose_data.npy'
    else:
        source_dir = BAD_POSE_DIR
        target_file = 'data\\bad_pose_data.npy'

    data = []
    for image in os.listdir(source_dir):
        path = os.path.join(source_dir, image)
        img = cv2.imread(path, cv2.IMREAD_COLOR)
        if img is None:
            # cv2.imread returns None for files it cannot decode; resizing
            # None would crash, so report the file and skip it.
            print(f'Skipping unreadable image: {path}')
            continue
        img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
        data.append((img, label_image(image)))

    # Build the (N, 2) object array explicitly. Letting NumPy infer the shape
    # from a ragged list of [ndarray, int] pairs raises ValueError on recent
    # NumPy versions.
    samples = np.empty((len(data), 2), dtype=object)
    for row, (img, label) in enumerate(data):
        samples[row, 0] = img
        samples[row, 1] = label

    # Saves the data in numpy form.
    np.save(target_file, samples)

In [None]:
# Loading the data
# allow_pickle=True is required because the saved arrays hold Python objects
# (image/label pairs). Unpickling can execute arbitrary code, so only load
# files you created yourself.
# Backslashes are escaped explicitly; '\g' is not a valid escape sequence.
good_pose = np.load('data\\good_pose_data.npy', allow_pickle=True)
bad_pose = np.load('data\\bad_pose_data.npy', allow_pickle=True)

In [None]:
# Stack the good-pose rows on top of the bad-pose rows (row-wise concatenation).
good_and_bad_pose = np.concatenate([good_pose, bad_pose])

# Column 0 holds the images, column 1 the labels.
data, labels = good_and_bad_pose[:, 0], good_and_bad_pose[:, 1]

# 80/20 train/validation split with a fixed seed for reproducibility.
X_train, X_valid, y_train, y_valid = train_test_split(
    data,
    labels,
    train_size=0.8,
    random_state=4,
)

# Convert both label vectors to float32.
y_train, y_valid = [
    np.asarray(label_split).astype(np.float32)
    for label_split in (y_train, y_valid)
]

## Running Openpose over the images to get the data

In [None]:
# Keypoint name -> channel index of its heat map in the pose network output.
# The names are listed in index order, so enumerate() assigns 0..18.
BODY_PARTS = {
    part: index
    for index, part in enumerate((
        "Nose", "Neck", "RShoulder", "RElbow", "RWrist",
        "LShoulder", "LElbow", "LWrist", "RHip", "RKnee",
        "RAnkle", "LHip", "LKnee", "LAnkle", "REye",
        "LEye", "REar", "LEar", "Background",
    ))
}

In [None]:
# Load the pose-estimation network from the TensorFlow graph file 'graph_opt.pb'.
# `net` is used as a module-level global by get_openpose_data.
net = cv2.dnn.readNetFromTensorflow('graph_opt.pb')

In [None]:
def get_openpose_data(frame, thr=0.2):
    """Run the pose network on one image and locate each of the 19 keypoints.

    For every keypoint the position of the heat-map maximum is taken and
    rescaled from heat-map resolution to the resolution of ``frame``.

    Arguments:
        frame {numpy.array} -- Image the network is run over.
        thr {float} -- Minimum heat-map peak value for a keypoint to be
            reported (default 0.2).

    Returns:
        list -- One entry per keypoint in BODY_PARTS order: ``[x, y]`` pixel
            coordinates, or None when the peak does not exceed ``thr``.
    """
    width, height = frame.shape[1], frame.shape[0]

    blob = cv2.dnn.blobFromImage(
        frame, 1.0, (IMG_SIZE, IMG_SIZE), (127.5, 127.5, 127.5), swapRB=True, crop=False
    )
    net.setInput(blob)
    # Only the first 19 output channels are keypoint heat maps.
    out = net.forward()[:, :19, :, :]

    assert(len(BODY_PARTS) == out.shape[1])

    points = []
    for part_idx in range(len(BODY_PARTS)):
        # minMaxLoc returns (min, max, min_loc, max_loc); only the peak is needed.
        _, conf, _, peak = cv2.minMaxLoc(out[0, part_idx, :, :])
        if conf > thr:
            # Rescale from heat-map coordinates to frame coordinates.
            x = (width * peak[0]) / out.shape[3]
            y = (height * peak[1]) / out.shape[2]
            points.append([int(x), int(y)])
        else:
            points.append(None)

    return points

In [None]:
def process_openpose_data(data, scaler, fit=True):
    """Turn images into scaled keypoint feature vectors.

    Every image is passed through get_openpose_data; keypoints 1-4 of the
    result (Neck, RShoulder, RElbow, RWrist in BODY_PARTS order) are kept and
    flattened into one row ``[x1, y1, ..., x4, y4]``. Keypoints that were not
    detected are encoded as ``[0, 0]``.

    Arguments:
        data {iterable} -- Images to process.
        scaler {object} -- Scikit-learn style scaler with fit/transform.
        fit {boolean} -- When truthy the scaler is fitted on this data before
            transforming. Pass False to reuse an already fitted scaler
            (e.g. for validation or inference data).

    Returns:
        {numpy.array} -- Scaled features of shape (number of images, 8).
    """
    x_data = []

    for image in data:
        keypoints = get_openpose_data(image)
        # Only the four keypoints that are used get patched; `is None` is the
        # identity check intended here (the entries are lists or None).
        x_data.append([[0, 0] if coords is None else coords
                       for coords in keypoints[1:5]])

    x_data = np.array(x_data).astype(float)
    # Reshape (N, 4, 2) -> (N, 8) so we can fit the scaler on our data.
    x_data = x_data.reshape(x_data.shape[0], x_data.shape[1] * 2)

    # Normalize the data
    if fit:
        scaler.fit(x_data)
    return scaler.transform(x_data)

In [None]:
# def visualize_performance(model, f1, loss):

In [None]:
# Fit the min-max scaler on the training keypoints only (fit=True), then apply
# the already fitted scaler to the validation keypoints (fit=False).
scaler = MinMaxScaler()
X_train_openpose = process_openpose_data(X_train, scaler, fit=True)
X_valid_openpose = process_openpose_data(X_valid, scaler, fit=False)

In [None]:
# Visualizing the keypoints
# Each feature row is [x1, y1, ..., x4, y4]. Regroup it into (x, y) pairs so
# every body part is printed with its own coordinates; zipping the flat row
# against the four names would mislabel the values and drop half of them.
keypoint_names = list(BODY_PARTS.keys())[1:5]
for coords in X_train_openpose:
    for body_part, (x, y) in zip(keypoint_names, coords.reshape(-1, 2)):
        print(f'{body_part} -> ({x}, {y})')
    print('\n')

## The fully connected model that takes the keypoint coordinates

In [None]:
# Unique run name (timestamped) so every run gets its own TensorBoard directory.
NAME = f'openpose_bicepscurl_4keypoints_newdata_{int(time.time())}'
# The keyword is `log_dir`; `Logdir` is not an accepted argument.
tensorboard = TensorBoard(log_dir=f'logs/{NAME}')

In [None]:
# Fully connected binary classifier over the 8 keypoint features
# (x, y for 4 keypoints).
model = Sequential()
# shape must be a tuple: (8,) -- `(8)` is just the integer 8.
model.add(Input(shape=(8,)))
model.add(Flatten())
model.add(Dense(100, activation='relu'))
model.add(Dense(100, activation='relu'))
model.add(Dense(100, activation='relu'))
# Single sigmoid unit: probability of the positive ("good pose") class.
model.add(Dense(1, activation='sigmoid'))

# `learning_rate` replaces the deprecated `lr` argument.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
model.compile(loss='binary_crossentropy',
                optimizer=optimizer,
                metrics=['accuracy'])

In [None]:
# New timestamped run name and TensorBoard callback for this training run
# (logged under logs2/); this replaces the `NAME`/`tensorboard` defined earlier.
NAME = f'openpose_bicepscurl_4keypoints_newdata2_{int(time.time())}'
tensorboard = TensorBoard(log_dir=f'logs2/{NAME}')
# validation_split=0.2 holds out 20% of the training data for per-epoch
# validation metrics; X_valid_openpose is not used during training.
model.fit(X_train_openpose, y_train, epochs=200, callbacks=[tensorboard], validation_split=0.2)

In [None]:
# Predict on the validation features produced from the train/validation split.
output = model.predict(X_valid_openpose, callbacks=[tensorboard])

In [None]:
# Report the confusion matrix and the accuracy of the rounded predictions
# against the validation labels.
for metric_fn in (confusion_matrix, accuracy_score):
    print(metric_fn(y_valid, output.round()))

In [None]:
# Saving the model
# The backslash is escaped explicitly: '\o' is not a valid escape sequence.
# The resulting path is unchanged.
model.save('saved_models\\openpose_bicepscurl_8keypoints_lr0.0001')

## Working on the video preprocessing

In [None]:
# Replace the in-memory model with one loaded from disk.
# The backslash is escaped explicitly ('\o' is not a valid escape sequence).
# NOTE(review): this path ends in '_newdata' while the save cell writes
# '..._lr0.0001' -- confirm which saved model is intended here.
model = tf.keras.models.load_model('saved_models\\openpose_bicepscurl_8keypoints_newdata')

In [None]:
def fragment_video(video_name, save_location, interval):
    """Sample a video at a fixed time interval and save the frames as JPEGs.

    Frames are written to ``save_location`` as frame1.jpg, frame2.jpg, ...
    The first frame is the first frame of the video; frame k+1 is read after
    seeking to ``k * interval`` milliseconds.

    Arguments:
        video_name {string} -- Path to the video.
        save_location {string} -- Path to the save location (created if missing).
        interval {integer} -- Interval in ms; must be positive.

    Raises:
        ValueError -- If ``interval`` is not positive (the seek position would
            never advance and the loop would not terminate).
    """
    if interval <= 0:
        raise ValueError('interval must be a positive number of milliseconds')

    # Write through explicit paths instead of os.chdir: the process working
    # directory is global state and would stay changed if an error occurred.
    if save_location:
        os.makedirs(save_location, exist_ok=True)

    vidcap = cv2.VideoCapture(video_name)
    try:
        success, image = vidcap.read()

        # While we have frames, we read and save them.
        count = 1
        while success:
            # Seek to the timestamp of the next sample, store the frame that
            # was already read, then read the frame at the new position.
            vidcap.set(cv2.CAP_PROP_POS_MSEC, (count * interval))
            cv2.imwrite(os.path.join(save_location, f'frame{count}.jpg'), image)
            success, image = vidcap.read()
            count += 1
    finally:
        # Always release the capture handle, even if reading/writing failed.
        vidcap.release()

In [None]:
def read_frames(location):
    """Read the video frames stored in a directory, in frame order.

    Files are visited in natural order (frame2.jpg before frame10.jpg) so the
    returned array follows the temporal order of the frames; os.listdir alone
    gives no ordering guarantee. Files that cannot be decoded as images are
    reported and skipped.

    Arguments:
        location {string} -- Path to the directory where the frames are located at.

    Returns:
        {numpy.array} -- float32 array of the frames, each resized to
            IMG_SIZE x IMG_SIZE.
    """
    import re  # local import: only needed for the natural sort key

    def natural_key(name):
        # re.split with a capturing group alternates text / digit-run tokens,
        # so odd positions are always digit runs and keys stay comparable.
        tokens = re.split(r'(\d+)', name)
        return [int(tok) if pos % 2 else tok for pos, tok in enumerate(tokens)]

    frames = []
    for filename in sorted(os.listdir(location), key=natural_key):
        path = os.path.join(location, filename)
        image = cv2.imread(path, cv2.IMREAD_COLOR)
        if image is None:
            # cv2.imread returns None for files it cannot decode.
            print(f'Skipping unreadable frame: {path}')
            continue
        frames.append(cv2.resize(image, (IMG_SIZE, IMG_SIZE)))
    return np.array(frames).astype(np.float32)

In [None]:
def delete_frames(location):
    """Remove every file found directly inside the frames directory.

    Used to clean up the frames generated from a video once they have been
    read. The directory itself is kept.

    Arguments:
        location {string} -- Path to the directory where the frames are located at.
    """
    for entry in os.listdir(location):
        os.remove(os.path.join(location, entry))

In [None]:
# Split the video into frames every 250 ms, load them, then remove the
# temporary frame files again.
# Raw string: the original literal relied on invalid escapes ('\C', '\c',
# '\O'); the resulting path is unchanged.
# NOTE(review): hard-coded absolute local path -- consider a configurable constant.
fragment_video(r'D:\College\coding (Python)\OpenPose - Biceps Curl\videos\2.mp4', 'frames', 250)
input_data = read_frames('frames')
delete_frames('frames')

In [None]:
# Reuse the scaler that was fitted on the training keypoints (fit=False).
# Fitting a fresh MinMaxScaler on the video frames would scale the features
# differently from what the classifier saw during training.
# Requires the training cells above to have been run so `scaler` is fitted.
x_valid_video = process_openpose_data(input_data, scaler, fit=False)

In [None]:
# Run the classifier on the per-frame keypoint features of the video
# (this overwrites the earlier `output` from the validation set).
output = model.predict(x_valid_video)

## Transforming the model into a tensorflow lite model

In [None]:
def convert_to_tflite(model, output_path='tflite_models\\model.tflite'):
    """Convert a Keras model to TensorFlow Lite and write it to disk.

    Arguments:
        model {object} -- Keras model that needs to be converted.
        output_path {string} -- File the converted model is written to.
            Defaults to the location this notebook has always used.
    """
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    tflite_model = converter.convert()

    # The default path escapes its backslash explicitly; '\m' is not a valid
    # escape sequence.
    with open(output_path, 'wb') as f:
        f.write(tflite_model)

In [None]:
# Export the current `model` to a TensorFlow Lite file.
convert_to_tflite(model)

## Parsing the config file and building the model from it.

In [5]:
def parse_config_file(location):
    """Read a JSON model configuration file.

    Arguments:
        location {string} -- Path to the configuration file.

    Returns:
        {object} -- The parsed JSON content (a dict for the configs used here).
    """
    # JSON is UTF-8 by specification; do not depend on the platform's locale
    # encoding when decoding the file.
    with open(location, 'r', encoding='utf-8') as f:
        return json.load(f)

In [None]:
def build_model_from_json(location):
    """Build and compile a Keras Sequential model from a JSON config file.

    Config entries that are read:
        'Input'         -- 'shape' and 'flatten'.
        'Layers'        -- list of layers with 'type', 'neurons', 'activation';
                           only type 'dense' is built, other types are skipped.
        'Optimizer'     -- 'name' (only 'Adam' is supported) and 'learning_rate'.
        'Metrics'       -- mapping whose values are passed as metrics.
        'Loss_function' -- 'name' of the loss.

    Arguments:
        location {string} -- Path to the configuration file.

    Returns:
        {object} -- The compiled Keras model.

    Raises:
        ValueError -- If validate_config rejects the file or the optimizer
            name is not supported.
    """
    # Read the json file
    model_json = parse_config_file(location)
    print(model_json)

    # Validate the file
    if not validate_config(model_json):
        # Raise a real exception; raising a plain string is itself a TypeError.
        raise ValueError('Config file is not valid')

    # Optimizer -- resolved up front so an unsupported name fails with a clear
    # message instead of an unbound `optimizer` at compile time.
    optimizer_cfg = model_json['Optimizer']
    if optimizer_cfg['name'] != 'Adam':
        raise ValueError(f"Unsupported optimizer: {optimizer_cfg['name']!r}")
    # `learning_rate` replaces the deprecated `lr` argument.
    optimizer = tf.keras.optimizers.Adam(learning_rate=optimizer_cfg['learning_rate'])

    # Create the keras model
    model = Sequential()

    # Input Layers
    model.add(Input(shape=model_json['Input']['shape']))
    # Explicit comparison kept so only a JSON `true` (or 1) enables flattening.
    if model_json['Input']['flatten'] == True:
        model.add(Flatten())

    # Hidden Layers
    for layer in model_json['Layers']:
        if layer['type'] == 'dense':
            model.add(Dense(layer['neurons'], activation=layer['activation']))
        # TODO: other layer types (e.g. 'conv2d') are not supported yet and
        # are silently skipped.

    # Loss and metrics
    metrics = list(model_json['Metrics'].values())

    model.compile(loss=model_json['Loss_function']['name'],
                optimizer=optimizer,
                metrics=metrics)

    return model

In [None]:
# Build the model described by the config file.
# The backslash is escaped explicitly ('\m' is not a valid escape sequence);
# the resulting path is unchanged.
# NOTE(review): the file has an .hjson extension but is parsed with the strict
# JSON parser -- confirm it contains plain JSON.
model = build_model_from_json('model_config_files\\model_4dense_bc_4keypoints.hjson')