In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install tensorflow-docs

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tensorflow-docs
  Downloading tensorflow_docs-2023.5.24.56664-py3-none-any.whl (183 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m183.6/183.6 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
Collecting astor (from tensorflow-docs)
  Downloading astor-0.8.1-py2.py3-none-any.whl (27 kB)
Installing collected packages: astor, tensorflow-docs
Successfully installed astor-0.8.1 tensorflow-docs-2023.5.24.56664


In [3]:
from tensorflow_docs.vis import embed
from tensorflow import keras
from imutils import paths

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2
import os

In [39]:
IMG_SIZE = 224
BATCH_SIZE = 64
EPOCHS = 10

MAX_SEQ_LENGTH = 20
NUM_FEATURES = 2048

In [40]:
train_df = pd.read_csv("/content/drive/MyDrive/PFA/videos/video_train.csv")
val_df = pd.read_csv("/content/drive/MyDrive/PFA/videos/video_val.csv")
test_df = pd.read_csv("/content/drive/MyDrive/PFA/videos/video_test.csv")


In [41]:
train_df = train_df.dropna()
val_df = val_df.dropna()
test_df = test_df.dropna()

In [42]:
print(f"Total videos for training: {len(train_df)}")
print(f"Total videos for validating: {len(val_df)}")
print(f"Total videos for testing: {len(test_df)}")

train_df.sample(10)

Total videos for training: 141
Total videos for validating: 30
Total videos for testing: 31


Unnamed: 0,video_path,F1,F2,F3,F4,F5
11,/content/drive/MyDrive/PFA/videos/train/extrov...,extrovert,intuitive,feeling,judging,dale.am
54,/content/drive/MyDrive/PFA/videos/train/introv...,introvert,sensing,feeling,perceiving,mariafrey
131,/content/drive/MyDrive/PFA/videos/train/extrov...,extrovert,intuitive,feeling,perceiving,mariannacastaldi
74,/content/drive/MyDrive/PFA/videos/train/introv...,introvert,sensing,feeling,judging,bicky
34,/content/drive/MyDrive/PFA/videos/train/extrov...,extrovert,sensing,feeling,perceiving,jolanta
106,/content/drive/MyDrive/PFA/videos/train/introv...,introvert,intuitive,feeling,judging,wolfiezero
52,/content/drive/MyDrive/PFA/videos/train/extrov...,extrovert,intuitive,feeling,perceiving,michaelguenther
66,/content/drive/MyDrive/PFA/videos/train/extrov...,extrovert,sensing,feeling,judging,daveruse
137,/content/drive/MyDrive/PFA/videos/train/extrov...,extrovert,intuitive,feeling,judging,adrihussey
85,/content/drive/MyDrive/PFA/videos/train/introv...,introvert,intuitive,feeling,judging,bfrank


In [43]:
def crop_center_square(frame):
    y, x = frame.shape[0:2]
    min_dim = min(y, x)
    start_x = (x // 2) - (min_dim // 2)
    start_y = (y // 2) - (min_dim // 2)
    return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]

In [44]:
def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = crop_center_square(frame)
            frame = cv2.resize(frame, resize)
            frame = frame[:, :, [2, 1, 0]]
            frames.append(frame)

            if len(frames) == max_frames:
                break
    finally:
        cap.release()
    return np.array(frames)

In [45]:
def build_feature_extractor():
    feature_extractor = keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )
    preprocess_input = keras.applications.inception_v3.preprocess_input

    inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
    preprocessed = preprocess_input(inputs)

    outputs = feature_extractor(preprocessed)
    return keras.Model(inputs, outputs, name="feature_extractor")

In [46]:
feature_extractor = build_feature_extractor()

In [54]:
label_processor = keras.layers.StringLookup(
    num_oov_indices=0, vocabulary=np.unique(train_df["F2"])
)
print(label_processor.get_vocabulary())

['intuitive', 'sensing']


In [55]:
def prepare_all_videos(df, root_dir=''):
    num_samples = len(df)
    print(num_samples)
    video_paths = df["video_path"].values.tolist()
    labels = df["F2"].values
    labels = label_processor(labels[..., None]).numpy()

    # `frame_masks` and `frame_features` are what we will feed to our sequence model.
    # `frame_masks` will contain a bunch of booleans denoting if a timestep is
    # masked with padding or not.
    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )
    # For each video.
    for idx, path in enumerate(video_paths):
        # Gather all its frames and add a batch dimension.
        frames = load_video(os.path.join(root_dir, path))
        frames = frames[None, ...]

        # Initialize placeholders to store the masks and features of the current video.
        temp_frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
        temp_frame_features = np.zeros(
            shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
        )
        # Extract features from the frames of the current video.
        for i, batch in enumerate(frames):
            video_length = batch.shape[0]
            length = min(MAX_SEQ_LENGTH, video_length)
            for j in range(length):
                temp_frame_features[i, j, :] = feature_extractor.predict(
                    batch[None, j, :]
                )
            temp_frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

        frame_features[idx,] = temp_frame_features.squeeze()
        frame_masks[idx,] = temp_frame_mask.squeeze()

    return (frame_features, frame_masks), labels

In [56]:
train_data, train_labels = prepare_all_videos(train_df)
print(f"Frame features in train set: {train_data[0].shape}")


141
Frame features in train set: (141, 20, 2048)


In [57]:
validation_data, validation_labels = prepare_all_videos(val_df)
print(f"Frame features in validation set: {validation_data[0].shape}")

30
Frame features in validation set: (30, 20, 2048)


In [58]:
test_data, test_labels = prepare_all_videos(test_df)
print(f"Frame masks in train set: {test_data[1].shape}")

31
Frame masks in train set: (31, 20)


In [59]:
from keras import backend as K

def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))
def get_sequence_model():
    class_vocab = label_processor.get_vocabulary()

    frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
    mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

    # Refer to the following tutorial to understand the significance of using `mask`:
    # https://keras.io/api/layers/recurrent_layers/gru/
    x = keras.layers.GRU(16, return_sequences=True)(
        frame_features_input, mask=mask_input
    )
    x = keras.layers.GRU(8)(x)
    x = keras.layers.Dropout(0.4)(x)
    x = keras.layers.Dense(8, activation="relu")(x)
    output = keras.layers.Dense(len(class_vocab), activation="softmax")(x)

    rnn_model = keras.Model([frame_features_input, mask_input], output)

    rnn_model.compile(
        #loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
        loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy",f1_m,precision_m, recall_m]

    )
    return rnn_model


In [60]:
# Utility for running experiments.
def run_experiment():
    filepath = "/tmp/video_classifier"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    seq_model = get_sequence_model()
    history = seq_model.fit(
        [train_data[0], train_data[1]],
        train_labels,
        validation_split=0.3,
        epochs=EPOCHS,
        callbacks=[checkpoint],
    )

    seq_model.load_weights(filepath)
    # _, accuracy = seq_model.evaluate([test_data[0], test_data[1]], test_labels)
    loss, accuracy, f1_score, precision, recall = seq_model.evaluate([test_data[0], test_data[1]], test_labels)

    print(f"Test accuracy: {round(accuracy * 100, 2)}%")
    print(f"Test F1: {round(f1_score * 100, 2)}%")
    print(f"Test Precision: {round(precision * 100, 2)}%")
    print(f"Test Recall: {round(recall * 100, 2)}%")

    return history, seq_model


_, sequence_model = run_experiment()

Epoch 1/10
Epoch 1: val_loss improved from inf to 0.67427, saving model to /tmp/video_classifier
Epoch 2/10
Epoch 2: val_loss improved from 0.67427 to 0.67259, saving model to /tmp/video_classifier
Epoch 3/10
Epoch 3: val_loss did not improve from 0.67259
Epoch 4/10
Epoch 4: val_loss did not improve from 0.67259
Epoch 5/10
Epoch 5: val_loss did not improve from 0.67259
Epoch 6/10
Epoch 6: val_loss did not improve from 0.67259
Epoch 7/10
Epoch 7: val_loss did not improve from 0.67259
Epoch 8/10
Epoch 8: val_loss did not improve from 0.67259
Epoch 9/10
Epoch 9: val_loss did not improve from 0.67259
Epoch 10/10
Epoch 10: val_loss did not improve from 0.67259
Test accuracy: 58.06%
Test F1: 73.47%
Test Precision: 58.06%
Test Recall: 100.0%
