# Import

In [None]:
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from collections import Counter
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout, MaxPooling2D
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint,EarlyStopping, ReduceLROnPlateau
from get_dataset import get_training_and_validation
from tensorflow.keras.models import save_model, load_model
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras.models import save_model, load_model

In [None]:
#import tensorflow as tf
#print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
#tf.debugging.set_log_device_placement(True)

In [None]:
batch_size = 32
epochs = 50
IMG_HEIGHT = 256
IMG_WIDTH = 256

# make sure to write own classifier-function in next cell
# if num_classes is greater than 2

# Load Data

In [None]:
# this filter allows all combination with exactly 1 human on it
# artificial masks (photoshop, cg), are not distinguished

# this implicite sets the ['class'] as follows:
#   with_mask >= 1 -> 'mask'
#   with_mask == 0 -> 'no_mask'
#  classifier-attribute can be set to own function for matching data-rows into classes
frame = get_training_and_validation(people_per_img = 1, with_mask=None, no_mask=None, unknown=0,validation_split=0.15 ,test_split=0.1)

train_frame=frame[0]
val_frame=frame[1]
test_frame=frame[2]


print('Filter returned {:d} train images'.format(len(train_frame)))
print(Counter(train_frame['class']))
print('Filter returned {:d} valid images'.format(len(val_frame)))
try:
    print('Filter returned {:d} test images'.format(len(test_frame)))
except:
    print ("Filter returned no test images")
    
num_classes = len(Counter(train_frame['class']))

# Data preparation

In [None]:
def get_data_generator(image_generator, frame, shuffle=True):
    return image_generator.flow_from_dataframe(frame, directory='./', x_col='Path', y_col='class', color_mode="rgb",
                                                 batch_size = batch_size, shuffle=shuffle, target_size=(IMG_HEIGHT, IMG_WIDTH), class_mode='categorical')

In [None]:
# generator for all data
image_generator = ImageDataGenerator(
                    rescale=1./255,
                    rotation_range=15,
                    width_shift_range=.15,
                    height_shift_range=.15,
                    horizontal_flip=True,
                    zoom_range=0.2
                    )
image_generator_test = ImageDataGenerator(rescale=1./255)

In [None]:
train_data_gen = get_data_generator(image_generator, train_frame)

validation_data_gen = get_data_generator(image_generator_test, val_frame)


In [None]:
sample_training_images, _ = next(train_data_gen)

In [None]:
# This function will plot images in the form of a grid with 1 row and 5 columns where images are placed in each column.
def plotImages(images_arr):
    fig, axes = plt.subplots(1, 5, figsize=(20,20))
    axes = axes.flatten()
    for img, ax in zip( images_arr, axes):
        ax.imshow(img,interpolation='none')
        ax.axis('off')
    plt.tight_layout()
    plt.show()

In [None]:
plotImages(sample_training_images[:5])

# First Model
## Create a starting model

In [None]:
model = Sequential([
    Conv2D(32, 3, padding='same', strides=2,activation='relu', input_shape=(IMG_HEIGHT, IMG_WIDTH ,3)),
    MaxPooling2D(),
    Conv2D(64, 3, padding='same', strides=2,activation='relu'),
    MaxPooling2D(),
    Conv2D(64, 3, padding='same', strides=2,activation='relu'),
    MaxPooling2D(),
    Flatten(),
    Dense(512, activation='relu'),
    Dense(num_classes, activation='softmax')
])

In [None]:
model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])


In [None]:
model.summary()

## Train the model

In [None]:
#small training with only X epochs - save old epochs value
epochs_saved=epochs
epochs=10
history = model.fit(
    train_data_gen,
    epochs=epochs,
    validation_data=validation_data_gen,
)

In [None]:
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

loss=history.history['loss']
val_loss=history.history['val_loss']

epochs_range = range(epochs)

plt.figure(figsize=(8, 8))
plt.subplot(1, 2, 1)
plt.plot(epochs_range, acc, label='Training Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

plt.subplot(1, 2, 2)
plt.plot(epochs_range, loss, label='Training Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.show()

In [None]:
#RESTORE old epochs value
epochs=epochs_saved

# Data augmentation versions

## Flip and Shift

In [None]:
#Flip and shift by up to 15%
image_gen = ImageDataGenerator(rescale=1./255, horizontal_flip=True,width_shift_range=.15,
                    height_shift_range=.15,)

train_data_gen = get_data_generator(image_gen, train_frame)

In [None]:
augmented_images = [train_data_gen[0][0][0] for i in range(5)]

# Re-use the same custom plotting function defined and used
# above to visualize the training images
plotImages(augmented_images)

## Rotate

In [None]:
# Rotate picture by -15 to 15 degree
image_gen = ImageDataGenerator(rescale=1./255, rotation_range=15)

train_data_gen = get_data_generator(image_gen, train_frame)

In [None]:
augmented_images = [train_data_gen[0][0][0] for i in range(5)]

plotImages(augmented_images)

## Zoom

In [None]:
# Zoom picture up to 30%
image_gen = ImageDataGenerator(rescale=1./255, zoom_range=0.3)

train_data_gen = get_data_generator(image_gen, train_frame)

In [None]:
augmented_images = [train_data_gen[0][0][0] for i in range(5)]

plotImages(augmented_images)

# Updated Image Generator

In [None]:
# generator for all data
image_generator = ImageDataGenerator(
                    rescale=1./255,
                    rotation_range=15,
                    width_shift_range=.10,
                    height_shift_range=.10,
                    horizontal_flip=True,
                    zoom_range=0.2
                    )

#image generator for test data
image_generator_test = ImageDataGenerator(rescale=1./255)

In [None]:
train_data_gen = get_data_generator(image_generator, train_frame)

validation_data_gen = get_data_generator(image_generator_test, val_frame)

test_data_gen = get_data_generator(image_generator_test, test_frame, shuffle=False)

In [None]:
augmented_images = [train_data_gen[0][0][0] for i in range(5)]
plotImages(augmented_images)

# Second Model with Dropout

In [None]:
model_new = Sequential([
    Conv2D(32, 3, padding='same', activation='relu', 
           input_shape=(IMG_HEIGHT, IMG_WIDTH ,3)),
    MaxPooling2D(),
    Conv2D(64, 3, padding='same', strides=2, activation='relu'),
    MaxPooling2D(),
    Dropout(0.2),
    Conv2D(128, 3, padding='same', strides=4, activation='relu'),
    MaxPooling2D(),
    Dropout(0.2),
    Flatten(),
    Dense(512, activation='relu'),
    Dropout(0.3),
    Dense(64, activation='relu'),
    Dropout(0.3),
    Dense(num_classes, activation='softmax')
])

In [None]:
model_new.compile( optimizer='adam', #optimizer='rmsprop',
                  loss='binary_crossentropy',#tf.keras.losses.BinaryCrossentropy(from_logits=True),
                  metrics=['accuracy'])
 
model_new.summary()

## Train the Model

In [None]:
checkpoint_filepath = './tmp/model_checkpoint'

model_checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True)

earlyStopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='min')

reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, verbose=1,
                              cooldown=0, mode='auto',min_delta=0.0001, min_lr=0)

In [None]:
history = model_new.fit(
    train_data_gen,
    epochs=epochs,
    validation_data=validation_data_gen,
    callbacks=[model_checkpoint_callback,earlyStopping,reduce_lr]
)

In [None]:
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

loss = history.history['loss']
val_loss = history.history['val_loss']

epochs_range = range(len(acc))

plt.figure(figsize=(8, 8))
plt.subplot(1, 2, 1)
plt.plot(epochs_range, acc, label='Training Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

plt.subplot(1, 2, 2)
plt.plot(epochs_range, loss, label='Training Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.show()

## Test

In [None]:
test_data_gen = get_data_generator(image_generator_test, test_frame, shuffle=False)

In [None]:

model_new.load_weights(checkpoint_filepath)
used_model=model_new

total_test=len(test_frame)

Y_pred = used_model.predict(test_data_gen)#, steps=total_test // batch_size+1)
predict_class = np.argmax(Y_pred, axis=1)
predict_class = predict_class.tolist()

ausgabe=0
if ausgabe:
    print('Predict\n',predict_class)
    print('Klassen\n',test_data_gen.classes)
    classes_table=[int(i=='no_mask') for i in test_frame['class'].tolist()]
    print('Klassen Tabelle\n', classes_table)
    print(test_frame['class'].tolist())


print('Confusion Matrix')
print(confusion_matrix(test_data_gen.classes, predict_class))
print('Classification Report')
target_names = ['Mask','NoMask']
print(classification_report(test_data_gen.classes, predict_class, target_names=target_names))

In [None]:
#used_model.save('saved_model')
used_model.save('saved_model_test')

# Try out different models
## Create models

In [None]:
model_3 = Sequential([
    Conv2D(32, 3, padding='same', activation='relu', 
           input_shape=(IMG_HEIGHT, IMG_WIDTH ,3)),
    MaxPooling2D(),
    Conv2D(32, 3, padding='same', strides=2, activation='relu'),
    MaxPooling2D(),
    Dropout(0.2),
    Conv2D(64, 3, padding='same', strides=4, activation='relu'),
    MaxPooling2D(),
    Dropout(0.2),
    Flatten(),
    Dense(512, activation='relu'),
    Dropout(0.3),
    Dense(num_classes, activation='softmax')
])
model_3.compile( optimizer='adam',
                  loss='binary_crossentropy',#tf.keras.losses.BinaryCrossentropy(from_logits=True),
                  metrics=['accuracy'])
 
model_3.summary()

In [None]:
model_4 = Sequential([
    Conv2D(128, 3, padding='same', activation='relu', 
           input_shape=(IMG_HEIGHT, IMG_WIDTH ,3)),
    MaxPooling2D(),
    Conv2D(64, 3, padding='same', strides=2, activation='relu'),
    MaxPooling2D(),
    Dropout(0.2),
    Conv2D(32, 3, padding='same', strides=4, activation='relu'),
    MaxPooling2D(),
    Dropout(0.2),
    Flatten(),
    Dense(256, activation='relu'),
    Dropout(0.3),
    Dense(32, activation='relu'),
    Dropout(0.3),
    Dense(8, activation='relu'),
    Dense(num_classes, activation='softmax')
])
model_4.compile( optimizer='adam',
                  loss='binary_crossentropy',#tf.keras.losses.BinaryCrossentropy(from_logits=True),
                  metrics=['accuracy'])
 
model_4.summary()

In [None]:
model_5 = Sequential([
    Conv2D(32, 3, padding='same', activation='relu', 
           input_shape=(IMG_HEIGHT, IMG_WIDTH ,3)),
    MaxPooling2D(),
    Conv2D(128, 3, padding='same', strides=2, activation='relu'),
    MaxPooling2D(),
    Dropout(0.2),
    Conv2D(64, 3, padding='same', strides=4, activation='relu'),
    MaxPooling2D(),
    Dropout(0.2),
    Flatten(),
    Dense(512, activation='relu'),
    Dropout(0.3),
    Dense(32, activation='relu'),
    Dropout(0.3),
    Dense(num_classes, activation='softmax')
])
model_5.compile( optimizer='adam',
                  loss='binary_crossentropy',#tf.keras.losses.BinaryCrossentropy(from_logits=True),
                  metrics=['accuracy'])
 
model_5.summary()

## Train models

In [None]:
models=[model_3, model_4, model_5]

for model in models:
    history = model_new.fit(
        train_data_gen,
        epochs=epochs,
        validation_data=validation_data_gen,
        callbacks=[model_checkpoint_callback,earlyStopping,reduce_lr]
    )

    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']

    loss = history.history['loss']
    val_loss = history.history['val_loss']

    epochs_range = range(len(acc))

    plt.figure(figsize=(8, 8))
    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, acc, label='Training Accuracy')
    plt.plot(epochs_range, val_acc, label='Validation Accuracy')
    plt.legend(loc='lower right')
    plt.title('Training and Validation Accuracy')

    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, loss, label='Training Loss')
    plt.plot(epochs_range, val_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.title('Training and Validation Loss')
    plt.show()