<a name="Table-of-Contents"></a>
# Table of Contents
1. [Introduction](#Introduction)
    1. [Outcomes](#Outcomes)
    1. [Performance](#Performance)
    1. [Network Architecture](#Network-Architecture)
    1. [Setup](#Setup)
    1. [Public Datasets Supported](#Public-Datasets-Supported)
    1. [System Info](#System-Info)
1. [Settings](#Settings)
1. [Datasets Importing](#Datasets-Importing)
    1. [Dataset Organization Utils](#Dataset-Organization-Utils)
    1. [Generate Datasets CSV Metadata](#Generate-Datasets-CSV-Metadata)
1. [Define Skin Detector](#Define-Skin-Detector)
    1. [Architecture](#Architecture)
    1. [Model Settings](#Model-Settings)
    1. [Model Functions](#Model-Functions)
1. [Use Skin Detector](#Use-Skin-Detector)
    1. [Train](#Train)
    1. [Test](#Test)
    1. [Predict](#Predict)

<a name="Introduction"></a>
# Introduction
[[Return to ToC]](#Table-of-Contents)

This notebook detects human skin in images using Skinny, a lightweight U-Net.  

#### Original Paper
T. Tarasiewicz, J. Nalepa, and M. Kawulok. “Skinny: A Lightweight U-net for Skin Detection and Segmentation”. In: 2020 IEEE International Conference on Image Processing (ICIP). IEEE. 2020, pp. 2386–2390. https://doi.org/10.1109/ICIP40778.2020.9191209.

#### Credits
Credits to the authors of the original version: 
https://github.com/ttarasiewicz/Skinny


<a name="Outcomes"></a>
### Outcomes
[[Return to ToC]](#Table-of-Contents)

![Outcomes](docs/outcomes.png "Outcomes")
> Significant outcomes: (a) the input image; (b) the ground truth; (c) Skinny's binarized prediction.  
Predictions have different dimensions from the other images because of the network preprocessing.  
Input images come from the ECU, HGR, and Schmugge datasets.  
Different trained models were used to produce the predictions.


These outcomes are significant rather than representative (for the overall skin detector performance, see [Performance](#Performance)):
they show how well the skin detector performs given the right training dataset,
but also its limitations.  
Skin detection is a challenging task because of materials with skin-like colors (wood, copper, leather, clay),
conditions that alter the appearance of an image (lighting, camera color science, motion blur),
and the wide range of tones that human skin can have.  

The first, third, and last three rows illustrate some of the challenges described above.  
The U-Net can extract features from images, so it can detect that there are no skin pixels
in the fifth row, even though that image may contain pixels with skin-like colors. 

An in-depth analysis of outcomes can be seen in the thesis.


<a name="Performance"></a>
### Performance
[[Return to ToC]](#Table-of-Contents)

Apart from the validation process, which follows the methodology of the original paper, the metrics are calculated as follows.  
First, each metric is measured on every instance; then its average and population standard
deviation over all instances are computed.

Again, an in-depth analysis of performance can be seen in the thesis.
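
The aggregation described above can be sketched in a few lines. This is a minimal illustration, not the notebook's evaluation code: the helper names (`confusion`, `f1_score`, `iou`, `summarize`) and the toy masks are hypothetical, and scoring an empty prediction on an empty ground truth as 0 is an assumption. D<sub>prs</sub> is left out of the sketch.

```python
from statistics import mean, pstdev

def confusion(pred, truth):
    """Count true positives, false positives and false negatives
    for two flat binary masks (1 = skin, 0 = not skin)."""
    tp = sum(1 for p, t in zip(pred, truth) if p and t)
    fp = sum(1 for p, t in zip(pred, truth) if p and not t)
    fn = sum(1 for p, t in zip(pred, truth) if not p and t)
    return tp, fp, fn

def f1_score(pred, truth):
    tp, fp, fn = confusion(pred, truth)
    denom = 2 * tp + fp + fn
    # Assumption: when both masks are empty the score is set to 0
    return 2 * tp / denom if denom else 0.0

def iou(pred, truth):
    tp, fp, fn = confusion(pred, truth)
    denom = tp + fp + fn
    # Assumption: when both masks are empty the score is set to 0
    return tp / denom if denom else 0.0

def summarize(scores):
    """Average and population standard deviation of per-instance scores."""
    return mean(scores), pstdev(scores)

# Two toy instances: (prediction, ground truth)
instances = [
    ([1, 1, 0, 0], [1, 0, 0, 0]),
    ([1, 1, 1, 0], [1, 1, 1, 0]),
]

f1_mean, f1_std = summarize([f1_score(p, t) for p, t in instances])
iou_mean, iou_std = summarize([iou(p, t) for p, t in instances])
print(f'F1  {f1_mean:.4f} ± {f1_std:.2f}')
print(f'IoU {iou_mean:.4f} ± {iou_std:.2f}')
```

The metric is computed once per image and only then averaged, so a small image weighs as much as a large one in the reported mean.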

#### Validation
Before the evaluation on the chosen datasets, the skin detector was validated on the dataset splits used in its original paper, to check that the implementation works properly. The original paper calculates the F<sub>1</sub>-Score directly as the average score over the whole set of instances.

|             | HGR<sup>1</sup> F<sub>1</sub>-Score | ECU<sup>2</sup> F<sub>1</sub>-Score |
| ---:            | :---:              | :---:  |
| Original        | 0.9494             | 0.9230 |
| Implementation  | 0.9308             | 0.9133 |
| | | |
| Change          | 0.0186             | 0.0097 |
> <sup>1</sup>HGR consists of: HGR1, HGR2A-downscaled, HGR2B-downscaled.  
<sup>2</sup>ECU was split according to the original work of the method.  
The model was trained on the ECU splits; HGR has not been used for training.  
The testing was performed on the test set of ECU and the entirety of HGR.

#### Performance on single databases
For each dataset: the skin detector is trained on the training set, and then predictions are performed on the test set.  
For example, with ECU as the dataset, it means that the skin detector is trained using the training set of ECU, and then tested on the test set of ECU.

|             | ECU | HGR | Schmugge |
| ---:            | :---:              | :---:  | :---: |
| F<sub>1</sub> ↑       | 0.9133 ± 0.08 | 0.9848 ± 0.02 | 0.6121 ± 0.45 |
| IoU ↑                 | 0.8489 ± 0.12 | 0.9705 ± 0.03 | 0.5850 ± 0.44 |
| D<sub>prs</sub> ↓     | 0.1333 ± 0.12 | 0.0251 ± 0.03 | 0.5520 ± 0.64 |

#### Performance across databases
For each dataset: the skin detector is trained on the training set, and then predictions are performed on all the images of every other dataset.  
For example, with ECU as the training dataset and HGR as the testing dataset,
the skin detector is trained on the training set of ECU and then tested on the entire HGR
dataset.  
The expression “HGR on ECU” (written `HGR_on_ECU` in the table)
describes the situation in which the evaluation is performed by using HGR as the training set and ECU as the test set.
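
As a small illustration of this naming convention, the ordered (training, testing) pairs can be enumerated as follows. This is only a sketch of the labelling scheme; the variable names are hypothetical and not taken from the notebook.

```python
from itertools import permutations

datasets = ['ECU', 'HGR', 'Schmugge']

# Every ordered pair of distinct datasets: (training dataset, testing dataset)
pairs = list(permutations(datasets, 2))

# "X_on_Y" means: trained on the training set of X, tested on all of Y
labels = [f'{train}_on_{test}' for train, test in pairs]
print(labels)
```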

|             | ECU_on_HGR | ECU_on_Schmugge | HGR_on_ECU | HGR_on_Schmugge | Schmugge_on_ECU | Schmugge_on_HGR |
| ---: | :---: | :---: | :---: | :---: | :---: | :---: |
| F<sub>1</sub> ↑       | 0.9308 ± 0.11 | 0.4625 ± 0.41 | 0.7252 ± 0.20 | 0.2918 ± 0.31 | 0.6133 ± 0.21 | 0.8106 ± 0.19 |
| IoU ↑                 | 0.8851 ± 0.15 | 0.3986 ± 0.37 | 0.6038 ± 0.22 | 0.2168 ± 0.25 | 0.4754 ± 0.22 | 0.7191 ± 0.23 |
| D<sub>prs</sub> ↓     | 0.1098 ± 0.15 | 0.7570 ± 0.56 | 0.3913 ± 0.26 | 0.9695 ± 0.44 | 0.5537 ± 0.27 | 0.2846 ± 0.27 |
| F<sub>1</sub> - IoU ↓ | 0.0457 | 0.0639 | 0.1214 | 0.0750 | 0.1379 | 0.0915 |

#### Performance on single skin tones
The methodology is the same as in 'Performance on single databases', but the skin tone datasets are used instead.

|             | DARK | MEDIUM | LIGHT |
| ---:            | :---:              | :---:  | :---: |
| F<sub>1</sub> ↑       | 0.9529 ± 0.00 | 0.9260 ± 0.15 | 0.9387 ± 0.12 |
| IoU ↑                 | 0.9100 ± 0.01 | 0.8883 ± 0.18 | 0.9006 ± 0.14 |
| D<sub>prs</sub> ↓     | 0.0720 ± 0.01 | 0.1078 ± 0.21 | 0.0926 ± 0.15 |

#### Performance across skin tones
The methodology is the same as in 'Performance across databases', but the skin tone datasets are used instead.

|             | DARK_on_MEDIUM | DARK_on_LIGHT | MEDIUM_on_DARK | MEDIUM_on_LIGHT | LIGHT_on_DARK | LIGHT_on_MEDIUM |
| ---: | :---: | :---: | :---: | :---: | :---: | :---: |
| F<sub>1</sub> ↑       | 0.7300 ± 0.25 | 0.7262 ± 0.26 | 0.8447 ± 0.13 | 0.8904 ± 0.14 | 0.7660 ± 0.17 | 0.9229 ± 0.11 |
| IoU ↑                 | 0.6279 ± 0.27 | 0.6276 ± 0.28 | 0.7486 ± 0.15 | 0.8214 ± 0.16 | 0.6496 ± 0.21 | 0.8705 ± 0.13 |
| D<sub>prs</sub> ↓     | 0.3805 ± 0.33 | 0.3934 ± 0.34 | 0.2326 ± 0.17 | 0.1692 ± 0.18 | 0.3402 ± 0.21 | 0.1192 ± 0.16 |
| F<sub>1</sub> - IoU ↓ | 0.1021 | 0.0986 | 0.0961 | 0.0690 | 0.1164 | 0.0524 |


<a name="Network-Architecture"></a>
### Network Architecture
[[Return to ToC]](#Table-of-Contents)


![Skinny Architecture](docs/skinny_architecture.png "Skinny Architecture")
> The architecture of Skinny. Adapted from the original paper (Tarasiewicz et al., 2020).

The Skinny network consists of a modified U-Net incorporating dense
blocks and inception modules to benefit from a wider spatial context.  
An additional deep level is appended to the original U-Net model,
to better capture large-scale contextual features in the deepest
part of the network.  
The features extracted in the contracting path propagate to the
corresponding expansive levels through the dense blocks.  
The original U-Net convolutional layers are replaced with
the inception modules: before each max-pooling layer, in the contracting path,
and after concatenating features, in the expanding path.  
Thanks to these architectural choices, Skinny benefits from a wider pixel context.


<a name="Setup"></a>
### Setup
[[Return to ToC]](#Table-of-Contents)

Things to Note:  
The notebook has only been used in Google Colab, so it may need some modifications to run locally.  
Pre-defined splits can be found in the folders of provided models.

1. Zip datasets without adding any intermediate folder and upload them into Google Drive
2. Upload any pre-trained model into Google Drive
3. Change paths of models and datasets in the [Settings](#Settings) section  
4. Change paths of pre-defined splits in the [Generate Datasets CSV Metadata](#Generate-Datasets-CSV-Metadata) section 
5. To use the skin detector, run all the code up to the [Use Skin Detector](#Use-Skin-Detector) section, then adjust the final settings, which are the first few lines of each skin detector function  


<a name="Public-Datasets-Supported"></a>
### Public Datasets Supported
[[Return to ToC]](#Table-of-Contents)

[ecu]: https://documents.uow.edu.au/~phung/download.html "ECU download page"
[hgr]: http://sun.aei.polsl.pl/~mkawulok/gestures/ "HGR download page"
[schmugge]: https://www.researchgate.net/publication/257620282_skin_image_Data_set_with_ground_truth "Schmugge download page"
[pratheepan]: http://cs-chan.com/downloads_skin_dataset.html "Pratheepan download page"
[abd]: https://github.com/MRE-Lab-UMD/abd-skin-segmentation "abd-skin download page"
[vpu]: http://www-vpu.eps.uam.es/publications/SkinDetDM/#dataset "VPU download page"
[uchile]: http://web.archive.org/web/20070707151628/http://agami.die.uchile.cl/skindiff/ "UChile download page"

| Name            |  Description                                               | Download |
| ---:            | :---:                                                      | :---: |
| ECU [1]         | 3998 pictures, mostly face and half-body shots             | [Download (ask the authors)][ecu] |
| HGR [2]         | 1558 hand gesture images                                   | [Download][hgr] |
| Schmugge [3]    | 845 images, mostly face shots                              | [Download][schmugge] |
| Pratheepan [4]  | 78 pictures randomly sampled from the web                  | [Download][pratheepan] |
| abd [5]         | 1400 abdominal pictures                                    | [Download][abd] |
| VPU [6]         | 285 human activity recognition images                      | [Download][vpu] |
| UChile [7]      | 101 images obtained from the web and digitized news videos | [Download][uchile] |


| Ref   | Publication |
| :---  | :--- |
| 1     | Phung, S., Bouzerdoum, A., & Chai, D. (2005). Skin segmentation using color pixel classification: analysis and comparison. IEEE Transactions on Pattern Analysis and Machine Intelligence, 27(1), 148-154. https://doi.org/10.1109/tpami.2005.17  |
| 2 | Kawulok, M., Kawulok, J., Nalepa, J., & Smolka, B. (2014). Self-adaptive algorithm for segmenting skin regions. EURASIP Journal on Advances in Signal Processing, 2014(1). https://doi.org/10.1186/1687-6180-2014-170 |
| 3 | Schmugge, S. J., Jayaram, S., Shin, M. C., & Tsap, L. V. (2007). Objective evaluation of approaches of skin detection using ROC analysis. Computer Vision and Image Understanding, 108(1-2), 41-51. https://doi.org/10.1016/j.cviu.2006.10.009 |
| 4 | Tan, W. R., Chan, C. S., Yogarajah, P., & Condell, J. (2012). A Fusion Approach for Efficient Human Skin Detection. IEEE Transactions on Industrial Informatics, 8(1), 138-147. https://doi.org/10.1109/tii.2011.2172451 |
| 5 | Topiwala, A., Al-Zogbi, L., Fleiter, T., & Krieger, A. (2019). Adaptation and Evaluation of Deep Learning Techniques for Skin Segmentation on Novel Abdominal Dataset. 2019 IEEE 19th International Conference on Bioinformatics and Bioengineering (BIBE). https://doi.org/10.1109/bibe.2019.00141 |
| 6 | SanMiguel, J. C., & Suja, S. (2013). Skin detection by dual maximization of detectors agreement for video monitoring. Pattern Recognition Letters, 34(16), 2102-2109. https://doi.org/10.1016/j.patrec.2013.07.016 |
| 7 | J. Ruiz-del-Solar and R. Verschae. “SKINDIFF-Robust and fast skin segmentation”. Department of Electrical Engineering, Universidad de Chile, 2006. |


<a name="System-Info"></a>
### System Info
[[Return to ToC]](#Table-of-Contents)

Check the Python, TensorFlow, and CUDA versions

In [None]:
!python --version
!python3 -c 'import tensorflow as tf; print(tf.__version__)'
!nvcc -V

Python 3.7.10
2021-05-23 19:18:24.481383: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2.4.1
nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2020 NVIDIA Corporation
Built on Wed_Jul_22_19:09:09_PDT_2020
Cuda compilation tools, release 11.0, V11.0.221
Build cuda_11.0_bu.TC445_37.28845127_0


<a name="Settings"></a>
# Settings
[[Return to ToC]](#Table-of-Contents)

In [1]:
#  Pre-trained models paths
models = {}
models['ecu'] = 'drive/MyDrive/training/skinny/checkpoint-20210428-155148/saved_model.ckpt/saved_model.pb'
models['schmugge'] = 'drive/MyDrive/training/skinny/checkpoint-20210505-225202/saved_model.ckpt/saved_model.pb'
models['hgr'] = 'drive/MyDrive/training/skinny/checkpoint-20210512-220723/saved_model.ckpt/saved_model.pb'
models['dark'] = 'drive/MyDrive/training/skinny/checkpoint-20210523-110554/saved_model.ckpt/saved_model.pb'
models['medium'] = 'drive/MyDrive/training/skinny/checkpoint-20210523-112308/saved_model.ckpt/saved_model.pb'
models['light'] = 'drive/MyDrive/training/skinny/checkpoint-20210523-122027/saved_model.ckpt/saved_model.pb'

#  Datasets paths
datasets = {}
datasets['_imports'] = 'drive/MyDrive/datasets/imports/dataset_imports.zip'
datasets['ecu'] = 'drive/MyDrive/datasets/fullbody/ECU.zip'
datasets['schmugge'] = 'drive/MyDrive/datasets/face/Schmugge.zip'
datasets['hgr'] = 'drive/MyDrive/datasets/hand/HGR_small.zip'
datasets['schmugge_darkaug'] = 'drive/MyDrive/schm/Schmugge.zip'
datasets['vpu'] = 'drive/MyDrive/datasets/fullbody/VDM.zip'
datasets['uchile'] = 'drive/MyDrive/datasets/fullbody/Uchile.zip'
datasets['pratheepan'] = 'drive/MyDrive/datasets/fullbody/Pratheepan.zip'
datasets['abd'] = 'drive/MyDrive/datasets/abdomen/abd-skin.zip'

#  Datasets zips to import and extract to folders
#  no need to include "imports" as it is always unzipped
to_import = ['ecu', 'hgr', 'schmugge', 'pratheepan']

#  Valid skintone values
skintones = ['light', 'medium', 'dark']

<a name="Datasets-Importing"></a>
# Datasets Importing
[[Return to ToC]](#Table-of-Contents)

Mount Google Drive

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Extract the dataset(s)

In [None]:
!rm -rf dataset

_imports = datasets['_imports']

#  Always unzip "imports" as it is necessary to import other datasets
!unzip $_imports -d dataset

for db in to_import:
    if db == 'schmugge_darkaug':
        !rm -rf dataset/Schmugge
    current_db = datasets[db]
    !unzip $current_db -d dataset

<a name="Dataset-Organization-Utils"></a>
### Dataset Organization Utils
[[Return to ToC]](#Table-of-Contents)

General Utils

In [4]:
import os, re, sys, json, time, traceback
import cv2
import imghdr
import numpy as np
from random import shuffle
from math import floor

#  '?' is used as the CSV separator because the Pratheepan dataset
#  has one file with a comma in its filename
csv_sep = '?'


def get_timestamp() -> str:
    return time.strftime("%Y%m%d-%H%M%S")

def read_csv(csv_file) -> list:
    '''Return the multi-column content of the dataset CSV file as a list of rows'''
    file_content = None
    try: #  may fail on accessing CSV file: eg. file does not exist
        with open(csv_file) as file:
            file_content = file.read().splitlines() #  multi-column file
    except Exception:
        print(traceback.format_exc())
        print('Error on accessing ' + csv_file)
        sys.exit(1)
    return file_content

def split_csv_fields(row: str) -> list:
    return row.split(csv_sep)

def to_csv_row(*args) -> str:
    row = args[0]
    for item in args[1:]:
        row += csv_sep
        row += item
    row += '\n'
    return row

def match_rows(csv_file: str, targets: list, target_column: int) -> list:
    '''Return all rows matching targets in the given column'''
    csv_content = read_csv(csv_file)
    data = [x for x in csv_content if split_csv_fields(x)[target_column] in targets]
    return data

def csv_full_test(csv_file: str, count_: int = -1):
    '''Mark the first count_ rows of the CSV as test ("te") and the remaining rows as train ("tr").
    If count_ is -1, every row is marked as test'''
    file_content = read_csv(csv_file)
    with open(csv_file, 'w') as out: #  rewrite csv file
        for i, entry in enumerate(file_content):
            csv_fields = split_csv_fields(entry)
            note = 'te'
            if count_ != -1 and i >= count_: #  count is disabled if == -1
                note = 'tr'
            csv_fields[2] = note
            out.write(to_csv_row(*csv_fields))

def csv_not_test(csv_file: str):
    '''All the rows with note "te" become "tr", all the rows with note != "te", become "te"'''
    file_content = read_csv(csv_file)
    with open(csv_file, 'w') as out: #  rewrite csv file
        for entry in file_content:
            csv_fields = split_csv_fields(entry)
            nt = csv_fields[2]
            note = 'tr' if nt == 'te' else 'te'
            csv_fields[2] = note
            out.write(to_csv_row(*csv_fields))

def get_training_and_testing_sets(file_list: list, split: float = 0.7):
    '''Split file_list in two: the first floor(len * split) items and the remaining ones'''
    #print(file_list)  #  debug
    split_index = floor(len(file_list) * split)
    training = file_list[:split_index]
    testing = file_list[split_index:]
    return training, testing

def get_variable_filename(filename: str, format: str) -> str:
    '''Get the variable part of a filename into a dataset'''
    if format == '':
        return filename

    #  debug:
    #  re.fullmatch(r'^img(.*)$', 'imgED (1)').group(1)
    #  re.fullmatch(r'^(.*)-m$', 'att-massu.jpg-m').group(1)
    match =  re.fullmatch('^{}$'.format(format), filename)
    if match:
        return match.group(1)
    else:
        #print('Cannot match {} with pattern {}'.format(filename, format))  #  debug
        return None

def is_image(path: str) -> bool:
    return os.path.isfile(path) and imghdr.what(path) is not None
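
The CSV row helpers and the filename pattern matching above can be tried in isolation with the sketch below. It re-implements the same ideas with hypothetical names (`to_row`, `from_row`, `variable_part`) so that it runs without OpenCV or the datasets; the two patterns come from the debug comments in `get_variable_filename`, while the example paths are made up.

```python
import re

CSV_SEP = '?'  # same separator as csv_sep in the cell above

def to_row(*fields):
    """Join the fields with the separator and terminate the row."""
    return CSV_SEP.join(fields) + '\n'

def from_row(row):
    """Split a row back into its fields."""
    return row.rstrip('\n').split(CSV_SEP)

def variable_part(filename, pattern):
    """Return capture group 1 if pattern matches the whole filename, else None.
    An empty pattern means that the whole filename is the identifier."""
    if pattern == '':
        return filename
    match = re.fullmatch(pattern, filename)
    return match.group(1) if match else None

# A comma inside a path does not break the row, because '?' is the separator
row = to_row('ori/im,1.jpg', 'gt/im,1.png', 'tr')
fields = from_row(row)

# Original image and ground truth are paired through the variable part of their names
ident_ori = variable_part('imgED (1)', r'img(.*)')
ident_gt = variable_part('att-massu.jpg-m', r'(.*)-m')
no_match = variable_part('thumbs', r'img(.*)')
print(fields, ident_ori, ident_gt, no_match)
```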

Datasets importing utils

In [5]:
def analyze_dataset(gt: str, ori: str, root_dir: str, note: str = 'nd',
                    gt_filename_format: str = '', ori_filename_format: str = '') -> None:
    '''Create CSV file containing dataset metadata (such as paths of images)'''
    out_analysis_filename = 'data.csv'
    out_file = os.path.join(root_dir, out_analysis_filename)
    
    #  Number of images found
    i = 0

    #  Append to data file
    with open(out_file, 'a') as out:
        for gt_file in os.listdir(gt):
            gt_path = os.path.join(gt, gt_file)

            #  Check if current file is an image (avoid issues with files like thumbs.db)
            if is_image(gt_path):
                matched = False
                gt_name, gt_e = os.path.splitext(gt_file)
                gt_identifier = get_variable_filename(gt_name, gt_filename_format)

                if gt_identifier is None:
                    continue
                
                for ori_file in os.listdir(ori):
                    ori_path = os.path.join(ori, ori_file)
                    ori_name, ori_e = os.path.splitext(ori_file)
                    ori_identifier = get_variable_filename(ori_name, ori_filename_format)
                    
                    if ori_identifier is None:
                        continue
                    
                    #  Try to find a match (original image - gt)
                    if gt_identifier == ori_identifier:
                        out.write(to_csv_row(ori_path, gt_path, note))
                        i += 1
                        matched = True
                        break
                
                if not matched:
                    print(f'No matches found for {gt_identifier}')
            else:
                print(f'File {gt_path} is not an image')
        
        print(f"Found {i} images")

# Perform image-processing on a directory content
# 
# Processing Pipeline example:
#   "png,skin=255_255_255,invert"
#   skin=.. Skin-based binarization rule:
#           pixels of whatever is not skin will be set black; skin pixels will be set white
#   bg=..   Background-based binarization rule:
#           pixels of whatever is not background will be set white; background pixels will be set black
#   png     Convert the image to PNG format
# Processing operations are performed in order!
def process_images(data_dir: str, process_pipeline: str, out_dir = '', im_filename_format: str = '') -> str:
    #  Loop all files in the directory
    for im_basename in os.listdir(data_dir):
        im_path = os.path.join(data_dir, im_basename)
        im_filename, im_e = os.path.splitext(im_basename)

        #  Check if current file is an image (avoid issues with files like thumbs.db)
        if is_image(im_path):
            if out_dir == '':
                out_dir = os.path.join(data_dir, 'processed')

            os.makedirs(out_dir, exist_ok=True)

            im_identifier = get_variable_filename(im_filename, im_filename_format)
            if im_identifier is None:
                continue

            #  Load image
            im = cv2.imread(im_path)

            #  Prepare path for out image
            im_path = os.path.join(out_dir, im_basename)

            for operation in process_pipeline.split(','):
                #  Binarize
                if operation.startswith('skin') or operation.startswith('bg'):
                    #  inspired from https://stackoverflow.com/a/53989391
                    bgr_data = operation.split('=')[1]
                    b,g,r = [int(i) for i in bgr_data.split('_')]
                    lower_val = (b, g, r)
                    upper_val = lower_val

                    #  If 'skin': catch only skin pixels via thresholding
                    #  If 'bg':   catch only background pixels via thresholding
                    mask = cv2.inRange(im, lower_val, upper_val)
                    im = mask if operation.startswith('skin') else cv2.bitwise_not(mask)
                #  Invert image
                elif operation == 'invert':
                    im = cv2.bitwise_not(im)
                #  Convert to png
                elif operation == 'png':
                    im_path = os.path.join(out_dir, im_filename + '.png')
                #  Reload image
                elif operation == 'reload':
                    im = cv2.imread(im_path)
                else:
                    print(f'Image processing operation unknown: {operation}')
            
            #  Save processing 
            cv2.imwrite(im_path, im)
    return out_dir

def randomize_split(csv_file: str):
    file3c = read_csv(csv_file)

    #  randomize
    shuffle(file3c)

    #  calculate splits
    #  70% train, 15% val, 15% test
    train_files, test_files = get_training_and_testing_sets(file3c)
    test_files, val_files = get_training_and_testing_sets(test_files, split=.5)

    #  rewrite csv file
    with open(csv_file, 'w') as out:
        for entry in file3c:
            csv_fields = split_csv_fields(entry)

            if entry in val_files:
                csv_fields[2] = 'va'
            elif entry in test_files:
                csv_fields[2] = 'te'
            else:
                csv_fields[2] = 'tr'

            out.write(to_csv_row(*csv_fields))

def import_split(csv_file: str, single_col_file: str, outfile: str,
                 note: str, gtf = '', orif = '', inf = '') -> None:
    '''Import the CSV from a 1 column format split'''
    file_content = read_csv(csv_file)
    
    #  Read single column file lines
    with open(single_col_file) as file1c:
        singles = file1c.read().splitlines()

    #  Create the new split file as csv
    with open(outfile, 'w') as out:
        i = 0
        for entry in file_content: #  oriname.ext, gtname.ext, te/tr/va
            csv_fields = split_csv_fields(entry)
            ori_path = csv_fields[0]
            gt_path = csv_fields[1]
            note_old = csv_fields[2]
            ori_name, ori_ext = os.path.splitext(os.path.basename(ori_path))
            gt_name, gt_ext = os.path.splitext(os.path.basename(gt_path))

            ori_identifier = get_variable_filename(ori_name, orif)
            gt_identifier = get_variable_filename(gt_name, gtf)

            for line in singles: #  imgname
                line_name, line_ext = os.path.splitext(line)
                in_identifier = get_variable_filename(line_name, inf)

                if ori_identifier == in_identifier or gt_identifier == in_identifier:
                    note_old = note
                    i += 1
                    print(f'Match found: {ori_identifier}|{gt_identifier} - {in_identifier}')
                    break
            
            out.write(to_csv_row(ori_path, gt_path, note_old))
        
        print(f'''Converted {i}/{len(singles)} lines.\n
        Source file: {single_col_file}\n
        Target file: {outfile}''')

def import_dataset(import_json: str) -> None:
    '''Import dataset and generate metadata'''
    if os.path.exists(import_json):
        with open(import_json, 'r') as stream:
            data = json.load(stream)

            #  Load JSON values
            gt = data['gt']
            ori = data['ori']
            root = data['root']
            note = data['note']
            gt_format = data['gtf']
            ori_format = data['orif']
            ori_process = data['oriprocess']
            ori_process_out = data['oriprocessout']
            gt_process = data['gtprocess']
            gt_process_out = data['gtprocessout']
            
            #  Non-Defined as default note
            if not note:
                note = 'nd'
            
            #  Check if processing is required
            if ori_process:
                ori = process_images(ori, ori_process, ori_process_out, ori_format)
            if gt_process:
                gt = process_images(gt, gt_process, gt_process_out, gt_format)
            
            #  Analyze the dataset and create the csv files
            analyze_dataset(gt, ori, root,
                            note, gt_format, ori_format)
    else:
        print("JSON import file does not exist!")
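
`import_dataset` expects a JSON file containing the ten keys it reads. The sketch below builds such a descriptor and checks that no key is missing. The key names come from the function above; every value (paths, filename pattern, processing pipeline) is a hypothetical example, and `missing_keys` is a helper introduced only for this illustration.

```python
import json

# Keys read by import_dataset
REQUIRED_KEYS = ('gt', 'ori', 'root', 'note', 'gtf', 'orif',
                 'oriprocess', 'oriprocessout', 'gtprocess', 'gtprocessout')

# Hypothetical descriptor: all the values below are made-up examples
descriptor = {
    'gt': 'dataset/Example/masks',           # ground truth directory
    'ori': 'dataset/Example/images',         # original images directory
    'root': 'dataset/Example',               # data.csv is written here
    'note': '',                              # empty: import_dataset falls back to 'nd'
    'gtf': r'(.*)-m',                        # ground truth filename pattern
    'orif': '',                              # empty: the whole filename is the identifier
    'oriprocess': '',                        # empty: no processing of original images
    'oriprocessout': '',
    'gtprocess': 'png,skin=255_255_255',     # save as PNG, white pixels are skin
    'gtprocessout': 'dataset/Example/masks_processed',
}

def missing_keys(data):
    """Return the required keys that are absent from the descriptor."""
    return [key for key in REQUIRED_KEYS if key not in data]

text = json.dumps(descriptor, indent=4)  # content of the JSON import file
loaded = json.loads(text)
print(missing_keys(loaded))
```

Checking the keys before calling `import_dataset` avoids a `KeyError` halfway through the import, since the function accesses every key unconditionally.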

Schmugge utils

In [6]:
def read_schmugge(skin_im_manager_path: str, images_dir: str) -> list:
    '''From schmugge custom config (.config.SkinImManager) to a list of dict structure'''
    sch = []
    
    #  images with gt errors, aa69 is also duplicated in the config file
    blacklist = ['aa50.gt.d3.pgm', 'aa69.gt.d3.pgm', 'dd71.gt.d3.pgm', 'hh54.gt.d3.pgm']

    with open(skin_im_manager_path) as f:
        start = 0
        i = 0
        tmp = {}
        for line in f:
            blacklisted = False

            #  skip first 2 lines
            if start < 2:
                start += 1
                continue
            
            #print(f'{line}\t{i}') #  debug
            if line: #  line not empty
                line = line.rstrip() #  remove End Of Line (\n)

                if i == 2: #  skin tone type
                    skin_type = int(line)
                    if skin_type == 0:
                        tmp['skintone'] = 'light'
                    elif skin_type == 1:
                        tmp['skintone'] = 'medium'
                    elif skin_type == 2:
                        tmp['skintone'] = 'dark'
                    else:
                        tmp['skintone'] = 'nd'
                elif i == 3: #  db type
                    tmp['db'] = line
                elif i == 8: #  ori
                    tmp['ori'] = os.path.join(images_dir, line)
                elif i == 9: #  gt
                    tmp['gt'] = os.path.join(images_dir, line)
                    if line in blacklist:
                        blacklisted = True
                
                #  update image counter
                i += 1
                if i == 10: #  10 lines read, prepare for next image data
                    if not blacklisted:
                        sch.append(tmp)
                    tmp = {}
                    i = 0
    
    print(f'Schmugge custom config read correctly, found {len(sch)} images')
    return sch

def process_schmugge(sch: list, outfile: str, train = 70, test = 15, val = 15,
                     ori_out_dir = 'new_ori', gt_out_dir = 'new_gt'):
    '''From schmugge list of dicts structure to csv file and processed images.
    Note: the train, test and val arguments are currently unused, the split is fixed to 70/15/15'''
    #  Prepare new ori and gt dirs
    os.makedirs(ori_out_dir, exist_ok=True)
    os.makedirs(gt_out_dir, exist_ok=True)

    with open(outfile, 'w') as out:
        #  Randomize
        shuffle(sch)

        #  70% train, 15% val, 15% test
        train_files, test_files = get_training_and_testing_sets(sch)
        test_files, val_files = get_training_and_testing_sets(test_files, split=.5)

        for entry in sch:
            db = int(entry['db'])
            ori_path = entry['ori']
            gt_path = entry['gt']
            
            ori_basename = os.path.basename(ori_path)
            gt_basename = os.path.basename(gt_path)
            ori_filename, ori_e = os.path.splitext(ori_basename)
            gt_filename, gt_e = os.path.splitext(gt_basename)

            #  Process images
            
            #  load images
            ori_im = cv2.imread(ori_path)
            gt_im = cv2.imread(gt_path)
            #  png
            ori_out = os.path.join(ori_out_dir, ori_filename + '.png')
            gt_out = os.path.join(gt_out_dir, gt_filename + '.png')
            #  binarize gt: whatever isn't background, is skin
            if db == 4 or db == 3: #  Uchile/UW: white background
                bg_val = (255, 255, 255)
            else: #  background = 180,180,180
                bg_val = (180, 180, 180)
            #  Threshold the image to get only the background pixels
            mask = cv2.inRange(gt_im, bg_val, bg_val)
            #cv2_imshow(mask) #  debug
            #  what isn't bg is white
            gt_im = cv2.bitwise_not(mask)
            
            #  Save processing 
            cv2.imwrite(ori_out, ori_im)
            cv2.imwrite(gt_out, gt_im)

            skintone = entry['skintone']
            note = 'te'
            if entry in train_files:
                note = 'tr'
            elif entry in val_files:
                note = 'va'
            
            out.write(to_csv_row(ori_out, gt_out, note, skintone))

def csv_skintone_filter(csv_file: str, skintone: str, mode = 'train', val_percent = .15, test_percent = .15):
    '''Generate random splits for the given skintone. Overwrite Schmugge CSV file'''
    #  read the images CSV
    file3c = read_csv(csv_file)

    #  randomize
    shuffle(file3c)

    #  calculate splits length
    totalsk = csv_skintone_count(csv_file, skintone) #  total items to train/val/test on
    totalva = round(totalsk * val_percent)
    totalte = round(totalsk * test_percent)
    jva = 0
    jte = 0

    #  rewrite csv file
    with open(csv_file, 'w') as out:
        for entry in file3c:
            csv_fields = split_csv_fields(entry)
            ori_path = csv_fields[0]
            gt_path = csv_fields[1]
            skint = csv_fields[3]

            if skint != skintone: #  should not be filtered
                note = 'nd'
            else: #  should be in the filter
                if mode == 'train': #  if it is a training filter
                    if jva < totalva: #  there are still places left to be in validation set
                        note = 'va'
                        jva += 1
                    elif jte < totalte: #  there are still places left to be in test set
                        note = 'te'
                        jte += 1
                    else: #  no validation or test places left, go in train set
                        note = 'tr'
                else: #  if it is a testing filter, just place them all in test set
                    note = 'te'
            out.write(to_csv_row(ori_path, gt_path, note, skint))

def csv_skintone_count(csv_file: str, skintone: str):
    '''Print and return the number of items of the given skintone'''
    data = match_rows(csv_file, (skintone,), 3)
    data_len = len(data)
    # print('\n'.join(data))  #  debug
    print(f"Found {data_len} items of type {skintone}")
    return data_len

def csv_note_count(csv_file: str, mode: str):
    '''Print and return the number of items of the given mode; "train" mode also includes validation'''
    targets = ('tr', 'va') if mode == 'train' else ('te',)  #  one-element tuple, not a bare string
    data = match_rows(csv_file, (targets), 2)
    data_len = len(data)
    # print('\n'.join(data))  #  debug
    print(f"Found {data_len} items of type {' '.join(targets)}")
    return data_len

def gen_sch_by_skintone(skintone: str, mode: str):
    '''
    Re-import and re-process Schmugge, and generate random splits for the given skintone
    
    mode can either be "train" or "test"
    
    eg. usage:
    gen_sch_by_skintone('dark', 'train')
    gen_sch_by_skintone('light', 'train')
    gen_sch_by_skintone('medium', 'test')
    '''
    sch_csv = 'dataset/Schmugge/data.csv'

    #  re-import Schmugge
    schm = read_schmugge('dataset/Schmugge/data/.config.SkinImManager', 'dataset/Schmugge/data/data')
    process_schmugge(schm, sch_csv, ori_out_dir='dataset/Schmugge/newdata/ori', gt_out_dir='dataset/Schmugge/newdata/gt')

    csv_skintone_filter(sch_csv, skintone, mode = mode)
    csv_skintone_count(sch_csv, skintone)
    csv_note_count(sch_csv, mode)
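
As an illustration of how `csv_skintone_filter` distributes the matching rows in `train` mode, the sketch below reproduces only the counting logic: after shuffling, the first `round(n * val_percent)` matches go to validation, the next `round(n * test_percent)` go to test, and the rest go to training. `assign_notes` is a hypothetical helper written for this example; it is not part of the notebook.

```python
def assign_notes(n_items: int, val_percent: float = .15, test_percent: float = .15) -> list:
    """Return the note ('va', 'te' or 'tr') given to each of n_items matching rows, in order."""
    total_va = round(n_items * val_percent)
    total_te = round(n_items * test_percent)
    notes = []
    for _ in range(n_items):
        if notes.count('va') < total_va:    # validation places are filled first
            notes.append('va')
        elif notes.count('te') < total_te:  # then test places
            notes.append('te')
        else:                               # everything else is training data
            notes.append('tr')
    return notes


notes = assign_notes(20)
print({note: notes.count(note) for note in ('tr', 'va', 'te')})
```

Because the rows are shuffled before the notes are assigned, each call produces a different random split of the same sizes.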

<a name="Generate-Datasets-CSV-Metadata"></a>
### Generate Datasets CSV Metadata
[[Return to ToC]](#Table-of-Contents)

In [None]:
#  ECU has no native splits; Skinny splits will be used as default
if 'ecu' in to_import:
    import_dataset("dataset/import_ecu.json")

#  Uchile has no native splits; randomize
if 'uchile' in to_import:
    import_dataset("dataset/import_uchile.json")
    randomize_split('dataset/Uchile/data.csv')  #  assumed path, following the dataset/<name>/data.csv convention

#  HGR is composed of 3 sub datasets
#  has no native splits; mine will be used as default
if 'hgr' in to_import:
    import_dataset("dataset/import_hgr1.json")
    import_dataset("dataset/import_hgr2a.json")
    import_dataset("dataset/import_hgr2b.json")

#  Pratheepan is composed of 2 sub datasets
#  but has no native splits; randomize
if 'pratheepan' in to_import:
    import_dataset("dataset/import_pratheepanface.json")
    import_dataset("dataset/import_pratheepanfamily.json")
    randomize_split('dataset/Pratheepan/data.csv')

#  abd has native train/test splits
if 'abd' in to_import:
    import_dataset("dataset/import_abd_te.json")
    import_dataset("dataset/import_abd_tr.json")

#  VPU is composed of 5 sub datasets with native train/test splits
if 'vpu' in to_import:
    import_dataset("dataset/import_ami_te.json")
    import_dataset("dataset/import_ami_tr.json")
    import_dataset("dataset/import_ed_te.json")
    import_dataset("dataset/import_ed_tr.json")
    import_dataset("dataset/import_liris_te.json")
    import_dataset("dataset/import_liris_tr.json")
    import_dataset("dataset/import_ssg_te.json")
    import_dataset("dataset/import_ssg_tr.json")
    import_dataset("dataset/import_ut_te.json")
    import_dataset("dataset/import_ut_tr.json")

#  Schmugge has really diverse filename formats but has a custom data manager file included
#  has no native splits; mine will be used as default
if 'schmugge' in to_import:
    schm = read_schmugge('dataset/Schmugge/data/.config.SkinImManager', 'dataset/Schmugge/data/data')
    process_schmugge(schm, 'dataset/Schmugge/data.csv',
                     ori_out_dir='dataset/Schmugge/newdata/ori', gt_out_dir='dataset/Schmugge/newdata/gt')

Import Splits

In [None]:
# Import ECU splits from the Skinny paper

# unzip archive
!unzip -j drive/MyDrive/datasets/sets/ECU_Skinny.zip -d dataset/ECU/

# import splits
import_split('dataset/ECU/data.csv', 'dataset/ECU/train.txt',
             'dataset/ECU/data.csv', 'tr')
import_split('dataset/ECU/data.csv', 'dataset/ECU/test.txt',
             'dataset/ECU/data.csv', 'te')
import_split('dataset/ECU/data.csv', 'dataset/ECU/val.txt',
             'dataset/ECU/data.csv', 'va')


# Import my HGR and Schmugge splits
sch_from = 'drive/MyDrive/training/skinny/checkpoint-20210505-225202/schmugge_datacsv_model.csv'
hgr_from = 'drive/MyDrive/training/skinny/checkpoint-20210512-220723/HGR_data.csv'
sch_to = 'dataset/Schmugge/data.csv'
hgr_to = 'dataset/HGR_small/data.csv'

!rm $sch_to
!rm $hgr_to

!cp $sch_from $sch_to
!cp $hgr_from $hgr_to

<a name="Define-Skin-Detector"></a>
# Define Skin Detector
[[Return to ToC]](#Table-of-Contents)

<a name="Architecture"></a>
### Architecture
[[Return to ToC]](#Table-of-Contents)

Python Imports

In [9]:
import os
import time
from abc import abstractmethod
from xml.etree import ElementTree as ET
from typing import Callable, Any

import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers as layers
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import load_model
from tensorflow.keras import backend as K

import numpy as np

import cv2
import json

from google.colab.patches import cv2_imshow

Define model

In [10]:
def inception_module(prev_layer, filters: int, activation=layers.LeakyReLU):
    filters = filters // 4
    conv_1 = layers.Conv2D(filters, (1, 1), padding='same')(prev_layer)
    conv_1 = activation()(conv_1)
    conv_3 = layers.Conv2D(filters, (1, 1), padding='same')(prev_layer)
    conv_3 = layers.Conv2D(filters, (3, 3), padding='same')(conv_3)
    conv_3 = activation()(conv_3)
    conv_5 = layers.Conv2D(filters, (1, 1), padding='same')(prev_layer)
    conv_5 = layers.Conv2D(filters, (5, 5), padding='same')(conv_5)
    conv_5 = activation()(conv_5)
    max_pool = layers.MaxPool2D(padding='same', strides=(1, 1))(prev_layer)
    max_pool = layers.Conv2D(filters, (1, 1), padding='same')(max_pool)
    max_pool = activation()(max_pool)
    return tf.concat([conv_1, conv_3, conv_5, max_pool], axis=-1)

def dense_block(prev_layer, filters: int, kernel_size: int or tuple, activation=layers.LeakyReLU):
    dense_1 = layers.Conv2D(filters // 2, kernel_size, padding='same')(prev_layer)
    dense_1 = layers.BatchNormalization()(dense_1)
    dense_1 = activation()(dense_1)
    dense_2 = layers.Conv2D(filters // 4, kernel_size, padding='same')(dense_1)
    dense_2 = layers.BatchNormalization()(dense_2)
    dense_2 = activation()(dense_2)
    dense_3 = layers.Conv2D(filters // 8, kernel_size, padding='same')(dense_2)
    dense_3 = layers.BatchNormalization()(dense_3)
    dense_3 = activation()(dense_3)
    return tf.concat([dense_1, dense_2, dense_3, prev_layer], axis=-1)

def get_filters_count(level: int, initial_filters: int) -> int:
    return 2**(level-1)*initial_filters


class Model:
    def __init__(self, levels: int = None, initial_filters: int = None,
                 image_channels: int = None, log_dir: str = 'logs',
                 load_checkpoint: bool = False, checkpoint_extension: str = 'ckpt',
                 model_name: str = None) -> None:
        self.levels = levels
        self.initial_filters = initial_filters
        self.image_channels = image_channels
        self.keras_model = None
        self.log_dir = log_dir
        self.checkpoint_extension = checkpoint_extension
        if model_name is not None:
            self.name = model_name
        self.load_checkpoint = load_checkpoint
        if not load_checkpoint and os.path.isdir(self.get_logdir()):
            self.name = f"{self.name}_{get_timestamp()}"

    def get_model(self) -> keras.Model:
        path = os.path.join(self.log_dir, self.name, 'checkpoint', f'saved_model.{self.checkpoint_extension}')
        
        if self.load_checkpoint and self.checkpoint_extension != 'ckpt': #  load full model
            self.keras_model = load_model(path, compile=False)
            return self.keras_model

        self.keras_model = self.create_model()
        self.__change_model_name()
        self.__plot_model(self.keras_model)

        if self.load_checkpoint: #  load weights
            self.keras_model.load_weights(path)
        return self.keras_model

    def __plot_model(self, model: keras.Model) -> None:
        plot_model(model, to_file=os.path.join(self.log_dir, self.name, 'model.png'), show_shapes=True)

    def __change_model_name(self) -> None:
        if self.name is not None and self.keras_model is not None:
            self.keras_model._name = self.name

    def get_logdir(self) -> str:
        return os.path.join(self.log_dir, self.name)

    @abstractmethod
    def create_model(self) -> keras.Model:
        pass


class Skinny(Model):
    name = "Skinny"

    def create_model(self) -> keras.Model:
        levels = self.levels + 1  #  +1 for the input layer; kept local so repeated calls don't grow self.levels
        kernel_size = (3, 3)
        layers_list = [None for _ in range(levels)]
        layers_list[0] = layers.Input(shape=(None, None, self.image_channels), name='feature')
        activation = layers.LeakyReLU

        for i in range(1, levels):
            filters = get_filters_count(i, self.initial_filters)
            prev = i-1
            if i != 1:
                layers_list[i] = layers.MaxPool2D()(layers_list[prev])
                prev = i

            layers_list[i] = layers.Conv2D(filters, kernel_size, padding='same')(layers_list[prev])
            layers_list[i] = layers.BatchNormalization()(layers_list[i])
            layers_list[i] = activation()(layers_list[i])
            layers_list[i] = inception_module(layers_list[i], filters, activation)

        for i in range(levels-2, 0, -1):
            filters = get_filters_count(i, self.initial_filters)
            layers_list[i+1] = layers.UpSampling2D()(layers_list[i+1])
            layers_list[i+1] = layers.Conv2D(filters, kernel_size, padding='same')(layers_list[i+1])
            layers_list[i+1] = layers.BatchNormalization()(layers_list[i+1])
            layers_list[i+1] = activation()(layers_list[i+1])

            layers_list[i] = dense_block(layers_list[i], filters, kernel_size, activation)

            layers_list[i] = tf.concat([layers_list[i+1], layers_list[i]], axis=-1)
            layers_list[i] = inception_module(layers_list[i], filters, activation)

        layers_list[1] = layers.Conv2D(self.initial_filters, kernel_size, padding='same')(layers_list[1])
        layers_list[1] = activation()(layers_list[1])
        layers_list[1] = layers.Conv2D(self.initial_filters//2, kernel_size, padding='same')(layers_list[1])
        layers_list[1] = activation()(layers_list[1])
        layers_list[1] = layers.Conv2D(1, kernel_size, padding='same',
                                       activation='sigmoid', name='label')(layers_list[1])

        model = keras.Model(inputs=[layers_list[0]], outputs=[layers_list[1]], name=self.name)
        return model
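
To make the channel arithmetic of the model easier to follow, here is a small sketch that lists the filter count of each level and the number of channels that leave `inception_module`, which concatenates four branches of `filters // 4` channels each. `inception_output_channels` is a hypothetical helper written for this example, and the values `levels = 6` and `initial_filters = 19` are the ones used in the Model Settings cell.

```python
def get_filters_count(level: int, initial_filters: int) -> int:
    # Same formula as the notebook: the filter count doubles at every level
    return 2 ** (level - 1) * initial_filters


def inception_output_channels(filters: int) -> int:
    # inception_module concatenates 4 branches of filters // 4 channels each
    return 4 * (filters // 4)


levels, initial_filters = 6, 19
table = []
for level in range(1, levels + 1):
    filters = get_filters_count(level, initial_filters)
    table.append((level, filters, inception_output_channels(filters)))
    print(level, filters, inception_output_channels(filters))
```

When the filter count is not divisible by 4, as in the first two levels with these settings, the integer division makes the inception output slightly narrower than the requested filter count.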

Define PreProcessor

In [11]:
class Preprocessor:
    def __init__(self):
        self.operations = [] #  Preprocessor has no operations by default
    
    def downscale(self, max_pixel_count):
        '''Add a downscale operation to the preprocessing list of operations. Return self'''
        def downscale_operation(data):
            '''Downscale the images composed of more pixels than max_pixel_count preserving the aspect ratio'''
            for k, v in data.items():
                tensor_shape = tf.cast(tf.shape(v), tf.float32)
                coefficient = max_pixel_count / (tensor_shape[0] * tensor_shape[1])
                coefficient = tf.math.sqrt(coefficient)
                data[k] = tf.cond(coefficient >= 1.0, lambda: v,
                                  lambda: tf.image.resize(v, [tf.cast(tensor_shape[0] * coefficient, tf.uint16),
                                                              tf.cast(tensor_shape[1] * coefficient, tf.uint16)]))
            return data

        self.operations.append(downscale_operation)
        return self
    
    def cast(self, dtype):
        '''Add a cast operation to the preprocessing list of operations. Return self'''
        def cast_operation(data):
            '''Cast the images data into the given dtype'''
            for k, v in data.items():
                data[k] = tf.cast(v, dtype)
            return data

        self.operations.append(cast_operation)
        return self
    
    def normalize(self):
        '''Add a normalize operation to the preprocessing list of operations. Return self'''
        def normalize_operation(data):
            '''Transform the images data from uint8(range 0-255) into floats(range 0-1)'''
            for k, v in data.items():
                data[k] = v / 255.0
            return data

        self.operations.append(normalize_operation)
        return self
    
    def pad(self, network_levels):
        '''Add a padding operation to the preprocessing list of operations. Return self'''
        number_multiple = 2**(network_levels-1)
        def padding_operation(data):
            '''Add padding to the bottom and right sides of the images'''
            for k, v in data.items():
                tensor_shape = tf.shape(v)
                data[k] = tf.pad(v, [[0, number_multiple - tensor_shape[0] % number_multiple],
                                     [0,  number_multiple - tensor_shape[1] % number_multiple],
                                     [0, 0]])
            return data

        self.operations.append(padding_operation)
        return self
    
    def add_to_graph(self, dataset) -> tf.data.Dataset:
        '''
        Execute all the operation functions defined in the Preprocessor instance
        on the given Dataset object.
        Return the transformed Dataset object
        '''
        for operation in self.operations:
            dataset = dataset.map(operation) #  map applies one function to every element of the Dataset
        return dataset
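
The image sizes produced by `downscale` and `pad` can be checked without TensorFlow. The sketch below mirrors the arithmetic of the two operations in plain Python, assuming that the integer cast truncates the scaled sides; `downscaled_shape` and `padded_size` are hypothetical helpers, and float32 rounding inside TensorFlow may differ by a pixel in edge cases.

```python
import math


def downscaled_shape(height: int, width: int, max_pixel_count: int) -> tuple:
    """Mirror downscale_operation: shrink only when the image exceeds max_pixel_count."""
    coefficient = math.sqrt(max_pixel_count / (height * width))
    if coefficient >= 1.0:
        return height, width
    # Truncate the scaled sides, as an integer cast would (assumption of this sketch)
    return int(height * coefficient), int(width * coefficient)


def padded_size(size: int, network_levels: int) -> int:
    """Mirror padding_operation for one side of the image."""
    multiple = 2 ** (network_levels - 1)
    return size + (multiple - size % multiple)


h, w = downscaled_shape(1024, 768, 512 ** 2)
print((h, w), (padded_size(h, 6), padded_size(w, 6)))
print(padded_size(320, 6))
```

Note that a side that is already a multiple of `2**(network_levels-1)` still receives one full extra block of padding, since `size % multiple` is then 0.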

Define DataLoader

In [12]:
class DataLoader(object):
    buffer_size = tf.data.experimental.AUTOTUNE

    def __init__(self, dataset_dir: str, batch_size: int, preprocessor: Preprocessor = None):
        self.dataset_dir = dataset_dir
        self.batch_size = batch_size
        self.val_dataset = None
        self.train_dataset = None
        self.test_dataset = None
        self.preprocessor = Preprocessor() if preprocessor is None else preprocessor

    @property
    def train_dataset(self):
        return self.__batch_and_prefetch(self.__train_dataset)

    @train_dataset.setter
    def train_dataset(self, dataset: tf.data.Dataset):
        self.__train_dataset = dataset

    @property
    def test_dataset(self):
        return self.__batch_and_prefetch(self.__test_dataset)

    @test_dataset.setter
    def test_dataset(self, dataset: tf.data.Dataset):
        self.__test_dataset = dataset

    @property
    def val_dataset(self):
        return self.__batch_and_prefetch(self.__val_dataset)

    @val_dataset.setter
    def val_dataset(self, dataset: tf.data.Dataset):
        self.__val_dataset = dataset

    @property
    def preprocessor(self) -> Preprocessor:
        return self.__preprocessor

    @preprocessor.setter
    def preprocessor(self, preprocessor: Preprocessor):
        self.__preprocessor = preprocessor
        self.__reinstantiate()

    def __reinstantiate(self):
        self.train_dataset = self.__create_dataset_pipeline('tr')
        self.val_dataset = self.__create_dataset_pipeline('va', shuffle=False)
        self.test_dataset = self.__create_dataset_pipeline('te', shuffle=False)

    def __batch_and_prefetch(self, dataset: tf.data.Dataset) -> tf.data.Dataset:
        return dataset.\
            padded_batch(self.batch_size, padded_shapes=({'feature': [None, None, 3]}, {'label': [None, None, 1]})).\
            prefetch(buffer_size=self.buffer_size)
    
    def __get_subset_paths(self, subset: str) -> list:
        # Read the images CSV (ori_image_filename.ext, gt_image_filename.ext)
        with open(os.path.join(self.dataset_dir, 'data.csv')) as file:
            file3c = file.read().splitlines()
        files = []

        for entry in file3c:
            csv_fields = split_csv_fields(entry)
            ori_path = csv_fields[0]
            gt_path = csv_fields[1]
            note = csv_fields[2]
            
            if note == subset:
                files.append((ori_path, gt_path))
        
        if len(files) == 0:
            print(f'No files found for subset {subset}! Using the whole dataset instead')
            for entry in file3c:
                csv_fields = split_csv_fields(entry)
                ori_path = csv_fields[0]
                gt_path = csv_fields[1]
                
                files.append((ori_path, gt_path))
        else:
            print(f'Found {subset} split of {len(files)} files')
        
        return files

    def __create_dataset_pipeline(self, subset: str, shuffle: bool = True) -> tf.data.Dataset:
        def process_example_paths(example):
            return {'feature': tf.io.decode_image(tf.io.read_file(example[0]), channels=3, expand_animations = False),
                    'label': tf.io.decode_image(tf.io.read_file(example[1]), channels=1,  expand_animations = False)}

        def convert_to_in_out_dicts(example):
            output_dict = {'label': example.pop('label')}
            return example, output_dict

        dataset = self.__get_subset_paths(subset)
        dataset = tf.data.Dataset.from_tensor_slices(dataset)
        dataset = dataset.map(process_example_paths)
        dataset = self.preprocessor.add_to_graph(dataset)
        dataset = dataset.map(convert_to_in_out_dicts).cache()
        if shuffle:
            dataset = dataset.shuffle(2000, reshuffle_each_iteration=True)
        return dataset
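
The subset selection done by `DataLoader` has a fallback that is easy to overlook: when no row carries the requested note, every row of the CSV is used. The sketch below shows that behaviour on an in-memory list; `select_subset` and the sample paths are hypothetical, and the rows are given as already-split tuples rather than CSV lines.

```python
def select_subset(rows: list, subset: str) -> list:
    """rows are (ori_path, gt_path, note) tuples; return the (ori_path, gt_path) pairs of the subset."""
    files = [(ori, gt) for ori, gt, note in rows if note == subset]
    if not files:
        # Same fallback as the notebook: an empty subset selects every row
        files = [(ori, gt) for ori, gt, _ in rows]
    return files


rows = [
    ('ori/1.jpg', 'gt/1.png', 'tr'),
    ('ori/2.jpg', 'gt/2.png', 'tr'),
    ('ori/3.jpg', 'gt/3.png', 'te'),
]
print(select_subset(rows, 'te'))  # only the test pair
print(select_subset(rows, 'va'))  # no 'va' rows, so all three pairs
```

In practice this means a dataset without `va` rows is validated on all of its images, including the training ones.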

In [13]:
#  Custom loader used for predictions


def my_batch(dataset: tf.data.Dataset, batch_size) -> tf.data.Dataset:
    return dataset.\
        padded_batch(batch_size, padded_shapes=({'feature': [None, None, 3]})).\
        prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

def my_file(path: str) -> list:
    files = [path]
    return files

def my_loader(path, preprocessor) -> tf.data.Dataset:
    def my_process(example):
        return {'feature': tf.io.decode_image(tf.io.read_file(example), channels=3, expand_animations = False)}

    dataset = my_file(path)
    dataset = tf.data.Dataset.from_tensor_slices(dataset)
    dataset = dataset.map(my_process)

    t_start = time.time()
    #  preprocessing
    dataset = preprocessor.add_to_graph(dataset)
    t_elapsed = time.time() - t_start

    dataset = dataset.cache()
    return dataset, t_elapsed

def single_predict(model, im_path: str, out_path: str, preprocessor):
    os.makedirs(os.path.dirname(out_path), exist_ok = True)

    #  Get tf Dataset structure containing the image to predict and elapsed preprocessing time
    tf_ds, t_elapsed_pre = my_loader(im_path, preprocessor)

    #  Predict the image
    for entry in tf_ds:
        #  Convert to tensor to prevent memory leak https://stackoverflow.com/a/64765018
        tensor = tf.convert_to_tensor(entry['feature'], dtype=tf.float32)
        tensor = tf.expand_dims(tensor, axis=0) #  add a dimension

        model.get_model()

        #  Get time before prediction
        t_start = time.time()

        #  Predict from feature image (X)
        pred = model.keras_model.predict(tensor)
        #  post-processing
        pred = pred[0]*255 #  reshape and de-preprocess
        
        #  prediction + postprocessing
        t_elapsed = time.time() - t_start
        #  preprocessing + prediction + postprocessing elapsed time
        t_elapsed_full = t_elapsed_pre + t_elapsed

        # Save to a file
        cv2.imwrite(out_path, pred)

        print(t_elapsed_pre)
        print(t_elapsed)
        print(t_elapsed_full)

Define Trainer

In [14]:
class Trainer:

    def __init__(self, data_loader: DataLoader, model: Model,
                 log_dir: str = './logs', evaluate_test_data=False):
        self.data_loader = data_loader
        self.model = model
        self.metrics = []
        self.losses = []
        self.callbacks = []
        self.log_dir = os.path.join(log_dir, model.name)
        self.timelog = None
        self.evaluate_test_data = evaluate_test_data

    @property
    def model(self) -> Model:
        return self.__model

    @model.setter
    def model(self, model: Model):
        self.__model = model

    @property
    def data_loader(self) -> DataLoader:
        return self.__data_loader

    @data_loader.setter
    def data_loader(self, data_loader: DataLoader):
        self.__data_loader = data_loader

    def add_metrics(self, metrics):
        if type(metrics) is not list:
            metrics = [metrics]
        for metric in metrics:
            self.metrics.append(metric)

    def add_losses(self, losses) -> None:
        if type(losses) is not list:
            losses = [losses]
        for loss in losses:
            self.losses.append(loss)

    def add_callbacks(self, callbacks) -> None:
        if type(callbacks) is not list:
            callbacks = [callbacks]
        for callback in callbacks:
            self.callbacks.append(callback)

    def combined_loss(self):
        def loss(y_true, y_pred):
            result = None
            for i, v in enumerate(self.losses):
                if i == 0:
                    result = v(y_true, y_pred)
                else:
                    result += v(y_true, y_pred)
            return result
        return loss

    def __log_evaluation_metrics(self, metrics: dict):
        root = ET.Element('metrics')
        tree = ET.ElementTree(root)
        for name, value in metrics.items():
            metric_element = ET.SubElement(root, name)
            metric_element.text = str(value)
        with open(os.path.join(self.model.get_logdir(), 'test_metrics.xml'), 'w') as xml_file:
            tree.write(xml_file, encoding='unicode')

    def train(self, epochs, optimizer, initial_epoch=0, verbose=1):
        assert self.model is not None, "Model hasn't been set for the trainer."
        assert self.data_loader is not None, "DataLoader hasn't been set for the trainer."
        os.makedirs(self.model.get_logdir(), exist_ok=True)
        model = self.model.get_model()
        model.compile(optimizer=optimizer, loss=self.combined_loss(), metrics=self.metrics)

        model.fit(self.data_loader.train_dataset, validation_data=self.data_loader.val_dataset,
                  epochs=epochs, verbose=verbose, initial_epoch=initial_epoch,
                  callbacks=self.callbacks, shuffle=True)
        if self.evaluate_test_data:
            evaluation_metrics = model.evaluate(self.data_loader.test_dataset, verbose=1)
            evaluation_metrics = dict(zip(model.metrics_names, evaluation_metrics))
            self.__log_evaluation_metrics(evaluation_metrics)
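
`Trainer.combined_loss` simply adds up the registered losses. A minimal TensorFlow-free sketch of the same idea, using toy scalar losses that are assumptions of this example:

```python
def combined_loss(losses: list):
    """Return a function that sums the value of every loss in the list."""
    def loss(y_true, y_pred):
        return sum(single_loss(y_true, y_pred) for single_loss in losses)
    return loss


# Toy scalar losses, used only for illustration
def absolute_error(y_true, y_pred):
    return abs(y_true - y_pred)


def squared_error(y_true, y_pred):
    return (y_true - y_pred) ** 2


total = combined_loss([absolute_error, squared_error])
print(total(1.0, 0.5))
```

One difference from the notebook version: with an empty list this sketch returns 0, while `Trainer.combined_loss` returns `None`, so at least one loss should be added before training.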

Define WorkScheduler

In [15]:
class WorkScheduler:
    def __init__(self) -> None:
        self.data = []

    def add_data(self, model: Model,
                 func: Callable[[Model, Any], None],
                 **kwargs) -> None:
        self.data.append((model, func, kwargs))

    def do_work(self) -> None:
        for model, func, kwargs in self.data:
            tf.keras.backend.clear_session()
            tf.compat.v1.reset_default_graph()
            func(model=model, **kwargs)
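
The scheduler is a plain job queue: each job is a model, a function, and the keyword arguments for that function. The sketch below shows the queuing pattern with a TensorFlow-free stand-in; `MiniScheduler`, `fake_train`, and the model names are hypothetical and exist only for this example.

```python
class MiniScheduler:
    """TensorFlow-free stand-in for WorkScheduler, for illustration only."""
    def __init__(self) -> None:
        self.data = []

    def add_data(self, model, func, **kwargs) -> None:
        self.data.append((model, func, kwargs))

    def do_work(self) -> None:
        for model, func, kwargs in self.data:
            # WorkScheduler also clears the Keras session here before each job
            func(model=model, **kwargs)


log = []


def fake_train(model, dataset_dir, batch_size):
    log.append(f'train {model} on {dataset_dir} with batch size {batch_size}')


scheduler = MiniScheduler()
scheduler.add_data('model-a', fake_train, dataset_dir='dataset/ECU', batch_size=3)
scheduler.add_data('model-b', fake_train, dataset_dir='dataset/Schmugge', batch_size=3)
scheduler.do_work()
print('\n'.join(log))
```

Jobs run in the order they were added, and every keyword passed to `add_data` must match a parameter of the queued function.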

Define losses

In [16]:
def dice_loss(y_true, y_pred, epsilon=1e-15):
    numerator = 2 * tf.reduce_sum(y_true * y_pred, axis=(1, 2, 3))
    denominator = tf.reduce_sum(y_true + y_pred, axis=(1, 2, 3))
    #  epsilon keeps the division defined when both masks are empty
    loss = tf.squeeze(tf.reshape(1 - numerator / (denominator + epsilon), (-1, 1, 1)))
    return loss
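
For intuition, the Dice loss of a single mask can be worked out by hand. The sketch below computes it on flattened lists in plain Python; `dice_loss_flat` is a hypothetical helper, and adding `epsilon` to the denominator is an assumption of this sketch to keep the division defined for two empty masks.

```python
def dice_loss_flat(y_true: list, y_pred: list, epsilon: float = 1e-15) -> float:
    """Dice loss of one flattened mask: 1 - 2 * intersection / (sum(y_true) + sum(y_pred))."""
    numerator = 2 * sum(t * p for t, p in zip(y_true, y_pred))
    denominator = sum(t + p for t, p in zip(y_true, y_pred))
    # epsilon avoids a division by zero when both masks are empty
    return 1 - numerator / (denominator + epsilon)


print(dice_loss_flat([1, 1, 0, 0], [1, 1, 0, 0]))  # perfect overlap
print(dice_loss_flat([1, 1, 0, 0], [1, 0, 0, 0]))  # half of the skin pixels found
print(dice_loss_flat([1, 1, 0, 0], [0, 0, 1, 1]))  # no overlap
```

The loss goes from 0 for a perfect prediction to 1 when prediction and ground truth share no pixels, which is why it complements the pixel-wise binary cross-entropy in `train_function`.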

Define metrics

In [17]:
def recall(y_true, y_pred):
    y_pred = K.round(y_pred)
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall_score = true_positives / (possible_positives + K.epsilon())
    return recall_score

def precision(y_true, y_pred):
    y_pred = K.round(y_pred)
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision_score = true_positives / (predicted_positives + K.epsilon())
    return precision_score

def f1(y_true, y_pred):
    precision_score = precision(y_true, y_pred)
    recall_score = recall(y_true, y_pred)
    return 2 * ((precision_score * recall_score) / (precision_score + recall_score + K.epsilon()))

def iou(y_true, y_pred):
    y_pred = K.round(y_pred)
    intersection = y_true * y_pred
    not_true = 1 - y_true
    union = y_true + (not_true * y_pred)

    return (K.sum(intersection, axis=-1) + K.epsilon()) / (K.sum(union, axis=-1) + K.epsilon())
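
A small worked example may help when reading the reported scores. The sketch below recomputes precision, recall and F1 on one flattened mask in plain Python, following the same steps as the functions above (round the predictions, count the positives, guard the divisions with a small constant). `binary_scores` is a hypothetical helper, and `1e-7` stands in for `K.epsilon()`.

```python
def binary_scores(y_true: list, y_pred: list) -> dict:
    """Precision, recall and F1 of one flattened mask after rounding predictions to 0/1."""
    y_pred = [round(p) for p in y_pred]
    true_positives = sum(t * p for t, p in zip(y_true, y_pred))
    predicted_positives = sum(y_pred)
    possible_positives = sum(y_true)
    eps = 1e-7  # small constant playing the role of K.epsilon()
    precision = true_positives / (predicted_positives + eps)
    recall = true_positives / (possible_positives + eps)
    f1 = 2 * precision * recall / (precision + recall + eps)
    return {'precision': precision, 'recall': recall, 'f1': f1}


scores = binary_scores([1, 1, 1, 0, 0, 0], [0.9, 0.8, 0.2, 0.7, 0.1, 0.3])
print({name: round(value, 3) for name, value in scores.items()})
```

Here two of the three skin pixels are found and one background pixel is wrongly marked as skin, so precision, recall and F1 all come out close to 2/3.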

Define callbacks

In [18]:
class CustomCallback(keras.callbacks.Callback):
    pass

class ModelCheckpoint(keras.callbacks.ModelCheckpoint, CustomCallback):
    checkpoint_name = 'saved_model.ckpt'

    def set_model(self, model):
        dir_name = os.path.join(self.filepath, 'checkpoint')
        os.makedirs(dir_name, exist_ok=True)
        timestr = get_timestamp()
        self.filepath = os.path.join(f"/content/drive/MyDrive/training/skinny/checkpoint-{timestr}", self.checkpoint_name)
        #self.filepath = os.path.join(F"/content/drive/MyDrive/training/skinny/dark/checkpoint-" + timestr, self.checkpoint_name )
        super().set_model(model)

class ReduceLROnPlateau(keras.callbacks.ReduceLROnPlateau, CustomCallback):
    pass

class ProgbarLogger(keras.callbacks.ProgbarLogger, CustomCallback):
    pass

class EarlyStopping(keras.callbacks.EarlyStopping, CustomCallback):
    pass

class TensorBoard(keras.callbacks.TensorBoard, CustomCallback):
    def set_model(self, model):
        self.log_dir = os.path.join(self.log_dir, 'tensorboard')
        os.makedirs(self.log_dir, exist_ok=True)
        super().set_model(model)

<a name="Model-Settings"></a>
### Model Settings
[[Return to ToC]](#Table-of-Contents)

In [None]:
#  Use the full Skinny model with inception and dense blocks
model_name = 'Skinny'

#  Model settings
levels = 6
initial_filters = 19
image_channels = 3

#  Train settings
max_epochs = 200
initial_lr = 1e-4
batch_size = 3
patience = 10 #  use 50 for training on dark skintones

#  Log Dir
log_dir = 'logs'

#  Preprocessing operations
preprocessor = Preprocessor()
preprocessor.cast(dtype=tf.float32).normalize().downscale(max_pixel_count=512**2).pad(network_levels=levels)

<a name="Model-Functions"></a>
### Model Functions
[[Return to ToC]](#Table-of-Contents)

Train, Test, Predict functions

In [20]:
def train_function(model: Model, batch_size: int, dataset_dir: str, patience: int) -> None:
    data_loader = DataLoader(dataset_dir=dataset_dir, batch_size=batch_size, preprocessor=preprocessor)
    trainer = Trainer(data_loader=data_loader, model=model, evaluate_test_data=True)
    trainer.add_losses([K.binary_crossentropy, dice_loss])
    trainer.add_metrics([
        f1,
        iou,
        precision,
        recall
    ])
    trainer.add_callbacks([
        ModelCheckpoint(filepath=model.get_logdir(), verbose=1, save_best_only=True,
                        monitor='val_f1', mode='max', save_weights_only=False),
        ReduceLROnPlateau(monitor='val_f1', factor=0.5, verbose=1, mode='max', min_lr=1e-6, patience=5),
        EarlyStopping(monitor='val_f1', mode='max', patience=patience, verbose=1),
        TensorBoard(log_dir=model.get_logdir(), histogram_freq=5)
    ])

    trainer.train(max_epochs, tf.keras.optimizers.Adam(learning_rate=initial_lr), verbose=1)

def test_function(model: Model, dataset_dir: str) -> None:
    data_loader = DataLoader(dataset_dir=dataset_dir, batch_size=1, preprocessor=preprocessor)
    trainer = Trainer(data_loader=data_loader, model=model, evaluate_test_data=True)
    trainer.add_losses([K.binary_crossentropy, dice_loss])
    trainer.add_metrics([
        f1,
        iou,
        precision,
        recall
    ])

    model.get_model()
    optimizer = tf.keras.optimizers.Adam(learning_rate=initial_lr)
    model.keras_model.compile(optimizer=optimizer, loss=trainer.combined_loss(), metrics=trainer.metrics)
    evaluation_metrics = model.keras_model.evaluate(data_loader.test_dataset, verbose=1)
    evaluation_metrics = dict(zip(model.keras_model.metrics_names, evaluation_metrics))
    print(evaluation_metrics)

def save_x(model: Model, dataset_dir: str, preprocessor: Preprocessor, out_dir: str = 'x', skip = 0) -> None:
    '''
    Save Skinny X pre-processed images (512**2) to files
    Save the images after the pre-processing, before they enter the model
    
    Image names are defined by a counter, which starts at skip
    '''
    data_loader = DataLoader(dataset_dir=dataset_dir, batch_size=1, preprocessor=preprocessor)

    os.makedirs(out_dir, exist_ok = True)

    i = skip
    for entry in data_loader.test_dataset:
        i += 1

        entry = entry[0]          #  dict: {'feature': data, dtype=float32}
        entry = entry['feature']  #  shape=(1, 320, 384, 3) dtype=float32
        entry = entry[0]*255      #  reshape(320, 384, 3) and de-preprocess
        
        #  Convert to numpy array or cv2.imwrite doesn't work
        entry = np.array(entry)
        #  cv2 works with BGR, not RGB
        entry = cv2.cvtColor(entry, cv2.COLOR_RGB2BGR)

        #  Save to a file
        filename = f"{i}.png"
        cv2.imwrite(os.path.join(out_dir, filename), entry)
        #tf.keras.preprocessing.image.save_img(os.path.join(out_dir, filename), entry)
        # https://stackoverflow.com/a/61041738

def save_y(model: Model, dataset_dir: str, preprocessor: Preprocessor, out_dir: str = 'y', skip = 0) -> None:
    '''
    Save Skinny Y pre-processed images (512**2) to files
    Save the images after the pre-processing, before they enter the model
    
    Image names are defined by a counter, which starts at skip
    '''
    data_loader = DataLoader(dataset_dir=dataset_dir, batch_size=1, preprocessor=preprocessor)

    os.makedirs(out_dir, exist_ok = True)

    i = skip
    for entry in data_loader.test_dataset:
        i += 1

        entry = entry[1]        #  dict: {'label': data, dtype=float32}
        entry = entry['label']  #  shape=(1, 320, 384, 1) dtype=float32
        entry = entry[0]*255    #  reshape(320, 384, 1) and de-preprocess
        
        # Convert to numpy array or cv2.imwrite doesn't work
        entry = np.array(entry)

        # save to a file
        filename = f"{i}.png"
        cv2.imwrite(os.path.join(out_dir, filename), entry)

def predict_function(model: Model, dataset_dir: str, out_dir: str, skip = 0) -> None:
    data_loader = DataLoader(dataset_dir=dataset_dir, batch_size=1, preprocessor=preprocessor)
    model.get_model()
    os.makedirs(out_dir, exist_ok = True)

    #  Predict on test images
    i = skip
    for entry in data_loader.test_dataset:
        i += 1
        entry = entry[0]  #  dict: {'feature': data, 'types': float32}

        #  Convert to tensor to prevent memory leak https://stackoverflow.com/a/64765018
        tensor = tf.convert_to_tensor(entry['feature'], dtype=tf.float32)
        #  Predict from feature image (X)
        #pred = model.keras_model.predict(tensor)  #  alternative call, kept for reference
        pred = model.keras_model(tensor)

        pred = pred[0]*255   #  drop the batch dimension and de-preprocess
        pred = pred.numpy()  #  Convert to numpy array or cv2.imwrite doesn't work

        #  Save to a file
        filename = f"{i}.png"
        cv2.imwrite(os.path.join(out_dir, filename), pred)
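
In `save_y` and `predict_function` the counter is incremented before the file name is built, so with `skip = 0` the first image is saved as `1.png`, and a later pass with `skip = 2000` continues at `2001.png`. The following is a minimal sketch of that naming scheme only. `numbered_filenames` is a hypothetical helper written for illustration and is not part of the notebook.

```python
def numbered_filenames(n_entries: int, skip: int = 0) -> list:
    """Return the file names that a pass over n_entries test images would produce."""
    names = []
    i = skip
    for _ in range(n_entries):
        i += 1  # incremented before use, so numbering starts at skip + 1
        names.append(f"{i}.png")
    return names


# Two consecutive passes never overwrite each other when the second one
# skips as many names as the first one produced.
first_pass = numbered_filenames(3)
second_pass = numbered_filenames(2, skip=3)
print(first_pass + second_pass)
```

Because the X, Y, and prediction folders all use the same counter, files with the same name in `x`, `y`, and `p` should refer to the same test image, provided the data loader iterates the test set in the same order each time.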

Model loading

In [21]:
def load_checkpoint(checkpoint_filepath):
    '''Import a model's files to set it as current model'''
    out = None
    ext = os.path.splitext(checkpoint_filepath)[1]
    from_path = os.path.join(os.path.dirname(checkpoint_filepath), '.')

    if ext == '.chkp':
        to_path = 'logs/Skinny/checkpoint'
        out = 'chkp'
    elif ext == '.pb':
        to_path = 'logs/Skinny/checkpoint/saved_model.pb'
        out = 'pb'
    else:
        #  to_path is undefined here: return before touching the current checkpoint
        print(f'Unknown model filetype: {ext}')
        return out
    
    !rm -rf logs/Skinny/checkpoint  #  Clear checkpoints
    !mkdir -p $to_path              #  Make default model folder
    !cp -r $from_path $to_path      #  Copy the model files
    
    return out

def load_schmugge_skintone_split(skintone):
    '''Replace Schmugge CSV file with a predefined skintone split'''
    assert skintone in skintones, f'Invalid skintone: {skintone}'
    !rm -f dataset/Schmugge/data.csv  #  -f: do not complain if the file is already missing
    
    if skintone == 'dark':
        !cp drive/MyDrive/training/skinny/checkpoint-20210523-110554/dark2305_1309.csv dataset/Schmugge/data.csv
    elif skintone == 'medium':
        !cp drive/MyDrive/training/skinny/checkpoint-20210523-112308/medium2305_1323.csv dataset/Schmugge/data.csv
    elif skintone == 'light':
        !cp drive/MyDrive/training/skinny/checkpoint-20210523-122027/light2305_1420.csv dataset/Schmugge/data.csv
    
    print(f'{skintone}(sch) split imported!')
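
`load_checkpoint` relies on IPython shell magics (`!rm`, `!mkdir`, `!cp`), so it only runs inside a notebook on a Unix-like system. As a rough sketch of the same steps in plain Python, assuming the behavior described above (clear the checkpoint folder, then copy the whole folder that contains the model file), something like the following could be used. The name `load_checkpoint_py` and the `checkpoint_root` parameter are illustrative assumptions, not part of the original notebook.

```python
import os
import shutil


def load_checkpoint_py(checkpoint_filepath: str,
                       checkpoint_root: str = 'logs/Skinny/checkpoint'):
    """Copy the folder containing checkpoint_filepath into checkpoint_root.

    Returns 'chkp' or 'pb' depending on the file extension, or None if the
    extension is unknown (in which case nothing on disk is changed).
    """
    ext = os.path.splitext(checkpoint_filepath)[1]
    if ext == '.chkp':
        to_path, out = checkpoint_root, 'chkp'
    elif ext == '.pb':
        to_path, out = os.path.join(checkpoint_root, 'saved_model.pb'), 'pb'
    else:
        print(f'Unknown model filetype: {ext}')
        return None

    from_path = os.path.dirname(checkpoint_filepath) or '.'
    shutil.rmtree(checkpoint_root, ignore_errors=True)  # clear old checkpoints
    shutil.copytree(from_path, to_path)                 # also creates parent folders
    return out
```

The early return for unknown extensions keeps the current checkpoint in place instead of deleting it before the error is noticed.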

<a name="Use-Skin-Detector"></a>
# Use Skin Detector
[[Return to ToC]](#Table-of-Contents)

<a name="Train"></a>
### Train
[[Return to ToC]](#Table-of-Contents)

In [None]:
dataset_dir = 'dataset/Pratheepan'


#mod = Skinny(levels, initial_filters, image_channels, log_dir, load_checkpoint=True, model_name=model_name)
mod = Skinny(levels, initial_filters, image_channels, log_dir)  # creates new model

scheduler = WorkScheduler()
scheduler.add_data(mod, train_function, batch_size = batch_size, dataset_dir = dataset_dir, patience = patience)
scheduler.do_work()

<a name="Test"></a>
### Test
[[Return to ToC]](#Table-of-Contents)

In [None]:
chkp_ext = load_checkpoint(models['schmugge'])
#chkp_ext = load_checkpoint('drive/MyDrive/training/skinny/checkpoint-20220320-183637/saved_model.ckpt/saved_model.pb')
dataset_dir = 'dataset/ECU'


mod = Skinny(levels, initial_filters, image_channels, log_dir, load_checkpoint=True,
             model_name=model_name, checkpoint_extension=chkp_ext)

scheduler = WorkScheduler()
scheduler.add_data(mod, test_function, dataset_dir = dataset_dir)
scheduler.do_work()

<a name="Predict"></a>
### Predict
[[Return to ToC]](#Table-of-Contents)

Utility functions

In [22]:
def pred_dirs(out_dir: str) -> tuple:
    x_dir = os.path.join(out_dir, 'x')
    y_dir = os.path.join(out_dir, 'y')
    pred_dir = os.path.join(out_dir, 'p')
    return x_dir, y_dir, pred_dir

def schedule_predict(model, dataset_dir: str, out_dir: str, skip: int = 0):
    x_dir, y_dir, pred_dir = pred_dirs(out_dir)
    scheduler = WorkScheduler()
    scheduler.add_data(None, save_x, dataset_dir = dataset_dir, out_dir = x_dir, preprocessor = preprocessor, skip = skip)
    scheduler.add_data(None, save_y, dataset_dir = dataset_dir, out_dir = y_dir,  preprocessor = preprocessor, skip = skip)
    scheduler.add_data(model, predict_function, dataset_dir = dataset_dir, out_dir = pred_dir, skip = skip)
    scheduler.do_work()

def cross_predict(train_db, predict_db, timestr = None, save = False):
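    '''
    Perform a cross prediction: the model trained on train_db predicts over the whole predict_db

    predict_db is either a known key ('ecu', 'hgr', 'schmugge') or the path of a dataset directory
    '''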
    is_ecu = False
    if predict_db == 'ecu':
        predict_db = 'dataset/ECU'
        is_ecu = True
    elif predict_db == 'hgr':
        predict_db = 'dataset/HGR_small'
    elif predict_db == 'schmugge':
        predict_db = 'dataset/Schmugge'
    
    if timestr is None:
        timestr = get_timestamp()

    #  Set the whole dataset as the testing set
    whole_test = os.path.join(predict_db, 'data.csv') #  dataset to process
    
    #  ECU: limit the first pass to 2000 predictions or the RAM runs out;
    #  the remaining images are predicted in a second pass below
    if is_ecu:
        csv_full_test(whole_test, 2000)
    else:
        csv_full_test(whole_test)

    #  Load model files
    chkp_ext = load_checkpoint(models[train_db])
    mod = Skinny(levels, initial_filters, image_channels, log_dir, load_checkpoint=True,
            model_name=model_name, checkpoint_extension=chkp_ext)

    #  Predict
    ds_name = os.path.basename(predict_db).lower()
    if ds_name == 'hgr_small':
        ds_name = 'hgr'

    out_dir = os.path.join(timestr, 'skinny', 'cross', f'{train_db}_on_{ds_name}')
    schedule_predict(mod, predict_db, out_dir)

    if is_ecu: #  time to do the second half, in the same out_dir, numbered from 2001
        csv_not_test(whole_test)
        schedule_predict(mod, predict_db, out_dir, skip = 2000)

    #  Zip and save predictions
    if save:
        zip_path = 'drive/MyDrive/testing/skinny/' + timestr + '_p.zip'
        !zip -r $zip_path $timestr

def cross_predict_skintones(train_skintone, predict_skintone, timestr = None, save = False):
    '''Perform a cross prediction of a skintone model over a skintone dataset'''
    db_dir = 'dataset/Schmugge'

    if timestr is None:
        timestr = get_timestamp()

    #  Update the csv file to set the prediction set
    gen_sch_by_skintone(predict_skintone, 'test')

    #  Load model files
    chkp_ext = load_checkpoint(models[train_skintone])
    mod = Skinny(levels, initial_filters, image_channels, log_dir, load_checkpoint=True,
            model_name=model_name, checkpoint_extension=chkp_ext)

    #  Predict
    out_dir = os.path.join(timestr, 'skinny', 'cross', f'{train_skintone}_on_{predict_skintone}')
    schedule_predict(mod, db_dir, out_dir)

    #  Zip and save predictions
    if save:
        zip_path = 'drive/MyDrive/testing/skinny/' + timestr + '_p.zip'
        !zip -r $zip_path $timestr

def base_predict(db_name, timestr = None, save = False):
    '''Perform a base prediction of a model over a dataset'''
    if db_name == 'ecu':
        db_dir = 'dataset/ECU'
    elif db_name == 'hgr':
        db_dir = 'dataset/HGR_small'
    elif db_name == 'schmugge':
        db_dir = 'dataset/Schmugge'
    elif db_name in skintones:
        db_dir = 'dataset/Schmugge'
        load_schmugge_skintone_split(db_name) #  Load skintone split
    
    if timestr is None:
        timestr = get_timestamp()
    
    #  Load model files
    chkp_ext = load_checkpoint(models[db_name])
    mod = Skinny(levels, initial_filters, image_channels, log_dir, load_checkpoint=True,
            model_name=model_name, checkpoint_extension=chkp_ext)

    #  Predict
    out_dir = os.path.join(timestr, 'skinny', 'base', db_name)
    schedule_predict(mod, db_dir, out_dir)

    #  Zip and save predictions
    if save:
        zip_path = 'drive/MyDrive/testing/skinny/' + timestr + '_p.zip'
        !zip -r $zip_path $timestr
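
The prediction helpers above all write to the same folder layout: `timestr/skinny/{base,cross}/<name>/{x,y,p}`, where `x` holds the preprocessed inputs, `y` the ground truths, and `p` the predictions. The sketch below only rebuilds those paths to make the layout explicit. `prediction_layout` is a hypothetical helper, and the timestamp string is a placeholder whose format is assumed from the checkpoint folder names used in this notebook.

```python
import os


def prediction_layout(timestr: str, kind: str, name: str) -> dict:
    """Return the x/y/p output folders for one prediction run.

    kind is 'base' or 'cross'; name is a dataset key for base runs
    (e.g. 'ecu') or '<train>_on_<test>' for cross runs.
    """
    out_dir = os.path.join(timestr, 'skinny', kind, name)
    return {sub: os.path.join(out_dir, sub) for sub in ('x', 'y', 'p')}


layout = prediction_layout('20220320-183637', 'cross', 'ecu_on_hgr')
for sub, path in layout.items():
    print(sub, '->', path)
```

Since every run made with the same `timestr` shares one root folder, a single `zip -r` of that folder collects all base and cross predictions.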

Predict

In [None]:
# Custom Predict
cross_predict('ecu', 'dataset/Pratheepan', timestr = get_timestamp(), save = True)

In [None]:
# Run thesis predictions

# Do base+cross of either skintones or normal
mode = 'normal' #  'normal' or 'skintones'


timestr = get_timestamp()

if mode == 'normal':
    modls = ['ecu', 'hgr', 'schmugge']

    #  Base predictions: based on splits defined by me and only predict on self
    #  timestr/skinny/base/{ecu,hgr,schmugge}/{p/y/x}
    for ds_name in modls:
        base_predict(ds_name, timestr = timestr, save = False)

    #  Cross predictions: use a dataset whole as the testing set
    #  timestr/skinny/cross/{train}_on_{test}/{p/y/x}, e.g. ecu_on_hgr
    for ds_train in modls:
        for ds_test in modls:
            if ds_train != ds_test: #  do not cross-predict on self
                cross_predict(ds_train, ds_test, timestr = timestr, save = False)
elif mode == 'skintones':
    modls = ['light', 'medium', 'dark']

    #  Base predictions: based on splits defined by me and only predict on self
    #  timestr/skinny/base/{light,medium,dark}/{p/y/x}
    for ds_name in modls:
        base_predict(ds_name, timestr = timestr, save = False)

    #  Cross predictions: use a whole skintone sub-dataset as the testing set
    #  timestr/skinny/cross/{train}_on_{test}/{p/y/x}, e.g. light_on_dark
    for ds_train in modls:
        for ds_test in modls:
            if ds_train != ds_test: #  do not cross-predict on self
                cross_predict_skintones(ds_train, ds_test, timestr = timestr, save = False)

#  Zip and save predictions
zip_path = 'drive/MyDrive/testing/skinny/' + timestr + '_p.zip'
!zip -r $zip_path $timestr
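
The nested loops in the cell above visit every ordered (train, test) pair except the ones where a model would be cross-predicted on its own dataset. As an illustrative sketch, the same pairs can be listed with `itertools.permutations`, which is handy for checking how many runs a mode will schedule before starting it. `cross_pairs` is a hypothetical helper and not part of the notebook.

```python
from itertools import permutations


def cross_pairs(names):
    """Return all ordered (train, test) pairs with train != test."""
    return list(permutations(names, 2))


datasets = ['ecu', 'hgr', 'schmugge']
pairs = cross_pairs(datasets)

# One base prediction per dataset, plus one cross prediction per ordered pair
print(f'{len(datasets)} base runs, {len(pairs)} cross runs')
for train, test in pairs:
    print(f'{train}_on_{test}')
```

With three datasets (or three skintones) this gives three base runs and six cross runs per mode.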

Predict on a single image

In [None]:
# preprocessing operations
preprocessor = Preprocessor()
preprocessor.cast(dtype=tf.float32).normalize().downscale(max_pixel_count=512**2).pad(network_levels=levels)

chkp_ext = load_checkpoint(models['ecu'])
inim = 'dataset/ECU/origin_images/im00001.jpg'
outim = 'spred/im00001.png'


mod = Skinny(levels, initial_filters, image_channels, log_dir, load_checkpoint=True,
        model_name=model_name, checkpoint_extension=chkp_ext)
single_predict(mod, inim, outim, preprocessor)