In [8]:
from tensorflow.keras import layers
from tensorflow import keras

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2
import time 
import os
import math
import random

In [9]:
MAX_SEQ_LENGTH = 55
NUM_FEATURES = 512
IMG_SIZE = 128
EPOCHS = 20

In [12]:
import numpy as np
features = np.load("extracted_features.npz")
train_data, train_labels,test_data, test_labels = features["arr_0"], features["arr_1"], features["arr_2"], features["arr_3"]

In [13]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, output_dim, **kwargs):
        super(PositionalEmbedding,self).__init__(**kwargs)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )
        self.sequence_length = sequence_length
        self.output_dim = output_dim

    def call(self, inputs):
        # The inputs are of shape: `(batch_size, frames, num_features)`
        length = tf.shape(inputs)[1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_positions = self.position_embeddings(positions)
        return inputs + embedded_positions

    def compute_mask(self, inputs, mask=None):
        mask = tf.reduce_any(tf.cast(inputs, "bool"), axis=-1)
        return mask
    
    def get_config(self):
        config = super(PositionalEmbedding,self).get_config()
        config.update({
            "position_embeddings": self.position_embeddings,
            "sequence_length": self.sequence_length,
            "output_dim": self.output_dim
        })
        return config

In [14]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super(TransformerEncoder,self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.5
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation=tf.nn.gelu),layers.Dropout(0.7), layers.Dense(embed_dim, activation=tf.nn.gelu),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]

        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)
    
    def get_config(self):
        config = super(TransformerEncoder,self).get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
            "attention": self.attention,
            "layernorm_1":self.layernorm_1,
            "layernorm_2":self.layernorm_2,
        })
        return config

In [20]:
# kfold cross validation
from sklearn.model_selection import KFold
import numpy as np
num_folds = 5

# Define per-fold score containers
acc_per_fold = []
loss_per_fold = []


# inputs = np.expand_dims(inputs, 0)
# targets = np.expand_dims(targets, 0)

# Parse numbers as floats
train_data = train_data.astype('float32')
test_data = test_data.astype('float32')

# Normalize data
train_data = train_data / 255
test_data = test_data / 255

inputs = np.concatenate((train_data, test_data), axis=0)
targets = np.concatenate((train_labels, test_labels), axis=0)
print("inputs",inputs.shape,"targets",targets.shape)


# Define the K-fold Cross Validator
kfold = KFold(n_splits=num_folds, shuffle=True)
MAX_SEQ_LENGTH = 55
NUM_FEATURES = 512
IMG_SIZE = 128
EPOCHS = 20
# K-fold Cross Validation model evaluation
fold_no = 1
for train, test in kfold.split(inputs, targets):

    # Define the model architecture
        sequence_length = MAX_SEQ_LENGTH
        embed_dim = NUM_FEATURES
        dense_dim = 512  # definig dense layer 
        num_heads = 4 # defining  number of MultiHeadAttention layer
        # classes = len(label_processor.get_vocabulary())
        classes=30

        inputs = keras.Input(shape=(None, None))
        x = PositionalEmbedding(
            sequence_length, embed_dim, name="frame_position_embedding"
        )(inputs)
        x = TransformerEncoder(embed_dim, dense_dim, num_heads, name="transformer_layer")(x)
        x = layers.GlobalMaxPooling1D()(x)
        x = layers.Dropout(0.5)(x)
        
        
        outputs = layers.Dense(classes, activation="softmax")(x)
        model = keras.Model(inputs, outputs)

        model.compile(
            optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
        )

            # Generate a print
        print('------------------------------------------------------------------------')
        print(f'Training for fold {fold_no} ...')
        # import pdb;pdb.set_trace()
        history = model.fit(
          inputs[train],
          targets[train],
          # validation_split=0.15,
          epochs=20,
          # callbacks=[checkpoint],
          verbose=1,
          batch_size= math.ceil(len(train_data)/20))
          # number of train_data/ epochs number) 
        
        # Generate generalization metrics
        scores = model.evaluate(inputs[test], targets[test], verbose=0)
        print(f'Score for fold {fold_no}: {model.metrics_names[0]} of {scores[0]}; {model.metrics_names[1]} of {scores[1]*100}%')
        acc_per_fold.append(scores[1] * 100)
        loss_per_fold.append(scores[0]) 

          # Increase fold number
        fold_no = fold_no + 1

      # == Provide average scores ==
        print('------------------------------------------------------------------------')
        print('Score per fold')
        for i in range(0, len(acc_per_fold)):
          print('------------------------------------------------------------------------')
          print(f'> Fold {i+1} - Loss: {loss_per_fold[i]} - Accuracy: {acc_per_fold[i]}%')
        print('------------------------------------------------------------------------')
        print('Average scores for all folds:')
        print(f'> Accuracy: {np.mean(acc_per_fold)} (+- {np.std(acc_per_fold)})')
        print(f'> Loss: {np.mean(loss_per_fold)}')
        print('------------------------------------------------------------------------')





inputs (3600, 55, 512) targets (3600, 1)
------------------------------------------------------------------------
Training for fold 1 ...


TypeError: Exception encountered when calling layer "tf.__operators__.getitem_1" (type SlicingOpLambda).

Only integers, slices (`:`), ellipsis (`...`), tf.newaxis (`None`) and scalar tf.int32/tf.int64 tensors are valid indices, got array([   0,    1,    2, ..., 3596, 3597, 3598])

Call arguments received:
  • tensor=tf.Tensor(shape=(None, None, None), dtype=float32)
  • slice_spec=array([   0,    1,    2, ..., 3596, 3597, 3598])
  • var=None

In [None]:
# For model training
""" 
    This model architecture is for vgg16
    vgg16 give 512 features

"""
tf.keras.utils.set_random_seed(1024) # defining random seed 

"""
    Random seed is to generate same dataset in each shuffel while training.
    src: https://stackoverflow.com/questions/51249811/reproducible-results-in-tensorflow-with-tf-set-random-seed
"""


def get_compiled_model():
    sequence_length = MAX_SEQ_LENGTH
    embed_dim = NUM_FEATURES
    dense_dim = 512  # definig dense layer 
    num_heads = 4 # defining  number of MultiHeadAttention layer
    classes = len(label_processor.get_vocabulary())

    inputs = keras.Input(shape=(None, None))
    x = PositionalEmbedding(
        sequence_length, embed_dim, name="frame_position_embedding"
    )(inputs)
    x = TransformerEncoder(embed_dim, dense_dim, num_heads, name="transformer_layer")(x)
    x = layers.GlobalMaxPooling1D()(x)
    x = layers.Dropout(0.5)(x)
    
    
    outputs = layers.Dense(classes, activation="softmax")(x)
    model = keras.Model(inputs, outputs)

    model.compile(
        optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
    )
    model.summary()
    return model


def run_experiment():
    filepath = "models/vgg16/ckpt/"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    model = get_compiled_model()
    history = model.fit(
        train_data,
        train_labels,
        validation_split=0.15,
        epochs=20,
        callbacks=[checkpoint],
        batch_size= math.ceil(len(train_data)/EPOCHS) # number of train_data/ epochs number
    )

    model.load_weights(filepath)
    _, accuracy = model.evaluate(test_data, test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return model, history