# Testing Code

In [2]:
import os 
import json 
from PIL import Image 
import matplotlib.pyplot as plt 
import numpy as np 
import random 
import shutil 

import torch 
import torch.nn as nn 
import torch.nn.functional as F 
from torch import optim 
import torchvision 
import torchmetrics
from torchvision import models 
from torchvision import transforms 
from torch.utils.data import Dataset 
from torch.utils.data import dataloader 

In [3]:
# define training and testing dataset class
class DFD_dataset(Dataset):
    """Dataset of (image, label) pairs built from two parallel lists.

    Args:
        img_path_list: indexable sequence of image file paths.
        true_class_list: indexable sequence of labels; entry ``i`` is the
            label returned together with ``img_path_list[i]``.
        transforms: optional callable applied to the loaded image before it
            is returned.
    """

    def __init__(self, img_path_list, true_class_list, transforms=None):
        # Pair every path with the label stored at the same position.
        self.imgs = [
            (img_path_list[i], true_class_list[i])
            for i in range(len(img_path_list))
        ]
        self.transforms = transforms

    def __len__(self):
        """Return the number of (path, label) pairs."""
        return len(self.imgs)

    def __getitem__(self, index):
        """Load the image at ``index`` and return ``(image, label)``."""
        img_path, label = self.imgs[index]
        # Decode inside a context manager so the file handle is released as
        # soon as the pixels are read, and force three channels: the
        # Normalize step in this notebook is given 3-value mean / std lists,
        # so grayscale, palette or RGBA files must be converted first.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert('RGB')
        if self.transforms:
            img = self.transforms(img)
        return img, label


In [None]:
# define transforms
# preprocessing pipeline handed to the test dataset: resize, centre crop,
# tensor conversion, then per-channel normalisation with the constants below
test_tranform = transforms.Compose(
    [
        transforms.Resize(299),
        transforms.CenterCrop(299),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

In [None]:
def test(img_path_list, true_class_list):
    """Evaluate the saved classifier on a list of images.

    Builds an Inception v3 network with a 2-output final layer, loads the
    weights stored in ``./best.pt`` onto the CPU and classifies every image.
    A truthy label / prediction is counted as the positive class.

    Args:
        img_path_list: sequence of image file paths.
        true_class_list: sequence of ground-truth labels, parallel to
            ``img_path_list``.

    Returns:
        tuple ``(acc, recall, precision)``. A metric whose denominator is
        zero (no samples, no positive labels, or no positive predictions)
        is reported as ``0.0`` instead of raising ZeroDivisionError.
    """
    def _ratio(numerator, denominator):
        # Guard against empty inputs / classes that never occur.
        return numerator / denominator if denominator else 0.0

    # init dataloader
    test_data = DFD_dataset(img_path_list=img_path_list, true_class_list=true_class_list, transforms=test_tranform)
    testloader = dataloader.DataLoader(test_data, batch_size=64, shuffle=False)

    # load model
    # please keep saved model file in the same directory with this .ipynb file
    device = torch.device('cpu')
    # NOTE(review): pretrained=True fetches ImageNet weights that are then
    # overwritten by best.pt; it also appears to influence torchvision's
    # default for transform_input, so confirm before switching it off.
    network = models.inception_v3(pretrained = True)
    num_fc_in = network.fc.in_features
    network.fc = nn.Linear(num_fc_in, 2)
    network.load_state_dict(torch.load('./best.pt', map_location=device))

    # accumulate the confusion matrix one batch at a time
    network.eval()
    TP = TN = FP = FN = 0
    with torch.no_grad():
        for images, labels in testloader:
            # argmax is taken once per batch rather than once per sample
            predicted = network(images).argmax(1).bool()
            actual = labels.bool()
            TP += int((predicted & actual).sum())
            FN += int((~predicted & actual).sum())
            FP += int((predicted & ~actual).sum())
            TN += int((~predicted & ~actual).sum())

    acc = _ratio(TP + TN, TP + TN + FN + FP)
    recall = _ratio(TP, TP + FN)
    precision = _ratio(TP, TP + FP)
    return acc, recall, precision


In [None]:
# Main Code
# image paths and their matching labels; both lists start out empty here
img_path_list, true_class_list = [], []

acc, recall, precision = test(img_path_list, true_class_list)
# one metric per line: accuracy, recall, precision
for metric in (acc, recall, precision):
    print(metric)

# Training Code

In [None]:
# necessary dependencies
import os 
from PIL import Image 
import matplotlib.pyplot as plt 
import numpy as np 
import random 
import shutil 

import torch 
import torch.nn as nn 
import torch.nn.functional as F 
from torch import optim 
from torch.optim import lr_scheduler 
import torchvision 
from torchvision import models 
from torchvision import transforms 
from torch.utils.data import Dataset 
from torch.utils.data import dataloader 
from torch.utils.tensorboard import SummaryWriter 

- InceptionV3, 8:2 random train/validation split

In [None]:
# need to keep this .ipynb file in the same directory as the images folder
# divide the images provided into training and validation set (8:2)
def split_class_folder(image_folder, seed, train_dir, val_dir,
                       train_start=0, val_start=0,
                       population=4000, val_count=800):
    """Copy one source folder into a train and a validation folder.

    Files are expected to be named ``<integer>.<ext>``. ``val_count``
    distinct integers are drawn from ``range(population)`` after seeding
    ``random`` with ``seed``; a file whose integer was drawn is copied to
    ``val_dir``, every other file to ``train_dir``. Copies are renamed
    ``<counter>.png`` with counters continuing from ``train_start`` /
    ``val_start`` so several source folders can share one destination.

    Args:
        image_folder: directory containing the source images.
        seed: seed passed to ``random.seed`` (this reseeds the global RNG).
        train_dir: destination directory for training images.
        val_dir: destination directory for validation images.
        train_start: first number used for files written to ``train_dir``.
        val_start: first number used for files written to ``val_dir``.
        population: size of the index range the validation draw is made from.
        val_count: how many indices are drawn for validation.

    Returns:
        tuple ``(train_next, val_next)``: the counters after the last copy.

    Raises:
        ValueError: if a file name does not start with an integer.
    """
    # force pseudorandom split
    random.seed(seed)
    # a set keeps the per-file membership test constant-time
    val_index = set(random.sample(range(0, population), val_count))
    train_num, val_num = train_start, val_start
    # sorted() makes the output numbering independent of directory order
    for imgfile in sorted(os.listdir(image_folder)):
        srcpath = os.path.join(image_folder, imgfile)
        index = int(imgfile.split('.')[0])
        if index in val_index:
            dstpath = os.path.join(val_dir, str(val_num) + '.png')
            val_num += 1
        else:
            dstpath = os.path.join(train_dir, str(train_num) + '.png')
            train_num += 1
        shutil.copyfile(srcpath, dstpath)
    return train_num, val_num


# create directories
current_path = os.getcwd()
image_path = current_path + '/images/'

train_path = current_path + '/train/'
val_path = current_path + '/val/'

train_fake_path = train_path + 'fake/'
train_real_path = train_path + 'real/'

val_fake_path = val_path + 'fake/'
val_real_path = val_path + 'real/'

# creating the leaf folders also creates train/ and val/ themselves
for folder in (train_fake_path, train_real_path, val_fake_path, val_real_path):
    os.makedirs(folder, exist_ok=True)

# distribute 12000 images into different folders
train_fake_num = 0
train_real_num = 0

val_fake_num = 0
val_real_num = 0

test_fake_num = 0
test_real_num = 0

# source folder name -> seed used for its validation draw
split_seeds = {'fake_deepfake': 4487, 'fake_face2face': 4486, 'real': 4485}

# loop through images folder
for rootpath, dirnames, filenames in os.walk(image_path):
    # sorted so both fake folders are always processed in the same order
    for dirname in sorted(dirnames):
        if dirname not in split_seeds:
            continue
        # os.path.join is correct whether or not rootpath ends with a separator
        image_folder = os.path.join(rootpath, dirname)
        if dirname == 'real':
            train_real_num, val_real_num = split_class_folder(
                image_folder, split_seeds[dirname],
                train_real_path, val_real_path,
                train_real_num, val_real_num)
        else:
            # both fake folders share one pair of counters and destinations
            train_fake_num, val_fake_num = split_class_folder(
                image_folder, split_seeds[dirname],
                train_fake_path, val_fake_path,
                train_fake_num, val_fake_num)
        print('done')


In [None]:
# define training and validation dataset class
class DFD_dataset(Dataset):
    """Dataset over a folder that contains ``real`` and ``fake`` sub-folders.

    Images are looked up by name as ``<n>.png``. Indices below the number of
    entries in ``real`` map to real images (label 1); the remaining indices
    map to fake images (label 0).

    Args:
        img_path: directory holding the ``real`` and ``fake`` sub-folders.
        transforms: optional callable applied to each loaded image.
    """

    def __init__(self, img_path, transforms=None):
        self.transforms = transforms
        self.img_path = img_path
        # os.path.join works with or without a trailing separator on img_path
        self.real_dir = os.path.join(img_path, 'real')
        self.fake_dir = os.path.join(img_path, 'fake')
        # NOTE(review): every directory entry is counted, so a stray
        # non-image file would inflate the length -- confirm folders are clean.
        self.real_num = len(os.listdir(self.real_dir))
        self.fake_num = len(os.listdir(self.fake_dir))

    def __len__(self):
        """Return the total number of real plus fake entries."""
        return self.real_num + self.fake_num

    def __getitem__(self, index):
        """Return ``(image, label)`` for ``index``.

        Raises:
            IndexError: if ``index`` is outside ``[0, len(self))``, so that
                plain iteration over the dataset terminates normally.
        """
        if not 0 <= index < len(self):
            raise IndexError('index {} is out of range'.format(index))

        if index < self.real_num:
            label = 1
            file_path = os.path.join(self.real_dir, str(index) + '.png')
        else:
            label = 0
            file_path = os.path.join(self.fake_dir, str(index - self.real_num) + '.png')

        # Close the file handle once decoded and force three channels so the
        # 3-value Normalize used in this notebook always applies.
        with Image.open(file_path) as raw_img:
            img = raw_img.convert('RGB')

        if self.transforms:
            img = self.transforms(img)

        return img, label


In [None]:
# define img transformers and create dataLoaders
# validation pipeline: resize, centre crop, tensor conversion, normalisation
val_tranform = transforms.Compose(
    [
        transforms.Resize(299),
        transforms.CenterCrop(299),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)
# training pipeline: the same steps with a random horizontal flip and a
# random rotation inserted before the tensor conversion
train_tranform = transforms.Compose(
    [
        transforms.Resize(299),
        transforms.CenterCrop(299),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=10, expand=False, fill=None),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

# datasets read from the train / val folders; both loaders shuffle and use
# batches of 60
train_data = DFD_dataset(train_path, transforms=train_tranform)
val_data = DFD_dataset(val_path, transforms=val_tranform)

trainloader = dataloader.DataLoader(train_data, batch_size=60, shuffle=True)
valloader = dataloader.DataLoader(val_data, batch_size=60, shuffle=True)

In [None]:
# run on the GPU when CUDA is available, otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Inception v3 created with pretrained=True
network = models.inception_v3(pretrained=True)
# replace the final fully connected layer with a 2-output one (binary
# classification), keeping the width of its input
num_fc_in = network.fc.in_features
network.fc = nn.Linear(in_features=num_fc_in, out_features=2)
# move the whole model to the selected device
network = network.to(device)

In [None]:
# loss used for the 2-class output
criterion = nn.CrossEntropyLoss()

# base learning rate; the replaced fc layer trains 10x faster than the rest
lr = 0.008 / 10
# ids of the fc parameters, used to separate them from all other parameters
fc_params = [id(param) for param in network.fc.parameters()]
base_params = [param for param in network.parameters() if id(param) not in fc_params]

# two parameter groups: the first inherits the default lr, the second
# overrides it; weight decay (L2 regularization) applies to both
optimizer = optim.Adam(
    [
        {'params': base_params},
        {'params': network.fc.parameters(), 'lr': lr * 10},
    ],
    lr=lr,
    betas=(0.9, 0.999),
    eps=1e-08,
    weight_decay=1e-4,
)

# learning rate decay: multiply by 0.5 every 5 scheduler steps
scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

In [None]:
# define test function to calculate both training and val accuracy
def test(network, loader, optimizer):
    network.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in loader:
            images, labels = data
            images = images.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            outputs = network(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    return round((float(100) * float(correct) / float(total)), 4)


In [None]:
# define this function to save state_dict of each epoch
def save_local(network, rootpath, epoch):
    """Write the network's state_dict to ``<rootpath>/<epoch>.pt``."""
    checkpoint_file = '{}/{}.pt'.format(rootpath, epoch)
    weights = network.state_dict()
    torch.save(weights, checkpoint_file)

In [None]:
# define function to set random seed for each epoch
def set_seed(seed):
    """Seed the Python, NumPy and torch (CPU and CUDA) random generators
    with ``seed`` and turn off cuDNN benchmarking in favour of its
    deterministic mode."""
    # the same seed goes to every generator
    for seeder in (
        random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
        np.random.seed,
    ):
        seeder(seed)

    cudnn_backend = torch.backends.cudnn
    cudnn_backend.benchmark = False
    cudnn_backend.deterministic = True

In [None]:
# training process
epoch_num = 20
# data recorders (one entry per epoch)
training_loss = []
train_acc = []
val_acc = []
test_acc = []
# change accordingly
model_root_path = current_path + '/model_res/'
networkInfo = 'Group02_InceptionV3'
model_path = model_root_path + networkInfo
# make dirs (creating model_path also creates model_root_path)
os.makedirs(model_path, exist_ok=True)

# force pseudorandom to generate one random seed per epoch for reproduce;
# the count follows epoch_num so changing it cannot run past the seed list
# (range(0, 88) limits this to at most 88 epochs)
random.seed(88)
seeds = random.sample(range(0, 88), epoch_num)

# begin training
for epoch in range(epoch_num):
    network.train()
    running_loss = 0.0
    # set random seed for current epoch
    set_seed(seeds[epoch])
    for inputs, labels in trainloader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        # zero the parameter gradients
        optimizer.zero_grad()
        # forward + backward + optimize
        # in train mode the network returns two values; only the first one
        # is used for the loss, the second is ignored
        outputs, aux_outputs = network(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # calculate loss and accuracy
    epoch_loss = running_loss / len(trainloader)
    training_loss.append(epoch_loss)
    train_acc.append(test(network, trainloader, optimizer))
    val_acc.append(test(network, valloader, optimizer))
    # checkpoints are only written from the sixth epoch (epoch index 5) on
    if epoch > 4:
        save_local(network, model_path, epoch)
    # print result of current epoch
    print('Train Epoch: {}\t Loss: {:.6f}'.format(epoch, epoch_loss))
    # step forward the scheduler function
    scheduler.step()

# end training
print('Finished Training')

In [None]:
from scipy.interpolate import make_interp_spline
# plot the training loss v.s. epoch number

# create dirs (creating plot_path also creates plot_root_path)
plot_root_path = current_path + '/viz/'
plot_path = plot_root_path + networkInfo
os.makedirs(plot_path, exist_ok=True)

# plot
# the x axis follows the number of recorded epochs instead of a fixed 20, so
# the figure still works when training ran for a different number of epochs
epoch = list(range(len(training_loss)))
# running mean of the loss up to and including each epoch
training_avg = []
cnt = 0.0
for counter, loss_value in enumerate(training_loss, start=1):
    cnt += loss_value
    training_avg.append(cnt / counter)
plt.xlabel('Epoch')
plt.ylabel('Training Loss')
plt.title('Training Loss for ' + networkInfo)
plt.scatter(epoch, training_loss, c='b', marker='o', s=25)
if len(epoch) >= 4:
    # smooth the running mean with the default (cubic) interpolating
    # spline, which needs at least four points
    epoch_np = np.array(epoch)
    avg_np = np.array(training_avg)
    epoch_new = np.linspace(epoch_np.min(), epoch_np.max(), 300)
    avg_smooth = make_interp_spline(epoch_np, avg_np)(epoch_new)
    plt.plot(epoch_new, avg_smooth, c='r')
else:
    # too few points for the spline: draw the running mean as-is
    plt.plot(epoch, training_avg, c='r')
my_x_ticks = np.arange(0, len(epoch), 1)
plt.xticks(my_x_ticks)
# save plot
plt.savefig(plot_path + '/' + 'training_loss.png', bbox_inches='tight', dpi=300)


In [None]:
# plot the accuracies v.s. epoch number
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Train / Val Accuracy for ' + networkInfo)
plt.plot(epoch, train_acc, marker='o', markersize=5, label='train')
plt.plot(epoch, val_acc, c='orange', marker='o', markersize=5, label='val')
# test_acc is created empty and nothing visible in this notebook appends to
# it; plotting it against a non-empty x axis raises a shape-mismatch
# ValueError, so only draw it when it holds one value per epoch
if test_acc and len(test_acc) == len(epoch):
    plt.plot(epoch, test_acc, c='red', marker='o', markersize=5, label='test')
my_x_ticks = np.arange(0, len(epoch), 1)
plt.xticks(my_x_ticks)
plt.legend(loc='lower right')
# save plot
plt.savefig(plot_path + '/' + 'accuracy.png', bbox_inches='tight', dpi=300)


In [None]:
# report the best validation accuracy and the epoch index it first occurred at
best_val_acc = max(val_acc)
print('The maximum val accuracy is: ', best_val_acc)
print('It occurs when the number of training epoch equals to: ', val_acc.index(best_val_acc))

# keep the recorded curves on disk as .npy files for possible later use
numpy_root_path = current_path + '/numpy/'
numpy_path = numpy_root_path + networkInfo
for folder in (numpy_root_path, numpy_path):
    if not os.path.exists(folder):
        os.makedirs(folder)

for stem, series in (
    ('training_loss', training_loss),
    ('train_acc', train_acc),
    ('val_acc', val_acc),
):
    np.save(numpy_path + '/' + stem + '.npy', np.array(series))