In [2]:
# Install the required libraries
# NOTE(review): tensorflow and scikit-learn are imported by this notebook but are
# not installed here — presumably already present in the environment; confirm.
!pip install pafy youtube-dl moviepy opencv-python

Collecting pafy
  Downloading pafy-0.5.5-py2.py3-none-any.whl (35 kB)
Collecting youtube-dl
  Downloading youtube_dl-2021.12.17-py2.py3-none-any.whl (1.9 MB)
Collecting moviepy
  Downloading moviepy-1.0.3.tar.gz (388 kB)
Collecting decorator<5.0,>=4.0.2
  Downloading decorator-4.4.2-py2.py3-none-any.whl (9.2 kB)
Collecting tqdm<5.0,>=4.11.2
  Downloading tqdm-4.64.0-py2.py3-none-any.whl (78 kB)
Collecting proglog<=1.0.0
  Downloading proglog-0.1.10-py3-none-any.whl (6.1 kB)
Collecting imageio<3.0,>=2.5
  Downloading imageio-2.19.3-py3-none-any.whl (3.4 MB)
Collecting imageio_ffmpeg>=0.2.0
  Downloading imageio_ffmpeg-0.4.7-py3-none-win_amd64.whl (22.6 MB)
Building wheels for collected packages: moviepy
  Building wheel for moviepy (setup.py): started
  Building wheel for moviepy (setup.py): finished with status 'done'
  Created wheel for moviepy: filename=moviepy-1.0.3-py3-none-any.whl size=110743 sha256=c1ff8dfb1346781453b358e69805bba07f00591a7af8f776da1868c243882acc
  Stored in direc

In [3]:
# 라이브러리 불러오기
import os
import cv2
import random
import numpy as np
import tensorflow as tf
from moviepy.editor import *
from collections import deque

from sklearn.model_selection import train_test_split

from tensorflow.keras.layers import *
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

In [6]:
# Seed the NumPy, Python and TensorFlow random generators with one shared constant
seed_constant = 99
for _apply_seed in (np.random.seed, random.seed, tf.random.set_seed):
    _apply_seed(seed_constant)

In [4]:
# Target frame size used when resizing video frames
image_height = 128
image_width = 128

# Number of frames sampled per class
max_images_per_class = 600

# Placeholder: replace with the folder that contains one sub-folder per class
dataset_directory = '분류된 폴더가 있는 경로를 입력하면 됨'

# Class (sub-folder) names; these could also be obtained with os.listdir
classes_list = [
    '불안, 슬픔, 공포',
    '편안, 안정, 행복',
    '화남, 불쾌, 공격성',
]

# One output unit per class
model_output_size = len(classes_list)

In [7]:
# Extract frames from a video
def frames_extraction(video_path):
    """Decode every frame of a video, resize it and divide it by 255.

    Args:
        video_path: Path of the video file handed to ``cv2.VideoCapture``.

    Returns:
        A list with one array per decoded frame, each resized to
        ``image_width`` x ``image_height`` and divided by 255. The list is
        empty when no frame can be read.
    """
    frames_list = []
    video_reader = cv2.VideoCapture(video_path)

    try:
        while True:
            success, frame = video_reader.read()

            # read() reports failure once no further frame can be decoded
            if not success:
                break

            # cv2.resize takes dsize as (width, height), not (height, width)
            resized_frame = cv2.resize(frame, (image_width, image_height))
            frames_list.append(resized_frame / 255)
    finally:
        # Release the capture handle even if resizing raises
        video_reader.release()

    return frames_list

In [8]:
# Build the dataset
def create_dataset():
    """Sample frames for every class and build the feature / label arrays.

    For each name in ``classes_list``, every file inside
    ``dataset_directory/<class name>`` is decoded with ``frames_extraction``
    and up to ``max_images_per_class`` frames are drawn at random.

    Returns:
        Tuple ``(features, labels)`` of NumPy arrays. ``labels`` holds, for
        each sampled frame, the index of its class in ``classes_list``.
    """
    features = []
    labels = []

    for class_index, class_name in enumerate(classes_list):
        print(f'Extracting Data of Class: {class_name}')

        class_directory = os.path.join(dataset_directory, class_name)
        class_frames = []

        for file_name in os.listdir(class_directory):
            video_file_path = os.path.join(class_directory, file_name)
            class_frames.extend(frames_extraction(video_file_path))

        # random.sample raises ValueError when asked for more items than exist,
        # so cap the sample size at the number of frames actually decoded.
        sample_size = min(max_images_per_class, len(class_frames))
        if sample_size < max_images_per_class:
            print(f'Warning: only {len(class_frames)} frames available for class: {class_name}')

        features.extend(random.sample(class_frames, sample_size))
        # Keep the labels in step with the number of frames actually sampled
        labels.extend([class_index] * sample_size)

    features = np.asarray(features)
    labels = np.array(labels)

    return features, labels

features, labels = create_dataset()

In [None]:
# One-hot encode the integer class labels
one_hot_encoded_labels = to_categorical(labels)

# Shuffled train / test split, seeded with seed_constant
(features_train, features_test,
 labels_train, labels_test) = train_test_split(
    features,
    one_hot_encoded_labels,
    shuffle = True,
    random_state = seed_constant,
)

In [None]:
# Build the model (transfer learning on VGG16)
from tensorflow.keras.applications import VGG16

# VGG16 with ImageNet weights and without its top classifier layers
conv_base = VGG16(
    include_top = False,
    weights = "imagenet",
    input_shape = (image_height, image_width, 3),
)

def create_model():
    """Stack a dense classifier on top of the shared ``conv_base``.

    Returns:
        An uncompiled ``Sequential`` model whose final softmax layer has
        ``model_output_size`` units.
    """
    # NOTE(review): conv_base is added as-is; nothing in this block marks it
    # non-trainable — confirm whether it is meant to be frozen.
    network = Sequential()
    network.add(conv_base)
    network.add(Flatten())
    network.add(BatchNormalization())

    # Hidden layer widths widen and then narrow again
    for hidden_units in (32, 64, 128, 128, 64, 32):
        network.add(Dense(units = hidden_units, activation = "relu"))

    network.add(Dense(units = model_output_size, activation = "softmax"))

    return network

model = create_model()

In [9]:
model.compile(loss = 'categorical_crossentropy', optimizer = 'Adam', metrics = ["accuracy"])

# Placeholder: replace with the folder the checkpoints should be written to
model_save_dir = "모델을 저장할 폴더의 경로를 입력하면 됨"

# Build the path with os.path.join so the checkpoint lands inside model_save_dir
# even when the directory string has no trailing separator.
filepath = os.path.join(
    model_save_dir,
    "model_acc_{accuracy:.3f}_vacc_{val_accuracy:.3f}_loss_{loss:.3f}_vloss_{val_loss:.3f}.hdf5",
)

# Only write a checkpoint when validation accuracy improves
model_checkpoint_callback = ModelCheckpoint(filepath = filepath, monitor = 'val_accuracy', mode = 'max', save_best_only = True)
# Stop after 15 epochs without improvement and restore the best weights
early_stopping_callback = EarlyStopping(monitor = 'val_accuracy', patience = 15, mode = 'max', restore_best_weights = True)

# NOTE(review): the test split doubles as validation data here, so it also
# drives early stopping and checkpoint selection.
model_training_history = model.fit(
    x = features_train,
    y = labels_train,
    epochs = 100,
    batch_size = 10,
    shuffle = True,
    validation_data = (features_test, labels_test),
    callbacks = [model_checkpoint_callback, early_stopping_callback],
)

In [16]:
# Load a saved model and analyse a video with it
from tensorflow.keras.models import load_model

# Placeholder path: replace with the model file to load.
# This rebinds the global `model` name that make_average_predictions reads.
model = load_model("불러올 모델 파일의 경로를 입력하면 됨")

def make_average_predictions(video_file_path, predictions_frames_count):
    """Average the model's class probabilities over evenly spaced video frames.

    Args:
        video_file_path: Path of the video to analyse.
        predictions_frames_count: Number of frame positions to sample across
            the video.

    Returns:
        dict mapping each class name in ``classes_list`` to its averaged
        probability, inserted from most to least probable. The dict is empty
        when no frame could be read.
    """
    sampled_frames = []
    video_reader = cv2.VideoCapture(video_file_path)

    try:
        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
        # Use a step of at least 1 so a short video does not re-read frame 0 every time
        skip_frames_window = max(video_frames_count // predictions_frames_count, 1)

        for frame_counter in range(predictions_frames_count):
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)
            success, frame = video_reader.read()

            # Skip positions that cannot be decoded instead of resizing a missing frame
            if not success:
                continue

            # cv2.resize takes dsize as (width, height)
            resized_frame = cv2.resize(frame, (image_width, image_height))
            sampled_frames.append(resized_frame / 255)
    finally:
        # Release the capture handle even if reading or resizing raises
        video_reader.release()

    if not sampled_frames:
        return {}

    # One batched forward pass over all sampled frames; the builtin float
    # replaces the np.float alias, which newer NumPy releases no longer provide.
    predicted_labels_probabilities_np = np.asarray(model.predict(np.stack(sampled_frames)), dtype = float)

    predicted_labels_probabilities_averaged = predicted_labels_probabilities_np.mean(axis = 0)
    predicted_labels_probabilities_averaged_sorted_indexes = np.argsort(predicted_labels_probabilities_averaged)[::-1]

    # Final result: class name -> averaged probability, highest first
    dic_res = {}

    for predicted_label in predicted_labels_probabilities_averaged_sorted_indexes:
        predicted_class_name = classes_list[predicted_label]
        dic_res[predicted_class_name] = float(predicted_labels_probabilities_averaged[predicted_label])

    return dic_res

In [17]:
# Placeholder: replace with the path of the video to analyse
input_video_file_path = "분석할 동영상 파일의 경로를 입력하면 됨"
make_average_predictions(input_video_file_path, 60) # sample 60 frame positions across the video

CLASS NAME: 화남, 불쾌, 공격성   AVERAGED PROBABILITY: 0.8540238777796427
{'화남, 불쾌, 공격성': 0.8540238777796427}
CLASS NAME: 편안, 안정, 행복   AVERAGED PROBABILITY: 0.11335821500979364
{'화남, 불쾌, 공격성': 0.8540238777796427, '편안, 안정, 행복': 0.11335821500979364}
CLASS NAME: 불안, 슬픔, 공포   AVERAGED PROBABILITY: 0.0326179020810135
{'화남, 불쾌, 공격성': 0.8540238777796427, '편안, 안정, 행복': 0.11335821500979364, '불안, 슬픔, 공포': 0.0326179020810135}
