<a href="https://colab.research.google.com/github/fcolombo7/AN2DL-2020/blob/main/Final%20Notebooks/2_Base_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **AN2DL** - Image classification challenge
* **Colombo** Filippo - 10559531
* **Del Vecchio** Giovanni - 10570682

## CNN model (without transfer learning)



Configuration and constants:

In [1]:
import os
import tensorflow as tf
import numpy as np
import pandas as pd
from datetime import datetime

In [2]:
#Random seed to make experiments reproducible
SEED = 1234
tf.random.set_seed(SEED)  

#Parameters
IMG_H, IMG_W = (256, 256)
BS = 64 #BATCH SIZE
VALIDATION_SPLIT = 0.2
DATA_AUGMENTATION = True

Load Google Drive to get the data and save the results:

In [6]:
from google.colab import drive

drive.mount('/content/drive')

cwd = os.getcwd()
drive_root_folder = '/content/drive/My Drive/ANN_project/'

Mounted at /content/drive


### Import the Mask Dataset

Check if the dataset has been already processed, otherwise unzip it.

**N.B.**: we are unzipping in the `/content` folder and not in `drive`.  
If you want to unzip in your drive folder, write `os.chdir(drive_root_folder)`



In [8]:
#check if the dataset is already available
if not os.path.exists(cwd+'/MaskDataset'):
  !unzip '/content/drive/My Drive/ANN_project/artificial-neural-networks-and-deep-learning-2020.zip'
else:
  print('MaskDataset already loaded')

MaskDataset already loaded


### Split the dataset

Definition of the `ImageDataGenerator` for _data augmentation_:

In [9]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

if DATA_AUGMENTATION:
    train_data_gen = ImageDataGenerator(rotation_range=20, 
                                        width_shift_range=0.3, 
                                        height_shift_range=0.3, 
                                        zoom_range=0.4, 
                                        horizontal_flip=True, #
                                        shear_range=10, 
                                        channel_shift_range=100, 
                                        fill_mode='reflect', 
                                        rescale=1./255)
else:
    train_data_gen = ImageDataGenerator(rescale=1./255)
    
valid_data_gen = ImageDataGenerator(rescale=1./255)

In [10]:
import json

dataset_dir = os.path.join(cwd, "MaskDataset")
training_dir = os.path.join(dataset_dir, "training")
with open(os.path.join(dataset_dir,"train_gt.json")) as f:
  dic = json.load(f)
  dataframe = pd.DataFrame(dic.items())
  dataframe.rename(columns = {0:'filename', 1:'class'}, inplace = True)
  dataframe = dataframe.sample(frac=1, random_state=SEED)
  
  tot_length = dataframe.shape[0]
  valid = dataframe.iloc[:int(np.ceil(tot_length * VALIDATION_SPLIT)),:] 
  train = dataframe.iloc[int(np.ceil(tot_length * VALIDATION_SPLIT)):,:]
  train["class"] = train["class"].astype('string')
  valid["class"] = valid["class"].astype('string')
  
  train_gen = train_data_gen.flow_from_dataframe(train,
                                               training_dir,
                                               batch_size=BS,
                                               target_size=(IMG_H, IMG_W),
                                               class_mode='categorical',
                                               shuffle=True,
                                               seed=SEED)
  
  validation_gen = valid_data_gen.flow_from_dataframe(valid,
                                               training_dir,
                                               batch_size=BS,
                                               target_size=(IMG_H, IMG_W),
                                               class_mode='categorical',
                                               shuffle=True,
                                               seed=SEED)

Found 4491 validated image filenames belonging to 3 classes.
Found 1123 validated image filenames belonging to 3 classes.


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  from ipykernel import kernelapp as app


### Build the Model

#### Create the dataset objects

In [11]:
num_classes = len(train_gen.class_indices)
num_classes
train_ds = tf.data.Dataset.from_generator(lambda: train_gen, 
                                          output_types=(tf.float32, tf.float32),
                                          output_shapes= ([None, IMG_H, IMG_W, 3], [None, num_classes]))

train_ds = train_ds.repeat()

valid_ds = tf.data.Dataset.from_generator(lambda: validation_gen,
                                          output_types = (tf.float32, tf.float32),
                                          output_shapes = ([None, IMG_H, IMG_W, 3], [None, num_classes]))
valid_ds = valid_ds.repeat()

#### CNN model

In [12]:
def get_feature_extraction(start_f=8, depth=6, conv_per_depth = 1, activation='relu', kernel_size = 3, stride = (1, 1)):
  # Architecture: Features extraction -> Classifier
  model = tf.keras.Sequential()

  # Features extraction
  for i in range(depth):

      if i == 0:
          input_shape = [IMG_H, IMG_W, 3]
      else:
          input_shape=[None]

      # Conv block: Conv2D -> Activation -> Pooling
      for n in range(conv_per_depth):
        model.add(tf.keras.layers.Conv2D(filters=start_f, 
                                        kernel_size=(kernel_size, kernel_size),
                                        strides=stride,
                                        padding='same',
                                        input_shape=input_shape,
                                        activation = activation))
      model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))

      start_f *= 2

  return model

In [13]:
def get_classifier(model, hidden_layer=3, units=512, dropout=0.2, regularization=0.001, activation='relu'):
  model.add(tf.keras.layers.Flatten())
  if dropout and regularization:
    for i in range(hidden_layer):
      model.add(tf.keras.layers.Dense(units=units, activation=activation, kernel_regularizer=tf.keras.regularizers.l2(regularization)))
      model.add(tf.keras.layers.Dropout(dropout))
  elif dropout:
    for i in range(hidden_layer):
      model.add(tf.keras.layers.Dense(units=units, activation=activation))
      model.add(tf.keras.layers.Dropout(dropout))
  elif regularization:
    for i in range(hidden_layer):
      model.add(tf.keras.layers.Dense(units=units, activation=activation, kernel_regularizer=tf.keras.regularizers.l2(regularization)))
  else:
    for i in range(hidden_layer):
      model.add(tf.keras.layers.Dense(units=units, activation=activation))

  model.add(tf.keras.layers.Dense(units=num_classes, activation='softmax'))
  return model

In [14]:
model = get_feature_extraction(depth = 5)
model = get_classifier(model, hidden_layer=1, units=512, regularization=0.001, dropout=0.2)
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 256, 256, 8)       224       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 128, 128, 8)       0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 128, 128, 16)      1168      
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 64, 64, 16)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 64, 64, 32)        4640      
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 32, 32, 32)        0         
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 32, 32, 64)        1

#### Compile the model

In [None]:
def get_optimizer(lr_schedule = 1e-3):
  return tf.keras.optimizers.Adam(learning_rate=lr_schedule)

In [None]:
def get_callbacks(name, es=True, patience = 10):
  obj = []

  exps_dir = os.path.join(drive_root_folder,'classification_result')
  if not os.path.exists(exps_dir):
    os.mkdir(exps_dir)

  now = datetime.now().strftime('%b%d_%H-%M-%S')
  temp_dir = name +'_'+ now

  #Model Checkpoints
  ckpt_dir = os.path.join(exps_dir, temp_dir, 'ckpts')
  if not os.path.exists(ckpt_dir):
    os.makedirs(ckpt_dir)

  obj.append(tf.keras.callbacks.ModelCheckpoint(filepath = os.path.join(ckpt_dir, 'cp.ckpt'),
                                                save_weights_only = True,
                                                save_best_only = True))
  
  #Tensor board logs
  tb_dir = os.path.join(exps_dir, temp_dir + '/tb_logs')
  if not os.path.exists(tb_dir):
    os.makedirs(tb_dir)
  obj.append(tf.keras.callbacks.TensorBoard(log_dir=tb_dir,
                                            profile_batch = 0,
                                            histogram_freq = 1))
  if es:
    obj.append(tf.keras.callbacks.EarlyStopping(monitor = 'val_loss', patience = patience))

  return obj

#### Train

In [None]:
name = 'CNN-depth5' #check the model name in the drive folder (the time stamp changes)
model.compile(optimizer=get_optimizer(),
              loss=tf.keras.losses.CategoricalCrossentropy(),
              metrics=['accuracy'])

model.fit(x=train_ds,
          epochs=100,
          steps_per_epoch=len(train_gen),
          validation_data=valid_ds,
          validation_steps=len(validation_gen), 
          callbacks=get_callbacks(name, es=True, ),
          verbose = 1)

### Load the best model according to the checkpoints

In [None]:
model_to_load = 'CNN-depth5' #check the model name in the drive folder (the time stamp changes)
ckpt_path = os.path.join(drive_root_folder,'classification_result',model_to_load, 'ckpts','cp.ckpt')
model.load_weights(ckpt_path)

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f49301b9dd8>

## Predictions


First load the test images:

In [None]:
import pathlib
path = os.path.join(os.getcwd(),'MaskDataset', 'test')
data_dir = pathlib.Path(path)

test_image_filenames = list(data_dir.glob('*.jpg'))
len(test_image_filenames)

In [None]:
test_normalization = tf.keras.Sequential([
                                       tf.keras.layers.experimental.preprocessing.Rescaling(1./255)                 
])

In [None]:
def make_prediction():
  results = {}
  results_expanded = {}
  for test_image in test_image_filenames:
    img = tf.keras.preprocessing.image.load_img(test_image, target_size=(IMG_H, IMG_W))
    img_array = tf.keras.preprocessing.image.img_to_array(img)
    img_array = tf.expand_dims(img_array, 0) # Create a batch

    normalized_img = test_normalization(img_array, training=True)
    predictions = model.predict(normalized_img)
    results_expanded[os.path.basename(test_image)]=predictions
    results[os.path.basename(test_image)]=np.argmax(predictions)
  return results, results_expanded

In [None]:
def create_csv(results, results_dir):
    csv_fname = 'results_'
    csv_fname += datetime.now().strftime('%b%d_%H-%M-%S') + '.csv'

    with open(os.path.join(results_dir, csv_fname), 'w') as f:
        f.write('Id,Category\n')
        for key, value in results.items():
            f.write(key + ',' + str(value) + '\n')

In [None]:
create_csv(make_prediction()[0], drive_root_folder)