In [9]:
import cv2
import tqdm
import pickle
import numpy as np
import matplotlib.pyplot as plt
import os
from keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import tensorflow as tf


In [10]:


dataset = {
    "dataset_path": os.path.join("dataset", "Fit3D Video Dataset"),
    "sequence_length": 75,
    "resize_width": 128,
    "resize_height": 128,
    "color_channels": 1,
    "validation_ratio": 0.2,
    "test_ratio": 0.2
}

In [11]:
def get_mean_std_frame(self, frames):
       mean = tf.math.reduce_mean(frames)
       std = tf.math.reduce_std(tf.cast(frames, tf.float32))
       return tf.cast((frames - mean), tf.float32) / std

In [13]:
def process_frame(frame):
       resized_frame = tf.image.resize(frame, [dataset["resize_height"], dataset["resize_width"]])
       gray_scaled_image = tf.image.rgb_to_grayscale(resized_frame)
       gray_scaled_image = tf.cast(gray_scaled_image, tf.float32)
       normalized_frame = gray_scaled_image / 255.0
       
       return normalized_frame

In [19]:

def read_video(video_path):
    video_reader = cv2.VideoCapture(video_path)
    frames = []
    if not video_reader.isOpened():
        print("Error: Could not open video.")
        return
    
    frame_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
    skip_frames_window = max(int(frame_count / dataset["sequence_length"]), 1)
    
    with tqdm.tqdm(bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_noinv_fmt}]", 
                    desc="Reading sequence frames...", unit=" frames", total=dataset["sequence_length"], leave=False) as pbar_frames:
        for frame_index in range(dataset["sequence_length"]):
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, (frame_index * skip_frames_window))
            
            success, frame = video_reader.read()
        
            if frame is None:
                break
        
            if not success:
                print("Error: Could not read frame.")
                break
            
            processed_frame = process_frame(frame)
            frames.append(processed_frame)
            pbar_frames.update(1)
            
    video_reader.release()
    return get_mean_std_frame(frames)

In [15]:
PATH="dataset/Fit3D Video Dataset"
tagged_paths = {}
categories = os.listdir(PATH)
categories 

['Deadlift',
 'Walk The Box',
 'Barbell Shrug',
 '.DS_Store',
 'W Raise',
 'Mule Kick',
 'Squat',
 'Standing Ab Twists',
 'Clean And Press',
 'Band Pull Apart',
 'Overhead Extension Thruster',
 'Man Maker',
 'Pushup',
 'One Arm Row',
 'Side Lateral Raise',
 'Burpees']

In [20]:
tagged_paths = {}
categories = os.listdir(dataset["dataset_path"])
for category in categories:
    if category == '.DS_Store':
        continue  # Skip this item
    if category not in tagged_paths:
        tagged_paths[category] = []
    print(f"Processing category: {category}")    
    folder_path = os.path.join(dataset["dataset_path"], category)
    video_list = os.listdir(folder_path)
    for video in video_list:
        if video == '.DS_Store':
            continue  # Skip this item
        video_path = os.path.join(folder_path, video)
        tagged_paths[category].append(tf.convert_to_tensor(video_path))
        print(video_list)

Processing category: Deadlift
['deadlift_17.mp4', 'deadlift_16.mp4', 'deadlift_14.mp4', 'deadlift_28.mp4', 'deadlift_29.mp4', 'deadlift_15.mp4', 'deadlift_11.mp4', 'deadlift_10.mp4', 'deadlift_12.mp4', 'deadlift_13.mp4', 'deadlift_9.mp4', 'deadlift_8.mp4', 'deadlift_1.mp4', 'deadlift_3.mp4', 'deadlift_2.mp4', 'deadlift_6.mp4', 'deadlift_7.mp4', 'deadlift_5.mp4', 'deadlift_4.mp4', 'deadlift_22.mp4', 'deadlift_23.mp4', 'deadlift_35.mp4', 'deadlift_21.mp4', 'deadlift_20.mp4', 'deadlift_34.mp4', 'deadlift_18.mp4', 'deadlift_30.mp4', 'deadlift_24.mp4', 'deadlift_25.mp4', 'deadlift_31.mp4', 'deadlift_19.mp4', 'deadlift_27.mp4', 'deadlift_33.mp4', 'deadlift_32.mp4', 'deadlift_26.mp4']
['deadlift_17.mp4', 'deadlift_16.mp4', 'deadlift_14.mp4', 'deadlift_28.mp4', 'deadlift_29.mp4', 'deadlift_15.mp4', 'deadlift_11.mp4', 'deadlift_10.mp4', 'deadlift_12.mp4', 'deadlift_13.mp4', 'deadlift_9.mp4', 'deadlift_8.mp4', 'deadlift_1.mp4', 'deadlift_3.mp4', 'deadlift_2.mp4', 'deadlift_6.mp4', 'deadlift_7.mp

In [21]:
tagged_paths.items()

dict_items([('Deadlift', [<tf.Tensor: shape=(), dtype=string, numpy=b'dataset/Fit3D Video Dataset/Deadlift/deadlift_17.mp4'>, <tf.Tensor: shape=(), dtype=string, numpy=b'dataset/Fit3D Video Dataset/Deadlift/deadlift_16.mp4'>, <tf.Tensor: shape=(), dtype=string, numpy=b'dataset/Fit3D Video Dataset/Deadlift/deadlift_14.mp4'>, <tf.Tensor: shape=(), dtype=string, numpy=b'dataset/Fit3D Video Dataset/Deadlift/deadlift_28.mp4'>, <tf.Tensor: shape=(), dtype=string, numpy=b'dataset/Fit3D Video Dataset/Deadlift/deadlift_29.mp4'>, <tf.Tensor: shape=(), dtype=string, numpy=b'dataset/Fit3D Video Dataset/Deadlift/deadlift_15.mp4'>, <tf.Tensor: shape=(), dtype=string, numpy=b'dataset/Fit3D Video Dataset/Deadlift/deadlift_11.mp4'>, <tf.Tensor: shape=(), dtype=string, numpy=b'dataset/Fit3D Video Dataset/Deadlift/deadlift_10.mp4'>, <tf.Tensor: shape=(), dtype=string, numpy=b'dataset/Fit3D Video Dataset/Deadlift/deadlift_12.mp4'>, <tf.Tensor: shape=(), dtype=string, numpy=b'dataset/Fit3D Video Dataset/De

In [22]:
included_video_paths = []
labels = []
video_frames = [] 

total_videos = sum([len(v) for k, v in tagged_paths.items()])
with tqdm.tqdm(bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_noinv_fmt}]",
                desc="Saving videos...", unit=" videos", total=total_videos, leave=False) as pbar_full:
    label_count = len(tagged_paths)
    
    with tqdm.tqdm(bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_noinv_fmt}]", 
                    desc="Reading folders...", unit=" folders", total=label_count, leave=False) as pbar_folders:
        for category_index, (category, video_path_list) in enumerate(tagged_paths.items()):
            pbar_folders.set_description(f"Reading folder: \"{category}\"")
            
            video_count = len(video_path_list)
            with tqdm.tqdm(bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_noinv_fmt}]", 
                            desc="Reading videos...", unit= " videos", total=video_count, leave=False) as pbar_videos:
                for video_path in video_path_list:
                    decoded_video_path = video_path.numpy().decode("utf-8")
                    print(decoded_video_path)
                    dataset_tag, main_folder, category, video_name = decoded_video_path.split('/')
                    pbar_videos.set_description(f"Reading video: \"{video_name}\"")
                    print(decoded_video_path)
                    current_video_frames = read_video(decoded_video_path)
                    if len(current_video_frames) == dataset["sequence_length"]:
                        labels.append(category_index)
                        video_frames.append(current_video_frames)
                        included_video_paths.append(video_path)
                    
                    pbar_videos.update(1)
                    pbar_full.update(1)
                    
            pbar_folders.update(1)
                    
        
        print("Dataset read successfully.")
        

Saving videos...:   0%|          | 0/525 [00:00<?, ? videos/s]
[A
[A

dataset/Fit3D Video Dataset/Deadlift/deadlift_17.mp4
dataset/Fit3D Video Dataset/Deadlift/deadlift_17.mp4




[A[A

[A[A
                                                              

TypeError: process_frame() missing 1 required positional argument: 'frame'

In [None]:
self.labeled_video_paths, self.labels, self.videos, self.included_video_paths = self.read_dataset()
        
self.videos = np.asarray(self.videos)

label_encoder = LabelEncoder()
self.labels = label_encoder.fit_transform(self.labels)
self.labels = to_categorical(self.labels, num_classes=len(self.labeled_video_paths))

self.X_train, self.X_test, self.Y_train, self.Y_test = train_test_split(self.videos, self.labels, test_size=self.TEST_RATIO, shuffle=True)
self.X_train, self.X_validation, self.Y_train, self.Y_validation = train_test_split(self.X_train, self.Y_train, test_size=self.VALIDATION_RATIO, shuffle=True)

self.X_train = np.asarray(self.X_train)
self.X_validation = np.asarray(self.X_validation)
self.X_test = np.asarray(self.X_test)
self.Y_train = np.asarray(self.Y_train)
self.Y_validation = np.asarray(self.Y_validation)
self.Y_test = np.asarray(self.Y_test)

**Training model**

In [None]:
from keras.models import Sequential
from keras.layers import Conv3D
from keras.layers import MaxPooling3D
from keras.layers import LSTM
from keras.layers import TimeDistributed
from keras.layers import Flatten
from keras.layers import Dense
from keras.layers import Dropout
model = Sequential()
        
model.add(Conv3D(32,kernel_size=(3,3,3),activation='relu',input_shape=(75,128,128,1))),

model.add(MaxPooling3D(pool_size=(2,2,2))),

model.add(Conv3D(64,kernel_size=(3,3,3),activation='relu')),

model.add(MaxPooling3D(pool_size=(2,2,2))),

model.add(Conv3D(128,kernel_size=(3,3,3),activation='relu')),

model.add(MaxPooling3D(pool_size=(2,2,2))),

model.add(Conv3D(256,kernel_size=(3,3,3),activation='relu')),

model.add(MaxPooling3D(pool_size=(2,2,2)))

model.add(TimeDistributed(Flatten()))

model.add(LSTM(128))

model.add(Dense(512,activation='relu'))

model.add(Dropout(0.5))

model.add(Dense(47,activation='softmax'))

model.summary()