# Skillbox. Thesis on computer vision. Recognition of human emotions.

## Settings

### Main

In [None]:
# NOTE(review): this import runs before the package-installation cell further down,
# so opencv-python must already be present when the settings cell is executed.
import cv2 as cv # Needed here only for the Haar cascade flag constants below

PROJECT_NAME = 'skillbox-computer-vision-project' # Project name
LOCAL_PROJ_PATH = f'D:/{PROJECT_NAME}' # Path to the project folder on the local computer
COLAB_PROJ_PATH = f'/content/{PROJECT_NAME}' # Path to the project folder in Google Colab session storage
LOCAL_GD_PROJ_PATH = f'G:/My Drive/{PROJECT_NAME}' # Path to the project folder on Google Drive on the local computer
COLAB_GD_PROJ_PATH = f'/content/drive/MyDrive/{PROJECT_NAME}' # Path to the project folder on Google Drive in Google Colab
TRAIN_DATASET_PATH = 'train' # Path to the original training dataset inside the project folder
TEST_DATASET_PATH = 'test_kaggle' # Path to the original test dataset inside the project folder
TRAIN_DATASET_URL = 'https://drive.google.com/file/d/1TG9P5B2k3eTbC4XDxDmEc07dyAORPC16/view?usp=sharing' # Link to the training dataset archive
TRAIN_DATASET_EXT = 'zip' # Type (file extension) of the training dataset archive
TEST_DATASET_URL = 'https://drive.google.com/file/d/12QrDrLT1F-X7UycvOoApXFqxTw3Zx93K/view?usp=sharing' # Link to the test dataset archive
TEST_DATASET_EXT = 'zip' # Type (file extension) of the test dataset archive
KAGGLE_API_TOKEN_URL = 'https://drive.google.com/file/d/1yS7Y5xFBxTYRPQd9sx1TudLWYYNOUGbu/view?usp=share_link' # Link to the token for connecting to the Kaggle platform via API
MAX_INFERENCE_TIME = .033 # Maximum allowed model inference time in seconds
INFERENCE_TIME_WEIGHT = .6 # Inference time weight when selecting a base model
FACE_DETECTOR_CLASSIFIER = 'haarcascade_frontalface_default.xml' # Trained XML classifier
FACE_DETECTOR_SCALE_FACTOR = 1.1 # Specify how much the image size is reduced at each image scale
FACE_DETECTOR_MIN_NEIGHBORS = 3 # Specify how many neighbors each candidate rectangle should have to retain it
# Detector flags are bit flags, so they are combined with bitwise OR:
# CASCADE_SCALE_IMAGE makes the detector scale the image, CASCADE_FIND_BIGGEST_OBJECT makes it return only the biggest face
FACE_DETECTOR_FLAGS = cv.CASCADE_SCALE_IMAGE | cv.CASCADE_FIND_BIGGEST_OBJECT
FACE_DETECTOR_MIN_RATIO = 0.5 # Specify the minimum ratio of the size of a face that we are expecting to detect to the size of an image
BASE_MODEL_MAX_SIZE = 64 # Maximum allowed base model size in MB
BASE_MODEL_POOLINGS = 'avg' # Pooling type at the output of the base models ('avg' - average, 'max' - max)
MODEL_ON_TOP_DENSE_NUMS = [1, 2] # Options for the number of additional fully connected layers
MODEL_ON_TOP_DENSE_UNITS = [512, 1024] # Options for the number of output neurons in the additional fully connected layer
MODEL_ON_TOP_DROPOUT_RATES = [.0, .2] # Options for the proportion of data to drop before feeding into the fully connected layer during training
OPTIMIZER = 'Adam' # Name of the optimizer used to train the model
MODEL_ON_TOP_INITIAL_LEARNING_RATE = 1e-4 # Initial learning rate of the model on top
MODEL_ON_TOP_LEARNING_RATE_DECAY_RATE = 0.96 # The rate at which the learning rate of the model on top changes after each epoch
MODEL_INITIAL_LEARNING_RATE = 1e-5 # Initial learning rate of the model when fine-tuning
MODEL_LEARNING_RATE_DECAY_RATE = 0.96 # The rate at which the model's learning rate changes after each epoch when fine-tuning
RANDOM_FLIP = 'horizontal' # Type of random image flip
RANDOM_ZOOM = .2 # Maximum image zoom
RANDOM_ROTATION_FACTOR = .1 # Maximum image rotation (in fractions of a full rotation - 360°)
RANDOM_CONTRACT_FACTOR = .2 # Maximum contrast change (as a fraction of the original value); the name (sic) is kept because the model building pipeline refers to it
RANDOM_BRIGHTNESS_FACTOR = .2 # Maximum brightness change (as a fraction of the original value)
SEED = 123 # Random number generator initializer
VERBOSE = 1 # Verbosity mode (0-quiet, 1-message output)

### Description of emotions

If data is marked up using emotion ordinal numbers, the emotion names should be listed as a list or tuple.

When using Valence-Arousal markup, the description of emotions should be presented as a dictionary. The dictionary keys should be the names of emotions. The dictionary values should be pairs of numbers characterizing the Valence and Arousal levels of emotions. Valence and Arousal levels should be numbers in the range from -1.0 to 1.0 inclusive.

In both cases, the listing of emotions should be done in alphabetical order.

In [None]:
# Names of the emotions to recognize, listed in alphabetical order.
# A tuple (rather than a dictionary) means the data is marked up with emotion ordinal numbers.
EMOTIONS = (
    'anger', # anger, rage
    'contempt', # contempt
    'disgust', # disgust
    'fear', # fear
    'happy', # cheerful
    'neutral', # neutral
    'sad', # sadness
    'surprise', # astonishment
    'uncertain', # uncertainty
)

### List of base models from Keras Applications with [reference data](https://keras.io/api/applications/):
- size in MB (Size (MB))
- prediction accuracy in % (Top-1 Accuracy)

In [None]:
# Maps a Keras Applications model name to its reference data: (size in MB, top-1 accuracy in %)
KERAS_BASE_MODELS = {
    'MobileNet': (16, 70.40),
    'MobileNetV2': (14, 71.30),
    'NASNetMobile': (23, 74.40),
    'InceptionV3': (92, 77.90),
    'ResNet50V2': (98, 76.00),
    'EfficientNetB0': (29, 77.10),
    'ResNet50': (98, 74.90),
    'EfficientNetB1': (31, 79.10),
    'VGG16': (528, 71.30),
    'ResNet101V2': (171, 77.20),
    'DenseNet121': (33, 75.00),
    'EfficientNetB2': (36, 80.10),
    'VGG19': (549, 71.30),
    'ResNet101': (171, 76.40),
    'DenseNet169': (57, 76.20),
    'ResNet152V2': (232, 78.00),
    'Xception': (88, 79.00),
    'DenseNet201': (80, 77.30),
    'ResNet152': (232, 76.60),
    'InceptionResNetV2': (215, 80.30),
    'EfficientNetB3': (48, 81.60),
    'EfficientNetB4': (75, 82.90),
    'NASNetLarge': (343, 82.50),
    'EfficientNetB5': (118, 83.60),
    'EfficientNetB6': (166, 84.00),
    'EfficientNetB7': (256, 84.30),
    'EfficientNetV2B0': (29, 78.70),
    'EfficientNetV2B1': (34, 79.80),
    'EfficientNetV2B2': (42, 80.50),
    'EfficientNetV2B3': (59, 82.00),
    'EfficientNetV2S': (88, 83.90),
    'EfficientNetV2M': (220, 85.30),
    'EfficientNetV2L': (479, 85.70),
}

### Pipeline for gathering information about the base models in Keras Applications

In [None]:
# Pipeline description: a name, a description, a report file and an ordered list of stages.
# Every stage has a name, a description, the platform it runs on and its own parameters.
KERAS_BASE_MODELS_PROCESSING_PIPELINE = {
    'name': 'keras_base_models_processing',
    'description': 'A pipeline for collecting information about underlying models in Keras Applications',
    'report_csv': 'pipeline_base_models_processing.csv', # Pipeline report file (NOTE(review): exact contents not visible here)
    'stages': [
        {
            'name': 'sizes_retrieving',
            'description': 'Getting information about the sizes of input images and feature vectors',
            'platform': 'colab', # Runs in Google Colab
            'params': {
               'result_csv': 'base_model_sizes.csv', # Path to the file with the results of the stage
            }
        },
        {
            'name': 'inference_time_measuring',
            'description': 'Measuring the inference time of models',
            'platform': 'colab', # Runs in Google Colab
            'params': {
                'batch_size': 1, # Batch size
                'batches': 1, # Number of batches in the dataset
                'repetitions': 100, # Number of repetitions
                'result_csv': 'model_inference_times.csv', # Path to the file with the results of the stage
            }
        },
        {
            'name': 'base_model_selection',
            'description': 'Selecting a base model',
            'platform': 'colab', # Runs in Google Colab
            'params': {
                'inference_time_weight': INFERENCE_TIME_WEIGHT, # Inference time weight when selecting a base model
                'top1_accuracy_weight': 1 - INFERENCE_TIME_WEIGHT, # Accuracy weight when selecting a base model (the two weights sum to 1)
                'process_csv': 'base_model_selection.csv', # Path to file with base model selection process data
                'result_csv': 'base_model.csv', # Path to the file with the description of the selected base model
            }
        },
    ]
}

### Image Preprocessing Pipeline

In [None]:
# Pipeline description: a name, a description, a report file and an ordered list of stages.
# NOTE(review): several stages used to be annotated as running on the local computer although
# their 'platform' value is 'colab'; the comments below follow the value - confirm the intended platform.
IMAGE_PREPROCESSING_PIPELINE = {
    'name': 'image_preprocessing',
    'description': 'Image Preprocessing Pipeline',
    'report_csv': 'pipeline_images_preprocessing.csv',
    'stages':
    [
        {
            'name': 'train_face_extraction',
            'description': 'Extracting face images from training dataset',
            'platform': 'colab', # Runs in Google Colab
            'params': {
                'path': 'train_faces', # Path to the training dataset folder with face images
                'classifier': FACE_DETECTOR_CLASSIFIER, # Trained XML classifier
                'scale_factor': FACE_DETECTOR_SCALE_FACTOR, # Specify how much the image size is reduced at each image scale
                'min_neighbors': FACE_DETECTOR_MIN_NEIGHBORS, # Specify how many neighbors each candidate rectangle should have to retain it
                'flags': FACE_DETECTOR_FLAGS, # Flags
                'face_min_ratio': FACE_DETECTOR_MIN_RATIO, # Specify the minimum ratio of the face size to the image size
                'process_csv': 'train_face_extraction_process.csv', # Path to file with detailed information
                'result_csv': 'train_face_extraction.csv', # Path to the results file
            },
        },
        {
            'name': 'test_face_extraction',
            'description': 'Extracting face images from test dataset',
            'platform': 'colab', # Runs in Google Colab
            'params': {
                'path': 'test_faces', # Path to the test dataset folder with face images
                'classifier': FACE_DETECTOR_CLASSIFIER, # Trained XML classifier
                'scale_factor': FACE_DETECTOR_SCALE_FACTOR, # Specify how much the image size is reduced at each image scale
                'min_neighbors': FACE_DETECTOR_MIN_NEIGHBORS, # Specify how many neighbors each candidate rectangle should have to retain it
                'flags': FACE_DETECTOR_FLAGS, # Flags
                'face_min_ratio': FACE_DETECTOR_MIN_RATIO, # Specify the minimum ratio of the face size to the image size
                'process_csv': 'test_face_extraction_process.csv', # Path to file with detailed information
                'result_csv': 'test_face_extraction.csv', # Path to the results file
            },
        },
        {
            'name': 'train_face_feature_extraction',
            # NOTE(review): "fadataset" looks like a typo for "face dataset"; left unchanged because it is a runtime string
            'description': 'Extracting features from training fadataset',
            'platform': 'colab', # Runs in Google Colab
            'params': {
                'path': 'train_features', # Path to the folder with the batch files of extracted features
                'batch_size': 64, # Batch size
                'buffer_size': 10, # Buffer size
            }
        },
        {
            'name': 'test_face_feature_extraction',
            'description': 'Extracting features from a test dataset',
            'platform': 'colab', # Runs in Google Colab
            'params': {
                'path': 'test_features', # Path to the folder with batch files of extracted features
                'batch_size': 64, # Batch size
                'buffer_size': 10, # Buffer size
            }
        },
        {
            'name': 'train_cleaning',
            'description': 'Additional cleaning of the training dataset',
            'platform': 'colab', # Runs in Google Colab
            'params': {
                'features_path': 'train_clean_features', # Presumably the folder with features of the cleaned training dataset - TODO confirm
                'dataset_path': 'train_clean_faces', # Path to the folder of the cleaned training dataset
                'process_csv': 'train_cleaning_process.csv', # Path to file with detailed information
                'result_csv': 'train_cleaning.csv', # Name of the file with results
            },
        },
    ]
}

### Model creation pipeline

In [None]:
# Pipeline description: a name, a description, a report file and an ordered list of stages.
# Every stage has a name, a description, the platform it runs on and its own parameters.
MODEL_BUILDING_PIPELINE = {
    'name': 'model_building',
    'description': 'Model creation pipeline',
    'report_csv': 'pipeline_model_building.csv',
    'stages': [
        {
            'name': 'model_on_top_selection',
            'description': 'Selecting the best model on top',
            'platform': 'colab', # Runs in Google Colab (NOTE(review): comment used to say "local computer" - confirm)
            'params': {
                'path': 'model_on_top_selection', # Path to the folder with logs and weights of the model on top
                'batch_size': 64, # Batch size
                'optimizer_name': OPTIMIZER, # Optimizer
                'initial_learning_rate': MODEL_ON_TOP_INITIAL_LEARNING_RATE, # Initial learning rate
                'learning_rate_decay_rate': MODEL_ON_TOP_LEARNING_RATE_DECAY_RATE, # Learning rate decay rate
                'epochs': 20, # Number of training epochs (presumably an upper bound, given 'patience' - TODO confirm)
                'patience': 3, # Max epochs without accuracy improvement
                'process_csv': 'model_on_top_selection.csv', # Path with model training results
                'result_csv': 'selected_model_on_top.csv', # Path to file with description of the selected model on top
            }
        },
        {
            'name': 'model_fine_tuning',
            'description': 'Fine-tuning the model',
            'platform': 'colab', # Runs in Google Colab
            'params': {
                'path': 'model_fine_tuning', # Presumably the folder with logs and weights of the fine-tuned model - TODO confirm
                'flip': RANDOM_FLIP, # Randomly flip the image
                'rotation_factor': RANDOM_ROTATION_FACTOR, # Random rotation factor (counter-clockwise or clockwise) of the image during augmentation, fraction of 360°
                'zoom_factor': RANDOM_ZOOM, # Factor of random zooming in or out of the image during augmentation
                'contrast_factor': RANDOM_CONTRACT_FACTOR, # Factor for randomly changing image contrast
                'brightness_factor': RANDOM_BRIGHTNESS_FACTOR, # Factor for randomly changing the brightness of an image
                'batch_size': 32, # Batch size
                'buffer_size': 100, # Buffer size
                'optimizer_name': OPTIMIZER, # Optimizer
                'initial_learning_rate': MODEL_INITIAL_LEARNING_RATE, # Initial learning rate
                'learning_rate_decay_rate': MODEL_LEARNING_RATE_DECAY_RATE, # Learning rate decay rate
                'epochs': 50, # Number of epochs to learn
                'epochs_per_run': 10, # Number of training epochs per run
                'patience': 3, # Max epochs without accuracy improvement
                'process_csv': 'model_fine_tuning.csv', # Path to file with model fine tuning process data
                'result_csv': 'tuned_model.csv', # Path to file with resulting model validation
            }
        },
        {
            'name': 'model_test',
            'description': 'Testing the model',
            'platform': 'colab', # Runs in Google Colab
            'params': {
                'path': 'model_test', # Presumably the working folder of the test stage - TODO confirm
                'batch_size': 32, # Batch size
                'buffer_size': 100, # Buffer size
                'process_csv': 'test_prediction.csv', # Path to file with prediction data
                'result_csv': 'test_scoring.csv', # Path to file with resulting model score
            }
        },
    ]
}

## Preparation

### Determining the platform on which the notebook is running (local computer or Google Colab)

In [None]:
# The runtime is treated as Colab when the string form of the IPython shell object mentions 'google.colab'
platform = 'colab' if 'google.colab' in str(get_ipython()) else 'local'
print({
    'colab': 'Notebook is running on Google Colab.',
    'local': 'Notebook is running locally.',
}[platform])

Notebook is running on Google Colab.


### Installing and loading the necessary libraries

#### Installing required libraries if they are missing

In [None]:
from importlib.util import find_spec

# List of packages
packages = [
    'validators',
    'ipyparallel',
    'tqdm',
    'numpy',
    'pandas',
    'gdown',
    'matplotlib',
    ('scikit-learn', 'sklearn'),
    'kaggle',
    ('opencv-python', 'cv2')
]
if platform == 'local':
    packages.append(('tensorflow-cpu', 'tensorflow'))
    packages.append('ipywidgets')
else:
    packages.append(('tensorflow-gpu', 'tensorflow'))

# Installing packages
for package in packages:
    if isinstance(package, str):
        space = package
    else:
        package, space = package
    if not find_spec(space):
        print(f'Installing {package}...')
        !pip install {package}

#### Importing required libraries

In [None]:
from typing import Optional, Union, Tuple, List, Dict
import inspect
import itertools
import validators
from time import sleep
from datetime import datetime
from timeit import timeit
from pathlib import Path
import shutil
import gdown
import ipyparallel as ipp
from tqdm.notebook import tqdm, trange
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import cv2 as cv
from sklearn.metrics.pairwise import cosine_similarity
from copy import deepcopy
from PIL import Image, ImageOps
import tensorflow as tf
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
from tensorflow.keras import models, layers, activations, optimizers, metrics, losses, callbacks, utils, applications, initializers
if platform == 'colab':
    from psutil import virtual_memory
    from google.colab import output
    output.enable_custom_widget_manager()

#### Importing required extensions

In [None]:
# Load the `tensorboard` IPython extension (IPython line magic, not plain Python)
%load_ext tensorboard

### Connecting Google Drive when running in Google Colab

In [None]:
# Google Drive is mounted only when the notebook runs in Google Colab
if platform == 'colab':
    from google.colab import drive
    drive.mount('/content/drive') # Mount point used by COLAB_GD_PROJ_PATH

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Checking basic settings

In [None]:
# EMOTIONS is either a sequence of emotion names or a dictionary: emotion name -> (valence, arousal)
assert isinstance(EMOTIONS, (list, tuple, dict)), 'Emotions must be a list or tuple of emotion names, or a dictionary whose keys are emotion names and whose values are valence-arousal pairs.'
assert len(EMOTIONS) > 1, 'The number of emotions must be greater than 1.'
# Iterating a dictionary yields its keys, so this covers both markup styles
assert all(isinstance(emotion, str) for emotion in EMOTIONS), 'Emotion names must be strings.'
assert list(EMOTIONS) == sorted(EMOTIONS), 'Emotions must be listed in alphabetical order.'
if isinstance(EMOTIONS, dict):
    assert all(isinstance(value, (list, tuple)) for value in EMOTIONS.values()), 'Valence-arousal values must be specified as a list or tuple of numbers.'
    assert all(len(value) == 2 for value in EMOTIONS.values()), 'There must be two elements in the list or tuple of valence-arousal values.'
    # Flattened on purpose: a nested generator inside all() is always truthy, so the check would never fail
    assert all(isinstance(x, (int, float)) for value in EMOTIONS.values() for x in value), 'Valence-arousal values must be numbers.'
    assert all(-1.0 <= x <= 1.0 for value in EMOTIONS.values() for x in value), 'Valence-arousal values must be in the range [-1.0, 1.0].'

In [None]:
# Every project path is built from the project name, so it has to be a non-empty string
assert isinstance(PROJECT_NAME, str), 'Project name must be a string.'
assert len(PROJECT_NAME) > 0, 'Project name cannot be an empty string.'

In [None]:
# Pick the project folder that matches the platform the notebook runs on
PROJ_PATH = COLAB_PROJ_PATH if platform == 'colab' else LOCAL_PROJ_PATH
assert isinstance(PROJ_PATH, str), 'The project folder path must be a string.'
proj_path = Path(PROJ_PATH)
# Only the parent has to exist here; the project folder itself may still be missing
assert proj_path.parent.exists(), 'There is no folder to place the project folder.'

In [None]:
# Pick the Google Drive project folder that matches the platform the notebook runs on
GD_PROJ_PATH = COLAB_GD_PROJ_PATH if platform == 'colab' else LOCAL_GD_PROJ_PATH
assert isinstance(GD_PROJ_PATH, str), 'The path to the project folder on Google Drive must be a string.'
gd_proj_path = Path(GD_PROJ_PATH)
# Only the parent has to exist here; the project folder itself may still be missing
assert gd_proj_path.parent.exists(), 'There is no folder on Google Drive to contain the project folder.'

In [None]:
# The training dataset link must be a string that the `validators` package accepts as a URL
assert isinstance(TRAIN_DATASET_URL, str), 'The training dataset archive URL must be a string.'
assert validators.url(TRAIN_DATASET_URL), 'The training dataset archive link is in an invalid format.'

In [None]:
# Only these archive types are accepted for the training dataset
assert TRAIN_DATASET_EXT in ('zip', 'tar', 'tar.gz', 'tgz', 'tar.bz2', 'tbz'), (
    'The archive file of the original training dataset must have any of the following extensions: *.zip, *.tar, *.tar.gz, *.tgz, *.tar.bz2, *.tbz'
)

In [None]:
assert isinstance(TRAIN_DATASET_PATH, str), 'The path to the original training dataset inside the project folder must be a string.'
# Probe the path: an OSError raised by the probe marks the path as malformed
try:
    Path(TRAIN_DATASET_PATH).exists()
except OSError:
    valid = False
else:
    valid = True
assert valid, 'Syntax error in path to source training dataset inside project folder.'

In [None]:
# The test dataset link must be a string that the `validators` package accepts as a URL
assert isinstance(TEST_DATASET_URL, str), 'The link to the test dataset archive must be specified as a string.'
assert validators.url(TEST_DATASET_URL), 'The link to the test dataset archive has an invalid format.'

In [None]:
# Only these archive types are accepted for the test dataset
assert TEST_DATASET_EXT in ('zip', 'tar', 'tar.gz', 'tgz', 'tar.bz2', 'tbz'), (
    'The archive file of the original test dataset must have any of the following extensions: *.zip, *.tar, *.tar.gz, *.tgz, *.tar.bz2, *.tbz'
)

In [None]:
assert isinstance(TEST_DATASET_PATH, str), 'The path to the original test dataset inside the project folder must be a string.'
# Probe the path: an OSError raised by the probe marks the path as malformed
try:
    Path(TEST_DATASET_PATH).exists()
except OSError:
    valid = False
else:
    valid = True
assert valid, 'Syntax error in path to source test dataset inside project folder.'

In [None]:
# The Kaggle token link must be a string that the `validators` package accepts as a URL
assert isinstance(KAGGLE_API_TOKEN_URL, str), 'The token URL for connecting to the Kaggle platform via API must be specified as a string.'
assert validators.url(KAGGLE_API_TOKEN_URL), 'The token link for connecting to the Kaggle platform via API is not in the correct format.'

In [None]:
# Integers are accepted as well as floats, so the messages speak of a number rather than a floating point number
assert isinstance(MAX_INFERENCE_TIME, (int, float)), 'The maximum allowed model inference time in seconds must be a number.'
assert MAX_INFERENCE_TIME > 0., 'The maximum allowed model inference time in seconds must be a positive number.'

In [None]:
# The weight is a share, so it must be a number within the closed interval [0, 1]
assert isinstance(INFERENCE_TIME_WEIGHT, (int, float)), 'The inference time weight of a model when selecting a base model must be a number.'
assert 0.0 <= INFERENCE_TIME_WEIGHT <= 1.0, 'The inference time weight when selecting the base model must be in the range [0., 1.].'

In [None]:
# The size limit is expressed in MB and must be a strictly positive number
assert isinstance(BASE_MODEL_MAX_SIZE, (int, float)), 'The maximum allowed base model size in MB must be a number.'
assert 0.0 < BASE_MODEL_MAX_SIZE, 'The maximum allowed base model size in MB must be a positive number.'

In [None]:
# Only average and max pooling are accepted at the output of the base model
assert BASE_MODEL_POOLINGS in ('avg', 'max'), 'The pooling type at the base model output must be either "avg" (average) or "max" (max).'

In [None]:
# A non-empty list or tuple of positive integers is expected
assert isinstance(MODEL_ON_TOP_DENSE_NUMS, (list, tuple)), 'Options for the number of additional fully connected layers must be specified as a list or tuple.'
assert len(MODEL_ON_TOP_DENSE_NUMS) > 0, 'At least one option for the number of additional fully connected layers must be specified.'
for num in MODEL_ON_TOP_DENSE_NUMS:
    assert isinstance(num, int), 'The number of additional fully connected layers must be an integer.'
assert min(MODEL_ON_TOP_DENSE_NUMS) > 0, 'The number of additional fully connected layers must be a positive number.'

In [None]:
# A non-empty list or tuple of positive integers is expected
assert isinstance(MODEL_ON_TOP_DENSE_UNITS, (list, tuple)), 'The options for the number of output neurons in the additional fully connected layer must be specified by a list or a tuple.'
assert len(MODEL_ON_TOP_DENSE_UNITS) > 0, 'At least one option for the number of output neurons in the additional fully connected layer must be specified.'
for num in MODEL_ON_TOP_DENSE_UNITS:
    assert isinstance(num, int), 'The number of output neurons in the additional fully connected layer must be an integer.'
assert min(MODEL_ON_TOP_DENSE_UNITS) > 0, 'The number of output neurons in the additional fully connected layer must be a positive number.'

In [None]:
# A non-empty list or tuple of dropout rates is expected; every rate is a number in [0, 1).
# Integers are accepted too, so that a rate written as 0 is not rejected.
assert isinstance(MODEL_ON_TOP_DROPOUT_RATES, (list, tuple)), 'The rates of data to drop out before feeding into the fully connected layer during training must be specified as a list or tuple.'
assert all(isinstance(num, (int, float)) for num in MODEL_ON_TOP_DROPOUT_RATES), 'The dropout rates before feeding into the fully connected layer during training must be numbers.'
assert all(0. <= num < 1. for num in MODEL_ON_TOP_DROPOUT_RATES), 'The rates of data dropped before feeding into the fully connected layer during training must be in the range [0., 1.).'
assert len(MODEL_ON_TOP_DROPOUT_RATES) >= 1, 'At least one dropout rate must be specified before feeding into the fully connected layer during training.'

In [None]:
# Any positive number is accepted (integers included, matching the other numeric settings checks)
assert isinstance(MODEL_ON_TOP_INITIAL_LEARNING_RATE, (int, float)), 'The initial learning rate of the model on top must be a number.'
assert MODEL_ON_TOP_INITIAL_LEARNING_RATE > 0.0, 'The initial learning rate of the model on top must be a positive number.'

In [None]:
# Any positive number is accepted (integers included, e.g. 1 for a constant learning rate)
assert isinstance(MODEL_ON_TOP_LEARNING_RATE_DECAY_RATE, (int, float)), 'The rate at which the model on top\'s learning rate changes after each epoch must be a number.'
assert MODEL_ON_TOP_LEARNING_RATE_DECAY_RATE > 0.0, 'The rate at which the learning rate of the model on top changes after each epoch must be a positive number.'

In [None]:
# Any positive number is accepted (integers included, matching the other numeric settings checks)
assert isinstance(MODEL_INITIAL_LEARNING_RATE, (int, float)), 'The initial learning rate of the model during fine-tuning must be a number.'
assert MODEL_INITIAL_LEARNING_RATE > 0.0, 'The initial learning rate of the model during fine-tuning must be a positive number.'

In [None]:
# Any positive number is accepted (integers included, e.g. 1 for a constant learning rate)
assert isinstance(MODEL_LEARNING_RATE_DECAY_RATE, (int, float)), 'The rate by which the model\'s learning rate changes after each epoch during fine-tuning must be a number.'
assert MODEL_LEARNING_RATE_DECAY_RATE > 0.0, 'The rate at which the model\'s learning rate changes after each epoch during fine-tuning must be a positive number.'

In [None]:
# The optimizer is looked up by name as an attribute of the imported Keras `optimizers` module
assert hasattr(optimizers, OPTIMIZER), f'Optimizer {OPTIMIZER} is not present in the Keras library.'

In [None]:
# Only two verbosity levels are supported: 0 (quiet) and 1 (message output)
assert VERBOSE in (0, 1), f'The verbosity mode {VERBOSE} is specified incorrectly. The value must be 0 (quiet) or 1 (message output).'

In [None]:
# The seed is optional: either None or an integer
assert SEED is None or isinstance(SEED, int), 'The random number generator initializer must be an integer or None.'

### Preparing for the first launch

#### Creating a project folder

In [None]:
# Create the project folder unless it is already there; exist_ok avoids the separate
# exists() probe and the gap between checking and creating.
proj_path.mkdir(exist_ok=True)

#### Creating a Project Folder on Google Drive

In [None]:
# Create the project folder on Google Drive unless it is already there; exist_ok avoids
# the separate exists() probe and the gap between checking and creating.
gd_proj_path.mkdir(exist_ok=True)

#### Copying kaggle token

In [None]:
# The Kaggle API client reads its token from `.kaggle/kaggle.json` in the user's home
# directory, so the local path is anchored at the home folder instead of the current
# working directory (which changes later when the project folder becomes the working directory).
if platform == 'colab':
    kaggle_path = Path('/root/.kaggle/kaggle.json')
else:
    kaggle_path = Path.home() / '.kaggle' / 'kaggle.json'
kaggle_path.parent.mkdir(parents=True, exist_ok=True)
# Download the token only once
if not kaggle_path.exists():
    gdown.download(KAGGLE_API_TOKEN_URL, kaggle_path.as_posix(), fuzzy=True)
# The token is a credential: keep it readable by the owner only (done only if the file is really there)
if kaggle_path.exists():
    kaggle_path.chmod(0o600)

### Setting the project folder as the working directory

In [None]:
# Change the working directory to the project folder (IPython line magic; {proj_path} is expanded by IPython)
%cd {proj_path}

/content/skillbox-computer-vision-project


### Displaying information about the dedicated GPU and available virtual memory in Google Colab

In [None]:
if platform == 'colab':
    gpu_info = !nvidia-smi
    gpu_info = '\n'.join(gpu_info)
    if gpu_info.find('failed') >= 0:
        print('Not connected to a GPU')
    else:
        print(gpu_info)
    ram_gb = virtual_memory().total / 1.024e9
    print(f'\nYour runtime has {ram_gb:.1f} gigabytes of available RAM')

Sun Apr  6 15:18:53 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   54C    P8              9W /   70W |       0MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

## Helper functions and classes

### Building and training models

#### Building an augmentation model

In [None]:
def build_augment_model(image_size: int,
                        flip: Optional[str]=None,
                        rotation_factor: Optional[float]=None,
                        zoom_factor: Optional[float]=None,
                        contrast_factor: Optional[float]=None,
                        brightness_factor: Optional[float]=None,
                        training: bool=False,
                        seed=SEED) -> models.Model:
    '''Builds a Keras model that augments a square RGB image.

    The model chains up to five random layers in a fixed order: flip, rotation, zoom,
    contrast and brightness. A layer is added only when its argument is not None.
    Every layer is called with the given `training` flag and created with the same seed.

    Arguments:
    - image_size: side of the square input image; the input shape is (image_size, image_size, 3).
    - flip: flip mode passed to RandomFlip, e.g. 'horizontal' or 'vertical'; None disables flipping.
    - rotation_factor: factor passed to RandomRotation (fraction of a full rotation); None disables rotation.
    - zoom_factor: factor passed to RandomZoom; None disables zooming.
    - contrast_factor: factor passed to RandomContrast; None disables contrast changes.
    - brightness_factor: factor passed to RandomBrightness; None disables brightness changes.
    - training: value of the `training` argument used when calling every augmentation layer.
    - seed: random number generator initializer shared by all augmentation layers.

    Returns the model named 'augmentation_model'.'''
    # (layer class name, constructor argument, layer name) in the order of application;
    # classes are resolved by name only for the layers that are actually added
    augmentations = (
        ('RandomFlip', flip, 'random_flip'),
        ('RandomRotation', rotation_factor, 'random_rotation'),
        ('RandomZoom', zoom_factor, 'random_zoom'),
        ('RandomContrast', contrast_factor, 'random_contrast'),
        ('RandomBrightness', brightness_factor, 'random_brightness'),
    )

    image_input = layers.Input(shape=(image_size, image_size, 3), name='original_image_input')
    tensor = image_input
    for class_name, factor, layer_name in augmentations:
        if factor is None:
            continue
        augmentation = getattr(layers, class_name)(factor, seed=seed, name=layer_name)
        tensor = augmentation(tensor, training=training)

    return models.Model(inputs=[image_input], outputs=[tensor], name='augmentation_model')

#### Building a Base Model

In [None]:
def build_base_model(name: str,
                     weights: Optional[str]='imagenet',
                     image_size: Optional[int]=None,
                     pooling: str='avg',
                     include_preprocess_input:bool=False,
                     training: bool=False) -> models.Model:
    '''Wraps a Keras Applications model (without its top) into a model that takes a square image.

    Arguments:
    - name: name of the model builder inside `applications`.
    - weights: initial weights: None (random), 'imagenet' or a path to a file with weights.
    - image_size: side of the input image; the input shape is always (image_size, image_size, 3).
    - pooling: pooling type of the output layer: 'avg' (average) or 'max' (max).
    - include_preprocess_input: put the `preprocess_input` of the builder's module in front of the core.
    - training: value of the `training` argument used when calling the core.

    Returns the model named after the core with the suffix '_with_preprocessing' or '_without_preprocessing'.'''
    input_shape = (image_size, image_size, 3)

    # Core of the base model: the Keras Applications network without the classification top
    core_builder = getattr(applications, name)
    core = core_builder(include_top=False, weights=weights, input_shape=input_shape, pooling=pooling)

    # Input layer for the image
    image_input = layers.Input(input_shape, name='image_input')

    # Optional preprocessing taken from the module that defines the core builder
    features_in = image_input
    if include_preprocess_input:
        preprocess = getattr(inspect.getmodule(core_builder), 'preprocess_input')
        features_in = preprocess(image_input)

    # Core applied on top of the (optionally preprocessed) input
    features_out = core(features_in, training=training)

    # Everything is combined into one model
    suffix = '_with' if include_preprocess_input else '_without'
    return models.Model(inputs=[image_input], outputs=[features_out], name=core.name + suffix + '_preprocessing')

#### Building the model on top

In [None]:
def build_model_on_top(
    feature_size: int,
    config: Union[List[Tuple[float, int]], Tuple[Tuple[float, int]]],
    emotions: Union[Union[List[str], Tuple[str]], Dict[str, Tuple[float, float]]]=EMOTIONS,
    training: bool=False,
    seed: int=SEED,
) -> models.Model:
    '''Builds the head of the network from fully connected layers, regularized with Dropout.

    When `emotions` is a list or tuple, the head is a classifier: the output layer has one neuron per emotion with 'Softmax' activation.
    Otherwise the head predicts valence-arousal: the output layer has 2 neurons, a ReLU capped at 2 is applied and 1 is subtracted,
    so that both output values lie in the range [-1, 1].

    Arguments:
    - feature_size: size of the input feature vector.
    - config: configuration of the hidden part in the format: [(dropout rate, fully connected layer size), ...]; a dropout rate of 0 or a layer size of 0 skips the corresponding layer.
    - emotions: description of predicted emotions.
    - training: value passed as `training` when the layers are called.
    - seed: seed for the dropout layers and the kernel initializer.'''

    # Input layer for the feature vector
    feature_input = layers.Input(shape=(feature_size,), name='feature_input')
    hidden = feature_input

    # A single seeded initializer is shared by all fully connected layers
    kernel_init = initializers.GlorotUniform(seed=seed)

    # Hidden part: optional dropout followed by an optional fully connected layer for every configuration entry
    for position, entry in enumerate(config):
        rate, units = entry
        if rate > 0.:
            hidden = layers.Dropout(rate, seed=seed, name=f'dropout_{position}')(hidden, training=training)
        if units > 0:
            hidden = layers.Dense(units, kernel_initializer=kernel_init, activation="relu", name=f'dense_{position}')(hidden, training=training)

    if not isinstance(emotions, (list, tuple)):
        # "Valence-arousal" decomposition: clip to [0, 2], then shift to [-1, 1]
        raw = layers.Dense(2, kernel_initializer=kernel_init, name='dense_valence_arousal')(hidden, training=training)
        clipped = layers.ReLU(max_value=2.0)(raw)
        output = layers.Lambda(lambda x: x - 1.0)(clipped)
    else:
        # Usual classification: probabilities over the listed emotions
        output = layers.Dense(len(emotions), kernel_initializer=kernel_init, activation="softmax", name='probs')(hidden, training=training)

    return models.Model(inputs=feature_input, outputs=output, name='model_on_top')

#### Building a model

In [None]:
def build_model(
    augment_model: Optional[models.Model]=None,
    base_model: Optional[models.Model]=None,
    model_on_top: Optional[models.Model]=None,
) -> models.Model:
    '''Chains up to three models into one emotion prediction model: image augmentation, base model, model on top.

    Parts passed as None are left out. A single remaining part is returned as is, several parts are wrapped
    in a Sequential model in the order listed above, and None is returned when no part is given.

    Arguments:
    - augment_model: image augmentation model (see function build_augment_model).
    - base_model: base model (see function build_base_model).
    - model_on_top: model on top (see function build_model_on_top).
    '''
    # Keep only the parts that were actually supplied, preserving their order
    parts = [part for part in (augment_model, base_model, model_on_top) if part is not None]

    if not parts:
        return None
    if len(parts) == 1:
        return parts[0]
    return models.Sequential(parts)

#### Stop training

In [None]:
class EarlyStoppingAtBestMetric(callbacks.Callback):
    """Stops training when the monitored metric stops improving.

    The metric is read from the epoch logs under the name given in `metric`; depending on `mode`
    an improvement is a larger ('max') or a smaller ('min') value. The state of a previous training
    iteration (best epoch, loss, metric, weights and wait counter) can be passed in to continue monitoring."""

    def __init__(
        self,
        model: models.Model,
        metric: str,
        mode: str,
        patience: int=0,
        restore_best_weights: bool=False,
        verbose: int=VERBOSE,
        best_epoch: int=-1,
        best_loss: float=0.,
        best_metric: float=0.,
        best_weights: Optional[List[np.ndarray]]=None,
        wait: int=0
    ):
        '''Arguments:

        - model: trainable model
        - metric: name of the monitored metric as it appears in the epoch logs
        - mode: "max" - an increase of the metric is an improvement, "min" - a decrease of the metric is an improvement
        - patience: the number of epochs during which the score does not improve, then training stops.
        - restore_best_weights: restore the weights of the best epoch at the end of training.
        - verbose: verbosity mode: 0 (quiet) or 1 (message output).
        - best_epoch: the number of the training epoch with the best score (at the end of the previous iteration), -1 if there is none yet.
        - best_loss: value of 'loss' at the best epoch (at the end of the previous iteration).
        - best_metric: best value of the monitored metric (at the end of the previous iteration).
        - best_weights: weights at the end of the epoch with the best model score (at the end of the previous iteration).
        - wait: the number of epochs during which the estimate did not improve (after the end of the previous iteration).

        Raises ValueError if mode is neither "max" nor "min".'''

        super().__init__()

        # Fail early: an unknown mode would otherwise only surface as an UnboundLocalError in the middle of training
        if mode not in ('max', 'min'):
            raise ValueError(f"mode must be 'max' or 'min', got {mode!r}")

        self.__model = model
        self.__metric = metric
        self.__mode = mode
        self.__patience = patience
        self.__restore_best_weights = restore_best_weights
        self.__verbose = verbose
        self.__best_epoch = best_epoch
        self.__best_loss = best_loss
        self.__best_metric = best_metric
        self.__best_weights = best_weights
        self.__wait = wait
        # Defined here as well so that on_train_end is safe even if on_train_begin was never called
        self.__stopped_epoch = -1

    @property
    def best_epoch(self):
        '''Number of the epoch with the best metric (-1 if there is none yet).'''
        return self.__best_epoch

    @property
    def best_loss(self):
        '''Value of 'loss' at the best epoch.'''
        return self.__best_loss

    @property
    def best_metric(self):
        '''Best value of the monitored metric.'''
        return self.__best_metric

    @property
    def best_weights(self):
        '''Model weights captured at the best epoch (None if nothing was captured).'''
        return self.__best_weights

    def on_train_begin(self, logs=None):
        self.__stopped_epoch = -1

    def on_epoch_end(self, epoch, logs=None):
        # End of a training epoch
        logs = logs or {}
        current = logs[self.__metric]

        # best_epoch == -1 means there is nothing to compare with yet, so the first epoch is always the best
        if self.__best_epoch == -1:
            new_best = True
        elif self.__mode == 'max':
            new_best = current > self.__best_metric
        else:
            new_best = current < self.__best_metric

        if new_best:
            # The metric improved - remember its value and the weights of the model
            self.__best_loss = logs['loss']
            self.__best_metric = current
            self.__best_epoch = epoch
            self.__wait = 0
            self.__best_weights = self.__model.get_weights()
            if self.__verbose:
                print(f'Epoch #{self.__best_epoch + 1}: metric has been improved ({current:.4f}).')
        else:
            # The metric did not improve - wait for the specified number of epochs, then stop training
            self.__wait += 1
            if self.__wait >= self.__patience:
                self.__stopped_epoch = epoch
                self.__model.stop_training = True

    def on_train_end(self, logs=None):
        # Report early stopping
        if self.__stopped_epoch >= 0:
            if self.__verbose:
                print(f"Epoch #{self.__stopped_epoch + 1}: early stopping.")

        # Restore the weights of the best epoch (only if some weights were actually captured or passed in)
        if self.__restore_best_weights and self.__best_weights is not None:
            if self.__verbose:
                print(f"Restoring model weights from the end of the best epoch (#{self.__best_epoch + 1}).")
            self.__model.set_weights(self.__best_weights)

In [None]:
class EarlyStoppingAtMinValLoss(callbacks.Callback):
    """Stops training an emotion learning model when the model's loss on validation data stops decreasing.

    The state of a previous training iteration (best epoch, validation loss and metric, weights and wait counter)
    can be passed in to continue monitoring."""

    def __init__(
        self,
        model: models.Model,
        metric: str,
        patience: int=0,
        restore_best_weights: bool=False,
        verbose: int=VERBOSE,
        best_epoch: int=-1,
        best_val_loss: float=0.,
        best_val_metric: float=0.,
        best_weights: Optional[List[np.ndarray]]=None,
        wait: int=0
    ):
        '''Arguments:

        - model: trainable model
        - metric: target metric name (its validation value is read from the logs as "val_<metric>")
        - patience: the number of epochs during which the score does not improve, then training stops.
        - restore_best_weights: restore the weights of the best epoch at the end of training.
        - verbose: verbosity mode: 0 (quiet) or 1 (message output).
        - best_epoch: the number of the training epoch with the best score (at the end of the previous iteration), -1 if there is none yet.
        - best_val_loss: best loss on validation dataset (at the end of the previous iteration).
        - best_val_metric: best metric on validation dataset (at the end of the previous iteration).
        - best_weights: weights at the end of the epoch with the best model score (at the end of the previous iteration).
        - wait: the number of epochs during which the estimate did not improve (after the end of the previous iteration).'''

        super().__init__()

        self.__model = model
        self.__metric = metric
        self.__patience = patience
        self.__restore_best_weights = restore_best_weights
        self.__verbose = verbose
        self.__best_epoch = best_epoch
        self.__best_val_loss = best_val_loss
        self.__best_val_metric = best_val_metric
        self.__best_weights = best_weights
        self.__wait = wait
        # Defined here as well so that on_train_end is safe even if on_train_begin was never called
        self.__stopped_epoch = -1

    @property
    def best_epoch(self):
        '''Number of the epoch with the lowest validation loss (-1 if there is none yet).'''
        return self.__best_epoch

    @property
    def best_val_loss(self):
        '''Lowest validation loss seen so far.'''
        return self.__best_val_loss

    @property
    def best_val_metric(self):
        '''Validation metric at the epoch with the lowest validation loss.'''
        return self.__best_val_metric

    @property
    def best_weights(self):
        '''Model weights captured at the best epoch (None if nothing was captured).'''
        return self.__best_weights

    def on_train_begin(self, logs=None):
        self.__stopped_epoch = -1

    def on_epoch_end(self, epoch, logs=None):
        # End of a training epoch
        logs = logs or {}
        current = logs["val_loss"]

        # "No best value yet" is recognised by the default state (best_epoch == -1 and best_val_loss == 0.).
        # Checking the loss alone would treat a genuinely reached loss of 0.0 as "no best yet"
        # and count every following epoch as an improvement.
        has_best = self.__best_epoch != -1 or self.__best_val_loss != 0.

        if not has_best or current < self.__best_val_loss:
            # Loss decreased - remember its value and the weights of the model
            self.__best_val_loss = current
            self.__best_val_metric = logs[f"val_{self.__metric}"]
            self.__best_epoch = epoch
            self.__wait = 0
            self.__best_weights = self.__model.get_weights()
            if self.__verbose:
                print(f'Epoch #{self.__best_epoch + 1}: val loss has been improved ({current:.4f}).')
        else:
            # Loss did not decrease - wait for the specified number of epochs, then stop training
            self.__wait += 1
            if self.__wait >= self.__patience:
                self.__stopped_epoch = epoch
                self.__model.stop_training = True

    def on_train_end(self, logs=None):
        # Report early stopping
        if self.__stopped_epoch >= 0:
            if self.__verbose:
                print(f"Epoch #{self.__stopped_epoch + 1}: early stopping.")

        # Restore the weights of the best epoch (only if some weights were actually captured or passed in)
        if self.__restore_best_weights and self.__best_weights is not None:
            if self.__verbose:
                print(f"Restoring model weights from the end of the best epoch (#{self.__best_epoch + 1}).")
            self.__model.set_weights(self.__best_weights)

In [None]:
class EarlyStoppingAtMaxTestScore(callbacks.Callback):
    """Stops training an emotion learning model when the model's prediction score on the Kaggle test data stops increasing.

    After every epoch the test dataset is predicted, the predictions are submitted to Kaggle and the mean of the
    public and private scores of the submission is used as the monitored value."""

    def __init__(
        self,
        model: models.Model,
        test_dataset: tf.data.Dataset,
        test_image_paths: Union[List[str], Tuple[str]],
        patience: int=0,
        restore_best_weights: bool=False,
        verbose: int=VERBOSE,
        emotions: Union[Union[List[str], Tuple[str]], Dict[str, Tuple[float, float]]]=EMOTIONS,
        best_epoch: int=-1,
        best_test_score: float=0.,
        best_weights: Optional[List[np.ndarray]]=None,
        wait: int=0
    ):
        '''Arguments:

        - model: trainable model
        - test_dataset: dataset of test images.
        - test_image_paths: list of paths to test dataset files.
        - patience: the number of epochs during which the score does not improve, then training stops.
        - restore_best_weights: restore the weights of the best epoch at the end of training.
        - verbose: verbosity mode: 0 (quiet) or 1 (message output).
        - emotions: description of predicted emotions (list/tuple of names, or a mapping of names to 2-value tuples).
        - best_epoch: the number of the training epoch with the best score (at the end of the previous iteration).
        - best_test_score: best score (at the end of the previous iteration).
        - best_weights: weights at the end of the epoch with the best model score (at the end of the previous iteration).
        - wait: the number of epochs during which the estimate did not improve (after the end of the previous iteration).'''

        super().__init__()

        self.__model = model
        self.__emotions = emotions

        # Submission description and temporary file name are derived from the model name
        self.__description = f'test_{model.name}'
        self.__file_path = f'test_{model.name}.csv'
        self.__kaggle = Kaggle()

        self.__test_dataset = test_dataset
        self.__test_result = pd.DataFrame(columns=['image_path', 'emotion'])
        self.__test_result['image_path'] = test_image_paths

        self.__restore_best_weights = restore_best_weights
        self.__patience = patience
        self.__verbose = verbose
        self.__best_epoch = best_epoch
        self.__best_test_score = best_test_score
        self.__best_weights = best_weights
        self.__wait = wait
        # Defined here as well so that on_train_end is safe even if on_train_begin was never called
        self.__stopped_epoch = -1

    @property
    def best_epoch(self):
        '''Number of the epoch with the best test score (-1 if there is none yet).'''
        return self.__best_epoch

    @property
    def best_test_score(self):
        '''Best test score seen so far.'''
        return self.__best_test_score

    @property
    def best_weights(self):
        '''Model weights captured at the best epoch (None if nothing was captured).'''
        return self.__best_weights

    def on_train_begin(self, logs=None):
        self.__stopped_epoch = -1

    def on_epoch_end(self, epoch, logs=None):
        # End of a training epoch

        # Predict the test dataset handed to this callback (not a global variable of the same name)
        predicts = self.__model.predict(self.__test_dataset, verbose=self.__verbose)
        if isinstance(self.__emotions, (list, tuple)):
            # Usual classification: the most probable class
            labels = predicts.argmax(axis=1).tolist()
            self.__test_result['emotion'] = [self.__emotions[label] for label in labels]
        else:
            # "Valence-arousal" decomposition: the emotion whose reference point is closest to the prediction
            names = list(self.__emotions)
            centers = np.array(list(self.__emotions.values()))
            dists = np.linalg.norm(predicts[:, None, :] - centers[None, :, :], axis=2)
            labels = dists.argmin(axis=1).tolist()
            self.__test_result['emotion'] = [names[label] for label in labels]

        # Submit the predictions; the temporary file is removed even if the submission fails
        self.__test_result.to_csv(self.__file_path, index=False)
        try:
            self.__kaggle.send_submission_files(descriptions=[self.__description], file_paths=[self.__file_path])
        finally:
            Path(self.__file_path).unlink()

        # NOTE(review): assumes row 0 of the returned table is the submission that was just sent - confirm against Kaggle.receive_submission_scores
        test_scores = self.__kaggle.receive_submission_scores(descriptions=[self.__description]).loc[0, ['publicScore', 'privateScore']]
        current = (test_scores['publicScore'] + test_scores['privateScore']) / 2

        # Expose the scores to the other callbacks and the training history
        if logs is not None:
            logs['test_public_score'] = test_scores['publicScore']
            logs['test_private_score'] = test_scores['privateScore']
            logs['test_score'] = current

        if current > self.__best_test_score:
            # Accuracy has improved - remember its value and the weights of the model
            self.__best_test_score = current
            self.__best_epoch = epoch
            self.__wait = 0
            self.__best_weights = self.__model.get_weights()
            if self.__verbose:
                print(f'Epoch #{self.__best_epoch + 1}: accuracy has been improved ({current:.4f}).')
        else:
            # Accuracy has not improved - wait for the specified number of epochs, then stop training
            self.__wait += 1
            if self.__wait >= self.__patience:
                self.__stopped_epoch = epoch
                self.__model.stop_training = True

    def on_train_end(self, logs=None):
        # Report early stopping
        if self.__stopped_epoch >= 0:
            if self.__verbose:
                print(f"Epoch #{self.__stopped_epoch + 1}: early stopping.")

        # Restore the weights of the best epoch (only if some weights were actually captured or passed in)
        if self.__restore_best_weights and self.__best_weights is not None:
            if self.__verbose:
                print(f"Restoring model weights from the end of the best epoch (#{self.__best_epoch + 1}).")
            self.__model.set_weights(self.__best_weights)

#### Exponential Decay of Learning Rate

In [None]:
class LearningRateExpDecayScheduler(callbacks.LearningRateScheduler):
    '''Exponential Decay of Learning Rate.

    Every time the schedule is invoked, the current learning rate is multiplied by `decay_rate`.
    NOTE(review): Keras is expected to invoke the schedule at the start of every epoch, the first one included,
    so the initial learning rate would already be decayed once in epoch 0 - confirm against the Keras version in use.'''
    def __init__(self, decay_rate: float=1., verbose: int=VERBOSE):
        '''Arguments:
        - decay_rate - decay rate [0.0, 1.0].
        - verbose - verbose mode: 0-quiet, 1-output messages about changes in learning speed.'''
        self.__decay_rate = decay_rate
        # Forward `verbose` to the base class: previously it was accepted but silently ignored
        super().__init__(self.__scheduler, verbose=verbose)

    def __scheduler(self, epoch, lr) -> float:
        # The epoch number is not used: the decay is applied to the learning rate passed in by the base class
        return lr * self.__decay_rate

#### Model for implementation

In [None]:
class FaceEmotionRecognitionNet():
    '''Wrapper around a saved emotion recognition model that predicts the emotion shown on a single face image.'''

    def __init__(self, file_path: str, emotions: Union[Union[List[str], Tuple[str]], Dict[str, Tuple[float, float]]]=EMOTIONS):
        '''
        file_path: path to the saved model file.
        emotions: predicted emotions.
        '''
        self.__emotions = emotions
        # The model is loaded for inference only (not compiled)
        self.__model = models.load_model(filepath=file_path, compile=False, safe_mode=False)

    def predict(self, face_image: np.array) -> Union[Tuple[str, float], Tuple[str, float, float, float]]:
        '''Predicting a person's emotion based on their facial image.

        Arguments:
        - face_image: image of a person's face.

        Returns (emotion, probability) when emotions are given as a list or tuple,
        otherwise (emotion, distance to the reference point of the emotion, valence, arousal).
        '''
        source = Image.fromarray(face_image)

        # Pad to a square with the side of the longer edge, then scale to the model input
        # NOTE(review): input_shape[1:3] is handed to PIL as (width, height); this only matters for a non-square model input
        side = max(source.width, source.height)
        squared = ImageOps.pad(source, (side, side))
        target_size = self.__model.input_shape[1:3]
        batch = np.asarray(squared.resize(target_size))[None, ...]

        # Prediction for the single image of the batch
        predicts = self.__model.predict(batch, verbose=0)[0]

        if isinstance(self.__emotions, (list, tuple)):
            # Usual classification: the most probable emotion and its probability
            label = predicts.argmax()
            return self.__emotions[label], predicts.max()

        # "Valence-arousal" decomposition: the emotion with the closest reference point
        valence, arousal = predicts
        centers = np.array(list(self.__emotions.values()))
        dists = np.linalg.norm(predicts - centers, axis=1)
        label = dists.argmin()
        emotion = list(self.__emotions.keys())[label]
        return emotion, dists.min(), valence, arousal

### Dataset

#### Function of face extraction from a batch of images with saving to files

#### Function for creating a feature dataset

In [None]:
def build_feature_dataset(
    file_path: str,
    emotions: Union[Union[List[str], Tuple[str]], Dict[str, Tuple[float, float]]]=EMOTIONS,
    labeled: bool=True,
    batch_size: int=1,
    shuffle: bool=True,
    reshuffle_each_iteration: bool=True,
    seed: int=SEED,
    validation_split: Optional[float]=None,
    test_split: Optional[float]=None
) -> Union[
    tf.data.Dataset,
    Tuple[tf.data.Dataset, tf.data.Dataset],
    Tuple[tf.data.Dataset, tf.data.Dataset, tf.data.Dataset]
]:
    '''Loads a dataset of image features from an array file.
    Returns the whole dataset, or the dataset split into training, validation (optional) and test (optional) parts.
    Optionally, the data is shuffled.
    The dataset contains labels by default, but they can be optionally excluded from it.
    Dataset(s) are divided into batches of a given size.

    Arguments:
    - file_path: path to the data file with the arrays 'features' and (for a labeled dataset) 'labels'.
    - emotions: description of emotions in the dataset. For a list or tuple the stored labels are used as is;
      otherwise every label is replaced with the value stored at that position in the mapping.
    - labeled: include labels in the dataset.
    - batch_size: batch size.
    - shuffle: shuffle data.
    - reshuffle_each_iteration: reshuffle data at each iteration. When the dataset is split, only the training part
      is reshuffled, so that the parts never exchange elements.
    - seed: initializer for random number generator during shuffling.
    - validation_split: fraction of the data for the validation part (None - no validation part).
    - test_split: fraction of the data for the test part (None - no test part). If only test_split is given,
      three parts are still returned and the validation part is empty.

    Returns:
    - a single dataset if neither validation_split nor test_split is given;
    - (train, validation) if only validation_split is given;
    - (train, validation, test) if test_split is given.'''

    # NOTE(review): allow_pickle=True lets np.load unpickle objects stored in the file - only load trusted files
    with np.load(file_path, allow_pickle=True) as data:
        features = data['features']
        if not labeled:
            dataset = tf.data.Dataset.from_tensor_slices(features)
        elif isinstance(emotions, (list, tuple)):
            dataset = tf.data.Dataset.from_tensor_slices((features, data['labels']))
        else:
            # Replace every label index with the values of the corresponding emotion (taken from the
            # `emotions` argument, not from the global default)
            targets = np.array(list(emotions.values()))
            label_indices = np.asarray(data['labels']).astype(int).reshape(-1)
            dataset = tf.data.Dataset.from_tensor_slices((features, targets[label_indices]))
    size = len(dataset)

    split_requested = validation_split is not None or test_split is not None

    # Shuffling. When the dataset is split with take/skip afterwards, the order must be identical on every
    # iteration: a per-iteration reshuffle of the whole dataset would move elements between the parts.
    if shuffle and size > 0:
        dataset = dataset.shuffle(size, seed, reshuffle_each_iteration=reshuffle_each_iteration and not split_requested)

    # Returning the full dataset
    if not split_requested:
        return dataset.batch(batch_size)

    def train_part(count: int) -> tf.data.Dataset:
        # The training part may still be reshuffled on every iteration - within itself only
        part = dataset.take(count)
        if shuffle and reshuffle_each_iteration and count > 0:
            part = part.shuffle(count, seed, reshuffle_each_iteration=True)
        return part.batch(batch_size)

    # Return the dataset, divided into training and validation parts
    if test_split is None:
        train_size = int(size * (1 - validation_split))
        val_size = size - train_size
        val_dataset = dataset.skip(train_size).take(val_size).batch(batch_size)
        return train_part(train_size), val_dataset

    # Return the dataset, divided into training, validation and test parts
    val_fraction = 0. if validation_split is None else validation_split
    train_size = int(size * (1 - val_fraction - test_split))
    val_size = int(size * val_fraction)
    test_size = size - train_size - val_size
    val_dataset = dataset.skip(train_size).take(val_size).batch(batch_size)
    test_dataset = dataset.skip(train_size + val_size).take(test_size).batch(batch_size)
    return train_part(train_size), val_dataset, test_dataset

### Pipeline

In [None]:
class Pipeline():
    '''Model creation pipeline.

    Walks through the stages listed in the configuration and keeps a report (one row per stage) with the
    parameters, platform, start/update time and state of every stage. The report is stored as a CSV file in the
    pipeline folder and is reloaded from it when the file already exists.

    Stage states written by this class: 'started', 'run started', 'run complete', 'complete', 'failed',
    'skipped (not ready)', 'skipped (platform)'.'''

    def __init__(self, config: dict, proj_path: Path, is_prev_complete: bool, platform: str):
        '''Arguments:
        - config[dict]: pipeline configuration with the keys 'name', 'description', 'report_csv' and 'stages'.
        - proj_path[Path]: path to the project folder.
        - is_prev_complete[bool]: Is the previous pipeline completed?
        - platform[str]: platform on which the pipeline is executed: 'colab' (Google Colab), 'local' (local).
        '''
        self.__name = config['name']
        self.__description = config['description']
        self.__stages = iter(config['stages'])

        # The pipeline folder is created together with missing parent folders
        self.__path = proj_path / config['name']
        self.__path.mkdir(parents=True, exist_ok=True)

        self.__report_path = self.__path / config['report_csv']
        if self.__report_path.exists():
            # Columns are kept as objects: a column that is still empty would otherwise be read as float
            # and could not hold the strings written to it later
            self.__report = pd.read_csv(self.__report_path, index_col='stage').astype(object)
        else:
            self.__report = pd.DataFrame(
                index=pd.Index([stage['name'] for stage in config['stages']], name='stage'),
                columns=[
                    'params',
                    'platform',
                    'start_time',
                    'update_time',
                    'state',
                ],
                dtype=object,
            )
        self.__is_prev_complete = is_prev_complete
        self.__platform = platform
        self.__stage = None

    def __state(self):
        '''State of the current stage as stored in the report.'''
        return self.__report.loc[self.__stage['name'], 'state']

    def __mark(self, state: str):
        '''Stores a new state of the current stage together with the time of the change.'''
        self.__report.loc[self.__stage['name'], 'update_time'] = datetime.now()
        self.__report.loc[self.__stage['name'], 'state'] = state

    def next_stage(self):
        '''Moves to the next stage of the pipeline and updates its state in the report.

        Raises StopIteration when there are no more stages.'''
        self.__stage = next(self.__stages)
        name = self.__stage['name']

        # If the stage has already been completed, then we skip its execution
        if self.__state() == 'complete':
            return

        # Stage not yet completed
        self.__report.loc[name, 'platform'] = self.__platform
        self.__report.loc[name, 'params'] = str(self.__stage['params'])

        # If the previous pipeline has not yet been fully executed, or all previous stages of this pipeline have not yet been executed
        # then we skip the stage
        if not self.__is_prev_complete or (self.__report.iloc[:self.__report.index.get_loc(name)]['state'] != 'complete').any():
            if self.__state() != 'skipped (not ready)':
                self.__mark('skipped (not ready)')
            return

        # If the runtime does not match the required one, then skip the stage
        if self.__platform != self.__stage['platform']:
            if self.__state() != 'skipped (platform)':
                self.__mark('skipped (platform)')
            return

        # If the stage is performed in several iterations, then we move on to the next iteration
        if self.__state() == 'run complete':
            self.__mark('run started')
            return

        # Remember the start time of the stage execution
        self.__report.loc[name, 'start_time'] = datetime.now()
        self.__report.loc[name, 'update_time'] = self.__report.loc[name, 'start_time']
        self.__report.loc[name, 'state'] = 'started'

    @property
    def name(self) -> str:
        '''Pipeline name.'''
        return self.__name

    @property
    def description(self) -> str:
        '''Pipeline Description.'''
        return self.__description

    @property
    def report(self) -> pd.DataFrame:
        '''Pipeline Execution Report.'''
        return self.__report

    @property
    def stage_name(self) -> str:
        '''The name of the current stage.'''
        return self.__stage['name']

    @property
    def is_complete(self) -> bool:
        '''Is the current pipeline complete (all stages are in the 'complete' state)?'''
        return (self.__report['state'] == 'complete').all()

    @property
    def is_stage_failed(self) -> bool:
        '''Has the current stage failed?'''
        return self.__state() == 'failed'

    @property
    def is_stage_complete(self) -> bool:
        '''Has the current stage been completed?'''
        return self.__state() == 'complete'

    @property
    def is_stage_started(self) -> bool:
        '''Is the current stage in the 'started' state (first run, not a subsequent 'run started' iteration)?'''
        return self.__state() == 'started'

    @property
    def is_stage_skipped(self) -> bool:
        '''Is the current stage skipped, i.e. in any state other than 'started' / 'run started'?'''
        # str() keeps the check safe for a state that was never set (NaN in the report)
        return 'started' not in str(self.__state())

    @property
    def stage_params(self) -> dict:
        '''Parameters of the current stage.'''
        return self.__stage['params']

    @property
    def stage_description(self) -> str:
        '''Stage Description.'''
        return self.__stage['description']

    def __save(self):
        '''Saving the pipeline report to a file.'''
        self.__report.to_csv(self.__report_path)

    def complete_stage_run(self):
        '''End of stage iteration.'''
        self.__mark('run complete')
        self.__save()

    def fail_stage(self):
        '''Marks the current stage as failed.'''
        self.__mark('failed')
        self.__save()

    def complete_stage(self):
        '''End of stage.'''
        self.__mark('complete')
        self.__save()

    def save_stage_processing(self, result: pd.DataFrame):
        '''Saving the stage execution log to a file.'''
        result.to_csv(self.__path / self.__stage['params']['process_csv'])

    def load_stage_processing(self) -> pd.DataFrame:
        '''Loading a stage execution log from a file.'''
        return pd.read_csv(self.__path / self.__stage['params']['process_csv'])

    def save_stage_result(self, result: pd.DataFrame):
        '''Saving the result of the stage execution to a file.'''
        result.to_csv(self.__path / self.__stage['params']['result_csv'])

    def load_stage_result(self) -> pd.DataFrame:
        '''Loading the result of a stage execution from a file.'''
        return pd.read_csv(self.__path / self.__stage['params']['result_csv'])

### Kaggle

In [None]:
class Kaggle():
    '''Interaction with the Kaggle platform.'''

    __COLUMNS = ['fileName', 'date', 'description', 'status', 'publicScore', 'privateScore']

    def __init__(self, competition: str=PROJECT_NAME, verbose: int=VERBOSE):
        '''Initializing interaction with the Kaggle platform.

        Arguments:
        - competition: name of the competition.
        - verbose: verbose mode: 0 (silent) or 1 (message output).'''

        self.__competition = competition
        self.__verbose = verbose

    def send_submission_files(self, descriptions: Union[List[str], Tuple[str]], file_paths: Union[List[str], Tuple[str]]):
        '''Submitting solution files for review.

        Arguments:
        - descriptions: list of solution descriptions.
        - file_paths: list of paths to solution files.'''
        for file_path, description in zip(file_paths, descriptions):
            cmd = f'kaggle competitions submit -c {self.__competition} -f "{file_path}" -m "{description}" -q'
            lines = !{cmd}
            print(lines)
            if self.__verbose:
                print(f'Sended file {file_path} of submission {description} to competition {self.__competition}.')

    def receive_submission_scores(self, descriptions: Union[List[str], Tuple[str]]) -> pd.DataFrame:
        '''Receiving the results of decision verification.

        Arguments:
        - descriptions: list of solution descriptions.'''

        # Request a list of results
        cmd = f'kaggle competitions submissions -c {self.__competition}'
        while True:
            lines = !{cmd}
            if all([line.find('pending') == -1 for line in lines]):
                break
            sleep(1)

        if self.__verbose:
            descriptions_str = ', '.join(descriptions)
            print(f'Received scores of submissions {descriptions_str} from competition {self.__competition}.')

        # Find the header line
        for index, line in enumerate(lines):
            if line.split() == self.__COLUMNS:
                break

        # Find the position of columns in the text
        header_start_positions = [line.find(column) for column in self.__COLUMNS]
        header_end_positions = header_start_positions[1:]
        header_end_positions.append(len(line))

        # Leave the lines with the results
        lines = lines[index+2:]

        # Extract data from result rows
        data = [
            [
                line[header_start_position: header_end_position].strip()
                for header_start_position, header_end_position in zip(header_start_positions, header_end_positions)
            ] for line in lines
        ]

        # Create a dataset from the obtained results
        result = pd.DataFrame(data, columns=self.__COLUMNS)
        result['publicScore'] = pd.to_numeric(result['publicScore'], errors='coerce')
        result['privateScore'] = pd.to_numeric(result['privateScore'], errors='coerce')
        result['date'] = pd.to_datetime(result['date'])

        # We leave only the results of the sent predictions
        result = result[result['description'].isin(descriptions)]

        # Since there may be files with the same name, we take the latest ones
        indexes = sorted([indexes[0] for indexes in result.groupby('description').groups.values()])
        result = result.iloc[indexes]

        # Sort the results in the order they were sent
        result = result.sort_values('date').reset_index(drop=True)

        # Output the results
        if self.__verbose:
            print(result)

        # Return the result
        return result

## Pipeline for collecting information about the base models in Keras Applications

### Create/download pipeline from Google Drive

In [None]:
# Pipeline configured by KERAS_BASE_MODELS_PROCESSING_PIPELINE, stored in the Google Drive project folder
pipeline = Pipeline(
    config=KERAS_BASE_MODELS_PROCESSING_PIPELINE,
    proj_path=gd_proj_path,
    is_prev_complete=True,
    platform=platform,
)

### Getting information about the sizes of input images and feature vectors

In [None]:
# Switch to the next stage, then read its skip flag and parameters
pipeline.next_stage()
skip, params = pipeline.is_stage_skipped, pipeline.stage_params

#### Creating a results table

In [None]:
if not skip:

    # One row per base model that fits into BASE_MODEL_MAX_SIZE, indexed by model name
    base_models_sizes = pd.DataFrame(columns=['base_model_name', 'image_size', 'feature_size'])
    base_models_sizes['base_model_name'] = [
        name
        for name, (size, _) in KERAS_BASE_MODELS.items()
        if size <= BASE_MODEL_MAX_SIZE
    ]
    base_models_sizes = base_models_sizes.set_index('base_model_name')

#### Getting information about image sizes and model features

In [None]:
if not skip:

    with tqdm(base_models_sizes.index, unit='model') as t:
        for base_model_name in t:
            t.set_description(f'{base_model_name}')

            # Input image side: second dimension of the input shape of the model built with its top
            base_model = getattr(applications, base_model_name)(weights=None, include_top=True)
            image_size = base_model.input_shape[1]
            # Fall back to 299 when the model does not fix its input side
            image_size = 299 if image_size is None else image_size

            # Feature-vector length: last dimension of the output shape of the model built without its top
            base_model = getattr(applications, base_model_name)(weights=None, include_top=False)
            feature_size = base_model.output_shape[-1]

            # Store both sizes in the row of the current model
            base_models_sizes.loc[base_model_name] = (image_size, feature_size)

#### Saving results to Google Drive

In [None]:
if not skip:

    # Store the size table as the result of the current stage
    pipeline.save_stage_result(base_models_sizes)

#### Recording the completion of the stage

In [None]:
if not skip:

    # Mark the stage as complete in the pipeline report
    pipeline.complete_stage()

#### Loading results from Google Drive (done when step is skipped)

In [None]:
# A skipped stage that was completed earlier: restore its saved result
if skip and pipeline.is_stage_complete:
    base_models_sizes = pipeline.load_stage_result().set_index('base_model_name')

#### Output of results

In [None]:
# Show the size table only when the stage is complete
if pipeline.is_stage_complete:
    display(base_models_sizes)

Unnamed: 0_level_0,image_size,feature_size
base_model_name,Unnamed: 1_level_1,Unnamed: 2_level_1
MobileNet,224,1024
MobileNetV2,224,1280
NASNetMobile,224,1056
EfficientNetB0,224,1280
EfficientNetB1,240,1280
DenseNet121,224,1024
EfficientNetB2,260,1408
DenseNet169,224,1664
EfficientNetB3,300,1536
EfficientNetV2B0,224,1280


### Measuring the inference time of models

In [None]:
# Switch to the next stage, then read its skip flag and parameters
pipeline.next_stage()
skip, params = pipeline.is_stage_skipped, pipeline.stage_params

#### Creating a results table

In [None]:
if not skip:

    # One row per base model that fits into BASE_MODEL_MAX_SIZE, indexed by model name
    model_inference_times = pd.DataFrame(columns=['base_model_name', 'inference_time'])
    model_inference_times['base_model_name'] = [
        name
        for name, (size, _) in KERAS_BASE_MODELS.items()
        if size <= BASE_MODEL_MAX_SIZE
    ]
    model_inference_times = model_inference_times.set_index('base_model_name')

#### Building the "heaviest" configuration of the model on top

In [None]:
if not skip:

    # The largest number of dense layers, each with the largest dropout rate and number of units
    model_on_top_config = [
        (max(MODEL_ON_TOP_DROPOUT_RATES), max(MODEL_ON_TOP_DENSE_UNITS))
        for _ in range(max(MODEL_ON_TOP_DENSE_NUMS))
    ]

#### Measuring the inference time of models

In [None]:
if not skip:

    with tqdm(model_inference_times.index, unit='model') as t:
        for base_model_name in t:
            t.set_description(f'{base_model_name}')

            # Sizes collected for this model at the previous stage
            image_size, feature_size = base_models_sizes.loc[base_model_name, ['image_size', 'feature_size']]

            # Full model (base model + model on top) with random weights:
            # the weights are not loaded because only the timing is measured
            base_model = build_base_model(
                base_model_name,
                image_size=image_size,
                weights=None,
                pooling=BASE_MODEL_POOLINGS,
                include_preprocess_input=True,
            )
            model_on_top = build_model_on_top(feature_size, model_on_top_config)
            model = build_model(base_model=base_model, model_on_top=model_on_top)
            model.trainable = False

            # Random images: one batch of params['batch_size'] images repeated params['batches'] times
            dataset = (
                tf.data.Dataset
                .from_tensor_slices(np.random.randint(0, 255, (params['batch_size'], image_size, image_size, 3)))
                .batch(params['batch_size'])
                .repeat(params['batches'])
            )

            # Warm-up prediction on one batch, not included in the measurement
            model.predict(dataset.take(1))

            # Total time of params['repetitions'] passes over the whole dataset
            inf_time = timeit(lambda: model.predict(dataset), number=params['repetitions'])

            # Store the total time in the row of the current model
            model_inference_times.loc[base_model_name, 'inference_time'] = inf_time

    # Turn the totals into the average time per image
    steps_total = params['batch_size'] * params['batches'] * params['repetitions']
    model_inference_times['inference_time'] = model_inference_times['inference_time'] / steps_total

#### Saving results to Google Drive

In [None]:
if not skip:

    # Store the inference-time table as the result of the current stage
    pipeline.save_stage_result(model_inference_times)

#### Recording the completion of the stage

In [None]:
if not skip:

    # Mark the stage as complete in the pipeline report
    pipeline.complete_stage()

#### Loading results from Google Drive (done when step is skipped)

In [None]:
# A skipped stage that was completed earlier: restore its saved result
if skip and pipeline.is_stage_complete:
    model_inference_times = pipeline.load_stage_result().set_index('base_model_name')

#### Output of results

In [None]:
# Show the inference-time table only when the stage is complete
if pipeline.is_stage_complete:
    display(model_inference_times)

Unnamed: 0_level_0,inference_time
base_model_name,Unnamed: 1_level_1
MobileNet,0.009052
MobileNetV2,0.010616
NASNetMobile,0.029175
EfficientNetB0,0.013702
EfficientNetB1,0.015081
DenseNet121,0.020741
EfficientNetB2,0.018812
DenseNet169,0.0271
EfficientNetB3,0.020881
EfficientNetV2B0,0.013438


### Selecting a base model

In [None]:
# Switch to the next stage, then read its skip flag and parameters
pipeline.next_stage()
skip, params = pipeline.is_stage_skipped, pipeline.stage_params

#### Selecting the models that fit within the maximum inference time

In [None]:
if not skip:

    # Name and top-1 accuracy of every base model that fits into BASE_MODEL_MAX_SIZE
    base_model_selection = pd.DataFrame()
    base_model_selection[['base_model_name', 'top1_accuracy']] = [
        (name, accuracy)
        for name, (size, accuracy) in KERAS_BASE_MODELS.items()
        if size <= BASE_MODEL_MAX_SIZE
    ]
    base_model_selection = base_model_selection.set_index('base_model_name')

    # Attach the measured inference times and drop the models that are too slow
    base_model_selection = base_model_selection.merge(model_inference_times['inference_time'], on='base_model_name')
    base_model_selection = base_model_selection.loc[base_model_selection['inference_time'] <= MAX_INFERENCE_TIME]

#### Ranking the selected models by a weighted score of accuracy and inference time

In [None]:
if not skip:

    # Accuracy scaled to [0, 1]: the least accurate model gets 0, the most accurate gets 1
    base_model_selection['top1_accuracy_score'] = (
        (base_model_selection['top1_accuracy'] - min(base_model_selection['top1_accuracy']))
        / (max(base_model_selection['top1_accuracy']) - min(base_model_selection['top1_accuracy']))
    )
    # Inference time scaled to [0, 1] and inverted: the slowest model gets 0, the fastest gets 1
    base_model_selection['inference_time_score'] = (
        (max(base_model_selection['inference_time']) - base_model_selection['inference_time'])
        / (max(base_model_selection['inference_time']) - min(base_model_selection['inference_time']))
    )
    # Weighted sum of both scores with the weights from the stage parameters
    base_model_selection['weighted_score'] = (
        base_model_selection['top1_accuracy_score']*params['top1_accuracy_weight']
        + base_model_selection['inference_time_score']*params['inference_time_weight']
    )
    # Rank 1 goes to the highest weighted score
    base_model_selection['rank'] = base_model_selection['weighted_score'].rank(ascending=False).astype(int)
    base_model_selection.sort_values('rank', inplace=True)
    display(base_model_selection)

#### Selecting the most suitable model

In [None]:
if not skip:

    # One-row table with the sizes of the top-ranked model (first row after sorting by rank)
    base_model_info = base_models_sizes.loc[[base_model_selection.index[0]]]

#### Saving results to Google Drive

In [None]:
if not skip:

    # The ranking table is stored as the stage log, the selected model as the stage result
    pipeline.save_stage_processing(base_model_selection)
    pipeline.save_stage_result(base_model_info)

#### Recording the completion of the stage

In [None]:
if not skip:

    # Mark the stage as complete in the pipeline report
    pipeline.complete_stage()

#### Loading results from Google Drive (done when step is skipped)

In [None]:
# A skipped stage that was completed earlier: restore its saved result
if skip and pipeline.is_stage_complete:
    base_model_info = pipeline.load_stage_result().set_index('base_model_name')

#### Output of results

In [None]:
if pipeline.is_stage_complete:
    # Name of the selected model: the index label of the only row
    base_model_name = base_model_info.index[0]
    # Its input image size and feature-vector length: the values of that row
    image_size, feature_size = base_model_info.iloc[0]
    display(base_model_info)

Unnamed: 0_level_0,image_size,feature_size
base_model_name,Unnamed: 1_level_1,Unnamed: 2_level_1
EfficientNetV2B0,224,1280


### Pipeline Execution Report

In [None]:
# Show the report table of the pipeline
display(pipeline.report)

Unnamed: 0_level_0,params,platform,start_time,update_time,state
stage,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
sizes_retrieving,{'result_csv': 'base_model_sizes.csv'},colab,2025-03-31 10:22:43.463421,2025-03-31 10:23:23.066135,complete
inference_time_measuring,"{'batch_size': 1, 'batches': 1, 'repetitions':...",colab,2025-03-31 10:23:23.113022,2025-03-31 10:26:02.449111,complete
base_model_selection,"{'inference_time_weight': 0.6, 'top1_accuracy_...",colab,2025-03-31 10:26:02.493997,2025-03-31 10:26:02.583442,complete


## Image Preprocessing Pipeline

### Create/download pipeline from Google Drive

In [None]:
# Pipeline configured by IMAGE_PREPROCESSING_PIPELINE; it receives the completion flag of the previous pipeline
pipeline = Pipeline(
    config=IMAGE_PREPROCESSING_PIPELINE,
    proj_path=gd_proj_path,
    is_prev_complete=pipeline.is_complete,
    platform=platform,
)

### Extracting face images from the training dataset

In [None]:
# Switch to the next stage, then read its skip flag and parameters
pipeline.next_stage()
skip, params = pipeline.is_stage_skipped, pipeline.stage_params
# Folder for the face images extracted from the training dataset
train_faces_dataset_path = Path(params['path'])

#### Creating the dataset folder structure

In [None]:
if not skip:

    # Start from an empty dataset folder
    if train_faces_dataset_path.exists():
        shutil.rmtree(train_faces_dataset_path)
    train_faces_dataset_path.mkdir()
    # One sub-folder per emotion
    for emotion in EMOTIONS:
        train_faces_dataset_path.joinpath(emotion).mkdir()

#### Loading the original training dataset

In [None]:
if not skip:

    # Download the training dataset only when its folder is missing
    train_dataset_path = Path(TRAIN_DATASET_PATH)
    if not train_dataset_path.exists():
        # The archive is downloaded to temp.<ext> and unpacked by the postprocess hook
        gdown.cached_download(
            url=TRAIN_DATASET_URL,
            path=f'temp.{TRAIN_DATASET_EXT}',
            postprocess=gdown.extractall,
            fuzzy=True)
        # Remove the downloaded archive afterwards
        Path(f'temp.{TRAIN_DATASET_EXT}').unlink()

#### Getting a list of image files in the training dataset

In [None]:
if not skip:

    # Image paths relative to the training dataset folder, in POSIX form
    file_paths = [
        Path(image_file).relative_to(train_dataset_path).as_posix()
        for image_file in utils.image_dataset_from_directory(
            train_dataset_path, shuffle=False, batch_size=1
        ).file_paths
    ]

#### Creating a process table

In [None]:
if not skip:

    # One row per image; 'min_size' and 'face_box' are filled during face detection
    processing = pd.DataFrame(columns=['file_path', 'min_size', 'face_box'])
    processing['file_path'] = file_paths

#### Creating a results table

In [None]:
if not skip:

    # One row per emotion, indexed by emotion name
    train_face_extraction = pd.DataFrame(columns=['emotion', 'failed_images', 'detected_faces'])
    train_face_extraction['emotion'] = list(EMOTIONS)
    train_face_extraction = train_face_extraction.set_index('emotion')

#### Creating face detector

In [None]:
if not skip:

    # Create a face detector from the cascade file shipped with OpenCV.
    # The path is passed as a string because not every OpenCV build accepts pathlib.Path here.
    face_detector = cv.CascadeClassifier(str(Path(cv.data.haarcascades) / params['classifier']))

    # The constructor does not raise when the cascade cannot be loaded, it leaves the classifier empty;
    # stop here with a clear message instead of failing later inside detectMultiScale
    if face_detector.empty():
        raise FileNotFoundError(f"Cannot load cascade classifier {params['classifier']} from {cv.data.haarcascades}")

#### Detecting faces

In [None]:
if not skip:

    with trange(processing.shape[0], unit='file') as t:

        min_sizes = []
        face_boxes = []

        for file_path in processing['file_path']:

            # Load an image; cv.imread returns None instead of raising when the file cannot be read
            image_path = (train_dataset_path / file_path).as_posix()
            image = cv.imread(image_path)
            if image is None:
                raise IOError(f'Cannot read image {image_path}')

            # Convert to grayscale, as expected by the cascade detector
            grayscaled_image = cv.cvtColor(image, cv.COLOR_BGR2GRAY)

            # Minimum face size as a share of the image size.
            # OpenCV sizes are (width, height), while image.shape is (height, width, channels)
            min_size = int(image.shape[1] * params['face_min_ratio']), int(image.shape[0] * params['face_min_ratio'])

            # Detect faces of different sizes in the input image
            face_box = face_detector.detectMultiScale(
                grayscaled_image,
                scaleFactor=params['scale_factor'],
                minNeighbors=params['min_neighbors'],
                flags=params['flags'],
                minSize=min_size,
            )

            min_sizes.append(min_size)
            face_boxes.append(face_box)

            # Updating the Progress Bar counter
            t.update()

        # Enter the result into the process table
        processing['min_size'] = min_sizes
        processing['face_box'] = face_boxes

#### Saving train faces dataset

In [None]:
if not skip:

    with trange(processing.shape[0], unit='file') as t:

        for file_path, face_box in zip(processing['file_path'], processing['face_box']):

            # Open the source image
            with Image.open(train_dataset_path / file_path) as image:

                if len(face_box) > 0:
                    # Crop the first detected face and save it under the same relative path
                    x1, y1, w, h = face_box[0]
                    x2 = x1 + w
                    y2 = y1 + h
                    face_image = image.crop((x1, y1, x2, y2))
                    face_image.save(train_faces_dataset_path / file_path)
                else:
                    # No face was detected: keep the original image
                    image.save(train_faces_dataset_path / file_path)

                # Advance the progress bar
                t.update()

#### Saving training dataset face images to Google Drive archive

In [None]:
# Pack the folder with face images into a zip archive in the Google Drive project folder
if not skip and not pipeline.is_stage_failed:
    shutil.make_archive(gd_proj_path / train_faces_dataset_path.name, 'zip', train_faces_dataset_path)

#### Saving results to Google Drive

In [None]:
if not skip:

    # The per-image log is saved in any case
    pipeline.save_stage_processing(processing)
    if not pipeline.is_stage_failed:
        for emotion in EMOTIONS:
            # Images of an emotion are those inside its folder, i.e. whose relative path starts with
            # '<emotion>/'. A substring search over the whole path would also match file names.
            indices = processing.index[processing['file_path'].str.startswith(f'{emotion}/')]
            images_num = indices.size
            # An image counts as detected when the detector returned at least one face box
            detected_num = (processing.loc[indices, 'face_box'].apply(len) > 0).sum()
            failed_num = images_num - detected_num
            train_face_extraction.loc[emotion] = failed_num, detected_num
        train_face_extraction = train_face_extraction.astype(int)
        pipeline.save_stage_result(train_face_extraction)

#### Recording the completion of the stage

In [None]:
if not skip and not pipeline.is_stage_failed:

    # Mark the stage as complete in the pipeline report
    pipeline.complete_stage()

#### Loading results from Google Drive (done when step is skipped)

In [None]:
# A skipped stage that was completed earlier: restore its saved result
if skip and pipeline.is_stage_complete:
    train_face_extraction = pipeline.load_stage_result().set_index('emotion')

#### Output of results

In [None]:
# Show the per-emotion detection counts only when the stage is complete
if pipeline.is_stage_complete:
    display(train_face_extraction)

Unnamed: 0_level_0,failed_images,detected_faces
emotion,Unnamed: 1_level_1,Unnamed: 2_level_1
anger,275,6748
contempt,62,3023
disgust,109,3046
fear,242,4802
happy,168,5787
neutral,249,6546
sad,374,6366
surprise,315,6008
uncertain,257,5670


### Extracting face images from the test dataset

In [None]:
# Switch to the next stage, then read its skip flag and parameters
pipeline.next_stage()
skip, params = pipeline.is_stage_skipped, pipeline.stage_params
# Folder for the face images extracted from the test dataset
test_faces_dataset_path = Path(params['path'])

#### Loading the original test dataset

In [None]:
if not skip:

    # Download the test dataset only when its folder is missing
    test_dataset_path = Path(TEST_DATASET_PATH)
    if not test_dataset_path.exists():
        # The archive is downloaded to temp.<ext> and unpacked by the postprocess hook
        gdown.cached_download(
            url=TEST_DATASET_URL,
            path=f'temp.{TEST_DATASET_EXT}',
            postprocess=gdown.extractall,
            fuzzy=True)
        # Remove the downloaded archive afterwards
        Path(f'temp.{TEST_DATASET_EXT}').unlink()

#### Creating a dataset folder

In [None]:
if not skip:

    # Start from an empty dataset folder
    if test_faces_dataset_path.exists():
        shutil.rmtree(test_faces_dataset_path)
    test_faces_dataset_path.mkdir()

#### Getting a list of image files in the source dataset

In [None]:
if not skip:

    # Image paths relative to the test dataset folder, in POSIX form
    file_paths = [
        Path(image_file).relative_to(test_dataset_path).as_posix()
        for image_file in utils.image_dataset_from_directory(
            test_dataset_path, labels=None, shuffle=False, batch_size=1
        ).file_paths
    ]

#### Creating a process table

In [None]:
if not skip:

    # One row per image; 'face_box' is filled during face detection
    processing = pd.DataFrame(columns=['file_path', 'face_box'])
    processing['file_path'] = file_paths

#### Creating a results table

In [None]:
if not skip:

    # A single row labelled 'all' for the whole test dataset
    test_face_extraction = pd.DataFrame(columns=['emotion', 'failed_images', 'detected_faces'])
    test_face_extraction['emotion'] = ['all']
    test_face_extraction = test_face_extraction.set_index('emotion')

#### Creating face detector

In [None]:
if not skip:

    # Create a face detector from the cascade file shipped with OpenCV.
    # The path is passed as a string because not every OpenCV build accepts pathlib.Path here.
    face_detector = cv.CascadeClassifier(str(Path(cv.data.haarcascades) / params['classifier']))

    # The constructor does not raise when the cascade cannot be loaded, it leaves the classifier empty;
    # stop here with a clear message instead of failing later inside detectMultiScale
    if face_detector.empty():
        raise FileNotFoundError(f"Cannot load cascade classifier {params['classifier']} from {cv.data.haarcascades}")

#### Extracting face images from the test dataset

In [None]:
if not skip:

    with trange(processing.shape[0], unit='file') as t:

        min_sizes = []
        face_boxes = []

        for file_path in processing['file_path']:

            # Load an image; cv.imread returns None instead of raising when the file cannot be read
            image_path = (test_dataset_path / file_path).as_posix()
            image = cv.imread(image_path)
            if image is None:
                raise IOError(f'Cannot read image {image_path}')

            # Convert to grayscale, as expected by the cascade detector
            grayscaled_image = cv.cvtColor(image, cv.COLOR_BGR2GRAY)

            # Minimum face size as a share of the image size.
            # OpenCV sizes are (width, height), while image.shape is (height, width, channels)
            min_size = int(image.shape[1] * params['face_min_ratio']), int(image.shape[0] * params['face_min_ratio'])

            # Detect faces of different sizes in the input image
            face_box = face_detector.detectMultiScale(
                grayscaled_image,
                scaleFactor=params['scale_factor'],
                minNeighbors=params['min_neighbors'],
                flags=params['flags'],
                minSize=min_size,
            )

            min_sizes.append(min_size)
            face_boxes.append(face_box)

            # Updating the Progress Bar counter
            t.update()

        # Enter the result into the process table
        processing['min_size'] = min_sizes
        processing['face_box'] = face_boxes

#### Saving test faces dataset

In [None]:
if not skip:

    with trange(processing.shape[0], unit='file') as t:

        for file_path, face_box in zip(processing['file_path'], processing['face_box']):

            # Open the source image
            with Image.open(test_dataset_path / file_path) as image:

                if len(face_box) > 0:
                    # Crop the first detected face and save it under the same relative path
                    x1, y1, w, h = face_box[0]
                    x2 = x1 + w
                    y2 = y1 + h
                    face_image = image.crop((x1, y1, x2, y2))
                    face_image.save(test_faces_dataset_path / file_path)
                else:
                    # No face was detected: keep the original image
                    image.save(test_faces_dataset_path / file_path)

                # Advance the progress bar
                t.update()

#### Saving test dataset face images to Google Drive archive

In [None]:
# Pack the folder with face images into a zip archive in the Google Drive project folder
if not skip and not pipeline.is_stage_failed:
    shutil.make_archive(gd_proj_path / test_faces_dataset_path.name, 'zip', test_faces_dataset_path)

#### Saving results to Google Drive

In [None]:
if not skip:

    # The per-image log is saved in any case
    pipeline.save_stage_processing(processing)
    if not pipeline.is_stage_failed:
        images_num = processing.shape[0]
        # An image counts as detected when the detector returned at least one face box
        detected_num = sum(processing['face_box'].apply(len) > 0)
        failed_num = images_num - detected_num
        test_face_extraction.loc['all'] = failed_num, detected_num
        test_face_extraction = test_face_extraction.astype(int)
        pipeline.save_stage_result(test_face_extraction)

#### Recording the completion of the stage

In [None]:
if not skip and not pipeline.is_stage_failed:

    # Mark the stage as complete in the pipeline report
    pipeline.complete_stage()

#### Loading results from Google Drive (done when step is skipped)

In [None]:
# A skipped stage that was completed earlier: restore its saved result
if skip and pipeline.is_stage_complete:
    test_face_extraction = pipeline.load_stage_result().set_index('emotion')

#### Output of results

In [None]:
# Show the detection counts only when the stage is complete
if pipeline.is_stage_complete:
    display(test_face_extraction)

Unnamed: 0_level_0,failed_images,detected_faces
emotion,Unnamed: 1_level_1,Unnamed: 2_level_1
all,175,4825


### Extracting features from training dataset

In [None]:
# Switch to the next stage, then read its skip flag and parameters
pipeline.next_stage()
skip, params = pipeline.is_stage_skipped, pipeline.stage_params
# Folder for the feature file of the training dataset
train_features_dataset_path = Path(params['path'])

#### Reading training dataset of face images from archive on Google Drive

In [None]:
if not skip:

    # Restore the face images from the Google Drive archive when the local folder is missing
    if not train_faces_dataset_path.exists():
        train_faces_dataset_path.mkdir()
        # The archive is named after the dataset folder, with the .zip suffix
        shutil.unpack_archive(
            gd_proj_path / train_faces_dataset_path.with_suffix('.zip').name,
            train_faces_dataset_path
        )

#### Creating a directory for feature files

In [None]:
if not skip:

    # Start from an empty folder for the feature file
    if train_features_dataset_path.exists():
        shutil.rmtree(train_features_dataset_path)
    train_features_dataset_path.mkdir()

#### Extract and save features for the selected model

In [None]:
if not skip:

    # Labelled face images resized to the input size of the selected base model; shuffling is off
    dataset = utils.image_dataset_from_directory(
        train_faces_dataset_path,
        label_mode='int',
        image_size=(image_size, image_size),
        batch_size=params['batch_size'],
        shuffle=False,
    )

    # Number of images = length of the dataset file list
    images_cnt = len(dataset.file_paths)

    # Prepare the next batches in the background
    dataset = dataset.prefetch(buffer_size=params['buffer_size'])

    # Preallocated arrays: one feature vector and one label per image
    features = np.zeros(shape=(images_cnt, feature_size))
    labels = np.zeros(shape=(images_cnt, 1))

    # Base model with ImageNet weights
    base_model = build_base_model(
        base_model_name,
        weights='imagenet',
        image_size=image_size,
        pooling=BASE_MODEL_POOLINGS,
        include_preprocess_input=True,
        training=False,
    )

    # Progress bar that tracks the processed batches
    with tqdm(dataset, desc=f'{base_model_name}', unit='batch') as t:

        # Bounds of the slice of both arrays that the current batch fills
        start_index = 0
        end_index = 0

        # Every dataset element is a pair: a batch of images and a batch of labels
        for batch, (batch_images, batch_labels) in enumerate(dataset.as_numpy_iterator()):

            end_index = start_index + batch_images.shape[0]

            # Features of the batch go into their rows of the feature array
            batch_features = base_model(batch_images)
            features[start_index: end_index] = batch_features

            # Labels are turned into a column and go into the same rows of the label array
            batch_labels = batch_labels.reshape(-1, 1)
            labels[start_index: end_index] = batch_labels

            # The next batch starts where this one ended
            start_index = end_index

            # Advance the progress bar
            t.update()

    # Save both arrays into one .npz file named after the base model
    feature_path = (train_features_dataset_path / base_model_name).with_suffix('.npz')
    np.savez(feature_path, features=features, labels=labels)

#### Saving the feature file archive to Google Drive

In [None]:
if not skip:

    # Pack the feature folder into a zip archive in the Google Drive project folder
    shutil.make_archive(gd_proj_path / train_features_dataset_path.name, 'zip', train_features_dataset_path)

#### Recording the completion of the stage

In [None]:
if not skip:

    # Mark the stage as complete in the pipeline report
    pipeline.complete_stage()

### Extracting features from the test dataset

In [None]:
# Switch to the next stage, then read its skip flag and parameters
pipeline.next_stage()
skip, params = pipeline.is_stage_skipped, pipeline.stage_params
# Folder for the feature file of the test dataset
test_features_dataset_path = Path(params['path'])

#### Reading a test dataset of face images from the archive on Google Drive

In [None]:
if not skip:

    # Restore the face images from the Google Drive archive when the local folder is missing
    if not test_faces_dataset_path.exists():
        test_faces_dataset_path.mkdir()
        # The archive is named after the dataset folder, with the .zip suffix
        shutil.unpack_archive(
            gd_proj_path / test_faces_dataset_path.with_suffix('.zip').name,
            test_faces_dataset_path
        )

#### Creating directories for feature files

In [None]:
if not skip:

    # Start from an empty folder for the feature file
    if test_features_dataset_path.exists():
        shutil.rmtree(test_features_dataset_path)
    test_features_dataset_path.mkdir()

#### Extract and save features for the selected model

In [None]:
if not skip:

    # Unlabelled face images resized to the input size of the selected base model; shuffling is off
    dataset = utils.image_dataset_from_directory(
        test_faces_dataset_path,
        label_mode=None,
        image_size=(image_size, image_size),
        batch_size=params['batch_size'],
        shuffle=False,
    )

    # Number of images = length of the dataset file list
    images_cnt = len(dataset.file_paths)

    # Prepare the next batches in the background
    dataset = dataset.prefetch(buffer_size=params['buffer_size'])

    # Preallocated array: one feature vector per image
    features = np.zeros(shape=(images_cnt, feature_size))

    # Base model with ImageNet weights
    base_model = build_base_model(
        base_model_name,
        weights='imagenet',
        image_size=image_size,
        pooling=BASE_MODEL_POOLINGS,
        include_preprocess_input=True,
        training=False,
    )

    # Progress bar that tracks the processed batches
    with tqdm(dataset, desc=f'{base_model_name}', unit='batch') as t:

        # Bounds of the slice of the feature array that the current batch fills
        start_index = 0
        end_index = 0

        # Every dataset element is a batch of images (the test dataset has no labels)
        for batch, batch_images in enumerate(dataset.as_numpy_iterator()):

            end_index = start_index + batch_images.shape[0]

            # Features of the batch go into their rows of the feature array
            batch_features = base_model(batch_images)
            features[start_index: end_index] = batch_features

            # The next batch starts where this one ended
            start_index = end_index

            # Advance the progress bar
            t.update()

    # Save the feature array into an .npz file named after the base model
    feature_path = (test_features_dataset_path / base_model_name).with_suffix('.npz')
    np.savez(feature_path, features=features)

#### Saving the feature file archive to Google Drive

In [None]:
if not skip:

    # Pack the feature folder into a zip archive in the Google Drive project folder
    shutil.make_archive(gd_proj_path / test_features_dataset_path.name, 'zip', test_features_dataset_path)

#### Recording the completion of the stage

In [None]:
if not skip:

    # Mark the stage as complete in the pipeline report
    pipeline.complete_stage()

### Additional cleaning of the training dataset

In [None]:
# Switch to the next stage, then read its skip flag and parameters
pipeline.next_stage()
skip, params = pipeline.is_stage_skipped, pipeline.stage_params
# Output folders of the stage, taken from its parameters
train_faces_clean_features_path = Path(params['features_path'])
train_faces_clean_dataset_path = Path(params['dataset_path'])

#### Creating directories for feature files

In [None]:
if not skip:

    # Start from an empty folder for the feature files
    if train_faces_clean_features_path.exists():
        shutil.rmtree(train_faces_clean_features_path)
    train_faces_clean_features_path.mkdir()

#### Creating the dataset folder structure

In [None]:
if not skip:

    # Start from an empty dataset folder
    if train_faces_clean_dataset_path.exists():
        shutil.rmtree(train_faces_clean_dataset_path)
    train_faces_clean_dataset_path.mkdir()
    # One sub-folder per emotion
    for emotion in EMOTIONS:
        train_faces_clean_dataset_path.joinpath(emotion).mkdir()

#### Reading training dataset of features from archive on Google Drive

In [None]:
if not skip:

    # Restore the feature file from the Google Drive archive when the local folder is missing
    if not train_features_dataset_path.exists():
        train_features_dataset_path.mkdir()
        # The archive is named after the feature folder, with the .zip suffix
        shutil.unpack_archive(
            gd_proj_path / train_features_dataset_path.with_suffix('.zip').name,
            train_features_dataset_path
        )

#### Reading training dataset face images from Google Drive archive

In [None]:
if not skip:

    # Restore the face images from the Google Drive archive when the local folder is missing
    if not train_faces_dataset_path.exists():
        train_faces_dataset_path.mkdir()
        # The archive is named after the dataset folder, with the .zip suffix
        shutil.unpack_archive(
            gd_proj_path / train_faces_dataset_path.with_suffix('.zip').name,
            train_faces_dataset_path
        )

#### Getting a list of training dataset face image files

In [None]:
if not skip:

    # Face image paths relative to the face dataset folder, in POSIX form
    file_paths = [
        Path(image_file).relative_to(train_faces_dataset_path).as_posix()
        for image_file in utils.image_dataset_from_directory(
            train_faces_dataset_path, shuffle=False, batch_size=1
        ).file_paths
    ]

#### Creating a process table

In [None]:
if not skip:

    # One row per face image; all columns except 'file_path' start empty
    processing = pd.DataFrame(columns=[
        'file_path',
        'similarity_max', 'similar_to', 'similarity', 'duplicated',
        'similarity_median_min', 'similarity_median', 'different',
        'failed',
    ])
    processing['file_path'] = file_paths

#### Creating a results table

In [None]:
if not skip:
    # One row per emotion (used as the index); the counters are filled in by the cleaning step
    train_cleaning = pd.DataFrame(
        index=pd.Index(list(EMOTIONS), name='emotion'),
        columns=[
            'duplicated_number',
            'different_number',
            'failed_number',
            'remain_number',
        ],
    )

#### Loading the training dataset of features from the extracted archive

In [None]:
if not skip:
    # The feature file is named after the selected base model
    file_path = (train_features_dataset_path / base_model_name).with_suffix('.npz')
    with np.load(file_path, allow_pickle=True) as data:
        train_features, train_labels = data['features'], data['labels']

#### Identifying features that are too similar to each other and features that are too different from others

In [None]:
if not skip:

    # Offset of the current emotion's first sample inside train_features / processing
    index = 0
    for emotion in EMOTIONS:
        # Number of face images of this emotion. The feature rows are assumed to be stored
        # emotion by emotion, in the same order as the files are listed — TODO confirm
        emotion_size = len(
            utils.image_dataset_from_directory(
                train_faces_dataset_path / emotion, label_mode=None, shuffle=False, batch_size=1
            ).file_paths
        )
        # Row labels of this emotion in the process table
        indices = list(range(index, index + emotion_size))

        # Get a matrix of feature vectors
        features = train_features[index: index + emotion_size]
        # Form a matrix of similarity of features between themselves
        similarity_matrix = cosine_similarity(features, features)
        # Flat array of similarities of all unordered pairs (strict upper triangle, row-major order)
        similarity = similarity_matrix[np.triu_indices(similarity_matrix.shape[0], k=1)]
        # Upper bound of reliability: pairs more similar than mean + 3 sigma count as duplicates
        similarity_max = np.mean(similarity) + np.std(similarity) * 3
        # Enter the similarity data into the process table
        processing.loc[indices, 'similarity_max'] = similarity_max
        # Pairs (first, second) with first < second that are too similar to each other;
        # masking with the upper triangle skips the diagonal and the mirrored pairs
        similar_pairs = np.argwhere(np.triu(similarity_matrix > similarity_max, k=1))
        for first_pos, second_pos in similar_pairs:
            second_index = indices[second_pos]
            # Keep only the first match found for an image; use pd.notna rather than an
            # identity test against np.nan, which depends on how the missing value is stored
            if pd.notna(processing.at[second_index, 'similar_to']):
                continue
            processing.at[second_index, 'similar_to'] = processing.at[indices[first_pos], 'file_path']
            processing.at[second_index, 'similarity'] = similarity_matrix[first_pos, second_pos]
        # Find the median values of similarity of each feature with other features
        similarity_medians = np.median(similarity_matrix, axis=0)
        # Lower bound of reliability: features whose median similarity is below
        # mean - 3 sigma are considered too different from the rest
        similarity_median_min = np.mean(similarity_medians) - 3 * np.std(similarity_medians)
        # Enter the data on the difference into the process table
        processing.loc[indices, 'similarity_median_min'] = similarity_median_min
        processing.loc[indices, 'similarity_median'] = similarity_medians
        # Mark the images that are near-duplicates of another image,
        # or are too different from the other images of the emotion
        processing.loc[indices, 'duplicated'] = processing.loc[indices, 'similarity'].notna()
        processing.loc[indices, 'different'] = processing.loc[indices, 'similarity_median'] < processing.loc[indices, 'similarity_median_min']
        processing.loc[indices, 'failed'] = processing.loc[indices, 'duplicated'] | processing.loc[indices, 'different']
        # Enter data into the results table
        train_cleaning.loc[emotion, 'duplicated_number'] = sum(processing.loc[indices, 'duplicated'])
        train_cleaning.loc[emotion, 'different_number'] = sum(processing.loc[indices, 'different'])
        train_cleaning.loc[emotion, 'failed_number'] = sum(processing.loc[indices, 'failed'])
        train_cleaning.loc[emotion, 'remain_number'] = emotion_size - train_cleaning.loc[emotion, 'failed_number']
        # First index of the next emotion
        index += emotion_size

    processing = processing.convert_dtypes()

#### Saving features of reliable face images of the training dataset

In [None]:
if not skip:
    # Rows that passed the cleaning (neither duplicated nor too different)
    indices = processing[~processing['failed']].index
    train_clean_features, train_clean_labels = train_features[indices], train_labels[indices]
    # Write arrays of features and labels to a file named after the base model
    feature_path = (train_faces_clean_features_path / base_model_name).with_suffix('.npz')
    np.savez(
        feature_path,
        features=train_clean_features,
        labels=train_clean_labels,
    )

#### Copying files of reliable face images of the training dataset

In [None]:
if not skip:

    # Copy every image that passed the cleaning into the clean dataset folder.
    # The progress bar counts all images, including the rejected ones that are not copied.
    with tqdm(total=len(processing), unit='file') as t:
        # Iterate the two columns directly instead of building a Series per row with iterrows()
        for file_path, failed in zip(processing['file_path'], processing['failed']):
            if not failed:
                source = train_faces_dataset_path / file_path
                dest = train_faces_clean_dataset_path / file_path
                shutil.copyfile(source, dest)
            # Updating the Progress Bar counter
            t.update()

#### Saving the cleaned training dataset of face images and dataset of its features to a Google Drive archive

In [None]:
if not skip:
    # Persist the per-image cleaning protocol of the stage
    pipeline.save_stage_processing(processing)
    # Zip the clean features and the clean face images into the Google Drive project folder
    for clean_path in (train_faces_clean_features_path, train_faces_clean_dataset_path):
        shutil.make_archive(gd_proj_path / clean_path.name, 'zip', clean_path)

#### Saving results to Google Drive

In [None]:
if not skip:

    # Store the per-emotion cleaning summary as the result of this stage
    pipeline.save_stage_result(train_cleaning)

#### Recording the completion of the stage

In [None]:
if not skip:

    # Mark the current pipeline stage as complete
    pipeline.complete_stage()

#### Loading results from Google Drive (done when step is skipped)

In [None]:
# When the stage was skipped but completed earlier, restore its saved result table
if skip and pipeline.is_stage_complete:
    train_cleaning = pipeline.load_stage_result().set_index('emotion')

#### Output of results

In [None]:
# Show the cleaning summary only when the stage has a completed result
if pipeline.is_stage_complete:
    display(train_cleaning)

Unnamed: 0_level_0,duplicated_number,different_number,failed_number,remain_number
emotion,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
anger,816,68,877,6146
contempt,102,35,137,2948
disgust,465,28,493,2662
fear,1349,26,1372,3672
happy,93,61,154,5801
neutral,172,80,250,6545
sad,1155,52,1201,5539
surprise,898,36,933,5390
uncertain,829,36,864,5063


### Pipeline Execution Report

In [None]:
# Show the report of the pipeline (one row per stage, as rendered below)
display(pipeline.report)

Unnamed: 0_level_0,params,platform,start_time,update_time,state
stage,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
train_face_extraction,"{'path': 'train_faces', 'classifier': 'haarcas...",colab,2025-03-31 10:28:56.835055,2025-03-31 10:41:25.374223,complete
test_face_extraction,"{'path': 'test_faces', 'classifier': 'haarcasc...",colab,2025-03-31 10:41:25.406164,2025-03-31 10:42:47.195168,complete
train_face_feature_extraction,"{'path': 'train_features', 'batch_size': 64, '...",colab,2025-03-31 10:42:47.223775,2025-03-31 10:50:29.608719,complete
test_face_feature_extraction,"{'path': 'test_features', 'batch_size': 64, 'b...",colab,2025-03-31 10:50:29.623191,2025-03-31 10:51:19.507252,complete
train_cleaning,"{'features_path': 'train_clean_features', 'dat...",colab,2025-03-31 10:51:19.524240,2025-03-31 10:56:25.587176,complete


## Model creation pipeline

### Create/download pipeline from Google Drive

In [None]:
# The model building pipeline is told whether the previous pipeline
# (still bound to `pipeline` at this point) has been completed
pipeline = Pipeline(
    config=MODEL_BUILDING_PIPELINE,
    proj_path=gd_proj_path,
    is_prev_complete=pipeline.is_complete,
    platform=platform,
)

### Selecting the best model on top

In [None]:
# Move to the next stage and read its state and parameters
pipeline.next_stage()
skip, params = pipeline.is_stage_skipped, pipeline.stage_params

# Working folders of the stage
model_on_top_selection_path = Path(params['path'])
model_on_top_selection_logs_path = model_on_top_selection_path.joinpath('logs')
model_on_top_selection_models_path = model_on_top_selection_path.joinpath('models')
model_on_top_selection_predictions_path = model_on_top_selection_path.joinpath('predictions')

#### Reading training dataset of features from archive on Google Drive

In [None]:
# Unpack the clean feature archive from Google Drive unless the local folder already exists
if not skip and not train_faces_clean_features_path.exists():
    train_faces_clean_features_path.mkdir()
    shutil.unpack_archive(
        filename=gd_proj_path / train_faces_clean_features_path.with_suffix('.zip').name,
        extract_dir=train_faces_clean_features_path,
    )

#### Reading test dataset of features from archive on Google Drive

In [None]:
# Unpack the test feature archive from Google Drive unless the local folder already exists
if not skip and not test_features_dataset_path.exists():
    test_features_dataset_path.mkdir()
    shutil.unpack_archive(
        filename=gd_proj_path / test_features_dataset_path.with_suffix('.zip').name,
        extract_dir=test_features_dataset_path,
    )

#### Reading a test dataset of face images from the archive on Google Drive

In [None]:
# Unpack the test face-image archive from Google Drive unless the local folder already exists
if not skip and not test_faces_dataset_path.exists():
    test_faces_dataset_path.mkdir()
    shutil.unpack_archive(
        filename=gd_proj_path / test_faces_dataset_path.with_suffix('.zip').name,
        extract_dir=test_faces_dataset_path,
    )

#### Getting a list of image files in the test dataset

In [None]:
if not skip:
    # Unshuffled, unlabeled file listing of the test faces folder
    test_face_files = utils.image_dataset_from_directory(
        test_faces_dataset_path, labels=None, shuffle=False, batch_size=1
    ).file_paths
    # Store the paths relative to the dataset root, in POSIX form
    image_paths = [
        Path(test_face_file).relative_to(test_faces_dataset_path).as_posix()
        for test_face_file in test_face_files
    ]

#### Generate a list of possible hyperparameter combinations

In [None]:
if not skip:

    # Using the enumeration method, we obtain a list of possible (dropout rate, dense units) pairs,
    # each describing one fully connected layer
    dropout_rate_dense_units_combs = [(dropout_rate, dense_units) for dense_units in MODEL_ON_TOP_DENSE_UNITS for dropout_rate in MODEL_ON_TOP_DROPOUT_RATES]

    # Get a list of combinations. The list is initialised here so that the cell works on a
    # fresh kernel and does not accumulate duplicates when it is re-run.
    model_on_top_configs = []
    for dense_num in MODEL_ON_TOP_DENSE_NUMS:
        # Layer stacks of depth dense_num: ordered selections of distinct pairs plus
        # unordered selections with repetition; the set union removes the overlap
        model_on_top_configs += set(itertools.permutations(dropout_rate_dense_units_combs, dense_num)).union(set(itertools.combinations_with_replacement(dropout_rate_dense_units_combs, dense_num)))
    model_on_top_configs = sorted(model_on_top_configs)
    # String form of every configuration, used as folder name and table key
    model_on_top_config_strs = [', '.join([str(element) for element in model_on_top_config]) for model_on_top_config in model_on_top_configs]

#### Delete existing logs and model weights

In [None]:
if not skip:
    # Start the stage from an empty working folder
    shutil.rmtree(model_on_top_selection_path, ignore_errors=True)
    model_on_top_selection_path.mkdir()

    # Each artefact kind gets its own folder with one sub-folder per configuration
    stage_subdirs = (
        model_on_top_selection_models_path,
        model_on_top_selection_predictions_path,
        model_on_top_selection_logs_path,
    )
    for stage_subdir in stage_subdirs:
        stage_subdir.mkdir()
    for model_on_top_config_str in model_on_top_config_strs:
        for stage_subdir in stage_subdirs:
            stage_subdir.joinpath(model_on_top_config_str).mkdir()

#### Creating a table of learning results

In [None]:
if not skip:

    # One row per model-on-top configuration; the numeric columns hold training and Kaggle results
    processing = pd.DataFrame(
        columns=[
            'model_on_top_config',
            'best_epoch',
            'loss',
            'metric',
            'submission',
            'public_score',
            'private_score',
            'score',
        ],
        dtype='float'
    )
    # 'submission' receives text (the submission description), so it must not stay a float
    # column: assigning a string into a float column is deprecated in recent pandas
    processing['submission'] = processing['submission'].astype(object)
    processing['model_on_top_config'] = model_on_top_config_strs
    processing.set_index('model_on_top_config', inplace=True)

#### Creating a results table

In [None]:
if not skip:
    # Empty result table keyed by configuration; the best configuration is added after training
    selection_columns = ['model_on_top_config', 'best_score']
    model_on_top_selection = pd.DataFrame(columns=selection_columns, dtype=float)
    model_on_top_selection.set_index('model_on_top_config', inplace=True)

#### Creating training dataset

In [None]:
if not skip:
    # Training dataset built from the clean feature file of the selected base model
    train_features_file = (train_faces_clean_features_path / base_model_name).with_suffix('.npz').as_posix()
    train_dataset = build_feature_dataset(
        train_features_file,
        batch_size=params['batch_size'],
        shuffle=True,
        seed=SEED,
    )

#### Creating test dataset

In [None]:
if not skip:
    # Unlabeled, unshuffled test dataset built from the feature file of the selected base model
    test_features_file = (test_features_dataset_path / base_model_name).with_suffix('.npz').as_posix()
    test_dataset = build_feature_dataset(
        test_features_file,
        batch_size=params['batch_size'],
        shuffle=False,
        labeled=False,
    )

#### Training of models on top

In [None]:
if not skip:

    # Launch TensorBoard (IPython magic) pointed at the log folder of this stage
    %tensorboard --logdir {model_on_top_selection_logs_path.as_posix()}

In [None]:
if not skip:

    # Classification mode (EMOTIONS is a list/tuple of class names) versus
    # regression mode (EMOTIONS maps names to values, see EMOTIONS.values() below)
    if isinstance(EMOTIONS, (list, tuple)):
        loss = 'sparse_categorical_crossentropy'
        metric = 'sparse_categorical_accuracy'
    else:
        loss = 'mean_absolute_error'
        metric = 'mean_absolute_percentage_error'

    kaggle = Kaggle()

    # Iterating through configurations; the bar is advanced manually with t.update()
    with tqdm(model_on_top_configs, unit='config') as t:

        for model_on_top_config, model_on_top_config_str in zip(model_on_top_configs, model_on_top_config_strs):

            t.set_description(model_on_top_config_str)

            # Create the model on top for the current configuration
            model_on_top = build_model_on_top(
                feature_size=feature_size,
                config=model_on_top_config,
                training=True
            )

            # Create the optimizer named in the stage parameters, starting from the initial learning rate
            optimizer = getattr(optimizers, params['optimizer_name'])(learning_rate=params['initial_learning_rate'])

            # Compile the model
            model_on_top.compile(optimizer=optimizer, loss=loss, metrics=[metric])

            # Learning-rate callback configured with the decay rate from the stage parameters
            learning_rate_callback = LearningRateExpDecayScheduler(params['learning_rate_decay_rate'])

            # Stop training when the monitored training metric stops improving
            # and restore the weights of the best epoch
            earlystop_callback = callbacks.EarlyStopping(
                monitor=metric,
                patience=params['patience'],
                restore_best_weights=True
            )

            # Displaying training graphs in TensorBoard
            tensorboard_callback = callbacks.TensorBoard(
                log_dir=model_on_top_selection_logs_path / model_on_top_config_str,
                update_freq="epoch",
            )

            # Update Progress Bar information with the number of the finished epoch
            epoch_end_callback = callbacks.LambdaCallback(
                on_epoch_end=lambda epoch, logs: t.set_description(
                    f"{model_on_top_config_str} [#{epoch+1}]"
                )
            )

            # Train the model on top
            history = model_on_top.fit(
                train_dataset,
                epochs=params['epochs'],
                verbose=VERBOSE,
                callbacks=[learning_rate_callback, earlystop_callback, tensorboard_callback, epoch_end_callback]
            )

            # model_on_top.set_weights(earlystop_callback.best_weights)
            model_on_top.save(model_on_top_selection_models_path / model_on_top_config_str / 'best_model.keras', include_optimizer=False)

            # Record the best epoch and the training loss and metric reached at that epoch
            processing.loc[model_on_top_config_str, 'best_epoch'] = earlystop_callback.best_epoch
            processing.loc[model_on_top_config_str, 'loss'] = history.history['loss'][earlystop_callback.best_epoch]
            processing.loc[model_on_top_config_str, 'metric'] = history.history[metric][earlystop_callback.best_epoch]

            # Scoring the on top model on Kaggle
            predicts = model_on_top.predict(test_dataset, verbose=VERBOSE)
            if isinstance(EMOTIONS, (list, tuple)):
                # Classification: the predicted class is the one with the highest output
                labels = predicts.argmax(axis=1)
            else:
                # Regression: pick the emotion whose reference values are closest to the prediction
                errors = np.apply_along_axis(lambda a: np.linalg.norm(a - np.array(list(EMOTIONS.values())), axis=1), arr=predicts, axis=1)
                labels = errors.argmin(axis=1)

            # Submission description derived from the base model name and the configuration
            submission = f'{base_model_name}_{model_on_top_config_str}'.replace(', ', '_').replace('(', '').replace(')', '')
            file_path = model_on_top_selection_predictions_path / model_on_top_config_str / 'submission.csv'

            # Write the submission file: one predicted emotion name per test image
            df = pd.DataFrame(columns=['image_path', 'emotion'])
            df['image_path'] = image_paths
            df['emotion'] = [list(EMOTIONS)[label] for label in labels]
            df.to_csv(file_path, index=False)

            # Send the submission, wait, then read back its public and private scores
            kaggle.send_submission_files(descriptions=[submission], file_paths=[file_path])
            sleep(10)
            scores = kaggle.receive_submission_scores(descriptions=[submission]).loc[0, ['publicScore', 'privateScore']].convert_dtypes()
            # The selection score is the mean of the public and private scores
            scores['meanScore'] = (scores['publicScore'] + scores['privateScore']) / 2
            processing.loc[model_on_top_config_str, 'submission'] = submission
            processing.loc[model_on_top_config_str, ['public_score', 'private_score', 'score']] = scores.to_list()

            # Updating the Progress Bar
            t.update()

#### Selecting the model on top configuration that provides the best accuracy on test data

In [None]:
if not skip:
    # Epoch numbers were collected in a float column; store them as integers
    processing['best_epoch'] = processing['best_epoch'].astype(int)
    # The winner is the configuration with the highest mean Kaggle score
    score_column = processing['score']
    best_model_on_top, best_score = score_column.idxmax(), score_column.max()
    model_on_top_selection.loc[best_model_on_top] = [best_score]

#### Saving training logs and model weights in an archive on Google Drive

In [None]:
if not skip:
    # Persist the per-configuration protocol of the stage
    pipeline.save_stage_processing(processing.convert_dtypes())
    # Zip logs, models and predictions of the stage into the Google Drive project folder
    shutil.make_archive(
        gd_proj_path / model_on_top_selection_path.name,
        'zip',
        model_on_top_selection_path,
    )

#### Saving learning results to Google Drive

In [None]:
if not skip:

    # Store the selected configuration and its score as the result of this stage
    pipeline.save_stage_result(model_on_top_selection)

#### Recording the completion of the stage

In [None]:
if not skip:

    # Mark the current pipeline stage as complete
    pipeline.complete_stage()

#### Loading learning results from Google Drive (done when skipping a stage)

In [None]:
# When the stage was skipped but completed earlier, restore its saved result table
if skip and pipeline.is_stage_complete:
    model_on_top_selection = pipeline.load_stage_result().set_index('model_on_top_config')

#### Output of results

In [None]:
if pipeline.is_stage_complete:
    # The selected configuration is the index label of the first result row
    model_on_top_config_str = model_on_top_selection.index[0]
    # Parse "(rate, units), (rate, units), ..." back into a list of (float, int) pairs:
    # strip the outer parentheses, split into layers, then split every layer into its two fields
    layer_strs = model_on_top_config_str[1:-1].split('), (')
    model_on_top_config_substrs = [layer_str.split(', ') for layer_str in layer_strs]
    model_on_top_config = [
        (float(dropout_rate_str), int(dense_units_str))
        for dropout_rate_str, dense_units_str in model_on_top_config_substrs
    ]
    display(model_on_top_config_str)

'(0.0, 1024)'

### Fine tuning the model

In [None]:
# Move to the next stage and read its state and parameters
pipeline.next_stage()
skip, params = pipeline.is_stage_skipped, pipeline.stage_params

# Working folders of the stage
model_fine_tuning_path = Path(params['path'])
model_fine_tuning_logs_path = model_fine_tuning_path.joinpath('logs')
model_fine_tuning_models_path = model_fine_tuning_path.joinpath('models')

#### Reading training dataset face images from Google Drive archive

In [None]:
# Unpack the clean face-image archive from Google Drive unless the local folder already exists
if not skip and not train_faces_clean_dataset_path.exists():
    train_faces_clean_dataset_path.mkdir()
    shutil.unpack_archive(
        filename=gd_proj_path / train_faces_clean_dataset_path.with_suffix('.zip').name,
        extract_dir=train_faces_clean_dataset_path,
    )

#### Reading training logs and model weights of the model on top selection from an archive on Google Drive

In [None]:
# Only needed while pipeline.is_stage_started holds and the selection folder is missing locally
if not skip and pipeline.is_stage_started and not model_on_top_selection_path.exists():
    model_on_top_selection_path.mkdir()
    shutil.unpack_archive(
        filename=gd_proj_path / model_on_top_selection_path.with_suffix('.zip').name,
        extract_dir=model_on_top_selection_path,
    )

#### Delete existing logs and model weights (done on first iteration)

In [None]:
if not skip:
    # Delete the folder for logs and model weights on the first pass
    if pipeline.is_stage_started:
        shutil.rmtree(model_fine_tuning_path, ignore_errors=True)

    # Create the stage folder with its logs and models sub-folders when it is missing
    if not model_fine_tuning_path.exists():
        for fine_tuning_dir in (
            model_fine_tuning_path,
            model_fine_tuning_logs_path,
            model_fine_tuning_models_path,
        ):
            fine_tuning_dir.mkdir()

#### Loading logs and model weights from the previous iteration (done during the second and subsequent iterations)

In [None]:
# On the second and subsequent iterations, restore the logs and models of the previous
# passes from the archive on Google Drive
if not skip and not pipeline.is_stage_started:
    shutil.unpack_archive(
        filename=gd_proj_path / model_fine_tuning_path.with_suffix('.zip').name,
        extract_dir=model_fine_tuning_path,
    )

#### Creating a table of learning results

In [None]:
if not skip:
    # Single-row result table keyed by (base model, model-on-top configuration)
    fine_tuning_key_columns = ['base_model_name', 'model_on_top_config']
    fine_tuning_value_columns = ['best_epoch', 'best_loss', 'best_metric']
    model_fine_tuning = pd.DataFrame(
        columns=fine_tuning_key_columns + fine_tuning_value_columns,
        dtype=np.int32,
    )
    model_fine_tuning.loc[0, 'base_model_name'] = base_model_name
    model_fine_tuning.loc[0, 'model_on_top_config'] = model_on_top_config_str
    model_fine_tuning.set_index(fine_tuning_key_columns, inplace=True)

#### Initializing iteration

In [None]:
if not skip:

    if pipeline.is_stage_started:
        # First iteration: start from epoch 0 with the configured learning rate
        initial_epoch = 0
        initial_learning_rate = params['initial_learning_rate']
    else:
        # Second or subsequent iteration
        # Restore the training state from the protocol saved by the previous iterations
        processing = pipeline.load_stage_processing().set_index('epoch')
        last_epoch = processing.index[-1]
        initial_epoch = last_epoch + 1
        initial_learning_rate = processing.loc[last_epoch, 'learning_rate']
        # The best epoch depends on the direction of the metric: accuracy (classification mode)
        # is maximised, while the percentage error (regression mode) is minimised
        if isinstance(EMOTIONS, (list, tuple)):
            best_epoch = processing['metric'].idxmax()
        else:
            best_epoch = processing['metric'].idxmin()
        best_loss = processing.loc[best_epoch, 'loss']
        best_metric = processing.loc[best_epoch, 'metric']
        best_weights = models.load_model(model_fine_tuning_models_path / 'best_model.keras', safe_mode=False).get_weights()
        # Number of epochs already passed since the best one (early-stopping counter)
        wait = last_epoch - best_epoch

    # Calculate the last epoch number of this run, capped by the total number of epochs
    final_epoch = min(initial_epoch + params['epochs_per_run'], params['epochs'])

#### Creating training dataset

In [None]:
if not skip:
    # In Valence-Arousal mode (EMOTIONS is not a list/tuple) every image is labeled with the
    # EMOTIONS value of its parent folder; otherwise the labels are inferred from the folders
    if not isinstance(EMOTIONS, (list, tuple)):
        file_paths = utils.image_dataset_from_directory(
            train_faces_clean_dataset_path, shuffle=False
        ).file_paths
        labels = [EMOTIONS[Path(file_path).parent.name] for file_path in file_paths]
    else:
        labels = 'inferred'

    # Create a training dataset and enable prefetching of the next batches
    train_dataset = utils.image_dataset_from_directory(
        train_faces_clean_dataset_path,
        batch_size=params['batch_size'],
        labels=labels,
        image_size=(image_size, image_size),
        shuffle=True,
        seed=SEED,
        pad_to_aspect_ratio=True,
        verbose=VERBOSE,
    ).prefetch(buffer_size=params['buffer_size'])

#### Create a model for fine-tuning (done in the first iteration)

In [None]:
if not skip:

    # The model is assembled only on the first iteration; later iterations load a saved model
    if initial_epoch == 0:
        # Create the selected base model trained on the ImageNet dataset
        base_model = build_base_model(
            base_model_name,
            weights='imagenet',
            image_size=image_size,
            pooling=BASE_MODEL_POOLINGS,
            include_preprocess_input=True,
            training=True
        )
        base_model.trainable=False
        # Load the selected model on top (saved by the model on top selection stage)
        model_on_top = models.load_model(
            model_on_top_selection_models_path / model_on_top_config_str / 'best_model.keras',
            safe_mode=False
        )
        # Create an augmentation model configured from the stage parameters
        augment_model = build_augment_model(
            image_size=image_size,
            flip=params['flip'],
            rotation_factor=params['rotation_factor'],
            zoom_factor=params['zoom_factor'],
            contrast_factor=params['contrast_factor'],
            brightness_factor=params['brightness_factor'],
            training=True,
        )
        # We combine all the models created above into one
        model = build_model(augment_model=augment_model, base_model=base_model, model_on_top=model_on_top)
        # NOTE(review): toggling trainable off and on for the whole model presumably makes
        # every layer trainable again, including the base model frozen above — confirm intent
        model.trainable = False
        model.trainable = True

#### Load the latest fine-tuned model from the previous iteration (done on the second and subsequent iterations)

In [None]:
# Continue from the model saved at the end of the previous iteration
if not skip and initial_epoch > 0:
    model = models.load_model(
        model_fine_tuning_models_path / 'last_model.keras',
        safe_mode=False,
    )

#### Compiling the model for fine-tuning

In [None]:
if not skip:
    # Loss, metric and metric direction: classification when EMOTIONS is a list/tuple,
    # regression otherwise
    loss, metric, mode = (
        ('sparse_categorical_crossentropy', 'sparse_categorical_accuracy', 'max')
        if isinstance(EMOTIONS, (list, tuple))
        else ('mean_absolute_error', 'mean_absolute_percentage_error', 'min')
    )

    # A model is compiled only on the first iteration
    if initial_epoch == 0:
        # Optimizer class is looked up by the name given in the stage parameters
        optimizer_class = getattr(optimizers, params['optimizer_name'])
        optimizer = optimizer_class(learning_rate=initial_learning_rate)
        model.compile(optimizer=optimizer, loss=loss, metrics=[metric])
        model.summary()

#### Fine tuning the model

In [None]:
if not skip:

    # Launch TensorBoard (IPython magic) pointed at the log folder of this stage
    %tensorboard --logdir {model_fine_tuning_logs_path.as_posix()}

In [None]:
if not skip:
    # Learning-rate callback configured with the decay rate from the stage parameters
    learning_rate_callback = LearningRateExpDecayScheduler(params['learning_rate_decay_rate'])

    # Early stopping on the training metric; on the second and subsequent iterations
    # it is seeded with the best state restored from the previous iterations
    earlystop_kwargs = dict(
        model=model,
        metric=metric,
        mode=mode,
        patience=params['patience'],
    )
    if initial_epoch != 0:
        earlystop_kwargs.update(
            best_epoch=best_epoch,
            best_loss=best_loss,
            best_metric=best_metric,
            best_weights=best_weights,
            wait=wait,
        )
    earlystop_callback = EarlyStoppingAtBestMetric(**earlystop_kwargs)

    # Displaying training graphs in TensorBoard
    tensorboard_callback = callbacks.TensorBoard(
        log_dir=model_fine_tuning_logs_path,
        write_graph=True,
        update_freq="epoch",
    )

    # Progress bar update at the end of every epoch
    def epoch_end(epoch, logs):
        t.set_description(f"#{epoch+1}")
        t.update()

    epoch_end_callback = callbacks.LambdaCallback(on_epoch_end=epoch_end)
    fit_callbacks = [learning_rate_callback, earlystop_callback, tensorboard_callback, epoch_end_callback]

    # Train the model for this run: epochs initial_epoch .. final_epoch - 1
    with tqdm(range(params['epochs']), initial=initial_epoch, desc='', unit='epoch') as t:
        history = model.fit(
            train_dataset,
            initial_epoch=initial_epoch,
            epochs=final_epoch,
            verbose=VERBOSE,
            callbacks=fit_callbacks,
        )

        # Save the model as it is after the last epoch, to continue from in the next iteration
        model.save(model_fine_tuning_models_path / 'last_model.keras')

        # Then switch to the best weights seen so far and save them as the best model
        model.set_weights(earlystop_callback.best_weights)
        model.save(model_fine_tuning_models_path / 'best_model.keras')

#### Saving detailed information about the learning process

In [None]:
if not skip:
    # History of this run indexed by epoch number, with the metric column under a generic name
    history_df = pd.DataFrame(
        history.history,
        index=pd.Index(history.epoch, name='epoch'),
    ).rename(columns={metric: 'metric'})
    # The first run starts the protocol, later runs are appended to it
    processing = history_df if initial_epoch == 0 else pd.concat([processing, history_df], axis=0)
    pipeline.save_stage_processing(processing)
    # Zip the logs and models of the stage into the Google Drive project folder
    shutil.make_archive(
        gd_proj_path / model_fine_tuning_path.name,
        'zip',
        model_fine_tuning_path,
    )

#### Saving results

In [None]:
if not skip:
    # Results are saved once training is over: all epochs are done or early stopping fired
    if (final_epoch == params['epochs']) or model.stop_training:
        fine_tuning_key = (base_model_name, model_on_top_config_str)
        model_fine_tuning.loc[fine_tuning_key, 'best_epoch'] = earlystop_callback.best_epoch
        model_fine_tuning['best_epoch'] = model_fine_tuning['best_epoch'].astype(int)
        model_fine_tuning.loc[fine_tuning_key, 'best_loss'] = earlystop_callback.best_loss
        model_fine_tuning.loc[fine_tuning_key, 'best_metric'] = earlystop_callback.best_metric
        pipeline.save_stage_result(model_fine_tuning)

        # Save the final model: every layer except the first one, with training disabled
        final_model = models.Sequential(model.layers[1:])
        final_model.trainable = False
        final_model.save(
            gd_proj_path / 'final_model.keras',
            include_optimizer=False,
        )

#### Committing completion of iteration/stage

In [None]:
if not skip:
    if (final_epoch != params['epochs']) and not model.stop_training:
        # Another iteration completed, more epochs remain
        pipeline.complete_stage_run()
    else:
        # Training completed: all epochs are done or early stopping fired
        pipeline.complete_stage()

#### Loading results from Google Drive (done when step is skipped)

In [None]:
# When the stage was skipped but completed earlier, restore its saved result table
if skip and pipeline.is_stage_complete:
    model_fine_tuning = pipeline.load_stage_result().set_index(
        ['base_model_name', 'model_on_top_config']
    )

#### Output of results

In [None]:
# Show the fine-tuning summary only when the stage has a completed result
if pipeline.is_stage_complete:
    display(model_fine_tuning)

Unnamed: 0_level_0,Unnamed: 1_level_0,best_epoch,best_loss,best_metric
base_model_name,model_on_top_config,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
EfficientNetV2B0,"(0.0, 1024)",46,1.134255,0.591624


### Testing the model's operation

In [None]:
# Move to the next stage and read its state and parameters
pipeline.next_stage()
skip = pipeline.is_stage_skipped
params = pipeline.stage_params
# Folder for the predictions of this stage
test_prediction_path = Path(params['path'])

#### Deleting existing prediction

In [None]:
if not skip:

    # Recreate the prediction folder; a missing folder is not an error (ignore_errors=True)
    shutil.rmtree(test_prediction_path, ignore_errors=True)
    test_prediction_path.mkdir()

In [None]:
if not skip:
    test_dataset_path = Path(TEST_DATASET_PATH)
    # Download and extract the original test dataset only when it is not present locally
    if not test_dataset_path.exists():
        test_archive_path = Path(f'temp.{TEST_DATASET_EXT}')
        gdown.cached_download(
            url=TEST_DATASET_URL,
            path=str(test_archive_path),
            postprocess=gdown.extractall,
            fuzzy=True,
        )
        # The archive is not needed once it has been extracted
        test_archive_path.unlink()

#### Reading a test dataset of face images from the archive on Google Drive

In [None]:
# Unpack the test face-image archive from Google Drive unless the local folder already exists
if not skip and not test_faces_dataset_path.exists():
    test_faces_dataset_path.mkdir()
    shutil.unpack_archive(
        filename=gd_proj_path / test_faces_dataset_path.with_suffix('.zip').name,
        extract_dir=test_faces_dataset_path,
    )

#### Getting a list of image files in the test dataset

In [None]:
if not skip:
    # Unshuffled, unlabeled file listing of the test faces folder
    test_face_files = utils.image_dataset_from_directory(
        test_faces_dataset_path, label_mode=None, shuffle=False, batch_size=1
    ).file_paths
    # Store the paths relative to the dataset root, in POSIX form
    file_paths = [
        Path(test_face_file).relative_to(test_faces_dataset_path).as_posix()
        for test_face_file in test_face_files
    ]

Found 5000 files.


#### Creating a process table

In [None]:
if not skip:
    # The prediction columns depend on the mode: class probabilities in classification mode
    # (EMOTIONS is a list/tuple), valence/arousal values and errors otherwise
    if isinstance(EMOTIONS, (list, tuple)):
        processing_columns = ['image_path', 'probabilities', 'label', 'emotion']
    else:
        processing_columns = ['image_path', 'valence', 'arousal', 'errors', 'label', 'emotion']

    # One row per test image, keyed by its relative path
    processing = pd.DataFrame(columns=processing_columns)
    processing['image_path'] = file_paths
    processing.set_index('image_path', inplace=True)

#### Creating a results table

In [None]:
if not skip:
    # Empty result table keyed by the submission description
    model_test_columns = ['submission', 'public_score', 'private_score', 'score']
    model_test = pd.DataFrame(columns=model_test_columns)
    model_test.set_index('submission', inplace=True)

#### Loading the final model

In [None]:
if not skip:

    # Load the final model from 'final_model.keras' inside gd_proj_path.
    # compile=False: the model is only used for inference/export in this stage.
    # NOTE(review): safe_mode=False presumably relaxes Keras' safe-deserialization
    # checks; acceptable only because the file is produced by this project — confirm.
    model = models.load_model(gd_proj_path / 'final_model.keras', compile=False, safe_mode=False)

  saveable.load_own_variables(weights_store.get(inner_path))


#### Exporting the final model for deployment

In [None]:
if not skip:

    # Export the loaded model to the 'final_model' folder inside gd_proj_path.
    # The recorded output shows a 'serve' endpoint taking a float32 tensor of
    # shape (None, 224, 224, 3) and returning a float32 tensor of shape (None, 9).
    model.export(gd_proj_path / 'final_model')

Saved artifact at '/content/drive/MyDrive/skillbox-computer-vision-project/final_model'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name='input_layer')
Output Type:
  TensorSpec(shape=(None, 9), dtype=tf.float32, name=None)
Captures:
  139320894714320: TensorSpec(shape=(1, 1, 1, 3), dtype=tf.float32, name=None)
  139320894716048: TensorSpec(shape=(1, 1, 1, 3), dtype=tf.float32, name=None)
  139320894714128: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139320894711632: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139320894715472: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139320894715664: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139320894709328: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139320894713552: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139320894710288: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139320894709136:

#### Creating test dataset

In [None]:
if not skip:

    # Unlabeled, unshuffled dataset of test face images resized to the model
    # input size; prefetching prepares the next batches while the current one
    # is being processed
    test_dataset = utils.image_dataset_from_directory(
        test_faces_dataset_path,
        label_mode=None,
        shuffle=False,
        batch_size=params['batch_size'],
        image_size=(image_size, image_size),
        verbose=VERBOSE,
    ).prefetch(buffer_size=params['buffer_size'])

Found 5000 files.


#### Getting test predictions

In [None]:
if not skip:

    # Run the final model over the whole test dataset
    predicts = model.predict(test_dataset, verbose=VERBOSE)

    # Emotion names in label order, built once instead of once per prediction
    emotion_names = list(EMOTIONS)

    if isinstance(EMOTIONS, (list, tuple)):
        # Classification output: the label is the most probable class
        labels = predicts.argmax(axis=1)
        processing['probabilities'] = predicts.tolist()
    else:
        # Regression output: EMOTIONS maps each emotion to a reference point;
        # the label is the emotion whose point is closest (Euclidean distance)
        # to the prediction.
        # NOTE(review): assumes predictions are (valence, arousal) pairs, as the
        # column names below suggest — confirm against the model definition.
        reference_points = np.array(list(EMOTIONS.values()))
        # Broadcasting yields the full (n_samples, n_emotions) distance matrix
        # in one vectorized call instead of a Python lambda per row
        errors = np.linalg.norm(
            predicts[:, np.newaxis, :] - reference_points[np.newaxis, :, :],
            axis=2,
        )
        labels = errors.argmin(axis=1)
        processing[['valence', 'arousal']] = predicts.tolist()
        processing['errors'] = errors.tolist()

    processing['label'] = labels.tolist()
    processing['emotion'] = [emotion_names[label] for label in labels]

[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 107ms/step


#### Getting score from Kaggle

In [None]:
if not skip:

    # Write the predicted emotion per image (indexed by image path) as the submission file
    file_path = test_prediction_path / 'submission.csv'
    emotion_predictions = processing[['emotion']]
    emotion_predictions.to_csv(file_path, index=True)

    kaggle = Kaggle()

    # Submission description: model identifiers with ', ' turned into '_' and parentheses dropped
    submission = f'test_{base_model_name}_{model_on_top_config_str}'
    submission = submission.replace(', ', '_')
    submission = submission.replace('(', '')
    submission = submission.replace(')', '')

    kaggle.send_submission_files(descriptions=[submission], file_paths=[file_path])

    # Give the platform some time to score the submission before asking for the result
    sleep(10)

    received_scores = kaggle.receive_submission_scores(descriptions=[submission])
    scores = received_scores.loc[0, ['publicScore', 'privateScore']].convert_dtypes()
    # Overall score is the mean of the public and private scores
    scores['meanScore'] = (scores['publicScore'] + scores['privateScore']) / 2

Sended file model_test/submission.csv of submission test_EfficientNetV2B0_0.0_1024 to competition skillbox-computer-vision-project.
Received scores of submissions test_EfficientNetV2B0_0.0_1024 from competition skillbox-computer-vision-project.
         fileName                    date                     description  \
0  submission.csv 2025-04-06 15:19:59.697  test_EfficientNetV2B0_0.0_1024   

                      status  publicScore  privateScore  
0  SubmissionStatus.COMPLETE       0.5092          0.52  


#### Saving predictions in an archive on Google Drive

In [None]:
if not skip:

    # Persist the processing table through the pipeline helper
    pipeline.save_stage_processing(processing)
    # Pack the prediction folder into a zip archive of the same name inside gd_proj_path
    shutil.make_archive(
        base_name=gd_proj_path / test_prediction_path.name,
        format='zip',
        root_dir=test_prediction_path,
    )

#### Saving test results to Google Drive

In [None]:
if not skip:

    # Add a row keyed by the submission description. `scores` holds
    # publicScore, privateScore and meanScore in that order, which fills the
    # public_score, private_score and score columns positionally.
    model_test.loc[submission] = scores.to_list()
    # Persist the results table through the pipeline helper
    pipeline.save_stage_result(model_test)

#### Recording the completion of the stage

In [None]:
if not skip:

    # Mark the current stage as finished.
    # NOTE(review): presumably this sets the 'complete' state shown in the
    # pipeline report — confirm against the pipeline helper.
    pipeline.complete_stage()

#### Loading test results from Google Drive (done when the stage is skipped)

In [None]:
if skip and pipeline.is_stage_complete:
    # The stage was skipped but finished in an earlier run: restore its saved
    # results table and index it by submission description
    saved_stage_result = pipeline.load_stage_result()
    model_test = saved_stage_result.set_index('submission')

#### Output of results

In [None]:
# Show the results table only when the stage has been completed (in this run or an earlier one)
if pipeline.is_stage_complete:

    display(model_test)

Unnamed: 0_level_0,public_score,private_score,score
submission,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
test_EfficientNetV2B0_0.0_1024,0.5092,0.52,0.5146


### Pipeline Execution Report

In [None]:
# Show the per-stage execution report (params, platform, start/update time, state)
display(pipeline.report)

Unnamed: 0_level_0,params,platform,start_time,update_time,state
stage,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
model_on_top_selection,"{'path': 'model_on_top_selection', 'batch_size...",colab,2025-03-31 12:03:24.512772,2025-03-31 15:10:55.404819,complete
model_fine_tuning,"{'path': 'model_fine_tuning', 'flip': 'horizon...",colab,2025-03-31 15:50:49.324256,2025-04-01 23:50:51.941296,complete
model_test,"{'path': 'model_test', 'batch_size': 32, 'buff...",colab,2025-04-06 15:18:54.527570,2025-04-06 15:20:14.312456,complete
