In [133]:
import dlib
import os
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import torchvision
from torch.utils.data import Dataset

import torchvision.transforms as transforms

from tqdm import tqdm

from skimage import io
import cv2

In [134]:
# Sample training image used to try out face_bbox in a later cell.
path_to_file = 'data/Menpo_68p/train/aflw__face_43009.jpg'

In [135]:
# NOTE(review): check_valid_bbox is only defined in a later cell (In [141]), so on a
# fresh kernel this cell raises NameError. Move it below the definition.
# The box has x2 < x1, so it is reported as invalid.
check_valid_bbox(torch.tensor([8.058350229756407e-20, 1.5407837134637094e-40, 7.2618585040409505e-31, 1.1537170516479084e-40]))

False

In [136]:
def face_bbox(file_path: str) -> torch.FloatTensor:
    """Return the bounding box of the first face dlib reports in an image.

    Args:
        file_path: Path of the image to analyse.

    Returns:
        A float tensor ``[x1, y1, x2, y2]`` (left, top, right, bottom) for the
        first detection. When no face is detected, or the detected rectangle
        has a non-positive width or height, the list sentinel ``[[None] * 4]``
        is returned instead (callers compare against exactly this value).
    """
    # Build the detector once and reuse it on later calls; constructing it for
    # every image is pure overhead when this runs once per dataset item.
    detector = getattr(face_bbox, '_detector', None)
    if detector is None:
        detector = dlib.get_frontal_face_detector()
        face_bbox._detector = detector

    img = io.imread(file_path)
    # NOTE(review): the arguments are presumably (image, upsample_num_times=1,
    # adjust_threshold=-1) -- confirm against the dlib version in use.
    dets, _, _ = detector.run(img, 1, -1)

    # Explicit "nothing found" branch instead of catching IndexError on dets[0].
    if len(dets) == 0:
        print(f"Following face wasn't recognized {file_path}")
        return [[None]*4]

    first = dets[0]
    left, top, right, bottom = first.left(), first.top(), first.right(), first.bottom()
    if (right - left) <= 0 or (bottom - top) <= 0:
        # Degenerate rectangle: treat it the same as "no face".
        return [[None]*4]
    return torch.tensor([left, top, right, bottom], dtype=torch.float)

In [137]:
# Smoke test: for this image no face is detected, so the sentinel list is returned.
face_bbox(path_to_file)

Following face wasn't recognized data/Menpo_68p/train/aflw__face_43009.jpg


[[None, None, None, None]]

In [138]:
# Landmark annotation file that belongs to the sample image above.
path_to_points = 'data/Menpo_68p/train/aflw__face_43009.pts'

In [139]:
def read_keypoint(jpg_path: str) -> torch.FloatTensor:
    """Load the landmark annotation stored next to an image.

    The ``.pts`` file is assumed to be in the same directory as the image and
    to share its base name. Two layouts are supported: a file that starts with
    a ``version`` header and keeps the points between ``{`` and ``}``, and a
    bare file with one ``x y`` pair per line.

    Args:
        jpg_path: Path of the image (or of the ``.pts`` file itself).

    Returns:
        Float tensor of shape ``(K, 3)`` with rows ``[x, y, visibility]``;
        visibility is always 1. ``K`` is the number of points in the file.

    Raises:
        ValueError: If the file has no coordinates or an odd number of values.
    """
    # Swap the extension instead of chopping a fixed four characters, so that
    # names such as "face.jpeg" resolve correctly as well.
    pts_name = os.path.splitext(jpg_path)[0] + '.pts'
    with open(pts_name) as f:
        lines = [line.strip() for line in f]

    if lines and lines[0].startswith('version'):  # to support different formats
        # Keep only the rows between the braces; this tolerates trailing blank
        # lines, which a fixed [3:-1] slice does not.
        start = lines.index('{') + 1 if '{' in lines else 3
        end = lines.index('}', start) if '}' in lines[start:] else len(lines)
        lines = lines[start:end]

    values = ' '.join(lines).split()
    if not values:
        raise ValueError(f'No keypoints found in {pts_name}')

    mat = np.array(values, dtype=np.float64)
    # reshape(-1, 2) raises ValueError on an odd number of values.
    mat_tensor = torch.tensor(mat.reshape((-1, 2)), dtype=torch.float)
    visibility = torch.ones((mat_tensor.shape[0], 1), dtype=torch.float)
    return torch.cat((mat_tensor, visibility), dim=1)

In [140]:
# read_keypoint derives the .pts path from its argument, so passing the .pts
# path itself also works; expect one [x, y, visibility] row per landmark.
read_keypoint(path_to_points).shape

torch.Size([68, 3])

In [141]:
def check_valid_bbox(bbox):
    """Tell whether a box has finite corners and a positive width and height.

    Args:
        bbox: Tensor whose first four entries are ``x1, y1, x2, y2``.

    Returns:
        ``True`` when all four coordinates are finite, ``x2 > x1`` and
        ``y2 > y1``; ``False`` otherwise.
    """
    list_bbox = bbox.tolist()
    x1, y1, x2, y2 = float(list_bbox[0]), float(list_bbox[1]), float(list_bbox[2]), float(list_bbox[3])

    # NaN compares False against everything, so a plain "<= 0" test would let a
    # NaN box through as valid; reject NaN and +/-inf up front.
    infinities = (float('inf'), float('-inf'))
    for coord in (x1, y1, x2, y2):
        if coord != coord or coord in infinities:
            return False

    return (x2 - x1) > 0 and (y2 - y1) > 0

In [142]:
class FaceLandmarksDataset(Dataset):
    """Face Landmarks dataset.

    Every item is a pair ``(img, target)``. ``img`` is a float tensor in
    ``(C, H, W)`` layout with values scaled to ``[0, 1]``. ``target`` is a dict
    with keys ``'boxes'`` (``[x1, y1, x2, y2]``), ``'keypoints'``
    (``[K, 3]`` rows of ``[x, y, visibility]``) and ``'labels'`` (scalar int64:
    1 when a face box was found, 0 otherwise).
    """

    def __init__(self, dir_to_jpgs: str, transform=None):
        """
        Arguments:
            dir_to_jpgs (string): Path to folder with .jpg and .pts files.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.root_dir = dir_to_jpgs
        self.transform = transform
        # os.listdir returns entries in arbitrary order; sort so that an index
        # always maps to the same image across runs and machines.
        self.images = sorted(
            os.path.join(self.root_dir, fname)
            for fname in os.listdir(self.root_dir)
            if fname.endswith('.jpg')
        )

    @staticmethod
    def _build_target(bbox, keypoints):
        """Assemble the target dict, blanking it when no face box is available."""
        if isinstance(bbox, torch.Tensor):
            labels = torch.tensor(1, dtype=torch.int64)
        else:
            # Catch dlib's undetected faces -> empty targets. The keypoints are
            # zeroed with the same shape as the annotation so that every sample
            # has the same number of landmarks.
            bbox = torch.zeros(4, dtype=torch.float)
            keypoints = torch.zeros_like(keypoints)
            labels = torch.tensor(0, dtype=torch.int64)
        return {
            'boxes': bbox,
            'keypoints': keypoints,
            'labels': labels
        }

    def __getitem__(self, idx):
        img_path = self.images[idx]
        bgr = cv2.imread(img_path)
        if bgr is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise OSError(f'Could not read image {img_path}')
        img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        bbox = face_bbox(img_path) # [x1, y1, x2, y2]
        keypoints = read_keypoint(img_path) # (FloatTensor[K, 3]) format K : [x, y, visibility]
        img = img / 255 # normalize values

        if self.transform is not None:
            img = self.transform(img) # to tensor, from shape (H, W, C) -> (C, H, W)
        else:
            # Without a transform the image is still a NumPy array; convert it
            # here so the .to() call below does not fail.
            img = torch.from_numpy(img).permute(2, 0, 1)
        img = img.to(torch.float)

        target = self._build_target(bbox, keypoints)

        return img, target

    def __len__(self):
        return len(self.images)

In [143]:
# Folder holding the training .jpg images and their .pts annotations.
path_to_train = 'data/Menpo_68p/train/'
# The training loop wraps the collated target dict in a one-element list, so it
# relies on batch_size staying at 1.
batch_size = 1

In [144]:
# ToTensor turns the (H, W, C) array produced by the dataset into a (C, H, W) tensor.
transform = transforms.Compose([transforms.ToTensor()])

# Training split, reshuffled by the loader.
trainset = FaceLandmarksDataset(dir_to_jpgs=path_to_train, transform=transform)
train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)

In [145]:
# Folder used as the validation split (the dataset's "test" directory).
path_to_val = 'data/Menpo_68p/test/'

In [146]:
# Validation split, built with the same transform as the training set.
# NOTE(review): shuffle=True is not normally needed for validation -- confirm it is intended.
valset = FaceLandmarksDataset(dir_to_jpgs=path_to_val, transform=transform)
val_loader = torch.utils.data.DataLoader(valset, batch_size=batch_size, shuffle=True)

In [147]:
# Release cached GPU memory left over from earlier runs in this kernel.
torch.cuda.empty_cache()
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [148]:

# Keypoint R-CNN trained from scratch: two classes and 68 keypoints per
# instance, matching the 68-point .pts annotations.
model = torchvision.models.detection.keypointrcnn_resnet50_fpn(
    pretrained=False,
    num_classes=2,
    num_keypoints=68,
)
model.to(device)



KeypointRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(640, 672, 704, 736, 768, 800), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=1e-05)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=1e-05)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=1e-05)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, 

In [149]:
# NOTE(review): lr=0.02 is far above Adam's default of 1e-3 -- confirm it is intended.
optimizer = optim.Adam(model.parameters(), lr=0.02)
# Multiplies the learning rate by 0.1 every 120000 scheduler steps.
# NOTE(review): the original training loop never calls scheduler.step().
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=120000, gamma=0.1)

In [150]:
# Same device selection as in the earlier cell (In [147]); this cell is redundant.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# Disabled multi-GPU wrapper, kept for reference.
# if torch.cuda.is_available():
#     model = nn.DataParallel(model, device_ids=[device])

In [151]:
# for i, (sample, targets) in enumerate(train_loader):
#     print(targets.items())
#     targets = [{k: v.to(device) for k, v in targets.items()}]

#     if i == 2:
#         break

In [153]:
for epoch in range(1):
    model.train()
    # Let tqdm drive the loader so the bar advances and closes on its own.
    pbar = tqdm(train_loader, desc=f'Epoch {epoch+1} - train: ')
    for images, targets in pbar:
        # The dataset marks images without a detected face with label 0 and an
        # all-zero box. Such a sample carries no usable annotation, so skip it.
        # NOTE(review): torchvision detection models are expected to reject
        # zero-area boxes during training -- confirm for the installed version.
        if int(targets['labels'].sum()) == 0:
            continue

        images = [image.to(device) for image in images]
        # The default collate adds the leading "instances" dimension; wrapping
        # the dict in a list yields the one-target-per-image format the model
        # takes (this relies on batch_size == 1).
        targets = [{k: v.to(device) for k, v in targets.items()}]

        # Forward pass
        loss_dict = model(images, targets)
        # NOTE(review): only the keypoint loss is optimised; the other entries
        # of loss_dict are ignored -- confirm this is intended.
        losses = loss_dict['loss_keypoint'] # consider only keypoint loss

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
        # Advance the LR schedule; the scheduler was created but never stepped.
        # NOTE(review): step_size=120000 suggests it counts iterations -- confirm.
        scheduler.step()

    # Free cached GPU memory once per epoch; doing it every iteration only
    # slows training down.
    torch.cuda.empty_cache()

Epoch 1 - train:  25%|██▍       | 1478/6018 [1:00:36<3:06:08,  2.46s/it]
Epoch 1 - train:   0%|          | 5/6018 [00:08<2:40:41,  1.60s/it]