In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#Required Imports
import os

import matplotlib
matplotlib.use('AGG')
import matplotlib.pyplot as plt

import numpy as np

import cv2

from keras.layers import (Input, Activation, Conv3D, Dense, Dropout, Flatten, MaxPooling3D, Input, average, BatchNormalization, LeakyReLU)
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras.models import Model
from keras.models import Sequential
from tensorflow.keras.utils import to_categorical

from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [None]:
# Check GPU Availability
import tensorflow as tf
print("Num GPUs Available:", len(tf.config.list_physical_devices('GPU')))

# Reduce Precision From float32 To float16
# from tensorflow.keras import mixed_precision
# mixed_precision.set_global_policy('mixed_float16')

#Accelerated Linear Algebra
tf.config.optimizer.set_jit(True)

# Create An Optimizer With Loss Scaling To Prevent Precision Issues
# from tensorflow.keras.mixed_precision import LossScaleOptimizer
# from tensorflow.keras.optimizers import Adam
# opt = LossScaleOptimizer(Adam(learning_rate=1e-4))

Num GPUs Available: 0


In [None]:
#Class To Extract Frames From Video (Converting Video File To 3D Array For Processing)
class Videoto3D:

  def __init__(self, width, height, depth):
    self.width = width
    self.height = height
    self.depth = depth

  #Skip Frames In The Video For Efficiency
  def video3d(self, filename, color=False, skip=True):
    cap = cv2.VideoCapture(filename)
    nframe = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    if skip:
      frames = [x * nframe / self.depth for x in range(self.depth)]
    else:
      frames = [x for x in range(self.depth)]
    framearray = []

    for i in range(self.depth):
      cap.set(cv2.CAP_PROP_POS_FRAMES, frames[i])
      ret, frame = cap.read()
      frame = cv2.resize(frame, (self.height, self.width))
      if color:
        framearray.append(frame)
      else:
        framearray.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))

    cap.release()
    return np.array(framearray) #Outputs A Numpy Array For Each Video

In [None]:
#Function To Load Videos And Labels
def loaddata(video_dir, vid3d, nclass, result_dir, color=False, skip=True):

  categories = sorted(os.listdir(video_dir)) #One Folder Per Category
  X = []
  labels = []
  labellist = []

  total_categories = len(categories)  # For Progress Bar
  pbar = tqdm(total=total_categories, desc="Processing Directories")

  for category in categories:

    if category in ['.DS_Store', 'output'] :
      continue

    category_path = os.path.join(video_dir, category)

    if category not in labellist:
      if len(labellist) >= nclass: #Only Process Given Amount Of Classes
        break
      labellist.append(category)

    files = os.listdir(category_path)
    for filename in files:

      if filename == '.DS_Store':
        continue

      file_path = os.path.join(category_path, filename)
      X.append(vid3d.video3d(file_path, color=color, skip=skip))
      labels.append(category)

    pbar.update(1)

  pbar.close()

  for num, label in enumerate(labellist): #Assign Numbers For Classes
    for i in range(len(labels)):
      if label == labels[i]:
        labels[i] = num

  if color:
    return np.array(X).transpose((0, 2, 3, 4, 1)), labels #(num_samples, height, width, depth, channels)
  else:
    return np.array(X).transpose((0, 2, 3, 1)), labels #(num_samples, height, width, depth)

In [None]:
#Creating A 3D Convolutional Neural Network With Leaky Relu & Batch Normalization
def create_3dcnn(input_shape, n_classes):
  model = Sequential()

  model.add(Input(shape=input_shape))
  model.add(Conv3D(32, kernel_size=(3,3,3), padding='same'))
  model.add(BatchNormalization())  # Standardize activations
  model.add(LeakyReLU(negative_slope=0.01)) # Leaky ReLU instead of ReLU

  model.add(Conv3D(32, kernel_size=(3,3,3), padding='same'))
  model.add(BatchNormalization())
  model.add(LeakyReLU(negative_slope=0.01))

  model.add(MaxPooling3D(pool_size=(3, 3, 3)))
  model.add(Dropout(0.25))

  model.add(Conv3D(64, kernel_size=(3,3,3), padding='same'))
  model.add(BatchNormalization())
  model.add(LeakyReLU(negative_slope=0.01))

  model.add(Conv3D(64, kernel_size=(3,3,3), padding='same'))
  model.add(BatchNormalization())
  model.add(LeakyReLU(negative_slope=0.01))

  model.add(MaxPooling3D(pool_size=(3, 3, 3)))
  model.add(Dropout(0.25))

  model.add(Flatten())
  model.add(Dense(512))
  model.add(BatchNormalization())
  model.add(LeakyReLU(negative_slope=0.01))
  model.add(Dropout(0.5))

  model.add(Dense(n_classes, activation='softmax'))

  return model

In [None]:
#Variables

in_dir = '/content/drive/MyDrive/UCF-101'

out_dir = '/content/drive/MyDrive/UCF-101/output'
if not os.path.isdir(out_dir): #Create Directory If It Doesn't Exist
  os.makedirs(out_dir)

model_weights = '/content/drive/MyDrive/UCF-101/output/model_weights'
if not os.path.isdir(model_weights):
  os.makedirs(model_weights)

results = '/content/drive/MyDrive/UCF-101/output/results'
if not os.path.isdir(results):
  os.makedirs(results)

n_classes = 101
img_rows,img_cols,frames = 32,32,10

color = False
skip = True

channel = 3 if color else 1

In [None]:
#Saving Each Model's Training/Validation Accuracy & Loss
def plot_history(history, name):
  plt.plot(history.history['acc'], marker='.')
  plt.plot(history.history['val_acc'], marker='.')
  plt.title('model accuracy')
  plt.xlabel('epoch')
  plt.ylabel('accuracy')
  plt.grid()
  plt.savefig(os.path.join(results, '{}_accuracy.png'.format(name)))
  plt.close()

  plt.plot(history.history['loss'], marker='.')
  plt.plot(history.history['val_loss'], marker='.')
  plt.title('model loss')
  plt.xlabel('epoch')
  plt.ylabel('loss')
  plt.grid()
  plt.savefig(os.path.join(results, '{}_loss.png'.format(name)))
  plt.close()

In [None]:
#Function To Load Training & Testing Data
def Train_Test_Data():

  vid3d = Videoto3D(img_rows, img_cols, frames)

  preloaded_data_path = '/content/drive/MyDrive/UCF-101/output/dataset.npz'

  if os.path.exists(preloaded_data_path): #Load Data If It Already Exists
    loadeddata = np.load(preloaded_data_path)
    X, Y = loadeddata["X"], loadeddata["Y"]
  else:
    x, y = loaddata(in_dir, vid3d, n_classes, out_dir, color, skip) #Load And Save Data If It Doesn't Exist
    X = x.reshape((x.shape[0], img_rows, img_cols, frames, channel))
    Y = to_categorical(y, n_classes)

    X = X.astype('float16')
    np.savez(preloaded_data_path, X=X, Y=Y)
  print('X_shape:{} Y_shape:{}'.format(X.shape, Y.shape))

  return train_test_split(X, Y, test_size=0.2), X.shape[1: ] #No Random State To Improve Ensemble Performance

(X_train, X_test, Y_train, Y_test), in_shape = Train_Test_Data()

X_shape:(13320, 32, 32, 10, 1) Y_shape:(13320, 101)


In [None]:
#Function To Train An Ensemble Of Models
def train_model(n_models=10, epochs=100, batch_size=128):

  #Train Test Generator
  train_dataset = tf.data.Dataset.from_tensor_slices((X_train, Y_train)).shuffle(len(X_train)).batch(batch_size).prefetch(tf.data.AUTOTUNE)
  test_dataset = tf.data.Dataset.from_tensor_slices((X_test, Y_test)).batch(batch_size).prefetch(tf.data.AUTOTUNE)

  #Implementing Model CallBacks
  early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=1)
  reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1)

  #Training Models Individually
  models=[]
  for i in range(n_models):
    print(f"Model {i}:")
    models.append(create_3dcnn(in_shape, n_classes))
    models[-1].compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    model_path = os.path.join(model_weights, f"model_{i}.weights.h5")

    #Load Weights If Model Already Trained
    if os.path.exists(model_path):
      models[-1].load_weights(model_path)
      print(f"Model {i} weights loaded from: {model_path}")

    #Train The Model If Weights Do Not Exist
    else:
      history = models[-1].fit(train_dataset, validation_data=test_dataset, epochs=epochs, callbacks=[early_stopping, reduce_lr], verbose=1)
      models[-1].save_weights(model_path)
      print(f"Model {i} weights saved at: {model_path}")
      plot_history(history, i)

  # models=[]
  # for i in range(n_models):
  #   print(f"Model {i}:")
  #   models.append(create_3dcnn(in_shape, n_classes))
  #   models[-1].compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
  #   history = models[-1].fit(train_dataset, validation_data=test_dataset, epochs=epochs, callbacks=[early_stopping, reduce_lr], verbose=1)
  #   model_path = os.path.join(model_weights, f"model_{i}.weights.h5")
  #   models[-1].save_weights(model_path)
  #   print(f"Model {i} weights saved at: {model_path}")
  #   plot_history(history, i)

  #Creating An Ensemble Model
  model_inputs = [Input(shape=in_shape) for _ in range (n_models)]
  model_outputs = [models[i](model_inputs[i]) for i in range (n_models)]
  model_outputs = average(inputs=model_outputs)
  model = Model(inputs=model_inputs, outputs=model_outputs)
  model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

  model.summary()

  model.save(os.path.join(out_dir, 'ucf101_3dcnnmodel.h5'))

  X_test_copies = [np.copy(X_test) for _ in range(n_models)]
  loss, acc = model.evaluate(X_test_copies, Y_test, verbose=0)
  with open(os.path.join(out_dir, 'result.txt'), 'w') as f:
    f.write(f"Test loss: {loss:.4f}, Test accuracy: {acc:.4f}")

  print('merged model:')
  print('Test loss:', loss)
  print('Test accuracy:', acc)

In [None]:
train_model() #Default Hyperparameters