In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in 

# import numpy as np # linear algebra
# import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# Any results you write to the current directory are saved as output.

In [2]:
import json
import os
import tqdm
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.functional as t_F
import torchvision.models as models
import torchvision.transforms as transforms
import torch.utils.data as data
import torchvision
from torch.autograd import Variable
from torch.utils.data import Dataset
import cv2

In [3]:
# set path
data_path = "/kaggle/input/deepfake-detection-challenge/test_videos"
save_model_path = "/kaggle/input/single-frame/"

In [4]:
class MesoInception4(nn.Module):
    """
    Pytorch Implemention of MesoInception4
    Author: Honggu Liu
    Date: July 7, 2019
    """
    def __init__(self, num_classes=2):
        super(MesoInception4, self).__init__()
        self.num_classes = num_classes
        #InceptionLayer1
        self.Incption1_conv1 = nn.Conv2d(3, 1, 1, padding=0, bias=False)
        self.Incption1_conv2_1 = nn.Conv2d(3, 4, 1, padding=0, bias=False)
        self.Incption1_conv2_2 = nn.Conv2d(4, 4, 3, padding=1, bias=False)
        self.Incption1_conv3_1 = nn.Conv2d(3, 4, 1, padding=0, bias=False)
        self.Incption1_conv3_2 = nn.Conv2d(4, 4, 3, padding=2, dilation=2, bias=False)
        self.Incption1_conv4_1 = nn.Conv2d(3, 2, 1, padding=0, bias=False)
        self.Incption1_conv4_2 = nn.Conv2d(2, 2, 3, padding=3, dilation=3, bias=False)
        self.Incption1_bn = nn.BatchNorm2d(11)


        #InceptionLayer2
        self.Incption2_conv1 = nn.Conv2d(11, 2, 1, padding=0, bias=False)
        self.Incption2_conv2_1 = nn.Conv2d(11, 4, 1, padding=0, bias=False)
        self.Incption2_conv2_2 = nn.Conv2d(4, 4, 3, padding=1, bias=False)
        self.Incption2_conv3_1 = nn.Conv2d(11, 4, 1, padding=0, bias=False)
        self.Incption2_conv3_2 = nn.Conv2d(4, 4, 3, padding=2, dilation=2, bias=False)
        self.Incption2_conv4_1 = nn.Conv2d(11, 2, 1, padding=0, bias=False)
        self.Incption2_conv4_2 = nn.Conv2d(2, 2, 3, padding=3, dilation=3, bias=False)
        self.Incption2_bn = nn.BatchNorm2d(12)

        #Normal Layer
        self.conv1 = nn.Conv2d(12, 16, 5, padding=2, bias=False)
        self.relu = nn.ReLU(inplace=True)
        self.leakyrelu = nn.LeakyReLU(0.1)
        self.bn1 = nn.BatchNorm2d(16)
        self.maxpooling1 = nn.MaxPool2d(kernel_size=(2, 2))

        self.conv2 = nn.Conv2d(16, 16, 5, padding=2, bias=False)
        self.maxpooling2 = nn.MaxPool2d(kernel_size=(4, 4))

        self.dropout = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(16*8*8, 16)
        self.fc2 = nn.Linear(16, num_classes)


    #InceptionLayer
    def InceptionLayer1(self, input):
        x1 = self.Incption1_conv1(input)
        x2 = self.Incption1_conv2_1(input)
        x2 = self.Incption1_conv2_2(x2)
        x3 = self.Incption1_conv3_1(input)
        x3 = self.Incption1_conv3_2(x3)
        x4 = self.Incption1_conv4_1(input)
        x4 = self.Incption1_conv4_2(x4)
        y = torch.cat((x1, x2, x3, x4), 1)
        y = self.Incption1_bn(y)
        y = self.maxpooling1(y)

        return y

    def InceptionLayer2(self, input):
        x1 = self.Incption2_conv1(input)
        x2 = self.Incption2_conv2_1(input)
        x2 = self.Incption2_conv2_2(x2)
        x3 = self.Incption2_conv3_1(input)
        x3 = self.Incption2_conv3_2(x3)
        x4 = self.Incption2_conv4_1(input)
        x4 = self.Incption2_conv4_2(x4)
        y = torch.cat((x1, x2, x3, x4), 1)
        y = self.Incption2_bn(y)
        y = self.maxpooling1(y)

        return y

    def forward(self, input):
        x = self.InceptionLayer1(input) #(Batch, 11, 128, 128)
        x = self.InceptionLayer2(x) #(Batch, 12, 64, 64)

        x = self.conv1(x) #(Batch, 16, 64 ,64)
        x = self.relu(x)
        x = self.bn1(x)
        x = self.maxpooling1(x) #(Batch, 16, 32, 32)

        x = self.conv2(x) #(Batch, 16, 32, 32)
        x = self.relu(x)
        x = self.bn1(x)
        x = self.maxpooling2(x) #(Batch, 16, 8, 8)

        x = x.view(x.size(0), -1) #(Batch, 16*8*8)
        x = self.dropout(x)
        x = self.fc1(x) #(Batch, 16)
        x = self.leakyrelu(x)
        x = self.dropout(x)
        x = self.fc2(x)

        return x

In [5]:
def face_detect(frame):
    
    face_cascade = cv2.CascadeClassifier('/kaggle/input/single-frame/haarcascade_frontalface_default.xml')
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # Resize frame of video to 1/4 size for faster face detection processing
    small_frame = cv2.resize(gray, (0, 0), fx=0.25, fy=0.25)
    # Detect the faces
    faces = face_cascade.detectMultiScale(small_frame, 1.1, 4)
    return faces


def readVideo(videoFile):

    max_attempts = 60
    # Open the video file
    cap = cv2.VideoCapture(videoFile)
    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize([0.5]*3, [0.5]*3)
    ])
    #cap.set(1, self.frame_no)
    # nFrames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # frames = torch.FloatTensor(self.channels, self.timeDepth, self.xSize, self.ySize)

    attempts = 0
    while attempts < max_attempts:
        ret, frame = cap.read()
        attempts += 1
        if ret:
            last_good_frame = frame
            try:
                faces = face_detect(frame)
                # Face detected
                if len(faces) > 0:
                    # Get the face, if more than two, use the whole frame
                    if len(faces) > 1:
                        break
                    x, y, w, h = faces[0] * 4
                    x -= 40
                    y -= 40
                    w += 80
                    h += 80
                    face_img = frame[y: y + h, x: x + w]
                    frame = torch.from_numpy(face_img)
                    # HWC2CHW
                    frame = frame.permute(2, 0, 1)
                    frame = t_F.to_pil_image(frame)
                    frame = transform(frame)
                    cap.release()
                    return frame
            except:
                print("Face detection error")
        else:
            break

    frame = torch.from_numpy(last_good_frame)
    # HWC2CHW
    frame = frame.permute(2, 0, 1)
    frame = t_F.to_pil_image(frame)
    frame = transform(frame)
    cap.release()
    return frame

In [6]:
def test(model, device, test_vidoes):
    # set model as testing mode
    output_file = 'submission.csv'
    if os.path.exists(output_file):
        os.remove(output_file)      
    cnn_encoder= model
    cnn_encoder.eval()

    results = {}
    with torch.no_grad():
        for video_file in tqdm.tqdm(test_vidoes):
            file_name = video_file.split('/')[-1]
            # Make prediction
            try:
                X = readVideo(video_file)
                X = X.to(device)
                # y = y.to(device).view(-1, )
                X = X.unsqueeze(0)
                output = cnn_encoder(X)
                output_prob = F.softmax(output, dim=1)
                results[file_name] = output_prob[0][1].item()
            except:
                results[file_name] = 0.5
                
    df =  pd.DataFrame([results.keys(), results.values()]).T
    df.columns = ['filename', 'label']
    df.fillna(0.5)
    df.to_csv(output_file, sep=',', index=False)
    print("Finished prediction!!!")

In [7]:
def get_videos(data_folder):
    video_files = []
    videos = os.listdir(data_folder)
    for v in videos:
        if v.endswith('mp4'):
            video_files.append(os.path.join(data_folder, v))
    return video_files

In [8]:
# Detect devices
use_cuda = torch.cuda.is_available()                   # check if GPU exists
device = torch.device("cuda" if use_cuda else "cpu")   # use CPU or GPU
video_files = get_videos(data_path)

In [9]:
# Create model
model_ft = MesoInception4().to(device)

# Load model
encoder_model_path = os.path.join(save_model_path, 'v1_meso_encoder_epoch1.pth')
model_ft.load_state_dict(torch.load(encoder_model_path))

<All keys matched successfully>

In [10]:
# Predict
test(model_ft, device, video_files)

  2%|▏         | 9/400 [00:06<08:55,  1.37s/it]

Face detection error
Face detection error
Face detection error
Face detection error


  2%|▎         | 10/400 [00:07<07:04,  1.09s/it]

Face detection error
Face detection error


 54%|█████▍    | 215/400 [03:30<03:19,  1.08s/it]

Face detection error


 56%|█████▌    | 224/400 [03:42<01:21,  2.15it/s]

Face detection error
Face detection error
Face detection error


 64%|██████▍   | 257/400 [04:30<03:12,  1.35s/it]

Face detection error
Face detection error
Face detection error
Face detection error
Face detection error
Face detection error
Face detection error
Face detection error


100%|██████████| 400/400 [07:06<00:00,  1.07s/it]


Finished prediction!!!
