In [None]:

import os, sys
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow import keras
from tensorflow.keras.layers import Input, Flatten, Add, concatenate, Dense, Conv2D, Dropout, BatchNormalization, Activation, AveragePooling2D, ZeroPadding2D, LeakyReLU
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.preprocessing.image import img_to_array, load_img, ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2, MobileNetV3Small
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from sklearn.model_selection import train_test_split

# Let TensorFlow grow GPU memory on demand instead of reserving it all up front.
# Memory growth is enabled on every visible GPU (TensorFlow requires the setting
# to be the same on all GPUs). On a CPU-only machine the device list is empty,
# so the loop is skipped instead of failing on `physical_devices[0]`.
physical_devices = tf.config.experimental.list_physical_devices('GPU')
for gpu_device in physical_devices:
    tf.config.experimental.set_memory_growth(gpu_device, True)
config = None  # set_memory_growth() returns None; name kept in case a later cell reads it
print(f'Running on Python {sys.version}, Tensorflow {tf.__version__}.')

In [None]:
# Load every image from the two class folders into memory.
# `data` ends up as a float32 array of preprocessed images and `labels` as a
# one-hot array with two columns (column 0 = without mask, column 1 = with mask).
DataPath = 'Dataset'
maskImage = DataPath + '/with_mask'
unmaskImage = DataPath + '/without_mask'
data, labels = [], []
img_height, img_width = 224, 224

# One pass per class folder: '1' = with mask, '0' = without mask.
for imageDir, label in ((maskImage, '1'), (unmaskImage, '0')):
    # os.listdir() returns entries in arbitrary order; sorting keeps the sample
    # order stable between runs so the seeded train/test split is reproducible.
    for imagePath in sorted(os.listdir(imageDir)):
        image = load_img(os.path.join(imageDir, imagePath), target_size=(img_height, img_width))
        # For MobileNetV3 use `image = img_to_array(image)` instead
        # (original author's note: v3 needs no separate preprocessing).
        image = preprocess_input(img_to_array(image))

        # update the data and labels lists, respectively
        data.append(image)
        labels.append(label)

data = np.array(data, dtype="float32")
# num_classes is fixed so the one-hot width stays 2 even if one folder is empty.
labels = to_categorical(np.array(labels), num_classes=2)
assert len(data) == len(labels), 'Length of data and labels mismatch!'
print(f'{len(data)} images loaded from {DataPath}')

In [None]:
# Step 1: hold out 15% of all samples as the test set (stratified, fixed seed).
train_x, test_x, train_y, test_y = train_test_split(
    data,
    labels,
    test_size=0.15,
    stratify=labels,
    shuffle=True,
    random_state=69,
)
# Step 2: carve 20% of the remaining samples out as the validation set.
train_x, val_x, train_y, val_y = train_test_split(
    train_x,
    train_y,
    test_size=0.20,
    stratify=train_y,
    shuffle=True,
    random_state=69,
)


In [None]:
# Augmentation settings for the image generator: random rotation, zoom, shifts,
# shear and horizontal flips; pixels exposed by a transform are filled with the
# nearest existing pixel.
ImageGen = ImageDataGenerator(
    rotation_range=20,
    zoom_range=0.15,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.15,
    horizontal_flip=True,
    fill_mode="nearest",
)

In [None]:
def h_swish(x):
    """Hard-swish activation: ``x * relu6(x + 3) / 6``."""
    gate = tf.nn.relu6(x + 3)
    # replace the division by 6 with a multiplication by 0.16666667 if this
    # cannot be converted to tflite
    return x * gate / 6

# Conv Block H-swish
def CBH(x, filters, kernel_size, strides, padding="same", zero_pad=False):
    """Conv2D (no bias) -> BatchNormalization -> h-swish.

    Args:
        x: Input tensor.
        filters: Number of convolution filters.
        kernel_size: Convolution kernel size.
        strides: Convolution stride.
        padding: Padding mode forwarded to ``Conv2D``.
        zero_pad: If True, pad one row on top and one column on the left
            before the convolution.

    Returns:
        The activated output tensor.
    """
    padded = ZeroPadding2D(((1, 0), (1, 0)))(x) if zero_pad else x
    conv_out = Conv2D(
        filters=filters,
        kernel_size=kernel_size,
        strides=strides,
        padding=padding,
        use_bias=False,
    )(padded)
    normed = BatchNormalization(epsilon=1.001e-5)(conv_out)
    return h_swish(normed)

def ResUnit(x, filters, padding="same", zero_pad=False):
    """Residual unit: a 1x1 CBH followed by a 3x3 CBH, added back to the input.

    The element-wise ``Add`` requires the branch output to have the same shape
    as ``x``.

    Args:
        x: Input tensor (also used as the skip connection).
        filters: Number of filters for both CBH blocks.
        padding: Padding mode forwarded to ``CBH``.
        zero_pad: Forwarded to ``CBH``.

    Returns:
        Sum of the two-conv branch and the input.
    """
    shared = dict(strides=1, padding=padding, zero_pad=zero_pad)
    branch = CBH(x, filters, kernel_size=1, **shared)
    branch = CBH(branch, filters, kernel_size=3, **shared)
    return Add()([branch, x])

def CSP1_X(x, no, filters, strides, padding="same", zero_pad=False):
    """CSP block with ``no`` residual units.

    Main branch: 1x1 CBH -> ``no`` x ResUnit -> 3x3 Conv2D.
    Shortcut branch: 1x1 Conv2D on the block input.
    Both branches are concatenated, passed through BatchNormalization and
    LeakyReLU, and finished with a 1x1 CBH that applies ``strides``.

    Args:
        x: Input tensor.
        no: Number of residual units in the main branch.
        filters: Number of filters used by every convolution in the block.
        strides: Stride of the final 1x1 CBH, i.e. the block's downsampling factor.
        padding: Padding mode forwarded to the convolutions.
        zero_pad: Forwarded to ``CBH`` / ``ResUnit``.

    Returns:
        Output tensor of the final CBH.
    """
    xShort = x
    x = CBH(x, filters=filters, kernel_size=1, strides=1, padding=padding, zero_pad=zero_pad)
    for _ in range(no):
        x = ResUnit(x, filters, padding, zero_pad)
    # Both branches use stride 1 so their spatial sizes match at the concatenation.
    # Previously the main branch used `strides` while the shortcut used 1, which
    # made `concatenate` fail for any strides != 1. Downsampling is done once,
    # by the final CBH below.
    x = Conv2D(filters, kernel_size=3, strides=1, padding=padding)(x)
    xShort = Conv2D(filters, kernel_size=1, strides=1, padding=padding)(xShort)
    x = concatenate([x, xShort])
    x = BatchNormalization(epsilon=1.001e-5)(x)
    x = LeakyReLU(alpha=0.1)(x)
    return CBH(x, filters, kernel_size=1, strides=strides, padding=padding, zero_pad=zero_pad)

def csp_darknet53(input_shape):
    """
    Build the CSPDarknet53 backbone as a Keras model.

    Implementation based on the AlexeyAB/darknet config
    https://github.com/AlexeyAB/darknet/blob/master/cfg/yolov4.cfg
    The "Lxx -> Lyy" markers in the comments below are presumably line ranges
    in that config file.

    NOTE(review): `conv_bn` and `csp_block` are not defined in the visible part
    of this notebook; confirm they are defined before this function is called.

    Args:
        input_shape: Shape passed to `tf.keras.Input` (batch dimension excluded).
    Returns:
        tf.keras.Model: Model named "CSPDarknet53" with three outputs, taken
        after the 256-, 512- and 1024-filter `csp_block` stages respectively.
    """
    inputs = tf.keras.Input(shape=input_shape)

    # First downsampling: L29 -> L103
    x = conv_bn(inputs, filters=32, kernel_size=3, strides=1, activation="mish")

    # This block could be expressed as a CSPBlock with a modified num_filters in the middle.
    # For readability, the CSPBlock is kept as simple as possible at the cost of a little redundancy.
    x = conv_bn(
        x,
        filters=64,
        kernel_size=3,
        strides=2,
        zero_pad=True,
        padding="valid",
        activation="mish",
    )
    # `route` bypasses the residual part and is concatenated back in below.
    route = conv_bn(x, filters=64, kernel_size=1, strides=1, activation="mish")

    # Residual part: 1x1 (32 filters) then 3x3 (64 filters), added to `shortcut`.
    shortcut = conv_bn(x, filters=64, kernel_size=1, strides=1, activation="mish")
    x = conv_bn(shortcut, filters=32, kernel_size=1, strides=1, activation="mish")
    x = conv_bn(x, filters=64, kernel_size=3, strides=1, activation="mish")

    x = x + shortcut
    x = conv_bn(x, filters=64, kernel_size=1, strides=1, activation="mish")
    x = tf.keras.layers.Concatenate()([x, route])
    x = conv_bn(x, filters=64, kernel_size=1, strides=1, activation="mish")

    # Second downsampling: L105 -> L191
    x = csp_block(x, filters=128, num_blocks=2)

    # Third downsampling: L193 -> L400
    output_1 = csp_block(x, filters=256, num_blocks=8)

    # Fourth downsampling: L402 -> L614
    output_2 = csp_block(output_1, filters=512, num_blocks=8)

    # Fifth downsampling: L616 -> L744
    output_3 = csp_block(output_2, filters=1024, num_blocks=4)

    return tf.keras.Model(inputs, [output_1, output_2, output_3], name="CSPDarknet53")

def conv_classes_anchors(inputs, num_anchors_stage, num_classes):
    """
    Project a feature map onto per-anchor predictions and split the channel axis by anchor.

    A 1x1 convolution produces ``num_anchors_stage * (num_classes + 5)`` channels, which are
    then reshaped into one ``(num_classes + 5)``-vector per anchor.
    TODO: double-check use_bias=True: it is False in yolov3_tf2, not specified in yolov4.cfg,
    and True in pytorch yolov4.
    Args:
        inputs (tf.Tensor): 4D (N,H,W,C) input tensor
        num_anchors_stage (int): Number of anchors for the given output stage
        num_classes (int): Number of classes
    Returns:
        tf.Tensor: 5D (N,H,W,num_anchors_stage,num_classes+5) output tensor.
            The last dimension holds the 4 box-coordinate regression factors, the 1 objectness
            score, and the num_classes confidence scores
    """
    values_per_anchor = num_classes + 5
    projected = Conv2D(
        filters=num_anchors_stage * values_per_anchor,
        kernel_size=1,
        strides=1,
        padding="same",
        use_bias=True,
    )(inputs)
    target_shape = (projected.shape[1], projected.shape[2], num_anchors_stage, values_per_anchor)
    return tf.keras.layers.Reshape(target_shape)(projected)

def yolov3_boxes_regression(feats_per_stage, anchors_per_stage):
    """
    Decode the raw YOLO features of one output stage into boxes, objectness and class scores.

    Args:
        feats_per_stage (tf.Tensor): 5D (N,grid_x,grid_y,num_anchors_per_stage,4+1+num_classes).
            The last dimension consists of (x, y, w, h, obj, ...classes)
        anchors_per_stage (numpy.array[int, 2]): Anchors used for this stage. The first and
            second columns respectively contain the anchor width and height.
    Returns:
        Tuple[tf.Tensor]: 3 tensors respectively describing
        bbox (N,grid_x,grid_y,num_anchors,4), as (x1, y1, x2, y2),
        objectness (N,grid_x,grid_y,num_anchors,1),
        class_probs (N,grid_x,grid_y,num_anchors,num_classes),
    """
    feats_shape = feats_per_stage.shape
    grid_size_x, grid_size_y = feats_shape[1], feats_shape[2]
    num_classes = feats_shape[-1] - 5  # last axis = 4 box values + 1 objectness + num_classes

    raw_xy, raw_wh, raw_objectness, raw_classes = tf.split(
        feats_per_stage, (2, 2, 1, num_classes), axis=-1
    )

    # Squash the centre offsets and both score groups into (0, 1).
    xy_offset = tf.sigmoid(raw_xy)
    objectness = tf.sigmoid(raw_objectness)
    class_probs = tf.sigmoid(raw_classes)

    # Per-cell index grid. With tf.meshgrid's default indexing the stacked result has
    # shape (grid_size_x, grid_size_y, 2); expand_dims adds the anchor axis.
    mesh = tf.meshgrid(tf.range(grid_size_y), tf.range(grid_size_x))
    cell_index = tf.expand_dims(tf.stack(mesh, axis=-1), axis=2)

    # Box centres: offset within the cell plus the cell index, divided by the grid size.
    grid_dims = tf.constant([grid_size_y, grid_size_x], dtype=tf.float32)
    centers = (xy_offset + tf.cast(cell_index, tf.float32)) / grid_dims
    # Box sizes: exponential of the raw prediction, scaled by the anchor dimensions.
    sizes = tf.exp(raw_wh) * anchors_per_stage

    # Convert (centre, size) into (top-left, bottom-right) corners.
    half_sizes = sizes / 2
    bbox = tf.concat([centers - half_sizes, centers + half_sizes], axis=-1)

    return bbox, objectness, class_probs


def yolo_nms(yolo_feats, yolo_max_boxes, yolo_iou_threshold, yolo_score_threshold):
    """
    Run non max suppression over the decoded YOLO outputs of all stages.

    Args:
        yolo_feats (List[Tuple[tf.Tensor]]): For each output stage, a 3-tuple of 5D tensors:
            bbox (N,grid_x,grid_y,num_anchors,4),
            objectness (N,grid_x,grid_y,num_anchors,1),
            class_probs (N,grid_x,grid_y,num_anchors,num_classes),
        yolo_max_boxes (int): Maximum number of boxes predicted on each image (across all
            anchors/stages)
        yolo_iou_threshold (float between 0. and 1.): IOU threshold above which overlapping
            boxes are suppressed.
        yolo_score_threshold (float between 0. and 1.): Boxes scoring below this threshold
            are filtered out.
    Returns:
        List[tf.Tensor]: [boxes, scores, classes, valid_detections] as returned by
        tf.image.combined_non_max_suppression.
    """

    def _flatten_grid(tensor, num_boxes):
        # Collapse (grid_x, grid_y, num_anchors) into one box axis; the batch size is
        # read dynamically, the last axis is kept as is.
        return tf.reshape(tensor, (tf.shape(tensor)[0], num_boxes, tensor.shape[-1]))

    flat_bboxes, flat_objectness, flat_class_probs = [], [], []

    for stage_feats in yolo_feats:
        stage_shape = stage_feats[0].shape
        num_boxes = stage_shape[1] * stage_shape[2] * stage_shape[3]  # grid_x * grid_y * num_anchors
        flat_bboxes.append(_flatten_grid(stage_feats[0], num_boxes))  # [None,num_boxes,4]
        flat_objectness.append(_flatten_grid(stage_feats[1], num_boxes))  # [None,num_boxes,1]
        flat_class_probs.append(_flatten_grid(stage_feats[2], num_boxes))  # [None,num_boxes,num_classes]

    # Stack the boxes of all stages along the box axis.
    bbox = tf.concat(flat_bboxes, axis=1)
    objectness = tf.concat(flat_objectness, axis=1)
    class_probs = tf.concat(flat_class_probs, axis=1)

    # Per-class score is objectness * class probability; the same box is shared by all
    # classes, hence the singleton axis inserted at position 2.
    nms_result = tf.image.combined_non_max_suppression(
        boxes=tf.expand_dims(bbox, axis=2),
        scores=objectness * class_probs,
        max_output_size_per_class=yolo_max_boxes,
        max_total_size=yolo_max_boxes,
        iou_threshold=yolo_iou_threshold,
        score_threshold=yolo_score_threshold,
    )
    boxes, scores, classes, valid_detections = nms_result

    return [boxes, scores, classes, valid_detections]

def yolov3_head(
    input_shapes,
    anchors,
    num_classes,
    training,
    yolo_max_boxes,
    yolo_iou_threshold,
    yolo_score_threshold,
):
    """
    Returns the YOLOv3 head, which is used in YOLOv4.

    The head takes the three neck outputs and produces one prediction tensor per
    stage. In inference mode the predictions are additionally decoded into boxes
    and passed through non max suppression.

    NOTE(review): `conv_bn` is not defined in the visible part of this notebook;
    confirm it is defined before this function is called.

    Args:
        input_shapes (List[Tuple[int]]): List of 3 tuples, which are the output shapes of the neck.
            None dimensions are ignored.
            For CSPDarknet53+YOLOv4_neck, those are: [ (52, 52, 128), (26, 26, 256), (13, 13, 512)] for a (416,
            416) input.
        anchors (List[numpy.array[int, 2]]): List of 3 numpy arrays containing the anchor sizes used for each stage.
            The first and second columns of the numpy arrays respectively contain the anchors width and height.
        num_classes (int): Number of classes.
        training (boolean): If True, the model outputs the raw YOLO features of the three stages.
            If False, it outputs boxes computed through YOLO regression and NMS.
            Set it to True for training, and False for inference.
        yolo_max_boxes (int): Maximum number of boxes predicted on each image (across all anchors/stages)
        yolo_iou_threshold (float between 0. and 1.): IOU threshold defining whether close boxes will be merged
            during non max suppression.
        yolo_score_threshold (float between 0. and 1.): Boxes with score lower than this threshold will be filtered
            out during non max suppression.
    Returns:
        tf.keras.Model: Head model named "YOLOv3_head" with inputs [input_1, input_2, input_3].
    """
    # filter(None, ...) drops None (and any other falsy) entries from each shape.
    # NOTE(review): filter() returns a one-shot iterator, not a tuple; this relies on
    # Keras materialising it. Consider wrapping it in tuple(...).
    input_1 = tf.keras.Input(shape=filter(None, input_shapes[0]))
    input_2 = tf.keras.Input(shape=filter(None, input_shapes[1]))
    input_3 = tf.keras.Input(shape=filter(None, input_shapes[2]))

    # Stage 1 predictions, computed directly from input_1.
    x = conv_bn(input_1, filters=256, kernel_size=3, strides=1, activation="leaky_relu")
    output_1 = conv_classes_anchors(
        x, num_anchors_stage=len(anchors[0]), num_classes=num_classes
    )

    # Stage 2: stride-2 conv on input_1, concatenated with input_2.
    x = conv_bn(
        input_1,
        filters=256,
        kernel_size=3,
        strides=2,
        zero_pad=True,
        padding="valid",
        activation="leaky_relu",
    )
    x = tf.keras.layers.Concatenate()([x, input_2])
    x = conv_bn(x, filters=256, kernel_size=1, strides=1, activation="leaky_relu")
    x = conv_bn(x, filters=512, kernel_size=3, strides=1, activation="leaky_relu")
    x = conv_bn(x, filters=256, kernel_size=1, strides=1, activation="leaky_relu")
    x = conv_bn(x, filters=512, kernel_size=3, strides=1, activation="leaky_relu")
    # `connection` feeds both the stage 2 prediction branch and the stage 3 path.
    connection = conv_bn(
        x, filters=256, kernel_size=1, strides=1, activation="leaky_relu"
    )
    x = conv_bn(
        connection, filters=512, kernel_size=3, strides=1, activation="leaky_relu"
    )
    output_2 = conv_classes_anchors(
        x, num_anchors_stage=len(anchors[1]), num_classes=num_classes
    )

    # Stage 3: stride-2 conv on `connection`, concatenated with input_3.
    x = conv_bn(
        connection,
        filters=512,
        kernel_size=3,
        strides=2,
        zero_pad=True,
        padding="valid",
        activation="leaky_relu",
    )
    x = tf.keras.layers.Concatenate()([x, input_3])
    x = conv_bn(x, filters=512, kernel_size=1, strides=1, activation="leaky_relu")
    x = conv_bn(x, filters=1024, kernel_size=3, strides=1, activation="leaky_relu")
    x = conv_bn(x, filters=512, kernel_size=1, strides=1, activation="leaky_relu")
    x = conv_bn(x, filters=1024, kernel_size=3, strides=1, activation="leaky_relu")
    x = conv_bn(x, filters=512, kernel_size=1, strides=1, activation="leaky_relu")
    x = conv_bn(x, filters=1024, kernel_size=3, strides=1, activation="leaky_relu")
    output_3 = conv_classes_anchors(
        x, num_anchors_stage=len(anchors[2]), num_classes=num_classes
    )

    # Training mode: expose the raw per-stage features and stop here.
    if training:
        return tf.keras.Model(
            [input_1, input_2, input_3],
            [output_1, output_2, output_3],
            name="YOLOv3_head",
        )

    # Inference mode: decode each stage with its own anchors...
    predictions_1 = tf.keras.layers.Lambda(
        lambda x_input: yolov3_boxes_regression(x_input, anchors[0]),
        name="yolov3_boxes_regression_small_scale",
    )(output_1)
    predictions_2 = tf.keras.layers.Lambda(
        lambda x_input: yolov3_boxes_regression(x_input, anchors[1]),
        name="yolov3_boxes_regression_medium_scale",
    )(output_2)
    predictions_3 = tf.keras.layers.Lambda(
        lambda x_input: yolov3_boxes_regression(x_input, anchors[2]),
        name="yolov3_boxes_regression_large_scale",
    )(output_3)

    # ...then merge all stages and apply non max suppression.
    output = tf.keras.layers.Lambda(
        lambda x_input: yolo_nms(
            x_input,
            yolo_max_boxes=yolo_max_boxes,
            yolo_iou_threshold=yolo_iou_threshold,
            yolo_score_threshold=yolo_score_threshold,
        ),
        name="yolov4_nms",
    )([predictions_1, predictions_2, predictions_3])

    return tf.keras.Model([input_1, input_2, input_3], output, name="YOLOv3_head")