## Initializing opencv haarcascade face detection network
https://github.com/opencv/opencv/tree/master/data/haarcascades
Explained in: https://www.youtube.com/watch?v=7IFhsbfby9s&t=300s (or see the GitHub link above)

In [1]:
import cv2
import os

In [2]:
def convertImg(img):
    """Convert an image to grayscale with OpenCV's ``COLOR_BGR2GRAY`` conversion.

    Args:
        img: image array handed straight to ``cv2.cvtColor``.

    Returns:
        The converted image returned by ``cv2.cvtColor``.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return gray

def faceNetLocalize(img, **kwargs):
    """Locate faces in ``img`` with a Haar cascade classifier.

    Args:
        img: image passed to ``convertImg`` before detection.

    Keyword Args:
        scaleFactor: scale step of the face search; between 1.05 (quality) and
            1.4 (speed) recommended. Defaults to 1.1.
        minNeighbors: between 3 (quantity) and 6 (quality) recommended. Defaults to 4.
        minSize: minimum size of a face in the picture. Defaults to ``(10, 10)``.
        faceNet: an already initialised cascade classifier. When omitted (or ``None``)
            a default one is created via ``init_faceNet()``.

    Returns:
        The result of ``detectMultiScale3`` with ``outputRejectLevels=True``; callers in
        this notebook unpack it into three values (boxes, reject levels, confidences).
    """
    scaleFactor = kwargs.get('scaleFactor', 1.1)
    minNeighbors = kwargs.get('minNeighbors', 4)
    minSize = kwargs.get('minSize', (10, 10))

    # Build the default classifier lazily: a `kwargs.get(..., init_faceNet())` default is
    # evaluated on every call, re-reading the cascade file even when a classifier is supplied.
    faceNet = kwargs.get('faceNet')
    if faceNet is None:
        faceNet = init_faceNet()

    img_cvt = convertImg(img)
    return faceNet.detectMultiScale3(
        img_cvt,
        scaleFactor=scaleFactor,
        minNeighbors=minNeighbors,
        minSize=minSize,
        outputRejectLevels=True,
    )

def init_faceNet(**kwargs):
    """Create the OpenCV cascade classifier used for face localisation.

    Keyword Args:
        path: cascade definition file handed to ``cv2.CascadeClassifier``.
            Defaults to ``'haarcascade_frontalface_default.xml'``.

    Returns:
        The ``cv2.CascadeClassifier`` constructed from ``path``.
    """
    if 'path' in kwargs:
        cascade_file = kwargs['path']
    else:
        cascade_file = 'haarcascade_frontalface_default.xml'
    return cv2.CascadeClassifier(cascade_file)


In [3]:
#TODO: testing with different model types
#for example: eye model + larger bounding box towards the bottom

## Mask classifier

Based on: https://www.tensorflow.org/tutorials/load_data/images

In [4]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.layers import MaxPool2D, Conv2D, Input, Dense, Flatten, AveragePooling2D, Dropout
import tensorflow.keras.layers as lays
from tensorflow.keras.layers.experimental.preprocessing import Rescaling, RandomContrast, RandomFlip, RandomRotation
from tensorflow.keras.preprocessing.image import load_img, img_to_array, ImageDataGenerator
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras import Sequential, losses as lfs
from tensorflow.keras.optimizers import Adam
import keras_tuner as kt
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import utils

#### Data augmentation
https://www.tensorflow.org/tutorials/images/data_augmentation

In [5]:
# Random augmentation stage that the tuner hypermodels place right after rescaling.
# Per the Keras docs the rotation factor is a fraction of a full turn and the contrast
# factor is the maximum relative change - confirm against the installed Keras version.
augmentation = Sequential(layers=[
    RandomFlip(mode="horizontal"),
    RandomRotation(factor=0.4),
    RandomContrast(factor=0.5),
])

##### variables:

In [6]:
# Shared configuration for data loading, training and prediction.
batch_size = 32
img_size = (180, 180)        # image size used by load_img(target_size=...) and the small models
img_size_vgg = (224, 224)    # input size declared by the VGG-style model
epochs = 11
checkpoint_path = "mask_model/weights.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
imgs_path = os.path.join('..', 'img')
num_classes = 3

# Help text printed by predict() on misuse. It mirrors the real signature
# predict(mode, **kwargs): the mode comes first, the image path is the `img_path` keyword.
correct_usage = (
    'correct usage: \n'
    'predict(\'category\', img_path=[path to image]) \n'
    'predict(\'probabilities\', img_path=[path to image]) \n'
    'predict(\'detection\', img_path=[path to image]) \n'
    'predict(\'live_detection\')'
)

##### loading dataset:

In [7]:
def load_dataset(**kwargs):
    """Load labelled images from disk and wrap them in augmenting batch iterators.

    Every image file found below ``imgs_path`` is loaded at ``img_size`` and labelled with
    the name of the directory that contains it. Labels are mapped to integer ids in
    first-seen order, the data is split 80/20 into train/test, the labels are one-hot
    encoded and the training part is wrapped in an ``ImageDataGenerator`` that is split
    again 80/20 into a training and a validation iterator.

    Keyword Args:
        imgs_path: root directory of the image folders. Defaults to ``../img``.
        img_size: ``target_size`` passed to ``load_img``. Defaults to ``(180, 180)``.
        batch_size: batch size of both returned iterators. Defaults to 32.
        num_classes: width of the one-hot encoding. Defaults to the module-level
            ``num_classes``.

    Returns:
        tuple: ``(train_batches, test_batches, labels, testX, testY, trainX, trainY)``.
        ``test_batches`` is the generator's validation subset of the *training* data;
        ``testX``/``testY`` are the held-out split. ``labels[i]`` is the directory name of
        class id ``i``.

    Raises:
        ValueError: if no image is found, or more label directories exist than
            ``num_classes`` allows.
    """
    imgs_path = kwargs.get('imgs_path', os.path.join('..', 'img'))
    img_size = kwargs.get('img_size', (180, 180))
    batch_size = kwargs.get('batch_size', 32)
    n_classes = kwargs.get('num_classes', num_classes)

    ## load images/ labels from directory
    valid_images = (".jpg", ".png", ".jpeg")  # compared against the lower-cased extension
    x = []
    y = []
    labels = []          # class id -> directory name, in first-seen order
    label_to_index = {}  # directory name -> class id

    # NOTE(review): class ids follow os.walk traversal order, which may differ between
    # file systems; sort the walk if ids must be stable across machines.
    for root, dirs, files in os.walk(imgs_path):
        for filename in files:
            extension = os.path.splitext(filename)[1]
            if extension.lower() not in valid_images:
                continue
            file_path = os.path.join(root, filename)
            image = img_to_array(load_img(file_path, target_size=img_size))

            # the containing directory's name is the class label
            label = os.path.basename(os.path.dirname(file_path))
            if label not in label_to_index:
                label_to_index[label] = len(labels)
                labels.append(label)

            x.append(image)
            y.append(label_to_index[label])

    if not x:
        raise ValueError(f"no images with extensions {valid_images} found below {imgs_path!r}")
    if len(labels) > n_classes:
        raise ValueError(
            f"found {len(labels)} label directories {labels} but num_classes is {n_classes}"
        )

    x = np.array(x, dtype="float32")
    # Integer ids from the start; previously they were written back into a string array
    # and the result of `y.astype(int)` was discarded.
    y = np.array(y, dtype=int)

    ## split dataset
    trainX, testX, trainY, testY = train_test_split(x, y, test_size=0.2, random_state=3)

    ## one-hot encoding
    trainY = utils.to_categorical(trainY, n_classes)
    testY = utils.to_categorical(testY, n_classes)

    ## data augmentation
    # NOTE(review): the featurewise_* options need `datagen.fit(...)` to take effect per the
    # Keras docs; fit is not called here, so they are presumably inactive. Left unchanged
    # because enabling them would alter the inputs the models are trained on.
    datagen = ImageDataGenerator(
        featurewise_center=True,
        featurewise_std_normalization=True,
        rotation_range=20,
        width_shift_range=0.2,
        height_shift_range=0.2,
        horizontal_flip=True,
        validation_split=0.2
    )

    ## merge xs and ys (honours the requested batch size instead of a hard-coded 32)
    train_batches = datagen.flow(trainX, trainY, batch_size=batch_size, subset='training')
    test_batches = datagen.flow(trainX, trainY, batch_size=batch_size, subset='validation')

    return train_batches, test_batches, labels, testX, testY, trainX, trainY

### Model selection & Training

In [8]:
#Hypermodels
#https://www.analyticsvidhya.com/blog/2021/06/create-convolutional-neural-network-model-and-optimize-using-keras-tuner-deep-learning/
#https://www.tensorflow.org/tutorials/keras/keras_tuner

def basic_model_builder(hp):
    """Keras Tuner model builder for the basic CNN (one conv layer + average pooling).

    Tuned hyperparameters: ``c1_filter`` (32-256, step 16), ``c1_kernel`` (3 or 5),
    ``d1_units`` (32-512, step 32) and ``learning_rate`` (1e-2, 1e-3 or 1e-4).

    Args:
        hp: the hyperparameter object supplied by the tuner.

    Returns:
        The compiled ``Sequential`` model. It is compiled with a *sparse* categorical
        cross-entropy, i.e. it expects integer class ids as targets.
    """
    basic_model = Sequential([
        Rescaling(1. /255),
        augmentation,
        Conv2D(
            filters=hp.Int('c1_filter', min_value=32, max_value=256, step=16),
            kernel_size=hp.Choice('c1_kernel', values=[3,5]),
            activation='relu',
        ),
        AveragePooling2D(pool_size=(7,7)),
        Flatten(name="flatten"),
        Dense(units=hp.Int('d1_units', min_value=32, max_value=512, step=32), activation="relu"),
        Dropout(0.2),  # randomly zeroes 20% of the activations during training
        Dense(num_classes, activation="softmax")
    ])

    basic_model.compile(
        optimizer=Adam(hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])),
        # The output layer already applies softmax, so the loss receives probabilities;
        # from_logits=True would apply softmax a second time inside the loss.
        loss=lfs.SparseCategoricalCrossentropy(from_logits=False),
        metrics=['accuracy'],
    )
    return basic_model

def small_model_builder(hp):
    """Keras Tuner model builder for the small CNN (two conv layers + max pooling).

    Tuned hyperparameters: ``c1_filter``/``c2_filter`` (32-256, step 16),
    ``c1_kernel``/``c2_kernel`` (3 or 5), ``d1_units`` (32-512, step 32) and
    ``learning_rate`` (1e-2, 1e-3 or 1e-4).

    Args:
        hp: the hyperparameter object supplied by the tuner.

    Returns:
        The compiled ``Sequential`` model. It is compiled with a *sparse* categorical
        cross-entropy, i.e. it expects integer class ids as targets.
    """
    small_model = Sequential([
        Rescaling(1. /255),
        augmentation,
        Conv2D(
            filters=hp.Int('c1_filter', min_value=32, max_value=256, step=16),
            kernel_size=hp.Choice('c1_kernel', values=[3,5]),
            activation='relu',
        ),
        Conv2D(
            filters=hp.Int('c2_filter', min_value=32, max_value=256, step=16),
            kernel_size=hp.Choice('c2_kernel', values=[3,5]),
            activation='relu',
        ),
        MaxPool2D(pool_size=(3,3)),
        Flatten(name="flatten"),
        Dense(units=hp.Int('d1_units', min_value=32, max_value=512, step=32), activation="relu"),
        Dropout(0.2),  # randomly zeroes 20% of the activations during training
        Dense(num_classes, activation="softmax")
    ])

    small_model.compile(
        optimizer=Adam(hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])),
        # The output layer already applies softmax, so the loss receives probabilities;
        # from_logits=True would apply softmax a second time inside the loss.
        loss=lfs.SparseCategoricalCrossentropy(from_logits=False),
        metrics=['accuracy'],
    )
    return small_model


def tune_model(model_builder, xs, ys):
    """Search hyperparameters for ``model_builder`` with Keras Tuner's Hyperband.

    Args:
        model_builder: callable taking an ``hp`` object and returning a compiled model
            (e.g. ``basic_model_builder``).
        xs: training inputs forwarded to ``tuner.search``.
        ys: training targets forwarded to ``tuner.search``.

    Returns:
        tuple: ``(best_hyperparameters, tuner)``.
    """
    # A RandomSearch tuner over HyperResNet used to be constructed here and was overwritten
    # on the next line without ever being used; it has been removed.
    # Alternative kept for reference:
    #tuner = kt.RandomSearch(model_builder, objective='val_accuracy', max_trials=5)
    tuner = kt.Hyperband(model_builder, objective='val_accuracy', max_epochs=10, factor=3)
    # NOTE(review): Hyperband schedules the epochs of each trial itself (bounded by
    # max_epochs) - confirm whether epochs=50 has any effect here.
    tuner.search(xs, ys, epochs=50, validation_split=0.2)
    return tuner.get_best_hyperparameters(num_trials=1)[0], tuner

In [9]:
def _build_basic_model(num_labels, input_shape):
    """One conv layer, 7x7 average pooling and a 128-unit dense head."""
    return Sequential([
        Input(shape=input_shape),
        Rescaling(1. /255),
        Conv2D(32, (3,3), activation='relu'),
        AveragePooling2D(pool_size=(7,7)),
        Flatten(name="flatten"),
        Dense(128, activation="relu"),
        Dropout(0.5),  # randomly zeroes half of the activations during training
        Dense(num_labels, activation="softmax"),
    ])


def _build_small_model(num_labels, input_shape):
    """Two 5x5 conv layers, 3x3 max pooling and a 224-unit dense head."""
    return Sequential([
        Input(shape=input_shape),
        Rescaling(1. /255),
        Conv2D(filters=128, kernel_size=(5,5), activation='relu'),
        Conv2D(filters=128, kernel_size=(5,5), activation='relu'),
        MaxPool2D(pool_size=(3,3)),
        Flatten(name="flatten"),
        Dense(units=224, activation="relu"),
        Dropout(0.5),  # randomly zeroes half of the activations during training
        Dense(num_labels, activation="softmax"),
    ])


def _build_vgg_small_model(num_labels, input_shape):
    """Four conv/max-pool stages followed by a 120-unit dense head."""
    # NOTE(review): unlike the other models this one has no Rescaling layer; kept as in the
    # original - confirm whether the inputs are meant to be scaled elsewhere.
    return Sequential([
        Input(shape=input_shape),
        Conv2D(64,(3,3),activation='relu'),
        MaxPool2D(2,2),
        Conv2D(64,(3,3),activation='relu'),
        MaxPool2D(2,2),
        Conv2D(128,(3,3),activation='relu'),
        MaxPool2D(2,2),
        Conv2D(128,(3,3),activation='relu'),
        MaxPool2D(2,2),
        Flatten(),
        Dropout(0.5),
        Dense(120,activation='relu'),
        Dense(num_labels,activation='softmax'),  # was hard-coded to 3 outputs
    ])


def _build_vgg_model(num_labels, input_shape):
    """VGG16-style stack: five conv blocks (64-64, 128-128, 256x3, 512x3, 512x3) + dense head."""
    stack = [Input(shape=input_shape), Rescaling(1. /255)]
    for filters, conv_count in ((64, 2), (128, 2), (256, 3), (512, 3), (512, 3)):
        for _ in range(conv_count):
            stack.append(Conv2D(filters=filters, kernel_size=(3,3), padding="same", activation="relu"))
        stack.append(MaxPool2D(pool_size=(2, 2), strides=(2, 2)))
    stack.append(Flatten())
    stack.append(Dense(units=4096, activation="relu"))
    stack.append(Dense(units=4096, activation="relu"))
    stack.append(Dense(units=num_labels, activation="softmax"))
    return Sequential(stack)


# model name -> (builder, default input shape declared by the original layer definitions)
_MODEL_BUILDERS = {
    'basic_model': (_build_basic_model, (180, 180, 3)),
    'small_model': (_build_small_model, (180, 180, 3)),
    'vgg_model': (_build_vgg_model, (224, 224, 3)),
    'vgg_small_model': (_build_vgg_small_model, (180, 180, 3)),
}


def select_model(model_name, **kwargs):
    """Build one of the predefined (uncompiled) classifier architectures and print its summary.

    Only the requested model is constructed. Each model starts with an explicit ``Input``
    layer: the original code declared ``input_shape`` on a layer that followed ``Rescaling``,
    and Keras only honours that argument on the first layer, so those models were not built
    when ``summary()`` was called.

    Args:
        model_name: ``'basic_model'``, ``'small_model'``, ``'vgg_model'`` or
            ``'vgg_small_model'``.

    Keyword Args:
        num_classes: number of output units. Defaults to the module-level ``num_classes``.
        input_shape: ``(height, width, channels)`` of the input. Defaults to
            ``(224, 224, 3)`` for ``'vgg_model'`` and ``(180, 180, 3)`` otherwise.

    Returns:
        The requested ``Sequential`` model.

    Raises:
        ValueError: if ``model_name`` is not one of the known names (previously ``None``
            was returned silently).
    """
    if model_name not in _MODEL_BUILDERS:
        raise ValueError(
            f"unknown model_name {model_name!r}; expected one of {sorted(_MODEL_BUILDERS)}"
        )
    builder, default_input_shape = _MODEL_BUILDERS[model_name]

    num_labels = kwargs.get('num_classes', num_classes)
    input_shape = kwargs.get('input_shape', default_input_shape)

    model = builder(num_labels, input_shape)
    model.summary()
    return model

About the VGG model: https://neurohive.io/en/popular-networks/vgg16/

In [10]:
def train_model(model, train_ds, val_ds, **kwargs):
    """Compile ``model`` for one-hot classification and fit it.

    The model is (re)compiled with categorical cross-entropy, the ``'adam'`` optimizer
    and an accuracy metric before fitting.

    Args:
        model: object exposing ``compile`` and ``fit``.
        train_ds: training data handed to ``fit``.
        val_ds: validation data handed to ``fit`` as ``validation_data``.

    Keyword Args:
        epochs: number of epochs to train. Defaults to 10.

    Returns:
        Whatever ``model.fit`` returns (the training history).
    """
    n_epochs = kwargs['epochs'] if 'epochs' in kwargs else 10

    model.compile(
        loss='categorical_crossentropy',
        optimizer='adam',
        metrics=['accuracy'],
    )
    return model.fit(train_ds, validation_data=val_ds, epochs=n_epochs)

#### Testing/Evaluation (mask):
https://www.tensorflow.org/guide/keras/train_and_evaluate

In [11]:
def evaluate_model(x_test, y_test, model):
    """Print the model's evaluation metrics and a per-class classification report.

    Args:
        x_test: test inputs.
        y_test: test targets, either one-hot encoded (as returned by ``load_dataset``)
            or integer class ids.
        model: compiled model exposing ``evaluate`` and ``predict``.
    """
    results = model.evaluate(x_test, y_test, batch_size=32)
    print(results)

    y_pred_confidences = model.predict(x_test)
    y_pred = np.argmax(y_pred_confidences, axis=1)

    # classification_report needs both sides as class ids; one-hot targets would be
    # treated as a different target type than the integer predictions.
    y_true = np.asarray(y_test)
    if y_true.ndim == 2 and y_true.shape[1] > 1:
        y_true = np.argmax(y_true, axis=1)
    else:
        y_true = y_true.ravel()

    print(classification_report(y_true, y_pred))

##### Predictions:

In [12]:
def load_model(**kwargs):
    """Load saved weights into a model and return it.

    Keyword Args:
        checkpoint_path: weights location passed to ``model.load_weights``.
            Defaults to ``"mask_model/weights.ckpt"``.
        model: model to load the weights into. When omitted (or ``None``) a fresh
            ``select_model('basic_model')`` is created.

    Returns:
        The model with the weights loaded.
    """
    checkpoint_path = kwargs.get('checkpoint_path', "mask_model/weights.ckpt")

    # Create the default model only when needed; as a `kwargs.get` default it was built
    # (and its summary printed) on every call, even when a model was supplied.
    model = kwargs.get('model')
    if model is None:
        model = select_model('basic_model')

    model.load_weights(checkpoint_path)
    return model

In [13]:
def maskPredict(model, img, labels):
    """Classify a single image and return the winning label with its score.

    Args:
        model: object whose ``predict`` accepts a batch and returns per-class scores.
        img: one image array; a leading batch axis is added before prediction.
        labels: sequence mapping a class index to its label.

    Returns:
        tuple: ``(label, score)`` of the highest-scoring class.
    """
    batch = img[None]  # add the batch dimension
    scores = model.predict(batch)
    best = np.argmax(scores)
    return labels[best], scores[0][best]


def _require_labels(labels):
    """Raise a clear error when the class-index -> label mapping is missing."""
    if labels is None:
        raise ValueError(
            "labels are required: pass labels=[...] (the list returned by load_dataset)"
        )


#mode can be 'category', 'probabilities', 'detection', 'live_detection'
def predict(mode, **kwargs):
    """Run the mask classifier in one of four modes.

    Args:
        mode: ``'category'`` (return the predicted label of one image),
            ``'probabilities'`` (return the raw model output for one image),
            ``'detection'`` (locate faces in one image, classify each and show the result)
            or ``'live_detection'`` (same, on the camera feed).

    Keyword Args:
        img_path: path of the image; required for every mode except ``'live_detection'``.
        model: model to use. When omitted, ``load_model()`` is called.
        labels: sequence mapping class index to label; required for every mode except
            ``'probabilities'``.

    Returns:
        The label for ``'category'``, the model output for ``'probabilities'``,
        otherwise ``None``. On an unknown mode or missing ``img_path`` the usage text is
        printed and ``None`` is returned.

    Raises:
        ValueError: if ``labels`` is needed but missing.
        FileNotFoundError: if the image for ``'detection'`` cannot be read.
    """
    img_path = kwargs.get('img_path', None)
    labels = kwargs.get('labels', None)
    model = kwargs.get('model', None)

    if mode not in ('category', 'probabilities', 'detection', 'live_detection'):
        print(correct_usage)
        return

    if mode == 'live_detection':
        live_det(model=model, labels=labels)
        return

    if img_path is None:
        print(correct_usage)
        return

    if mode != 'probabilities':
        _require_labels(labels)

    # Load the default model only when the caller did not pass one; as a `kwargs.get`
    # default it was loaded from disk on every call.
    if model is None:
        model = load_model()

    if mode == 'detection':
        # The cascade and the drawing calls work on OpenCV's own image format, so the file
        # is read with OpenCV here instead of being loaded as a resized float array.
        img = cv2.imread(img_path)
        if img is None:
            raise FileNotFoundError(f"could not read image: {img_path}")
        detect(img, model=model, labels=labels)
        cv2.waitKey(0)
        cv2.destroyAllWindows()
        return

    #load_img
    img = load_img(img_path, target_size = img_size)
    img = img_to_array(img)

    if mode == 'category':
        label, confidence = maskPredict(model, img, labels)
        return label

    # mode == 'probabilities'
    #TODO: nicer output format
    return model.predict(img[None])


def detect(img, model=None, labels=None, faceNet=None):
    """Locate faces in ``img``, classify each one and show the annotated image.

    ``img`` is annotated in place and displayed in the ``'live_output'`` window.

    Args:
        img: image as produced by OpenCV (``cv2.imread`` / ``VideoCapture.read``).
        model: classifier to use. When omitted, ``load_model()`` is called.
        labels: sequence mapping class index to label (required).
        faceNet: optional cascade classifier forwarded to ``faceNetLocalize``.

    Raises:
        ValueError: if ``labels`` is missing.
    """
    _require_labels(labels)
    if model is None:
        model = load_model()

    if faceNet is None:
        faceLocs, rejectLevels, confidences = faceNetLocalize(img)
    else:
        faceLocs, rejectLevels, confidences = faceNetLocalize(img, faceNet=faceNet)

    for (x, y, w, h) in faceLocs:
        x, y, w, h = int(x), int(y), int(w), int(h)

        #crop image and predict label of cropped image
        img_crop = img[y:y+h, x:x+w]
        if img_crop.size == 0:
            continue
        # The classifier is fed RGB float images of size `img_size` elsewhere in this
        # notebook (load_img + img_to_array), so bring the crop into the same form.
        face = cv2.cvtColor(img_crop, cv2.COLOR_BGR2RGB)
        face = cv2.resize(face, (img_size[1], img_size[0])).astype("float32")
        label, confidence_mask = maskPredict(model, face, labels)

        #show label/ bounding box on image
        # Plain BGR tuple and integer thickness: cv2.CV_RGB is a C++ macro that the Python
        # bindings do not provide.
        cv2.putText(img, f"{label}, confidence:{confidence_mask}", (x+w-30, y+h), cv2.FONT_HERSHEY_PLAIN, 1.0, (0, 255, 0), 2)
        cv2.rectangle(img, (x, y), (x+w, y+h), (0, 255, 0), 2)

    cv2.imshow('live_output', img)


def live_det(model=None, labels=None):
    """Run face detection and mask classification on the camera feed until ESC or q is pressed.

    Args:
        model: classifier to use. When omitted, ``load_model()`` is called once.
        labels: sequence mapping class index to label (required).

    Returns:
        The name of the output window, ``'live_output'``.

    Raises:
        ValueError: if ``labels`` is missing.
        RuntimeError: if the camera cannot be opened.
    """
    _require_labels(labels)
    if model is None:
        model = load_model()
    faceNet = init_faceNet()  # created once instead of once per frame

    wait_time = 10 #time in ms to wait before refreshing feed
    camera = cv2.VideoCapture(0) #Input value might differ on different systems

    try:
        if not camera.isOpened():
            raise RuntimeError("could not open camera 0")

        while True:
            ok, img = camera.read()
            if not ok or img is None:
                # no frame delivered (camera unplugged / stream ended)
                break

            detect(img, model=model, labels=labels, faceNet=faceNet)

            #wait for ESC or q
            if (cv2.waitKey(wait_time) & 0xFF) in [27, ord('q')]:
                break
    finally:
        # always free the device and close the preview window, even after an error
        camera.release()
        cv2.destroyAllWindows()

    return 'live_output'



In [18]:
# Load the images with the default settings. `test_ds` is the generator's validation subset
# of the training data; `testX`/`testY` are the held-out 20% from train_test_split.
train_ds, test_ds, labels, testX, testY, trainX, trainY = load_dataset()
#bhps, tuner = tune_model(basic_model_builder, x_test, y_test)

In [19]:
# Build the fixed 'basic_model' architecture and train it for 20 epochs, validating on
# `test_ds` after each epoch.
model = select_model('basic_model')
#model = tuner.hypermodel.build(bhps)
history = train_model(model, train_ds, test_ds, epochs=20)

#evaluate_model(testX, testY, model)



Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20

In [None]:
# Per-epoch validation accuracy recorded in the history returned by train_model.
print(history.history['val_accuracy'])

[0.39344263076782227, 0.2786885201931, 0.44262295961380005, 0.4754098355770111, 0.5245901346206665, 0.4754098355770111, 0.7704917788505554, 0.688524603843689, 0.868852436542511, 0.8852459192276001, 0.8524590134620667, 0.9016393423080444, 0.8360655903816223, 0.9016393423080444, 0.9016393423080444, 0.868852436542511, 0.8524590134620667, 0.8524590134620667, 0.6557376980781555, 0.7868852615356445]


In [17]:
#img_path='../img/ffp2/IMG_1596_(657, 421, 1426, 1426).png'
#img_path='../img/no_mask/IMG_1323.JPG_(1025, 427, 984, 984).png'
#img_path='../img/op_mask/IMG_1337.JPG_(878, 710, 954, 954).png'
#img_path='../img/other_mask/IMG_1327.JPG_(863, 476, 1097, 1097).png'

#predict('category', img_path=img_path, model=model, labels=labels)