# Lien vers la base : 
https://www.kaggle.com/datasets/hasyimabdillah/workoutfitness-video

In [15]:
import cv2
import mediapipe as mp
import os
import subprocess

# Initialize MediaPipe body segmentation
mp_selfie_segmentation = mp.solutions.selfie_segmentation.SelfieSegmentation(model_selection=1)

def process_segmentation(input_path, output_path):
    cap = cv2.VideoCapture(input_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) / 2)  # Reduce the size by half
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) / 2)  # Reduce the size by half
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Convert the BGR image to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Process the image to get the segmentation mask
        result = mp_selfie_segmentation.process(rgb_frame)

        # Extract segmentation mask
        mask = result.segmentation_mask
        condition = mask > 0.1

        # Apply the mask to the frame
        segmented_frame = cv2.bitwise_and(frame, frame, mask=condition.astype('uint8') * 255)

        # Resize the frame
        resized_frame = cv2.resize(segmented_frame, (width, height))

        # Write the frame to the output video
        out.write(resized_frame)

    cap.release()
    out.release()

def process_edges(input_path, output_path):
    cap = cv2.VideoCapture(input_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) / 2)  # Reduce the size by half
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) / 2)  # Reduce the size by half
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Resize the frame
        resized_frame = cv2.resize(frame, (width, height))

        # Convert to grayscale
        gray_frame = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2GRAY)

        # Apply Canny edge detection
        edges = cv2.Canny(gray_frame, 100, 200)

        # Convert edges to BGR
        edges_bgr = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)

        # Write the frame to the output video
        out.write(edges_bgr)

    cap.release()
    out.release()

def convert_to_h264(input_path, output_path):
    command = ['ffmpeg', '-i', input_path, '-vcodec', 'libx264', output_path]
    print(f"Running command: {' '.join(command)}")
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        print(f"Error converting to H264: {result.stderr.decode()}")
    else:
        print(f"Successfully converted to H264: {output_path}")

def process_videos_in_folder(input_folder, output_folder):
    for root, dirs, files in os.walk(input_folder):
        for file in files:
            if file.endswith(".mov") or file.endswith(".mp4"):
                input_path = os.path.join(root, file)
                relative_path = os.path.relpath(root, input_folder)
                output_subfolder = os.path.join(output_folder, relative_path)

                if not os.path.exists(output_subfolder):
                    os.makedirs(output_subfolder)

                output_segmentation_path = os.path.join(output_subfolder, f"{os.path.splitext(file)[0]}_segmentation.mp4")
                output_segmentation_h264_path = os.path.join(output_subfolder, f"{os.path.splitext(file)[0]}_segmentation_h264.mp4")
                output_edges_path = os.path.join(output_subfolder, f"{os.path.splitext(file)[0]}_edges.mp4")
                output_edges_h264_path = os.path.join(output_subfolder, f"{os.path.splitext(file)[0]}_edges_h264.mp4")

                print(f"Processing {input_path}...")

                # Process segmentation
                process_segmentation(input_path, output_segmentation_path)
                # Convert segmentation to H264
                convert_to_h264(output_segmentation_path, output_segmentation_h264_path)
                # Process edges
                process_edges(output_segmentation_h264_path, output_edges_path)
                # Convert edges to H264
                convert_to_h264(output_edges_path, output_edges_h264_path)

                # Remove other files
                os.remove(output_segmentation_path)
                os.remove(output_segmentation_h264_path)
                os.remove(output_edges_path)

                print(f"Processed and saved {file} in {output_subfolder}")

if __name__ == "__main__":
    input_folder = "test"  
    output_folder = "FINAL" 

    process_videos_in_folder(input_folder, output_folder)


Processing test\barbell biceps curl\barbell biceps curl_1.mp4...
Running command: ffmpeg -i FINAL\barbell biceps curl\barbell biceps curl_1_segmentation.mp4 -vcodec libx264 FINAL\barbell biceps curl\barbell biceps curl_1_segmentation_h264.mp4
Successfully converted to H264: FINAL\barbell biceps curl\barbell biceps curl_1_segmentation_h264.mp4
Running command: ffmpeg -i FINAL\barbell biceps curl\barbell biceps curl_1_edges.mp4 -vcodec libx264 FINAL\barbell biceps curl\barbell biceps curl_1_edges_h264.mp4
Successfully converted to H264: FINAL\barbell biceps curl\barbell biceps curl_1_edges_h264.mp4
Processed and saved barbell biceps curl_1.mp4 in FINAL\barbell biceps curl
Processing test\barbell biceps curl\barbell biceps curl_10.mp4...
Running command: ffmpeg -i FINAL\barbell biceps curl\barbell biceps curl_10_segmentation.mp4 -vcodec libx264 FINAL\barbell biceps curl\barbell biceps curl_10_segmentation_h264.mp4
Successfully converted to H264: FINAL\barbell biceps curl\barbell biceps cu

In [24]:
import os
import cv2
import numpy as np
import tensorflow as tf

class VideoDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, folder_path, img_size=(112, 112), max_frames=30, batch_size=4, shuffle=True):
        self.folder_path = folder_path
        self.img_size = img_size
        self.max_frames = max_frames
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.classes = os.listdir(folder_path)
        self.filepaths = []
        self.labels = []
        
        for cls in self.classes:
            cls_folder = os.path.join(folder_path, cls)
            videos = os.listdir(cls_folder)
            for video in videos:
                self.filepaths.append(os.path.join(cls_folder, video))
                self.labels.append(self.classes.index(cls))
        
        self.on_epoch_end()
    
    def __len__(self):
        return len(self.filepaths) // self.batch_size
    
    def __getitem__(self, index):
        batch_filepaths = self.filepaths[index*self.batch_size:(index+1)*self.batch_size]
        batch_labels = self.labels[index*self.batch_size:(index+1)*self.batch_size]
        
        X, y = self.__data_generation(batch_filepaths, batch_labels)
        return X, y
    
    def on_epoch_end(self):
        self.indexes = np.arange(len(self.filepaths))
        if self.shuffle:
            np.random.shuffle(self.indexes)
    
    def __data_generation(self, batch_filepaths, batch_labels):
        X = np.zeros((self.batch_size, self.max_frames, self.img_size[0], self.img_size[1], 3), dtype=np.float32)
        y = np.zeros((self.batch_size), dtype=int)  # Changed from np.int to int
        
        for i, (filepath, label) in enumerate(zip(batch_filepaths, batch_labels)):
            cap = cv2.VideoCapture(filepath)
            frames = []
            frame_count = 0
            
            while cap.isOpened() and frame_count < self.max_frames:
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.resize(frame, self.img_size)
                frames.append(frame)
                frame_count += 1
            
            cap.release()
            
            # If less frames, pad with zeros
            while len(frames) < self.max_frames:
                frames.append(np.zeros((self.img_size[0], self.img_size[1], 3), dtype=np.uint8))
            
            X[i] = np.array(frames)
            y[i] = label
        
        X = X / 255.0  # Normalize pixel values
        return X, y

# Load data using generator
data_path = 'FINAL'
img_size = (112, 112)
max_frames = 30
batch_size = 4  # Reduced batch size

train_generator = VideoDataGenerator(data_path, img_size=img_size, max_frames=max_frames, batch_size=batch_size, shuffle=True)
val_generator = VideoDataGenerator(data_path, img_size=img_size, max_frames=max_frames, batch_size=batch_size, shuffle=False)


In [27]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, LSTM, TimeDistributed

def create_model(input_shape, num_classes):
    inputs = Input(shape=input_shape)
    
    x = TimeDistributed(Conv2D(16, (3, 3), activation='relu'))(inputs)
    x = TimeDistributed(MaxPooling2D((2, 2)))(x)
    
    x = TimeDistributed(Conv2D(32, (3, 3), activation='relu'))(x)
    x = TimeDistributed(MaxPooling2D((2, 2)))(x)
    
    x = TimeDistributed(Flatten())(x)
    x = LSTM(64)(x)
    
    x = Dense(32, activation='relu')(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    
    model = Model(inputs, outputs)
    return model

input_shape = (max_frames, img_size[0], img_size[1], 3)
num_classes = len(os.listdir(data_path))
model = create_model(input_shape, num_classes)

model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()


In [28]:
# Train model using the generator
model.fit(train_generator, epochs=10)


Epoch 1/10


  self._warn_if_super_not_called()


[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m357s[0m 2s/step - accuracy: 0.0382 - loss: 3.1899
Epoch 2/10
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m338s[0m 2s/step - accuracy: 0.1293 - loss: 2.8522
Epoch 3/10
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m360s[0m 2s/step - accuracy: 0.2792 - loss: 2.3461
Epoch 4/10
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m352s[0m 2s/step - accuracy: 0.6080 - loss: 1.5002
Epoch 5/10
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m351s[0m 2s/step - accuracy: 0.8809 - loss: 0.6884
Epoch 6/10
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m356s[0m 2s/step - accuracy: 0.9827 - loss: 0.2047
Epoch 7/10
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m348s[0m 2s/step - accuracy: 1.0000 - loss: 0.0694
Epoch 8/10
[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m341s[0m 2s/step - accuracy: 0.9937 - loss: 0.0593
Epoch 9/10
[1m148/148[0m [32m━━━━━━━━━━━

<keras.src.callbacks.history.History at 0x1e5981761d0>

In [29]:
# Evaluate model
test_loss, test_acc = model.evaluate(val_generator)
print(f"Test accuracy: {test_acc}")

# Save model
model.save('workout_classifier_model.h5')


[1m148/148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 248ms/step - accuracy: 1.0000 - loss: 0.0082




Test accuracy: 1.0


In [33]:
import os
import cv2
import numpy as np
from tensorflow.keras.models import load_model

# Load the model
model = load_model('workout_classifier_model.h5')

# Preprocess the video
def preprocess_video(video_path, img_size=(112, 112), max_frames=30):
    cap = cv2.VideoCapture(video_path)
    frames = []
    frame_count = 0

    while cap.isOpened() and frame_count < max_frames:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, img_size)
        frames.append(frame)
        frame_count += 1

    cap.release()

    # If less frames, pad with zeros
    while len(frames) < max_frames:
        frames.append(np.zeros((img_size[0], img_size[1], 3), dtype=np.uint8))

    X = np.array(frames, dtype=np.float32)
    X = X / 255.0  # Normalize pixel values
    X = np.expand_dims(X, axis=0)  # Add batch dimension
    return X

# Predict the video
def predict_video(model, video_path, labels, img_size=(112, 112), max_frames=30):
    X = preprocess_video(video_path, img_size, max_frames)
    predictions = model.predict(X)
    predicted_class = np.argmax(predictions, axis=1)
    predicted_label = labels[predicted_class[0]]
    return predicted_label

# Get the labels
data_path = 'FINAL'
labels = os.listdir(data_path)
labels.sort()  # Ensure the list is sorted to match the order of class indices
print(f'Labels: {labels}')

# Example usage
video_path = 'FINAL\\deadlift\\deadlift_7_edges_h264.mp4'
predicted_label = predict_video(model, video_path, labels)
print(f'The predicted label for the video is: {predicted_label}')




Labels: ['barbell biceps curl', 'bench press', 'chest fly machine', 'deadlift', 'decline bench press', 'hammer curl', 'hip thrust', 'incline bench press', 'lat pulldown', 'lateral raise', 'leg extension', 'leg raises', 'plank', 'pull Up', 'push-up', 'romanian deadlift', 'russian twist', 'shoulder press', 'squat', 't bar row', 'tricep Pushdown', 'tricep dips']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4s/step
The predicted label for the video is: deadlift
