# Deepfake Image Detector

Let's do this.

Let us first import all the libraries we need:

In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
# import matplotlib.pylab as plt
# import PIL.Image
# import PIL.ImageDraw
# import face_recognition
import os
import cv2
import copy

In [None]:
# Installing a package not directly available:
# !pip install face_recognition
# import face_recognition

We first set up an input pipeline — that is, we mount the Google Drive folder from which the images are loaded into the program:

In [None]:

# Mount Google Drive at /content/drive; every dataset and model path used
# further down in this notebook is located under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Preprocess Training Data

In [None]:
# Listing File Paths
# Root folder of the V1 dataset on the mounted Drive. The loader functions join
# it with each entry of FOLDERS to find the per-class image directories.
images_path = '/content/drive/MyDrive/HCDS Project/real_and_fake_face'

# NOTE: the names below are only referenced by the "Rough work" cells at the end
# of the notebook (fake_images_path / real_images_path / fake_images /
# real_images). While these lines stay commented out, those cells cannot run.
# fake_images_path = images_path + "/training_fake"
# real_images_path = images_path + "/training_real"

# # Array of fake and real images name
# fake_images = os.listdir(fake_images_path)
# real_images = os.listdir(real_images_path)
# # print(real_images)

In [None]:
# Configuration for the V1 (two-folder) pipeline.
#
# The two commented-out constants are only referenced by the "Rough work" cells.
# BATCH_SIZE = 10    # No more than 50 images can be trained at once (because of RAM size limitations and how big images can be)
# ALTERNATE_AFTER = 2   # Show model 2 fake images, then 2 real images, then 2 fake, 2 real and so on

# Class sub-folders of images_path; the V1 loaders use the position of a folder
# in this list as its integer label.
FOLDERS = ["training_real", "training_fake"]
# Every image is resized to IMG_SIZE x IMG_SIZE pixels by the loaders.
IMG_SIZE=128
# Fraction of each folder's directory listing used for training; the testing
# loaders read the remaining tail of the same listing.
TRAINING_PROPORTION = 0.8
# Extra multiplier applied to the number of files taken from each folder.
MAX_USE = 1
# Number of epochs passed to model.fit.
EPOCHS = 25

# 0 is real, 1 is fake
# Sub-directory name of the TensorBoard log written under ./log/.
RUN_NAME = "Run 5 with sparse_categorical_crossentropy"


In [None]:
def loadTrainingData():
  """Load the training portion of every folder listed in FOLDERS.

  For each class folder under ``images_path`` the leading
  ``TRAINING_PROPORTION * MAX_USE`` share of the directory listing is read
  with OpenCV, resized to ``IMG_SIZE x IMG_SIZE``, converted to float32 and
  scaled by 1/255.

  Labels follow the notebook convention "0 is real, 1 is fake": a folder
  whose name contains "fake" is labelled 1, any other folder 0. For the
  two-folder FOLDERS list this equals the folder's index, and it stays binary
  if FOLDERS has meanwhile been replaced by the four-folder list.

  Files that OpenCV cannot decode are skipped and counted instead of being
  swallowed silently.

  Returns:
    (X, Y): a list of float32 image arrays and the parallel list of int labels.

  NOTE: a later cell defines another ``loadTrainingData`` that takes a
  ``folder`` argument; once that cell has run, this version is shadowed.
  NOTE(review): os.listdir gives no ordering guarantee. The train/test split
  is only disjoint if the testing loader sees the same listing order.
  """
  X = []
  Y = []
  skipped = 0

  for folder in FOLDERS:
    path = os.path.join(images_path, folder)
    class_num = 1 if "fake" in folder else 0
    # List the directory once so the count and the slice agree.
    file_names = os.listdir(path)
    max_index = int(TRAINING_PROPORTION * len(file_names) * MAX_USE)

    for img in file_names[:max_index]:
      img_array = cv2.imread(os.path.join(path, img))
      if img_array is None:
        # cv2.imread signals an unreadable / non-image file by returning None.
        skipped += 1
        continue
      try:
        new_array = cv2.resize(img_array, (IMG_SIZE, IMG_SIZE))
      except cv2.error:
        skipped += 1
        continue

      X.append(new_array.astype('float32') / 255)
      Y.append(class_num)

  if skipped:
    print("Skipped", skipped, "unreadable file(s) while loading training data")

  return X, Y

X, Y = loadTrainingData()

In [None]:
# Show how many images were loaded and the shape of one of them instead of
# dumping every pixel value into the cell output.
len(X), (X[0].shape if X else None)

In [None]:
# Show the distinct labels and how often each occurs instead of the full list.
np.unique(Y, return_counts=True)

# Preprocess Training Data V2

In [None]:
# Listing File Paths
# Root folder of the V2 dataset, in which the fake images are split into
# easy / medium / hard sub-folders. This rebinds the V1 images_path.
images_path = '/content/drive/MyDrive/HCDS Project/real_and_fake_divided'

# Sub-folders of images_path. The V2 loaders receive a folder name explicitly;
# this list is only iterated by the zero-argument V1 loaders.
FOLDERS = ["training_real", "training_fake_easy", "training_fake_med", "training_fake_hard"]
# Every image is resized to IMG_SIZE x IMG_SIZE pixels by the loaders.
IMG_SIZE=128
# Fraction of each folder's directory listing used for training; the testing
# loaders read the remaining tail of the same listing.
TRAINING_PROPORTION = 0.8
# Extra multiplier applied to the number of files taken from each folder.
MAX_USE = 1
# Number of epochs passed to model.fit in the incremental training cell.
EPOCHS = 50

# 0 is real, 1 is fake
# Sub-directory name of the TensorBoard log written under ./log/.
RUN_NAME = "Run 5 with sparse_categorical_crossentropy"

In [None]:
def loadTrainingData(folder):
  """Load the training slice of a single dataset folder.

  The leading ``TRAINING_PROPORTION * MAX_USE`` share of the directory listing
  of ``images_path/folder`` is read with OpenCV, resized to
  ``IMG_SIZE x IMG_SIZE``, converted to float32 and scaled by 1/255.

  Args:
    folder: Name of a sub-folder of ``images_path``. If the name contains
      "fake" every image is labelled 1, otherwise 0.

  Returns:
    [X, Y]: a list of float32 image arrays and the parallel list of int labels.

  Files that OpenCV cannot decode are skipped and counted instead of being
  swallowed silently.

  NOTE(review): os.listdir gives no ordering guarantee. The train/test split
  is only disjoint if the testing loader sees the same listing order.
  """
  path = os.path.join(images_path, folder)
  class_num = 1 if "fake" in folder else 0

  # List the directory once so the count and the slice agree.
  file_names = os.listdir(path)
  max_index = int(TRAINING_PROPORTION * len(file_names) * MAX_USE)

  X = []
  Y = []
  skipped = 0

  for img in file_names[:max_index]:
    img_array = cv2.imread(os.path.join(path, img))
    if img_array is None:
      # cv2.imread signals an unreadable / non-image file by returning None.
      skipped += 1
      continue
    try:
      new_array = cv2.resize(img_array, (IMG_SIZE, IMG_SIZE))
    except cv2.error:
      skipped += 1
      continue

    X.append(new_array.astype('float32') / 255)
    Y.append(class_num)

  if skipped:
    print("Skipped", skipped, "unreadable file(s) in", folder)

  return [X, Y]

# Load the training slice of each fake difficulty tier and of the real images.
# Each result is an [images, labels] pair.
easy_train_pure, medium_train_pure, hard_train_pure, real_train = (
    loadTrainingData(folder_name)
    for folder_name in (
        "training_fake_easy",
        "training_fake_med",
        "training_fake_hard",
        "training_real",
    )
)

print("Loaded")
print("Splitting real images...")

# Cut the real images (and their labels) into three consecutive parts, one per
# difficulty tier. The last part also takes the remainder of the division.
total_real = len(real_train[0])
real_third = int(total_real/3)
real_train_1 = [part[:real_third] for part in real_train]
real_train_2 = [part[real_third:2*real_third] for part in real_train]
real_train_3 = [part[2*real_third:] for part in real_train]

print("Splitted")
print("Batching together...")

# Pair every fake tier with its share of real images; element 0 holds the
# images and element 1 the labels, both as NumPy arrays after concatenation.
easy_train = [np.concatenate(pair) for pair in zip(easy_train_pure, real_train_1)]
medium_train = [np.concatenate(pair) for pair in zip(medium_train_pure, real_train_2)]
hard_train = [np.concatenate(pair) for pair in zip(hard_train_pure, real_train_3)]

print("Batched")

Loaded
Splitting real images...
Splitted
Batching together...
Batched


In [None]:
# Merge the three difficulty batches into one training set.
#
# easy_train, medium_train and hard_train each already contain one third of
# real_train, so together they hold every real image exactly once. The earlier
# version appended real_train again afterwards, which put every real image
# into the training set twice; that extra concatenation is removed here.
overall_train = [
    np.concatenate((easy_train[0], medium_train[0], hard_train[0])),
    np.concatenate((easy_train[1], medium_train[1], hard_train[1])),
]

In [None]:
# Show the shapes of the image and label arrays instead of dumping their values.
[part.shape for part in medium_train]

# Define Model:

No architecture is defined in this section; instead, a previously saved model is loaded from Drive:

In [None]:
# Resume from a model file saved on Drive by an earlier run. The commented line
# below is an alternative, older checkpoint.
model = tf.keras.models.load_model('/content/drive/MyDrive/HCDS Project/trained_model5_0.6to0.9.h5')
# model = tf.keras.models.load_model('/content/drive/MyDrive/HCDS Project/trained_model4.h5')

Now compile and attach a logger:

In [None]:
# (Re)compile the loaded model: plain SGD, sparse categorical cross-entropy on
# the integer labels (0 = real, 1 = fake), and accuracy as the reported metric.
# Adam and an "MSE" loss were earlier experiments and are no longer used.
model.compile(
    optimizer=tf.keras.optimizers.SGD(),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:

# TensorBoard callback writing to ./log/<RUN_NAME>. It only produces logs when
# it is passed to model.fit via callbacks=[logger]; the training cells in this
# notebook currently have that argument commented out.
logger = tf.keras.callbacks.TensorBoard(
    log_dir=f'./log/{RUN_NAME}',
    histogram_freq=1,
    write_graph=True,
)



```
# This is formatted as code
```

# Train Model

In [None]:
# Train on the V1 data (X, Y) for EPOCHS epochs. Images are passed as a float64
# tensor and labels as int32; the TensorBoard logger is not attached.
history = model.fit(
    x=tf.cast(np.array(X), tf.float64),
    y=tf.cast([int(label) for label in Y], tf.int32),
    epochs=EPOCHS,
)
history

Epoch 1/25
Epoch 2/25
Epoch 3/25
Epoch 4/25
Epoch 5/25
Epoch 6/25
Epoch 7/25
Epoch 8/25
Epoch 9/25
Epoch 10/25
Epoch 11/25
Epoch 12/25
Epoch 13/25
Epoch 14/25
Epoch 15/25
Epoch 16/25
Epoch 17/25
Epoch 18/25
Epoch 19/25
Epoch 20/25
Epoch 21/25
Epoch 22/25
Epoch 23/25
Epoch 24/25
Epoch 25/25


<tensorflow.python.keras.callbacks.History at 0x7fcf44524210>

# Train Model V2: Incremental Batch

In [None]:
# Curriculum-style training: fit on the easy batch first, then the medium one,
# then the hard one. Each batch is an [images, labels] pair that mixes one tier
# of fake images with a share of the real images. `history` ends up holding the
# result of the last (hard) stage.
for stage_banner, stage_data in (
    ("Easy---------------", easy_train),
    ("Medium---------------", medium_train),
    ("Hard---------------", hard_train),
):
  print(stage_banner)
  history = model.fit(
      x=tf.cast(np.array(stage_data[0]), tf.float64),
      y=tf.cast([int(label) for label in stage_data[1]], tf.int32),
      epochs=EPOCHS,
  )

# Train Model V2.2: All at Once

In [None]:
# Train on the merged training set in one go. The epoch count is fixed at 15
# here rather than taken from EPOCHS; the TensorBoard logger is not attached.
model.fit(
    x=tf.cast(np.array(overall_train[0]), tf.float64),
    y=tf.cast([int(label) for label in overall_train[1]], tf.int32),
    epochs=15,
)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


<tensorflow.python.keras.callbacks.History at 0x7fd7ce357f90>

# Test Model

In [None]:

def loadTestingData():
  """Load the held-out portion of every folder listed in FOLDERS.

  For each class folder under ``images_path`` the files after the leading
  ``TRAINING_PROPORTION * MAX_USE`` share of the directory listing (up to
  ``MAX_USE`` times the listing length) are read with OpenCV, resized to
  ``IMG_SIZE x IMG_SIZE``, converted to float32 and scaled by 1/255.

  Labels follow the notebook convention "0 is real, 1 is fake": a folder
  whose name contains "fake" is labelled 1, any other folder 0. For the
  two-folder FOLDERS list this equals the folder's index, and it stays binary
  when FOLDERS is the four-folder list.

  Files that OpenCV cannot decode are skipped and counted instead of being
  swallowed silently.

  Returns:
    (X, Y): a list of float32 image arrays and the parallel list of int labels.

  NOTE: a later cell defines another ``loadTestingData`` that takes a
  ``folder`` argument; once that cell has run, this version is shadowed.
  NOTE(review): os.listdir gives no ordering guarantee. The train/test split
  is only disjoint if the training loader saw the same listing order.
  """
  X = []
  Y = []
  skipped = 0

  for folder in FOLDERS:
    path = os.path.join(images_path, folder)
    class_num = 1 if "fake" in folder else 0
    # List the directory once so both bounds refer to the same listing.
    file_names = os.listdir(path)
    min_index = int(TRAINING_PROPORTION * len(file_names) * MAX_USE)
    max_index = int(len(file_names) * MAX_USE)

    for img in file_names[min_index:max_index]:
      img_array = cv2.imread(os.path.join(path, img))
      if img_array is None:
        # cv2.imread signals an unreadable / non-image file by returning None.
        skipped += 1
        continue
      try:
        new_array = cv2.resize(img_array, (IMG_SIZE, IMG_SIZE))
      except cv2.error:
        skipped += 1
        continue

      X.append(new_array.astype('float32') / 255)
      Y.append(class_num)

  if skipped:
    print("Skipped", skipped, "unreadable file(s) while loading testing data")

  return X, Y



Xtest, Ytest = loadTestingData()

KeyboardInterrupt: ignored

In [None]:
# Evaluate on the held-out V1 data (Xtest, Ytest). Despite its name,
# `error_rate` receives whatever model.evaluate returns for the compiled loss
# and metrics. (Evaluating on the training data X, Y instead was an earlier
# experiment.)
error_rate = model.evaluate(
    x=tf.cast(np.array(Xtest), tf.float64),
    y=tf.cast([int(label) for label in Ytest], tf.int32),
)

error_rate

# Pre-Testing Processing V2

In [None]:
def loadTestingData(folder):
  """Load the held-out slice of a single dataset folder.

  The files after the leading ``TRAINING_PROPORTION * MAX_USE`` share of the
  directory listing of ``images_path/folder`` (up to ``MAX_USE`` times the
  listing length) are read with OpenCV, resized to ``IMG_SIZE x IMG_SIZE``,
  converted to float32 and scaled by 1/255.

  Args:
    folder: Name of a sub-folder of ``images_path``. If the name contains
      "fake" every image is labelled 1, otherwise 0.

  Returns:
    [X, Y]: a list of float32 image arrays and the parallel list of int labels.

  Files that OpenCV cannot decode are skipped and counted instead of being
  swallowed silently.

  NOTE(review): os.listdir gives no ordering guarantee. The train/test split
  is only disjoint if the training loader saw the same listing order.
  """
  path = os.path.join(images_path, folder)
  class_num = 1 if "fake" in folder else 0

  # List the directory once so both bounds refer to the same listing.
  file_names = os.listdir(path)
  min_index = int(TRAINING_PROPORTION * len(file_names) * MAX_USE)
  max_index = int(len(file_names) * MAX_USE)

  X = []
  Y = []
  skipped = 0

  for img in file_names[min_index:max_index]:
    img_array = cv2.imread(os.path.join(path, img))
    if img_array is None:
      # cv2.imread signals an unreadable / non-image file by returning None.
      skipped += 1
      continue
    try:
      new_array = cv2.resize(img_array, (IMG_SIZE, IMG_SIZE))
    except cv2.error:
      skipped += 1
      continue

    X.append(new_array.astype('float32') / 255)
    Y.append(class_num)

  if skipped:
    print("Skipped", skipped, "unreadable file(s) in", folder)

  return [X, Y]

# Load the held-out slice of every folder as an [images, labels] pair. At this
# point easy_test / medium_test / hard_test contain fake images only; a later
# cell rebinds them to versions that also include a share of the real images.
easy_test = loadTestingData("training_fake_easy")
medium_test = loadTestingData("training_fake_med")
hard_test = loadTestingData("training_fake_hard")
real_test = loadTestingData("training_real")





In [None]:
# Split real_test into 3 consecutive parts (images and labels alike), one for
# each fake difficulty tier. The last part also takes the division remainder.

total_real = len(real_test[0])
real_third = int(total_real/3)
real_test_1 = [part[:real_third] for part in real_test]
real_test_2 = [part[real_third:2*real_third] for part in real_test]
real_test_3 = [part[2*real_third:] for part in real_test]

In [None]:
# Batching together:
#
# Run this cell once per run of the loading cell: it rebinds easy_test,
# medium_test and hard_test to mixed fake+real sets, so running it a second
# time would mix the real images in again.

# Fake-only test set. It has to be built BEFORE the real images are mixed into
# the per-difficulty sets; previously it was copied afterwards and therefore
# also contained real images.
all_fake = [
    np.concatenate((easy_test[0], medium_test[0], hard_test[0])),
    np.concatenate((easy_test[1], medium_test[1], hard_test[1])),
]

# Per-difficulty test sets: one tier of fakes plus one third of the real images.
easy_test = [np.concatenate((easy_test[0], real_test_1[0])), np.concatenate((easy_test[1], real_test_1[1]))]
medium_test = [np.concatenate((medium_test[0], real_test_2[0])), np.concatenate((medium_test[1], real_test_2[1]))]
hard_test = [np.concatenate((hard_test[0], real_test_3[0])), np.concatenate((hard_test[1], real_test_3[1]))]

# Overall test set: the three mixed sets together already cover every fake and
# every real image exactly once, so real_test is not appended again (doing so
# counted each real image twice in the overall evaluation).
overall_test = [
    np.concatenate((easy_test[0], medium_test[0], hard_test[0])),
    np.concatenate((easy_test[1], medium_test[1], hard_test[1])),
]



In [None]:
real_test

[[array([[[0.24313726, 0.3254902 , 0.44705883],
          [0.23529412, 0.32941177, 0.4392157 ],
          [0.23921569, 0.32941177, 0.4509804 ],
          ...,
          [0.14509805, 0.28235295, 0.33333334],
          [0.1764706 , 0.3019608 , 0.34901962],
          [0.10196079, 0.24705882, 0.27450982]],
  
         [[0.24705882, 0.32941177, 0.45490196],
          [0.24705882, 0.34117648, 0.45882353],
          [0.23529412, 0.3372549 , 0.45490196],
          ...,
          [0.14509805, 0.29803923, 0.33333334],
          [0.14901961, 0.3019608 , 0.33333334],
          [0.12156863, 0.28235295, 0.3137255 ]],
  
         [[0.24313726, 0.3254902 , 0.4509804 ],
          [0.23921569, 0.32156864, 0.44705883],
          [0.22745098, 0.32156864, 0.44313726],
          ...,
          [0.13333334, 0.2901961 , 0.30980393],
          [0.11764706, 0.29411766, 0.29411766],
          [0.10588235, 0.29411766, 0.3019608 ]],
  
         ...,
  
         [[0.07843138, 0.14117648, 0.18431373],
          [0.0

# Test Model V2

In [None]:
# Evaluate the model on every test subset. Each subset is an [images, labels]
# pair. All results are kept in `evaluation_results` (keyed by subset name) and
# displayed together, instead of only the last call's return value surviving.
evaluation_sets = (
    ("Overall", overall_test),
    ("Easy", easy_test),
    ("Medium", medium_test),
    ("Hard", hard_test),
    ("Fake", all_fake),
    ("Real", real_test),
)

evaluation_results = {}
for subset_name, (subset_images, subset_labels) in evaluation_sets:
  print(f"{subset_name} Evaluation----------")
  evaluation_results[subset_name] = model.evaluate(
      x=tf.cast(np.array(subset_images), tf.float64),
      y=tf.cast([int(label) for label in subset_labels], tf.int32),
  )

evaluation_results

Overall Evaluation----------
Easy Evaluation----------
Medium Evaluation----------
Hard Evaluation----------
Fake Evaluation----------
Real Evaluation----------


[0.5058714747428894, 0.8663594722747803]

# Tensorboard

In [None]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

In [None]:
%tensorboard --logdir log

# Save Model

In [None]:
# Write the current model to a .h5 file on the mounted Drive. The file name is
# fixed, so running this cell again overwrites the same file.
model.save('/content/drive/MyDrive/HCDS Project/trained_model6_incremental_0.63 to0.73.h5')

# Rough work. Ignore

In [None]:
# def trainImage(model, face_landmarks):
#   found_points = []
#   for name, points in face_landmarks.items():
#     found_points.append(name)
#   print("For this image,",found_points," were found!")

# def trainBatchNormal(model,fake_images, starting_index):
#   last_index = starting_index+MAX_BATCH_SIZE
#   if last_index >= len(fake_images):
#     last_index = len(fake_images)-1
#   for img in fake_images[starting_index:last_index]:
#     face_image = face_recognition.load_image_file(fake_images_path + "/" + img)     # Fix this later
#     face_landmarks_list = face_recognition.face_landmarks(face_image)
#     number_of_faces = len(face_landmarks_list)
#     if number_of_faces == 0:
#       continue
#     for face_landmarks in face_landmarks_list:
#       trainImage(model,face_landmarks)

In [None]:
def convert_to_array(dictionary):
    '''Return the values of ``dictionary`` as a list, in key order.

    Debug helper: prints every key with the length of its value, then prints
    and returns the collected list. The values themselves are passed through
    unchanged (no NumPy conversion takes place).
    '''
    for key, value in dictionary.items():
        print(key, len(value))

    collected = list(dictionary.values())
    print(collected)
    return collected

def loadBatch(images, path,starting_index):
  """Run face-landmark detection on one batch of image files.

  Args:
    images: List of image file names.
    path: Directory that contains those files.
    starting_index: Index in ``images`` of the first file of the batch.

  Returns:
    A list with one entry per processed file, each being the result of
    ``face_recognition.face_landmarks`` for that image.

  The batch covers at most MAX_BATCH_SIZE files. Slicing clamps at the end of
  the list, so the final batch now includes the last file; the earlier
  ``len(images)-1`` clamp silently dropped it.

  NOTE: relies on ``face_recognition`` and ``MAX_BATCH_SIZE``, neither of
  which is imported or defined in the active cells of this notebook.
  """
  face_images = []
  batch = images[starting_index:starting_index + MAX_BATCH_SIZE]

  # image_number is the 1-based position of the file within `images`.
  for image_number, img in enumerate(batch, start=starting_index + 1):
    temp_img = face_recognition.load_image_file(os.path.join(path, img))
    temp_landmarks = face_recognition.face_landmarks(temp_img)
    face_images.append(temp_landmarks)
    print("Loaded ", image_number)
    print("Image:", temp_landmarks)

  return face_images

def trainImagesBatch(model, real_images, fake_images):
  """Interleave real and fake face landmarks and print them (work in progress).

  Faces are queued in alternating groups of ALTERNATE_AFTER: a group of real
  faces, then a group of fake faces, and so on. Once either list cannot fill
  another full group, the remaining real faces are queued, followed by the
  remaining fake faces. Every face is passed through ``convert_to_array``.

  The groups are taken by slicing, so any ALTERNATE_AFTER >= 1 works. The
  earlier code indexed ``[counter]`` and ``[counter + 1]`` explicitly and was
  therefore only correct for ALTERNATE_AFTER == 2.

  Args:
    model: Currently unused.
    real_images: Sequence of per-face landmark dictionaries of real images.
    fake_images: Sequence of per-face landmark dictionaries of fake images.

  Returns:
    None. The queued faces and their "REAL"/"FAKE" labels are only printed;
    no training happens yet.

  Raises:
    ValueError: If ALTERNATE_AFTER is smaller than 1 (the alternation loop
      could never advance).
  """
  if ALTERNATE_AFTER < 1:
    raise ValueError("ALTERNATE_AFTER must be at least 1")

  temp_images = []
  labels = []

  def queue_faces(faces, label):
    # Convert and queue each face together with its class label.
    for face in faces:
      temp_images.append(convert_to_array(face))
      labels.append(label)

  real_counter = 0
  fake_counter = 0
  while (real_counter + ALTERNATE_AFTER <= len(real_images)
         and fake_counter + ALTERNATE_AFTER <= len(fake_images)):
    queue_faces(real_images[real_counter:real_counter + ALTERNATE_AFTER], "REAL")
    real_counter += ALTERNATE_AFTER

    queue_faces(fake_images[fake_counter:fake_counter + ALTERNATE_AFTER], "FAKE")
    fake_counter += ALTERNATE_AFTER

  # Leftovers that did not fit into a full alternating group.
  queue_faces(real_images[real_counter:], "REAL")
  queue_faces(fake_images[fake_counter:], "FAKE")

  for face_landmarks in temp_images:
    # Placeholder: landmark names are not collected yet, so this stays empty.
    found_points = []

    # chin 17, left_eyebrow 5, right_eyebrow 5, nose_bridge 4, nose_tip 5, left_eye 6, right_eye 6, top_lip 12, bottom_lip 12
    print(face_landmarks)

    print("For this image:",found_points," were found!")

def trainBatchTogether(model,real_images,fake_images, starting_index):
  """Load one batch of real and fake images and hand their faces to training.

  Both file lists are loaded from the same ``starting_index`` via
  ``loadBatch``. The per-image landmark lists are flattened into one list of
  faces per class; images without any detected face contribute nothing (for
  real images a "Skipped!" line is printed). The flattened lists are then
  passed to ``trainImagesBatch``.

  NOTE: reads the module-level ``real_images_path`` and ``fake_images_path``,
  which are commented out in the path cell of this notebook.
  """
  real_batch = loadBatch(real_images,real_images_path,starting_index)
  fake_batch = loadBatch(fake_images,fake_images_path,starting_index)

  # One entry per detected face; an image with no faces adds nothing.
  all_fake_faces = [face for faces in fake_batch for face in faces]

  all_real_faces = []
  for faces in real_batch:
    if len(faces) == 0:
      print("Skipped!")
    else:
      all_real_faces.extend(faces)

  print("Ls are:", len(all_fake_faces), len(all_real_faces))
  trainImagesBatch(model, all_real_faces,all_fake_faces)

def trainAllImages(model, real_images, fake_images):
  """Process the image lists batch by batch via ``trainBatchTogether``.

  Batches start at index 0 and advance by MAX_BATCH_SIZE while the start index
  is below the length of the longer list divided by 200.

  NOTE(review): the division by 200 restricts processing to the first 0.5% of
  the longer list; this looks like a debugging limit -- confirm before reuse.
  """
  longest = max(len(real_images),len(fake_images))
  index_limit = longest/200

  batch_start = 0
  while batch_start < index_limit:
    trainBatchTogether(model,real_images,fake_images,batch_start)
    batch_start += MAX_BATCH_SIZE


In [None]:
# Rough-work driver call. real_images and fake_images are only assigned in
# lines that are commented out in the "Listing File Paths" cell, so this cell
# cannot run as the notebook stands. 0 is passed as a placeholder for `model`.
trainAllImages(0,real_images,fake_images)

In [None]:
# Rough-work sketch of a fresh architecture: a single 3x3 convolution with 64
# filters. A stray `t` after the closing bracket made this cell a syntax error.
# NOTE: running this cell rebinds `model` and thereby replaces the model loaded
# and trained above. Its input_shape is 200x200x3, whereas the loaders in this
# notebook resize images to IMG_SIZE (128).
model = tf.keras.models.Sequential([
    tf.keras.layers.Conv2D(64, (3, 3), activation="relu", input_shape=(200, 200, 3)),
])
