In [None]:
!pip install -q git+https://github.com/tensorflow/docs

## Setup

In [None]:
import os
import keras
from keras import layers
import tensorflow as tf
from keras.applications.densenet import DenseNet121
from sklearn.metrics import confusion_matrix, classification_report

from tensorflow_docs.vis import embed

import tensorflow as tf
from tensorflow import keras


import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import imageio
import cv2

## Check Version 

In [None]:
# Record the library versions this notebook was run with.
import keras
import tensorflow as tf

for library in (keras, tf):
    print(library.__version__)

## Define hyperparameters

In [None]:
# Number of frames kept per video; shorter videos are zero-padded up to this length.
MAX_SEQ_LENGTH = 20
# Width of the per-frame feature vector stored for each frame. It has to match
# the output size of the feature extractor built below.
NUM_FEATURES = 1024
# Frames are center-cropped to IMG_SIZE x IMG_SIZE before feature extraction.
IMG_SIZE = 128
# Number of epochs passed to `model.fit` in `run_experiment`.
EPOCHS = 50

## Data preparation
- Reduce the image size to 128x128 to speed up processing.
- Use a pre-trained DenseNet121 for feature extraction.
- Directly pad shorter videos to length `MAX_SEQ_LENGTH`.

In [None]:
# Tables listing the videos of each split; later cells read their
# "Video_Path" and "Label" columns.
train_df = pd.read_csv("train.csv")
test_df = pd.read_csv("test.csv")

print(f"Total videos for training: {len(train_df)}")
print(f"Total videos for testing: {len(test_df)}")

# One shared crop layer, reused for every frame by `crop_center`.
center_crop_layer = layers.CenterCrop(height=IMG_SIZE, width=IMG_SIZE)


def crop_center(frame):
    """Center-crop a single frame with the shared `center_crop_layer`.

    Args:
        frame: One image without a batch axis.

    Returns:
        The cropped frame with the temporary batch axis removed again. The
        last step goes through `keras.ops.squeeze`, so the return type is
        whatever that op produces for the active backend.
    """
    # The layer is applied to a batch of one, hence the added leading axis.
    batch_of_one = frame[None, ...]
    cropped_numpy = keras.ops.convert_to_numpy(center_crop_layer(batch_of_one))
    return keras.ops.squeeze(cropped_numpy)



# Following method is modified from this tutorial:
# https://www.tensorflow.org/hub/tutorials/action_recognition_with_tf_hub
def load_video(path, max_frames=0, offload_to_cpu=False):
    """Decode a video file into an array of center-cropped RGB frames.

    Args:
        path: Path of the video file, handed to `cv2.VideoCapture`.
        max_frames: Stop after this many frames. The default of 0 never
            matches the frame count, so the whole video is read.
        offload_to_cpu: Kept for backward compatibility. Every frame is now
            converted to a host NumPy array regardless of backend, so the
            flag no longer changes the result.

    Returns:
        A NumPy array of shape `(num_frames, IMG_SIZE, IMG_SIZE, 3)`. When no
        frame could be decoded (missing or unreadable file) the array has
        `num_frames == 0` but keeps the trailing dimensions, so callers can
        still pad or stack it.
    """
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Reverse the channel order (OpenCV decodes frames as BGR).
            frame = frame[:, :, [2, 1, 0]]
            frame = crop_center(frame)
            # Convert once, here, so the result is a plain NumPy array on
            # every backend instead of a list of backend tensors.
            frames.append(keras.ops.convert_to_numpy(frame))

            if len(frames) == max_frames:
                break
    finally:
        # Always release the capture handle, even if decoding raised.
        cap.release()

    if not frames:
        return np.zeros((0, IMG_SIZE, IMG_SIZE, 3), dtype="float32")
    return np.array(frames)


def build_feature_extractor():
    """Build the per-frame feature extractor.

    Wraps an ImageNet-initialised DenseNet121 (no classification head,
    average pooling) behind the DenseNet `preprocess_input` step.

    Returns:
        A `keras.Model` named "feature_extractor" taking images of shape
        `(IMG_SIZE, IMG_SIZE, 3)`.
    """
    image_shape = (IMG_SIZE, IMG_SIZE, 3)
    backbone = DenseNet121(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=image_shape,
    )

    inputs = keras.Input(image_shape)
    # Apply the DenseNet-specific input preprocessing inside the model so
    # callers can pass frames as produced by `load_video`.
    scaled = keras.applications.densenet.preprocess_input(inputs)
    return keras.Model(inputs, backbone(scaled), name="feature_extractor")


feature_extractor = build_feature_extractor()


# Label preprocessing with StringLookup: the vocabulary is the set of distinct
# training labels, with no out-of-vocabulary bucket and no mask token.
label_vocabulary = np.unique(train_df["Label"])
label_processor = keras.layers.StringLookup(
    vocabulary=label_vocabulary, num_oov_indices=0, mask_token=None
)
print(label_processor.get_vocabulary())


def prepare_all_videos(df, root_dir):
    """Extract fixed-length frame features and integer labels for a split.

    Args:
        df: DataFrame with a "Video_Path" column (paths relative to
            `root_dir`) and a "Label" column.
        root_dir: Directory that the video paths are joined onto.

    Returns:
        A tuple `(frame_features, labels)`:
        - `frame_features`: float32 array of shape
          `(len(df), MAX_SEQ_LENGTH, NUM_FEATURES)`. Positions past the end
          of a video, and frames that are entirely blank, stay all-zero.
        - `labels`: output of `label_processor` for the "Label" column,
          one row per video.
    """
    num_samples = len(df)
    video_paths = df["Video_Path"].values.tolist()
    labels = df["Label"].values
    labels = label_processor(labels[..., None]).numpy()

    # `frame_features` are what we will feed to our sequence model.
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    for idx, path in enumerate(video_paths):
        full_path = os.path.join(root_dir, path)
        # Only the first MAX_SEQ_LENGTH frames are ever used, so there is no
        # point in decoding the rest of the video.
        frames = load_video(full_path, max_frames=MAX_SEQ_LENGTH)
        frames = np.asarray(frames)[:MAX_SEQ_LENGTH]

        if frames.ndim != 4 or len(frames) == 0:
            print(f"Warning: no frames decoded from {full_path}; using zero features.")
            continue

        # Shorter videos need no explicit padding: the rows beyond
        # `len(frames)` simply keep their zero initialisation. (The previous
        # `np.concatenate(frames, padding)` passed the padding array as the
        # `axis` argument and raised for every short video.)
        frame_means = frames.reshape(len(frames), -1).mean(axis=1)
        informative = np.flatnonzero(frame_means > 0.0)

        # Run the extractor once per video instead of once per frame. Blank
        # frames are skipped so their features stay zero.
        if informative.size:
            frame_features[idx, informative] = feature_extractor.predict(
                frames[informative], verbose=0
            )

    return frame_features, labels


## Show Sample From train_df

In [None]:
# Display ten randomly chosen rows of `train_df`.
train_df.sample(10)

## Check Paths

In [None]:
# Function to check if paths exist
def check_paths(df, column_name, root_dir=""):
    """Return the entries of `df[column_name]` that do not exist on disk.

    Args:
        df: DataFrame holding the paths.
        column_name: Name of the column that contains the paths.
        root_dir: Optional directory the paths are relative to. Defaults to
            "" so the paths are checked exactly as stored.

    Returns:
        A list with the original column values whose path is missing.
        Cells that are not path-like at all (e.g. NaN/None from an empty CSV
        field) are reported as missing instead of raising.
    """
    corrupted_paths = []
    for path in df[column_name]:
        if not isinstance(path, (str, os.PathLike)):
            corrupted_paths.append(path)
            continue
        if not os.path.exists(os.path.join(root_dir, path)):
            corrupted_paths.append(path)
    return corrupted_paths

# Collect the missing paths of both splits.
train_corrupted = check_paths(train_df, 'Video_Path')
test_corrupted = check_paths(test_df, 'Video_Path')

# Print each split's header followed by one missing path per line.
for header, missing_paths in (
    ("Corrupted paths in train_df:", train_corrupted),
    ("\nCorrupted paths in test_df:", test_corrupted),
):
    print(header)
    for path in missing_paths:
        print(path)

## Display Sample From Dataset

In [None]:
import matplotlib.pyplot as plt

def show_sample_data(df, root_dir, num_samples=5):
    """Print basic information and show the first frames of random videos.

    Args:
        df: DataFrame with "Video_Path" and "Label" columns.
        root_dir: Directory that the video paths are joined onto.
        num_samples: Number of videos to show. Capped at `len(df)` so small
            tables do not make `DataFrame.sample` raise.
    """
    # Randomly select up to 'num_samples' videos.
    sample_df = df.sample(n=min(num_samples, len(df)))
    video_paths = sample_df['Video_Path'].values
    labels = sample_df['Label'].values

    for video_path, label in zip(video_paths, labels):
        frames = load_video(os.path.join(root_dir, video_path))

        # Skip videos with no frames
        if len(frames) == 0:
            print(f"Skipping video at path {video_path} because it has no frames.")
            continue

        # Display some information about the video
        print(f"Video Path: {video_path}")
        print(f"Label: {label}")
        print(f"Number of Frames: {len(frames)}")

        # Display the first few frames from the video
        preview = frames[:5]
        # squeeze=False keeps `axes` two-dimensional even for a single frame;
        # otherwise a lone Axes object is returned and indexing it fails.
        fig, axes = plt.subplots(1, len(preview), figsize=(15, 3), squeeze=False)
        fig.suptitle(f"Sample Frames - Label: {label}")
        for ax, frame in zip(axes[0], preview):
            # NOTE(review): frames appear to come back from the crop layer as
            # floats in the 0-255 range (to_gif casts them to uint8 as well).
            # imshow clips float RGB data to [0, 1], so cast to uint8 first.
            ax.imshow(np.clip(np.asarray(frame), 0, 255).astype(np.uint8))
            ax.axis('off')
        plt.show()

# Preview five random videos from the test DataFrame; their paths are resolved
# relative to the 'test' directory.
show_sample_data(test_df, 'test', num_samples=5)


## Extract Feature From Train data and Test data
- After extracting the features, save them to `.npy` files so they can be reloaded without repeating the extraction.

In [None]:
# NOTE: "name_of_dataset_dirictory" is a placeholder; replace it with the
# directory that the "Video_Path" entries are relative to before running.
# The train arrays are written to disk before the test split is processed, so
# they are kept even if the second extraction fails.
train_data, train_labels = prepare_all_videos(train_df, "name_of_dataset_dirictory")
np.save("train_data.npy", train_data)
np.save("train_labels.npy", train_labels)
test_data, test_labels = prepare_all_videos(test_df, "name_of_dataset_dirictory")
np.save("test_data.npy", test_data)
np.save("test_labels.npy", test_labels)

## Load .npy files 

In [None]:
# Reload the cached features and labels written by the extraction cell.
train_data, train_labels = np.load("train_data.npy"), np.load("train_labels.npy")
test_data, test_labels = np.load("test_data.npy"), np.load("test_labels.npy")
print(f"Frame features in train set: {train_data.shape}")
# This line previously said "train set" while printing the test shape.
print(f"Frame features in test set: {test_data.shape}")

## Building the Transformer

In [None]:
class PositionalEmbedding(layers.Layer):
    """Adds a learned position embedding to a sequence of frame features.

    Args:
        sequence_length: Number of positions in the embedding table. Inputs
            must not be longer than this along axis 1.
        output_dim: Size of each position vector. It is added element-wise to
            the inputs, so it has to match their last dimension.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, sequence_length, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )
        self.sequence_length = sequence_length
        self.output_dim = output_dim

    def build(self, input_shape):
        self.position_embeddings.build(input_shape)

    def call(self, inputs):
        # The inputs are of shape: `(batch_size, frames, num_features)`
        inputs = keras.ops.cast(inputs, self.compute_dtype)
        length = keras.ops.shape(inputs)[1]
        positions = keras.ops.arange(start=0, stop=length, step=1)
        embedded_positions = self.position_embeddings(positions)
        # `(frames, output_dim)` broadcasts over the batch axis.
        return inputs + embedded_positions

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created
        when a model containing it is saved, loaded or cloned."""
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "output_dim": self.output_dim,
            }
        )
        return config

- Now, we can create a subclassed layer for the Transformer.

In [None]:
class TransformerEncoder(layers.Layer):
    """Single Transformer encoder block: self-attention plus a feed-forward
    projection, each followed by a residual connection and layer norm.

    Args:
        embed_dim: Feature size of the inputs; also used as the attention
            `key_dim` and as the output size of the feed-forward projection.
        dense_dim: Hidden size of the feed-forward projection.
        num_heads: Number of attention heads.
        dropout: Dropout rate inside the attention layer. Defaults to 0.3,
            the value that used to be hard-coded.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, dropout=0.3, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.dropout = dropout
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=dropout
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation=keras.activations.gelu),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        # Self-attention: the sequence attends to itself.
        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created
        when a model containing it is saved, loaded or cloned."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
                "dropout": self.dropout,
            }
        )
        return config

## Utility functions for Training

In [None]:
def get_compiled_model(shape):
    """Assemble and compile the Transformer-based video classifier.

    Args:
        shape: Shape of one input sample (without the batch axis).

    Returns:
        A compiled `keras.Model` producing one softmax probability per entry
        of the `label_processor` vocabulary.
    """
    num_classes = len(label_processor.get_vocabulary())
    feed_forward_dim = 4
    attention_heads = 1

    inputs = keras.Input(shape=shape)
    embedded = PositionalEmbedding(
        MAX_SEQ_LENGTH, NUM_FEATURES, name="frame_position_embedding"
    )(inputs)
    encoded = TransformerEncoder(
        NUM_FEATURES, feed_forward_dim, attention_heads, name="transformer_layer"
    )(embedded)
    # Collapse the frame axis, then regularise before the classifier head.
    pooled = layers.GlobalMaxPooling1D()(encoded)
    dropped = layers.Dropout(0.5)(pooled)
    outputs = layers.Dense(num_classes, activation="softmax")(dropped)

    model = keras.Model(inputs, outputs)
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model


def run_experiment():
    """Train the classifier, evaluate it on the test split and print reports.

    Reads the module-level `train_data`, `train_labels`, `test_data` and
    `test_labels` arrays. The best weights seen during training (as judged by
    the checkpoint callback) are written to `filepath` and reloaded before
    evaluation.

    Returns:
        The trained `keras.Model` with the checkpointed weights loaded.
    """
    filepath = "/tmp/video_classifier.weights.h5"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    model = get_compiled_model(train_data.shape[1:])
    history = model.fit(
        train_data,
        train_labels,
        validation_split=0.15,
        epochs=EPOCHS,
        callbacks=[checkpoint],
    )

    model.load_weights(filepath)
    _, accuracy = model.evaluate(test_data, test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    # This is the accuracy of the last epoch, which is not necessarily the
    # epoch whose weights were checkpointed and reloaded above.
    training_accuracy = history.history['accuracy'][-1]
    print(f"Training accuracy: {round(training_accuracy * 100, 2)}%")

    class_names = label_processor.get_vocabulary()
    class_ids = np.arange(len(class_names))
    true_classes = np.asarray(test_labels).reshape(-1)

    # Predict test labels
    test_pred = model.predict(test_data)
    test_pred_classes = np.argmax(test_pred, axis=1)

    # Generate confusion matrix. Passing `labels` pins one row/column per
    # vocabulary entry even when a class never occurs in the test split.
    cm = confusion_matrix(true_classes, test_pred_classes, labels=class_ids)

    # Print classification report. Without `labels`, `target_names` fails as
    # soon as the test split does not contain every class.
    print("Classification Report:")
    print(
        classification_report(
            true_classes,
            test_pred_classes,
            labels=class_ids,
            target_names=class_names,
            zero_division=1,
        )
    )

    # Calculate accuracy for each class. Classes without any test sample get
    # NaN instead of triggering a division-by-zero warning.
    samples_per_class = cm.sum(axis=1)
    class_accuracy = np.divide(
        np.diag(cm),
        samples_per_class,
        out=np.full(len(class_names), np.nan),
        where=samples_per_class > 0,
    )
    class_accuracy_dict = {
        label: acc * 100 for label, acc in zip(class_names, class_accuracy)
    }
    print("Accuracy for each class:")
    print(class_accuracy_dict)

    return model


## Model training and inference

In [None]:
# Train and evaluate; the returned model is used by `predict_action` below.
trained_model = run_experiment()

## Test Model On Single Video From Test dataset

In [None]:
def prepare_single_video(frames):
    """Turn the frames of one video into a model-ready feature tensor.

    Args:
        frames: Array of frames, shape `(num_frames, IMG_SIZE, IMG_SIZE, 3)`.
            Only the first MAX_SEQ_LENGTH frames are used.

    Returns:
        float32 array of shape `(1, MAX_SEQ_LENGTH, NUM_FEATURES)`. Positions
        past the end of the video, and entirely blank frames, are all-zero.
        An empty or malformed `frames` array yields all zeros.
    """
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    frames = np.asarray(frames)[:MAX_SEQ_LENGTH]
    if frames.ndim != 4 or len(frames) == 0:
        return frame_features

    # Shorter videos need no explicit padding: the rows beyond `len(frames)`
    # keep their zero initialisation. (The previous
    # `np.concatenate(frames, padding)` passed the padding array as the `axis`
    # argument and raised for every short video.)
    frame_means = frames.reshape(len(frames), -1).mean(axis=1)
    informative = np.flatnonzero(frame_means > 0.0)

    # One batched extractor call instead of one call per frame. Blank frames
    # are skipped so their features stay zero.
    if informative.size:
        frame_features[0, informative] = feature_extractor.predict(
            frames[informative], verbose=0
        )

    return frame_features


def predict_action(path, root_dir="test"):
    """Classify one video, print and plot the class probabilities.

    Args:
        path: Video path relative to `root_dir`.
        root_dir: Directory the path is joined onto. Defaults to "test", the
            directory that used to be hard-coded.

    Returns:
        The frames loaded from the video, so the caller can visualise them.
    """
    class_vocab = label_processor.get_vocabulary()

    frames = load_video(os.path.join(root_dir, path), offload_to_cpu=True)
    frame_features = prepare_single_video(frames)
    probabilities = trained_model.predict(frame_features)[0]

    plot_x_axis, plot_y_axis = [], []

    # Walk the classes from most to least probable.
    for i in np.argsort(probabilities)[::-1]:
        plot_x_axis.append(class_vocab[i])
        plot_y_axis.append(probabilities[i])
        print(f"  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")

    plt.figure(figsize=(18, 6))
    plt.bar(plot_x_axis, plot_y_axis, label=plot_x_axis)
    plt.xlabel("class_label")
    # Was a second `plt.xlabel`, which overwrote the class axis label and left
    # the y axis unlabeled.
    plt.ylabel("Probability")
    plt.show()

    return frames


# This utility is for visualization.
# Referenced from:
# https://www.tensorflow.org/hub/tutorials/action_recognition_with_tf_hub
def to_gif(images):
    """Write `images` to "animation.gif" and return its notebook embed.

    Args:
        images: Array of frames; cast to uint8 before being written.

    Returns:
        The result of `embed.embed_file` for the written GIF.
    """
    gif_path = "animation.gif"
    frames_uint8 = images.astype(np.uint8)
    imageio.mimsave(gif_path, frames_uint8, fps=10)
    return embed.embed_file(gif_path)


# Pick one test video at random, classify it and show its first frames as a GIF.
candidate_paths = test_df["Video_Path"].values.tolist()
test_video = np.random.choice(candidate_paths)
print(f"Test video path: {test_video}")
test_frames = predict_action(test_video)
to_gif(test_frames[:MAX_SEQ_LENGTH])