In [17]:
%pip install efficientnet
%pip install facenet-pytorch

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [18]:
import cv2
from facenet_pytorch import MTCNN

# Code Extracted from: https://www.kaggle.com/code/timesler/fast-mtcnn-detector-55-fps-at-full-resolution
class FastMTCNN(object):
    """Fast MTCNN implementation.

    The detector is only run on every `stride`-th frame; the boxes found there
    are reused to crop the frames in between.
    """
    
    def __init__(self, stride, resize=1, *args, **kwargs):
        """Constructor for FastMTCNN class.
        
        Arguments:
            stride (int): The detection stride. Faces will be detected every `stride` frames
                and remembered for `stride-1` frames.
        
        Keyword arguments:
            resize (float): Fractional frame scaling. [default: {1}]
            *args: Arguments to pass to the MTCNN constructor. See help(MTCNN).
            **kwargs: Keyword arguments to pass to the MTCNN constructor. See help(MTCNN).
        """
        self.stride = stride
        self.resize = resize
        self.mtcnn = MTCNN(*args, **kwargs)
        
    def __call__(self, frames):
        """Detect faces in frames using strided MTCNN.

        Arguments:
            frames (sequence): Images indexed as [row, column, ...]. When
                `resize != 1` every frame is rescaled with `cv2.resize` first,
                so the returned crops come from the rescaled frames.

        Returns:
            list: One crop per (frame, box) pair, in frame order. Frames whose
                detection slot is None contribute nothing. A crop may be empty
                when the box lies entirely outside the frame.
        """
        # Nothing to detect; avoid handing an empty batch to the detector.
        if len(frames) == 0:
            return []

        if self.resize != 1:
            frames = [
                cv2.resize(f, (int(f.shape[1] * self.resize), int(f.shape[0] * self.resize)))
                    for f in frames
            ]
                      
        boxes, probs = self.mtcnn.detect(frames[::self.stride])

        faces = []
        for i, frame in enumerate(frames):
            # Frames i..i+stride-1 share the detection made on frame i.
            frame_boxes = boxes[i // self.stride]
            if frame_boxes is None:
                continue
            for box in frame_boxes:
                # Clamp to 0: a negative coordinate would otherwise be read as
                # "count from the end" by the slice and yield an empty or
                # wrong crop. Upper bounds need no clamp, slicing truncates.
                left, top, right, bottom = (max(int(b), 0) for b in box[:4])
                faces.append(frame[top:bottom, left:right])
        
        return faces

In [19]:
import cv2
from facenet_pytorch import MTCNN
import numpy as np
import h5py
import os
os.environ['TF_DISABLE_MMM'] = '1'  # Disable TensorFlow Meta Optimizer
from PIL import Image
import random
import tensorflow as tf
from efficientnet.keras import EfficientNetB0
from keras.models import Sequential
from keras.layers import LSTM, TimeDistributed, Dense, Conv2D, MaxPooling2D, Flatten, GlobalAveragePooling2D
import torch

# Device string forwarded to the MTCNN constructor through FastMTCNN's kwargs.
# The CUDA check printed below is informational only; it does not change `device`.
device = 'cpu'
random.seed(42)

# One video corresponds to 10 frames (10 faces).
# Each face is represented as a 224x224 matrix where each value is RGB (3 values).

# Only referenced by the commented-out save_faces helper in the visible code.
FILE_NAME  = "MODEL.h5"
BATCH_SIZE = 10 # number of videos per batch in get_data_labels
cwd = os.getcwd()
# stride/resize are consumed by FastMTCNN itself (detect on every 4th frame,
# frames scaled to half size); the remaining keyword arguments are passed on
# to the MTCNN constructor unchanged.
face_detector = FastMTCNN(
stride=4,
resize=0.5,
margin=14,
factor=0.6,
keep_all=False,
device=device
)

print('CUDA Available:', torch.cuda.is_available())

def preprocess_image(image):
    """Return `image` divided by 255.0 (maps 0..255 pixel values into [0, 1])."""
    return image / 255.0

def extract_faces(vid_path, batch, counter, n_faces = 10):
    """Sample frames from a video and return up to `n_faces` normalized face crops.

    Arguments:
        vid_path: Video path; joined onto the module-level `cwd`.
        batch: Unused; kept so existing callers keep working.
        counter: Running count of processed videos.
        n_faces: Number of frames to sample and maximum number of faces returned.

    Returns:
        tuple: (list of 224x224 face crops passed through `preprocess_image`,
            `counter + 1`). The list holds fewer than `n_faces` items when the
            detector finds fewer usable faces, and is empty if no frame could
            be read.
    """
    video_path = os.path.join(cwd, vid_path)
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Spread the sampled frames evenly over the whole video.
        frame_step = max(total_frames // n_faces, 1)
        frame_index = 0
        while cap.isOpened():
            ret, frame = cap.read()
            # Check `ret` before touching `frame`: on a failed read the frame
            # is not a valid image and cvtColor would raise.
            if not ret:
                break
            if frame_index % frame_step == 0:
                # Convert only the frames that are kept (OpenCV decodes as BGR).
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                if len(frames) >= n_faces:
                    break
            frame_index += 1
    finally:
        # Release the capture even if decoding raised.
        cap.release()

    resized_faces = []
    if frames:
        for face in face_detector(frames):
            if len(resized_faces) >= n_faces:
                break
            # Boxes outside the frame produce empty crops; skip them.
            if face.size == 0:
                continue
            resized_faces.append(preprocess_image(cv2.resize(face, (224, 224))))

    counter += 1
    print(counter)
    return resized_faces, counter


def get_splits(vids): # vids = (vid_name, label)
    """Cut `vids` into consecutive train / validation / test slices.

    The first 70% of the items (rounded down) form the training slice, the
    next 15% (rounded down) the validation slice, and whatever remains the
    test slice. Order is preserved; nothing is shuffled here.
    """
    train_ratio, val_ratio = 0.7, 0.15

    total = len(vids)
    train_end = int(total * train_ratio)
    val_end = train_end + int(total * val_ratio)

    return vids[:train_end], vids[train_end:val_end], vids[val_end:]

# def save_faces(faces, dataset, batch): # faces = [(15 images), (15 images), ...]
#     with h5py.File(FILE_NAME, 'a') as f:
#         f.create_dataset(dataset + "_" + str(batch), data=faces)
#         f.close()

def special_save(data, labels, label):
    """Write one split to `<label>_data.h5` in the current working directory.

    Each element of `data` is stored as its own dataset named
    `<label>_data_<i>` (i is the element's position), and `labels` is stored
    as the dataset `<label>_labels`. The file is opened in "w" mode, so an
    existing file with the same name is overwritten.
    """
    # The context manager closes the file; no explicit close() is needed.
    with h5py.File(label + "_data.h5", "w") as f:
        for i, d in enumerate(data):
            f.create_dataset(f"{label}_data_{i}", data=d)
        f.create_dataset(label + "_labels", data=labels)
    print(f"{label} data saved")

def load(label, dataset_keys):
    """Read the datasets named in `dataset_keys` from `<cwd>/<label>_data.h5`.

    The key `<label>_labels` is skipped. Arrays are returned in the order the
    keys are given.

    NOTE(review): callers that pair the result positionally with the labels
    array must pass keys in index order; h5py appears to list keys in
    alphanumeric name order ("_0", "_1", "_10", ...) -- confirm.
    """
    labels_key = label + "_labels"
    # The context manager closes the file; no explicit close() is needed.
    with h5py.File(cwd + "/" + label + "_data.h5", "r") as f:
        return [f[key][:] for key in dataset_keys if key != labels_key]
    
def special_load(label):
    """Return the `<label>_labels` dataset of `<cwd>/<label>_data.h5` as a NumPy array."""
    # The context manager closes the file; no explicit close() is needed.
    with h5py.File(cwd + "/" + label + "_data.h5", "r") as f:
        return np.array(f[label + "_labels"])

def get_data_labels(vids, batches, dataset, counter, flag):
    """Extract faces and integer labels for the first `batches * BATCH_SIZE` videos.

    Arguments:
        vids: Sequence of (video_path, label) pairs.
        batches: Number of BATCH_SIZE-sized batches to process; videos beyond
            `batches * BATCH_SIZE` are ignored.
        dataset: Unused; kept so existing callers keep working.
        counter: Running count of processed videos, threaded through
            `extract_faces`.
        flag: Unused; kept so existing callers keep working.

    Returns:
        tuple: (data, labels, counter). `labels` holds 1 for 'real' and 0 for
            anything else. `data` holds one face array per video: a regular
            NumPy array when every video produced the same shape, otherwise a
            1-D object array, because the number of faces found can differ
            between videos.
    """
    data     = []
    labels   = []
    for batch_index in range(batches):
        batch_data = vids[batch_index * BATCH_SIZE : (batch_index + 1) * BATCH_SIZE]
        for video_path, label in batch_data:
            faces, counter = extract_faces(video_path, batch_index, counter) # list of faces
            data.append(np.array(faces))
            labels.append(1 if label == 'real' else 0)

    labels = np.array(labels)
    if len({clip.shape for clip in data}) <= 1:
        data = np.array(data)
    else:
        # Ragged input: np.array() on arrays of different shapes raises on
        # recent NumPy versions, so fill an object array element by element.
        ragged = np.empty(len(data), dtype=object)
        for i, clip in enumerate(data):
            ragged[i] = clip
        data = ragged
    return data, labels, counter


def fix_shape(data):
    """Keep only the (10, 224, 224, 3) entries of `data` and stack them as float32.

    Returns an array of shape (n_kept, 10, 224, 224, 3).
    """
    expected = (10, 224, 224, 3)
    kept = [clip for clip in data if clip.shape == expected]
    stacked = np.array(kept, dtype=np.float32)
    return stacked.reshape((-1,) + expected)


def preprocessing():
    """Build the train/val/test face datasets from the raw videos and save them.

    Steps: list the fake and real video folders, label and shuffle the paths
    with a fixed seed, split them with `get_splits`, keep the first half of
    each split, then extract faces with `get_data_labels` and write every
    split with `special_save`.
    """
    fake_path   = '/kaggle/input/dataset/archive/manipulated_sequences/Deepfakes/c23/videos'
    real_path   = '/kaggle/input/dataset/archive/original_sequences/youtube/c23/videos'

    # os.listdir returns names in arbitrary order; sort them so the seeded
    # shuffle below yields the same split on every machine and run.
    fake_files  = [(fake_path + "/") + s for s in sorted(os.listdir(fake_path))]
    real_files  = [(real_path + "/") + s for s in sorted(os.listdir(real_path))]

    combined = [(path, 'fake') for path in fake_files]
    combined.extend((path, 'real') for path in real_files)

    random.seed(42)
    random.shuffle(combined)

    train_vids, val_vids, test_vids = get_splits(combined)
    del combined
    
    # deleting half of the videos on each list for lack of computational power
    del train_vids[len(train_vids) // 2:]
    del val_vids[len(val_vids) // 2:]
    del test_vids[len(test_vids) // 2:]
    
    print(f"Training examples:   {len(train_vids)}")
    print(f"Validation examples: {len(val_vids)}")
    print(f"Testing examples:    {len(test_vids)}")
    
    counter = 0
    splits = (("train", train_vids), ("val", val_vids), ("test", test_vids))
    for split_name, split_vids in splits:
        # Only whole batches are processed; a trailing partial batch is dropped.
        n_batches = len(split_vids) // BATCH_SIZE
        split_data, split_labels, counter = get_data_labels(
            split_vids, n_batches, split_name, counter, False
        )
        special_save(split_data, split_labels, split_name)
        print(f"{split_name} data saved")
        # Free the face arrays before the next split is extracted.
        del split_data, split_labels

CUDA Available: False


In [20]:
import os
import matplotlib.pyplot as plt
os.environ['TF_DISABLE_MMM'] = '1'  # Disable TensorFlow Meta Optimizer
import tensorflow as tf
from keras.models import load_model
from keras.utils import to_categorical
import gc
# from tensorflow.keras.utils import plot_model

# gpus = tf.config.experimental.list_physical_devices('GPU')
# print(gpus)

cwd = os.getcwd()
# If this file exists the cell below evaluates the saved model instead of training.
MODEL_FILE_NAME = cwd + "/nn.h5"
# If this file is missing the cell below runs preprocessing() instead of training.
TRAIN_FILE_NAME  = "/kaggle/input/data-processed/train_data.h5"
# Currently unused: validation loading is disabled in the cell below.
VAL_FILE_NAME  = "/kaggle/input/data-processed/val_data.h5"
# Test set read by the evaluation-only ("Testing") branch.
TEST_FILE_NAME  = cwd + "/test_data.h5"


def create_model():
    """Build, compile and summarize the per-frame CNN + LSTM video classifier.

    The model takes clips of shape (10, 224, 224, 3) and outputs one sigmoid
    value per clip (1 = real video, 0 = fake video).
    """
    num_frames = 10
    frame_shape = (224, 224, 3)

    # EfficientNetB0 backbone: a CNN that accepts 224x224x3 images.
    backbone = EfficientNetB0(weights='imagenet', include_top=False, input_shape=frame_shape)

    layers = [
        # Apply the backbone to every frame of the clip.
        TimeDistributed(backbone, input_shape=(num_frames,) + frame_shape),
        # Average-pool each frame's feature map down to one vector per frame.
        TimeDistributed(GlobalAveragePooling2D()),
        # Process the per-frame vectors in sequence; emits a 64-dim vector per frame.
        LSTM(units=64, return_sequences=True),
        # Concatenate all per-frame vectors into a single feature vector.
        Flatten(),
        # Fully connected layer with 128 units.
        Dense(units=128, activation='relu'),
        # Final layer: probability that the video is real.
        Dense(units=1, activation='sigmoid'),
    ]

    model = Sequential()
    for layer in layers:
        model.add(layer)

    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    model.summary()

    return model

def process_batches(data_list, labels_list, batch_size):
    """Yield (clips, labels) per batch, keeping only clips shaped (10, 224, 224, 3).

    `data_list` and `labels_list` are sliced in steps of `batch_size`; within
    each slice a clip and the label at the same position are kept only when
    the clip has the expected shape.
    """
    target_shape = (10, 224, 224, 3)
    for start in range(0, len(data_list), batch_size):
        stop = start + batch_size
        clips = data_list[start:stop]
        labels = labels_list[start:stop]

        keep = [clip.shape == target_shape for clip in clips]
        kept_clips = [clip for clip, ok in zip(clips, keep) if ok]
        kept_labels = [label for label, ok in zip(labels, keep) if ok]

        yield kept_clips, kept_labels

# Shape of one usable clip: (frames, height, width, channels).
CLIP_SHAPE = (10, 224, 224, 3)
# Only the leading two thirds of the training clips are loaded
# (presumably to stay within memory limits -- confirm).
TRAIN_FRACTION = 2 / 3
# Test set used right after training.
KAGGLE_TEST_FILE_NAME = "/kaggle/input/data-processed/test_data.h5"


def load_clips(h5_path, label, fraction=1.0):
    """Load the well-formed clips of one split together with their labels.

    Datasets are named `<label>_data_<i>` and the i-th entry of
    `<label>_labels` belongs to dataset i. Clips are therefore matched to
    labels through the numeric suffix of the key, never through the position
    of the key in the file listing (which is not numeric order).

    Arguments:
        h5_path: Path of the HDF5 file to read.
        label: Split name used as key prefix ("train", "val" or "test").
        fraction: Share of the clips to consider, taken from the start in
            index order.

    Returns:
        tuple: (float32 array of shape (n, 10, 224, 224, 3), array of n labels).
            Clips whose stored shape differs from CLIP_SHAPE are skipped
            together with their labels.
    """
    prefix = label + "_data_"
    with h5py.File(h5_path, "r") as f:
        all_labels = np.array(f[label + "_labels"])
        indices = sorted(int(key[len(prefix):]) for key in f.keys() if key.startswith(prefix))
        indices = indices[:int(len(indices) * fraction)]
        # Check the stored shape first so malformed clips are never read.
        valid = [i for i in indices if f[prefix + str(i)].shape == CLIP_SHAPE]
        # Fill a preallocated float32 array to avoid holding two full copies.
        clips = np.empty((len(valid),) + CLIP_SHAPE, dtype=np.float32)
        for row, i in enumerate(valid):
            clips[row] = f[prefix + str(i)][:]
    return clips, all_labels[np.array(valid, dtype=int)]


if os.path.exists(MODEL_FILE_NAME):
    # A trained model already exists: evaluate it only.
    print("Testing")

    model = tf.keras.models.load_model(MODEL_FILE_NAME)

    test_data, test_labels = load_clips(TEST_FILE_NAME, "test")
    gc.collect()

    model.summary()
    loss, accuracy = model.evaluate(test_data, test_labels)
    print("Test Accuracy:", accuracy)

elif not os.path.exists(TRAIN_FILE_NAME):
    # NOTE(review): preprocessing() writes its files to the working directory,
    # not to TRAIN_FILE_NAME, so this branch is taken again on the next run
    # unless the files are published at that path -- confirm the workflow.
    print("Preprocessing")
    preprocessing()

else:
    print("Clean data, create and train and validate model")

    train_data, train_labels = load_clips(TRAIN_FILE_NAME, "train", fraction=TRAIN_FRACTION)

    print(train_data.shape)
    print(train_labels.shape)
    print("done with train data")
    gc.collect()

    # Validation data is not used. To enable it, load it with
    # load_clips(VAL_FILE_NAME, "val") and pass validation_data= to fit().

    # CREATE MODEL
    model = create_model()
    print("model created")
    history = model.fit(train_data, train_labels, batch_size=8, epochs=10)
    print("model trained")

    # Plot the training curves and save each one as an image file.
    for metric, title, ylabel, filename in (
        ('loss', 'Training Loss', 'Loss', 'training_loss.png'),
        ('accuracy', 'Training Accuracy', 'Accuracy', 'training_accuracy.png'),
    ):
        plt.plot(history.history[metric])
        plt.title(title)
        plt.xlabel('Epochs')
        plt.ylabel(ylabel)
        plt.savefig(filename)
        plt.show()

    # Free the training set before the test set is loaded.
    del train_data
    del train_labels
    gc.collect()

    model.save(MODEL_FILE_NAME)
    print("model saved")

    test_data, test_labels = load_clips(KAGGLE_TEST_FILE_NAME, "test")
    gc.collect()

    loss, accuracy = model.evaluate(test_data, test_labels)
    print("Test Accuracy:", accuracy)

Testing
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 time_distributed (TimeDist  (None, 10, 7, 7, 1280)    4049564   
 ributed)                                                        
                                                                 
 time_distributed_1 (TimeDi  (None, 10, 1280)          0         
 stributed)                                                      
                                                                 
 lstm (LSTM)                 (None, 10, 64)            344320    
                                                                 
 flatten (Flatten)           (None, 640)               0         
                                                                 
 dense (Dense)               (None, 128)               82048     
                                                                 
 dense_1 (Dense)             (None, 1)          