In [None]:
!pip install cvlib

In [3]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os
from numba import cuda

import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import pywt

from torch.utils.data import DataLoader, Dataset
from sklearn import metrics
from sklearn.metrics import confusion_matrix

import cvlib as cv

In [4]:
# transforms tensor + normalization (0 mean, unit variance)
transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0), (1))])

# custom dataset loader     
class CustomDataset(Dataset):
    def __init__(self, list_imgs, transform):
        self.data = list_imgs
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_tensor = torch.zeros(len(self.data[idx])-1,self.data[idx][0].shape[0],self.data[idx][0].shape[1])
        for i in range(len(self.data[idx])-1):
            img_tensor[i,:,:] = self.transform(self.data[idx][i])
            
        class_id   = torch.tensor([self.data[idx][-1]])

        return img_tensor.float(), class_id.float()

In [5]:
# Reduced VGG16 model
class VGG16(nn.Module):
    def __init__(self, in_channels):
        super(VGG16, self).__init__()
        self.conv1_1 = nn.Conv2d(in_channels=in_channels, out_channels=32, kernel_size=3, padding=1)
        self.conv2_1 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.conv3_1 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)

        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.fc1 = nn.Linear(80000, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.conv1_1(x))
        x = self.maxpool(x)
        x = F.relu(self.conv2_1(x))
        x = self.maxpool(x)
        x = F.relu(self.conv3_1(x))
        x = self.maxpool(x)
        x = x.reshape(x.shape[0], -1)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, 0.5) # dropout to avoid overfitting
        x = torch.sigmoid(self.fc2(x))
        return x
    

# custom ultra-reduced VGG16 model   
class VGG16_reduced(nn.Module):
    def __init__(self, in_channels):
        super(VGG16_reduced, self).__init__()
        self.conv1_1 = nn.Conv2d(in_channels=in_channels, out_channels=10, kernel_size=3, padding=1)

        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.fc1 = nn.Linear(100000, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.conv1_1(x))
        x = self.maxpool(x)
        x = x.reshape(x.shape[0], -1)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, 0.5) # dropout to avoid overfitting
        x = torch.sigmoid(self.fc2(x))
        return x
    
def train(first_epoch, num_epochs, model, criterion, optimizer, train_loader, valid_loader, all_targets):

    train_losses, valid_losses = [],[]

    for epoch in range(first_epoch, first_epoch + num_epochs):

        # training phase
        train_loss, train_acc = train_for_epoch(model, criterion, optimizer, train_loader)

        # validation phase
        valid_loss, valid_acc = validate(model, criterion, valid_loader, all_targets)        

        print(f'[{epoch:03d}] train loss: {train_loss:04f}  '
              f'val loss: {valid_loss:04f}  '
              f'train acc: {train_acc*100:.4f}  '
              f'val acc: {valid_acc*100:.4f}%')

        train_losses.append(train_loss)
        valid_losses.append(valid_loss)
        
    torch.save(model.state_dict(), '\kaggle\working\df_model'+str(epoch))        


def train_for_epoch(model, criterion, optimizer, train_loader):

    # put model in train mode
    model.train()

    # keep track of the training losses during the epoch
    train_losses = []
    y_pred = []
    y_true = []

    for batch, targets in train_loader:
        # Move the training data to the GPU
        batch = batch.to(device)
        targets = targets.to(device)

        # clear previous gradient computation
        optimizer.zero_grad()

        # forward propagation
        predictions = model(batch)

        # calculate the loss
        loss = criterion(predictions, targets)

        # backpropagate to compute gradients
        loss.backward()

        # update model weights
        optimizer.step()

        # update average loss
        train_losses.append(loss.item())
        
        # save predictions
        y_pred.extend(predictions.cpu().detach().numpy().round())
        y_true.extend(targets.cpu().detach().numpy())

    # calculate average training loss
    train_loss = np.sum(train_losses)/len(train_losses)
    
    # Collect predictions into y_pred and ground truth into y_true
    y_pred = np.array(y_pred, dtype=np.float32)
    y_true = np.array(y_true, dtype=np.float32)
    
    y_pred = np.squeeze(y_pred)
    y_true = np.squeeze(y_true)

    # Calculate accuracy as the average number of times y_true == y_pred
    accuracy = np.sum(y_pred==y_true)/len(y_pred)

    return train_loss, accuracy


def validate(model, criterion, valid_loader, all_targets):

    # put model in evaluation mode
    model.eval()

    # keep track of losses and predictions
    valid_losses = []
    y_pred = []

    # We don't need gradients for validation, so wrap in 
    # no_grad to save memory
    with torch.no_grad():

        for batch, targets in valid_loader:

            # Move the training batch to the GPU
            batch = batch.to(device)
            targets = targets.to(device)

            # forward propagation
            predictions = model(batch)

            # calculate the loss
            # loss = criterion(predictions, targets.squeeze())
            loss = criterion(predictions, targets)


            # update running loss value
            valid_losses.append(loss.item())

            # save predictions
#             y_pred.extend(predictions.argmax(dim=1).cpu().numpy())
            y_pred.extend(predictions.cpu().numpy().round())


    # compute the average validation loss
    valid_loss = np.mean(valid_losses)

    # Collect predictions into y_pred and ground truth into y_true
    y_pred = np.array(y_pred, dtype=np.float32)
    y_true = np.array(all_targets, dtype=np.float32)
    
    y_pred = np.squeeze(y_pred)

    # Calculate accuracy as the average number of times y_true == y_pred
    accuracy = np.sum(y_pred==y_true)/len(y_pred)

    return valid_loss, accuracy

In [6]:
# function to load the dataset
def load_dataset(folder_path, type_face):

    images = []

    for number_folder in os.listdir(folder_path + type_face):

        for image_name in os.listdir(folder_path + type_face + number_folder + '/'):
            img = cv2.imread(folder_path + type_face + number_folder + '/' + image_name, 1)
            images.append(img)

    return images

# function to plot one image
def plot_single_img(img):
    rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    plt.imshow(rgb)
    plt.show()

# function that finds a returns the cropped face from the input image
def face_detector_cvlib(img, mode, resize=False, x=150, y=220):
    
    faces, _ = cv.detect_face(img)
    
    try:
        
        # Try get the first face prediction
        x1, y1, x2, y2 = faces[0]

        # Check other predictions if some of these statements doesn't hold
        if (x1<0 or y1<0 or x2>img.shape[1] or y2>img.shape[0]):
            for i in range(1, len(faces)):
                x1, y1, x2, y2 = faces[i]
                # break when prediction is within the boundaries of the img
                if (x1>0 and y1>0 and x2<img.shape[1] and y2<img.shape[0]):
                    break
        
        if not resize:
            # return cropped img
            return img[y1:y2, x1:x2]
        else:
            return cv2.resize(img[y1:y2, x1:x2], (x, y))
                
    except:
        
        print('No face was detected')

        if mode=="development":
            pass
        else:
            return cv2.resize(img, (x, y))

# function to create the gabor filters 
def get_gabor_filters(k_size=31, n_theta=16, lambda_val=2, sigma_val=4, gamma_val=0.5, psi_val=0):

    # theta  = orientation
    # sigma  = standard deviation of the gaussian envelope
    # lambda = wavelength of the sinusoidal factor.
    # gamma  = spatial aspect ratio
    # phi    = pahse offset

    filters = []
    for psi_value in psi_val:
        for sigma_value in sigma_val:
            for gamma_value in gamma_val:
                for lambda_value in lambda_val:
                    for theta_value in np.arange(0, np.pi, np.pi / n_theta):
                        kern = cv2.getGaborKernel(ksize=(k_size, k_size), sigma=sigma_value, theta=theta_value, lambd=lambda_value, gamma=gamma_value, psi=psi_value, ktype=cv2.CV_32F)
                        # kern /= 1.5*kern.sum()
                        # kern /= kern.sum()
                        filters.append(kern)

    return filters

# function to filter the images with the gabor filters
def gabor_filter(img, filters):
    filtered_img = np.zeros_like(img)
    for kern in filters:
        fimg = cv2.filter2D(img, cv2.CV_8UC3, kern)
        np.maximum(filtered_img, fimg, filtered_img)

    return filtered_img

# function to normalize the images within a range (max_value)
def normalize_img(img, max_value):

    max_f = np.max(img)
    min_f = np.min(img)

    img = np.round(((img - min_f) / (max_f - min_f)) * max_value)
    
    return img.astype('int32')
    
# function that returns the coocurrence matrix of an image
def co_ocurrence_matrix(gray):

    coom = np.zeros(shape=([4, 200, 200]))
    
    gray_norm = normalize_img(gray, 199)


    for i in range(gray_norm.shape[0]):
        for j in range(gray_norm.shape[1]):
            
            #   0 degrees
            if j != gray_norm.shape[1] - 1:
                coom[0, gray_norm[i,j], gray_norm[i,j+1]] += 1
            #  90 degrees
            if i != gray_norm.shape[0] - 1:
                coom[2, gray_norm[i,j], gray_norm[i+1,j]] += 1

            if i < gray_norm.shape[0] - 1 and j < gray_norm.shape[1] - 1:
                #  45 degrees
                coom[1, gray_norm[i+1,j], gray_norm[i,j+1]] += 1
                # 135 degrees
                coom[3, gray_norm[i,j], gray_norm[i+1,j+1]] += 1

    return coom

# function that returns the wavelet transform at different levels of an image 
def wavelet_tranform(img, levels=5, w_type = 'haar', color=False): #9 features

    feature_list = []

    if color:

        caR = img[:,:,2]
        caG = img[:,:,1]
        caB = img[:,:,0]

        # wavelet transform for number of levels
        for i in range(levels):

            caR, (chR, cvR, cdR) = pywt.dwt2(caR, wavelet=w_type)
            caG, (chG, cvG, cdG) = pywt.dwt2(caG, wavelet=w_type)
            caB, (chB, cvB, cdB) = pywt.dwt2(caB, wavelet=w_type)

            ch = np.stack([chR,chG,chB], axis=2)
            cv = np.stack([cvR,cvG,cvB], axis=2)
            cd = np.stack([cdR,cdG,cdB], axis=2)
            print(ch.shape)
            feature_list.extend([chR,cvR,cdR,chG,cvG,cdG,chB,cvB,cdB])
            
        ca = np.stack([caR,caG,caB], axis=2)
        feature_list.extend([caR,caG,caB])



    # gray level
    else:

        # compute the features from the different levels
        for i in range(levels):

            ca, (ch, cv, cd) = pywt.dwt2(img, wavelet=w_type)
            img = ca

            # standard deviation
            feature_list.append(ch)
            feature_list.append(cv)
            feature_list.append(cd)
    
        feature_list.append(ca)

    return  feature_list


# kernel definition to gradient extraction (horizontal and vertical edges)
kernelx = np.array([[ 1,  2,  1],
                    [ 0,  1,  0],
                    [-1, -2, -1]])

kernely = np.array([[-1,  0,  1],
                    [-2,  0,  2],
                    [-1,  0,  1]])

# function that returns the image filtered and the magnitude of the gradient
def compute_gradient(gray): #5 features

    grad_list = []
    Gx   = cv2.filter2D(gray, ddepth=cv2.CV_32F, kernel=kernelx) # enough with CV_32F?
    Gy   = cv2.filter2D(gray, ddepth=cv2.CV_32F, kernel=kernely)
    magnitude = np.sqrt(Gx**2 + Gy**2)
    grad_list.extend([Gx, Gy, magnitude])
    
    return grad_list 

def compute_AUC(y_true, y_pred_prob, save_plot = False):
    
    # transpose so 1 is the positive label
    y_true = 1 - y_true
    # therefore, invert probabilities as well
    y_pred_prob = 1. - y_pred_prob

    # compute the roc curve
    fpr, tpr, thresholds = metrics.roc_curve(y_true, y_pred_prob, pos_label=1)
    # compute the AUC
    auc_val = metrics.auc(fpr, tpr)
    
    print('AUC: ',auc_val)
    fig = plt.figure(figsize=(12,8))
    plt.plot(fpr, tpr, 'g')
    plt.title('AUC = '+str("{:.3f}".format(auc_val)))
    plt.xlabel('FPR')
    plt.ylabel('TPR')
    plt.figure(tight_layout=True)
    fig.savefig('AUC_'+str("{:.3f}".format(auc_val))+'.jpg')

In [None]:
## PARAMETERS
task_folder = ["../input/task1-df/Task_1/","../input/task2df/Task_2_3/"]
folder_path = ["development", "evaluation"]
options = ['real/', 'fake/']
width_img  = 200
height_img = 200

# split in training and validation 
P_SPLIT = True

# TASK SELECTION
# 1 = TASK 1
# 2 = TASK 2
F_TASK = 1


gf_theta  = 16
gf_lambda = [1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5]
gf_sigma  = [4]
# gf_phi    = [0, 0.25, 0.41] # 0
gf_phi    = [0] # 0
# gf_gamma  = [0.35, 0.5, 0.75] # 0.5
gf_gamma  = [0.5] # 0.5



# process
F_EXTRACT_FEATURES = True

if F_EXTRACT_FEATURES:
    for n_main in range(2):

        # 1. LOAD DATASET
        print('1. Loading dataset...')
        real_faces = load_dataset(folder_path = (task_folder[n_main*(F_TASK-1)] + folder_path[n_main] + "/"), type_face = options[0])
        fake_faces = load_dataset(folder_path = (task_folder[n_main*(F_TASK-1)] + folder_path[n_main] + "/"), type_face = options[1])


        # 2. CROP THE FACE REGION
        print('2. Cropping faces')
        real_faces_cropped = []
        fake_faces_cropped = []


        for i in range(len(real_faces)):
            img = face_detector_cvlib(real_faces[i], folder_path[n_main], resize=True, x=width_img, y=height_img)
            if isinstance(img, np.ndarray):
                real_faces_cropped.append(img)

        for i in range(len(fake_faces)):
            img = face_detector_cvlib(fake_faces[i], folder_path[n_main], resize=True, x=width_img, y=height_img)
            if isinstance(img, np.ndarray):
                fake_faces_cropped.append(img)

        real_feat = []
        fake_feat = []

        # 3. GET GABOR FILTERS (append labels)
        print('3. Creating filters')
        gabor_filters = get_gabor_filters(k_size=31, n_theta=gf_theta, lambda_val=gf_lambda, sigma_val=gf_sigma, gamma_val=gf_gamma, psi_val=gf_phi)
        
        # 4. FILTER IMAGES
        print('4. Filtering images')
        # 4.1 Real faces
        for i, rgb_img in enumerate(real_faces_cropped):
            img_gray = cv2.cvtColor(rgb_img, cv2.COLOR_BGR2GRAY)
            
            # GABOR FILTERS
            imgs_feat = []
            for f in range(0, len(gabor_filters), gf_theta):
                imgs_feat.append(gabor_filter(img = img_gray, filters = gabor_filters[f:f+gf_theta]))
                
            # CO-OCCURENCE MATRIX
#             imgs_feat.extend(co_ocurrence_matrix(img_gray))

            # WAVELET (can not append, different sizes (levels))
#             imgs_feat = imgs_feat + wavelet_tranform(img, levels=2, w_type = 'haar', color=True)
            
            # GRAD
#             imgs_feat = imgs_feat + compute_gradient(img_gray)
            
            # RGB
            imgs_feat.extend([rgb_img[:,:,0],rgb_img[:,:,1],rgb_img[:,:,2], 0]) # 0 REAL
            
            # FINAL FEATURES
            real_feat.append(imgs_feat)
            
        # 4.2 Fake faces
        for i, rgb_img in enumerate(fake_faces_cropped):
            img_gray = cv2.cvtColor(rgb_img, cv2.COLOR_BGR2GRAY)
            
            # GABOR FILTERS
            imgs_feat = []
            for f in range(0, len(gabor_filters), gf_theta):
                imgs_feat.append(gabor_filter(img = img_gray, filters = gabor_filters[f:f+gf_theta]))
                
            # CO-OCCURENCE MATRIX
#             imgs_feat.extend(co_ocurrence_matrix(img_gray))
            
            # WAVELET (can not append, different sizes (levels))
#             imgs_feat = imgs_feat + wavelet_tranform(img, levels=2, w_type = 'haar', color=True)

            # GRAD
#             imgs_feat = imgs_feat + compute_gradient(img_gray)
            
            # RGB
            imgs_feat.extend([rgb_img[:,:,0],rgb_img[:,:,1],rgb_img[:,:,2], 1]) # 1 FAKE
            
            # FINAL FEATURES
            fake_feat.append(imgs_feat)

        # 5. SPLIT TRAIN AND TEST
        print('4. Splitting data')
        if n_main==0:
            
            if P_SPLIT:
                n_split = 300
                train_imgs = real_feat[:n_split] + fake_feat[:n_split]

                val_imgs   = real_feat[n_split:] + fake_feat[n_split:]
                dataset_val   = CustomDataset(list_imgs=val_imgs, transform=transform)
                len_real_val = len(real_feat[n_split:])
                
            else:
                train_imgs = real_feat + fake_feat
            
            dataset_train = CustomDataset(list_imgs=train_imgs, transform=transform)

        else:
            test_imgs = real_feat + fake_feat
            dataset_test = CustomDataset(list_imgs=test_imgs, transform=transform)
            
            if P_SPLIT:
                all_targets = np.ones([len(val_imgs)])
                all_targets[:len_real_val] = 0
            else:
                all_targets = np.ones([len(test_imgs)])
                all_targets[:len(real_feat)] = 0


In [None]:
print('Number of channels: ', train_imgs[0][0].shape)

# 5. CREATE LOADERS
train_loader = DataLoader(dataset_train, batch_size=64, num_workers=0, shuffle=True)
test_loader  = DataLoader(dataset_test, batch_size=10, num_workers=0)  

if P_SPLIT:
    val_loader = DataLoader(dataset_val, batch_size=10, num_workers=0)


# 6. INIT MODEL
# model = VGG16(in_channels = 15)
model = VGG16_reduced(in_channels = 12)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)

# 7. LOSS AND OPTIMIZER
criterion = nn.BCELoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, nesterov=True, weight_decay=1e-4)

# 8. TRAIN
print('8. Training model')

if P_SPLIT:
    # training to choose the architecture and paramiters of the model
    train(1, 13, model, criterion, optimizer, train_loader, val_loader, all_targets)
else:
    # last training with all the images
    train(1, 13, model, criterion, optimizer, train_loader, test_loader, all_targets)

In [None]:
# Load same model but without the dropout layer (added to figth against overfitting through training) so we get the same output every time
class VGG16_reduced(nn.Module):
    def __init__(self, in_channels):
        super(VGG16_reduced, self).__init__()
        self.conv1_1 = nn.Conv2d(in_channels=in_channels, out_channels=10, kernel_size=3, padding=1)

        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.fc1 = nn.Linear(100000, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.conv1_1(x))
        x = self.maxpool(x)
        x = x.reshape(x.shape[0], -1)
        x = F.relu(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        return x

model = VGG16_reduced(in_channels = 12)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)

In [None]:
# load the weigths of the last epoch 
# model.load_state_dict(torch.load('\kaggle\working\df_model13'))
model.load_state_dict(torch.load('../input/weigths-tr/_kaggle_working_df_model13'))

In [None]:
# EVALUATE THE MODEL

all_targets_test = np.ones([len(test_imgs)])
all_targets_test[:len(real_feat)] = 0

# put model in evaluation mode
model.eval()

y_pred = []
y_pred_prob = []

# We don't need gradients for validation, so wrap in 
# no_grad to save memory
with torch.no_grad():

    for batch, targets in test_loader:

        # Move the training batch to the GPU
        batch = batch.to(device)
        targets = targets.to(device)

        # forward propagation
        predictions = model(batch)

        # save predictions
        y_pred_prob.extend(predictions.cpu().numpy())
        y_pred.extend(predictions.cpu().numpy().round())



# Collect predictions into y_pred and ground truth into y_true
y_pred = np.array(y_pred, dtype=np.float32)
y_pred_prob = np.array(y_pred_prob, dtype=np.float32)

y_true = np.array(all_targets_test, dtype=np.float32)

y_pred = np.squeeze(y_pred)
y_pred_prob = np.squeeze(y_pred_prob)


# Calculate accuracy as the average number of times y_true == y_pred
accuracy = np.sum(y_pred==y_true)/len(y_pred)

# print confusion matrix
CM = confusion_matrix(y_true, y_pred)
print(CM)

print(accuracy)
print(y_pred)
print(y_pred_prob)




In [None]:
# Compute the AUC and plot
compute_AUC(y_true, y_pred_prob, save_plot = True)

In [None]:
# save the labels, predictions and probabilities
np.save('y_true',y_true)
np.save('y_pred',y_pred)
np.save('y_pred_prob',y_pred_prob)

In [None]:
# See the results
list_rf = ['Real', 'Fake']
test_images = real_faces + fake_faces
for idx, (pp, gt) in enumerate(zip(y_pred, y_true)):
    print('GroundTruth: ',list_rf[int(gt)],' Prediction: ',list_rf[int(pp)])
    plot_single_img(test_images[idx])
    input()