<a href="https://colab.research.google.com/github/darshan-k3/Low-Complexity-Deepfake-Detection-Model/blob/main/Unimodal_Model_Training_and_Evaluation_FINAL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title 0.1 Installs Necessary Python Libraries
import gdown
import os
# update at the end
requirements_txt_url = 'https://drive.google.com/file/d/1yKZtv18wki7O1Ncruv0fclH7lbtvTWiQ/view?usp=drive_link'
if not os.path.isfile('requirements.txt'):
  gdown.download(requirements_txt_url, fuzzy=True)

# %pip install -r requirements.txt

In [None]:
#@title 1.0. Download benchmark dataset, clear files from previous runs

# Make sure Google Drive has been mounted
from google.colab import drive
import gdown
# drive.mount('/content/drive', force_remount=True)

import os
# !rm -r benchmark_dataset
if not os.path.isfile('benchmark_dataset.zip'):
  gdown.download('https://drive.google.com/file/d/1E1QwFJeby_bkRFd2SbIWS9-td3SKqNp6/view?usp=sharing', fuzzy=True)

if not os.path.isdir('benchmark_dataset'):
  !unzip benchmark_dataset.zip

# Remove hidden .ipynb checkpoints folder
if os.path.isdir('benchmark_dataset/RealVideo-FakeAudio/.ipynb_checkpoints'):
  !rm -r benchmark_dataset/RealVideo-FakeAudio/.ipynb_checkpoints
  !rm -r benchmark_dataset/FakeVideo-RealAudio/.ipynb_checkpoints

# Remove any preexisting pretrained models
models = ['scaler_audio.pkl', 'scaler_video.pkl', 'svm_model_audio.pkl', 'svm_model_video.pkl',
          'checkpoint.pt', 'mlp_video_model.keras', 'mlp_audio_model.keras']

base_model_dir = 'models'

for model in models:
  model_path = os.path.join(base_model_dir, model)
  if os.path.isfile(model_path):
    !rm -r model_path


if os.path.isdir('temp'):
  !rm -r temp/
os.makedirs('temp', exist_ok=True)

In [3]:
#@title 1.1 Create Testing Dataset (Optional)

# The purpose of this dataset is to debug any problems with the pipeline
# running on larger sets
import shutil
import os
import random

if os.path.isdir('debugging_dataset'):
  !rm -r test_dataset

real_real_dir = os.path.join('benchmark_dataset', 'RealVideo-RealAudio')
real_fake_dir = os.path.join('benchmark_dataset', 'RealVideo-FakeAudio')
fake_real_dir = os.path.join('benchmark_dataset', 'FakeVideo-RealAudio')
fake_fake_dir = os.path.join('benchmark_dataset', 'FakeVideo-FakeAudio')




os.makedirs(os.path.join('debugging_dataset', 'RealVideo-RealAudio'), exist_ok=True)
os.makedirs(os.path.join('debugging_dataset', 'FakeVideo-FakeAudio'), exist_ok=True)
os.makedirs(os.path.join('debugging_dataset', 'FakeVideo-RealAudio'), exist_ok=True)
os.makedirs(os.path.join('debugging_dataset', 'RealVideo-FakeAudio'), exist_ok=True)



test_real_real = random.sample(os.listdir(real_real_dir), 5)

for file in test_real_real:
  shutil.copy(os.path.join(real_real_dir, file), os.path.join('debugging_dataset', 'RealVideo-RealAudio'))

test_real_fake = random.sample(os.listdir(real_fake_dir), 5)

for file in test_real_fake:
  shutil.copy(os.path.join(real_fake_dir, file), os.path.join('debugging_dataset', 'RealVideo-FakeAudio'))

test_fake_real = random.sample(os.listdir(fake_real_dir), 5)

for file in test_fake_real:
  shutil.copy(os.path.join(fake_real_dir, file), os.path.join('debugging_dataset', 'FakeVideo-RealAudio'))

test_fake_fake = random.sample(os.listdir(fake_fake_dir), 5)

for file in test_fake_fake:
  shutil.copy(os.path.join(fake_fake_dir, file), os.path.join('debugging_dataset', 'FakeVideo-FakeAudio'))


In [None]:
#@title 1.2 Create Train and Test Directories

import os
import shutil
from sklearn.model_selection import train_test_split
from pathlib import Path

dataset_dir = Path('benchmark_dataset')

!rm -r test_dataset
!rm -r train_dataset

train_dir = dataset_dir.parent / 'train_dataset'
test_dir = dataset_dir.parent / 'test_dataset'

train_dir.mkdir(parents=True, exist_ok=True)
test_dir.mkdir(parents=True, exist_ok=True)

label_mapping = {
    'RealVideo-RealAudio': 0,
    'RealVideo-FakeAudio': 1,
    'FakeVideo-RealAudio': 1,
    'FakeVideo-FakeAudio': 1,
}

files = []
labels = []

# Collect files and labels
for class_name in label_mapping.keys():
    class_dir = dataset_dir / class_name
    for file in os.listdir(class_dir):
        if file.endswith(".mp4"):  # Assuming you're working with mp4 files
            files.append(class_dir / file)
            labels.append(label_mapping[class_name])

# Test train split
files_train, files_test, _, _ = train_test_split(files, labels, test_size=0.2, random_state=42)


def copy_files(files, destination):
    for file_path in files:
        # Construct the new path in the destination directory
        new_path = destination / file_path.relative_to(dataset_dir)
        new_path.parent.mkdir(parents=True, exist_ok=True)  # Create subdirectories if they don't exist
        shutil.copy(file_path, new_path)  # Copy the file to the new location

copy_files(files_train, train_dir)
copy_files(files_test, test_dir)


In [6]:
#@title test - check debugging dataset contains 20 files
import os
count = 0
for root, dirs, files in os.walk('debugging_dataset'):
  for file in files:
    if file.endswith('.mp4'):
      count += 1
assert count == 20

In [None]:
# Remove models - used in each pipeline to clear away stored models from previous runs
def remove_old_files():
  !rm -r scaler_audio.pkl
  !rm -r scaler_video.pkl
  !rm -r scaler_combined.pkl
  !rm -r svm_model_combined.pkl
  !rm -r svm_model_audio.pkl
  !rm -r svm_model_video.pkl
  !rm -r mlp_audio_model.keras
  !rm -r mlp_video_model.keras
  !rm -r mlp_combined_model.keras

In [14]:
!pip freeze > requirements.txt

In [7]:
#@title Streamlined code
import os
import time
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVCa
from sklearn.metrics import accuracy_score, fbeta_score, roc_curve, auc, recall_score, confusion_matrix, classification_report
import joblib
import librosa
import cv2
from skimage.feature import local_binary_pattern
from keras.models import Sequential, load_model
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_audio
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from keras.layers import Dense
from keras.optimizers import Adam
from sklearn.preprocessing import LabelEncoder, normalize
from keras.utils import to_categorical
import albumentations as A

def extract_audio_features(audio_path, n_mfcc=5, n_fft=2048, hop_length=512):
    try:
        audio_data, sr = librosa.load(audio_path, sr=None)
        mfccs = librosa.feature.mfcc(y=audio_data, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft, hop_length=hop_length)

        S = librosa.magphase(librosa.stft(audio_data, n_fft=n_fft, hop_length=hop_length, window=np.ones, center=False))[0]
        rms = librosa.feature.rms(S=S, frame_length=n_fft, hop_length=hop_length)
        zero_crossing_rate = librosa.feature.zero_crossing_rate(y=audio_data, frame_length=n_fft, hop_length=hop_length)


        # Normalise features
        mfcc_norm = normalize(mfccs, norm='l2', axis=1)
        rms_norm = normalize(rms, norm='l2', axis=1)
        zcr_norm = normalize(zero_crossing_rate, norm='l2', axis=1)

        weight_rms = 2.0  # Increase RMS influence
        weight_zcr = 2.0  # Increase ZCR influence

        # Calculate statistical features: mean and standard deviation
        mfcc_mean = np.mean(mfcc_norm, axis=1)
        mfcc_std = np.std(mfcc_norm, axis=1)
        rms_mean = np.mean(rms_norm, axis=1) * weight_rms
        rms_std = np.std(rms_norm, axis=1) * weight_rms
        zcr_mean = np.mean(zcr_norm, axis=1) * weight_zcr
        zcr_std = np.std(zcr_norm, axis=1) * weight_zcr

        # Combine statistical features
        combined_features = np.hstack((mfcc_mean, mfcc_std, rms_mean, rms_std, zcr_mean, zcr_std))
        print(f'Audio Feature shape: {combined_features.shape}')
        return combined_features

    except Exception as e:
        print(f"Error processing audio file {audio_path}: {e}")
        return None

transform_pipeline = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.25, p=0.5),
    A.OneOf([
        A.MotionBlur(p=0.2),
        A.MedianBlur(blur_limit=3, p=0.1),
        A.GaussianBlur(blur_limit=3, p=0.1),
    ], p=0.2),
    A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.2, rotate_limit=15, p=0.5),
])


def extract_video_features(video_path, is_training=True, sample_count=50, max_attempts=5):
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    lbp_histograms = []

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return None

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    attempts = 0
    detected = False

    while not detected and attempts < max_attempts:
        frame_indices = np.linspace(0, total_frames - 1, sample_count, dtype=int)

        for frame_index in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
            ret, frame = cap.read()
            if not ret:
                continue

            # Don't augment testing frames
            if is_training:
                augmented = transform_pipeline(image=frame)
                frame = augmented['image']

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, 1.1, 4)

            if len(faces) > 0:
                detected = True  # At least one face was detected

            for (x, y, w, h) in faces:
                face_region = gray[y:y+h, x:x+w]
                lbp = local_binary_pattern(face_region, P=8, R=1, method="uniform")
                (lbp_hist, _) = np.histogram(lbp.ravel(), bins=np.arange(257), range=(0, 256))
                lbp_hist = lbp_hist.astype("float")
                lbp_hist /= (lbp_hist.sum() + 1e-7)
                lbp_histograms.append(lbp_hist)

        if not detected:
            print(f"No faces detected on attempt {attempts+1}. Increasing sampling rate.")
            sample_count = int(sample_count * 1.5)  # Increase sampling rate by 50%
            attempts += 1

    cap.release()
    if len(lbp_histograms) > 0:
        print(f'Video Feature shape: {np.array(lbp_histograms).shape}')
        return np.mean(lbp_histograms, axis=0)
    else:
        print("No facial features detected in the video.")
        return None

def load_data(root_dir, is_training):
    X_video, X_audio, y = [], [], []
    count = 0

    # Mapping directory names to labels
    label_mapping = {
        'RealVideo-RealAudio': 0,
        'RealVideo-FakeAudio': 1,
        'FakeVideo-RealAudio': 1,
        'FakeVideo-FakeAudio': 1,
    }

    for class_name in sorted(os.listdir(root_dir)):
        class_dir = os.path.join(root_dir, class_name)
        label = label_mapping.get(class_name, 0)  # Default to 0 (real) if not found

        for video_name in sorted(os.listdir(class_dir)):
            print(f'\tProcessing {video_name} ... ')
            video_path = os.path.join(class_dir, video_name)
            video_features = extract_video_features(video_path, is_training)
            if video_features is None:
              print(f'\tFOUND CORRUPTED VIDEO: {video_path}')
            if video_features is not None:
                X_video.append(video_features)  # No padding needed here

            audio_output_path = os.path.join("temp", video_name.split('.')[0] + ".wav")
            if not os.path.isfile(audio_output_path):
                ffmpeg_extract_audio(video_path, audio_output_path)
            audio_features = extract_audio_features(audio_output_path)
            if audio_features is not None:
                X_audio.append(audio_features)
                y.append(label)

            count += 1
            print(f'Processed {count} videos')
    return np.array(X_video, dtype=object), np.array(X_audio), np.array(y)


def train_mlp(X_video, X_audio, y, early_fusion=True):
    if early_fusion:
        X = np.concatenate((X_video, X_audio), axis=1)
    else:
        X = X_video if X_video is not None else X_audio

    encoder = LabelEncoder()
    encoder.fit(y)
    encoded_Y = encoder.transform(y)
    dummy_y = to_categorical(encoded_Y)

    model = Sequential()
    model.add(Dense(512, input_dim=X.shape[1], activation='relu'))
    model.add(Dense(256, activation='relu'))
    model.add(Dense(dummy_y.shape[1], activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer=Adam(lr=0.001), metrics=['accuracy'])

    model.fit(X, dummy_y, epochs=20, batch_size=32, verbose=1)

    return model


def train_svm(X_video, X_audio, y, early_fusion=True):
    if early_fusion:
        X = np.concatenate((X_video, X_audio), axis=1)  # Early fusion by concatenating features
    else:
        X = X_video if X_video is not None else X_audio  # For late fusion, this function is called separately for video and audio

    scaler = StandardScaler().fit(X)
    X_scaled = scaler.transform(X)

    model = SVC(C=1.0, kernel='rbf', probability=True)
    model.fit(X_scaled, y)

    return model, scaler


def scale_features(X_video, X_audio):
    scaler_video = StandardScaler()
    X_video_scaled = scaler_video.fit_transform(X_video)

    scaler_audio = StandardScaler()
    X_audio_scaled = scaler_audio.fit_transform(X_audio)

    return X_video_scaled, X_audio_scaled, scaler_video, scaler_audio

def evaluate_model(X_video_test, X_audio_test, y_test, model, scaler_video, scaler_audio, model_type="MLP", fusion_type="Early"):

    X_video_test_scaled = scaler_video.transform(X_video_test)
    print(f'Shape of X_video_test_scaled: {X_video_test_scaled.shape}')
    X_audio_test_scaled = scaler_audio.transform(X_audio_test)
    print(f'Shape of X_audio_test_scaled: {X_audio_test_scaled.shape}')

    if fusion_type == "Early":
        X_test_combined_scaled = np.concatenate((X_video_test_scaled, X_audio_test_scaled), axis=1)
        print(f'Shape of X test: {X_test_combined_scaled.shape}')
        if model_type == "MLP":
            combined_probs = model.predict(X_test_combined_scaled)[:, 1]
            print(f'Shape of combined_predictions: {combined_probs.shape}')
        else:  # SVM
            combined_probs = model.predict_proba(X_test_combined_scaled)[:, 1]
    else:  # Late Fusion
        if model_type == "MLP":
            video_probs = model[0].predict(X_video_test_scaled)[:, 1]
            audio_probs = model[1].predict(X_audio_test_scaled)[:, 1]
        else:  # SVM
            video_probs = model[0].predict_proba(X_video_test_scaled)[:, 1]
            audio_probs = model[1].predict_proba(X_audio_test_scaled)[:, 1]
        combined_probs = (video_probs + audio_probs) / 2

    combined_predictions = (combined_probs > 0.5).astype(int)

    # Evaluate and display results
    accuracy = accuracy_score(y_test, combined_predictions)
    f2_score = fbeta_score(y_test, combined_predictions, beta=2)
    roc_auc = auc(*roc_curve(y_test, combined_probs)[:2])

    print(f"Model Type: {model_type}, Fusion Type: {fusion_type}")
    print(f"Accuracy: {accuracy:.4f}, F2-Score: {f2_score:.4f}, ROC AUC: {roc_auc:.4f}")
    print(classification_report(y_test, combined_predictions, target_names=['real', 'deepfake']))

def main(train_dataset, test_dataset, model_type="MLP", fusion_type="Early"):
    remove_old_files()
    start_time = time.monotonic()
    # Load and scale features
    X_video, X_audio, y = load_data(train_dataset, is_training=True)
    X_video_scaled, X_audio_scaled, scaler_video, scaler_audio = scale_features(X_video, X_audio)

    # Model training
    if model_type == "MLP":
        if fusion_type == "Early":
            model = train_mlp(X_video_scaled, X_audio_scaled, y, early_fusion=True)
        else:  # Late Fusion
            model_video = train_mlp(X_video_scaled, None, y, early_fusion=False)
            model_audio = train_mlp(None, X_audio_scaled, y, early_fusion=False)
            model = (model_video, model_audio)
    else:  # SVM
        if fusion_type == "Early":
            model, _ = train_svm(X_video_scaled, X_audio_scaled, y, early_fusion=True)
        else:  # Late Fusion
            model_video, _ = train_svm(X_video_scaled, None, y, early_fusion=False)
            model_audio, _ = train_svm(None, X_audio_scaled, y, early_fusion=False)
            model = (model_video, model_audio)

    # Evaluation
    X_video_test, X_audio_test, y_test = load_data(test_dataset, is_training=False)
    evaluate_model(X_video_test, X_audio_test, y_test, model, scaler_video, scaler_audio, model_type, fusion_type)
    end_time = time.monotonic()
    print(f'Time Taken for Model to Evaluate on Test Dataset is {(end_time - start_time):.4f}')




In [10]:
import os
import time
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, fbeta_score, roc_curve, auc, recall_score, confusion_matrix, classification_report
import joblib
import librosa
import cv2
from skimage.feature import local_binary_pattern
from keras.models import Sequential, load_model
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_audio
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from keras.layers import Dense
from keras.optimizers import Adam
from sklearn.preprocessing import LabelEncoder, normalize
from keras.utils import to_categorical
import logging
import albumentations as A

def load_data_unimodal(root_dir, modality='audio', is_training=True):
    X, y = [], []
    count = 0
    label_mapping = {'RealVideo-RealAudio': 0, 'RealVideo-FakeAudio': 1, 'FakeVideo-RealAudio': 1, 'FakeVideo-FakeAudio': 1}

    for class_name in os.listdir(root_dir):
        class_dir = os.path.join(root_dir, class_name)
        label = label_mapping.get(class_name, 0)

        for video_name in os.listdir(class_dir):
            video_path = os.path.join(class_dir, video_name)

            if modality == 'audio':
                audio_output_path = os.path.join("temp", video_name.split('.')[0] + ".wav")
                if not os.path.isfile(audio_output_path):
                    ffmpeg_extract_audio(video_path, audio_output_path)
                features = extract_audio_features(audio_output_path)

            elif modality == 'video':
                features = extract_video_features(video_path, is_training)

            if features is not None:
                X.append(features)
                y.append(label)

            count += 1
            print(f'Processed {count} {modality} files')

    return np.array(X), np.array(y)

def train_model_unimodal(X, y, model_type="MLP"):
    encoder = LabelEncoder()
    y_encoded = encoder.fit_transform(y)
    y_dummy = to_categorical(y_encoded)

    if model_type == "MLP":
        model = Sequential([
            Dense(512, input_dim=X.shape[1], activation='relu'),
            Dense(256, activation='relu'),
            Dense(y_dummy.shape[1], activation='softmax')
        ])
        model.compile(loss='categorical_crossentropy', optimizer=Adam(lr=0.001), metrics=['accuracy'])
        model.fit(X, y_dummy, epochs=20, batch_size=32, verbose=1)
    else:  # SVM
        scaler = StandardScaler().fit(X)
        X_scaled = scaler.transform(X)
        model = SVC(C=1.0, kernel='rbf', probability=True)
        model.fit(X_scaled, y)
        return model, scaler

    return model

def evaluate_model_unimodal(X_test, y_test, model, model_type="MLP", scaler=None):
    if model_type == "SVM":
        X_test = scaler.transform(X_test)
        y_pred_prob = model.predict_proba(X_test)[:, 1]
    else:  # MLP
        y_pred_prob = model.predict(X_test)[:, 1]

    y_pred = (y_pred_prob > 0.5).astype(int)
    accuracy = accuracy_score(y_test, y_pred)
    f2_score = fbeta_score(y_test, y_pred, beta=2)
    roc_auc = auc(*roc_curve(y_test, y_pred_prob)[:2])

    print(f"Accuracy: {accuracy:.4f}, F2-Score: {f2_score:.4f}, ROC AUC: {roc_auc:.4f}")
    print(classification_report(y_test, y_pred, target_names=['real', 'fake']))

# Adjust the main function for unimodal workflow
def main_unimodal(train_dataset, test_dataset, modality="audio", model_type="MLP"):

    X_train, y_train = load_data_unimodal(train_dataset, modality, is_training=True)
    if model_type == "SVM":
        model, scaler = train_model_unimodal(X_train, y_train, model_type)
        start_time = time.monotonic()
        X_test, y_test = load_data_unimodal(test_dataset, modality, is_training=False)
        evaluate_model_unimodal(X_test, y_test, model, model_type, scaler)
    else:
        model = train_model_unimodal(X_train, y_train, model_type)
        start_time = time.monotonic()
        X_test, y_test = load_data_unimodal(test_dataset, modality, is_training=False)
        evaluate_model_unimodal(X_test, y_test, model, model_type)

    end_time = time.monotonic()
    print(f'Time Taken: {(end_time - start_time):.4f} seconds')

In [None]:
main_unimodal('train_dataset', 'test_dataset', modality='audio', model_type='MLP')

Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 1 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 2 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 3 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 4 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 5 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 6 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 7 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 8 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)




Moviepy - Command successful
Audio Feature shape: (14,)
Processed 160 audio files
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Accuracy: 0.7750, ROC AUC: 0.8033
              precision    recall  f1-score   support

        real       0.53      0.27      0.36        37
        fake       0.81      0.93      0.86       123

    accuracy                           0.78       160
   macro avg       0.67      0.60      0.61       160
weighted avg       0.74      0.78      0.75       160

Time Taken: 103.5681 seconds


In [None]:
main_unimodal('train_dataset', 'test_dataset', modality='video', model_type='MLP')


Video Feature shape: (25, 256)
Processed 1 video files
Video Feature shape: (45, 256)
Processed 2 video files
Video Feature shape: (36, 256)
Processed 3 video files
Video Feature shape: (46, 256)
Processed 4 video files
Video Feature shape: (50, 256)
Processed 5 video files
Video Feature shape: (3, 256)
Processed 6 video files
Video Feature shape: (26, 256)
Processed 7 video files
Video Feature shape: (47, 256)
Processed 8 video files
Video Feature shape: (49, 256)
Processed 9 video files
Video Feature shape: (50, 256)
Processed 10 video files
Video Feature shape: (26, 256)
Processed 11 video files
Video Feature shape: (39, 256)
Processed 12 video files
Video Feature shape: (41, 256)
Processed 13 video files
Video Feature shape: (45, 256)
Processed 14 video files
Video Feature shape: (50, 256)
Processed 15 video files
Video Feature shape: (20, 256)
Processed 16 video files
Video Feature shape: (22, 256)
Processed 17 video files
Video Feature shape: (9, 256)
Processed 18 video files
Vid



Video Feature shape: (43, 256)
Processed 160 video files
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Accuracy: 0.7688, ROC AUC: 0.6757
              precision    recall  f1-score   support

        real       0.00      0.00      0.00        37
        fake       0.77      1.00      0.87       123

    accuracy                           0.77       160
   macro avg       0.38      0.50      0.43       160
weighted avg       0.59      0.77      0.67       160

Time Taken: 1123.7882 seconds


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
main_unimodal('train_dataset', 'test_dataset', modality='audio', model_type='SVM')


Audio Feature shape: (14,)
Processed 1 audio files
Audio Feature shape: (14,)
Processed 2 audio files
Audio Feature shape: (14,)
Processed 3 audio files
Audio Feature shape: (14,)
Processed 4 audio files
Audio Feature shape: (14,)
Processed 5 audio files
Audio Feature shape: (14,)
Processed 6 audio files
Audio Feature shape: (14,)
Processed 7 audio files
Audio Feature shape: (14,)
Processed 8 audio files
Audio Feature shape: (14,)
Processed 9 audio files
Audio Feature shape: (14,)
Processed 10 audio files
Audio Feature shape: (14,)
Processed 11 audio files
Audio Feature shape: (14,)
Processed 12 audio files
Audio Feature shape: (14,)
Processed 13 audio files
Audio Feature shape: (14,)
Processed 14 audio files
Audio Feature shape: (14,)
Processed 15 audio files
Audio Feature shape: (14,)
Processed 16 audio files
Audio Feature shape: (14,)
Processed 17 audio files
Audio Feature shape: (14,)
Processed 18 audio files
Audio Feature shape: (14,)
Processed 19 audio files
Audio Feature shape: 

In [None]:
main_unimodal('train_dataset', 'test_dataset', modality='video', model_type='SVM')


Video Feature shape: (16, 256)
Processed 1 video files
Video Feature shape: (46, 256)
Processed 2 video files
Video Feature shape: (38, 256)
Processed 3 video files
Video Feature shape: (42, 256)
Processed 4 video files
Video Feature shape: (47, 256)
Processed 5 video files
Video Feature shape: (5, 256)
Processed 6 video files
Video Feature shape: (20, 256)
Processed 7 video files
Video Feature shape: (47, 256)
Processed 8 video files
Video Feature shape: (47, 256)
Processed 9 video files
Video Feature shape: (51, 256)
Processed 10 video files
Video Feature shape: (33, 256)
Processed 11 video files
Video Feature shape: (43, 256)
Processed 12 video files
Video Feature shape: (40, 256)
Processed 13 video files
Video Feature shape: (44, 256)
Processed 14 video files
Video Feature shape: (45, 256)
Processed 15 video files
Video Feature shape: (19, 256)
Processed 16 video files
Video Feature shape: (26, 256)
Processed 17 video files
Video Feature shape: (10, 256)
Processed 18 video files
Vi

In [None]:
##########################################################################################

In [None]:
main_unimodal('train_dataset', 'test_dataset', modality='audio', model_type='MLP')

Audio Feature shape: (14,)
Processed 1 audio files
Audio Feature shape: (14,)
Processed 2 audio files
Audio Feature shape: (14,)
Processed 3 audio files
Audio Feature shape: (14,)
Processed 4 audio files
Audio Feature shape: (14,)
Processed 5 audio files
Audio Feature shape: (14,)
Processed 6 audio files
Audio Feature shape: (14,)
Processed 7 audio files
Audio Feature shape: (14,)
Processed 8 audio files
Audio Feature shape: (14,)
Processed 9 audio files
Audio Feature shape: (14,)
Processed 10 audio files
Audio Feature shape: (14,)
Processed 11 audio files
Audio Feature shape: (14,)
Processed 12 audio files
Audio Feature shape: (14,)
Processed 13 audio files
Audio Feature shape: (14,)
Processed 14 audio files
Audio Feature shape: (14,)
Processed 15 audio files
Audio Feature shape: (14,)
Processed 16 audio files
Audio Feature shape: (14,)
Processed 17 audio files
Audio Feature shape: (14,)
Processed 18 audio files
Audio Feature shape: (14,)
Processed 19 audio files
Audio Feature shape: 



Audio Feature shape: (14,)
Processed 160 audio files
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Accuracy: 0.7812, F2-Score: 0.8917, ROC AUC: 0.8034
              precision    recall  f1-score   support

        real       0.54      0.35      0.43        37
        fake       0.82      0.91      0.86       123

    accuracy                           0.78       160
   macro avg       0.68      0.63      0.65       160
weighted avg       0.76      0.78      0.76       160

Time Taken: 0.1234 seconds


In [None]:
main_unimodal('train_dataset', 'test_dataset', modality='video', model_type='MLP')

Video Feature shape: (23, 256)
Processed 1 video files
Video Feature shape: (46, 256)
Processed 2 video files
Video Feature shape: (28, 256)
Processed 3 video files
Video Feature shape: (41, 256)
Processed 4 video files
Video Feature shape: (48, 256)
Processed 5 video files
Video Feature shape: (2, 256)
Processed 6 video files
Video Feature shape: (26, 256)
Processed 7 video files
Video Feature shape: (48, 256)
Processed 8 video files
Video Feature shape: (50, 256)
Processed 9 video files
Video Feature shape: (48, 256)
Processed 10 video files
Video Feature shape: (23, 256)
Processed 11 video files
Video Feature shape: (42, 256)
Processed 12 video files
Video Feature shape: (39, 256)
Processed 13 video files
Video Feature shape: (41, 256)
Processed 14 video files
Video Feature shape: (47, 256)
Processed 15 video files
Video Feature shape: (16, 256)
Processed 16 video files
Video Feature shape: (20, 256)
Processed 17 video files
Video Feature shape: (12, 256)
Processed 18 video files
Vi



Video Feature shape: (43, 256)
Processed 160 video files
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Accuracy: 0.7688, F2-Score: 0.9433, ROC AUC: 0.6647
              precision    recall  f1-score   support

        real       0.00      0.00      0.00        37
        fake       0.77      1.00      0.87       123

    accuracy                           0.77       160
   macro avg       0.38      0.50      0.43       160
weighted avg       0.59      0.77      0.67       160

Time Taken: 0.1300 seconds


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
main_unimodal('train_dataset', 'test_dataset', modality='audio', model_type='SVM')

Audio Feature shape: (14,)
Processed 1 audio files
Audio Feature shape: (14,)
Processed 2 audio files
Audio Feature shape: (14,)
Processed 3 audio files
Audio Feature shape: (14,)
Processed 4 audio files
Audio Feature shape: (14,)
Processed 5 audio files
Audio Feature shape: (14,)
Processed 6 audio files
Audio Feature shape: (14,)
Processed 7 audio files
Audio Feature shape: (14,)
Processed 8 audio files
Audio Feature shape: (14,)
Processed 9 audio files
Audio Feature shape: (14,)
Processed 10 audio files
Audio Feature shape: (14,)
Processed 11 audio files
Audio Feature shape: (14,)
Processed 12 audio files
Audio Feature shape: (14,)
Processed 13 audio files
Audio Feature shape: (14,)
Processed 14 audio files
Audio Feature shape: (14,)
Processed 15 audio files
Audio Feature shape: (14,)
Processed 16 audio files
Audio Feature shape: (14,)
Processed 17 audio files
Audio Feature shape: (14,)
Processed 18 audio files
Audio Feature shape: (14,)
Processed 19 audio files
Audio Feature shape: 

In [None]:
main_unimodal('train_dataset', 'test_dataset', modality='video', model_type='SVM')

Video Feature shape: (24, 256)
Processed 1 video files
Video Feature shape: (49, 256)
Processed 2 video files
Video Feature shape: (38, 256)
Processed 3 video files
Video Feature shape: (46, 256)
Processed 4 video files
Video Feature shape: (49, 256)
Processed 5 video files
Video Feature shape: (3, 256)
Processed 6 video files
Video Feature shape: (25, 256)
Processed 7 video files
Video Feature shape: (47, 256)
Processed 8 video files
Video Feature shape: (49, 256)
Processed 9 video files
Video Feature shape: (50, 256)
Processed 10 video files
Video Feature shape: (33, 256)
Processed 11 video files
Video Feature shape: (44, 256)
Processed 12 video files
Video Feature shape: (42, 256)
Processed 13 video files
Video Feature shape: (43, 256)
Processed 14 video files
Video Feature shape: (48, 256)
Processed 15 video files
Video Feature shape: (17, 256)
Processed 16 video files
Video Feature shape: (25, 256)
Processed 17 video files
Video Feature shape: (13, 256)
Processed 18 video files
Vi

In [11]:
# Full end to end process of loading and evaluating on data
main_unimodal('train_dataset', 'test_dataset', modality='video', model_type='SVM')

Video Feature shape: (31, 256)
Processed 1 video files
Video Feature shape: (45, 256)
Processed 2 video files
Video Feature shape: (45, 256)
Processed 3 video files
Video Feature shape: (43, 256)
Processed 4 video files
Video Feature shape: (56, 256)
Processed 5 video files
Video Feature shape: (49, 256)
Processed 6 video files
Video Feature shape: (52, 256)
Processed 7 video files
Video Feature shape: (35, 256)
Processed 8 video files
Video Feature shape: (44, 256)
Processed 9 video files
Video Feature shape: (49, 256)
Processed 10 video files
Video Feature shape: (46, 256)
Processed 11 video files
Video Feature shape: (9, 256)
Processed 12 video files
Video Feature shape: (50, 256)
Processed 13 video files
Video Feature shape: (40, 256)
Processed 14 video files
Video Feature shape: (46, 256)
Processed 15 video files
Video Feature shape: (56, 256)
Processed 16 video files
Video Feature shape: (61, 256)
Processed 17 video files
Video Feature shape: (39, 256)
Processed 18 video files
Vi

In [12]:
main_unimodal('train_dataset', 'test_dataset', modality='audio', model_type='SVM')

Audio Feature shape: (14,)
Processed 1 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 2 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 3 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 4 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 5 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 6 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 7 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 8 audio files
Moviepy - Running:
>>> "+ " ".join(cmd)
Moviepy - Command successful
Audio Feature shape: (14,)
Processed 9 a

In [13]:
main_unimodal('train_dataset', 'test_dataset', modality='video', model_type='MLP')

Video Feature shape: (25, 256)
Processed 1 video files
Video Feature shape: (47, 256)
Processed 2 video files
Video Feature shape: (48, 256)
Processed 3 video files
Video Feature shape: (45, 256)
Processed 4 video files
Video Feature shape: (55, 256)
Processed 5 video files
Video Feature shape: (44, 256)
Processed 6 video files
Video Feature shape: (51, 256)
Processed 7 video files
Video Feature shape: (32, 256)
Processed 8 video files
Video Feature shape: (46, 256)
Processed 9 video files
Video Feature shape: (50, 256)
Processed 10 video files
Video Feature shape: (47, 256)
Processed 11 video files
Video Feature shape: (11, 256)
Processed 12 video files
Video Feature shape: (48, 256)
Processed 13 video files
Video Feature shape: (47, 256)
Processed 14 video files
Video Feature shape: (47, 256)
Processed 15 video files
Video Feature shape: (54, 256)
Processed 16 video files
Video Feature shape: (60, 256)
Processed 17 video files
Video Feature shape: (47, 256)
Processed 18 video files
V



Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Video Feature shape: (50, 256)
Processed 1 video files
Video Feature shape: (50, 256)
Processed 2 video files
Video Feature shape: (50, 256)
Processed 3 video files
Video Feature shape: (50, 256)
Processed 4 video files
Video Feature shape: (50, 256)
Processed 5 video files
Video Feature shape: (50, 256)
Processed 6 video files
Video Feature shape: (52, 256)
Processed 7 video files
Video Feature shape: (47, 256)
Processed 8 video files
Video Feature shape: (34, 256)
Processed 9 video files
Video Feature shape: (48, 256)
Processed 10 video files
Video Feature shape: (50, 256)
Processed 11 video files
Video Feature shape: (61, 256)
Processed 12 video files
Video Feature shape: (44, 256)
Processed 13 video files
Video Feature shape: (50, 256)
Processed 14 video 

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [14]:
main_unimodal('train_dataset', 'test_dataset', modality='audio', model_type='MLP')

Audio Feature shape: (14,)
Processed 1 audio files
Audio Feature shape: (14,)
Processed 2 audio files
Audio Feature shape: (14,)
Processed 3 audio files
Audio Feature shape: (14,)
Processed 4 audio files
Audio Feature shape: (14,)
Processed 5 audio files
Audio Feature shape: (14,)
Processed 6 audio files
Audio Feature shape: (14,)
Processed 7 audio files
Audio Feature shape: (14,)
Processed 8 audio files
Audio Feature shape: (14,)
Processed 9 audio files
Audio Feature shape: (14,)
Processed 10 audio files
Audio Feature shape: (14,)
Processed 11 audio files
Audio Feature shape: (14,)
Processed 12 audio files
Audio Feature shape: (14,)
Processed 13 audio files
Audio Feature shape: (14,)
Processed 14 audio files
Audio Feature shape: (14,)
Processed 15 audio files
Audio Feature shape: (14,)
Processed 16 audio files
Audio Feature shape: (14,)
Processed 17 audio files
Audio Feature shape: (14,)
Processed 18 audio files
Audio Feature shape: (14,)
Processed 19 audio files
Audio Feature shape: 



Audio Feature shape: (14,)
Processed 640 audio files
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Audio Feature shape: (14,)
Processed 1 audio files
Audio Feature shape: (14,)
Processed 2 audio files
Audio Feature shape: (14,)
Processed 3 audio files
Audio Feature shape: (14,)
Processed 4 audio files
Audio Feature shape: (14,)
Processed 5 audio files
Audio Feature shape: (14,)
Processed 6 audio files
Audio Feature shape: (14,)
Processed 7 audio files
Audio Feature shape: (14,)
Processed 8 audio files
Audio Feature shape: (14,)
Processed 9 audio files
Audio Feature shape: (14,)
Processed 10 audio files
Audio Feature shape: (14,)
Processed 11 audio files
Audio Feature shape: (14,)
Processed 12 audio files
Audio Feature shape: (14,)
Processed 13 audio files
Audio Feature shape: (14,)
Processed 14 audio fil

In [None]:

# base__train__dir = "train_dataset"
# base__test__dir = "test_dataset"
# # Example usage: train and evaluate an MLP with Early Fusion
# main(base__train__dir, base__test__dir, model_type="MLP", fusion_type="Early")
# # To switch to another configuration, just call main() with different parameters
# # main(model_type="SVM", fusion_type="Late")

In [None]:
# main(base__train__dir, base__test__dir, model_type="MLP", fusion_type="Late")


In [None]:
# main(base__train__dir, base__test__dir, model_type="SVM", fusion_type="Early")


In [None]:
# main(base__train__dir, base__test__dir, model_type="SVM", fusion_type="Late")


In [20]:
#@title 4.0. Xception model

"""
Ported to pytorch thanks to [tstandley](https://github.com/tstandley/Xception-PyTorch)

@author: tstandley
Adapted by cadene

Creates an Xception Model as defined in:

Francois Chollet
Xception: Deep Learning with Depthwise Separable Convolutions
https://arxiv.org/pdf/1610.02357.pdf

This weights ported from the Keras implementation. Achieves the following performance on the validation set:

Loss:0.9173 Prec@1:78.892 Prec@5:94.292

REMEMBER to set your image size to 3x299x299 for both test and validation

normalize = transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                  std=[0.5, 0.5, 0.5])

The resize parameter of the validation transform should be 333, and make sure to center crop at 299x299
"""
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.model_zoo as model_zoo
from torch.nn import init

pretrained_settings = {
    'xception': {
        'imagenet': {
            'url': 'http://data.lip6.fr/cadene/pretrainedmodels/xception-b5690688.pth',
            'input_space': 'RGB',
            'input_size': [3, 299, 299],
            'input_range': [0, 1],
            'mean': [0.5, 0.5, 0.5],
            'std': [0.5, 0.5, 0.5],
            'num_classes': 1000,
            'scale': 0.8975 # The resize parameter of the validation transform should be 333, and make sure to center crop at 299x299
        }
    }
}
def InitXception(model=None, num_class=2, pretrained=True):
    if(not model) :
        assert ("model is empty(None)")
    num_ftrs = model.last_linear.in_features
    model.last_linear = nn.Linear(num_ftrs, num_class)
    return model

class SeparableConv2d(nn.Module):
    def __init__(self,in_channels,out_channels,kernel_size=1,stride=1,padding=0,dilation=1,bias=False):
        super(SeparableConv2d,self).__init__()

        self.conv1 = nn.Conv2d(in_channels,in_channels,kernel_size,stride,padding,dilation,groups=in_channels,bias=bias)
        self.pointwise = nn.Conv2d(in_channels,out_channels,1,1,0,1,1,bias=bias)

    def forward(self,x):
        x = self.conv1(x)
        x = self.pointwise(x)
        return x


class Block(nn.Module):
    def __init__(self,in_filters,out_filters,reps,strides=1,start_with_relu=True,grow_first=True):
        super(Block, self).__init__()

        if out_filters != in_filters or strides!=1:
            self.skip = nn.Conv2d(in_filters,out_filters,1,stride=strides, bias=False)
            self.skipbn = nn.BatchNorm2d(out_filters)
        else:
            self.skip=None

        self.relu = nn.ReLU(inplace=True)
        rep=[]

        filters=in_filters
        if grow_first:
            rep.append(self.relu)
            rep.append(SeparableConv2d(in_filters,out_filters,3,stride=1,padding=1,bias=False))
            rep.append(nn.BatchNorm2d(out_filters))
            filters = out_filters

        for i in range(reps-1):
            rep.append(self.relu)
            rep.append(SeparableConv2d(filters,filters,3,stride=1,padding=1,bias=False))
            rep.append(nn.BatchNorm2d(filters))

        if not grow_first:
            rep.append(self.relu)
            rep.append(SeparableConv2d(in_filters,out_filters,3,stride=1,padding=1,bias=False))
            rep.append(nn.BatchNorm2d(out_filters))

        if not start_with_relu:
            rep = rep[1:]
        else:
            rep[0] = nn.ReLU(inplace=False)

        if strides != 1:
            rep.append(nn.MaxPool2d(3,strides,1))
        self.rep = nn.Sequential(*rep)

    def forward(self,inp):
        x = self.rep(inp)

        if self.skip is not None:
            skip = self.skip(inp)
            skip = self.skipbn(skip)
        else:
            skip = inp

        x+=skip
        return x


class Xception(nn.Module):
    """
    Xception optimized for the ImageNet dataset, as specified in
    https://arxiv.org/pdf/1610.02357.pdf
    """
    def __init__(self, num_classes=2):
        """ Constructor
        Args:
            num_classes: number of classes
        """
        super(Xception, self).__init__()
        self.num_classes = num_classes

        #self.conv1 = nn.Conv2d(15,32,3,2,0,bias=False)
        self.conv1 = nn.Conv2d(3,32,3,2,0,bias=False)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu = nn.ReLU(inplace=True)

        self.conv2 = nn.Conv2d(32,64,3,bias=False)
        self.bn2 = nn.BatchNorm2d(64)
        #do relu here

        self.block1=Block(64,128,2,2,start_with_relu=False,grow_first=True)
        self.block2=Block(128,256,2,2,start_with_relu=True,grow_first=True)
        self.block3=Block(256,728,2,2,start_with_relu=True,grow_first=True)

        self.block4=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block5=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block6=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block7=Block(728,728,3,1,start_with_relu=True,grow_first=True)

        self.block8=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block9=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block10=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block11=Block(728,728,3,1,start_with_relu=True,grow_first=True)

        self.block12=Block(728,1024,2,2,start_with_relu=True,grow_first=False)

        self.conv3 = SeparableConv2d(1024,1536,3,1,1)
        self.bn3 = nn.BatchNorm2d(1536)

        #do relu here
        self.conv4 = SeparableConv2d(1536,2048,3,1,1)
        self.bn4 = nn.BatchNorm2d(2048)

        self.fc = nn.Linear(2048, num_classes)

        # #------- init weights --------
        # for m in self.modules():
        #     if isinstance(m, nn.Conv2d):
        #         n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
        #         m.weight.data.normal_(0, math.sqrt(2. / n))
        #     elif isinstance(m, nn.BatchNorm2d):
        #         m.weight.data.fill_(1)
        #         m.bias.data.zero_()
        # #-----------------------------

    def features(self, input):
#         print(input.size())

        x = self.conv1(input) #(32, 299, 299)
#         print(x.size())
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x) #(64, 299, 299)
        x = self.bn2(x)
        x = self.relu(x)

        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)
        x = self.block6(x)
        x = self.block7(x)
        x = self.block8(x)
        x = self.block9(x)
        x = self.block10(x)
        x = self.block11(x)
        x = self.block12(x) #(1024, 299, 299)

        x = self.conv3(x) #(1536, 299, 299)
        x = self.bn3(x)
        x = self.relu(x)

        x = self.conv4(x) #(2048, 299, 299)
#         print(x.size())


        x = self.bn4(x)
#         print(x.size())

        return x


    def GetEachFeatures(self, input):
#         print(input.size())

        list_feat = []
        feat1 = self.conv1(input) #(32, 299, 299)
#         print(x.size())
        feat1 = self.bn1(feat1)
        feat1 = self.relu(feat1)

        list_feat.append(feat1)
        feat2 = self.conv2(feat1) #(64, 299, 299)
        feat2 = self.bn2(feat2)
        feat3 = self.relu(feat2)
        list_feat.append(feat3)

        feat4 = self.block1(feat3)
        feat5 = self.block2(feat4)
        feat6 = self.block3(feat5)
        feat7 = self.block4(feat6)
        feat8 = self.block5(feat7)
        feat9 = self.block6(feat8)
        feat10 = self.block7(feat9)
        feat11 = self.block8(feat10)
        feat12 = self.block9(feat11)
        feat13 = self.block10(feat12)
        feat14 = self.block11(feat13)
        feat15 = self.block12(feat14) #(1024, 299, 299)

        list_feat.append(feat4)
        list_feat.append(feat5)
        list_feat.append(feat6)
        list_feat.append(feat7)
        list_feat.append(feat8)
        list_feat.append(feat9)
        list_feat.append(feat10)
        list_feat.append(feat11)
        list_feat.append(feat12)
        list_feat.append(feat13)
        list_feat.append(feat14)
        list_feat.append(feat15)


        feat16 = self.conv3(feat15) #(1536, 299, 299)
        feat16 = self.bn3(feat16)
        feat16 = self.relu(feat16)
        list_feat.append(feat16)

        feat17 = self.conv4(feat16) #(2048, 299, 299)
#         print(x.size())


        feat17 = self.bn4(feat17)
#         print(x.size())
        list_feat.append(feat17)
        return list_feat

    def logits(self, features):
        x = self.relu(features)

        x = F.adaptive_avg_pool2d(x, (1, 1))
        x = x.view(x.size(0), -1)
        x = self.last_linear(x)
        return x

    def forward(self, input):
        feat = self.features(input)
        x = self.logits(feat)
        # return feat, x
        return x



class Xception_concat(nn.Module):
    """
    Xception optimized for the ImageNet dataset, as specified in
    https://arxiv.org/pdf/1610.02357.pdf
    """
    def __init__(self, num_classes=2):
        """ Constructor
        Args:
            num_classes: number of classes
        """
        super(Xception_concat, self).__init__()
        self.num_classes = num_classes

        self.conv1 = nn.Conv2d(15,32,3,2,0,bias=False)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu = nn.ReLU(inplace=True)

        self.conv2 = nn.Conv2d(32,64,3,bias=False)
        self.bn2 = nn.BatchNorm2d(64)
        #do relu here

        self.block1=Block(64,128,2,2,start_with_relu=False,grow_first=True)
        self.block2=Block(128,256,2,2,start_with_relu=True,grow_first=True)
        self.block3=Block(256,728,2,2,start_with_relu=True,grow_first=True)

        self.block4=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block5=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block6=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block7=Block(728,728,3,1,start_with_relu=True,grow_first=True)

        self.block8=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block9=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block10=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block11=Block(728,728,3,1,start_with_relu=True,grow_first=True)

        self.block12=Block(728,1024,2,2,start_with_relu=True,grow_first=False)

        self.conv3 = SeparableConv2d(1024,1536,3,1,1)
        self.bn3 = nn.BatchNorm2d(1536)

        #do relu here
        self.conv4 = SeparableConv2d(1536,2048,3,1,1)
        self.bn4 = nn.BatchNorm2d(2048)

        self.fc = nn.Linear(2048, num_classes)

        # #------- init weights --------
        # for m in self.modules():
        #     if isinstance(m, nn.Conv2d):
        #         n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
        #         m.weight.data.normal_(0, math.sqrt(2. / n))
        #     elif isinstance(m, nn.BatchNorm2d):
        #         m.weight.data.fill_(1)
        #         m.bias.data.zero_()
        # #-----------------------------

    def features(self, input):
        x = self.conv1(input) #(32, 299, 299)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x) #(64, 299, 299)
        x = self.bn2(x)
        x = self.relu(x)

        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)
        x = self.block6(x)
        x = self.block7(x)
        x = self.block8(x)
        x = self.block9(x)
        x = self.block10(x)
        x = self.block11(x)
        x = self.block12(x) #(1024, 299, 299)

        x = self.conv3(x) #(1536, 299, 299)
        x = self.bn3(x)
        x = self.relu(x)

        x = self.conv4(x) #(2048, 299, 299)
        x = self.bn4(x)
        return x

    def logits(self, features):
        x = self.relu(features)

        x = F.adaptive_avg_pool2d(x, (1, 1))
        x = x.view(x.size(0), -1)
        x = self.last_linear(x)
        return x

    def forward(self, input):
        x = self.features(input)
        x = self.logits(x)
        return x

'''
    def features(self, input):
        x = self.conv1(input)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)

        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)
        x = self.block6(x)
        x = self.block7(x)
        x = self.block8(x)
        x = self.block9(x)
        x = self.block10(x)
        x = self.block11(x)
        x = self.block12(x)

        x = self.conv3(x)
        x = self.bn3(x)
        x = self.relu(x)

        conv4_x = self.conv4(x)
        x = self.bn4(conv4_x)
        return x, conv4_x

    def logits(self, features):
        x = self.relu(features)

        x = F.adaptive_avg_pool2d(x, (1, 1))
        x = x.view(x.size(0), -1)
        x = self.last_linear(x)
        return x

    def forward(self, input):
        #x = self.features(input)
        x, conv4_x = self.features(input)
        x = self.logits(x)
        #x = self.logits(x)
        return x, conv4_x
        '''


def xception(num_classes=1000, pretrained='imagenet'):
    model = Xception(num_classes=num_classes)
    if pretrained:
        settings = pretrained_settings['xception'][pretrained]
        assert num_classes == settings['num_classes'], \
            "num_classes should be {}, but is {}".format(settings['num_classes'], num_classes)

        model = Xception(num_classes=num_classes)
        model.load_state_dict(model_zoo.load_url(settings['url']))

        model.input_space = settings['input_space']
        model.input_size = settings['input_size']
        model.input_range = settings['input_range']
        model.mean = settings['mean']
        model.std = settings['std']

    # TODO: ugly
    model.last_linear = model.fc
    del model.fc
    return model

def xception_concat(num_classes=1000):
    model = Xception_concat(num_classes=num_classes)
    # TODO: ugly
    model.last_linear = model.fc
    del model.fc
    return model

In [6]:
#@title 4.1. Break test dataset into real dataset
import os
import shutil
from sklearn.model_selection import train_test_split

# Directory where your original folders are located
base_dir = '/content/test_dataset'

# Directories for the split datasets
test_dir = 'xception-frames'

# Define your original folders and whether they're real or fake
folders = {
    'RealVideo-RealAudio': 'real',
    'RealVideo-FakeAudio': 'real',
    'FakeVideo-RealAudio': 'fake',
    'FakeVideo-FakeAudio': 'fake',
}

# Create lists to hold video paths and labels
video_paths = []
labels = []

# Populate lists with video paths and corresponding labels
for folder, label in folders.items():
    folder_path = os.path.join(base_dir, folder)
    for video in os.listdir(folder_path):
        video_paths.append(os.path.join(folder_path, video))
        labels.append(label)

# Function to create directories and copy videos into them
def organise_videos(video_paths, video_labels, directory):
    for path, label in zip(video_paths, video_labels):
        subfolder = os.path.join(directory, label)
        os.makedirs(subfolder, exist_ok=True)
        # Determine new file name (add "_fake" for fake videos)
        file_name = os.path.basename(path)
        destination_path = os.path.join(subfolder, file_name)
        shutil.copy(path, destination_path)

organise_videos(video_paths, labels, test_dir)

In [None]:
#@title 4.2. Extract and Replace on new dataset
import cv2
import os
import numpy as np

def extract_and_replace_frames(base_dir):
    for category in ['real', 'fake']:  # The two main categories in your folder structure
        category_path = os.path.join(base_dir, category)
        frame_counter = 1  # Initialize frame counter for each category

        for video_name in os.listdir(category_path):
            if not video_name.endswith(('.mp4', '.avi', '.mov')):  # Check if the file is a video
                continue  # Skip non-video files

            video_path = os.path.join(category_path, video_name)
            cap = cv2.VideoCapture(video_path)

            # Check if video opened successfully
            if not cap.isOpened():
                print(f"Error opening video file {video_name}")
                continue

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            frames_to_extract = np.linspace(0, total_frames-1, 75, dtype=int)

            for f_id in frames_to_extract:
                cap.set(cv2.CAP_PROP_POS_FRAMES, f_id)
                ret, frame = cap.read()
                if ret:
                    # Save the frame in the category directory with a unique name
                    frame_filename = os.path.join(category_path, f"frame_{frame_counter:03d}.jpg")
                    cv2.imwrite(frame_filename, frame)
                    frame_counter += 1
                else:
                    print(f"Error reading frame {f_id} from {video_name}")

            # Release the video capture object and delete the original video
            cap.release()
            os.remove(video_path)
            print(f"Processed and removed {video_name}")

base__test__dir = '/content/xception-frames'

extract_and_replace_frames(base__test__dir)

In [8]:
#@title 4.3. Download all pretrained models
import gdown

os.makedirs('checkpoints', exist_ok=True)
# Xception pretrained
if not os.path.isfile('checkpoints/Xception_realA_fakeC.pt'):
  gdown.download('https://drive.google.com/file/d/1vxZKL98EgCGkD_SmhU4UDzEaCzh0wHdH/view?usp=drive_link', fuzzy=True)
  !mv Xception_realA_fakeC.pt checkpoints/Xception_realA_fakeC.pt

# Capsule pretrained
if not os.path.isfile('checkpoints/capsule_5.pt'):
  gdown.download('https://drive.google.com/file/d/1dixLATA96v1PuINWjeu4I4CYz93NyEiw/view?usp=drive_link', fuzzy=True)
  !mv capsule_5.pt checkpoints/capsule_5.pt


# MesoInceptionNet4 pretrained
if not os.path.isfile('checkpoints/MesoInception4_realA_fakeC.pt'):
  gdown.download('https://drive.google.com/file/d/1yicS_UhOyNi2QL_OTvN2tzhJfnLYvicr/view?usp=drive_link', fuzzy=True)
  !mv MesoInception4_realA_fakeC.pt checkpoints/MesoInception4_realA_fakeC.pt


Downloading...
From (original): https://drive.google.com/uc?id=1vxZKL98EgCGkD_SmhU4UDzEaCzh0wHdH
From (redirected): https://drive.google.com/uc?id=1vxZKL98EgCGkD_SmhU4UDzEaCzh0wHdH&confirm=t&uuid=4c713374-df09-4c48-8315-76de3ff8bd90
To: /content/Xception_realA_fakeC.pt
100%|██████████| 83.6M/83.6M [00:03<00:00, 25.7MB/s]
Downloading...
From: https://drive.google.com/uc?id=1dixLATA96v1PuINWjeu4I4CYz93NyEiw
To: /content/capsule_5.pt
100%|██████████| 6.38M/6.38M [00:00<00:00, 11.9MB/s]
Downloading...
From: https://drive.google.com/uc?id=1P6y72N5XUyuuRy5nQJlYNG6L6ei_ZVgq
To: /content/Meso4_realA_fakeC.pt
100%|██████████| 103k/103k [00:00<00:00, 51.2MB/s]


In [36]:
#@title 4.4.1. Evaluation Xception

import torch.nn as nn
import torch.utils.data as data
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import copy
import time
import sklearn.metrics as metrics
import matplotlib.pyplot as plt
import os
#############################EVAL##############################
from sklearn.metrics import classification_report,fbeta_score
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
import numpy as np


def Eval_Xception(args):
    LIST_SELECT = ('VIDEO' if os.path.exists(args.path_video) or not args.path_video else '', 'AUDIO' if os.path.exists(args.path_audio) or not args.path_audio else '')
    assert (LIST_SELECT[0]!='' and LIST_SELECT[1]!='', 'At least one path must be typed')
    BATCH_SIZE = args.batch_size
    pretrained_size = 224
    pretrained_means = [0.4489, 0.3352, 0.3106]#[0.485, 0.456, 0.406]
    pretrained_stds= [0.2380, 0.1965, 0.1962]#[0.229, 0.224, 0.225]

    for MODE in LIST_SELECT:
        test_dir, load_dir = '', ''
        if MODE == 'VIDEO':
            test_dir, load_dir = args.path_video, args.path_video_model
        elif MODE == 'AUDIO':
            test_dir, load_dir = args.path_audio, args.path_audio_model
        assert(os.path.exists(test_dir) and os.path.exists(load_dir) ,'wrong path param !!!')

        test_transforms = transforms.Compose([
                                   transforms.Resize((pretrained_size,pretrained_size)),
                                   transforms.ToTensor(),
                                   transforms.Normalize(mean = pretrained_means,
                                                        std = pretrained_stds)
                               ])

        test_data = datasets.ImageFolder(root = test_dir,
                                         transform = test_transforms)

        print(f'Number of testing examples: {len(test_data)}')

        test_iterator = data.DataLoader(test_data,
                                        shuffle = True,
                                        batch_size = BATCH_SIZE)

        for i, (inputs, labels) in enumerate(test_iterator):
            print(f"Batch {i}")
            print(f"Input batch shape: {inputs.shape}")
            print(f"Label batch shape: {labels.shape}")
            # Optionally, print data types and a sample of the data
            print(f"Input data type: {inputs.dtype}")
            print(f"Label data type: {labels.dtype}")
            print(f"First input example: {inputs[0]}")
            print(f"First label: {labels[0]}")

            if i == 0:  # Remove or adjust this if-block to inspect more batches
                break

        model = xception(num_classes=2, pretrained='')
        if len(args.num_gpu) > 1:
            model = nn.DataParallel(model)
        # model.load_state_dict(torch.load(load_dir)['state_dict'])
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.load_state_dict(torch.load(load_dir, map_location=device)['state_dict'])
        # device = torch.device("cpu")
        model = model.to(device)

        print("eval...")
        start_time = time.monotonic()

        def EVAL_classification(model, test_iterator, device):
            label_encoder = LabelEncoder()
            enc = OneHotEncoder(sparse=False)

            y_true=np.zeros((0,2),dtype=np.int8)
            y_pred=np.zeros((0,2),dtype=np.int8)
            y_true_auc = []
            y_pred_auc = []

            model.eval()
            for i, data in enumerate(test_iterator):
                with torch.no_grad():
                    in_1 = data[0].to(device)
                    _y_pred = model(in_1)
                    _y_pred = _y_pred.cpu().detach()

                    _pred = copy.deepcopy(_y_pred).detach().cpu()#.tolist()
                    _true = copy.deepcopy(data[1]).detach().cpu().float().tolist()
                    [y_pred_auc.append(_a) for _a in _pred[:,1]]
                    [y_true_auc.append(_a) for _a in _true]


                    # integer_encoded = label_encoder.fit_transform(data[1].detach().cpu())
                    # integer_encoded = integer_encoded.reshape(len(integer_encoded), 1)

                    # onehot_encoded = enc.fit_transform(integer_encoded)
                    # onehot_encoded = onehot_encoded.astype(np.int8)

                    def manual_one_hot(labels, num_classes=2):
                        # labels: tensor of shape (batch_size,) containing labels for each instance in the batch
                        # num_classes: number of classes, 2 for binary classification
                        one_hot_encoded = torch.zeros((len(labels), num_classes), dtype=torch.int8)
                        for i, label in enumerate(labels):
                            one_hot_encoded[i, label] = 1
                        return one_hot_encoded  # Convert to numpy array if needed (not needed here)

                    integer_encoded = label_encoder.fit_transform(data[1].detach().cpu())
                    _y_true = manual_one_hot(torch.tensor(integer_encoded), num_classes=2)

                    # _y_true = torch.tensor(onehot_encoded)
                    # print(f'One Hot Encoded Shape: {onehot_encoded.shape}')
                    _y_true_argmax = _y_true.argmax(1)
                    print(f'_y_true_argmax {_y_true_argmax}')
                    _y_true = np.array(torch.zeros(_y_true.shape).scatter(1, _y_true_argmax.unsqueeze(1),1),dtype=np.int8)
                    print(f'y_true shape: {y_true.shape} and _y_true shape: _{y_true.shape}')
                    y_true = np.concatenate((y_true,_y_true))

                    a = _y_pred.argmax(1)
                    _y_pred = np.array(torch.zeros(_y_pred.shape).scatter(1, a.unsqueeze(1), 1),dtype=np.int8)
                    y_pred = np.concatenate((y_pred,_y_pred))

            result = classification_report(y_true, y_pred, labels=None, target_names=None, sample_weight=None, digits=4, output_dict=False, zero_division='warn')
            print(result)
            # Evaluate and display results

            print(f'ACC is {accuracy_score(y_true, y_pred)}')
            f2_score_micro = fbeta_score(y_true, y_pred, beta=2, average='micro')
            f2_score_macro = fbeta_score(y_true, y_pred, beta=2, average='macro')
            f2_score_weighted = fbeta_score(y_true, y_pred, beta=2, average='weighted')

            print(f'F2 Micro is {f2_score_micro}')
            print(f'F2 Macro is {f2_score_macro}')
            print(f'F2 Weighted is {f2_score_weighted}')
            y_true_auc, y_pred_auc = np.array(y_true_auc),np.array(y_pred_auc)
            print(y_true_auc.shape, y_pred_auc.shape)
            fpr = dict()
            tpr = dict()
            roc_auc = dict()
            for i in range(2):
                fpr[i], tpr[i], _ = metrics.roc_curve(y_true_auc,y_pred_auc)
                roc_auc[i] = metrics.auc(fpr[i], tpr[i])

            # Compute micro-average ROC curve and ROC area
            fpr["micro"], tpr["micro"], _ = metrics.roc_curve(y_true_auc,y_pred_auc)
            roc_auc["micro"] = metrics.auc(fpr["micro"], tpr["micro"])

            lw = 2
            print('ROC : {:.4f}'.format(roc_auc[1]))


        predictions = EVAL_classification(model,test_iterator,device)
        end_time = time.monotonic()
        total_time = end_time - start_time
        print(total_time)
        return predictions

  assert (LIST_SELECT[0]!='' and LIST_SELECT[1]!='', 'At least one path must be typed')
  assert(os.path.exists(test_dir) and os.path.exists(load_dir) ,'wrong path param !!!')


In [37]:
#@title 4.4.2. Run Evaluation on Xception

class Args:
    def __init__(self):
        self.path_video = '/content/xception-frames'
        self.path_audio = ''  # Leave empty if not used
        self.batch_size = 32  # Batch size for evaluation
        # Add or modify any additional parameters needed for Eval
        self.path_video_model = 'checkpoints/Xception_realA_fakeC.pt'  # Add the path to your video model
        self.path_audio_model = ''  # Add the path to your audio model if needed
        self.num_gpu = [0]  # Adjust based on your GPU setup

# Instantiate the Args object
args = Args()

# Call the Eval function with the Args instance
Eval_Xception(args)


Number of testing examples: 12000
Batch 0
Input batch shape: torch.Size([32, 3, 224, 224])
Label batch shape: torch.Size([32])
Input data type: torch.float32
Label data type: torch.int64
First input example: tensor([[[-0.6833, -0.6668, -0.6833,  ...,  0.4866,  0.4866,  0.4866],
         [-1.0458, -1.0293, -1.0293,  ...,  0.4701,  0.4866,  0.4701],
         [-0.9469, -0.9634, -0.9634,  ...,  0.4701,  0.5031,  0.4701],
         ...,
         [-1.3424, -1.3753, -1.3753,  ..., -0.9634, -0.9469, -0.9469],
         [-1.3424, -1.3753, -1.3918,  ..., -0.9634, -0.9469, -0.9469],
         [-1.3424, -1.3753, -1.3753,  ..., -0.9634, -0.9469, -0.9469]],

        [[-0.4685, -0.4486, -0.4685,  ...,  0.0703,  0.0703,  0.0703],
         [-0.9076, -0.8876, -0.8876,  ...,  0.0504,  0.0304,  0.0504],
         [-0.7878, -0.8078, -0.8078,  ...,  0.0304,  0.0105,  0.0304],
         ...,
         [-1.1870, -1.2269, -1.2069,  ..., -0.6681, -0.6481, -0.6481],
         [-1.1670, -1.2069, -1.2269,  ..., -0.6681, 

In [27]:
#@title 4.5.1. Meso4 and MesoInception4 models

import os
import argparse


import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import torchvision

class Meso4(nn.Module):
    def __init__(self, num_classes=2):
        super(Meso4, self).__init__()
        self.num_classes = num_classes
        self.conv1 = nn.Conv2d(3, 8, 3, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(8)
        self.relu = nn.ReLU(inplace=True)
        self.leakyrelu = nn.LeakyReLU(0.1)
        self.conv2 = nn.Conv2d(8, 8, 5, padding=2, bias=False)
        self.bn2 = nn.BatchNorm2d(16)
        self.conv3 = nn.Conv2d(8, 16, 5, padding=2, bias=False)
        self.conv4 = nn.Conv2d(16, 16, 5, padding=2, bias=False)
        self.maxpooling1 = nn.MaxPool2d(kernel_size=(2, 2))
        self.maxpooling2 = nn.MaxPool2d(kernel_size=(4, 4))
        #flatten: x = x.view(x.size(0), -1)
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(16*7*7, 16)
        self.fc2 = nn.Linear(16, num_classes)



    def forward(self, input):
        x = self.conv1(input) #(8, 256, 256)
        x = self.relu(x)
        x = self.bn1(x)
        x = self.maxpooling1(x) #(8, 128, 128)
        x = self.conv2(x) #(8, 128, 128)
        x = self.relu(x)
        x = self.bn1(x)
        x = self.maxpooling1(x) #(8, 64, 64)
        x = self.conv3(x) #(16, 64, 64)
        x = self.relu(x)
        x = self.bn2(x)
        x = self.maxpooling1(x) #(16, 32, 32)
        x = self.conv4(x) #(16, 32, 32)
        x = self.relu(x)
        x = self.bn2(x)
        x = self.maxpooling2(x) #(16, 8, 8)
        x = x.view(x.size(0), -1) #(Batch, 16*8*8)
        # x = self.fla(x)
        x = self.fc1(x) #(Batch, 16)
        x = self.leakyrelu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x


class MesoInception4(nn.Module):
	"""
	Pytorch Implemention of MesoInception4
	Author: Honggu Liu
	Date: July 7, 2019
	"""
	def __init__(self, num_classes=2):
		super(MesoInception4, self).__init__()
		self.num_classes = num_classes
		#InceptionLayer1
		self.Incption1_conv1 = nn.Conv2d(3, 1, 1, padding=0, bias=False)
		self.Incption1_conv2_1 = nn.Conv2d(3, 4, 1, padding=0, bias=False)
		self.Incption1_conv2_2 = nn.Conv2d(4, 4, 3, padding=1, bias=False)
		self.Incption1_conv3_1 = nn.Conv2d(3, 4, 1, padding=0, bias=False)
		self.Incption1_conv3_2 = nn.Conv2d(4, 4, 3, padding=2, dilation=2, bias=False)
		self.Incption1_conv4_1 = nn.Conv2d(3, 2, 1, padding=0, bias=False)
		self.Incption1_conv4_2 = nn.Conv2d(2, 2, 3, padding=3, dilation=3, bias=False)
		self.Incption1_bn = nn.BatchNorm2d(11)


		#InceptionLayer2
		self.Incption2_conv1 = nn.Conv2d(11, 2, 1, padding=0, bias=False)
		self.Incption2_conv2_1 = nn.Conv2d(11, 4, 1, padding=0, bias=False)
		self.Incption2_conv2_2 = nn.Conv2d(4, 4, 3, padding=1, bias=False)
		self.Incption2_conv3_1 = nn.Conv2d(11, 4, 1, padding=0, bias=False)
		self.Incption2_conv3_2 = nn.Conv2d(4, 4, 3, padding=2, dilation=2, bias=False)
		self.Incption2_conv4_1 = nn.Conv2d(11, 2, 1, padding=0, bias=False)
		self.Incption2_conv4_2 = nn.Conv2d(2, 2, 3, padding=3, dilation=3, bias=False)
		self.Incption2_bn = nn.BatchNorm2d(12)

		#Normal Layer
		self.conv1 = nn.Conv2d(12, 16, 5, padding=2, bias=False)
		self.relu = nn.ReLU(inplace=True)
		self.leakyrelu = nn.LeakyReLU(0.1)
		self.bn1 = nn.BatchNorm2d(16)
		self.maxpooling1 = nn.MaxPool2d(kernel_size=(2, 2))

		self.conv2 = nn.Conv2d(16, 16, 5, padding=2, bias=False)
		self.maxpooling2 = nn.MaxPool2d(kernel_size=(4, 4))

		self.dropout = nn.Dropout2d(0.5)
		self.fc1 = nn.Linear(16*7*7, 16)
		self.fc2 = nn.Linear(16, num_classes)


	#InceptionLayer
	def InceptionLayer1(self, input):
		x1 = self.Incption1_conv1(input)
		x2 = self.Incption1_conv2_1(input)
		x2 = self.Incption1_conv2_2(x2)
		x3 = self.Incption1_conv3_1(input)
		x3 = self.Incption1_conv3_2(x3)
		x4 = self.Incption1_conv4_1(input)
		x4 = self.Incption1_conv4_2(x4)
		y = torch.cat((x1, x2, x3, x4), 1)
		y = self.Incption1_bn(y)
		y = self.maxpooling1(y)

		return y

	def InceptionLayer2(self, input):
		x1 = self.Incption2_conv1(input)
		x2 = self.Incption2_conv2_1(input)
		x2 = self.Incption2_conv2_2(x2)
		x3 = self.Incption2_conv3_1(input)
		x3 = self.Incption2_conv3_2(x3)
		x4 = self.Incption2_conv4_1(input)
		x4 = self.Incption2_conv4_2(x4)
		y = torch.cat((x1, x2, x3, x4), 1)
		y = self.Incption2_bn(y)
		y = self.maxpooling1(y)

		return y

	def forward(self, input):
		x = self.InceptionLayer1(input) #(Batch, 11, 128, 128)
		x = self.InceptionLayer2(x) #(Batch, 12, 64, 64)

		x = self.conv1(x) #(Batch, 16, 64 ,64)
		x = self.relu(x)
		x = self.bn1(x)
		x = self.maxpooling1(x) #(Batch, 16, 32, 32)

		x = self.conv2(x) #(Batch, 16, 32, 32)
		x = self.relu(x)
		x = self.bn1(x)
		x = self.maxpooling2(x) #(Batch, 16, 8, 8)

		x = x.view(x.size(0), -1) #(Batch, 16*8*8)
		x = self.dropout(x)
		x = self.fc1(x) #(Batch, 16)
		x = self.leakyrelu(x)
		x = self.dropout(x)
		x = self.fc2(x)

		return x


In [None]:
#@title 4.5.2. Meso4 Evaluation

import torch.utils.data as data
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import copy
import time
# from utils.Common_Function import *
import sklearn.metrics as metrics
import matplotlib.pyplot as plt
#############################EVAL##############################
from sklearn.metrics import classification_report
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
import torch.nn as nn

def Eval_MesoNet(args):
    print(args.path_audio)
    LIST_SELECT = ('VIDEO' if os.path.exists(args.path_video) else '', 'AUDIO' if os.path.exists(args.path_audio) else '')
    assert (LIST_SELECT[0]!='' and LIST_SELECT[1]!='', 'At least one path must be typed')
    BATCH_SIZE = args.batch_size
    pretrained_size = 224
    pretrained_means = [0.4489, 0.3352, 0.3106]#[0.485, 0.456, 0.406]
    pretrained_stds= [0.2380, 0.1965, 0.1962]#[0.229, 0.224, 0.225]

    test_dir, load_dir = args.path_video, args.path_video_model

    assert(os.path.exists(test_dir) and os.path.exists(load_dir) ,'wrong path param !!!')
    print(len(test_dir))
    print(len(load_dir))
    test_transforms = transforms.Compose([
                                transforms.Resize((pretrained_size,pretrained_size)),
                                transforms.ToTensor(),
                                transforms.Normalize(mean = pretrained_means,
                                                    std = pretrained_stds)
                            ])
    test_data = datasets.ImageFolder(root = test_dir,
                                      transform = test_transforms)

    print(f'Number of testing examples: {len(test_data)}')

    test_iterator = data.DataLoader(test_data,
                                    shuffle = True,
                                    batch_size = BATCH_SIZE)
    model = Meso4()
    model.load_state_dict(torch.load(load_dir)['state_dict'])
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if len(args.num_gpu) > 1:
        model = nn.DataParallel(model)
    model = model.to(device)
    print("eval...")
    start_time = time.monotonic()

    def EVAL_classification(model, test_iterator, device):
        label_encoder = LabelEncoder()
        enc = OneHotEncoder(sparse=False)

        y_true=np.zeros((0,2),dtype=np.int8)
        y_pred=np.zeros((0,2),dtype=np.int8)
        y_true_auc = []
        y_pred_auc = []

        model.eval()
        for i, data in enumerate(test_iterator):
            with torch.no_grad():
                in_1 = data[0].to(device)
                _y_pred = model(in_1).cpu().detach()

                _pred = copy.deepcopy(_y_pred).detach().cpu()#.tolist()
                _true = copy.deepcopy(data[1]).detach().cpu().float().tolist()
                [y_pred_auc.append(_a) for _a in _pred[:,1]]
                [y_true_auc.append(_a) for _a in _true]

                integer_encoded = label_encoder.fit_transform(data[1].detach().cpu())
                integer_encoded = integer_encoded.reshape(len(integer_encoded), 1)

                onehot_encoded = enc.fit_transform(integer_encoded)
                onehot_encoded = onehot_encoded.astype(np.int8)

                _y_true = torch.tensor(onehot_encoded)
                _y_true_argmax = _y_true.argmax(1)
                _y_true = np.array(torch.zeros(_y_true.shape).scatter(1, _y_true_argmax.unsqueeze(1),1),dtype=np.int8)
                y_true = np.concatenate((y_true,_y_true))

                a = _y_pred.argmax(1)
                _y_pred = np.array(torch.zeros(_y_pred.shape).scatter(1, a.unsqueeze(1), 1),dtype=np.int8)
                y_pred = np.concatenate((y_pred,_y_pred))

        result = metrics.classification_report(y_true, y_pred, labels=None, target_names=None, sample_weight=None, digits=4, output_dict=False, zero_division='warn')
        # Calculating metrics
        accuracy = accuracy_score(y_true, y_pred)
        roc_auc = roc_auc_score(y_true, y_pred)
        f2 = fbeta_score(y_true, y_pred, beta=2, average='macro')

        print("Evaluation Results:")
        print(classification_report(y_true, y_pred, target_names=['real', 'fake']))
        print(f"Accuracy: {accuracy:.4f}")
        print(f"ROC AUC: {roc_auc:.4f}")
        print(f"F2 Score: {f2:.4f}")

    EVAL_classification(model,test_iterator,device)
    end_time = time.monotonic()
    print(end_time - start_time)


  assert (LIST_SELECT[0]!='' and LIST_SELECT[1]!='', 'At least one path must be typed')
  assert(os.path.exists(test_dir) and os.path.exists(load_dir) ,'wrong path param !!!')


In [None]:
#@title 4.5.3. Run Evaluation on Meso4

class Args:
    def __init__(self):
        self.path_video = '/content/xception-frames'
        self.path_audio = ''  # Leave empty if not used
        self.batch_size = 32  # Batch size for evaluation
        # Add or modify any additional parameters needed for Eval
        # self.path_video_model = 'checkpoints/Meso4_realA_fakeC.pt'  # Add the path to your video model
        self.num_gpu = [0]  # Adjust based on your GPU setup

if __name__ == "__main__":
    # Instantiate the Args object only if running as a standalone script
    args = Args()

    # Call the Eval function with the Args instance
    Eval_MesoNet(args)

NameError: name 'Eval_MesoNet' is not defined

In [24]:
#@title 4.6.1. MesoInception4 Model
import torch.nn as nn
import torch.utils.data as data
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import copy
import time
# from utils.Common_Function import *
# from models.MesoNet import  MesoInception4
import sklearn.metrics as metrics
import matplotlib.pyplot as plt
#############################EVAL##############################
from sklearn.metrics import classification_report,fbeta_score, roc_auc_score
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
import torch.nn as nn

def Eval_MesoInceptionNet4(args):
    LIST_SELECT = ('VIDEO' if os.path.exists(args.path_video) else '', 'AUDIO' if os.path.exists(args.path_audio) else '')
    assert (LIST_SELECT[0]!='' and LIST_SELECT[1]!='', 'At least one path must be typed')
    print(LIST_SELECT)
    BATCH_SIZE = args.batch_size
    pretrained_size = 224
    pretrained_means = [0.4489, 0.3352, 0.3106]#[0.485, 0.456, 0.406]
    pretrained_stds= [0.2380, 0.1965, 0.1962]#[0.229, 0.224, 0.225]
    test_dir, load_dir = args.path_video, args.path_video_model
    test_transforms = transforms.Compose([
        transforms.Resize((pretrained_size,pretrained_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean = pretrained_means,
                            std = pretrained_stds)
    ])
    test_data = datasets.ImageFolder(root = test_dir,
                                      transform = test_transforms)

    print(f'Number of testing examples: {len(test_data)}')

    test_iterator = data.DataLoader(test_data,
                                    shuffle = True,
                                    batch_size = BATCH_SIZE)
    model = MesoInception4()
    model.load_state_dict(torch.load(load_dir)['state_dict'])
    if len(args.num_gpu) > 1:
        model = nn.DataParallel(model)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    print("eval...")
    start_time = time.monotonic()

    def EVAL_classification(model, test_iterator, device):
        label_encoder = LabelEncoder()
        enc = OneHotEncoder(sparse=False)

        y_true=np.zeros((0,2),dtype=np.int8)
        y_pred=np.zeros((0,2),dtype=np.int8)
        y_true_auc = []
        y_pred_auc = []

        model.eval()
        for i, data in enumerate(test_iterator):
            with torch.no_grad():
                in_1 = data[0].to(device)
                _y_pred = model(in_1).cpu().detach()

                _pred = copy.deepcopy(_y_pred).detach().cpu()#.tolist()
                _true = copy.deepcopy(data[1]).detach().cpu().float().tolist()
                [y_pred_auc.append(_a) for _a in _pred[:,1]]
                [y_true_auc.append(_a) for _a in _true]

                integer_encoded = label_encoder.fit_transform(data[1].detach().cpu())
                integer_encoded = integer_encoded.reshape(len(integer_encoded), 1)

                onehot_encoded = enc.fit_transform(integer_encoded)
                onehot_encoded = onehot_encoded.astype(np.int8)

                _y_true = torch.tensor(onehot_encoded)
                _y_true_argmax = _y_true.argmax(1)
                _y_true = np.array(torch.zeros(_y_true.shape).scatter(1, _y_true_argmax.unsqueeze(1),1),dtype=np.int8)
                y_true = np.concatenate((y_true,_y_true))

                a = _y_pred.argmax(1)
                _y_pred = np.array(torch.zeros(_y_pred.shape).scatter(1, a.unsqueeze(1), 1),dtype=np.int8)
                y_pred = np.concatenate((y_pred,_y_pred))

        result = classification_report(y_true, y_pred, labels=None, target_names=None, sample_weight=None, digits=4, output_dict=False, zero_division='warn')
        # Calculating metrics
        accuracy = accuracy_score(y_true, y_pred)
        roc_auc = roc_auc_score(y_true, y_pred)
        f2 = fbeta_score(y_true, y_pred, beta=2,average='macro')

        print("Evaluation Results:")
        print(classification_report(y_true, y_pred, target_names=['real', 'fake']))
        print(f"Accuracy: {accuracy:.4f}")
        print(f"ROC AUC: {roc_auc:.4f}")
        print(f"F2 Score: {f2:.4f}")
    EVAL_classification(model,test_iterator,device)
    end_time = time.monotonic()
    print(end_time - start_time)

  assert (LIST_SELECT[0]!='' and LIST_SELECT[1]!='', 'At least one path must be typed')


In [28]:
#@title 4.6.2. Run Evaluation on MesoInception4

class Args:
    def __init__(self):
        self.path_video = '/content/xception-frames'
        self.path_audio = ''  # Leave empty if not used
        self.batch_size = 32  # Batch size for evaluation
        # Add or modify any additional parameters needed for Eval
        self.path_video_model = 'checkpoints/MesoInception4_realA_fakeC.pt'
        self.num_gpu = [0]  # Adjust based on your GPU setup

if __name__ == "__main__":
    # Instantiate the Args object only if running as a standalone script
    args = Args()

    # Call the Eval function with the Args instance
    Eval_MesoInceptionNet4(args)

('VIDEO', '')
Number of testing examples: 12000
eval...




Evaluation Results:
              precision    recall  f1-score   support

        real       0.83      0.93      0.88      6000
        fake       0.92      0.82      0.87      6000

   micro avg       0.87      0.87      0.87     12000
   macro avg       0.88      0.87      0.87     12000
weighted avg       0.88      0.87      0.87     12000
 samples avg       0.87      0.87      0.87     12000

Accuracy: 0.8732
ROC AUC: 0.8732
F2 Score: 0.8723
21.57380238299993




In [None]:
#@title 5.0. Evaluate Ad-Hoc video(s)
# To test how well these models generalise, a new dataset can be passed in
# must be structured in the following way in order to work

# MAKE THIS CUSTOMISABLE AND INPUTTABLE
import os
import numpy as np
import matplotlib.pyplot as plt
import librosa
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import roc_curve, auc
import joblib
import cv2
from skimage.feature import local_binary_pattern
import librosa
from sklearn.preprocessing import normalize
from skimage.feature import local_binary_pattern
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_audio

def evaluate_pretrained_model():
    # # Set your input directory containing fake and real videos
    # input_dir = "debugging_dataset"

    # # Load data
    # X_video, X_audio, y = load_data(input_dir)
    # print('Loaded data.')

    # # Split data into training and testing sets
    # X_video_train, X_video_test, X_audio_train, X_audio_test, y_train, y_test = train_test_split(X_video, X_audio, y, test_size=0.2, random_state=42)
    # print('Test Train split done.')

    # # Train models
    # # svm_video, svm_audio, scaler_video, scaler_audio, _, _, _ = train_model(X_video_train, X_audio_train, y_train)
    # svm_video = joblib.load("svm_model_video.pkl")
    # scaler_video = joblib.load("scaler_video.pkl")
    # svm_audio = joblib.load("svm_model_audio.pkl")
    # scaler_audio = joblib.load("scaler_audio.pkl")


    # # Calculate ROC curve for video on testing set
    # X_video_test_scaled = scaler_video.transform(X_video_test)
    # video_probs = svm_video.predict_proba(X_video_test_scaled)[:, 1]
    # fpr_video, tpr_video, _ = roc_curve(y_test, video_probs)
    # roc_auc_video = auc(fpr_video, tpr_video)

    # # Calculate ROC curve for audio on testing set
    # X_audio_test_scaled = scaler_audio.transform(X_audio_test)
    # audio_probs = svm_audio.predict_proba(X_audio_test_scaled)[:, 1]
    # fpr_audio, tpr_audio, _ = roc_curve(y_test, audio_probs)
    # roc_auc_audio = auc(fpr_audio, tpr_audio)

    # # Combine probabilities for multimodal on testing set
    # X_combined_scaled = np.hstack((X_video_test_scaled, X_audio_test_scaled))
    # svm_combined = SVC(kernel='linear', probability=True)
    # svm_combined.fit(X_combined_scaled, y_test)
    # multimodal_probs = svm_combined.predict_proba(X_combined_scaled)[:, 1]
    # fpr_multi, tpr_multi, _ = roc_curve(y_test, multimodal_probs)
    # roc_auc_multi = auc(fpr_multi, tpr_multi)

    # # Plot ROC curve for video
    # plt.figure(figsize=(8, 6))
    # plt.plot(fpr_video, tpr_video, color='#1f77b4', lw=2, linestyle='-', label='ROC curve for video (area = {:.2f})'.format(roc_auc_video))
    # plt.plot([0, 1], [0, 1], color='grey', lw=2, linestyle='--', label='Chance Level')
    # plt.xlim([0.0, 1.0])
    # plt.ylim([0.0, 1.05])
    # plt.xlabel('False Positive Rate', fontsize=12)
    # plt.ylabel('True Positive Rate', fontsize=12)
    # plt.title('Receiver Operating Characteristic - Video Model Performance', fontsize=14)
    # plt.legend(loc="lower right", fontsize=10)
    # plt.grid(True, which='both', linestyle='--', linewidth=0.5, color='grey')
    # plt.tight_layout()

    # # Plot ROC curve for audio
    # plt.figure(figsize=(8, 6))
    # plt.plot(fpr_audio, tpr_audio, color='#1f77b4', lw=2, linestyle='-', label='ROC curve for audio (area = {:.2f})'.format(roc_auc_audio))
    # plt.plot([0, 1], [0, 1], color='grey', lw=2, linestyle='--', label='Chance Level')
    # plt.xlim([0.0, 1.0])
    # plt.ylim([0.0, 1.05])
    # plt.xlabel('False Positive Rate', fontsize=12)
    # plt.ylabel('True Positive Rate', fontsize=12)
    # plt.title('Receiver Operating Characteristic - Audio Model Performance', fontsize=14)
    # plt.legend(loc="lower right", fontsize=10)
    # plt.grid(True, which='both', linestyle='--', linewidth=0.5, color='grey')
    # plt.tight_layout()

    # # Plot ROC curve for multimodal
    # plt.figure(figsize=(8, 6))
    # plt.plot(fpr_multi, tpr_multi, color='#1f77b4', lw=2, linestyle='-', label='ROC curve for multimodal (area = {:.2f})'.format(roc_auc_multi))
    # plt.plot([0, 1], [0, 1], color='grey', lw=2, linestyle='--', label='Chance Level')
    # plt.xlim([0.0, 1.0])
    # plt.ylim([0.0, 1.05])
    # plt.xlabel('False Positive Rate', fontsize=12)
    # plt.ylabel('True Positive Rate', fontsize=12)
    # plt.title('Receiver Operating Characteristic - Multimodal Model Performance', fontsize=14)
    # plt.legend(loc="lower right", fontsize=10)
    # plt.grid(True, which='both', linestyle='--', linewidth=0.5, color='grey')
    # plt.tight_layout()

    # # Show all plots
    # plt.show()

    # All we need to do is use the trained models and predict an output
    scaler_video = joblib.load("scaler_video.pkl")
    scaler_audio = joblib.load("scaler_audio.pkl")
    mlp_video = load_model('mlp_video_model.keras')
    mlp_audio = load_model('mlp_audio_model.keras')


    video_path = "/content/African_women_id00832.00078-segment_1.mp4"
    audio_path = "/content/African_women_id00832.00078-segment_1.mp4"

    # Extract features from provided video and audio paths
    video_features = extract_video_features(video_path)
    audio_features = extract_audio_features(audio_path)

    if video_features is None or audio_features is None:
        print("Error: Unable to extract features from the provided video or audio.")
        return

    # Scale features using training set scalers
    video_features_scaled = scaler_video.transform(video_features.reshape(1, -1))
    audio_features_scaled = scaler_audio.transform(audio_features.reshape(1, -1))

    # Predict using multimodal model
    multimodal_features_scaled = np.hstack((video_features_scaled, audio_features_scaled))
    multimodal_prediction = svm_combined.predict(multimodal_features_scaled)[0]

    print("Multimodal prediction: ", "deepfake" if multimodal_prediction == 1 else "real")

if __name__ == "__main__":
    evaluate_pretrained_model()