In [1]:
# Imports

import os
import cv2
import json
import glob
import numpy as np
import pandas as pd

In [2]:
# Mounting gdrive

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Get list of classes

CLASSES = []
for i in os.scandir("/content/drive/MyDrive/v1.0"):
  CLASSES.append(i.path.split("/")[5].split("_")[2])
print(CLASSES)

['birddog', 'curl', 'fly', 'legraise', 'pushup', 'superman', 'bicyclecrunch', 'squat', 'armraise', 'overheadpress']


In [4]:
# Create Dataset folder
os.mkdir("/content/Dataset")

# Create folder for each class
for i in CLASSES:
  path = "/content/Dataset/"+i
  os.mkdir(path)

In [5]:
# Unzip the video files in respective class folders

import shutil

for i in os.scandir("/content/drive/MyDrive/v1.0"):
  for exercise in CLASSES:
    des_path = path = "/content/Dataset/"+exercise
    shutil.unpack_archive(i.path, des_path)


In [27]:
IMAGE_HEIGHT , IMAGE_WIDTH = 75, 75

SEQUENCE_LENGTH = 20

DATASET_DIR = "/content/Dataset"

In [28]:
# Extract,resize and normalize

def frames_extraction(video_path):
    '''
    This function will extract the required frames from a video after resizing and normalizing them.
    Args:
        video_path: The path of the video in the disk, whose frames are to be extracted.
    Returns:
        frames_list: A list containing the resized and normalized frames of the video.
    '''

    # Declare a list to store video frames.
    frames_list = []

    # Read the Video File using the VideoCapture object.
    video_reader = cv2.VideoCapture(video_path)

    # Get the total number of frames in the video.
    video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

    # Calculate the the interval after which frames will be added to the list.
    skip_frames_window = max(int(video_frames_count/SEQUENCE_LENGTH), 1)

    # Iterate through the Video Frames.
    for frame_counter in range(SEQUENCE_LENGTH):

        # Set the current frame position of the video.
        video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)

        # Reading the frame from the video.
        success, frame = video_reader.read()

        # Check if Video frame is not successfully read then break the loop
        if not success:
            break
        # Crop center
        y, x = frame.shape[0:2]
        min_dim = min(y, x)
        start_x = (x // 2) - (min_dim // 2)
        start_y = (y // 2) - (min_dim // 2)
        cropped_frame =  frame[start_y : start_y + min_dim, start_x : start_x + min_dim]

        # Resize the Frame to fixed height and width.
        resized_frame = cv2.resize(cropped_frame, (IMAGE_HEIGHT, IMAGE_WIDTH))

        # Normalize the resized frame by dividing it with 255 so that each pixel value then lies between 0 and 1
        normalized_frame = resized_frame / 255

        # Append the normalized frame into the frames list
        frames_list.append(normalized_frame)

    # Release the VideoCapture object.
    video_reader.release()

    # Return the frames list.
    return frames_list

In [29]:
def create_dataset():

    # Declared Empty Lists to store the features, labels and video file path values.
    features = []
    labels = []
    video_files_paths = []

    # Iterating through all the classes mentioned in the classes list
    for class_index, class_name in enumerate(CLASSES):

            folder = "/content/Dataset/"+ class_name +"/data"
            video_paths = (sorted(glob.glob(os.path.join(folder, "*.mp4"))))
            # video_files_paths.extend(video_paths)

            for video_file_path in video_paths:

              # Extract the frames of the video file.
              frames = frames_extraction(video_file_path)

              # So ignore the vides having frames less than the SEQUENCE_LENGTH.
              if len(frames) == SEQUENCE_LENGTH:

                  # Append the data to their repective lists.
                  features.append(frames)
                  labels.append(class_index)
                  video_files_paths.append(video_file_path)

    # Converting the list to numpy arrays
    features = np.asarray(features)
    labels = np.array(labels)

    # Return the frames, class index, and video file path.
    return features, labels, video_files_paths

In [30]:
features, labels, video_files_paths = create_dataset()

In [31]:
print(features.shape)

(1000, 20, 75, 75, 3)


In [32]:
from sklearn.model_selection import train_test_split

from tensorflow.keras.layers import *
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import plot_model

In [33]:
one_hot_encoded_labels = to_categorical(labels)

In [34]:
features_train, features_test, labels_train, labels_test = train_test_split(features, one_hot_encoded_labels,
                                                                            test_size = 0.25, shuffle = True)



In [35]:
features_train.shape

(750, 20, 75, 75, 3)

In [36]:
from tensorflow import keras

def build_feature_extractor():
    feature_extractor = keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(75, 75, 3),
    )
    preprocess_input = keras.applications.inception_v3.preprocess_input

    inputs = keras.Input((75, 75, 3))
    preprocessed = preprocess_input(inputs)

    outputs = feature_extractor(preprocessed)
    return keras.Model(inputs, outputs, name="feature_extractor")


feature_extractor = build_feature_extractor()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5


In [37]:
label_processor = keras.layers.StringLookup(
    num_oov_indices=0, vocabulary=np.unique(train_df["tag"])
)

# Utility for our sequence model.
def get_sequence_model():
    class_vocab = label_processor.get_vocabulary()

    frame_features_input = keras.Input((20, 2048))
    mask_input = keras.Input((20,), dtype="bool")

    # Refer to the following tutorial to understand the significance of using `mask`:
    # https://keras.io/api/layers/recurrent_layers/gru/
    x = keras.layers.GRU(16, return_sequences=True)(frame_features_input, mask=mask_input)
    x = keras.layers.GRU(8)(x)
    x = keras.layers.Dropout(0.4)(x)
    x = keras.layers.Dense(8, activation="relu")(x)
    output = keras.layers.Dense(len(class_vocab), activation="softmax")(x)

    rnn_model = keras.Model([frame_features_input, mask_input], output)

    rnn_model.compile(
        loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
    )
    return rnn_model

EPOCHS = 30
# Utility for running experiments.
def run_experiment():
    filepath = "./tmp/video_classifier"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    seq_model = get_sequence_model()
    history = seq_model.fit(
        [train_data[0], train_data[1]],
        train_labels,
        validation_split=0.3,
        epochs=EPOCHS,
        callbacks=[checkpoint],
    )

    seq_model.load_weights(filepath)
    _, accuracy = seq_model.evaluate([test_data[0], test_data[1]], test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, seq_model


_, sequence_model = run_experiment()

NameError: ignored