<a href="https://colab.research.google.com/github/TINF-DeepfakeEvaluierung/Evaluation-von-Methoden-zur-Erkennung-von-Deepfakes/blob/main/DSP_FWA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

In [None]:
# Fetch the DSP-FWA code (py_utils package and dlib model used by the cells below).
!git clone https://github.com/TINF-DeepfakeEvaluierung/DSP-FWA.git
# Pin OpenCV to the 3.x series; on Colab this replaces the preinstalled 4.x build.
!pip install opencv-python==3.*

Cloning into 'DSP-FWA'...
remote: Enumerating objects: 117, done.[K
remote: Counting objects: 100% (35/35), done.[K
remote: Compressing objects: 100% (35/35), done.[K
remote: Total 117 (delta 11), reused 2 (delta 0), pack-reused 82[K
Receiving objects: 100% (117/117), 161.07 MiB | 17.21 MiB/s, done.
Resolving deltas: 100% (26/26), done.
Updating files: 100% (27/27), done.
Collecting opencv-python==3.*
  Downloading opencv_python-3.4.18.65-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (58.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.4/58.4 MB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: opencv-python
  Attempting uninstall: opencv-python
    Found existing installation: opencv-python 4.8.0.76
    Uninstalling opencv-python-4.8.0.76:
      Successfully uninstalled opencv-python-4.8.0.76
Successfully installed opencv-python-3.4.18.65


In [None]:
%cd /content/DSP-FWA

/content/DSP-FWA


In [None]:
import sys
sys.path.append('..')

import torch
import torch.nn.functional as F
import cv2, os, dlib, json
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from sklearn.preprocessing import normalize
from tqdm import tqdm
from itertools import islice
from py_utils.face_utils import lib
from py_utils.vid_utils import proc_vid as pv
from py_utils.DL.pytorch_utils.models.classifier import SPPNet

In [None]:
# Connect Google Cloud
# Authenticate.
# Colab-only import: kept in this cell because it is only needed for the bucket mount.
from google.colab import auth
auth.authenticate_user()

# Install Cloud Storage FUSE.
# Registers the gcsfuse apt repository for the current distribution codename,
# adds Google's package signing key, then installs the gcsfuse package.
!echo "deb https://packages.cloud.google.com/apt gcsfuse-`lsb_release -c -s` main" | sudo tee /etc/apt/sources.list.d/gcsfuse.list
!curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
!apt -qq update && apt -qq install gcsfuse
# Mount a Cloud Storage bucket or location, without the gs:// prefix.
mount_path = "deepfake_detection_datasets"  # or a location like "my-bucket/path/to/mount"
# The bucket is mounted under /content/<bucket name>; the dataset paths in later cells rely on this location.
local_path = f"/content/{mount_path}"

!mkdir -p {local_path}
!gcsfuse --implicit-dirs {mount_path} {local_path}

deb https://packages.cloud.google.com/apt gcsfuse-jammy main
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2659  100  2659    0     0  13277      0 --:--:-- --:--:-- --:--:-- 13295
OK
54 packages can be upgraded. Run 'apt list --upgradable' to see them.
[1;33mW: [0mhttps://packages.cloud.google.com/apt/dists/gcsfuse-jammy/InRelease: Key is stored in legacy trusted.gpg keyring (/etc/apt/trusted.gpg), see the DEPRECATION section in apt-key(8) for details.[0m
The following NEW packages will be installed:
  gcsfuse
0 upgraded, 1 newly installed, 0 to remove and 54 not upgraded.
Need to get 10.4 MB of archives.
After this operation, 0 B of additional disk space will be used.
Selecting previously unselected package gcsfuse.
(Reading database ... 121918 files and directories currently installed.)
Preparing to unpack .../gcsfuse_2.0.1_amd64.deb ...
Unpacking gcsfuse (2.0.1) .

In [None]:
!mkdir ./ckpt
!cp "/content/deepfake_detection_datasets/DSP-FWA_(model)/SPP-res50.pth" ./ckpt/

In [None]:
# Number of face crops cut per image in im_test (one lib.cut_head call per index 0..sample_num-1).
sample_num = 10
# Employ dlib to extract face area and landmark points
front_face_detector = dlib.get_frontal_face_detector()
# 68-point landmark model; the path is relative to the DSP-FWA checkout the notebook cd'ed into.
lmark_predictor = dlib.shape_predictor('./dlib_model/shape_predictor_68_face_landmarks.dat')

def im_test(net, im, input_size):
    """Score a single image with the network.

    The image is aligned with dlib; if exactly one face is found, ``sample_num``
    crops of that face are cut, resized to ``input_size`` x ``input_size``,
    mean-subtracted and classified in one batch.

    Args:
        net: classification network already on the GPU (see ``setup``).
        im: image array as returned by ``cv2.imread`` / the video parser; its channel
            order is reversed before alignment.
        input_size: side length the crops are resized to before classification.

    Returns:
        ``(prob, face_info)``. ``prob`` is ``-1`` when the number of detected faces
        is not exactly one; otherwise it is ``1 - mean(upper half of the sorted
        class-0 softmax outputs over all crops)``. ``face_info`` is the raw result
        of ``lib.align``.
    """
    face_info = lib.align(im[:, :, (2,1,0)], front_face_detector, lmark_predictor)
    # Only images with exactly one detected face are scored.
    if len(face_info) != 1:
        return -1, face_info

    _, point = face_info[0]
    rois = []
    for i in range(sample_num):
        roi, _ = lib.cut_head([im], point, i)
        rois.append(cv2.resize(roi[0], (input_size, input_size)))

    # Per-channel mean, shaped (1, 3, 1, 1) so it broadcasts over the crop batch.
    bgr_mean = np.array([103.939, 116.779, 123.68])
    bgr_mean = bgr_mean[np.newaxis, :, np.newaxis, np.newaxis]
    bgr_mean = torch.from_numpy(bgr_mean).float().cuda()

    # Stack crops and move the channel axis from last to second position.
    batch = torch.from_numpy(np.array(rois)).float().cuda()
    batch = batch.permute((0, 3, 1, 2))

    # Pure inference: no_grad avoids building an autograd graph for every frame.
    with torch.no_grad():
        scores = F.softmax(net(batch - bgr_mean), dim=1)
    class0 = scores[:, 0].cpu().numpy()

    # Keep only the upper half of the sorted class-0 probabilities.
    keep_from = int(np.round(sample_num / 2))
    prob = 1 - np.mean(np.sort(class0)[keep_from:])
    return prob, face_info

def setup(arch, layers):
    num_class = 2
    if arch.lower() == 'sppnet':
        net = SPPNet(backbone=layers, num_class=num_class)
    net = net.cuda()
    net.eval()
    return net

# Networks that are already built and loaded, keyed by (arch, layers, checkpoint path).
# Without this, every call rebuilt the model and re-read the checkpoint from disk.
_loaded_nets = {}

def _get_net(arch, layers, model_path):
    """Return the network for this configuration, building and loading it on first use."""
    key = (arch.lower(), layers, model_path)
    if key not in _loaded_nets:
        if not os.path.isfile(model_path):
            raise ValueError("=> no checkpoint found at '{}'".format(model_path))
        net = setup(arch, layers)
        checkpoint = torch.load(model_path)
        net.load_state_dict(checkpoint['net'])
        _loaded_nets[key] = net
    return _loaded_nets[key]

def predict_deepfake_video(input, arch="sppnet", layers=50, input_size=224, save_dir="./ckpt/", ckpt_name="SPP-res50.pth"):
    """Score one image or video file with the network.

    Args:
        input: path to the file. (The name shadows the builtin but is kept because
            it is part of the public signature.)
        arch, layers: network configuration forwarded to ``setup``.
        input_size: crop side length forwarded to ``im_test``.
        save_dir, ckpt_name: location of the checkpoint to load.

    Returns:
        * Video (``mp4``/``avi``/``mov``): mean over at most the first 300 frames of
          ``1 - im_test score``; frames for which ``im_test`` returns ``-1`` count as 0.5.
        * Image: the raw ``im_test`` score, which is also printed. This value is NOT
          inverted like the video score; it is ``-1`` if the image cannot be read or
          does not contain exactly one face. Previously it was computed and dropped.
        * Any other suffix: ``None``.

    Raises:
        ValueError: if the checkpoint file does not exist.
    """
    net = _get_net(arch, layers, os.path.join(save_dir, ckpt_name))

    f_path = input
    suffix = f_path.split('.')[-1].lower()

    if suffix in ['jpg', 'png', 'jpeg', 'bmp', 'tif', 'nef', 'raf']:
        im = cv2.imread(f_path)
        if im is None:
            prob = -1
        else:
            prob, _ = im_test(net, im, input_size)
        print(prob)
        return prob

    if suffix in ['mp4', 'avi', 'mov']:
        # Parse video
        imgs, _, _, _, _ = pv.parse_vid(f_path)
        total_frames_to_process = min(300, len(imgs))
        probs = []
        for fid, im in enumerate(imgs):
            if fid >= total_frames_to_process:
                break

            prob, _ = im_test(net, im, input_size)
            if prob != -1:
                probs.append(1-prob)
            else:
                # if the model cant find a face, the result is 0.5 which can be interpreted as guessing Fake or Real
                probs.append(0.5)

        # NOTE: a video without frames yields nan here (mean of an empty list).
        return np.mean(probs)

    # Unsupported file type: kept as a silent None so directory scans do not abort.
    return None

# FF++

In [None]:
ffpp_dir = "/content/deepfake_detection_datasets/FFPP/"
dataset = "FaceForensics++"

In [None]:
# images_per_class should be divisible by 6
def detect_ffpp_deepfakes(ffpp_dir, images_per_class=30):
    """Score a sample of FaceForensics++ videos, split into fake and real.

    The directory tree below ``ffpp_dir`` is walked and every ``.mp4`` is assigned to
    the subfolder formed by its first two path components. Videos whose first
    component is ``manipulated_sequences`` count as fake, everything else as real.
    Each manipulated subfolder contributes up to ``images_per_class / 6`` videos and
    each other subfolder up to ``images_per_class / 2``; processing stops after
    ``2 * images_per_class`` videos in total.

    Directories and files are visited in sorted order so that the selected sample
    is the same on every run.

    Args:
        ffpp_dir: root directory of the dataset.
        images_per_class: target number of videos per class.

    Returns:
        ``(scores_fake, scores_real)`` as float arrays of per-video scores.
    """
    # Per-subfolder quotas, kept as plain divisions so non-divisible values behave as before.
    files_per_manipulated_subfolder = images_per_class / 6
    files_per_original_subfolder = images_per_class / 2
    total_files_to_process = 2 * images_per_class

    scores_fake = []
    scores_real = []
    # Videos scored so far per subfolder; keys include the top-level folder,
    # so manipulated and original entries cannot collide.
    predicted_count = {}
    processed_files = 0

    # The context manager closes the progress bar even if a prediction raises.
    with tqdm(total=total_files_to_process) as pbar:
        pbar.set_description(f"Processing: ")

        for root, dirs, files in os.walk(ffpp_dir):
            dirs.sort()  # in-place sort steers os.walk's traversal order
            for file in sorted(files):
                # Stop processing if the number of processed files meets the target.
                if processed_files >= total_files_to_process:
                    break
                # Process only MP4 video files.
                if not file.endswith('.mp4'):
                    continue

                vid_path = os.path.join(root, file)
                path_parts = os.path.relpath(vid_path, ffpp_dir).split(os.sep)
                # A video directly in the root has no subfolder component;
                # skip it instead of failing with an IndexError.
                if len(path_parts) < 2:
                    continue

                is_manipulated = path_parts[0] == 'manipulated_sequences'
                subfolder_path = os.path.join(path_parts[0], path_parts[1])
                quota = files_per_manipulated_subfolder if is_manipulated else files_per_original_subfolder

                # Ensure not to process more videos than the set limit for the subfolder.
                already_done = predicted_count.get(subfolder_path, 0)
                if already_done >= quota:
                    continue

                faces_pred = predict_deepfake_video(vid_path)
                if is_manipulated:
                    scores_fake.append(faces_pred)
                else:
                    scores_real.append(faces_pred)
                predicted_count[subfolder_path] = already_done + 1
                processed_files += 1
                pbar.update(1)

            # Break the outer loop if the processing limit is reached.
            if processed_files >= total_files_to_process:
                break

    return np.array(scores_fake, dtype=float), np.array(scores_real, dtype=float)

In [None]:
score_fake, score_real = detect_ffpp_deepfakes(ffpp_dir)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth

  0%|          | 0.00/97.8M [00:00<?, ?B/s][A
 13%|█▎        | 12.5M/97.8M [00:00<00:00, 131MB/s][A
 26%|██▌       | 25.3M/97.8M [00:00<00:00, 133MB/s][A
 39%|███▉      | 38.4M/97.8M [00:00<00:00, 135MB/s][A
 52%|█████▏    | 51.3M/97.8M [00:00<00:00, 121MB/s][A
 64%|██████▍   | 63.0M/97.8M [00:00<00:00, 114MB/s][A
 76%|███████▌  | 74.0M/97.8M [00:00<00:00, 111MB/s][A
 87%|████████▋ | 84.7M/97.8M [00:00<00:00, 109MB/s][A
100%|██████████| 97.8M/97.8M [00:00<00:00, 113MB/s]
Processing: : 100%|██████████| 60/60 [1:16:41<00:00, 76.70s/it]


# Celeb-DF

In [None]:
real_celeb_dir = "/content/deepfake_detection_datasets/Celeb-DF/Celeb-real"
fake_celeb_dir = "/content/deepfake_detection_datasets/Celeb-DF/Celeb-synthesis"
dataset = "CelebDF"

In [None]:
def detect_celeb_deepfake(deepfake_folder, num_videos=5):
  """Score the first ``num_videos`` directory entries of ``deepfake_folder``.

  Entries are taken in the order ``os.listdir`` returns them. Returns an array
  with one ``predict_deepfake_video`` result per entry.
  """
  collected = np.array([])
  selection = islice(os.listdir(deepfake_folder), num_videos)

  for entry in tqdm(selection, total=num_videos):
    prediction = predict_deepfake_video(os.path.join(deepfake_folder, entry))
    collected = np.append(collected, prediction)

  return collected

In [None]:
# Score both classes: with the real run commented out, score_real silently kept the
# values of the previously evaluated dataset and the plots below mixed two datasets.
score_real = detect_celeb_deepfake(real_celeb_dir)
score_fake = detect_celeb_deepfake(fake_celeb_dir)

In [None]:
score_fake

# FakeAVCeleb

In [None]:
fakeavceleb_metadata = "/content/deepfake_detection_datasets/FakeAVCeleb/meta_data.csv"
dataset = "FakeAVCeleb"

In [None]:
# videos_per_class must be divisible by 10
def detect_fakeAVCeleb_deepfake(metadata_file, videos_per_class=30, random_state=None, dataset_root="/content/deepfake_detection_datasets/"):
  """Score a sample of FakeAVCeleb videos, split into fake and real.

  Real videos are the first ``videos_per_class`` rows of type
  ``RealVideo-RealAudio``. Fake videos are drawn from ``FakeVideo-RealAudio``:
  ``videos_per_class / 10`` rows are sampled from every (race, gender) group, and
  groups smaller than that are taken whole.
  (Presumably the metadata has 10 such groups, hence the divisibility note — confirm.)

  Args:
    metadata_file: path to the dataset's ``meta_data.csv``.
    videos_per_class: target number of videos per class.
    random_state: seed forwarded to ``DataFrame.sample``. The default ``None``
      keeps the previous unseeded behaviour; pass an int for a reproducible sample.
    dataset_root: directory joined in front of the CSV's path columns.

  Returns:
    ``(scores_fake, scores_real)`` as arrays of per-video scores.
  """
  metadata = pd.read_csv(metadata_file)

  # Filter for 'RealVideo-RealAudio' category
  real_videos = metadata[metadata['type'] == 'RealVideo-RealAudio'].head(videos_per_class)

  videos_per_fake_class = int(videos_per_class/10)

  def pick_from_group(group):
    # Groups with too few rows are used whole instead of raising in sample().
    if len(group) >= videos_per_fake_class:
      return group.sample(n=videos_per_fake_class, replace=False, random_state=random_state)
    return group

  # Filter for 'FakeVideo-RealAudio' category and sample
  fake_videos = metadata[metadata['type'] == 'FakeVideo-RealAudio']
  sampled_fakes = fake_videos.groupby(['race', 'gender']).apply(pick_from_group).reset_index(drop=True)

  # Concatenate real and sampled fake videos
  final_metadata = pd.concat([real_videos, sampled_fakes]).reset_index(drop=True)

  scores_real = np.array([])
  scores_fake = np.array([])
  for _, properties in tqdm(final_metadata.iterrows(), total=len(final_metadata.index)):
    # 'Unnamed: 9' is the header-less CSV column joined in front of 'path'.
    vid_path = os.path.join(dataset_root, properties['Unnamed: 9'], properties['path'])
    faces_pred = predict_deepfake_video(vid_path)

    if properties['method'] == "real":
      scores_real = np.append(scores_real, faces_pred)
    else:
      scores_fake = np.append(scores_fake, faces_pred)

  return scores_fake, scores_real

In [None]:
score_fake, score_real = detect_fakeAVCeleb_deepfake(fakeavceleb_metadata)

100%|██████████| 60/60 [09:50<00:00,  9.84s/it]


# DFDC

In [None]:
dfdc_dir = "/content/deepfake_detection_datasets/DFDC/train_sample_videos"
dataset = "DFDC"

In [None]:
# videos_per_class cannot be larger than 77 because there are only 77 real videos
def detect_dfdc_deepfake(deepfake_folder, videos_per_class=30):
  """Score up to ``videos_per_class`` FAKE and REAL videos listed in ``metadata.json``.

  Entries are visited in the order they appear in the metadata file; an entry is
  skipped once the quota for its label is full. Returns
  ``(scores_fake, scores_real)`` as arrays of per-video scores.
  """
  metadata_file = deepfake_folder + "/metadata.json"
  taken = {'REAL': 0, 'FAKE': 0}
  pbar = tqdm(total=videos_per_class*2)

  with open(metadata_file, 'r') as handle:
    entries = json.load(handle)

  scores_real = np.array([])
  scores_fake = np.array([])
  for video_name, info in entries.items():
    # Quota for this label already reached: nothing to do for this entry.
    if not taken[info['label']] < videos_per_class:
      continue

    score = predict_deepfake_video(os.path.join(deepfake_folder, video_name))

    # Every label other than "FAKE" is treated as real.
    if info['label'] == "FAKE":
      scores_fake = np.append(scores_fake, score)
      taken['FAKE'] += 1
    else:
      scores_real = np.append(scores_real, score)
      taken['REAL'] += 1
    pbar.update(1)

  pbar.close()
  return scores_fake, scores_real

In [None]:
score_fake, score_real = detect_dfdc_deepfake(dfdc_dir)



  2%|▏         | 1/60 [02:09<2:07:36, 129.77s/it][A
  3%|▎         | 2/60 [04:13<2:02:04, 126.28s/it][A
  5%|▌         | 3/60 [06:22<2:01:17, 127.68s/it][A
  7%|▋         | 4/60 [08:09<1:51:16, 119.22s/it][A
  8%|▊         | 5/60 [10:18<1:52:28, 122.70s/it][A
 10%|█         | 6/60 [12:26<1:52:17, 124.77s/it][A
 12%|█▏        | 7/60 [14:35<1:51:12, 125.89s/it][A
 13%|█▎        | 8/60 [16:44<1:50:04, 127.02s/it][A
 15%|█▌        | 9/60 [18:55<1:49:08, 128.40s/it][A
 17%|█▋        | 10/60 [21:04<1:47:06, 128.53s/it][A
 18%|█▊        | 11/60 [22:48<1:38:47, 120.97s/it][A
 20%|██        | 12/60 [24:59<1:39:17, 124.12s/it][A
 22%|██▏       | 13/60 [27:08<1:38:22, 125.59s/it][A
 23%|██▎       | 14/60 [29:18<1:37:15, 126.86s/it][A
 25%|██▌       | 15/60 [31:26<1:35:27, 127.27s/it][A
 27%|██▋       | 16/60 [33:36<1:33:52, 128.00s/it][A
 28%|██▊       | 17/60 [35:45<1:31:55, 128.27s/it][A
 30%|███       | 18/60 [37:57<1:30:32, 129.34s/it][A
 32%|███▏      | 19/60 [40:02<1:27:

# TIMIT

In [None]:
timit_high_dir = "/content/deepfake_detection_datasets/DeepfakeTIMIT/deepfakes_higher_quality"
timit_low_dir = "/content/deepfake_detection_datasets/DeepfakeTIMIT/deepfakes_lower_quality"

In [None]:
def detect_timit_deepfake(deepfake_folder, num_videos=60):
  """Score at most ``num_videos`` entries of ``deepfake_folder``.

  Entries come in ``os.listdir`` order; the result holds one
  ``predict_deepfake_video`` value per visited entry.
  """
  results = np.array([])

  entries = islice(os.listdir(deepfake_folder), num_videos)
  for name in tqdm(entries, total=num_videos):
    full_path = os.path.join(deepfake_folder, name)
    results = np.append(results, predict_deepfake_video(full_path))

  return results

In [None]:
# @title High Quality
# Label used in the plot titles; the closing parenthesis was missing before.
dataset = "TIMIT (Hohe Auflösung)"

score_fake = detect_timit_deepfake(timit_high_dir)
# This dataset is evaluated with fake videos only, so the real scores stay empty.
score_real = np.array([])

In [None]:
# @title Low Quality
# Label used in the plot titles; the closing parenthesis was missing before.
dataset = "TIMIT (Niedrige Auflösung)"

score_fake = detect_timit_deepfake(timit_low_dir)
# This dataset is evaluated with fake videos only, so the real scores stay empty.
score_real = np.array([])

100%|██████████| 60/60 [09:38<00:00,  9.64s/it]


# VASA

In [None]:
vasa_dir = "/content/deepfake_detection_datasets/VASA-1"
dataset = "VASA-1"

In [None]:
def detect_vasa_deepfake(deepfake_folder):
  """Score every entry of ``deepfake_folder`` and return the results as an array.

  Entries are processed in ``os.listdir`` order, one
  ``predict_deepfake_video`` call per entry.
  """
  results = np.array([])

  for name in tqdm(os.listdir(deepfake_folder)):
    prediction = predict_deepfake_video(os.path.join(deepfake_folder, name))
    results = np.append(results, prediction)

  return results

In [None]:
score_fake = detect_vasa_deepfake(vasa_dir)
score_real = np.array([])

100%|██████████| 15/15 [07:22<00:00, 29.51s/it]


# Create Diagrams

In [None]:
# Use the default PLT style, in case it was changed for the confusion matrix
plt.style.use('default')

# NOTE(review): net_model and train_db are not assigned in any visible cell of this
# notebook; they must be defined before this cell runs or the titles raise NameError.

def plot_score_histogram(scores, class_label):
    """Plot the distribution of per-video scores for one class.

    Args:
        scores: array of per-video scores.
        class_label: class name shown in parentheses at the end of the title.

    An empty array is skipped with a message: its mean would be nan (with a
    RuntimeWarning) and the histogram would be blank.
    """
    if scores.size == 0:
        print(f'Keine Videos für {dataset} ({class_label}) - Histogramm übersprungen.')
        return

    fig, ax = plt.subplots()
    _, _, bars = ax.hist(scores, range=(0,1), color='steelblue', edgecolor='black')
    ax.set_title(f'{net_model} ({train_db}) + {dataset} ({class_label})')
    ax.set_xlabel(f'Punktzahl\nMittelwert: {scores.mean()}')
    ax.bar_label(bars)
    ax.set_xlim(0, 1)
    ax.set_ylabel('#Videos')
    plt.show()

plot_score_histogram(score_real, 'echt')
plot_score_histogram(score_fake, 'manipuliert')

In [None]:
# Getting Data
# A fake video counts as detected only above FAKE_THRESHOLD; a real video counts as
# a false alarm above REAL_THRESHOLD. The two cells of each row are complementary
# (> vs <=), so a score exactly on a threshold is no longer dropped from the matrix.
FAKE_THRESHOLD = 0.6
REAL_THRESHOLD = 0.4

TP = np.sum(score_fake > FAKE_THRESHOLD)   # True Positives  => fake videos classified as fake
FN = np.sum(score_fake <= FAKE_THRESHOLD)  # False Negatives => fake videos classified as real
FP = np.sum(score_real > REAL_THRESHOLD)   # False Positives => real videos classified as fake
TN = np.sum(score_real <= REAL_THRESHOLD)  # True Negatives  => real videos classified as real

# Create the confusion matrix (rows: actual fake / actual real)
confusion_matrix = np.array([[TP, FN], [FP, TN]])
# Create the normalized confusion matrix: each row is divided by its sum (L1 norm)
normalized_confusion_matrix = normalize(confusion_matrix, norm='l1', axis=1)

In [None]:
# Labels for the classes
labels = ["Fake", "Echt"]

# NOTE(review): net_model and train_db are not assigned in any visible cell; they
# must be defined before this cell runs.

# Create a heatmap
sns.set(color_codes=False)
# Create the figure before drawing. Previously plt.figure(1, figsize=...) was called
# after sns.heatmap had already drawn, so the requested size did not apply to it.
fig, ax = plt.subplots(figsize=(5, 5))
sns.heatmap(confusion_matrix, annot=True, fmt='.0f', cmap='Blues', cbar=False, xticklabels=labels, yticklabels=labels, ax=ax)

ax.set_title(f'{net_model} ({train_db}) + {dataset}')
ax.set_ylabel("Wirkliche Labels")
ax.set_xlabel('Vorhergesagte Labels')
plt.show()

In [None]:
# Labels for the classes
labels = ["Fake", "Echt"]

# NOTE(review): net_model and train_db are not assigned in any visible cell; they
# must be defined before this cell runs.

# Create a heatmap of the row-normalized confusion matrix
sns.set(color_codes=False)
# Create the figure before drawing. Previously plt.figure(1, figsize=...) was called
# after sns.heatmap had already drawn, so the requested size did not apply to it.
fig, ax = plt.subplots(figsize=(5, 5))
sns.heatmap(normalized_confusion_matrix, annot=True, fmt='.2f', cmap='Blues', cbar=False, xticklabels=labels, yticklabels=labels, ax=ax)

ax.set_title(f'{net_model} ({train_db}) + {dataset}')
ax.set_ylabel("Wirkliche Labels")
ax.set_xlabel('Vorhergesagte Labels')
plt.show()