In [None]:
!pip install -q timm

[0m

In [None]:
import os
import sys
import pandas as pd
import numpy as np
import torch
from torch.utils.data import DataLoader as DataLoader
from torch.utils.data import Dataset as Dataset
import cv2
import timm

import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
from torch.optim.lr_scheduler import ReduceLROnPlateau

from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import math
import gc

In [1]:
# TO DO
ckpt_path = # ĐƯỜNG DẪN CKPT BEST
ckpt = torch.load(ckpt_path)
ckpt.keys()

SyntaxError: ignored

In [None]:
# TO DO
backbone = 'darknet53'
PUBLIC_TEST_PATH = # ĐƯỜNG DẪN FOLDER PUBLIC TEST

In [None]:
# create sample submission
submission = pd.DataFrame()
fname_lst = []
public_test_files = os.listdir(PUBLIC_TEST_PATH)
for file in public_test_files:
    if file.endswith('.mp4'):
        fname_lst.append(file)
        
submission['fname'] = fname_lst

In [None]:
# config
DEVICE = torch.device('cuda:0')
EPOCHS = 50
FOLD_LST = [0,1,2,3,4]
# DIM = (320, 320)
DIM = (384, 384)
TRAIN_BATCH_SIZE = 16
VALID_BATCH_SIZE = 2 * TRAIN_BATCH_SIZE
LR = 1e-4
SAMPLE = None

valid_transform = A.Compose(
    [
        A.Resize(DIM[0], DIM[1], always_apply=True),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2()
    ]
)

In [None]:
class LivenessDataset(Dataset):
    def __init__(self, df, video_dir, take_frame = 5, transform=None):
        self.df = df.reset_index(drop = True)
        self.take_frame = take_frame
        self.video_dir = video_dir
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        fname = self.df.iloc[idx]['fname']
        video_path = os.path.join(self.video_dir, fname)
        video = cv2.VideoCapture(video_path)
        image_lst = []
        frame_number = 0
        length = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_step = length // self.take_frame
        while video.isOpened():
            ret, frame = video.read()
            if ret:
                frame_number += 1
                if frame is not None and frame_number % frame_step == 0:
                    if self.transform is not None:
                        frame = self.transform(image=frame)["image"]
                    image_lst.append(frame)
            else:
                break
        video.release()
        image_lst = image_lst[:self.take_frame]
        return torch.stack(image_lst, axis =0)

In [None]:
class LivenessModel(torch.nn.Module):
    def __init__(self, pretrained_name = 'resnet50'):
        super(LivenessModel, self).__init__()
        self.backbone = timm.create_model(pretrained_name, pretrained=None)
        if pretrained_name == 'resnet50':
            self.in_feats = self.backbone.fc.in_features
            self.backbone.fc = torch.nn.Identity()
        if pretrained_name == 'darknet53':
            self.in_feats = self.backbone.head.fc.in_features
            self.backbone.head.fc = torch.nn.Identity()
    
        self.lstm = torch.nn.LSTM(self.in_feats, self.in_feats, 2,
                                  bidirectional = True, dropout = 0.3, batch_first = True)
        self.linear = torch.nn.Linear(self.in_feats * 2, 1)
    def forward(self, x):
        b, f, c, h, w = x.shape
        x = torch.reshape(x, (b * f, c, h, w))
        x = self.backbone(x)
        x = torch.reshape(x, (b, f, self.in_feats))
        output, (h, c) = self.lstm(x)
        x = output[:,-1,:]
        x = self.linear(x)
        return x

In [None]:
pbl_test_dataset = LivenessDataset(submission, PUBLIC_TEST_PATH, take_frame = 5, transform = valid_transform)

In [None]:
def infer_fn(model, dataloader, device):
    model.eval()
    pred_lst = []
    with torch.no_grad():
        for i, batch in tqdm(enumerate(dataloader)):
            inp = batch
            inp = inp.to(device)
            output = model(inp)
            
            np_output = output.detach().cpu().numpy()
            pred_lst.append(np_output)
            
            del inp, output, np_output
            torch.cuda.empty_cache()
            gc.collect()
            
    pred = np.concatenate(pred_lst, axis = 0)
    return pred

In [None]:
test_dataloader = DataLoader(pbl_test_dataset, batch_size=VALID_BATCH_SIZE, shuffle=False)
ckpt = torch.load(ckpt_path)
state_dict = ckpt['state_dict']
best_eer = ckpt['eer']
print(best_eer)
model = LivenessModel(backbone)
model.load_state_dict(state_dict)
model = model.to(DEVICE)

0.017543859649122806


In [None]:
pred = infer_fn(model, test_dataloader, DEVICE)
pred.shape

12it [06:09, 30.83s/it]


(360, 1)

In [None]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

In [None]:
pred = sigmoid(pred)

In [None]:
submission['liveness_score'] = pred
submission.to_csv('./submission.csv' ,index = False)

In [None]:
submission.head(5)

Unnamed: 0,fname,liveness_score
0,123.mp4,0.990104
1,479.mp4,0.979466
2,660.mp4,0.023028
3,1327.mp4,0.030343
4,410.mp4,0.999438
