<a href="https://colab.research.google.com/github/JyotsnaEdathoot/Action-Recognition-Transformers-/blob/main/HAR_with_claude_30_12_24.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import math
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torchvision import transforms
from einops import rearrange
from einops.layers.torch import Rearrange

class PatchEmbedding(nn.Module):
    """Convert a batch of images into a sequence of patch tokens.

    A strided convolution cuts each image into non-overlapping
    ``patch_size`` x ``patch_size`` patches and projects every patch to
    ``embed_dim`` channels. A learnable class token is prepended to the
    sequence and a learnable positional embedding is added to all tokens.

    Args:
        img_size: Side length used to size the positional embedding.
        patch_size: Kernel size and stride of the patch convolution.
        in_channels: Number of channels of the input images.
        embed_dim: Channel width of every output token.
    """

    def __init__(self, img_size=224, patch_size=16, in_channels=3, embed_dim=768):
        super().__init__()
        self.img_size = img_size
        self.patch_size = patch_size
        patches_per_side = img_size // patch_size
        self.n_patches = patches_per_side ** 2

        # Conv projects each patch; Rearrange flattens the patch grid into
        # a token axis.
        patch_conv = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size)
        grid_to_tokens = Rearrange('b e h w -> b (h w) e')
        self.proj = nn.Sequential(patch_conv, grid_to_tokens)

        # One positional slot per patch plus one for the class token.
        self.pos_embedding = nn.Parameter(torch.randn(1, self.n_patches + 1, embed_dim))
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))

    def forward(self, x):
        """Return ``[class token, patch tokens] + positional embedding``."""
        tokens = self.proj(x)
        # Broadcast the single learned class token over the batch.
        cls_tokens = self.cls_token.expand(tokens.shape[0], -1, -1)
        return torch.cat((cls_tokens, tokens), dim=1) + self.pos_embedding

class ActionViT(nn.Module):
    """Frame-wise ViT classifier for video clips.

    Every frame is embedded and encoded independently by a shared patch
    embedding and transformer encoder. The per-frame class-token features
    are averaged over the frame axis and fed to a LayerNorm + Linear head.

    Args:
        img_size, patch_size, in_channels, embed_dim: Forwarded to
            ``PatchEmbedding``.
        num_classes: Size of the output logit vector.
        depth: Number of transformer encoder layers.
        num_heads: Attention heads per encoder layer.
        mlp_ratio: Feed-forward width as a multiple of ``embed_dim``.
        dropout: Dropout probability inside the encoder layers.
    """

    def __init__(
        self,
        img_size=224,
        patch_size=16,
        in_channels=3,
        num_classes=101,
        embed_dim=768,
        depth=12,
        num_heads=12,
        mlp_ratio=4,
        dropout=0.1
    ):
        super().__init__()

        # Image -> token sequence (class token first).
        self.patch_embed = PatchEmbedding(
            img_size=img_size,
            patch_size=patch_size,
            in_channels=in_channels,
            embed_dim=embed_dim,
        )

        # Stack of identical encoder layers operating on (batch, seq, dim).
        block = TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            dim_feedforward=mlp_ratio * embed_dim,
            dropout=dropout,
            activation='gelu',
            batch_first=True,
        )
        self.transformer = TransformerEncoder(block, num_layers=depth)

        # Classification head applied to the time-averaged feature.
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, num_classes),
        )

    def forward(self, x):
        """Classify a clip batch of shape [batch, channels, frames, height, width]."""
        batch, channels, n_frames, height, width = x.shape

        # Fold the frame axis into the batch axis so each frame is handled
        # as an independent image.
        frames = x.permute(0, 2, 1, 3, 4).reshape(batch * n_frames, channels, height, width)

        tokens = self.transformer(self.patch_embed(frames))

        # Keep only the class token of every frame, then restore the
        # (batch, frames) layout.
        per_frame = tokens[:, 0].reshape(batch, n_frames, tokens.shape[-1])

        # Average the frame features and classify.
        clip_feature = per_frame.mean(dim=1)
        return self.mlp_head(clip_feature)

def train_step(model, optimizer, data, labels, criterion):
    """Run one optimisation step on a single batch.

    Puts ``model`` in training mode, clears old gradients, computes
    ``criterion(model(data), labels)``, back-propagates and applies one
    optimizer update.

    Returns:
        The batch loss as a Python float (value before the update).
    """
    model.train()
    optimizer.zero_grad()

    batch_loss = criterion(model(data), labels)
    batch_loss.backward()
    optimizer.step()

    return batch_loss.item()

def validate(model, val_loader, criterion):
    """Evaluate ``model`` over every batch yielded by ``val_loader``.

    The model is switched to eval mode and gradients are disabled. Batches
    are counted while iterating, so ``val_loader`` may be any iterable of
    ``(data, labels)`` pairs, including generators and loaders that do not
    implement ``__len__``.

    Args:
        model: Callable mapping ``data`` to per-class scores of shape
            (batch, classes).
        val_loader: Iterable yielding ``(data, labels)`` batches.
        criterion: Callable returning a scalar loss tensor for
            ``(outputs, labels)``.

    Returns:
        ``(mean_batch_loss, accuracy_percent)``. The loss is the mean of the
        per-batch losses. If no batch is yielded the loss is ``nan`` and the
        accuracy is ``0.0`` instead of raising ``ZeroDivisionError``.
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    n_batches = 0

    with torch.no_grad():
        for data, labels in val_loader:
            outputs = model(data)
            loss = criterion(outputs, labels)

            total_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
            n_batches += 1

    # nan (rather than 0.0) for an empty loader so the result can never be
    # mistaken for a perfect validation loss.
    mean_loss = total_loss / n_batches if n_batches else float('nan')
    accuracy = 100. * correct / total if total else 0.0
    return mean_loss, accuracy

# Data preprocessing
def get_transforms(img_size=224):
    """Build the training and validation image pipelines.

    Training applies a random resized crop, a random horizontal flip and
    colour jitter; validation resizes to ``int(img_size * 1.14)`` and takes
    a centre crop. Both end with tensor conversion and normalisation using
    the same fixed per-channel mean/std.

    Returns:
        ``(train_transform, val_transform)``.
    """
    def tensor_and_normalize():
        # Fresh instances per pipeline so the two Compose objects share
        # no transform objects.
        return [
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]

    train_ops = [
        transforms.RandomResizedCrop(img_size),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
    ]
    val_ops = [
        transforms.Resize(int(img_size * 1.14)),
        transforms.CenterCrop(img_size),
    ]

    train_transform = transforms.Compose(train_ops + tensor_and_normalize())
    val_transform = transforms.Compose(val_ops + tensor_and_normalize())
    return train_transform, val_transform

Next steps for better recognition:

Data Preparation:
Collect/use standard action datasets (UCF101, Kinetics, HMDB51)
Implement video loading and preprocessing pipeline
Apply temporal augmentations (random clips, frame skipping)


In [4]:
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import cv2
import os
import numpy as np
from pathlib import Path

class UCF101Dataset(Dataset):
    """Video-clip dataset driven by a ``<video_path> <label>`` annotation file.

    Each item is one clip of exactly ``frames_per_clip`` frames, taken every
    ``frame_skip``-th frame from a window of
    ``frames_per_clip * frame_skip`` consecutive frames. For
    ``split == 'train'`` the window start is random; otherwise the window is
    centred in the video.

    Args:
        root_dir: Directory that the annotation paths are relative to.
        annotation_file: Text file with one ``<video_path> <label>`` pair
            per line, separated by whitespace. Blank lines are ignored.
        transform: Optional callable applied to every RGB frame
            (an ``H x W x 3`` uint8 array).
        frames_per_clip: Number of frames in every returned clip (>= 1).
        frame_skip: Temporal stride between sampled frames (>= 1).
        split: ``'train'`` for random clip placement, anything else for a
            centred clip.

    Raises:
        ValueError: If ``frames_per_clip`` or ``frame_skip`` is below 1, or
            an annotation line does not have exactly two fields.
    """

    def __init__(self, root_dir, annotation_file, transform=None, frames_per_clip=16, frame_skip=2, split='train'):
        if frames_per_clip < 1 or frame_skip < 1:
            raise ValueError(
                f"frames_per_clip and frame_skip must be >= 1, "
                f"got {frames_per_clip} and {frame_skip}"
            )

        self.root_dir = Path(root_dir)
        self.frames_per_clip = frames_per_clip
        self.frame_skip = frame_skip
        self.transform = transform
        self.split = split

        self.videos = []
        self.labels = []

        with open(annotation_file, 'r') as f:
            for line_no, line in enumerate(f, start=1):
                fields = line.split()
                if not fields:
                    # Tolerate blank / trailing empty lines.
                    continue
                if len(fields) != 2:
                    raise ValueError(
                        f"{annotation_file}:{line_no}: expected "
                        f"'<video_path> <label>', got {line.strip()!r}"
                    )
                video_path, label = fields
                self.videos.append(video_path)
                # NOTE(review): labels are used exactly as written in the
                # file; confirm they are 0-based if they feed a loss that
                # expects class indices in [0, num_classes).
                self.labels.append(int(label))

    @staticmethod
    def _pad_clip(frames, frames_per_clip):
        """Return ``frames`` as an array of exactly ``frames_per_clip`` frames.

        Short clips are padded by repeating the last frame; if there are no
        frames at all, black 224x224 RGB frames are used.
        """
        clip = list(frames[:frames_per_clip])
        filler = clip[-1] if clip else np.zeros((224, 224, 3), dtype=np.uint8)
        clip.extend([filler] * (frames_per_clip - len(clip)))
        return np.array(clip)

    def _load_video(self, video_path):
        """Decode one clip and return it as a ``(frames_per_clip, H, W, 3)`` RGB array."""
        # Number of consecutive source frames needed to pick
        # ``frames_per_clip`` frames at stride ``frame_skip``.
        span = self.frames_per_clip * self.frame_skip

        cap = cv2.VideoCapture(str(self.root_dir / video_path))
        raw_frames = []
        try:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if self.split == 'train':
                start_idx = np.random.randint(0, max(1, total_frames - span))
            else:
                start_idx = max(0, total_frames - span) // 2

            cap.set(cv2.CAP_PROP_POS_FRAMES, start_idx)

            # Read the whole window; frames that fail to decode are skipped.
            for _ in range(span):
                ret, frame = cap.read()
                if ret:
                    raw_frames.append(frame)
        finally:
            # Always free the decoder, even if reading raised.
            cap.release()

        # Subsample first, then pad, so the clip always has
        # ``frames_per_clip`` frames (padding before subsampling would
        # shrink it by a factor of ``frame_skip``).
        sampled = [
            cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            for frame in raw_frames[::self.frame_skip]
        ]
        return self._pad_clip(sampled, self.frames_per_clip)

    def __len__(self):
        return len(self.videos)

    def __getitem__(self, idx):
        """Return ``(video, label)`` for the ``idx``-th annotation line.

        Without a transform ``video`` is the raw ``(T, H, W, 3)`` array. With
        a transform the per-frame results are stacked along a new leading
        axis.

        NOTE(review): with per-frame ``ToTensor`` transforms this presumably
        yields ``(T, C, H, W)``, while the models in this file unpack their
        input as ``(batch, channels, frames, height, width)`` - confirm
        whether a permute is needed before feeding batches to them.
        """
        video_path = self.videos[idx]
        label = self.labels[idx]

        video = self._load_video(video_path)

        if self.transform:
            video = torch.stack([self.transform(frame) for frame in video])

        return video, label

def get_data_loaders(root_dir, annotation_dir, batch_size=16, num_workers=4):
    """Create the training and validation ``DataLoader`` objects.

    Training clips come from ``trainlist01.txt`` with random crop / flip /
    colour-jitter augmentation and shuffling; validation clips come from
    ``testlist01.txt`` with resize + centre crop and no shuffling. Both
    annotation files are looked up inside ``annotation_dir``.

    NOTE(review): both files are parsed by ``UCF101Dataset`` as
    ``<video_path> <label>`` lines - confirm that the test list actually
    carries a label column and that labels are in the range the loss
    expects.

    Returns:
        ``(train_loader, val_loader)``.
    """
    def tensor_and_normalize():
        # Shared tail of both pipelines: tensor conversion + normalisation.
        return [
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]

    # Frames arrive as arrays, so both pipelines start with ToPILImage.
    train_transform = transforms.Compose(
        [
            transforms.ToPILImage(),
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
        ]
        + tensor_and_normalize()
    )
    val_transform = transforms.Compose(
        [
            transforms.ToPILImage(),
            transforms.Resize(256),
            transforms.CenterCrop(224),
        ]
        + tensor_and_normalize()
    )

    train_annotations = os.path.join(annotation_dir, 'trainlist01.txt')
    val_annotations = os.path.join(annotation_dir, 'testlist01.txt')

    train_dataset = UCF101Dataset(
        root_dir=root_dir,
        annotation_file=train_annotations,
        transform=train_transform,
        split='train',
    )
    val_dataset = UCF101Dataset(
        root_dir=root_dir,
        annotation_file=val_annotations,
        transform=val_transform,
        split='val',
    )

    # Settings common to both loaders; only shuffling differs.
    loader_options = dict(batch_size=batch_size, num_workers=num_workers, pin_memory=True)
    train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
    val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)

    return train_loader, val_loader

Model Improvements:
Add temporal attention layers
Implement 3D patch embeddings
Use pretrained weights from ImageNet

Training Enhancements:
Implement learning rate scheduling
Add model checkpointing
Use mixed precision training
Add validation metrics (accuracy, confusion matrix)

In [9]:
import torch
import torch.nn as nn
from einops import rearrange, repeat
import timm

class Space3DPatchEmbedding(nn.Module):
    """Embed a video as a sequence of space-time ("tubelet") patch tokens.

    A strided 3D convolution projects every non-overlapping
    ``patch_size`` block of the video to ``embed_dim`` channels. The
    resulting (t, h, w) grid is flattened into one token axis, a learnable
    class token is prepended and a learnable positional embedding is added.

    Args:
        video_size: ``(frames, height, width)`` used to size the positional
            embedding.
        patch_size: ``(frames, height, width)`` of one patch; also the
            convolution stride.
        in_channels: Number of channels of the input video.
        embed_dim: Channel width of every output token.
    """

    def __init__(self, video_size=(16, 224, 224), patch_size=(2, 16, 16), in_channels=3, embed_dim=768):
        super().__init__()
        frames, height, width = video_size
        patch_t, patch_h, patch_w = patch_size
        self.t, self.h, self.w = frames, height, width
        self.pt, self.ph, self.pw = patch_t, patch_h, patch_w

        # Number of patches along each axis of the video volume.
        grid = (frames // patch_t, height // patch_h, width // patch_w)
        self.n_patches = grid[0] * grid[1] * grid[2]

        self.projection = nn.Conv3d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size)
        # One positional slot per patch plus one for the class token.
        self.position_embeddings = nn.Parameter(torch.randn(1, self.n_patches + 1, embed_dim))
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))

    def forward(self, x):
        """Return ``[class token, patch tokens] + positional embedding``."""
        patches = self.projection(x)
        # (b, e, t, h, w) -> (b, t*h*w, e), time-major token order.
        tokens = patches.flatten(2).transpose(1, 2)
        cls_tokens = self.cls_token.expand(x.shape[0], -1, -1)
        return torch.cat((cls_tokens, tokens), dim=1) + self.position_embeddings

class TemporalAttention(nn.Module):
    """Multi-head self-attention along the time axis only.

    The token axis is interpreted as ``T`` time steps of ``N // T`` spatial
    positions each, in time-major order. Every token attends to the tokens
    at the same spatial position in all time steps and to nothing else.

    Args:
        dim: Channel width of the tokens.
        num_heads: Number of attention heads; must divide ``dim``.

    Raises:
        ValueError: If ``dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, dim, num_heads=8):
        super().__init__()
        if dim % num_heads != 0:
            raise ValueError(f"dim ({dim}) must be divisible by num_heads ({num_heads})")
        self.num_heads = num_heads
        self.head_dim = dim // num_heads
        self.scale = self.head_dim ** -0.5
        self.qkv = nn.Linear(dim, dim * 3)
        self.proj = nn.Linear(dim, dim)

    def forward(self, x, T):
        """Apply temporal attention.

        Args:
            x: Tensor of shape ``(B, N, C)`` whose token axis is laid out as
                ``(T, N // T)``, time-major.
            T: Number of time steps; must be positive and divide ``N``.

        Returns:
            Tensor of shape ``(B, N, C)`` in the same token order as ``x``.

        Raises:
            ValueError: If ``T`` is not positive or does not divide ``N``.
        """
        B, N, C = x.shape
        if T <= 0 or N % T != 0:
            raise ValueError(
                f"token count {N} cannot be split into T={T} time steps"
            )
        n_spatial = N // T

        # (B, N, 3C) -> (B, T, n, 3, heads, head_dim)
        qkv = self.qkv(x).reshape(B, T, n_spatial, 3, self.num_heads, self.head_dim)
        # -> (3, B, heads, n, T, head_dim): time is the second-to-last axis,
        # so the matmul below mixes time steps of one spatial position.
        q, k, v = qkv.permute(3, 0, 4, 2, 1, 5).unbind(0)

        attn = (q @ k.transpose(-2, -1)) * self.scale  # (B, heads, n, T, T)
        attn = attn.softmax(dim=-1)
        out = attn @ v  # (B, heads, n, T, head_dim)

        # Back to time-major tokens with heads merged into channels.
        out = out.permute(0, 3, 2, 1, 4).reshape(B, N, C)
        return self.proj(out)

class ImprovedActionViT(nn.Module):
    """Video transformer with 3D patch embedding and interleaved temporal attention.

    The video is embedded into space-time patch tokens plus a class token.
    Each of the ``depth`` stages first adds a temporal-attention residual to
    the patch tokens and then runs a standard transformer encoder layer over
    all tokens (class token included). The normalised class token is
    classified by a linear layer.

    Args:
        video_size: ``(frames, height, width)`` of the input clips.
        patch_size: ``(frames, height, width)`` of one patch.
        in_channels: Number of channels of the input video.
        num_classes: Size of the output logit vector.
        embed_dim: Token width.
        depth: Number of temporal-attention / encoder-layer stages.
        num_heads: Attention heads in both attention types.
        mlp_ratio: Feed-forward width as a multiple of ``embed_dim``.
        dropout: Dropout probability inside the encoder layers.
    """

    def __init__(self, video_size=(16, 224, 224), patch_size=(2, 16, 16), in_channels=3,
                 num_classes=101, embed_dim=768, depth=12, num_heads=12, mlp_ratio=4, dropout=0.1):
        super().__init__()

        self.patch_embed = Space3DPatchEmbedding(
            video_size=video_size,
            patch_size=patch_size,
            in_channels=in_channels,
            embed_dim=embed_dim
        )

        self.temporal_attn = nn.ModuleList([
            TemporalAttention(embed_dim, num_heads=num_heads)
            for _ in range(depth)
        ])

        self.transformer = nn.ModuleList([
            nn.TransformerEncoderLayer(
                d_model=embed_dim,
                nhead=num_heads,
                dim_feedforward=mlp_ratio * embed_dim,
                dropout=dropout,
                activation='gelu',
                batch_first=True
            ) for _ in range(depth)
        ])

        self.norm = nn.LayerNorm(embed_dim)
        self.fc = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        """Return class logits for a video batch.

        ``x`` is fed to a ``Conv3d`` patch projection, i.e. it is laid out as
        (batch, channels, frames, height, width).
        """
        x = self.patch_embed(x)

        # Number of temporal patch positions, taken from the embedding
        # configuration. Deriving it from the token count with a fixed
        # per-frame patch count is wrong for any grid other than the one
        # assumed, and the extra class token makes the count indivisible.
        T = self.patch_embed.t // self.patch_embed.pt

        for temporal_layer, transformer_layer in zip(self.temporal_attn, self.transformer):
            # The class token has no time index: keep it out of the temporal
            # step so the patch tokens split evenly into T time slices.
            cls_token, patches = x[:, :1], x[:, 1:]
            patches = patches + temporal_layer(patches, T)
            x = torch.cat([cls_token, patches], dim=1)
            # The encoder layer then attends over all tokens, class token
            # included.
            x = transformer_layer(x)

        x = self.norm(x)
        x = x[:, 0]
        x = self.fc(x)
        return x

def load_imagenet_weights(model):
    """Initialise ``model`` in place from timm's pretrained ``vit_base_patch16_224``.

    Copied tensors:

    * Patch embedding: the 2D kernel is replicated over every temporal
      slice of the 3D kernel and divided by the temporal extent, so a clip
      of identical frames gives the same response as the 2D model on one
      frame. The bias is copied as is.
    * Class token and positional embedding, when shapes allow: the class
      slot is copied and the per-frame patch positions are repeated for
      every temporal patch position.
    * Attention (qkv / output projection) and MLP weights of each encoder
      layer, for as many layers as both models have.
    * Final LayerNorm.

    NOTE(review): the per-layer LayerNorm parameters are not copied. The
    ViT blocks are presumably pre-norm while ``nn.TransformerEncoderLayer``
    defaults to post-norm - confirm whether the layers should be built with
    ``norm_first=True`` and the norms copied as well.

    Args:
        model: An ``ImprovedActionViT``-like module exposing
            ``patch_embed``, ``transformer`` and ``norm``.

    Returns:
        The same ``model`` object, for chaining.
    """
    vit = timm.create_model('vit_base_patch16_224', pretrained=True)

    with torch.no_grad():
        # --- Patch embedding -------------------------------------------
        embed = model.patch_embed
        conv3d = embed.projection
        kernel_2d = vit.patch_embed.proj.weight
        temporal_extent = conv3d.weight.shape[2]
        # Inflate (out, in, kh, kw) -> (out, in, kt, kh, kw); scaling keeps
        # the activation magnitude of the 2D model.
        inflated = (kernel_2d / temporal_extent).unsqueeze(2).expand_as(conv3d.weight)
        conv3d.weight.copy_(inflated)

        src_bias = vit.patch_embed.proj.bias
        if conv3d.bias is not None and src_bias is not None:
            conv3d.bias.copy_(src_bias)

        # --- Class token and positional embedding ----------------------
        # assumes timm exposes `cls_token` (1, 1, dim) and `pos_embed`
        # (1, 1 + patches_per_frame, dim) with the class slot first -
        # TODO confirm for the installed timm version.
        if embed.cls_token.shape == vit.cls_token.shape:
            embed.cls_token.copy_(vit.cls_token)

        src_pos = vit.pos_embed
        dst_pos = embed.position_embeddings
        frame_tokens = (embed.h // embed.ph) * (embed.w // embed.pw)
        if (
            src_pos.shape[1] == frame_tokens + 1
            and src_pos.shape[2] == dst_pos.shape[2]
            and (dst_pos.shape[1] - 1) % frame_tokens == 0
        ):
            n_temporal = (dst_pos.shape[1] - 1) // frame_tokens
            dst_pos[:, :1].copy_(src_pos[:, :1])
            # Tokens are time-major, so the spatial table repeats once per
            # temporal patch position.
            dst_pos[:, 1:].copy_(src_pos[:, 1:].repeat(1, n_temporal, 1))

        # --- Encoder layers --------------------------------------------
        # zip stops at the shorter stack, so a model deeper than the ViT
        # keeps its own initialisation for the extra layers.
        for layer, vit_block in zip(model.transformer, vit.blocks):
            # Attention weights
            vit_attn = vit_block.attn
            layer.self_attn.in_proj_weight.copy_(vit_attn.qkv.weight)
            layer.self_attn.in_proj_bias.copy_(vit_attn.qkv.bias)
            layer.self_attn.out_proj.weight.copy_(vit_attn.proj.weight)
            layer.self_attn.out_proj.bias.copy_(vit_attn.proj.bias)

            # MLP weights
            vit_mlp = vit_block.mlp
            layer.linear1.weight.copy_(vit_mlp.fc1.weight)
            layer.linear1.bias.copy_(vit_mlp.fc1.bias)
            layer.linear2.weight.copy_(vit_mlp.fc2.weight)
            layer.linear2.bias.copy_(vit_mlp.fc2.bias)

        # --- Final normalisation ---------------------------------------
        model.norm.weight.copy_(vit.norm.weight)
        model.norm.bias.copy_(vit.norm.bias)

    return model

def get_model(num_classes=101, pretrained=True):
    """Build an ``ImprovedActionViT`` with ``num_classes`` outputs.

    When ``pretrained`` is true the model is passed through
    ``load_imagenet_weights`` before being returned.
    """
    model = ImprovedActionViT(num_classes=num_classes)
    return load_imagenet_weights(model) if pretrained else model