## Download the data from Kaggle

In [None]:
# Delete the dataset folders and the "processed" output folder left over from
# a previous run, so the cells below start from a clean working directory.
!rm -rf real_and_fake_face real_and_fake_face_detection processed

In [None]:
from google.colab import drive
from subprocess import Popen, PIPE
# makes files from your drive accessible
drive.mount('/content/drive', force_remount=True)

# TODO - specify path to your API key via google drive
api_key_filepath = "/content/drive/MyDrive/MDST/RvF/kaggle.json"


# Kaggle API Key setup ------------------
cmd = "mkdir /root/.kaggle"
process = Popen(cmd.split(), stdout=PIPE, stderr=PIPE)
stdout, stderr = process.communicate()
print(stdout.decode("utf-8"), stderr.decode("utf-8"))
cmd = f"cp -f {api_key_filepath} /root/.kaggle/"
process = Popen(cmd.split(), stdout=PIPE, stderr=PIPE)
stdout, stderr = process.communicate()
print(stdout.decode("utf-8"), stderr.decode("utf-8"))
cmd = f"chmod 600 /root/.kaggle/kaggle.json"
process = Popen(cmd.split(), stdout=PIPE, stderr=PIPE)
print(stdout.decode("utf-8"), stderr.decode("utf-8"))
# ------------------------------
!kaggle datasets download -d ciplab/real-and-fake-face-detection
!unzip -q real-and-fake-face-detection.zip

Mounted at /content/drive
 mkdir: cannot create directory ‘/root/.kaggle’: File exists

 
 
real-and-fake-face-detection.zip: Skipping, found more recently modified local copy (use --force to force download)


# Setup PyTorch Data Loading

The code in the next cell can be copied into your notebook to load the downloaded data correctly. It does two things:
- processes the dataset into a train and test set
- creates data loaders for the training and testing data

Most people don't need to worry about the details. If you're on the dataset team, however, read this part carefully so you understand how the code works — you'll be editing it to make your own version of the dataset!

In [None]:
from pathlib import Path
from random import random
from shutil import copy

from imageio.v3 import imread
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.transforms import functional

class RealAndFakeFaceProcessor:
    """Split the raw image folders into ``processed/train`` and ``processed/test``.

    Constructing the object performs all the work:

    * ``processed/train`` and ``processed/test`` are created next to the
      source directory,
    * every image in ``training_fake`` whose name starts with ``easy``,
      ``mid`` or ``hard`` and every image in ``training_real`` whose name
      starts with ``real`` is copied into one of the two folders, renamed to
      ``<n>.png`` where ``n`` is a running counter per partition,
    * ``processed/images.csv`` is written with one row per copied image and
      the columns ``index`` (the ``n`` above), ``partition``, ``type`` and
      ``label``.

    Args:
        directory: Path of the source dataset folder (the one containing
            ``training_fake`` and ``training_real``).
        train_test_split: An image goes to the test partition when a uniform
            random draw from [0, 1) is greater than this value, otherwise to
            the train partition.
    """

    def __init__(self, directory, train_test_split = 0.7) -> None:
        self.train_test_split = train_test_split

        # Running counters; they double as the file names inside each partition.
        self.train_index = 0
        self.test_index = 0

        self.src_directory = Path(directory)
        self.directory = self.src_directory.parent / "processed"

        self.tgt_train = self.directory / "train"
        self.tgt_train.mkdir(parents=True, exist_ok=True)

        self.tgt_test = self.directory / "test"
        self.tgt_test.mkdir(parents=True, exist_ok=True)

        # Column-oriented record of every copied image; becomes images.csv.
        self.index_by_type = {"index":[], "partition": [], "type": [], "label": []}
        self.__process("training_fake","easy")
        self.__process("training_fake","mid")
        self.__process("training_fake","hard")
        self.__process("training_real","real")

        df = pd.DataFrame(self.index_by_type)
        df.to_csv(self.directory / "images.csv", index=False)


    def __add_image_to_record(self, index: int, partition: str, type: str, label: int):
        """Append one image's metadata row. Real - label is 1, Fake - label is 0."""
        self.index_by_type["index"].append(index)
        self.index_by_type["partition"].append(partition)
        self.index_by_type["type"].append(type)
        self.index_by_type["label"].append(label)


    def __process(self, subdir: str, type: str) -> None:
        """Copy the images of one category into the train/test folders.

        Args:
            subdir: Folder below the source directory to scan.
            type: File-name prefix that selects the images of this category;
                ``"real"`` is labelled 1, every other prefix 0.
        """
        src = self.src_directory / subdir
        label = 1 if type == "real" else 0

        # Sorted so the processing order does not depend on the file system.
        for image in sorted(src.iterdir()):
            if not (image.is_file() and image.name.startswith(type)):
                continue
            # One draw per image decides which partition it lands in.
            if random() > self.train_test_split:
                copy(image.absolute(), self.tgt_test / f"{self.test_index}.png")
                self.__add_image_to_record(self.test_index, "test", type, label)
                self.test_index += 1
            else:
                copy(image.absolute(), self.tgt_train / f"{self.train_index}.png")
                self.__add_image_to_record(self.train_index, "train", type, label)
                self.train_index += 1

class RealAndFakeFaceDataset(Dataset):
    """Map-style dataset over one partition of a processed image folder.

    Expects ``directory`` to contain ``images.csv`` (columns ``index``,
    ``partition``, ``type``, ``label``) and a sub-folder named after the
    partition holding the images as ``<index>.png``.

    Args:
        directory: Path of the processed dataset folder.
        partition: ``"train"`` or ``"test"``.

    Raises:
        ValueError: If ``partition`` is neither ``"train"`` nor ``"test"``.
    """

    def __init__(
        self,
        directory: str,
        partition: str ="train"
    ) -> None:
        self.partition = partition
        if partition not in ("train", "test"):
            raise ValueError(f"Invalid partition specified - {partition}")
        self.directory = Path(directory)
        self.img_directory = self.directory / partition
        metadata = pd.read_csv(self.directory / "images.csv")
        # Keep only the rows that belong to the requested partition.
        self.metadata = metadata[metadata["partition"] == self.partition]

        # Per-channel mean/std (the ImageNet statistics commonly used with
        # torchvision's pretrained models).
        image_mean = [.485, .456, .406]
        image_std = [.229, .224, .225]

        # Built once here instead of on every __getitem__ call.
        # NOTE(review): the random crop/rotation is applied to the test
        # partition as well, so evaluation results vary run to run — confirm
        # this is intended.
        self.preprocess = transforms.Compose([
            transforms.Normalize(mean = image_mean, std = image_std),
            transforms.RandomCrop((200,200)),
            transforms.RandomRotation((-15,15), expand=False)
        ])

    def __len__(self) -> int:
        """Return the number of images in this partition."""
        return len(self.metadata)

    def __getitem__(self, index: int) -> tuple[torch.Tensor, int]:
        """Load, preprocess and return the ``index``-th image and its label.

        The image is converted to float32, moved to channels-first layout,
        resized to 224x224, scaled by 1/255, normalised, randomly cropped to
        200x200 and randomly rotated by up to 15 degrees.

        Returns:
            ``(image, label)`` where ``label`` is the ``label`` column value
            of the corresponding metadata row.
        """
        record = self.metadata.iloc[index]
        # Use the recorded file index rather than the positional index, so
        # the image and its label always come from the same metadata row.
        filename = self.img_directory / f"{int(record['index'])}.png"
        label = int(record["label"])

        image = torch.from_numpy(imread(filename))
        image = image.to(torch.float32)
        image = image.permute((2,0,1))  # HWC -> CHW
        image = functional.resize(image, (224, 224), antialias=True)
        image /= 255.0

        image = self.preprocess(image)

        return image, label

    def get_type(self, index) -> str:
        """Return the ``type`` column value of the ``index``-th row."""
        return self.metadata.iloc[index]["type"]

processor = RealAndFakeFaceProcessor("real_and_fake_face") # Call this to process the dataset into a train and test set
train = RealAndFakeFaceDataset("processed", "train")
test = RealAndFakeFaceDataset("processed", "test")

train_loader = DataLoader(train, batch_size = 32, shuffle=True)
test_loader = DataLoader(test, batch_size = 32)

In [None]:
from tqdm import tqdm
import torch

from torchvision import models, transforms, datasets
import torch

# Load VGG-16 with torchvision's default weights.
source_model = models.vgg16(weights=models.VGG16_Weights.DEFAULT)
# Use the GPU when one is available; otherwise stay on the CPU so this cell
# also runs on CPU-only runtimes instead of raising.
source_model = source_model.to('cuda' if torch.cuda.is_available() else 'cpu')
# The layers of the feature extractor as a plain list, plus an empty list to
# collect modified layers into.
feats_list = list(source_model.features)
new_feats_list = []

# modify convolution layers
# source_model.features = nn.Sequential(*new_feats_list)


In [None]:
# Total number of scalar values across all parameter tensors of the model.
count = sum(tensor.numel() for tensor in source_model.parameters())
print(f"VGG-16 has {count} parameters")

VGG-16 has 138357544 parameters


In [None]:
# Show the model's module tree (layer types and their settings).
print(source_model)

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [None]:
# New classification head: 25088 input features -> 1024 -> 128 -> 2 outputs,
# with ReLU and dropout (p=0.7) after each of the two hidden layers.
classifier_layers = [
    torch.nn.Linear(25088, 1024),
    torch.nn.ReLU(),
    torch.nn.Dropout(0.7),
    torch.nn.Linear(1024, 128),
    torch.nn.ReLU(),
    torch.nn.Dropout(0.7),
    torch.nn.Linear(128, 2),
]
source_model.classifier = torch.nn.Sequential(*classifier_layers)

In [None]:
# Show the module tree again; if the cell that assigns source_model.classifier
# has been run, the new head appears under "classifier".
print(source_model)

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [None]:
# Freeze every parameter of the feature extractor so no gradients are
# computed for it; parameters outside `features` are left unchanged.
source_model.features.requires_grad_(False)

In [None]:
from typing import Callable
from torch import nn
from torch.utils.data import DataLoader

def evaluate(model: nn.Module, criterion: Callable, loader: DataLoader, device='cuda') -> tuple[float, float]:
    """Compute accuracy and mean per-sample loss of ``model`` over ``loader``.

    The model is put in eval mode and gradients are disabled while the
    batches are processed; afterwards the model is returned to whichever
    mode (train/eval) it was in before the call.

    Args:
        model: Network mapping a batch ``X`` to per-class scores.
        criterion: Loss called as ``criterion(outputs, y)`` and returning a
            scalar tensor. If it exposes ``reduction == "sum"`` its value is
            taken as a batch total; otherwise it is treated as a batch mean.
        loader: Iterable of ``(X, y)`` batches.
        device: Device the inputs are moved to for the forward pass.

    Returns:
        ``(accuracy, loss)``: fraction of correct predictions and the loss
        averaged over all samples. ``(0.0, 0.0)`` when ``loader`` yields no
        samples.
    """
    was_training = model.training
    model.eval()
    correct, total = 0, 0
    loss = 0.0
    # A mean-reduced batch loss must be weighted by the batch size before it
    # is accumulated; otherwise dividing by the sample count at the end
    # shrinks the result by roughly the batch size.
    loss_is_batch_total = getattr(criterion, "reduction", "mean") == "sum"
    try:
        with torch.no_grad():
            for X, y in loader:
                # Scores and targets are compared on the CPU.
                outputs = model(X.to(device)).to('cpu')
                y = y.to('cpu')
                batch_size = len(y)
                batch_loss = criterion(outputs, y).item()
                loss += batch_loss if loss_is_batch_total else batch_loss * batch_size
                _, predicted = torch.max(outputs, 1) # get predicted class
                total += batch_size
                correct += (predicted == y).sum().item()
    finally:
        # Restore the caller's mode even if a batch raised.
        model.train(was_training)
    if total == 0:
        return 0.0, 0.0
    return correct / total, loss / total

In [None]:
from tqdm import tqdm

device ='cuda' if torch.cuda.is_available() else 'cpu' # automatically use gpu if available
epochs = 30  # Change Number of epochs
# Per-epoch history, one entry appended after every epoch.
train_losses, train_accuracies = [], []
test_losses, test_accuracies = [], []
# Move the network to the selected device; `model` is bound to the result.
model =  source_model.to(device)

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=source_model.parameters(), lr=1e-3)

for epoch in range(epochs):
    # Make sure the model is in train mode at the start of each epoch.
    source_model.train()
    for X, y in tqdm(train_loader):
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        outputs = source_model(X)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()

    # Measure both partitions after the epoch's weight updates.
    train_accuracy, train_loss = evaluate(source_model, criterion, train_loader, device)
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)

    test_accuracy, test_loss = evaluate(source_model, criterion, test_loader, device)
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)

    # Same precision for train and test so small changes in the test metrics
    # are visible too.
    print(
        f"Epoch {epoch + 1}: Loss - (Train {train_loss:.5f}/Test {test_loss:.5f}), "
        f"Accuracy - (Train {train_accuracy:.5f}/Test {test_accuracy:.5f})"
    )

100%|██████████| 46/46 [00:22<00:00,  2.07it/s]


Epoch 1: Loss - (Train 0.02171/Test 0.02, Accuracy - (Train 0.52675/Test 0.54)


100%|██████████| 46/46 [00:22<00:00,  2.04it/s]


Epoch 2: Loss - (Train 0.02171/Test 0.02, Accuracy - (Train 0.59602/Test 0.63)


100%|██████████| 46/46 [00:21<00:00,  2.14it/s]


Epoch 3: Loss - (Train 0.02155/Test 0.02, Accuracy - (Train 0.61043/Test 0.58)


100%|██████████| 46/46 [00:22<00:00,  2.02it/s]


Epoch 4: Loss - (Train 0.02152/Test 0.02, Accuracy - (Train 0.60974/Test 0.59)


100%|██████████| 46/46 [00:23<00:00,  1.92it/s]


Epoch 5: Loss - (Train 0.02131/Test 0.02, Accuracy - (Train 0.63855/Test 0.60)


100%|██████████| 46/46 [00:23<00:00,  1.98it/s]


Epoch 6: Loss - (Train 0.02114/Test 0.02, Accuracy - (Train 0.56996/Test 0.55)


100%|██████████| 46/46 [00:22<00:00,  2.08it/s]


Epoch 7: Loss - (Train 0.02111/Test 0.02, Accuracy - (Train 0.64746/Test 0.62)


100%|██████████| 46/46 [00:22<00:00,  2.00it/s]


Epoch 8: Loss - (Train 0.02061/Test 0.02, Accuracy - (Train 0.61180/Test 0.60)


100%|██████████| 46/46 [00:22<00:00,  2.07it/s]


Epoch 9: Loss - (Train 0.02084/Test 0.02, Accuracy - (Train 0.63786/Test 0.63)


100%|██████████| 46/46 [00:23<00:00,  1.98it/s]


Epoch 10: Loss - (Train 0.02057/Test 0.02, Accuracy - (Train 0.59122/Test 0.59)


100%|██████████| 46/46 [00:22<00:00,  2.03it/s]


Epoch 11: Loss - (Train 0.02081/Test 0.02, Accuracy - (Train 0.63374/Test 0.62)


100%|██████████| 46/46 [00:22<00:00,  2.01it/s]
