In [65]:
# This database contains sequences of six classes of actions performed by
# 25 subjects in four different conditions d1-d4

# d1 - Static homogeneous background
# d2 - -"-                          + Scale variations
# d3 - -"-                          + Different clothes
# d4 - -"-                          + Lighting variations

# Training:   person11, 12, 13, 14, 15, 16, 17, 18
# Validation: person19, 20, 21, 23, 24, 25, 01, 04
# Test:       person22, 02, 03, 05, 06, 07, 08, 09, 10 

In [95]:
import os
import torch
from torch.utils.data import Dataset
import numpy as np
from PIL import Image
import av
import random as rdm
import torchvision.transforms.v2 as transforms

import os
import torch
import torch.nn.functional as F

from torch.utils.data import Dataset
from torchvision.io import read_video
from torch.utils.data import DataLoader
import torchvision
import multiprocessing
import time
from torch.nn.utils.rnn import pad_sequence
from utils import *

# Ask the OS how many CPU cores it reports and print the count.
num_cores = multiprocessing.cpu_count()
print(
    "This machine has {} CPU cores.".format(num_cores)
)

def set_seed(seed=42):
    """Apply one seed to every random number generator used in this notebook.

    The same value is handed, in order, to Python's ``random`` module, NumPy's
    global RNG, and PyTorch (``manual_seed`` plus both CUDA seeding calls).

    Args:
        seed: Seed value passed to each generator (default 42).

    Returns:
        The seed that was applied.
    """
    seeders = (
        rdm.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)
    return seed

# Seed all RNGs first, then select the compute device:
# the first CUDA GPU when one is available, otherwise the CPU.
set_seed()
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

This machine has 8 CPU cores.
cpu


In [96]:
# Training subjects:   11, 12, 13, 14, 15, 16, 17, 18
# Validation subjects: 19, 20, 21, 23, 24, 25, 01, 04
# Test subjects:       22, 02, 03, 05, 06, 07, 08, 09, 10
# https://pytorch.org/vision/main/auto_examples/plot_optical_flow.html#sphx-glr-auto-examples-plot-optical-flow-py

class KTHDataset(Dataset):
    """Clips of the KTH action dataset, split into subsets by subject number.

    Each non-empty line of ``txt_path`` is parsed as a video id of the form
    ``personNN_<action>_<condition>`` followed by comma-separated
    ``start-end`` frame ranges. Every range belonging to a subject of the
    requested ``subset`` becomes one sample ``(video_id, start, end, label)``
    stored in ``self.data``.

    Args:
        root: Directory holding one sub-directory per action, each containing
            ``<video_id>_uncomp.avi`` files.
        txt_path: Path to the sequences text file described above.
        transforms: Optional callable applied to the loaded video tensor.
        subset: ``'train'``, ``'validation'`` or ``'test'``. Any other value
            matches no subject and therefore yields an empty dataset.
    """

    # Subject numbers belonging to each split.
    SUBSET_PERSONS = {
        'train': (11, 12, 13, 14, 15, 16, 17, 18),
        'validation': (19, 20, 21, 23, 24, 25, 1, 4),
        'test': (22, 2, 3, 5, 6, 7, 8, 9, 10),
    }

    def __init__(self, root, txt_path, transforms=None, subset='train'):
        self.root = root
        self.transforms = transforms
        self.subset = subset
        self.data = []
        self.label_to_int = {"boxing": 0, "handclapping": 1, "handwaving": 2,
                             "jogging": 3, "running": 4, "walking": 5}
        self.int_to_label = {v: k for k, v in self.label_to_int.items()}

        with open(txt_path, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    # Blank / trailing lines carry no video id; skip them
                    # instead of failing on the unpacking below.
                    continue

                video_id, *frame_sequences = (part.strip() for part in line.split(','))
                person_id, action, _ = video_id.split('_')
                person_number = int(person_id.replace("person", ""))

                if not self._is_in_subset(person_number):
                    continue

                for frame_sequence in frame_sequences:
                    if not frame_sequence:
                        # Produced by a trailing or doubled comma.
                        continue
                    start, end = frame_sequence.split('-')
                    self.data.append((video_id, int(start), int(end), self.label_to_int[action]))

    def get_action(self, label):
        """Return the action name for an integer ``label``."""
        return self.int_to_label[label]

    def _is_in_subset(self, person_number):
        """Tell whether ``person_number`` belongs to ``self.subset``.

        Unknown subset names match nobody (returns ``False``).
        """
        return person_number in self.SUBSET_PERSONS.get(self.subset, ())

    def __len__(self):
        """Number of (video, frame range) samples in this subset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load one clip.

        Returns:
            tuple: ``(video, action)`` where ``video`` is what ``read_video``
            returns for ``output_format="TCHW"`` (after ``self.transforms``,
            when given) and ``action`` is the integer class label.
        """
        video_id, start_frame, end_frame, action = self.data[idx]
        video_path = os.path.join(self.root, self.get_action(action), video_id + '_uncomp.avi')
        # NOTE(review): read_video treats these bounds as presentation
        # timestamps in its default pts unit; frame numbers are passed
        # directly - confirm they coincide for these files.
        video, _audio, _info = read_video(video_path, start_frame, end_frame, output_format="TCHW")

        if self.transforms:
            video = self.transforms(video)

        return video, action


In [97]:
# Dataset location and the text file listing the frame ranges of every video.
data_path = './data/KTH'
sequences_path = './data/KTH/sequences.txt'

# Loader / preprocessing settings.
batch_size, resized = 4, (64, 64)

# Random augmentation, used for training clips only.
transform_outer = transforms.Compose(
    [
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
        transforms.RandomHorizontalFlip(),
    ]
)

# Deterministic preprocessing shared by every split: resize, convert to a
# float32 image tensor, then normalise with (x - 0.5) / 0.5.
# NOTE(review): ToImageTensor / ConvertImageDtype appear to be the early
# transforms.v2 names; newer torchvision releases may rename them - confirm
# against the installed version.
transform_inner = transforms.Compose(
    [
        transforms.Resize(resized, antialias=True),
        transforms.ToImageTensor(),
        transforms.ConvertImageDtype(torch.float32),
        transforms.Normalize([0.5], [0.5]),
    ]
)

# Training = augmentation followed by preprocessing; evaluation = preprocessing only.
transform_train = transforms.Compose([transform_outer, transform_inner])
transform_test = transforms.Compose([transform_inner])

def collate_fn(batch):
    """Collate variable-length ``(video, label)`` samples into one padded batch.

    Args:
        batch: Sequence of ``(video, label)`` pairs. Each video is a tensor
            (or something ``np.array`` can convert) whose first dimension is
            the one being padded; the remaining dimensions must match across
            the batch.

    Returns:
        tuple: ``(videos, labels)``. ``videos`` stacks the clips along a new
        leading batch dimension, zero-padded along their first dimension to
        the longest clip in the batch. ``labels`` is an int64 tensor with one
        entry per sample.
    """
    videos, labels = zip(*batch)
    # Tensors are used as they are: the former tensor -> numpy -> tensor round
    # trip copied every clip and failed for tensors numpy cannot represent
    # (e.g. bfloat16, CUDA, requires_grad). `as_subclass` strips any tensor
    # subclass so plain tensors are padded, as before. Non-tensor inputs keep
    # the original numpy conversion.
    clips = [
        vid.as_subclass(torch.Tensor) if isinstance(vid, torch.Tensor)
        else torch.from_numpy(np.array(vid))
        for vid in videos
    ]
    videos = pad_sequence(clips, batch_first=True)
    labels = torch.tensor(labels, dtype=torch.long)
    return videos, labels


def get_loaders(data_path, sequences_path, batch_size, transform_train, transform_test, num_wokers=2, pin_memory=True, collate_fn=collate_fn, num_workers=None):
    """Build the train, validation and test DataLoaders for the KTH dataset.

    Args:
        data_path: Root directory passed to ``KTHDataset``.
        sequences_path: Sequences text file passed to ``KTHDataset``.
        batch_size: Batch size used by all three loaders.
        transform_train: Transform applied to training clips.
        transform_test: Transform applied to validation and test clips.
        num_wokers: Number of worker processes (historical, misspelled
            keyword kept so existing callers keep working).
        pin_memory: Forwarded to every ``DataLoader``.
        collate_fn: Batch collation function forwarded to every ``DataLoader``.
        num_workers: Correctly spelled alias; when given it takes precedence
            over ``num_wokers``.

    Returns:
        tuple: ``(train_loader, val_loader, test_loader)``. Only the training
        loader shuffles.
    """
    if num_workers is None:
        num_workers = num_wokers

    # Settings shared by the three loaders.
    loader_kwargs = dict(batch_size=batch_size, num_workers=num_workers,
                         pin_memory=pin_memory, collate_fn=collate_fn)

    train_dataset = KTHDataset(data_path, sequences_path, transforms=transform_train, subset='train')
    val_dataset = KTHDataset(data_path, sequences_path, transforms=transform_test, subset='validation')
    test_dataset = KTHDataset(data_path, sequences_path, transforms=transform_test, subset='test')

    train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
    val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)
    test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

    return train_loader, val_loader, test_loader

In [98]:
# Sanity check: pull a single batch from the training loader and show its shapes.
train_set, _, _ = get_loaders(
    data_path,
    sequences_path,
    batch_size,
    transform_train,
    transform_test,
    num_wokers=0,
    pin_memory=True,
    collate_fn=collate_fn,
)
for i, (videos, labels) in enumerate(train_set):
    # expected: [batch_size, seq_len, 3, resized[0], resized[1]]
    print('shape of batch :', videos.shape)
    # expected: [batch_size], one label per video of the batch
    print(labels.shape)
    break  # one batch is enough



shape of batch : torch.Size([4, 137, 3, 64, 64])
torch.Size([4])


In [100]:
import torch.nn as nn

class Simple3DCNN(nn.Module):
    """Small 3D-convolutional classifier for video clips.

    Architecture: two ``Conv3d -> ReLU -> MaxPool3d(2)`` stages, an adaptive
    max-pool to a fixed size, then ``Linear -> ReLU -> Dropout -> Linear``.

    The first linear layer needs a fixed number of input features. Without
    the adaptive pool only clips whose pooled feature map was exactly
    4 x 16 x 16 (e.g. 16 frames at 64 x 64) were accepted; the adaptive pool is
    an identity for that size and lets every other clip length / resolution
    through as well.

    Args:
        num_classes: Number of output logits.
        in_channels: Channels per frame.
        dropout_prob: Dropout probability after the first linear layer.
        pooled_size: ``(depth, height, width)`` the feature map is pooled to
            before flattening. The default keeps ``fc1`` at 128 * 32 * 32
            input features.
    """

    def __init__(self, num_classes=6, in_channels=3, dropout_prob=0.5, pooled_size=(4, 16, 16)):
        super(Simple3DCNN, self).__init__()
        self.pooled_size = tuple(pooled_size)
        self.conv1 = nn.Conv3d(in_channels, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv3d(64, 128, kernel_size=3, stride=1, padding=1)
        flat_features = 128 * self.pooled_size[0] * self.pooled_size[1] * self.pooled_size[2]
        self.fc1 = nn.Linear(flat_features, 1024)
        self.fc2 = nn.Linear(1024, num_classes)
        self.dropout = nn.Dropout(p=dropout_prob)

    def forward(self, x):
        """Compute class logits for a batch of clips.

        Args:
            x: 5-D tensor laid out as ``(batch, channels, frames, H, W)``.
                A ``(batch, frames, channels, H, W)`` tensor is also accepted
                and permuted, but only when dimension 1 does not match the
                expected channel count while dimension 2 does (such input
                used to fail inside the first convolution).

        Returns:
            Tensor of shape ``(batch, num_classes)`` with unnormalised logits.
        """
        channels = self.conv1.in_channels
        if x.dim() == 5 and x.size(1) != channels and x.size(2) == channels:
            x = x.permute(0, 2, 1, 3, 4)

        # Conv1 + ReLU + MaxPool
        x = F.max_pool3d(F.relu(self.conv1(x)), 2, 2)

        # Conv2 + ReLU + MaxPool
        x = F.max_pool3d(F.relu(self.conv2(x)), 2, 2)

        # Bring the feature map to the fixed size fc1 was built for.
        x = F.adaptive_max_pool3d(x, self.pooled_size)

        # Flatten everything but the batch dimension.
        x = x.flatten(1)

        # FC1 + ReLU + Dropout
        x = self.dropout(F.relu(self.fc1(x)))

        # FC2
        return self.fc2(x)


In [101]:
class Metric:
    """Holder for the train / test loss and accuracy of a run.

    All four attributes start out as (distinct) empty lists.
    """

    def __init__(self):
        self.loss_train, self.loss_test = [], []
        self.acc_train, self.acc_test = [], []
    
def epoch(data, model, criterion, optimizer=None, cuda=False):
    """Run one pass over ``data`` with ``model``, using ``criterion`` as loss.

    When ``optimizer`` is given the pass is a training epoch (forward,
    backward and optimizer step for every batch). Without it the pass is an
    evaluation epoch: the model is put in eval mode and no gradients are
    computed.

    Args:
        data: Iterable yielding ``(input, target)`` batches.
        model: Module called on each input batch.
        criterion: Callable ``criterion(output, target)`` returning the loss.
        optimizer: Optimizer to train with, or ``None`` to only evaluate.
        cuda: When true, each batch is moved to the GPU with ``.cuda()``.

    Returns:
        tuple: ``(avg_top1_acc, avg_top5_acc, avg_loss)`` meter objects
        (``AverageMeter``, presumably provided by ``utils``) updated once per
        batch.
    """
    training = optimizer is not None

    # Some layers (dropout, batch-norm, ...) behave differently in train and eval mode.
    if training:
        model.train()
    else:
        model.eval()

    # objects to store metric averages
    avg_loss = AverageMeter()
    avg_top1_acc = AverageMeter()
    avg_top5_acc = AverageMeter()
    avg_batch_time = AverageMeter()

    # Optional live loss plot: main() publishes it as the module-level name
    # `loss_plot`. Look it up leniently so a training pass can also run
    # when no plot has been created.
    train_plot = globals().get("loss_plot") if training else None

    tic = time.time()
    for inputs, target in data:

        if cuda:  # only with GPU, and not with CPU
            inputs = inputs.cuda()
            target = target.cuda()

        # forward - autograd bookkeeping is only needed when training
        with torch.set_grad_enabled(training):
            output = model(inputs)
            loss = criterion(output, target)

        # backward if we are training
        if training:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # compute metrics
        prec1, prec5 = accuracy(output, target, topk=(1, 5))
        batch_time = time.time() - tic
        tic = time.time()

        # update
        avg_loss.update(loss.item())
        avg_top1_acc.update(prec1.item())
        avg_top5_acc.update(prec5.item())
        avg_batch_time.update(batch_time)
        if train_plot is not None:
            train_plot.update(avg_loss.val)

    return avg_top1_acc, avg_top5_acc, avg_loss

In [102]:
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn

def main(batch_size=128, lr=0.1, epochs=5, cuda=False,  dropout_rate=0.1):
    """Train ``Simple3DCNN`` on the KTH loaders and evaluate it after every epoch.

    The data location and the transforms are taken from the notebook-level
    ``data_path``, ``sequences_path``, ``transform_train`` and
    ``transform_test``.

    Args:
        batch_size: Batch size of the data loaders.
        lr: Initial SGD learning rate; it is multiplied by 0.1 after every epoch.
        epochs: Number of training epochs.
        cuda: Run the model and the batches on the GPU.
        dropout_rate: Dropout probability of the model (previously ignored in
            favour of a hard-coded 0.01).

    Returns:
        list: One ``Metric`` per epoch holding the average train / evaluation
        loss and top-1 accuracy of that epoch.
    """
    # define model, loss, optim
    model = Simple3DCNN(dropout_prob=dropout_rate)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr)

    scheduler = lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)

    if cuda:  # only with GPU, and not with CPU
        cudnn.benchmark = True
        model = model.cuda()
        criterion = criterion.cuda()

    # Get the data. get_loaders returns (train, validation, test); the
    # validation loader is the one evaluated after each epoch.
    # NOTE(review): the loaders yield batches laid out as
    # (batch, frames, channels, H, W) while nn.Conv3d expects
    # (batch, channels, frames, H, W) - confirm the model handles this layout.
    train_loader, eval_loader, _ = get_loaders(
        data_path, sequences_path, batch_size, transform_train, transform_test,
        pin_memory=cuda,
    )

    # init plots: epoch() reads the module-level `loss_plot` during training
    listm = []
    global loss_plot
    loss_plot = TrainLossPlot()

    # We iterate on the epochs
    for i in range(epochs):
        m = Metric()

        # Train phase
        top1_acc, avg_top5_acc, loss = epoch(train_loader, model, criterion, optimizer, cuda)
        # Update learning rate
        scheduler.step()

        # Evaluation phase (no optimizer -> no parameter update)
        top1_acc_test, top5_acc_test, loss_test = epoch(eval_loader, model, criterion, cuda=cuda)

        m.acc_train = top1_acc.avg
        m.acc_test = top1_acc_test.avg
        m.loss_train = loss.avg
        m.loss_test = loss_test.avg
        listm.append(m)
        print( f"********** EPOCH {i+1} acc train={m.acc_train:.2f}%, acc test={m.acc_test:.2f}%, loss train={m.loss_train:.3f}, loss test={m.loss_test:.3f} **********")

    return listm