# Eye glasses detection

In this project, we will create a binary eye-glasses detector.
    We are going to use [CelebA dataset](http://mmlab.ie.cuhk.edu.hk/projects/CelebA.html) as our dataset. This dataset has face attributes, one of these attributes is eyeglasses.

## Intruduction

### Guidelines

Outline:

1. Install face detector [DSFDDetector](https://github.com/hukkelas/DSFD-Pytorch-Inference)
2. Crop faces using face detector
3. Separate faces with glasses and those without glasses using the attributes
4. build a train-dataloader and a test-dataloader (with 2 classes: `eyeglasses` `no-eyeglasses`)
5. build and train the model
6. evaluate the model

You are allowed to and encouraged to browse the web for help.

[PyTorch](https://pytorch.org/) is the preferred framework.

### Evaluation

You can solve this in any way, however you will be evaluated on the method you used.

Evaluation will be based on:

- code is functional (it works)
- time
- code is organized
- bonus tasks


### Tips

- Use the internet!
- Take your time reading, in fact go over the entire notebook first before writing anything
- Ask for help if you get stuck
- If one thing doesn't work, try something else (even if it's sub-optimal)
- Check your output often, check the data shape makes sense and plot images to prevent confusion or errors
- Keep track of time
- Don't forget to enjoy and benefit from the excercise :)

### Setup

This is a [jupyter notebook](https://jupyter.org/),
here are some [keyboard shortcuts](http://maxmelnick.com/2016/04/19/python-beginner-tips-and-tricks.html) to get you started.

You can run terminal commands using `!` such as:

```shell script
!echo "this is a command"
```

### Workstation/PC specs

The workstation you have is a [LambdaLabs](https://lambdalabs.com/deep-learning/workstations/2-gpu) dual RTX 8000 GPUs, each with 50GB of VRAM.  
**It is advised to utilize both GPUs whenever possible to speedup computation**.

To get more details, run `!nvidia-smi`

In [None]:
!nvidia-smi

## Bonus 1: Visualization (tensorboard is recommended)

Visualize losses, and anything else that may be helpful

## Bonus 2: Hyperparameter search (gridsearch)



## Install [DSFD Face detector](https://github.com/hukkelas/DSFD-Pytorch-Inference)

In [None]:
!pip install opencv-python tqdm

In [None]:
# cloning https://github.com/hukkelas/DSFD-Pytorch-Inference
! git clone https://github.com/hukkelas/DSFD-Pytorch-Inference.git

In [None]:
cd DSFD-Pytorch-Inference/

In [None]:
!python setup.py install

In [None]:
cd ..

## Crop faces

Run the DSFD face detector to crop the faces (to make the problem easier to solve for our classifier)

This part might take a while to run so **do** ask for help when you get here

### Expected output

A folder with cropped faces

In [None]:
import glob
import os
import cv2
import face_detection
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
import time
import torch
import concurrent.futures
import itertools 

def get_face(img, detections):
    if detections:
        bboxes = [d[:4] for d in detections]
        scores = [(b[2]-b[0])+(b[3]-b[1])for b in bboxes]
        idx = np.argmax(scores)
        bbox = bboxes[idx]
        bbox = [int(num) for num in bbox]
        face = img[bbox[1]:bbox[3], bbox[0]:bbox[2]]
        return face[:, :, ::-1]
    return []

start_time = time.time()
src_path = 'img_align_celeba/'
dest_path = 'img_align_celeba_cropped'
os.makedirs(dest_path, exist_ok=True)
img_paths = glob.glob(f'{src_path}/*')


detectors = [
    face_detection.build_detector("DSFDDetector", confidence_threshold=.5, nms_iou_threshold=.3, device=f'cuda:{i}') 
    for i in range(torch.cuda.device_count())
]

def pred_dist(detector_idx, *args):
    return detectors[detector_idx].batched_detect(*args)

In [None]:
batch_size = 200
for i in tqdm(range(len(img_paths)//batch_size), 'infering', unit='batch'):
    img_p = img_paths[batch_size*i:batch_size*(i+1)]
    
    im = [cv2.imread(p)[:, :, ::-1] for p in img_p]
    im = np.stack(im)
    
    batch_per_gpu = batch_size//len(detectors)
    with concurrent.futures.ThreadPoolExecutor() as executer:
        imgs = [im[batch_per_gpu*i : batch_per_gpu*(i+1)] for i in range(len(detectors))]
        detections += list(executer.map(pred_dist, list(range(len(detectors))), imgs))
    
    detections = [d.tolist() for d in detections]
    faces = list(map(get_face, im, detections))

    output_paths = [os.path.join(dest_path, p.split('/')[-1]) for p in img_p]
    for output_path, face in zip(output_paths, faces):
        if len(face) != 0:
            try:
                cv2.imwrite(output_path,face)
            except:
                print(face, output_path)

print(time.time() - start_time)

## Build dataloaders

### Expected output:

Train loader and test loader, each with 2 classes: `eyeglasses` and `no-eyeglasses`

In [None]:
import pandas as pd
import time
import os

attributes_map_file = 'list_attr_celeba.txt'
target_attribute = 'Eyeglasses'


att = pd.read_csv(attributes_map_file, sep="  | ")
Eyeglasses = att[target_attribute]
Eyeglasses = Eyeglasses.replace(-1,0)

In [None]:
list_of_im = os.listdir('img_align_celeba_cropped/')
data_dict = {im:Eyeglasses[im] for im in list_of_im}

In [None]:
!pip install sklearn

In [None]:
from sklearn.model_selection import train_test_split

x_train, x_test, y_train, y_test = train_test_split(list(data_dict.keys()), list(data_dict.values()), test_size=0.1, stratify=list(data_dict.values()))
d = {x:y for x,y in zip(x_train,y_train)}

In [None]:
train_df = pd.DataFrame(y_train, x_train)[0]
test_df = pd.DataFrame(y_test, index=x_test)[0]

## build a classifier
(transfer learning is recommended)

### Expected output

a pytorch classifier model to classify `eyeglasses` vs `no-eyeglasses`

In [None]:
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader
import cv2
import matplotlib.pyplot as plt
from torchvision import transforms

In [None]:
class Eyeglasses_data(Dataset):
    def __init__(self, root, data_frame, transform=None):
        super(Eyeglasses_data, self).__init__()
        self.root = root
        self.data_frame = data_frame
        self.keys = data_frame.keys()
        self.transform = transform
    def __getitem__(self, idx):
        im_name = self.keys[idx]
        im_path = os.path.join(self.root, im_name)
        label = self.data_frame[im_name]
        image = plt.imread(im_path)
        if self.transform:
            image = self.transform(image)
        return image, label
    
    def __len__(self):
        return len(self.data_frame)

In [None]:
train_transforms = transforms.Compose([
                                    transforms.ToPILImage(),
                                    transforms.Resize((200,200)),
                                    transforms.RandomHorizontalFlip(),
                                    transforms.ColorJitter(brightness=(0.5)),
                                    transforms.ToTensor()
])
test_transforms = transforms.Compose([
                                    transforms.ToPILImage(),
                                    transforms.Resize((200,200)),
                                    transforms.ToTensor()
])


train_dataset = Eyeglasses_data('img_align_celeba_cropped/', train_df, transform=train_transforms)
trainloader = DataLoader(train_dataset, batch_size=256)

test_dataset = Eyeglasses_data('img_align_celeba_cropped/', test_df, transform=test_transforms)
testloader = DataLoader(test_dataset, batch_size=256)

## Train

### Expected output:

a trained model

In [None]:
import torch
from torch import nn
import torchvision
from torch.utils.tensorboard import SummaryWriter
import sklearn

class Classifier(nn.Module):
    def __init__(self, params):
        super(Classifier, self).__init__()
        self.num_classes = params['num_classes']
        self.classifier = nn.Linear(1000, self.num_classes)
        self.feature_extractor = torchvision.models.resnet18(pretrained=True)
        self.change_training_state_of_feature_extractor(False)

    def forward(self, x):
        features = self.feature_extractor(x)
        logits = self.classifier(features)
        return logits

    def change_training_state_of_feature_extractor(self, state):
        for param in self.feature_extractor.parameters():
            param.requires_grad = state

In [None]:
class Trainer():    
    def __init__(self, model, trainloader, testloader, params):
        self.epoch = 0
        self.print_every = 10
        self.iter_counter = 0
        self.trainloader, self.testloader = trainloader, testloader
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = model.to(self.device)
        self.params = params
        self.exp_name = '___'.join(['']+[f'{k}={v}' for k,v in params.items()])
        print(f'executing:  {self.exp_name}')
        self.summarywriter = SummaryWriter(comment=self.exp_name)
        
        self.create_optimizer()
        weight = self.calculate_weight()
        weight = torch.tensor([0.9349, 0.0651])
        if self.params['loss_function'] == 'cross_entropy':
            weight = weight.to(self.device)
            self.criterion = nn.CrossEntropyLoss(weight=weight).to(self.device)
        else:
            raise Exception('unknown loss function!')
        
    def train(self):
        self.running_loss = 0.0
        for self.epoch in range(self.params['n_epochs']):
            if self.epoch == 0:
                self.model.change_training_state_of_feature_extractor(True)
            for inputs, labels in self.trainloader:
                self.iter_counter += 1
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                self.running_loss += loss.item()
                loss.backward()
                self.optimizer.step()
                if self.iter_counter % self.print_every == 0:
                    self.print_progress()
            self.test()
        
    def test(self):
        all_pred = torch.empty(0)
        all_labels = torch.empty(0)
        self.model.eval()
        for i, (inputs, labels) in enumerate(self.testloader):
            inputs, labels = inputs.to(trainer.device), labels.to(self.device)
            outputs = self.model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            all_labels = torch.cat((all_labels, labels.cpu()))
            all_pred = torch.cat((all_pred, predicted.cpu()))
        accuracy = 100 * (all_pred == all_labels).sum().item()/all_labels.size()[0]
        self.summarywriter.add_scalar('test accuracy', accuracy, self.epoch)
        print(f'accuracy= {accuracy}')
        conf_mat = sklearn.metrics.confusion_matrix(all_labels, all_pred)
        print('confusion matrix', conf_mat)
        self.save_model()
        self.model.train()
        
    def create_optimizer(self):
        if self.params['optimizer'] == 'SGD':
            self.optimizer = torch.optim.SGD(self.model.parameters(), lr=self.params['learning_rate'])
        elif self.params['optimizer'] == 'adam':
            self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.params['learning_rate'])
        else:
            raise Exception('Unknown optimizer!')

    def save_model(self):
        saving_dir = f'runs/{self.exp_name}/saved_models'
        os.makedirs(saving_dir, exist_ok=True)
        path = os.path.join(saving_dir, f'epoch_{self.epoch}_iter_{self.iter_counter}.pth')
        torch.save(self.model.state_dict(), path)
        
    def calculate_weight(self):
        weight = torch.zeros(self.params['num_classes'], dtype=torch.float32)
        for _, label in self.trainloader:
            weight += torch.nn.functional.one_hot(label, num_classes=self.params['num_classes']).sum(0)
        return weight/weight.sum()
    
    def print_progress(self):
        avg_loss = self.running_loss / self.print_every
        print(f'epoch = {self.epoch+1}    iter = {self.iter_counter}     loss = {avg_loss:.5f}')
        self.summarywriter.add_scalar('loss', avg_loss, (self.epoch+1)*self.iter_counter)
        self.running_loss = 0.0

In [None]:
params = {
    'num_classes':2,
    'n_epochs':5,
    'batch_size':256,
    'optimizer':'adam',
    'learning_rate':0.0001,
    'loss_function':'cross_entropy'
}

In [None]:
classifier = Classifier(params)
trainer = Trainer(classifier, trainloader, testloader, params)

In [None]:
# run training ...

trainer.train()

In [None]:
# run testing ...

trainer.test()