In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import torchvision.transforms as transforms
from torchvision.io import read_video

import open_clip

import os

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
class DeadliftVideoDataset(Dataset):
    def __init__(self, video_paths, labels, transform=None, augmentation=None, frames_per_video=16):
        self.video_paths = video_paths
        self.labels = labels
        self.transform = transform
        self.augmentation = augmentation
        self.frames_per_video = frames_per_video
        self.label_to_idx = {
            'bad movement': 0,
            'good movement': 1
        }
        
    def __len__(self):
        return len(self.video_paths)
    
    def __getitem__(self, idx):
        video_path = self.video_paths[idx]
        label = self.label_to_idx[self.labels[idx]]
        
        # Read video frames
        video_frames, _, info = read_video(video_path, pts_unit='sec')
        total_frames = video_frames.shape[0]

        # Sample frames evenly throughout the video
        indices = torch.linspace(0, total_frames - 1, self.frames_per_video).long()
        sampled_frames = video_frames[indices]
        
        # Apply transforms to frames
        frames = []
        for frame in sampled_frames:
            frame = frame.permute(2, 0, 1)  # Convert to (C, H, W)
            if self.augmentation:
                frame = self.augmentation(frame)
            if self.transform:
                frame = self.transform(frame)
            frames.append(frame)
    
        # Stack frames into a tensor
        frames_tensor = torch.stack(frames)  # Shape: [frames_per_video, C, H, W]
    
        return frames_tensor, label


In [4]:
model_name = 'ViT-B-32'  # You can choose other architectures
pretrained = 'openai'

model, _, _ = open_clip.create_model_and_transforms(
    model_name=model_name,
    pretrained=pretrained
)
model.eval()  # Set to evaluation mode

CLIP(
  (visual): VisionTransformer(
    (conv1): Conv2d(3, 768, kernel_size=(32, 32), stride=(32, 32), bias=False)
    (patch_dropout): Identity()
    (ln_pre): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
    (transformer): Transformer(
      (resblocks): ModuleList(
        (0-11): 12 x ResidualAttentionBlock(
          (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
          )
          (ls_1): Identity()
          (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (mlp): Sequential(
            (c_fc): Linear(in_features=768, out_features=3072, bias=True)
            (gelu): QuickGELU()
            (c_proj): Linear(in_features=3072, out_features=768, bias=True)
          )
          (ls_2): Identity()
        )
      )
    )
    (ln_post): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  )
  

In [5]:
class VideoCLIPClassifier(nn.Module):
    def __init__(self, clip_model, num_classes):
        super(VideoCLIPClassifier, self).__init__()
        self.clip_model = clip_model
        self.num_classes = num_classes
        self.fc = nn.Linear(clip_model.visual.output_dim, num_classes)
        
    def forward(self, frames):
        # frames: [batch_size, frames_per_video, C, H, W]
        batch_size, frames_per_video, C, H, W = frames.shape
        frames = frames.view(-1, C, H, W)  # Flatten frames
        with torch.no_grad():
            frame_features = self.clip_model.encode_image(frames)  # [batch_size * frames_per_video, output_dim]
        frame_features = frame_features.view(batch_size, frames_per_video, -1)
        video_features = frame_features.mean(dim=1)  # Average over frames
        logits = self.fc(video_features)  # [batch_size, num_classes]
        return logits

In [6]:
class VideoCLIPClassifierWithAttention(nn.Module):
    def __init__(self, clip_model, num_classes):
        super(VideoCLIPClassifierWithAttention, self).__init__()
        self.clip_model = clip_model
        self.num_classes = num_classes
        self.attention_layer = nn.MultiheadAttention(embed_dim=clip_model.visual.output_dim, num_heads=4)
        self.fc = nn.Linear(clip_model.visual.output_dim, num_classes)
    
    def forward(self, frames):
        batch_size, frames_per_video, C, H, W = frames.shape
        frames = frames.view(-1, C, H, W)  # Flatten frames
        with torch.no_grad():
            frame_features = self.clip_model.encode_image(frames)
        frame_features = frame_features.view(batch_size, frames_per_video, -1)  # [batch_size, frames_per_video, output_dim]

        # Attention over frames
        frame_features = frame_features.permute(1, 0, 2)  # [frames_per_video, batch_size, output_dim]
        attn_output, _ = self.attention_layer(frame_features, frame_features, frame_features)
        video_features = attn_output.mean(dim=0)  # [batch_size, output_dim]
        logits = self.fc(video_features)
        return logits


In [7]:
num_classes = 2  # Number of movement labels
classifier = VideoCLIPClassifierWithAttention(model, num_classes)

In [32]:
import os
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split

dataset_root = './dataset/Adam'

video_paths = []
labels = []

for label_folder in ['bad', 'good']:
    label_path = os.path.join(dataset_root, label_folder)
    
    if not os.path.isdir(label_path):
        print(f"Folder '{label_folder}' does not exist in '{dataset_root}'")
        continue

    label = 'bad movement' if label_folder == 'bad' else 'good movement'
    
    for video_file in os.listdir(label_path):
        if video_file.endswith('.mp4'):  
            video_paths.append(os.path.join(label_path, video_file))
            labels.append(label)

print("Video Paths:", video_paths)
print("Labels:", labels)

train_video_paths, test_video_paths, train_labels, test_labels = train_test_split(
    video_paths, labels, test_size=0.2, stratify=labels, random_state=42
)

train_augmentation = transforms.RandomApply([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
], p=0.7)

transform = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ConvertImageDtype(torch.float32),
    transforms.Normalize(
        mean=(0.48145466, 0.4578275, 0.40821073),
        std=(0.26862954, 0.26130258, 0.27577711)
    ),
])

train_dataset = DeadliftVideoDataset(train_video_paths, train_labels, transform=transform, augmentation=train_augmentation)
test_dataset = DeadliftVideoDataset(test_video_paths, test_labels, transform=transform)

train_dataloader = DataLoader(train_dataset, batch_size=2, shuffle=True, num_workers=0)
test_dataloader = DataLoader(test_dataset, batch_size=2, shuffle=False, num_workers=0)


Video Paths: ['./dataset/Adam/bad/bad001.mp4', './dataset/Adam/bad/bad002.mp4', './dataset/Adam/bad/bad003.mp4', './dataset/Adam/bad/bad004.mp4', './dataset/Adam/bad/bad005.mp4', './dataset/Adam/bad/bad006.mp4', './dataset/Adam/bad/bad007.mp4', './dataset/Adam/bad/bad008.mp4', './dataset/Adam/bad/bad009.mp4', './dataset/Adam/bad/bad010.mp4', './dataset/Adam/bad/bad011.mp4', './dataset/Adam/bad/bad012.mp4', './dataset/Adam/bad/bad013.mp4', './dataset/Adam/bad/bad014.mp4', './dataset/Adam/bad/bad015.mp4', './dataset/Adam/bad/bad016.mp4', './dataset/Adam/bad/bad017.mp4', './dataset/Adam/bad/bad018.mp4', './dataset/Adam/bad/bad019.mp4', './dataset/Adam/good/good001.mp4', './dataset/Adam/good/good002.mp4', './dataset/Adam/good/good003.mp4', './dataset/Adam/good/good004.mp4', './dataset/Adam/good/good005.mp4', './dataset/Adam/good/good006.mp4', './dataset/Adam/good/good007.mp4', './dataset/Adam/good/good008.mp4', './dataset/Adam/good/good009.mp4', './dataset/Adam/good/good010.mp4', './datase

In [18]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
classifier = classifier.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(classifier.fc.parameters(), lr=1e-4)  # Only train the classification head

In [10]:
num_epochs = 100

for epoch in range(num_epochs):
    classifier.train()
    running_loss = 0.0
    for i, (frames, labels) in enumerate(train_dataloader):
        frames = frames.to(device)  # [batch_size, frames_per_video, C, H, W]
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = classifier(frames)  # [batch_size, num_classes]
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        if i % 5 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i}/{len(train_dataloader)}], Loss: {loss.item():.4f}')
    if epoch%20 == 0 :
        torch.save(classifier.state_dict(), 'deadlift_classifier.pth')
    epoch_loss = running_loss / len(train_dataloader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')




Epoch [1/100], Step [0/18], Loss: 0.8316
Epoch [1/100], Step [5/18], Loss: 0.6826
Epoch [1/100], Step [10/18], Loss: 0.6250
Epoch [1/100], Step [15/18], Loss: 0.7229
Epoch [1/100], Loss: 0.7130
Epoch [2/100], Step [0/18], Loss: 0.7153
Epoch [2/100], Step [5/18], Loss: 0.6835
Epoch [2/100], Step [10/18], Loss: 0.6880
Epoch [2/100], Step [15/18], Loss: 0.7073
Epoch [2/100], Loss: 0.6965
Epoch [3/100], Step [0/18], Loss: 0.6886
Epoch [3/100], Step [5/18], Loss: 0.6997
Epoch [3/100], Step [10/18], Loss: 0.6392
Epoch [3/100], Step [15/18], Loss: 0.6993
Epoch [3/100], Loss: 0.6892
Epoch [4/100], Step [0/18], Loss: 0.7012
Epoch [4/100], Step [5/18], Loss: 0.6681
Epoch [4/100], Step [10/18], Loss: 0.8279
Epoch [4/100], Step [15/18], Loss: 0.6847
Epoch [4/100], Loss: 0.6874
Epoch [5/100], Step [0/18], Loss: 0.6862
Epoch [5/100], Step [5/18], Loss: 0.6635
Epoch [5/100], Step [10/18], Loss: 0.6553
Epoch [5/100], Step [15/18], Loss: 0.6429
Epoch [5/100], Loss: 0.6797
Epoch [6/100], Step [0/18], Lo

In [27]:
classifier.load_state_dict(torch.load('deadlift_classifier.pth'))
classifier.eval()


correct = 0
total = 0
all_predictions = []
all_labels = []

with torch.no_grad():
    for frames, labels in test_dataloader:
        frames = frames.to(device)
        labels = labels.to(device)
        outputs = classifier(frames)
        _, predicted = torch.max(outputs, 1)

        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

        correct += (predicted == labels).sum().item()
        total += labels.size(0)

accuracy = correct / total * 100
print(f'Accuracy on the test set: {accuracy:.2f}%')

idx_to_label = {v: k for k, v in test_dataset.label_to_idx.items()}
all_predictions_labels = [idx_to_label[pred] for pred in all_predictions]
all_labels_labels = [idx_to_label[label] for label in all_labels]

for pred, true_label in zip(all_predictions_labels, all_labels_labels):
    print(f'Predicted: {pred}, Actual: {true_label}')




Accuracy on the test set: 62.50%
Predicted: bad movement, Actual: good movement
Predicted: bad movement, Actual: bad movement
Predicted: good movement, Actual: good movement
Predicted: good movement, Actual: good movement
Predicted: bad movement, Actual: bad movement
Predicted: good movement, Actual: good movement
Predicted: good movement, Actual: bad movement
Predicted: good movement, Actual: bad movement
