In [None]:
import os
import tensorflow as tf
import numpy as np
import json
import math
import pandas as pd
from PIL import Image
from datetime import datetime
from google.colab import drive
from tensorflow.keras.preprocessing.image import ImageDataGenerator

## Mounting the Drive

In [None]:
# Mount Google Drive so that the dataset stored there is reachable from the Colab VM.
drive.mount('/content/drive/')

# Kept so the name stays defined; no path in this cell is derived from it.
cwd = os.getcwd()

# The dataset location is an absolute path. Joining it onto `cwd` (as was done
# before) had no effect: os.path.join discards every component that precedes an
# absolute one, so the result was always exactly this string.
dataset_dir = '/content/drive/My Drive/no_mask_detector/dataset'

# When True, the class list (and therefore the label -> index mapping used by the
# generators) is fixed explicitly; when False, `classes` is None and the choice is
# left to Keras.
decide_class_indices = True
classes = (['0',      # no_person
            '1',      # all_the_people
            '2']      # someone
           if decide_class_indices else None)

## Data augmentation

In [None]:
# Data augmentation
apply_data_augmentation = True

# Every generator divides pixel values by 255.
rescale_options = dict(rescale=1./255)

# Random transforms applied to the training images only.
augmentation_options = dict(
    width_shift_range=10,
    height_shift_range=10,
    zoom_range=0.1,          # Not too wide zoom range, in order not to lose the correct 'label'
    horizontal_flip=True,    # Vertical flip is useless, we don't expect upside down images
    fill_mode='nearest',
)

train_data_gen = (ImageDataGenerator(**augmentation_options, **rescale_options)
                  if apply_data_augmentation
                  else ImageDataGenerator(**rescale_options))

# Validation and test images are only rescaled, never augmented.
valid_data_gen = ImageDataGenerator(**rescale_options)
test_data_gen = ImageDataGenerator(**rescale_options)


## Data loading

In [None]:
# Data Loading
bs = 8             # batch size used by the image generators
num_classes = 3    # must match the number of labels in the ground-truth file
img_h = 256        # target image height fed to the network
img_w = 256        # target image width fed to the network

# Seed every random number generator this notebook relies on.
# TensorFlow's seed alone is not enough: the pandas shuffles further down draw
# from NumPy's global generator whenever no explicit random_state is passed.
SEED = 1234
tf.random.set_seed(SEED)
np.random.seed(SEED)


# Loading json file (a {filename: label} mapping) into a pandas dataframe
with open(os.path.join(dataset_dir, "train_gt.json")) as f:
  dic = json.load(f)

dataframe = pd.DataFrame(list(dic.items()), columns=['filename', 'class'])
# Labels are cast to strings so they can be compared with str(i) below and
# matched against the string entries of `classes`.
dataframe["class"] = dataframe["class"].astype(str)
# Seeded shuffle, so that the train/validation split is the same on every run.
dataframe = dataframe.sample(frac=1, random_state=SEED).reset_index(drop=True)

# Dividing the dataframe into "train_dataframe" and "valid_dataframe" by equally
# distributing samples of the main dataframe among the different classes.
# Proportions, applied per class:
# train_dataframe --> first 80% of the samples (rounded down).
# valid_dataframe --> all the remaining samples (about 20%).
# Slicing at a single cut point guarantees that no sample is lost; taking
# floor(80%) from the head and floor(20%) from the tail independently would
# discard a sample whenever both roundings go down.
train_parts = []
valid_parts = []

for i in range(num_classes):
  class_df = dataframe.loc[dataframe['class'] == str(i)]
  n_train = math.floor(len(class_df.index) * 0.8)
  train_parts.append(class_df.iloc[:n_train])
  valid_parts.append(class_df.iloc[n_train:])

# One concat per split (DataFrame.append no longer exists in pandas >= 2.0).
train_dataframe = pd.concat(train_parts, ignore_index=True)
valid_dataframe = pd.concat(valid_parts, ignore_index=True)


# Reshuffle of the dataframes, so that the classes are interleaved again
train_dataframe = train_dataframe.sample(frac=1, random_state=SEED).reset_index(drop=True)
valid_dataframe = valid_dataframe.sample(frac=1, random_state=SEED).reset_index(drop=True)


# Training and validation images are read from the same folder; which image
# belongs to which split is decided by the two dataframes.
training_dir = os.path.join(dataset_dir, 'training')

# Options shared by both iterators
common_flow_args = dict(directory=training_dir,
                        batch_size=bs,
                        classes=classes,
                        class_mode='categorical',
                        target_size=(img_h, img_w),
                        seed=SEED)

# Training: batches are drawn in shuffled order
train_gen = train_data_gen.flow_from_dataframe(train_dataframe,
                                               shuffle=True,
                                               **common_flow_args)

# Validation: order is kept fixed
valid_gen = valid_data_gen.flow_from_dataframe(valid_dataframe,
                                               shuffle=False,
                                               **common_flow_args)

## Training and validation sets

In [None]:
# Datasets
# Both generators are declared to yield (images, labels) pairs of float32 tensors:
# images as [batch, img_h, img_w, 3] and labels as [batch, num_classes].
batch_spec = dict(output_types=(tf.float32, tf.float32),
                  output_shapes=([None, img_h, img_w, 3], [None, num_classes]))

# Wrap each Keras iterator in a tf.data.Dataset and repeat it indefinitely;
# the number of batches per epoch is bounded by the step counts given to fit().
train_dataset = tf.data.Dataset.from_generator(lambda: train_gen, **batch_spec).repeat()

valid_dataset = tf.data.Dataset.from_generator(lambda: valid_gen, **batch_spec).repeat()

## Model creation

In [None]:
# Model
start_f = 8   # number of filters of the first conv block; doubled at every following block
depth = 5     # number of conv blocks

model = tf.keras.Sequential()

# Features extraction: `depth` blocks of Conv2D -> ReLU -> MaxPool2D
for i in range(depth):

    # Only the first layer of the Sequential model declares the input shape;
    # the following layers take theirs from the output of the previous layer,
    # so no (placeholder) input_shape is passed to them.
    first_layer_kwargs = {'input_shape': [img_h, img_w, 3]} if i == 0 else {}

    # Filters per block: start_f, 2*start_f, 4*start_f, ...
    # (computed from `i` rather than by overwriting `start_f`, which therefore
    # keeps its configured value after the loop)
    model.add(tf.keras.layers.Conv2D(filters=start_f * 2 ** i,
                                     kernel_size=(3, 3),
                                     strides=(1, 1),
                                     padding='same',
                                     **first_layer_kwargs))
    model.add(tf.keras.layers.ReLU())
    model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))

# Classifier: fully connected layers, with dropout in front of the first four
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dropout(.3))
model.add(tf.keras.layers.Dense(units=128, activation='relu'))
model.add(tf.keras.layers.Dropout(.3))
model.add(tf.keras.layers.Dense(units=64, activation='relu'))
model.add(tf.keras.layers.Dropout(.3))
model.add(tf.keras.layers.Dense(units=128, activation='relu'))
model.add(tf.keras.layers.Dropout(.3))
model.add(tf.keras.layers.Dense(units=32, activation='relu'))
model.add(tf.keras.layers.Dense(units=num_classes, activation='softmax'))

# Show the architecture and the parameter counts.
# (A bare `model.weights` expression in the middle of a cell displays nothing,
# and dumping every weight tensor would flood the output anyway.)
model.summary()

# Loss: labels are one-hot vectors and the last layer outputs softmax probabilities
loss = tf.keras.losses.CategoricalCrossentropy()

# learning rate
lr = 1e-4
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

# Metrics
metrics = ['accuracy']

# Compile Model
model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

## Tensorboard

In [None]:
# Tensorboard initialization
exps_dir = os.path.join('/content/drive/My Drive/no_mask_detector', 'classification_experiments')
if not os.path.exists(exps_dir):
    os.makedirs(exps_dir)

%load_ext tensorboard
%tensorboard --logdir /content/drive/My\ Drive/Kaggle\ Competitions/Competition1/classification_experiments/

## Checkpoint and callbacks

In [None]:
# Checkpoint and callbacks
# Every run gets its own timestamped folder, so experiments never overwrite each other.
now = datetime.now().strftime('%b%d_%H-%M-%S')

model_name = 'CNN'

exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
ckpt_dir = os.path.join(exp_dir, 'ckpts')
tb_dir = os.path.join(exp_dir, 'tb_logs')

# makedirs also creates the missing parents (exp_dir) and, with exist_ok=True,
# does not fail when the cell is re-run.
os.makedirs(ckpt_dir, exist_ok=True)
os.makedirs(tb_dir, exist_ok=True)

callbacks = []

# Model checkpoint: one weights file per epoch
ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(ckpt_dir, 'cp_{epoch:02d}.ckpt'),
                                                   save_weights_only=True)  # False to save the model directly
callbacks.append(ckpt_callback)

# Visualize Learning on Tensorboard
# By default shows losses and metrics for both training and validation
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir,
                                             profile_batch=0,
                                             histogram_freq=1)  # if 1 shows weights histograms
callbacks.append(tb_callback)

# Early Stopping
early_stop = True
if early_stop:
    # restore_best_weights=True gives us the best performing model after the
    # training phase, and not the last one before stopping. (The flag has to be
    # passed explicitly: its default is False.)
    #
    # We put the patience to 7 to wait a little bit more for hypothetical small
    # decreases of the loss on the validation set.
    es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss',
                                                   patience=7,
                                                   restore_best_weights=True)
    callbacks.append(es_callback)

## Model training

In [None]:
# The 2 quantities below are the number of steps (batches) per epoch for respectively
# the training set and the validation set. Both depend on the number of samples in the
# set and on the batch size.
#
# Training: whole batches only; at least one step, so that fit() still works on a set
# smaller than one batch.
STEP_SIZE_TRAIN = max(1, train_gen.n // train_gen.batch_size)

# Validation: rounded UP, so that the trailing partial batch is evaluated as well.
# valid_gen is not reset between epochs; with a rounded-down step count the leftover
# batch would be consumed at the start of the next validation pass, and every pass
# would then score a differently shifted window of the validation set.
STEP_SIZE_VALID = math.ceil(valid_gen.n / valid_gen.batch_size)

# Model training
model.fit(x=train_dataset,
          epochs=60,
          steps_per_epoch=STEP_SIZE_TRAIN,
          validation_data=valid_dataset,
          validation_steps=STEP_SIZE_VALID,
          callbacks=callbacks)

## Model prediction and results

In [None]:
# Model Evaluation
test_dir = os.path.join(dataset_dir, 'test')

# Names of the files located directly inside test_dir (sub-folders are not visited)
image_filenames = next(os.walk(test_dir))[2]

# Maps each test image file name to the index of its predicted class
predictions = {}
for image_filename in image_filenames:

  # Image loading and converting to RGB mode; the context manager closes the
  # file as soon as the converted copy has been made
  with Image.open(os.path.join(test_dir, image_filename)) as raw_img:
    img = raw_img.convert('RGB')

  # Resizing it to make it suitable to the model structure.
  # PIL expects the size as (width, height), the opposite of the
  # (height, width) order used for the network input.
  img = img.resize((img_w, img_h))

  # Same preprocessing as the generators: pixel values divided by 255,
  # plus a leading batch dimension of size 1
  img_array = np.array(img) / 255
  img_array = np.expand_dims(img_array, 0)

  # verbose=0: no progress bar for every single image
  prediction = model.predict(img_array, verbose=0)

  # Taking the argmax of the predictions made to have the class predicted
  # (stored as a plain Python int)
  predictions[image_filename] = int(np.argmax(prediction))

In [None]:
# Function to create the CSV file with the results
def create_csv(results, results_dir='./'):
    """Write the predictions to a timestamped CSV file.

    The file is named ``results_<Mon><day>_<HH-MM-SS>.csv`` (current local
    time) and contains the header ``Id,Category`` followed by one
    ``<key>,<value>`` line per entry of ``results``.

    Args:
        results: mapping from image identifier (str) to the predicted
            category; values are written with ``str()``.
        results_dir: folder the file is written to. It is created, together
            with any missing parent, when it does not exist yet.
    """
    # Without this, open() fails with FileNotFoundError the first time the
    # function is pointed at a results folder nobody has created yet.
    if results_dir:
        os.makedirs(results_dir, exist_ok=True)

    csv_fname = 'results_' + datetime.now().strftime('%b%d_%H-%M-%S') + '.csv'

    with open(os.path.join(results_dir, csv_fname), 'w') as f:

        f.write('Id,Category\n')

        for key, value in results.items():
            f.write(key + ',' + str(value) + '\n')

# Save the predictions made on the test images as a CSV file in our results folder on Drive
create_csv(results=predictions,
           results_dir='/content/drive/My Drive/no_mask_detector/results')