## Model

In [39]:
import cv2
import os
from sklearn.model_selection import train_test_split
import tensorflow as tf
import numpy as np
import keras
from keras import layers, ops
import matplotlib.pyplot as plt

SEED = 42
os.environ["TF_CUDNN_DETERMINISTIC"] = "1"
keras.utils.set_random_seed(SEED)

In [2]:
# retrieve所有video的名字

frames_path = 'data/frames/'
frames_path_normal = 'data/frames/Normal/'
frames_path_crash = 'data/frames/Crash/'

frames_name_normal = sorted([f for f in os.listdir(frames_path_normal)])
frames_name_crash = sorted([f for f in os.listdir(frames_path_crash)])

In [3]:
# 我只要50个video做train test split

frames_name_normal = frames_name_normal[1:26]
frames_name_crash = frames_name_crash[1:26]

In [4]:
# 三七分 train test split

train_normal, test_normal = train_test_split(frames_name_normal, \
                                     test_size=0.3, \
                                     random_state=42)

train_crash, test_crash = train_test_split(frames_name_crash, \
                                     test_size=0.3, \
                                     random_state=42)

In [38]:
# `##` --> Adjustable

# DATA
IMG_SIZE = 128  ## Image size (128, 128) in this case
CHAN_SIZE = 1   ## 1 - GrayScale; 3 - RGB
BATCH_SIZE = 8  ## 教程给的是 32
AUTO = tf.data.AUTOTUNE
INPUT_SHAPE = (50, IMG_SIZE, IMG_SIZE, CHAN_SIZE)
NUM_CLASSES = 2  ## Crash vs. Normal

# OPTIMIZER
LEARNING_RATE = 1e-4
WEIGHT_DECAY = 1e-5

# TRAINING
EPOCHS = 10  ## 教程给的好像是 100

# TUBELET EMBEDDING
PATCH_SIZE = (8, 8, 8)
NUM_PATCHES = (INPUT_SHAPE[0] // PATCH_SIZE[0]) ** 2

# ViViT ARCHITECTURE
LAYER_NORM_EPS = 1e-6
PROJECTION_DIM = 64  ## size of the feature vectors transformed from the input
NUM_HEADS = 4        ## 教程给的是 8
NUM_LAYERS = 6       ## 教程给的是 8

In [6]:
# Load data

def load_image(path):
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=CHAN_SIZE)
    image = tf.image.resize(image, [IMG_SIZE, IMG_SIZE])
    # image = image / 255.0  # Normalize to [0, 1]
    return image

train_videos = []
test_videos = []
train_labels = []
test_labels = []


for t in train_normal:
    video = []
    for i in range(50):
        current_frame_index = str(i)
        if (i < 10):
            video.append(load_image(frames_path_normal + t \
                                    + "/frame_000" + str(i) + ".jpg"))
        else:
            video.append(load_image(frames_path_normal + t \
                                    + "/frame_00" + str(i) + ".jpg"))
    video = tf.stack(video)
    # print(video.shape) # (50, 224, 224, 3)
    train_videos.append(video.numpy())
    train_labels.append(0)
    
for t in test_normal:
    video = []
    for i in range(50):
        current_frame_index = str(i)
        if (i < 10):
            video.append(load_image(frames_path_normal + t \
                                    + "/frame_000" + str(i) + ".jpg"))
        else:
            video.append(load_image(frames_path_normal + t \
                                    + "/frame_00" + str(i) + ".jpg"))
    video = tf.stack(video)
    # print(video.shape) # (50, 224, 224, 3)
    test_videos.append(video.numpy())
    test_labels.append(0)

for t in train_crash:
    video = []
    for i in range(50):
        current_frame_index = str(i)
        if (i < 10):
            video.append(load_image(frames_path_crash + t \
                                    + "/frame_000" + str(i) + ".jpg"))
        else:
            video.append(load_image(frames_path_crash + t \
                                    + "/frame_00" + str(i) + ".jpg"))
    video = tf.stack(video)
    # print(video.shape) # (50, 224, 224, 3)
    train_videos.append(video.numpy())
    train_labels.append(1)
    
for t in test_crash:
    video = []
    for i in range(50):
        current_frame_index = str(i)
        if (i < 10):
            video.append(load_image(frames_path_crash + t \
                                    + "/frame_000" + str(i) + ".jpg"))
        else:
            video.append(load_image(frames_path_crash + t \
                                    + "/frame_00" + str(i) + ".jpg"))
    video = tf.stack(video)
    # print(video.shape) # (50, 224, 224, 3)
    test_videos.append(video.numpy())
    test_labels.append(1)

In [7]:
train_videos = np.asarray(train_videos)
test_videos = np.asarray(test_videos)
train_labels = np.asarray(train_labels)
test_labels = np.asarray(test_labels)

# print(len(train_videos), len(train_labels)) # 34, 34
# print(len(test_videos), len(test_labels)) # 16, 16

In [8]:
# Create Dataloader

def preprocess(frames: tf.Tensor, label: tf.Tensor):
    """Preprocess the frames tensors and parse the labels."""
    # Preprocess images
    frames = tf.image.convert_image_dtype(
        frames[
            ..., tf.newaxis
        ],  # The new axis is to help for further processing with Conv3D layers
        tf.float32,
    )
    # Parse label
    label = tf.cast(label, tf.float32)
    return frames, label



def prepare_dataloader(
    videos: np.ndarray,
    labels: np.ndarray,
    loader_type: str = "train",
    batch_size: int = BATCH_SIZE,
):
    """Utility function to prepare the dataloader."""
    dataset = tf.data.Dataset.from_tensor_slices((videos, labels))

    if loader_type == "train":
        dataset = dataset.shuffle(BATCH_SIZE * 2)

    dataloader = (
        dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
        .batch(batch_size)
        .prefetch(tf.data.AUTOTUNE)
    )
    return dataloader


trainloader = prepare_dataloader(train_videos, train_labels, "train")
testloader = prepare_dataloader(test_videos, test_labels, "test")

In [9]:
class TubeletEmbedding(layers.Layer):
    def __init__(self, embed_dim, patch_size, **kwargs):
        super().__init__(**kwargs)
        self.projection = layers.Conv3D(
            filters=embed_dim,
            kernel_size=patch_size,
            strides=patch_size,
            padding="VALID",
        )
        self.flatten = layers.Reshape(target_shape=(-1, embed_dim))

    def call(self, videos):
        projected_patches = self.projection(videos)
        flattened_patches = self.flatten(projected_patches)
        return flattened_patches

In [10]:
class PositionalEncoder(layers.Layer):
    def __init__(self, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim

    def build(self, input_shape):
        _, num_tokens, _ = input_shape
        self.position_embedding = layers.Embedding(
            input_dim=num_tokens, output_dim=self.embed_dim
        )
        self.positions = ops.arange(0, num_tokens, 1)

    def call(self, encoded_tokens):
        # Encode the positions and add it to the encoded tokens
        encoded_positions = self.position_embedding(self.positions)
        encoded_tokens = encoded_tokens + encoded_positions
        return encoded_tokens

In [11]:
def create_vivit_classifier(
    tubelet_embedder,
    positional_encoder,
    input_shape=INPUT_SHAPE,
    transformer_layers=NUM_LAYERS,
    num_heads=NUM_HEADS,
    embed_dim=PROJECTION_DIM,
    layer_norm_eps=LAYER_NORM_EPS,
    num_classes=NUM_CLASSES,
):
    # Get the input layer
    inputs = layers.Input(shape=input_shape)
    # Create patches.
    patches = tubelet_embedder(inputs)
    # Encode patches.
    encoded_patches = positional_encoder(patches)

    # Create multiple layers of the Transformer block.
    for _ in range(transformer_layers):
        # Layer normalization and MHSA
        x1 = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim // num_heads, dropout=0.1
        )(x1, x1)

        # Skip connection
        x2 = layers.Add()([attention_output, encoded_patches])

        # Layer Normalization and MLP
        x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
        x3 = keras.Sequential(
            [
                layers.Dense(units=embed_dim * 4, activation=ops.gelu),
                layers.Dense(units=embed_dim, activation=ops.gelu),
            ]
        )(x3)

        # Skip connection
        encoded_patches = layers.Add()([x3, x2])

    # Layer normalization and Global average pooling.
    representation = layers.LayerNormalization(epsilon=layer_norm_eps)(encoded_patches)
    representation = layers.GlobalAvgPool1D()(representation)

    # Classify outputs.
    outputs = layers.Dense(units=num_classes, activation="softmax")(representation)

    # Create the Keras model.
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

In [40]:
def run_experiment():
    # Initialize model
    model = create_vivit_classifier(
        tubelet_embedder=TubeletEmbedding(
            embed_dim=PROJECTION_DIM, patch_size=PATCH_SIZE
        ),
        positional_encoder=PositionalEncoder(embed_dim=PROJECTION_DIM),
    )

    # Compile the model with the optimizer, loss function
    # and the metrics.
    optimizer = keras.optimizers.Adam(learning_rate=LEARNING_RATE)
    model.compile(
        optimizer=optimizer,
        loss="sparse_categorical_crossentropy",
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="accuracy"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top-5-accuracy"),
        ],
    )

    # Train the model
                                                    # 先不要validloader啦
    history = model.fit(trainloader, epochs=EPOCHS) # validation_data=validloader

    loss, accuracy, top_5_accuracy = model.evaluate(testloader)
    print(f"Test accuracy: {accuracy * 100}%")
    print(f"Test top accuracy: {top_5_accuracy * 100}%")

    return model,history


model,history = run_experiment()

Epoch 1/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 2s/step - accuracy: 0.8624 - loss: 0.5217 - top-5-accuracy: 1.0000
Epoch 2/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 2s/step - accuracy: 0.6093 - loss: 0.7132 - top-5-accuracy: 1.0000
Epoch 3/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 2s/step - accuracy: 0.8138 - loss: 0.4698 - top-5-accuracy: 1.0000
Epoch 4/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 2s/step - accuracy: 0.7925 - loss: 0.5186 - top-5-accuracy: 1.0000
Epoch 5/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 2s/step - accuracy: 0.7324 - loss: 0.5481 - top-5-accuracy: 1.0000
Epoch 6/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 2s/step - accuracy: 0.7631 - loss: 0.3793 - top-5-accuracy: 1.0000
Epoch 7/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 2s/step - accuracy: 0.7930 - loss: 0.4525 - top-5-accuracy: 1.0000
Epoch 8/10
[1

## Visualization

In [43]:
## 1. Accuracy and Loss

plt.plot(history.history['accuracy'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

plt.plot(history.history['loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

In [23]:
## 2. Feature Heatmap using attention weights

weights_per_layer = []

for layer in model.layers:
    weights_per_layer.append(layer.get_weights())
    
    # Attention Weights
    
    # These scores represent how much each part of the input image (divided into patches) 
    # attends to every other part (relations between layers)
    
    # Transformers typically use multi-headed attention mechanisms where each head can potentially 
    # focus on different features or parts of the image.
    
    # To visualize these as heatmaps over the original image, 
    #you will need to reshape these matrices back to the spatial dimensions of the image 
    # and use interpolation techniques to upscale the coarse heatmap to the full resolution of the image.
    
    # Visualize the Matrices: Use heatmaps to visualize each matrix. Areas of higher values (e.g., closer to 1)
    # indicate stronger attention and can be represented with warmer colors.
    
    # Calculate the average attention that each frame gives to every other frame. 
    # This can help identify if certain frames are particularly influential or are being ignored.
    
    # Look at the rows or columns that correspond to specific patches. 
    # For instance, if a particular patch in Frame 1 consistently shows high attention values 
    # across its row when looking at other frames, this indicates that the patch is of high interest 
    # across the video.
    
    # Patches are fixed-size, non-overlapping segments of the original image 
    # (or frame in the case of video), each treated as a distinct element.
    
weights_per_layer

In [None]:
""" FOR REFERENCE """

# DATA
IMG_SIZE = 128  ## Image size (128, 128) in this case
CHAN_SIZE = 1   ## 1 - GrayScale; 3 - RGB
BATCH_SIZE = 8  ## 教程给的是 32
AUTO = tf.data.AUTOTUNE
INPUT_SHAPE = (50, IMG_SIZE, IMG_SIZE, CHAN_SIZE)
NUM_CLASSES = 2  ## Crash vs. Normal

# OPTIMIZER
LEARNING_RATE = 1e-4
WEIGHT_DECAY = 1e-5

# TRAINING
EPOCHS = 10  ## 教程给的好像是 100

# TUBELET EMBEDDING
PATCH_SIZE = (8, 8, 8)
NUM_PATCHES = (INPUT_SHAPE[0] // PATCH_SIZE[0]) ** 2

# ViViT ARCHITECTURE
LAYER_NORM_EPS = 1e-6
PROJECTION_DIM = 64  ## size of the feature vectors transformed from the input
NUM_HEADS = 4        ## 教程给的是 8
NUM_LAYERS = 6       ## 教程给的是 8