DETECT DEEPFAKES NETWORK

First, we need to import the required packages

In [1]:
import torch
import torchvision
import cv2

from imutils import paths
import shutil

import torch.nn as nn
import torch.optim as optim

from torchvision import transforms
from torchvision.datasets import ImageFolder

from torch.utils.data import DataLoader

import matplotlib.pyplot as plt

import os
import numpy as np
from tqdm import tqdm #to show progressive meter

  from .autonotebook import tqdm as notebook_tqdm


We've decided to use the FaceForensics dataset, which contains real and fake videos.
To download the dataset from Kaggle you need either to upload the .json file (your Kaggle API token) into the .kaggle directory or to set the following environment variables:


os.environ['KAGGLE_USERNAME'] = "username from the json file"

os.environ['KAGGLE_KEY'] = "key from the json file"

Below, we download the dataset from kaggle and we unzip the folder in a destination directory.

In [None]:
# Install the Kaggle command-line client used by the download cells below.
pip install kaggle

In [2]:
os.environ['KAGGLE_USERNAME'] = "stefanoservillo"
os.environ['KAGGLE_KEY'] = "d820211db8d6f4de9f2656d1eb4c2c38"

!kaggle datasets download -d sorokin/faceforensics

"kaggle" non è riconosciuto come comando interno o esterno,
 un programma eseguibile o un file batch.


In [None]:
# Create the destination directory for the extracted dataset.
!mkdir /content/data

# Extract the downloaded archive into the destination directory.
!unzip faceforensics.zip -d /content/data

----------------------------------------
TRAINING AND VALIDATION SET
----------------------------------------
Then we divide the dataset into training and validation sets. To do this, we define the training and validation paths and the fraction of videos we want in our validation set. We use only 400 videos from the dataset due to memory limitations.

In [None]:
# root directory where the dataset archive was extracted
DATASET_PATH = '/content/data'

# folder names of the training and validation subsets
TRAIN, VALIDATION = 'train', 'val'

# fraction of the videos reserved for validation
VAL_SPLIT = 0.2

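Below, we have the copy_videos() function, which takes as input a list of video paths, a destination folder and the name of the label sub-folder (Set), and copies the videos into that sub-folder. At most 150 videos are copied for training and 50 for validation.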

In [None]:
def copy_videos(videoPaths, folder, Set, max_videos=None):
  """Copy a capped number of videos into the ``folder/Set`` directory.

  Args:
    videoPaths: List of source video file paths.
    folder: Destination root directory; created if it does not exist.
    Set: Name of the label sub-folder created inside ``folder``.
    max_videos: Maximum number of videos to copy. When ``None`` the
      historical defaults are used: 150 if ``folder`` is ``'train'``,
      otherwise 50.

  The first ``max_videos`` paths are copied. When ``videoPaths`` holds fewer
  entries, all of them are copied (indexing past the end of the list used to
  raise ``IndexError``).
  """
  if max_videos is None:
    # 150 videos for training for each type, 50 for validation
    max_videos = 150 if folder == 'train' else 50

  # exist_ok avoids the check-then-create race of os.path.exists + makedirs
  os.makedirs(folder, exist_ok=True)

  selected = videoPaths[:max_videos]
  if len(selected) == 0:
    return

  # every selected video goes into the sub-folder of its label
  labelFolder = os.path.join(folder, Set)
  os.makedirs(labelFolder, exist_ok=True)

  for path in selected:
    # keep the original file name inside the label folder
    destination = os.path.join(labelFolder, os.path.basename(path))
    shutil.copy(path, destination)

We load all the video paths and randomly shuffle them. Then, we generate the training and validation paths.

In [None]:
# ---- manipulated (deepfake) videos ----
# collect every altered video path and put the list in random order
print("[INFO] loading video paths...")
videoAlteredPaths = list(
    paths.list_files(DATASET_PATH+'/manipulated_sequences/Deepfakes/c23/videos')
)
np.random.shuffle(videoAlteredPaths)

# the tail of the shuffled list (a VAL_SPLIT fraction) is the validation part
valAlteredPathsLen = int(VAL_SPLIT * len(videoAlteredPaths))
trainAlteredPathsLen = len(videoAlteredPaths) - valAlteredPathsLen
trainPaths, valPaths = (
    videoAlteredPaths[:trainAlteredPathsLen],
    videoAlteredPaths[trainAlteredPathsLen:],
)

# place the altered videos in the "altered" label folder of each subset
print("[INFO] copying training and validation altered videos...")
copy_videos(trainPaths, TRAIN, "altered")
copy_videos(valPaths, VALIDATION, "altered")

# ---- original (unmodified) videos ----
# collect every original video path and put the list in random order
print("[INFO] loading video paths...")
videoOriginalPaths = list(
    paths.list_files(DATASET_PATH+'/original_sequences/youtube/c23/videos')
)
np.random.shuffle(videoOriginalPaths)

# same tail split as for the altered videos
valOriginPathsLen = int(VAL_SPLIT * len(videoOriginalPaths))
trainOriginPathsLen = len(videoOriginalPaths) - valOriginPathsLen
trainPaths, valPaths = (
    videoOriginalPaths[:trainOriginPathsLen],
    videoOriginalPaths[trainOriginPathsLen:],
)

# place the original videos in the "original" label folder of each subset
print("[INFO] copying training and validation original videos...")
copy_videos(trainPaths, TRAIN, "original")
copy_videos(valPaths, VALIDATION, "original")

----------------------------------------
OPTICAL FLOW EXTRACTION
----------------------------------------
Once we've copied our files in training and validation folders, we can preprocess the videos.
First, we create some directories to store our optical flow.

In [None]:
# root folder (with trailing slash) under which the optical-flow images are saved
opticalPath = '/content/optical_flow/'

In [None]:
# Create the optical-flow output tree. Every leaf folder is created on its
# own with exist_ok=True, so a re-run or a partially created tree never
# leaves a sub-folder missing (previously nothing was created as soon as the
# root folder already existed).
for subset in ("training", "validation"):
  for label in ("original", "altered"):
    os.makedirs(opticalPath+subset+"/"+label, exist_ok=True)

Before computing the optical flow we detect the faces for each frame and we crop them. Otherwise, the computation will be too slow and too heavy.

Below, we present the function that detects the faces in a given frame. We use a Haar feature-based cascade classifier for this. The function returns a crop of the image around the face.

In [None]:
def detect_face(img, margin=40):
    """Detect a face in a BGR frame and return a grayscale crop around it.

    Args:
        img: BGR image (for example a frame read from a video). A rectangle
            is drawn on it, in place, around every detected face.
        margin: Number of pixels added on each side of the detected box.

    Returns:
        The grayscale crop around the last face reported by the detector,
        or ``None`` when no face is detected.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Load the cascade file only once and reuse it on later calls.
    face_cascade = getattr(detect_face, "_cascade", None)
    if face_cascade is None:
        face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_alt2.xml')
        detect_face._cascade = face_cascade

    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.2, minNeighbors=5)
    if len(faces) == 0:
        return None

    for (x, y, w, h) in faces:
        img = cv2.rectangle(img, (x, y), (x + w, y + h), (255, 0, 0), 2)

    # x, y, w, h now describe the last detected face.
    x, y, w, h = int(x), int(y), int(w), int(h)

    # Rows span the box height (h) and columns its width (w). The start
    # indices are clamped at 0: a negative slice start would be interpreted
    # relative to the end of the array and yield a wrong or empty crop.
    top = max(y - margin, 0)
    left = max(x - margin, 0)
    return gray[top : y + h + margin, left : x + w + margin]

We then define a way to compute the optical flow for training and validation set. We use the Flow Farneback method and we save the results in the created directories.

In [None]:
def compute_optical_flow(prvs, nxt, number, name,flag,Set=None):
    """Compute the Farneback optical flow between two frames, save it as a
    PNG image and return it.

    prvs, nxt    -- previous and next single-channel frames
    number, name -- used to build the file name ``<name>-<number>.png``
    flag         -- truthy: save under ``opticalPath + Set``;
                    falsy: save under ``/content/finalVideo/video/``
    Set          -- sub-folder of ``opticalPath`` (used only when flag is truthy)

    Returns the colour-coded flow image (array produced by the HSV->BGR
    conversion).
    """
    # HSV canvas shaped like a 3-channel version of the frame, full saturation
    canvas = np.zeros_like(cv2.cvtColor(prvs, cv2.COLOR_GRAY2BGR))
    canvas[..., 1] = 255

    # Dense flow field between the two frames
    displacement = cv2.calcOpticalFlowFarneback(prvs, nxt, None, 0.5, 3, 15, 3, 5, 1.2, 0)

    # Polar form of the 2D flow vectors
    magnitude, angle = cv2.cartToPolar(displacement[..., 0], displacement[..., 1])

    # Hue encodes the flow direction, value the (min-max normalized) magnitude
    canvas[..., 0] = angle*180/np.pi/2
    canvas[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)
    bgr = cv2.cvtColor(canvas, cv2.COLOR_HSV2BGR)

    # Choose the output file according to the caller (dataset vs. user video)
    if flag:
      target = opticalPath+'{}/{}-{}.png'.format(Set,name,number)
    else:
      target = '/content/finalVideo/video/{}-{}.png'.format(name,number)
    plt.imsave(target,bgr)

    return bgr

Now, we define the function where we extract the frame from the training/validation videos and where we compute the optical flow.
Then, we call it. 

For both training and validation sets we compute and save the optical flow of three frame pairs per video, skipping about 20 frames between one pair and the next so that the samples come from different moments of the video. We do this because extracting every frame from each video would exhaust the available memory. Moreover, taking three well-separated samples per video gives us varied data to train on.

In [None]:
def frames(path,Set):
    """Save up to three optical-flow images for a training/validation video.

    Each image is computed from the face crops of two frames; after every
    saved image 20 frames are skipped so the samples come from different
    parts of the video.

    Args:
        path: Video path split on '/'; components 1 and 2 are used as the
            label folder and the file name (last 4 characters dropped),
            e.g. 'train/altered/123.mp4' -> 'altered/123'.
        Set: Sub-folder of the optical-flow directory, passed through to
            compute_optical_flow ('training' or 'validation' in this file).

    Frames in which no face is found are skipped. OpenCV errors on a frame
    pair are reported and the pair is skipped; other exceptions (for example
    a missing output folder) are no longer silenced.
    """
    video = cv2.VideoCapture(path)
    try:
        ret, old_frame = video.read()
        if not ret:
            print('No frames!')
            return
        face1 = detect_face(old_frame)

        # "<label>/<video name>" used to build the output file name
        parts = path.split('/')
        name = parts[1]+'/'+parts[2][0:-4]

        number = 0
        while number < 3:
            ret, new_frame = video.read()
            if not ret:
                print("The video is finished")
                break
            face2 = detect_face(new_frame)

            # Without a reference face, adopt the current frame's face (if
            # any) as the new reference instead of staying stuck on None
            # for the rest of the video.
            if face1 is None or face1.size == 0:
                face1 = face2
                continue
            if face2 is None or face2.size == 0:
                continue

            try:
                face1 = cv2.resize(face1, (300, 300), interpolation = cv2.INTER_AREA)
                face2 = cv2.resize(face2, (300, 300), interpolation = cv2.INTER_AREA)
                compute_optical_flow(face1, face2, number,name,True,Set)
            except cv2.error as e:
                print("Skipping frame pair: {}".format(e))
                continue

            # skip 20 frames so the next sample comes from another moment
            for _ in range(20):
                video.read()
            ret, old_frame = video.read()
            if not ret:
                # the video ended during the skip: nothing left to sample
                print("The video is finished")
                break
            face1 = detect_face(old_frame)
            number += 1
    finally:
        # release the capture on every exit path
        video.release()

Execute this cell only one time, otherwise the program will extract the frames again. We compute and save the optical flow for training and validation data.

In [None]:
# optical flow of every video found under the training folder
videoTrainAlteredPaths = list(paths.list_files(TRAIN))
for videoPath in videoTrainAlteredPaths:
  frames(videoPath,'training')

# optical flow of every video found under the validation folder
videoValAlteredPaths = list(paths.list_files(VALIDATION))
for videoPath in videoValAlteredPaths:
  frames(videoPath,'validation')

LOAD THE DATA
We then define some transformations and we use ImageFolder to load the data.

In [None]:
# ToTensor must come before Normalize: ImageFolder hands PIL images to the
# transform, and Normalize works on tensors, so the previous order failed
# on the first loaded sample.
tran = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
# One class per sub-folder of each directory
trainDataset=ImageFolder('/content/optical_flow/training',transform=tran)
valDataset=ImageFolder('/content/optical_flow/validation',transform=tran)

Below, we define the batch size and the device. Subsequently, we use DataLoader, which represents a Python iterable over our datasets. Our classes (altered and original) are stored in class_name.

In [None]:
# number of images per batch
BATCH_SIZE = 32

# first CUDA device when available, CPU otherwise
if torch.cuda.is_available():
  device = torch.device("cuda:0")
else:
  device = torch.device("cpu")

# only the training loader shuffles its data
train_loader = DataLoader(trainDataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(valDataset, batch_size=BATCH_SIZE)

# class names, indexed by label
class_name = trainDataset.classes

PLOT A BATCH
If you want to visualize a batch of data, you can run this function. This works also at test time, plotting the prediction.

In [None]:
def show(imgs,labels,predictions=None):
  """Plot a batch of images with their labels and, optionally, predictions.

  Args:
    imgs: Batch of normalized image tensors in (channel, height, width)
      layout; the normalization is undone before plotting.
    labels: Class index of every image in the batch.
    predictions: Optional predicted class index of every image. When given,
      each title shows both prediction and label.

  Returns:
    ``[total, good]``: number of plotted images and number of images whose
    prediction matches the label (``good`` stays 0 without predictions).
  """
  total = 0
  good = 0
  fig = plt.figure(figsize=(21,12))

  # 8 columns and at least 4 rows (the usual 4x8 grid); extra rows are added
  # when the batch holds more than 32 images, which a fixed grid cannot fit
  cols = 8
  rows = max(4, -(-len(imgs) // cols))

  # statistics used by the Normalize transform, needed to undo it
  mean = np.array([0.485, 0.456, 0.406])
  std = np.array([0.229, 0.224, 0.225])

  # iterate over the batch, we plot one image at a time with its label
  for i,img in enumerate(imgs):
    total +=1
    fig.add_subplot(rows, cols, i+1)
    label=labels[i] # actual label of the image

    # "is None" instead of "== None": an equality test against a
    # tensor/array is not a reliable way to detect a missing argument
    if predictions is None:
      title="Label: {0}".format(class_name[label])
    else:
      prediction=predictions[i] # prediction done by our model
      title="prediction: {0}\nlabel:{1}".format(class_name[prediction],class_name[label])
      if class_name[label] == class_name[prediction]:
        good +=1

    plt.title(title)
    img = img.numpy().transpose((1, 2, 0)) # channels last for imshow
    img = std * img + mean # undo the normalization
    img = np.clip(img, 0, 1) # clip values outside the interval
    plt.axis("off")
    plt.imshow(img)
  return [total,good] # this is useful to compute the final accuracy
  
# Take one batch from the training loader and plot it with its labels.
inputs, classes = next(iter(train_loader))

show(inputs,classes)

# plt.show() must run after the figure has been built: calling it before
# show(...) left the new figure undisplayed outside inline backends.
plt.show()

TRAIN FUNCTION
This is the function which is responsible to train the model. It takes in input the model which is -in our case- a pretrained model, the loss, the dataloader, the optimizer and the number of the current epoch.

We store the losses and the accuracies inside a dictionary.

We set the number of epochs at 20.

In [None]:
# per-epoch history of losses and accuracies, one list per phase
losses = {phase: [] for phase in ('train', 'val')}
accuracies = {phase: [] for phase in ('train', 'val')}

# number of training epochs
epochs = 20

In [None]:
def train(model,loss_fn,dataloader,optimizer,epoch):
  """Run one training epoch and record its mean loss and accuracy.

  model      -- network to train (switched to train mode)
  loss_fn    -- loss applied to the model outputs and the labels
  dataloader -- iterable of batches; item 0 holds the inputs, item 1 the labels
  optimizer  -- optimizer stepped once per batch
  epoch      -- current epoch number, used only in the progress message

  The mean batch loss and the accuracy (in percent) are appended to
  ``losses['train']`` and ``accuracies['train']`` and printed.
  """
  print(f'Epoch {epoch}/{epochs}')

  model.train()

  running_loss = 0
  n_correct = 0
  n_seen = 0

  for batch in tqdm(dataloader):
    # move the batch to the computation device
    inputs = batch[0].to(device)
    labels = batch[1].to(device)

    # forward pass and loss
    outputs = model(inputs)
    batch_loss = loss_fn(outputs,labels)

    # reset the gradients, backpropagate, update the weights
    optimizer.zero_grad()
    batch_loss.backward()
    optimizer.step()

    running_loss += batch_loss.item()

    # predicted class = index of the highest score
    _, predicted = outputs.max(1)
    n_seen += labels.size(0)
    n_correct += predicted.eq(labels).sum().item()

  epoch_loss = running_loss/len(dataloader)
  epoch_accuracy = 100.*n_correct/n_seen

  accuracies['train'].append(epoch_accuracy)
  losses['train'].append(epoch_loss)
  print('Train Loss: %.3f | Accuracy: %.3f'%(epoch_loss,epoch_accuracy))

VALIDATION FUNCTION
The validation function is similar to the train function, but we do not update the weights. At the beginning of the function we set the model to evaluation mode.

In [None]:
def validation(model,loss_fn,dataloader,epoch):
  """Evaluate the model on a dataloader and record mean loss and accuracy.

  model      -- network to evaluate (switched to eval mode)
  loss_fn    -- loss applied to the model outputs and the labels
  dataloader -- iterable of batches; item 0 holds the images, item 1 the labels
  epoch      -- accepted for symmetry with train(); not used here

  No gradients are computed and no weights are updated. The mean batch loss
  and the accuracy (in percent) are appended to ``losses['val']`` and
  ``accuracies['val']`` and printed.
  """
  model.eval()

  running_loss = 0
  n_correct = 0
  n_seen = 0

  with torch.no_grad():
    for batch in tqdm(dataloader):
      images = batch[0].to(device)
      labels = batch[1].to(device)

      outputs = model(images)
      running_loss += loss_fn(outputs,labels).item()

      # predicted class = index of the highest score
      _, predicted = outputs.max(1)
      n_seen += labels.size(0)
      n_correct += predicted.eq(labels).sum().item()

  epoch_loss = running_loss/len(dataloader)
  epoch_accuracy = 100.*n_correct/n_seen

  losses['val'].append(epoch_loss)
  accuracies['val'].append(epoch_accuracy)

  print('Test Loss: %.3f | Accuracy: %.3f'%(epoch_loss,epoch_accuracy))

SETUP THE MODEL
Below, we set up the model. We load a pretrained model, freeze its weights and replace the final fully connected layer. Here, only this new layer is trained; the pretrained convolutional network is used as a fixed feature extractor.

In [None]:
# ResNet-18 with pretrained weights
model = torchvision.models.resnet18(pretrained=True)

# freeze all the pretrained parameters
for param in model.parameters():
    param.requires_grad = False

# replace the last FC layer with a new 2-class layer
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)

model = model.to(device)

loss_fn = nn.CrossEntropyLoss()

# only parameters that still require gradients are given to the optimizer
params_to_update = [
    weight for _, weight in model.named_parameters() if weight.requires_grad
]

# SGD with momentum on the trainable parameters
optimizer_ft = optim.SGD(params_to_update, lr=0.001, momentum=0.9)

TRAIN THE MODEL
We train and evaluate the model at each epoch.

In [None]:
# one training pass followed by one validation pass per epoch (1-based)
for epoch in range(1, epochs+1):
  train(model=model, loss_fn=loss_fn, dataloader=train_loader,
        optimizer=optimizer_ft, epoch=epoch)
  validation(model=model, loss_fn=loss_fn, dataloader=val_loader, epoch=epoch)

GRAPHS
We can plot the training and validation loss/accuracy for each epoch.

In [None]:
plt.figure(figsize=(8, 8))

# top panel: accuracy curves
plt.subplot(2, 1, 1)
for phase, caption in (('train', 'Training Accuracy'), ('val', 'Validation Accuracy')):
  plt.plot(accuracies[phase], label=caption)
plt.legend(loc='lower right')
plt.ylabel('Accuracy')
plt.title('Training and Validation Accuracy')

# bottom panel: loss curves, y axis limited to [0, 1]
plt.subplot(2, 1, 2)
for phase, caption in (('train', 'Training Loss'), ('val', 'Validation Loss')):
  plt.plot(losses[phase], label=caption)
plt.legend(loc='upper right')
plt.ylabel('Cross Entropy')
plt.ylim([0,1.0])
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
plt.show()

This function allows us to show the prediction of our model, to check the trend

In [None]:
def test(model,images,actual_label):
  """Predict a batch and plot it next to its actual labels.

  model        -- network used for the prediction (switched to eval mode)
  images       -- batch of input images
  actual_label -- labels of the batch, forwarded to show()

  Returns whatever show() returns for the batch.
  """
  model.eval()
  with torch.no_grad():
    scores = model(images.to(device))
    # index of the highest score = predicted class
    preds = torch.max(scores, 1)[1]
    return show(images,actual_label,preds.cpu())

# Accumulate the number of plotted images and of correct predictions over
# the whole validation loader.
total=0
good=0
for batch in val_loader:
  # each batch must be compared with its OWN labels; previously the labels
  # of the first batch were reused for every batch
  images,classes = batch[0],batch[1]
  pair = test(model,images,classes)
  total +=pair[0]
  good += pair[1]

if total:
  print("Accuracy: "+ str((good/total)*100)+ '%')
else:
  # avoid a division by zero when the loader yields no images
  print("Accuracy: n/a (the validation loader is empty)")

USER'S FUNCTIONS
We define the frames() function for a user who wants to upload a video, compute its optical flow (with the compute_optical_flow() function defined above) and save it in a specific directory.

Unlike the previous functions for training and validation data, these functions extract all the frames of the given video.

In [None]:
# Download the "testtt" dataset from Kaggle (presumably the sample video used below — TODO confirm).
!kaggle datasets download -d stefanoservillo/testtt

In [None]:
# Extract the downloaded archive into the current directory.
!unzip testtt.zip

In [None]:
# Folder that receives the optical-flow images of the user's video.
# exist_ok=True lets the cell be re-run without raising FileExistsError.
os.makedirs("finalVideo/video", exist_ok=True)

In [None]:
def frames(path):
    """Save the optical flow of every consecutive face pair of a user video.

    NOTE: this definition replaces the earlier two-argument ``frames``
    used for the training/validation videos.

    Args:
        path: Path of the video file. Its base name without extension is
            used as the prefix of the output images.

    Every frame is processed. Frames in which no face is found are skipped.
    OpenCV errors on a frame pair are reported and the pair is skipped;
    other exceptions (for example a missing output folder) are no longer
    silenced.
    """
    video = cv2.VideoCapture(path)
    try:
        ret, old_frame = video.read()
        if not ret:
            print('No frames')
            return
        face1 = detect_face(old_frame)

        # file name without directory and extension, whatever the depth of
        # the path ('/content/prova/089_065.mp4' -> '089_065')
        name = os.path.splitext(os.path.basename(path))[0]

        number = 0
        while(video.isOpened()):
            ret, new_frame = video.read()
            if not ret:
                print("The video is finished")
                break
            face2 = detect_face(new_frame)

            # Without a reference face, adopt the current frame's face (if
            # any) as the new reference instead of staying stuck on None
            # for the rest of the video.
            if face1 is None or face1.size == 0:
                face1 = face2
                continue
            if face2 is None or face2.size == 0:
                continue

            try:
                face1 = cv2.resize(face1, (300, 300), interpolation = cv2.INTER_AREA)
                face2 = cv2.resize(face2, (300, 300), interpolation = cv2.INTER_AREA)
                compute_optical_flow(face1, face2, number,name,False)
            except cv2.error as e:
                print("Skipping frame pair: {}".format(e))
                continue

            # the current face becomes the reference of the next pair
            face1 = face2
            number +=1
    finally:
        # release the capture on every exit path
        video.release()

In [None]:
# Compute and save the optical-flow images of the sample video.
frames('/content/prova/089_065.mp4')

In [None]:
def show(imgs,predictions):
  """Plot a batch of images with the class predicted for each of them.

  NOTE: this definition replaces the earlier ``show(imgs, labels,
  predictions)`` used during training and validation.

  Args:
    imgs: Batch of normalized image tensors in (channel, height, width)
      layout; the normalization is undone before plotting.
    predictions: Predicted class index of every image in the batch.

  Returns:
    ``[total, fake]``: number of plotted images and number of images
    predicted as 'altered'.
  """
  fig = plt.figure(figsize=(21,12))

  # 8 columns and at least 4 rows (the usual 4x8 grid); extra rows are added
  # when the batch holds more than 32 images, which a fixed grid cannot fit
  cols = 8
  rows = max(4, -(-len(imgs) // cols))

  # statistics used by the Normalize transform, needed to undo it
  mean = np.array([0.485, 0.456, 0.406])
  std = np.array([0.229, 0.224, 0.225])

  total = 0
  fake = 0
  for i,img in enumerate(imgs):
    total +=1
    fig.add_subplot(rows, cols, i+1)

    prediction=predictions[i]
    title="prediction: {0}".format(class_name[prediction]) # name of the predicted class
    if class_name[prediction] == 'altered':
      fake +=1

    plt.title(title)
    img = img.numpy().transpose((1, 2, 0)) # channels last for imshow
    img = std * img + mean # undo the normalization
    img = np.clip(img, 0, 1) # clip values outside the interval
    plt.axis("off")
    plt.imshow(img)
  return [total,fake]

In [None]:
# optical-flow images of the user's video, loaded with the same transform
Dataset = ImageFolder('/content/finalVideo', transform=tran)

In [None]:
# batches of the user's images, in dataset order
data_loader = DataLoader(Dataset, batch_size=BATCH_SIZE)

In [None]:
def predict_images(model,images):
  """Predict the class of every image in a batch and plot the result.

  model  -- network used for the prediction (switched to eval mode)
  images -- batch of input images

  Returns whatever show() returns for the batch.
  """
  model.eval()
  # gradients are not needed for inference
  with torch.no_grad():
    scores = model(images.to(device))
    # index of the highest score = predicted class
    _, preds = torch.max(scores, 1)
    return show(images,preds.cpu())

# Accumulate over all batches the number of images and the number of images
# predicted as fake, then report the percentage.
total=0
fake=0
for batch in data_loader:
  pair = predict_images(model,batch[0])
  total +=pair[0]
  fake += pair[1]

if total:
  print("Video fake al "+ str((fake/total)*100)+ '%')
else:
  # no image was produced for the video: avoid a division by zero
  print("No optical-flow images found: nothing to classify")