In [None]:
import os
import numpy as np
import cv2
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import (Input, ConvLSTM2D, MaxPooling3D, 
                                   Dropout, Dense, GlobalAveragePooling3D,
                                   TimeDistributed, MultiHeadAttention,
                                   Reshape, GlobalAveragePooling1D)
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical, Sequence
from tensorflow.keras.callbacks import (EarlyStopping, ReduceLROnPlateau, 
                                      ModelCheckpoint, TerminateOnNaN)
from tensorflow.keras import mixed_precision
from sklearn.model_selection import train_test_split
import random
import torch
from tensorflow.keras.models import Sequential, load_model
from classes import CLASSES_LIST, IMAGE_HEIGHT, IMAGE_WIDTH, SEQUENCE_LENGTH, BATCH_SIZE, DATASET_DIR



# Select the CUDA device when PyTorch can see one, otherwise fall back to CPU.
# NOTE(review): this is PyTorch's view of the hardware; the model in this
# notebook is built with tensorflow.keras - confirm torch is actually needed.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

In [None]:
# Mixed precision: keep variables in float32 (accuracy) while running the
# computations in float16 (speed and memory).
policy = mixed_precision.Policy('mixed_float16')
# Make this the default policy for every layer created from here on.
mixed_precision.set_global_policy(policy)

# EfficientNetB0 is a CNN image classifier pre-trained on ImageNet (1000
# categories covering animals, vehicles, household items, and more). It was
# chosen because it offers high accuracy at a relatively low computational cost.
# With include_top=False and pooling="avg" the classification head is dropped
# and the network returns one pooled feature vector per input image.
base_model = EfficientNetB0(weights="imagenet", include_top=False,
                            pooling="avg", input_shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 3))

# Do not call `.to(device)` on the model: that is the PyTorch API, a
# tensorflow.keras Model has no such method (AttributeError). TensorFlow
# places the model on a visible GPU on its own.
feature_extractor = Model(inputs=base_model.input, outputs=base_model.output)


In [None]:
def smart_frame_extraction(video_path):
    """Read SEQUENCE_LENGTH RGB frames from the central part of a video.

    If the video has at most SEQUENCE_LENGTH frames, all of them are read.
    Otherwise a contiguous window of exactly SEQUENCE_LENGTH frames centred on
    the middle of the video is read, so that the clip stays temporally
    coherent instead of being sampled from scattered positions.

    Every frame is resized to IMAGE_WIDTH x IMAGE_HEIGHT and converted from
    BGR to RGB. If fewer than SEQUENCE_LENGTH frames could be decoded, the
    sequence is padded by repeatedly appending a vertically flipped copy of
    the last frame in the list.

    Args:
        video_path: Path of the video file, passed to ``cv2.VideoCapture``.

    Returns:
        A float16 ``np.ndarray`` of shape
        ``(SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3)``.

    Raises:
        ValueError: If not a single frame could be read from the video
            (previously this surfaced as an IndexError during padding).
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        # Overall number of frames reported by the container
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        if total_frames <= SEQUENCE_LENGTH:
            # Short video: take every frame (empty range if the count is <= 0).
            indices = range(total_frames)
        else:
            # Long video: take a consecutive window around the centre.
            # Starting at middle - SEQUENCE_LENGTH // 2 and spanning exactly
            # SEQUENCE_LENGTH frames also works for odd SEQUENCE_LENGTH, and
            # the window always fits because total_frames > SEQUENCE_LENGTH.
            start = total_frames // 2 - SEQUENCE_LENGTH // 2
            indices = range(start, start + SEQUENCE_LENGTH)

        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)  # seek to the frame by index
            ret, frame = cap.read()
            if ret:
                # cv2.resize expects dsize as (width, height), not (height, width)
                frame = cv2.resize(frame, (IMAGE_WIDTH, IMAGE_HEIGHT))
                # OpenCV decodes to BGR; convert to RGB
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(frame)
    finally:
        # Release the capture even if decoding or resizing fails
        cap.release()

    if not frames:
        raise ValueError(f"Could not read any frames from video: {video_path}")

    # Not enough frames: pad with a flipped copy of the last frame.
    # P.S. The flip could also be applied as an augmentation step during
    # training and might slightly improve accuracy, but it was deliberately
    # left out because it could lead to overfitting.
    while len(frames) < SEQUENCE_LENGTH:
        frames.append(np.flip(frames[-1], axis=0))

    # Return the frames as a float16 array
    return np.array(frames, dtype=np.float16)


In [None]:
def extract_and_cache_features(skip_existing=False):
    """Extract per-frame features for every video and store them as .npy files.

    For each class name in CLASSES_LIST, every entry of
    ``DATASET_DIR/<class_name>`` is passed through ``smart_frame_extraction``
    and ``feature_extractor``. Features of the expected shape
    ``(SEQUENCE_LENGTH, 1280)`` are saved as float16 to
    ``features_2/<class_name>/<video_file>.npy``; anything else is reported
    with a warning and not saved.

    Args:
        skip_existing: When True, videos whose .npy file already exists are
            not processed again. Defaults to False, which recomputes and
            overwrites every file.
    """
    for class_name in CLASSES_LIST:
        # Folder holding the videos of this class
        class_dir = os.path.join(DATASET_DIR, class_name)

        # Folder where the features are stored. os.path.join keeps this
        # portable; a hard-coded "\\" only works as a separator on Windows.
        save_dir = os.path.join("features_2", class_name)
        os.makedirs(save_dir, exist_ok=True)

        for video_file in os.listdir(class_dir):
            video_path = os.path.join(class_dir, video_file)  # full path to the video
            save_path = os.path.join(save_dir, f"{video_file}.npy")

            if skip_existing and os.path.exists(save_path):
                continue

            # One unreadable file must not abort the whole run: warn and move on.
            try:
                frames = smart_frame_extraction(video_path)
            except (ValueError, IndexError) as exc:
                print(f"! Warning: could not extract frames from {video_file}: {exc}\n")
                continue

            # Run the frames through the feature extractor model
            features = feature_extractor.predict(frames, verbose=0)

            # Store as float16 to keep the files small
            features = features.astype(np.float16)

            # Save only when the features have the expected shape
            if features.shape == (SEQUENCE_LENGTH, 1280):
                print(f"@ The form of the extracted features: {features.shape} for video {video_file}")

                np.save(save_path, features)
                print(f"-- Features, saved for video: {video_file}")

            else:
                # Warnings
                print(f"! Warning: extracted objects do not match the shape {features.shape} for {video_file}\n")
                print(f"! Warning: not suitable number of frames {len(frames)} in {video_file}\n")


In [None]:
# Run the feature extraction for every class in CLASSES_LIST and write the
# resulting .npy files to disk.
extract_and_cache_features()