In [1]:
import os
import time 
import cv2
import numpy as np
import onnxruntime as rt
import torch
import torch.nn.functional as F
import pandas as pd 
import argparse
from src.data_io import transform as trans
from src.generate_patches import CropImage
from src.model_test import Detection
from src.utility import parse_model_name
import warnings
warnings.filterwarnings('ignore')


In [3]:
# Inference sessions already built in this kernel, keyed by model path.
# predict() calls load_model() for every model on every scored frame, so
# without this cache each session would be rebuilt from disk repeatedly.
_SESSION_CACHE = {}


def load_model(onnx_path):
    """Return an ONNX Runtime inference session for ``onnx_path``.

    The session is created on first use and reused on later calls with the
    same path.

    Args:
        onnx_path: path of the ONNX model file.

    Returns:
        The ``rt.InferenceSession`` associated with ``onnx_path``.
    """
    session = _SESSION_CACHE.get(onnx_path)
    if session is None:
        session = rt.InferenceSession(
            onnx_path,
            # Providers are listed in decreasing priority: CUDA first, with
            # the CPU provider named explicitly as the fallback.
            providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
        )
        _SESSION_CACHE[onnx_path] = session
    return session

def predict_onnx(model, img: torch.Tensor) -> np.ndarray:
    """Run ``img`` through an ONNX Runtime session and return its first output.

    The tensor is copied to host memory, converted to a float32 NumPy array
    and fed to the first declared input of ``model``; only the first declared
    output is requested.

    Args:
        model: object exposing ``get_inputs()``, ``get_outputs()`` and ``run()``.
        img: input tensor for the model.

    Returns:
        The first (and only requested) output returned by ``model.run``.
    """
    feed_key = model.get_inputs()[0].name
    fetch_key = model.get_outputs()[0].name
    host_array = img.cpu().numpy().astype(np.float32)
    outputs = model.run([fetch_key], {feed_key: host_array})
    return outputs[0]

def forward(img, model, device):
    """Run one image through ``model`` and return softmax probabilities.

    Args:
        img: image accepted by ``trans.ToTensor`` (presumably the cropped
            face patch produced by the cropper -- TODO confirm layout).
        model: inference session accepted by ``predict_onnx``.
        device: torch device the tensor is moved to before inference.

    Returns:
        NumPy array with the softmax of the model output taken over its last
        axis.
    """
    test_transform = trans.Compose(
        [
            trans.ToTensor(),
        ]
    )
    tensor = test_transform(img)
    tensor = tensor.to(device)
    # Prepend the batch axis before handing the tensor to the model.
    batch = tensor.unsqueeze(0)
    logits = predict_onnx(model, batch)

    # Name the softmax axis explicitly: calling F.softmax without ``dim`` is
    # deprecated. For the 1-D / 2-D outputs this pipeline accumulates, the
    # last axis is the same axis the implicit rule would have picked.
    probabilities = F.softmax(torch.Tensor(logits), dim=-1)
    return probabilities.cpu().numpy()
def predict(image, model_dir):
    """Score one frame with every model found in ``model_dir``.

    Each model's two-class probabilities are summed, the winning class is
    taken with ``argmax`` and its probability is averaged over the models.

    Args:
        image: frame to score; passed to the detector and the cropper.
        model_dir: directory whose files are the ONNX models to evaluate.
            File names must be understood by ``parse_model_name``.

    Returns:
        The averaged probability when class 1 wins, otherwise one minus the
        averaged probability of class 0 (the notebook stores this value as
        ``liveness_score``).

    Raises:
        ValueError: if ``model_dir`` contains no files.
    """
    # Sorted so the models are evaluated in the same order on every run;
    # sub-directories (e.g. editor checkpoint folders) are not models.
    model_names = sorted(
        name
        for name in os.listdir(model_dir)
        if os.path.isfile(os.path.join(model_dir, name))
    )
    if not model_names:
        # Without this guard an empty directory would silently yield 1.
        raise ValueError('no model files found in {!r}'.format(model_dir))

    image_cropper = CropImage()
    model_test = Detection()
    image_bbox = model_test.get_bbox(image)

    device = torch.device('cuda')
    prediction = np.zeros((1, 2))
    for model_name in model_names:
        model_path = os.path.join(model_dir, model_name)

        h_input, w_input, _, scale = parse_model_name(model_name)
        param = {
            "org_img": image,
            "bbox": image_bbox,
            "scale": scale,
            "out_w": w_input,
            "out_h": h_input,
            # A model name without a scale means the image is used uncropped.
            "crop": scale is not None,
        }
        img = image_cropper.crop(**param)
        model = load_model(model_path)
        prediction += forward(img, model, device=device)

    label = np.argmax(prediction)
    # Average over the models actually evaluated instead of assuming two.
    score = prediction[0][label] / len(model_names)
    if label == 1:
        return score
    return 1 - score

In [4]:
def post_processing(file_path, model_dir, frame_step=5):
    """Score every ``frame_step``-th frame of a video.

    Args:
        file_path: path of the video opened with ``cv2.VideoCapture``.
        model_dir: directory of models, forwarded to ``predict``.
        frame_step: score frames whose index is a multiple of this value
            (default 5).

    Returns:
        List of per-frame scores in frame order. The list is empty when the
        video cannot be opened or its first frame cannot be scored.
    """
    cap = cv2.VideoCapture(file_path)
    scores = []
    frame_idx = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # No more frames: stop here instead of handing an invalid
                # frame to predict() and relying on it to raise.
                break
            if frame_idx % frame_step == 0:
                try:
                    scores.append(predict(frame, model_dir))
                except Exception as exc:
                    # Best effort: keep the scores collected so far and stop
                    # this video, but report the failure instead of hiding it.
                    print('frame {} of {} could not be scored: {!r}'.format(
                        frame_idx, file_path, exc))
                    break
            frame_idx += 1
    finally:
        # Always free the capture handle, even if something above raised.
        cap.release()
    return scores

In [5]:
# Directory holding the input videos to score.
data_dir = '/data/'
# os.listdir returns entries in arbitrary order; sort them so the rows of the
# generated CSV files come out in the same order on every run.
test_cases = sorted(os.listdir(data_dir))
# Directory holding the ONNX checkpoints evaluated for every frame.
model_dir = '/code/resources/ckpt_onnx/'

In [None]:
# Per-file results: (file name, elapsed milliseconds) and (file name, score).
all_predicted_time = []
all_result = []
# Placeholder score written when no frame of a video could be scored, so one
# unreadable file does not abort the whole batch with a ZeroDivisionError.
# NOTE(review): 0.5 is an arbitrary neutral value -- confirm against the
# scoring rules.
fallback_score = 0.5
print('preprocessing ...')
for file_name in test_cases:
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments.
    t1 = time.perf_counter()
    file_path = os.path.join(data_dir, file_name)
    result = post_processing(file_path, model_dir)
    t2 = time.perf_counter()
    predicted_time = int((t2 - t1) * 1000)
    all_predicted_time.append((file_name, predicted_time))
    if result:
        liveness_score = sum(result) / len(result)
    else:
        print('no frame could be scored for {}; using fallback score {}'.format(
            file_name, fallback_score))
        liveness_score = fallback_score
    all_result.append((file_name, liveness_score))
df_1 = pd.DataFrame(all_result, columns=["fname", "liveness_score"])
df_2 = pd.DataFrame(all_predicted_time, columns=["fname", "time_submission"])
os.makedirs('/result', exist_ok=True)
df_1.to_csv("/result/jupyter_submission.csv", index=False, encoding="utf-8", float_format="%.10f")
df_2.to_csv("/result/time_submission.csv", index=False, encoding="utf-8", float_format="%.10f")

print('output will be saved in /result/time_submission.csv')
print('output will be saved in /result/jupyter_submission.csv')