## Part 1: Dataset

In [1]:
import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models

In [2]:
# Test if GPU is available
device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")

Using cuda device


In [3]:
import glob
from PIL import Image
import numpy as np
import torch
from torch.utils.data import Dataset, Subset

class VideoDataset(Dataset):
    def __init__(self, root_dir, seq_len=16, transform=None):
        self.root_dir = root_dir
        self.seq_len = seq_len
        self.transform = transform
        
        # Map class names to numeric labels
        classes = sorted(entry.name for entry in os.scandir(root_dir) if entry.is_dir())
        self.class_to_idx = {cls_name: idx for idx, cls_name in enumerate(classes)}
        
        # Build a list of all videos: each as (video_id, [frame_paths], label)
        self.video_list = []
        for cls_name in classes:
            cls_dir = os.path.join(root_dir, cls_name)
            # Find all images in this class folder
            all_imgs = glob.glob(os.path.join(cls_dir, '*.jpg'))
            # Group images by video ID prefix (e.g. '001' in '001_frame0.jpg')
            groups = {}
            for img_path in all_imgs:
                filename = os.path.basename(img_path)
                # Assume video ID is everything before first underscore
                vid = filename.split('_')[0]
                groups.setdefault(vid, []).append(img_path)
            # Convert each group to (video_id, sorted_paths, label)
            for vid, frame_paths in groups.items():
                # Sort frames by numeric index after 'frame'
                frame_paths.sort(key=lambda x: int(os.path.splitext(x)[0].split('_frame')[-1]))
                label = self.class_to_idx[cls_name]
                self.video_list.append((vid, frame_paths, label))
        
        # Optionally, sort video_list by video ID or shuffle as needed
        self.video_list.sort(key=lambda x: x[0])  # sort by video_id (string)
    
    def __len__(self):
        return len(self.video_list)
    
    def __getitem__(self, idx):
        vid, frame_paths, label = self.video_list[idx]
        selected_paths = frame_paths

        # Load images and apply transforms
        frames = []
        for img_path in selected_paths:
            img = Image.open(img_path).convert('RGB')
            if self.transform:
                img = self.transform(img) 
            else:
                img = transforms.ToTensor()(img)
            frames.append(img)
        # Stack into a tensor of shape (T, C, H, W)
        video_tensor = torch.stack(frames, dim=0).to(device)
        return video_tensor, label

In [4]:
# Define transforms
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Instantiate dataset
dataset = VideoDataset(root_dir="cnn-lstm-project-5", seq_len=24, transform=transform)

In [5]:
# Defining the training loop and model evaluation functions
from torch.amp import GradScaler, autocast

scaler = GradScaler()

def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    correct = 0
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        y = y.to(device).float()
        with autocast(device_type="cuda", dtype=torch.float16):
            pred = model(X.to(device))
            loss = loss_fn(pred, y)

        # Backpropagation - modified to use mixed precision
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

        predicted_classes = (pred > 0).float()
        correct += (predicted_classes == y).type(torch.float).sum().item()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * batch_size + len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
    correct /= size
    print(f"Training Accuracy: {(100*correct):>0.1f}%")
            
def test_loop(dataloader, model, loss_fn):
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0
    
    with torch.no_grad():
        for X, y in dataloader:
            y = y.to(device).float()
            # Using mixed precision for testing
            with autocast(device_type="cuda", dtype=torch.float16):
                pred = model(X.to(device))
                test_loss += loss_fn(pred, y).item()

            predicted_classes = (pred > 0).float()
            correct += (predicted_classes == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")


## Model 1: Custom

Last Recorded Training Accuracy: 92.7%

Last Recorded Testing Accuracy: 95.8%

In [6]:
# From Model 1 Attempt 10, I added changes to the model architecture
# I decided to use Conv3D instead of Conv2D, where the additional dimension is the sequence
from torch.nn.utils.parametrizations import weight_norm

class NeuralNetwork_Hyper(nn.Module):
    def __init__(self, lstm_h=64):
        super().__init__()
        self.cnn_layer = nn.Sequential(
            weight_norm(nn.Conv3d(3, 16, 3, bias=False, padding=1)),
            nn.GroupNorm(8, 16),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2)),
            weight_norm(nn.Conv3d(16, 32, 3, bias=False, padding=1, groups=2)),
            nn.GroupNorm(8, 32),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2)),
            weight_norm(nn.Conv3d(32, 32, 3, bias=False, padding=1, groups=2)),
            nn.GroupNorm(8, 32),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2)),
            weight_norm(nn.Conv3d(32, 64, 3, bias=False, padding=1, groups=2)),
            nn.GroupNorm(8, 64),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2)),
            weight_norm(nn.Conv3d(64, 64, 3, bias=False, padding=1, groups=2)),
            nn.GroupNorm(8, 64),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2)),
            weight_norm(nn.Conv3d(64, 128, 3, bias=False, padding=1, groups=2)),
            nn.GroupNorm(8, 128),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2)),
            weight_norm(nn.Conv3d(128, 128, 3, bias=False, padding=1, groups=2)),
            nn.GroupNorm(8, 128),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2)),
            nn.Dropout(0.3),
        )
        self.lstm_layer = nn.LSTM(input_size=128, hidden_size=lstm_h, num_layers=3,
                                  dropout=0.3, batch_first=True)
        self.fc_layer = nn.Linear(lstm_h, 1)

    def forward(self, x):
        # Reshape for CNN input
        batch_size, sequence, channels, height, width = x.size()
        x = x.view(batch_size, channels, sequence, height, width)
        x = self.cnn_layer(x)


        # Reshape for LSTM input
        x = x.view(batch_size, sequence, -1)
        output, (h_n, c_n) = self.lstm_layer(x)
        logits = self.fc_layer(h_n[-1])
        return logits.squeeze(1)

model = NeuralNetwork_Hyper().to(device)
print(model)

NeuralNetwork_Hyper(
  (cnn_layer): Sequential(
    (0): ParametrizedConv3d(
      3, 16, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1), bias=False
      (parametrizations): ModuleDict(
        (weight): ParametrizationList(
          (0): _WeightNorm()
        )
      )
    )
    (1): GroupNorm(8, 16, eps=1e-05, affine=True)
    (2): ReLU()
    (3): MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2), padding=0, dilation=1, ceil_mode=False)
    (4): ParametrizedConv3d(
      16, 32, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1), groups=2, bias=False
      (parametrizations): ModuleDict(
        (weight): ParametrizationList(
          (0): _WeightNorm()
        )
      )
    )
    (5): GroupNorm(8, 32, eps=1e-05, affine=True)
    (6): ReLU()
    (7): MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2), padding=0, dilation=1, ceil_mode=False)
    (8): ParametrizedConv3d(
      32, 32, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1), groups=2, bias=False

In [7]:
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
from sklearn.model_selection import KFold

# Enabling NVIDIA cuDNN auto-tuner
torch.backends.cudnn.benchmark = True

num_videos = len(dataset)
video_indices = np.arange(num_videos)
kf = KFold(n_splits=5, shuffle=True, random_state=0)
batch_size = 8
num_epochs = 6
learning_rate = 1e-3

for fold, (train_idx, test_idx) in enumerate(kf.split(video_indices)):
    print(f"\n===== Fold {fold+1} =====")
    train_set = Subset(dataset, train_idx)
    test_set = Subset(dataset, test_idx)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=True)

    # Re-initialize optimizer, scheduler for each fold
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=3)

    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")
        train_loop(train_loader, model, loss_fn, optimizer)
        test_loop(test_loader, model, loss_fn)
        scheduler.step()

    print(f"Done with fold {fold+1}")

print("KFold cross-validation complete!")


===== Fold 1 =====
Epoch 1/6
loss: 0.694594  [    8/   96]
Training Accuracy: 49.0%
Test Error: 
 Accuracy: 54.2%, Avg loss: 0.693159 

Epoch 2/6
loss: 0.694680  [    8/   96]
Training Accuracy: 41.7%
Test Error: 
 Accuracy: 45.8%, Avg loss: 0.694368 

Epoch 3/6
loss: 0.690033  [    8/   96]
Training Accuracy: 50.0%
Test Error: 
 Accuracy: 45.8%, Avg loss: 0.694965 

Epoch 4/6
loss: 0.697494  [    8/   96]
Training Accuracy: 39.6%
Test Error: 
 Accuracy: 54.2%, Avg loss: 0.692042 

Epoch 5/6
loss: 0.691379  [    8/   96]
Training Accuracy: 52.1%
Test Error: 
 Accuracy: 45.8%, Avg loss: 0.693462 

Epoch 6/6
loss: 0.692429  [    8/   96]
Training Accuracy: 52.1%
Test Error: 
 Accuracy: 45.8%, Avg loss: 0.694408 

Done with fold 1

===== Fold 2 =====
Epoch 1/6
loss: 0.696270  [    8/   96]
Training Accuracy: 44.8%
Test Error: 
 Accuracy: 54.2%, Avg loss: 0.693090 

Epoch 2/6
loss: 0.693749  [    8/   96]
Training Accuracy: 60.4%
Test Error: 
 Accuracy: 50.0%, Avg loss: 0.693605 

Epoch 3

In [8]:
# Saving the model weights as reference
torch.save(model.state_dict(), "model1_new_weights.pth")

## Model 2: EfficientNet

Last Recorded Training Accuracy: 97.9%

Last Recorded Testing Accuracy: 95.8%

In [6]:
class CustomFineTuneModel(nn.Module):
    def __init__(self, base_model):
        super(CustomFineTuneModel, self).__init__()
        self.base_model = base_model
        self.lstm_layer = nn.LSTM(input_size=1280, hidden_size=64, batch_first=True)
        self.final_classifier = nn.Linear(64, 1) 

    def forward(self, x):
        batch_size, sequence, channels, height, width = x.size()
        x = x.view(batch_size * sequence, channels, height, width)
        x = self.base_model(x)
        
        x = x.view(batch_size, sequence, -1)  # Reshape for LSTM input
        output, (h_n, c_n) = self.lstm_layer(x)
        x = self.final_classifier(h_n.squeeze(0))
        return x.squeeze()

# Create an instance of the custom model
model2_ft1 = models.efficientnet_v2_s(weights='DEFAULT')
model2_ft1.to(device)

EfficientNet(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(24, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
      (2): SiLU(inplace=True)
    )
    (1): Sequential(
      (0): FusedMBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(24, 24, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (1): BatchNorm2d(24, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
            (2): SiLU(inplace=True)
          )
        )
        (stochastic_depth): StochasticDepth(p=0.0, mode=row)
      )
      (1): FusedMBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(24, 24, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (1): BatchNorm2d(24, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  

In [7]:
# For Fixed Feature Extraction

for param in model2_ft1.parameters():
    param.requires_grad = False  # Freeze all layers

In [8]:
model2_ft1.classifier = nn.Identity()  # Remove the final classification layer
model2_ft1 = CustomFineTuneModel(model2_ft1).to(device)

In [None]:
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
from sklearn.model_selection import KFold

# Enabling NVIDIA cuDNN auto-tuner
torch.backends.cudnn.benchmark = True

num_videos = len(dataset)
video_indices = np.arange(num_videos)
kf = KFold(n_splits=5, shuffle=True, random_state=0)
batch_size = 8
num_epochs = 6
learning_rate = 1e-3

for fold, (train_idx, test_idx) in enumerate(kf.split(video_indices)):
    print(f"\n===== Fold {fold+1} =====")
    train_set = Subset(dataset, train_idx)
    test_set = Subset(dataset, test_idx)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=True)

    # Re-initialize optimizer, scheduler for each fold
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.AdamW(model2_ft1.parameters(), lr=learning_rate)
    scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2, eta_min=1e-6)

    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")
        train_loop(train_loader, model2_ft1, loss_fn, optimizer)
        test_loop(test_loader, model2_ft1, loss_fn)
        scheduler.step()

    print(f"Done with fold {fold+1}")

print("KFold cross-validation complete!")

# Training took 17 mins.


===== Fold 1 =====
Epoch 1/6
loss: 0.655065  [    8/   96]
Training Accuracy: 50.0%
Test Error: 
 Accuracy: 45.8%, Avg loss: 0.701501 

Epoch 2/6
loss: 0.680434  [    8/   96]
Training Accuracy: 65.6%
Test Error: 
 Accuracy: 41.7%, Avg loss: 0.716992 

Epoch 3/6
loss: 0.599751  [    8/   96]
Training Accuracy: 71.9%
Test Error: 
 Accuracy: 25.0%, Avg loss: 0.738910 

Epoch 4/6
loss: 0.625346  [    8/   96]
Training Accuracy: 81.2%
Test Error: 
 Accuracy: 41.7%, Avg loss: 0.771145 

Epoch 5/6
loss: 0.547474  [    8/   96]
Training Accuracy: 83.3%
Test Error: 
 Accuracy: 33.3%, Avg loss: 0.842587 

Epoch 6/6
loss: 0.582722  [    8/   96]
Training Accuracy: 87.5%
Test Error: 
 Accuracy: 29.2%, Avg loss: 0.905157 

Done with fold 1

===== Fold 2 =====
Epoch 1/6
loss: 0.455884  [    8/   96]
Training Accuracy: 70.8%
Test Error: 
 Accuracy: 87.5%, Avg loss: 0.430945 

Epoch 2/6
loss: 0.547609  [    8/   96]
Training Accuracy: 86.5%
Test Error: 
 Accuracy: 95.8%, Avg loss: 0.420914 

Epoch 3

In [None]:
# Save the model weights as reference
torch.save(model2_ft1.state_dict(), "model2_new_weights.pth")

## Model 3: ShuffleNet

Last Recorded Training Accuracy: 96.9%

Last Recorded Testing Accuracy: 95.8%

In [11]:
from torchvision import models

model_3 = models.shufflenet_v2_x1_5(weights='DEFAULT') # Using the 1.5x output channel model
model_3.to(device)

model_3

ShuffleNetV2(
  (conv1): Sequential(
    (0): Conv2d(3, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
  )
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (stage2): Sequential(
    (0): InvertedResidual(
      (branch1): Sequential(
        (0): Conv2d(24, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=24, bias=False)
        (1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): Conv2d(24, 88, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (3): BatchNorm2d(88, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (4): ReLU(inplace=True)
      )
      (branch2): Sequential(
        (0): Conv2d(24, 88, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(88, eps=1e-05, momentum=0.1, affine=True, track_running_

In [12]:
# FOR Fixed Feature Extraction

for param in model_3.parameters():
    param.requires_grad = False  # Freeze all layers

In [13]:
class CustomFineTuneModel(nn.Module):
    def __init__(self, base_model):
        super(CustomFineTuneModel, self).__init__()
        self.base_model = base_model
        self.lstm_layer = nn.LSTM(input_size=1024, hidden_size=128, batch_first=True) # add dropout
        self.final_classifier = nn.Linear(128, 1) 

    def forward(self, x):
        batch_size, sequence, channels, height, width = x.size()
        x = x.view(batch_size * sequence, channels, height, width)
        x = self.base_model(x)
        
        x = x.view(batch_size, sequence, -1)  # Reshape for LSTM input
        output, (h_n, c_n) = self.lstm_layer(x)
        x = self.final_classifier(h_n[-1])
        return x.squeeze(-1)

# Create an instance of the custom model


model_3.fc = nn.Identity()  # Remove the final classification layer
model3_ft1 = CustomFineTuneModel(model_3).to(device)
# Show the model architecture
model3_ft1

CustomFineTuneModel(
  (base_model): ShuffleNetV2(
    (conv1): Sequential(
      (0): Conv2d(3, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
    )
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (stage2): Sequential(
      (0): InvertedResidual(
        (branch1): Sequential(
          (0): Conv2d(24, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=24, bias=False)
          (1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): Conv2d(24, 88, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (3): BatchNorm2d(88, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (4): ReLU(inplace=True)
        )
        (branch2): Sequential(
          (0): Conv2d(24, 88, kernel_size=(1, 1), stride=(1, 1), bias=False)
          

In [None]:
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
from sklearn.model_selection import KFold

# Enabling NVIDIA cuDNN auto-tuner
torch.backends.cudnn.benchmark = True

num_videos = len(dataset)
video_indices = np.arange(num_videos)
kf = KFold(n_splits=5, shuffle=True, random_state=0)
batch_size = 8
num_epochs = 6
learning_rate = 1e-4

for fold, (train_idx, test_idx) in enumerate(kf.split(video_indices)):
    print(f"\n===== Fold {fold+1} =====")
    train_set = Subset(dataset, train_idx)
    test_set = Subset(dataset, test_idx)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=True)

    # Re-initialize optimizer, scheduler for each fold
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.AdamW(model3_ft1.parameters(), lr=learning_rate)
    scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2, eta_min=1e-6)

    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")
        train_loop(train_loader, model3_ft1, loss_fn, optimizer)
        test_loop(test_loader, model3_ft1, loss_fn)
        scheduler.step()

    print(f"Done with fold {fold+1}")

print("KFold cross-validation complete!")

# Training took 16 mins.


===== Fold 1 =====
Epoch 1/6
loss: 0.242297  [    8/   96]
Training Accuracy: 88.5%
Test Error: 
 Accuracy: 95.8%, Avg loss: 0.259191 

Epoch 2/6
loss: 0.318742  [    8/   96]
Training Accuracy: 93.8%
Test Error: 
 Accuracy: 91.7%, Avg loss: 0.245983 

Epoch 3/6
loss: 0.178621  [    8/   96]
Training Accuracy: 88.5%
Test Error: 
 Accuracy: 95.8%, Avg loss: 0.232871 

Epoch 4/6
loss: 0.131113  [    8/   96]
Training Accuracy: 95.8%
Test Error: 
 Accuracy: 87.5%, Avg loss: 0.259693 

Epoch 5/6
loss: 0.423329  [    8/   96]
Training Accuracy: 92.7%
Test Error: 
 Accuracy: 95.8%, Avg loss: 0.242034 

Epoch 6/6
loss: 0.113820  [    8/   96]
Training Accuracy: 94.8%
Test Error: 
 Accuracy: 91.7%, Avg loss: 0.247724 

Done with fold 1

===== Fold 2 =====
Epoch 1/6
loss: 0.202168  [    8/   96]
Training Accuracy: 91.7%
Test Error: 
 Accuracy: 100.0%, Avg loss: 0.139055 

Epoch 2/6
loss: 0.137582  [    8/   96]
Training Accuracy: 93.8%
Test Error: 
 Accuracy: 95.8%, Avg loss: 0.150174 

Epoch 

In [None]:
# Saving the model weights as reference
torch.save(model3_ft1.state_dict(), "model3_new_weights.pth")