# Mask Classification

This Notebook contains a model is based on VGG19, using transfer learning

In [5]:
import os
import numpy as np 
import tensorflow as tf 
import pandas
import random

SEED = 1234

tf.random.set_seed(SEED)

GC = True
Gio = False 

In [6]:
if GC:
    from google.colab import drive
    drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
if Gio:
  !unzip '/content/drive/My Drive/MaskDataset/artificial-neural-networks-and-deep-learning-2020.zip'
  !ls /content/MaskDataset

In [8]:
# Random Noise
def add_noise(img):
    '''Add random noise to an image'''
    VARIABILITY = 50
    deviation = VARIABILITY*random.random()
    noise = np.random.normal(0, deviation, img.shape)
    img += noise
    np.clip(img, 0., 255.)
    return img

### Data Augmentation
simple transformation as zoom, flip (horizontal and vertical), rotation and shear.<br>
From the dataset, the 75% is used for training set with all the trasformation and 25% for validation set


In [9]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

apply_data_augmentation = True

if apply_data_augmentation: #if data augmentation is enabled, create the generator
     train_data_gen = ImageDataGenerator(rotation_range=10,
                                        zoom_range=0.1,
                                        horizontal_flip=True,
                                        vertical_flip=True,
                                        fill_mode='constant',
                                        cval=0,
                                        shear_range = 0.2, #added for TL
                                        validation_split = 0.25,
                                        rescale=1./255)
else: #rescale only the image
     train_data_gen = ImageDataGenerator(rescale = 1./255, validation_split = 0.25)                                       

#rescale only on validation dataset and test dataset
valid_data_gen = ImageDataGenerator(rescale = 1./255, validation_split= 0.25)

test_data_gen = ImageDataGenerator(rescale = 1./255)


In [10]:
#width and height of imgaes
img_w = 256
img_h = 256

num_classes = 3

classes = ["NO PERSON", "ALL THE PEOPLE", "SOMEONE"]

bs = 16 #batch size

In [15]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

#Loading data 
import json 

cwd = os.getcwd()
#setting dirs

if not Gio: # Matte
  os.chdir('/content/drive/My Drive/PoliMi/1-ANN_DL/LAB/AN2DL-homeworks')

dataset_dir = os.path.join(cwd, "MaskDataset")

training_dir = os.path.join(dataset_dir, "training")
validation_dir = training_dir


#reading json file
with open(os.path.join(dataset_dir,"train_gt.json")) as f:
  dic = json.load(f)


dataframe = pandas.DataFrame(dic.items())

dataframe.rename(columns = {0:'filename', 1:'class'}, inplace = True)

dataframe["class"] = dataframe["class"].astype(str)

#shuffling dataframe
dataframe = dataframe.sample(frac = 1) 

train_gen = train_data_gen.flow_from_dataframe(dataframe,
                                               training_dir,
                                               batch_size=bs,
                                               target_size=(img_h, img_w),
                                               class_mode='categorical',
                                               color_mode='rgb',
                                               subset='training',
                                               shuffle=True,
                                               seed=SEED)

valid_gen = valid_data_gen.flow_from_dataframe(dataframe,
                                               training_dir,
                                               #directory='full_dataset',
                                               batch_size=bs,
                                               target_size=(img_h, img_w),
                                               class_mode='categorical',
                                               color_mode='rgb',
                                               subset='validation',
                                               shuffle=True,
                                               seed=SEED) 


Found 4211 validated image filenames belonging to 3 classes.
Found 1403 validated image filenames belonging to 3 classes.


In [16]:
#Creating Dataset objects

train_dataset = tf.data.Dataset.from_generator(lambda: train_gen,
                                               output_types=(tf.float32, tf.float32),
                                               output_shapes=([None, img_h, img_w, 3], [None, num_classes]))

train_dataset = train_dataset.repeat()


valid_dataset = tf.data.Dataset.from_generator(lambda: valid_gen,
                                               output_types=(tf.float32, tf.float32),
                                               output_shapes=([None, img_h, img_w, 3], [None, num_classes]))

valid_dataset = valid_dataset.repeat()  



In [18]:
# Architecture: Features extraction -> Classifier

model = tf.keras.Sequential()


vgg = tf.keras.applications.VGG19(weights='imagenet', include_top=False, input_shape=(img_h, img_w, 3))
# Create Model
    # ------------

finetuning = True

if finetuning:
    freeze_until = 13 # layer from which we want to fine-tune

    for layer in vgg.layers:
          layer.trainable = False

    for layer in vgg.layers[:freeze_until]:
          layer.trainable = True
else:
      for layer in vgg.layers[:]:
        layer.trainable = False
    
model.add(vgg)

# Classifier
model.add(tf.keras.layers.Flatten())
#Basic Model
#model.add(tf.keras.layers.Dense(units=512, activation='relu'))
#TL
model.add(tf.keras.layers.Dense(units=128, activation='relu'))
model.add(tf.keras.layers.Dropout(0.3))
model.add(tf.keras.layers.Dense(units=num_classes, activation='softmax'))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg19/vgg19_weights_tf_dim_ordering_tf_kernels_notop.h5


In [19]:
# Optimization params
# -------------------

# Loss
loss = tf.keras.losses.CategoricalCrossentropy()

# learning rate
lr = 5e-5 #basic lr 1e-4
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
# -------------------

# Validation metrics
# ------------------

metrics = ['accuracy']
# ------------------

# Compile Model
model.compile(optimizer=optimizer, loss=loss, metrics=metrics)


model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
vgg19 (Functional)           (None, 8, 8, 512)         20024384  
_________________________________________________________________
flatten (Flatten)            (None, 32768)             0         
_________________________________________________________________
dense (Dense)                (None, 128)               4194432   
_________________________________________________________________
dropout (Dropout)            (None, 128)               0         
_________________________________________________________________
dense_1 (Dense)              (None, 3)                 387       
Total params: 24,219,203
Trainable params: 7,700,547
Non-trainable params: 16,518,656
_________________________________________________________________


In [20]:
from datetime import datetime



exps_dir = os.path.join(cwd, 'classification_experiments')
if not os.path.exists(exps_dir):
    os.makedirs(exps_dir)

now = datetime.now().strftime('%b%d_%H-%M-%S')

model_name = 'CNN'

exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
if not os.path.exists(exp_dir):
    os.makedirs(exp_dir)
    
callbacks = []

# Model checkpoint
# ----------------
ckpt_dir = os.path.join(exp_dir, 'ckpts')
if not os.path.exists(ckpt_dir):
    os.makedirs(ckpt_dir)

ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(ckpt_dir, 'cp_{epoch:02d}.ckpt'), 
                                                   save_weights_only=True)  # False to save the model directly
callbacks.append(ckpt_callback)

# Visualize Learning on Tensorboard
# ---------------------------------
tb_dir = os.path.join(exp_dir, 'tb_logs')
if not os.path.exists(tb_dir):
    os.makedirs(tb_dir)
    
# By default shows losses and metrics for both training and validation
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir,
                                             profile_batch=0,
                                             histogram_freq=1)  # if 1 shows weights histograms
callbacks.append(tb_callback)

# Early Stopping
# --------------
early_stop = True
if early_stop:
    es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10,restore_best_weights=True )
    callbacks.append(es_callback)

In [None]:
model.fit(x=train_dataset,
          epochs=30,  #### set repeat in training dataset
          steps_per_epoch=len(train_gen),
          validation_data=valid_dataset,
          validation_steps=len(valid_gen), 
          callbacks=callbacks)

Epoch 1/30
 14/264 [>.............................] - ETA: 38:53 - loss: 1.2823 - accuracy: 0.3080

In [None]:
#check that is all ok

#iterator = iter(valid_dataset)



#from PIL import Image

#for i in range(10):
#    sample, target = next(iterator)
#    sample_ = sample[0, ...]

#    img = Image.fromarray(np.uint8(np.array(sample_)*255.))
#    img = img.resize([img_w,img_h])
#    img_array = np.array(img)
#    img_array = np.expand_dims(img_array, 0) 
#    img_array = tf.cast(img_array, tf.float32) / 255.
#    prediction = model.predict(img_array)
#    img
#    print("Predicted:"+classes[np.argmax(prediction)])
#    print("Original:" +classes[tf.argmax(target[0], axis=0)])

#sample_
#class_names[tf.argmax(target[0], axis=0)]

In [None]:
# Test Dataset 
# Useful if you want to see images with predictions  

#from PIL import Image
#image_filenames = next(os.walk('MaskDataset/test'))[2]

#results = {}
#for image_name in image_filenames:
#   img = Image.open('MaskDataset/test/'+image_name).convert('RGB')
#   img = img.resize((img_w,img_h))
#   img_array = np.array(img)
#   img_array = np.expand_dims(img_array, 0) 
#   img_array = tf.cast(img_array, tf.float32) / 255.
#   prediction = model.predict(img_array)
#   img
#   classes[np.argmax(prediction)]


In [None]:
#Creating CSV

import os
from datetime import datetime
from PIL import Image

def create_csv(results, results_dir='./'):

    csv_fname = 'results_'
    csv_fname += datetime.now().strftime('%b%d_%H-%M-%S') + '.csv'

    with open(os.path.join(results_dir, csv_fname), 'w') as f:

        f.write('Id,Category\n')

        for key, value in results.items():
            f.write(key + ',' + str(value) + '\n')


image_filenames = next(os.walk('MaskDataset/test'))[2]

results = {}
for image_name in image_filenames:
   img = Image.open('MaskDataset/test/'+image_name).convert('RGB')
   img = img.resize((img_w,img_h))
   img_array = np.array(img)
   img_array = np.expand_dims(img_array, 0) 
   img_array = tf.cast(img_array, tf.float32) / 255.
   prediction = model.predict(img_array)
   results[image_name] = np.argmax(prediction)

create_csv(results)            