In [None]:
import numpy as np 
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
import torchvision.transforms as T
import random
from PIL import Image
import glob
import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px
import time 
from sklearn.metrics import confusion_matrix, recall_score, precision_score, f1_score, accuracy_score, classification_report

In [None]:
class Dataset:
    """Map-style dataset pairing in-memory images with integer class labels.

    Parameters
    ----------
    x : indexable collection of images
        Stored as-is in ``self.data``; each item is converted with
        ``torchvision.transforms.ToTensor`` when it is accessed.
    y : array-like of int
        One label per image. Stored in ``self.targets`` as a NumPy ``int64``
        array (lists and tuples are accepted as well as arrays).

    Raises
    ------
    ValueError
        If ``x`` and ``y`` do not contain the same number of items.
    """

    def __init__(self, x, y):
        # np.asarray lets callers pass plain Python sequences, not only ndarrays.
        targets = np.asarray(y).astype('int64')
        if len(x) != len(targets):
            raise ValueError(
                "x and y must have the same length, got {} and {}".format(len(x), len(targets)))
        self.data = x
        self.targets = targets

    def __len__(self):
        """Return the number of (image, label) pairs."""
        return len(self.targets)

    def __getitem__(self, index):
        """Return ``(image_tensor, label)`` for the sample at ``index``."""
        # A Compose wrapping a single transform is equivalent to the transform itself.
        return T.ToTensor()(self.data[index]), self.targets[index]

In [None]:
# The images are read by traversing the class sub-directories (REAL and FAKE)
# of each split, and every image is converted to a NumPy array to build the datasets.
classes = ["REAL", "FAKE"]
class_dict = {'REAL': 0, 'FAKE': 1}


def load_split(split_dir):
    """Load all images found under ``split_dir/<class>/`` for each class in ``classes``.

    Parameters
    ----------
    split_dir : str
        Directory containing one sub-directory per class name.

    Returns
    -------
    tuple of np.ndarray
        ``(images, labels)`` where ``labels`` holds the ``class_dict`` index
        of the class directory each image was read from.

    Raises
    ------
    FileNotFoundError
        If no image files are found under ``split_dir``.
    """
    images, labels = [], []
    for c in classes:
        # glob order is filesystem-dependent; sorting keeps sample indices reproducible.
        for p in sorted(glob.glob(split_dir + '/' + c + '/*')):
            # The context manager closes the file handle; np.array copies the
            # pixel data out before the image is closed.
            with Image.open(p) as img:
                # convert('RGB') guarantees the 3 channels the CNN's first layer expects.
                images.append(np.array(img.convert('RGB')))
            labels.append(class_dict[c])
    if not images:
        raise FileNotFoundError("no images found under {}".format(split_dir))
    return np.array(images), np.array(labels)


x, y = load_split('./archive/train')
print("=> {} has been load in training set".format(len(y)))
train_set = Dataset(x, y)

# NOTE(review): assumes the test split lives in ./archive/test with the same
# REAL/FAKE layout as ./archive/train -- confirm against the dataset on disk.
x_test, y_test = load_split('./archive/test')
print("=> {} has been load in testing set".format(len(y_test)))
test_set = Dataset(x_test, y_test)

In [None]:
# The training data is split 80% / 20% into a training set and a validation set.
# The sizes are derived from the dataset length instead of being hard-coded, and the
# generator is seeded so that the same split is produced on every run.
# Note: this cell rebinds `train_set`; re-run the loading cell before re-running this one.
n_total = len(train_set)
n_train = (4 * n_total) // 5
train_set, val_set = random_split(dataset=train_set,
                                  lengths=[n_train, n_total - n_train],
                                  generator=torch.Generator().manual_seed(42))
print('Training Set, Validatoin Set, Testing set：')
print(len(train_set), len(val_set), len(test_set))
print()

In [None]:
# Displays one image for each category.
# The examples are looked up by label (0 = REAL, 1 = FAKE, see class_dict) so that each
# title matches its image. They are taken from `test_set` because `train_set` is a
# Subset after random_split and does not expose `.data`.
example_targets = np.asarray(test_set.targets)
real_index = int(np.flatnonzero(example_targets == 0)[0])
fake_index = int(np.flatnonzero(example_targets == 1)[0])
img0 = test_set.data[real_index]
img1 = test_set.data[fake_index]
fig = plt.figure(figsize=(6, 3))
plt.subplot(1, 2, 1)
plt.imshow(img0)
plt.title('real')
plt.axis('off')
plt.subplot(1, 2, 2)
plt.imshow(img1)
plt.title('fake')
plt.axis('off')
plt.show()

In [None]:
# Training function: the inputs are the number of epochs, the batch size, the loss function, etc.
def train(num_epochs, batch_size, criterion, optimizer, model,
          train_dataset, val_dataset=None, ce=False, verbose=True, type=None):
    """Train ``model`` and record the average loss of every epoch.

    Parameters
    ----------
    num_epochs : int
        Number of passes over ``train_dataset``.
    batch_size : int
        Batch size used for both the training and the validation loader.
    criterion : callable
        Loss function called as ``criterion(predictions, labels)``.
    optimizer : torch.optim.Optimizer
        Optimizer that updates the parameters of ``model``.
    model : torch.nn.Module
        Network to train.
    train_dataset : map-style dataset
        Yields ``(image, label)`` pairs used for optimisation.
    val_dataset : map-style dataset, optional
        If given, the loss on it is measured (without gradient) after each epoch.
    ce : bool
        If True the integer labels are passed to ``criterion`` unchanged;
        otherwise they are one-hot encoded first.
    verbose : bool
        If True, print the losses after every epoch.
    type : str, optional
        ``'linear'`` flattens each image to a vector before the forward pass.

    Returns
    -------
    tuple
        ``(train_error, val_error, exe_time, epoch)``: the per-epoch training
        losses, the per-epoch validation losses (empty when ``val_dataset`` is
        None), the elapsed wall-clock time in seconds and the number of epochs run.
    """

    def _batch_loss(images, labels):
        """Return ``(loss, n)`` for one batch, where ``n`` is the real batch size."""
        # The last batch can be smaller than batch_size, so use its actual size.
        n = images.size(0)
        if type == 'linear':
            y_pre = model(images.view(n, -1))
        else:
            y_pre = model(images)
        if not ce:
            labels_one_hot = torch.zeros(n, y_pre.size(1), dtype=y_pre.dtype, device=labels.device)
            labels_one_hot.scatter_(1, labels.view(-1, 1), 1)
            labels = labels_one_hot
        return criterion(y_pre, labels), n

    train_error = []
    val_error = []
    train_loader = DataLoader(train_dataset, batch_size, shuffle=True)
    val_loader = None
    if val_dataset is not None:
        # No shuffling or dropping: every validation sample is scored exactly once.
        val_loader = DataLoader(val_dataset, batch_size, shuffle=False)
    debut_calcul = time.perf_counter()
    for epoch in range(num_epochs):
        train_epoch_average_loss = 0.0
        model.train()
        for (images, labels) in train_loader:
            loss, n = _batch_loss(images, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Weighting by the real batch size gives the dataset average when
            # the criterion averages over the batch.
            train_epoch_average_loss += loss.item() * n / len(train_dataset)
        train_error.append(train_epoch_average_loss)

        if val_loader is not None:
            val_epoch_average_loss = 0.0
            model.eval()
            with torch.no_grad():
                for (images, labels) in val_loader:
                    loss, n = _batch_loss(images, labels)
                    val_epoch_average_loss += loss.item() * n / len(val_dataset)
            val_error.append(val_epoch_average_loss)

        if verbose:
            if val_loader is not None:
                print("Epoch {}/{} - train loss: {:.4f} - val loss: {:.4f}".format(
                    epoch + 1, num_epochs, train_error[-1], val_error[-1]))
            else:
                print("Epoch {}/{} - train loss: {:.4f}".format(
                    epoch + 1, num_epochs, train_error[-1]))
    exe_time = time.perf_counter() - debut_calcul
    return train_error, val_error, exe_time, len(train_error)
    

In [None]:
# Model evaluation. A DataLoader is used to collect the true and predicted labels, from which
# the confusion matrix, F1 score, precision, recall and accuracy are calculated. Finally the
# confusion matrix and the evaluation results are shown.
def evaluation(model, dataset, batch_size, num_epochs, type=None, show=True):
    """Evaluate ``model`` on ``dataset`` and return its classification scores.

    Parameters
    ----------
    model : torch.nn.Module
        Trained network; it is switched to evaluation mode.
    dataset : map-style dataset
        Yields ``(image, label)`` pairs with labels 0 (real) or 1 (fake).
    batch_size : int
        Batch size of the evaluation loader.
    num_epochs : int
        Unused; kept so existing calls keep working.
    type : str, optional
        ``'linear'`` flattens each image to a vector before the forward pass.
    show : bool
        If True, print the scores and plot the row-normalised confusion matrix.

    Returns
    -------
    tuple of float
        ``(f1, precision, recall)`` computed with scikit-learn's default
        binary averaging.
    """
    # No shuffling and no dropped batch: every sample is scored exactly once.
    val_loader = DataLoader(dataset, batch_size, shuffle=False)
    model.eval()
    y_pred = []
    y_true = []
    with torch.no_grad():
        for (images, labels) in val_loader:
            if type == 'linear':
                # Use the real batch size; the final batch may be smaller.
                y_pre = model(images.view(images.size(0), -1))
            else:
                y_pre = model(images)
            y_pred.extend(torch.argmax(y_pre, dim=1).tolist())
            y_true.extend(labels.tolist())
    class_names = ['real', 'fake']
    # labels=[0, 1] keeps the matrix 2x2 even if one class is absent.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1], normalize='true')
    f_1 = f1_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    acc = accuracy_score(y_true, y_pred)
    if show:
        print("accuracy: {:.4f} - f1: {:.4f} - precision: {:.4f} - recall: {:.4f}".format(
            acc, f_1, precision, recall))
        fig, ax = plt.subplots()
        im = ax.imshow(cm, cmap="viridis", vmin=0., vmax=1.)
        fig.colorbar(im, ax=ax)
        for row in range(cm.shape[0]):
            for col in range(cm.shape[1]):
                # Light text on dark cells and dark text on bright cells.
                ax.text(col, row, "{:.1%}".format(cm[row, col]), ha="center", va="center",
                        fontsize=18, color="white" if cm[row, col] < 0.5 else "black")
        ax.set_ylabel("True Label")
        ax.set_yticks(np.arange(2))
        ax.set_yticklabels(class_names)
        ax.set_xlabel("Predict Label")
        ax.set_xticks(np.arange(2))
        ax.set_xticklabels(class_names)
        ax.set_title("Confusion Matrix")
        plt.show()
    return f_1, precision, recall

In [None]:
# A convolutional neural network (CNN) is used as the classifier. It has four convolution stages.
# The code in this part follows references [2] and [3].
class CNN(nn.Module):
    """Four convolution stages followed by two linear layers with 2 outputs.

    Every stage is Conv2d -> BatchNorm2d -> ReLU. The 256 input features of
    ``mlp1`` correspond to the 64 x 2 x 2 feature map that 32 x 32 inputs
    produce after the four stride-2 stages. The two linear layers are applied
    back to back with no activation in between.
    """

    @staticmethod
    def _stage(in_channels, out_channels, kernel_size, stride, padding):
        """Build one Conv2d -> BatchNorm2d -> ReLU stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels=in_channels,
                      out_channels=out_channels,
                      kernel_size=kernel_size,
                      stride=stride,
                      padding=padding),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )

    def __init__(self):
        super().__init__()
        # Layers are created in the same order and under the same attribute
        # names, so initialisation and state_dict keys are unchanged.
        self.conv1 = self._stage(3, 16, 3, 2, 1)
        self.conv2 = self._stage(16, 32, 3, 2, 1)
        self.conv3 = self._stage(32, 64, 3, 2, 1)
        self.conv4 = self._stage(64, 64, 2, 2, 0)
        self.mlp1 = nn.Linear(256, 100)
        self.mlp2 = nn.Linear(100, 2)

    def forward(self, x):
        """Return the raw two-class scores for a batch of images ``x``."""
        features = x
        for stage in (self.conv1, self.conv2, self.conv3, self.conv4):
            features = stage(features)
        flattened = features.view(features.size(0), -1)
        hidden = self.mlp1(flattened)
        return self.mlp2(hidden)


model = CNN()
print(model)

In [None]:
# Hyperparameters, loss function and model training.
num_epochs = 20
batch_size = 10
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# ce=True: CrossEntropyLoss takes integer class labels directly, so train() must not
# one-hot encode them first (that path is meant for losses that need vector targets).
train_error, val_error, exe_time, epoch = train(num_epochs, batch_size, criterion, optimizer, model,
                                               train_dataset=train_set, val_dataset=val_set, ce=True)

In [None]:
# Plot the error as the epochs increase. Differently coloured lines show the training
# error and the validation error respectively.
plt.figure()
plt.plot(range(1, epoch+1), train_error, label='train error')
plt.plot(range(1, epoch+1), val_error, label='valida error')
plt.legend()
plt.xticks(range(1, epoch+1), rotation=90)
# The title must be set before plt.show(); afterwards it would land on a new, empty figure.
plt.title('Errors with {} Epochs'.format(epoch))
plt.show()

In [None]:
# Score the trained model on the training split, then on the testing split.
# Each call yields three values, unpacked as F1, precision and recall.
print("Evaluation for Training_set: ")
train_scores = evaluation(model=model, dataset=train_set, batch_size=10, num_epochs=20)
f1_train, precision_train, recall_train = train_scores

print("Evaluation for Testing_set: ")
test_scores = evaluation(model=model, dataset=test_set, batch_size=10, num_epochs=20)
f1_test, precision_test, recall_test = test_scores