In [None]:
import librosa
import librosa.display
import numpy as np
import pandas as pd
import ffmpeg as ff
import os
from typing import List, Tuple, Dict
import matplotlib.pyplot as plt
import random
import cv2
import pickle
import dlib
import datetime
from facenet_pytorch import MTCNN
import logging
from einops import rearrange
import logging
import torch
from torchvision import transforms, utils, models
from PIL import Image
import face_recognition

In [None]:
# Notebook-wide logger: DEBUG-level records go to both the console and ./LOG/personalityLog.log.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('[%(asctime)s]::%(module)s::%(levelname)s::%(message)s')

# Re-running this cell must not stack handlers on the same logger, otherwise every
# record is emitted once per previous execution of the cell.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

streamHandler = logging.StreamHandler()
streamHandler.setFormatter(formatter)

# logging.FileHandler does not create missing parent directories.
os.makedirs('./LOG', exist_ok=True)
fileHandler = logging.FileHandler('./LOG/personalityLog.log')
fileHandler.setFormatter(formatter)

logger.addHandler(streamHandler)
logger.addHandler(fileHandler)

In [None]:
# Run on the first CUDA GPU when torch reports one, otherwise on the CPU.
USE_CUDA = torch.cuda.is_available()
if USE_CUDA:
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

margin = 60              # forwarded as `margin` to the MTCNN constructor
number_of_samples = 15   # number of frames sampled from every video


In [None]:
# Settings read by crop_image_to_eyes.
LIP_MARGIN = 0.5       # fraction of the landmark bounding-box extent added on each side of the crop
RESIZE = (224, 224)    # target size handed to cv2.resize for the cropped region

# dlib models: frontal face detector and the 68-point landmark predictor loaded from disk.
face_detector = dlib.get_frontal_face_detector()
landmark_detector = dlib.shape_predictor("detector/shape_predictor_68_face_landmarks.dat")

In [None]:
# MTCNN face detector (facenet_pytorch) created on `device`, using the module-level `margin`.
mtcnn = MTCNN(
    image_size=224,
    margin=margin,
    min_face_size=60,
    thresholds=[0.6, 0.7, 0.7],
    post_process=True,
    device=device,
)

In [None]:
def get_number_of_frames(file_path: str) -> int:
    """Return the frame count ffprobe reports for the first video stream of a file.

    Args:
        file_path: path of the media file handed to ``ff.probe``.

    Returns:
        The ``nb_frames`` value of the first stream whose ``codec_type`` is
        ``"video"``, converted to ``int`` (ffprobe reports it as text).

    Raises:
        ValueError: if the file has no video stream, or the stream carries no
            integer ``nb_frames`` value.
    """
    probe = ff.probe(file_path)
    video_streams = [stream for stream in probe["streams"] if stream["codec_type"] == "video"]
    if not video_streams:
        raise ValueError("no video stream found in {!r}".format(file_path))

    nb_frames = video_streams[0].get('nb_frames')
    try:
        return int(nb_frames)
    except (TypeError, ValueError):
        raise ValueError(
            "video stream of {!r} has no usable nb_frames value: {!r}".format(file_path, nb_frames)
        ) from None

In [None]:
def extract_audio_from_video(file_path: str) -> np.ndarray:
    """Decode the audio of a media file into a 1-D float32 sample array.

    The file is run through ffmpeg with a single output channel (``ac=1``), a
    44100 Hz sample rate and raw little-endian 32-bit float PCM written to
    stdout; the captured bytes are then viewed as ``np.float32``.

    Args:
        file_path: path of the media file to decode.

    Returns:
        Array of float32 samples built directly on the captured stdout buffer.
    """
    # '-' makes ffmpeg write the raw PCM stream to stdout instead of a file.
    pcm_stream = ff.input(file_path).output(
        '-', format='f32le', acodec='pcm_f32le', ac=1, ar='44100'
    )
    # run() returns a tuple whose first element is the captured stdout bytes.
    captured = pcm_stream.run(capture_stdout=True)
    return np.frombuffer(captured[0], np.float32)

In [None]:
def preprocess_audio_series(raw_data: np.ndarray) -> np.ndarray:
    """Turn raw audio samples into a fixed-size, standardized MFCC block.

    Steps:
      1. Compute ``N`` MFCC coefficients with librosa (sample rate 44100).
      2. Standardize the whole matrix with its global mean and standard deviation.
      3. Bring the time axis to exactly ``M`` columns: shorter inputs are
         left-padded with zeros (zero is the mean after standardization);
         longer inputs keep only their first ``M`` columns.

    Args:
        raw_data: 1-D array of audio samples.

    Returns:
        Array of shape ``(N, M, 1)`` = ``(24, 1319, 1)``.
    """
    N, M = 24, 1319
    # Use N for n_mfcc so the coefficient count and the padding height cannot drift apart.
    mfcc_data = librosa.feature.mfcc(y=raw_data, sr=44100, n_mfcc=N)

    # Standardize to zero mean / unit variance; a constant matrix has zero spread,
    # in which case dividing would produce NaN, so leave the (all-zero) centred data as is.
    spread = np.std(mfcc_data)
    if spread == 0:
        spread = 1.0
    mfcc_data_standardized = (mfcc_data - np.mean(mfcc_data)) / spread

    # Clips with more than M frames used to request a negative padding width and
    # crash in np.zeros; truncate them to the first M frames instead.
    mfcc_data_standardized = mfcc_data_standardized[:, :M]

    # Pre-padding with zeros to unify the length of the samples.
    number_of_columns_to_fill = M - mfcc_data_standardized.shape[1]
    padding = np.zeros((N, number_of_columns_to_fill))
    padded_data = np.hstack((padding, mfcc_data_standardized))

    # (N, M, 1): N MFCC coefficients over M time frames, with a trailing singleton axis.
    return padded_data.reshape(N, M, 1)

In [None]:
def shape_to_list(shape):
    """Collect landmark points 36 through 41 of `shape` as a list of ``(x, y)`` tuples.

    `shape` must expose ``part(i)`` returning an object with ``x`` and ``y``
    attributes (as the result of the dlib landmark predictor does).
    """
    return [(shape.part(idx).x, shape.part(idx).y) for idx in range(36, 42)]

In [None]:
def crop_image_to_eyes(frame, landmark):
    """Crop `frame` around the given landmark points and resize the crop to RESIZE.

    The crop is the bounding box of the landmark points, enlarged on every side
    by ``LIP_MARGIN`` times the box extent along that axis, and clamped to the
    image so that a box reaching past the top/left border is cut at the border.

    Args:
        frame: image array indexed as ``frame[row, column]``.
        landmark: sequence of ``(x, y)`` points (at most the first 68 are used).

    Returns:
        The cropped region resized to ``RESIZE`` with cubic interpolation, or
        ``None`` when there are no landmark points or the crop is empty.
    """
    points = landmark[0:68]
    if len(points) == 0:
        return None

    xs = [point[0] for point in points]
    ys = [point[1] for point in points]
    x_min, x_max = min(xs), max(xs)
    y_min, y_max = min(ys), max(ys)

    # Margin proportional to the bounding-box size.
    x_add = int((x_max - x_min) * LIP_MARGIN)
    y_add = int((y_max - y_min) * LIP_MARGIN)

    # Clamp at 0: a negative slice bound would be interpreted by NumPy as an
    # offset from the end of the axis and silently yield an empty/wrong crop.
    # Bounds beyond the image size need no clamping; slicing stops at the edge.
    left = max(x_min - x_add, 0)
    right = max(x_max + x_add, 0)
    top = max(y_min - y_add, 0)
    bottom = max(y_max + y_add, 0)

    cropped = frame[top:bottom, left:right]
    if cropped.size != 0:
        return cv2.resize(cropped, (RESIZE[0], RESIZE[1]), interpolation=cv2.INTER_CUBIC)
    return None

In [None]:
# Sampling of frames in which a face can be detected.
def _read_frame_and_detect(cap, index):
    """Seek `cap` to frame `index`, read it, convert BGR->RGB and run the face detector.

    Returns ``(frame_rgb, faces)``. When the frame cannot be read, returns
    ``(None, [])`` so the caller treats it like a frame without a face.
    """
    cap.set(cv2.CAP_PROP_POS_FRAMES, index)
    ok, frame = cap.read()
    if not ok or frame is None:
        return None, []
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return frame, face_detector(frame, 1)


def extract_N_video_frames(frame_num: int, file_path: str, number_of_samples: int = 15) -> Tuple[List[np.ndarray], List[np.ndarray]]:
    """Sample `number_of_samples` evenly spaced frames that contain exactly one face.

    For every sampling position the frame is read and passed to the dlib face
    detector. If no face is found, following frames are tried; after more than
    one sampling interval of misses the search restarts once just after the
    frame accepted for the previous position, and gives up after a second
    interval of misses. Sampling of the whole video stops at the first
    position where no face, or more than one face, is found.

    Args:
        frame_num: ignored; kept for call compatibility. The frame count is
            read from the file via ``get_number_of_frames``.
        file_path: path of the video file.
        number_of_samples: number of sampling positions.

    Returns:
        ``(full_video_frames, eyes_video_frames)``: the accepted RGB frames and
        the matching uint8 crops produced by ``crop_image_to_eyes``. Both lists
        have the same length, which is smaller than `number_of_samples` when
        sampling stopped early or a crop was empty.
    """
    full_video_frames = []
    eyes_video_frames = []

    total_frames = int(get_number_of_frames(file_path))
    step = total_frames / number_of_samples

    # Evenly spaced start positions, e.g. [0, 2.7, 5.4, ...] -> [0, 2, 5, ...].
    indexes = []
    position = 0
    for _ in range(number_of_samples):
        indexes.append(int(position))
        position += step

    last_index = 0  # index reached while processing the previous sampling position

    cap = cv2.VideoCapture(file_path)
    logger.debug(file_path)
    try:
        for start_index in indexes:
            current = start_index
            attempts = 0
            restarted = False

            frame, faces = _read_frame_and_detect(cap, start_index)
            while len(faces) < 1:
                if attempts > step:
                    if restarted:
                        break
                    # First timeout: restart the search right after the previous hit.
                    current = last_index + 1
                    attempts = 0
                    restarted = True
                current += 1
                if current >= total_frames:
                    # Ran past the end of the video: wrap to just after the previous hit.
                    current = last_index + 1
                frame, faces = _read_frame_and_detect(cap, current)
                attempts += 1
            last_index = current

            if len(faces) < 1:
                logger.debug("no dectected face")
                break
            elif len(faces) > 1:
                logger.debug("too many dectected faces")
                break

            # Exactly one face: locate its landmarks and crop around them.
            landmark = shape_to_list(landmark_detector(frame, faces[0]))
            eyes = crop_image_to_eyes(frame, landmark)
            if eyes is not None:
                # `frame` is already RGB, so the crop needs no further colour conversion.
                eyes_video_frames.append(np.uint8(eyes))
                full_video_frames.append(frame)
    finally:
        # Release the capture even when detection or cropping raises.
        cap.release()

    return full_video_frames, eyes_video_frames

In [None]:
def resize_image(image: np.ndarray, new_size: Tuple[int,int]) -> np.ndarray:
    """Convert `image` to an ndarray and rescale it to `new_size` using cv2.INTER_AREA."""
    pixels = np.array(image)
    resized = cv2.resize(pixels, new_size, interpolation=cv2.INTER_AREA)
    return resized
# Note on crop_image_window (next cell): during training it picks a random 128x128 window
# of the image; during prediction it picks the central 128x128 window, and returns the crop.

In [None]:
def crop_image_window(image: np.ndarray, training: bool = True, size: int = 128) -> np.ndarray:
    """Cut a square ``size`` x ``size`` window out of an ``(height, width, channels)`` image.

    Args:
        image: 3-D image array.
        training: when True the window position is drawn at random (row offset
            first, then column offset, via ``random.randint``); when False the
            window is centred.
        size: side length of the window; defaults to the original 128.

    Returns:
        The ``(size, size, channels)`` view of `image` covering the window.

    Raises:
        ValueError: if the image is smaller than the window along either axis.
            (Previously this produced a wrong crop through negative indices in
            the centred case.)
    """
    height, width, _ = image.shape
    if height < size or width < size:
        raise ValueError(
            "image of size {}x{} is smaller than the {}x{} crop window".format(height, width, size, size)
        )

    if training:
        top = random.randint(0, height - size)
        left = random.randint(0, width - size)
    else:
        top = (height - size) // 2
        left = (width - size) // 2
    return image[top:top + size, left:left + size, :]

In [None]:
def reading_label_data(file_name: str, dictionary: Dict[str,str]) -> np.ndarray:
    """Fetch the five trait scores of one video as a ``(5, 1)`` array.

    `dictionary` is indexed as ``dictionary[trait][file_name]``; the scores are
    returned in the order extraversion, neuroticism, agreeableness,
    conscientiousness, openness.
    """
    trait_names = ('extraversion', 'neuroticism', 'agreeableness', 'conscientiousness', 'openness')
    scores = []
    for trait in trait_names:
        scores.append(float(dictionary[trait][file_name]))
    return np.stack(scores).reshape(5, 1)

In [None]:
def preprocessing_input(file_path: str, file_name: str, dictionary: Dict[str, str], training: bool = True) -> Tuple[
    np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Build the audio, full-frame, eye-crop and label arrays for one video.

    The video frames are sampled first; only when all ``number_of_samples``
    (module-level setting) frames were obtained is the audio decoded and the
    remaining work done, so rejected videos cost no ffmpeg/MFCC/resize time.

    Args:
        file_path: path of the video file.
        file_name: key of the video in the label dictionary.
        dictionary: labels, indexed as ``dictionary[trait][file_name]``.
        training: unused; kept for call compatibility.

    Returns:
        ``(audio, full_video, face_video, labels)`` where ``full_video`` stacks
        the sampled frames resized to 224x224 and ``face_video`` stacks the
        matching crops, or ``(None, None, None, None)`` when fewer than
        ``number_of_samples`` usable frames were found.
    """
    # Video
    frame_num = int(get_number_of_frames(file_path))
    full_video_frames, face_video_frames = extract_N_video_frames(
        frame_num=frame_num, file_path=file_path, number_of_samples=number_of_samples
    )
    if len(full_video_frames) != number_of_samples:
        return (None, None, None, None)

    # Audio
    extracted_audio_raw = extract_audio_from_video(file_path=file_path)
    preprocessed_audio = preprocess_audio_series(raw_data=extracted_audio_raw)

    resized_full_images = [resize_image(image=im, new_size=(224, 224)) for im in full_video_frames]
    preprocessed_full_video = np.stack(resized_full_images)
    preprocessed_face_video = np.stack(face_video_frames)

    # Ground truth
    video_gt = reading_label_data(file_name=file_name, dictionary=dictionary)
    return (preprocessed_audio, preprocessed_full_video, preprocessed_face_video, video_gt)

In [None]:
def reshape_to_expected_fullinput(dataset: List[Tuple[ np.ndarray, np.ndarray]]) -> Tuple[
     np.ndarray, np.ndarray]:
    """Batch the samples for the full-frame input.

    From every sample, element 0, element 1 and element 3 are taken (in
    preprocessing_input's result these are the audio, the full frames and the
    labels) and each group is stacked along a new leading axis.

    Returns:
        Tuple ``(stacked element 0, stacked element 1, stacked element 3)``.
    """
    picked = [(sample[0], sample[1], sample[3]) for sample in dataset]
    audio_items = [entry[0] for entry in picked]
    video_items = [entry[1] for entry in picked]
    label_items = [entry[2] for entry in picked]
    return (np.stack(audio_items), np.stack(video_items), np.stack(label_items))

In [None]:
def reshape_to_expected_faceinput(dataset: List[Tuple[ np.ndarray, np.ndarray]]) -> Tuple[
     np.ndarray, np.ndarray]:
    """Batch the samples for the cropped-region input.

    From every sample, element 0, element 2 and element 3 are taken (in
    preprocessing_input's result these are the audio, the crops and the
    labels) and each group is stacked along a new leading axis.

    Returns:
        Tuple ``(stacked element 0, stacked element 2, stacked element 3)``.
    """
    picked = [(sample[0], sample[2], sample[3]) for sample in dataset]
    audio_items = [entry[0] for entry in picked]
    crop_items = [entry[1] for entry in picked]
    label_items = [entry[2] for entry in picked]
    return (np.stack(audio_items), np.stack(crop_items), np.stack(label_items))

In [None]:
# Build the validation split: preprocess every video under `path` and pickle the
# full-frame and eye-crop variants of the dataset.
training_set_data = []
fullsavename = '/home/ssrlab/qx/video-swin-transformer-pytorch/data/15Frames/eyes/full/valid_set.dat'
facesavename = '/home/ssrlab/qx/video-swin-transformer-pytorch/data/15Frames/eyes/eyes/valid_set.dat'
path ='/home/ssrlab/qx/Big5/valid'

# Label information. NOTE: pickle.load executes arbitrary code from the file;
# only load annotation files from a trusted source.
with open("/home/ssrlab/qx/Big5/gt/annotation_validation.pkl", "rb") as gt_file:
    gt = pickle.load(gt_file, encoding='latin1')

t1 = datetime.datetime.utcnow()
i=1
filenum=1  # not used in this cell (left over from the version that wrote one file per 500 videos)

# Sorted so the order of samples in the saved files is the same on every run.
for filename in sorted(os.listdir(path)):
    filePath = os.path.join(path, filename)
    # Check the cheap frame-count filter first so that videos which are too short
    # never go through audio decoding and face detection.
    if int(get_number_of_frames(filePath)) <= 128:
        continue
    data = preprocessing_input(file_path= filePath, file_name= filename, dictionary= gt, training= True)
    if data[0] is None:
        continue
    training_set_data.append(data)
    print('2000/',i)
    i+=1

# Build each payload before opening its file so a failure while stacking
# does not leave a truncated output file behind.
full_payload = reshape_to_expected_fullinput(training_set_data)
with open(fullsavename, "wb") as f:
    pickle.dump(full_payload, f)
fullsavename = []  # kept from the original cell: the name is rebound after saving
face_payload = reshape_to_expected_faceinput(training_set_data)
with open(facesavename, "wb") as f:
    pickle.dump(face_payload, f)
facesavename = []  # kept from the original cell: the name is rebound after saving
del full_payload, face_payload

t2 = datetime.datetime.utcnow()
# Measuring execution time
print('Elapsed time: ' + str(t2-t1))

In [None]:
# Build the training split: preprocess every video under `path` and pickle the
# full-frame and eye-crop variants of the dataset.
training_set_data = []
fullsavename = '/home/ssrlab/qx/video-swin-transformer-pytorch/data/15Frames/eyes/full/train_set.dat'
facesavename = '/home/ssrlab/qx/video-swin-transformer-pytorch/data/15Frames/eyes/eyes/train_set.dat'
path ='/home/ssrlab/qx/Big5/train'

# Label information. NOTE: pickle.load executes arbitrary code from the file;
# only load annotation files from a trusted source.
with open("/home/ssrlab/qx/Big5/gt/annotation_training.pkl", "rb") as gt_file:
    gt = pickle.load(gt_file, encoding='latin1')

t1 = datetime.datetime.utcnow()

i=1
filenum=1  # not used in this cell (left over from the version that wrote one file per 500 videos)

# Sorted so the order of samples in the saved files is the same on every run.
for filename in sorted(os.listdir(path)):
    filePath = os.path.join(path, filename)
    # Check the cheap frame-count filter first so that videos which are too short
    # never go through audio decoding and face detection.
    frame_num = int(get_number_of_frames(filePath))
    if frame_num <= 128:
        continue
    data = preprocessing_input(file_path= filePath, file_name= filename, dictionary= gt, training= True)
    if data[0] is None:
        continue
    training_set_data.append(data)
    print('6000/',i)
    i+=1

# Build each payload before opening its file so a failure while stacking
# does not leave a truncated output file behind.
full_payload = reshape_to_expected_fullinput(training_set_data)
with open(fullsavename, "wb") as f:
    pickle.dump(full_payload, f)
fullsavename = []  # kept from the original cell: the name is rebound after saving
face_payload = reshape_to_expected_faceinput(training_set_data)
with open(facesavename, "wb") as f:
    pickle.dump(face_payload, f)
facesavename = []  # kept from the original cell: the name is rebound after saving
del full_payload, face_payload

t2 = datetime.datetime.utcnow()
# Measuring execution time
print('Elapsed time: ' + str(t2-t1))

In [None]:
# training_set_data = []
# path = '/home/ssrlab/qx/Big5/train'
# gt = pickle.load( open( "/home/ssrlab/qx/Big5/gt/annotation_training.pkl", "rb" ), encoding='latin1' )
# t1 = datetime.datetime.utcnow()
# i=1
# continue_start_num=1
# partnum = 1
# filenum=1 #500개의 비디오마다 하나의 파일에 저장
# count=0 #500개의 데이터가 있는지 검증
# for filename in os.listdir(path):#파일 내 비디오 둘러보기
#     filePath = path+'/'+filename
#     alldata=preprocessing_input(file_path= filePath, file_name= filename, dictionary= gt, training= True)
#     if partnum < 12:
#         if count==500:
#             fullsavename = '/home/ssrlab/qx/video-swin-transformer-pytorch/data/32Framesfullandfaceandaudio/full/train_set{}.dat'.format(filenum)
#             facesavename = '/home/ssrlab/qx/video-swin-transformer-pytorch/data/32Framesfullandfaceandaudio/face/train_set{}.dat'.format(filenum)
#             with open(fullsavename, "wb") as f:
#                 pickle.dump(reshape_to_expected_fullinput(training_set_data), f)
#             with open(facesavename, "wb") as f:
#                 pickle.dump(reshape_to_expected_faceinput(training_set_data), f)
#             count=0
#             filenum+=1
#             training_set_data = []
#             partnum+=1
#         else:
#             if int(get_number_of_frames(filePath))>128:
#                 if alldata[0] is None:
#                     continue
#                 else:
#                     training_set_data.append(alldata)
#                     # print(len(reshape_to_expected_input(reshape_to_expected_fullinput(training_set_data))[0][1]))
#                     print('6000/',i)
#                     i+=1
#                     count+=1
#     else:
#         fullsavename = '/home/ssrlab/qx/video-swin-transformer-pytorch/data/32Framesfullandfaceandaudio/full/train_set{}.dat'.format(filenum)
#         facesavename = '/home/ssrlab/qx/video-swin-transformer-pytorch/data/32Framesfullandfaceandaudio/face/train_set{}.dat'.format(filenum)
#         if int(get_number_of_frames(filePath))>128:
#             if alldata[0] is None:
#                 continue
#             else:
#                 training_set_data.append(alldata)
#                 print('6000/',i)
#                 i+=1
#                 count+=1
# with open(fullsavename, "wb") as f:
#     pickle.dump(reshape_to_expected_fullinput(training_set_data), f)
# fullsavename = []
# with open(facesavename, "wb") as f:
#     pickle.dump(reshape_to_expected_faceinput(training_set_data), f)
# facesavename = []
# t2 = datetime.datetime.utcnow()
# #Measuring execution time
# print('Elapsed time: ' + str(t2-t1))#
# #얻은 데이터를 파일에 저장합니다

In [None]:
# training_set_data = []
# path ='/home/ssrlab/qx/Big5/test'
# gt = pickle.load( open( "/home/ssrlab/qx/Big5/gt/annotation_test.pkl", "rb" ), encoding='latin1' )#태그 정보 얻기
# t1 = datetime.datetime.utcnow()
# i=1
# filenum=1 #500개의 비디오마다 하나의 파일에 저장
# for filename in os.listdir(path):#파일 내 비디오 둘러보기
#     filePath = path+'/'+filename
#     if int(get_number_of_frames(filePath))>280:
#         print('2000/',i)
#         i+=1
#         training_set_data.append(preprocessing_input(file_path= filePath, file_name= filename, dictionary= gt, training= True))
# savename = '/home/ssrlab/qx/code/test/video-swin-transformer-pytorch/data/face/15Frames/test/qxtest_set{}.dat'.format(filenum)
# with open(savename, "wb") as f:
#     pickle.dump(training_set_data, f)
# t2 = datetime.datetime.utcnow()
# #Measuring execution time
# print('Elapsed time: ' + str(t2-t1))#

In [None]:
# training_set_data = []
# path ='/home/ssrlab/qx/Big5/train'
# gt = pickle.load( open( "/home/ssrlab/qx/Big5/gt/annotation_training.pkl", "rb" ), encoding='latin1' )#태그 정보 얻기
# t1 = datetime.datetime.utcnow()
# i=1
# filenum=1 #500개의 비디오마다 하나의 파일에 저장
# count=0 #500개의 데이터가 있는지 검증
# for filename in os.listdir(path):#파일 내 비디오 둘러보기
#     filePath = path+'/'+filename
#     if count==500:
#         savename = '/home/ssrlab/qx/code/test/video-swin-transformer-pytorch/data/face/15Frames/train/qxtrain_set{}.dat'.format(filenum)
#         with open(savename, "wb") as f:
#             pickle.dump(training_set_data, f)
#         count=1
#         filenum+=1
#         training_set_data = []
#     else:
#         if int(get_number_of_frames(filePath))>280:
#             print('6000/',i)
#             i+=1
#             training_set_data.append(preprocessing_input(file_path= filePath, file_name= filename, dictionary= gt, training= True))
#             count+=1
# t2 = datetime.datetime.utcnow()
# #Measuring execution time
# print('Elapsed time: ' + str(t2-t1))#
# #얻은 데이터를 파일에 저장합니다


In [None]:
# validation_set_data = []
# path = '/home/ssrlab/qx/Big5/valid'
# gt = pickle.load( open( "/home/ssrlab/qx/Big5/gt/annotation_validation.pkl", "rb" ), encoding='latin1' )
# t1 = datetime.datetime.utcnow()
# i=1
# filenum=1
# count=0
# for filename in os.listdir(path):#파일 내 비디오 둘러보기
#     filePath = path+'/'+filename
#     if count==500:
#         savename = '/home/ssrlab/qx/code/test/video-swin-transformer-pytorch/data/face/15Frames/valid/qxvalid_set{}.dat'.format(filenum)
#         with open(savename, "wb") as f:
#             pickle.dump(validation_set_data, f)
#         count=1
#         filenum+=1
#         validation_set_data = []
#     else:
#         if int(get_number_of_frames(filePath))>280:
#             print('6000/',i)
#             i+=1
#             validation_set_data.append(preprocessing_input(file_path= filePath, file_name= filename, dictionary= gt, training= True))
#             count+=1
# t2 = datetime.datetime.utcnow()
# #Measuring execution time
# print('Elapsed time: ' + str(t2-t1))#
# #얻은 데이터를 파일에 저장합니다

In [None]:
# test_set_data = []
# path = '/home/ssrlab/qx/Big5/test'
# gt = pickle.load( open( "/home/ssrlab/qx/Big5/gt/annotation_test.pkl", "rb" ), encoding='latin1' )
# t1 = datetime.datetime.utcnow()
# i=1
# filenum=1
# count=0
# for filename in os.listdir(path):#파일 내 비디오 둘러보기
#     filePath = path+'/'+filename
#     if count==500:
#         savename = '/home/ssrlab/qx/code/test/video-swin-transformer-pytorch/data/face/15Frames/test/qxtest_set{}.dat'.format(filenum)
#         with open(savename, "wb") as f:
#             pickle.dump(test_set_data, f)
#         count=1
#         filenum+=1
#         test_set_data = []
#     else:
#         if int(get_number_of_frames(filePath))>280:
#             print('2000/',i)
#             i+=1
#             test_set_data.append(preprocessing_input(file_path= filePath, file_name= filename, dictionary= gt, training= True))
#             count+=1
# t2 = datetime.datetime.utcnow()
# #Measuring execution time
# print('Elapsed time: ' + str(t2-t1))#
# #얻은 데이터를 파일에 저장합니다