In [11]:
import os
import traceback

import pandas as pd
import numpy as np
import cv2
import mediapipe as mp

# for audio / video extraction, conversion:
import moviepy.editor as moviepyEditor

from matplotlib import pyplot as plt
import numpy as np


#------------------------------------------------------------------------------------
def extract_audio_from_video(video_path):  # , save_mp3_audio_as):
    """Return the audio track of a video file as a NumPy sound array.

    Args:
        video_path: Path of the video file to open with MoviePy.

    Returns:
        The array produced by ``clip.audio.to_soundarray()``.

    Raises:
        ValueError: If the video has no audio track.
    """
    # define clip from path:
    clip = moviepyEditor.VideoFileClip(video_path)
    try:
        if clip.audio is None:
            raise ValueError(f"No audio track found in video: {video_path!r}")

        # extract audio (materialised before the clip is closed):
        return clip.audio.to_soundarray()
    finally:
        # Always release the underlying reader so that file handles are not
        # leaked when many videos are processed in a row.
        clip.close()
    

#------------------------------------------------------------------------------------
# run through single video:
def process_video(video_path, class_name, webcam=False, verbose=False):
    """Run MediaPipe Face Mesh over a video and collect one keypoint per frame.

    For every frame that is read, a single ``[x, y, z]`` triple is recorded:
    the coordinates of the last landmark of the last detected face, or
    ``[0.0, 0.0, 0.0]`` when no face is detected.

    NOTE(review): only the last landmark of each frame is kept, not the whole
    mesh. This is preserved to keep the returned shape unchanged -- confirm
    whether storing every landmark was the actual intent.

    Args:
        video_path: Source passed to ``cv2.VideoCapture``.
        class_name: Unused here; kept so existing callers keep working.
        webcam: If True, failed frame reads are ignored and reading continues;
            otherwise the first failed read ends processing.
        verbose: If True, landmarks are drawn on each frame, the frame is shown
            in a window, and pressing ESC stops processing early.

    Returns:
        ``np.ndarray`` of shape ``(n_frames, 3)``; ``(0, 3)`` when no frame
        could be read.
    """
    detected_keypoints_coordinates = []

    # Media Pipe Initialization:
    mp_face_mesh = mp.solutions.face_mesh

    print("Video Capture Path:", video_path)

    cap = cv2.VideoCapture(video_path)
    try:
        with mp_face_mesh.FaceMesh(
                max_num_faces=1,
                refine_landmarks=True,
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5) as face_mesh:
            while cap.isOpened():
                success, image = cap.read()
                if not success:
                    if webcam:
                        # A live camera may drop frames; keep polling.
                        print("Ignoring empty camera frame.")
                        continue
                    # End of a video file.
                    break

                # To improve performance, optionally mark the image as not
                # writeable to pass by reference.
                image.flags.writeable = False
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                results = face_mesh.process(image)

                if verbose:
                    # The BGR image is only needed for drawing and display.
                    image.flags.writeable = True
                    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                    height, width = image.shape[0], image.shape[1]

                # Per the MediaPipe Face Mesh documentation quoted in the
                # original code: x and y are normalized to [0.0, 1.0] by the
                # image width and height; z is the landmark depth relative to
                # the center of the head (smaller means closer to the camera).
                # Default used when no face is detected in this frame:
                x, y, z = 0.0, 0.0, 0.0
                if results.multi_face_landmarks:
                    for face in results.multi_face_landmarks:
                        for landmark in face.landmark:
                            x, y, z = landmark.x, landmark.y, landmark.z

                            if verbose:
                                relative_x = int(x * width)
                                relative_y = int(y * height)
                                cv2.circle(image, (relative_x, relative_y),
                                           radius=1, color=(225, 0, 100),
                                           thickness=1)

                # Append detected_keypoints_coordinates:
                detected_keypoints_coordinates.append([x, y, z])

                if verbose:
                    cv2.imshow('MediaPipe Face Mesh', cv2.flip(image, 1))
                    # Key polling is only meaningful while a window is shown;
                    # doing it unconditionally just slows down every frame.
                    if cv2.waitKey(20) & 0xFF == 27:
                        break
    finally:
        # Release the capture and any windows even if processing failed.
        cap.release()
        cv2.destroyAllWindows()

    # reshape keeps the (n_frames, 3) contract for an empty result as well.
    return np.array(detected_keypoints_coordinates, dtype=float).reshape(-1, 3)


#------------------------------------------------------------------------------------
def mkdir_if_none(directory):
    """Create ``directory`` (including missing parents) if it does not exist.

    Args:
        directory: Path of the directory to create.

    Raises:
        FileExistsError: If ``directory`` exists but is not a directory.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(directory, exist_ok=True)


#------------------------------------------------------------------------------------    
def save_label_data(class_num, sample_coordinates_data, sample_audio, save_as):
    """Persist one training sample as three ``.npy`` files and report shapes.

    The files are written as ``<save_as>_label.npy``, ``<save_as>_coord.npy``
    and ``<save_as>_audio.npy``.

    Args:
        class_num: Class label; stored as a one-element array.
        sample_coordinates_data: Array of keypoint coordinates to store.
        sample_audio: Array of audio samples to store.
        save_as: Path prefix shared by the three output files.
    """
    label_array = np.array([class_num])

    # (output path, printed caption, array) for each artifact, in save order.
    artifacts = (
        (f"{save_as}_label.npy", "class_label_npy.shape:", label_array),
        (f"{save_as}_coord.npy", "sample_coordinates_data.shape:", sample_coordinates_data),
        (f"{save_as}_audio.npy", "sample_audio.shape:", sample_audio),
    )

    # Write everything first, then report.
    for target_path, _, payload in artifacts:
        np.save(target_path, payload)

    print("Saved with shape:")
    for _, caption, payload in artifacts:
        print(caption, payload.shape)
    

#------------------------------------------------------------------------------------
# For each processed video: collect (1) face features and (2) audio features, then (3) save them together with the class label:
def train_data_preparation(video_data_dir="data/Video_chunks/Video_chunks",
                           csv_path="data/Video_chunks/Labels.xlsx",
                           class_to_num=None,
                           num_to_class=None,
                           save_keypoints_npy_to=os.path.join(os.getcwd(), "mediaPipe_keypoints_data"),
                           max_videos=1):
    """Extract face keypoints and audio from videos and save them as samples.

    The class of each video is taken from the second ``_``-separated token of
    its file name (e.g. ``trial_lie_001_000.mp4`` -> ``lie``). Files whose
    name has no such token, or whose token is not in ``class_to_num``, are
    skipped with a message.

    Args:
        video_data_dir: Directory containing the video files.
        csv_path: Currently unused (labels come from the file names); kept so
            existing callers keep working.
        class_to_num: Mapping from class name to numeric label. Defaults to
            ``{"truth": 0, "lie": 1}``.
        num_to_class: Currently unused; kept so existing callers keep working.
        save_keypoints_npy_to: Directory the ``.npy`` files are written to;
            created if missing.
        max_videos: Maximum number of videos to process. The default of 1
            matches the previous behaviour of stopping after the first video;
            pass ``None`` to process every video in ``video_data_dir``.
    """
    # Build the default inside the call to avoid a shared mutable default.
    if class_to_num is None:
        class_to_num = {"truth": 0, "lie": 1}

    mkdir_if_none(save_keypoints_npy_to)

    processed_count = 0

    # go over each video and collect face features + audio features
    # (sorted, because os.listdir returns entries in arbitrary order):
    for video_name in sorted(os.listdir(video_data_dir)):
        if max_videos is not None and processed_count >= max_videos:
            break

        # collect information about video:
        name_parts = video_name.split("_")
        class_name = name_parts[1] if len(name_parts) > 1 else None
        if class_name not in class_to_num:
            print("Skipping file with unrecognised class name:", video_name)
            continue

        class_num = class_to_num[class_name]
        video_path = os.path.join(video_data_dir, video_name)
        sample_name = video_name.split(".")[0]
        save_as = os.path.join(save_keypoints_npy_to, sample_name)

        # run through frames of current video:
        extracted_keypoints_npy = process_video(video_path=video_path, class_name=class_name, verbose=True)

        # extract audio:
        extracted_audio_npy = extract_audio_from_video(video_path=video_path)

        # save training sample:
        save_label_data(class_num=class_num,
                        sample_coordinates_data=extracted_keypoints_npy,
                        sample_audio=extracted_audio_npy,
                        save_as=save_as)

        processed_count += 1


# Entry point: run video processing for (face & audio) feature collection,
# using the default arguments of train_data_preparation:
train_data_preparation()    

Video Capture Path: data/Video_chunks/Video_chunks\trial_lie_001_000.mp4
Saved with shape:
class_label_npy.shape: (1,)
sample_coordinates_data.shape: (126, 3)
sample_audio.shape: (185661, 2)
