## XDU SCE 信息与内容安全-虚假人脸检测

给定一个人脸数据集，其中包含1999张真实人脸，1999张虚假人脸。将其中500
张真实人脸和500张虚假人脸作为训练集，其余作为测试集。

根据给定数据集训练训练一个虚假人脸检测器，该检测器本质就是一个二分类器。要求利用Pytorch框架任意设计一种神经网络模型进行分类，分类准确率越高越好(分类准确率和得分不相关)。


In [1]:
"""
    Face_Forensic-master
    Autor: ShenaoW
    Date: May 2nd, 2022
    Model: MesoNet (Best Acc ≈ 65.4%)
           ResNet18 Pretrained (Best Acc ≈ 99%)
           ResNet18 (trained with the given data only) (Best Acc ≈ 97%) 
"""

import os
import csv
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, ConcatDataset, Subset
import torch.optim as optim
from torch.optim import lr_scheduler
import os
from torchvision import models, datasets, transforms
import warnings
warnings.filterwarnings("ignore")
from torch.functional import F

### Label Generation
从原数据生成labels.csv
每行数据为 (png_path, label)
用于构造dataset

In [2]:
def get_pic_path():

    real_list = []
    fake_list = []

    for root, dirs, files in os.walk("data/0_real", topdown=False):
        for file in files:
            real_list.append(file)
    real_list.sort()
    
    for root, dirs, files in os.walk("data/1_fake", topdown=False):
        for file in files:
            fake_list.append(file)
    fake_list.sort()

    return real_list, fake_list


def gen_label():

    real_list, fake_list = get_pic_path()

    real_writer = csv.writer(open("data/real_labels.csv", "w", encoding="utf8"))

    for real_png in real_list:
        real_writer.writerow((real_png, "1"))

    fake_writer = csv.writer(open("data/fake_labels.csv", "w", encoding="utf8"))

    for fake_png in fake_list:
        fake_writer.writerow((fake_png, "0"))
    

# gen_label()

### Transforms

In [3]:
data_transforms = transforms.Compose([
        # transforms.ToPILImage(),
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize([0.5]*3, [0.5]*3)
    ])

### Generate Dataset

#### 通过重写torch.utils.data.Dataset自定义数据集

annotations_file: real_labels.csv/fake_labels.csv

img_dir: "data/0_real"/"data/1_fake"

In [4]:
# import os
# import pandas as pd
# from torchvision.io import read_image

# class FaceDataset(Dataset):

#     def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
#         self.img_labels = pd.read_csv(annotations_file, header=None)
#         self.img_dir = img_dir
#         self.transform = transform
#         self.target_transform = target_transform

#     def __len__(self):
#         return len(self.img_labels)

#     def __getitem__(self, idx):
#         img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
#         image = read_image(img_path)
#         label = self.img_labels.iloc[idx, 1]
#         if self.transform:
#             image = self.transform(image)
#         if self.target_transform:
#             label = self.target_transform(label)
#         return image, label

# real_dataset = FaceDataset("data/real_labels.csv", "data/0_real", transform=data_transforms)
# fake_dataset = FaceDataset("data/fake_labels.csv", "data/1_fake", transform=data_transforms)


#### 通过torchvision.datasets.ImageFolder构造数据集

该方法比重写torch.utils.data.Dataset更简单，标签以文件名自动加载

为了后续方便划分，需要将real_data和fake_data区分，划分得到两个子集

In [5]:
full_dataset = datasets.ImageFolder("data", transform=data_transforms)
real_dataset = Subset(full_dataset, range(1999))
fake_dataset = Subset(full_dataset, range(1999, 3998))


### Split Dataset

train_dataset : real_face和fake_face各400张

dev_dataset : real_face和fake_face各100张 ( 按照题目要求共500张用于训练，train : dev = 8 : 2 )

test_dataset : real_face和fake_face各1000张

In [6]:
assert len(real_dataset)==1999 and len(fake_dataset)==1999

train_size, dev_size, test_size  = int(500*0.8), 500-int(500*0.8), 1499

assert train_size + dev_size + test_size == 1999

train_dataset = ConcatDataset((

    Subset(real_dataset, range(train_size)), 

    Subset(fake_dataset, range(train_size))

    ))

dev_dataset = ConcatDataset((
    
    Subset(real_dataset, range(dev_size)), 

    Subset(fake_dataset, range(dev_size))

    ))

test_dataset = ConcatDataset(( 
    
    Subset(real_dataset, range(test_size)), 

    Subset(fake_dataset, range(test_size))
    
    ))

assert len(train_dataset) == 2*train_size
assert len(dev_dataset) == 2*dev_size
assert len(test_dataset) == 2*test_size

### Load Datasets

In [7]:
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
dev_loader = DataLoader(dev_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=True)

train_dataset_size = len(train_dataset)
dev_dataset_size = len(dev_dataset)
test_dataset_size = len(test_dataset)


### MesoNet

best acc在70%左右，效果不理想，后面改用ResNet


cite：https://github.com/HongguLiu/MesoNet-Pytorch

Autor: Honggu Liu

Date: July 4, 2019

In [8]:
# class Meso4(nn.Module):
# 	"""
# 	Pytorch Implemention of Meso4
# 	Autor: Honggu Liu
# 	Date: July 4, 2019
# 	"""
# 	def __init__(self, num_classes=2)
# 		super(Meso4, self).__init__()
# 		self.num_classes = num_classes
# 		self.conv1 = nn.Conv2d(3, 8, 3, padding=1, bias=False)
# 		self.bn1 = nn.BatchNorm2d(8)
# 		self.relu = nn.ReLU(inplace=True)
# 		self.leakyrelu = nn.LeakyReLU(0.1)

# 		self.conv2 = nn.Conv2d(8, 8, 5, padding=2, bias=False)
# 		self.bn2 = nn.BatchNorm2d(16)
# 		self.conv3 = nn.Conv2d(8, 16, 5, padding=2, bias=False)
# 		self.conv4 = nn.Conv2d(16, 16, 5, padding=2, bias=False)
# 		self.maxpooling1 = nn.MaxPool2d(kernel_size=(2, 2))
# 		self.maxpooling2 = nn.MaxPool2d(kernel_size=(4, 4))
# 		#flatten: x = x.view(x.size(0), -1)
# 		self.dropout = nn.Dropout2d(0.5)
# 		self.fc1 = nn.Linear(16*8*8, 16)
# 		self.fc2 = nn.Linear(16, num_classes)

# 	def forward(self, input):
# 		x = self.conv1(input) #(8, 256, 256)
# 		x = self.relu(x)
# 		x = self.bn1(x)
# 		x = self.maxpooling1(x) #(8, 128, 128)

# 		x = self.conv2(x) #(8, 128, 128)
# 		x = self.relu(x)
# 		x = self.bn1(x)
# 		x = self.maxpooling1(x) #(8, 64, 64)

# 		x = self.conv3(x) #(16, 64, 64)
# 		x = self.relu(x)
# 		x = self.bn2(x)
# 		x = self.maxpooling1(x) #(16, 32, 32)

# 		x = self.conv4(x) #(16, 32, 32)
# 		x = self.relu(x)
# 		x = self.bn2(x)
# 		x = self.maxpooling2(x) #(16, 8, 8)

# 		x = x.view(x.size(0), -1) #(Batch, 16*8*8)
# 		x = self.dropout(x)
# 		x = self.fc1(x) #(Batch, 16)
# 		x = self.leakyrelu(x)
# 		x = self.dropout(x)
# 		x = self.fc2(x)

# 		return x

### ResNet Pretrained

In [9]:
class ResNetPretrained(nn.Module):
    def __init__(self):
        super(ResNetPretrained, self).__init__()
        resnet = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
        num_ftrs = resnet.fc.in_features
        resnet.fc = nn.Linear(num_ftrs, 2)
        self.model = resnet

    def forward(self, x):
        x = self.model(x)
        return F.log_softmax(x, dim=1)


### Model Train

In [10]:
epoches = 50
batch_size = 32
early_stop = 5
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_name = "net.pkl"
model_path = "./output/best.pkl"
output_path = "./output"

if not os.path.exists(output_path):
    os.mkdir(output_path)

torch.backends.cudnn.benchmark=True

# model = Meso4()
# model = ResNetPretrained()
model = models.resnet18()

model = model.to(device)
criterion = nn.CrossEntropyLoss()
# optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=0.001)
optimizer = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08)
scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

best_model_wts = model.state_dict()
best_acc = 0.0
iteration = 0
early_stop_flag = 0

for epoch in range(epoches):
    print('Epoch {}/{}'.format(epoch+1, epoches))
    print('-'*10)

    # 模型训练
    model=model.train()
    train_loss = 0.0
    train_corrects = 0.0
    val_loss = 0.0
    val_corrects = 0.0

    for (image, labels) in train_loader:

        iter_loss = 0.0
        iter_corrects = 0.0
        image = image.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(image)
        _, preds = torch.max(outputs.data, 1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        iter_loss = loss.data.item()
        train_loss += iter_loss
        iter_corrects = torch.sum(preds == labels.data).to(torch.float32)
        train_corrects += iter_corrects
        iteration += 1
        # if not (iteration % 20):
            # print('iteration {} train loss: {:.4f} Acc: {:.4f}'.format(iteration, iter_loss / batch_size, iter_corrects / batch_size))
    
    epoch_loss = train_loss / train_dataset_size
    epoch_acc = train_corrects / train_dataset_size
    print('epoch train loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))

    # 模型评估
    model.eval()
    with torch.no_grad():

        for (image, labels) in dev_loader:

            image = image.to(device)
            labels = labels.to(device)
            outputs = model(image)
            _, preds = torch.max(outputs.data, 1)
            loss = criterion(outputs, labels)
            val_loss += loss.data.item()
            val_corrects += torch.sum(preds == labels.data).to(torch.float32)
            
        epoch_loss = val_loss / dev_dataset_size
        epoch_acc = val_corrects / dev_dataset_size
        print('epoch dev loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))
        
        if epoch_acc > best_acc:
            best_acc = epoch_acc
            best_model_wts = model.state_dict()
            early_stop_flag = 0
        else:
            early_stop_flag += 1 

    scheduler.step()

    # 每10个epoch保存模型的中间状态
    # if not (epoch % 10):
    #     torch.save(model.state_dict(), os.path.join(output_path, str(epoch) + '_' + model_name))
    
    if early_stop_flag >= early_stop:
        break
    
print('Best dev Acc: {:.4f}'.format(best_acc))
model.load_state_dict(best_model_wts)

torch.save(model.state_dict(), os.path.join(output_path, "best.pkl"))


Epoch 1/50
----------
epoch train loss: 0.0384 Acc: 0.5138
epoch dev loss: 0.0268 Acc: 0.5200
Epoch 2/50
----------
epoch train loss: 0.0107 Acc: 0.6388
epoch dev loss: 0.0155 Acc: 0.6100
Epoch 3/50
----------
epoch train loss: 0.0095 Acc: 0.6913
epoch dev loss: 0.0153 Acc: 0.5600
Epoch 4/50
----------
epoch train loss: 0.0093 Acc: 0.7100
epoch dev loss: 0.0130 Acc: 0.6250
Epoch 5/50
----------
epoch train loss: 0.0087 Acc: 0.7425
epoch dev loss: 0.0116 Acc: 0.6450
Epoch 6/50
----------
epoch train loss: 0.0081 Acc: 0.7862
epoch dev loss: 0.0142 Acc: 0.6500
Epoch 7/50
----------
epoch train loss: 0.0077 Acc: 0.7750
epoch dev loss: 0.0113 Acc: 0.6750
Epoch 8/50
----------
epoch train loss: 0.0074 Acc: 0.7887
epoch dev loss: 0.0276 Acc: 0.6100
Epoch 9/50
----------
epoch train loss: 0.0070 Acc: 0.7925
epoch dev loss: 0.0156 Acc: 0.6850
Epoch 10/50
----------
epoch train loss: 0.0068 Acc: 0.8288
epoch dev loss: 0.0218 Acc: 0.5200
Epoch 11/50
----------
epoch train loss: 0.0062 Acc: 0.8275

### Model Test

In [13]:
model_path  = "./output/best.pkl"

corrects = 0
acc = 0
# model = Meso4()
model = models.resnet18()

model.load_state_dict(torch.load(model_path))

if isinstance(model, torch.nn.DataParallel):
    model = model.module

model = model.to(device)
model.eval()

with torch.no_grad():

    for (image, labels) in test_loader:

        image = image.to(device)
        labels = labels.to(device)
        outputs = model(image)
        _, preds = torch.max(outputs.data, 1)
        corrects += torch.sum(preds == labels.data).to(torch.float32)

    acc = corrects / test_dataset_size
    
    print('Test Acc: {:.4f}'.format(acc))

Test Acc: 0.9013
