In [None]:
import lightning as L
from transformers import VideoMAEModel
import torch
import torch.nn as nn
import torch.nn.functional as F

class BaseClassifier(L.LightningModule):
    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        self.log('train_loss', loss)
        #train acc
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        self.log('train_acc', acc)
        return loss
    
    def test_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        self.log('test_loss', loss)
        #test acc
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        self.log('test_acc', acc)

class VideoMAEClassifier(BaseClassifier):
    def __init__(self, num_classes):
        super(VideoMAEClassifier, self).__init__()
        self.model = VideoMAEModel.from_pretrained("MCG-NJU/videomae-base")
        self.hidden_size = self.model.config.hidden_size
        self.num_classes = num_classes
        self.classifier = nn.Sequential(
            nn.Linear(self.hidden_size, 512),  # You can adjust the hidden layer size as needed
            nn.ReLU(),
            nn.Linear(512, num_classes)  # Final layer to predict the number of classes
        )

    def forward(self, x):
        last_hidden_state = self.model(x).last_hidden_state
        cls_token = last_hidden_state[:, 0, :]  # Extract the [CLS] token
        logits = self.classifier(cls_token)
        return logits

    def configure_optimizers(self):
        #only update the classifier
        optimizer = torch.optim.Adam(self.model.congid.parameters(), lr=1e-4)
        return optimizer


In [None]:
model = VideoMAEClassifier(num_classes=2)
for name, param in model.named_parameters():
    print(name)
classifier_param = [param for name, param in model.named_parameters() if 'classifier' in name]
model_param = [param for name, param in model.named_parameters() if 'model' in name]
len(classifier_param) 

In [None]:
import av
import numpy as np

from transformers import AutoImageProcessor, VideoMAEModel
from huggingface_hub import hf_hub_download

np.random.seed(0)


def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.
    Args:
        container (`av.container.input.InputContainer`): PyAV container.
        indices (`List[int]`): List of frame indices to decode.
    Returns:
        result (np.ndarray): np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])


def sample_frame_indices(clip_len, frame_sample_rate, seg_len):
    '''
    Sample a given number of frame indices from the video.
    Args:
        clip_len (`int`): Total number of frames to sample.
        frame_sample_rate (`int`): Sample every n-th frame.
        seg_len (`int`): Maximum allowed index of sample's last frame.
    Returns:
        indices (`List[int]`): List of sampled frame indices
    '''
    converted_len = int(clip_len * frame_sample_rate)
    end_idx = np.random.randint(converted_len, seg_len)
    start_idx = end_idx - converted_len
    indices = np.linspace(start_idx, end_idx, num=clip_len)
    indices = np.clip(indices, start_idx, end_idx - 1).astype(np.int64)
    return indices


# video clip consists of 300 frames (10 seconds at 30 FPS)
file_path = hf_hub_download(
    repo_id="nielsr/video-demo", filename="eating_spaghetti.mp4", repo_type="dataset"
)
container = av.open(file_path)

# sample 16 frames
indices = sample_frame_indices(clip_len=16, frame_sample_rate=1, seg_len=container.streams.video[0].frames)
video = read_video_pyav(container, indices)

image_processor = AutoImageProcessor.from_pretrained("MCG-NJU/videomae-base")
model = VideoMAEModel.from_pretrained("MCG-NJU/videomae-base")

# prepare video for the model
inputs = image_processor(list(video), return_tensors="pt")

# forward pass
outputs = model(**inputs)
last_hidden_states = outputs.last_hidden_state
list(last_hidden_states.shape)