In [1]:
import sys
if '../../' not in sys.path:
    sys.path.append('../../')
from Utils.FileOperation import *
from Config import rootDir

import numpy as np
from PIL import Image, ImageDraw
import time
import cv2
from batch_face import RetinaFace
import math
import json

In [2]:
# Height / width ratio of every crop: extract() enforces height >= width * aspect_ratio,
# get_bounding_box() derives width as height // aspect_ratio
aspect_ratio=1.0
def _redirect_to_troublesome(path):
    '''
    Insert a 'troublesome/' folder in front of the file name of path and create that folder
    Parameters:
        path: '/'-separated path to a file
    Return:
        The redirected path
    '''
    path_elements = path.split('/')
    last_len = len(path_elements[-1])
    path_elements[-1] = 'troublesome/' + path_elements[-1]
    path = '/'.join(path_elements)
    # Everything but the file name is the folder that has to exist
    mkdir(path[:-last_len])
    return path


def extract(
        detector,
        data_path,
        output_path,
        json_path=None,
        padding=0,
        smooth_level=0,
        batch_size=200
    ):
    '''
    Crop one face out of every frame of a video and save the crops as a new video
    Frames without a usable face are left out of the output video and stored as None in the json file
    If anything suspicious happens the outputs are written into a 'troublesome/' subfolder instead
    Parameters:
        detector: RetinaFace detector (from batch_face import RetinaFace)
        data_path: path to video file
        output_path: path to output video file
        json_path: path to output json file if this variable is not None
        padding: number of pixels to pad the bounding box
        smooth_level: number of frames to smooth the bounding box
        batch_size: number of frames passed to the detector in one call
    Raises:
        ValueError: if no frame could be read from data_path
    '''

    troublesome = False

    # Load the whole video into memory
    original_video = cv2.VideoCapture(data_path)
    frames = []
    while original_video.isOpened():
        success, frame = original_video.read()
        if not success:
            break
        frames.append(frame)
    original_video.release()

    if not frames:
        # Fail with a clear message instead of an unrelated IndexError further down
        raise ValueError('No frames could be read from ' + str(data_path))

    # Detect faces batch by batch to bound the memory used by the detector
    # NOTE(review): frames are passed in OpenCV's BGR channel order - confirm the detector is set up for BGR input
    all_faces = []
    for start in range(0, len(frames), batch_size):
        all_faces.extend(detector(frames[start:start + batch_size]))

    # Per frame entry of all_boxes:
    #   None                          -> no confident face
    #   (center, bbox_height)         -> exactly one confident face
    #   [(center, bbox_height, score)] -> several confident faces, resolved below
    warned_zero = False
    h_max = 0
    all_boxes = []
    for face_info in all_faces:
        valid_bounding_boxes = []
        for box, _landmarks, score in face_info:
            if score < .95:
                continue
            x_min = max(int(box[0]), 0)
            y_min = max(int(box[1]), 0)
            x_max = int(box[2])
            y_max = int(box[3])
            center = ((x_min + x_max) // 2, (y_min + y_max) // 2)
            bbox_width = x_max - x_min + 2 * padding
            # The box is grown vertically if it is flatter than aspect_ratio
            bbox_height = max(y_max - y_min + 2 * padding, int(bbox_width * aspect_ratio))

            # h_max is taken over every confident face, it defines the output resolution
            if bbox_height > h_max:
                h_max = bbox_height

            valid_bounding_boxes.append((center, bbox_height, score))

        if not valid_bounding_boxes:
            if not warned_zero:
                warned_zero = True
                print('\n * Warning: 404 Faces Not Found at', len(all_boxes))
            all_boxes.append(None)
        elif len(valid_bounding_boxes) > 1:
            all_boxes.append(valid_bounding_boxes)
        else:
            all_boxes.append(valid_bounding_boxes[0][:2])

    # Find the mean bounding box center and height by averaging all valid bounding boxes that only have one face
    mean_center = [0, 0]
    mean_bbox_height = 0
    cnt = 0
    for boxes in all_boxes:
        if not isinstance(boxes, tuple):
            continue
        mean_center[0] += boxes[0][0]
        mean_center[1] += boxes[0][1]
        mean_bbox_height += boxes[1]
        cnt += 1
    if cnt != 0:
        mean_center[0] /= cnt
        mean_center[1] /= cnt
        mean_bbox_height /= cnt
    else:
        troublesome = True
        # No frame has exactly one face: look for the first frame with several faces
        # (not just frame 0, which may simply be a frame without any face)
        first_multi = next(
            (i for i, boxes in enumerate(all_boxes) if isinstance(boxes, list)),
            None
        )
        if first_multi is None:
            print("\n * Warning: 404 Found no face in the entire video!")
            frame = cv2.cvtColor(frames[0], cv2.COLOR_BGR2RGB)
            frame = Image.fromarray(frame)
            frame.show()
        else:
            candidates = all_boxes[first_multi]
            print("\n * Warning: 415 Found", len(candidates), "Faces at " + str(first_multi) + ". \n * When you see this message, it's probably because all frames have multiple faces")
            # Show that frame with all bounding boxes
            frame = cv2.cvtColor(frames[first_multi], cv2.COLOR_BGR2RGB)
            frame = Image.fromarray(frame)
            draw = ImageDraw.Draw(frame)
            for bbox in candidates:
                draw.rectangle(get_bounding_box(bbox[0], bbox[1]), outline='red')
            frame.show()
            # Pick the biggest bounding box on that frame as the mean bounding box
            for bbox in candidates:
                if bbox[1] > mean_bbox_height:
                    mean_bbox_height = bbox[1]
                    mean_center = bbox[0]

    warned_multiple = False
    # Find the most appropriate bounding box for frames that have multiple faces by calculating IoU with mean bounding box
    for i, candidates in enumerate(all_boxes):
        if not isinstance(candidates, list):
            continue
        best_bbox = None
        best_iou = 0
        for bbox in candidates:
            iou = compute_iou(bbox, (mean_center, mean_bbox_height))
            if iou > best_iou:
                best_iou = iou
                best_bbox = bbox
        if best_bbox is None:
            # If no bounding box has IoU > 0, drop this frame
            if not warned_multiple:
                warned_multiple = True
                print('\n * Warning: 409 Found', len(candidates), 'Faces at', i)
                troublesome = True
            # Draw all bounding boxes in red and mean bounding box in green on the frame and use PIL.Image to show it
            frame = cv2.cvtColor(frames[i], cv2.COLOR_BGR2RGB)
            frame = Image.fromarray(frame)
            draw = ImageDraw.Draw(frame)
            for bbox in candidates:
                draw.rectangle(get_bounding_box(bbox[0], bbox[1]), outline='red')
            draw.rectangle(get_bounding_box(mean_center, mean_bbox_height), outline='green')
            frame.show()
            all_boxes[i] = None
            continue
        all_boxes[i] = best_bbox

    output_size = (int(h_max // aspect_ratio), h_max)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    if troublesome:
        output_path = _redirect_to_troublesome(output_path)
    # The frame rate is fixed to 30, it is not read from the input video
    out = cv2.VideoWriter(output_path, fourcc, 30.0, output_size)
    calculated_boxes = []
    try:
        for i, frame in enumerate(frames):
            if all_boxes[i] is None:
                calculated_boxes.append(None)
                continue
            # Window of neighbouring frames used for smoothing, clipped to the video
            id_0 = max(0, i - smooth_level // 2)
            id_1 = min(len(all_boxes), i + 1 + math.ceil(smooth_level / 2))
            center, bbox_height = smooth_frames(all_boxes[id_0:id_1], i - id_0)
            bbox = get_bounding_box(center, bbox_height)
            calculated_boxes.append(bbox)
            cropped = Image.fromarray(frame).crop(bbox).resize(output_size)
            out.write(np.array(cropped))
    finally:
        # Release the writer even if a frame fails
        out.release()

    assert len(calculated_boxes) == len(frames)

    # Save calculated_boxes as json file
    if json_path is not None:
        if troublesome:
            json_path = _redirect_to_troublesome(json_path)
        with open(json_path, 'w') as f:
            json.dump(calculated_boxes, f)


def smooth_frames(bounding_boxes, target_index, iou_threshold=0.5):
    '''
    Smooth the targeted bounding box by averaging it with the similar boxes around it
    Parameters:
        bounding_boxes: Window of bounding boxes (entries may be None), its length is the smooth level
        target_index: Index of the targeted frame inside bounding_boxes
        iou_threshold: Boxes whose IoU with the targeted box is below this value are ignored
    Return:
        None if the targeted box is None, otherwise one smoothed bounding box
    Bounding box structure:
        (center, bbox_height)
    '''

    # Nothing to smooth if the targeted frame has no box
    if bounding_boxes[target_index] is None:
        return None

    # Remove the None entries; the target moves left by the number of None entries in front of it
    skipped = sum(1 for k in range(target_index) if bounding_boxes[k] is None)
    present_boxes = [candidate for candidate in bounding_boxes if candidate is not None]
    reference = present_boxes[target_index - skipped]

    # Average center and height over every box that overlaps the targeted box enough
    sum_x = 0
    sum_y = 0
    sum_height = 0
    kept = 0
    for candidate in present_boxes:
        if compute_iou(candidate, reference) < iou_threshold:
            continue
        sum_x += candidate[0][0]
        sum_y += candidate[0][1]
        sum_height += candidate[1]
        kept += 1

    smoothed_center = [sum_x / kept, sum_y / kept]
    return (smoothed_center, sum_height / kept)


def get_bounding_box(center, bbox_height):
    '''
    Turn a (center, bbox_height) pair into corner coordinates
    The width is derived from bbox_height and the module level aspect_ratio
    Return:
        (x_min, y_min, x_max, y_max) as ints
    '''
    half_width = (bbox_height // aspect_ratio) // 2
    half_height = bbox_height // 2
    center_x = center[0]
    center_y = center[1]
    return (
        int(center_x - half_width),
        int(center_y - half_height),
        int(center_x + half_width),
        int(center_y + half_height),
    )

def compute_iou(bbox0, bbox1):
    '''
    Compute IoU (intersection over union) of two bounding boxes
    Parameters:
        bbox0, bbox1: bounding boxes whose first two items are (center, bbox_height)
    Return:
        IoU as a float, 0.0 if both boxes have no area
    '''
    x_min0, y_min0, x_max0, y_max0 = get_bounding_box(bbox0[0], bbox0[1])
    x_min1, y_min1, x_max1, y_max1 = get_bounding_box(bbox1[0], bbox1[1])

    # Overlapping rectangle, its width / height are clipped to 0 if the boxes are disjoint
    overlap_width = max(0, min(x_max0, x_max1) - max(x_min0, x_min1))
    overlap_height = max(0, min(y_max0, y_max1) - max(y_min0, y_min1))
    intersection = overlap_width * overlap_height

    area0 = (x_max0 - x_min0) * (y_max0 - y_min0)
    area1 = (x_max1 - x_min1) * (y_max1 - y_min1)
    union = area0 + area1 - intersection
    if union == 0:
        # Two degenerate boxes: report "no overlap" instead of dividing by zero
        return 0.0
    return intersection / union

In [None]:
# Crop every c23 video with extract() and store the bounding boxes as json
detector = RetinaFace(gpu_id=0)
dataDirs = [
    rootDir + 'cropped_videos/FF++_crop_basic/c23/real/',
    rootDir + 'cropped_videos/FF++_crop_basic/c23/fake/Deepfakes/',
    rootDir + 'cropped_videos/FF++_crop_basic/c23/fake/Face2Face/',
    rootDir + 'cropped_videos/FF++_crop_basic/c23/fake/FaceSwap/',
    rootDir + 'cropped_videos/FF++_crop_basic/c23/fake/NeuralTextures/',
]
for dataDir in dataDirs:
    saveDir = dataDir.replace('_basic', '')
    jsonDir = dataDir.replace('_basic/c23', '/json')
    vidDirs = ls(dataDir)
    savedList = ls(saveDir)
    for vidDir in vidDirs:
        if vidDir in savedList:
            # Skip videos whose saved result already has as many frames as the input
            saved = cv2.VideoCapture(saveDir + vidDir)
            saved_frame_count = int(saved.get(cv2.CAP_PROP_FRAME_COUNT))
            saved.release()
            original = cv2.VideoCapture(dataDir + vidDir)
            original_frame_count = int(original.get(cv2.CAP_PROP_FRAME_COUNT))
            original.release()
            if saved_frame_count == original_frame_count:
                continue
        print(time.ctime(time.time()), ':', dataDir + vidDir)
        try:
            extract(
                detector,
                dataDir + vidDir,
                saveDir + vidDir,
                json_path=jsonDir+vidDir[:-4]+'.json',
                padding=18,
                smooth_level=5
            )
        except Exception as err:
            # Keep going with the next video, but say what went wrong
            # (Exception, not a bare except, so the loop can still be interrupted)
            print("\n>>>>>>>>>> ERROR OCCURED <<<<<<<<<<")
            print(repr(err))

Thu Mar 14 16:39:31 2024 : /home/kyr/GazeForensicsData/cropped_videos/FF++_crop_basic/c23/real/340.mp4


In [3]:
def extract_with_json(
        detector,
        data_path,
        output_path,
        json_path
    ):
    '''
    Crop a video with bounding boxes that were saved as json before
    If the boxes do not match the video, fall back to extract() and detect the faces again
    Parameters:
        detector: RetinaFace detector, only used by the fallback to extract()
        data_path: path to video file
        output_path: path to output video file
        json_path: path to the json file of bounding box data
    '''
    original_video = cv2.VideoCapture(data_path)
    frames = []
    while original_video.isOpened():
        success, frame = original_video.read()
        if not success:
            break
        frames.append(frame)
    original_video.release()

    with open(json_path, 'r') as f:
        calculated_boxes = json.load(f) # Data structure: [[x_min, y_min, x_max, y_max], ...]

    if len(frames) < len(calculated_boxes):
        # Delete all None in calculated_boxes
        calculated_boxes = [box for box in calculated_boxes if box is not None]

    if len(frames) != len(calculated_boxes):
        print('\n>>> Error: The number of frames and the number of bounding boxes do not match.', len(frames), len(calculated_boxes))
        extract(
            detector,
            data_path,
            output_path,
            json_path=None,
            padding=18,
            smooth_level=5,
            batch_size=400
        )
        # extract() has written the output video already, do not overwrite it
        # with crops from boxes that do not line up with the frames
        return

    # Find the max height of the bounding boxes (0 if there is no box at all)
    heights = [bbox[3] - bbox[1] for bbox in calculated_boxes if bbox is not None]
    max_height = max([0, *heights])

    # Save the cropped frames into .mp4
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    output_size = (int(max_height / aspect_ratio), max_height)
    out = cv2.VideoWriter(output_path, fourcc, 30, output_size)
    try:
        for frame, bbox in zip(frames, calculated_boxes):
            # Frames without a box are left out of the output video
            if bbox is None:
                continue
            cropped = Image.fromarray(frame).crop(bbox).resize(output_size)
            out.write(np.array(cropped))
    finally:
        # Release the writer even if a frame fails
        out.release()

Before running the next step, move all files from each 'troublesome' folder into its parent directory.

In [4]:
# Crop every c40 video with the bounding boxes stored as json
detector = RetinaFace(gpu_id=0)
dataDirs = [
    rootDir + 'cropped_videos/FF++_crop_basic/c40/real/',
    rootDir + 'cropped_videos/FF++_crop_basic/c40/fake/Deepfakes/',
    rootDir + 'cropped_videos/FF++_crop_basic/c40/fake/Face2Face/',
    rootDir + 'cropped_videos/FF++_crop_basic/c40/fake/FaceSwap/',
    rootDir + 'cropped_videos/FF++_crop_basic/c40/fake/NeuralTextures/',
]
for dataDir in dataDirs:
    saveDir = dataDir.replace('_basic', '')
    vidDirs = ls(dataDir)
    savedList = ls(saveDir)
    jsonsDir = saveDir.replace('c40', 'json')
    for vidDir in vidDirs:
        if vidDir in savedList:
            # Skip videos whose saved result already has as many frames as the input
            saved = cv2.VideoCapture(saveDir + vidDir)
            saved_frame_count = int(saved.get(cv2.CAP_PROP_FRAME_COUNT))
            saved.release()
            original = cv2.VideoCapture(dataDir + vidDir)
            original_frame_count = int(original.get(cv2.CAP_PROP_FRAME_COUNT))
            original.release()
            if saved_frame_count == original_frame_count:
                continue
        jsonDir = vidDir.replace('mp4', 'json')
        print(time.ctime(time.time()), ':', dataDir + vidDir)
        try:
            extract_with_json(
                detector,
                dataDir + vidDir,
                saveDir + vidDir,
                jsonsDir + jsonDir
            )
        except Exception as err:
            # Keep going with the next video, but say what went wrong
            # (Exception, not a bare except, so the loop can still be interrupted)
            print("\n>>>>>>>>>> ERROR OCCURED <<<<<<<<<<")
            print(repr(err))

Thu Mar 14 16:40:12 2024 : /home/kyr/GazeForensicsData/cropped_videos/FF++_crop_basic/c40/real/006.mp4
Thu Mar 14 16:40:12 2024 : /home/kyr/GazeForensicsData/cropped_videos/FF++_crop_basic/c40/real/007.mp4
Thu Mar 14 16:40:13 2024 : /home/kyr/GazeForensicsData/cropped_videos/FF++_crop_basic/c40/real/008.mp4
Thu Mar 14 16:40:13 2024 : /home/kyr/GazeForensicsData/cropped_videos/FF++_crop_basic/c40/real/009.mp4
Thu Mar 14 16:40:14 2024 : /home/kyr/GazeForensicsData/cropped_videos/FF++_crop_basic/c40/real/010.mp4
Thu Mar 14 16:40:14 2024 : /home/kyr/GazeForensicsData/cropped_videos/FF++_crop_basic/c40/real/011.mp4
Thu Mar 14 16:40:16 2024 : /home/kyr/GazeForensicsData/cropped_videos/FF++_crop_basic/c40/real/012.mp4
Thu Mar 14 16:40:16 2024 : /home/kyr/GazeForensicsData/cropped_videos/FF++_crop_basic/c40/real/013.mp4
Thu Mar 14 16:40:16 2024 : /home/kyr/GazeForensicsData/cropped_videos/FF++_crop_basic/c40/real/014.mp4
Thu Mar 14 16:40:17 2024 : /home/kyr/GazeForensicsData/cropped_videos/FF+