S6 Model

In [12]:
import os
import cv2
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
import torchmetrics
import timm
import numpy as np

# Data preparation
class VideoDataset(Dataset):
    def __init__(self, video_files, root_dir, transform=None):
        self.video_files = video_files
        self.root_dir = root_dir
        self.transform = transform or transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((300, 300)),  # EfficientNet S6 input size
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(10),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        print(f"Initialized dataset with {len(video_files)} videos.")

    def __len__(self):
        return len(self.video_files)

    def __getitem__(self, idx):
        video_file = self.video_files[idx]
        video_path = os.path.join(self.root_dir, video_file)
        cap = cv2.VideoCapture(video_path)
        frames = []

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert OpenCV BGR to RGB
            frame = self.transform(frame)  # Apply transformations (ensures shape: [3, 300, 300])
            frames.append(frame)

        cap.release()

        # 🛠 Fix: Ensure at least one frame is present
        if len(frames) == 0:
            return torch.zeros(3, 300, 300), torch.tensor(-1, dtype=torch.float32)  # Default tensor for empty videos

        # 🛠 Fix: Stack frames correctly (ensures consistent tensor shape)
        frames = torch.stack(frames)  # Shape: [num_frames, 3, 300, 300]
        frames = torch.mean(frames, dim=0)  # Take mean across time axis → [3, 300, 300]

        # 🛠 Fix: Convert label to `float32`
        label = torch.tensor(0.0 if 'Makeup' in video_file else 1.0, dtype=torch.float32)

        return frames, label

# Defining the model using EfficientNet S6
class EfficientNetS6Model(nn.Module):
    def __init__(self, num_classes=1):
        super(EfficientNetS6Model, self).__init__()
        self.base_model = timm.create_model('efficientnetv2_rw_s', pretrained=True)
        num_features = self.base_model.classifier.in_features
        self.base_model.classifier = nn.Linear(num_features, num_classes)

    def forward(self, x):
        return self.base_model(x)  # No sigmoid here; apply it in loss calculation

# datasets and dataloaders
def setup_datasets(root_dir):
    video_files = os.listdir(root_dir)
    np.random.shuffle(video_files)
    num_videos = len(video_files)
    train_split = int(0.7 * num_videos)
    val_split = int(0.85 * num_videos)

    train_files = video_files[:train_split]
    val_files = video_files[train_split:val_split]
    test_files = video_files[val_split:]

    train_dataset = VideoDataset(train_files, root_dir)
    val_dataset = VideoDataset(val_files, root_dir)
    test_dataset = VideoDataset(test_files, root_dir)

    return train_dataset, val_dataset, test_dataset

root_dir = './Assessment_Dataset'
train_dataset, val_dataset, test_dataset = setup_datasets(root_dir)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=True)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EfficientNetS6Model().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.BCEWithLogitsLoss()
accuracy_metric = torchmetrics.Accuracy(task="binary").to(device)
total_correct = 0
total_samples = 0

# Training 
def train_and_validate(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    global total_correct, total_samples  
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for frames, labels in train_loader:
            if torch.any(labels == -1):  
                continue
            frames = frames.to(device)
            labels = labels.unsqueeze(1).to(device)  
            optimizer.zero_grad()

            outputs = model(frames)  
            loss = criterion(outputs, labels)  
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            
            preds = torch.sigmoid(outputs)  
            accuracy_metric.update(preds, labels.int())

        train_accuracy = accuracy_metric.compute()
        print(f'Epoch {epoch+1}, Training Accuracy: {train_accuracy:.4f}')
        accuracy_metric.reset()

        model.eval()
        val_loss = 0.0
        for frames, labels in val_loader:
            if torch.any(labels == -1):  
                continue
            frames = frames.to(device)
            labels = labels.unsqueeze(1).to(device)

            with torch.no_grad():
                outputs = model(frames)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                # Compute accuracy
                preds = torch.sigmoid(outputs)
                accuracy_metric.update(preds, labels.int())

                # 🛠 NEW: Track overall correct predictions
                total_correct += (preds.round() == labels).sum().item()
                total_samples += labels.size(0)

        val_accuracy = accuracy_metric.compute()
        print(f'Epoch {epoch+1}, Validation Accuracy: {val_accuracy:.4f}')
        accuracy_metric.reset()

    
    overall_accuracy = (total_correct / total_samples) * 100 if total_samples > 0 else 0
    print(f"\n🚀 Overall Model Accuracy: {overall_accuracy:.2f}%")

train_and_validate(model, train_loader, val_loader, criterion, optimizer)


Initialized dataset with 56 videos.
Initialized dataset with 12 videos.
Initialized dataset with 12 videos.
Epoch 1, Training Accuracy: 0.7679
Epoch 1, Validation Accuracy: 1.0000
Epoch 2, Training Accuracy: 0.9464
Epoch 2, Validation Accuracy: 0.9167
Epoch 3, Training Accuracy: 0.9643
Epoch 3, Validation Accuracy: 1.0000
Epoch 4, Training Accuracy: 0.9286
Epoch 4, Validation Accuracy: 1.0000
Epoch 5, Training Accuracy: 0.9107
Epoch 5, Validation Accuracy: 0.9167
Epoch 6, Training Accuracy: 1.0000
Epoch 6, Validation Accuracy: 1.0000
Epoch 7, Training Accuracy: 0.9107
Epoch 7, Validation Accuracy: 0.7500
Epoch 8, Training Accuracy: 0.9464
Epoch 8, Validation Accuracy: 0.9167
Epoch 9, Training Accuracy: 1.0000
Epoch 9, Validation Accuracy: 1.0000
Epoch 10, Training Accuracy: 0.9464
Epoch 10, Validation Accuracy: 0.9167

🚀 Overall Model Accuracy: 94.17%


Video Mamba Suit

In [17]:
import os
import cv2
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
import torchmetrics
import numpy as np

# Video Mamba Block
class VideoMambaBlock(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(VideoMambaBlock, self).__init__()
        self.linear1 = nn.Linear(input_dim, hidden_dim)
        self.activation = nn.ReLU()
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
    
    def forward(self, x):
        x = self.linear1(x)
        x = self.activation(x)
        x = self.linear2(x)
        return x

# Video Mamba Model
class VideoMamba(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers=3):
        super(VideoMamba, self).__init__()
        self.layers = nn.ModuleList([VideoMambaBlock(input_dim if i == 0 else hidden_dim, hidden_dim) for i in range(num_layers)])
        self.output_layer = nn.Linear(hidden_dim, hidden_dim)
    
    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return self.output_layer(x)

# Data Preparation
class VideoDataset(Dataset):
    def __init__(self, video_files, root_dir, transform=None):
        self.video_files = video_files
        self.root_dir = root_dir
        self.transform = transform or transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(10),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        print(f"Initialized dataset with {len(video_files)} videos.")

    def __len__(self):
        return len(self.video_files)

    def __getitem__(self, idx):
        video_file = self.video_files[idx]
        video_path = os.path.join(self.root_dir, video_file)
        cap = cv2.VideoCapture(video_path)
        frames = []

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = self.transform(frame)
            frames.append(frame)

        cap.release()

        if len(frames) == 0:
            return torch.zeros(3, 224, 224), torch.tensor(-1, dtype=torch.float32)

        frames = torch.stack(frames)
        frames = torch.mean(frames, dim=0)
        label = torch.tensor(0.0 if 'Makeup' in video_file else 1.0, dtype=torch.float32)
        return frames, label

# Video Mamba model for Temporal Action Localization
class VideoMambaTALModel(nn.Module):
    def __init__(self, num_classes=1, hidden_dim=256, num_layers=3):
        super(VideoMambaTALModel, self).__init__()
        self.mamba_encoder = VideoMamba(input_dim=3*224*224, hidden_dim=hidden_dim, num_layers=num_layers)
        self.classifier = nn.Linear(hidden_dim, num_classes)
    
    def forward(self, x):
        x = x.view(x.shape[0], -1)
        x = self.mamba_encoder(x)
        return self.classifier(x)

# datasets and dataloaders
def setup_datasets(root_dir):
    video_files = os.listdir(root_dir)
    np.random.shuffle(video_files)
    num_videos = len(video_files)
    train_split = int(0.7 * num_videos)
    val_split = int(0.85 * num_videos)

    train_files = video_files[:train_split]
    val_files = video_files[train_split:val_split]
    test_files = video_files[val_split:]

    train_dataset = VideoDataset(train_files, root_dir)
    val_dataset = VideoDataset(val_files, root_dir)
    test_dataset = VideoDataset(test_files, root_dir)

    return train_dataset, val_dataset, test_dataset

root_dir = './Assessment_Dataset'
train_dataset, val_dataset, test_dataset = setup_datasets(root_dir)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=True)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = VideoMambaTALModel().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.BCEWithLogitsLoss()
accuracy_metric = torchmetrics.Accuracy(task="binary").to(device)

#training
def train_and_validate(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    total_correct = 0
    total_samples = 0
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for frames, labels in train_loader:
            if torch.any(labels == -1):
                continue
            frames = frames.to(device)
            labels = labels.unsqueeze(1).to(device)
            optimizer.zero_grad()
            
            outputs = model(frames)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            preds = torch.sigmoid(outputs)
            accuracy_metric.update(preds, labels.int())
            total_correct += (preds.round() == labels).sum().item()
            total_samples += labels.size(0)
        
        train_accuracy = accuracy_metric.compute()
        print(f'Epoch {epoch+1}, Training Accuracy: {train_accuracy:.4f}')
        accuracy_metric.reset()

        model.eval()
        val_loss = 0.0
        for frames, labels in val_loader:
            if torch.any(labels == -1):
                continue
            frames = frames.to(device)
            labels = labels.unsqueeze(1).to(device)

            with torch.no_grad():
                outputs = model(frames)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                preds = torch.sigmoid(outputs)
                accuracy_metric.update(preds, labels.int())
                total_correct += (preds.round() == labels).sum().item()
                total_samples += labels.size(0)

        val_accuracy = accuracy_metric.compute()
        print(f'Epoch {epoch+1}, Validation Accuracy: {val_accuracy:.4f}')
        accuracy_metric.reset()
    
    overall_accuracy = (total_correct / total_samples) * 100 if total_samples > 0 else 0
    print(f'\n🚀 Overall Model Accuracy: {overall_accuracy:.2f}%')

train_and_validate(model, train_loader, val_loader, criterion, optimizer)


Initialized dataset with 56 videos.
Initialized dataset with 12 videos.
Initialized dataset with 12 videos.
Epoch 1, Training Accuracy: 0.5577
Epoch 1, Validation Accuracy: 0.6667
Epoch 2, Training Accuracy: 0.8654
Epoch 2, Validation Accuracy: 0.8333
Epoch 3, Training Accuracy: 0.9423
Epoch 3, Validation Accuracy: 0.6667
Epoch 4, Training Accuracy: 1.0000
Epoch 4, Validation Accuracy: 0.7500
Epoch 5, Training Accuracy: 0.9423
Epoch 5, Validation Accuracy: 0.7500
Epoch 6, Training Accuracy: 0.9231
Epoch 6, Validation Accuracy: 0.6667
Epoch 7, Training Accuracy: 0.9038
Epoch 7, Validation Accuracy: 0.5000
Epoch 8, Training Accuracy: 0.9231
Epoch 8, Validation Accuracy: 0.8333
Epoch 9, Training Accuracy: 0.9231
Epoch 9, Validation Accuracy: 0.5833
Epoch 10, Training Accuracy: 0.8654
Epoch 10, Validation Accuracy: 0.7500

🚀 Overall Model Accuracy: 85.00%


PRN Model

In [16]:
import os
import cv2
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
import torchmetrics
import numpy as np

# PRN Block
class PRNBlock(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(PRNBlock, self).__init__()
        self.linear1 = nn.Linear(input_dim, hidden_dim)
        self.activation = nn.ReLU()
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
    
    def forward(self, x):
        x = self.linear1(x)
        x = self.activation(x)
        x = self.linear2(x)
        return x

# PRN Model
class PRN(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers=3):
        super(PRN, self).__init__()
        self.layers = nn.ModuleList([PRNBlock(input_dim if i == 0 else hidden_dim, hidden_dim) for i in range(num_layers)])
        self.output_layer = nn.Linear(hidden_dim, hidden_dim)
    
    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return self.output_layer(x)

# Data Preparation
class VideoDataset(Dataset):
    def __init__(self, video_files, root_dir, transform=None):
        self.video_files = video_files
        self.root_dir = root_dir
        self.transform = transform or transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(10),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        print(f"Initialized dataset with {len(video_files)} videos.")

    def __len__(self):
        return len(self.video_files)

    def __getitem__(self, idx):
        video_file = self.video_files[idx]
        video_path = os.path.join(self.root_dir, video_file)
        cap = cv2.VideoCapture(video_path)
        frames = []

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = self.transform(frame)
            frames.append(frame)

        cap.release()

        if len(frames) == 0:
            return torch.zeros(3, 224, 224), torch.tensor(-1, dtype=torch.float32)

        frames = torch.stack(frames)
        frames = torch.mean(frames, dim=0)
        label = torch.tensor(0.0 if 'Makeup' in video_file else 1.0, dtype=torch.float32)
        return frames, label

# PRN model for Temporal Action Localization
class PRNTALModel(nn.Module):
    def __init__(self, num_classes=1, hidden_dim=256, num_layers=3):
        super(PRNTALModel, self).__init__()
        self.prn_encoder = PRN(input_dim=3*224*224, hidden_dim=hidden_dim, num_layers=num_layers)
        self.classifier = nn.Linear(hidden_dim, num_classes)
    
    def forward(self, x):
        x = x.view(x.shape[0], -1)
        x = self.prn_encoder(x)
        return self.classifier(x)

# datasets and dataloaders
def setup_datasets(root_dir):
    video_files = os.listdir(root_dir)
    np.random.shuffle(video_files)
    num_videos = len(video_files)
    train_split = int(0.7 * num_videos)
    val_split = int(0.85 * num_videos)

    train_files = video_files[:train_split]
    val_files = video_files[train_split:val_split]
    test_files = video_files[val_split:]

    train_dataset = VideoDataset(train_files, root_dir)
    val_dataset = VideoDataset(val_files, root_dir)
    test_dataset = VideoDataset(test_files, root_dir)

    return train_dataset, val_dataset, test_dataset

root_dir = './Assessment_Dataset'
train_dataset, val_dataset, test_dataset = setup_datasets(root_dir)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = PRNTALModel().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.BCEWithLogitsLoss()
accuracy_metric = torchmetrics.Accuracy(task="binary").to(device)

#training
def train_and_validate(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    total_correct = 0
    total_samples = 0
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for frames, labels in train_loader:
            if torch.any(labels == -1):
                continue
            frames = frames.to(device)
            labels = labels.unsqueeze(1).to(device)
            optimizer.zero_grad()
            
            outputs = model(frames)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            preds = torch.sigmoid(outputs)
            accuracy_metric.update(preds, labels.int())
            total_correct += (preds.round() == labels).sum().item()
            total_samples += labels.size(0)
        
        train_accuracy = accuracy_metric.compute()
        print(f'Epoch {epoch+1}, Training Accuracy: {train_accuracy:.4f}')
        accuracy_metric.reset()

        model.eval()
        val_loss = 0.0
        for frames, labels in val_loader:
            if torch.any(labels == -1):
                continue
            frames = frames.to(device)
            labels = labels.unsqueeze(1).to(device)

            with torch.no_grad():
                outputs = model(frames)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                preds = torch.sigmoid(outputs)
                accuracy_metric.update(preds, labels.int())
                total_correct += (preds.round() == labels).sum().item()
                total_samples += labels.size(0)

        val_accuracy = accuracy_metric.compute()
        print(f'Epoch {epoch+1}, Validation Accuracy: {val_accuracy:.4f}')
        accuracy_metric.reset()
    
    overall_accuracy = (total_correct / total_samples) * 100 if total_samples > 0 else 0
    print(f'\n🚀 Overall Model Accuracy: {overall_accuracy:.2f}%')

train_and_validate(model, train_loader, val_loader, criterion, optimizer)


Initialized dataset with 56 videos.
Initialized dataset with 12 videos.
Initialized dataset with 12 videos.
Epoch 1, Training Accuracy: 0.6538
Epoch 1, Validation Accuracy: 0.5000
Epoch 2, Training Accuracy: 0.7115
Epoch 2, Validation Accuracy: 0.5000
Epoch 3, Training Accuracy: 0.9038
Epoch 3, Validation Accuracy: 0.5000
Epoch 4, Training Accuracy: 0.9231
Epoch 4, Validation Accuracy: 0.6667
Epoch 5, Training Accuracy: 0.8654
Epoch 5, Validation Accuracy: 0.5000
Epoch 6, Training Accuracy: 0.8846
Epoch 6, Validation Accuracy: 0.5000
Epoch 7, Training Accuracy: 0.9615
Epoch 7, Validation Accuracy: 1.0000
Epoch 8, Training Accuracy: 1.0000
Epoch 8, Validation Accuracy: 1.0000
Epoch 9, Training Accuracy: 1.0000
Epoch 9, Validation Accuracy: 1.0000
Epoch 10, Training Accuracy: 1.0000
Epoch 10, Validation Accuracy: 1.0000

🚀 Overall Model Accuracy: 85.78%


Proposed Model

In [18]:
import os
import cv2
import torch
import torch.nn as nn
import torchvision.models as models
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import torch.optim as optim
import torchmetrics
import numpy as np

# Data Preparation
class VideoDataset(Dataset):
    def __init__(self, video_files, root_dir, transform=None):
        self.video_files = video_files
        self.root_dir = root_dir
        self.transform = transform or transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.RandomHorizontalFlip(),  # Data augmentation
            transforms.RandomRotation(10),      # Random rotation
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        print(f"Initialized dataset with {len(video_files)} videos.")

    def __len__(self):
        return len(self.video_files)

    def __getitem__(self, idx):
        video_file = self.video_files[idx]
        video_path = os.path.join(self.root_dir, video_file)
        cap = cv2.VideoCapture(video_path)
        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = self.transform(frame)
            frames.append(frame)
        cap.release()
        
        if len(frames) == 0:
            return None  # Return None if no frames were found

        frames = torch.stack(frames)  # Stack frames into tensor
        frames = torch.mean(frames, dim=0, keepdim=True)  # Take average across time
        
        label = 0 if 'Makeup' in video_file else 1  # Assign binary label
        return frames.squeeze(0), torch.tensor(label, dtype=torch.float32)

# the model
class BabyCrawlingModel(nn.Module):
    def __init__(self):
        super(BabyCrawlingModel, self).__init__()
        self.feature_extractor = models.mobilenet_v2(pretrained=True).features
        self.adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.temporal_model = nn.Linear(1280, 512)
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),  
            nn.Linear(512, 1)  
        )

    def forward(self, x):
        x = self.feature_extractor(x)
        x = self.adaptive_pool(x)
        x = x.view(x.size(0), -1)
        x = self.temporal_model(x)
        x = self.classifier(x)
        return x

# datasets and dataloaders
def setup_datasets(root_dir):
    video_files = os.listdir(root_dir)
    np.random.shuffle(video_files)
    num_videos = len(video_files)
    train_split = int(0.7 * num_videos)
    val_split = int(0.85 * num_videos)

    train_files = video_files[:train_split]
    val_files = video_files[train_split:val_split]
    test_files = video_files[val_split:]

    train_dataset = VideoDataset(train_files, root_dir)
    val_dataset = VideoDataset(val_files, root_dir)
    test_dataset = VideoDataset(test_files, root_dir)
    
    # Filter out None values (invalid samples)
    train_dataset = [x for x in train_dataset if x is not None]
    val_dataset = [x for x in val_dataset if x is not None]
    test_dataset = [x for x in test_dataset if x is not None]

    return train_dataset, val_dataset, test_dataset

root_dir = './Assessment_Dataset'
train_dataset, val_dataset, test_dataset = setup_datasets(root_dir)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)  # Increased batch size
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BabyCrawlingModel().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
criterion = nn.BCEWithLogitsLoss()

accuracy_metric = torchmetrics.Accuracy(task="binary").to(device)

# Training 
def train_and_validate(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for frames, labels in train_loader:
            if frames is None:  
                continue
            frames = frames.to(device)
            labels = labels.float().unsqueeze(1).to(device) 
            optimizer.zero_grad()
            outputs = model(frames)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            accuracy_metric.update(torch.sigmoid(outputs), labels.int())

    
        train_accuracy = accuracy_metric.compute()
        print(f'Epoch {epoch+1}, Training Accuracy: {train_accuracy:.4f}')
        accuracy_metric.reset()

        
        model.eval()
        val_loss = 0.0
        for frames, labels in val_loader:
            if frames is None:  
                continue
            frames = frames.to(device)
            labels = labels.float().unsqueeze(1).to(device)
            with torch.no_grad():
                outputs = model(frames)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                
                accuracy_metric.update(torch.sigmoid(outputs), labels.int())

        
        val_accuracy = accuracy_metric.compute()
        print(f'Epoch {epoch+1}, Validation Accuracy: {val_accuracy:.4f}')
        accuracy_metric.reset()

train_and_validate(model, train_loader, val_loader, criterion, optimizer)

def evaluate_model(model, test_loader):
    model.eval()
    accuracy_metric.reset()
    
    total_correct = 0  
    total_samples = 0  
    
    for frames, labels in test_loader:
        if frames is None:
            continue
        frames = frames.to(device)
        labels = labels.float().unsqueeze(1).to(device)
        
        with torch.no_grad():
            outputs = model(frames)
            preds = torch.sigmoid(outputs).round()  
            
            total_correct += (preds == labels).sum().item()  
            total_samples += labels.size(0)  
    
    overall_accuracy = (total_correct / total_samples) * 100 if total_samples > 0 else 0
    print(f'\n🚀 Overall Model Accuracy: {overall_accuracy:.2f}%')

evaluate_model(model, test_loader)


Initialized dataset with 56 videos.
Initialized dataset with 12 videos.
Initialized dataset with 12 videos.




Epoch 1, Training Accuracy: 0.7273
Epoch 1, Validation Accuracy: 0.9167
Epoch 2, Training Accuracy: 0.8364
Epoch 2, Validation Accuracy: 0.9167
Epoch 3, Training Accuracy: 0.8727
Epoch 3, Validation Accuracy: 0.7500
Epoch 4, Training Accuracy: 0.9455
Epoch 4, Validation Accuracy: 1.0000
Epoch 5, Training Accuracy: 0.9273
Epoch 5, Validation Accuracy: 0.6667
Epoch 6, Training Accuracy: 0.8364
Epoch 6, Validation Accuracy: 0.9167
Epoch 7, Training Accuracy: 0.8727
Epoch 7, Validation Accuracy: 1.0000
Epoch 8, Training Accuracy: 0.9091
Epoch 8, Validation Accuracy: 1.0000
Epoch 9, Training Accuracy: 0.9455
Epoch 9, Validation Accuracy: 1.0000
Epoch 10, Training Accuracy: 0.9636
Epoch 10, Validation Accuracy: 1.0000

🚀 Overall Model Accuracy: 100.00%


Deployment

In [None]:
import os
import cv2
import torch
import torch.nn as nn
import torchvision.models as models
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import torch.optim as optim
import torchmetrics
import numpy as np
import threading
import time
import queue
from flask import Flask, Response, render_template
from flask_socketio import SocketIO
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import io
import base64

# Define the model
class BabyCrawlingModel(nn.Module):
    def __init__(self):
        super(BabyCrawlingModel, self).__init__()
        self.feature_extractor = models.mobilenet_v2(pretrained=True).features
        self.adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.temporal_model = nn.Linear(1280, 512)
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(512, 1)
        )

    def forward(self, x):
        x = self.feature_extractor(x)
        x = self.adaptive_pool(x)
        x = x.view(x.size(0), -1)
        x = self.temporal_model(x)
        x = self.classifier(x)
        return x

# Initialize Flask app
app = Flask(__name__)
socketio = SocketIO(app)

# Video source
VIDEO_SOURCE = 0  # Change to video file path if needed
frame_queue = queue.Queue()
activity_data = {"frame_count": [], "detections": []}
frame_counter = 0

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BabyCrawlingModel().to(device)
model.eval()

def generate_frames():
    global frame_counter
    cap = cv2.VideoCapture(VIDEO_SOURCE)
    while True:
        success, frame = cap.read()
        if not success:
            break
        frame_counter += 1
        frame = cv2.resize(frame, (224, 224))
        frame_queue.put(frame)
        _, buffer = cv2.imencode(".jpg", frame)
        frame_bytes = buffer.tobytes()
        yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
    cap.release()

@app.route('/video_feed')
def video_feed():
    return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/')
def index():
    return render_template("index.html")

def process_video():
    global frame_counter
    while True:
        if not frame_queue.empty():
            frame = frame_queue.get()
            frame_tensor = torch.tensor(frame, dtype=torch.float32).permute(2, 0, 1).unsqueeze(0).to(device) / 255.0
            with torch.no_grad():
                output = model(frame_tensor)
                prediction = torch.sigmoid(output).cpu().numpy()[0][0]
            activity_data["frame_count"].append(frame_counter)
            activity_data["detections"].append(prediction)
            socketio.emit('update_chart', {"frame": frame_counter, "prediction": prediction, "graph": generate_graph()})
            time.sleep(0.1)

def generate_graph():
    plt.clf()
    if len(activity_data["frame_count"]) > 0:
        plt.plot(activity_data["frame_count"], activity_data["detections"], marker="o", linestyle="-")
        plt.xlabel("Frame Number")
        plt.ylabel("Prediction Score")
        plt.title("Real-Time Activity Analytics")
        plt.ylim([0, 1])
    buf = io.BytesIO()
    plt.savefig(buf, format="png")
    buf.seek(0)
    return base64.b64encode(buf.getvalue()).decode('utf-8')

@app.route('/graph_feed')
def graph_feed():
    return f'<img src="data:image/png;base64,{generate_graph()}" width="50%">'

if __name__ == "__main__":
    threading.Thread(target=process_video, daemon=True).start()
    socketio.run(app, debug=True, use_reloader=False, port=5000, allow_unsafe_werkzeug=True)


Werkzeug appears to be used in a production deployment. Consider switching to a production web server instead.


 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:5000
Press CTRL+C to quit
