## **Introduction**

This notebook has been created for the final contest of the Artificial Vision course at the University of Salerno. The aim of this project is to design a DCNN (as a regressor or a classifier) for age estimation on the [VggFace2 dataset](https://github.com/ox-vgg/vgg_face2), labeled with ages by [MiviaLab](https://mivia.unisa.it/).

<br/>

We decided to build a classifier able to recognize 101 classes (ages from 0 to 100); in particular, we chose the [Resnet50 model](https://github.com/WeidiXie/Keras-VGGFace2-ResNet50).

In this notebook, we show our **test procedure**.

## **Initialization**

First of all, we have to mount the Drive and move into the project folder, where all operations have to be done because it contains all the needed files.

In [None]:
from google.colab import drive
import os

# Mount Google Drive, then make the shared project folder the working
# directory so the relative paths used in the following cells resolve against it.
DRIVE_MOUNT_POINT = '/content/drive'
PROJECT_DIR = '/content/drive/Shareddrives/ArtificialVision/FinalContest2020'

drive.mount(DRIVE_MOUNT_POINT)
os.chdir(PROJECT_DIR)

Check if we are using a GPU

In [None]:
%tensorflow_version 2.x
import tensorflow as tf

# Stop immediately unless TensorFlow reports the first GPU device.
device_name = tf.test.gpu_device_name()
if device_name == '/device:GPU:0':
  print('Found GPU at: {}'.format(device_name))
else:
  raise SystemError('GPU device not found')

Install MTCNN for face detection

In [None]:
!pip3 install mtcnn

Install progressbar for checking progress in predictions

In [None]:
!pip install progressbar

## Load previously trained model

In [None]:
# Base folder of this network's logs; the model file and the checkpoint
# folders used in later cells are resolved relative to it.
log_dir = "./logs/resnet50/"
# File handed to load_model() in the next cell.
model_path = "{}/model/resnet50_25epochs.h5".format(log_dir)

In [None]:
from keras.models import load_model

# Restore the Keras model stored at model_path; the two prints mark the start
# and the end of the load.
print("Model loading...")
model = load_model(model_path)
print("Model loading...DONE")

## Recover last saved weights

In [None]:
def _find_latest_checkpoint(d):
    """Locate the checkpoint with the highest epoch number inside ``d``.

    Every entry of ``d`` is searched with the module-level pattern ``ep_re``
    (which must be defined before this function is called); entries that do
    not match are ignored.

    Returns:
        A ``(epoch, path)`` tuple for the matching entry with the largest
        epoch, or ``(0, None)`` when no entry has an epoch greater than zero.
    """
    best_epoch, best_path = 0, None
    for candidate in glob(os.path.join(d, '*')):
        match = re.search(ep_re, candidate)
        if match is None:
            continue
        epoch = int(match.group(1))
        # strict comparison: on equal epochs the first entry seen is kept
        if epoch > best_epoch:
            best_epoch, best_path = epoch, candidate
    return best_epoch, best_path

In [None]:
from glob import glob
import re

# Folder layout used below: <log_dir>/training_logs/weights/checkpoint.<epoch>.h5
dirnm = "training_logs/"
dirnm = os.path.join(log_dir, dirnm) #./logs/<net>/training_logs/
print("Log dir: {}".format(dirnm))
# makedirs also creates missing parent folders and does nothing if the folder exists
os.makedirs(dirnm, exist_ok=True)

chk_dir = dirnm + "weights/" #./logs/<net>/training_logs/weights/
print("Checkpoint dir: {}".format(chk_dir))
os.makedirs(chk_dir, exist_ok=True)

# Template of a checkpoint file name; {epoch} is zero-padded to two digits
filepath = os.path.join(chk_dir, "checkpoint.{epoch:02d}.h5")
# Dots are escaped so they match a literal "." instead of any character
ep_re = re.compile(r'checkpoint\.([0-9]+)\.h5')

In [None]:
# Pick the checkpoint with the highest epoch number found in chk_dir.
test_epoch, latest_checkpoint = _find_latest_checkpoint(chk_dir)
if latest_checkpoint is None:
  # Fail with a clear message instead of asking Keras to open a
  # non-existent "checkpoint.00.h5".
  raise FileNotFoundError("No checkpoint.<epoch>.h5 file found in {}".format(chk_dir))
print("Using epoch %d" % test_epoch)

print("Weights loading...")
# Load the file that was actually found rather than rebuilding its name from
# the epoch number (the two differ when the name is not zero-padded).
model.load_weights(latest_checkpoint)
print("Weights loading...DONE")

## Do face extraction (we used annotation provided by MiviaLab)

In [None]:
def extract_face(img, detector=None):
  """Crop the largest detected face out of ``img``.

  Args:
    img: image array indexed as ``img[row, column]``, or ``None``.
    detector: optional object exposing ``detect_faces(img)`` that returns a
      list of dicts whose ``'box'`` entry is ``[x, y, width, height]``. When
      omitted a new ``MTCNN()`` is created; pass a shared instance when
      processing many images so the detector is not rebuilt on every call.

  Returns:
    The cropped face, or ``img`` unchanged when it is ``None``, when no face
    is detected, when the largest box is narrower or shorter than 10 pixels,
    or when the box lies completely outside the image.
  """
  if img is None:
    print("Image {} not found".format(img))
    return img

  if detector is None:
    # imported here because no other cell imports it
    from mtcnn import MTCNN
    detector = MTCNN()

  # detect all faces present in the image
  results = detector.detect_faces(img)
  if len(results) == 0: # no faces detected: return original image
    return img

  # keep the box with the largest area (face in close-up); on equal areas
  # the last one wins
  max_area = 0
  index = 0
  for i, result in enumerate(results):
    _, _, width, height = result['box']
    area = width * height
    if area >= max_area:
      max_area = area
      index = i

  x_o, y_o, width, height = results[index]['box']
  if width < 10 or height < 10: # detected area too small: return original image
    return img

  # The bottom-right corner is computed from the detector's own origin, then
  # a negative origin (face partly outside the image) is clamped to 0.
  # Valid, non-negative coordinates are left untouched.
  x2, y2 = x_o + width, y_o + height
  x1, y1 = max(x_o, 0), max(y_o, 0)
  if x2 <= x1 or y2 <= y1: # nothing of the box falls inside the image
    return img
  return img[y1:y2, x1:x2]

## Utility function for reading an image from a TFRecord

In [None]:
def decode_image(image):
  """Decode JPEG-encoded bytes into a 3-channel image tensor."""
  return tf.image.decode_jpeg(image, channels=3)

## Dataset creation from TFRecord file

In [None]:
%tensorflow_version 2.x
import tensorflow as tf
from functools import partial
import numpy as np

def read_tfrecord_test(example):
    """Parse one serialized record of an unlabeled (test) TFRecord file.

    Returns:
        A dict with the scalar string features 'path' and 'image_raw'.
    """
    feature_spec = {
        name: tf.io.FixedLenFeature([], tf.string)
        for name in ('path', 'image_raw')
    }
    return tf.io.parse_single_example(example, feature_spec)
    
def read_tfrecord(example):
    """Parse one serialized record of a labeled TFRecord file.

    Returns:
        A dict with the scalar features 'path' and 'image_raw' (strings) and
        'height', 'width' and 'label' (64-bit integers).
    """
    feature_types = [
        ('path', tf.string),
        ('height', tf.int64),
        ('width', tf.int64),
        ('label', tf.int64),
        ('image_raw', tf.string),
    ]
    feature_spec = {
        name: tf.io.FixedLenFeature([], dtype) for name, dtype in feature_types
    }
    return tf.io.parse_single_example(example, feature_spec)

def load_dataset(filenames, test):
    """Build a dataset of parsed records from TFRecord file(s).

    Args:
        filenames: path(s) handed to ``tf.data.TFRecordDataset``.
        test: when true, records are parsed with ``read_tfrecord_test`` and
            the original record order is requested, so that predictions
            written in iteration order can be compared row by row with a
            ground-truth file. Otherwise records are parsed with
            ``read_tfrecord`` and may be delivered out of order.

    Returns:
        A ``tf.data.Dataset`` whose elements are the parsed feature dicts.
    """
    options = tf.data.Options()
    # Out-of-order delivery is only acceptable for data that gets shuffled anyway.
    options.experimental_deterministic = bool(test)

    dataset = tf.data.TFRecordDataset(filenames).with_options(options)

    # read dataset records according to its type (test or not)
    parser = read_tfrecord_test if test else read_tfrecord
    return dataset.map(parser)

def get_dataset(filenames, dataset_dim, test=False, batch_size=None):
    """Create the input pipeline for a TFRecord file.

    Args:
        filenames: path(s) of the TFRecord file(s).
        dataset_dim: number of samples; only used to size the shuffle buffer
            (``dataset_dim // 256``, at least 1).
        test: when true the records are only prefetched, one element at a
            time; otherwise they are shuffled, repeated, prefetched and batched.
        batch_size: batch size for the non-test pipeline. When omitted, a
            module-level ``batch_size`` variable is used if one exists.

    Returns:
        The configured ``tf.data.Dataset``.

    Raises:
        ValueError: if ``test`` is false and no batch size is available.
    """
    dataset = load_dataset(filenames, test)
    if test:
        # lets later elements be prepared while the current one is processed
        return dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

    if batch_size is None:
        # backward compatibility with notebooks that define a global batch_size
        batch_size = globals().get('batch_size')
    if batch_size is None:
        raise ValueError("batch_size must be given when test is False")

    # a shuffle buffer of 0 is rejected, so small datasets get a buffer of 1
    shuffle_buffer = max(1, dataset_dim // 256)
    # shuffle elements at each epoch
    dataset = dataset.shuffle(shuffle_buffer, reshuffle_each_iteration=True).repeat()
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    return dataset.batch(batch_size)


## Predictions

In [None]:
# TFRecord file read by the prediction cell.
PATH_TO_TFR = "./tfrecords/own_test_set_cropped.record"

# Number of records the prediction loop tries to read from PATH_TO_TFR.
TOT_TEST_SAMPLE = 126_179

In [None]:
import progressbar
import csv

x_test = []  # not used in this cell

# The network name is the last folder of log_dir ("./logs/<net>/" -> "<net>")
model_name = log_dir.split('/')[-2]
PATH_TO_CSV_PRED = './tfrecords/annotations/own_test_set_predictions_{}.csv'.format(model_name) #GROUP18.csv

print ("Testing {} on {} - {} samples".format(model_name,PATH_TO_TFR,TOT_TEST_SAMPLE))

# create dataset iterator
test_dataset = get_dataset(PATH_TO_TFR, TOT_TEST_SAMPLE, test=True)
test_generator = iter(test_dataset)

MAX_VALUE = TOT_TEST_SAMPLE

print("Writing predictions to {} ...".format(PATH_TO_CSV_PRED))
with progressbar.ProgressBar(max_value=MAX_VALUE) as bar:
  with open(PATH_TO_CSV_PRED, mode='w', newline="", encoding="utf-8") as csvfile:
    csv_writer = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    for j in range(0, TOT_TEST_SAMPLE):
      # take a single elem of TFRecord (path+image); only this call can
      # signal the end of the data, so only this call is guarded
      try:
        parsing_dict = test_generator.get_next()
      except tf.errors.OutOfRangeError:
        # the file holds fewer records than TOT_TEST_SAMPLE: stop instead of
        # failing again on every remaining index
        print("Iterator exhausted\n")
        break
      path = parsing_dict["path"].numpy().decode('utf-8')
      jpg = (decode_image(parsing_dict["image_raw"])).numpy()
      # add the leading batch axis expected by model.predict: (1, H, W, C)
      jpg = np.reshape(jpg, [1,jpg.shape[0], jpg.shape[1], jpg.shape[2]])
      # do prediction: the predicted age is the index of the highest score
      pred = np.argmax(model.predict(jpg))
      # write prediction to CSV file
      csv_writer.writerow([path,int(pred)])
      bar.update(j+1)
print("Writing predictions to {} ... DONE".format(PATH_TO_CSV_PRED))

## Compute MAE

In [None]:
from sklearn.metrics import mean_absolute_error
import csv

# Values read by the most recent compute_mae() call.
y_pred = []
y_true = []

def compute_mae(gt_path, pred_path):
  """Print the mean absolute error between two CSV files.

  The second column of every non-empty row is read as a float; rows of the
  two files are paired by position. The module-level ``y_true`` and
  ``y_pred`` lists are overwritten with the values read, so calling the
  function again does not mix in rows from an earlier call.

  Args:
    gt_path: CSV file with the ground-truth values.
    pred_path: CSV file with the predicted values.

  Raises:
    ValueError: if the two files hold a different number of rows.
  """
  def _read_second_column(csv_path):
    # newline="" is what the csv module asks for when opening files
    with open(csv_path, newline="") as csvfile:
      return [float(row[1]) for row in csv.reader(csvfile, delimiter=',') if len(row) != 0]

  # replace the contents in place instead of appending to earlier results
  y_true[:] = _read_second_column(gt_path)
  y_pred[:] = _read_second_column(pred_path)

  if len(y_true) != len(y_pred):
    raise ValueError("Row count mismatch: {} ground-truth rows vs {} prediction rows".format(len(y_true), len(y_pred)))

  print("MAE:{}".format(mean_absolute_error(y_true, y_pred)))

In [None]:
PATH_TO_CSV_GT = "./tfrecords/annotations/own_test_set_gt.csv"
# Score the file written by the prediction cell for the network under test
# (model_name), not a file name hard-coded for a different network.
PATH_TO_CSV_PRED = "./tfrecords/annotations/own_test_set_predictions_{}.csv".format(model_name)

compute_mae(PATH_TO_CSV_GT, PATH_TO_CSV_PRED)