In [2]:
import os, sys
ROOT_DIR = os.path.dirname(os.path.abspath(os.curdir))
sub_module_dir = os.path.join(ROOT_DIR, 'retinaface-tf2')
if sub_module_dir not in sys.path: # add retinaface-tf2 repo to PATH
    sys.path.append(sub_module_dir)

from pathlib import Path
import time
import base64

import matplotlib.pyplot as plt
from modules.models import RetinaFaceModel
from modules.utils import pad_input_image, recover_pad_output, load_yaml
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw, ImageFont
from io import BytesIO
import cv2

# Disable TF warnings
tf.get_logger().setLevel('ERROR')

BASEWIDTH = 850


## Load Face Detection Model

In [21]:
retina_face_dir = Path(ROOT_DIR) / 'retinaface-tf2'
retinaface_cfg_path = retina_face_dir / 'configs' / 'retinaface_mbv2.yaml'
retinaface_cfg = load_yaml(retinaface_cfg_path)

# define network
model = RetinaFaceModel(retinaface_cfg, training=False, iou_th=0.4, score_th=0.5)

# load checkpoint
checkpoint_dir = retina_face_dir / retinaface_cfg['sub_name']
checkpoint = tf.train.Checkpoint(model=model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

/Users/baturayofluoglu/Workspace/face-mask-detection/retinaface-tf2
/Users/baturayofluoglu/Workspace/face-mask-detection/retinaface-tf2/configs/retinaface_mbv2.yaml
/Users/baturayofluoglu/Workspace/face-mask-detection/retinaface-tf2/retinaface_mbv2


<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x15f0a94d0>

## Load Mask Classification Model

In [22]:
classifier_dir = Path(ROOT_DIR) / 'data' / 'classifier_model_weights'
classifier = tf.keras.models.load_model(classifier_dir / 'best.h5')

In [23]:
def resize_image(img):
    # Resize image by keeping the aspect ratio if image witdth is greater than BASEWIDTH
    if img.size[0] > BASEWIDTH:
        wpercent = (BASEWIDTH / float(img.size[0]))
        hsize = int((float(img.size[1])*float(wpercent)))
        img = img.resize((BASEWIDTH,hsize), Image.ANTIALIAS)
    return img

def detect_faces(img_raw):
    img = np.float32(img_raw.copy())
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    
    img, pad_params = pad_input_image(img, max_steps=max(retinaface_cfg['steps']))
    detected_faces = model(img[np.newaxis, ...]).numpy()
    return recover_pad_output(detected_faces, pad_params)

def get_detected_face_coordinates(detected_faces, img_width, img_height):
    detected_face_coordinates = []
    for detected_face in detected_faces: 
        x1 = int(detected_face[0] * img_width)
        y1 = int(detected_face[1] * img_height)
        x2 = int(detected_face[2] * img_width)
        y2 = int(detected_face[3] * img_height)
        detected_face_coordinates.append([x1, y1, x2, y2])
    return detected_face_coordinates

In [24]:
def classify_faces(img_raw, face_coords):
    classification_scores = []
    # Iterate over detected face coordinates to find
    for coords in face_coords:
        x1, y1, x2, y2 = coords
        cropped_face = img_raw.crop(coords)
        cropped_face_np = np.float32(cropped_face)
        img = cv2.cvtColor(cropped_face_np, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (112, 112)) 
        preprocessed_img = tf.keras.applications.mobilenet.preprocess_input(img)
        preprocessed_img = preprocessed_img[np.newaxis, ...]
        pred = classifier.predict(preprocessed_img)[0][0]
        classification_scores.append(pred)
    return classification_scores

In [31]:
def annotate_image(img, face_coords, classified_face_scores, classification_labels):
    pil_draw = ImageDraw.Draw(img)
    for idx, coords in enumerate(face_coords):
        x1, y1, x2, y2 = coords
        label = classification_labels[idx]
        color = 'green' if label == 'masked' else 'red'
        display_str = "{}: {:.2f}".format(label, classified_face_scores[idx])

        # Draw rectangle for detected face
        pil_draw.rectangle([x1, y1, x2, y2], outline=color, width=3)
        
        # Draw label text box
        # portion of image width you want text width to be
        img_fraction = 0.2
        font_size = 5  # starting font size
        
        font = ImageFont.truetype(os.path.join(ROOT_DIR, "arial.ttf"), font_size)
        image_size = img.size[0]
        
        while font.getsize(display_str)[0] < img_fraction * image_size:
            # iterate until the text size is just larger than the criteria
            font_size += 1
            font = ImageFont.truetype(os.path.join(ROOT_DIR, "arial.ttf"), font_size)

        # Find coordinates of bounding text box
        w, h = font.getsize(display_str)
        pil_draw.rectangle([x1, y1, x1 + w, y1 + h], fill=color)
        pil_draw.text((x1, y1), display_str, font=font)
    return img

def convert_pil_to_base64(annotated_image, image_type):
    buffered = BytesIO()
    if image_type == 'jpg':
        annotated_image.save(buffered, format='jpeg')
    else:
        annotated_image.save(buffered, format=image_type)
    annotated_image_base64 = base64.b64encode(buffered.getvalue())
    return annotated_image_base64.decode('utf-8')

In [32]:
#' @dploy endpoint predict
def predict_masked_faces(body):
    base64_image = body['image'].encode('utf-8')
    image_type = body['type']
    
    # Convert image from base64 to PIL Image and resize it to improve the performance
    img_raw = Image.open(BytesIO(base64.b64decode(base64_image)))
    img_raw = resize_image(img_raw)

    # Detect face coordinates from the raw image
    detected_faces = detect_faces(img_raw)
    print(detected_faces)
    # Get detected face coordinates
    img_width, img_height  = img_raw.size
    face_coords = get_detected_face_coordinates(detected_faces, img_width, img_height)
    
    # Classify detected faces whether they have a mask or not
    classified_face_scores = classify_faces(img_raw, face_coords)
    
    # Find labels
    classification_labels = np.where(np.array(classified_face_scores) > 0.5, 'masked', 'not masked').tolist()
    
    # Annotate base image with detected faces and mask classification
    annotated_image = annotate_image(img_raw, face_coords, classified_face_scores, classification_labels)
    
    # Convert PIL image to base64
    annotated_image_base_64 = convert_pil_to_base64(annotated_image, image_type)
    
    return {
            'detected_face_coordinates': face_coords,
            'detected_mask_scores': classified_face_scores,
            'detected_face_labels': classification_labels,
            'annotated_image': annotated_image_base_64
           }

In [37]:
import json
_time = time.time()
with open(os.path.join(ROOT_DIR, "1.jpg"), "rb") as img_file:
    body = {
        'image': base64.b64encode(img_file.read()).decode('utf-8'),
        'type': 'jpeg'
    }
with open('./test_data.json', 'w') as f:
    json.dump(body, f)

with open('./test_data.json', 'r') as f:
    request_body = json.load(f)
    response = predict_masked_faces(request_body)

#Image.open(BytesIO(base64.b64decode(response['image']))).show()
print(time.time() - _time)

[[0.38385132 0.12743628 0.5838287  0.518403   0.43789974 0.29725102
  0.5258882  0.29297885 0.4824697  0.3529638  0.4508822  0.42788082
  0.5200763  0.42299545 1.         0.9999862 ]
 [0.0960518  0.7258081  0.3056761  0.96894056 0.15428889 0.9007342
  0.18773465 0.9013555  0.15501799 0.9637476  0.19403099 0.9545578
  0.20730095 0.9709965  1.         0.58616716]]
1.1999261379241943
