# Test

In [1]:
#!pip install ../input/mtcnn-package/mtcnn-0.1.0-py3-none-any.whl

import pandas as pd
import numpy as np

import os
import sys
import shutil

import cv2
from mtcnn import MTCNN

import tensorflow as tf
from tensorflow import keras
from tensorflow.python.keras import backend as k

from tensorflow.keras import layers
from tensorflow.keras.layers import Input, Add, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, Conv2D, AveragePooling2D, MaxPooling2D
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.initializers import glorot_uniform
from tensorflow.keras.callbacks import Callback, EarlyStopping

from tqdm.notebook import tqdm
import random

import warnings
warnings.filterwarnings("ignore")

#tf.debugging.set_log_device_placement(True) # Enable GPU logging
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

Using TensorFlow backend.


Num GPUs Available:  1


In [2]:
def identity_block(X, f, filters, stage, block):
    # defining name basis
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'

    # Retrieve Filters
    F1, F2, F3 = filters

    # Save the input value. We'll need this later to add back to the main path. 
    X_shortcut = X

    # First component of main path
    X = Conv2D(filters=F1, kernel_size=(1, 1), strides=(1,1), padding='valid', name=conv_name_base + '2a', kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name=bn_name_base + '2a')(X)
    X = Activation('relu')(X)

    # Second component of main path
    X = Conv2D(filters=F2, kernel_size=(f, f), strides=(1,1), padding='same', name=conv_name_base + '2b', kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name=bn_name_base + '2b')(X)
    X = Activation('relu')(X)

    # Third component of main path
    X = Conv2D(filters=F3, kernel_size=(1, 1), strides=(1,1), padding='valid', name=conv_name_base + '2c', kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name=bn_name_base + '2c')(X)

    # Final step: Add shortcut value to main path, and pass it through a RELU activation
    X = Add()([X, X_shortcut])
    X = Activation('relu')(X)

    return X

In [3]:
def convolutional_block(X, f, filters, stage, block, s=2):
    # defining name basis
    conv_name_base='res' + str(stage) + block + '_branch'
    bn_name_base='bn' + str(stage) + block + '_branch'
    
    # Retrieve Filters
    F1, F2, F3 = filters
    
    # Save the input value
    X_shortcut = X


    ##### MAIN PATH #####
    # First component of main path 
    X = Conv2D(F1, (1, 1), strides=(s,s), name=conv_name_base + '2a', kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name=bn_name_base + '2a')(X)
    X = Activation('relu')(X)

    # Second component of main path
    X = Conv2D(filters=F2, kernel_size=(f, f), strides=(1, 1), padding='same', name=conv_name_base + '2b', kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name=bn_name_base + '2b')(X)
    X = Activation('relu')(X)

    # Third component of main path
    X = Conv2D(filters=F3, kernel_size=(1, 1), strides=(1, 1), padding='valid', name=conv_name_base + '2c', kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name=bn_name_base + '2c')(X)

    
    ##### SHORTCUT PATH ####
    X_shortcut = Conv2D(F3, (1, 1), strides=(s,s), name = conv_name_base + '1', kernel_initializer=glorot_uniform(seed=0))(X_shortcut)
    X_shortcut = BatchNormalization(axis=3, name=bn_name_base + '1')(X_shortcut)

    # Final step: Add shortcut value to main path, and pass it through a RELU activation
    X = Add()([X, X_shortcut])
    X = Activation('relu')(X)
    
    return X

In [4]:
def ResNet50(input_shape = (64, 64, 3), classes=2):   
    # Define the input as a tensor with shape input_shape
    X_input = Input(input_shape)

    # Zero-Padding
    X = ZeroPadding2D((3, 3))(X_input)
    
    # Stage 1
    X = Conv2D(64, (7, 7), strides=(2, 2), name='conv1', kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name='bn_conv1')(X)
    X = Activation('relu')(X)
    X = MaxPooling2D((3, 3), strides=(2, 2))(X)

    # Stage 2
    X = convolutional_block(X, f=3, filters=[64, 64, 256], stage=2, block='a', s=1)
    X = identity_block(X, 3, [64, 64, 256], stage=2, block='b')
    X = identity_block(X, 3, [64, 64, 256], stage=2, block='c')

    # Stage 3
    X = convolutional_block(X, f=3, filters=[128, 128, 512], stage=3, block='a', s=2)
    X = identity_block(X, 3, [128, 128, 512], stage=3, block='b')
    X = identity_block(X, 3, [128, 128, 512], stage=3, block='c')
    X = identity_block(X, 3, [128, 128, 512], stage=3, block='d')

    # Stage 4
    X = convolutional_block(X, f=3, filters=[256, 256, 1024], stage=4, block='a', s=2)
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='b')
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='c')
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='d')
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='e')
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='f')

    # Stage 5
    X = convolutional_block(X, f=3, filters=[512, 512, 2048], stage=5, block='a', s=2)
    X = identity_block(X, 3, [512, 512, 2048], stage=5, block='b')
    X = identity_block(X, 3, [512, 512, 2048], stage=5, block='c')

    # AVGPOOL.
    X = AveragePooling2D((2, 2), name='avg_pool')(X)

    # output layer
    X = Flatten()(X)
    X = Dense(classes, activation='sigmoid', name='fc' + str(classes), kernel_initializer=glorot_uniform(seed=0))(X)
    
    # Create model
    model = Model(inputs=X_input, outputs=X, name='ResNet50')

    return model

In [5]:
kfolds = 5
# Import the weights of our model
models = []
for i in range(kfolds):
    model = ResNet50(input_shape=(64, 64, 3), classes=2)
    model.load_weights('../output/resnet50_' + str(i) + '.h5')
    models.append(model)

In [6]:
model = ResNet50(input_shape=(64, 64, 3), classes=2)
model.load_weights('../output/resnet50_4.h5')

In [7]:
# Read video in.
# Extract n frames
# For each frame in video, detect all faces.
# For each face detected - make REAL or FAKE prediction.
# Take max/mean of predictions across faces in frame.
# Take max/mean of all predictions across frames in video.
# Edit submission script dataframe with name of video and final prediction.

In [8]:
test_videos_path = '../input/test_videos/'
test_videos_dir = os.listdir(test_videos_path)
test_images_path = "../input/test_images/" # path to save test images to

In [9]:
test_videos_files = [] # List of all train videos paths

for file in test_videos_dir:
    test_videos_files.append(test_videos_path + file)

In [45]:
def make_submission(videos_dir_path, frames=1, conf_level=0.9):
    """
    Inputs a directory of videos, extracts n frames. 
    Outputs images of ANY faces detected in those frames.
    
    NOTE: The is not the same function as in preprocessing.ipynb. It has been altered to suit the test environment.

    videos_dir_path: (str) Path to your directory of videos
    images_dir_path: (str) Path to where you'll save your images to
    frames: (int or list) Number of frames. If int, take that many frames. If list, take frame numbers specified in list. 
    conf_level: (float) Confidence level for the face recognition model.
    """
    def crop(img, x, y, w, h):
        """
        Crop and reshape images to be uniform across all frames
        """
        x -= 40
        y -= 40
        w += 80
        h += 80
        if x < 0:
            x = 0
        if y <= 0:
            y = 0
        return cv2.cvtColor(cv2.resize(img[y:y + h, x:x + w], (256, 256)), cv2.COLOR_BGR2RGB)
    def read_image(img):
        return cv2.resize(img, (ROWS, COLS), interpolation=cv2.INTER_CUBIC)
    def prepare_data(images):
        m = len(images)
        X = np.zeros((m, ROWS, COLS, CHANNELS), dtype=np.uint8)
        for i, image_file in enumerate(images):
            X[i,:] = read_image(image_file)
        return X
    submission_df = pd.DataFrame()
    # Get video directory
    if type(videos_dir_path) == list: 
        videos_dir = videos_dir_path # If it's a list of paths 
    else: 
        videos_dir = os.listdir(videos_dir_path) # List test vids
        
    for video in tqdm(range(0, 5)): #len(videos_dir)
        
        # Extract frames from video
        file_name = videos_dir_path[video].split('/')[3]
        file_path = videos_dir_path[video]
        vid_name = file_name.split('.')[0]
        frames_list = [] # We'll store the raw frames here
        detector = MTCNN() # Facial recognition algorithm
        if type(frames) == list:
            for num in range(0, len(frames)):
                cap = cv2.VideoCapture(file_path)
                total_frames = cap.get(7)
                vid_length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                cap.set(1, num) # EDIT HERE FOR FRAME NUMBER
                ret, frame = cap.read()
                frames_list.append(frame)
        else:
            for num in range(0, frames):
                cap = cv2.VideoCapture(file_path)
                total_frames = cap.get(7)
                vid_length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                cap.set(1, random.randint(0, vid_length)) # EDIT HERE FOR FRAME NUMBER
                ret, frame = cap.read()
                frames_list.append(frame)
        
        # Extract faces from frames
        
        vid_preds = []
        for i, image in enumerate(frames_list): 
            # Read image and detect faces
            result = detector.detect_faces(image)
            
            # Extract and save faces as their own images
            faces = []
            for face in range(0, len(result)):
                # Only extract the face if confidence is more than or equal to default 0.95
                if result[face]['confidence'] >= conf_level:            
                    startX, startY, endX, endY = result[face]['box'] # Get box coordinates
                    crop_img = crop(frame, startX, startY, endX, endY)
                    faces.append(crop_img)
            
            # Predict if face is FAKE or REAL
            ROWS = 64
            COLS = 64
            CHANNELS = 3
            CLASSES = 2
            faces_prepared = prepare_data(faces)
            faces_prepared =  faces_prepared / 255  ## DO WE NEED THIS??

            frame_preds = []
            for i, face in enumerate(faces_prepared):
                face = np.expand_dims(face, axis=0)
                face_pred = np.mean([model.predict(face) for model in models])
                frame_preds.append(face_pred)

            if len(frame_preds) > 0:
                frame_pred = max(frame_preds) # Choose max pred across faces for overall frame pred
                vid_preds.append(frame_pred)
            else:
                pass
        vid_pred = np.mean(vid_preds) # Take mean pred across all frames for overall video pred
        results_dict = dict(zip(['filename','label'], [file_name, vid_pred]))
        submission_df = submission_df.append(results_dict, ignore_index=True)
    return submission_df

In [48]:
submission = make_submission(test_videos_files, frames=10)
submission.head()

HBox(children=(FloatProgress(value=0.0, max=5.0), HTML(value='')))




In [49]:
submission.to_csv('submission.csv', index=False)