In [1]:
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset, random_split, TensorDataset, ConcatDataset
import torch.nn as nn
import copy
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torchvision.utils import make_grid
from utils import FaceDataset
import os
import numpy as np
from skimage import io
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [2]:
data = FaceDataset("data")
data.image_dict["label_bin"].mean()

tensor(0.5296)

In [3]:
train_size = int(0.9*len(data))
train_data, test_data = random_split(data, [train_size, len(data)-train_size])
valid_size = int(0.1*len(train_data))
train_data, valid_data = random_split(train_data, [len(train_data)-valid_size,
                                                   valid_size])


In [29]:
def split_data(datas, train_size=0.8):
    
    """ Function to split data in training, valid and testing
        Args:   data torch Dataset
                train_size the training data size """
    training_size = int(train_size*len(datas))
    train_data, test_data = random_split(datas, [training_size, len(datas)-training_size])
    valid_size = int(0.1*len(train_data))
    train_data, valid_data = random_split(train_data, [len(train_data)-valid_size,
                                                   valid_size])
    return train_data, valid_data, test_data

In [6]:
train_data_loader = DataLoader(dataset= train_data,batch_size=100, shuffle=True, 
                               drop_last=True)
valid_data_loader = DataLoader(dataset= valid_data,batch_size=10, shuffle=True, 
                               drop_last=True)
test_data_loader = DataLoader(dataset=test_data, batch_size=10, shuffle=True, 
                              drop_last=True)

In [7]:
it = iter(train_data_loader)

In [None]:
next(it)[0]

In [26]:
tf = transforms.Compose([#transforms.Resize((30,30)),
    #transforms.Grayscale(num_output_channels=1),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    #transforms.ToTensor()
    ])

In [27]:
dataset = FaceDataset("data", transform=tf)

In [28]:
data_aug = torch.utils.data.ConcatDataset([data, dataset])

In [30]:
aug_train, aug_valid, aug_test = split_data(data_aug)

In [16]:
data_k = DataLoader(dataset= data_aug,batch_size=10, shuffle=True, 
                               drop_last=True)

In [31]:
train_aug_loader = DataLoader(dataset= aug_train,batch_size=10, shuffle=True, 
                               drop_last=True)
valid_aug_loader = DataLoader(dataset= aug_valid,batch_size=10, shuffle=True, 
                               drop_last=True)
test_aug_loader = DataLoader(dataset=aug_test, batch_size=10, shuffle=True, 
                              drop_last=True)

In [19]:
next(iter(data_k))[0].shape

torch.Size([10, 1, 600, 600])

In [20]:
class MLPClassif(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, act_fn):
        super(MLPClassif, self).__init__()
        self.hidden1 = nn.Linear(input_size, hidden_size)
        self.hidden2 = nn.Linear(hidden_size, hidden_size)
        self.hidden3 = nn.Linear(hidden_size, hidden_size)
        self.out_layer = nn.Linear(hidden_size, output_size)
        self.act_fn = act_fn

    
    def forward(self, x):
        dropout = nn.Dropout(p=0.1)
        x = self.act_fn(self.hidden1(x))
        x = nn.BatchNorm1d(100, affine=False)(x)
        x = self.act_fn(self.hidden2(x))
        x = nn.BatchNorm1d(100, affine=False)(x)
        x = self.act_fn(self.hidden3(x))
        x = nn.BatchNorm1d(100, affine=False)(x)
        out = self.out_layer(x)
        return out

In [21]:
model = MLPClassif(30*30, 100, 1, nn.ReLU())

In [22]:
def init_weights(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight.data)
        m.bias.data.fill_(0.01)
        

#torch.manual_seed(0)
model.apply(init_weights)
#model.to(device)

MLPClassif(
  (hidden1): Linear(in_features=900, out_features=100, bias=True)
  (hidden2): Linear(in_features=100, out_features=100, bias=True)
  (hidden3): Linear(in_features=100, out_features=100, bias=True)
  (out_layer): Linear(in_features=100, out_features=1, bias=True)
  (act_fn): ReLU()
)

In [34]:

def eval_binary_classifier(model, eval_dataloader, loss_fn):
    #model.to(device)
    # Set the model in 'evaluation' mode (this disables some layers (batch norm, dropout...) which are not needed when testing)
    model.eval() 
    #model.to(device)
    transform = transforms.Resize(size = (30,30))
    with torch.no_grad():
        # initialize the total and correct number of labels to compute the accuracy
        loss = 0

        

        # Iterate over the dataset using the dataloader
        for images, labels in eval_dataloader:
            #labels = torch.tensor(labels, dtype=torch.float32)
            images = transform(images.squeeze(0))
            
            #labels.to(device)
            images = images.reshape(images.shape[0], -1)
            #images.to(device)
            #print(images.shape)
            # Get the predicted labels
            y_predicted = model(images)

            l =loss_fn(y_predicted, labels.unsqueeze(1))
            loss += l.item()
            

    return loss

def train_val_binary_classifier(model, train_dataloader, valid_dataloader, num_epochs, 
                            loss_fn, learning_rate, verbose=True):

    # Make a copy of the model (avoid changing the model outside this function)
    model_tr = copy.deepcopy(model)
    
    # Set the model in 'training' mode (ensures all parameters' gradients are computed - it's like setting 'requires_grad=True' for all parameters)
    model_tr.train()
    #model_tr.to(device)
    # Define the optimizer
    #optimizer = #torch.optim.Adam(model_tr.parameters(), lr=learning_rate)
    optimizer =torch.optim.SGD(model_tr.parameters(), lr=learning_rate)
    
    # Initialize a list to record the training loss over epochs
    loss_all_epochs = []
    valid_loss = []
    min_valid_loss = 0
    transform = transforms.Resize(size = (30,30))
    # Training loop

    for epoch in range(num_epochs):
        # Initialize the training loss for the current epoch
        loss_current_epoch = 0
        
        # Iterate over batches using the dataloader
        for batch_index, (images, labels) in enumerate(train_dataloader):
            #labels = torch.tensor(labels, dtype =torch.float32)
            #images.to(device)
            #images = torch.tensor(images, dtype=float)
            images = transform(images.squeeze(0))
            #labels = torch.Tensor([(l==1) for l in labels])
            #labels.to(device)
            images = images.reshape(images.shape[0], -1)
            
            
            y_pred = model_tr(images)
            l = loss_fn(y_pred, labels.unsqueeze(1))

            
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            loss_current_epoch +=  l.item()



        # At the end of each epoch, record and display the loss over all batches
        loss_all_epochs.append(loss_current_epoch)
        val_loss= eval_binary_classifier(model_tr, 
                                                 valid_dataloader,
                                                 loss_fn)
        valid_loss.append(val_loss)

        #if val_loss < min_valid_loss:
        #    min_valid_loss = val_loss
        #    torch.save(model_tr.state_dict(), 'model_opt.pt')
                                    
        if verbose:
            print('Epoch [{}/{}], Train Loss: {:.4f}\n\
                Valid Loss: {:.4f}'.format(epoch+1, num_epochs, 
                                           loss_current_epoch,
                                           val_loss))

            
        
    return model_tr, loss_all_epochs, valid_loss

In [None]:
num_epochs = 15
learning_rate = 0.08
loss_fn = nn.BCEWithLogitsLoss()
model_trained, train_losses, val_losses =train_val_binary_classifier(model, 
                                                                     train_aug_loader,
                                                                     valid_aug_loader,
                                                                     num_epochs,loss_fn, 
                                                                     learning_rate, 
                                                                     verbose=True)

In [None]:
class FaceDataAugmentation(Dataset):

    def __init__(self, image_dir, transform = None):
        
        """Function to load images into Tensor
            Args: 
                - image_dir : directory of images
                - Return : a dictonary with images and labels
                """
        self.image_dir = image_dir
        self.transform = transform
        self.image_dict = self.load_image()


    def __len__(self) :
        return len(self.image_dict["label"])


    def __getitem__(self, index) :
        
        
        img = self.image_dict["img_dir"][index]
        path = torch.from_numpy(io.imread(img,
                                         as_gray=True).astype(np.float32))
        label = self.image_dict["label_bin"][index]
        if self.transform is not None:
            for t in self.transform:
                path = t(path)
        
        return path, label


    def load_image(self) :
        img_dict = {"img_dir" : [], "label" : [], 'label_bin':[]}
        for root, dirs, files in os.walk(self.image_dir):
            for img in files:
                img_dir = os.path.join(root, img)
                
                img_dict["img_dir"].append(img_dir)
                img_dict["label"].append(img[:4])
                img_dict["label_bin"].append(float(img[:4]=="real"))
        img_dict["label_bin"] = torch.tensor(img_dict["label_bin"],dtype=torch.float32)
        return img_dict

In [79]:
transform_ = transforms.Resize(size = (30,30))
img.shape
img = transform_(img)
img.shape

torch.Size([10, 30, 30])

In [122]:
augdata = FaceDataAugmentation('data', transform = [transforms.RandomHorizontalFlip(),transforms.Resize(size = (24,24))])
augdataloader = DataLoader(dataset= augdata,batch_size=10, shuffle=True, 
                               drop_last=True)

In [None]:
next(iter(augdataloader))[0][0,:,:].shape

In [None]:
# Plot the training and validation losses over epochs
plt.figure()
plt.subplot(1, 2, 1)
plt.plot(train_losses)
plt.title('Training loss')
plt.xlabel('Epochs')
plt.subplot(1, 2, 2)
plt.plot(val_losses)
plt.title('Validation loss')
plt.xlabel('Epochs')
plt.tight_layout()
plt.show()

In [28]:
def eval_classifier(model, eval_dataloader):

    logits = nn.Sigmoid()
    model.eval() 
    transform = transforms.Resize(size = (30,30))
    # In test phase, we don't need to compute gradients (for memory efficiency)
    with torch.no_grad():
        # initialize the total and correct number of labels to compute the accuracy
        correct = 0
        total = 0
        for images, labels in eval_dataloader:
            #images = torch.tensor(images, dtype=float)
            images = transform(images)
            labels = labels.unsqueeze(1)
            
            #label.to(device)
            images = images.reshape(images.shape[0], -1)
            y_predicted = model(images)
            y_predicted = logits(y_predicted)
            # print(y_predicted)
            label_predicted = torch.round(y_predicted)
            #print(label_predicted)
            total += labels.size(0)
            correct += (label_predicted == labels).sum().item()
    
    accuracy = 100 * correct / total
    
    return accuracy

In [177]:
#model_ =  MLPClassif(30*30, 32, 1, nn.ReLU())
#model_.load_state_dict(torch.load('model_opt.pt'))
torch.save(model_trained, 'model_opt.pt')
eval_classifier(model_trained, test_data_loader)

55.5

In [88]:
class CNN(nn.Module):
    def __init__(self, num_channels1=16, num_channels2=32, num_classes=1):
        super(CNN, self).__init__()
        self.conv_block1 = nn.Sequential(nn.Conv2d(in_channels=1, out_channels= num_channels1, kernel_size=5, padding=2),
                           nn.ReLU(),
                           #nn.BatchNorm2d(num_channels1, affine=False),
                           nn.MaxPool2d(kernel_size=2))
        
        self.conv_block2 = nn.Sequential(nn.Conv2d(num_channels1, num_channels2, kernel_size=5, padding=2),
                           nn.ReLU(),
                           #nn.BatchNorm2d(num_channels2,affine=False),
                           nn.MaxPool2d(kernel_size=2))
        self.fc = nn.Linear(32*7*7, num_classes)

    def forward(self, x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        out = self.fc(x.view(-1, 32*7*7))
        return out

In [98]:
num_epochs = 30
learning_rate = 0.08

cv = nn.Sequential(nn.Conv2d(in_channels=1,out_channels=16, kernel_size=5, padding=2),
                           nn.ReLU(),
                           nn.BatchNorm2d(16, affine=False),
                           nn.MaxPool2d(kernel_size=2),
                           )
print(img.unsqueeze(1).shape)
out = cv(img.unsqueeze(1))

torch.Size([10, 1, 30, 30])


In [99]:
out.shape

torch.Size([10, 16, 15, 15])

In [100]:
conv_block2 = nn.Sequential(nn.Conv2d(16, 32, kernel_size=5, padding=2),
                           nn.ReLU(),
                           nn.MaxPool2d(kernel_size=2))
out2 = conv_block2(out)
print(out.shape, out2.shape)
print(out2.view(-1,15680).shape)

torch.Size([10, 16, 15, 15]) torch.Size([10, 32, 7, 7])
torch.Size([1, 15680])


In [10]:
x = torch.randn(3, 2)
y = torch.randn(3, 2)
z = torch.stack([x,y, torch.randn(3, 2)], dim=0)
z[0,:,:]

tensor([[ 1.6009, -0.2867],
        [ 0.1835,  0.6055],
        [-0.5437, -0.3293]])

In [107]:
model_cnn = CNN()
#model_cnn.train()
    
    # Define the optimizer
optimizer = torch.optim.SGD(model_cnn.parameters(), lr=learning_rate)
    
    # Initialize a list to record the training loss over epochs
loss_all_epochs = []
    
    # Training loop
for epoch in range(num_epochs):
        # Initialize the training loss for the current epoch
    loss_current_epoch = 0
        
        # Iterate over batches using the dataloader
    for batch_index, (images, labels) in enumerate(train_data_loader):
            
            # TO DO: write the training procedure for each batch. This should consist of:
            # - vectorizing the images (size should be (batch_size, input_size))
            # - apply the forward pass (calculate the predicted labels from the vectorized images)
            # - use the 'backward' method to compute the gradients
            # - apply the gradient descent algorithm
            # Also think of updating the loss at the current epoch
        # print(labels.unsqueeze(1).shape)
        images = transform_(images)
        y_pred = model_cnn(images.unsqueeze(1))
        # print(y_pred.shape)
        l = loss_fn(y_pred, labels.view(-1, 1))
        optimizer.zero_grad()
        l.backward()
        loss_current_epoch += l.item()
        optimizer.step() #update parameters

        # At the end of each epoch, record and display the loss over all batches
    loss_all_epochs.append(loss_current_epoch)
    
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, loss_current_epoch))

Epoch [1/30], Loss: 114.5117
Epoch [2/30], Loss: 113.5412


KeyboardInterrupt: 