Step 1: Upload or Collect Video Dataset

In [None]:
!pip install mediapipe
!pip install pandas
!pip install  numpy



In [None]:
from google.colab import drive
import os
# Mount Google Drive so the dataset and output locations below are reachable.
drive.mount('/content/drive')
# CSV file that receives one row of pose features per frame in which a pose was detected.
output_csv = '/content/drive/MyDrive/output/keypoints6.csv'
# Root folder of the dataset; process_video() uses each video's parent folder name as its label.
data_path = '/content/drive/MyDrive/SportActivityDataset/'
# NOTE(review): os.listdir returns every entry, not only activity folders - the recorded
# output of this cell also lists saved *.keras model files.
activities = os.listdir(data_path)
print(activities)

Mounted at /content/drive
['push-up', 'shoulder press', 'squat', 'barbell biceps curl', 'lstm_model222.keras', 'lstm_model29.keras', 'lstm_model33.keras', 'model12.keras', 'model13.keras']


In [None]:


import cv2
import numpy as np
import pandas as pd
import mediapipe as mp
import os
import csv

# MediaPipe initialisation: alias the pose solution module and create a Pose instance.
mp_pose = mp.solutions.pose
# NOTE(review): no function visible in this notebook reads this module-level `pose`;
# extract_keypoints() and process_video() each build their own instance.
pose = mp_pose.Pose()

# Landmarks kept as features (member names of mp_pose.PoseLandmark)
IMPORTANT_LMS = [
    "NOSE",
    "LEFT_SHOULDER", "RIGHT_SHOULDER",
    "RIGHT_ELBOW", "LEFT_ELBOW",
    "RIGHT_WRIST", "LEFT_WRIST",
    "LEFT_HIP", "RIGHT_HIP",
    "RIGHT_KNEE", "LEFT_KNEE",
    "RIGHT_ANKLE", "LEFT_ANKLE",
]

# Joint triplets (first point, vertex, last point) whose angle is stored as a feature
ANGLE_JOINTS = [
    ("LEFT_SHOULDER", "LEFT_ELBOW", "LEFT_WRIST"),
    ("RIGHT_SHOULDER", "RIGHT_ELBOW", "RIGHT_WRIST"),
    ("LEFT_HIP", "LEFT_KNEE", "LEFT_ANKLE"),
    ("RIGHT_HIP", "RIGHT_KNEE", "RIGHT_ANKLE"),
]

# CSV columns: the label, then x/y/z/visibility for every landmark, then one column per angle
angle_headers = ["_".join(triplet) + "_angle" for triplet in ANGLE_JOINTS]
HEADERS = (
    ["label"]
    + [f"{name.lower()}_{suffix}" for name in IMPORTANT_LMS for suffix in "xyzv"]
    + angle_headers
)



In [None]:

import cv2
# Resize a frame to a percentage of its original dimensions
def rescale_frame(frame, percent=50):
    """Return `frame` resized to `percent` % of its width and height.

    Args:
        frame: image array; only ``shape[0]`` (height) and ``shape[1]`` (width) are read.
        percent: scale factor expressed as a percentage (50 halves each side).

    Returns:
        The result of ``cv2.resize`` using ``cv2.INTER_AREA`` interpolation.
    """
    src_height, src_width = frame.shape[0], frame.shape[1]
    # cv2.resize expects the target size as (width, height)
    target_size = (int(src_width * percent / 100), int(src_height * percent / 100))
    return cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA)
# Alias for the MediaPipe pose solution module used by the keypoint-extraction helper below.
mp_pose = mp.solutions.pose

# Extract the feature vector of a single frame (used at prediction time)
def extract_keypoints(image):
    """Build the feature vector (normalized keypoints + joint angles) of one BGR frame.

    A fresh ``mp_pose.Pose(static_image_mode=True)`` is created for every call.

    Args:
        image: BGR image, converted to RGB before being handed to MediaPipe.

    Returns:
        A 1-D array made of the flattened output of ``normalize_keypoints`` for the
        landmarks in ``IMPORTANT_LMS`` (x, y, z, visibility each), followed by one angle
        per entry of ``ANGLE_JOINTS``; ``None`` when no pose landmarks are detected.
    """
    with mp_pose.Pose(static_image_mode=True) as detector:
        detection = detector.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        if not detection.pose_landmarks:
            return None

        detected = detection.pose_landmarks.landmark

        def landmark_named(name):
            # Resolve a landmark name to the matching detected point
            return detected[mp_pose.PoseLandmark[name].value]

        raw_points = [
            [p.x, p.y, p.z, p.visibility]
            for p in (landmark_named(name) for name in IMPORTANT_LMS)
        ]
        flat_points = np.array(normalize_keypoints(raw_points)).flatten()

        # Angles are computed from the raw (non-normalized) x/y coordinates
        joint_angles = []
        for first_name, vertex_name, last_name in ANGLE_JOINTS:
            first = landmark_named(first_name)
            vertex = landmark_named(vertex_name)
            last = landmark_named(last_name)
            joint_angles.append(
                calculate_angle([first.x, first.y], [vertex.x, vertex.y], [last.x, last.y])
            )

        return np.concatenate([flat_points, joint_angles])



# Build the keypoint sequence of a video (used at prediction time)
def prepare_sequence(video_path, sequence_length=30):
    """Collect the first `sequence_length` feature vectors of a video.

    Frames are read in order; frames in which ``extract_keypoints`` finds no pose are
    skipped. Reading stops as soon as enough feature vectors have been gathered.

    Args:
        video_path: path of the video file to open with ``cv2.VideoCapture``.
        sequence_length: number of feature vectors required.

    Returns:
        An array stacking `sequence_length` feature vectors, or ``None`` when the video
        cannot be opened or does not yield enough frames with a detected pose.
    """
    cap = cv2.VideoCapture(video_path)
    sequence = []
    try:
        if not cap.isOpened():
            print("Erreur lors de l'ouverture de la vidéo.")
            return None

        success, frame = cap.read()
        while success:
            # The original called an undefined `extract_features`; the per-frame
            # extractor defined in this notebook is `extract_keypoints`.
            keypoints = extract_keypoints(frame)
            if keypoints is not None:
                sequence.append(keypoints)

            if len(sequence) == sequence_length:
                break

            success, frame = cap.read()
    finally:
        # Always free the capture, including when the video is too short or an error occurs
        cap.release()

    if len(sequence) == sequence_length:
        return np.array(sequence)  # Complete sequence of feature vectors
    return None  # Sequence too short


In [None]:
# Sanity check: display the CSV column names currently held in HEADERS.
print(HEADERS)

['label', 'LEFT_SHOULDER_LEFT_ELBOW_LEFT_WRIST_angle', 'RIGHT_SHOULDER_RIGHT_ELBOW_RIGHT_WRIST_angle', 'LEFT_HIP_LEFT_KNEE_LEFT_ANKLE_angle', 'RIGHT_HIP_RIGHT_KNEE_RIGHT_ANKLE_angle']


Step 4: Normalize the Data

In [None]:
import numpy as np
# Min-max normalization of keypoint values
def normalize_keypoints(keypoints):
    """Scale all keypoint values into [0, 1] with a single global min-max.

    The minimum and maximum are taken over every value of the input at once (all
    landmarks and all of their components), not per column.

    Args:
        keypoints: array-like of numbers, e.g. one ``[x, y, z, visibility]`` row per landmark.

    Returns:
        A float array with the same shape as the input. An empty input is returned
        unchanged, and an input whose values are all equal maps to zeros instead of
        the NaN produced by a 0/0 division.
    """
    keypoints = np.array(keypoints, dtype=float)
    if keypoints.size == 0:
        return keypoints

    lowest = np.min(keypoints)
    span = np.max(keypoints) - lowest
    if span == 0:
        # Every value is identical: avoid dividing by zero
        return np.zeros_like(keypoints)
    return (keypoints - lowest) / span


In [None]:
import os
import csv
import numpy as np

# Create the CSV file with its header row if it does not exist yet
def init_csv(dataset_path: str):
    """Create `dataset_path` containing only the ``HEADERS`` row.

    Nothing happens when the file already exists, so rows written earlier are kept.
    Missing parent directories are created first; the original created them after
    opening the file, and for the global ``output_csv`` rather than for this path.

    Args:
        dataset_path: path of the CSV file to initialise.
    """
    if os.path.exists(dataset_path):
        return  # Keep the existing file untouched

    parent_dir = os.path.dirname(dataset_path)
    if parent_dir:  # dirname is '' for a bare file name
        os.makedirs(parent_dir, exist_ok=True)

    with open(dataset_path, mode="w", newline="") as f:
        csv.writer(f).writerow(HEADERS)


def calculate_angle(a, b, c):
    """
    Return the angle at vertex ``b`` between the segments b->a and b->c, in degrees.

    a, b, c are sequences whose first two items are x and y; extra items are ignored.
    The cosine is clipped to [-1, 1] before ``arccos``, so the result lies in [0, 180].
    """
    vertex = np.array(b[:2])
    to_first = np.array(a[:2]) - vertex
    to_last = np.array(c[:2]) - vertex

    norm_product = np.linalg.norm(to_first) * np.linalg.norm(to_last)
    cos_theta = np.clip(np.dot(to_first, to_last) / norm_product, -1.0, 1.0)

    return np.degrees(np.arccos(cos_theta))

# Append the label, keypoints and joint angles of one frame to the CSV
def export_landmark_to_csv(dataset_path: str, results, label: str):
    """Append one feature row for a processed frame to `dataset_path`.

    The row is laid out like ``HEADERS``: the label, then the normalized
    x/y/z/visibility of every landmark in ``IMPORTANT_LMS``, then one angle per entry
    of ``ANGLE_JOINTS``. The original wrote a leftover debugging row (the x/y of the
    last landmark only, plus the angles) that did not match those columns.

    Any error is reported with a printed message and the frame is skipped.

    Args:
        dataset_path: CSV file opened in append mode.
        results: MediaPipe pose result exposing ``pose_landmarks.landmark``.
        label: activity label stored in the first column.
    """
    try:
        landmarks = results.pose_landmarks.landmark

        keypoints = []
        for lm in IMPORTANT_LMS:
            point = landmarks[mp_pose.PoseLandmark[lm].value]
            keypoints.append([point.x, point.y, point.z, point.visibility])

        # Normalize once, after every landmark has been collected
        keypoints_flat = np.array(normalize_keypoints(keypoints)).flatten().tolist()

        # Angles are computed from the raw (non-normalized) x/y coordinates
        angles = []
        for joint1, joint2, joint3 in ANGLE_JOINTS:
            a = landmarks[mp_pose.PoseLandmark[joint1].value]
            b = landmarks[mp_pose.PoseLandmark[joint2].value]
            c = landmarks[mp_pose.PoseLandmark[joint3].value]
            angles.append(float(calculate_angle([a.x, a.y], [b.x, b.y], [c.x, c.y])))

        all_features = [label] + keypoints_flat + angles
        with open(dataset_path, mode="a", newline="") as f:
            csv.writer(f).writerow(all_features)

    except Exception as e:
        # Best effort: report the problem and carry on with the next frame
        print("Erreur:", e)



In [None]:
import cv2
import numpy as np
import csv
import os

# Read every frame of a video and store its keypoints in the CSV
def process_video(video_path, output_csv):
    """Run pose detection on each frame of a video and append the features to a CSV.

    The activity label is the name of the folder that contains the video. Frames
    without detected pose landmarks are skipped. The capture and the MediaPipe Pose
    instance are released even when an error interrupts processing.

    Args:
        video_path: path of the video file.
        output_csv: CSV file handed to ``export_landmark_to_csv`` for every frame.
    """
    # Label = name of the parent folder (= activity)
    activity_label = os.path.basename(os.path.dirname(video_path))
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print("Erreur lors de l'ouverture de la vidéo :", video_path)
            return

        pose = mp_pose.Pose()
        try:
            while True:
                success, frame = cap.read()
                if not success:
                    break
                image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = pose.process(image_rgb)

                if results.pose_landmarks:
                    export_landmark_to_csv(output_csv, results, activity_label)
        finally:
            pose.close()
    finally:
        cap.release()



In [None]:
import os
# Build the keypoint CSV from every video of every activity folder
def create_the_csv_file():
    """Process all videos under ``data_path`` and return the path of the CSV.

    The CSV header is written once through ``init_csv`` when the file does not exist.
    Entries of ``activities`` that are not directories are skipped: the dataset root
    also holds saved ``*.keras`` model files, and listing one of those as a folder
    raises ``NotADirectoryError``.

    Returns:
        The global ``output_csv`` path.
    """
    init_csv(output_csv)

    for activity in activities:
        activity_path = os.path.join(data_path, activity)
        if not os.path.isdir(activity_path):
            continue  # Not an activity folder (e.g. a saved model file)

        for video in os.listdir(activity_path):
            video_path = os.path.join(activity_path, video)
            process_video(video_path, output_csv)
            print("Le dataset   des keypoints a été créé for "+video_path)
        print("Le dataset   des keypoints a été créé avec succès fro "+activity)
    print("Le dataset des keypoints a été créé avec succès !")
    return output_csv





2. Préparation des données pour le modèle sous forme de séquences

In [None]:

import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

# Shared label encoder kept as a notebook-level global: prepare_data() fits it, and
# other cells read it back to count the classes and to decode predicted labels.
le = LabelEncoder()

# Load the dataset and turn it into fixed-length sequences
def prepare_data(output_csv, sequence_length=20):
    """Load the keypoint CSV and build train/test sets of fixed-length sequences.

    A window of `sequence_length` consecutive rows is kept only when all of its rows
    share the same label. The last possible window is now included; the original loop
    stopped one window early. Labels are encoded with the global encoder ``le``, which
    is (re)fitted here.

    NOTE(review): windows overlap (stride 1) and are split at random, so almost
    identical windows can land in both the training and the test set.

    Args:
        output_csv: path of the CSV holding a ``label`` column plus feature columns.
        sequence_length: number of frames per sequence.

    Returns:
        ``(X_train, X_test, y_train, y_test)``; the X arrays have shape
        ``(n_sequences, sequence_length, n_features)``.

    Raises:
        ValueError: if `sequence_length` is below 1 or no single-label window exists.
    """
    if sequence_length < 1:
        raise ValueError("sequence_length must be at least 1")

    df = pd.read_csv(output_csv)

    # Split features and labels
    X = df.drop('label', axis=1).values
    y = df['label'].values

    # Encode the labels with the global encoder
    y_encoded = le.fit_transform(y)

    X_sequences = []
    y_sequences = []

    # Slide over the rows; +1 so that the final full window is not dropped
    for start in range(len(X) - sequence_length + 1):
        stop = start + sequence_length
        if len(set(y[start:stop])) == 1:
            X_sequences.append(X[start:stop])
            y_sequences.append(y_encoded[start])

    if not X_sequences:
        raise ValueError(
            f"No window of {sequence_length} consecutive frames with a single label was found"
        )

    X_sequences = np.array(X_sequences)
    y_sequences = np.array(y_sequences)

    print("Shape des données LSTM : ", X_sequences.shape)  # (n_sequences, sequence_length, n_features)

    X_train, X_test, y_train, y_test = train_test_split(X_sequences, y_sequences, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test



Création du premier modèle LSTM

In [None]:


from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

# Architecture of the LSTM model
def create_lstm_model(input_shape, n_classes=None):
    """Build and compile the two-layer LSTM classifier.

    The original ignored `input_shape` and read the globals ``sequence_length`` and
    ``X_train`` instead, so it only worked when both happened to exist and to agree
    with the training data.

    Args:
        input_shape: either the full data shape ``(n_samples, timesteps, n_features)``
            (e.g. ``X_train.shape``) or just ``(timesteps, n_features)``.
        n_classes: number of output classes; defaults to the number of classes known
            to the global label encoder ``le``.

    Returns:
        The compiled Keras model.

    Raises:
        ValueError: if `input_shape` has neither 2 nor 3 dimensions.
    """
    shape = tuple(input_shape)
    if len(shape) == 3:
        shape = shape[1:]  # Drop the leading sample dimension
    elif len(shape) != 2:
        raise ValueError("input_shape must be (timesteps, features) or (samples, timesteps, features)")

    if n_classes is None:
        n_classes = len(le.classes_)

    model = Sequential()
    model.add(LSTM(64, return_sequences=True, activation='tanh', input_shape=shape))
    model.add(LSTM(128, return_sequences=False, activation='tanh'))
    model.add(Dropout(0.5))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(n_classes, activation='softmax'))
    # Integer class ids are used as targets, hence the sparse loss
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model




def train_model(model, X_train, y_train, X_test, y_test, epochs=20, batch_size=20):
    """Fit `model` and return the training history.

    Args:
        model: object exposing a Keras-style ``fit`` method.
        X_train, y_train: training sequences and their labels.
        X_test, y_test: data passed to ``fit`` as ``validation_data``.
        epochs: number of training epochs.
        batch_size: mini-batch size.

    Returns:
        Whatever ``model.fit`` returns; the original discarded it.
    """
    return model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_test, y_test),
    )


Tester le premier modèle


In [None]:
import cv2
import numpy as np
import joblib
import mediapipe as mp

# Predict the activity shown in a video
def predict_activity(video_path, model):
    """Predict the activity of a video, print it and return it.

    Args:
        video_path: path of the video handed to ``prepare_sequence``.
        model: trained model exposing ``predict``; it receives a batch holding one sequence.

    Returns:
        The decoded label of the most probable class, or ``None`` when no complete
        sequence could be built from the video. The original only printed the result.
    """
    # Build the keypoint sequence
    sequence = prepare_sequence(video_path)

    if sequence is None:
        print("La séquence est trop courte ou aucun keypoint détecté dans la vidéo.")
        return None

    # Add the batch dimension expected by the model: (1, timesteps, features)
    sequence = sequence.reshape(1, sequence.shape[0], sequence.shape[1])

    prediction = model.predict(sequence)

    # Decode the class index with the global label encoder
    predicted_label = np.argmax(prediction, axis=1)
    predicted_label = le.inverse_transform(predicted_label)

    print(f"L'activité prédite est : {predicted_label}")
    return predicted_label[0]


In [None]:
#preparing train and test data

#output_csv = '/content/drive/MyDrive/output/keypoints7.csv'

#print(X_train.shape[2])
#print(y_test[1])



In [None]:
#create_lstm_model
#model=create_lstm_model(X_train.shape)
#train the model
#train_model(model, X_train, y_train, X_test, y_test)
#save the model
#model.save('/content/drive/MyDrive/SportActivityDataset/DATA/final_lstm_mode.keras')

In [None]:

from tensorflow.keras.models import load_model

# Number of frames per sequence, kept as a notebook-level global.
sequence_length=30
# NOTE(review): this comment used to say "load the model", but nothing is loaded in this cell.

# Fit the shared label encoder on the four activity names; the cell output shows the encoded ids.
le.fit_transform(['barbell biceps curl', 'push-up', 'shoulder press', 'squat'])


array([0, 1, 2, 3])

Visualize the data and clean it for better results

In [None]:
import pandas as pd
# Load the keypoint CSV written by the extraction step.
df=pd.read_csv('/content/drive/MyDrive/output/keypoints6.csv')
print(df.columns)
# Drop every row labelled 'hammer curl'; the remaining rows are kept in `data`.
data=df[df['label']!='hammer curl']
# Check which labels remain and preview a few 'shoulder press' rows.
print(data['label'].unique())
print(data[data['label']=='shoulder press'].head())

Index(['label', 'nose_x', 'nose_y', 'nose_z', 'nose_v', 'left_shoulder_x',
       'left_shoulder_y', 'left_shoulder_z', 'left_shoulder_v',
       'right_shoulder_x', 'right_shoulder_y', 'right_shoulder_z',
       'right_shoulder_v', 'right_elbow_x', 'right_elbow_y', 'right_elbow_z',
       'right_elbow_v', 'left_elbow_x', 'left_elbow_y', 'left_elbow_z',
       'left_elbow_v', 'right_wrist_x', 'right_wrist_y', 'right_wrist_z',
       'right_wrist_v', 'left_wrist_x', 'left_wrist_y', 'left_wrist_z',
       'left_wrist_v', 'left_hip_x', 'left_hip_y', 'left_hip_z', 'left_hip_v',
       'right_hip_x', 'right_hip_y', 'right_hip_z', 'right_hip_v',
       'right_knee_x', 'right_knee_y', 'right_knee_z', 'right_knee_v',
       'left_knee_x', 'left_knee_y', 'left_knee_z', 'left_knee_v',
       'right_ankle_x', 'right_ankle_y', 'right_ankle_z', 'right_ankle_v',
       'left_ankle_x', 'left_ankle_y', 'left_ankle_z', 'left_ankle_v',
       'LEFT_SHOULDER_LEFT_ELBOW_LEFT_WRIST_angle',
       'RIGHT_SH

Changer la structure des données pour un meilleur entraînement

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
# Keep only the label and the four joint-angle columns as model input.
df=data[['label',
       'LEFT_SHOULDER_LEFT_ELBOW_LEFT_WRIST_angle',
       'RIGHT_SHOULDER_RIGHT_ELBOW_RIGHT_WRIST_angle',
       'LEFT_HIP_LEFT_KNEE_LEFT_ANKLE_angle',
       'RIGHT_HIP_RIGHT_KNEE_RIGHT_ANKLE_angle']]
def normalize_angles(df, angle_columns=None):
    """
    Replace each joint-angle column by its sine and cosine (values in [-1, 1]).

    The angles stored by this notebook are in degrees, so they are converted to
    radians before ``np.sin`` / ``np.cos``. The original announced that conversion in
    a comment but never performed it, feeding degrees to functions that expect radians.
    Features computed before this fix differ from the ones produced now.

    Args:
        df: DataFrame containing the angle columns; it is not modified.
        angle_columns: names of the angle columns to encode; defaults to the four
            arm/leg angle columns produced by the extraction step.

    Returns:
        A copy of `df` where every angle column is removed and replaced by
        ``<column>_sin`` and ``<column>_cos``, appended in that order.
    """
    if angle_columns is None:
        angle_columns = [
            'LEFT_SHOULDER_LEFT_ELBOW_LEFT_WRIST_angle',
            'RIGHT_SHOULDER_RIGHT_ELBOW_RIGHT_WRIST_angle',
            'LEFT_HIP_LEFT_KNEE_LEFT_ANKLE_angle',
            'RIGHT_HIP_RIGHT_KNEE_RIGHT_ANKLE_angle',
        ]

    # Work on a copy so the caller's frame is left untouched
    normalized_df = df.copy()

    for col in angle_columns:
        radians = np.deg2rad(normalized_df[col])
        normalized_df[col + '_sin'] = np.sin(radians)
        normalized_df[col + '_cos'] = np.cos(radians)

        # The raw angle is no longer needed once encoded
        normalized_df = normalized_df.drop(columns=col)

    return normalized_df
# Replace each angle column by its sine/cosine pair and preview the first rows.
df=normalize_angles(df)
print(df.head())


                 label    nose_x    nose_y  left_shoulder_x  left_shoulder_y  \
0  barbell biceps curl  0.536823  0.394492         0.579151         0.448345   
1  barbell biceps curl  0.644883  0.471560         0.697117         0.534556   
2  barbell biceps curl  0.644585  0.459688         0.698101         0.534154   
3  barbell biceps curl  0.639845  0.447251         0.693448         0.527830   
4  barbell biceps curl  0.638382  0.440549         0.692775         0.526917   

   right_shoulder_x  right_shoulder_y  right_elbow_x  right_elbow_y  \
0          0.494607          0.456272       0.487493       0.566115   
1          0.597209          0.541820       0.591782       0.674158   
2          0.599484          0.542223       0.592465       0.670539   
3          0.593281          0.536076       0.587828       0.666124   
4          0.592473          0.536303       0.586838       0.667377   

   left_elbow_x  ...  left_ankle_x  left_ankle_y  \
0      0.592113  ...      0.565274      

In [None]:
print(df.columns)

Index(['label', 'nose_x', 'nose_y', 'left_shoulder_x', 'left_shoulder_y',
       'right_shoulder_x', 'right_shoulder_y', 'right_elbow_x',
       'right_elbow_y', 'left_elbow_x', 'left_elbow_y', 'right_wrist_x',
       'right_wrist_y', 'left_wrist_x', 'left_wrist_y', 'left_hip_x',
       'left_hip_y', 'right_hip_x', 'right_hip_y', 'right_knee_x',
       'right_knee_y', 'left_knee_x', 'left_knee_y', 'right_ankle_x',
       'right_ankle_y', 'left_ankle_x', 'left_ankle_y',
       'LEFT_SHOULDER_LEFT_ELBOW_LEFT_WRIST_angle_sin',
       'LEFT_SHOULDER_LEFT_ELBOW_LEFT_WRIST_angle_cos',
       'RIGHT_SHOULDER_RIGHT_ELBOW_RIGHT_WRIST_angle_sin',
       'RIGHT_SHOULDER_RIGHT_ELBOW_RIGHT_WRIST_angle_cos',
       'LEFT_HIP_LEFT_KNEE_LEFT_ANKLE_angle_sin',
       'LEFT_HIP_LEFT_KNEE_LEFT_ANKLE_angle_cos',
       'RIGHT_HIP_RIGHT_KNEE_RIGHT_ANKLE_angle_sin',
       'RIGHT_HIP_RIGHT_KNEE_RIGHT_ANKLE_angle_cos'],
      dtype='object')


Création du deuxième modèle :

In [None]:

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
def create_ultralight_model(sequence_length=30, n_features=8, n_classes=4):
    """Build and compile a minimal classifier: one 32-unit LSTM and a softmax layer.

    Args:
        sequence_length: number of time steps per input sequence.
        n_features: number of features per time step.
        n_classes: size of the softmax output.

    Returns:
        The compiled Keras model (Adam optimizer, sparse categorical cross-entropy,
        accuracy metric).
    """
    recurrent_layer = LSTM(32, input_shape=(sequence_length, n_features))
    output_layer = Dense(n_classes, activation='softmax')
    model = Sequential([recurrent_layer, output_layer])

    compile_options = {
        'optimizer': 'adam',
        'loss': 'sparse_categorical_crossentropy',
        'metrics': ['accuracy'],
    }
    model.compile(**compile_options)
    return model

In [None]:

def prepare_data(df, sequence_length=30):
    """Turn a feature DataFrame into train/test sets of fixed-length sequences.

    A window of `sequence_length` consecutive rows is kept only when all of its rows
    share the same label. The last possible window is now included; the original loop
    stopped one window early. Labels are encoded with the global encoder ``le``, which
    is (re)fitted here.

    NOTE(review): windows overlap (stride 1) and are split at random, so almost
    identical windows can land in both the training and the test set.

    Args:
        df: DataFrame with a ``label`` column; every other column is used as a feature.
        sequence_length: number of frames per sequence.

    Returns:
        ``(X_train, X_test, y_train, y_test)``; the X arrays have shape
        ``(n_sequences, sequence_length, n_features)``.

    Raises:
        ValueError: if `sequence_length` is below 1 or no single-label window exists.
    """
    if sequence_length < 1:
        raise ValueError("sequence_length must be at least 1")

    # Split features and labels
    X = df.drop('label', axis=1).values
    y = df['label'].values

    # Encode the labels with the global encoder
    y_encoded = le.fit_transform(y)

    X_sequences = []
    y_sequences = []

    # Slide over the rows; +1 so that the final full window is not dropped
    for start in range(len(X) - sequence_length + 1):
        stop = start + sequence_length
        if len(set(y[start:stop])) == 1:
            X_sequences.append(X[start:stop])
            y_sequences.append(y_encoded[start])

    if not X_sequences:
        raise ValueError(
            f"No window of {sequence_length} consecutive frames with a single label was found"
        )

    X_sequences = np.array(X_sequences)
    y_sequences = np.array(y_sequences)

    print("Shape des données LSTM : ", X_sequences.shape)  # (n_sequences, sequence_length, n_features)

    X_train, X_test, y_train, y_test = train_test_split(X_sequences, y_sequences, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test

Entraînement du deuxième modèle :

In [None]:

from tensorflow.keras.callbacks import EarlyStopping
# Build the sequence datasets from the angle features, then create and inspect the model.
X_train, X_test, y_train, y_test=prepare_data(df)
model = create_ultralight_model()
model.summary()
# Early-stopping configuration
early_stopping = EarlyStopping(
    monitor='val_loss',    # Metric to monitor
    patience=10,          # Epochs without improvement before stopping
    restore_best_weights=True  # Restore the weights of the best epoch
)

# Train the model with early stopping.
# NOTE(review): validation_data is the test split, so early stopping is tuned on the
# same data that is later treated as the held-out set.
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=41,
    batch_size=64,
    callbacks=[early_stopping]  # Attach the early-stopping callback
)

In [None]:
# Persist the trained model to Google Drive in the Keras format.
model.save('/content/drive/MyDrive/SportActivityDataset/clean_data/model14.keras')