In [None]:
# ! cp './drive/MyDrive/Colab Notebooks/UCF101.rar' .
# ! unrar x './UCF101.rar'

In [11]:
import torch
from torch import nn
from torch.utils.data import DataLoader
import torchvision
from torchvision import io, models, transforms
from torchvision.datasets.folder import make_dataset
from torchvision.datasets.utils import list_dir

from sklearn.model_selection import train_test_split
import os
import re
import time
from tqdm import tqdm
from hmmlearn import hmm
import numpy as np

In [2]:
class UCF101(torchvision.datasets.VisionDataset):
    """Video dataset over a class-per-folder tree, restricted to a list of files.

    Every ``.avi`` file below ``root`` is indexed with ``make_dataset``; only the
    files whose path appears in ``annotation`` are exposed through
    ``__len__`` / ``__getitem__``.

    Args:
        root: Directory containing one sub-directory per class.
        annotation: Iterable of video paths to keep. Paths must be spelled
            exactly like the ones ``make_dataset`` produces (``root`` joined
            with the class folder and the file name).
        transform: Optional callable applied to the video tensor of each item.
    """

    def __init__(self, root, annotation, transform=None):
        super(UCF101, self).__init__(root)

        extensions = ('avi',)
        # The former `self.train = train` line was removed: `train` is not a
        # parameter of this constructor, so it only resolved when some other
        # cell had already created a global of that name.

        classes = list(sorted(list_dir(root)))
        class_to_idx = {name: index for index, name in enumerate(classes)}
        self.samples = make_dataset(self.root, class_to_idx, extensions, is_valid_file=None)
        self.classes = classes

        # A set makes each membership test O(1) instead of scanning the list.
        wanted = set(annotation)
        self.indices = [i for i, (path, _) in enumerate(self.samples) if path in wanted]
        self.transform = transform

    def __len__(self):
        """Return the number of videos selected by ``annotation``."""
        return len(self.indices)

    def __getitem__(self, idx):
        """Load the ``idx``-th selected video.

        Returns:
            A ``(video, audio, label)`` tuple. ``video`` has the last axis of
            the decoded frames moved to position 1 (channels-first, given that
            ``read_video`` documents its frames as ``(T, H, W, C)``) and has
            been passed through ``transform`` when one was given.
        """
        path, class_index = self.samples[self.indices[idx]]
        video, audio, _ = io.read_video(path, pts_unit='sec')
        # NOTE(review): the label is the sorted-folder index minus one, so the
        # first folder under `root` maps to -1. Presumably this aligns the
        # labels with the classes selected by the caller - confirm.
        label = class_index - 1
        video = video.permute(0, 3, 1, 2)

        if self.transform is not None:
            video = self.transform(video)

        return video, audio, label

In [51]:
DATA_ROOT = './UCF-101'
class_list = ['ApplyLipstick', 'Archery']
file_list = {}                             # class -> group id -> [clip ids]
unique_list = {c: [] for c in class_list}  # class -> group ids, one entry per clip

# File names look like v_<Class>_g<group>_c<clip>.avi
f_re = re.compile(r'v_([A-Za-z]+)_g([0-9]+)_c([0-9]+)\.avi')

# Index the clips of the selected classes by group id.
for class_dir in class_list:
    for _, _, files in os.walk(os.path.join(DATA_ROOT, class_dir)):
        for fname in sorted(files):
            m = f_re.fullmatch(fname)
            # Skip stray files, and files whose embedded class name differs
            # from the folder: the paths rebuilt below assume both agree.
            if m is None or m.group(1) != class_dir:
                continue
            _, group, clip = m.groups()
            file_list.setdefault(class_dir, {}).setdefault(group, []).append(clip)
            unique_list[class_dir].append(group)


def _clip_paths(cls, groups):
    """Return the paths of every indexed clip of `cls` belonging to `groups`."""
    return [
        os.path.join(DATA_ROOT, cls, f'v_{cls}_g{group}_c{clip}.avi')
        for group in groups
        for clip in file_list[cls][group]
    ]


training_data = {}
test_list = []

# Currently an identity transform; the earlier preprocessing is kept for reference.
transform = transforms.Compose([
#     transforms.Grayscale(),
#     transforms.Resize((60, 80)), # 240x320 => 60x80
#     nn.Flatten()
])

# Split by group id, not by clip, so all clips of one group land on the same side.
for cls in class_list:
    # sorted() pins the input order: iteration order of a set of strings
    # depends on the per-process hash seed, which would make the seeded
    # split differ from one kernel session to the next.
    unique_class_list = sorted(set(unique_list[cls]))
    train, test = train_test_split(unique_class_list, random_state=42)

    train_list = _clip_paths(cls, train)
    test_list += _clip_paths(cls, test)

    # One training dataset per class; a single test dataset across all classes.
    training_data[cls] = UCF101(DATA_ROOT, train_list, transform=transform)

testing_data = UCF101(DATA_ROOT, test_list, transform=transform)

In [69]:
class Encoder(nn.Module):
    """Frozen pretrained ResNet-18 used as a fixed per-frame feature extractor.

    The classification head (``fc``) is replaced by ``nn.Identity`` so the
    forward pass returns the pooled backbone features, and every parameter has
    ``requires_grad`` switched off.
    """

    def __init__(self):
        super(Encoder, self).__init__()
        self.conv = models.resnet18(pretrained=True)
        self.conv.fc = nn.Identity()
        for param in self.conv.parameters():
            param.requires_grad = False
        # Freezing the weights does not freeze BatchNorm: in training mode it
        # still normalises with batch statistics and updates its running
        # stats. Eval mode makes the extractor deterministic.
        self.conv.eval()

    def train(self, mode=True):
        """Switch modes like ``nn.Module.train`` but keep the backbone in eval mode."""
        super(Encoder, self).train(mode)
        self.conv.eval()
        return self

    def forward(self, x):
        """Return backbone features for a batch of images ``x``.

        ``x`` is handed to the ResNet unchanged.
        # assumes x is a float tensor shaped (N, 3, H, W) - TODO confirm at call sites
        """
        return self.conv(x)

In [71]:
test_loader = DataLoader(testing_data, shuffle=False)
hmm_models = {}       # class name -> fitted HMM (filled by the training loop below)
training_times = {}   # class name -> fit duration in seconds

encoder = Encoder()
encoder.eval()  # shape check only: keep BatchNorm running stats untouched
print(encoder)

# Sanity check: push the first training clip of the first class through the
# encoder and compare input/output shapes. The class comes from class_list so
# the cell keeps working when the selection changes.
x = training_data[class_list[0]][0][0].float()
print(x.size())
with torch.no_grad():
    print(encoder(x).size())

# Draft of the per-class HMM training loop, currently disabled. While it stays
# disabled hmm_models is empty, and any later cell that scores with
# hmm_models[cls] will fail.
# for cls in class_list:
#     print(f'Training for Class: {cls}')
#     train_loader = DataLoader(training_data[cls], shuffle=True)
    
#     hmm_models[cls] = hmm.MultinomialHMM(n_components=2, n_iter=200, tol=0.0001)
#     X = []
#     lengths = []
#     for video, audio, label in train_loader:
#         X.append(video.squeeze().numpy())
#         lengths.append(video.size(1))
#     X = np.concatenate(X)
#     lengths = np.array(lengths)
#     print(f'Data Shape: {X.shape}\nLengths Shape: {lengths.shape}')
#     start = time.time()
#     hmm_models[cls].fit(X, lengths)
#     training_times[cls] = time.time() - start
#     print(f'Total Training Time: {training_times[cls]}')
#     print()

Encoder(
  (conv): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_runnin

In [None]:
# Score the held-out clips under every class model.
X = []
lengths = []
labels = []
# Iterate the test loader: the previous `train_loader` is not defined by any
# live cell (and would have pointed at training data of a single class).
for video, audio, label in test_loader:
    # Drop only the batch axis; a bare squeeze() would also drop any other
    # size-1 axis (e.g. a one-frame clip).
    frames = video.squeeze(0).numpy()
    # hmmlearn takes a 2-D (n_samples, n_features) array: one row per frame.
    # This is a no-op when the transform already flattens each frame.
    frames = frames.reshape(frames.shape[0], int(np.prod(frames.shape[1:])))
    X.append(frames)
    lengths.append(frames.shape[0])
    labels.append(int(label))
X = np.concatenate(X)
lengths = np.array(lengths)

missing = [cls for cls in class_list if cls not in hmm_models]
if missing:
    raise RuntimeError(f'No fitted HMM for: {missing}; run the training loop first')

# One log-likelihood per class model, computed over all test sequences at once.
scores = {cls: hmm_models[cls].score(X, lengths) for cls in class_list}

In [None]:
# Report fit durations and model scores, one dict per line.
print(training_times, scores, sep='\n')