In [1]:
from PIL import Image
import torch
from torch.utils.data import Dataset
import os
import random
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim
import timm
import numpy as np


In [2]:
# Chemin vers le dossier principal contenant les données
data_folder_train = './project_data/train'
data_folder_validation = './project_data/val'
# Chemin vers le fichier texte de sortie
train_file = './train.txt'
val_file = "./val.txt"
test_file = "./test.txt"

In [3]:
def file_exist(path) :
    if os.path.exists(path):
        os.remove(path)
        print(f"Le fichier {path} a été supprimé avec succès.")
    else:
        print(f"Le fichier {path} n'existe pas.")

def get_labels_from_path(image_path, base_folder):
    # Obtenez le chemin relatif par rapport au dossier principal
    relative_path = os.path.relpath(image_path, start=base_folder)
    # Séparez le chemin relatif en éléments
    path_elements = os.path.dirname(relative_path).split(os.path.sep)
    label = path_elements[0]
    if label == "FakeManipulation-1" or label == "FakeManipulation-2" :
        return "1,2"
    elif label == "FakeManipulation-3" or label == "FakeManipulation-4" :
        return "1,3"
    elif label == "FakeManipulation-5" :
        return "1,4"
    elif label == "Real-1" or label == "Real-2" or label == "Real-3" or label == "Real-4" :
        return "0"
    else :
        print("error")



# Ouvrir le fichier en mode écriture
def create_txt(data_folder, train) :
    if (train) :
        output_txt_file = train_file
        txt_file = open(output_txt_file, 'a')
        # Parcourir les dossiers et sous-dossiers
        for root, dirs, files in os.walk(data_folder):
            for file in files:
                # Vérifier si le fichier est une image (vous pouvez ajuster les extensions selon votre cas)
                if file.endswith('.jpg') :
                    # Chemin complet du fichier
                    image_path = os.path.join(root, file)

                    # Obtenir les labels à partir du chemin du fichier ou de toute autre méthode appropriée
                    labels = get_labels_from_path(image_path, data_folder)

                    # Écrire dans le fichier texte
                    txt_file.write(f'{image_path} {labels}\n')     
                else :
                    print("Error")
        txt_file.close()
    else :
        indice = 0
        output_txt_file_val = val_file
        output_txt_file_test = test_file
        txt_file_val = open(output_txt_file_val, 'a')
        txt_file_test = open(output_txt_file_test, 'a')

        for root, dirs, files in os.walk(data_folder):
            for file in files:
                indice += 1
                # Vérifier si le fichier est une image (vous pouvez ajuster les extensions selon votre cas)
                if file.endswith('.jpg') :
                    # Chemin complet du fichier
                    image_path = os.path.join(root, file)

                    # Obtenir les labels à partir du chemin du fichier ou de toute autre méthode appropriée
                    labels = get_labels_from_path(image_path, data_folder)
                    # Écrire dans le fichier texte
                    if indice % 2 == 0 :
                        txt_file_val.write(f'{image_path} {labels}\n')     
                    else :               
                        txt_file_test.write(f'{image_path} {labels}\n')     
                else :
                    print("Error")
        txt_file_val.close()
        txt_file_test.close()


file_exist(train_file)
file_exist(val_file)
file_exist(test_file)

create_txt(data_folder_train, 1)
create_txt(data_folder_validation, 0)

Le fichier ./train.txt a été supprimé avec succès.
Le fichier ./val.txt a été supprimé avec succès.
Le fichier ./test.txt a été supprimé avec succès.


In [4]:
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

# Définir les transformations
transform_augmented = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomRotation(degrees=15),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])



In [41]:
list_of_sub_labels = ["Identity", "Expression", "Head Pose"]

from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image

class MyDataset(Dataset):
    def __init__(self, txt_path, transform=None, target_transform=None):
        fh = open(txt_path, 'r', encoding="latin-1")
        imgs = []
        for line in fh:
            line = line.rstrip()
            words = line.split(sep=' ')
            img_path = words[0]
            labels_str = words[1]

            # Séparer les labels principaux et les sous-labels (s'ils existent)
            label_parts = labels_str.split(',')
            main_label = int(label_parts[0])  # Label principal

            # Si des sous-labels sont présents, convertir en liste d'entiers
            sub_labels = list(map(int, label_parts[1:])) if len(label_parts) > 1 else []

            imgs.append((img_path, main_label, sub_labels))
        self.imgs = imgs
        self.transform = transform
        self.target_transform = target_transform
        fh.close()

    def get_label_from_path(self, path):
        # Fonction pour extraire le label à partir du chemin du fichier
        # Exemple de structure du texte : "image_1.jpg 1,3,5" (1 pour Fake, 3 et 5 pour des sous-labels)
        with open(path, 'r', encoding="latin-1") as file:
            line = file.readline().strip().split(' ')
            
            # Vérifier si la ligne contient au moins deux éléments
            if len(line) >= 2:
                label_list = list(map(int, line[1].split(',')))
                return label_list
            else:
                # Gérer le cas où la ligne ne contient pas de sous-labels
                return []


    def encode_labels(self, label_list):
        # Fonction pour encoder les labels en un vecteur de 0 et 1
        # Exemple : [0, 1, 4] -> [1, 1, 0, 0, 1]
        # Ici, nous supposons qu'il y a 5 classes au total (0, 1, 2, 3, 4)
        encoded_label = [1 if i in label_list else 0 for i in range(5)]
        return encoded_label
    

    

    def __getitem__(self, index):
        item = self.imgs[index]
        print(item)
        if len(item) == 2:
            fn, _ = item
            sub_labels = []  # Aucun sous-label
        elif len(item) == 3:
            fn, _, it = item
            sub_labels = it  # Sous-labels présents
        else:
            raise ValueError(f"Expected a tuple with 2 or 3 elements, got {len(item)} elements.")

        print(f"Filename: {fn}, Main Label: {_}, Sub Labels: {sub_labels}")


        img = Image.open(fn).convert('RGB')

        label_list = _ + sub_labels


        # Utiliser la fonction d'encodage pour obtenir la représentation numérique du label
        encoded_label = self.encode_labels(label_list)
        print(enc)

        if self.transform is not None:
            img = self.transform(img)

        return img, encoded_label


    

    def __len__(self):
        return len(self.imgs)

In [42]:


# Créer des ensembles de données
train_data = MyDataset(txt_path=train_file, transform=transform_augmented)
val_data = MyDataset(txt_path=val_file, transform=transform)
test_data = MyDataset(txt_path=test_file, transform=transform)

# Créer des chargeurs de données
batch_size = 32
train_loader = DataLoader(dataset=train_data, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(dataset=val_data, batch_size=32, shuffle=True, num_workers=4)
test_loader = DataLoader(dataset=test_data, batch_size=32, shuffle=True, num_workers=4)

#Vérifier si la data augmentation fonctionne
print(len(train_data))
print(len(train_loader.dataset))

total_samples_after_augmentation = len(train_loader) * batch_size
print("Nombre total d'échantillons après augmentation :", total_samples_after_augmentation)





49061
49061
Nombre total d'échantillons après augmentation : 49088


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Afficher quelques images avant et après l'augmentation
num_samples_to_display = 4

# Obtenir quelques échantillons du DataLoader
sample_images, _ = next(iter(train_loader))

# Afficher les images originales
fig, axs = plt.subplots(2, num_samples_to_display, figsize=(12, 4))
for i in range(num_samples_to_display):
    axs[0, i].imshow(np.transpose(sample_images[i].numpy(), (1, 2, 0)))
    axs[0, i].axis('off')

plt.show()


In [20]:
import timm



# Charger EfficientNet B3 préentraîné
model = timm.create_model('efficientnet_b3', pretrained=True)

# Remplacer la dernière couche pour s'adapter à la classification multilabel
num_ftrs = model.classifier.in_features
num_sub_labels = len(list_of_sub_labels)  # Remplacez par la liste réelle de sous-labels
model.classifier = nn.Sequential(
    nn.Linear(num_ftrs, num_sub_labels),
    nn.Sigmoid()  # Utilisez la fonction d'activation Sigmoid pour la classification multilabel
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.cuda.empty_cache()
model = model.to(device)



In [8]:
from tqdm import tqdm

In [21]:
print('criterion')
criterion = nn.BCELoss()
print('optimizer')
optimizer = optim.Adam(model.parameters(), lr=0.001)

criterion
optimizer


In [44]:
# Fonction d'entraînement
def train_epoch(model, loader, criterion, optimizer):
    print('train')
    with torch.autograd.profiler.profile(use_cuda=True) as prof:
        model.train()
    running_loss = 0.0

    with tqdm(total=len(loader), desc='Training') as pbar:
        for inputs, labels in loader:
            print('starting')
            inputs, labels = inputs.to(device), labels.to(device)
            print('opti')
            optimizer.zero_grad()
            print('output')
            outputs = model(inputs)
            print('loss')
            loss = criterion(outputs, labels.float())
            print('backward')
            loss.backward()
            print('optimiser')
            optimizer.step()
            print('running loss')
            running_loss += loss.item()
            pbar.set_postfix({'Loss': running_loss / (pbar.n + 1)})
            pbar.update(1)

    return running_loss / len(loader)

# Fonction de validation
def validate(model, loader, criterion):
    model.eval()
    running_loss = 0.0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels.float())
            running_loss += loss.item()

    return running_loss / len(loader)

# Entraînement sur plusieurs époques
num_epochs = 2

for epoch in range(num_epochs):
    print('first epoch')
    # Entraînement
    train_loss = train_epoch(model, train_loader, criterion, optimizer)
    print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}')

    # Validation
    val_loss = validate(model, val_loader, criterion)
    print(f'Epoch {epoch + 1}/{num_epochs}, Validation Loss: {val_loss:.4f}')

# Enregistrement du modèle après l'entraînement
torch.save(model.state_dict(), 'efficientnet_b3_model.pth')


STAGE:2023-12-09 17:02:27 72960:72960 ActivityProfilerController.cpp:312] Completed Stage: Warm Up
STAGE:2023-12-09 17:02:27 72960:72960 ActivityProfilerController.cpp:318] Completed Stage: Collection
STAGE:2023-12-09 17:02:27 72960:72960 ActivityProfilerController.cpp:322] Completed Stage: Post Processing


first epoch
train


Training:   0%|          | 0/1534 [00:00<?, ?it/s]

('./project_data/train/Real-4/val_perturb_release/12/56131b6bf0458b312aff71cf427d451f/frame00056.jpg', 0, [])
('./project_data/train/FakeManipulation-5/1f388749295cf57ac1226ebee0687f5d/6a384ab62e27fc525476381369dcebcc/frame00006.jpg', 1, [4])Filename: ./project_data/train/Real-4/val_perturb_release/12/56131b6bf0458b312aff71cf427d451f/frame00056.jpg, Main Label: 0, Sub Labels: []('./project_data/train/Real-2/val_perturb_release/10/8edbae6e898509d71b3bfd0375d8bd33/frame00028.jpg', 0, [])
('./project_data/train/FakeManipulation-2/1787aa3248c0e61ac024b66c2e3c213d/5dfd71ad10ec10f34915421949cc2c99/frame00101.jpg', 1, [2])

Filename: ./project_data/train/Real-2/val_perturb_release/10/8edbae6e898509d71b3bfd0375d8bd33/frame00028.jpg, Main Label: 0, Sub Labels: []Filename: ./project_data/train/FakeManipulation-5/1f388749295cf57ac1226ebee0687f5d/6a384ab62e27fc525476381369dcebcc/frame00006.jpg, Main Label: 1, Sub Labels: [4]
Filename: ./project_data/train/FakeManipulation-2/1787aa3248c0e61ac024b66

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt
from torchvision import transforms


# Charger le modèle entraîné
model.load_state_dict(torch.load('efficientnet_b3_model.pth'))
model.eval()

# Définir la fonction de perte
criterion = nn.BCEWithLogitsLoss()

# Définir les transformations pour le test set
transform_test = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Charger le test set
test_dataset = MyDataset(txt_path="path/to/your/test_dataset.txt", transform=transform_test)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# Variables pour stocker les prédictions et les vraies étiquettes
all_predictions = []
all_labels = []

# Fonction pour évaluer le modèle
def evaluate(model, loader, criterion):
    model.eval()
    running_loss = 0.0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels.float())
            running_loss += loss.item()

            # Ajouter les prédictions et les étiquettes au fur et à mesure
            all_predictions.append(outputs.sigmoid().cpu().numpy())
            all_labels.append(labels.cpu().numpy())

    return running_loss / len(loader)

# Évaluation sur le test set
test_loss = evaluate(model, test_loader, criterion)

# Convertir les listes en tableaux numpy
all_predictions = np.vstack(all_predictions)
all_labels = np.vstack(all_labels)

# Appliquer un seuil de décision pour convertir les probabilités en prédictions binaires
threshold = 0.5
binary_predictions = (all_predictions > threshold).astype(int)

# Calcul des différentes métriques
accuracy = accuracy_score(all_labels, binary_predictions)
precision = precision_score(all_labels, binary_predictions, average='macro')
recall = recall_score(all_labels, binary_predictions, average='macro')
roc_auc = roc_auc_score(all_labels, all_predictions, average='macro')

# Afficher les résultats
print(f'Test Loss: {test_loss:.4f}')
print(f'Accuracy: {accuracy:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'ROC AUC: {roc_auc:.4f}')

# Courbe ROC
fpr, tpr, _ = roc_curve(all_labels.flatten(), all_predictions.flatten())
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(8, 8))
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = {:.2f})'.format(roc_auc))
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC)')
plt.legend(loc='lower right')
plt.show()


In [None]:
import torch
import torch.nn as nn
import torchvision
from torch.utils.data import DataLoader
import torch.optim as optim
from torch.optim import lr_scheduler
import argparse
import os
import cv2

from network.models import model_selection
from network.mesonet import Meso4, MesoInception4
from dataset.transform import xception_default_data_transforms
from dataset.mydataset import MyDataset
def main():
	args = parse.parse_args()
	name = args.name
	continue_train = args.continue_train
	train_list = args.train_list
	val_list = args.val_list
	epoches = args.epoches
	batch_size = args.batch_size
	model_name = args.model_name
	model_path = args.model_path
	output_path = os.path.join('./output', name)
	if not os.path.exists(output_path):
		os.mkdir(output_path)
	torch.backends.cudnn.benchmark=True
	train_dataset = MyDataset(txt_path=train_list, transform=xception_default_data_transforms['train'])
	val_dataset = MyDataset(txt_path=val_list, transform=xception_default_data_transforms['val'])
	train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=False, num_workers=8)
	val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=True, drop_last=False, num_workers=8)
	train_dataset_size = len(train_dataset)
	val_dataset_size = len(val_dataset)
	model = model_selection(modelname='xception', num_out_classes=2, dropout=0.5)
	if continue_train:
		model.load_state_dict(torch.load(model_path))
	model = model.cuda()
	criterion = nn.CrossEntropyLoss()
	optimizer = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08)
	scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)
	model = nn.DataParallel(model)
	best_model_wts = model.state_dict()
	best_acc = 0.0
	iteration = 0
	for epoch in range(epoches):
		print('Epoch {}/{}'.format(epoch+1, epoches))
		print('-'*10)
		model.train()
		train_loss = 0.0
		train_corrects = 0.0
		val_loss = 0.0
		val_corrects = 0.0
		for (image, labels) in train_loader:
			iter_loss = 0.0
			iter_corrects = 0.0
			image = image.cuda()
			labels = labels.cuda()
			optimizer.zero_grad()
			outputs = model(image)
			_, preds = torch.max(outputs.data, 1)
			loss = criterion(outputs, labels)
			loss.backward()
			optimizer.step()
			iter_loss = loss.data.item()
			train_loss += iter_loss
			iter_corrects = torch.sum(preds == labels.data).to(torch.float32)
			train_corrects += iter_corrects
			iteration += 1
			if not (iteration % 20):
				print('iteration {} train loss: {:.4f} Acc: {:.4f}'.format(iteration, iter_loss / batch_size, iter_corrects / batch_size))
		epoch_loss = train_loss / train_dataset_size
		epoch_acc = train_corrects / train_dataset_size
		print('epoch train loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))

		model.eval()
		with torch.no_grad():
			for (image, labels) in val_loader:
				image = image.cuda()
				labels = labels.cuda()
				outputs = model(image)
				_, preds = torch.max(outputs.data, 1)
				loss = criterion(outputs, labels)
				val_loss += loss.data.item()
				val_corrects += torch.sum(preds == labels.data).to(torch.float32)
			epoch_loss = val_loss / val_dataset_size
			epoch_acc = val_corrects / val_dataset_size
			print('epoch val loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))
			if epoch_acc > best_acc:
				best_acc = epoch_acc
				best_model_wts = model.state_dict()
		scheduler.step()
		#if not (epoch % 40):
		torch.save(model.module.state_dict(), os.path.join(output_path, str(epoch) + '_' + model_name))
	print('Best val Acc: {:.4f}'.format(best_acc))
	model.load_state_dict(best_model_wts)
	torch.save(model.module.state_dict(), os.path.join(output_path, "best.pkl"))




if __name__ == '__main__':
	parse = argparse.ArgumentParser(
		formatter_class=argparse.ArgumentDefaultsHelpFormatter)
	parse.add_argument('--name', '-n', type=str, default='fs_xception_c0_299')
	parse.add_argument('--train_list', '-tl' , type=str, default = './data_list/FaceSwap_c0_train.txt')
	parse.add_argument('--val_list', '-vl' , type=str, default = './data_list/FaceSwap_c0_val.txt')
	parse.add_argument('--batch_size', '-bz', type=int, default=64)
	parse.add_argument('--epoches', '-e', type=int, default='20')
	parse.add_argument('--model_name', '-mn', type=str, default='fs_c0_299.pkl')
	parse.add_argument('--continue_train', type=bool, default=False)
	parse.add_argument('--model_path', '-mp', type=str, default='./output/df_xception_c0_299/1_df_c0_299.pkl')
	main()

In [None]:
import torch
import torch.nn as nn
import torchvision
from torch.utils.data import DataLoader
import torch.optim as optim
from torch.optim import lr_scheduler
import argparse
import os
import cv2
from network.models import model_selection
from dataset.transform import xception_default_data_transforms
from dataset.mydataset import MyDataset
def main():
	args = parse.parse_args()
	test_list = args.test_list
	batch_size = args.batch_size
	model_path = args.model_path
	torch.backends.cudnn.benchmark=True
	test_dataset = MyDataset(txt_path=test_list, transform=xception_default_data_transforms['test'])
	test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True, drop_last=True, num_workers=8)
	test_dataset_size = len(test_dataset)
	corrects = 0
	acc = 0
	#model = torchvision.models.densenet121(num_classes=2)
	model = model_selection(modelname='xception', num_out_classes=2, dropout=0.5)
	model.load_state_dict(torch.load(model_path))
	if isinstance(model, torch.nn.DataParallel):
		model = model.module
	model = model.cuda()
	model.eval()
	with torch.no_grad():
		for (image, labels) in test_loader:
			image = image.cuda()
			labels = labels.cuda()
			outputs = model(image)
			_, preds = torch.max(outputs.data, 1)
			corrects += torch.sum(preds == labels.data).to(torch.float32)
			print('Iteration Acc {:.4f}'.format(torch.sum(preds == labels.data).to(torch.float32)/batch_size))
		acc = corrects / test_dataset_size
		print('Test Acc: {:.4f}'.format(acc))



if __name__ == '__main__':
	parse = argparse.ArgumentParser(
		formatter_class=argparse.ArgumentDefaultsHelpFormatter)
	parse.add_argument('--batch_size', '-bz', type=int, default=32)
	parse.add_argument('--test_list', '-tl', type=str, default='./data_list/Deepfakes_c0_test.txt')
	parse.add_argument('--model_path', '-mp', type=str, default='./pretrained_model/df_c0_best.pkl')
	main()
	print('Hello world!!!')

In [None]:
def test(model, data_path):
    Accuracy, Recall, Precision, AUC = 0, 0, 0
    """
    You need to finish this function.
    """
    return Accuracy, Recall, Precision, AUC