In [2]:
#!pip install facenet_pytorch

^C
[31mERROR: Operation cancelled by user[0m[31m
[0m

In [3]:
import os           

# Check if string is appropriate youtube link
import re
import cv2
import numpy as np
import pandas as pd
from facenet_pytorch import MTCNN
from matplotlib import pyplot as plt
from keras.models import load_model
from pytube import YouTube

import face_recognition

import seaborn as sns
import plotly.express as px

# DEBUG-Mode stops operations when max_emotions were detected and also prints a short summary
DEBUG = True
if DEBUG:
    import time
    debug_params = {
        'max_emotions' : 500
    }

In [30]:
def get_path(kind):
    if kind == "youtube":
        #return "https://www.youtube.com/watch?v=vtT78TfDfXU"                   # Random video
        return 'https://www.youtube.com/watch?v=embYkODkzcs'                 # 7 basic emotions
        #return 'https://www.youtube.com/watch?v=m70UInZKJjU'                    # Two persons
    if kind == "local":
        # adjust individually
        return '/Users/steve/Neue_Fische/face_demo/vids/Video_One_output.mp4'
    if kind == "error_on_purpose":
        return "wrongful path"
    else:
        raise ValueError(f"Passed Argument kind must bei in ['youtube', 'local', 'error_on_purpose'] but was: {kind}")

def youtube_stream(yt_link):
    # Load the video from YouTube
    yt_video = YouTube(yt_link)
    stream = yt_video.streams.get_highest_resolution() 
    stream.download()
    return cv2.VideoCapture(stream.default_filename)

def local_stream(local_path):
    return cv2.VideoCapture(local_path)

def get_stream(path):
    # Check if the string is a YouTube link
    if re.match(r'(https?://)?(www\.)?(youtube\.com|youtu\.?be)/.+$', path):
        return youtube_stream(path)
    # Check if the string is a local path
    elif os.path.isfile(path):
        return local_stream(path)
    # Check if the path is a local file path but no file is found
    elif os.path.exists(path):
        raise ValueError(f"File not found at path: {path}")
    # If it's neither a local path nor a YouTube link, raise an error
    else:
        raise ValueError("The input string is neither a local path nor a YouTube link.")
    
def load_emotion_classifier():
    return load_model("../models/emotion_model.hdf5", compile=False)

def preprocess_face(face, input_face_size):
    face = cv2.cvtColor(face, cv2.COLOR_RGB2GRAY)  # Convert the face to grayscale
    face = cv2.resize(face, (input_face_size[1], input_face_size[0]))  # Swap width and height
    face = face.astype('float32') / 255.0
    face = np.expand_dims(face, axis=-1)  # Add an additional dimension for grayscale channel
    face = np.expand_dims(face, axis=0)
    return face

def print_debug_report(operating_results):    
    print(f'{operating_results["analyzed_emotions"]} faces found in {operating_results["analyzed_frames"]} frames.')
    print(f'{operating_results["frames_without_faces"]} frames had no face detected ({operating_results["frames_without_faces_ratio"]}%).')
    print(f'Stopped operations after around {operating_results["processed_video_time"]} seconds into the video.')
    print(f'Execution time: {operating_results["runtime"]} seconds, processing (roughly) {round(operating_results["processed_video_time"]/operating_results["runtime"],2)} seconds of video per second of execution')

def initialize_face_detector(model_type):
    if model_type == 'haarcascade':
        return cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    elif model_type == "MTCNN":
        return MTCNN(keep_all=True, post_process=False, margin=20)
    else:
        raise ValueError("By now, only Haarcascade is implemented.")

def preprocess_frame_for_face_detection_haarcascade(frame):
    return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

def preprocess_frame_for_emotion_detection(frame):
    return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

def normalize_boxes_mtcnn(boxes):
    """
    Normalize the bounding box coordinates from MTCNN to numpy indexing format.
    Output format: np.array(y_min, y_max, x_min, x_max)
    """
    normalized_boxes = []
    for box in boxes:
        x_min, y_min, x_max, y_max = box.astype(int)
        normalized_boxes.append([y_min, y_max, x_min, x_max])
    return np.array(normalized_boxes)

def normalize_boxes_cv2(boxes):
    """
    Normalize the bounding box coordinates from OpenCV's format to numpy indexing format.
    Output format: np.array(y_min, y_max, x_min, x_max)
    """
    normalized_boxes = []
    for box in boxes:
        x, y, w, h = box
        normalized_boxes.append([y, y+h, x, x+w])
    return np.array(normalized_boxes)

def detect_faces(frame, face_detector, model_type = 'haarcascade'):
    if model_type == 'haarcascade':
        frame_pp = preprocess_frame_for_face_detection_haarcascade(frame)
        boxes =  face_detector.detectMultiScale(frame_pp, scaleFactor = 1.3, minNeighbors = 3)
        if boxes is None:
            return None
        else:
            return normalize_boxes_cv2(boxes)
    elif model_type == "MTCNN":
        # No preprocessing needed for VideoCapture Frame
        boxes, _ = face_detector.detect(frame)
        if boxes is None:
            return None
        else:
            return normalize_boxes_mtcnn(boxes)
        #return [(x['box'][1], x['box'][0] + x['box'][2], x['box'][1] + x['box'][3], x['box'][0]) for x in bounding_boxes]
    #elif model_type == "fast_MTCNN"
    # https://towardsdatascience.com/face-detection-using-mtcnn-a-guide-for-face-extraction-with-a-focus-on-speed-c6d59f82d49
    else:
        raise ValueError("By now, only Haarcascade is implemented.")
    
def get_ordered_emotions():
    return ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']

def emotions_probability(frame, face_location, emotion_classifier):
    frame_pp = preprocess_frame_for_emotion_detection(frame)
    y_min, y_max, x_min, x_max = face_location
    face = frame_pp[y_min:y_max, x_min:x_max]
    face = preprocess_face(face, input_face_size=emotion_classifier.input_shape[1:3])
    prob = emotion_classifier.predict(face)[0]  # check for underscore
    return prob


def output_video(video, filename):
    width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(video.get(cv2.CAP_PROP_FPS))
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    return cv2.VideoWriter(filename, fourcc, 10, (width,height))

def get_face(frame, face_location):
    # Extract the face region from the frame
    #(top, right, bottom, left) = face_location
    (y_min, y_max, x_min, x_max) = face_location
    face_region = frame[x_min:x_max, y_min:y_max]
    face = cv2.cvtColor(face_region, cv2.COLOR_RGB2GRAY)  # Convert the face to grayscale
    face_region = cv2.resize(face_region, (256, 256))    
    face = face.astype('float32') / 255.0
    face = np.expand_dims(face, axis=-1)  # Add an additional dimension for grayscale channel
    face = np.expand_dims(face, axis=0)
    if face.dtype != "uint8":
        # Find the maximum value based on the current image depth
        image_max_value = 2 ** (8 * face.itemsize) - 1
        # Convert the image to 8-bit
        face = cv2.convertScaleAbs(face, alpha=(255.0 / image_max_value))


    return face

    
def get_face_encoding(face_region):
    face_encoding = face_recognition.face_encodings(cv2.cvtColor(face_region, cv2.COLOR_BGR2RGB))
    return face_encoding

# Initialize known_faces dictionary and character_id
character_id = 1
face_ids = []
known_faces = {}
known_faces_list = [known_faces]
threshold = 0.6

def assign_character_id(face_region, known_faces, threshold, character_id = 1):
    current_face_encoding = get_face_encoding(face_region)

    """if len(current_face_encoding) == 0:         
        return None, known_faces
    """
    
    # If known_faces is empty, add the current face encoding and assign the first character ID
    if not known_faces:
        # Add the current face encoding to the dictionary with the first ID
        known_faces[character_id] = current_face_encoding
        return character_id, known_faces
    

    # Compare the current face encoding with the known faces
    for char_id, face_encoding in known_faces.items():
        match = face_recognition.compare_faces([np.asarray(face_encoding)], np.asarray(current_face_encoding), tolerance=threshold)
        if match:
            # If a match is found, return the character ID
            return char_id, known_faces

    # No match found, add the face to the list of known faces and assign a new character ID
    character_id += 1
    known_faces[character_id] = current_face_encoding

    return character_id, known_faces


def get_overview_df(emotions, character, frame_info, frame_info_cols):
    assert len(frame_info[0]) == len(frame_info_cols), \
        f"Number of columns in frame_info and number of passed names in frame_info_cols is not the same: {len(frame_info[0])} != {len(frame_info_cols)}."

    df_emotions = pd.DataFrame(emotions, columns=get_ordered_emotions())
    df_character = pd.DataFrame(character_ids, columns='character_id')
    df_frame_info = pd.DataFrame(frame_info, columns=frame_info_cols)
    df_all_info = pd.concat([df_emotions, df_character, df_frame_info], axis=1)
    return df_all_info

def get_plottable_df(emotions, character, frame_info, frame_info_cols):

    assert len(frame_info[0]) == len(frame_info_cols), \
        f"Number of columns in frame_info and number of passed names in frame_info_cols is not the same: {len(frame_info[0])} != {len(frame_info_cols)}."

    df_emotions = pd.DataFrame(emotions, columns=get_ordered_emotions())
    df_frame_info = pd.DataFrame(frame_info, columns=frame_info_cols)
    df_character = pd.DataFrame(character_ids, columns='character_id')

    df_all_info = pd.concat([df_emotions, df_character, df_frame_info], axis=1)
    df_plotting = pd.melt(df_all_info, id_vars=frame_info_cols, value_vars=get_ordered_emotions(), var_name='emotion', value_name='probability')
    return df_plotting




In [31]:
if DEBUG: start_time = time.time()

# Define video path
path = get_path('youtube')

# Set the number of frames to skip
frames_to_skip = 1

# Get Video as cv2.VideoCapture
# Can access Youtube Video or local file
video = get_stream(path)

# Initialize the face detection model
#model_type = "MTCNN"
model_type = "haarcascade"
face_detector = initialize_face_detector(model_type)


# Initialize the emotion detection model
emotion_classifier = load_emotion_classifier()

# Initialize lists to store emotions and frame_info
emotions = []
frame_info = []
face_ids = []

face_embeddings = {}


# Initialize counters
frames_without_faces_counter = 0



In [32]:
writer = output_video(video, filename='Output_video.mp4') # ADDED THIS!!
character_ids= []


person_counter = 0
known_face_encodings = []
character_ids = []
FACE_DISTANCE_THRESHOLD = 0.6

# Dictionary that will store the known face encodings for each character ID
character_faces = {}
individual_id_counter = 0


# Loop through each frame of the video
while True:

    # Read the next frame from the video
    ret, frame = video.read()

    # Check if the frame was successfully read
    if not ret:
        break

    # Increment the frame counter
    current_frame_nr = int(video.get(cv2.CAP_PROP_POS_FRAMES))

    # Skip frames based on the frames_to_skip parameter
    if current_frame_nr % frames_to_skip != 0:
        continue
    
    # Find faces within a frame and return list of coordinates of bounding boxes
    face_locations = detect_faces(frame, face_detector, model_type)

    # Check if any faces were found
    if face_locations is None:
        frames_without_faces_counter += 1
        continue

    for i, face_location in enumerate(face_locations):
        face = get_face(frame, face_location)
        prob = emotions_probability(frame, face_location, emotion_classifier)
        emotions.append(prob)

        if i < len(face_locations):
                face_encoding = face_recognition.face_encodings(face, [face_locations[i]])[0]
        else:
            continue            
        current_individual_ids = []

        # Compare the current face encoding with the existing face embeddings
        for individual_id, individual_face_encoding in face_embeddings.items():
            if face_recognition.compare_faces([individual_face_encoding], face_encoding)[0]:
                current_individual_ids.append(individual_id)
                break
        else:
            individual_id_counter += 1
            current_individual_ids.append(individual_id_counter)
            face_embeddings[individual_id_counter] = face_encoding    
        
        # Add the current timestamp (milliseconds) and probabilities of emotions to the frame_description list
        # current implementation to prepare for tuple-wise operation, change frame later to character_nr
        
        frame_info.append(
            (round(video.get(cv2.CAP_PROP_POS_MSEC) / 1000, 2),
                current_frame_nr
                )
        )    

        max_emotion, max_prob = np.argmax(prob), np.max(prob)
        emotion_labels = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
        emotion_text = emotion_labels[max_emotion]


        # Assign character IDs to each face in the frame
        character_id = assign_character_id(frame, [face_location])
        character_ids.append(character_id)


        for y_min, y_max, x_min, x_max in face_locations:
            cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)
            cv2.putText(frame, f"Prob: {max_prob:.1%}", (x_min, y_max + 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1)
            cv2.putText(frame, f"{emotion_text}", (x_min, y_max + 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1)
            # Add the character ID as an annotation to the frame
            cv2.putText(frame, f"Character ID: {current_individual_ids[i]}", (x_min, y_min - 20), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1)



    writer.write(frame) # ADDED THIS!!!

    cv2.imshow("Faces found", frame)

    # Wait for Esc key to stop
    if cv2.waitKey(20) == ord('q'):
        break

    if DEBUG:
        # For debugging reasons, we stop when we have 1000 emotion values
        if len(emotions) > debug_params['max_emotions']:
            break

# Release the video and close the window
video.release() 

writer.release() # ADDED THIS!!!

# De-allocate any associated memory usage
cv2.destroyAllWindows()

if DEBUG: end_time = time.time()

if DEBUG:
    # When in DEBUG-mode, print some statistics about the faces and emotions detected
    operating_results = {'analyzed_emotions': len(emotions),
                         'analyzed_frames':frame_info[-1][1], 
                         'frames_without_faces':frames_without_faces_counter,
                         'frames_without_faces_ratio': round(100*frames_without_faces_counter/frame_info[-1][1],2),
                         'processed_video_time': round(frame_info[-1][0] / 1000 ,2),
                         'runtime': round(end_time - start_time,2)}
    
    print_debug_report(operating_results)
    
df_plotting = get_plottable_df(emotions, character, frame_info, frame_info_cols=['pos_sec', 'frame'])

OpenCV: FFMPEG: tag 0x44495658/'XVID' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'




2023-03-22 10:23:24.029487: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.


RuntimeError: Unsupported image type, must be 8bit gray or RGB image.