In [15]:
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json
import cv2
from tqdm import tqdm
import os
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.model_selection import train_test_split

In [16]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cuda')

In [17]:
def load_video_frames_opencv(video_path, num_frames=20):
    """Sample ``num_frames`` evenly spaced frames from a video and preprocess them.

    Frames are read with OpenCV, converted from BGR to RGB, scaled to
    ``[0, 1]``, resized to 224x224 and normalised per channel.

    Args:
        video_path: Path of the video file, passed to ``cv2.VideoCapture``.
        num_frames: Number of frames to return; must be at least 1.

    Returns:
        A float32 tensor laid out as ``[T, C, H, W]`` with ``T == num_frames``.

    Raises:
        ValueError: If ``num_frames`` is not positive, the video cannot be
            opened, it reports fewer than ``num_frames`` frames, or none of
            the selected frames can be decoded.
    """
    if num_frames < 1:
        raise ValueError(f"num_frames must be a positive integer, got {num_frames}")

    cap = cv2.VideoCapture(video_path)
    try:
        # An unopened capture reports 0 frames; fail with a message that names
        # the real problem instead of "Video too short: 0 < N".
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames < num_frames:
            raise ValueError(f"Video too short: {total_frames} < {num_frames}")

        # With total_frames >= num_frames the samples are at least one frame
        # apart, so the rounded indices are distinct; any shortfall is still
        # covered by the padding step below.
        offsets = np.linspace(0, total_frames - 1, num_frames)
        frame_indices = sorted({int(round(o)) for o in offsets})

        frames = []
        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if ret:  # frames that fail to decode are skipped and padded later
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    finally:
        # Release the capture on every path, including the error ones.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be decoded from video: {video_path}")

    # Pad by repeating the last decoded frame so the clip length is fixed.
    if len(frames) < num_frames:
        frames += [frames[-1]] * (num_frames - len(frames))

    # [T, H, W, C] uint8 -> [T, C, H, W] float32 in [0, 1]
    video_tensor = torch.tensor(np.array(frames), dtype=torch.float32).permute(0, 3, 1, 2)
    video_tensor /= 255.0

    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        # Per-channel mean/std (the commonly used ImageNet statistics).
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])

    return transform(video_tensor)  # [T, C, H, W]


In [18]:
from transformers import AutoImageProcessor, TimesformerForVideoClassification
import torch

# Frame pre-processor published with the VideoMAE Kinetics checkpoint.
# NOTE(review): the visible cells preprocess frames by hand in
# load_video_frames_opencv and never call image_processor — confirm it is still needed.
image_processor = AutoImageProcessor.from_pretrained("MCG-NJU/videomae-base-finetuned-kinetics")

# TimeSformer is loaded through its concrete model class, so `trust_remote_code`
# is not passed: that flag is meant for the Auto* classes and opting in to remote
# code execution is unnecessary here.
model = TimesformerForVideoClassification.from_pretrained(
    "facebook/timesformer-base-finetuned-k400",
    use_safetensors=True,
)
model.eval().to(device)

TimesformerForVideoClassification(
  (timesformer): TimesformerModel(
    (embeddings): TimesformerEmbeddings(
      (patch_embeddings): TimesformerPatchEmbeddings(
        (projection): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
      )
      (pos_drop): Dropout(p=0.0, inplace=False)
      (time_drop): Dropout(p=0.0, inplace=False)
    )
    (encoder): TimesformerEncoder(
      (layer): ModuleList(
        (0-11): 12 x TimesformerLayer(
          (drop_path): Identity()
          (attention): TimeSformerAttention(
            (attention): TimesformerSelfAttention(
              (qkv): Linear(in_features=768, out_features=2304, bias=True)
              (attn_drop): Dropout(p=0.0, inplace=False)
            )
            (output): TimesformerSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
          )
          (intermediate): TimesformerIntermediate(
            (dense

In [19]:
class VideoDataset(Dataset):
    """Map-style dataset that decodes one video clip per item, on demand.

    Args:
        video_paths: Sequence of video file paths.
        labels: Sequence of labels, aligned index-by-index with ``video_paths``.
            Labels are returned unchanged, so encode them beforehand if the
            training loop needs integer class ids.
        num_frames: Number of frames sampled from every video.

    Raises:
        ValueError: If ``video_paths`` and ``labels`` differ in length.
    """

    def __init__(self, video_paths, labels, num_frames=20):
        # Fail early: a mismatch would otherwise surface as an IndexError in the
        # middle of an epoch, or as silently ignored trailing labels.
        if len(video_paths) != len(labels):
            raise ValueError(
                f"video_paths and labels must have the same length: "
                f"{len(video_paths)} != {len(labels)}"
            )
        self.video_paths = video_paths
        self.labels = labels
        self.num_frames = num_frames

    def __len__(self):
        """Return the number of videos in the dataset."""
        return len(self.video_paths)

    def __getitem__(self, idx):
        """Return ``(frames, label)`` for the video at position ``idx``."""
        path = self.video_paths[idx]
        label = self.labels[idx]

        # Decoding happens here rather than up front, so only the clips of the
        # current batch are held in memory.
        frames = load_video_frames_opencv(path, num_frames=self.num_frames)

        return frames, label

In [20]:
def extract_label(path):
    """Return the second '/'-separated component of ``path`` as the label.

    For a path shaped like ``data/<label>/<file>`` this is ``<label>``.
    Raises IndexError when ``path`` contains no '/'.
    """
    # Only the first two separators matter, so stop splitting after them.
    components = path.split('/', 2)
    return components[1]

In [21]:
video_folder = "data/"
video_extensions = (".mp4", ".avi")

# Collect every video under video_folder together with its label.
video_paths, labels = [], []
for root, dirs, files in os.walk(video_folder):
    # os.walk yields entries in filesystem order; sorting makes the listing
    # (and therefore the seeded train/validation split) reproducible.
    dirs.sort()
    for f in sorted(files):
        # Lower-case the name so ".MP4" / ".AVI" files are not skipped.
        if f.lower().endswith(video_extensions):
            # Normalise Windows separators because extract_label splits on "/".
            path = os.path.join(root, f).replace("\\", "/")
            video_paths.append(path)
            labels.append(extract_label(path))


In [None]:
# Sanity check: number of discovered videos and number of collected labels.
(len(video_paths), len(labels))

(100, 100)

In [9]:
# Stratified 80/20 train/validation split with a fixed seed for repeatability.
X_train, X_val, y_train, y_val = train_test_split(
    video_paths,
    labels,
    stratify=labels,
    test_size=0.2,
    random_state=42,
)


In [10]:
# Encode class names as integer ids, using the training split as the vocabulary.
# factorize receives an ndarray because passing a plain list is deprecated in
# recent pandas (the stray output under this cell looks like that warning).
y_train_encoded, uniques = pd.factorize(np.asarray(y_train, dtype=object))

# Re-use the training vocabulary so both splits share the same ids.
label_to_id = {label: idx for idx, label in enumerate(uniques)}
y_val_series = pd.Series(y_val)
y_val_mapped = y_val_series.map(label_to_id)

# A label absent from the training split would map to NaN and turn the targets
# into floats; stop here with a clear message instead.
if y_val_mapped.isna().any():
    unseen = sorted(set(y_val_series[y_val_mapped.isna()]))
    raise ValueError(f"Validation labels missing from the training split: {unseen}")

y_val_encoded = y_val_mapped.astype("int64").values

  y_train_encoded, uniques = pd.factorize(y_train)


In [11]:
# Wrap each split in a lazily-decoding dataset that samples 20 frames per clip.
train_dataset, val_dataset = (
    VideoDataset(paths, targets, num_frames=20)
    for paths, targets in ((X_train, y_train_encoded), (X_val, y_val_encoded))
)


In [None]:
# DataLoader workers are separate processes. On Windows they are spawned and must
# re-import the Dataset class, which is not possible for a class defined inside a
# notebook, so loading fails or hangs there; fall back to in-process loading.
# NOTE(review): the recorded training run stalled at 0 batches before being
# interrupted — possibly this problem; confirm on the target machine.
num_workers = 0 if os.name == "nt" else 4

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=num_workers)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=num_workers)

In [14]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = torch.nn.CrossEntropyLoss()
num_epochs = 1

# NOTE(review): the checkpoint name says it was fine-tuned on Kinetics-400, so the
# classification head presumably emits 400 logits while the labels here come from
# the local class folders — confirm, and consider re-initialising the head with
# num_labels=len(uniques) before fine-tuning.
for epoch in range(num_epochs):
    # Set training mode every epoch so an evaluation pass run in between
    # cannot leave the model in eval mode.
    model.train()
    running_loss = 0.0
    num_samples = 0

    for batch_x, batch_y in tqdm(train_loader):
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_x).logits
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        # Weight by batch size so a smaller final batch does not skew the mean.
        batch_size = batch_y.size(0)
        running_loss += loss.item() * batch_size
        num_samples += batch_size

    # Report the mean loss over the epoch rather than the last batch's loss;
    # max(..., 1) keeps an empty loader from dividing by zero.
    epoch_loss = running_loss / max(num_samples, 1)
    print(f"Epoch {epoch + 1}, Loss: {epoch_loss:.4f}")


  0%|          | 0/10 [00:23<?, ?it/s]


KeyboardInterrupt: 

In [None]:
from sklearn.metrics import classification_report
import torch

# Switch to inference behaviour and collect predictions over the validation set.
model.eval()
all_preds, all_labels = [], []

with torch.no_grad():  # no gradients are needed for evaluation
    for batch_x, batch_y in val_loader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)
        outputs = model(batch_x).logits
        # The predicted class is the index of the largest logit.
        preds = outputs.argmax(dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(batch_y.cpu().numpy())

# Per-class precision / recall / F1, printed with four decimals.
report = classification_report(all_labels, all_preds, digits=4)
print(report)
