In [None]:
import os
import cv2
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, MaxPooling3D, Flatten, Dense, Dropout
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import BatchNormalization
from itertools import product
from keras.optimizers import Adam
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import random


In [2]:
# 1. Load and Preprocess Videos
def load_videos_from_folders(folder_path, img_size=(64, 64), sequence_length=30):
    classes = os.listdir(folder_path)
    data, labels = [], []

    for label, activity in enumerate(classes):
        activity_folder = os.path.join(folder_path, activity)
        for video_file in os.listdir(activity_folder):
            video_path = os.path.join(activity_folder, video_file)
            frames = video_to_frames(video_path, img_size, sequence_length)
            if frames is not None:
                data.append(frames)
                labels.append(label)

    data = np.array(data)
    labels = to_categorical(labels, num_classes=len(classes))

    return data, labels, classes

def video_to_frames(video_path, img_size, sequence_length):
    cap = cv2.VideoCapture(video_path)
    frames = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, img_size)
        frames.append(frame)
        if len(frames) == sequence_length:
            break
    cap.release()

    if len(frames) < sequence_length:
        return None  # Ignore short videos

    return np.array(frames)


In [3]:
def build_3dcnn(input_shape, num_classes, conv_filters=[64, 128, 256], kernel_size=(3, 3, 3), dense_units=1024, dropout_rate=0.5, learning_rate=0.001):
    model = Sequential()

    # First Conv Layer
    model.add(Conv3D(conv_filters[0], kernel_size=kernel_size, activation='relu', input_shape=input_shape))
    model.add(BatchNormalization())
    model.add(MaxPooling3D(pool_size=(2, 2, 2)))

    # Second Conv Layer
    model.add(Conv3D(conv_filters[1], kernel_size=kernel_size, activation='relu'))
    model.add(BatchNormalization())
    model.add(MaxPooling3D(pool_size=(2, 2, 2)))

    # Third Conv Layer
    model.add(Conv3D(conv_filters[2], kernel_size=kernel_size, activation='relu'))
    model.add(BatchNormalization())
    model.add(MaxPooling3D(pool_size=(2, 2, 2)))

    # Fully Connected Layers
    model.add(Flatten())
    model.add(Dense(dense_units, activation='relu'))
    model.add(Dropout(dropout_rate))
    model.add(Dense(num_classes, activation='softmax'))

    # Compile the model
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    return model


In [None]:

# Define random search function
def run_random_search(train_data_folder, param_grid, iterations=3, n_combinations=10):
    # Load data
    data, labels, activity_classes = load_videos_from_folders(train_data_folder)
    input_shape = (30, 64, 64, 3)  # (sequence_length, img_size, img_size, channels)
    num_classes = len(activity_classes)

    # To store results
    best_accuracy = 0.0
    best_params = None
    best_model_path = None
    best_val_true_labels = None
    best_val_predicted_labels = None

    # Randomly sample a subset of parameter combinations
    param_combinations = [dict(zip(param_grid.keys(), values)) for values in random.sample(list(product(*param_grid.values())), n_combinations)]

    for param_comb in param_combinations:
        # Unpack the current parameter combination
        params = dict(param_comb)
        print(f"\nTesting with parameters: {params}")

        for i in range(iterations):
            print(f"\nIteration {i + 1}/{iterations}")
            # Split data
            X_train, X_val, y_train, y_val = train_test_split(data, labels, test_size=0.2, random_state=i)

            # Build and train the model with the current hyperparameters
            model = build_3dcnn(
                input_shape, num_classes,
                conv_filters=params['conv_filters'],
                kernel_size=params['kernel_size'],
                dense_units=params['dense_units'],
                dropout_rate=params['dropout_rate'],
                learning_rate=params['learning_rate']
            )

            # Train the model for a set number of epochs
            model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=params['epochs'], batch_size=params['batch_size'], verbose=1)

            # Evaluate model on validation set
            val_predictions = model.predict(X_val)
            val_predicted_labels = np.argmax(val_predictions, axis=1)
            val_true_labels = np.argmax(y_val, axis=1)

            # Calculate accuracy for this iteration
            accuracy = accuracy_score(val_true_labels, val_predicted_labels)
            print(f"Iteration {i + 1} Validation Accuracy: {accuracy}")

            # Check if this is the best accuracy so far
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                best_params = params
                best_model_path = f"best_model_{params}_accuracy_{best_accuracy:.2f}.h5"
                model.save(best_model_path)
                print(f"Best model saved with accuracy {best_accuracy}")

                # Store the best validation predictions and labels for confusion matrix
                best_val_true_labels = val_true_labels
                best_val_predicted_labels = val_predicted_labels

    # Print and plot confusion matrix for the best model
    if best_val_true_labels is not None and best_val_predicted_labels is not None:
        cm = confusion_matrix(best_val_true_labels, best_val_predicted_labels)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=activity_classes)
        disp.plot(cmap=plt.cm.Blues)
        plt.title(f'Confusion Matrix for Best Model\nAccuracy: {best_accuracy:.2f}')
        plt.show()

    # Return the best parameters and accuracy
    print(f"\nBest hyperparameters: {best_params} with accuracy: {best_accuracy}")
    return best_params, best_accuracy, best_model_path


In [4]:
# Define grid search function
def run_grid_search(train_data_folder, param_grid, iterations=3):
    # Load data
    data, labels, activity_classes = load_videos_from_folders(train_data_folder)
    input_shape = (30, 64, 64, 3)  # (sequence_length, img_size, img_size, channels)
    num_classes = len(activity_classes)

    # To store results
    best_accuracy = 0.0
    best_params = None
    best_model_path = None
    best_val_true_labels = None
    best_val_predicted_labels = None

    # Generate all combinations of hyperparameters
    param_combinations = list(product(*param_grid.values()))

    for param_comb in param_combinations:
        # Unpack the current parameter combination
        params = dict(zip(param_grid.keys(), param_comb))
        print(f"\nTesting with parameters: {params}")

        for i in range(iterations):
            print(f"\nIteration {i + 1}/{iterations}")
            # Split data
            X_train, X_val, y_train, y_val = train_test_split(data, labels, test_size=0.2, random_state=i)

            # Build and train the model with the current hyperparameters
            model = build_3dcnn(
                input_shape, num_classes,
                conv_filters=params['conv_filters'],
                kernel_size=params['kernel_size'],
                dense_units=params['dense_units'],
                dropout_rate=params['dropout_rate'],
                learning_rate=params['learning_rate']
            )

            # Train the model for a set number of epochs
            model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=params['epochs'], batch_size=params['batch_size'], verbose=1)

            # Evaluate model on validation set
            val_predictions = model.predict(X_val)
            val_predicted_labels = np.argmax(val_predictions, axis=1)
            val_true_labels = np.argmax(y_val, axis=1)

            # Calculate accuracy for this iteration
            accuracy = accuracy_score(val_true_labels, val_predicted_labels)
            print(f"Iteration {i + 1} Validation Accuracy: {accuracy}")

            # Check if this is the best accuracy so far
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                best_params = params
                best_model_path = f"best_model_{params}_accuracy_{best_accuracy:.2f}.h5"
                model.save(best_model_path)
                print(f"Best model saved with accuracy {best_accuracy}")

                # Store the best validation predictions and labels for confusion matrix
                best_val_true_labels = val_true_labels
                best_val_predicted_labels = val_predicted_labels

    # Print and plot confusion matrix for the best model
    if best_val_true_labels is not None and best_val_predicted_labels is not None:
        cm = confusion_matrix(best_val_true_labels, best_val_predicted_labels)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=activity_classes)
        disp.plot(cmap=plt.cm.Blues)
        plt.title(f'Confusion Matrix for Best Model\nAccuracy: {best_accuracy:.2f}')
        plt.show()

    # Return the best parameters and accuracy
    print(f"\nBest hyperparameters: {best_params} with accuracy: {best_accuracy}")
    return best_params, best_accuracy, best_model_path


In [5]:
param_grid = {
    'conv_filters': [[32, 64, 128], [64, 128, 256]],  # Filter sizes for the 3 Conv layers
    'kernel_size': [(3, 3, 3), (5, 5, 5)],  # Kernel size for Conv layers
    'dense_units': [512, 1024],  # Number of units in Dense layer
    'dropout_rate': [0.4, 0.5],  # Dropout rates
    'learning_rate': [0.001, 0.0001],  # Learning rates
    'batch_size': [8, 16],  # Batch size
    'epochs': [10, 20]  # Number of epochs
}

train_data_folder = "/content/drive/MyDrive/PAM/test"  # Replace with your path to training data

# Run grid search over multiple hyperparameters
best_params, best_accuracy, best_model_path = run_grid_search(train_data_folder, param_grid)

# Load the best-performing model (if needed)
best_model = load_model(best_model_path)
print(f"Loaded best model from: {best_model_path}")



Testing with parameters: {'conv_filters': [32, 64, 128], 'kernel_size': (3, 3, 3), 'dense_units': 512, 'dropout_rate': 0.4, 'learning_rate': 0.001, 'batch_size': 8, 'epochs': 10}

Iteration 1/3


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/10
[1m 28/171[0m [32m━━━[0m[37m━━━━━━━━━━━━━━━━━[0m [1m12:14[0m 5s/step - accuracy: 0.0171 - loss: 15.1816

KeyboardInterrupt: 

Code for Predicting Activity from a Live Camera Feed:

In [None]:
# 1. Define a function to process frames from the webcam
def process_live_frames(img_size=(64, 64), sequence_length=30):
    cap = cv2.VideoCapture(0)  # Open webcam (0 is the default for most cameras)
    frames = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Resize the frame to match the model input size
        resized_frame = cv2.resize(frame, img_size)
        frames.append(resized_frame)

        # Display the frame
        cv2.imshow('Live Video Feed', frame)

        # If we have enough frames for a sequence, yield them for prediction
        if len(frames) == sequence_length:
            yield np.array(frames)  # Return sequence as numpy array
            frames = []  # Reset for the next sequence

        # Press 'q' to exit the live feed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

# 2. Function to predict activity on live frames
def predict_live_activity(model, activity_classes, img_size=(64, 64), sequence_length=30):
    for frames in process_live_frames(img_size, sequence_length):
        frames = np.expand_dims(frames, axis=0)  # Add batch dimension
        prediction = model.predict(frames)
        predicted_label = np.argmax(prediction, axis=1)[0]
        predicted_activity = activity_classes[predicted_label]

        print(f"Predicted Activity: {predicted_activity}")

# 3. Load the pre-trained model and activity classes
model_path = "path_to_saved_model.h5"  # Path to your saved model
model = load_model(model_path)

# Assuming `activity_classes` is the list of activities in your training data
activity_classes = ['Activity1', 'Activity2', 'Activity3']

# 4. Start the live prediction
predict_live_activity(model, activity_classes, img_size=(64, 64), sequence_length=30)


FileNotFoundError: [Errno 2] Unable to synchronously open file (unable to open file: name = 'path_to_saved_model.h5', errno = 2, error message = 'No such file or directory', flags = 0, o_flags = 0)