<a href="https://colab.research.google.com/github/kode-git/FER-Visual-Transformers/blob/main/Preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preprocessing

In this notebook, we balance the dataset and prepare the data for the training phase.

## Install Dependencies and Import Libraries

In [1]:
!pip3 install Pillow



In [9]:
# classic libraries for collections.
import pandas as pd
import numpy as np

# utility library.
import random, time, copy, sys, shutil

# plot libraries.
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns

# libraries for image processing.
import os, cv2, glob, imageio, sys
from PIL import Image

# warning library for service warnings.
import warnings

# ImageDataGenerator from keras library.
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# colab library.
from google.colab import drive

In [4]:
# mount Google Drive under /content/drive; the dataset paths used below live in that mount.
drive.mount('/content/drive')

Mounted at /content/drive


## Image Worker Implementation


ImageWorker provides some useful functions:
- Format Converter: For resize and move an image from *source_path* to *dest_path* filtered for *format_img*
- List Classes: Listing the classes and put them in an array to manipulate the subfolders for class functions divisions.
- Counter Samples per Class: Given a *dataset_path*, return a dictionary with the number of images in each class subfolder, for plotting or data visualization purposes.
- Counter Samples: Given a *dataset_path*, return a counter of images in the tree.
- Extension Converter: Convert an image format for every image in a specified path
- Counter Files Extension: Given a *path*, return the counter of image in the directory with a specific *format*
- Navigate Path: Count the files located directly inside a folder

In [5]:
class ImageWorker():
    """
    Image Worker class for Data Integration.
    This class manages images data, size and format.
    """
    def __init__(self) -> None:
         pass

    def format_converter(self, path, format_img, source_path, dest_path, resize=(224,224)):
        """
        Move an image from source_path to dest_path.
        Images selected follow format_img.
        There is a default resize of (224,244).
        """
        count = 0
        for file in glob.glob(path + "/*." + format_img):
            img = cv2.imread(file, cv2.IMREAD_UNCHANGED)
            resized = cv2.resize(img, resize, interpolation=cv2.INTER_CUBIC)
            cv2.imwrite(dest_path + "resized_on_" + source_path + "_" + str(count) + "."+ format_img, resized)
            count += 1

    def list_classes(self, dataset_path):
        """
        List the classes of a dataset.
        """
        langs = []
        for el in glob.glob(dataset_path):
          langs.append(os.path.basename(str(el)))
        return langs


    def counter_samples_on_class(self, dataset_path):
      """
      Counts samples of classes.
      Each class has its own counter.
      Return a dictionary with (class, counter) pair.
      """
      classes = self.list_classes(dataset_path)
      counter_classes = {}
      if dataset_path[len(dataset_path) - 1] == "/":
          path = dataset_path
      if dataset_path[len(dataset_path) - 1] == "*":
          path = dataset_path[0:len(dataset_path) - 2] + "/"
      else:
          path = dataset_path + "/"
      for class_ in classes:
        counter = 0
        for file in glob.glob(path + class_ + "/*"):
            counter += 1
        counter_classes[class_] = counter
      return counter_classes


    def counter_samples(self, dataset_path):
      """
      Counts total samples of a dataset.
      """
      a = self.counter_samples_on_class(dataset_path)
      counter = 0
      for el in a.keys():
        counter += a[el]
      return counter


    def extension_converter(self, path, format_source, format_result, dest_path):   
      """
      Convert a file from format_source to format_result.
      The file is loaded from path and the result is stored to dest_path.
      """
      for file in glob.glob(path + "/*." + format_source):
          im1 = Image.open(file)
          im1.save(file[0:len(file)-4] + "." + format_result)
          os.remove(file)


    def counter_file_extension(self, path, format):
      """
      Counts samples in path based on format input.
      """
      counter = 0
      for file in glob.glob(path + "/*." + format):
          counter += 1
      return counter
      

    def navigate_path(self, path):    
      """
      Navigate in the path and counts every file
      """
      count = 0
      for dir in os.listdir(path):
          if os.path.isfile(os.path.join(path, dir)):
              count += 1
      return count

In [7]:
# define the shared Image Worker instance; the helper functions and cells below refer to it as `iw`.
iw = ImageWorker()

## Common utilities


We implemented some logic and reusable functions useful for the data analysis or data manipulation phases. These functions carry out support routines for ImageWorker's class. They are:
- Min, Max and Mean: According to values or set of values passed as parameter.
- Plot Dataset: Function for plot image's dataset and color values according to the mean of classes cardinalities.
- Channel Distribution: Analyze images and return counters of images for different channels dimension.

In [6]:
def mean(values):
  """
  Return the arithmetic mean of values, truncated to an integer.
  An empty collection gives 0.
  """
  size = len(values)
  if size > 0:
    return int(sum(values) / size)
  return 0

def min(val):
  """
  Return the smallest value stored in the dictionary val.
  sys.maxsize is returned when no value is smaller (e.g. empty dictionary).
  """
  smallest = sys.maxsize
  for candidate in val.values():
    smallest = candidate if candidate < smallest else smallest
  return smallest


def max(val):
  """
  Return the largest value stored in the dictionary val.
  The smallest machine-size integer (-sys.maxsize - 1) is returned when no
  value is larger (e.g. empty dictionary).
  """
  # sys has no `minsize` attribute; the lower bound is derived from maxsize.
  largest = -sys.maxsize - 1
  for candidate in val.values():
    if candidate > largest:
      largest = candidate
  return largest


def plot_dataset(dataset_path, title=""):
  """
  Draw a bar chart with the number of samples of every class.
  The mean of the class counts is the threshold between the two colors:
  classes below it are drawn in red, the others in green.
  """
  labels = iw.list_classes(dataset_path)
  per_class = iw.counter_samples_on_class(dataset_path)

  figure = plt.figure()
  axes = figure.add_axes([0, 0, 1, 1])
  heights = [per_class[label] for label in labels]
  names = list(labels)

  # red marks an under-represented class, green a class at or above the mean.
  threshold = mean(heights)
  palette = ["#BC3434" if height < threshold else "#49A131" for height in heights]

  axes.bar(names, heights, color=palette)
  plt.title(title)
  plt.show()

## Data Balancing

AVFER contains AffectNet samples in the validation and testing sets, and FER-2013 and CK+48 samples in the training set. We need to balance it so that only a small number of samples remains in the val/test sets and the rest goes into the training set. Every class in the testing and validation sets must be balanced before the residual samples are moved into the training folder.

In [None]:
!mkdir /content/drive/MyDrive/Datasets/AVFER/tmp

In [None]:
# main variables.
basedir = "/content/drive/MyDrive/Datasets/AVFER/"
types = ['val', 'test', 'train']
# one glob pattern per split, matching the class folders of that split.
ref = [basedir + split + "/*" for split in types]
total = 0
for el in ref:
  total += iw.counter_samples(el)

# Splitting ratio 100 :-> 80/20 (train/test) and 80 :-> 90/10 (train/val).
train_ratio = 80
val_ratio = 10
test_ratio = 20

# Splitting distribution.
val_amount = int((((total * train_ratio) / 100) * val_ratio / 100))
test_amount = int((total * test_ratio)/ 100)
train_amount = int(total - (val_amount + test_amount))
print('Amount of samples per class for validation set:', val_amount) 
print('Amount of samples per class for testing set:', test_amount) 
print('Amount of samples per class for training set:', train_amount) 

# check augmentation of 3. 
augm_train = train_amount * 3
print('Amount of augmented training set: ', augm_train)
print('Amount of samples for augmented training set divided by classes:', int(augm_train / 8))
print('Amount of samples for validation set divided by classes:', int(val_amount / 8))
print('Amount of samples for testing set divided by classes:', int(test_amount / 8))

# validation balancing.
val = ref[0]
countcl = iw.counter_samples_on_class(val)
# smallest class of the validation folder; a dedicated name is used because
# binding `min` here would overwrite the min() helper defined earlier.
min_count = sys.maxsize
for el in countcl.keys():
  if countcl[el] < min_count:
    min_count = countcl[el]

# controls on the minimum number.
if min_count < (val_amount / 8):
  print('Error, the amount of samples for the validation set can\'t be reduce to minimum values')

# updating residuals counters: samples of a class left once its validation
# and testing shares are reserved.
classes = iw.list_classes(val)
residual = {}
for cl in classes:
  residual[cl] = countcl[cl] - ((val_amount / 8) + (test_amount / 8))

# create temporal classes directories (idempotent, so the cell can be re-run).
for cl in classes:
  os.makedirs(basedir + "tmp/" + cl + "/", exist_ok=True)

print('-'*40)
# check validation preconditions.
print('Verify the correct amount for validation...')
err = False
for cl in classes:
  # a negative residual means the class cannot cover its validation and
  # testing shares.
  if residual[cl] < 0:
    print(f'Error, the residual amount put class {cl} to illegal value')
    err = True

if not err:
  print('Splitting possible.')

# splittig training and validation set according to the proportion previously calculated.
# The folders are built from the split names: the entries of `ref` already
# contain basedir and a trailing "/*", so they cannot be used as folder paths.
valdir = basedir + types[0] + "/"
traindir = basedir + types[2] + "/"
for cl in classes:
  quota = int(residual[cl])
  c = 0
  for fl in glob.glob(valdir + cl + "/*"):
    if c >= quota:
      break
    c += 1
    shutil.copyfile(fl, os.path.join(traindir + cl, os.path.basename(fl)))
    os.remove(fl)

We moved the residual AffectNet samples from the validation set to the training set. The validation folder now holds the samples of both the validation and the testing sets. To make sure that the split was done correctly, we run the following checks.

In [None]:
# check testing splitting preconditions.
residual_test = {}
# current content of the validation folder, measured on disk.
valdir = ref[0]
clcount = iw.counter_samples_on_class(valdir)
print('Verify the correct amount for testing...')
for cl in classes:
  # samples expected in the validation folder once the residual has been moved.
  residual_test[cl] = countcl[cl] - residual[cl]
  # the check uses the measured count: once the testing share is taken,
  # the class must still hold its validation share.
  if clcount[cl] - (test_amount / 8) < (val_amount / 8):
    print(f'Error, the residual amount put class {cl} to illegal value')
    err = True

# check errors.
if not err:
  print('Splitting possible.')

# shows the current status of the validation set.
print('Current status of the validation set:')
print('-'*60)
res = 0
final_total = 0

# checks on classes.
for cl in classes:
  print(f'Total amount of {cl} samples in validation samples:', clcount[cl])
  res = clcount[cl] - int((test_amount / 8))
  print(f'Residual amount after last splitting {res}')
  # accumulate every class so the total stays right for unbalanced classes.
  final_total += res
print('-'*60)
print('Final total amount of samples in the validation set:', final_total)

In [None]:
# folders involved in this step: samples leave "val" and land in "tmp".
valdir = basedir + "val" + "/"
testdir = basedir + "tmp" + "/"

# every class gives the same number of samples to the testing share.
per_class_quota = int((test_amount / 8))
for cl in classes:
  c = 0
  for fl in glob.glob(valdir + cl + "/*"):
    if not c < per_class_quota:
      break
    c += 1
    file_name = os.path.basename(fl)
    shutil.copyfile(fl, os.path.join(testdir + cl, file_name))
    os.remove(os.path.join(valdir + cl, file_name))

print('Splitting completed.')

In [None]:
# delete the subtree of the temporary directory.
# NOTE(review): the previous cell copies the testing share of every class into
# tmp/<class>; this cell removes those folders together with their content, so
# presumably the samples were relocated to the test folder beforehand - confirm
# before running.
for cl in classes:
  shutil.rmtree(basedir + "tmp/" + cl + "/")
# os.rmdir only removes an empty directory, hence the per-class removal first.
os.rmdir(basedir + "tmp/")

In [None]:
# plot the per-class sample counts of the testing set
# (bars below the mean count are red, the others green).
testdir = "/content/drive/MyDrive/Datasets/AVFER/test"
plot_dataset(testdir + "/*")

In [None]:
# plot the per-class sample counts of the validation set.
# valdir is rebound to the folder path (no trailing slash); later cells append "/*" to it.
valdir = "/content/drive/MyDrive/Datasets/AVFER/val"
plot_dataset(valdir + "/*")

In [None]:
# plot the per-class sample counts of the training set.
# traindir is rebound to the folder path (no trailing slash); later cells reuse it.
traindir = "/content/drive/MyDrive/Datasets/AVFER/train"
plot_dataset(traindir + "/*")

In [None]:
# print the total number of samples of the training set (sum over its class folders).
print('Tot. samples in the training set:', iw.counter_samples(traindir + "/*"))

To adjust that, we need to put more data in the training set taken from the validation set and try to underestimate the variation in the training phase. So, we can have more variety during the training phase and maintain high variance in well-formed images from AffectNet in the validation set and testing set to analyze the results.

In [None]:
# report how many samples every class of the validation set holds.
quantities = iw.counter_samples_on_class(valdir + "/*")
for cl, amount in quantities.items():
  print('Amount for class {} is {}'.format(cl, amount))

In [None]:
# display the future amount of data for each class with the respective residual value.
counters = iw.counter_samples_on_class(valdir + "/*")
for key in counters.keys():
  print(f'The current amount for the class {key} is {counters[key]}')
print('-'*40)
print('Balance it as well as the testing set...')

# residual is the remaining samples in the validation set.
residual = 680

# apply the balancing, moves images from validation set to training set until balancing.
# residual is the result size of samples collections for each class in the validation set.
for cl in glob.glob(valdir + "/*"):
  total = counters[os.path.basename(cl)]
  # number of samples to move. A class that already holds fewer than
  # `residual` samples gives nothing away: without the clamp the countdown
  # would start below zero, never reach it, and move the whole class.
  count = total - residual
  if count < 0:
    count = 0
  print("{}: {} -> {} with residual of {}".format(os.path.basename(cl), total, count, total - count ))
  for fl in glob.glob(cl + "/*"):
    if count <= 0:
      break
    count = count - 1
    shutil.copyfile(fl, os.path.join(traindir,os.path.basename(cl), os.path.basename(fl)))
    os.remove(fl)

print('Balanced completed.')
print('-'*40)

In [None]:
# plot the per-class sample counts of the training set after the balancing step.
traindir = "/content/drive/MyDrive/Datasets/AVFER/train"
plot_dataset(traindir + "/*")

In [None]:
# plot the per-class sample counts of the validation set after the balancing step.
valdir = "/content/drive/MyDrive/Datasets/AVFER/val"
plot_dataset(valdir + "/*")

In [None]:
# plot the per-class sample counts of the testing set.
testdir = "/content/drive/MyDrive/Datasets/AVFER/test"
plot_dataset(testdir + "/*")

Unfortunately, the amount of data for the training set is unbalanced. So, we need to preprocess data of this subfolder using data augmentation and possible integration with additional data, especially for the disgust and contempt classes.

## Convert Formats

AVFER has some samples from CK+ stored as PNG and, in general, some images have a single channel (gray-scale). We avoid artificial coloring and reduce the side effects of fooling images by only changing the number of channels and replicating the pixel value across the RGB channels.

In [None]:
# base path of the dataset.
path = "/content/drive/MyDrive/Datasets/AVFER/"

# checking the number of channels and expanding gray-scale samples to 3 channels.
total = [0,0] # 0 for 1 channel, 1 for 3 channels.
for split_dir in glob.glob(path + "*"):
  for cl in glob.glob(split_dir + "/*"):
    counter = [0,0] # 0 for 1 channel, 1 for 3 channels.
    for fl in glob.glob(cl + "/*"):
      # IMREAD_UNCHANGED keeps the channel layout stored in the file; the
      # default flag always decodes to 3 channels and hides gray-scale files.
      image = cv2.imread(fl, cv2.IMREAD_UNCHANGED)
      if image is None:
        # not a decodable image: nothing to count or to convert.
        continue
      if image.ndim < 3:
        counter[0] += 1
        # replicate the brightness value on the three channels and store the
        # converted array (not the original one) back in the same file.
        conv = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        cv2.imwrite(fl, conv)
      else:
        counter[1] += 1
    total = [total[i] + counter[i] for i in range(2)]

print('Tot. number of channels, respectively, in gray-scales and RGB:', total)

In general, the gray-scale elements are FER-2013 and CK+48 samples, so we need to convert them to RGB (3 channels). In a gray-scale sample, each pixel takes 1 byte (8 bits, i.e. a value from 0 to 255 that corresponds to the brightness of the pixel, as described in the gray-scale representation). During the transformation, the byte associated with one pixel is converted into a 3-channel representation of 3 bytes, holding the red, green and blue values. In this conversion, each channel receives the same brightness value, which maintains the gray tone of the pixel without perturbing its value.

In [None]:
# base path of the dataset.
path = "/content/drive/MyDrive/Datasets/AVFER/"
total = [0,0] # 0 for 1 channel, 1 for 3 channels.

# check changes after formatting.
for split_dir in glob.glob(path + "*"):
  for cl in glob.glob(split_dir + "/*"):
    counter = [0,0] # 0 for 1 channel, 1 for 3 channels.
    for fl in glob.glob(cl + "/*"):
      # IMREAD_UNCHANGED keeps the channel layout stored in the file; the
      # default flag would report 3 channels for every sample.
      image = cv2.imread(fl, cv2.IMREAD_UNCHANGED)
      if image is None:
        # not a decodable image: skipped by the statistics.
        continue
      if image.ndim < 3:
        counter[0] += 1
      else:
        counter[1] += 1
    total = [total[i] + counter[i] for i in range(2)]

print('Tot. number of channels, respectively, in gray-scales and RGB:', total)

In [10]:
# open one FER-2013 training sample and report its Pillow mode ("L" is gray-scale).
sample_file = '/content/drive/MyDrive/Datasets/FER-2013/train/anger/Training_10118481.jpg'
image = Image.open(sample_file)
print('Gray-scale mode:', image.mode) 

Gray-scale mode: L


In [None]:
# test image in L mode.
test = '/content/drive/MyDrive/Datasets/FER-2013/train/anger/Training_10118481.jpg'
pic = imageio.imread(test)
image = Image.open(test)

# convert to RGB with Pillow.
image = image.convert('RGB')
# convert to RGB with OpenCV. A gray-scale file is loaded as a 2D array, which
# needs the GRAY2RGB code (BGR2RGB only accepts multi-channel input); an array
# that already has a channel axis is used as loaded.
if pic.ndim == 2:
  im_rgb = cv2.cvtColor(pic, cv2.COLOR_GRAY2RGB)
else:
  im_rgb = pic
# temporal saving .
tmp = '/content/test.jpg'
image.save(tmp)
pic2 = imageio.imread(tmp)

# remove temporal image.
os.remove(tmp)

# plot the result and compare with converted RGB and original version.
fig, ax = plt.subplots(nrows = 1, ncols=2, figsize=(15,5)) 
ax1, ax2 = ax
ax1.imshow(im_rgb[ :, :])
ax2.imshow(pic2[:,:,:])

In [None]:
# display the first row of the pixel array of a colored AffectNet image.
pic3 = imageio.imread("/content/drive/MyDrive/Datasets/AffectNet/train_class/anger/Copia di image0000006.jpg")
pic3[0] 

In [None]:
# display the first row of the pixel array of the gray-scale sample after its RGB conversion.
pic2[0]

In [None]:
# training dataset reference.
ref = "/content/drive/MyDrive/Datasets/AVFER/train"

# convert from .png to .jpg. The image is re-encoded: renaming the file alone
# would leave PNG data behind a .jpg extension.
for cl in glob.glob(ref + "/*"):
  print("Convert from class {}".format(cl))
  for fl in glob.glob(cl + "/*"):
    if fl[len(fl)-4:len(fl)] == '.png':
      target = str(fl[0:len(fl)-4]) + ".jpg"
      with Image.open(fl) as png:
        # JPEG has no alpha or palette support, so the image is first
        # brought to plain RGB.
        png.convert('RGB').save(target, 'JPEG')
      os.remove(fl)

In [None]:
# count the .png files still present in the class folders after the conversion.
counter = 0
for cl in glob.glob(ref + "/*"):
  print("Counting png on class {}".format(cl))
  counter += sum(1 for fl in glob.glob(cl + "/*") if fl.endswith('.png'))

# report the outcome.
print('Remaining png samples: {}'.format(counter))

## Data Augmentation

Renaming the source folders for augmentation, only train is considered.

In [None]:
# renaming files pre-augmentation to "<class>-<counter>-file<extension>".
traindir = "/content/drive/MyDrive/Datasets/AVFER/train"
for cl in glob.glob(traindir + "/*"):
  prefix = cl + "/" + os.path.basename(cl) + "-"
  counter = 0
  # sorted() makes the numbering reproducible between runs.
  for img in sorted(glob.glob(cl + "/*")):
    # splitext keeps the whole extension whatever its length.
    suffix = "-file" + os.path.splitext(img)[1]
    target = prefix + str(counter) + suffix
    # never rename onto another existing file (e.g. when the cell is re-run):
    # os.rename would silently replace it. Take the next free number instead.
    while target != img and os.path.exists(target):
      counter += 1
      target = prefix + str(counter) + suffix
    if target != img:
      os.rename(img, target)
    counter += 1

In [None]:
def generator(path, 
              format_img, 
              dest_path,
              starting_counter=0, 
              num_augment=1, 
              zoom_range=0.6, 
              brightness_range=(0.2,0.8),
              width_shift_range=0.2,
              height_shift_range=0.2,
              rotation_range=10
              ):
    """
    Generate images with augmentation parameters specified in input.
    Zero parameters avoids a specific technique application.
    The generator can do strong augmentation according to the num_augment value.

    path: folder scanned (not recursively) for "*.<format_img>" files.
    format_img: extension, without dot, used to select and to save files.
    dest_path: output folder, created when missing.
    starting_counter: first value of the counter embedded in the output names.
    num_augment: number of augmented copies written for every source image.
    zoom_range, brightness_range, width_shift_range, height_shift_range,
    rotation_range: forwarded to ImageDataGenerator; horizontal flip is
    always enabled.

    Source files that OpenCV cannot decode are skipped.
    """
    # generator declaration. The featurewise options are not requested: they
    # need statistics computed by gen.fit(), which is never called here.
    gen = ImageDataGenerator(
        rotation_range=rotation_range,
        width_shift_range=width_shift_range,
        height_shift_range=height_shift_range,
        brightness_range=brightness_range,
        zoom_range=zoom_range,
        horizontal_flip=True
    )
    print(f'Data Augmentation parameters:\nZoom Range: {zoom_range}\nBrighness Range: {brightness_range}\nShift: ({width_shift_range},{height_shift_range})\nRotation Degrees: {rotation_range}')

    # make directory in case it doesn't exist; this has to happen before the
    # folder is listed, because listing a missing folder raises.
    os.makedirs(dest_path, exist_ok=True)
    # number of files already in the destination, embedded in the output names.
    num_el = iw.navigate_path(dest_path)
    c = starting_counter
    print('Starting generation...')
    # glob.glob returns a complete list, so files written during the loop are
    # not picked up even when dest_path and path are the same folder.
    for file in glob.glob(path + "/*." + format_img):
        img = cv2.imread(file)
        if img is None:
            # cv2.imread reports an unreadable file with None.
            continue
        # expand dimension to a batch of one sample.
        samples = np.expand_dims(img, 0)
        iterator = gen.flow(samples, batch_size=1)
        for _ in range(num_augment):
            batch = next(iterator)
            image = batch[0].astype('uint8')
            # os.path.join tolerates a dest_path with or without trailing separator.
            cv2.imwrite(os.path.join(dest_path, "augmented_on_" + str(num_el) + "_" + str(c) + "." + format_img), image)
            c += 1
    print(f'Data Augmentation for the {path} is done!')

In [None]:
!rm -rf /content/drive/MyDrive/Datasets/AVFER/train/.ipynb_checkpoints

The number of augmented copies generated for each sample depends on the number of images in its class. We compute the number of copies $C(i)$ as follows:
$ C(i) = \lfloor T / N(i) \rfloor + 1 \quad \forall i \in classes $

where $ T $ is the number of samples that we want for each class (20,000), $ N(i) $ is the initial number of samples of the class $ i $, and $ C(i) $ is the number of augmented copies generated from each sample of the class $ i $.

In [None]:
# utilities variables.
augm_class = []
copies = {}

# counters of samples in each class of the training set. 
categories_data = iw.counter_samples_on_class("/content/drive/MyDrive/Datasets/AVFER/train/*")

# defines the number of copies weighted to the current amount of data of the class samples.
for cl, available in categories_data.items():
  if available == 0:
    # an empty folder has nothing to augment and would divide by zero.
    copies[cl] = 0
  else:
    copies[cl] = int(20000/available) + 1

In [None]:
%%time

# applying weighted data augmentation.
for class_ in categories_data.keys():
  print('Generation of {} samples with {} copy...'.format(class_, copies[class_]))
  generator("/content/drive/MyDrive/Datasets/AVFER/train/" + class_, "jpg", "/content/drive/MyDrive/Datasets/AVFER/train/" + class_ + "/", 0, num_augment=copies[class_])

In [None]:
# making of the result and augmented dataset version. VFER is the augmented version of AVFER.
!rm -rf /content/drive/MyDrive/Datasets/VFER/
!mkdir /content/drive/MyDrive/Datasets/VFER/
!mkdir /content/drive/MyDrive/Datasets/VFER/train
!mkdir /content/drive/MyDrive/Datasets/VFER/val
!mkdir /content/drive/MyDrive/Datasets/VFER/test

In [None]:
# build one folder per class inside every split folder of VFER;
# the class names are read from the AffectNet training folder.
base_dir = "/content/drive/MyDrive/Datasets/VFER/"
x = "/content/drive/MyDrive/Datasets/AffectNet/train_class/"
subfolders = os.listdir(base_dir)
classes = os.listdir(x)
for cl in classes:
  for folder in subfolders:
    os.mkdir(os.path.join(base_dir, folder, cl))

In [None]:
# dump balanced dataset to VFER from AVFER samples.
source_folder = r"/content/drive/MyDrive/Datasets/AVFER/"
destination_folder = r"/content/drive/MyDrive/Datasets/VFER/"
# maximum number of .jpg files copied for every class of every split.
cap_class = 20000
print('Start dataset copying...')
print('-'*40)
# fetch all files.
for subfolder in os.listdir(source_folder):
  if subfolder == ".ipynb_checkpoints":
      print('Passing checkpoint files')
      continue
  print("Going in the subfolder {}".format(subfolder))
  for cl in os.listdir(source_folder + subfolder + "/"):
    print("Going in the class {}/{}".format(subfolder, cl))
    if cl == ".ipynb_checkpoints":
      print('Passing checkpoint files')
      continue
    else:
      counter_file = 0
      for file_name in glob.glob(source_folder  + subfolder + "/" + cl + "/*.jpg"):
        # stop as soon as the cap is reached; the test is made before the
        # copy so the remaining files are not scanned for nothing.
        if counter_file >= cap_class:
          break
        # construct full file path.
        source = source_folder + subfolder + "/" + cl + "/" + os.path.basename(file_name)
        destination = destination_folder + subfolder + "/" + cl + "/" + os.path.basename(file_name)
        # copy only files.
        if os.path.isfile(source):
          counter_file += 1
          shutil.copy(source, destination)
  
      print('Copied on {}'.format(cl))
print('-'*40)
print('Dump done.')

In [None]:
# plot the per-class sample counts of the final (VFER) training set.
plot_dataset("/content/drive/MyDrive/Datasets/VFER/train/*")

In [None]:
# plot the per-class sample counts of the final (VFER) validation set.
plot_dataset("/content/drive/MyDrive/Datasets/VFER/val/*")

In [None]:
# plot the per-class sample counts of the final (VFER) testing set.
plot_dataset("/content/drive/MyDrive/Datasets/VFER/test/*")