In [6]:
import warnings
import os
import cv2
import mediapipe as mp
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import TimeDistributed, Conv1D, Flatten, LSTM, Dense, Dropout, BatchNormalization, Input
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

# Suppress specific warnings.
# `message` is interpreted as a regular expression matched against the start of
# the warning text. Written unescaped, "()" is an empty group rather than two
# literal parentheses, so the pattern never matches the protobuf message and
# nothing is suppressed. Escape the metacharacters so the filter takes effect.
PROTOBUF_DEPRECATION_PATTERN = r'SymbolDatabase\.GetPrototype\(\) is deprecated\.'
warnings.filterwarnings('ignore', message=PROTOBUF_DEPRECATION_PATTERN)

# Set the base directory for videos.
# The YOGA_DATASET_DIR environment variable overrides the location so the
# notebook can run on another machine; the original path stays the default.
base_dir = os.environ.get(
    'YOGA_DATASET_DIR',
    r'C:\Users\AnandaMuthu\Downloads\aaasaturday\dataset',
)

# Action names (list position == numeric class id)
action_names = ["Bhujangasana","Padmasana","Shavasana","Tadasana","Trikonasana","Vrikshasana"]

# Map labels to numeric values
label_map = {name: i for i, name in enumerate(action_names)}

# Function to get all video paths and their labels
def get_video_paths_and_labels(base_dir):
    """Collect every MP4 file found one level below ``base_dir``.

    Each immediate sub-directory of ``base_dir`` is treated as one action and
    its name is used as the label of every video it contains. Entries of
    ``base_dir`` that are not directories are ignored.

    Args:
        base_dir: Directory whose sub-directories hold the video files.

    Returns:
        A ``(video_paths, labels)`` tuple of two equally long lists:
        ``video_paths[i]`` is the full path of a video and ``labels[i]`` is the
        name of the sub-directory it was found in. Both are ordered by
        directory name, then by file name.
    """
    video_paths = []
    labels = []
    # os.listdir() returns entries in arbitrary order; sorting makes the
    # sample order (and everything derived from it) reproducible across runs.
    for action in sorted(os.listdir(base_dir)):
        action_dir = os.path.join(base_dir, action)
        if not os.path.isdir(action_dir):
            continue
        for video in sorted(os.listdir(action_dir)):
            # Compare the extension case-insensitively so "clip.MP4" is kept.
            if video.lower().endswith(".mp4"):
                video_paths.append(os.path.join(action_dir, video))
                labels.append(action)
    return video_paths, labels

# Initialize MediaPipe Pose.
# One Pose instance is created here and reused by every process_video() call.
# NOTE(review): with static_image_mode=False MediaPipe presumably keeps tracking
# state between process() calls, so state may carry over from one video to the
# next - confirm this is intended.
mp_pose = mp.solutions.pose
pose_video = mp_pose.Pose(
    model_complexity=1,
    min_detection_confidence=0.5,
    static_image_mode=False,
)

# Function to perform pose detection and return annotated frame and landmarks
def detect_pose(frame, pose_video):
    """Run pose estimation on one frame and overlay the detected skeleton.

    The frame is converted from BGR to RGB before being handed to
    ``pose_video.process``. When landmarks are found they are drawn with
    MediaPipe's drawing utilities using the pose connection set.

    Args:
        frame: Image in BGR channel order (as produced by OpenCV).
        pose_video: Object exposing ``process(rgb_image)``; the result must
            have a ``pose_landmarks`` attribute.

    Returns:
        ``(frame, results)``: the same ``frame`` object that was passed in
        (``draw_landmarks`` receives it directly, so it is presumably annotated
        in place) and the raw result of ``pose_video.process``.
    """
    results = pose_video.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    detected = results.pose_landmarks
    if detected:
        solutions = mp.solutions
        solutions.drawing_utils.draw_landmarks(frame, detected, solutions.pose.POSE_CONNECTIONS)
    return frame, results

# Function to analyze the results and get coordinates of each landmark
def get_landmark_coordinates(results):
    """Extract the landmark coordinates from a pose-estimation result.

    Args:
        results: Object with a ``pose_landmarks`` attribute; when that
            attribute is truthy it must expose an iterable ``landmark`` whose
            items have ``x``, ``y`` and ``z`` attributes.

    Returns:
        A list of ``(x, y, z)`` tuples, one per landmark and in the order they
        are iterated, or an empty list when ``pose_landmarks`` is falsy.
    """
    pose = results.pose_landmarks
    if not pose:
        return []
    return [(point.x, point.y, point.z) for point in pose.landmark]

# Function to process video, extract frames, perform pose estimation, and collect results
def process_video(video_path, skip_frames=5):
    """Run pose estimation on every ``skip_frames``-th frame of a video.

    Frames are read sequentially; frame ``k`` (0-based) is analysed when
    ``k % skip_frames == 0``. Sampled frames in which no pose is found are
    dropped, so the result can be shorter than the number of sampled frames.

    Uses the module-level ``pose_video`` instance for detection.
    NOTE(review): that instance is shared by all videos; if it keeps tracking
    state between frames, state may carry over between videos - confirm.

    Args:
        video_path: Path of the video file to read.
        skip_frames: Sampling stride, must be at least 1.

    Returns:
        A list with one entry per sampled frame where a pose was detected;
        each entry is the value returned by ``get_landmark_coordinates``.
        An empty list is returned when the video cannot be opened.

    Raises:
        ValueError: If ``skip_frames`` is smaller than 1.
    """
    # Fail early with a clear message instead of a ZeroDivisionError on the
    # first frame (or a silently odd stride for negative values).
    if skip_frames < 1:
        raise ValueError(f"skip_frames must be >= 1, got {skip_frames}")

    cap = cv2.VideoCapture(video_path)
    all_coordinates = []
    try:
        if not cap.isOpened():
            # Name the file: this runs in a loop over many videos.
            print(f"Error: Could not open video file: {video_path}")
            return all_coordinates

        frame_number = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_number % skip_frames == 0:
                # Perform pose detection
                _, results = detect_pose(frame, pose_video)
                coordinates = get_landmark_coordinates(results)

                # Store the coordinates (frames without a detected pose are skipped)
                if coordinates:
                    all_coordinates.append(coordinates)

            frame_number += 1
    finally:
        # Release the capture even if pose detection raises mid-video.
        cap.release()
    return all_coordinates

# Function to preprocess data
def preprocess_data(all_coordinates_array, all_labels_array, sequence_length, num_joints):
    """Cut a per-frame landmark stream into overlapping fixed-length windows.

    A window starts at every frame index (stride 1) and spans
    ``sequence_length`` consecutive frames. Windows whose frames do not all
    carry the same label are discarded: they straddle two different actions
    and would otherwise be labelled with whatever action their first frame
    belongs to.

    NOTE(review): two consecutive videos of the *same* action cannot be told
    apart from the labels alone, so windows spanning them are still produced.
    Call this once per video to avoid that.

    Args:
        all_coordinates_array: Sequence of frames; each frame must hold
            ``num_joints * 3`` numbers (e.g. ``num_joints`` ``(x, y, z)`` tuples).
        all_labels_array: Action name of each frame, parallel to
            ``all_coordinates_array``; names must be keys of ``label_map``.
        sequence_length: Number of frames per window.
        num_joints: Number of landmarks per frame.

    Returns:
        ``(sequences, labels)`` where ``sequences`` has shape
        ``(n_windows, sequence_length, num_joints, 3)`` (also when
        ``n_windows`` is 0) and ``labels`` is the one-hot encoding produced by
        ``to_categorical`` with ``len(action_names)`` classes.
    """
    sequences = []
    labels = []
    for start in range(len(all_coordinates_array) - sequence_length + 1):
        stop = start + sequence_length
        # Skip windows that mix frames of different actions.
        if len(set(all_labels_array[start:stop])) != 1:
            continue
        window = np.array(all_coordinates_array[start:stop])
        sequences.append(window.reshape((sequence_length, num_joints, 3)))  # 3 coordinates (x, y, z)
        labels.append(label_map[all_labels_array[start]])
    # The explicit reshape keeps the 4-D layout even when no window was kept.
    sequences = np.array(sequences).reshape((-1, sequence_length, num_joints, 3))
    labels = to_categorical(labels, num_classes=len(action_names))
    return sequences, labels

# Function to build the CNN-LSTM model
def build_model(num_joints, num_classes, sequence_length):
    """Assemble and compile the CNN-LSTM classifier.

    Layer stack, in order: an input of shape
    ``(sequence_length, num_joints, 3)``; a ``Conv1D`` (16 filters, kernel
    size 3, ReLU) wrapped in ``TimeDistributed``; batch normalisation;
    dropout of 0.2; a ``TimeDistributed`` flatten; a 64-unit ``LSTM`` that
    returns only its last output; and a softmax ``Dense`` layer with
    ``num_classes`` units.

    Args:
        num_joints: Number of landmarks per frame.
        num_classes: Number of output classes.
        sequence_length: Number of frames per input sequence.

    Returns:
        The ``Sequential`` model, compiled with categorical cross-entropy
        loss, the Adam optimizer and accuracy as metric.
    """
    per_frame_conv = Conv1D(filters=16, kernel_size=3, activation='relu')
    stack = [
        Input(shape=(sequence_length, num_joints, 3)),  # Input layer
        TimeDistributed(per_frame_conv),
        BatchNormalization(),
        Dropout(0.2),
        TimeDistributed(Flatten()),
        LSTM(64, return_sequences=False),  # Adjust LSTM units as needed
        Dense(num_classes, activation='softmax'),
    ]
    network = Sequential(stack)
    network.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return network


In [None]:
# Get video paths and labels
video_paths, labels = get_video_paths_and_labels(base_dir)
all_coordinates_array = []
all_labels_array = []

num_joints = 33  # Number of landmarks detected by MediaPipe Pose
sequence_length = 30  # Number of frames to consider in a sequence

# Process each video to extract pose landmarks.
# Windows are cut per video: sliding a window over the concatenation of all
# videos would create sequences that mix frames of two clips (or two poses).
video_sequences = []  # per usable video: (n_windows, sequence_length, num_joints, 3)
video_targets = []    # per usable video: matching one-hot labels
video_actions = []    # per usable video: action name, used to stratify the split
for video_path, label in zip(video_paths, labels):
    if label not in label_map:
        # A folder that is not one of action_names has no class id.
        print(f"Skipping {video_path}: folder '{label}' is not a known action.")
        continue
    coordinates = process_video(video_path, skip_frames=5)
    if not coordinates:
        continue
    all_coordinates_array.extend(coordinates)
    all_labels_array.extend([label] * len(coordinates))
    if len(coordinates) < sequence_length:
        print(f"Skipping {video_path}: only {len(coordinates)} usable frames, "
              f"{sequence_length} are needed for one sequence.")
        continue
    clip_sequences, clip_targets = preprocess_data(
        coordinates, [label] * len(coordinates), sequence_length, num_joints
    )
    video_sequences.append(clip_sequences)
    video_targets.append(clip_targets)
    video_actions.append(label)

# Preprocess data
# At least two usable videos are required so that train and test can hold
# different videos.
if len(video_sequences) >= 2:
    # NOTE: `labels` is rebound from the list of folder names to the one-hot
    # array here, as in the original cell.
    sequences = np.concatenate(video_sequences)
    labels = np.concatenate(video_targets)

    # Build the model
    model = build_model(num_joints, len(action_names), sequence_length)

    # Split data into training and testing sets.
    # The split is done on whole videos: consecutive windows of one video share
    # all but one frame, so a random split of windows would put near-duplicates
    # of the test samples into the training set and inflate the test metrics.
    video_ids = np.arange(len(video_sequences))
    try:
        train_ids, test_ids = train_test_split(
            video_ids, test_size=0.2, random_state=42, stratify=video_actions
        )
    except ValueError:
        # Too few videos of some action to stratify; use a plain random split.
        train_ids, test_ids = train_test_split(video_ids, test_size=0.2, random_state=42)
    X_train = np.concatenate([video_sequences[i] for i in train_ids])
    y_train = np.concatenate([video_targets[i] for i in train_ids])
    X_test = np.concatenate([video_sequences[i] for i in test_ids])
    y_test = np.concatenate([video_targets[i] for i in test_ids])

    # Train the model
    history = model.fit(X_train, y_train, epochs=25, validation_data=(X_test, y_test))

    # Save the model
    model.save("yogapose_detection_model.keras")

    # Evaluate the model
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    # Passing the full list of class ids keeps the report rows and the matrix
    # axes aligned with action_names even if an action is absent from the test
    # set (without it the report raises and the heatmap ticks are misaligned).
    class_ids = list(range(len(action_names)))

    # Print classification report
    print(classification_report(
        y_true, y_pred_classes, labels=class_ids, target_names=action_names, zero_division=0
    ))

    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred_classes, labels=class_ids)
    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt='d', xticklabels=action_names, yticklabels=action_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

    # Plot training & validation accuracy values
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('Model Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend(['Train', 'Validation'], loc='upper left')

    # Plot training & validation loss values
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(['Train', 'Validation'], loc='upper left')

    plt.show()
else:
    print("Error: Not enough data to generate sequences. Ensure you have enough frames and proper input format.")


