# Facial Keypoints Detection


In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torchvision import transforms
from torch.utils.data import DataLoader, SequentialSampler
from data_loader import FacialKeypointsDataset
import transform
import matplotlib.pyplot as plt
import time
import copy
import numpy as np
import cv2 as cv

# Report the library versions so that results can be reproduced later.
print('Torch version: {}'.format(torch.__version__))
print('Torchvision version: {}'.format(torchvision.__version__))
# Use the first CUDA device when one is available, otherwise fall back to CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# The selected device may be the CPU, so it is not labelled as a GPU.
print('Using device: {}'.format(device))

## Load the dataset with the following transforms:
* Min-max normalization to the range [a, b]:
\begin{equation}
sample^* = (b - a) * \frac{(sample - min(sample))}{max(sample) - min(sample)} + a
\end{equation}

     \begin{equation} Image = (1-0) * \frac{Image - 0}{255 - 0} + 0 = Image/255 \end{equation}
     \begin{equation} Keypoints = (1 - (-1))* \frac{Keypoints - 0}{96 - 0} + (-1) = \frac{Keypoints - 48}{48} \end{equation} 

* Numpy array to Tensor conversion

In [None]:
# Preprocessing applied to every sample: the project-local Normalize() step
# runs first, followed by the project-local ToTensor() step.
data_transforms = transforms.Compose(
    [transform.Normalize(), transform.ToTensor()]
)

# The training and validation CSV files share the same preprocessing pipeline.
train_set = FacialKeypointsDataset('training.csv', transform=data_transforms)
val_set = FacialKeypointsDataset('val.csv', transform=data_transforms)

# Show the number of samples in each split.
print(*(len(split) for split in (train_set, val_set)))

## Use PyTorch DataLoaders to stream the training and validation sets

In [None]:
# Mini-batch size shared by both loaders.
bs = 20

# Training batches are drawn in shuffled order.
train_loader = DataLoader(
    dataset=train_set,
    batch_size=bs,
    shuffle=True,
    num_workers=4,
)

# Validation keeps the dataset order; drop_last=True discards a trailing
# batch that has fewer than `bs` samples.
val_loader = DataLoader(
    dataset=val_set,
    batch_size=bs,
    shuffle=False,
    num_workers=4,
    drop_last=True,
)

### Plot some samples

In [None]:
# Show the first item of the first `num_samples` training batches side by side.
num_samples = 4
fig = plt.figure(figsize=(15, 15))
for idx, sample in enumerate(train_loader):
    if idx == num_samples:
        break
    # Undo the normalisation described above: pixels back to 0..255 and
    # keypoints back to pixel coordinates, one (x, y) pair per row.
    image = torch.squeeze(sample['image'][0]) * 255.
    key_pts = sample['keypoints'][0].view(-1, 2) * 48 + 48

    ax = plt.subplot(1, num_samples, idx + 1)
    ax.set_title('Sample {}'.format(idx))
    ax.axis('off')
    ax.imshow(image, cmap='gray')
    ax.scatter(key_pts[:, 0], key_pts[:, 1], s=100, marker='.', c='r')
plt.show()

## Define Network Architecture
```python
features{(Layer Segment 1) = Conv2d(32, 96, 96),
                             BatchNorm2d(32, 96, 96),
                             MaxPool2d(32, 48, 48),
                             ReLU(32, 48, 48),
                             Dropout2d(32, 48, 48)
                
         (Layer Segment 2) = Conv2d(64, 48, 48),
                             BatchNorm2d(64, 48, 48),
                             MaxPool2d(64, 24, 24),
                             ReLU(64, 24, 24),
                             Dropout2d(64, 24, 24)
                
         (Layer Segment 3) = Conv2d(128, 24, 24),
                             BatchNorm2d(128, 24, 24),
                             MaxPool2d(128, 12, 12),
                             ReLU(128, 12, 12),
                             Dropout2d(128, 12, 12)

         (Layer Segment 4) = Conv2d(256, 12, 12),
                             BatchNorm2d(256, 12, 12),
                             MaxPool2d(256, 6, 6),
                             ReLU(256, 6, 6),
                             Dropout2d(256, 6, 6)
         }
                
classifier{(Layer Segment 1) = Linear(256),
                               ReLU(256),
                               Dropout(256),
                               Linear(30)
          }
```

In [None]:
class KeypointsNet(nn.Module):
    """Convolutional network that regresses 30 values from a 1-channel image.

    ``features`` is a flat stack of four Conv2d -> BatchNorm2d -> MaxPool2d ->
    ReLU -> Dropout2d stages with 32, 64, 128 and 256 output channels.
    ``classifier`` is a two-layer fully connected head. Its first layer
    expects 256*6*6 inputs, which is what a 96x96 image yields after the four
    2x2 poolings.
    """

    def __init__(self):
        super().__init__()
        self.reset()

        # (in_channels, out_channels, dropout probability) of each stage.
        stage_specs = (
            (1, 32, 0.1),
            (32, 64, 0.2),
            (64, 128, 0.3),
            (128, 256, 0.4),
        )
        conv_layers = []
        for in_ch, out_ch, drop_p in stage_specs:
            conv_layers.extend([
                nn.Conv2d(in_ch, out_ch, 3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.MaxPool2d(2, 2),
                nn.ReLU(),
                nn.Dropout2d(drop_p),
            ])
        # A single flat Sequential keeps the layer indices (0..19), and hence
        # the state_dict keys, independent of how the list was assembled.
        self.features = nn.Sequential(*conv_layers)

        head_layers = [
            nn.Linear(256 * 6 * 6, 256),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(256, 30),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Run the convolutional stack, flatten per sample, apply the head."""
        feats = self.features(x)
        flat = feats.view(-1, self.num_flat_features(feats))
        return self.classifier(flat)

    def num_flat_features(self, x):
        """Return the number of elements per sample (all dims except dim 0)."""
        return x.size()[1:].numel()

    def reset(self):
        """Replace all four loss/accuracy history lists with empty lists."""
        for prefix in ('train', 'val'):
            for kind in ('loss', 'acc'):
                setattr(self, '{}_{}_history'.format(prefix, kind), [])

    def print_params(self):
        """Print the total number of elements across all parameters."""
        total_params = 0
        for param in self.parameters():
            total_params += param.numel()
        print('Total params: {}'.format(total_params))

### Display Model

In [None]:
# Build the network and print its layer structure.
model = KeypointsNet()
print(model)
# Print the total parameter count.
model.print_params()
# Move the parameters to the device selected earlier in the notebook.
model = model.to(device)

### Training Loop

In [None]:
def train_net(model, train_loader, val_loader, use_cuda, print_every, n_epochs, best_score=140):
    """Train ``model`` and return it loaded with the best-scoring weights.

    The function relies on the module-level names ``criterion``, ``optimizer``
    and ``device``, which must be defined before it is called.

    Args:
        model: Network to train. It must expose ``train_loss_history`` and
            ``val_loss_history`` lists; one loss value is appended per batch.
        train_loader: Iterable of dicts with ``'image'`` and ``'keypoints'``
            tensors, used for optimisation.
        val_loader: Iterable with the same batch layout, used for scoring. It
            must support ``len()`` and yield at least one batch.
        use_cuda: When true, each batch is moved to the global ``device``.
        print_every: Print the training loss every this many batches.
        n_epochs: Number of passes over ``train_loader``.
        best_score: Score an epoch has to exceed before its weights are kept.
            If no epoch exceeds it, the weights the model had when this
            function was called are restored.

    Returns:
        The same ``model`` object, loaded with the best recorded weights.

    Raises:
        ValueError: If ``val_loader`` yields no batches.
    """
    n_val_batches = len(val_loader)
    if n_val_batches == 0:
        # Fail before training instead of dividing by zero after epoch one.
        raise ValueError('val_loader yields no batches; cannot compute a validation score')

    print('\nStarted training...\n')
    best_model_wts = copy.deepcopy(model.state_dict())
    for epoch in range(n_epochs):
        # Call train() so Dropout and BatchNorm use their training behaviour.
        model.train()
        for batch, data in enumerate(train_loader):
            images, key_pts = data['image'], data['keypoints']
            # Flatten the keypoints to (batch, n_values) to match the output.
            key_pts = key_pts.view(key_pts.size(0), -1)
            if use_cuda:
                images = images.to(device)
                key_pts = key_pts.to(device)

            output = model(images)
            train_loss = criterion(output, key_pts)
            model.train_loss_history.append(train_loss.item())

            optimizer.zero_grad()
            train_loss.backward()
            optimizer.step()

            if (batch+1) % print_every == 0:
                print('[Iteration {}/{}] Training loss: {:.3f}'.format(batch+1, len(train_loader), train_loss.item()))

        # Call eval() so Dropout is disabled and BatchNorm uses its running
        # statistics while the model is being scored.
        model.eval()
        running_loss = 0.0
        with torch.no_grad():
            for data in val_loader:
                image_val, key_pts_val = data['image'], data['keypoints']
                key_pts_val = key_pts_val.view(key_pts_val.size(0), -1)
                if use_cuda:
                    image_val = image_val.to(device)
                    key_pts_val = key_pts_val.to(device)

                o_val = model(image_val)
                val_loss = criterion(o_val, key_pts_val)
                running_loss += val_loss.item()
                model.val_loss_history.append(val_loss.item())

        # Score is the inverse of twice the mean validation loss (higher is
        # better); a zero loss maps to infinity instead of raising.
        mean_val_loss = running_loss / n_val_batches
        metric = 1.0 / (2 * mean_val_loss) if mean_val_loss > 0 else float('inf')
        print('[Epoch {}/{}] Training loss: {:.3f}, Validation score: {:.3f}\n'.format(epoch+1, n_epochs, model.train_loss_history[-1], metric))

        if metric > best_score:
            best_score = metric
            best_model_wts = copy.deepcopy(model.state_dict())
            print('Achieved better score. Saving state...')

    print('\nFinished training...\n')
    model.load_state_dict(best_model_wts)
    return model

In [None]:
# Wall-clock start, used for the timing report at the end of the cell.
since = time.time()

# Loss and optimiser are module-level names that train_net reads directly.
criterion = nn.MSELoss()
optimizer = optim.Adam(
    model.parameters(),
    lr=0.01,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=1e-6,
)

# Train for 25 epochs, printing the training loss every 10 batches.
best_model = train_net(model, train_loader, val_loader, use_cuda=True, print_every=10, n_epochs=25)

# The whole model object is saved, not just its state_dict.
# NOTE(review): the testing cell loads 'Saved_Models/bestModel2.pt' - confirm
# the file is moved there, or align the two paths.
torch.save(best_model, 'bestModel2.pt')

time_elapsed = time.time() - since
print('Total training time: {}m {}s'.format(*divmod(time_elapsed, 60)))

## Testing

* Detect faces using OpenCV's Haar Cascade
* Resize detected faces to (96, 96)
* Make a Torch Tensor from the face sample
* Evaluate the model on the sample
* Plot the output keypoints back on the sample

In [None]:
# Load the frontal-face Haar cascade from the data directory of the installed
# OpenCV package (cv.data.haarcascades) rather than from a machine-specific
# absolute path.
face_cascade = cv.CascadeClassifier(cv.data.haarcascades + 'haarcascade_frontalface_default.xml')
if face_cascade.empty():
    # CascadeClassifier does not raise on a missing file; fail here so the
    # problem is not first seen as an error inside detectMultiScale.
    raise IOError('Could not load haarcascade_frontalface_default.xml from {}'.format(cv.data.haarcascades))

def face_detect(frame):
    """Detect a face in ``frame`` and return it as a model-ready tensor.

    A rectangle is drawn on ``frame`` (in place) around every detection. When
    several faces are found, the crop of the last one is used. The crop is
    resized to 96x96, scaled by 1/255 and returned as a float32 tensor of
    shape (1, 1, 96, 96) on the global ``device``.

    Returns ``None`` after printing a message when no face is detected.
    """
    gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
    detections = face_cascade.detectMultiScale(gray, 1.3, 5)

    face_gray = None
    for (x, y, w, h) in detections:
        cv.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
        face_gray = gray[y:y + h, x:x + w]

    # Guard clause: nothing to convert when the detector found no face.
    if face_gray is None:
        print('No face found!')
        return None

    resized = cv.resize(face_gray, (96, 96))
    # Add the batch and channel axes in one reshape, then scale to [0, 1].
    pixels = resized.astype(np.float32).reshape(1, 1, 96, 96) / 255.
    return torch.from_numpy(pixels).to(device)

# Load the pickled model object. map_location remaps the stored tensors to the
# selected device, so a checkpoint saved on a GPU also loads on a CPU-only host.
# NOTE: torch.load unpickles arbitrary objects - only load trusted files.
# NOTE(review): the training cell saves to 'bestModel2.pt' in the working
# directory - confirm the file has been moved into Saved_Models/.
saved_model = torch.load('Saved_Models/bestModel2.pt', map_location=device)
saved_model = saved_model.to(device)
# Inference mode: disables dropout and uses the BatchNorm running statistics.
saved_model.eval()

## Test in real-time using OpenCV's Video Capture

In [None]:
# Open capture device 0 and pause for two seconds before reading frames.
cap = cv.VideoCapture(0)
time.sleep(2.0)

# Optional recording of the annotated stream (disabled):
# fourcc = cv.VideoWriter_fourcc(*'MJPG')
# out = cv.VideoWriter('output.avi',fourcc, 20.0)

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # The read failed and frame is not usable; stop instead of
            # passing it to cvtColor.
            break
#         frame = cv.resize(frame, (1024, 600))
        gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        for (x, y, w, h) in faces:
            cv.rectangle(frame, (x, y), (x+w, y+h), (255, 0, 0), 2)
            face_gray = gray[y:y+h, x:x+w]
            original_shape = face_gray.shape

            # Same preprocessing as training images: 96x96, scaled to [0, 1],
            # laid out as (batch, channel, height, width).
            sample = cv.resize(face_gray, (96, 96))
            sample = sample.astype('float32')/255
            sample = sample.reshape(1, 96, 96)
            sample = torch.from_numpy(sample).unsqueeze(0).to(device)

            # No gradients are needed for inference; skipping them avoids
            # building an autograd graph for every frame.
            with torch.no_grad():
                output = saved_model(sample)

            # Map the outputs back to 96x96 pixel coordinates, one (x, y)
            # pair per row.
            output = output.view(-1, 2)
            output = (output * 48) + 48
            output = output.cpu().numpy()

            # Rescale from the 96x96 crop to the detected box and shift by
            # the box origin to obtain frame coordinates.
            for kp_x, kp_y in output:
                x_pt = (kp_x * (original_shape[1]/96)) + x
                y_pt = (kp_y * (original_shape[0]/96)) + y
                cv.circle(frame, (int(x_pt), int(y_pt)), 3, (0, 0, 255), -1)
#         out.write(frame)
        cv.imshow("Frame", frame)
        key = cv.waitKey(1) & 0xFF

        # Press 'q' to quit.
        if key == ord('q'):
            break
finally:
    # Release the camera and close the window even if an error occurred.
    cap.release()
    # out.release()
    cv.destroyAllWindows()
    cv.waitKey(1)