In [None]:
!pip install mediapipe opencv-python tensorflow

In [None]:
import pandas as pd
import numpy as np
import os
import logging

In [None]:
import cv2
import mediapipe as mp
from mediapipe.framework.formats import landmark_pb2
# Short aliases for the MediaPipe solution modules.
mp_holistic = mp.solutions.holistic  # Holistic pipeline; its results expose pose and left/right hand landmarks
mp_drawing = mp.solutions.drawing_utils  # drawing helpers (not referenced by the visible cells)
mp_drawing_styles = mp.solutions.drawing_styles  # drawing style presets (not referenced by the visible cells)

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (
    Conv2D, MaxPooling2D, BatchNormalization, Flatten, Dense, Dropout,
    LSTM, TimeDistributed, Bidirectional,
    Conv1D, MaxPooling1D, SeparableConv1D, SeparableConv2D,
    Activation, Masking, Input, LayerNormalization
)
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam, RMSprop

In [None]:
# Mount Google Drive at /content/drive; the dataset path used below lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Dataset root. load_data() looks for one sub-directory per class name here,
# each containing that class's video files (.mp4 / .avi / .mov).
DATA_PATH = "/content/drive/MyDrive/Merged"

In [None]:
# Seed NumPy and TensorFlow so runs are repeatable.
np.random.seed(42)
tf.random.set_seed(42)

# --- Per-frame feature layout ---
# Only (x, y, z) is kept for every landmark; pose visibility is NOT part of the features.
#   pose : landmarks 11-22          -> 12 landmarks * 3 = 36 values
#   hands: 21 landmarks per hand    -> 21 * 3 = 63 values for each of the two hands
NUM_POSE_LANDMARKS = 12
NUM_HAND_LANDMARKS = 21
COORDS_PER_LANDMARK = 3
NUM_FEATURES = (NUM_POSE_LANDMARKS + 2 * NUM_HAND_LANDMARKS) * COORDS_PER_LANDMARK  # = 162

# --- Sequence / training parameters ---
MAX_SEQUENCE_LENGTH = 75  # every video is truncated or zero-padded to this many frames
BATCH_SIZE = 32
EPOCHS = 100  # NOTE(review): the training cell passes its own epochs value — confirm which is intended

# Class labels. Each string is also used as the class directory name under DATA_PATH,
# so the spelling (e.g. 'Febraury') must match the folders on disk exactly.
CLASSES = ['Afternoon', 'Apple', 'April', 'August', 'Banana', 'Day', 'December', 'Evening',
           'Febraury', 'Friday', 'Grapes', 'January', 'July', 'June', 'March', 'May', 'Monday',
           'Morning', 'Night', 'November', 'October', 'Orange', 'Rainy', 'Saturday', 'September',
           'Summer', 'Sunday', 'Thursday', 'Tuesday', 'Valencia_Orange', 'Watermelon', 'Wednesday', 'Winter']

NUM = len(CLASSES)

print(NUM)
print(NUM_FEATURES)

In [None]:
def _landmarks_to_xyz(landmark_list, expected_count, keep_indices=None):
    """Flatten a MediaPipe landmark list into ``[x, y, z, x, y, z, ...]``.

    Args:
        landmark_list: A landmark-list result (e.g. ``results.pose_landmarks``); falsy when
            nothing was detected.
        expected_count: Number of landmarks the caller expects for this body part.
        keep_indices: Optional collection of landmark indices to keep; ``None`` keeps all.

    Returns:
        A list of floats. All zeros (``expected_count * 3`` of them) when ``landmark_list``
        is falsy; otherwise the kept coordinates, zero-padded up to ``expected_count * 3``.
    """
    expected_len = expected_count * 3
    if not landmark_list:
        return [0.0] * expected_len

    coords = []
    for idx, landmark in enumerate(landmark_list.landmark):
        if keep_indices is None or idx in keep_indices:
            coords.extend([landmark.x, landmark.y, landmark.z])

    # Pad per body part so a short list cannot shift the features that follow it.
    if len(coords) < expected_len:
        coords.extend([0.0] * (expected_len - len(coords)))
    return coords


def extract_mediapipe_features(video_path, display_video=False):
    """Extract per-frame pose and hand keypoints from a video with MediaPipe Holistic.

    Each frame yields ``NUM_FEATURES`` values laid out as:
    pose landmarks 11-22 (x, y, z), then the 21 left-hand landmarks (x, y, z),
    then the 21 right-hand landmarks (x, y, z). Parts that are not detected are zero-filled.

    Args:
        video_path: Path of the video file to read.
        display_video: Nothing is drawn by this function; when true, OpenCV windows are
            closed after processing.

    Returns:
        ``None`` if the video cannot be opened; an array of shape ``(0, NUM_FEATURES)`` if no
        frame could be read; otherwise an array with one row of ``NUM_FEATURES`` values per frame.
    """
    pose_indices = set(range(11, 23))  # pose landmark indices 11..22 inclusive
    hand_landmark_count = 21

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        print(f"Error: Could not open video {video_path}")
        return None

    sequence_features = []
    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            frame_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # OpenCV decodes frames as BGR; Holistic is fed RGB.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image.flags.writeable = False  # To improve performance
                results = holistic.process(image)

                current_frame_features = (
                    _landmarks_to_xyz(results.pose_landmarks, len(pose_indices), pose_indices)
                    + _landmarks_to_xyz(results.left_hand_landmarks, hand_landmark_count)
                    + _landmarks_to_xyz(results.right_hand_landmarks, hand_landmark_count)
                )

                # Safety net: force every row to exactly NUM_FEATURES values.
                if len(current_frame_features) != NUM_FEATURES:
                    print(f"Warning: Feature count mismatch at frame {frame_count}. Expected {NUM_FEATURES}, got {len(current_frame_features)}")
                    if len(current_frame_features) < NUM_FEATURES:
                        current_frame_features.extend([0.0] * (NUM_FEATURES - len(current_frame_features)))
                    else:
                        current_frame_features = current_frame_features[:NUM_FEATURES]

                sequence_features.append(current_frame_features)
                frame_count += 1
    finally:
        # Release the capture handle even if decoding or landmark extraction raises.
        cap.release()

    if display_video:
        cv2.destroyAllWindows()

    if not sequence_features:
        print(f"Warning: No features extracted from {video_path}")
        return np.zeros((0, NUM_FEATURES))  # Return empty array with correct feature dimension

    return np.array(sequence_features)


def standardize_sequence(sequence, max_len, num_features):
    """Force a sequence of frames to exactly ``max_len`` rows.

    Longer sequences keep only their first ``max_len`` rows; shorter ones are extended
    with all-zero rows at the end.

    Args:
        sequence: Array-like of shape ``(n_frames, num_features)``. Lists are accepted and an
            empty sequence (``[]`` or zero rows) is treated as "no frames".
        max_len: Number of rows in the result.
        num_features: Number of values per row, used to build the zero padding.

    Returns:
        A NumPy array with ``max_len`` rows. When the input is already an array of the right
        length it is returned as-is (no copy).
    """
    frames = np.asarray(sequence)
    if len(frames) == 0:
        # No frames at all: give the empty input a 2-D shape so it can be stacked with padding.
        frames = np.zeros((0, num_features))

    current_len = len(frames)
    if current_len > max_len:
        # Keep the starting segment only.
        return frames[:max_len]
    if current_len < max_len:
        padding = np.zeros((max_len - current_len, num_features))
        return np.vstack((frames, padding))
    return frames

In [None]:
# --- 1. Load Data (Video Paths and Labels) ---
def load_data(data_path, classes_list, max_seq_length, num_features_per_frame):
    """Build the keypoint dataset from a directory tree of per-class videos.

    For every name in ``classes_list`` the directory ``data_path/<name>`` is scanned for
    ``.mp4`` / ``.avi`` / ``.mov`` files. Each video is converted to keypoints with
    ``extract_mediapipe_features`` and brought to ``max_seq_length`` frames with
    ``standardize_sequence``. Missing class directories, non-video files and videos that
    yield no frames are reported and skipped.

    Args:
        data_path: Root directory containing one sub-directory per class.
        classes_list: Class names; a class's position in this list is its integer label.
        max_seq_length: Number of frames every sequence is padded/truncated to.
        num_features_per_frame: Number of feature values per frame.

    Returns:
        ``(sequences, labels)`` as NumPy arrays, or ``(None, None)`` when no video could be loaded.
    """
    video_extensions = ('.mp4', '.avi', '.mov')
    label_map = {label: num for num, label in enumerate(classes_list)}
    sequences = []
    labels = []

    for class_name in classes_list:
        class_path = os.path.join(data_path, class_name)
        if not os.path.isdir(class_path):
            print(f"Warning: Directory not found for class {class_name} at {class_path}")
            continue

        print(f"Processing class: {class_name}")
        video_count = 0
        # os.listdir returns entries in arbitrary order; sorting keeps the sample order
        # (and therefore any seeded train/val/test split) identical between runs.
        for video_file in sorted(os.listdir(class_path)):
            # Basic check for video file extensions, add more if needed
            if not video_file.lower().endswith(video_extensions):
                print(f"Skipping non-video file: {video_file} in {class_name}")
                continue

            video_path = os.path.join(class_path, video_file)
            keypoints = extract_mediapipe_features(video_path, display_video=False)

            # None means the video could not be opened; zero rows means no frame was read.
            if keypoints is None or keypoints.shape[0] == 0:
                print(f"Warning: Could not extract features or no frames from {video_path}. Skipping.")
                continue

            sequences.append(
                standardize_sequence(keypoints, max_len=max_seq_length, num_features=num_features_per_frame)
            )
            labels.append(label_map[class_name])
            video_count += 1
        print(f"Processed {video_count} videos for class {class_name}")

    if not sequences:
        print("Error: No sequences were loaded. Check DATA_PATH and video files.")
        return None, None

    return np.array(sequences), np.array(labels)

In [None]:
# Extract keypoints for every video of every class (slow: each video is run through MediaPipe).
X, y = load_data(DATA_PATH, CLASSES, MAX_SEQUENCE_LENGTH, NUM_FEATURES)

# load_data returns (None, None) when nothing could be loaded; stop here with a clear
# message instead of failing later inside train_test_split.
if X is None:
    raise RuntimeError(f"No sequences were loaded from {DATA_PATH}; check the path and the class folders.")

In [None]:
# prompt: train test and validate the data in the ratio = 7:2:1

# Target fractions of the full dataset: 70 % train, 20 % test, 10 % validation.
train_size, val_size, test_size = 0.7, 0.1, 0.2

# Stage 1: carve off the training portion; the remainder holds validation + test together.
X_train, X_rem, y_train, y_rem = train_test_split(
    X, y,
    train_size=train_size,
    stratify=y,
    random_state=42,
)

# Stage 2: split the remainder. The test set's share of the remainder is
# test_size / (val_size + test_size), which leaves the rest of it for validation.
test_share_of_rem = test_size / (val_size + test_size)
X_val, X_test, y_val, y_test = train_test_split(
    X_rem, y_rem,
    test_size=test_share_of_rem,
    stratify=y_rem,
    random_state=42,
)

# Report the shape of every split.
for split_name, split_array in (
    ("X_train", X_train), ("y_train", y_train),
    ("X_val", X_val), ("y_val", y_val),
    ("X_test", X_test), ("y_test", y_test),
):
    print(f"Shape of {split_name}: {split_array.shape}")

In [None]:
# --- One-Hot Encode Labels ---
# The integer label vectors are kept; the encoded copies get their own names
# (the test one is called y_test_categorical).
y_train_cat, y_val_cat, y_test_categorical = (
    to_categorical(split_labels, num_classes=NUM)
    for split_labels in (y_train, y_val, y_test)
)

for split_name, encoded in (
    ("y_train", y_train_cat),
    ("y_val", y_val_cat),
    ("y_test", y_test_categorical),
):
    print(f"Shape of {split_name} after one-hot encoding: {encoded.shape}")

In [None]:
def create_stacked_bilstm_model(input_shape, num_classes, lstm_units=512, dropout_rate=0.6):
    """Build and compile a two-layer bidirectional LSTM classifier.

    Architecture, in order: Masking(mask_value=0) on the input, two Bidirectional LSTM
    layers (the first returns the full sequence, the second only its final output), one
    L2-regularised ReLU Dense layer, and a softmax output. Each of the three hidden stages
    is followed by Dropout and then BatchNormalization.

    Args:
        input_shape: Shape of one sample passed to ``Input``.
        num_classes: Size of the softmax output.
        lstm_units: Units per LSTM direction; also the width of the hidden Dense layer.
        dropout_rate: Rate used by every Dropout layer.

    Returns:
        ``(model, early_stopping)``: the model compiled with RMSprop and categorical
        cross-entropy, plus an EarlyStopping callback that monitors ``val_accuracy``.
    """
    inputs = Input(shape=input_shape)
    x = Masking(mask_value=0.)(inputs)

    # Two stacked BiLSTM stages; only the first one emits an output per timestep.
    for emit_sequence in (True, False):
        x = Bidirectional(LSTM(lstm_units, return_sequences=emit_sequence))(x)
        x = Dropout(dropout_rate)(x)
        x = BatchNormalization()(x)

    # Fully connected head with L2 weight penalty.
    x = Dense(lstm_units, activation='relu', kernel_regularizer=l2(0.01))(x)
    x = Dropout(dropout_rate)(x)
    x = BatchNormalization()(x)
    outputs = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=RMSprop(learning_rate=0.0005),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )

    # Stop once val_accuracy has not improved for 10 epochs and roll back to the best weights.
    early_stopping = EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True)

    return model, early_stopping

In [None]:
# Build the network for inputs of shape (MAX_SEQUENCE_LENGTH, NUM_FEATURES).
# The factory returns a (model, early_stopping) pair, unpacked here.
input_shape = (MAX_SEQUENCE_LENGTH, NUM_FEATURES)
model, early_stopping = create_stacked_bilstm_model(input_shape=input_shape, num_classes=NUM)
model.summary()

# Multiply the learning rate by 0.2 when val_loss has not improved for 5 epochs (floor: 1e-5).
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=5,
    min_lr=1e-5,
    verbose=1,
)
callbacks_list = [early_stopping, reduce_lr]

In [None]:
# prompt: fit the model

# Defined up front so later cells can test `history is not None` even when training is skipped.
history = None

if X_train.shape[0] > 0:
    print("\n--- Starting Model Training ---")
    # `model` is already the Keras model: the (model, early_stopping) tuple was unpacked when
    # it was built, so unpacking it again would raise a TypeError. A still-packed tuple is tolerated.
    model_obj = model[0] if isinstance(model, tuple) else model

    history = model_obj.fit(
        X_train, y_train_cat,
        epochs=200,  # NOTE(review): the EPOCHS constant is not used here — confirm which value is intended
        batch_size=BATCH_SIZE,
        # Without validation samples, train without validation data.
        validation_data=(X_val, y_val_cat) if X_val.shape[0] > 0 else None,
        callbacks=callbacks_list,
        verbose=1
    )
    print("--- Model Training Finished ---")

In [None]:
# prompt: show the graph between training accuracy and testing accuracy

import matplotlib.pyplot as plt

# Note: the second curve in each figure is the *validation* metric recorded during fit().
if not (X_train.shape[0] > 0 and history is not None):
    print("Cannot plot accuracy/loss curves: Training data not available or history object is empty.")
else:
    # One figure per metric: accuracy first, then loss.
    for metric_key, metric_label in (('accuracy', 'Accuracy'), ('loss', 'Loss')):
        plt.figure(figsize=(12, 6))
        plt.plot(history.history[metric_key], label=f'Training {metric_label}')

        # Validation curves exist only if fit() was given validation data.
        val_key = f'val_{metric_key}'
        if val_key in history.history:
            plt.plot(history.history[val_key], label=f'Validation {metric_label}')

        plt.title(f'Training and Validation {metric_label}')
        plt.xlabel('Epoch')
        plt.ylabel(metric_label)
        plt.legend()
        plt.grid(True)
        plt.show()


In [None]:
# prompt: I wanna know the train accuracy and also print a heatmap for all the classes as well

from sklearn.metrics import accuracy_score, confusion_matrix
import seaborn as sns


def _plot_confusion_heatmap(true_classes, predicted_classes, title):
    """Compute a confusion matrix over all CLASSES, draw it as a heatmap, and return it.

    Args:
        true_classes: Integer class indices of the ground truth.
        predicted_classes: Integer class indices predicted by the model.
        title: Figure title.

    Returns:
        The confusion matrix, with one row and one column per entry of CLASSES.
    """
    # Passing `labels` keeps the matrix len(CLASSES) x len(CLASSES) even when a class is
    # absent from this split, so the tick labels always line up with the right rows/columns.
    cm = confusion_matrix(true_classes, predicted_classes, labels=np.arange(len(CLASSES)))

    plt.figure(figsize=(15, 12))  # Large figure: there are many classes
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=CLASSES, yticklabels=CLASSES)
    plt.xlabel('Predicted Class')
    plt.ylabel('True Class')
    plt.title(title)
    plt.show()
    return cm


# `model` is already the Keras model (the (model, early_stopping) tuple was unpacked when it
# was built), so it must not be unpacked again. A still-packed tuple is tolerated.
model_obj = None
if 'model' in locals() and model is not None:
    model_obj = model[0] if isinstance(model, tuple) else model

# Training-set accuracy and confusion matrix.
# Note: the test-set numbers below are the ones that reflect generalization.
if model_obj is not None and X_train.shape[0] > 0:
    train_predictions = model_obj.predict(X_train)
    train_predicted_classes = np.argmax(train_predictions, axis=1)
    train_true_classes = np.argmax(y_train_cat, axis=1)  # back from one-hot to class indices

    train_accuracy = accuracy_score(train_true_classes, train_predicted_classes)
    print(f"\nTraining Accuracy: {train_accuracy:.4f}")

    cm_train = _plot_confusion_heatmap(
        train_true_classes, train_predicted_classes, 'Confusion Matrix (Training Data)'
    )
else:
    print("Cannot calculate training accuracy or plot heatmap: Model not available or training data is empty.")

# Test-set loss, accuracy and confusion matrix.
if model_obj is not None and X_test.shape[0] > 0:
    print("\n--- Evaluating on Test Data ---")
    loss, accuracy = model_obj.evaluate(X_test, y_test_categorical, verbose=0)
    print(f"Test Loss: {loss:.4f}")
    print(f"Test Accuracy: {accuracy:.4f}")

    test_predictions = model_obj.predict(X_test)
    test_predicted_classes = np.argmax(test_predictions, axis=1)
    test_true_classes = np.argmax(y_test_categorical, axis=1)  # back from one-hot to class indices

    cm_test = _plot_confusion_heatmap(
        test_true_classes, test_predicted_classes, 'Confusion Matrix (Test Data)'
    )
else:
    print("Cannot evaluate on test data or plot heatmap: Model not available or test data is empty.")