In [1]:
import os
import argparse
import torch
import torch.backends.cudnn as cudnn
import numpy as np
from data import cfg_mnet, cfg_re50
from layers.functions.prior_box import PriorBox
from utils.nms.py_cpu_nms import py_cpu_nms
import cv2
from models.retinaface import RetinaFace
from utils.box_utils import decode, decode_landm
import time
import matplotlib.pyplot as plt
from torchvision.transforms.functional import crop
from torchvision.transforms.functional import rotate
from torchvision.transforms.functional import InterpolationMode

In [2]:
#parser = argparse.ArgumentParser(description='Retinaface')

#parser.add_argument('-m', '--trained_model', default='./weights/Resnet50_Final.pth',type=str, help='Trained state_dict file path to open')
#parser.add_argument('--network', default='resnet50', help='Backbone network mobile0.25 or resnet50')
#parser.add_argument('--cpu', action="store_true", default=False, help='Use cpu inference')
#parser.add_argument('--confidence_threshold', default=0.02, type=float, help='confidence_threshold')
#parser.add_argument('--top_k', default=5000, type=int, help='top_k')
#parser.add_argument('--nms_threshold', default=0.4, type=float, help='nms_threshold')
#parser.add_argument('--keep_top_k', default=750, type=int, help='keep_top_k')
#parser.add_argument('-s', '--save_image', action="store_true", default=True, help='show detection results')
#parser.add_argument('--vis_thres', default=0.6, type=float, help='visualization_threshold')
#args = parser.parse_args()

In [3]:
def check_keys(model, pretrained_state_dict):
    ckpt_keys = set(pretrained_state_dict.keys())
    model_keys = set(model.state_dict().keys())
    used_pretrained_keys = model_keys & ckpt_keys
    unused_pretrained_keys = ckpt_keys - model_keys
    missing_keys = model_keys - ckpt_keys
    #print('Missing keys:{}'.format(len(missing_keys)))
    #print('Unused checkpoint keys:{}'.format(len(unused_pretrained_keys)))
    #print('Used keys:{}'.format(len(used_pretrained_keys)))
    assert len(used_pretrained_keys) > 0, 'load NONE from pretrained checkpoint'
    return True

In [4]:
def remove_prefix(state_dict, prefix):
    ''' Old style model is stored with all names of parameters sharing common prefix 'module.' '''
    #print('remove prefix \'{}\''.format(prefix))
    f = lambda x: x.split(prefix, 1)[-1] if x.startswith(prefix) else x
    return {f(key): value for key, value in state_dict.items()}

In [5]:
def load_model(model, pretrained_path, load_to_cpu):
    print('Loading pretrained model from {}'.format(pretrained_path))
    if load_to_cpu:
        pretrained_dict = torch.load(pretrained_path, map_location=lambda storage, loc: storage)
    else:
        device = torch.cuda.current_device()
        pretrained_dict = torch.load(pretrained_path, map_location=lambda storage, loc: storage.cuda(device))
        print("Model loaded to GPU")
    if "state_dict" in pretrained_dict.keys():
        pretrained_dict = remove_prefix(pretrained_dict['state_dict'], 'module.')
    else:
        pretrained_dict = remove_prefix(pretrained_dict, 'module.')
    check_keys(model, pretrained_dict)
    model.load_state_dict(pretrained_dict, strict=False)
    return model

In [6]:
def face_select(dets, selec_thresh):
    previous_area = 0
    max_area = 0
    #print("dets", dets)
    prev_coords = np.zeros_like(dets[0])
    coords = np.zeros_like(dets[0])

    for b in dets:
        #if b[4] < selec_thresh: # Excludes lower score detections indicating possible background faces
            #continue
        
        height = b[3]-b[1] #ymax-ymin
        width = b[2]-b[0] #xmax-xmin
    
        b = list(map(int, b))
        bbox_area = width*height
        #print(len(dets))
        #print("test", bbox_area, previous_area)
        
        if len(dets) == 1: # Only one face present in the picture
            max_area = bbox_area
            coords[:] = b
        else:
            if bbox_area > previous_area:
                previous_area = bbox_area
                prev_coords[:] = b
            else:
                max_area = previous_area
                coords [:] = prev_coords
    face = np.append(coords, max_area)

    return face
    #if tensor is not None:
        #percentage = 10
        #w_margin = 1 + (percentage/100)
        #h_margin = 1 + (percentage/100)
    
        #pre_crop_height = (coords[3]-coords[1]) * h_margin #ymax-ymin
        #pre_crop_width = (coords[2]-coords[0]) * w_margin #xmax-xmin
    
        #pre_crop_coordinates = [int(coords[1]), int(coords[0]), int(pre_crop_height), int(pre_crop_width)]
        #pre_cropped_tensor = crop(tensor, *pre_crop_coordinates)
    
        #print("The maximum area corresponds to the face closer to the camera and is equal to {}.".format(max_area))
        #return face, pre_cropped_tensor
    #else:
        #return face

In [7]:
def detection_model(network="resnet50"):
    if network == "mobile0.25":
        cfg = cfg_mnet
        trained_model = "./weights/mobilenet0.25_Final.pth"
    elif network == "resnet50":
        cfg = cfg_re50
        trained_model = "./weights/Resnet50_Final.pth"
    # net and model
    net = RetinaFace(cfg=cfg, phase = 'test')
    net = load_model(net, trained_model, False)
    net.eval()
    cudnn.benchmark = True
    device = torch.device('cuda:0' if torch.cuda.is_available() else "cpu") # Defines the computation device (cuda:0 => GPU)
    net = net.to(device)
    
    return net, cfg, device

In [8]:
def crop_align(img, dets, selec_thresh, net, cfg, device):
    '''
    b[0], b[1] is the top left corner of the bounding box
    b[2], b[3] is the lower right corner of the bounding box
    b[4] relates to the the score of the detection
    b[5], b[6] is the left eye
    b[7], b[8] is the right eye
    b[9], b[10] is the nose
    b[11], b[12] is the left of the mouth
    b[13], b[14] is the right of the mouth
    '''
    
    #img_raw = cv2.imread(img_path, cv2.IMREAD_COLOR)
    
    face_coords = face_select(dets, selec_thresh)
    face_coords = list(map(int, face_coords)) # Coordinates must be integers
    
    print("face coords -----", face_coords)
    # -------------------- Rotation Stage ---------------------
    left_eye = (face_coords[5], face_coords[6]) # Components: (x, y)
    right_eye = (face_coords[7], face_coords[8])
    if left_eye[1] > right_eye[1]:               # Right eye is higher
        # Clock-wise rotation
        aux_point = (right_eye[0], left_eye[1])
        a = right_eye[0] - left_eye[0]
        b = right_eye[1] - aux_point[1]
        
        #cv2.circle(img_raw, left_eye, 10, (0, 255, 0), 4)
        #cv2.circle(img_raw, right_eye, 10, (0, 255, 0), 4)
        #cv2.circle(img_raw, aux_point, 10, (0, 255, 0), 4)
        
        #cv2.line(img_raw, left_eye, right_eye, (23, 23, 23), 2)
        #cv2.line(img_raw, aux_point, right_eye, (23, 23, 23), 2)
        #cv2.line(img_raw, left_eye, aux_point, (23, 23, 23), 2)
        #plt.imshow(cv2.cvtColor(img_raw, cv2.COLOR_BGR2RGB)) 
        
        theta = np.rad2deg(np.arctan(b/a)) # Angle of rotation in degrees
        #print("Right eye is higher, therefore, a clock-wise rotation of {} is applied".format(-theta))
        rotated_tensor = rotate(img.squeeze(), angle=theta, interpolation=InterpolationMode.BILINEAR, center=right_eye)

    else:                                        # Left eye is higher
        # Counter clock-wise rotation
        aux_point = (left_eye[0], right_eye[1])
        a = right_eye[0] - left_eye[0]
        b = left_eye[1] - aux_point[1]
        
        #cv2.circle(img_raw, left_eye, 10, (0, 255, 0), 4)
        #cv2.circle(img_raw, right_eye, 10, (0, 255, 0), 4)
        #cv2.circle(img_raw, aux_point, 10, (0, 255, 0), 4)
        
        #plt.imshow(img_raw)
        
        theta = np.rad2deg(np.arctan(b/a))
        #print("Left eye is higher, therefore, a clock-wise rotation of {} degrees is applied".format(-theta))
        rotated_tensor = rotate(img.squeeze(), angle=-theta, interpolation=InterpolationMode.BILINEAR, center=left_eye)
        
    #plt.imshow(rotated_tensor.squeeze().permute(1, 2, 0).cpu().numpy().astype(int))
    
    # -------------------- New Bounding Box computing ---------------------
    # The image is rotated, a new bbox must be generated. 
    
    # TBD: optimization by performing a preliminary crop in order to try and isolate only the relevant face
    
    loc, conf, _ = net(rotated_tensor.unsqueeze(0))  # Forward pass that gives the results <--------------
    
    im_height = rotated_tensor.shape[1]
    im_width = rotated_tensor.shape[2]
    
    resize = 1
    new_scale = torch.Tensor([rotated_tensor.shape[2], rotated_tensor.shape[1], rotated_tensor.shape[2], rotated_tensor.shape[1]])
    new_scale = new_scale.to(device)
    
    new_priorbox = PriorBox(cfg, image_size=(im_height, im_width))
    new_priors = new_priorbox.forward()
    new_priors = new_priors.to(device)
    new_prior_data = new_priors.data
    
    new_boxes = decode(loc.data.squeeze(0), new_prior_data, cfg['variance'])
    new_boxes = new_boxes * new_scale / resize
    new_boxes = new_boxes.cpu().numpy() # Tensor is moved to CPU (numpy doesn't support GPU)
    new_scores = conf.squeeze(0).data.cpu().numpy()[:, 1]

    # Score's threshold
    confidence_threshold = 0.02 # Default value
    inds = np.where(new_scores > confidence_threshold)[0]
    new_boxes = new_boxes[inds]
    new_scores = new_scores[inds]

    # keep top-K before NMS
    top_k = 500 # Default value
    order = new_scores.argsort()[::-1][:top_k] # Extracts the indexes relating to the top scores
    new_boxes = new_boxes[order] # Array [300, 4] where in each line are the coordinates
    new_scores = new_scores[order] # Array [1, 300]
    
    # do NMS
    nms_threshold = 0.4 # Default value
    new_dets = np.hstack((new_boxes, new_scores[:, np.newaxis])).astype(np.float32, copy=False)
    keep = py_cpu_nms(new_dets, nms_threshold)
    new_dets = new_dets[keep, :]

    # keep top-K faster NMS
    #keep_top_k = 500 # Default value
    #new_dets = new_dets[:keep_top_k, :]
    
    #rotated_bbox = new_dets[0]
    rotated_bbox = face_select(new_dets, selec_thresh)
    #print("rotated_bbox 1", rotated_bbox)
    rotated_bbox = list(map(int, rotated_bbox))
    #print("rotated_bbox 2", rotated_bbox)
    
    
    # -------------------- Cropping Stage ---------------------
    crop_height = rotated_bbox[3]-rotated_bbox[1] #ymax-ymin
    crop_width = rotated_bbox[2]-rotated_bbox[0] #xmax-xZmin
    crop_coordinates = (rotated_bbox[1], rotated_bbox[0], crop_height, crop_width)
    cropped_tensor = crop(rotated_tensor, *crop_coordinates)
    
    #plt.imshow(cropped_tensor.squeeze().permute(1, 2, 0).cpu().numpy().astype(int))
        
    image_array = cropped_tensor.permute(1,2,0).cpu().numpy()

    # Convert the numpy array to BGR format (required by OpenCV)
    cropped_image = cv2.cvtColor(image_array, cv2.COLOR_RGB2BGR)
    
    #if save == True:
        #new_name = "cropped_" + img_name
        #if not os.path.exists("cropped/"):
            #print("Result's directory created!")
            #os.makedirs("cropped/")
        #new_name = "cropped/" + new_name
        #cv2.imwrite(new_name, cropped_image)
    return cropped_tensor

In [9]:
# https://github.com/biubug6/Pytorch_Retinaface/
def face_detection(net, cfg, device, img):
    #save_image = False
    torch.set_grad_enabled(False)
    
    resize = 1

    # Testing stage
    img = img.permute(2, 0, 1)
    _, im_height, im_width = img.shape
    
    scale = torch.Tensor([img.shape[2], img.shape[1], img.shape[2], img.shape[1]])

    img = img.unsqueeze(0)
    img = img.to(device)
    scale = scale.to(device)
    
    tic = time.time()
    loc, conf, landms = net(img)  # Forward pass that gives the results <--------------
    print('Forward time: {:.4f}'.format(time.time() - tic))
        
    priorbox = PriorBox(cfg, image_size=(im_height, im_width))
    priors = priorbox.forward()
    priors = priors.to(device)
    prior_data = priors.data
    boxes = decode(loc.data.squeeze(0), prior_data, cfg['variance'])
    
    boxes = boxes * scale / resize
    boxes = boxes.cpu().numpy() # Tensor is moved to CPU (numpy doesn't support GPU)
    scores = conf.squeeze(0).data.cpu().numpy()[:,1]
    landms = decode_landm(landms.data.squeeze(0), prior_data, cfg['variance'])
    scale1 = torch.Tensor([img.shape[3], img.shape[2], img.shape[3], img.shape[2],
                            img.shape[3], img.shape[2], img.shape[3], img.shape[2],
                            img.shape[3], img.shape[2]])
    scale1 = scale1.to(device)
    landms = landms * scale1 / resize
    landms = landms.cpu().numpy()

    # Score's threshold
    confidence_threshold = 0.02 # Default value
    inds = np.where(scores > confidence_threshold)[0]
    boxes = boxes[inds]
    landms = landms[inds]
    scores = scores[inds]

    # keep top-K before NMS
    top_k = 500 # Default value
    order = scores.argsort()[::-1][:top_k] # Extracts the indexes relating to the top scores
    boxes = boxes[order] # Array [300, 4] where in each line are the coordinates
    landms = landms[order] # Array [300, 10]
    scores = scores[order] # Array [1, 300]

    # do NMS
    #print(boxes, boxes.shape)
    #print("")
    #print(scores, scores.shape)
    nms_threshold = 0.4 # Default value
    dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32, copy=False)
    keep = py_cpu_nms(dets, nms_threshold)
    dets = dets[keep, :]
    landms = landms[keep]

    # keep top-K faster NMS
    keep_top_k = 750 # Default value
    dets = dets[:keep_top_k, :]
    landms = landms[:keep_top_k, :]
    

    dets = np.concatenate((dets, landms), axis=1)

    cropped = crop_align(img, dets, 0.1, net, cfg, device)
    
    #plt.imshow(cropped.squeeze().permute(1, 2, 0).cpu().numpy().astype(int))
    
    #if cropped.is_cuda: print("tensor in GPU")

        # show image
        #vis_thres = 0.6
        #for b in dets:
            #if b[4] < vis_thres:
                #continue
            #text = "{:.4f}".format(b[4])
            #b = list(map(int, b))
            #cv2.rectangle(img_raw, (b[0], b[1]), (b[2], b[3]), (0, 0, 255), 2)
            #cx = b[0]
            #cy = b[1] + 12
            #cv2.circle(img_raw, (0, 0), 10, (0, 255, 0), 4)
            #cv2.circle(img_raw, (b[0], b[1]), 1, (255, 0, 255), 4)
            #cv2.circle(img_raw, (b[2], b[3]), 1, (255, 0, 255), 4)
            #cv2.putText(img_raw, text, (cx, cy),
                        #cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255))

            # landms
            #cv2.circle(img_raw, (b[5], b[6]), 1, (0, 0, 255), 4)
            #cv2.circle(img_raw, (b[7], b[8]), 1, (0, 255, 255), 4)
            #cv2.circle(img_raw, (b[9], b[10]), 1, (255, 0, 255), 4)
            #cv2.circle(img_raw, (b[11], b[12]), 1, (0, 255, 0), 4)
            #cv2.circle(img_raw, (b[13], b[14]), 1, (255, 0, 0), 4)
        
    
    #plt.imshow(cropped.permute(1, 2, 0).cpu().numpy().astype(int))
    return cropped

In [10]:
#------------- Creating custom dataset -------------

import torch
from torchvision import datasets, transforms
from dataset_split import DatasetSplitter
import torchvision.io as io

In [11]:
splitter = DatasetSplitter('/app/datasets/', '/app/data/', split = [80,20]) #Class isntance
splitter.split_dataset() #Splitting dataset into train, test (and val if needed)

train_dir, test_dir = splitter.data_dir()

data_transform = transforms.Compose([
    # Resize the images to 64x64
    #transforms.Resize(size=(64, 64)),
    # Flip the images randomly on the horizontal
    #transforms.RandomHorizontalFlip(p=0.5), # p = probability of flip, 0.5 = 50% chance
    # Turn the image into a torch.Tensor
    transforms.Lambda(lambda image: torch.tensor(np.array(image).astype(np.float32)).unsqueeze(0)) # this also converts all pixel values from 0 to 255 to be between 0.0 and 1.0 
])

train_data = datasets.ImageFolder(root=train_dir, # target folder of images
                                  transform=data_transform, # transforms to perform on data (images)
                                  target_transform=None) # transforms to perform on labels (if necessary)

test_data = datasets.ImageFolder(root=test_dir, 
                                 transform=data_transform
                                )

print(f"Train data:\n{train_data}\nTest data:\n{test_data}")


#print("tensor", train_data[0][0].shape)
#print("label", train_data[0][1])

Dataset split: 80% for training and 20% for testing.
Train data:
Dataset ImageFolder
    Number of datapoints: 570003
    Root location: /app/data/train_dir
    StandardTransform
Transform: Compose(
               Lambda()
           )
Test data:
Dataset ImageFolder
    Number of datapoints: 150000
    Root location: /app/data/test_dir
    StandardTransform
Transform: Compose(
               Lambda()
           )


In [12]:
#------------- DataLoader -------------
from torch.utils.data import DataLoader

In [13]:
workers = os.cpu_count()

train_dataloader = DataLoader(dataset=train_data, 
                              batch_size=32, # how many samples per batch?
                              num_workers=1, # how many subprocesses to use for data loading? (higher = more)
                              shuffle=False) # shuffle the data?

test_dataloader = DataLoader(dataset=test_data, 
                             batch_size=32, 
                             num_workers=1, 
                             shuffle=False) # don't usually need to shuffle testing data

train_dataloader, test_dataloader

(<torch.utils.data.dataloader.DataLoader at 0x7f065b13a460>,
 <torch.utils.data.dataloader.DataLoader at 0x7f065b13a430>)

In [14]:
#img_batch, label_batch = next(iter(train_dataloader))

# 2. Get a single image from the batch and unsqueeze the image so its shape fits the model
#print(train_dataloader)
#img_single, label_single = img_batch[0].squeeze(0), label_batch[0]
#print(img_single, img_single.shape)
#print(f"Single image shape: {img_single.shape}\n")
#print(f"Single image label: {label_single}\n")

#plt.imshow(img_single.squeeze(0).permute(1, 2, 0))

net, cfg, device = detection_model()

#cropped_face_tensor = face_detection(net, cfg, device, img_single)

#print(cropped_face_tensor.type)
#plt.imshow(cropped_face_tensor.permute(1, 2, 0).cpu().numpy().astype(int))

t_tic = time.time()

for (img_batch, label_batch) in train_dataloader:
    print("Currently in label :", label_batch)
    for img_single in img_batch:
        img_single = img_single.squeeze(0)
        cropped_face_tensor = face_detection(net, cfg, device, img_single)

print('Total time: {:.4f}'.format(time.time() - t_tic)) 



Loading pretrained model from ./weights/Resnet50_Final.pth
Model loaded to GPU
Currently in label : tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0])
Forward time: 1.0498
face coords ----- [10, 6, 94, 108, 0, 35, 53, 74, 51, 58, 71, 40, 93, 71, 92, 8491]
Forward time: 0.0040
face coords ----- [20, 9, 91, 107, 0, 37, 51, 73, 51, 56, 73, 40, 89, 70, 89, 7044]
Forward time: 0.0039
face coords ----- [7, 7, 90, 107, 0, 32, 54, 73, 50, 57, 75, 39, 93, 70, 91, 8332]
Forward time: 0.0039
face coords ----- [2, 11, 73, 106, 0, 36, 49, 67, 50, 64, 70, 44, 93, 66, 94, 6784]
Forward time: 0.0039
face coords ----- [5, 13, 89, 107, 0, 33, 52, 74, 50, 60, 74, 38, 93, 70, 92, 7932]
Forward time: 0.0039
face coords ----- [7, 7, 86, 107, 0, 34, 53, 71, 50, 58, 73, 40, 93, 68, 91, 7955]
Forward time: 0.0039
face coords ----- [26, 11, 99, 106, 0, 38, 52, 75, 51, 51, 71, 41, 90, 70, 90, 6909]
Forward time: 0.0039
face coords ----- [12, 10, 94, 1

IndexError: index 0 is out of bounds for axis 0 with size 0