In [2]:
import mediapipe as mp
import cv2
import pandas as pd
import pickle
import numpy as np
import csv
import seaborn as sns
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB

from sklearn.metrics import precision_score, accuracy_score, f1_score, recall_score, confusion_matrix, roc_curve, auc

import warnings
warnings.filterwarnings('ignore')

# Drawing helpers
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

In [3]:
# Determine important landmarks for squat
IMPORTANT_LMS = [
    "LEFT_SHOULDER",
    "RIGHT_SHOULDER",
    "LEFT_HIP",
    "RIGHT_HIP",
    "LEFT_WRIST",
    "RIGHT_WRIST",
    "LEFT_KNEE",
    "RIGHT_KNEE",
    "LEFT_ANKLE",
    "RIGHT_ANKLE"
]

# Generate all columns of the data frame

landmarks = ["label"] # Label column

for lm in IMPORTANT_LMS:
    landmarks += [f"{lm.lower()}_x", f"{lm.lower()}_y", f"{lm.lower()}_z", f"{lm.lower()}_v"]

In [4]:
def rescale_frame(frame, percent=50):
    '''
    Rescale a frame to a certain percentage compare to its original frame
    '''
    width = int(frame.shape[1] * percent/ 100)
    height = int(frame.shape[0] * percent/ 100)
    dim = (width, height)
    return cv2.resize(frame, dim, interpolation =cv2.INTER_AREA)
    

def init_csv(dataset_path: str):
    '''
    Create a blank csv file with just columns
    '''

    # Write all the columns to a file
    with open(dataset_path, mode="w", newline="") as f:
        csv_writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
        csv_writer.writerow(landmarks)

        
def export_landmark_to_csv(dataset_path: str, results, action: str) -> None:
    '''
    Export Labeled Data from detected landmark to csv
    '''
    landmarks = results.pose_landmarks.landmark
    keypoints = []

    try:
        # Extract coordinate of important landmarks
        for lm in IMPORTANT_LMS:
            keypoint = landmarks[mp_pose.PoseLandmark[lm].value]
            keypoints.append([keypoint.x, keypoint.y, keypoint.z, keypoint.visibility])
        
        keypoints = list(np.array(keypoints).flatten())

        # Insert action as the label (first column)
        keypoints.insert(0, action)

        # Append new row to .csv file
        with open(dataset_path, mode="a", newline="") as f:
            csv_writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(keypoints)
        

    except Exception as e:
        print(e)
        pass


def concat_csv_files_with_same_headers(file_paths: list, saved_path: str):
    '''
    Concat different csv files
    '''
    all_df = []
    for path in file_paths:
        df = pd.read_csv(path, index_col=None, header=0)
        all_df.append(df)
    
    results = pd.concat(all_df, axis=0, ignore_index=True)
    results.to_csv(saved_path, sep=',', encoding='utf-8', index=False)


def describe_dataset(dataset_path: str):
    ''''''

    data = pd.read_csv(dataset_path)
    print(f"Headers: {list(data.columns.values)}")
    print(f'Number of rows: {data.shape[0]} \nNumber of columns: {data.shape[1]}\n')
    print(f"Labels: \n{data['label'].value_counts()}\n")
    print(f"Missing values: {data.isnull().values.any()}\n")
    duplicate = data[data.duplicated()]
    print(f"Duplicate Rows : {duplicate.sum(axis=1)}")

    return data


def round_up_metric_results(results) -> list:
    '''Round up metrics results such as precision score, recall score, ...'''
    return list(map(lambda el: round(el, 3), results))

In [5]:
DATASET_PATH = "./train.csv"
init_csv(DATASET_PATH)

In [6]:
import glob 
videos = glob.glob(f"D:\\University\\Semester5\\PBL4-AIPT\\Data\\squat_dataset\\good\\1115_video\\*.mp4")

In [7]:
videos

['D:\\University\\Semester5\\PBL4-AIPT\\Data\\squat_dataset\\good\\1115_video\\0918_squat_000001.mp4',
 'D:\\University\\Semester5\\PBL4-AIPT\\Data\\squat_dataset\\good\\1115_video\\0918_squat_000003.mp4',
 'D:\\University\\Semester5\\PBL4-AIPT\\Data\\squat_dataset\\good\\1115_video\\0918_squat_000004.mp4',
 'D:\\University\\Semester5\\PBL4-AIPT\\Data\\squat_dataset\\good\\1115_video\\0918_squat_000005.mp4',
 'D:\\University\\Semester5\\PBL4-AIPT\\Data\\squat_dataset\\good\\1115_video\\0918_squat_000006.mp4',
 'D:\\University\\Semester5\\PBL4-AIPT\\Data\\squat_dataset\\good\\1115_video\\0918_squat_000007.mp4',
 'D:\\University\\Semester5\\PBL4-AIPT\\Data\\squat_dataset\\good\\1115_video\\0918_squat_000009.mp4',
 'D:\\University\\Semester5\\PBL4-AIPT\\Data\\squat_dataset\\good\\1115_video\\0918_squat_000044.mp4',
 'D:\\University\\Semester5\\PBL4-AIPT\\Data\\squat_dataset\\good\\1115_video\\0918_squat_000045.mp4',
 'D:\\University\\Semester5\\PBL4-AIPT\\Data\\squat_dataset\\good\\1115_v

In [8]:
train_set, test_set = train_test_split(videos, test_size=0.2, random_state=42)

In [9]:
for video in train_set : 
     cap = cv2.VideoCapture(video)
     up_save_count = 0
     down_save_count = 0

     # init_csv(DATASET_PATH)

     with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
          while cap.isOpened():
               ret, image = cap.read()

               if not ret:
                    break
               
               # Reduce size of a frame
               image = rescale_frame(image, 60)
               image = cv2.flip(image, 1)

               # Recolor image from BGR to RGB for mediapipe
               image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
               image.flags.writeable = False

               results = pose.process(image)
               if not results.pose_landmarks: 
                    break
               # Recolor image from BGR to RGB for mediapipe
               image.flags.writeable = True
               image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

               # Draw landmarks and connections
               mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS, mp_drawing.DrawingSpec(color=(244, 117, 66), thickness=2, circle_radius=4), mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2))

               # Display the saved count
               cv2.putText(image, f"UP saved: {up_save_count}", (50, 150), cv2.FONT_HERSHEY_COMPLEX, 2, (255, 255, 255), 1, cv2.LINE_AA)
               cv2.putText(image, f"DOWN saved: {down_save_count}", (50, 200), cv2.FONT_HERSHEY_COMPLEX, 2, (255, 255, 255), 1, cv2.LINE_AA)

               cv2.imshow("CV2", image)

               # Pressed key for action
               k = cv2.waitKey(1) & 0xFF

               if k == ord('d'): 
                    export_landmark_to_csv(DATASET_PATH, results, "down")
                    down_save_count += 1
               elif k == ord("u"):
                    export_landmark_to_csv(DATASET_PATH, results, "up")
                    up_save_count += 1
               # Press q to stop
               elif k == ord("q"):
                    break
               else: continue

          cap.release()
          cv2.destroyAllWindows()

     # (Optional)Fix bugs cannot close windows in MacOS (https://stackoverflow.com/questions/6116564/destroywindow-does-not-close-window-on-mac-using-python-and-opencv)
     for i in range (1, 5):
          cv2.waitKey(1)
          

KeyboardInterrupt: 

: 