In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
import tensorflow as tf
import numpy as np
import json
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## Directory Variables

In [None]:
# Root of the competition dataset (under the Kaggle input directory).
root_dir = '/kaggle/input/artificial-neural-networks-and-deep-learning-2020/MaskDataset'

# Labelled training material: the JSON label file and the image folder.
train_gt = f'{root_dir}/train_gt.json'
training_dir = f'{root_dir}/training'

# Images to classify at test time.
testing_dir = f'{root_dir}/test'

# Directory where everything produced by this notebook is written.
output_dir = '/kaggle/working'

# Cached face crops and their label file, generated by the cropping step.
train_faces_gt = f'{output_dir}/train_faces_gt.json'
training_faces_dir = f'{output_dir}/training_faces'

In [None]:
!pip install retinaface --quiet
from retinaface import RetinaFace

from PIL import Image

!pip install tensorflow --upgrade --quiet

## Faces Cropping

In [None]:
# Single detector instance shared by the training crop step and the test loop.
face_detector = RetinaFace(quality='normal')

def crop_faces_from_images(dataframe):
    """Detect and crop every face in the training images labelled 0 or 1.

    Args:
        dataframe: DataFrame with a 'filename' column (image name inside
            `training_dir`) and a 'class' column (image-level label).

    Returns:
        A list of dicts, one per cropped face, with keys:
        'image' (the cropped PIL image), 'path' (destination inside
        `training_faces_dir`), 'filename' (destination file name) and
        'class' ("mask" when the image label is 1, otherwise "no_mask").
    """
    faces = []

    # Only labels 0 and 1 are kept, so each crop simply inherits the image label.
    # NOTE(review): presumably label 2 marks mixed images - confirm against the dataset.
    single_label_rows = dataframe.loc[(dataframe['class'] == 0) | (dataframe['class'] == 1)]

    for index, img in single_label_rows.iterrows():
        img_path = "{}/{}".format(training_dir, img['filename'])

        # convert() returns an independent image, so the file handle can be
        # released right away instead of waiting for garbage collection.
        with Image.open(img_path) as source_img:
            rgb_img = source_img.convert('RGB')
        img_pixels = np.asarray(rgb_img)

        img_faces = face_detector.predict(img_pixels)

        img_w, img_h = rgb_img.size
        face_class = "mask" if img['class'] == 1 else "no_mask"

        for face_index, face in enumerate(img_faces):
            # Clamp the detection box to the image bounds.
            x1, x2 = max(0, face['x1']), min(face['x2'], img_w)
            y1, y2 = max(0, face['y1']), min(face['y2'], img_h)

            # A box lying entirely outside the image collapses to an empty or
            # inverted rectangle after clamping; cropping or saving it would fail.
            if x2 <= x1 or y2 <= y1:
                continue

            cropped_img = rgb_img.crop((x1, y1, x2, y2))

            # face_index comes from enumerate, so names stay stable even when
            # a degenerate detection is skipped.
            face_name = "face%d_%s" % (face_index, img['filename'])
            face_path = "%s/%s" % (training_faces_dir, face_name)

            faces.append({"image": cropped_img, "path": face_path,
                          "filename": face_name, "class": face_class})

    return faces

def save_training_faces(faces):
    """Persist the cropped faces to disk.

    Creates `training_faces_dir` when it is missing, then saves each entry's
    'image' to the location stored under its 'path' key.

    Args:
        faces: iterable of dicts holding at least 'image' (a PIL image) and
            'path' (destination file path).
    """
    folder_missing = not os.path.isdir(training_faces_dir)
    if folder_missing:
        os.mkdir(training_faces_dir)

    for entry in faces:
        entry['image'].save(entry['path'])

def load_training_dataframe():
    """Return the per-face training table, building and caching it if needed.

    When both the cached label file (`train_faces_gt`) and the folder of
    cropped faces (`training_faces_dir`) exist, the cached table is loaded.
    Otherwise the image-level labels in `train_gt` are read, faces are
    cropped and saved, and the resulting table is written to
    `train_faces_gt` for later runs.

    Returns:
        DataFrame with columns 'filename' (cropped face file name) and
        'class' ("mask" / "no_mask").
    """
    # The cached labels are only usable if the crops they refer to still exist.
    if os.path.isfile(train_faces_gt) and os.path.isdir(training_faces_dir):
        return pd.read_json(train_faces_gt)

    # The with-block closes the file; no explicit close() is needed.
    with open(train_gt) as f:
        labels_by_filename = json.load(f)

    # Naming the columns up front keeps them present even for an empty label file.
    dataframe = pd.DataFrame(list(labels_by_filename.items()),
                             columns=['filename', 'class'])

    faces = crop_faces_from_images(dataframe)
    save_training_faces(faces)

    faces_data = [[face['filename'], face['class']] for face in faces]
    faces_dataframe = pd.DataFrame(faces_data, columns=['filename', 'class'])
    faces_dataframe.to_json(train_faces_gt)

    return faces_dataframe

### Clean output folder

In [None]:
# Set to True to discard the cached face crops and their label file,
# so that the next run of the loading step rebuilds them.
clean_output_folder = False

if clean_output_folder:
    import shutil

    # The two removals are independent of each other.
    if os.path.isfile(train_faces_gt):
        os.remove(train_faces_gt)
    if os.path.isdir(training_faces_dir):
        shutil.rmtree(training_faces_dir)

## Datasets Preparation

In [None]:
dataframe = load_training_dataframe()

# Draw the 80/20 split from a dedicated, seeded generator so the same rows
# land in training and validation on every Restart & Run All.
split_rng = np.random.default_rng(1000)
probs = split_rng.random(len(dataframe))

training_mask = probs < 0.8
validation_mask = ~training_mask  # exact complement of the training rows

training_dataframe = dataframe[training_mask]
validation_dataframe = dataframe[validation_mask]

## Datasets Generation

In [None]:
# Define Variables
# Label names handed to the data generators below.
classes = ['mask', 'no_mask']
# Identity comparison is the idiomatic None check (PEP 8).
num_classes = len(classes) if classes is not None else 0

# Face crops are resized to this size before entering the network.
img_size = (112,112)
input_shape = (img_size[0], img_size[1], 3)
# Shapes of one (images, labels) batch; None leaves the batch dimension free.
output_shapes = ([None, img_size[0], img_size[1], 3], [None])

batch_size = 100

# Seed for TensorFlow and for the generators' shuffling/augmentation.
SEED = 1000
tf.random.set_seed(SEED)

# Training: images are augmented on the fly and rescaled by 1/255.
train_data_gen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=10,
    width_shift_range=10,
    height_shift_range=10,
    shear_range=0.1,
    zoom_range=0.1,
    channel_shift_range=0.1,
    horizontal_flip=True,
    vertical_flip=True,
    fill_mode='constant',
)

# Batches of (image, binary label) read from the cropped-faces folder.
training_generator = train_data_gen.flow_from_dataframe(
    dataframe=training_dataframe,
    directory=training_faces_dir,
    classes=classes,
    class_mode='binary',
    target_size=img_size,
    batch_size=batch_size,
    shuffle=True,
    seed=SEED,
)


def _training_batches():
    """Hand the Keras iterator to tf.data as its batch source."""
    return training_generator


# Wrap the generator in a tf.data pipeline and repeat it across epochs.
training_dataset = tf.data.Dataset.from_generator(
    _training_batches,
    output_types=(tf.float32, tf.float32),
    output_shapes=output_shapes,
).repeat()

# Validation: no augmentation, only the same 1/255 rescaling as training.
val_data_gen = ImageDataGenerator(rescale=1./255)

validation_generator = val_data_gen.flow_from_dataframe(
    dataframe=validation_dataframe,
    directory=training_faces_dir,
    classes=classes,
    class_mode='binary',
    target_size=img_size,
    batch_size=batch_size,
    shuffle=True,
    seed=SEED,
)


def _validation_batches():
    """Hand the Keras iterator to tf.data as its batch source."""
    return validation_generator


# Same tf.data wrapping as the training pipeline, repeated across epochs.
validation_dataset = tf.data.Dataset.from_generator(
    _validation_batches,
    output_types=(tf.float32, tf.float32),
    output_shapes=output_shapes,
).repeat()

## Model Creation

In [None]:
# Transfer Learning Convolution
# With include_top=False the network ends at its last convolutional block;
# the `classes` argument only applies to the built-in classifier head, so it
# is not passed here.
vgg16 = tf.keras.applications.vgg16.VGG16(
    input_shape=input_shape,
    include_top=False,
    weights="imagenet",
)

# Fine-tune only the first convolution of blocks 4 and 5; freeze the rest.
fine_tuned_layers = ('block4_conv1', 'block5_conv1')
vgg16.trainable = True
for layer in vgg16.layers:
    layer.trainable = layer.name in fine_tuned_layers

# Classifier
model = tf.keras.Sequential()
model.add(vgg16)

model.add(tf.keras.layers.Flatten())
# No input size is given: this layer is not the first in the model, so its
# input shape is inferred from the Flatten output. (The previous
# `input_dim=input_shape` passed a tuple where an int is expected.)
model.add(tf.keras.layers.Dense(units=128, activation='relu'))
# One sigmoid unit to match the binary labels produced by the generators.
model.add(tf.keras.layers.Dense(units=1, activation='sigmoid'))

model.summary()

## Model Optimization

In [None]:
# Optimizer: Adam with a fixed learning rate.
learning_rate = 1e-5
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

# Binary cross-entropy pairs with the single sigmoid output unit.
loss = tf.keras.losses.BinaryCrossentropy()

# Metrics reported for both training and validation.
metrics = ['accuracy']

# Attach optimizer, loss and metrics to the model.
model.compile(
    loss=loss,
    metrics=metrics,
    optimizer=optimizer,
)

## Training Model

In [None]:
# Stop once validation loss has not improved for 5 epochs and roll the model
# back to the weights of its best epoch.
es_callback = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)

callbacks = [es_callback]

# Both datasets repeat, so the number of steps per pass is taken from the
# length of the underlying generators.
fit_options = dict(
    epochs=20,
    steps_per_epoch=len(training_generator),
    validation_data=validation_dataset,
    validation_steps=len(validation_generator),
    callbacks=callbacks,
)

model.fit(training_dataset, **fit_options)

## Testing Model

In [None]:
import matplotlib.pyplot as plt

# Files directly inside the test directory (sub-directories are not visited).
test_images = list(next(os.walk(testing_dir))[2])

# Maps each test image file name to its predicted image-level label.
results = {}
for index, img_filename in enumerate(test_images):
    img_path = "%s/%s" % (testing_dir, img_filename)

    # convert() returns an independent image, so the file can be closed at once.
    with Image.open(img_path) as source_img:
        rgb_img = source_img.convert('RGB')
    img_pixels = np.asarray(rgb_img)

    img_faces = face_detector.predict(img_pixels)

    img_w, img_h = rgb_img.size

    faces_preds = []
    for face in img_faces:
        # Clamp the detection box to the image bounds.
        x1, x2 = max(0, face['x1']), min(face['x2'], img_w)
        y1, y2 = max(0, face['y1']), min(face['y2'], img_h)

        # A box entirely outside the image is empty after clamping and cannot
        # be resized; ignore it instead of crashing the whole run.
        if x2 <= x1 or y2 <= y1:
            continue

        cropped_img = rgb_img.crop((x1, y1, x2, y2))

        # Image.ANTIALIAS was removed in Pillow 10; Image.LANCZOS is the same filter.
        face_test = cropped_img.resize(img_size, Image.LANCZOS)
        face_test = tf.keras.preprocessing.image.img_to_array(face_test) / 255.0
        face_test = np.expand_dims(face_test, axis=0)

        # predict() returns a (1, 1) array; take the scalar score explicitly.
        face_score = float(model.predict(face_test)[0][0])
        # 1 is recorded when the score is below 0.5.
        # NOTE(review): with classes=['mask', 'no_mask'] the generator presumably
        # encodes mask as 0, so a low score means "mask" - confirm via class_indices.
        face_pred = 1 if face_score < 0.5 else 0

        faces_preds.append(face_pred)

    # Image label: 0 when no face scored 1 (also when no face was found),
    # 1 when every classified face scored 1, 2 for a mix.
    if sum(faces_preds) == 0:
        img_pred = 0
    elif sum(faces_preds) == len(faces_preds):
        img_pred = 1
    else:
        img_pred = 2

    results[img_filename] = img_pred

    # Show every 50th image with its per-face predictions as a progress check.
    if index % 50 == 0:
        plt.figure()
        plt.imshow(rgb_img)
        plt.xlabel(faces_preds)
        plt.title("First %d images analysed." % index)
        plt.show()

## Create CSV

In [None]:
import csv
from datetime import datetime

def create_csv(results, results_dir='./'):
    """Write the predictions to a fresh, timestamped CSV file.

    Previously generated result files (``results_*.csv``) found in
    `results_dir` are removed first, so the directory holds a single
    results file after the call. Other files are left untouched.

    Args:
        results: mapping from image file name to predicted category.
        results_dir: directory the CSV is written to; created if missing.

    Returns:
        The path of the CSV file that was written.
    """
    # An empty string means "current directory", as it does for os.path.join.
    target_dir = results_dir or '.'
    os.makedirs(target_dir, exist_ok=True)

    # Clean up in the directory that is actually written to, and only touch
    # files that follow this function's own naming scheme.
    for file in os.listdir(target_dir):
        stale_path = os.path.join(target_dir, file)
        if file.startswith('results_') and file.endswith('.csv') and os.path.isfile(stale_path):
            os.remove(stale_path)

    csv_fname = 'results_' + datetime.now().strftime('%b%d_%H-%M-%S') + '.csv'
    csv_path = os.path.join(results_dir, csv_fname)

    with open(csv_path, 'w') as f:
        f.write('Id,Category\n')
        for key, value in results.items():
            f.write(key + ',' + str(value) + '\n')

    return csv_path

# Write the per-image predictions gathered by the test loop to a timestamped
# CSV, using the function's default results directory.
create_csv(results)