In [None]:
import numpy as np
import pandas as pd
import cv2 as cv

import os
import random

# Image augmentations
import albumentations as A

# NN
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader

from collections import defaultdict

from tqdm.notebook import tqdm

from PIL import Image

In [None]:
# Dataset locations, relative to the notebook's working directory.
DATA_FOLDER = '../input/deepfake-detection-challenge'
TRAIN_SAMPLE_FOLDER = 'train_sample_videos'
TEST_FOLDER = 'test_videos'

TRAIN_PATH_PRE_FIX = os.path.join(DATA_FOLDER, TRAIN_SAMPLE_FOLDER)
TEST_PATH_PRE_FIX = os.path.join(DATA_FOLDER, TEST_FOLDER)

# Raw directory listings of both folders.
train_fnames = os.listdir(TRAIN_PATH_PRE_FIX)
test_fnames = os.listdir(TEST_PATH_PRE_FIX)

# The label file sits next to the training videos; it is not a sample itself.
train_fnames.remove("metadata.json")

print(f"Train samples: {len(train_fnames)}")
print(f"Test samples: {len(test_fnames)}")

In [None]:
# Get the labels from json
import json

# The context manager closes the handle as soon as the JSON is parsed.
# `f` stays bound afterwards (to the closed file object), as it was before.
with open(os.path.join(TRAIN_PATH_PRE_FIX, 'metadata.json')) as f:
    data = json.load(f)

# with open(os.path.join(TEST_PATH_PRE_FIX, 'metadata.json')) as f2:
#     data_test = json.load(f2)

def get_label(vid_fname, training=True):
    """Return the label stored for a video in the parsed training metadata.

    Args:
        vid_fname: Key into the module-level `data` mapping (a video file name).
        training: Currently unused; the branch that would read test metadata
            is disabled (see the commented lines below).

    Returns:
        The value of the entry's 'label' field.

    Raises:
        KeyError: If `vid_fname` is not in `data` or the entry has no 'label'.
    """
    # if not training:
    #     return data_test[vid_fname]['label']
    entry = data[vid_fname]
    return entry['label']

In [None]:
# Lets see how many are fake vs. real
# Every metadata entry that is not labelled 'REAL' is counted as fake.
real_ct = sum(1 for fname in data if get_label(fname) == 'REAL')
fake_ct = len(data) - real_ct
print(real_ct, fake_ct)

In [None]:
def get_data_subset(data, n_samples):
    """Draw a random subset (without replacement) from a collection.

    Args:
        data: Iterable of items to sample from; it is copied into a list first.
        n_samples: Requested number of items. If it exceeds the number of
            available items, every item is returned (in random order) instead
            of raising.

    Returns:
        A new list with ``min(n_samples, len(data))`` distinct elements.

    Raises:
        ValueError: If `n_samples` is negative (raised by ``random.sample``).
    """
    population = list(data)
    # random.sample raises when asked for more items than exist, so clamp.
    return random.sample(population, min(n_samples, len(population)))

# To play around with our model, start small and develop more samples from this subset
# Both lists are replaced by random 10-item subsets of themselves.
train_fnames = get_data_subset(train_fnames, 10)
test_fnames = get_data_subset(test_fnames, 10)

In [None]:
def get_frames(fname, num_frames=10, every_n_frames=None):
    """Decode a fixed number of frames from a video file.

    Docs: https://docs.opencv.org/4.5.3/d8/dfe/classcv_1_1VideoCapture.html
    Properties: https://docs.opencv.org/4.5.3/d4/d15/group__videoio__flags__base.html#gaeb8dd9c89c10a5c63c139bf7c4f5704d

    Args:
        fname: Path of the video to open with ``cv.VideoCapture``.
        num_frames: Number of frames to return.
        every_n_frames: If falsy, frames are spread evenly over the whole
            video (``total_frames // num_frames`` apart, starting at frame 0).
            Otherwise one frame is taken every `every_n_frames` frames from a
            window roughly centred on the middle of the video.

    Returns:
        ``uint8`` array of shape ``(num_frames, height, width, 3)`` holding the
        frames exactly as OpenCV decodes them.

    Raises:
        IOError: If the video cannot be opened.
        RuntimeError: If a requested frame cannot be read.
    """
    vid = cv.VideoCapture(fname)
    try:
        if not vid.isOpened():
            raise IOError(f"Could not open video: {fname}")

        total_frames = int(vid.get(cv.CAP_PROP_FRAME_COUNT))
        h = int(vid.get(cv.CAP_PROP_FRAME_HEIGHT))
        w = int(vid.get(cv.CAP_PROP_FRAME_WIDTH))

        if every_n_frames:
            # Approximately centre the sampling window; the exact centre is not
            # important. Clamp at 0 so short videos do not yield negative frame
            # numbers, which the forward-only reader below could never reach.
            frame_range = every_n_frames * num_frames
            start = max(0, total_frames // 2 - frame_range // 2)
            frame_nums = [start + i * every_n_frames for i in range(num_frames)]
        else:
            step = total_frames // num_frames
            frame_nums = [i * step for i in range(num_frames)]

        out = np.empty((num_frames, h, w, 3), np.dtype('uint8'))

        # Index of the frame the next grab() will deliver.
        next_frame = 0
        for i, frame in enumerate(frame_nums):
            # Skip forward (without decoding) until the wanted frame is next.
            while next_frame < frame:
                vid.grab()
                next_frame += 1

            # Grab and decode the wanted frame. This grab also moves the stream
            # forward, so it is counted to keep later frame numbers exact.
            grabbed = vid.grab()
            next_frame += 1
            ok, image = vid.retrieve() if grabbed else (False, None)
            if not ok or image is None:
                raise RuntimeError(f"Could not read frame {frame} of {fname}")
            out[i] = image
        return out
    finally:
        # Always free the decoder, including on errors.
        vid.release()

In [None]:
# for fname in train_fnames[:10]:
#     pic_arr = get_frames(DATA_FOLDER + "/" + TRAIN_SAMPLE_FOLDER + "/" + fname, every_n_frames=5)
    
#     # For each picture array
#     curr_pic = 1
#     for pic_vals in pic_arr:
#         im = Image.fromarray(pic_vals)
#         # Get rid of .mp4 at the end
#         new_fname = fname[:-4] + "_" + str(curr_pic)+"_" + str(len(pic_arr)) + ".jpeg"

#         # Images are saved in the format fname_pic#_totalPic#.jpeg
#         im.save(DATA_FOLDER + "/" + TRAIN_SAMPLE_FOLDER + "/" + new_fname )

#         curr_pic += 1

In [None]:
# Disabled exploratory cell: the plotting code below is wrapped in a string
# literal, so none of it runs (the cell only evaluates to that string).
'''
import matplotlib.pylab as plt
train_dir = '/kaggle/input/deepfake-detection-challenge/train_sample_videos/'
#train_dir = TRAIN_SAMPLE_FOLDER
fig, ax = plt.subplots(1,1, figsize=(15, 15))
train_video_files = [train_dir + x for x in os.listdir(train_dir)]
video_file = train_video_files[3]
#video_file = '/kaggle/input/deepfake-detection-challenge/train_sample_videos/afoovlsmtx.mp4'
cap = cv.VideoCapture(video_file)
success, image = cap.read()
image = cv.cvtColor(image, cv.COLOR_BGR2RGB)
cap.release()   
ax.imshow(image)
ax.xaxis.set_visible(False)
ax.yaxis.set_visible(False)
ax.title.set_text(f"FRAME 0: {video_file.split('/')[-1]}")
plt.grid(False)
'''

In [None]:
!pip install deepface

In [None]:
from deepface import DeepFace
import matplotlib.pylab as plt
train_dir = '/kaggle/input/deepfake-detection-challenge/train_sample_videos/'

fig, ax = plt.subplots(1,1, figsize=(15, 15))

# The folder also contains metadata.json, so keep only the video files.
# Sorting makes the index below select the same file on every run, because
# os.listdir returns entries in arbitrary order.
train_video_files = sorted(train_dir + x for x in os.listdir(train_dir) if x.endswith('.mp4'))
video_file = train_video_files[6]

# `image` holds the whole stack of sampled frames and is reused by the
# face-detection cell that follows.
image = get_frames(video_file)
# OpenCV decodes to BGR; convert one frame to RGB for matplotlib.
img = cv.cvtColor(image[5], cv.COLOR_BGR2RGB)

ax.imshow(img)
ax.xaxis.set_visible(False)
ax.yaxis.set_visible(False)
#ax.title.set_text(f"FRAME 0: {video_file.split('/')[-1]}")
plt.grid(False)


In [None]:
# Run the SSD face detector on one sampled frame and show what it returns.
# enforce_detection=False is passed so the call does not fail when no face is found.
face = DeepFace.detectFace(
    image[4],
    detector_backend='ssd',
    enforce_detection=False,
)
plt.imshow(face)

In [None]:
# Credit to https://www.kaggle.com/vaillant/dfdc-3d-2d-inc-cutmix-with-3d-model-fix#Calculate-ensembled-prediction-&-clamp for the modified ResNet architecture
def conv3x3x3(in_planes, out_planes, stride=1):
    """Build a bias-free 3-D convolution with a 3x3x3 kernel and padding 1.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels.
        stride: Stride forwarded to ``nn.Conv3d``.

    Returns:
        The configured ``nn.Conv3d`` layer.
    """
    conv_kwargs = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv3d(in_planes, out_planes, **conv_kwargs)

def downsample_basic_block(x, planes, stride):
    """Parameter-free shortcut: strided sub-sampling plus zero channel padding.

    The input is sub-sampled with a kernel-size-1 average pool at the given
    stride, then zero-filled channels are appended along dim 1 until the
    result has `planes` channels.

    Args:
        x: 5-D tensor as accepted by ``F.avg_pool3d``; dim 1 is the channel axis.
        planes: Desired channel count of the result (must be >= x.size(1)).
        stride: Stride of the sub-sampling pool.

    Returns:
        Tensor with `planes` channels, on the same device and with the same
        dtype as `x`. The result stays connected to `x` in the autograd graph.
    """
    out = F.avg_pool3d(x, kernel_size=1, stride=stride)
    # new_zeros inherits dtype and device from `out`, so no explicit CUDA check
    # (and no legacy Variable wrapper) is needed.
    zero_pads = out.new_zeros(
        out.size(0), planes - out.size(1), out.size(2), out.size(3),
        out.size(4))
    return torch.cat([out, zero_pads], dim=1)

class BasicBlock(nn.Module):
    """Two-convolution residual block built from 3x3x3 convolutions.

    Args:
        inplanes: Channels of the incoming tensor.
        planes: Channels produced by both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional module applied to the input so that it can be
            added to the convolution output; ``None`` uses the input as is.
    """

    # Ratio between the block's output channels and `planes`.
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super().__init__()
        self.conv1 = conv3x3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm3d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3x3(planes, planes)
        self.bn2 = nn.BatchNorm3d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))``."""
        y = self.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))

        # Identity shortcut unless a projection module was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        y += shortcut
        return self.relu(y)

class ResNet(nn.Module):
    """3-D ResNet: stem convolution, four residual stages, global pool, linear head.

    Args:
        block: Residual block class. It must expose an ``expansion`` attribute
            and accept ``(inplanes, planes, stride, downsample)``.
        layers: Sequence of four ints, the number of blocks in each stage.
        sample_size: Accepted for interface compatibility; not used here.
        sample_duration: Accepted for interface compatibility; not used here.
        num_classes: Output size of the final linear layer.
    """

    def __init__(self,
                 block,
                 layers,
                 sample_size,
                 sample_duration,
                 num_classes=400):
        super(ResNet, self).__init__()
        # Running channel count; read and updated by _make_layer.
        self.inplanes = 64
        self.conv1 = nn.Conv3d(
            3,
            64,
            kernel_size=7,
            stride=(1, 2, 2),
            padding=(3, 3, 3),
            bias=False)
        self.bn1 = nn.BatchNorm3d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool3d(kernel_size=(3, 3, 3), stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(
            block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(
            block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(
            block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool3d(1)
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv3d):
                # In-place initialiser; the variant without the trailing
                # underscore is deprecated.
                nn.init.kaiming_normal_(m.weight, mode='fan_out')
            elif isinstance(m, nn.BatchNorm3d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)

    def _make_layer(self, block, planes, blocks, stride=1):
        """Build one stage of `blocks` residual blocks producing `planes * expansion` channels.

        Only the first block receives `stride` and, when the stride or the
        channel count changes, a 1x1x1 convolution + batch-norm shortcut.
        """
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv3d(
                    self.inplanes,
                    planes * block.expansion,
                    kernel_size=1,
                    stride=stride,
                    bias=False), nn.BatchNorm3d(planes * block.expansion))

        layers = [block(self.inplanes, planes, stride, downsample)]
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a 5-D input with 3 channels in dim 1 to ``(batch, num_classes)`` scores."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)

        # Flatten everything except the batch dimension for the linear head.
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

# def get_fine_tuning_parameters(model, ft_begin_index):
#     if ft_begin_index == 0:
#         return model.parameters()
 
#     ft_module_names = []
#     for i in range(ft_begin_index, 5):
#         ft_module_names.append('layer{}'.format(i))
#     ft_module_names.append('fc')
 
#     parameters = []
#     for k, v in model.named_parameters():
#         for ft_module in ft_module_names:
#             if ft_module in k:
#                 parameters.append({'params': v})
#                 break
#         else:
#             parameters.append({'params': v, 'lr': 0.0})
 
#     return parameters


def resnet18(**kwargs):
    """Build a ResNet-18: BasicBlock with 2 blocks in each of the four stages.

    All keyword arguments are forwarded to ``ResNet``.
    """
    return ResNet(BasicBlock, [2, 2, 2, 2], **kwargs)

def resnet34(**kwargs):
    """Build a ResNet-34: BasicBlock with 3, 4, 6 and 3 blocks in the four stages.

    All keyword arguments are forwarded to ``ResNet``.
    """
    return ResNet(BasicBlock, [3, 4, 6, 3], **kwargs)

In [None]:
# 3-D ResNet-34 with a two-way output head.
model = resnet34(sample_duration=32, sample_size=224, num_classes=2)
# model.load_state_dict(torch.load(modeldict['path']))
# Move to the GPU when one is present; otherwise keep the model where it is.
model = model.cuda() if torch.cuda.is_available() else model
model

In [None]:
# Whole number of FAKE metadata entries per REAL entry (integer division).
mult_ratio = fake_ct // real_ct
print(mult_ratio)

# One file name per batch, reshuffled each time the loader is iterated.
train_loader = DataLoader(
    train_fnames,
    batch_size=1,
    shuffle=True,
)

In [None]:
# One file name per batch, in list order.
test_loader = DataLoader(
    test_fnames,
    batch_size=1,
    shuffle=False,
)

In [None]:

def _train_on_face(model, optimizer, frame, transforms, gt_label, device):
    """Run one optimisation step on the face crop of a single video frame.

    Args:
        model: Network being trained (already on `device`).
        optimizer: Optimiser stepping `model`'s parameters.
        frame: One frame as returned by ``get_frames``.
        transforms: Albumentations pipeline applied to the face crop.
        gt_label: 0 for REAL, 1 for FAKE.
        device: Device the input and target tensors are placed on.

    Returns:
        ``(prob_fake, loss_value)`` as Python floats.
    """
    # Pre-processing needs no autograd graph.
    with torch.no_grad():
        face = DeepFace.detectFace(frame, detector_backend='ssd', enforce_detection=False)
        face = np.array([transforms(image=face)['image']])
        # Move the last axis (channels) first, then add a batch axis. The
        # singleton axis added by np.array makes this a one-frame "video".
        clip = torch.from_numpy(face.transpose([3, 0, 1, 2])).float().unsqueeze(0)
    clip = clip.to(device)

    # Clear gradients before every step so they never accumulate across the
    # repeated augmentation passes of a REAL frame.
    optimizer.zero_grad()
    y_pred = model(clip)
    _, prob1 = torch.mean(F.softmax(y_pred, dim=1), dim=0)

    target = torch.tensor(gt_label, dtype=torch.float32, device=device).reshape(1)
    loss = F.mse_loss(prob1.reshape(1), target)
    loss.backward()
    optimizer.step()
    return float(prob1), loss.item()


def train(model, epochs, output=False, debug_batch_interval=5):
    """Train `model` frame by frame on the videos served by ``train_loader``.

    Reads the module-level ``train_loader``, ``train_fnames``, ``mult_ratio``
    and ``TRAIN_PATH_PRE_FIX``. Frames of REAL videos are trained on
    ``max(1, mult_ratio - 1)`` times each with a random shift/scale/rotate to
    counter class imbalance; frames of FAKE videos are trained on once.

    NOTE(review): repeating the same REAL frames, even augmented, may overfit
    the REAL class. This is an open concern from the original author.

    Args:
        model: Network producing two scores per input (index 1 = FAKE).
        epochs: Number of passes over ``train_loader``.
        output: If true, print intermediate progress lines during each epoch.
        debug_batch_interval: Approximate number of progress lines per epoch.

    Returns:
        ``defaultdict(list)`` mapping each video name to a one-element list
        with the mean FAKE probability of its frames in the final epoch.
    """
    optimizer = torch.optim.Adam(model.parameters(),lr=0.0000001)

    # Use the GPU when there is one, but keep working on CPU-only machines.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.train()

    # Built once instead of once per frame.
    fake_transforms = A.Compose([A.Resize(height=112, width=112),
                                 A.Normalize()])
    real_transforms = A.Compose([A.Resize(height=112, width=112),
                                 A.Normalize(), A.ShiftScaleRotate(p=1)])

    # Same count as range(1, mult_ratio), but never zero, so REAL frames are
    # still trained on when the classes are balanced.
    real_passes = max(1, mult_ratio - 1)
    log_every = max(1, len(train_fnames) // max(1, debug_batch_interval))

    def pct(hits, total):
        # Percentage with two decimals; 0.0 when nothing was counted.
        return round(100.0 * hits / total, 2) if total else 0.0

    predictions = defaultdict(list)

    for epoch in tqdm(range(epochs), position=0, desc="Epochs"):
        print('Epoch', epoch+1)

        # Mostly used for stats
        running, batch_running, ct, batch_ct, correct = 0, 0, 0, 0, 0
        real_ct, fake_ct, correct_real, correct_fake = 0, 0, 0, 0
        predictions = defaultdict(list)

        for batch_idx, vid_name in tqdm(enumerate(train_loader), position=1, desc="Batches", total=len(train_fnames)):

            # Batch size should be 1!
            assert(len(vid_name) == 1)
            vid_name = vid_name[0]

            # Get the video by cutting out some images
            video_frames = get_frames(os.path.join(TRAIN_PATH_PRE_FIX, vid_name))

            preds_video = []

            is_real = get_label(vid_name) == 'REAL'
            gt_label = 0 if is_real else 1
            transforms = real_transforms if is_real else fake_transforms
            passes = real_passes if is_real else 1

            # Generate our predictions by getting a confidence score of each frame
            for im in video_frames:
                for _ in range(passes):
                    prob_fake, loss_value = _train_on_face(
                        model, optimizer, im, transforms, gt_label, device)

                    # Add this frame's prediction
                    preds_video.append(prob_fake)

                    # REAL is predicted below 0.5, FAKE at or above it.
                    if is_real:
                        real_ct += 1
                        if prob_fake < 0.5:
                            correct += 1
                            correct_real += 1
                    else:
                        fake_ct += 1
                        if prob_fake >= 0.5:
                            correct += 1
                            correct_fake += 1

                    running += loss_value
                    batch_running += loss_value
                    ct += 1
                    batch_ct += 1

            # Every debug_batch_interval iterations, print the data we've churned through (iterations * data per batch)
            if output and batch_idx % log_every == 0 and batch_idx != 0:
                print('Epoch: {} [{}/{} ({:.2f}%)]\tBatch Loss: {:.5f}\tEpoch Loss: {:.5f}'.format(
                          epoch+1, batch_idx, len(train_fnames),    # current sample num / total num
                          100. * batch_idx / len(train_fnames), # this batch num's % of total dataset
                          batch_running / max(batch_ct, 1), # the loss since the last progress line
                          running / max(ct, 1)) # running loss for the epoch
                     )
                batch_running, batch_ct = 0, 0

            # As long as we have predictions from our images, take the mean of those predictions to determine a prediction for the video
            if preds_video:
                predictions[vid_name].append(np.mean(preds_video))

        if ct == 0:
            continue
        this_loss = running / ct
        if output:
            print("\nAverage Loss:", round(this_loss * 10000) / 10000.0,"\n")
        else:
            print("Epoch", epoch+1, "Average Loss:", round(this_loss * 10000) / 10000.0)
        # Counts are per trained frame, not per video.
        print("Accuracy =", str(pct(correct, ct)) + "%", "(" + str(correct), "/" , str(ct) + ")")
        print("Real vids:", str(pct(correct_real, real_ct)) + "%", "(" + str(correct_real), "/" , str(real_ct) + ")")
        print("Fake vids:", str(pct(correct_fake, fake_ct)) + "%", "(" + str(correct_fake), "/" , str(fake_ct) + ")")

    return predictions

In [None]:
# Train for 18 epochs and print intermediate progress lines.
train(model, 18, output=True)
# The string below looks like output pasted from an earlier run (TODO confirm).
# As the cell's last expression it is only echoed, never executed.
'''
epoch 1
Accuracy = 63.15% (3499 / 5540)
Real vids: 82.94% (1916 / 2310)
Fake vids: 49% (1583 / 3230)
'''

In [None]:
# Bundle a freshly constructed (untrained) network object with the trained
# weights; load_checkpoint() copies the weights into that object.
checkpoint = dict(
    model=resnet34(num_classes=2, sample_size=224, sample_duration=32),
    state_dict=model.state_dict(),
    # optimizer=optimizer.state_dict(),
)
torch.save(checkpoint, 'checkpoint.pth')

In [None]:
def load_checkpoint(filepath='checkpoint.pth'):
    """Rebuild a frozen, eval-mode model from a checkpoint file.

    The file is expected to hold a dict with a ``'model'`` entry (a pickled
    network object) and a ``'state_dict'`` entry (weights loaded into it).

    Args:
        filepath: Path of the checkpoint written with ``torch.save``.

    Returns:
        The model with the stored weights, ``requires_grad`` disabled on every
        parameter and ``eval()`` mode enabled. Tensors are mapped to the CPU
        while loading, so this also works on machines without a GPU.
    """
    # SECURITY: the checkpoint contains a pickled module object, so loading it
    # executes pickle code. Only load files from a trusted source.
    try:
        # Newer torch versions default to weights_only=True, which rejects the
        # pickled model object stored under 'model'.
        checkpoint = torch.load(filepath, map_location='cpu', weights_only=False)
    except TypeError:
        # Older torch versions do not know the weights_only keyword.
        checkpoint = torch.load(filepath, map_location='cpu')

    model = checkpoint['model']
    model.load_state_dict(checkpoint['state_dict'])

    # If using for testing: freeze the weights and switch to inference mode.
    for parameter in model.parameters():
        parameter.requires_grad = False
    model.eval()

    return model

In [None]:

def _eval_face(model, frame, transforms, gt_label, device):
    """Score the face crop of one frame without tracking gradients.

    Args:
        model: Network in eval mode, located on `device`.
        frame: One frame as returned by ``get_frames``.
        transforms: Albumentations pipeline applied to the face crop.
        gt_label: 0 for REAL, 1 for FAKE.
        device: Device the input and target tensors are placed on.

    Returns:
        ``(prob_fake, loss_value)`` as Python floats.
    """
    with torch.no_grad():
        face = DeepFace.detectFace(frame, detector_backend='ssd', enforce_detection=False)
        face = np.array([transforms(image=face)['image']])
        # Move the last axis (channels) first, then add a batch axis. The
        # singleton axis added by np.array makes this a one-frame "video".
        clip = torch.from_numpy(face.transpose([3, 0, 1, 2])).float().unsqueeze(0).to(device)

        y_pred = model(clip)
        _, prob1 = torch.mean(F.softmax(y_pred, dim=1), dim=0)
        target = torch.tensor(gt_label, dtype=torch.float32, device=device).reshape(1)
        loss = F.mse_loss(prob1.reshape(1), target)
    return float(prob1), loss.item()


def test(model, loader=None, video_dir=None):
    """Evaluate `model` frame by frame and print loss and accuracy figures.

    Frames get the same resize + normalise pre-processing that training uses
    for FAKE frames (no random augmentation).

    NOTE(review): by default the names come from the module-level
    ``test_loader`` but are resolved against ``TRAIN_PATH_PRE_FIX`` and
    labelled through ``get_label`` (training metadata). Names missing from
    that metadata raise ``KeyError``. Confirm which labelled hold-out set is
    intended and pass it through `loader` / `video_dir`.

    Args:
        model: Network producing two scores per input (index 1 = FAKE).
        loader: Iterable yielding one-element batches of video file names;
            defaults to the module-level ``test_loader``.
        video_dir: Folder containing those videos; defaults to
            ``TRAIN_PATH_PRE_FIX``.

    Returns:
        Fraction of correctly classified frames, or ``None`` when no frame
        was evaluated.
    """
    if loader is None:
        loader = test_loader
    if video_dir is None:
        video_dir = TRAIN_PATH_PRE_FIX

    model.eval()
    # Inputs must live where the model's weights live.
    device = next(model.parameters()).device

    transforms = A.Compose([A.Resize(height=112, width=112),
                            A.Normalize()])

    def pct(hits, total):
        # Percentage with two decimals; 0.0 when nothing was counted.
        return round(100.0 * hits / total, 2) if total else 0.0

    # Mostly used for stats
    running, ct, correct = 0, 0, 0
    real_ct, fake_ct, correct_real, correct_fake = 0, 0, 0, 0

    for batch_idx, vid_name in tqdm(enumerate(loader), position=1, desc="Batches", total=len(loader)):

        # Batch size should be 1!
        assert(len(vid_name) == 1)
        vid_name = vid_name[0]

        # Get the video by cutting out some images
        video_frames = get_frames(os.path.join(video_dir, vid_name))

        is_real = get_label(vid_name) == 'REAL'
        gt_label = 0 if is_real else 1

        # Generate our predictions by getting a confidence score of each frame
        for im in video_frames:
            prob_fake, loss_value = _eval_face(model, im, transforms, gt_label, device)

            # REAL is predicted below 0.5, FAKE at or above it.
            if is_real:
                real_ct += 1
                if prob_fake < 0.5:
                    correct += 1
                    correct_real += 1
            else:
                fake_ct += 1
                if prob_fake >= 0.5:
                    correct += 1
                    correct_fake += 1

            running += loss_value
            ct += 1

    if ct == 0:
        return
    print("\nAverage Loss:", round(running / ct * 10000) / 10000.0,"\n")
    # Counts are per evaluated frame, not per video.
    print("Accuracy =", str(pct(correct, ct)) + "%", "(" + str(correct), "/" , str(ct) + ")")
    print("Real vids:", str(pct(correct_real, real_ct)) + "%", "(" + str(correct_real), "/" , str(real_ct) + ")")
    print("Fake vids:", str(pct(correct_fake, fake_ct)) + "%", "(" + str(correct_fake), "/" , str(fake_ct) + ")")
    return correct / ct

In [None]:
# No way to test this easily
# NOTE(review): test_loader is built from test_fnames, while test() reads
# videos from TRAIN_PATH_PRE_FIX and labels via get_label (training metadata).
# Confirm that a labelled hold-out set is what should be evaluated here.
test(model)

In [None]:
# Print one line per video: "<name> : <mean of values> // <value> <value> ...".
# NOTE(review): in the visible notebook `predictions` is only ever assigned as
# a local variable inside train() and test(), so this cell presumably relies
# on a module-level `predictions` defined elsewhere — confirm before running.
for k,v in predictions.items():
    string = '{} : {:.4f} //'.format(k, np.mean(v))
    for proba in v:
        string += ' {:.4f}'.format(proba)
    print(string)