<a href="https://colab.research.google.com/github/MOAzeemKhan/MM_Reaction_Analysis/blob/main/MM_Emotional_Detection_From_Directory_Final_GPU.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Approach 1
## About the model: the ViT-Face-Expression model (`trpakov/vit-face-expression`) from Hugging Face, a transformer-based pre-trained model designed explicitly for emotion detection tasks.

## Using: `facenet-pytorch`'s MTCNN for accurate face detection, and Hugging Face `transformers` to load and run the ViT-Face-Expression model.

In [None]:
!pip install facenet-pytorch

In [None]:
import torch

# Prefer the first CUDA device when one is available; otherwise run on the CPU
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(f'Running on device: {device}')

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
import seaborn as sns
from tqdm.notebook import tqdm

from moviepy.editor import VideoFileClip, ImageSequenceClip

import torch
from facenet_pytorch import (MTCNN)

from transformers import (AutoFeatureExtractor,
                          AutoModelForImageClassification,
                          AutoConfig)

from PIL import Image, ImageDraw
from moviepy.editor import VideoFileClip
import numpy as np
from tqdm import tqdm
import os
from moviepy.editor import VideoFileClip, concatenate_videoclips
import time
import gc

In [None]:
def detect_emotions(image):
    """
    Detect a face in an image and classify its emotion.

    Uses the module-level `mtcnn` detector to locate a face and the
    module-level `extractor` / `model` pair to classify the cropped face.

    Parameters:
    image (PIL.Image): The input image.

    Returns:
    tuple: `(face, class_probabilities)` where `face` is the cropped
        face (PIL.Image) and `class_probabilities` is a dict mapping each
        label in `model.config.id2label` to its softmax probability.
        `(None, None)` is returned when no face is detected.
    """
    # Use the MTCNN model to detect faces in the image; the first element
    # of the result holds the bounding boxes (None when nothing is found)
    detection = mtcnn.detect(image)
    boxes = detection[0]
    if boxes is None:
        return None, None

    # Crop the first detected face out of the image
    box = tuple(float(coord) for coord in boxes[0])
    face = image.crop(box)

    # Pre-process the cropped face and place the tensors on the same
    # device as the classifier, so this works on CPU and GPU alike
    inputs = extractor(images=face, return_tensors="pt").to(model.device)

    # Inference only: no autograd graph is needed, which keeps the
    # per-frame memory footprint small
    with torch.no_grad():
        logits = model(**inputs).logits

    # Apply softmax to the logits and pull the single batch row back to
    # the CPU as a plain Python list
    probabilities = torch.nn.functional.softmax(logits, dim=-1)[0].cpu().tolist()

    # The label mapping is already attached to the loaded model, so there
    # is no need to reload the configuration for every frame
    id2label = model.config.id2label

    # Map class labels to their probabilities
    class_probabilities = {id2label[i]: prob
                           for i, prob in enumerate(probabilities)}

    return face, class_probabilities

In [None]:
def create_combined_image(face, class_probabilities):
    """
    Create an image combining the detected face and a barplot
    of the emotion probabilities.

    Parameters:
    face (PIL.Image): The detected face.
    class_probabilities (dict): The probabilities of each
        emotion class, keyed by emotion label.

    Returns:
    np.array: The combined image as an RGB uint8 array of shape
        (height, width, 3).
    """
    # Define colors for each emotion
    colors = {
        "angry": "red",
        "disgust": "green",
        "fear": "gray",
        "happy": "yellow",
        "neutral": "purple",
        "sad": "blue",
        "surprise": "orange"
    }
    labels = list(class_probabilities.keys())
    percentages = [prob * 100 for prob in class_probabilities.values()]
    # Fall back to black for a label that has no assigned color instead of
    # failing the whole frame with a KeyError
    palette = [colors.get(label, "black") for label in labels]

    # Create a figure with 2 subplots: one for the
    # face image, one for the barplot
    fig, axs = plt.subplots(1, 2, figsize=(15, 6))
    try:
        # Display face on the left subplot
        axs[0].imshow(np.array(face))
        axs[0].axis('off')

        # Create a barplot of the emotion probabilities
        # on the right subplot
        sns.barplot(ax=axs[1],
                    y=labels,
                    x=percentages,
                    palette=palette,
                    orient='h')
        axs[1].set_xlabel('Probability (%)')
        axs[1].set_title('Emotion Probabilities')
        axs[1].set_xlim([0, 100])  # Set x-axis limits

        # Render the figure off-screen and read the pixels back.
        # buffer_rgba() replaces tostring_rgb(), which was deprecated in
        # Matplotlib 3.8 and later removed; the alpha channel is dropped
        # and the data copied so the array outlives the closed figure.
        canvas = FigureCanvas(fig)
        canvas.draw()
        rgba = np.asarray(canvas.buffer_rgba())
        img = np.ascontiguousarray(rgba[..., :3])
    finally:
        # Always release the figure, even if plotting failed
        plt.close(fig)

    gc.collect()
    return img

In [None]:
# Directory handed to the `from_pretrained` calls in this notebook as `cache_dir`
path_cache = "/content/"
# Set cache directories for XDG and Hugging Face Hub
# NOTE(review): these are hardcoded absolute paths inside one specific user's
# home directory, and they are assigned after `transformers` was imported in
# the import cell. Presumably they have no effect on the `from_pretrained`
# calls, which pass `cache_dir=path_cache` explicitly — TODO confirm, then
# remove or make configurable.
os.environ['XDG_CACHE_HOME'] = '/home/msds2023/jlegara/.cache'
os.environ['HUGGINGFACE_HUB_CACHE'] = '/home/msds2023/jlegara/.cache'

In [None]:
# Set cache directories for XDG and Hugging Face Hub
#os.environ['XDG_CACHE_HOME'] = '/data/mpstme-azeem/msds2023/jlegara/.cache'
#os.environ['HUGGINGFACE_HUB_CACHE'] = '/data/mpstme-azeem/msds2023/jlegara/.cache'

In [None]:
# Face detector: MTCNN set up for single face cropping (keep_all=False)
mtcnn_settings = dict(
    image_size=160,
    margin=0,
    min_face_size=200,
    thresholds=[0.6, 0.7, 0.7],
    factor=0.709,
    post_process=True,
    keep_all=False,
    device=device,
)
mtcnn = MTCNN(**mtcnn_settings)

# Emotion classifier: feature extractor and model both come from the same
# pre-trained checkpoint and share the same cache directory
checkpoint = "trpakov/vit-face-expression"
extractor = AutoFeatureExtractor.from_pretrained(checkpoint, cache_dir=path_cache)
model = AutoModelForImageClassification.from_pretrained(checkpoint, cache_dir=path_cache)

In [None]:
# Set cache directories for XDG and Hugging Face Hub
#os.environ['XDG_CACHE_HOME'] = '/data/mpstme-azeem/msds2023/jlegara/.cache'
#os.environ['HUGGINGFACE_HUB_CACHE'] = '/data/mpstme-azeem/msds2023/jlegara/.cache'

In [None]:
# Set cache directories for XDG and Hugging Face Hub
# NOTE(review): this cell repeats the two assignments made in the earlier
# cache configuration cell and runs after the models have been loaded;
# it looks redundant — TODO confirm and delete.
os.environ['XDG_CACHE_HOME'] = '/home/msds2023/jlegara/.cache'
os.environ['HUGGINGFACE_HUB_CACHE'] = '/home/msds2023/jlegara/.cache'

In [17]:
def _save_chunk(output_dir, chunk_index, frames, timestamps):
    """Write one chunk's frames and timestamps as a pair of .npy files."""
    np.save(os.path.join(output_dir, f'video_chunk_{chunk_index}.npy'), np.array(frames))
    np.save(os.path.join(output_dir, f'timestamps_chunk_{chunk_index}.npy'), np.array(timestamps))


def split_video_into_chunks(path, output_dir):
    """
    Split a video into fixed-size chunks of frames saved as .npy files.

    Every frame of the video is written exactly once: chunks hold
    `chunk_size` frames each and the final chunk holds whatever remains.
    For chunk number <n> two files are written to `output_dir`:
    `video_chunk_<n>.npy` (the frames) and `timestamps_chunk_<n>.npy`
    (the matching frame times yielded by `iter_frames(with_times=True)`).

    Parameters:
    path (str): Path of the video file to split.
    output_dir (str): Directory that receives the chunk files; it is
        created if it does not exist.

    Returns:
    float: The frame rate (fps) of the source video.
    """
    start2 = time.time()
    print("In Split Video into Chunks Function")

    os.makedirs(output_dir, exist_ok=True)

    chunk_size = 500
    chunk_count = 0
    current_chunk = []
    current_chunk_timestamps = []

    # Load your video
    clip = VideoFileClip(path)
    try:
        vid_fps = clip.fps
        video = clip.without_audio()

        # Process video frames
        for t, frame in tqdm(video.iter_frames(with_times=True)):
            current_chunk.append(np.array(frame))
            current_chunk_timestamps.append(t)

            if len(current_chunk) >= chunk_size:
                _save_chunk(output_dir, chunk_count, current_chunk, current_chunk_timestamps)
                chunk_count += 1
                # Start a fresh chunk and release the saved frames
                current_chunk = []
                current_chunk_timestamps = []
                gc.collect()

        # Save the remaining frames so the tail of the video is not lost
        if current_chunk:
            _save_chunk(output_dir, chunk_count, current_chunk, current_chunk_timestamps)
            chunk_count += 1
    finally:
        # Release the video (and audio) readers even if reading or saving failed
        clip.close()

    # Drop the frame buffers before returning
    del current_chunk, current_chunk_timestamps
    gc.collect()

    end2 = time.time()
    print(f"Total chunks saved: {chunk_count}\nTotal Chunking Took: {end2 - start2}")
    return vid_fps


In [None]:
def rest_code(path_chunk, chunk_output_dir, vid_fps, skips):
    """
    Run emotion detection on every saved chunk and write per-chunk results.

    For each `video_chunk_<n>.npy` in `path_chunk`, every `skips`-th
    frame is passed to `detect_emotions`. Two outputs are written to
    `chunk_output_dir`:

    * `video_chunk_<n>_plot_video.mp4` - the face/barplot images of the
      frames in which a face was found (skipped when no face was found in
      the whole chunk),
    * `video_chunk_<n>_class_probabilities.csv` - one row per analysed
      frame with the emotion probabilities in percent and a `timestamp`
      column; frames without a face have empty probability values.

    Parameters:
    path_chunk (str): Directory containing the chunk and timestamp files.
    chunk_output_dir (str): Directory that receives the outputs; created
        if it does not exist.
    vid_fps (float): Frame rate of the source video.
    skips (int): Step between analysed frames (1 analyses every frame).

    Raises:
    ValueError: If `skips` is smaller than 1.
    """
    print("In Rest Code Function")

    if skips < 1:
        raise ValueError(f"skips must be a positive integer, got {skips}")

    # Ensure the output directory exists
    os.makedirs(chunk_output_dir, exist_ok=True)

    # Labels used to fill the row of a frame in which no face was found
    emotions = ["angry", "disgust", "fear", "happy", "neutral", "sad", "surprise"]

    npy_files = sorted(f for f in os.listdir(path_chunk)
                       if f.endswith('.npy') and 'video_chunk' in f)

    for file in npy_files:
        print(f"Processing chunk: {file}")

        # Derive the timestamp file from the file name only, so a directory
        # name containing 'video_chunk' cannot corrupt the path
        chunk_dir = os.path.join(path_chunk, file)
        timestamp_path = os.path.join(path_chunk, file.replace('video_chunk', 'timestamps_chunk'))
        loaded_chunk = np.load(chunk_dir)
        timestamp_chunk = np.load(timestamp_path)

        print(f"Loaded chunk shape: {loaded_chunk.shape}")

        # Keep every `skips`-th frame together with its timestamp
        reduced_video = loaded_chunk[::skips]
        reduced_timestamps = timestamp_chunk[::skips]

        # Face/barplot images of the frames in which a face was found
        combined_images = []
        # One probability dict per analysed frame
        all_class_probabilities = []

        # Loop over video frames
        for frame in tqdm(reduced_video,
                          total=len(reduced_video),
                          desc="Processing frames"):
            frame = frame.astype(np.uint8, copy=False)

            # Call detect_emotions to get face and class probabilities
            face, class_probabilities = detect_emotions(Image.fromarray(frame))

            if face is not None:
                combined_images.append(create_combined_image(face, class_probabilities))
            else:
                # NaN (not None) keeps the columns numeric, so the percentage
                # conversion below also works when no frame has a face
                class_probabilities = {emotion: np.nan for emotion in emotions}

            all_class_probabilities.append(class_probabilities)

        # Generate unique filenames for this chunk
        chunk_name = os.path.splitext(file)[0]
        video_output_path = os.path.join(chunk_output_dir, f"{chunk_name}_plot_video.mp4")
        df_output_path = os.path.join(chunk_output_dir, f"{chunk_name}_class_probabilities.csv")

        if combined_images:
            # Only every `skips`-th frame is kept, so the playback rate is
            # reduced by the same factor
            clip_with_plot = ImageSequenceClip(combined_images, fps=vid_fps / skips)
            try:
                clip_with_plot.write_videofile(video_output_path, fps=vid_fps / skips)
            finally:
                clip_with_plot.close()
            del clip_with_plot
        else:
            # An empty image list cannot be turned into a clip
            print(f"No faces detected in {file}; skipping plot video for this chunk")

        # Convert list of class probabilities into a DataFrame
        df = pd.DataFrame(all_class_probabilities)

        # Convert probabilities to percentages
        df = df * 100
        df['timestamp'] = reduced_timestamps
        # Save DataFrame to CSV file
        df.to_csv(df_output_path, index=False)

        print(f"Last timestamp: {reduced_timestamps[-1]}")
        print(f"Processed chunk saved: {video_output_path},{df_output_path}")

        # Clear memory before the next chunk is loaded
        del loaded_chunk, reduced_video, combined_images, all_class_probabilities, df
        gc.collect()

    print(f"All Processed Chunks Successfully saved to: {chunk_output_dir}")

In [19]:
def merge(chunk_output_dir, final_dir):
    """
    Merge the per-chunk results into one CSV, one plot and one video.

    Reads every `video_chunk_<n>_class_probabilities.csv` and
    `video_chunk_<n>_plot_video.mp4` in `chunk_output_dir` in ascending
    chunk order and writes to `final_dir`:

    * `final_class_probabilities.csv` - all chunk rows, re-indexed from 0,
    * `final_plot.png` - emotion probabilities plotted against timestamp,
    * `final_merged_video.mp4` - the chunk videos concatenated.

    Parameters:
    chunk_output_dir (str): Directory containing the per-chunk outputs.
    final_dir (str): Directory that receives the merged outputs; created
        if it does not exist.

    Returns:
    None. If no chunk CSV exists nothing is written; if no chunk video
    exists only the video step is skipped.
    """
    print("In Merge Function")
    start = time.time()
    # Ensure the output directory exists
    os.makedirs(final_dir, exist_ok=True)
    os.makedirs(chunk_output_dir, exist_ok=True)

    final_df_path = os.path.join(final_dir, "final_class_probabilities.csv")
    output_file = os.path.join(final_dir, "final_merged_video.mp4")

    def chunk_number(filename):
        # 'video_chunk_<number>_...' -> <number>
        return int(filename.split('_')[2])

    # List all CSV files in the chunk output directory, in chunk order
    csv_files = sorted(
        (f for f in os.listdir(chunk_output_dir) if f.endswith('_class_probabilities.csv')),
        key=chunk_number,
    )
    if not csv_files:
        print(f"No chunk CSV files found in {chunk_output_dir}; nothing to merge")
        return

    # Read every chunk once and concatenate in a single call;
    # ignore_index gives one continuous 0..N-1 frame index across chunks
    chunk_frames = [pd.read_csv(os.path.join(chunk_output_dir, csv_file))
                    for csv_file in csv_files]
    final_df = pd.concat(chunk_frames, axis=0, ignore_index=True)

    final_df.to_csv(final_df_path, index=True)
    # Check timestamp distribution
    print(f"Timestamp range: {final_df['timestamp'].min()} - {final_df['timestamp'].max()}")
    print(f"Final DataFrame saved to: {final_df_path}")

    # Define colors for each emotion
    colors = {"angry": "red", "disgust": "green", "fear": "gray", "happy": "yellow",
              "neutral": "purple", "sad": "blue", "surprise": "orange"}

    # Exclude 'timestamp' from columns for plotting emotions, but use it as the x-axis
    columns_to_plot = [col for col in final_df.columns if col != 'timestamp']

    fig, ax = plt.subplots(figsize=(15, 8))
    for emotion in columns_to_plot:
        # colors.get returns None for an unexpected column, which lets
        # Matplotlib pick a color instead of raising a KeyError
        ax.plot(final_df['timestamp'], final_df[emotion], label=emotion, color=colors.get(emotion))

    ax.set_xlabel('Timestamp (seconds)')
    ax.set_ylabel('Emotion Probability (%)')
    ax.set_title('Emotion Probabilities Over Time')
    ax.legend()
    fig.savefig(os.path.join(final_dir, "final_plot.png"))

    plt.show()
    plt.close(fig)

    # List all processed video chunks in the directory, in chunk order
    video_files = sorted(
        (f for f in os.listdir(chunk_output_dir) if f.endswith('_plot_video.mp4')),
        key=chunk_number,
    )

    if not video_files:
        # Concatenating an empty list of clips would fail
        print(f"No chunk videos found in {chunk_output_dir}; skipping merged video")
    else:
        clips = []
        final_clip = None
        try:
            # Load each video chunk in order
            for video_file in video_files:
                clips.append(VideoFileClip(os.path.join(chunk_output_dir, video_file)))

            # Concatenate the video clips and write the merged video
            final_clip = concatenate_videoclips(clips, method="compose")
            final_clip.write_videofile(output_file)
            print(f"Merged video saved to {output_file}")
        finally:
            # Release the readers even if loading or writing failed
            if final_clip is not None:
                final_clip.close()
            for clip in clips:
                clip.close()

    end = time.time()
    print(f"Total Merging Took: {end-start}")

In [None]:
def working(video_dir, output_base_dir, skips):
    """
    Run the full pipeline on every .mp4 video in a directory.

    For each video the frames are split into chunks, each chunk is analysed,
    and the per-chunk results are merged. Everything is written below
    `<output_base_dir>/processed_reaction_videos/<video name>/`:

    * `chunks/` - raw frame and timestamp chunks,
    * `processed_frames/` - per-chunk videos and CSV files,
    * `final_video_plot/` - merged CSV, plot and video.

    Parameters:
    video_dir (str): Directory containing the input .mp4 files (only the
        top level is scanned).
    output_base_dir (str): Base directory for all output; created if it
        does not exist.
    skips (int): Step between analysed frames, forwarded to `rest_code`.
    """
    print("In working Function")
    os.makedirs(output_base_dir, exist_ok=True)

    start1 = time.time()
    final_output_dir = os.path.join(output_base_dir, 'processed_reaction_videos')
    os.makedirs(final_output_dir, exist_ok=True)

    # Sorted for a reproducible processing order; the extension check is
    # case-insensitive so '.MP4' files are picked up as well
    video_files = sorted(f for f in os.listdir(video_dir) if f.lower().endswith('.mp4'))
    for video_file in video_files:
        video_name = os.path.splitext(video_file)[0]  # Get filename without extension
        video_output_dir = os.path.join(final_output_dir, video_name)
        chunk_output_dir = os.path.join(video_output_dir, 'chunks')
        processed_output_dir = os.path.join(video_output_dir, 'processed_frames')
        final_dir = os.path.join(video_output_dir, 'final_video_plot')

        # Create the directories for the chunks and the per-chunk results
        os.makedirs(chunk_output_dir, exist_ok=True)
        os.makedirs(processed_output_dir, exist_ok=True)

        video_path = os.path.join(video_dir, video_file)
        print(f"Processing video: {video_file}")

        vid_fps = split_video_into_chunks(video_path, chunk_output_dir)
        print(vid_fps)
        print(f"Splitting in Chunks Completed, Chunks saved at: {chunk_output_dir}")

        rest_code(chunk_output_dir, processed_output_dir, vid_fps, skips)
        merge(processed_output_dir, final_dir)

        # Elapsed time since the start of the whole run (cumulative over videos)
        end1 = time.time()
        print(f"Total Processing Time: {end1-start1}")

In [None]:
# Input folder of .mp4 videos and the base folder that receives all results
reaction_video_dir = "/content/drive/MyDrive/MM/Code/test"
results_base_dir = "/content/drive/MyDrive/MM/Code/test/"
# Analyse every 2nd frame of each chunk
frame_step = 2

working(reaction_video_dir, results_base_dir, frame_step)

In working Function
Processing video: Copy of daivik_cropped_clipped.mp4
In Split Video into Chunks Function


508it [00:04, 81.92it/s]