In [1]:
def extract_frames_from_folder(subject_folder_path):
    """
    Extract frames from all .avi video clips in the specified folder and save them in a 'Separated_Frames' subfolder.

    Every clip gets its own sub-folder inside 'Separated_Frames', named after
    the clip file without its extension, holding one PNG per frame
    (frame_0000.png, frame_0001.png, ...). Progress and a final summary are
    printed to stdout; the function returns None.

    Parameters:
    - subject_folder_path: Path to the folder containing .avi video clips.
    """

    # Check if the subject folder exists
    if not os.path.exists(subject_folder_path):
        print(f"Error: The folder '{subject_folder_path}' does not exist.")
        return

    # A regular file passes the check above but would make os.listdir raise
    if not os.path.isdir(subject_folder_path):
        print(f"Error: '{subject_folder_path}' is not a folder.")
        return

    # List all .avi files in the subject folder; os.listdir order is arbitrary,
    # so sort to keep the clip numbering stable between runs
    avi_files = sorted(f for f in os.listdir(subject_folder_path) if f.endswith('.avi'))
    total_clips = len(avi_files)

    # Inform the user about the total number of clips in the folder
    print(f"\t...total clips in the folder: {total_clips}")

    # Check if there are any .avi files in the folder
    if not avi_files:
        print(f"No .avi files found in the folder '{subject_folder_path}'.")
        return

    # Create the 'Separated_Frames' subfolder inside the subject folder
    output_folder = os.path.join(subject_folder_path, 'Separated_Frames')
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
        print(f"\t...creating a new folder to store frames from clips at: {output_folder}")

    all_frames = 0             # frames actually written to disk
    clips_extracted = 0        # clips that produced at least one frame
    example_frame_name = None  # first frame that was really saved

    for n, avi_file in enumerate(avi_files, 1):
        video_path = os.path.join(subject_folder_path, avi_file)

        # Check if the video file exists (just to be extra cautious)
        if not os.path.exists(video_path):
            print(f"Error: The video file '{video_path}' does not exist.")
            continue

        # Open the video using OpenCV
        cap = cv2.VideoCapture(video_path)
        clip_frames = 0
        try:
            # Check if the video was opened successfully
            if not cap.isOpened():
                print(f"Error: Unable to open the video file '{video_path}'.")
                continue

            # Create the clip's sub-folder only once the clip is known to be
            # readable, so unreadable clips do not leave empty folders behind
            video_name = os.path.splitext(avi_file)[0]  # Remove the .avi extension
            video_output_folder = os.path.join(output_folder, video_name)
            os.makedirs(video_output_folder, exist_ok=True)

            # Get the total number of frames reported by the container
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Loop through each frame and save it as an image
            for frame_num in range(total_frames):
                print(f"\r\t...extracting frame: {frame_num} from clip: {n} of {total_clips}  ", end='', flush=True)
                ret, frame = cap.read()
                if not ret:
                    # Leading newline: the progress line above ends without one
                    print(f"\nError: Unable to read frame {frame_num} from video '{video_path}'.")
                    break

                # Construct the output image path
                output_image_path = os.path.join(video_output_folder, f"frame_{frame_num:04d}.png")

                # imwrite reports failure through its return value
                if not cv2.imwrite(output_image_path, frame):
                    print(f"\nError: Unable to write frame '{output_image_path}'.")
                    continue

                clip_frames += 1
                if example_frame_name is None:
                    example_frame_name = output_image_path
        finally:
            # Release the video capture object on every path out of the clip
            cap.release()

        all_frames += clip_frames
        if clip_frames:
            clips_extracted += 1

    # Print the summary
    print(f"\n\n   SUMMARY: \n\tExtracted {all_frames} frames from the folder '{subject_folder_path}' and saved in '{output_folder}'")
    print(f"\tTotal clips extracted: {clips_extracted}")
    if example_frame_name is not None:
        print(f"\tExample frame name: {example_frame_name}")


In [2]:
def prepare_data_from_frames_VGG16(base_folder, emotion_to_int, emotion_labels):
    """
    Prepare data from frames using the VGG16 pre-trained model.

    Walks '<base_folder>/Separated_Frames' and treats every sub-folder as one
    clip. Each readable image in a clip folder is resized to 224x224, converted
    from BGR to RGB, passed through preprocess_input and then through VGG16
    (loaded without its top layers); the output is flattened to a vector of
    25088 values and becomes one step of that clip's sequence. The clip's label
    is emotion_to_int[map_to_emotion(<folder name>)].

    Parameters:
    - base_folder: Folder that contains the 'Separated_Frames' sub-folder.
    - emotion_to_int: Mapping from the value returned by map_to_emotion to an integer label.
    - emotion_labels: Not used by this function; kept so existing callers keep working.

    Returns:
    - sequences: List with one entry per clip; each entry is a list of 1-D feature vectors.
    - labels: numpy array of integer labels, aligned with sequences.

    Raises:
    - KeyError: if emotion_to_int has no entry for a folder's mapped emotion.
    """

    print('\t...loading VGG16 pre-trained model', flush=True)
    # Load VGG16 model without the top classification layers
    base_model = VGG16(weights='imagenet', include_top=False)
    feature_extractor = Model(inputs=base_model.input, outputs=base_model.layers[-1].output)

    frames_folder = os.path.join(base_folder, 'Separated_Frames')
    emotion_folders = sorted(os.listdir(frames_folder))

    print(f"\t...found {len(emotion_folders)} folders of frames, each from a clip", flush=True)

    print('\t\t...resizing frames...', flush=True)
    print('\t\t...extracting features with the CNN model...', flush=True)
    print('\t\t...creating labels from titles of folders...', flush=True)

    sequences = []
    labels = []

    for idx, emotion_folder in enumerate(emotion_folders, 1):

        emotion_folder_path = os.path.join(frames_folder, emotion_folder)
        if not os.path.isdir(emotion_folder_path):
            continue

        # Resolve the label first so an unmapped folder name fails before any
        # expensive feature extraction is done for it
        label = emotion_to_int[map_to_emotion(emotion_folder)]

        clip_files = sorted(os.listdir(emotion_folder_path))
        total_files = len(clip_files)
        clip_features = []

        for file_idx, clip_file in enumerate(clip_files):
            print(f"\r\t\t...of file {file_idx+1:03}/{total_files} in folder {idx}, called {emotion_folder}  ", end='', flush=True)

            clip_path = os.path.join(emotion_folder_path, clip_file)
            frame = cv2.imread(clip_path)

            # imread returns None for anything it cannot decode
            if frame is None:
                print(f"\tError reading image: {clip_path}")
                continue

            # Resize the frame to 224x224 pixels and convert to RGB
            frame = cv2.resize(frame, (224, 224))
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            frame = preprocess_input(frame)  # VGG16 specific preprocessing
            frame = np.expand_dims(frame, axis=0)  # add the batch dimension

            # verbose=0 keeps predict's own progress output from overwriting
            # the in-place progress line printed above
            feature_vector = feature_extractor.predict(frame, verbose=0)
            flattened_vector = np.reshape(feature_vector, (25088,))  # Flatten the feature_vector
            clip_features.append(flattened_vector)

        # A clip without a single usable frame would otherwise enter the data
        # set as an empty sequence that still carries a label
        if not clip_features:
            print(f"\n\tWarning: no usable frames in '{emotion_folder_path}', skipping this folder", flush=True)
            continue

        sequences.append(clip_features)
        labels.append(label)

    print(f"\n\n   SUMMARY: \n\tProcessed all emotion folders from '{frames_folder}'", flush=True)
    print(f"\t    Total folders processed: {len(sequences)}", flush=True)
    print(f"\t    Labels assigned: {labels}", flush=True)

    return sequences, np.array(labels)



In [5]:
def add_jitter(sequence, factor=0.05):
    """
    Return sequence plus zero-mean Gaussian noise, for data augmentation.

    Parameters:
    - sequence: numpy array to perturb; it is not modified in place.
    - factor: Standard deviation of the noise.
    """
    return sequence + np.random.normal(loc=0, scale=factor, size=sequence.shape)

In [6]:
class CustomPrintCallback(Callback):
    """Callback that prints a one-line, in-place accuracy summary at the end of each epoch."""

    @staticmethod
    def _format_metric(value):
        # A metric is absent from logs when it was not computed (for example
        # val_accuracy when no validation data is supplied); formatting None
        # with ':.4f' would raise TypeError.
        return "n/a" if value is None else f"{value:.4f}"

    def on_epoch_end(self, epoch, logs=None):
        """Print the 'accuracy' and 'val_accuracy' entries of logs for the finished epoch.

        epoch is zero-based, so it is shown as epoch + 1. The line starts with
        a carriage return and has no trailing newline, so successive epochs
        overwrite each other on the same terminal line.
        """
        logs = logs or {}
        train_acc = self._format_metric(logs.get('accuracy'))
        val_acc = self._format_metric(logs.get('val_accuracy'))
        print(f"\r\t\tEpoch {epoch+1} - train_accuracy: {train_acc} - val_accuracy: {val_acc}", end='', flush=True)

In [1]:
def _verify_lstm_inputs(sequences_array, y_train_onehot):
    """Interactive menu that prints shape, sample and statistics views of the prepared inputs."""
    while True:
        print("\n\tChoose a method of verification:")
        print("\t1. Display a summary of the data (shape, data type).")
        print("\t2. Display the first few rows of the data.")
        print("\t3. Display random samples from the data.")
        print("\t4. Display basic statistics for the data.")
        print("\t5. Proceed without further verification.")

        verification_method = input("Enter your choice (1/2/3/4/5): ")

        if verification_method == "1":
            print("\n\t\tSequences shape:", sequences_array.shape)
            print("\t\tOne-hot encoded labels shape:", y_train_onehot.shape)
        elif verification_method == "2":
            print("\n\t\tFirst few rows of sequences:\n", sequences_array[:5])
            print("\n\t\tFirst few rows of one-hot encoded labels:\n", y_train_onehot[:5])
        elif verification_method == "3":
            random_idx = np.random.randint(0, len(sequences_array), 5)
            print("\n\t\tRandom samples from sequences:\n", sequences_array[random_idx])
            print("\n\t\tRandom samples from one-hot encoded labels:\n", y_train_onehot[random_idx])
        elif verification_method == "4":
            print("\n\t\tSequences mean:", np.mean(sequences_array))
            print("\t\tSequences median:", np.median(sequences_array))
            print("\t\tSequences standard deviation:", np.std(sequences_array))
        elif verification_method == "5":
            break
        else:
            print("\nInvalid choice. Please choose again.")


def _build_lstm_vgg16_model(num_classes):
    """Build and compile a fresh, untrained LSTM classifier over sequences of 25088-value feature vectors."""
    # Variable sequence length and VGG16 feature shape
    inputs = Input(shape=(None, 25088))
    x = TimeDistributed(Flatten())(inputs)
    x = LSTM(128, return_sequences=False)(x)  # Process the sequence with LSTM layers
    x = Dropout(0.5)(x)  # Dropout for regularization
    outputs = Dense(num_classes, activation='softmax')(x)  # Classification layer

    model = Model(inputs=inputs, outputs=outputs, name="Our_Chosen_RNN_Model_VGG16")
    model.compile(optimizer=Adam(learning_rate=0.0005), loss='categorical_crossentropy', metrics=['accuracy'])
    return model


def run_lstm_VGG16(sequences, y_train_labels, emotion_labels):
    """
    Train an LSTM classifier on sequences of VGG16 features with 5-fold stratified cross-validation.

    Steps: shuffle the clips, zero-pad every sequence at the end to the longest
    length, one-hot encode the labels, optionally let the user inspect the
    inputs, then train one freshly initialised model per fold. The model with
    the highest validation accuracy is saved to 'Models/best_rnn_model.h5' and
    handed to evaluate_model together with its own training history and its
    own validation fold.

    Parameters:
    - sequences: List of clips; each clip is a sequence of feature vectors with 25088 values.
    - y_train_labels: Integer class label per clip, aligned with sequences.
    - emotion_labels: Class names; only its length (the number of classes) is used here.

    Raises:
    - ValueError: if sequences is empty.
    """
    import os  # local import: only needed to create the output folder

    if len(sequences) == 0:
        raise ValueError("run_lstm_VGG16 needs at least one sequence to train on.")

    print('\t...preprocessing inputs', flush=True)

    print("\t\t...randomly shuffling sequences and labels'", flush=True)
    # Shuffle the sequences and labels with the same permutation
    y_train_labels = np.asarray(y_train_labels)
    shuffled_indices = np.arange(len(sequences))
    np.random.shuffle(shuffled_indices)
    sequences = [sequences[i] for i in shuffled_indices]
    y_train_labels = y_train_labels[shuffled_indices]

    print("\t\t...padding sequences of features with 'post'", flush=True)
    # Pad sequences to the length of the longest sequence; the result is used
    # directly rather than copied again through np.array
    sequences_array = pad_sequences(sequences, padding='post', dtype='float32')

    print('\t\t...transforming labels with one-hot encoding', flush=True)
    y_train_onehot = to_categorical(y_train_labels, num_classes=len(emotion_labels))

    # Ask the user if they want to verify the inputs
    verify_choice = input("\nWould you like to verify the inputs before proceeding? (yes/no): ").lower()
    if verify_choice == "yes":
        _verify_lstm_inputs(sequences_array, y_train_onehot)

    print('\t...creating the model', flush=True)
    model = _build_lstm_vgg16_model(len(emotion_labels))

    print('\n')
    print('*'*70)
    model.summary()
    print('Dropout: 0.5', '\nActivation: SoftMax', '\nOptimizer: Adam(learning_rate=0.0005)', '\nLoss: loss: categorical_crossentropy')
    print('*'*70, '\n')
    print("\t...implementing data-augmentation with 'jitter'", end='', flush=True)

    print('\n\t...training the model with k-fold validation', end='', flush=True)
    # Define number of splits
    n_splits = 5
    kf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    best_val_accuracy = 0   # best validation accuracy seen so far
    best_model = None       # model object of the best fold
    best_history = None     # training history of the best fold
    best_val_index = None   # validation indices of the best fold
    best_fold = 0
    accuracies = []

    custom_print_callback = CustomPrintCallback()

    for fold, (train_index, val_index) in enumerate(kf.split(sequences_array, y_train_labels), 1):
        print(f"\n\t   Training on fold {fold}/{n_splits}")

        # Every fold needs its own untrained model: reusing one instance would
        # carry weights that were fitted on samples of this fold's validation
        # split, and would make every "best model" reference the same object.
        if fold > 1:
            model = _build_lstm_vgg16_model(len(emotion_labels))

        # Jitter is a training-time augmentation; the validation split stays
        # clean so that its metrics describe un-perturbed inputs.
        X_train_fold = add_jitter(sequences_array[train_index])
        X_val_fold = sequences_array[val_index]
        y_train_fold, y_val_fold = y_train_onehot[train_index], y_train_onehot[val_index]

        early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

        # Train the model
        history = model.fit(X_train_fold, y_train_fold, epochs=100, batch_size=16,
                            validation_data=(X_val_fold, y_val_fold), verbose=0,
                            callbacks=[custom_print_callback, early_stopping])

        # Score the weights the model actually ends up with; with
        # restore_best_weights these need not be the last epoch's weights.
        val_accuracy = float(model.evaluate(X_val_fold, y_val_fold, verbose=0)[1])
        accuracies.append(val_accuracy)

        # Check if this model performs better than previous ones; the first
        # fold is always taken so best_model can never stay None
        if best_model is None or val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            best_model = model
            best_history = history
            best_val_index = val_index
            best_fold = fold

    print(f'\n\t...mean validation accuracy over {n_splits} folds: {np.mean(accuracies):.4f}', flush=True)

    print(f'\n\t...saving the best model from fold: {best_fold} with accuracy: {best_val_accuracy} as: Models/best_rnn_model.h5', flush=True)
    # After all folds are done, save the best model
    os.makedirs("Models", exist_ok=True)
    best_model.save("Models/best_rnn_model.h5")

    print("\n\n4) Evaluation")
    # Evaluate the best model on the fold it was validated on
    evaluate_model(best_model, best_history, sequences_array[best_val_index], y_train_onehot[best_val_index])


In [8]:
def plot_confusion_matrix(best_model, X_val_fold, y_val_fold, emotion_labels):
    """
    Show a heatmap of the confusion matrix of best_model on a validation fold.

    Parameters:
    - best_model: Model whose predict method returns one score per class for each sample.
    - X_val_fold: Validation inputs passed to best_model.predict.
    - y_val_fold: One-hot encoded true labels for X_val_fold.
    - emotion_labels: Class names, used as row and column labels.
    """
    # Class index per sample: arg-max of the scores vs. arg-max of the one-hot target
    predicted = np.argmax(best_model.predict(X_val_fold), axis=1)
    actual = np.argmax(y_val_fold, axis=1)

    # Fix the label set so the matrix has a row and column for every class
    class_ids = range(len(emotion_labels))
    table = pd.DataFrame(
        confusion_matrix(actual, predicted, labels=class_ids),
        columns=emotion_labels,
        index=emotion_labels,
    )
    table.index.name = 'Actual'
    table.columns.name = 'Predicted'

    # Draw the annotated heatmap
    plt.figure(figsize=(15, 10))
    plt.title('Confusion Matrix', fontsize=20)
    sns.set(font_scale=1.2)
    sns.heatmap(table, cbar=False, cmap="Blues", annot=True, annot_kws={"size": 16}, fmt='g')
    plt.show()


In [9]:
def evaluate_model(best_model, history, X_val_fold, y_val_fold, emotion_labels=None):
    """
    Interactive evaluation menu for a trained model.

    Predictions on X_val_fold are computed once up front; the menu then loops
    until the user picks option 6.

    Parameters:
    - best_model: Trained model; its predict method must return one score per class for each sample.
    - history: Object returned by model.fit; its history dict must contain
      'loss', 'val_loss', 'accuracy' and 'val_accuracy'.
    - X_val_fold: Validation inputs.
    - y_val_fold: One-hot encoded true labels for X_val_fold.
    - emotion_labels: Optional class names for the confusion matrix. When
      omitted, a module-level emotion_labels is used if one exists; otherwise
      the class indices are used as names.
    """
    # The confusion-matrix option needs class names. This function used to
    # read a name that was neither a parameter nor a local, which raised
    # NameError unless a global of that name happened to exist.
    if emotion_labels is None:
        emotion_labels = globals().get('emotion_labels')
    if emotion_labels is None:
        emotion_labels = [str(i) for i in range(np.shape(y_val_fold)[1])]

    y_pred = best_model.predict(X_val_fold)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_val_fold, axis=1)

    while True:
        print("\n    Choose an evaluation option:")
        print("\t1) Evaluate on validation set")
        print("\t2) Plot training loss vs validation loss graph")
        print("\t3) Plot training accuracy vs validation accuracy graph")
        print("\t4) Print confusion matrix")
        print("\t5) Print classification report")
        print("\t6) Exit evaluation")

        choice = input("\nEnter your choice (1/2/3/4/5/6): ")

        if choice == "1":
            accuracy = np.mean(y_true == y_pred_classes)
            print(f"Validation Accuracy: {accuracy * 100:.2f}%")
        elif choice == "2":
            plt.plot(history.history["loss"], 'r', label="Training Loss")
            plt.plot(history.history["val_loss"], 'b', label="Validation Loss")
            plt.legend()
            plt.show()
        elif choice == "3":
            plt.plot(history.history["accuracy"], 'r', label="Training Accuracy")
            plt.plot(history.history["val_accuracy"], 'b', label="Validation Accuracy")
            plt.legend()
            plt.show()
        elif choice == "4":
            plot_confusion_matrix(best_model, X_val_fold, y_val_fold, emotion_labels)
        elif choice == "5":
            print("Classification Report:")
            print(classification_report(y_true, y_pred_classes))
        elif choice == "6":
            break
        else:
            # The menu has six options, not five
            print("Invalid choice. Please enter a number between 1 and 6.")
