In [20]:
from facenet_pytorch.models.mtcnn import MTCNN
import cv2
import torch
import numpy as np
from matplotlib import pyplot as plt
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
import os
from IPython.display import clear_output
import math
from tqdm.notebook import tqdm
import json
import pandas as pd

In [2]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [69]:
# Root folder of the raw videos, laid out as <data_path>/<split>/<folder>/<file>.
data_path = "original_data"
# Output root: face crops are written to <processed_data_path>/crops/<vid>/ and
# one <split>.csv index per split is written next to the crops folder.
processed_data_path = "processed_data"
# Names of the sub-folders of data_path that are scanned.
split_types = ['train', 'test', 'val']
# Size every face crop is resized to before it is saved.
target_size = (380, 380)
# Extra margin added on each side of a detected box, as a fraction of the box size.
padding_coef = 0.3
# Keep one frame out of every `frame_step` when decoding a video.
frame_step = 30

In [70]:
def get_scale_coef(w):
    """Return the resize factor to use for a frame that is ``w`` pixels wide.

    Narrow frames are enlarged and wide frames are shrunk:

    * ``w <= 300``  -> 2
    * ``w <= 1000`` -> 1
    * ``w <= 1900`` -> 0.5
    * otherwise     -> 0.33
    """
    # (inclusive upper width bound, factor), checked from narrowest to widest.
    width_tiers = ((300, 2), (1000, 1), (1900, 0.5))
    for max_width, factor in width_tiers:
        if w <= max_width:
            return factor
    return 0.33


class VideoDataset(Dataset):
    """Map-style dataset that decodes every ``step``-th frame of each video file.

    Files are discovered under ``<data_path>/<split>/<folder>/<file>`` where
    ``<split>`` is one of the module-level ``split_types``. The discovered
    files are kept in ``self.data``, a DataFrame with the columns
    ``name`` (file name), ``path`` (folder name) and ``split``.
    """

    # Each video folder also contains a ``metadata.json`` label file (it is
    # read by ``save_crop_metadata``). Such sidecar files are not videos and
    # must not be indexed, otherwise they end up as empty "videos".
    _non_video_extensions = ('.json',)

    def __init__(self, data_path, split=None, step=1):
        """Index the video files below ``data_path``.

        Args:
            data_path: Root folder containing one sub-folder per split.
            split: Restrict the dataset to this split; ``None`` keeps every
                split listed in ``split_types``.
            step: Keep one frame out of every ``step`` when decoding.
        """
        self.data_path = data_path
        self.split = split
        self.step = step

        records = []
        for split_name in split_types:
            if self.split is not None and self.split != split_name:
                continue
            split_dir = os.path.join(data_path, split_name)
            # Sorted so that index -> video is reproducible between runs
            # (os.listdir order is arbitrary).
            for folder in sorted(os.listdir(split_dir)):
                folder_dir = os.path.join(split_dir, folder)
                if not os.path.isdir(folder_dir):
                    continue
                for name in sorted(os.listdir(folder_dir)):
                    extension = os.path.splitext(name)[1].lower()
                    if extension in self._non_video_extensions:
                        continue
                    records.append({'name': name, 'path': folder, 'split': split_name})
        # Explicit columns keep the frame well-formed even when nothing was found.
        self.data = pd.DataFrame(records, columns=['name', 'path', 'split'])

    def __getitem__(self, index):
        """Decode one video.

        Returns:
            A tuple ``(vid, frames, scaled_frames, scale_coef)`` where ``vid``
            is the file name without extension, ``frames`` are the sampled RGB
            frames at native resolution, ``scaled_frames`` are the same frames
            resized by ``scale_coef`` and ``scale_coef`` is the factor chosen
            by ``get_scale_coef`` from the frame width.
        """
        row = self.data.loc[index]
        name, folder, split_name = row['name'], row['path'], row['split']
        vid = os.path.splitext(name)[0]

        capture = cv2.VideoCapture(os.path.join(self.data_path, split_name, folder, name))
        frames = []
        scaled_frames = []
        try:
            frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
            frame_w = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
            scale_coef = get_scale_coef(frame_w)

            for i in range(frame_count):
                # grab() only advances the stream; a frame may be retrieved
                # only after a successful grab.
                if not capture.grab():
                    continue
                # Frames dropped by the sampling step are never decoded.
                if i % self.step != 0:
                    continue
                success, frame = capture.retrieve()
                if not success:
                    continue
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # shape[1::-1] is (width, height), the order cv2.resize expects.
                scaled_size = tuple(int(s * scale_coef) for s in frame.shape[1::-1])
                frames.append(frame)
                scaled_frames.append(cv2.resize(frame, scaled_size))
        finally:
            # Always free the decoder / file handle, even if decoding raised.
            capture.release()
        return vid, frames, scaled_frames, scale_coef

    def __len__(self):
        """Number of indexed video files."""
        return len(self.data)

In [71]:
class FaceDetector():
    """Thin wrapper that runs an MTCNN detector over a list of frames in batches."""

    def __init__(self, batch_size=16, device=None):
        """Create the underlying MTCNN detector.

        Args:
            batch_size: Number of frames passed to the detector per call.
            device: Device handed to ``MTCNN``.
        """
        self.batch_size = batch_size
        self.detector = MTCNN(margin=0, thresholds=[0.85, 0.95, 0.95], device=device)

    def detect_faces(self, frames, scale_coef):
        """Detect faces in ``frames`` and map the boxes back by ``scale_coef``.

        Args:
            frames: Sequence of frames that were resized by ``scale_coef``.
            scale_coef: Resize factor that was applied to the frames; box
                coordinates are divided by it and truncated to ``int``.

        Returns:
            A list with one entry per frame: a nested list of integer box
            coordinates, or ``None`` when the detector reported nothing for
            that frame.
        """
        detections = []
        n_batches = math.ceil(len(frames) / self.batch_size)
        for batch_idx in range(n_batches):
            start = batch_idx * self.batch_size
            chunk = frames[start:start + self.batch_size]
            # Only the first element of the detector's result (the boxes) is used.
            chunk_boxes = self.detector.detect(chunk)[0]
            for face_boxes in chunk_boxes:
                if face_boxes is None:
                    detections.append(None)
                else:
                    detections.append((face_boxes / scale_coef).astype(int).tolist())
        return detections

In [73]:
def crop_frame(frame, box, padding_coef):
    """Cut a padded box out of ``frame``.

    Args:
        frame: Image indexed as ``frame[rows, cols]``.
        box: ``(x_min, y_min, x_max, y_max)`` in pixel coordinates.
        padding_coef: Margin added on every side, as a fraction of the box
            width (horizontally) and height (vertically).

    Returns:
        The slice of ``frame`` covering the padded box. The top/left edges are
        clamped at 0; the bottom/right edges are left to slicing to truncate.
    """
    left, top, right, bottom = box
    pad_x = int((right - left) * padding_coef)
    pad_y = int((bottom - top) * padding_coef)
    row_start = max(0, top - pad_y)
    col_start = max(0, left - pad_x)
    return frame[row_start:bottom + pad_y, col_start:right + pad_x]

def process_videos(dataset, processed_data_path, face_detector, padding_coef=0.3):
    """Detect a face in every sampled frame of every video and save it as a PNG.

    For each dataset item the crops are written to
    ``<processed_data_path>/crops/<vid>/<i>.png`` where ``i`` is the position
    of the frame in the item's list of sampled frames. Crops are resized to
    the module-level ``target_size``.

    Args:
        dataset: Dataset yielding ``(vid, frames, scaled_frames, scale_coef)``.
        processed_data_path: Output root folder.
        face_detector: Object whose ``detect_faces(scaled_frames, scale_coef)``
            returns one box list (or ``None``) per frame.
        padding_coef: Margin around the detected box, forwarded to ``crop_frame``.
    """
    # Default batch size of 1 plus an identity collate: every loader item is a
    # one-element list holding the dataset tuple unchanged.
    loader = DataLoader(dataset, collate_fn=lambda x: x)
    crops_dir = os.path.join(processed_data_path, "crops")
    os.makedirs(crops_dir, exist_ok=True)

    for item in tqdm(loader):
        vid, frames, scaled_frames, scale_coef = item[0]
        boxes = face_detector.detect_faces(scaled_frames, scale_coef)

        # The folder is created even when no face is found in the video.
        out_dir = os.path.join(crops_dir, vid)
        os.makedirs(out_dir, exist_ok=True)
        for i, (frame, frame_boxes) in enumerate(zip(frames, boxes)):
            if frame_boxes is None or len(frame_boxes) == 0:
                continue
            # Only the first box reported for the frame is used.
            crop = crop_frame(frame, frame_boxes[0], padding_coef)
            if crop.size == 0:
                # A degenerate or out-of-frame box yields an empty slice, which
                # cv2.resize rejects; skip it instead of aborting the whole run.
                continue
            crop = cv2.resize(crop, target_size)
            # Frames are RGB, cv2.imwrite expects BGR.
            crop = cv2.cvtColor(crop, cv2.COLOR_RGB2BGR)
            cv2.imwrite(os.path.join(out_dir, "{}.png".format(i)), crop)

In [74]:
def save_crop_metadata(data_path, processed_data_path, split):
    """Write ``<processed_data_path>/<split>.csv`` indexing every saved crop.

    The ``metadata.json`` of every folder under ``<data_path>/<split>`` is
    merged into one mapping ``video file name -> info``. For each video, one
    CSV row ``(vid, cid, label)`` is emitted per file found in
    ``<processed_data_path>/crops/<vid>``; ``label`` is ``True`` when the
    video's label equals ``'REAL'``.

    Args:
        data_path: Root folder of the raw data.
        processed_data_path: Root folder holding ``crops/`` and receiving the CSV.
        split: Name of the split to index.
    """
    split_dir = os.path.join(data_path, split)
    video_metadata = {}
    for folder in os.listdir(split_dir):
        folder_dir = os.path.join(split_dir, folder)
        if os.path.isdir(folder_dir):
            with open(os.path.join(folder_dir, 'metadata.json'), 'r') as f:
                video_metadata.update(json.load(f))

    crops_metadata = []
    for video_name, video_info in video_metadata.items():
        # The label is read positionally as the first value of the entry.
        # NOTE(review): presumably this is the 'label' field - confirm against the data.
        label, *_ = video_info.values()
        is_real = label == 'REAL'
        # Same id derivation as VideoDataset (strip only the final extension),
        # so names containing extra dots map to the folder the crops were saved in.
        vid = os.path.splitext(video_name)[0]
        crops_path = os.path.join(processed_data_path, 'crops', vid)
        if not os.path.isdir(crops_path):
            # Video listed in the metadata but never processed: nothing to index.
            continue
        # Sorted so the CSV row order is reproducible between runs.
        for crop_name in sorted(os.listdir(crops_path)):
            cid = os.path.splitext(crop_name)[0]
            crops_metadata.append({'vid': vid, 'cid': cid, 'label': is_real})

    # Explicit columns so the CSV keeps its header even when there are no rows.
    crops_metadata = pd.DataFrame(crops_metadata, columns=['vid', 'cid', 'label'])
    os.makedirs(processed_data_path, exist_ok=True)
    crops_metadata.to_csv(os.path.join(processed_data_path, '{}.csv'.format(split)),
                          index=False)

In [75]:
# Index every split (split=None) and sample one frame out of every `frame_step`.
dataset = VideoDataset(data_path=data_path, split=None, step=frame_step)

In [76]:
# Detector that processes 16 frames per call on the selected device.
face_detector = FaceDetector(batch_size=16, device=device)

In [None]:
# Extract and save the face crops for every indexed video.
process_videos(
    dataset=dataset,
    processed_data_path=processed_data_path,
    face_detector=face_detector,
    padding_coef=padding_coef,
)

In [77]:
# Write one <split>.csv crop index per split.
for split in split_types:
    save_crop_metadata(
        data_path=data_path,
        processed_data_path=processed_data_path,
        split=split,
    )