## Deep Fake Detection Challenge

### Data Processing

##### Imports

In [None]:
import os
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from keras import layers
import matplotlib.pyplot as plt
import numpy as np
import cv2
from typing import Tuple
from model2D import *
#from model3D import *
from model3D_small import *
import math
from sklearn.utils.class_weight import compute_class_weight
from keras.utils.np_utils  import to_categorical
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from matplotlib import offsetbox
from tensorflow.keras.applications.resnet50 import ResNet50
import visualkeras
from keras.applications.imagenet_utils import preprocess_input
from tqdm import tqdm
from sklearn.manifold import TSNE
from PIL import Image
from matplotlib.patches import Patch
from keras.utils.vis_utils import plot_model

##### Data Description

In [None]:
# Directories holding the training sample videos and the test videos.
# The training directory also contains metadata.json (read in the next cell).
train_videoes = "data/train_sample_videos"
test_videos = "data/test_videos"

# os.listdir counts every directory entry, so the totals include non-video files.
print(f"Train Videoes: {len(os.listdir(train_videoes))}\nTest Vidoes: {len(os.listdir(test_videos))}")

In [None]:
# Load the metadata and transpose it so each row is indexed by one JSON top-level key.
meta_data = pd.read_json(train_videoes+"/metadata.json").T
# Labels and index values as plain lists, both in the metadata's own row order.
labels_col =meta_data["label"].to_list()
# The index values are later appended to the training directory to form file paths.
paths_col = meta_data.index.to_list()
print(meta_data.head())

In [None]:
def visualize_real_vs_fake(class_count: list) -> None:
    """Draw, save and display a pie chart of the real/fake class proportions.

    Args:
        class_count: Two counts; index 0 is labelled "Real Videos" and
            index 1 "Deep Fake Videos".

    The figure is written to images/pie_chart_class_proportions.jpg before
    it is shown.
    """
    wedge_style = {'linewidth': 2.5, 'edgecolor': 'white'}
    text_style = {'size': 'large', 'fontweight': 'bold'}
    plt.pie(
        class_count,
        labels=["Real Videos", "Deep Fake Videos"],
        autopct='%.2f%%',
        wedgeprops=wedge_style,
        textprops=text_style,
    )
    plt.title(
        "Proportion of Real vs Deep Fake videos in the training dataset.",
        fontdict={'fontweight': 'bold'},
    )

    # The legend repeats the absolute counts and is anchored in figure coordinates.
    legend_entries = [
        f"Real Videos Count: {class_count[0]}",
        f"Deep Fake Videos Count: {class_count[1]}",
    ]
    plt.legend(
        legend_entries,
        bbox_to_anchor=(0.5, 0.05),
        bbox_transform=plt.gcf().transFigure,
        loc="lower center",
        prop={'weight': 'bold'},
    )
    plt.savefig("images/pie_chart_class_proportions.jpg")
    plt.show()

In [None]:
# Count how many metadata rows carry each label value.
label_count_series = meta_data["label"].value_counts()
fake_count = label_count_series["FAKE"]
real_count = label_count_series["REAL"]

# Order matters: the plotting helper labels index 0 as real and index 1 as fake.
visualize_real_vs_fake([real_count, fake_count])

The dataset is quite skewed. Consider upsampling the real class to compensate.

In [None]:
def get_faces(paths: list, face_amount: int) -> Tuple[np.ndarray, list]:
    """Collect a fixed number of cropped face images from each video.

    Frames are read sequentially from the start of each video. For every
    frame in which the Haar cascade finds at least one face, the largest
    detection (by width * height) is cropped and resized to 224x224. Reading
    stops as soon as ``face_amount`` faces have been collected or the video
    runs out of frames.

    Args:
        paths: Video file paths to process.
        face_amount: Number of face crops required per video.

    Returns:
        A tuple ``(videos, invalid_indices)``. ``videos`` is an array built
        from the per-video face stacks of every video that yielded exactly
        ``face_amount`` faces. ``invalid_indices`` lists the positions in
        ``paths`` of the videos that did not, so callers can drop the
        matching labels.
    """
    video_array = []
    invalid_indices = []
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    for idx, path in enumerate(tqdm(paths)):
        vc = cv2.VideoCapture(path)
        faces = []
        try:
            while len(faces) < face_amount:
                ret, frame = vc.read()
                if not ret or frame is None:
                    # End of stream (or unreadable file): stop with what we have.
                    break
                gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                detections = face_cascade.detectMultiScale(gray_frame, scaleFactor=1.1, minNeighbors=5)
                if len(detections) == 0:
                    continue
                # Keep only the largest detected box in this frame.
                x, y, w, h = max(detections, key=lambda box: box[2] * box[3])
                face_img = cv2.resize(frame[y:y+h, x:x+w], (224, 224))
                faces.append(face_img)
        finally:
            # Release the capture even if detection or resizing raises.
            vc.release()

        if len(faces) == face_amount:
            video_array.append(np.array(faces))
        else:
            invalid_indices.append(idx)

    return np.array(video_array), invalid_indices

In [None]:
# These two listings are not used again in the visible part of the notebook.
training_videos_sample = os.listdir(train_videoes)
sample_file_names = training_videos_sample.copy()
complete_paths = []

# Prefix every metadata index entry with the training directory.
for path in paths_col:
    complete_paths.append(train_videoes+"/"+path)
# NOTE(review): the paths are sorted here while labels_col keeps the metadata
# order - confirm labels still line up with the videos after sorting.
complete_paths.sort()

# Five face crops per video; `indices` holds positions of videos that failed.
(faces, indices) = get_faces(complete_paths, 5)

In [None]:
# Sanity-check the extracted faces: shape of one crop from the first two
# videos, the overall array shape, and the indices of the rejected videos.
print(faces[0][1].shape)
print(faces[1][1].shape)
print(faces.shape)
print(indices)

## Frames

In [None]:
def get_frames_v1(paths: list, frames_each_video: int, video_amount: int, resolution: tuple) -> np.ndarray:
    """Sample evenly spaced, normalised RGB frames from the first videos in ``paths``.

    Every ``frames_to_skip``-th frame is kept, resized to ``resolution``,
    converted from BGR to RGB and scaled by 1/255. Each video contributes
    exactly ``frames_each_video`` frames: sampling stops once that many are
    collected, and shorter videos are padded by repeating their last frame.
    A video that yields no frames at all contributes an empty array.

    Args:
        paths: Video file paths.
        frames_each_video: Number of frames to keep per video.
        video_amount: Maximum number of videos to process.
        resolution: Target size passed to ``cv2.resize``.

    Returns:
        An array built from the per-video frame stacks.
    """
    video_array_colors = []
    for idx, path in enumerate(paths):
        if idx == video_amount:
            break
        vc = cv2.VideoCapture(path)

        frames_to_skip = (int(vc.get(cv2.CAP_PROP_FRAME_COUNT)) - 5) / frames_each_video
        # Clamp to 1: very short videos would otherwise round to 0 and make the
        # modulo below raise ZeroDivisionError.
        frames_to_skip = max(1, int(round(frames_to_skip, 0)))

        video = []
        i = 0
        try:
            while vc.isOpened() and len(video) < frames_each_video:
                i += 1
                ret, frame = vc.read()
                if not ret or frame is None:
                    break
                if i % frames_to_skip != 0:
                    continue
                # Frames whose first axis is 1920 long get their first two axes swapped.
                if frame.shape[0] == 1920:
                    frame = frame.transpose(1, 0, 2)

                frame = cv2.resize(frame, resolution)
                video.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) / 255)
        finally:
            vc.release()

        # Catch videos with too few frames: pad with the last frame. This runs
        # after the read loop so sampled frames are not duplicated.
        while video and len(video) < frames_each_video:
            video.append(video[-1])
        video_array_colors.append(np.array(video))
    return np.array(video_array_colors)

In [None]:

def get_frames_v2(paths: list, frames_each_video: int, video_amount: int):
    """Sample frames from each video and pick one face crop per video.

    Every ``frames_to_skip``-th frame is resized to 1080x720 and kept. The
    Haar cascade is then run over the kept frames, and the largest detection
    in the first frame that contains a face becomes that video's face region.
    Videos without any detection get an all-zero 224x224x3 placeholder.

    Args:
        paths: Video file paths.
        frames_each_video: Target frame count, used to derive the sampling stride.
        video_amount: Maximum number of videos to process.

    Returns:
        A tuple ``(video_array_colors, face_regions)``: the array built from
        the sampled frame stacks, and a list with one face image (or
        placeholder) per processed video.
    """
    video_array_colors = []
    face_regions = []

    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    for idx, path in enumerate(paths):
        if idx == video_amount:
            break

        vc = cv2.VideoCapture(path)
        frames_to_skip = (int(vc.get(cv2.CAP_PROP_FRAME_COUNT)) - 5) / frames_each_video
        # Clamp to 1: flooring a value below 1 gives 0, which would make the
        # modulo below raise ZeroDivisionError on short videos.
        frames_to_skip = max(1, math.floor(frames_to_skip))
        video = []
        i = 0

        try:
            while vc.isOpened():
                i += 1
                ret, frame = vc.read()
                if not ret or frame is None:
                    break
                if i % frames_to_skip != 0:
                    continue
                frame = cv2.resize(frame, (1080, 720))
                video.append(frame)
        finally:
            vc.release()

        video_array_colors.append(np.array(video))

        video_face_regions = []

        for frame in video:
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray_frame, scaleFactor=1.1, minNeighbors=5)

            if len(faces) > 0:
                # Keep the largest detected box in this frame.
                x, y, w, h = max(faces, key=lambda face: face[2] * face[3])
                face_img = frame[y:y+h, x:x+w]
                video_face_regions.append(face_img)

        if video_face_regions:
            # Only the first detected face represents the video.
            face_regions.append(video_face_regions[0])
        else:
            print("No faces detected in any frame of this video.")
            face_regions.append(np.zeros((224, 224, 3)))  # Add a placeholder image with the same size as the face images

    return np.array(video_array_colors), face_regions



def extract_features(face_regions, model):
    """Compute one feature vector per face image.

    Real face images are resized to 224x224, passed through
    ``preprocess_input`` and ``model.predict``, and the squeezed prediction is
    stored. All-zero images are treated as placeholders and receive an
    all-zero feature vector of the same shape as the real features.

    Placeholders that occur before the first real face are filled in once a
    real feature vector is known. If there is no real face at all, the vector
    length is taken from ``model.output_shape[-1]`` (0 if unavailable).

    Args:
        face_regions: Iterable of face images (or all-zero placeholders).
        model: Object exposing ``predict``.

    Returns:
        A list with one feature array per entry of ``face_regions``.
    """
    features = []
    pending = []      # positions of placeholders seen before any real feature
    template = None   # first real feature vector, used to shape the zero vectors

    for face_img in face_regions:
        if np.count_nonzero(face_img) == 0:  # placeholder (all zeros)
            if template is None:
                # Shape not known yet; reserve the slot and fill it later.
                pending.append(len(features))
                features.append(None)
            else:
                features.append(np.zeros_like(template))
            continue

        face_img = cv2.resize(face_img, (224, 224))
        face_img = preprocess_input(face_img)
        face_img = np.expand_dims(face_img, axis=0)
        feature = model.predict(face_img).squeeze()
        if template is None:
            template = feature
        features.append(feature)

    if pending:
        if template is not None:
            filler = np.zeros_like(template)
        else:
            out_shape = getattr(model, "output_shape", None)
            width = out_shape[-1] if out_shape else 0
            filler = np.zeros(width or 0)
        for pos in pending:
            features[pos] = filler.copy()

    return features
# ResNet50 is already imported at the top of the notebook; this repeats it.
from tensorflow.keras.applications.resnet50 import ResNet50

# ImageNet-weighted ResNet50 without its classification head, with average pooling.
model = ResNet50(weights='imagenet', include_top=False, pooling='avg')
# Process at most 200 videos; one face region is returned per processed video.
video_array_colors, face_regions = get_frames_v2(paths=complete_paths, frames_each_video=5, video_amount=200)
video_features = extract_features(face_regions, model)


In [None]:
# Rebuilds complete_paths with the same steps as the earlier face-extraction cell.
# The two directory listings are not used again in the visible part of the notebook.
training_videos_sample = os.listdir(train_videoes)
sample_file_names = training_videos_sample.copy()
complete_paths = []

for path in paths_col:
    complete_paths.append(train_videoes+"/"+path)
# NOTE(review): sorting changes the order relative to labels_col - confirm alignment.
complete_paths.sort()


In [None]:
# NOTE(review): vid_arr_col is not assigned anywhere in the visible part of this
# notebook - presumably the result of a frame-extraction call; confirm.
print(vid_arr_col.shape)
print(vid_arr_col[8].shape)

In [None]:
# Encode labels as integers: 1 for "FAKE", 0 for everything else.
# NOTE(review): y follows the order of labels_col (metadata order), while
# complete_paths is sorted - confirm labels match the videos they are paired with.
y = [0 if val!="FAKE" else 1 for val in labels_col ]
y =np.array(y)
# One-hot encode; the number of classes is inferred from the data.
y = to_categorical(y, num_classes=None).astype(int)
y[:10]

In [None]:
print(f"Amount of Videos: {len(vid_arr_col)}")
# print(f"Frames for videos: {[len(vid_arr_col[i]) for i in range(len(vid_arr_col))]}")
# Slice instead of indexing range(10) so fewer than ten videos does not raise IndexError.
print(f"Frames for videos: {[len(video) for video in vid_arr_col[:10]]}")

### Plot frames

In [None]:

# ImageNet-weighted ResNet50 without its classification head, with average pooling.
# The extract_features defined in the next cell reads this module-level `model`.
model = ResNet50(weights='imagenet', include_top=False, pooling='avg')

In [None]:
def extract_features(video_array):
    """Compute one averaged feature vector and one face crop per video.

    For each frame, the frame is scaled by 255 and converted to grayscale for
    Haar-cascade detection. The largest detected face is cropped, resized to
    224x224, scaled by 255, preprocessed and passed to the module-level
    ``model``. A video's feature is the mean of its per-frame features; its
    face region is the first detected crop.

    Videos without any detected face get an all-zero feature vector and an
    all-zero 224x224x3 placeholder image, so both outputs always hold exactly
    one entry per input video, in input order.

    Args:
        video_array: Iterable of videos, each an iterable of frames.
            # assumes RGB frames scaled to [0, 1], given the * 255 and
            # COLOR_RGB2GRAY below - TODO confirm against the caller.

    Returns:
        A tuple ``(features, face_regions)``: an array of per-video feature
        vectors and a list of per-video face images.
    """
    features = []
    face_regions = []
    pending = []      # positions of face-less videos seen before any real feature
    template = None   # first real feature vector, used to shape the zero vectors

    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    for video in tqdm(video_array):
        video_features = []
        video_face_regions = []

        for frame in video:
            gray_frame = cv2.cvtColor((frame * 255).astype(np.uint8), cv2.COLOR_RGB2GRAY)
            faces = face_cascade.detectMultiScale(gray_frame, scaleFactor=1.1, minNeighbors=5)
            if len(faces) == 0:
                continue

            # Keep the largest detected box in this frame.
            x, y, w, h = max(faces, key=lambda face: face[2] * face[3])
            face_crop = frame[y:y+h, x:x+w]
            face_img = cv2.resize(face_crop, (224, 224))

            face_img = preprocess_input(face_img * 255)
            face_img = np.expand_dims(face_img, axis=0)
            feature = model.predict(face_img)
            video_features.append(feature.squeeze())
            video_face_regions.append(face_crop)

        if video_features:
            mean_feature = np.mean(video_features, axis=0)
            if template is None:
                template = mean_feature
            features.append(mean_feature)
            face_regions.append(video_face_regions[0])
        else:
            print("No faces detected in any frame of this video.")
            # Always emit a placeholder so the outputs stay aligned with the
            # input videos (and therefore with their labels).
            face_regions.append(np.zeros((224, 224, 3)))
            if template is None:
                pending.append(len(features))
                features.append(None)
            else:
                features.append(np.zeros_like(template))

    if pending:
        if template is not None:
            filler = np.zeros_like(template)
        else:
            # No video produced a feature; fall back to the model's output width.
            out_shape = getattr(model, "output_shape", None)
            width = out_shape[-1] if out_shape else 0
            filler = np.zeros(width or 0)
        for pos in pending:
            features[pos] = filler.copy()

    return np.array(features), face_regions
# Rebinds video_features and face_regions, replacing the values assigned by the
# earlier get_frames_v2 cell.
video_features, face_regions = extract_features(vid_arr_col)

In [None]:
def plot_tsne_with_images(tsne_results, face_regions, figsize=(4, 4), thumbnail_size=(64, 36)):
    """Scatter face thumbnails at their 2-D t-SNE coordinates and show the plot.

    Args:
        tsne_results: Array with one (x, y) row per sample.
        face_regions: Face images, indexed in step with ``tsne_results``.
        figsize: Size of the matplotlib figure.
        thumbnail_size: Size each face image is resized to before placement.
    """
    fig, ax = plt.subplots(figsize=figsize)

    for row_idx in range(tsne_results.shape[0]):
        x, y = tsne_results[row_idx, :]

        thumbnail = cv2.resize(face_regions[row_idx], thumbnail_size)
        thumb_box = offsetbox.OffsetImage(thumbnail, zoom=1, cmap='gray')
        ax.add_artist(
            offsetbox.AnnotationBbox(thumb_box, (x, y), xycoords='data', frameon=False)
        )

    # Pad the axes by 10 units on every side of the embedding's extent.
    margin = 10
    xs = tsne_results[:, 0]
    ys = tsne_results[:, 1]
    ax.set_xlim(xs.min() - margin, xs.max() + margin)
    ax.set_ylim(ys.min() - margin, ys.max() + margin)

    ax.set_xlabel('t-SNE Component 1')
    ax.set_ylabel('t-SNE Component 2')
    ax.set_title('t-SNE Plot of Video Features with Face Thumbnails')
    plt.show()


In [None]:
# Project the per-video feature vectors to two dimensions; the fixed
# random_state keeps the embedding reproducible between runs.
tsne = TSNE(n_components=2, random_state=42,perplexity=20)
tsne_results = tsne.fit_transform(video_features)

In [None]:

# Render a layered diagram of test_m (created in a later cell of this notebook)
# and write it to images/layers_yo.png.
vis = visualkeras.layered_view(test_m, to_file="images/layers_yo.png", legend=True)
vis.show()
plt.rcParams.update({'font.size':500})  # sets matplotlib's global default font size to 500 for later plots
plt.show()



In [None]:

# Write a diagram of test_m, including layer names and shapes, to images/layerv2.png.
plot_model(test_m, to_file="images/layerv2.png", show_shapes = True, show_layer_names = True)

In [None]:
# Show the t-SNE embedding with each video's face crop as its marker.
plot_tsne_with_images(tsne_results, face_regions)

In [None]:
def plot_video(video: list, figsize: tuple, width: int, height: int) -> None:
    """Show the first ``width * height`` frames of a video in a subplot grid.

    Args:
        video: Sequence of frames accepted by ``plt.imshow``.
        figsize: Size of the matplotlib figure.
        width: First grid dimension passed to ``plt.subplot``.
        height: Second grid dimension passed to ``plt.subplot``.
    """
    plt.figure(figsize=figsize)
    shown = video[:(width * height)]
    for cell, _ in enumerate(shown, start=1):
        plt.subplot(width, height, cell)
        plt.imshow(video[cell - 1])
    plt.show()

In [None]:

# Generate some example data
# Standalone demo on random data: heatmap with a colorbar whose tick labels
# use a custom font size. It does not use any of the video data.
data = np.random.rand(10, 10)

fig, ax = plt.subplots()

# Create the heatmap
heatmap = ax.imshow(data, cmap='viridis')

# Create the colorbar
cbar = fig.colorbar(heatmap, ax=ax)

# Change the font size of the colorbar labels
cbar.ax.tick_params(labelsize=14)  # You can set the desired font size here

# Show the plot
plt.show()


In [None]:
def plot_faces(faces: list, figsize: tuple, width: int, height: int) -> None:
    """Show up to ``width * height`` face images in a grid without axes.

    Args:
        faces: Sequence of images accepted by ``plt.imshow``.
        figsize: Size of the matplotlib figure.
        width: First grid dimension passed to ``plt.subplot``.
        height: Second grid dimension passed to ``plt.subplot``.
    """
    plt.figure(figsize=figsize)
    capacity = width * height
    limit = capacity if capacity < len(faces) else len(faces)
    cell = 0
    while cell < limit:
        plt.subplot(width, height, cell + 1)
        plt.imshow(faces[cell])
        plt.axis('off')
        cell += 1
    plt.show()
    

In [None]:
# Show at most 20 face crops (5 x 4 grid).
plot_faces(face_regions, figsize=(10, 10), width=5, height=4)

In [None]:
# NOTE(review): video_features[0] is a feature vector returned by
# extract_features, while plot_video calls imshow on each element it is given.
# Presumably a frame stack (e.g. one entry of vid_arr_col) was intended - confirm.
plot_video(video_features[0], (30,5), 2, 5)

## tSNE

### BUILD MODEL
- Add Layers
- Add Loss function, optimizers, and metrics
- Compile model and Fit

In [None]:
# Class ids as an ndarray: recent scikit-learn versions validate that `classes`
# is an ndarray and reject a plain list.
class_ids = np.array([0, 1])
# y is one-hot encoded, so argmax recovers the integer class of each sample.
weight_class = compute_class_weight(class_weight='balanced', classes=class_ids, y=np.argmax(y, axis=1))
# Key the weights by the class ids themselves rather than by the unique values
# that happen to appear in the one-hot matrix.
class_weights = dict(zip(class_ids.tolist(), weight_class))
class_weights

In [None]:
def build_3D_model(input_data):
    """Build an uncompiled Sequential model of four Conv3D/MaxPool3D stages.

    The convolution stages use 64, 32, 16 and 8 filters, each followed by
    max-pooling, then a 128-unit dense layer and a 2-unit softmax output.

    Args:
        input_data: Array whose shape, minus the first axis, is used as the
            model's input shape.

    Returns:
        The uncompiled ``keras`` Sequential model.
    """
    sample_shape = input_data.shape[1:]
    stack = [
        layers.Conv3D(filters=64, kernel_size=3, padding="same", strides=1, activation="relu", input_shape=sample_shape),
        layers.MaxPool3D(pool_size=2, padding="same"),
    ]
    # The remaining convolution stages differ only in their filter count.
    for n_filters in (32, 16, 8):
        stack.append(layers.Conv3D(filters=n_filters, kernel_size=3, padding="same", activation="relu"))
        stack.append(layers.MaxPool3D(pool_size=2, padding="same"))
    stack.append(layers.Flatten())
    stack.append(layers.Dense(128, activation="relu"))
    stack.append(layers.Dense(2, activation="softmax"))

    model = keras.models.Sequential()
    for layer in stack:
        model.add(layer)
    return model


In [None]:
def test_model(input_data):
    """Build an uncompiled Conv3D + ConvLSTM2D Sequential model.

    Layout: one Conv3D layer, two ConvLSTM2D stages (each followed by batch
    normalisation, dropout and 3-D max-pooling), then a flattened dense head
    with a 32-unit ELU layer and a 2-unit sigmoid output.

    Args:
        input_data: Array whose shape, minus the first axis, is used as the
            model's input shape.

    Returns:
        The uncompiled ``keras`` Sequential model.
    """
    sample_shape = input_data.shape[1:]
    net = keras.models.Sequential()

    net.add(layers.Conv3D(filters=32, kernel_size=(2, 2, 2), input_shape=sample_shape,
                          activation='relu',
                          padding='same', data_format='channels_last'))
    net.add(layers.BatchNormalization())

    # Two identical recurrent-convolution stages.
    for _ in range(2):
        net.add(layers.ConvLSTM2D(filters=16, kernel_size=(2, 2),
                                  padding='same', return_sequences=True))
        net.add(layers.BatchNormalization())
        net.add(layers.Dropout(0.3))
        net.add(layers.MaxPooling3D(pool_size=(2, 2, 2)))

    net.add(layers.Flatten())
    net.add(layers.BatchNormalization())
    net.add(layers.Dense(32, activation='elu'))
    net.add(layers.Dense(2, activation='sigmoid'))
    return net

In [None]:
# Build the ConvLSTM model; only the shape of vid_arr_col is used here.
test_m = test_model(vid_arr_col)
test_m.summary()

In [None]:
# Compile test_m with binary cross-entropy, Adam at its default settings,
# and accuracy as the reported metric.
loss = "binary_crossentropy"
optimizer = "adam"
metrics=["accuracy"]
test_m.compile(optimizer=optimizer, loss=loss, metrics=metrics)

In [None]:
# Train on all loaded videos; y is truncated to the number of videos so the
# label count matches. class_weights compensates for the class imbalance.
# No validation data is passed, so the history holds training metrics only.
history_test = test_m.fit(vid_arr_col, 
                          y[:vid_arr_col.shape[0]],
                          epochs=10, 
                          batch_size=10, 
                          verbose=1, 
                          class_weight=class_weights)

In [None]:
# Predict on the same array the model was trained on (no held-out data here).
pred_y = test_m.predict(vid_arr_col)

In [None]:
# Peek at the first five raw model outputs.
pred_y[:5]

In [None]:
# Convert one-hot labels and per-class outputs to class indices.
actual_y = np.argmax(y[:vid_arr_col.shape[0]], axis=1)
# pred_y is overwritten in place, so re-running this cell without re-running
# the predict cell would apply argmax to already-reduced indices.
pred_y = np.argmax(pred_y, axis=1)
print(classification_report(actual_y, pred_y, digits=3))

In [None]:
# Number of samples predicted as class 1 (class 1 encodes "FAKE").
pred_y.sum()

In [None]:
# Display the true class indices.
actual_y

In [None]:
# Confusion matrix of true vs. predicted class indices.
cfm = confusion_matrix(actual_y, pred_y)

In [None]:
# Label order follows the encoding: index 0 is REAL, index 1 is FAKE.
disp = ConfusionMatrixDisplay(confusion_matrix=cfm, display_labels= ["REAL", "FAKE"])
disp.plot()

In [None]:
# Build the plain Conv3D model; only the shape of vid_arr_col is used here.
model_3D_col = build_3D_model(vid_arr_col)
model_3D_col.summary()

In [None]:
# Compile the Conv3D model with binary cross-entropy and accuracy.
loss = "binary_crossentropy"
# NOTE(review): 0.1 is far above Adam's Keras default of 0.001 - confirm this is intentional.
optimizer = keras.optimizers.Adam(learning_rate = 0.1)
metrics=["accuracy"]
model_3D_col.compile(optimizer=optimizer, loss=loss, metrics=metrics)

In [None]:
# Train the Conv3D model on all loaded videos. Unlike the test_m fit, no
# class_weight is passed here.
history = model_3D_col.fit(vid_arr_col, 
                           y[:vid_arr_col.shape[0]],
                           epochs=10, 
                           batch_size=10, 
                           verbose=2)

In [None]:
# Recompile the Conv3D model with a lower learning rate.
# build_3D_model ends in a softmax layer, so the model outputs probabilities,
# not logits; from_logits must therefore be False for the loss to be correct.
loss = keras.losses.BinaryCrossentropy(from_logits=False)
optimizer = keras.optimizers.Adam(learning_rate=0.001)
metrics=["accuracy"]
model_3D_col.compile(optimizer=optimizer, loss=loss, metrics=metrics)

### Plotting performance

In [None]:
# Show the training parameters and which metrics were recorded for test_m.
print(history_test.params)
print(history_test.history.keys())

In [None]:
# summarize history for accuracy
plt.plot(history_test.history['accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(history_test.history['loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()