In [1]:
%%capture
%pip install opencv-python
%pip install tqdm
%pip install tensorflow
%pip install imageio
%pip install tensorflow_docs
%pip install scikit-learn
%pip install matplotlib
%pip install keras-tuner
%pip install pandas
%pip install tensorflow-addons

In [2]:
import random
import pathlib
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
import itertools
from pathlib import Path
import matplotlib.pyplot as plt
import os
import cv2
import numpy as np
import tensorflow as tf
import imageio
from IPython import display
from urllib import request
from tensorflow_docs.vis import embed
from tensorflow.keras.metrics import Precision, Recall
import tensorflow_addons as tfa
import pandas as pd
from tensorflow import keras
from kerastuner.tuners import RandomSearch
from keras_tuner import Objective
from keras.optimizers import SGD, Adam, RMSprop


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 

  from kerastuner.tuners import RandomSearch


In [3]:
subset_paths = {}
subset_paths['train'] = Path(r'C:\Users\2alex\OneDrive\Documents\GitHub\FightClub\Data\train')
subset_paths['test'] = Path(r'C:\Users\2alex\OneDrive\Documents\GitHub\FightClub\Data\test')
subset_paths['val'] = Path(r'C:\Users\2alex\OneDrive\Documents\GitHub\FightClub\Data\val')

In [4]:
def format_frames(frame, output_size):
    """
      Pad and resize an image from a video.

      Args:
        frame: Image that needs to resized and padded.
        output_size: Pixel size of the output frame image.

      Return:
        Formatted frame with padding of specified output size.
    """
    frame = tf.image.convert_image_dtype(frame, tf.float32)
    frame = tf.image.resize_with_pad(frame, *output_size)
    return frame

In [5]:
def frames_from_video_file(video_path, n_frames, output_size=(224, 224), frame_step=15):
    """
      Creates frames from each video file present for each category.

      Args:
        video_path: File path to the video.
        n_frames: Number of frames to be created per video file.
        output_size: Pixel size of the output frame image.

      Return:
        An NumPy array of frames in the shape of (n_frames, height, width, channels).
    """
    # Read each video frame by frame
    result = []
    src = cv2.VideoCapture(str(video_path))

    video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)

    need_length = 1 + (n_frames - 1) * frame_step

    if need_length > video_length:
        start = 0
    else:
        max_start = video_length - need_length
        start = random.randint(0, max_start + 1)

    src.set(cv2.CAP_PROP_POS_FRAMES, start)
    # ret is a boolean indicating whether read was successful, frame is the image itself
    ret, frame = src.read()
    result.append(format_frames(frame, output_size))

    for _ in range(n_frames - 1):
        for _ in range(frame_step):
            ret, frame = src.read()
        if ret:
            frame = format_frames(frame, output_size)
            result.append(frame)
        else:
            result.append(np.zeros_like(result[0]))
    src.release()
    result = np.array(result)[..., [2, 1, 0]]

    return result

In [6]:
class FrameGenerator:
    def __init__(self, path, n_frames, training=False):
        """ Returns a set of frames with their associated label.

          Args:
            path: Video file paths.
            n_frames: Number of frames.
            training: Boolean to determine if training dataset is being created.
        """
        self.path = path
        self.n_frames = n_frames
        self.training = training
        self.class_names = sorted(
            set(p.name for p in self.path.iterdir() if p.is_dir()))
        self.class_ids_for_name = dict((name, idx)
                                       for idx, name in enumerate(self.class_names))

    def get_files_and_class_names(self):
        video_paths = list(self.path.glob('*/*.mp4'))
        classes = [p.parent.name for p in video_paths]
        return video_paths, classes

    def __call__(self):
        video_paths, classes = self.get_files_and_class_names()
        pairs = list(zip(video_paths, classes))

        if self.training:
            random.shuffle(pairs)

        for path, name in pairs:
            video_frames = frames_from_video_file(path, self.n_frames)
            label = self.class_ids_for_name[name]
            # Encode labels
            yield video_frames, label


In [7]:
output_signature = (tf.TensorSpec(shape=(None, None, None, 3), dtype=tf.float32),
                    tf.TensorSpec(shape=(), dtype=tf.float32))

In [8]:
# Create the training set
train_ds = tf.data.Dataset.from_generator(FrameGenerator(
    subset_paths['train'], 6, training=True),                                        output_signature=output_signature)
# Create the validation set
val_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['val'], 6),
                                        output_signature=output_signature)
# create the test set
test_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['test'], 6),
                                         output_signature=output_signature)

In [9]:
AUTOTUNE = tf.data.AUTOTUNE

train_ds = train_ds.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)
val_ds = val_ds.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)
test_ds = test_ds.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)

train_ds = train_ds.batch(2)
val_ds = val_ds.batch(2)
test_ds = val_ds.batch(2)

train_frames, train_labels = next(iter(train_ds))
val_frames, val_labels = next(iter(val_ds))
test_frames, test_labels = next(iter(test_ds))

In [10]:
net = tf.keras.applications.EfficientNetB0(include_top=False)
net.trainable = False

In [11]:
def model_builder(hp):
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Rescaling(scale=255))
    model.add(tf.keras.layers.TimeDistributed(net))
    model.add(tf.keras.layers.Dense(units=hp.Int(
        'units', min_value=32, max_value=512, step=32), activation='relu'))
    model.add(tf.keras.layers.GlobalAveragePooling3D())
    model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

    custom_optimizer = keras.optimizers.Adam(
        learning_rate=hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4]),
        beta_1=hp.Choice('beta_1', values=[0.9, 0.99, 0.999]),
        beta_2=hp.Choice('beta_2', values=[0.999, 0.9999]),
        epsilon=hp.Float('epsilon', min_value=1e-10, max_value=1e-7)
    )

    # Define metrics
    #metrics = [tf.keras.metrics.AUC(), tf.keras.metrics.Recall(), tf.keras.metrics.Precision(), tf.keras.metrics.BinaryAccuracy(), tf.keras.metrics.TruePositives(), tf.keras.metrics.TrueNegatives(), tf.keras.metrics.FalseNegatives(), tf.keras.metrics.FalsePositives()]


    # Running with SGD optimizer
    model.compile(optimizer=custom_optimizer,
                  loss=keras.losses.binary_crossentropy, metrics='accuracy')
    
    
    return model

In [13]:
# Initialize the tuner
tuner = RandomSearch(
    model_builder,
    # understand 'objective' should be converted to binary
    objective=Objective(tfa.metrics.F1Score(name='val_loss', num_classes=1, average='macro',threshold=0.5), direction=max), # 'val_accuracy'
    max_trials=10,  # Adjust the number of trials as needed
    directory='adam_accuracy_directory/logs'  # For accuracy
)

Reloading Tuner from adam_accuracy_directory/logs\untitled_project\tuner0.json


In [14]:
# Define callbacks, such as early stopping, if necessary

combined = [tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)]

# Start the tuning process
tuner.search(train_ds, epochs=10, validation_data=(
    val_ds), callbacks=combined)

In [15]:
# Each trial tests different parameters over 10 epochs
tuner.results_summary()

# SGD
# Trial 06 summary
# Hyperparameters:
# units: 128
# learning_rate: 0.01
# beta_1: 0.9
# beta_2: 0.999
# epsilon: 6.329346769904573e-09
# Score: 0.8500000238418579

# Adam
# Trial 05 summary
# Hyperparameters:
# units: 288
# learning_rate: 0.001
# beta_1: 0.9
# beta_2: 0.999
# epsilon: 6.297480105726857e-08
# Score: 0.8500000238418579


Results summary
Results in adam_accuracy_directory/logs\untitled_project
Showing 10 best trials
Objective(name="F1Score(name=val_loss,dtype=float32,num_classes=1,average=macro,threshold=0.5)", direction="<built-in function max>")

Trial 01 summary
Hyperparameters:
units: 384
learning_rate: 0.001
beta_1: 0.999
beta_2: 0.9999
epsilon: 5.955380768793277e-08
Score: 0.800000011920929

Trial 04 summary
Hyperparameters:
units: 256
learning_rate: 0.01
beta_1: 0.99
beta_2: 0.9999
epsilon: 1.8779680855635415e-09
Score: 0.800000011920929

Trial 06 summary
Hyperparameters:
units: 64
learning_rate: 0.0001
beta_1: 0.99
beta_2: 0.999
epsilon: 5.294146043098329e-08
Score: 0.800000011920929

Trial 09 summary
Hyperparameters:
units: 256
learning_rate: 0.0001
beta_1: 0.9
beta_2: 0.999
epsilon: 1.376496785140196e-08
Score: 0.8166666626930237

Trial 03 summary
Hyperparameters:
units: 480
learning_rate: 0.0001
beta_1: 0.99
beta_2: 0.999
epsilon: 1.0407582549292059e-08
Score: 0.8166666626930237

Trial 02 sum

In [66]:
tuner.search_space_summary()

Search space summary
Default search space size: 5
units (Int)
{'default': None, 'conditions': [], 'min_value': 32, 'max_value': 512, 'step': 32, 'sampling': 'linear'}
learning_rate (Choice)
{'default': 0.01, 'conditions': [], 'values': [0.01, 0.001, 0.0001], 'ordered': True}
beta_1 (Choice)
{'default': 0.9, 'conditions': [], 'values': [0.9, 0.99, 0.999], 'ordered': True}
beta_2 (Choice)
{'default': 0.999, 'conditions': [], 'values': [0.999, 0.9999], 'ordered': True}
epsilon (Float)
{'default': 1e-10, 'conditions': [], 'min_value': 1e-10, 'max_value': 1e-07, 'step': None, 'sampling': 'linear'}


In [None]:
# Start Tensorboard
%load_ext tensorboard

In [None]:
# Get the best hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print("Best Hyperparameters:")
print(best_hps)

# Build and compile the final model with the best hyperparameters
best_model = tuner.hypermodel.build(best_hps)
"""best_model.compile(
    optimizer=best_hps['optimizer'], loss='binary_crossentropy', metrics=[keras.metrics.Recall()])"""

Best Hyperparameters:
<keras_tuner.src.engine.hyperparameters.hyperparameters.HyperParameters object at 0x0000019C99F8CB50>


AttributeError: 'Sequential' object has no attribute '_nested_inputs'

In [None]:
# Train the final model
history = best_model.fit(x=train_ds, epochs=10, validation_data=(val_ds))
val_acc_per_epoch = history.history['val_accuracy']
best_epoch = val_acc_per_epoch.index(max(val_acc_per_epoch)) + 1

print('Best Epoch: %d' % (best_epoch,))