In [None]:
import tqdm
import random
import pathlib
import itertools
import collections

import cv2
import einops
import numpy as np
import remotezip as rz
import seaborn as sns
import matplotlib.pyplot as plt

import tensorflow as tf
import keras
from keras import layers

In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split

# Number of frames kept per clip: the default `max_frames` of load_video and the
# time dimension that load_video_py declares for the decoded tensor.
NUM_FRAMES = 10
# Side length in pixels of the square frames: the default resize target of
# load_video and the height/width that load_video_py declares.
IMG_SIZE = 112

def get_video_paths_and_labels(data_dir):
    """Collect the ``.mp4`` files of a one-folder-per-class dataset.

    Args:
        data_dir: Directory whose immediate sub-directories are the classes.

    Returns:
        A ``(video_paths, labels, class_names)`` tuple where ``video_paths[i]``
        is the path of a clip, ``labels[i]`` is the index of its class in
        ``class_names``, and ``class_names`` is the sorted list of class
        folder names.

    Raises:
        FileNotFoundError: If ``data_dir`` does not exist (from ``os.listdir``).
    """
    # Only real, non-hidden sub-directories are classes. Stray files or
    # notebook folders such as `.ipynb_checkpoints` would otherwise consume a
    # label index, leaving gaps in the label ids and bogus class names.
    class_names = sorted(
        entry for entry in os.listdir(data_dir)
        if not entry.startswith(".")
        and os.path.isdir(os.path.join(data_dir, entry))
    )
    video_paths = []
    labels = []

    for label_index, class_name in enumerate(class_names):
        class_dir = os.path.join(data_dir, class_name)
        # os.listdir order is arbitrary; sorting keeps the seeded train/val
        # split identical between runs and machines.
        for fname in sorted(os.listdir(class_dir)):
            # Case-insensitive so clips named `*.MP4` are not silently dropped.
            if fname.lower().endswith(".mp4"):
                video_paths.append(os.path.join(class_dir, fname))
                labels.append(label_index)

    return video_paths, labels, class_names

# Decode a clip with OpenCV; usable inside tf.py_function and on its own.
def load_video(path, max_frames=NUM_FRAMES, resize=(IMG_SIZE, IMG_SIZE)):
    """Read the first ``max_frames`` frames of a video as a float32 array.

    Args:
        path: Video location. May be an eager string tensor (as delivered by
            ``tf.py_function``), ``bytes``, ``str`` or a path-like object.
        max_frames: Number of frames in the returned clip.
        resize: Target frame size passed to ``cv2.resize``, i.e.
            ``(width, height)``.

    Returns:
        ``np.float32`` array of shape ``(max_frames, height, width, 3)`` with
        pixel values scaled by 1/255. If the video yields fewer than
        ``max_frames`` frames (including when it cannot be read at all), the
        remaining frames are left as zeros.

    NOTE(review): frames are used exactly as OpenCV returns them, with no
    colour-channel reordering. Confirm that inference code feeds the model
    the same channel order.
    """
    # Inside tf.py_function the path arrives as a tensor wrapping bytes; plain
    # Python paths are accepted too so the loader can be used directly.
    if hasattr(path, "numpy"):
        path = path.numpy()
    if isinstance(path, bytes):
        path = path.decode('utf-8')
    path = str(path)

    # Pre-filled with zeros, so short clips are padded implicitly and the
    # result always has rank 4, even for max_frames == 0.
    video = np.zeros((max_frames, resize[1], resize[0], 3), dtype=np.float32)

    cap = cv2.VideoCapture(path)
    try:
        for index in range(max_frames):
            ret, frame = cap.read()
            if not ret:
                # End of stream or unreadable file: keep the zero padding.
                break
            video[index] = cv2.resize(frame, resize) / 255.0  # Normalize
    finally:
        # Always release the decoder handle, even if resizing raised.
        cap.release()

    return video

# Wrapper for using tf.py_function
def load_video_py(path):
    """Run ``load_video`` from graph code and return a statically shaped tensor.

    ``tf.py_function`` yields a tensor of unknown shape, so the expected
    ``(NUM_FRAMES, IMG_SIZE, IMG_SIZE, 3)`` shape is declared explicitly.
    """
    frames = tf.py_function(load_video, [path], tf.float32)
    frames.set_shape([NUM_FRAMES, IMG_SIZE, IMG_SIZE, 3])
    return frames

# Create dataset from video paths and labels
def make_dataset(video_paths, labels):
    """Build an unbatched ``tf.data.Dataset`` of ``(video, label)`` pairs.

    Args:
        video_paths: Sequence of video file paths.
        labels: Sequence of labels aligned with ``video_paths``.

    Returns:
        A dataset in which each path has been replaced by the clip decoded
        through ``load_video_py``; labels pass through unchanged.
    """
    def _decode(video_path, label):
        # Only the path is transformed; the label is forwarded as-is.
        return load_video_py(video_path), label

    path_label_pairs = tf.data.Dataset.from_tensor_slices((video_paths, labels))
    return path_label_pairs.map(_decode, num_parallel_calls=tf.data.AUTOTUNE)

# Discover every clip under the dataset root together with its class index.
video_paths, labels, class_names = get_video_paths_and_labels("cleaned_dataset")

# Hold out 20% of the clips for validation. The split is stratified on the
# labels and seeded.
train_paths, val_paths, train_labels, val_labels = train_test_split(
    video_paths,
    labels,
    test_size=0.2,
    stratify=labels,
    random_state=42,
)

# Training pipeline: shuffle with a 100-element buffer, batch by 8, prefetch.
train_ds = make_dataset(train_paths, train_labels)
train_ds = train_ds.shuffle(100).batch(8).prefetch(tf.data.AUTOTUNE)

# Validation pipeline: same batching, no shuffling.
val_ds = make_dataset(val_paths, val_labels)
val_ds = val_ds.batch(8).prefetch(tf.data.AUTOTUNE)


In [None]:
# Dimensions of one frame as seen by the model. Derived from IMG_SIZE (the size
# the data loader resizes frames to) so the model input can never drift out of
# sync with the dataset.
HEIGHT = IMG_SIZE
WIDTH = IMG_SIZE

In [None]:
class Conv2Plus1D(keras.layers.Layer):
  """
    A sequence of convolutional layers that first apply the convolution operation over the
    spatial dimensions, and then the temporal dimension.

    Args:
      filters: Number of output filters of both convolutions.
      kernel_size: Sequence `(time, height, width)`. The spatial convolution uses
        `(1, height, width)` and the temporal one `(time, 1, 1)`.
      padding: Padding mode forwarded to both `Conv3D` layers.
      **kwargs: Standard `keras.layers.Layer` arguments such as `name` or `dtype`.
  """
  def __init__(self, filters, kernel_size, padding, **kwargs):
    # Forward the base-layer kwargs so the layer can be named and rebuilt from
    # its config.
    super().__init__(**kwargs)
    # Constructor arguments are kept so get_config() can report them.
    self.filters = filters
    self.kernel_size = tuple(kernel_size)
    self.padding = padding
    self.seq = keras.Sequential([
        # Spatial decomposition
        layers.Conv3D(filters=filters,
                      kernel_size=(1, kernel_size[1], kernel_size[2]),
                      padding=padding),
        # Temporal decomposition
        layers.Conv3D(filters=filters,
                      kernel_size=(kernel_size[0], 1, 1),
                      padding=padding)
        ])

  def call(self, x):
    """Apply the spatial convolution followed by the temporal convolution."""
    return self.seq(x)

  def get_config(self):
    """Return the constructor arguments so a saved model can re-create the layer."""
    config = super().get_config()
    config.update({
        "filters": self.filters,
        "kernel_size": list(self.kernel_size),
        "padding": self.padding,
    })
    return config
  
class ResidualMain(keras.layers.Layer):
  """
    Residual block of the model with convolution, layer normalization, and the
    activation function, ReLU.

    The sequence is Conv2Plus1D -> LayerNormalization -> ReLU -> Conv2Plus1D ->
    LayerNormalization, with 'same' padding on both convolutions.

    Args:
      filters: Number of filters of both `Conv2Plus1D` layers.
      kernel_size: Sequence `(time, height, width)` forwarded to `Conv2Plus1D`.
      **kwargs: Standard `keras.layers.Layer` arguments such as `name` or `dtype`.
  """
  def __init__(self, filters, kernel_size, **kwargs):
    # Forward the base-layer kwargs so the layer can be named and rebuilt from
    # its config.
    super().__init__(**kwargs)
    # Constructor arguments are kept so get_config() can report them.
    self.filters = filters
    self.kernel_size = tuple(kernel_size)
    self.seq = keras.Sequential([
        Conv2Plus1D(filters=filters,
                    kernel_size=kernel_size,
                    padding='same'),
        layers.LayerNormalization(),
        layers.ReLU(),
        Conv2Plus1D(filters=filters,
                    kernel_size=kernel_size,
                    padding='same'),
        layers.LayerNormalization()
    ])

  def call(self, x):
    """Run the input through the main (non-shortcut) path of the block."""
    return self.seq(x)

  def get_config(self):
    """Return the constructor arguments so a saved model can re-create the layer."""
    config = super().get_config()
    config.update({
        "filters": self.filters,
        "kernel_size": list(self.kernel_size),
    })
    return config
  

class Project(keras.layers.Layer):
  """
    Project certain dimensions of the tensor as the data is passed through different
    sized filters and downsampled.

    Implemented as a `Dense(units)` layer followed by `LayerNormalization`.

    Args:
      units: Size of the projected last dimension.
      **kwargs: Standard `keras.layers.Layer` arguments such as `name` or `dtype`.
  """
  def __init__(self, units, **kwargs):
    # Forward the base-layer kwargs so the layer can be named and rebuilt from
    # its config.
    super().__init__(**kwargs)
    # Kept so get_config() can report it.
    self.units = units
    self.seq = keras.Sequential([
        layers.Dense(units),
        layers.LayerNormalization()
    ])

  def call(self, x):
    """Project the last dimension of `x` to `units` and normalize the result."""
    return self.seq(x)

  def get_config(self):
    """Return the constructor arguments so a saved model can re-create the layer."""
    config = super().get_config()
    config.update({"units": self.units})
    return config
  
def add_residual_block(input, filters, kernel_size):
  """
    Append one residual block to a functional-API graph.

    The input is sent through `ResidualMain`; the shortcut branch is the input
    itself, passed through `Project` only when its last dimension differs from
    that of the main path. Both branches are then summed.

    Args:
      input: Tensor the block is attached to.
      filters: Number of filters for the main path.
      kernel_size: Kernel size forwarded to `ResidualMain`.

    Return:
      The element-wise sum of the shortcut and the main path.
  """
  main_path = ResidualMain(filters, kernel_size)(input)

  # The addition needs matching last dimensions, so the shortcut is projected
  # whenever the main path changed the channel count.
  channels_out = main_path.shape[-1]
  if channels_out == input.shape[-1]:
    shortcut = input
  else:
    shortcut = Project(channels_out)(input)

  return layers.add([shortcut, main_path])


class ResizeVideo(keras.layers.Layer):
  """
    Resize every frame of a batch of videos to a fixed height and width.

    Args:
      height: Target frame height.
      width: Target frame width.
      **kwargs: Standard `keras.layers.Layer` arguments such as `name` or `dtype`.
  """
  def __init__(self, height, width, **kwargs):
    # Forward the base-layer kwargs so the layer can be named and rebuilt from
    # its config.
    super().__init__(**kwargs)
    self.height = height
    self.width = width
    self.resizing_layer = layers.Resizing(self.height, self.width)

  def call(self, video):
    """
      Resize the frames of the video with the `Resizing` layer, using the einops
      library to fold the time axis into the batch axis and back.

      Args:
        video: Tensor representation of the video, in the form of a set of frames.

      Return:
        The video with every frame resized to the configured height and width.
    """
    # b stands for batch size, t stands for time, h stands for height,
    # w stands for width, and c stands for the number of channels.
    old_shape = einops.parse_shape(video, 'b t h w c')
    # Resizing works on image batches, so frames are temporarily treated as
    # independent images.
    images = einops.rearrange(video, 'b t h w c -> (b t) h w c')
    images = self.resizing_layer(images)
    # Restore the original time axis.
    videos = einops.rearrange(
        images, '(b t) h w c -> b t h w c',
        t = old_shape['t'])
    return videos

  def get_config(self):
    """Return the constructor arguments so a saved model can re-create the layer."""
    config = super().get_config()
    config.update({"height": self.height, "width": self.width})
    return config


# Model input: (batch, frames, height, width, channels). The frame count comes
# from NUM_FRAMES, the clip length produced by the data pipeline, instead of
# a repeated literal.
# NOTE(review): `input` shadows the Python builtin; the name is kept in case
# other cells refer to it.
input_shape = (None, NUM_FRAMES, HEIGHT, WIDTH, 3)
input = layers.Input(shape=(input_shape[1:]))
x = input

# Stem: (2+1)D convolution, batch norm, ReLU, then halve the spatial size.
x = Conv2Plus1D(filters=16, kernel_size=(3, 7, 7), padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = ResizeVideo(HEIGHT // 2, WIDTH // 2)(x)

# Block 1
x = add_residual_block(x, 16, (3, 3, 3))
x = ResizeVideo(HEIGHT // 4, WIDTH // 4)(x)

# Block 2
x = add_residual_block(x, 32, (3, 3, 3))
x = ResizeVideo(HEIGHT // 8, WIDTH // 8)(x)

# Block 3
x = add_residual_block(x, 64, (3, 3, 3))
x = ResizeVideo(HEIGHT // 16, WIDTH // 16)(x)

# Block 4
x = add_residual_block(x, 128, (3, 3, 3))

# Classification head. One logit per class discovered in the dataset; no
# softmax here because the loss is configured with from_logits=True.
x = layers.GlobalAveragePooling3D()(x)
x = layers.Flatten()(x)
x = layers.Dense(len(class_names))(x)

model = keras.Model(input, x)



In [None]:
# Pull one batch to check that the data pipeline and the model agree on the
# per-sample shape before spending time on training.
frames, label = next(iter(train_ds))
assert tuple(frames.shape[1:]) == tuple(model.input_shape[1:]), (
    f"dataset yields {tuple(frames.shape[1:])}, "
    f"model expects {tuple(model.input_shape[1:])}"
)
# `build` takes an input *shape*, not the batch tensor itself.
model.build(frames.shape)

In [None]:
# Draw the layer graph, with nested models expanded and tensor shapes shown.
keras.utils.plot_model(
    model,
    show_shapes=True,
    expand_nested=True,
    dpi=60,
)

In [None]:
# The labels are integer class ids and the model outputs raw logits, hence
# sparse categorical cross-entropy with from_logits=True.
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)

In [None]:
# Train for 50 epochs, evaluating on the validation set after every epoch.
# The returned History object feeds the learning-curve plots.
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=50,
)

In [None]:

# Save model
# Create the target folder first: saving into a missing directory would fail
# only after the long training run has finished.
os.makedirs("Project_Model", exist_ok=True)
model.save("Project_Model/second_chance2.h5")
model.save("Project_Model/second_chance4.keras")
# Additional export of the model under "saved_model2".
model.export("saved_model2")
model.summary()

In [None]:
# Learning curves: accuracy on the left panel, loss on the right panel.
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']

fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 6))

acc_ax.plot(acc, label='Training Accuracy')
acc_ax.plot(val_acc, label='Validation Accuracy')
acc_ax.set_title('Training and Validation Accuracy')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy')
acc_ax.legend()

loss_ax.plot(loss, label='Training Loss')
loss_ax.plot(val_loss, label='Validation Loss')
loss_ax.set_title('Training and Validation Loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.legend()

fig.tight_layout()
plt.show()

In [None]:
# Final metrics on the validation dataset, returned as a {metric: value} dict
# and displayed as the cell output.
val_metrics = model.evaluate(val_ds, return_dict=True)
val_metrics

In [None]:
from sklearn.metrics import confusion_matrix

# Assume `model` is your trained model

# Initialize lists to hold true labels and predicted labels
y_true_classes = []
y_pred_classes = []

# Iterate through the validation dataset once so predictions and labels stay
# aligned batch by batch.
for video_batch, label_batch in val_ds:
    # predict_on_batch avoids the per-call overhead and progress-bar output of
    # model.predict when looping over batches manually.
    y_pred = model.predict_on_batch(video_batch)

    # The model outputs one logit per class; argmax gives the predicted class.
    y_pred_classes.extend(np.argmax(y_pred, axis=1))

    # True classes are already integer ids, no argmax needed.
    y_true_classes.extend(label_batch.numpy())

# Pass the full list of class ids explicitly: otherwise a class that appears in
# neither the labels nor the predictions is dropped from the matrix and the
# tick labels below no longer line up with the rows/columns.
cm = confusion_matrix(
    y_true_classes,
    y_pred_classes,
    labels=np.arange(len(class_names)),
)

# Plot confusion matrix. `class_names` is used directly for the tick labels so
# the integer `labels` list from the data-loading cell is not overwritten.
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names, ax=ax)
ax.set_title('Confusion Matrix')
ax.set_xlabel('Predicted Label')
ax.set_ylabel('True Label')
plt.show()