In [17]:
import os
import shutil
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import datasets, models, transforms
from torchvision.utils import save_image
from torch.utils.data import Dataset, DataLoader

print(torch.cuda.is_available())

True


### Loading Datasets

loading from `/home/nfs/inf6/data/datasets/kth_actions`

In [18]:
import os
from glob import glob

# Define the split for training and eval sets
train_persons = {'person11', 'person12', 'person13', 'person14', 'person15', 'person16', 'person17', 'person02', 'person03', 'person05', 'person06', 'person07', 'person08', 'person09', 'person10', 'person18'}
val_persons = {'person19', 'person20', 'person21', 'person23', 'person24', 'person25', 'person01', 'person04'}

def get_sequences_with_labels(base_dir, persons):
    sequences = []
    labels = []
    classes = os.listdir(base_dir)
    class_to_idx = {cls_name: idx for idx, cls_name in enumerate(classes)}

    for cls in classes:
        for person in persons:
            person_sequences = glob(os.path.join(base_dir, cls, f'{person}*'))
            sequences.extend(person_sequences)
            labels.extend([class_to_idx[cls]] * len(person_sequences))
    
    return sequences, labels

base_dir = '/home/nfs/inf6/data/datasets/kth_actions/processed'
train_sequences, train_labels = get_sequences_with_labels(base_dir, train_persons)
val_sequences, val_labels = get_sequences_with_labels(base_dir, val_persons)


In [19]:
import cv2
import numpy as np

def load_frames(sequence_path):
    frame_files = sorted(glob(os.path.join(sequence_path, '*.png')))
    frames = [cv2.imread(frame_file) for frame_file in frame_files]
    frames = [cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) for frame in frames]  # Convert to RGB
    frames = [frame / 255.0 for frame in frames]  # Normalize
    return frames

def create_subsequences(frames, subsequence_length=13):
    subsequences = []
    for i in range(len(frames) - subsequence_length + 1):
        subsequences.append(frames[i:i + subsequence_length])
    return subsequences

sequence_path = train_sequences[0]
frames = load_frames(sequence_path)
subsequences = create_subsequences(frames)


In [39]:
class VideoDataset(Dataset):
    def __init__(self, sequences, labels, transform=None, subsequence_length=13):
        self.sequences = sequences
        self.labels = labels
        self.transform = transform
        self.subsequence_length = subsequence_length
        self.data = self.load_data()

    def load_data(self):
        data = []
        for sequence, label in zip(self.sequences, self.labels):
            frames = load_frames(sequence)
            subsequences = create_subsequences(frames, self.subsequence_length)
            for subsequence in subsequences:
                data.append((subsequence, label))
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        subsequence, label = self.data[idx]
        if self.transform:
            subsequence = [self.transform(frame) for frame in subsequence]
        subsequence = torch.stack([torch.tensor(frame) for frame in subsequence])  # Convert to CxHxW
        label = torch.tensor(label)
        return subsequence, label

# Define transforms if needed
data_transforms = {
    'train': transforms.Compose([
        transforms.ToPILImage(),
        # transforms.RandomResizedCrop(224),
        # transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.ToPILImage(),
        # transforms.Resize(256),
        # transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# Create dataset
train_dataset = VideoDataset(train_sequences, train_labels, transform=data_transforms['train'])
val_dataset = VideoDataset(val_sequences, val_labels, transform=data_transforms['val'])



Setting up dataloader

In [40]:
BATCH_SIZE = 16

In [41]:
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers = 4)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers = 4)

In [44]:
train_dataset[0][0].size(1)

  subsequence = torch.stack([torch.tensor(frame) for frame in subsequence])  # Convert to CxHxW


3

## Custom LSTM  
Source: https://github.com/piEsposito/pytorch-lstm-by-hand/blob/master/LSTM.ipynb - By Piero Esposito

In [None]:
class CustomLSTMCell(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(CustomLSTMCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Forget gate weights
        self.W_f = nn.Parameter(torch.Tensor(input_size + hidden_size, hidden_size))
        self.b_f = nn.Parameter(torch.Tensor(hidden_size))

        # Input gate weights
        self.W_i = nn.Parameter(torch.Tensor(input_size + hidden_size, hidden_size))
        self.b_i = nn.Parameter(torch.Tensor(hidden_size))

        # Candidate cell state weights
        self.W_c = nn.Parameter(torch.Tensor(input_size + hidden_size, hidden_size))
        self.b_c = nn.Parameter(torch.Tensor(hidden_size))

        # Output gate weights
        self.W_o = nn.Parameter(torch.Tensor(input_size + hidden_size, hidden_size))
        self.b_o = nn.Parameter(torch.Tensor(hidden_size))

        self.reset_parameters()

    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.W_f, a=0.01)
        nn.init.kaiming_uniform_(self.W_i, a=0.01)
        nn.init.kaiming_uniform_(self.W_c, a=0.01)
        nn.init.kaiming_uniform_(self.W_o, a=0.01)
        nn.init.constant_(self.b_f, 0)
        nn.init.constant_(self.b_i, 0)
        nn.init.constant_(self.b_c, 0)
        nn.init.constant_(self.b_o, 0)

    def forward(self, input, layers=1, mode="random"):
        if mode == "zeroes":
            hx = (torch.zeros(input.size(0), self.hidden_size, dtype=input.dtype, device=input.device),
                  torch.zeros(input.size(0), self.hidden_size, dtype=input.dtype, device=input.device))
        elif mode == "Random":
            hx = (torch.rand(input.size(0), self.hidden_size, dtype=input.dtype, device=input.device),
                  torch.rand(input.size(0), self.hidden_size, dtype=input.dtype, device=input.device))


        h_prev, c_prev = hx
        combined = torch.cat((input, h_prev), dim=1)

        # Forget gate
        f = torch.sigmoid(torch.matmul(combined, self.W_f) + self.b_f)

        # Input gate
        i = torch.sigmoid(torch.matmul(combined, self.W_i) + self.b_i)

        # Candidate cell state
        c_hat = torch.tanh(torch.matmul(combined, self.W_c) + self.b_c)

        # Updated cell state
        c = f * c_prev + i * c_hat

        # Output gate
        o = torch.sigmoid(torch.matmul(combined, self.W_o) + self.b_o)

        # Updated hidden state
        h = o * torch.tanh(c)

        return h, c