In [1]:
# ========== ----- ========== Import Libraries ========== ----- ========== #

from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
from sklearn import svm
import pandas as pd
import json
import re
import os
import cv2
import dlib
import numpy as np
from joblib import dump, load
from collections import Counter
import pickle
from LDP import LDP_TOP
from LDP import train
import sklearn
print(sklearn.__version__)


# ========== ----- ========== End ========== ----- ========== #


1.2.2


In [8]:
# ========== ----- ========== Dynamic Texture Class ========== ----- ========== #

class dynamicTexture:
    """
    Dynamic-texture (LDP-TOP) feature pipeline with SVM training and
    evaluation helpers.

    The class extracts LDP-TOP descriptors from cropped, grayscale face
    frames of ``.mp4`` videos, pickles them, trains RBF/linear SVMs on them
    and prints accuracy / confusion matrices for several evaluation setups.

    It relies on module-level names imported by the file: ``cv2``, ``dlib``,
    ``np``, ``os``, ``pickle``, ``svm``, ``dump``, ``Counter``, ``LDP_TOP``,
    ``accuracy_score`` and ``confusion_matrix``.
    """

    # Dataset versions accepted by the constructor.
    _SUPPORTED_VERSIONS = ('cf23', 'cf40')

    # Training folder whose videos are labelled 0; every other training
    # folder is labelled 1.  The comparison is an exact string match, so
    # callers must pass the folder spelled exactly like this.
    _ORIGINAL_TRAIN_FOLDERS = {
        'cf23': 'datasets/cf23/faceforensics_dataset_train/original_sequences',
        'cf40': 'datasets/cf40/faceforensics_dataset_train/original_sequences',
    }

    # (index into ``linear_models``, class id) of each binary model consulted
    # per dataset version.  'cf40' skips slots 1 and 3 but keeps the class ids
    # of the remaining models so they line up with the ground-truth labels.
    # NOTE(review): slots presumably correspond to Deepfakes, Face2Face,
    # FaceSwap and NeuralTextures (inferred from the FaceForensics paths).
    _BINARY_MODEL_SLOTS = {
        'cf23': ((0, 1), (1, 2), (2, 3), (3, 4)),
        'cf40': ((0, 1), (2, 3)),
    }

    # Side length (pixels) every detected face crop is resized to.
    _FACE_SIZE = (128, 128)

    # Nominal window duration, in seconds, summarised by one descriptor.
    _WINDOW_SECONDS = 3

    # Number of test videos per class assumed by the default ground truth.
    _VIDEOS_PER_CLASS = 140

    # ========== ----- ========== Class Initialization ========== ----- ========== #

    def __init__(self, dataset_version):
        """
        Initializes an instance of the dynamicTexture class.

        :param dataset_version: The version of the dataset ('cf23' or 'cf40').
        :raises AssertionError: If ``dataset_version`` is neither value.
        """
        # NOTE(review): ``assert`` is skipped under ``python -O``; it is kept
        # so the exception type seen by existing callers does not change.
        assert dataset_version in self._SUPPORTED_VERSIONS, "dataset_version is not equal to 'cf23' or 'cf40'"
        self.dataset_version = dataset_version

    # ========== ----- ========== Private Helpers ========== ----- ========== #

    @staticmethod
    def _list_videos(folder):
        """Return the names of the ``.mp4`` files directly inside ``folder``."""
        return [f for f in os.listdir(folder) if f.endswith('.mp4')]

    def _extract_window_features(self, detector, video_path):
        """
        Return one LDP-TOP descriptor per frame window of a video.

        Every face found by ``detector`` is cropped, resized to 128x128 and
        converted to grayscale.  Whenever the index of the frame just read is
        a multiple of ``(int(fps) + 1) * 3`` the buffered crops are passed to
        ``LDP_TOP`` (only if at least one face was buffered) and the buffer is
        cleared.

        NOTE(review): for a video file the first frame index is 0, so the
        first window closes right after the first frame, and crops buffered
        after the last boundary are never emitted.  This is kept as-is because
        changing it would change the extracted features.

        :param detector: Face detector exposing ``run(image, upsample)``.
        :param video_path: Path of the video file to read.
        :return: List of ``LDP_TOP`` results, in window order.
        """
        features = []
        frames = []
        cap = cv2.VideoCapture(video_path)
        try:
            # Frame rate of the video (frames / second); constant per video.
            frame_rate = cap.get(cv2.CAP_PROP_FPS)
            window_length = (int(frame_rate) + 1) * self._WINDOW_SECONDS

            while cap.isOpened():
                # 0-based index of the frame that the next read() decodes.
                frame_id = cap.get(cv2.CAP_PROP_POS_FRAMES)
                ret, frame = cap.read()
                if not ret:
                    # No more frames (or the stream could not be decoded).
                    break

                face_rects, _scores, _idx = detector.run(frame, 0)
                for rect in face_rects:
                    crop_img = frame[rect.top():rect.bottom(),
                                     rect.left():rect.right()]
                    # Rectangles reaching outside the frame can slice to an
                    # empty array; those are skipped.
                    if crop_img.size > 0:
                        crop_img = cv2.resize(crop_img, self._FACE_SIZE)
                        frames.append(
                            cv2.cvtColor(crop_img, cv2.COLOR_BGR2GRAY))

                if frame_id % window_length == 0:
                    if frames:
                        features.append(
                            LDP_TOP(np.array(frames).astype(np.float64)))
                    frames = []
        finally:
            # Always free the decoder, even if detection or LDP_TOP raises.
            cap.release()
        return features

    @staticmethod
    def _majority_is_fake(predictions):
        """
        Return True when label 1 occurs at least as often as label 0.

        Ties (including "no 0 and no 1 at all") count as fake.
        """
        votes = np.asarray(predictions).tolist()
        return votes.count(1) >= votes.count(0)

    def _vote_binary_models(self, vid, linear_models):
        """
        Combine the per-technique binary models for one video.

        Each consulted model predicts every window of ``vid``.  A model flags
        the video when its 1-votes are at least as many as its 0-votes, and
        its score is then its number of 1-votes (otherwise 0).

        :param vid: Window descriptors of one video.
        :param linear_models: Sequence indexed as in ``_BINARY_MODEL_SLOTS``.
        :return: ``(binary_prediction, class_id)``; ``(0, 0)`` when no model
            flags the video, otherwise ``1`` and the class id of the first
            model with the highest score.
        """
        scores = []
        class_ids = []
        flagged = False
        for model_index, class_id in self._BINARY_MODEL_SLOTS[self.dataset_version]:
            votes = np.asarray(linear_models[model_index].predict(vid)).tolist()
            fake_votes = votes.count(1)
            if fake_votes >= votes.count(0):
                flagged = True
                scores.append(fake_votes)
            else:
                scores.append(0)
            class_ids.append(class_id)

        if not flagged:
            return 0, 0
        # Map the winning position to its class id instead of "position + 1",
        # which is only correct when no model slot is skipped.
        return 1, class_ids[scores.index(max(scores))]

    @staticmethod
    def _print_metrics(title, labels, predictions, trailing_blank=False):
        """
        Print accuracy and confusion matrix for one set of predictions.

        :param title: Prefix of both printed lines (e.g. ``"RBF"``).
        :param labels: Ground-truth labels, one per prediction.
        :param predictions: Predicted labels.
        :param trailing_blank: Also print a trailing ``"\\n"`` after the matrix.
        """
        print(title + " Accuracy:", accuracy_score(labels, predictions))
        cm = confusion_matrix(labels, predictions)
        if trailing_blank:
            print(title + " Confusion matrix:\n", cm, "\n")
        else:
            print(title + " Confusion matrix:\n", cm)

    # ========== ----- ========== Feature Extraction Train ========== ----- ========== #

    def feature_extraction_train(self, train_frame_folders, features_file_name, labels_file_name):
        """
        Extract window descriptors and labels from training videos and pickle them.

        Windows from the "original_sequences" folder of the configured dataset
        version get label 0; windows from any other folder get label 1.

        :param train_frame_folders: Iterable of folders containing ``.mp4`` files.
        :param features_file_name: Output pickle path for the descriptor list.
        :param labels_file_name: Output pickle path for the label list.
        """
        # Face detector
        detector = dlib.get_frontal_face_detector()
        original_folder = self._ORIGINAL_TRAIN_FOLDERS.get(self.dataset_version)

        list_of_train_LDP = []
        list_of_train_labels = []

        for train_frame_folder in train_frame_folders:
            list_of_train_data = self._list_videos(train_frame_folder)
            print(train_frame_folder, 'videos: ', len(list_of_train_data))

            # The label depends only on the folder, so decide it once.
            label = 0 if train_frame_folder == original_folder else 1

            for vid in list_of_train_data:
                window_features = self._extract_window_features(
                    detector, os.path.join(train_frame_folder, vid))
                list_of_train_LDP.extend(window_features)
                list_of_train_labels.extend([label] * len(window_features))

        with open(features_file_name, 'wb') as f:
            pickle.dump(list_of_train_LDP, f)

        with open(labels_file_name, 'wb') as f:
            pickle.dump(list_of_train_labels, f)

    # ========== ----- ========== Feature Extraction Test ========== ----- ========== #

    def feature_extraction_test(self, test_frame_folders, features_file_name):
        """
        Extract window descriptors from test videos and pickle them per video.

        The pickled object is a list with one entry per video (in folder
        order, then ``os.listdir`` order); each entry is the list of that
        video's window descriptors and may be empty.

        :param test_frame_folders: Iterable of folders containing ``.mp4`` files.
        :param features_file_name: Output pickle path.
        """
        # Face detector
        detector = dlib.get_frontal_face_detector()

        list_of_test_LDP = []

        for test_frame_folder in test_frame_folders:
            list_of_test_data = self._list_videos(test_frame_folder)
            print(test_frame_folder, 'videos: ', len(list_of_test_data))

            for vid in list_of_test_data:
                list_of_test_LDP.append(self._extract_window_features(
                    detector, os.path.join(test_frame_folder, vid)))

        with open(features_file_name, 'wb') as f:
            pickle.dump(list_of_test_LDP, f)

    # ========== ----- ========== Loading Features ========== ----- ========== #

    def loading_features(self, features_file_name, labels_file_name):
        """
        Load previously pickled training descriptors and labels.

        :param features_file_name: Pickle file holding the descriptor list.
        :param labels_file_name: Pickle file holding the label list.
        :return: Tuple ``(list_of_train_LDP, list_of_train_labels)``.
        """
        # SECURITY: unpickling can execute arbitrary code; only load files
        # that were produced by this pipeline or another trusted source.
        with open(features_file_name, 'rb') as f:
            list_of_train_LDP = pickle.load(f)

        with open(labels_file_name, 'rb') as f:
            list_of_train_labels = pickle.load(f)

        return list_of_train_LDP, list_of_train_labels

    # ========== ----- ========== Train ========== ----- ========== #

    def train(self, list_of_train_LDP, list_of_train_labels, rbf_model_name, linear_model_name):
        """
        Fit an RBF-kernel and a linear-kernel SVM and save both with joblib.

        :param list_of_train_LDP: Training descriptors, one per window.
        :param list_of_train_labels: Label of each descriptor.
        :param rbf_model_name: Output path of the RBF model.
        :param linear_model_name: Output path of the linear model.
        """
        for kernel, model_path in (('rbf', rbf_model_name),
                                   ('linear', linear_model_name)):
            model = svm.SVC(kernel=kernel)
            model.fit(list_of_train_LDP, list_of_train_labels)
            # Save the trained model to a file
            dump(model, model_path)

    # ========== ----- ========== Single Technique Test ========== ----- ========== #

    def single_technique_test(self, list_of_test_LDP, rbf_model, linear_model, labels=None):
        """
        Evaluate an RBF and a linear binary model at video level and print metrics.

        A video is predicted 1 when at least as many of its windows are
        predicted 1 as 0, otherwise 0.

        :param list_of_test_LDP: One list of window descriptors per video.
        :param rbf_model: Fitted model exposing ``predict``.
        :param linear_model: Fitted model exposing ``predict``.
        :param labels: Ground truth per video.  Defaults to 140 videos of
            class 0 followed by 140 videos of class 1.
        """
        if labels is None:
            n = self._VIDEOS_PER_CLASS
            labels = [0] * n + [1] * n

        rbf_predictions = []
        linear_predictions = []
        for vid in list_of_test_LDP:
            rbf_predictions.append(
                1 if self._majority_is_fake(rbf_model.predict(vid)) else 0)
            linear_predictions.append(
                1 if self._majority_is_fake(linear_model.predict(vid)) else 0)

        self._print_metrics("RBF", labels, rbf_predictions, trailing_blank=True)
        self._print_metrics("Linear", labels, linear_predictions)

    # ========== ----- ========== Multiple Technique Test With Binary Models ========== ----- ========== #

    def multiple_technique_test_with_binary_models(self, list_of_test_LDP, linear_models,
                                                   binary_labels=None, technique_labels=None):
        """
        Evaluate a bank of per-technique binary models and print metrics.

        For 'cf23' the models at indices 0-3 are used (class ids 1-4); for
        'cf40' only indices 0 and 2 are used (class ids 1 and 3).  A video is
        fake (1) when any consulted model flags it, and its technique is the
        class id of the flagging model with the most 1-votes.

        :param list_of_test_LDP: One list of window descriptors per video.
        :param linear_models: Sequence of fitted models, indexed as above.
        :param binary_labels: Real/fake ground truth per video.  Defaults to
            140 zeros followed by 140 ones per consulted technique.
        :param technique_labels: Class-id ground truth per video.  Defaults to
            140 zeros followed by 140 entries of each consulted class id.
        """
        n = self._VIDEOS_PER_CLASS
        class_ids = [cid for _, cid in self._BINARY_MODEL_SLOTS[self.dataset_version]]
        if binary_labels is None:
            binary_labels = [0] * n + [1] * (n * len(class_ids))
        if technique_labels is None:
            technique_labels = [0] * n
            for cid in class_ids:
                technique_labels += [cid] * n

        linear_predictions = []
        multi_linear_predictions = []
        for vid in list_of_test_LDP:
            binary, technique = self._vote_binary_models(vid, linear_models)
            linear_predictions.append(binary)
            multi_linear_predictions.append(technique)

        self._print_metrics("Linear", binary_labels, linear_predictions)
        # ============================================= #
        self._print_metrics("Multi-Linear", technique_labels,
                            multi_linear_predictions)

    # ========== ----- ========== Multiple Technique Test With Multi Models ========== ----- ========== #

    def multiple_technique_test_with_multi_model(self, list_of_test_LDP, multi_model,
                                                 binary_labels=None, technique_labels=None):
        """
        Evaluate one multi-class model at video level and print metrics.

        A video is real (0) when strictly more than ``len(windows) // 2``
        windows are predicted 0, otherwise fake (1).  Its technique is the
        most frequent window prediction (first seen wins ties).

        :param list_of_test_LDP: One list of window descriptors per video.
        :param multi_model: Fitted model exposing ``predict``.
        :param binary_labels: Real/fake ground truth per video.  Defaults to
            140 zeros followed by 560 ones.
        :param technique_labels: Class ground truth per video.  Defaults to
            140 entries each of classes 0, 1, 2, 3 and 4.
        """
        n = self._VIDEOS_PER_CLASS
        if binary_labels is None:
            binary_labels = [0] * n + [1] * (4 * n)
        if technique_labels is None:
            technique_labels = [0] * n + [1] * n + [2] * n + [3] * n + [4] * n

        linear_predictions = []
        multi_predictions = []
        for vid in list_of_test_LDP:
            y_pred = multi_model.predict(vid)

            real_votes = y_pred.tolist().count(0)
            linear_predictions.append(0 if real_votes > (len(y_pred) // 2) else 1)

            count_dict = Counter(y_pred)
            multi_predictions.append(max(count_dict, key=count_dict.get))

        self._print_metrics("Linear", binary_labels, linear_predictions)
        # ============================================= #
        self._print_metrics("Multi-Linear", technique_labels, multi_predictions)

# ========== ----- ========== End ========== ----- ========== #
