In [None]:
# Install PyAV into the runtime. Presumably required by torchvision.io.read_video,
# which the dataset class below uses to decode clips -- TODO confirm.
!pip install PyAV

In [None]:
import sys

try:
    # google.colab is only importable inside a Colab runtime.
    from google.colab import drive
except ImportError:
    # Outside Colab there is no Drive to mount; report it instead of aborting
    # the cell with ModuleNotFoundError.
    drive = None
    print("google.colab is not available; skipping Google Drive mount.")
else:
    drive.mount('/content/drive')

# Put the project folder on the import path. The membership check keeps the
# cell idempotent when it is re-run in the same kernel.
if '/content/drive/MyDrive/dads-7202-final-project' not in sys.path:
    sys.path.append('/content/drive/MyDrive/dads-7202-final-project')

ModuleNotFoundError: No module named 'google.colab'

In [None]:
import os
import torch
from torch import nn
from torchvision.io import read_video
from torchvision.datasets import VisionDataset
from torch.utils.data import DataLoader
from torchvision.models.video import mvit_v2_s,MViT_V2_S_Weights
from torchvision.models.video import swin3d_t,Swin3D_T_Weights
from torchvision.models.video import r2plus1d_18,R2Plus1D_18_Weights
# from custom_dataset import CustomVidDataset
import pandas as pd
import numpy as np
from sklearn.metrics import  accuracy_score,precision_score,recall_score,f1_score,average_precision_score
from sklearn.preprocessing import MultiLabelBinarizer
import gc
import random

In [None]:
# Seed torch, the stdlib `random` module and NumPy with the same fixed value
# (in that order) for reproducibility.
for seed_fn in (torch.manual_seed, random.seed, np.random.seed):
    seed_fn(12)

In [None]:
class CustomVidDataset(VisionDataset):
    """Video dataset driven by an annotation CSV.

    Each CSV row describes one clip: column 0 holds the video file name
    (joined onto ``vid_dir``) and columns 5 through 16 hold the 12 label
    values, which are returned as an integer NumPy array.

    Args:
        annotations_file: Path of the CSV file read with ``pandas.read_csv``.
        vid_dir: Directory containing the video files.
        transform: Optional callable applied to the sliced frame tensor.
        target_transform: Optional callable applied to the label array.
    """

    def __init__(self, annotations_file, vid_dir, transform= None, target_transform=None):
        # Initialise the VisionDataset base so inherited behaviour that reads
        # `root` / `transforms` (e.g. repr) works on instances of this class.
        super().__init__(vid_dir, transform=transform, target_transform=target_transform)
        self.vid_labels = pd.read_csv(annotations_file)
        self.vid_dir = vid_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of annotated clips (rows of the CSV)."""
        return len(self.vid_labels)

    def __getitem__(self, idx):
        """Return ``(frames, labels)`` for the clip at row ``idx``.

        NOTE(review): clips with fewer than 16 decoded frames yield a shorter
        tensor here; confirm that every clip is long enough for batching.
        """
        vid_path = os.path.join(self.vid_dir, self.vid_labels.iloc[idx, 0])
        # Decode up to the 10-second mark, frames laid out as (T, C, H, W).
        vid, _, _ = read_video(vid_path, pts_unit='sec', end_pts=10.0, output_format='TCHW')
        array_label = self.vid_labels.iloc[idx, 5:17].to_numpy().astype(int)
        trans_vid = vid[:16]  # slice only first 16 frames
        if self.transform:
            trans_vid = self.transform(trans_vid)
        if self.target_transform:
            # Fix: the transformed target used to be bound to an unused name,
            # so the untransformed labels were returned.
            array_label = self.target_transform(array_label)
        return trans_vid, array_label

In [None]:
#Create Transform for preprocessing
# Preprocessing callable published with the KINETICS400_V1 weights of
# R(2+1)D-18; it is passed to the datasets as their `transform`.
transforms_resnet = R2Plus1D_18_Weights.KINETICS400_V1.transforms()

In [None]:
#Custom Dataset by Transform (Preprocess)
# Project root on the mounted Google Drive. The trailing slash is required:
# custom_data() builds its file paths by plain string concatenation.
PATH = '/content/drive/MyDrive/dads-7202-final-project/'

def custom_data(model_type, transform):
    """Build the train, validation and test datasets with one shared transform.

    Args:
        model_type: Unused by the function body.
        transform: Preprocessing callable handed to every dataset.

    Returns:
        Tuple ``(train_data, valid_data, test_data)`` of ``CustomVidDataset``.
    """
    # (annotation CSV, video folder) for each split, both relative to PATH.
    splits = (
        ('data/train_label.csv', 'data/train'),
        ('data/validate_label.csv', 'data/validate'),
        ('data/test_label.csv', 'data/test'),
    )
    train_data, valid_data, test_data = (
        CustomVidDataset(PATH + csv_name, PATH + vid_folder, transform=transform)
        for csv_name, vid_folder in splits
    )
    return train_data, valid_data, test_data

# Build the three dataset splits with the R(2+1)D preprocessing.
train_data_resnet, valid_data_resnet, test_data_resnet = custom_data(
    model_type='resnet',
    transform=transforms_resnet,
)


In [None]:
#Create Data Loader
def create_data_loader(dataset, batch_size=16, shuffle=True, num_workers=4):
    """Wrap ``dataset`` in a ``DataLoader``.

    Args:
        dataset: Dataset to iterate over.
        batch_size: Number of samples per batch.
        shuffle: Whether the loader reshuffles the data.
        num_workers: Number of worker processes used for loading.

    Returns:
        The configured ``DataLoader``.
    """
    loader_options = {
        "batch_size": batch_size,
        "shuffle": shuffle,
        "num_workers": num_workers,
    }
    return DataLoader(dataset, **loader_options)

# Only the training split is shuffled. Validation and test batches are served
# in a fixed order so evaluation passes are repeatable from run to run.
train_dataset_resnet = create_data_loader(train_data_resnet)
valid_dataset_resnet = create_data_loader(valid_data_resnet, shuffle=False)
test_dataset_resnet = create_data_loader(test_data_resnet, shuffle=False)

In [None]:
# Choose the compute device: CUDA if available, otherwise MPS if available,
# otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

Using cuda device


In [None]:
#Set up Model
model_resnet = r2plus1d_18(weights="KINETICS400_V1")

# Freeze every pretrained parameter ...
for param in model_resnet.parameters():
    param.requires_grad = False

# ... then re-enable gradients for the last residual stage only.
for param in model_resnet.layer4.parameters():
    param.requires_grad = True

# Fix: torchvision's VideoResNet exposes its classifier as `fc`, not `head`.
# Assigning `.head` only registered a module that forward() never calls, so
# the network kept emitting 400 Kinetics logits. Replace `fc` instead, sized
# from the existing layer's input width, to get 12 sigmoid outputs. The new
# layers are created after the freeze loop, so they are trainable.
model_resnet.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(model_resnet.fc.in_features,12),
            nn.Sigmoid()
        )
print(model_resnet)

VideoResNet(
  (stem): R2Plus1dStem(
    (0): Conv3d(3, 45, kernel_size=(1, 7, 7), stride=(1, 2, 2), padding=(0, 3, 3), bias=False)
    (1): BatchNorm3d(45, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv3d(45, 64, kernel_size=(3, 1, 1), stride=(1, 1, 1), padding=(1, 0, 0), bias=False)
    (4): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
  )
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Sequential(
        (0): Conv2Plus1D(
          (0): Conv3d(64, 144, kernel_size=(1, 3, 3), stride=(1, 1, 1), padding=(0, 1, 1), bias=False)
          (1): BatchNorm3d(144, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
          (3): Conv3d(144, 64, kernel_size=(3, 1, 1), stride=(1, 1, 1), padding=(1, 0, 0), bias=False)
        )
        (1): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=Tru

In [None]:
# Move the model onto the device selected earlier in the notebook.
model_resnet = model_resnet.to(device)

In [None]:
#Create Loss Function and Optimizer
# Binary cross-entropy shared by the training and evaluation loops.
# nn.BCELoss expects probabilities as input (the classifier head configured in
# the model-setup cell ends in nn.Sigmoid).
loss_fn = nn.BCELoss()

def create_optimizer(model, learning_rate=1e-3):
    """Return an Adam optimizer over all parameters of ``model``.

    Args:
        model: Module whose ``parameters()`` are handed to the optimizer.
        learning_rate: Learning rate passed to Adam as ``lr``.
    """
    params = model.parameters()
    adam = torch.optim.Adam(params, lr=learning_rate)
    return adam

# Adam optimizer for the R(2+1)D model, using the default learning rate.
optimizer_resnet = create_optimizer(model=model_resnet)


In [None]:
def scoring(y_true,prediction):
    """Compute the evaluation metrics for a set of predictions.

    Args:
        y_true: Ground-truth labels.
        prediction: Predicted labels; also passed to
            ``average_precision_score`` as the score argument.

    Returns:
        Tuple ``(accuracy, f1, recall, precision, average_precision)``. All
        metrics except accuracy use ``average='samples'``.
    """
    # Keyword arguments shared by the F1 / recall / precision calls.
    shared = dict(average='samples', zero_division=0)
    return (
        accuracy_score(y_true, prediction),
        f1_score(y_true, prediction, **shared),
        recall_score(y_true, prediction, **shared),
        precision_score(y_true, prediction, **shared),
        average_precision_score(y_true, prediction, average='samples'),
    )

In [None]:
def _release_batch_memory():
    """Empty the accelerator cache (if any) and run the garbage collector."""
    if device == "cuda":
        torch.cuda.empty_cache()
    elif device == "mps":
        torch.mps.empty_cache()
    gc.collect()


def _stack_as_int(chunks):
    """Concatenate per-batch CPU tensors into one integer NumPy matrix."""
    return torch.cat(chunks, dim=0).numpy().astype(int)


def train(train_dataset,valid_dataset, model, loss_fn, optimizer):
    """Train ``model`` for one epoch, then evaluate it on the validation set.

    Args:
        train_dataset: DataLoader yielding ``(X, y)`` training batches.
        valid_dataset: DataLoader yielding ``(X, y)`` validation batches.
        model: Module to optimise; it is left in eval mode on return.
        loss_fn: Loss called as ``loss_fn(pred, y.float())``.
        optimizer: Optimizer stepped once per training batch.

    Returns:
        Tuple ``(train_loss, valid_loss)``: the per-sample average loss over
        the training pass and over the validation pass.
    """
    train_loss = 0.
    valid_loss = 0.
    train_y_true_lst = []
    train_y_pred_lst = []

    valid_y_true_lst = []
    valid_y_pred_lst = []

    model.train()
    for X, y in train_dataset:
        X, y = X.to(device), y.to(device)

        #  clear the grad
        optimizer.zero_grad()

        # predict
        pred = model(X)
        # compute loss
        loss = loss_fn(pred, y.float())

        # Backpropagation
        loss.backward()
        # update new weight
        optimizer.step()

        # loss.item() is the batch mean; weight it by the batch size so the
        # final division yields a per-sample average.
        train_loss += loss.item()*X.size(0)
        train_y_true_lst.append(y.cpu())
        # Round the outputs to hard 0/1 decisions with torch itself rather
        # than calling np.round on a tensor.
        train_y_pred_lst.append(pred.detach().round().cpu())

        # delete locals
        del X, y, loss, pred
        _release_batch_memory()

    # evaluation
    model.eval()
    # Fix: validation needs no autograd graph; run it under no_grad like the
    # test loop does.
    with torch.no_grad():
        for X, y in valid_dataset:
            X, y = X.to(device), y.to(device)
            # predict
            pred = model(X)
            # compute loss
            loss = loss_fn(pred, y.float())

            valid_loss += loss.item()*X.size(0)
            valid_y_true_lst.append(y.cpu())
            valid_y_pred_lst.append(pred.detach().round().cpu())

            # delete locals
            del X, y, loss, pred
            _release_batch_memory()

    # calculate average loss over an epoch
    train_loss = train_loss/len(train_dataset.sampler)
    valid_loss = valid_loss/len(valid_dataset.sampler)

    # Fix: targets and rounded predictions are already one-column-per-label
    # matrices (assumes multi-hot targets, as implied by the BCE loss -- TODO
    # confirm against the CSV). Passing them through MultiLabelBinarizer read
    # every row as the label set {0, 1} and corrupted the metrics, so they
    # are scored directly.
    train_y_pred = _stack_as_int(train_y_pred_lst)
    train_y_true = _stack_as_int(train_y_true_lst)
    valid_y_pred = _stack_as_int(valid_y_pred_lst)
    valid_y_true = _stack_as_int(valid_y_true_lst)

    t_acc,t_f1s,t_rec_s,t_preci_s,t_ap_s = scoring(train_y_true,train_y_pred)
    v_acc,v_f1s,v_rec_s,v_preci_s,v_ap_s = scoring(valid_y_true,valid_y_pred)
    print(f"AVG Training Loss: {round(train_loss,4)}\nAVG Validation Loss: {round(valid_loss,4)}")
    print(f"Accuracy: {round(t_acc,4)}\nF1: {round(t_f1s,4)}\nRecall: {round(t_rec_s,4)}\nPrecision: {round(t_preci_s,4)}\nAP: {round(t_ap_s,4)}")
    # The validation metrics were computed but never reported before.
    print(f"Validation Accuracy: {round(v_acc,4)}\nValidation F1: {round(v_f1s,4)}\nValidation Recall: {round(v_rec_s,4)}\nValidation Precision: {round(v_preci_s,4)}\nValidation AP: {round(v_ap_s,4)}")
    return train_loss,valid_loss

In [None]:
def test(test_dataset, model, loss_fn):
    """Evaluate ``model`` on ``test_dataset`` and print loss and metrics.

    Args:
        test_dataset: DataLoader yielding ``(X, y)`` batches to evaluate on.
        model: Module to evaluate; it is switched to eval mode.
        loss_fn: Loss called as ``loss_fn(pred, y.float())``.

    Returns:
        The per-sample average loss over ``test_dataset``.
    """
    test_loss = 0.
    test_y_true_lst = []
    test_y_pred_lst = []

    model.eval()
    with torch.no_grad():
        # Fix: iterate the loader that was passed in. The body used to read
        # the undefined global `valid_dataset_mvit`.
        for X, y in test_dataset:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            loss = loss_fn(pred, y.float())

            # loss.item() is the batch mean; weight it by the batch size.
            test_loss += loss.item()*X.size(0)
            test_y_true_lst.append(y.cpu())
            # Round the outputs to hard 0/1 decisions.
            test_y_pred_lst.append(pred.detach().round().cpu())

    # Fix: normalise by the size of the evaluated split, not by the undefined
    # `train_dataset_mvit` loader.
    test_loss = test_loss/len(test_dataset.sampler)

    # Fix: targets and rounded predictions are already one-column-per-label
    # matrices (assumes multi-hot targets -- TODO confirm). Passing them
    # through MultiLabelBinarizer read each row as the label set {0, 1}, so
    # they are scored directly.
    y_pred = torch.cat(test_y_pred_lst, dim=0).numpy().astype(int)
    y_true = torch.cat(test_y_true_lst, dim=0).numpy().astype(int)

    acc,f1s,rec_s,preci_s,ap_s = scoring(y_true,y_pred)

    print(f"AVG Test Loss: {round(test_loss,4)}")
    print(f"Accuracy: {round(acc,4)}\nF1: {round(f1s,4)}\nRecall: {round(rec_s,4)}\nPrecision: {round(preci_s,4)}\nAP: {round(ap_s,4)}")
    return test_loss

In [None]:
#Train Model
# Per-epoch loss history for the R(2+1)D model.
all_train_loss_resnet = []
all_valid_loss_resnet = []
epochs = 1
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loss_resnet, valid_loss_resnet = train(train_dataset_resnet,valid_dataset_resnet, model_resnet, loss_fn, optimizer_resnet)
    all_train_loss_resnet.append(train_loss_resnet)
    # Fix: record this epoch's validation loss. The list used to be appended
    # to itself, so it never held a loss value.
    all_valid_loss_resnet.append(valid_loss_resnet)
print("Done!")

In [None]:
#Test Model
# Evaluate the trained model on the held-out test loader.
test_loss_resnet = test(
    test_dataset=test_dataset_resnet,
    model=model_resnet,
    loss_fn=loss_fn,
)