# Init

> From Google: https://google.github.io/mediapipe/solutions/pose_classification.html

In [1]:
# preprocessing
import csv
import cv2
import numpy as np
import pandas as pd
import os
import sys
import keras
from tqdm.notebook import tqdm
from mediapipe.python.solutions import drawing_utils as mp_drawing
from mediapipe.python.solutions import pose as mp_pose
import vg
import itertools

# network
from keras.utils import *
from keras.layers import *
from keras.models import *
from keras.optimizers import *
from keras.callbacks import *
import sklearn
import matplotlib.pyplot as plt

Using TensorFlow backend.
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
# Folder containing the dataset (one sub-folder per pose class)
images_in_folder = "referee_dataset"

# Folder that receives the pictures with the skeleton drawn on top
images_out_folder = "referee_data_out_basic"

# CSV file that receives the extracted landmark coordinates
csv_out_path = "referee_data_out.csv"

# Check that the dataset is present.  The configured variable is used (not a
# second copy of the literal) and a real exception is raised, because an
# `assert` is removed when Python runs with -O.
if not os.path.isdir(images_in_folder):
    raise FileNotFoundError("No dataset in directory")

# make dirs if they do not exist
os.makedirs(images_out_folder, exist_ok=True)

# Fetch landmarks

> This function runs BlazePose (MediaPipe Pose) on the dataset and stores the x, y and z coordinates of every detected landmark in a CSV file.

In [3]:
def get_landmarks():
    """Run MediaPipe Pose over the dataset and store the landmarks in a CSV file.

    Every non-hidden sub-folder of ``images_in_folder`` is treated as one pose
    class and every non-hidden file inside it as one sample image.  A copy of
    each readable image (with the skeleton drawn on top when a pose was found)
    is written to ``images_out_folder``.  For each image with a detected pose
    one CSV row is written: image name, class name, then the 33 landmarks as
    x, y, z triples scaled by the frame size.

    Does nothing when ``csv_out_path`` already exists.  Files that OpenCV
    cannot decode are skipped.
    """
    if os.path.exists(csv_out_path):
        return

    # Write to a temporary file and rename it at the very end, so that an
    # interrupted run never leaves a truncated CSV that the early return above
    # would later mistake for a finished one.
    tmp_csv_path = csv_out_path + ".tmp"
    with open(tmp_csv_path, "w", newline="") as csv_out_file:
        csv_out_writer = csv.writer(csv_out_file, delimiter=',', quoting=csv.QUOTE_MINIMAL)

        # Folder names are used as pose class names; stray files are ignored.
        pose_class_names = sorted(
            n for n in os.listdir(images_in_folder)
            if not n.startswith('.') and os.path.isdir(os.path.join(images_in_folder, n)))

        for pose_class_name in tqdm(pose_class_names, desc="Bootstrapping\t"):
            class_in_dir = os.path.join(images_in_folder, pose_class_name)
            class_out_dir = os.path.join(images_out_folder, pose_class_name)
            os.makedirs(class_out_dir, exist_ok=True)

            image_names = sorted(n for n in os.listdir(class_in_dir) if not n.startswith('.'))
            for image_name in tqdm(image_names, leave=False, desc=pose_class_name + "\t"):
                input_frame = cv2.imread(os.path.join(class_in_dir, image_name))
                if input_frame is None:
                    # cv2.imread signals an unreadable file by returning None;
                    # skip it instead of crashing in cvtColor.
                    continue
                input_frame = cv2.cvtColor(input_frame, cv2.COLOR_BGR2RGB)

                # Initialize fresh pose tracker and run it.
                with mp_pose.Pose() as pose_tracker:
                    result = pose_tracker.process(image=input_frame)
                    pose_landmarks = result.pose_landmarks

                # Save image with pose prediction (if pose was detected).
                output_frame = input_frame.copy()
                if pose_landmarks is not None:
                    mp_drawing.draw_landmarks(
                        image=output_frame,
                        landmark_list=pose_landmarks,
                        connections=mp_pose.POSE_CONNECTIONS)
                output_frame = cv2.cvtColor(output_frame, cv2.COLOR_RGB2BGR)
                cv2.imwrite(os.path.join(class_out_dir, image_name), output_frame)

                # Save landmarks.
                if pose_landmarks is not None:
                    # Check the number of landmarks and take pose landmarks.
                    assert len(pose_landmarks.landmark) == 33, 'Unexpected number of predicted pose landmarks: {}'.format(len(pose_landmarks.landmark))
                    coordinates = np.array([[lmk.x, lmk.y, lmk.z] for lmk in pose_landmarks.landmark])

                    # Map pose landmarks from [0, 1] range to absolute coordinates to get
                    # correct aspect ratio.
                    frame_height, frame_width = output_frame.shape[:2]
                    coordinates = coordinates * np.array([frame_width, frame_height, frame_width])

                    # Write pose sample to CSV.
                    row_values = np.around(coordinates, 5).flatten().astype(str).tolist()
                    csv_out_writer.writerow([image_name, pose_class_name] + row_values)

    # Publish the finished file under its final name.
    os.replace(tmp_csv_path, csv_out_path)
                
# Build the landmark CSV now (returns immediately when the CSV already exists).
get_landmarks()

# Featurizers

> While these landmarks can already work pretty well as features, I also tested some additional preprocessing.

In [None]:
def get_position(data, i):
    """Return the (x, y, z) slice of landmark ``i``.

    ``data`` is the flat landmark array laid out as
    ``[x0, y0, z0, x1, y1, z1, ...]``, so landmark ``i`` starts at offset
    ``3 * i``.  (The previous ``i + 2`` offset returned overlapping slices
    that belonged to other landmarks.)
    """
    start = 3 * i
    return data[start:start + 3]

def normalise(vector):
    """Divide ``vector`` by its norm (as computed by ``np.linalg.norm``)."""
    length = np.linalg.norm(vector)
    return vector / length

def get_angle(data, a, b, c):
    """Return the angle at landmark ``b`` between the directions to ``a`` and ``c``.

    The angle is computed by ``vg.angle`` with its default settings.
    """
    vertex = get_position(data, b)
    towards_a = get_position(data, a) - vertex
    towards_c = get_position(data, c) - vertex
    return vg.angle(towards_a.astype(float), towards_c.astype(float))

def distance(p1, p2):
    """Return the Euclidean distance between ``p1`` and ``p2`` (summed over axis 0)."""
    offset = p1 - p2
    return np.sqrt(np.sum(offset ** 2, axis=0))

In [12]:
# offset based features
def get_embedding_v1(data):
    """Return every body landmark as an offset from the first one, scaled by hip width.

    ``data`` is the flat array of 33 landmarks (x, y, z each).  The first 11
    landmarks (face) are dropped, leaving landmarks 11..32.  The result has one
    row of three values per remaining landmark.

    The previous version divided by ``embedding[0]``, which is the all-zero row
    ``data[0] - data[0]``; that turned every value into NaN or inf.  The scale
    is now the scalar distance between landmarks 23 and 24 -- the hips in
    MediaPipe's pose numbering -- which is what the original comment described.
    """
    # restack data in 3d vectors & exclude face landmarks
    body = np.reshape(data, (-1, 3))[11:].astype(float)

    # offsets of all landmarks relative to the first kept landmark
    embedding = body - body[0]

    # normalise with hips distance: landmarks 23 and 24 sit at rows 12 and 13
    # once the first 11 rows are removed
    hip_distance = np.linalg.norm(body[12] - body[13])
    return embedding / hip_distance

# angle based features
def get_embedding_v2(data):
    """Return six joint angles, three per arm chain, as a 1-D array.

    Each triplet ``(a, b, c)`` is passed to ``get_angle``, which measures the
    angle at landmark ``b`` between landmarks ``a`` and ``c``.
    """
    joint_triplets = (
        (13, 11, 23),
        (15, 13, 11),
        (17, 15, 13),
        (14, 12, 24),
        (16, 14, 12),
        (18, 16, 14),
    )
    return np.array([get_angle(data, a, b, c) for a, b, c in joint_triplets])

# vector based features
def get_embedding_v3(data):
    """Return six normalised segment vectors, flattened into one 1-D array.

    Each ``(start, end)`` pair contributes ``normalise(end - start)`` using the
    landmark positions returned by ``get_position``.
    """
    segments = (
        (11, 13),
        (13, 15),
        (15, 17),
        (12, 14),
        (14, 16),
        (16, 18),
    )
    directions = [
        normalise(get_position(data, end) - get_position(data, start))
        for start, end in segments
    ]
    return np.array(directions).flatten()

# distance based features
def get_embedding_v4(data):
    """Return all pairwise landmark distances, relative to the first pair.

    The flat input is restacked into 3-D points and the first 11 landmarks
    (face) are dropped.  Every distance is divided by the first one, i.e. the
    distance between the first two kept landmarks (11 and 12 of the full set).
    NOTE(review): the earlier comment called this the "hips distance";
    landmarks 11/12 are presumably the shoulders in MediaPipe's numbering --
    confirm which reference length is intended.
    """
    body = np.reshape(data, (-1, 3))[11:]

    # one distance per unordered pair of landmarks
    pair_distances = [
        distance(first, second)
        for first, second in itertools.combinations(body, 2)
    ]
    embedding = np.array(pair_distances)

    # scale by the first pair's distance
    reference = embedding[0]
    embedding /= reference
    return embedding

In [None]:
# processes single feature row
def embedder(version, feature_array):
    """Turn one flat landmark array into the feature vector for ``version``.

    Version 0 returns ``feature_array`` unchanged; versions 1-4 call the
    matching ``get_embedding_v*`` function.  Previously version 3 was routed
    to ``get_embedding_v2`` and version 4 (or any other value) silently
    returned ``None``.

    Raises:
        ValueError: if ``version`` is not one of 0, 1, 2, 3, 4.
    """
    if version == 0:
        return feature_array

    embedders = {
        1: get_embedding_v1,
        2: get_embedding_v2,
        3: get_embedding_v3,
        4: get_embedding_v4,
    }
    embed = embedders.get(version)
    if embed is None:
        raise ValueError("Unknown embedding version: {!r}".format(version))
    return embed(feature_array)

# Builds feature csv from landmark data
def featurizer(version):
    """Return the feature table for ``version`` as a DataFrame.

    The table is cached in ``pose_features_<version>.csv``; when that file
    exists it is loaded and returned.  Otherwise the landmark CSV is read and,
    for version 0, returned as is.  For other versions every row is passed
    through ``embedder`` and written to the cache file as: image name, class
    name, then the flattened embedding.

    The cache is written under a temporary name and renamed only after every
    row succeeded, so a failure part-way through cannot leave a truncated
    cache that later calls would reuse.
    """
    # return features csv if it already exist
    feature_csv_name = "pose_features_" + str(version) + ".csv"
    if os.path.exists(feature_csv_name):
        return pd.read_csv(feature_csv_name, index_col=0, header=None)

    # load feature data & return dataframe if version = 0
    landmark_data = pd.read_csv("referee_data_out.csv", index_col=0, header=None)
    if version == 0:
        return landmark_data

    # write feature csv
    tmp_csv_name = feature_csv_name + ".tmp"
    with open(tmp_csv_name, "w", newline="") as csv_out_file:
        csv_out_writer = csv.writer(csv_out_file, delimiter=',', quoting=csv.QUOTE_MINIMAL)

        for image_name, row in landmark_data.iterrows():
            values = np.array(row)
            pose_class = values[0]
            # The row mixes the class name with numbers, so it arrives as an
            # object array; hand the embedders a proper float array.
            coordinates = values[1:].astype(float)
            embedding = np.ravel(embedder(version, coordinates)).astype(float)
            csv_out_writer.writerow([image_name, pose_class] + embedding.tolist())

    os.replace(tmp_csv_name, feature_csv_name)
    return pd.read_csv(feature_csv_name, index_col=0, header=None)

# Model

In [5]:
# Randomise and split dataframe into X and Y
def feature_target_split(df, shuffle=True):
    """Split a feature table into a float matrix ``X`` and one-hot targets ``Y``.

    The first column of ``df`` holds the class label, the remaining columns
    the features.  With ``shuffle=True`` the rows are shuffled first.  Labels
    are mapped to integer ids by ``LabelEncoder`` and then one-hot encoded with
    ``to_categorical``.
    """
    # Imported here because a bare `import sklearn` does not load the
    # `sklearn.preprocessing` submodule; attribute access on the package only
    # works if something else happened to import it first.
    from sklearn.preprocessing import LabelEncoder

    if shuffle:
        df = df.sample(frac=1)

    dataset = df.values
    X = dataset[:, 1:].astype(float)

    labels = dataset[:, 0]
    encoder = LabelEncoder()
    encoder.fit(labels)
    Y = to_categorical(encoder.transform(labels))
    return X, Y

def kfold_index(df, k=5):
    """Return ``k`` consecutive ``(start, end)`` index pairs covering ``len(df)`` rows.

    Fold sizes differ by at most one; the first ``len(df) % k`` folds receive
    the extra row.
    """
    base_size, leftover = divmod(len(df), k)
    bounds = []
    start = 0
    for fold in range(k):
        # the leading folds absorb the remainder, one extra row each
        size = base_size + 1 if fold < leftover else base_size
        stop = start + size
        bounds.append((start, stop))
        start = stop
    return bounds

In [6]:
# Landmark table: the index is the image name, column 1 the pose class name.
mass_data = pd.read_csv("referee_data_out.csv", index_col=0, header=None)

# Class names indexed by class id.  They are sorted so the ids line up with
# the ones used for the training targets, which come from a LabelEncoder
# (it numbers classes in sorted order) rather than from the row order of the CSV.
id_to_pose = sorted(mass_data[1].unique())
label_count = len(id_to_pose)

def two_layer_integrated(X):
    """Build the classifier and the checkpoint callback used to train it.

    Args:
        X: table whose ``shape[1]`` is the number of feature columns plus one
            (the class label column); only its shape is read.

    Returns:
        ``(model, checkpoint)``: a compiled model with one 256-unit ReLU hidden
        layer and ``label_count`` outputs, and a ``ModelCheckpoint`` that saves
        the weights with the lowest ``val_loss`` to ``best_pose_model.hdf5``.

    Each sample has exactly one class and the targets are one-hot, so the
    output layer uses softmax with categorical cross-entropy.  The previous
    sigmoid + binary cross-entropy pairing treated every class as an
    independent yes/no output, and the ``acc`` metric then reported
    per-element accuracy, which overstates how often the class is right.
    """
    feature_count = X.shape[1] - 1
    inputs = Input(shape=(feature_count,))
    hidden = Dense(256, activation="relu")(inputs)
    outputs = Dense(label_count, activation="softmax")(hidden)
    model = Model(inputs, outputs)
    model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["acc"])
    mc = ModelCheckpoint("best_pose_model.hdf5", monitor="val_loss", verbose=1, save_best_only=True, mode="min")
    return model, mc

In [7]:
from sklearn.model_selection import train_test_split

def train(version, plot=True):
    """Train the classifier on the features of ``version`` and return it.

    The feature table is split 80/20 into training and validation data, the
    model is fitted for 500 epochs while the checkpoint callback keeps the
    best weights, those weights are reloaded and the model is saved as
    ``pose_model_<version>.h5``.  With ``plot=True`` the training and
    validation accuracy curves are shown.
    """
    features = featurizer(version)

    # shuffled features/targets, then a fixed-seed train/validation split
    X, Y = feature_target_split(features)
    X_train, X_test, Y_train, Y_test = train_test_split(
        X, Y, test_size=0.20, random_state=12, shuffle=True)

    # fit the network, checkpointing the best epoch
    model, model_checkpoint = two_layer_integrated(features)
    history = model.fit(
        X_train, Y_train,
        epochs=500,
        callbacks=[model_checkpoint],
        batch_size=32,
        validation_data=(X_test, Y_test))

    # restore the best weights before saving the final model file
    model.load_weights('best_pose_model.hdf5')
    model.save(f"pose_model_{version!s}.h5")

    # plot the accuracy history
    if plot:
        for curve in ("acc", "val_acc"):
            plt.plot(history.history[curve])
        plt.title("model accuracy")
        plt.ylabel("accuracy")
        plt.xlabel("epoch")
        plt.legend(["train", "val"], loc="upper left")
        plt.show()

    return model

# Detect

> source: https://www.youtube.com/watch?v=06TE_U21FK4

In [18]:
def detect_live(version):
    """Classify the pose seen by the default webcam until 'q' is pressed.

    Loads ``pose_model_<version>.h5`` (training a new model when loading
    fails), then for every frame runs MediaPipe Pose, draws the skeleton,
    builds the features with ``embedder(version, ...)`` and overlays the
    predicted class name with its score.  The loop also ends when the camera
    stops delivering frames.  The camera and the window are always released,
    even if an error occurs.
    """
    # load model
    try:
        model = keras.models.load_model("pose_model_" + str(version) + ".h5")
    except Exception:
        # No usable saved model: train one.  `Exception` rather than a bare
        # `except` so that Ctrl-C / SystemExit are not swallowed.
        model = train(version)

    cap = cv2.VideoCapture(0)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 3 * 200)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 4 * 200)
    font = cv2.FONT_HERSHEY_SIMPLEX

    try:
        ## Setup mediapipe instance
        with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    # no frame delivered (camera unplugged / stream ended)
                    break

                # Recolor image to RGB
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image.flags.writeable = False

                # Make detection
                result = pose.process(image)
                pose_landmarks = result.pose_landmarks

                # Recolor back to BGR
                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

                # Render detections
                mp_drawing.draw_landmarks(image, result.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                          mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=2),
                                          mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                                         )

                if pose_landmarks is not None:
                    # Check the number of landmarks and take pose landmarks.
                    assert len(pose_landmarks.landmark) == 33, 'Unexpected number of predicted pose landmarks: {}'.format(len(pose_landmarks.landmark))
                    coordinates = np.array([[lmk.x, lmk.y, lmk.z] for lmk in pose_landmarks.landmark])

                    # Same scaling as used when the training CSV was built.
                    frame_height, frame_width = image.shape[:2]
                    coordinates = coordinates * np.array([frame_width, frame_height, frame_width])
                    coordinates = np.around(coordinates, 5).flatten()

                    # predicting: one sample, flattened so that embeddings
                    # returned as 2-D arrays also form a single feature row
                    features = np.asarray(embedder(version, coordinates), dtype=float).reshape(1, -1)
                    scores = model.predict(features, verbose=0)[0]
                    prediction_id = int(np.argmax(scores))
                    confidence = float(scores[prediction_id])

                    # show on screen
                    text = 'Pose Id: ' + str(id_to_pose[prediction_id]) + " " + str(round(confidence * 100)) + "%"
                    cv2.putText(image, text, (50, 50), font, 1, (255, 0, 0), 2, cv2.LINE_4)

                cv2.imshow('Mediapipe Feed', image)

                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
    finally:
        cap.release()
        cv2.destroyAllWindows()
        
# Start the webcam demo with feature version 0 (embedder returns the landmarks unchanged).
detect_live(0)