In [7]:
# Mount Google Drive at /content/drive so the project folder under MyDrive
# is reachable from this runtime (this import only exists on Google Colab).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [8]:
cd /content/drive/MyDrive/video_vision_transformer

/content/drive/MyDrive/video_vision_transformer


In [9]:
import os
import io
import imageio
import ipywidgets
import numpy as np
import tensorflow as tf  # for data preprocessing only
import keras
from keras import layers

In [10]:
# Single seed shared by every random number generator used in the notebook.
SEED = 42
# Request deterministic cuDNN kernels from TensorFlow (set before any model is built).
os.environ["TF_CUDNN_DETERMINISTIC"] = "1"
# Seed the RNGs through Keras so repeated runs start from the same state.
keras.utils.set_random_seed(SEED)

In [11]:
import os
import cv2
import numpy as np

def prepare_dataset(data_dir: str):
    """Load the training and validation splits stored under ``data_dir``.

    The splits are read with ``load_data`` from the ``TRAIN`` and ``VAL``
    sub-folders, in that order.

    Args:
        data_dir: Root folder that contains the ``TRAIN`` and ``VAL`` folders.

    Returns:
        A 3-tuple ``((train_videos, train_labels), (valid_videos,
        valid_labels), None)``. The last element is a placeholder for a test
        split, which this function does not load.
    """
    loaded_splits = []
    for split_name in ("TRAIN", "VAL"):
        split_videos, split_labels = load_data(os.path.join(data_dir, split_name))
        loaded_splits.append((split_videos, split_labels))

    train_split, valid_split = loaded_splits
    # No test dataset provided, so the third slot is always None.
    return train_split, valid_split, None

def load_data(data_dir, num_classes=None):
    """Decode every ``.avi`` clip under ``data_dir`` and one-hot encode its label.

    Expected layout is ``data_dir/<phase>/<clip>.avi`` where ``<phase>`` is an
    integer that is used directly as the class index of every clip inside it.
    Sub-folders whose name is not an integer (for example Jupyter's
    ``.ipynb_checkpoints``) are skipped instead of aborting the load.

    All frames of all clips are held in memory at their native resolution,
    exactly as returned by ``cv2.VideoCapture.read`` (no resizing or colour
    conversion happens here).

    Args:
        data_dir: Folder containing one sub-folder per phase/class.
        num_classes: Width of the one-hot encoding. When ``None`` it defaults
            to ``max(label) + 1`` so that every class index found on disk is
            representable. Pass the same explicit value for every split to
            guarantee identical label widths even if a split lacks a class.

    Returns:
        ``(videos, labels)`` where ``videos`` is a NumPy array stacking the
        decoded frames of every clip and ``labels`` is the matching one-hot
        array produced by ``keras.utils.to_categorical``.

    Raises:
        ValueError: If no ``.avi`` file is found, if a clip yields no frames,
            or if the clips do not all have the same number of frames (they
            could not be stacked into one array).
    """
    video_paths = []
    labels = []
    # os.listdir returns entries in arbitrary order; sort so the sample order
    # (and therefore seeded shuffling) is reproducible between runs.
    for phase_folder in sorted(os.listdir(data_dir)):
        phase_folder_path = os.path.join(data_dir, phase_folder)
        if not os.path.isdir(phase_folder_path):
            continue
        try:
            label = int(phase_folder)  # folder name is the phase number
        except ValueError:
            # Not a phase folder (hidden/checkpoint directory, etc.).
            continue
        for file in sorted(os.listdir(phase_folder_path)):
            if file.endswith(".avi"):
                video_paths.append(os.path.join(phase_folder_path, file))
                labels.append(label)

    if not video_paths:
        raise ValueError(f"No .avi files found under {data_dir!r}")

    videos = []
    for video_path in video_paths:
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(frame)
        finally:
            # Always free the decoder handle, even if reading raised.
            cap.release()
        if not frames:
            raise ValueError(f"Could not decode any frames from {video_path!r}")
        videos.append(frames)

    # Clips must share a frame count to be stacked into one dense array.
    frame_counts = {len(frames) for frames in videos}
    if len(frame_counts) > 1:
        raise ValueError(
            f"Videos under {data_dir!r} have differing frame counts: "
            f"{sorted(frame_counts)}"
        )

    videos = np.array(videos)
    labels = np.array(labels)

    # Convert labels to one-hot encoding. The width must cover the largest
    # class index; counting unique labels is too small whenever a class is
    # missing from this split or the indices are not contiguous from 0.
    if num_classes is None:
        num_classes = int(labels.max()) + 1
    labels = keras.utils.to_categorical(labels, num_classes=num_classes)

    return videos, labels

# Root folder holding the TRAIN/ and VAL/ split directories read by prepare_dataset.
data_dir = "/content/drive/MyDrive/video_vision_transformer/DEMO"
# Decode both splits into memory. The third returned element is the (absent)
# test split, which prepare_dataset always returns as None, hence `_`.
(train_data, train_labels), (valid_data, valid_labels), _ = prepare_dataset(data_dir)

In [28]:
import tensorflow as tf  # for data preprocessing only
# DATA
# DATASET_NAME = "organmnist3d"  (disabled: videos are loaded from Drive by prepare_dataset)
BATCH_SIZE = 32  # number of clips per batch
AUTO = tf.data.AUTOTUNE  # shorthand for tf.data's automatic tuning value
# Shape of one preprocessed clip fed to the model: (height, width, frames, channels).
INPUT_SHAPE = (224, 224, 25, 3)
NUM_CLASSES = 7

# OPTIMIZER
LEARNING_RATE = 1e-4
# NOTE(review): run_experiment builds Adam with learning_rate only, so this
# value is currently not applied anywhere.
WEIGHT_DECAY = 1e-5

# TRAINING
EPOCHS = 60

# TUBELET EMBEDDING
# NOTE(review): the patch covers the entire clip, so every video is embedded
# as a single token and self-attention has nothing to attend across. Confirm
# this is intended before tuning the transformer depth/heads.
PATCH_SIZE = (224, 224, 25)
# Token count produced by a stride == kernel "VALID" Conv3D: the product of the
# per-axis patch counts over height, width AND frames (not the first axis
# squared, which ignores the other two axes).
NUM_PATCHES = (
    (INPUT_SHAPE[0] // PATCH_SIZE[0])
    * (INPUT_SHAPE[1] // PATCH_SIZE[1])
    * (INPUT_SHAPE[2] // PATCH_SIZE[2])
)

# MedViViT ARCHITECTURE
LAYER_NORM_EPS = 1e-6
PROJECTION_DIM = 128
NUM_HEADS = 8
NUM_LAYERS = 8

In [29]:
# NOTE: this cell is disabled. The metadata helper below is wrapped in a bare
# triple-quoted string, so none of it executes; the notebook only echoes the
# string back as the cell output.
'''import os
import cv2

def get_video_metadata(data_dir):
    video_paths = []
    for root, dirs, files in os.walk(data_dir):
        for file in files:
            if file.endswith(".avi"):
                video_paths.append(os.path.join(root, file))

    metadata = []
    for video_path in video_paths:
        cap = cv2.VideoCapture(video_path)

        # Get video properties
        num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        duration = num_frames / fps

        # Append metadata
        metadata.append({
            'path': video_path,
            'num_frames': num_frames,
            'fps': fps,
            'resolution': (width, height),
            'duration_sec': duration
        })

        cap.release()

    return metadata

data_dir = "/content/drive/MyDrive/video_vision_transformer/DEMO"
metadata = get_video_metadata(data_dir)

# Print metadata of the first video
if metadata:
    meta = metadata[0]
    print(f"Path: {meta['path']}")
    print(f"Number of frames: {meta['num_frames']}")
    print(f"FPS: {meta['fps']}")
    print(f"Resolution: {meta['resolution']}")
    print(f"Duration (seconds): {meta['duration_sec']}")
else:
    print("No video metadata found.")'''

'import os\nimport cv2\n\ndef get_video_metadata(data_dir):\n    video_paths = []\n    for root, dirs, files in os.walk(data_dir):\n        for file in files:\n            if file.endswith(".avi"):\n                video_paths.append(os.path.join(root, file))\n\n    metadata = []\n    for video_path in video_paths:\n        cap = cv2.VideoCapture(video_path)\n\n        # Get video properties\n        num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))\n        fps = cap.get(cv2.CAP_PROP_FPS)\n        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))\n        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))\n        duration = num_frames / fps\n\n        # Append metadata\n        metadata.append({\n            \'path\': video_path,\n            \'num_frames\': num_frames,\n            \'fps\': fps,\n            \'resolution\': (width, height),\n            \'duration_sec\': duration\n        })\n\n        cap.release()\n\n    return metadata\n\ndata_dir = "/content/drive/MyDrive/video_vis

In [48]:
import numpy as np
import tensorflow as tf

def preprocess(frames: tf.Tensor, label: tf.Tensor):
    """Turn one decoded clip and its label into model-ready float tensors.

    Args:
        frames: Frames of a single clip with the frame axis first, i.e.
            (frames, height, width, channels) as sliced from the array built
            by ``load_data``.
        label: Label of the clip.

    Returns:
        ``(clip, label)`` where ``clip`` is float32, resized to 224x224,
        divided by 255 and laid out as (height, width, frames, channels),
        and ``label`` is cast to float32.
    """
    # Spatially resize every frame to 224x224.
    resized = tf.image.resize(frames, (224, 224))
    # Float32 pixels scaled by 1/255.
    scaled = tf.cast(resized, tf.float32) / 255.0
    # Move the frame axis from position 0 to position 2:
    # (frames, H, W, C) -> (H, W, frames, C), matching (224, 224, 25, 3).
    clip = tf.transpose(scaled, perm=[1, 2, 0, 3])
    return clip, tf.cast(label, tf.float32)

def prepare_dataloader(
    videos: np.ndarray,
    labels: np.ndarray,
    loader_type: str = "train",
    batch_size: int = 32,
):
    """Build a batched, prefetching ``tf.data`` pipeline over clips and labels.

    Args:
        videos: Array of clips; sliced along its first axis.
        labels: Array of labels aligned with ``videos``.
        loader_type: ``"train"`` enables shuffling over the full dataset;
            any other value keeps the original order.
        batch_size: Number of clips per batch.

    Returns:
        A ``tf.data.Dataset`` yielding ``(clips, labels)`` batches that have
        been passed through ``preprocess``.
    """
    pipeline = tf.data.Dataset.from_tensor_slices((videos, labels))

    # Only the training loader is shuffled (buffer spans the whole dataset).
    if loader_type == "train":
        pipeline = pipeline.shuffle(len(videos))

    # Preprocess in parallel, then batch and overlap input with training.
    pipeline = pipeline.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    pipeline = pipeline.batch(batch_size)
    return pipeline.prefetch(tf.data.AUTOTUNE)

In [49]:
# Expected batch shape: (None, 224, 224, 25, 3)

# Use the BATCH_SIZE constant from the configuration cell rather than a
# repeated literal, so changing the batch size there actually takes effect.
train_dataloader = prepare_dataloader(
    train_data, train_labels, loader_type="train", batch_size=BATCH_SIZE
)
valid_dataloader = prepare_dataloader(
    valid_data, valid_labels, loader_type="valid", batch_size=BATCH_SIZE
)

# Sanity check: pull a single training batch and show its shapes.
for batch_frames, batch_labels in train_dataloader.take(1):
    print("Batch of frames shape:", batch_frames.shape)
    print("Batch of labels shape:", batch_labels.shape)

Batch of frames shape: (32, 224, 224, 25, 3)
Batch of labels shape: (32, 7)


In [56]:
class TubeletEmbedding(layers.Layer):
    """Split a video into non-overlapping 3D patches and project each to a token.

    A ``Conv3D`` whose kernel size and stride both equal ``patch_size``
    (padding "VALID") embeds every patch into ``embed_dim`` channels; the
    resulting grid is then flattened into a sequence of tokens.

    Args:
        embed_dim: Number of output channels per token.
        patch_size: Kernel size and stride of the 3D convolution.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, patch_size, **kwargs):
        super().__init__(**kwargs)
        # Kept so the layer can be re-created from its config.
        self.embed_dim = embed_dim
        self.patch_size = patch_size
        self.projection = layers.Conv3D(
            filters=embed_dim,
            kernel_size=patch_size,
            strides=patch_size,
            padding="VALID",
        )
        self.flatten = layers.Reshape(target_shape=(-1, embed_dim))

    def call(self, videos):
        """Return the token sequence of shape (batch, num_tokens, embed_dim)."""
        projected_patches = self.projection(videos)
        flattened_patches = self.flatten(projected_patches)
        return flattened_patches

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update({"embed_dim": self.embed_dim, "patch_size": self.patch_size})
        return config

In [57]:
class PositionalEncoder(layers.Layer):
    """Add a learned position embedding to every token of a sequence.

    Args:
        embed_dim: Size of each position embedding vector; it is added to the
            incoming tokens, so it must match their last dimension.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim

    def build(self, input_shape):
        """Create one embedding row per token position of the input."""
        _, num_tokens, _ = input_shape
        self.position_embedding = layers.Embedding(
            input_dim=num_tokens, output_dim=self.embed_dim
        )
        # Fixed index vector 0..num_tokens-1 used to look up every position.
        self.positions = tf.range(0, num_tokens, 1)

    def call(self, encoded_tokens):
        """Return ``encoded_tokens`` with the position embeddings added."""
        # Encode the positions and add it to the encoded tokens
        encoded_positions = self.position_embedding(self.positions)
        encoded_tokens = encoded_tokens + encoded_positions
        return encoded_tokens

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update({"embed_dim": self.embed_dim})
        return config

In [58]:
def create_Medvivit_classifier(
    tubelet_embedder,
    positional_encoder,
    input_shape=INPUT_SHAPE,
    transformer_layers=NUM_LAYERS,
    num_heads=NUM_HEADS,
    embed_dim=PROJECTION_DIM,
    layer_norm_eps=LAYER_NORM_EPS,
    num_classes=NUM_CLASSES,
):
    """Assemble the video transformer classifier as a functional Keras model.

    Pipeline: input -> tubelet embedding -> positional encoding ->
    ``transformer_layers`` pre-norm transformer blocks (self-attention and
    a two-layer GELU MLP, each with a skip connection) -> layer norm ->
    global average pooling over tokens -> softmax classifier.

    Args:
        tubelet_embedder: Layer mapping a video batch to a token sequence.
        positional_encoder: Layer adding position information to the tokens.
        input_shape: Shape of one input clip (without the batch axis).
        transformer_layers: Number of transformer blocks.
        num_heads: Attention heads per block; each head uses
            ``embed_dim // num_heads`` key dimensions.
        embed_dim: Token width; the MLP expands to ``4 * embed_dim`` and
            projects back to ``embed_dim``.
        layer_norm_eps: Epsilon used by every ``LayerNormalization`` layer.
        num_classes: Number of softmax outputs.

    Returns:
        An uncompiled ``keras.Model``.
    """
    # Get the input layer
    inputs = layers.Input(shape=input_shape)
    # Create patches.
    patches = tubelet_embedder(inputs)
    # Encode patches.
    encoded_patches = positional_encoder(patches)

    # Create multiple layers of the Transformer block.
    for _ in range(transformer_layers):
        # Layer normalization and MHSA. Use the configurable epsilon here too,
        # so the `layer_norm_eps` argument applies to the whole network and
        # not only to the final normalization.
        x1 = layers.LayerNormalization(epsilon=layer_norm_eps)(encoded_patches)
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim // num_heads, dropout=0.1
        )(x1, x1)

        # Skip connection
        x2 = layers.Add()([attention_output, encoded_patches])

        # Layer Normalization and MLP
        x3 = layers.LayerNormalization(epsilon=layer_norm_eps)(x2)
        x3 = keras.Sequential(
            [
                layers.Dense(units=embed_dim * 4, activation='gelu'),
                layers.Dense(units=embed_dim, activation='gelu'),
            ]
        )(x3)

        # Skip connection
        encoded_patches = layers.Add()([x3, x2])

    # Layer normalization and Global average pooling.
    representation = layers.LayerNormalization(epsilon=layer_norm_eps)(encoded_patches)
    representation = layers.GlobalAvgPool1D()(representation)

    # Classify outputs.
    outputs = layers.Dense(units=num_classes, activation="softmax")(representation)

    # Create the Keras model.
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

In [None]:
def run_experiment(train_dataloader, valid_dataloader, EPOCHS=10):
    """Build, train and evaluate the video transformer classifier.

    Args:
        train_dataloader: Dataset of ``(clips, one-hot labels)`` batches used
            for fitting.
        valid_dataloader: Dataset used for per-epoch validation and for the
            final evaluation.
        EPOCHS: Number of training epochs.

    Returns:
        The trained ``keras.Model``.
    """
    # Initialize model
    model = create_Medvivit_classifier(
        tubelet_embedder=TubeletEmbedding(
            embed_dim=PROJECTION_DIM, patch_size=PATCH_SIZE
        ),
        positional_encoder=PositionalEncoder(embed_dim=PROJECTION_DIM),
    )

    # Compile the model with the optimizer, loss function
    # and the metrics.
    optimizer = keras.optimizers.Adam(learning_rate=LEARNING_RATE)
    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        metrics=[
            keras.metrics.CategoricalAccuracy(name="accuracy"),
            keras.metrics.TopKCategoricalAccuracy(k=5, name="top-5-accuracy"),
        ],
    )

    # Train the model.
    model.fit(train_dataloader, epochs=EPOCHS, validation_data=valid_dataloader)

    # Evaluate on validation data. `evaluate` returns the loss followed by one
    # value per compiled metric (accuracy, top-5 accuracy), i.e. three values;
    # unpacking into two names would raise ValueError after training finishes.
    loss, accuracy, top_5_accuracy = model.evaluate(valid_dataloader)
    print(f"Validation loss: {loss:.4f}")
    print(f"Validation accuracy: {accuracy:.4f}")
    print(f"Validation top-5 accuracy: {top_5_accuracy:.4f}")

    return model

# Requires train_dataloader and valid_dataloader from the dataloader cell above.
# Trains for 10 epochs here (overriding the function default explicitly) and
# keeps the trained model for the visualization cell below.
model = run_experiment(train_dataloader, valid_dataloader, EPOCHS=10)

Epoch 1/10
Epoch 2/10
 3/22 [===>..........................] - ETA: 4:41 - loss: 1.9955 - accuracy: 0.2083 - top-5-accuracy: 0.6875

KeyboardInterrupt: 

In [None]:
NUM_SAMPLES_VIZ = 25
# No `testloader` is created in this notebook (prepare_dataset returns None for
# the test split), so the first validation batch is visualized instead.
testsamples, labels = next(iter(valid_dataloader))
testsamples, labels = testsamples[:NUM_SAMPLES_VIZ], labels[:NUM_SAMPLES_VIZ]

ground_truths = []
preds = []
videos = []

for testsample, label in zip(testsamples, labels):
    # Generate gif. `preprocess` lays clips out as (H, W, frames, C) scaled to
    # [0, 1]; a GIF needs the frame axis first, so move it back to the front.
    clip = np.transpose(testsample.numpy(), (2, 0, 1, 3))
    # Frames were decoded with cv2.VideoCapture, which yields BGR; reverse the
    # channel axis so the GIF shows true colours.
    clip = clip[..., ::-1]
    with io.BytesIO() as gif:
        imageio.mimsave(gif, (clip * 255).astype("uint8"), "GIF", fps=5)
        videos.append(gif.getvalue())

    # Get model prediction on the sample in its original, model-ready layout
    # (adding the batch axis), not on the display copy.
    output = model.predict(tf.expand_dims(testsample, axis=0))[0]

    pred = np.argmax(output, axis=0)

    # Labels are one-hot vectors; they are stored as integer arrays.
    ground_truths.append(label.numpy().astype("int"))
    preds.append(pred)


def make_box_for_grid(image_widget, fit):
    """Stack a caption on top of an image widget inside a centered VBox.

    Source: https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20Styling.html

    Args:
        image_widget: Widget to display below the caption.
        fit: Caption value. Anything other than ``None`` is shown wrapped in
            single quotes; ``None`` is shown as the text ``None``.

    Returns:
        An ``ipywidgets`` ``VBox`` holding the caption and the boxed image.
    """
    # Caption text: quoted value, or the plain string "None".
    fit_str = str(fit) if fit is None else "'{}'".format(fit)
    caption_widget = ipywidgets.HTML(value="" + str(fit_str) + "")

    # Box that wraps the image widget.
    image_box = ipywidgets.widgets.Box(children=[image_widget])

    # Vertical container: caption first, image underneath, centered.
    column = ipywidgets.widgets.VBox(children=[caption_widget, image_box])
    column.layout.align_items = "center"
    return column


boxes = []
# Iterate over the GIFs that were actually generated: the sampled batch can
# hold fewer than NUM_SAMPLES_VIZ clips.
for i in range(min(NUM_SAMPLES_VIZ, len(videos))):
    ib = ipywidgets.widgets.Image(value=videos[i], width=100, height=100)
    # Ground truths are one-hot vectors, so argmax recovers the class index.
    # This notebook has no `info` label-name mapping, so the numeric class
    # indices are shown in the caption.
    true_class = int(np.argmax(ground_truths[i]))
    pred_class = int(preds[i])
    caption = f"T: {true_class} | P: {pred_class}"

    boxes.append(make_box_for_grid(ib, caption))

# Lay the captioned GIFs out five per row.
ipywidgets.widgets.GridBox(
    boxes, layout=ipywidgets.widgets.Layout(grid_template_columns="repeat(5, 200px)")
)