In [29]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import torch
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset, random_split
import torch.nn as nn
import copy
import torch
from torchvision import transforms
from torch.optim import lr_scheduler
import os
import numpy as np
from skimage import io
import os
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)


cuda:0


# Data preparation

In [30]:
class FaceDataset(Dataset):

    def __init__(self, image_dir, transform=None):
        
        """Function to load images into Tensor
            Args: 
                - image_dir : directory of images
                - Return : a dictonary with images and labels
                """
        self.image_dir = image_dir
        self.image_dict = self.load_image()
        self.transform = transform


    def __len__(self) :
        return len(self.image_dict["label"])


    def __getitem__(self, index) :
        
        
        path = torch.from_numpy(io.imread(self.image_dict["img_dir"][index],
                                         as_gray=True).astype(np.float32)).unsqueeze(0)
        label = self.image_dict["label_bin"][index]

        if self.transform:
            path = self.transform(path)
        
        return path, label


    def load_image(self) :
        img_dict = {"img_dir" : [], "label" : [], 'label_bin':[]}
        for root, dirs, files in os.walk(self.image_dir):
            for img in files:
                img_dict["img_dir"].append(os.path.join(root, img))
                
                img_dict["label"].append(img[:4])
                if img[:4] != 'real':
                    img_dict["label_bin"].append(0.0)
                else:
                    img_dict["label_bin"].append(1.0)
        img_dict["label_bin"] = torch.tensor(img_dict["label_bin"],dtype=torch.float32)
        return img_dict

In [31]:
def split_data(datas, train_size=0.8):
    
    """ Function to split data in training, valid and testing
        Args:   data torch Dataset
                train_size the training data size """
    training_size = int(train_size*len(datas))
    train_data, test_data = random_split(datas, [training_size, len(datas)-training_size])
    valid_size = int(0.1*len(train_data))
    train_data, valid_data = random_split(train_data, [len(train_data)-valid_size,
                                                   valid_size])
    return train_data, valid_data, test_data

In [32]:
data = FaceDataset("/data")

In [None]:
tf = transforms.Compose([#transforms.Resize((30,30)),
    #transforms.Grayscale(num_output_channels=1),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    #transforms.ToTensor()
    ])
dataset = FaceDataset("data", transform=tf)

In [None]:
train_data, valid_data, test_data = split_data(data)
train_data_loader = DataLoader(dataset= train_data,batch_size=10, shuffle=True, 
                               drop_last=True)
valid_data_loader = DataLoader(dataset= valid_data,batch_size=10, shuffle=True, 
                               drop_last=True)
test_data_loader = DataLoader(dataset=test_data, batch_size=10, shuffle=True, 
                              drop_last=True)

In [52]:
data_aug = torch.utils.data.ConcatDataset([train_data, dataset])

In [53]:
train_aug_loader = DataLoader(dataset= aug_train,batch_size=32, shuffle=True, 
                               drop_last=True)
#valid_aug_loader = DataLoader(dataset= aug_valid,batch_size=10, shuffle=True, 
                               #drop_last=True)
#test_aug_loader = DataLoader(dataset=aug_test, batch_size=10, shuffle=True, 
                              #drop_last=True)

In [47]:
next(iter(train_data_loader))[0].shape

torch.Size([10, 1, 600, 600])

In [54]:
class MLPClassif(nn.Module):
    
    def __init__(self, input_size, hidden_size, output_size):
        super(MLPClassif, self).__init__()
        self.hidden1 = nn.Linear(input_size, hidden_size)
        self.hidden2 = nn.Linear(hidden_size, hidden_size)
        self.hidden3 = nn.Linear(hidden_size, hidden_size)
        #self.hidden4 = nn.Linear(hidden_size, hidden_size)
        self.out_layer = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()
        self.batchnorm = nn.BatchNorm1d(hidden_size, affine=False)

    
    def forward(self, x):

        #sigmoid = nn.Sigmoid()
        dropout = nn.Dropout(p=0.1)
        x = self.hidden1(x)
        x = self.relu(x)
        x = dropout(x)
        x = self.batchnorm(x)
        x = self.hidden2(x)
        x = self.relu(x)
        #x = dropout(x)
        x = self.batchnorm(x)
        x = self.hidden3(x)
        x = self.relu(x)
        #x = dropout(x)
        #x = self.hidden4(x)
        #x = self.relu(x)
        x = self.batchnorm(x)
        #x = dropout(x)
        out = self.out_layer(x)
        return out

In [55]:
model = MLPClassif(30*30, 100, 1)
def init_weights(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight.data)
        m.bias.data.fill_(0.01)
        

#torch.manual_seed(0)
model.apply(init_weights)

MLPClassif(
  (hidden1): Linear(in_features=900, out_features=100, bias=True)
  (hidden2): Linear(in_features=100, out_features=100, bias=True)
  (hidden3): Linear(in_features=100, out_features=100, bias=True)
  (out_layer): Linear(in_features=100, out_features=1, bias=True)
  (relu): ReLU()
  (batchnorm): BatchNorm1d(100, eps=1e-05, momentum=0.1, affine=False, track_running_stats=True)
)

In [56]:
def accuracy_fn(y_true, y_pred):
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / len(y_pred)) * 100 
    return acc


def eval_binary_classifier(model, eval_dataloader, loss_fn):
    sigmoid = nn.Sigmoid()
    model.eval() 
    #model.to(device)
    transform = transforms.Resize(size = (30,30))
    # In evaluation phase, we don't need to compute gradients (for memory efficiency)
    with torch.no_grad():
        # initialize the total and correct number of labels to compute the accuracy
        loss, accuracy = 0, 0
        # Iterate over the dataset using the dataloader
        for images, labels in eval_dataloader:
            #images = torch.tensor(images, dtype=float)
            images = transform(images)
            
            #labels.to(device)
            images = images.reshape(images.shape[0], -1).to(device)
            #images.to(device)
            #print(images.shape)
            # Get the predicted labels
            y_predicted = model(images)

            l =loss_fn(y_predicted, labels.unsqueeze(1).to(device))
            loss += l.item()
            accuracy += accuracy_fn(labels.to(device), torch.round(sigmoid(y_predicted)).squeeze(1))
        accuracy = accuracy/len(eval_dataloader)

    return loss, accuracy

def train_val_binary_classifier(model, train_dataloader, valid_dataloader, num_epochs, 
                            loss_fn, learning_rate, verbose=True):

    # Make a copy of the model (avoid changing the model outside this function)
    model_tr = copy.deepcopy(model)
    model_tr = model_tr.cuda()
    
    # Set the model in 'training' mode (ensures all parameters' gradients are computed - it's like setting 'requires_grad=True' for all parameters)
    model_tr.train()
    #optimizer = #torch.optim.Adam(model_tr.parameters(), lr=learning_rate)
    optimizer =torch.optim.SGD(model_tr.parameters(), lr=learning_rate)
    scheduler = lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
    # Initialize a list to record the training loss over epochs
    loss_all_epochs = []
    valid_loss = []
    acuracy = 0
    transform = transforms.Resize(size = (30,30))
    # Training loop

    for epoch in range(num_epochs):
        # Initialize the training loss for the current epoch
        loss_current_epoch = 0
        
        # Iterate over batches using the dataloader
        for batch_index, (images, labels) in enumerate(train_dataloader):
            images = transform(images)
            images = images.reshape(images.shape[0], -1)
            
            
            y_pred = model_tr(images.to(device))
            #print(f"size of ypred {y_pred}")
            # print(f"size of labels {labels.unsqueeze(1).shape}")
            l = loss_fn(y_pred, labels.unsqueeze(1).to(device))

            
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            loss_current_epoch +=  l.item()
            #accuracy += accuracy_fn(labels, torch.round(sigmoid(y_pred)).squeeze(1))


        scheduler.step()
        loss_all_epochs.append(loss_current_epoch)
        val_loss, accuracy = eval_binary_classifier(model_tr, 
                                                 valid_dataloader,
                                                 loss_fn)
        valid_loss.append(val_loss)

                                    
        if verbose:
            print(f"\nEpoch [{epoch+1}/{num_epochs}]"
                  f"\nTrain loss: {loss_current_epoch:.5f} | "
                  f"Test loss: {val_loss:.5f} | Test acc: {accuracy:.2f}%\n")
            

            
        
    return model_tr, loss_all_epochs, valid_loss

In [57]:
#model = MLPClassif(30*30, 100, 1)
num_epochs = 50
learning_rate = 0.081
loss_fn = nn.BCEWithLogitsLoss()
model = model.cuda()
print(next(model.parameters()).is_cuda)
model_trained, train_losses, val_losses =train_val_binary_classifier(model, 
                                                                     train_aug_loader,
                                                                     valid_data_loader,
                                                                     num_epochs,loss_fn, 
                                                                     learning_rate, 
                                                                     verbose=True)

True

Epoch [1/50]
Train loss: 66.95740 | Test loss: 26.83195 | Test acc: 55.62%


Epoch [2/50]
Train loss: 75.09399 | Test loss: 24.53105 | Test acc: 50.31%


Epoch [3/50]
Train loss: 63.31080 | Test loss: 24.72954 | Test acc: 50.62%


Epoch [4/50]
Train loss: 62.07545 | Test loss: 21.45687 | Test acc: 58.12%


Epoch [5/50]
Train loss: 61.24953 | Test loss: 20.94338 | Test acc: 61.88%


Epoch [6/50]
Train loss: 60.89502 | Test loss: 21.23224 | Test acc: 60.31%


Epoch [7/50]
Train loss: 59.99446 | Test loss: 21.58044 | Test acc: 58.12%


Epoch [8/50]
Train loss: 59.06878 | Test loss: 20.34067 | Test acc: 67.19%


Epoch [9/50]
Train loss: 58.31242 | Test loss: 20.88385 | Test acc: 61.56%


Epoch [10/50]
Train loss: 57.38644 | Test loss: 20.12618 | Test acc: 65.62%


Epoch [11/50]
Train loss: 57.24799 | Test loss: 19.90862 | Test acc: 65.00%


Epoch [12/50]
Train loss: 55.62573 | Test loss: 20.61035 | Test acc: 63.12%


Epoch [13/50]
Train loss: 55.51096 | Test loss: 20.71474 | Test acc

In [None]:
class CNN(nn.Module):
    def __init__(self, num_channels1=16, num_channels2=32, num_classes=1):
        super(CNN, self).__init__()
        self.conv_block1 = nn.Sequential(nn.Conv2d(in_channels=1, out_channels= num_channels1, kernel_size=5, padding=2),
                           nn.ReLU(),
                           #nn.BatchNorm2d(num_channels1, affine=False),
                           nn.MaxPool2d(kernel_size=2))
        
        self.conv_block2 = nn.Sequential(nn.Conv2d(num_channels1, num_channels2, kernel_size=5, padding=2),
                           nn.ReLU(),
                           #nn.BatchNorm2d(num_channels2,affine=False),
                           nn.MaxPool2d(kernel_size=2))
        self.fc = nn.Linear(32*7*7, num_classes)

    def forward(self, x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        out = self.fc(x.view(-1, 32*7*7))
        return out

In [None]:
cnn_model = CNN()
cnn_model = cnn_model.cuda()
print(next(cnn_model.parameters()).is_cuda)

In [None]:
def training_cnn_classifier(model, train_dataloader, num_epochs,
                            loss_fn, learning_rate, verbose=True):

    model_tr = copy.deepcopy(model)
    model_tr = model_tr.to(device)
    
    model_tr.train()
    optimizer = torch.optim.SGD(model_tr.parameters(), lr=learning_rate)
    
    # Initialize a list to record the training loss over epochs
    loss_all_epochs = []
    transform = transforms.Resize(size = (30,30))
    # Training loop
    for epoch in range(num_epochs):
        # Initialize the training loss for the current epoch
        loss_current_epoch = 0
        
        # Iterate over batches using the dataloader
        for batch_index, (images, labels) in enumerate(train_dataloader):
            images = transform(images).to(device)
            labels = labels.to(device)
            y_pred = model_tr(images)
            l = loss_fn(y_pred, labels)
            optimizer.zero_grad()
            l.backward()
            loss_current_epoch += l.item()
            optimizer.step() #update parameters

        # At the end of each epoch, record and display the loss over all batches
        loss_all_epochs.append(loss_current_epoch)
        if verbose:
            print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, loss_current_epoch))
        
    return model_tr, loss_all_epochs

def eval_cnn_classifier(model, eval_dataloader):
    sigmoid = nn.Sigmoid()
    # Set the model in evaluation mode
    model.eval() 

    # In test phase, we don't need to compute gradients (for memory efficiency)
    with torch.no_grad():
        # initialize the total and correct number of labels to compute the accuracy
        correct = 0
        total = 0
        for images, labels in eval_dataloader:
            images = transform_(images).to(device)
            labels = labels.to(device)
            y_predicted = model(images)
            label_predicted = torch.round(sigmoid(y_predicted)).squeeze(1) #y_preds = torch.round(torch.sigmoid(model_3(X_test))).squeeze()
            total += labels.size(0)
            correct += (label_predicted == labels).sum().item()
    
    accuracy = 100 * correct / total
    
    return accuracy

In [None]:
model_cnn = CNN()
model_cnn.to(device)
num_epochs = 30
learning_rate = 0.1
loss_fn = nn.BCEWithLogitsLoss()
transform_ = transforms.Resize(size = (30,30))
model_cnn.train()
    
    # Define the optimizer
optimizer = torch.optim.SGD(model_cnn.parameters(), lr=learning_rate)
    
    # Initialize a list to record the training loss over epochs
loss_all_epochs = []
    
    # Training loop
for epoch in range(num_epochs):
        # Initialize the training loss for the current epoch
    loss_current_epoch = 0
        
        # Iterate over batches using the dataloader
    for batch_index, (images, labels) in enumerate(train_data_loader):
        #print(images.shape)
        images = transform_(images).to(device)
        labels = labels.to(device)
        y_pred = model_cnn(images)
        # print(y_pred.shape)
        l = loss_fn(y_pred, labels.view(-1, 1))
        optimizer.zero_grad()
        l.backward()
        loss_current_epoch += l.item()
        optimizer.step() #update parameters

        # At the end of each epoch, record and display the loss over all batches
    loss_all_epochs.append(loss_current_epoch)
    
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, loss_current_epoch))

In [None]:
eval_cnn_classifier(model_cnn, valid_aug_loader)