In [10]:
import os
import time
from datetime import datetime

import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.distribute import MirroredStrategy
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard
from tensorflow.keras.layers import (
    Add,
    BatchNormalization,
    Conv1D,
    Conv2D,
    Dense,
    Dropout,
    Flatten,
    Input,
    LayerNormalization,
    MaxPool2D,
    MultiHeadAttention,
    Reshape,
    TimeDistributed,
)
from tensorflow.keras.models import Model, Sequential

In [8]:
# Load the pre-extracted clips and their labels, then split them into
# train / validation / test subsets.
data_dir = 'saved_np_data/'
features_path = os.path.join(data_dir, 'features.npy')
labels_path = os.path.join(data_dir, 'labels.npy')

features = np.load(features_path)
labels = np.load(labels_path)

# Fixed seed so the split membership is identical on every run. Without it the
# test set changes whenever this cell is re-executed, and checkpoints saved by
# an earlier run could be evaluated on samples they were trained on.
RANDOM_SEED = 42

# 70% of the samples go to training; the remaining 30% is held out.
X_train, X_test_val, y_train, y_test_val = train_test_split(
    features, labels, test_size=0.3, random_state=RANDOM_SEED
)

# The held-out part is divided again: 30% of it becomes the validation set and
# 70% of it becomes the test set.
X_val, X_test, y_val, y_test = train_test_split(
    X_test_val, y_test_val, test_size=0.7, random_state=RANDOM_SEED
)

# Report the shape of every split as a quick sanity check.
for split_name, X_split, y_split in (
    ("train", X_train, y_train),
    ("val", X_val, y_val),
    ("test", X_test, y_test),
):
    print(f"X_{split_name} shape:", X_split.shape)
    print(f"y_{split_name} shape:", y_split.shape)
    print()

X_train shape: (779, 10, 100, 100, 3)
y_train shape: (779,)
X_val shape: (100, 10, 100, 100, 3)
y_val shape: (100,)
X_test shape: (234, 10, 100, 100, 3)
y_test shape: (234,)


In [3]:
def spatio_temporal_transformer_block(inputs, num_heads=8, key_dim=64, ff_dim=64, dropout_rate=0.1):
    """Stack two pre-norm self-attention sub-layers and one feed-forward sub-layer.

    Every sub-layer applies LayerNormalization first and is wrapped in a
    residual (Add) connection, so the output has the same last dimension as
    ``inputs``.

    Args:
        inputs: Keras tensor to transform. Assumed to be shaped
            (batch, tokens, channels) -- TODO confirm against the caller.
        num_heads: Number of heads used by both MultiHeadAttention layers.
        key_dim: Per-head query/key size used by both MultiHeadAttention layers.
        ff_dim: Number of filters of the hidden feed-forward projection.
        dropout_rate: Dropout rate used inside both attention layers and
            between the two feed-forward projections.

    Returns:
        The Keras tensor produced by the final residual connection.

    Raises:
        ValueError: If the last dimension of ``inputs`` is not statically known,
            because the feed-forward output projection needs it as its filter
            count.
    """
    channels = inputs.shape[-1]
    if channels is None:
        raise ValueError(
            "The last dimension of `inputs` must be defined to build the "
            "feed-forward projection."
        )

    # First sub-layer: LayerNormalization -> self-attention -> residual.
    x = LayerNormalization(epsilon=1e-6)(inputs)
    attention_output_1 = MultiHeadAttention(
        key_dim=key_dim, num_heads=num_heads, dropout=dropout_rate
    )(x, x)
    attention_output_1 = Add()([inputs, attention_output_1])

    # Second sub-layer: LayerNormalization -> self-attention -> residual.
    x = LayerNormalization(epsilon=1e-6)(attention_output_1)
    attention_output_2 = MultiHeadAttention(
        key_dim=key_dim, num_heads=num_heads, dropout=dropout_rate
    )(x, x)
    attention_output_2 = Add()([attention_output_1, attention_output_2])

    # Feed-forward sub-layer. kernel_size=1 means each position is projected
    # independently; the second projection restores the input channel count so
    # the residual addition below is shape-compatible.
    x = LayerNormalization(epsilon=1e-6)(attention_output_2)
    x = Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(x)
    x = Dropout(dropout_rate)(x)
    x = Conv1D(filters=channels, kernel_size=1)(x)

    # Final residual connection. Uses the imported `Add` layer: the name
    # `keras` is never imported in this notebook, so `keras.layers.Add` would
    # raise NameError.
    return Add()([attention_output_2, x])

In [None]:
# ---------------------------------------------------------------------------
# Callbacks
# ---------------------------------------------------------------------------
timestamp = time.time()
datetime_obj = datetime.fromtimestamp(timestamp)
# The run directory name includes the year and avoids spaces/colons, so runs
# from different years do not collide and the path is valid on every OS.
fmt_time = datetime_obj.strftime('%Y%m%d-%H%M%S')

tb = TensorBoard(log_dir=f'logs/{fmt_time}')

# Stop once val_loss has not improved for 15 consecutive epochs.
es = EarlyStopping(monitor='val_loss', patience=15)

# Only write a checkpoint when val_loss reaches a new minimum.
ckpt = ModelCheckpoint(filepath='cpkts/model-{epoch:02d}-{val_loss:.4f}.hdf5',
                       monitor='val_loss',
                       mode='min',
                       save_best_only=True,
                       verbose=1)

# ---------------------------------------------------------------------------
# Model
# ---------------------------------------------------------------------------
strat = MirroredStrategy()

# Every layer is created inside the strategy scope so that its variables are
# created under the distribution strategy. Building the graph outside the
# scope and only wrapping Model(...) / compile(...) leaves the weights
# unmirrored.
with strat.scope():
    input_shape = (10, 100, 100, 3)
    video_input = Input(shape=input_shape)

    # Per-frame spatial feature extraction: three Conv2D -> Dropout ->
    # BatchNormalization stages with a growing number of filters.
    x = video_input
    for n_filters in (32, 64, 128):
        x = TimeDistributed(Conv2D(n_filters, 3, activation='relu'))(x)
        x = Dropout(0.3)(x)
        x = TimeDistributed(BatchNormalization())(x)

    # Flatten each frame's feature map into a single vector:
    # (batch, frames, features).
    # NOTE(review): with the default 'valid' padding the three 3x3 convolutions
    # shrink 100x100 frames to 94x94, so every frame flattens to
    # 94 * 94 * 128 = 1,131,008 features. The attention projections and the
    # Dense(512) below therefore hold billions of weights. Consider pooling
    # (MaxPool2D is imported but unused) before flattening -- confirm intent.
    x = TimeDistributed(Flatten())(x)

    # Multi-head self-attention across the frame axis.
    num_heads = 8
    key_dim = 64
    attention_layer = MultiHeadAttention(key_dim=key_dim, num_heads=num_heads)

    # x is already (batch, frames, features), so this reshape keeps the shape
    # unchanged; it is retained to preserve the original layer stack.
    attention_input = Reshape((-1, x.shape[-1]))(x)
    attention_output = attention_layer(attention_input, attention_input)

    # Classification head over the flattened attention output (7 classes).
    x = Flatten()(attention_output)
    x = Dense(512, activation='relu')(x)
    x = Dropout(0.3)(x)
    output = Dense(7, activation='softmax')(x)

    model = Model(inputs=video_input, outputs=output)
    # `metrics` is passed as a list: newer Keras versions reject a bare string.
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

# ---------------------------------------------------------------------------
# Training
# ---------------------------------------------------------------------------
# Global batch size: 128 samples per replica.
BATCH_SIZE = 128 * strat.num_replicas_in_sync

# Validate on the dedicated validation split created earlier in the notebook
# instead of carving another 20% out of the training data, which would shrink
# the training set and leave X_val / y_val unused.
model.fit(X_train, y_train,
          validation_data=(X_val, y_val),
          batch_size=BATCH_SIZE,
          epochs=5000,
          callbacks=[tb, es, ckpt])
