# Creating a Modular Version

In [1]:
# Create the 'module' directory that the %%writefile cells below write their scripts into.
# exist_ok=True keeps this cell safe to re-run when the directory already exists.
import os
os.makedirs('module', exist_ok=True)

## Create Dataset - data_setup.py

In [2]:
%%writefile module/data_setup.py
import os
import numpy as np
import cv2
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.utils import to_categorical

class Dataset:
    """
    Creates tensorflow datasets from a directory of video files.
    The data needs to be in an ImageNet-style directory structure, i.e.
    ``data_path/<class_name>/<video_file>``.
    After processing the data, two tf.data pipelines (train and test) are returned
    by ``dataset_pipeline``.

    Parameters:
        data_path: A string of the parent directory for all the data.
        seq_len(default=20): An integer for selecting total frames from the video.
        frame_size(default=128): An integer for resizing height and width of the frames.
        batch_size(default=32): An integer for selecting the size of a batch.
        seed(default=42): An integer for controlling the randomness of the split and the shuffling.
        class_list(default=None): A list containing the class names that will be needed in the dataset.
                                  When None, every sub-directory of data_path is used as a class.

    Attributes:
        classes: Sorted list of class names. The position of a name in this list is
                 the integer label used for that class.
    """
    def __init__(self, data_path, seq_len = 20, frame_size = 128, batch_size = 32, seed = 42, class_list=None):
        self.data_path = data_path
        self.seq_len = seq_len
        self.frame_size = frame_size
        self.batch_size = batch_size
        self.seed = seed

        # Handling class_list
        if class_list is None:
            # Only directories can be classes; stray files next to the class
            # folders would otherwise be listed as classes and break os.listdir later.
            self.classes = sorted(
                entry for entry in os.listdir(self.data_path)
                if os.path.isdir(os.path.join(self.data_path, entry))
            )
        else:
            self.classes = sorted(class_list)

    def frames_extraction(self, video_file_path):
        """
        Samples up to seq_len evenly spaced frames from one video file.

        Parameters:
            video_file_path: A string path to the video file.

        Returns:
            A list of float32 frames resized to (frame_size, frame_size) and scaled
            by 1/255. The list is shorter than seq_len when a frame cannot be read.
        """
        frames_list = []
        # Reading the video file and counting the frames
        video_reader = cv2.VideoCapture(video_file_path)
        try:
            video_frame_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

            # Selecting the frames at certain interval and applying the transformation
            skip_frames = max(int(video_frame_count/self.seq_len), 1)
            for i in range(self.seq_len):
                video_reader.set(cv2.CAP_PROP_POS_FRAMES, i * skip_frames)
                success, frame = video_reader.read()
                if not success:
                    break
                resize_frame = cv2.resize(frame, (self.frame_size, self.frame_size))
                norm_frame = resize_frame/255.
                frames_list.append(norm_frame.astype('float32'))
        finally:
            # Release the reader even when a frame fails to process
            video_reader.release()
        return frames_list

    def create_dataset(self):
        """
        Extracts the frames of every video of every selected class.

        Videos that do not yield exactly seq_len frames are skipped.

        Returns:
            features: Array of the extracted frame sequences.
            labels: One-hot encoded labels with one column per entry of self.classes.
            video_files_path: List of the paths of the videos that were kept.

        Raises:
            ValueError: If no video produced a full sequence of frames.
        """
        features = []
        labels = []
        video_files_path = []

        # Going through all the data in the class list
        print(f'[INFO] Extracting data from {len(self.classes)} classes...')
        for class_index, class_name in enumerate(self.classes):
            print(f'[INFO] Extracting all the data in the class: {class_name}')

            # Getting the list of all the video files and the path to the video
            class_dir = os.path.join(self.data_path, class_name)
            for file_name in os.listdir(class_dir):
                video_file_path = os.path.join(class_dir, file_name)

                # Extracting frames using the function and verifying the total frames
                frames_list = self.frames_extraction(video_file_path=video_file_path)
                if len(frames_list) != self.seq_len:
                    continue

                # Appending the data in a list
                features.append(frames_list)
                labels.append(class_index)
                video_files_path.append(video_file_path)

        if not features:
            raise ValueError(f'No video with at least {self.seq_len} readable frames was found in "{self.data_path}".')

        # Converting the list to array
        features = np.asarray(features)
        labels = np.asarray(labels)

        # Hot encoding the labels. num_classes is passed explicitly so the width
        # always matches len(self.classes), even if the last class has no usable video.
        labels = to_categorical(labels, num_classes=len(self.classes))
        print('[INFO] Datset is been created')
        return features, labels, video_files_path

    def split_dataset(self):
        """
        Creates the dataset and splits it into 75% train and 25% test data.

        Returns:
            train_features, test_features, train_labels, test_labels
        """
        features, labels, _ = self.create_dataset()
        train_features, test_features, train_labels, test_labels = train_test_split(features,
                                                                                    labels,
                                                                                    test_size=0.25,
                                                                                    shuffle=True,
                                                                                    random_state=self.seed)
        print('[INFO] Dataset is been splitted into train and test set.')
        return train_features, test_features, train_labels, test_labels

    def dataset_pipeline(self):
        """
        Builds the batched and prefetched tf.data pipelines.

        Both pipelines drop the last incomplete batch, so every batch has exactly
        batch_size samples. Only the training pipeline is shuffled.

        Returns:
            train_ds, test_ds: A tuple of training and testing dataset pipeline.
        """
        train_features, test_features, train_labels, test_labels = self.split_dataset()
        train_ds = (tf.data.Dataset.from_tensor_slices((train_features, train_labels))
                    .shuffle(10000, seed=self.seed)
                    .batch(self.batch_size, drop_remainder=True)
                    .prefetch(tf.data.AUTOTUNE))
        test_ds = (tf.data.Dataset.from_tensor_slices((test_features, test_labels))
                   .batch(self.batch_size, drop_remainder=True)
                   .prefetch(tf.data.AUTOTUNE))
        print('[INFO] Dataset pipeline is been created')
        return train_ds, test_ds

Writing module/data_setup.py


In [3]:
from module.data_setup import Dataset
# Restrict the dataset to five classes; seq_len, frame_size, batch_size and seed keep their defaults
dataset = Dataset(data_path='data/UCF50', class_list=['Biking', 'Diving', 'GolfSwing', 'Punch', 'Rowing'])
# Extracts the frames, splits the data and returns the batched train/test pipelines
train_ds, test_ds = dataset.dataset_pipeline()

[INFO] Extracting data from 5 classes...
[INFO] Extracting all the data in the class: Biking
[INFO] Extracting all the data in the class: Diving
[INFO] Extracting all the data in the class: GolfSwing
[INFO] Extracting all the data in the class: Punch
[INFO] Extracting all the data in the class: Rowing
[INFO] Datset is been created
[INFO] Dataset is been splitted into train and test set.
[INFO] Dataset pipeline is been created


In [4]:
# Number of batches in the train and test pipelines
len(train_ds), len(test_ds)

(17, 5)

## Create Model - model_builder.py

In [5]:
%%writefile module/model_builder.py
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import ConvLSTM2D, Conv2D, MaxPooling3D, MaxPooling2D, LSTM, TimeDistributed, Dropout, Flatten, Dense

class CreateConvlstmModel(Model):
    """
    ConvLSTM network for video classification.

    The network stacks four ConvLSTM2D stages with 4, 8, 12 and 16 filters. Every
    stage is followed by a (1, 2, 2) max pooling and, for the first three stages,
    by a per-frame dropout of 0.2. The result is flattened and fed to a softmax layer.

    Parameters:
        input_shape: tuple, shape of one input sample given to the first layer.
                     Format of the input_shape should be (timesteps, height, width, channels)
        num_classes: int, number of units of the softmax output layer.

    Returns: Fully Constructed ConvLSTM Model.
    """
    def __init__(self, input_shape: tuple, num_classes: int):
        super().__init__()

        # Arguments shared by every ConvLSTM2D layer of the network
        lstm_kwargs = dict(activation='tanh',
                           data_format='channels_last',
                           recurrent_dropout=0.2,
                           return_sequences=True)

        def spatial_pool():
            # Pool size 1 on the time axis, 2 on height and width
            return MaxPooling3D((1, 2, 2), padding='same', data_format='channels_last')

        def frame_dropout():
            return TimeDistributed(Dropout(0.2))

        # Only the first layer is told the input shape
        self.input_block1 = Sequential([
            ConvLSTM2D(4, 3, input_shape=input_shape, **lstm_kwargs),
            spatial_pool(),
            frame_dropout(),
        ])
        self.block2 = Sequential([
            ConvLSTM2D(8, 3, **lstm_kwargs),
            spatial_pool(),
            frame_dropout(),
        ])
        self.block3 = Sequential([
            ConvLSTM2D(12, 3, **lstm_kwargs),
            spatial_pool(),
            frame_dropout(),
        ])
        # The last stage has no dropout
        self.block4 = Sequential([
            ConvLSTM2D(16, 3, **lstm_kwargs),
            spatial_pool(),
        ])
        self.classifier_block = Sequential([
            Flatten(),
            Dense(num_classes, activation='softmax'),
        ])

    def call(self, x):
        """Passes x through the four ConvLSTM stages and then the classifier."""
        stages = (self.input_block1,
                  self.block2,
                  self.block3,
                  self.block4,
                  self.classifier_block)
        for stage in stages:
            x = stage(x)
        return x

class CreateLRCNModel(Model):
    """
    LRCN (per-frame CNN followed by an LSTM) network for video classification.

    Four time-distributed convolution stages with 16, 32, 64 and 64 filters are each
    followed by a max pooling (pool sizes 4, 4, 2, 2) and, for the first three
    stages, a dropout of 0.25. The per-frame features are flattened, summarised by
    an LSTM with 32 units and classified by a softmax layer.

    Parameters:
        input_shape: tuple, shape of one input sample given to the first layer.
                     Format of the input_shape should be (timesteps, height, width, channels)
        num_classes: int, number of units of the softmax output layer.

    Returns: Fully Constructed LRCN Model.
    """
    def __init__(self, input_shape: tuple, num_classes: int):
        super().__init__()

        def frame_conv(filters, **wrapper_kwargs):
            # 3x3 'same' convolution with relu, applied to every frame
            conv = Conv2D(filters, 3, padding='same', activation='relu')
            return TimeDistributed(conv, **wrapper_kwargs)

        def frame_pool(size):
            return TimeDistributed(MaxPooling2D(size))

        def frame_dropout():
            return TimeDistributed(Dropout(0.25))

        # Only the first layer is told the input shape
        self.input_block1 = Sequential([
            frame_conv(16, input_shape=input_shape),
            frame_pool(4),
            frame_dropout(),
        ])
        self.block2 = Sequential([
            frame_conv(32),
            frame_pool(4),
            frame_dropout(),
        ])
        self.block3 = Sequential([
            frame_conv(64),
            frame_pool(2),
            frame_dropout(),
        ])
        # The last stage has no dropout
        self.block4 = Sequential([
            frame_conv(64),
            frame_pool(2),
        ])
        self.classifier_block = Sequential([
            TimeDistributed(Flatten()),
            LSTM(32),
            Dense(num_classes, activation='softmax'),
        ])

    def call(self, x):
        """Passes x through the four convolution stages and then the classifier."""
        stages = (self.input_block1,
                  self.block2,
                  self.block3,
                  self.block4,
                  self.classifier_block)
        for stage in stages:
            x = stage(x)
        return x

Writing module/model_builder.py


In [6]:
from module.model_builder import CreateConvlstmModel, CreateLRCNModel
# LRCN model for inputs of shape (20, 128, 128, 3) and 2 output classes
model = CreateLRCNModel((20, 128, 128, 3), 2)
# Build with a batch dimension of 32 before printing the summary
model.build((32, 20, 128, 128, 3))
model.summary()

Model: "create_lrcn_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 sequential (Sequential)     (None, 20, 32, 32, 16)    448       
                                                                 
 sequential_1 (Sequential)   (32, 20, 8, 8, 32)        4640      
                                                                 
 sequential_2 (Sequential)   (32, 20, 4, 4, 64)        18496     
                                                                 
 sequential_3 (Sequential)   (32, 20, 2, 2, 64)        36928     
                                                                 
 sequential_4 (Sequential)   (32, 2)                   37058     
                                                                 
Total params: 97,570
Trainable params: 97,570
Non-trainable params: 0
_________________________________________________________________


## Create utilities - utils.py

In [7]:
%%writefile module/utils.py
import os
import datetime as dt
from tensorflow.keras.callbacks import EarlyStopping, TensorBoard, ReduceLROnPlateau

def tensorboard_callback(dir_name: str, model_name: str, exp_name: str):
    """
    Creates Tensorboard Callback
    The logs are written to dir_name/model_name/exp_name/<timestamp>, where the
    timestamp is the time at which this function is called.
    Parameters: 
        dir_name: A string to save tensorboard data in a directory.
        model_name: A string for the model name.
        exp_name: A string for the experiment name.
    Returns: A pre-configured tensorboard callback.
    """
    # The timestamp uses '-' only: ':' is not allowed in Windows path components,
    # so the previous 'HH:MM:SS' form made the log directory impossible to create there.
    timestamp = dt.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

    # Creating a tensorboard callback
    log_dir = os.path.join(dir_name, model_name, exp_name, timestamp)
    callback = TensorBoard(log_dir=log_dir)
    print(f'[INFO] Saving Tensorboard log files to: {log_dir}')
    return callback
    
# Early stopping: stop once the validation loss has not improved for 20 epochs
# and restore the weights of the best epoch
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=20,
    restore_best_weights=True,
    verbose=1,
)

# Learning rate reduction: multiply the learning rate by 0.2 when the validation
# loss has not improved for 10 epochs, never going below 1e-7
reduce_lr_callback = ReduceLROnPlateau(
    monitor='val_loss',
    mode='min',
    patience=10,
    factor=0.2,
    min_lr=1e-7,
    verbose=1,
)

def save_model(model, dir_name: str, model_name: str, exp_name: str):
    """
    A function to save a tensorflow model.
    The model is saved in the 'tf' format to dir_name/model_name/exp_name/<timestamp>,
    where the timestamp is the time at which this function is called.
    Parameters: 
        model: A trained model.
        dir_name: A string to save whole model data in a directory.
        model_name: A string for the model name.
        exp_name: A string for the experiment name.
    """
    # The timestamp uses '-' only: ':' is not allowed in Windows path components,
    # so the previous 'HH:MM:SS' form made the model directory impossible to create there.
    timestamp = dt.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    filepath = os.path.join(dir_name, model_name, exp_name, timestamp)
    model.save(filepath=filepath, save_format='tf')
    print(f'[INFO] "{model_name}" Model is been saved to directory: {filepath}')

Writing module/utils.py


## Train and Save the model - train.py

In [8]:
%%writefile module/train.py
import os
import argparse
import datetime as dt
from sys import exit
import data_setup, model_builder, utils
import tensorflow as tf

# Command line interface of the training script: every hyperparameter is a flag
parser = argparse.ArgumentParser(description='Get some hyperparameters')

# Architecture to train
parser.add_argument('--model_name', type=str, default='LRCN', choices=('ConvLSTM', 'LRCN'),
                    help='Name of the model')

# Experiment name, used in the log and model directories
parser.add_argument('--exp_name', type=str, default='experiment',
                    help='Name of the experiment')

# Training length
parser.add_argument('--num_epochs', type=int, default=100,
                    help='The number of epochs to train the model')

# Samples per batch
parser.add_argument('--batch_size', type=int, default=32,
                    help='The number of sample for batch data')

# Frames sampled from every video
parser.add_argument('--seq_len', type=int, default=20,
                    help='Total number of frames for every video')

# Height and width of the resized frames
parser.add_argument('--frame_size', type=int, default=128,
                    help='Integer for resizing the frame')

# Optimizer learning rate
parser.add_argument('--lr', type=float, default=0.001,
                    help='Learning rate for optimizer')

# Root directory of the data
parser.add_argument('--data_dir_path', type=str, default='data/UCF50',
                    help='A path for data directory')

# Optional subset of classes; one or more names, omitted means all classes
parser.add_argument('--class_list', nargs='+', default=None,
                    help='A list containing class names, use None for all the classes')

# Worker count, defaults to the number of CPUs reported by the OS
parser.add_argument('--num_workers', type=int, default=os.cpu_count(),
                    help='Workers you want to assign durning the model training.')

# Switch for the early stopping / learning rate callbacks, given as a string
parser.add_argument('--callbacks', type=str, default='True', choices=('True', 'False'),
                    help='Select a boolean to use callbacks durning training.')

# Parsing the command line
args = parser.parse_args()

# Exposing the parsed values as module level constants
MODEL_NAME, EXP_NAME = args.model_name, args.exp_name
NUM_EPOCHS, BATCH_SIZE = args.num_epochs, args.batch_size
SEQ_LEN, FRAME_SIZE = args.seq_len, args.frame_size
LR = args.lr
DATA_PATH, CLASSES = args.data_dir_path, args.class_list
NUM_WORKERS = args.num_workers
CALLBACKS = args.callbacks

# Error handling: validate the requested class list before any data is processed.
# Failures exit with status 1 so that callers (shell scripts, CI) can detect them;
# a bare exit() would report success.
if CLASSES is not None:
    # Checking valid class list against the classes found in the data directory
    ucf_class_list = data_setup.Dataset(data_path=DATA_PATH).classes
    for class_name in CLASSES:
        if class_name not in ucf_class_list:
            print(f'[ERROR] "{class_name}" is a wrong class name.')
            print(f'[INFO] Kindly select classes from this list: {ucf_class_list}')
            exit(1)

    # Checking total classes used for training
    if len(CLASSES) < 3:
        print(f'[ERROR] The Class list "{CLASSES}", contains less classes than the requirement.')
        print('[INFO] Minimum required classes in class list is 3. If you want to use the whole dataset than do not use this flag.')
        exit(1)

print(f'\n[INFO] Training a {MODEL_NAME} model for {NUM_EPOCHS} epochs with batch size {BATCH_SIZE} and a learning rate of {LR}')

# Creating dataset using data_setup script
dataset = data_setup.Dataset(data_path=DATA_PATH, 
                             seq_len=SEQ_LEN,
                             frame_size=FRAME_SIZE,
                             batch_size=BATCH_SIZE,
                             class_list=CLASSES)
train_ds, test_ds = dataset.dataset_pipeline()

# Creating model using the model_builder script
# Getting number of classes from the dataset itself, so the output layer also
# matches data directories that do not contain exactly 50 classes
NUM_CLASSES = len(dataset.classes)

# Selecting a model and creating it. --model_name is restricted to these keys by argparse.
model_classes = {'ConvLSTM': model_builder.CreateConvlstmModel,
                 'LRCN': model_builder.CreateLRCNModel}
model = model_classes[MODEL_NAME](input_shape = (SEQ_LEN, FRAME_SIZE, FRAME_SIZE, 3),
                                  num_classes = NUM_CLASSES)
model.build((BATCH_SIZE, SEQ_LEN, FRAME_SIZE, FRAME_SIZE, 3))
print(f'[INFO] Model "{MODEL_NAME}" is been constructed.')

# Compiling the model
model.compile(loss='categorical_crossentropy',
              optimizer=tf.keras.optimizers.Adam(learning_rate=LR),
              metrics=['accuracy'])

# Setting up callbacks: TensorBoard logging is always used, early stopping and
# learning rate reduction are only added when --callbacks is 'True'
if CALLBACKS in ('True', 'False'):
    callbacks = [utils.tensorboard_callback(dir_name='training_logs',
                                            model_name=MODEL_NAME,
                                            exp_name=EXP_NAME)]
    if CALLBACKS == 'True':
        callbacks += [utils.early_stopping_callback, utils.reduce_lr_callback]

# Fitting the model, one full pass over each pipeline per epoch
fit_options = dict(epochs=NUM_EPOCHS,
                   steps_per_epoch=len(train_ds),
                   validation_data=test_ds,
                   validation_steps=len(test_ds),
                   callbacks=callbacks,
                   workers=NUM_WORKERS)
model.fit(train_ds, **fit_options)

# Save the model
utils.save_model(model=model,
                 dir_name='saved_model',
                 model_name=MODEL_NAME,
                 exp_name=EXP_NAME)

Writing module/train.py


## Prediction on saved model - predict.py

In [9]:
%%writefile module/predict.py
import os
import cv2
import argparse
import numpy as np
from sys import exit
import tensorflow as tf

# Creating a parser
parser = argparse.ArgumentParser(description='Get some hyperparameters')

# Getting hyperparameters
# Sequence length
parser.add_argument('--seq_len',
                    default=20,
                    type=int,
                    help='Total number of frames for every video')

# Frame size
parser.add_argument('--frame_size',
                    default=128,
                    type=int,
                    help='Integer for resizing the frame')

# Video file path. Required: there is no sensible default, and without it the
# script would only fail after the model has been loaded.
parser.add_argument('--video_path',
                    required=True,
                    type=str,
                    help='File path of the video for predicting.')

# Saved model
parser.add_argument('--model_path',
                    default='saved_model/lrcn_model2_2023-03-21-09:40:09_loss:0.1253_accuracy:0.9622',
                    type=str,
                    help='Target model path to use for the prediction.')

# Class list
parser.add_argument('--class_list',
                    default=['Biking', 'Diving', 'GolfSwing', 'Punch', 'Rowing'],
                    nargs='+',
                    help='A list containing class names')

args = parser.parse_args()

# Collecting the arguments
SEQ_LEN = args.seq_len
FRAME_SIZE = args.frame_size
VIDEO_PATH = args.video_path
MODEL_PATH = args.model_path
CLASS_LIST = args.class_list

print(f'[INFO] Predicting video file: "{VIDEO_PATH}" using model: "{MODEL_PATH}".')

# Loading the model
model = tf.keras.models.load_model(filepath=MODEL_PATH)
print('[INFO] Model is been loaded and ready for prediction.')

# Helper that turns a video file into the frame sequence expected by the model
def frames_extraction(video_file_path):
    """
    Samples up to SEQ_LEN evenly spaced frames from a video file.

    Every frame is resized to (FRAME_SIZE, FRAME_SIZE), divided by 255 and cast to
    float32. Sampling stops at the first frame that cannot be read, so the
    returned list can be shorter than SEQ_LEN.
    """
    capture = cv2.VideoCapture(video_file_path)
    total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))

    # Distance between two sampled frames, at least one frame
    step = max(int(total_frames/SEQ_LEN), 1)
    target_size = (FRAME_SIZE, FRAME_SIZE)

    sampled = []
    for position in range(0, SEQ_LEN * step, step):
        capture.set(cv2.CAP_PROP_POS_FRAMES, position)
        ok, raw_frame = capture.read()
        if not ok:
            break
        scaled = cv2.resize(raw_frame, target_size)/255.
        sampled.append(scaled.astype('float32'))

    capture.release()
    return sampled

# Processing the video file
frames_list = frames_extraction(video_file_path=VIDEO_PATH)

# The model needs exactly SEQ_LEN frames; a missing, unreadable or too short
# video yields fewer, which would otherwise fail inside model.predict
if len(frames_list) != SEQ_LEN:
    print(f'[ERROR] Could only read {len(frames_list)} of the {SEQ_LEN} required frames from "{VIDEO_PATH}".')
    exit(1)
print('[INFO] Video file is ready for prediction.')

# predicting using the model (a batch axis is added in front of the frames)
pred_prob = model.predict(np.expand_dims(np.asarray(frames_list), axis=0))[0]
pred_label = np.argmax(pred_prob)

# Training assigns the labels following the sorted order of the class names,
# so the names are sorted here as well before they are indexed by the label
class_names = sorted(CLASS_LIST)
if len(class_names) != len(pred_prob):
    print(f'[ERROR] The model predicts {len(pred_prob)} classes but {len(class_names)} class names were given.')
    exit(1)
pred_class = class_names[pred_label]

# printing the result
print(f'[INFO] Action predicted by the model : {pred_class}')
print(f'[INFO] Prediction probalities: {pred_prob[pred_label]:.2f}\n')

Writing module/predict.py
