In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so files stored
# on Drive can be read through ordinary file paths.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Headless OpenCV build (no GUI dependencies); provides the `cv2` module imported below.
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install opencv-python-headless



In [None]:
# Progress bars for the data-loading loops.
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install tqdm



In [None]:
import os
import cv2
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras import layers, models, Sequential
from tensorflow.keras.layers import TimeDistributed, LSTM, Dense, Flatten, Dropout
from tensorflow.keras.callbacks import EarlyStopping

from tensorflow.keras.applications import EfficientNetB0, EfficientNetV2B0, EfficientNetV2S

from tqdm import tqdm

In [None]:
def load_video_paths_with_label(data_dir):
    """Collect the video file paths under ``data_dir``, grouped by integer class label.

    ``data_dir`` must contain one sub-directory per shot type, named exactly
    'Defensive Shot', 'Pull Shot', 'Drive Shot' and 'Flick Shot'.

    Args:
        data_dir: Root directory of the dataset.

    Returns:
        dict mapping each integer label (0-3) to the sorted list of paths of the
        regular files found directly inside the matching class directory.

    Raises:
        FileNotFoundError: If a class directory does not exist (from ``os.listdir``).
    """
    labels = {'Defensive Shot': 0, 'Pull Shot': 1, 'Drive Shot': 2, 'Flick Shot': 3}
    video_paths_with_label = {}

    for label_name, label_id in tqdm(labels.items()):
        videos_dir = os.path.join(data_dir, label_name)

        # os.listdir returns entries in arbitrary order; sorting keeps any later
        # seeded train/test split reproducible across runs and filesystems.
        candidates = (os.path.join(videos_dir, entry) for entry in sorted(os.listdir(videos_dir)))

        # Sub-directories (e.g. checkpoint folders) can never be videos, so skip them.
        video_paths_with_label[label_id] = [path for path in candidates if os.path.isfile(path)]

    return video_paths_with_label

In [None]:
def prepare_train_test_split(video_paths_with_label, test_size=0.2, random_state=42):
    """Split the videos of every class separately into train and test portions.

    Each class is passed to ``train_test_split`` on its own with the same
    ``test_size`` and ``random_state``, and the per-class results are
    concatenated in the iteration order of ``video_paths_with_label``.

    Args:
        video_paths_with_label: dict mapping a label to the list of its video paths.
        test_size: Forwarded to ``train_test_split``.
        random_state: Forwarded to ``train_test_split``.

    Returns:
        Tuple ``(paths_train, paths_test, labels_train, labels_test)`` of lists;
        each label list is aligned element-by-element with its path list.
    """
    paths_train, labels_train = [], []
    paths_test, labels_test = [], []

    for label, class_paths in tqdm(video_paths_with_label.items()):
        kept, held_out = train_test_split(class_paths, test_size=test_size, random_state=random_state)

        # Paths and labels grow in lock-step so index i always refers to the same video.
        paths_train += kept
        labels_train += [label] * len(kept)

        paths_test += held_out
        labels_test += [label] * len(held_out)

    return paths_train, paths_test, labels_train, labels_test

In [None]:
def preprocess_video(video_path, num_frames=16, frame_size=(224, 224)):
    """Decode a video into a fixed-length stack of resized frames.

    Frame indices are spread evenly from the first to the last frame reported
    by OpenCV, so the whole clip is covered rather than only its beginning.
    If the video has fewer than ``num_frames`` frames, or decoding stops early,
    the result is padded by repeating the last decoded frame.

    Args:
        video_path: Path of the video file, handed to ``cv2.VideoCapture``.
        num_frames: Number of frames to return; must be at least 1.
        frame_size: Target size handed to ``cv2.resize``, i.e. (width, height).

    Returns:
        ``np.ndarray`` with ``num_frames`` resized frames stacked along axis 0.
        Pixel values are returned as decoded (no normalisation is applied) and
        the channel order is whatever OpenCV delivers.

    Raises:
        ValueError: If ``num_frames`` is smaller than 1, or if not a single
            frame could be decoded from ``video_path``.
    """
    if num_frames < 1:
        raise ValueError(f"num_frames must be >= 1, got {num_frames}")

    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Never request more distinct frames than the container reports.
        sample_count = min(num_frames, total_frames)
        if sample_count > 1:
            # Integer arithmetic keeps the indices exact: the first is 0, the last is
            # total_frames - 1, and the rest are evenly spaced in between.
            last_index = total_frames - 1
            frame_indices = [(k * last_index) // (sample_count - 1) for k in range(sample_count)]
        else:
            # Zero (or a nonsensical negative) frame count -> nothing to read;
            # a single sample -> just the first frame.
            frame_indices = list(range(max(sample_count, 0)))

        for frame_index in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
            ret, frame = cap.read()
            if not ret:
                # Decoding failed (e.g. the reported frame count was too high);
                # the frames collected so far are padded below.
                break
            frames.append(cv2.resize(frame, frame_size))
    finally:
        # Release the capture even if decoding or resizing raised.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from video: {video_path!r}")

    # Pad up to exactly num_frames by repeating the last decoded frame.
    frames.extend([frames[-1]] * (num_frames - len(frames)))

    return np.array(frames)

In [None]:
def video_data_generator(video_paths, labels, num_frames=16, frame_size=(224, 224)):
    """Decode every listed video and return clips and labels as NumPy arrays.

    Despite its name this is not a Python generator: all videos are processed
    eagerly and the complete result is returned at once.

    Args:
        video_paths: Sequence of video file paths, indexable by position.
        labels: Sequence of labels aligned with ``video_paths``; its length
            determines how many videos are processed.
        num_frames: Forwarded to ``preprocess_video``.
        frame_size: Forwarded to ``preprocess_video``.

    Returns:
        Tuple ``(batch_videos, batch_labels)`` of ``np.ndarray`` objects, one
        entry per label in the original order.
    """
    sample_ids = range(len(labels))

    # One decoded clip per sample; tqdm reports progress over the whole set.
    clips = [
        preprocess_video(video_paths[sample_id], num_frames, frame_size)
        for sample_id in tqdm(sample_ids)
    ]
    targets = [labels[sample_id] for sample_id in sample_ids]

    return np.array(clips), np.array(targets)

In [None]:
def prepare_train_test_data(data_dir='/content/drive/My Drive/ShotDetection',
                            num_frames=16, frame_size=(224, 224)):
    """Load the dataset from disk and return train/test arrays.

    Runs the full pipeline: list the videos per class, split every class into
    train and test portions, then decode all videos into memory.

    Args:
        data_dir: Dataset root containing one sub-directory per class.
        num_frames: Number of frames extracted from every video.
        frame_size: Frame size as given to ``cv2.resize``, i.e. (width, height).

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)``. The ``X`` arrays have
        shape ``(num_videos, num_frames, height, width, 3)``; the ``y`` arrays
        hold one label per video.
    """
    frame_channel = 3
    frame_width, frame_height = frame_size

    data_paths = load_video_paths_with_label(data_dir)

    paths_train, paths_test, labels_train, labels_test = prepare_train_test_split(data_paths)

    X_train, y_train = video_data_generator(paths_train, labels_train, num_frames, frame_size)
    X_test, y_test = video_data_generator(paths_test, labels_test, num_frames, frame_size)

    # cv2.resize takes (width, height) but produces arrays laid out as
    # (height, width, channels), so the target shape must put height first.
    # For square frames this is identical to the previous behaviour.
    clip_shape = (num_frames, frame_height, frame_width, frame_channel)
    X_train = np.reshape(X_train, (X_train.shape[0],) + clip_shape)
    X_test = np.reshape(X_test, (X_test.shape[0],) + clip_shape)

    return X_train, y_train, X_test, y_test

In [None]:
# Decode the whole dataset into memory. This is the slow step of the notebook:
# the recorded progress bars below show roughly 11.5 min for the training videos
# and 3 min for the test videos.
X_train, y_train, X_test, y_test = prepare_train_test_data()

100%|██████████| 4/4 [00:00<00:00, 14.59it/s]
100%|██████████| 4/4 [00:00<00:00, 1021.20it/s]
100%|██████████| 1020/1020 [11:36<00:00,  1.46it/s]
100%|██████████| 258/258 [03:04<00:00,  1.39it/s]


In [None]:
# Sanity check: clip array shape first, label array shape on the next line.
print(X_train.shape, y_train.shape, sep='\n')

(1020, 16, 224, 224, 3)
(1020,)


In [None]:
# Sanity check: clip array shape first, label array shape on the next line.
print(X_test.shape, y_test.shape, sep='\n')

(258, 16, 224, 224, 3)
(258,)


In [None]:
# Shape of a single clip: X_train's shape with the leading sample axis dropped.
input_shape = X_train.shape[1:]
# Bare trailing expression so the notebook displays the value.
input_shape

(16, 224, 224, 3)

In [None]:
# def mb_conv_block(x, filters, kernel_size, strides, expansion, block_id):
#     shortcut = x
#     prefix = 'block_{}_'.format(block_id)

#     # Expansion phase
#     if expansion != 1:
#         x = layers.Conv3D(expansion * x.shape[-1], 1, padding='same', use_bias=False, name=prefix + 'expand')(x)
#         x = layers.BatchNormalization(name=prefix + 'expand_bn')(x)
#         x = layers.Activation('relu', name=prefix + 'expand_relu')(x)

#     # Depthwise Convolution
#     x = layers.Conv3D(filters, kernel_size, strides=strides, padding='same', use_bias=False, name=prefix + 'dwconv')(x)
#     x = layers.BatchNormalization(name=prefix + 'dwconv_bn')(x)
#     x = layers.Activation('relu', name=prefix + 'dwconv_relu')(x)

#     # Project
#     x = layers.Conv3D(filters, 1, padding='same', use_bias=False, name=prefix + 'project')(x)
#     x = layers.BatchNormalization(name=prefix + 'project_bn')(x)

#     if strides == 1 and shortcut.shape[-1] == filters:
#         x = layers.Add(name=prefix + 'add')([shortcut, x])

#     return x

# def create_MAT_effNet_model(input_shape=(16, 224, 224, 3), num_classes=101):
#     inputs = layers.Input(shape=input_shape)

#     # Stage 0
#     x = inputs

#     # Stage 1
#     x = layers.Conv3D(32, (3, 3, 3), strides=(1, 2, 2), padding='same', name='stage1_conv')(x)
#     x = layers.BatchNormalization(name='stage1_bn')(x)
#     x = layers.Activation('relu', name='stage1_relu')(x)

#     # Stage 2
#     x = mb_conv_block(x, 16, (3, 3, 3), 1, 1, 2)

#     # Stage 3
#     x = mb_conv_block(x, 24, (3, 3, 3), (1, 2, 2), 6, 3)
#     x = mb_conv_block(x, 24, (3, 3, 3), 1, 6, 4)

#     # Stage 4
#     x = mb_conv_block(x, 40, (5, 5, 5), (1, 2, 2), 6, 5)
#     x = mb_conv_block(x, 40, (5, 5, 5), 1, 6, 6)

#     # Stage 5
#     x = mb_conv_block(x, 80, (3, 3, 3), (1, 2, 2), 6, 7)
#     x = mb_conv_block(x, 80, (3, 3, 3), 1, 6, 8)
#     x = mb_conv_block(x, 80, (3, 3, 3), 1, 6, 9)

#     # Stage 6
#     x = mb_conv_block(x, 112, (5, 5, 5), 1, 6, 10)
#     x = mb_conv_block(x, 112, (5, 5, 5), 1, 6, 11)
#     x = mb_conv_block(x, 112, (5, 5, 5), 1, 6, 12)

#     # Stage 7
#     x = mb_conv_block(x, 192, (5, 5, 5), (1, 2, 2), 6, 13)
#     x = mb_conv_block(x, 192, (5, 5, 5), 1, 6, 14)
#     x = mb_conv_block(x, 192, (5, 5, 5), 1, 6, 15)
#     x = mb_conv_block(x, 192, (5, 5, 5), 1, 6, 16)

#     # Stage 8
#     x = mb_conv_block(x, 320, (3, 3, 3), 1, 6, 17)

#     # Stage 9
#     x = layers.Conv3D(1280, 1, padding='same', name='stage9_conv')(x)
#     x = layers.BatchNormalization(name='stage9_bn')(x)
#     x = layers.Activation('relu', name='stage9_relu')(x)
#     x = layers.GlobalAveragePooling3D(name='stage9_pool')(x)

#     # Stage 10: Multi-head attention layer
#     x = layers.Reshape((1, 1280))(x)
#     attention_output = layers.MultiHeadAttention(num_heads=8, key_dim=128, name='attention_layer')(x, x)
#     x = layers.Reshape((512,))(attention_output)

#     # Stage 11: Fully connected layer and softmax
#     outputs = layers.Dense(num_classes, activation='softmax', name='fc_softmax')(x)

#     model = models.Model(inputs, outputs)
#     return model

In [None]:
# def mb_conv_block(x, filters, kernel_size, strides, expansion, block_id):
#     shortcut = x
#     prefix = 'block_{}_'.format(block_id)

#     # Expansion phase
#     if expansion != 1:
#         x = layers.Conv2D(expansion * x.shape[-1], 1, padding='same', use_bias=False, name=prefix + 'expand')(x)
#         x = layers.BatchNormalization(name=prefix + 'expand_bn')(x)
#         x = layers.Activation('relu', name=prefix + 'expand_relu')(x)

#     # Depthwise Convolution
#     x = layers.DepthwiseConv2D(kernel_size, strides=strides, padding='same', use_bias=False, name=prefix + 'dwconv')(x)
#     x = layers.BatchNormalization(name=prefix + 'dwconv_bn')(x)
#     x = layers.Activation('relu', name=prefix + 'dwconv_relu')(x)

#     # Project
#     x = layers.Conv2D(filters, 1, padding='same', use_bias=False, name=prefix + 'project')(x)
#     x = layers.BatchNormalization(name=prefix + 'project_bn')(x)

#     if strides == 1 and shortcut.shape[-1] == filters:
#         x = layers.Add(name=prefix + 'add')([shortcut, x])

#     return x

# def create_MAT_effNet_model(input_shape=(16, 224, 224, 3), num_classes=101):
#     inputs = layers.Input(shape=input_shape)
#     x = inputs
#     frames = []

#     # Process each frame individually
#     for i in range(input_shape[0]):
#         frame = layers.Lambda(lambda x: x[:, i])(x)

#         # Stage 1
#         frame = layers.Conv2D(32, (3, 3), strides=(2, 2), padding='same', name=f'stage1_conv_{i}')(frame)
#         frame = layers.BatchNormalization(name=f'stage1_bn_{i}')(frame)
#         frame = layers.Activation('relu', name=f'stage1_relu_{i}')(frame)

#         # Stage 2
#         frame = mb_conv_block(frame, 16, (3, 3), 1, 1, f'2_{i}')

#         # Stage 3
#         frame = mb_conv_block(frame, 24, (3, 3), 2, 6, f'3_{i}')
#         frame = mb_conv_block(frame, 24, (3, 3), 1, 6, f'4_{i}')

#         # Stage 4
#         frame = mb_conv_block(frame, 40, (5, 5), 2, 6, f'5_{i}')
#         frame = mb_conv_block(frame, 40, (5, 5), 1, 6, f'6_{i}')

#         # Stage 5
#         frame = mb_conv_block(frame, 80, (3, 3), 2, 6, f'7_{i}')
#         frame = mb_conv_block(frame, 80, (3, 3), 1, 6, f'8_{i}')
#         frame = mb_conv_block(frame, 80, (3, 3), 1, 6, f'9_{i}')

#         # Stage 6
#         frame = mb_conv_block(frame, 112, (5, 5), 1, 6, f'10_{i}')
#         frame = mb_conv_block(frame, 112, (5, 5), 1, 6, f'11_{i}')
#         frame = mb_conv_block(frame, 112, (5, 5), 1, 6, f'12_{i}')

#         # Stage 7
#         frame = mb_conv_block(frame, 192, (5, 5), 2, 6, f'13_{i}')
#         frame = mb_conv_block(frame, 192, (5, 5), 1, 6, f'14_{i}')
#         frame = mb_conv_block(frame, 192, (5, 5), 1, 6, f'15_{i}')
#         frame = mb_conv_block(frame, 192, (5, 5), 1, 6, f'16_{i}')

#         # Stage 8
#         frame = mb_conv_block(frame, 320, (3, 3), 1, 6, f'17_{i}')

#         # Stage 9
#         frame = layers.Conv2D(1280, 1, padding='same', name=f'stage9_conv_{i}')(frame)
#         frame = layers.BatchNormalization(name=f'stage9_bn_{i}')(frame)
#         frame = layers.Activation('relu', name=f'stage9_relu_{i}')(frame)
#         frame = layers.GlobalAveragePooling2D(name=f'stage9_pool_{i}')(frame)

#         frames.append(frame)

#     # Combine frames
#     x = tf.stack(frames, axis=1)  # shape: (batch_size, num_frames, 1280)

#     # Stage 10: Multi-head attention layer
#     attention_output = layers.MultiHeadAttention(num_heads=8, key_dim=128, name='attention_layer')(x, x)
#     x = layers.Reshape((input_shape[0], -1))(attention_output)  # Flatten along the last dimension

#     # Stage 11: Fully connected layer and softmax
#     outputs = layers.Dense(num_classes, activation='softmax', name='fc_softmax')(x)

#     model = models.Model(inputs, outputs)
#     return model

In [None]:
# def create_MAT_effNet_model(input_shape=(16, 224, 224, 3), num_classes=101):
#     base_model = EfficientNetB0(include_top=False, weights=None, input_shape=input_shape[1:])
#     base_model.trainable = True

#     inputs = layers.Input(shape=input_shape)
#     frames = []

#     # Process each frame individually
#     for i in range(input_shape[0]):
#         frame = layers.Lambda(lambda x: x[:, i])(inputs)
#         frame = base_model(frame, training=False)
#         frame = layers.GlobalAveragePooling2D()(frame)
#         frames.append(frame)

#     # Combine frames
#     x = tf.stack(frames, axis=1)  # shape: (batch_size, num_frames, 1280)

#     # Multi-head attention layer
#     attention_output = layers.MultiHeadAttention(num_heads=8, key_dim=128, name='attention_layer')(x, x)
#     x = layers.Flatten()(attention_output)  # Flatten along the last dimension

#     # Fully connected layer and softmax
#     outputs = layers.Dense(num_classes, activation='softmax', name='fc_softmax')(x)

#     model = models.Model(inputs, outputs)
#     return model

In [None]:
def create_MAT_effNet_model(input_shape=(16, 224, 224, 3), num_classes=101):
    """Build a video classifier: per-frame EfficientNetV2-B0 plus self-attention over frames.

    The same backbone is applied to every frame, each frame's feature map is
    average-pooled to a vector, multi-head self-attention mixes the per-frame
    vectors, and a dense softmax layer classifies the flattened result.

    Args:
        input_shape: Shape of one clip; the first entry is the frame axis and
            the remaining entries are passed to the backbone as its
            ``input_shape``.
        num_classes: Number of units of the final softmax layer.

    Returns:
        An uncompiled Keras functional ``Model`` mapping a batch of clips to
        class probabilities of shape ``(batch, num_classes)``.
    """
    inputs = layers.Input(shape=input_shape)

    # Frame encoder: EfficientNetV2-B0 without its classification head, randomly
    # initialised (weights=None) and fully trainable.
    # NOTE(review): Keras' EfficientNetV2 builders include input rescaling by default
    # (include_preprocessing=True), so frames are presumably meant to be fed as raw
    # 0-255 pixels - confirm against the installed Keras version.
    base_model = EfficientNetV2B0(include_top=False, weights=None, input_shape=input_shape[1:])
    base_model.trainable = True

    # Apply the backbone and the spatial pooling independently to every frame.
    x = TimeDistributed(base_model)(inputs)
    x = TimeDistributed(layers.GlobalAveragePooling2D())(x)

    # Self-attention across the frame axis (query and value are the same tensor).
    attention_output = layers.MultiHeadAttention(num_heads=8, key_dim=128, name='attention_layer')(x, x)
    x = layers.Flatten()(attention_output)

    # Fully connected layer and softmax
    outputs = layers.Dense(num_classes, activation='softmax', name='fc_softmax')(x)

    return models.Model(inputs, outputs)

In [None]:
# Build the model (freshly constructed by create_MAT_effNet_model, not loaded from disk)
num_classes = 4
# Alternative class counts kept from earlier experiments:
# num_classes = 101  # or 51
model = create_MAT_effNet_model(input_shape=input_shape, num_classes=num_classes)
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 16, 224, 224, 3)]    0         []                            
                                                                                                  
 time_distributed (TimeDist  (None, 16, 7, 7, 1280)       4049571   ['input_1[0][0]']             
 ributed)                                                                                         
                                                                                                  
 time_distributed_1 (TimeDi  (None, 16, 1280)             0         ['time_distributed[0][0]']    
 stributed)                                                                                       
                                                                                              

In [None]:
# Training the model
# NOTE(review): this cell relies on `model` being defined by an earlier cell. The
# NameError recorded in the output below means it was executed in a kernel session
# where `model` did not exist - run the notebook top to bottom.
# The sparse categorical cross-entropy loss takes integer class ids as targets.
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# No validation data is passed here, so only training metrics are reported per epoch.
history = model.fit(X_train, y_train, epochs=10, batch_size=16)

NameError: name 'model' is not defined