In [1]:
# 필요한 라이브러리들을 임포트합니다.
from tensorflow_docs.vis import embed  # TensorFlow 문서의 시각화를 위한 도구입니다.
from tensorflow import keras  # TensorFlow의 고수준 신경망 API입니다.
from imutils import paths  # 이미지 처리를 위한 유틸리티 함수들을 제공합니다.

import matplotlib.pyplot as plt  # 그래프와 이미지를 시각화하기 위한 라이브러리입니다.
import tensorflow as tf  # TensorFlow 라이브러리입니다, 딥러닝 모델을 구성하고 훈련하기 위해 사용됩니다.
import pandas as pd  # 데이터 분석 및 조작을 위한 라이브러리입니다.
import numpy as np  # 수치 계산을 위한 라이브러리입니다.
import imageio  # 이미지 읽기/쓰기를 위한 라이브러리입니다.
import cv2  # OpenCV 라이브러리, 이미지 및 비디오 처리를 위해 사용됩니다.
import os  # 운영체제와 상호작용을 위한 라이브러리입니다, 파일 경로 조작에 주로 사용됩니다.
import mediapipe as mp

In [2]:
mp_hands = mp.solutions.hands
mp_pose = mp.solutions.pose

In [None]:
dataset_path = os.listdir('dataset/train')
label_types = os.listdir('dataset/train')
# 훈련 데이터셋을 위한 비어있는 리스트 초기화
rooms = []
# dataset_path에 저장된 각 항목(방 유형)에 대해 반복
for item in dataset_path:
    # 'dataset/train' 폴더 내 각 방 유형별로 모든 파일 이름을 가져옴
    all_rooms = os.listdir('dataset/train'+'/'+item)    
    # 가져온 파일 이름을 rooms 리스트에 추가
    for room in all_rooms:
        rooms.append((item, str('dataset/train'+'/'+item)+'/'+room))
# rooms 리스트를 사용하여 데이터프레임 생성
train_df = pd.DataFrame(data=rooms, columns=['tag','video_name']).loc[:,['video_name','tag']]
df = train_df.loc[:,['video_name','tag']]
# 생성된 데이터프레임을 CSV 파일로 저장
df.to_csv('train.csv', encoding='utf-8-sig')
df

In [None]:
dataset_path = os.listdir('dataset/test')
room_types = os.listdir('dataset/test')

rooms = []
# dataset_path에 저장된 각 항목(방 유형)에 대해 반복
for item in dataset_path:
    # 'dataset/test' 폴더 내 각 방 유형별로 모든 파일 이름을 가져옴
    all_rooms = os.listdir('dataset/test'+'/'+item)
    # 가져온 파일 이름을 rooms 리스트에 추가
    for room in all_rooms:
        rooms.append((item, str('dataset/test'+'/'+item)+'/'+room))

# rooms 리스트를 사용하여 데이터프레임 생성
train_df = pd.DataFrame(data=rooms, columns=['tag','video_name'])
df = train_df.loc[:,['video_name','tag']]
# 생성된 데이터프레임을 CSV 파일로 저장
df.to_csv('test.csv', encoding='utf-8-sig')
df

In [None]:
# 이미지의 크기, 배치 크기, 에포크 수를 정의하는 하이퍼파라미터입니다.
IMG_SIZE = 224  # 입력 이미지의 크기를 정의합니다.
BATCH_SIZE = 64  # 한 번에 처리할 이미지의 수를 정의합니다.
EPOCHS = 10  # 모델을 훈련할 때 전체 데이터셋을 반복할 횟수를 정의합니다.

# 비디오 처리에 사용할 최대 시퀀스 길이와 특징 벡터의 크기를 정의합니다.
MAX_SEQ_LENGTH = 20  # 처리할 비디오의 최대 프레임 수를 정의합니다.
NUM_FEATURES = 2048  # 비디오 프레임에서 추출할 특징의 차원 수를 정의합니다.

"""
## 데이터 준비 부분
"""

# 훈련 및 테스트 데이터를 포함하는 CSV 파일을 읽어서 DataFrame 객체를 생성합니다.
train_df = pd.read_csv("train.csv")  # 훈련 데이터셋을 불러옵니다.
test_df = pd.read_csv("test.csv")  # 테스트 데이터셋을 불러옵니다.

# 훈련 및 테스트 데이터셋의 비디오 수를 출력합니다.
print(f"Total videos for training: {len(train_df)}")  # 훈련용 비디오의 총 개수를 출력합니다.
print(f"Total videos for testing: {len(test_df)}")  # 테스트용 비디오의 총 개수를 출력합니다.

# 훈련 데이터셋에서 무작위로 10개의 샘플을 출력합니다.
train_df.sample(10)  # 훈련 데이터셋의 예시를 출력하기 위해 무작위로 10개의 샘플을 선택합니다.

In [None]:
# 비디오 프레임의 중앙을 정사각형으로 자르는 함수입니다.
def crop_center_square(frame):
    # 프레임의 높이와 너비를 가져옵니다.
    y, x = frame.shape[0:2]
    # 높이와 너비 중 더 작은 값을 선택합니다.
    min_dim = min(y, x)
    # 중앙에서 시작할 x, y 좌표를 계산합니다.
    start_x = (x // 2) - (min_dim // 2)
    start_y = (y // 2) - (min_dim // 2)
    # 정사각형 프레임을 반환합니다.
    return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]

# 비디오를 로드하고, 선택적으로 프레임의 수를 제한하며, 프레임을 재조정하는 함수입니다.
def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    # 비디오 경로로부터 비디오 캡처 객체를 생성합니다.
    cap = cv2.VideoCapture(path)
    frames = []  # 프레임을 저장할 리스트입니다.
    try:
        while True:
            ret, frame = cap.read()  # 프레임을 하나씩 읽습니다.
            if not ret:
                break  # 더 이상 프레임이 없으면 종료합니다.
            frame = crop_center_square(frame)  # 중앙 정사각형으로 자릅니다.
            frame = cv2.resize(frame, resize)  # 프레임을 재조정합니다.
            frame = frame[:, :, [2, 1, 0]]  # BGR을 RGB로 순서를 변경합니다.
            frames.append(frame)  # 프레임을 리스트에 추가합니다.

            if len(frames) == max_frames:  # 최대 프레임 수에 도달하면 종료합니다.
                break
    finally:
        cap.release()  # 비디오 캡처 객체를 해제합니다.
    return np.array(frames)  # 프레임의 배열을 반환합니다.

In [None]:
# 특징 추출기를 구축하는 함수입니다.
def build_feature_extractor():
    # InceptionV3 모델을 특징 추출기로 사용합니다. ImageNet 데이터로 사전 훈련된 가중치를 사용하고, 
    # 최상위 층은 포함하지 않으며, 평균 풀링을 사용합니다.
    feature_extractor = keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )
    # InceptionV3에 맞게 입력 데이터를 전처리하는 함수입니다.
    preprocess_input = keras.applications.inception_v3.preprocess_input

    # 모델의 입력을 정의합니다.
    inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
    # 입력 데이터를 전처리합니다.
    preprocessed = preprocess_input(inputs)

    # 전처리된 데이터를 특징 추출기에 통과시켜 출력을 얻습니다.
    outputs = feature_extractor(preprocessed)
    # 입력과 출력을 연결하는 케라스 모델을 생성합니다.
    return keras.Model(inputs, outputs, name="feature_extractor")

# 특징 추출기 모델을 생성합니다.
feature_extractor = build_feature_extractor()

# 클래스 라벨을 정수로 변환하는 레이어를 생성합니다.
label_processor = keras.layers.StringLookup(
    num_oov_indices=0, vocabulary=np.unique(train_df["tag"])
)
# 라벨 프로세서가 이해하는 어휘목록을 출력합니다.
print(label_processor.get_vocabulary())

In [None]:
# 주어진 데이터프레임(df)과 루트 디렉터리(root_dir)를 사용하여 모든 비디오를 준비하는 함수입니다.
def prepare_all_videos(df, root_dir):
    num_samples = len(df)  # 샘플의 수를 결정합니다.
    video_paths = df["video_name"].values.tolist()  # 비디오 경로를 리스트로 변환합니다.
    labels = df["tag"].values  # 라벨 값을 가져옵니다.
    labels = label_processor(labels[..., None]).numpy()  # 라벨을 처리하여 넘파이 배열로 변환합니다.

    # `frame_masks`와 `frame_features`는 우리의 순차 모델에 제공될 데이터입니다.
    # `frame_masks`는 시간 단계가 패딩으로 마스킹되었는지 여부를 나타내는 부울 값들을 포함합니다.
    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    # 각 비디오에 대해서.
    for idx, path in enumerate(video_paths):
        # 모든 프레임을 수집하고 배치 차원을 추가합니다.
        frames = load_video(os.path.join(root_dir, path))
        frames = frames[None, ...]

        # 현재 비디오의 마스크와 특징을 저장하기 위한 임시 공간을 초기화합니다.
        temp_frame_mask = np.zeros(
            shape=(
                1,
                MAX_SEQ_LENGTH,
            ),
            dtype="bool",
        )
        temp_frame_features = np.zeros(
            shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
        )

        # 현재 비디오의 프레임에서 특징을 추출합니다.
        for i, batch in enumerate(frames):
            video_length = batch.shape[0]
            length = min(MAX_SEQ_LENGTH, video_length)
            for j in range(length):
                # 특징 추출기를 사용하여 각 프레임의 특징을 예측합니다.
                temp_frame_features[i, j, :] = feature_extractor.predict(
                    batch[None, j, :]
                )
            # 마스크를 설정합니다. 1 = 마스킹되지 않음, 0 = 마스킹됨
            temp_frame_mask[i, :length] = 1  

        # 임시 특징과 마스크를 각각의 특징 및 마스크 배열에 할당합니다.
        frame_features[idx,] = temp_frame_features.squeeze()
        frame_masks[idx,] = temp_frame_mask.squeeze()

    # 특징과 마스크 배열, 라벨을 반환합니다.
    return (frame_features, frame_masks), labels

# 훈련 데이터와 라벨을 준비합니다.
train_data, train_labels = prepare_all_videos(train_df, "train")
# 테스트 데이터와 라벨을 준비합니다.
test_data, test_labels = prepare_all_videos(test_df, "test")

# 훈련 데이터 세트의 프레임 특징과 마스크의 차원을 출력합니다.
print(f"Frame features in train set: {train_data[0].shape}")
print(f"Frame masks in train set: {train_data[1].shape}")

In [None]:
# 순차 모델을 위한 유틸리티입니다.
def get_sequence_model():
    class_vocab = label_processor.get_vocabulary()

    # 입력으로 프레임 특징과 마스크를 받습니다.
    frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
    mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

    # `mask` 사용의 중요성을 이해하기 위한 참고 자료:
    # https://keras.io/api/layers/recurrent_layers/gru/
    x = keras.layers.GRU(16, return_sequences=True)(
        frame_features_input, mask=mask_input
    )
    x = keras.layers.GRU(8)(x)
    x = keras.layers.Dropout(0.4)(x)
    x = keras.layers.Dense(8, activation="relu")(x)
    # 출력 계층은 분류할 클래스 수만큼의 노드를 가집니다.
    output = keras.layers.Dense(len(class_vocab), activation="softmax")(x)

    rnn_model = keras.Model([frame_features_input, mask_input], output)

    # 모델 컴파일
    rnn_model.compile(
        loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
    )
    return rnn_model


# 실험을 실행하기 위한 유틸리티입니다.
def run_experiment():
    filepath = "/tmp/video_classifier"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    seq_model = get_sequence_model()
    history = seq_model.fit(
        [train_data[0], train_data[1]],
        train_labels,
        validation_split=0.3,
        epochs=EPOCHS,
        callbacks=[checkpoint],
    )

    seq_model.load_weights(filepath)
    _, accuracy = seq_model.evaluate([test_data[0], test_data[1]], test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, seq_model


_, sequence_model = run_experiment()

In [None]:
# 단일 비디오를 준비하기 위한 함수입니다.
def prepare_single_video(frames):
    # ... (중략) ...

# 비디오의 시퀀스 예측을 수행하는 함수입니다.
def sequence_prediction(path):
    # ... (중략) ...

# 이 유틸리티는 시각화를 위한 것입니다.
# 참조된 자료:
# https://www.tensorflow.org/hub/tutorials/action_recognition_with_tf_hub
def to_gif(images):
    # ... (중략) ...

test_video = np.random.choice(test_df["video_name"].values.tolist())
print(f"Test video path: {test_video}")
test_frames = sequence_prediction(test_video)
to_gif(test_frames[:MAX_SEQ_LENGTH])
