# Imports

In [None]:
import requests
from pathlib import Path

import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

from tensorflow import keras
from tensorflow.keras import layers

# Global variables definition

In [None]:
ROOT_DIR = Path.cwd().parent
CSV_LINK = '...' # I'm not authorized to display the url publicly
CSV_NAME = 'dataset.csv'
AUGMENTED_CSV_NAME = 'augmented_data.csv'

CSV_PATH = ROOT_DIR / CSV_NAME
BASE_IMG_DIR = ROOT_DIR / 'img'

WITH_IMGAUG_DATA_AUG = False
WITH_OPENCV_DATA_AUG = True

# Download the csv metadata file

In [None]:
# Disable unverified HTTPS warnings
import urllib3
urllib3.disable_warnings()

In [None]:
if not CSV_PATH.is_file():
    raw_csv = requests.get(CSV_LINK, verify=False).content
    CSV_PATH.write_bytes(raw_csv)

# Download the images with inventory level

In [None]:
def download_base_images(csv_path, output_dir):
    prefix_url = '...' # I'm not authorized to display the url publicly
    
    df = pd.read_csv(csv_path, sep=';', index_col=0).dropna()
    
    for (label_id, inv_level, img_url) in df.itertuples():
        full_url = prefix_url + img_url
        output_file = output_dir / f'{label_id}_{inv_level}.png'
        
        img_data = requests.get(full_url, verify=False).content
        output_file.write_bytes(img_data)

In [None]:
if not BASE_IMG_DIR.is_dir():
    BASE_IMG_DIR.mkdir()
    download_base_images(CSV_PATH, BASE_IMG_DIR)

# Train Test Split

In [None]:
dataset_files = list(BASE_IMG_DIR.glob('*'))
len(dataset_files)

In [None]:
train_files, test_files = train_test_split(dataset_files, test_size=0.2, random_state=42)
len(train_files), len(test_files)

# Load training files

In [None]:
# Load the gray image and normalize it

def pad_resize(img, h, w):
    ratio_h = h / img.shape[0]
    ratio_w = w / img.shape[1]
    
    # Resize while keeping same ratio
    ratio = min(ratio_h, ratio_w)
    img = cv2.resize(img, (0, 0), fx=ratio, fy=ratio, interpolation=cv2.INTER_CUBIC)
    
    # Pad the rest of the image
    pad_y, pad_x = h - img.shape[0], w - img.shape[1]
    img = cv2.copyMakeBorder(
        img,
        top = pad_y // 2,
        bottom = (pad_y + 1) // 2,
        left = pad_x // 2,
        right = (pad_x + 1) // 2,
        borderType = cv2.BORDER_CONSTANT,
        value = [255, 255, 255]
    )
    
    return img

def load_gray(file_paths, reshape_h, reshape_w):
    # Load the gray images & normalize
    imgs = [cv2.imread(str(f), cv2.IMREAD_GRAYSCALE).astype(np.float32) for f in file_paths]
    
    # Resize the images
    imgs = np.asarray([pad_resize(img, reshape_h, reshape_w) for img in imgs])
    
    imgs = imgs[..., None] / 255

    return imgs

reshape_h, reshape_w = 224, 224
train_imgs = load_gray(train_files, reshape_h, reshape_w)
test_imgs = load_gray(test_files, reshape_h, reshape_w)

plt.imshow(train_imgs[0,:,:,0], cmap='gray'); plt.show()
plt.imshow(train_imgs[-1,:,:,0], cmap='gray'); plt.show()

train_imgs.shape, test_imgs.shape

In [None]:
# Extract the inventory level from the filenames

def extract_inv_level(file_paths):
    # Remove file extension & Keep only the inventory level
    levels_str = [f.stem.split('_')[-1] for f in file_paths]
    
    # Convert to float
    return np.array(levels_str, dtype=np.float32)

train_y, test_y = extract_inv_level(train_files), extract_inv_level(test_files)
train_y[:5], test_y[:5]

# Computer Vision classic

In [None]:
import skimage
import math
import random
from skimage import io, filters, segmentation
from sklearn.metrics import mean_squared_error, mean_absolute_error, max_error

In [None]:
# Load files
gt = extract_inv_level(dataset_files)
orig_imgs = []
gray_imgs = []
for file in dataset_files:
    orig_img = skimage.io.imread(file)
    orig_imgs.append(orig_img)
    gray_img = skimage.color.rgb2gray(orig_img)
    gray_img *= 255
    gray_img.astype(np.uint8)
    gray_imgs.append(gray_img)

In [None]:
def show_images(orig_imgs, gray_imgs, gt, imno, pred=None):
    title = f'{imno}Ground truth: {gt[imno]:.2f}'

    if pred:
        title = f'{imno}Ground truth: {gt[imno]:.2f} | Pred: {pred:.2f}'

    fig, axes = plt.subplots(1, 2, figsize=(6, 4))
    ax = axes.ravel()

    ax[0].imshow(orig_imgs[imno])
    ax[0].set_title(title)
    ax[1].imshow(gray_imgs[imno], cmap=plt.cm.gray)
    ax[1].set_title(title)
    
    fig.tight_layout()
    plt.show()

Starting with a simple image: pens

In [None]:
show_images(orig_imgs, gray_imgs, gt, 62)
show_images(orig_imgs, gray_imgs, gt, 63)

As a baseline, we can use dummy and random classifiers, or a classifier based on the average color:

In [None]:
def dummy_classify(img, gt=gt):
    values, counts = np.unique(gt, return_counts=True)
    res = gt[np.argmax(counts)]
    return res

def random_classify(img, gt=gt):
    idx = np.random.randint(len(gt))
    res = gt[idx]
    return res

def mean_pixels_color_classify(img, gt=None):
    mask = (img < 255)
    avg = np.mean(img[mask])
    lvl = 1 - (avg / 255)

    ys = [0, 0.2, 0.4, 0.6, 0.8, 1]
    for y in ys:
        if abs(y - lvl) <= 0.1:
            res = y
            break
    return res

In [None]:
im_62 = gray_imgs[62]
im_63 = gray_imgs[63]
print('Dummy:')
print("pred: ", dummy_classify(im_62), dummy_classify(im_63))
print("true: ", gt[62], gt[63])
print('Mean color:')
print("pred: ", mean_pixels_color_classify(im_62), mean_pixels_color_classify(im_63))
print("true: ", gt[62], gt[63])

In [None]:
def predict_all_and_get_metrics(cls, gt=gt, verbose=True):
    y_pred = [cls(gray_imgs[imno], gt) for imno in range(len(gt))]
    if verbose:
        print("MSE: ", mean_squared_error(gt, y_pred))
        print("MAE: ", mean_absolute_error(gt, y_pred))
        print("Max err: ", max_error(gt, y_pred))
    t = np.sum(np.asarray(y_pred).astype(np.float32) == (gt))
    if verbose:
        print(f"Accuracy: {t}/{len(gt)} {t / len(gt)}")
    return y_pred

In [None]:
y_pred_rand = predict_all_and_get_metrics(random_classify)
y_pred_dummy = predict_all_and_get_metrics(dummy_classify)
y_pred_mean_pixels_color = predict_all_and_get_metrics(mean_pixels_color_classify)

In [None]:
def get_biggest_errors(y_true, y_pred):
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    errors = abs(y_true - y_pred)
    idx_biggest_errors = np.argsort(errors, axis=-1)[::-1]
    return errors[idx_biggest_errors], idx_biggest_errors

biggest_errors, idx_biggest_errors = get_biggest_errors(gt, y_pred_mean_pixels_color)

Largest errors:

In [None]:
for i in range(5):
    j = idx_biggest_errors[i]
    show_images(orig_imgs, gray_imgs, gt, j, y_pred_mean_pixels_color[j])

Smallest errors:

In [None]:
for i in range(5):
    j = idx_biggest_errors[-i-1]
    show_images(orig_imgs, gray_imgs, gt, j, y_pred_mean_pixels_color[j])

Let's try with segmenting using the watershed algorithm.  
We use a Sobel filter to get our topographic map.  
The markers here are hard-coded, the values defined empirically.

In [None]:
def watershed_seg(img):
    elevation_map = skimage.filters.sobel(img)

    BACKGROUND = 1
    OBJECTS = 2

    markers = np.zeros(img.shape, dtype=np.uint)
    markers[(img > 145) & (img < 165)] = BACKGROUND # background
    markers[img < 50] = OBJECTS # black
    markers[img > 210] = OBJECTS # white

    segmentation = skimage.segmentation.watershed(elevation_map, markers, mask=(img<255))
    
    return segmentation, elevation_map, markers

def show_watershed_seg(orig_imgs, gray_imgs, gt, imno, segmentation, elevation_map, markers):
    fig, axes = plt.subplots(1, 4, figsize=(10, 6))
    ax = axes.ravel()

    title = f'{imno}Ground truth: {gt[imno]:.2f}'

    ax[0].imshow(orig_imgs[imno])
    ax[0].set_title(title)

    ax[1].imshow(gray_imgs[imno], cmap=plt.cm.gray)
    ax[2].imshow(elevation_map)
    ax[2].set_title('Sobel')
    ax[3].imshow(segmentation, cmap=plt.cm.nipy_spectral)
    ax[3].set_title('Segmentation')

    fig.tight_layout()
    plt.show()
    
imno = 63
im_63 = gray_imgs[imno]

segmentation, elevation_map, markers = watershed_seg(im_63)
show_watershed_seg(orig_imgs, gray_imgs, gt, imno, segmentation, elevation_map, markers)

imno = 43
im_43 = gray_imgs[imno]

segmentation, elevation_map, markers = watershed_seg(im_43)
show_watershed_seg(orig_imgs, gray_imgs, gt, imno, segmentation, elevation_map, markers)

In [None]:
def watershed_hcmarkers_classify(img, gt=None):
    segmentation, _, _ = watershed_seg(img)
    BACKGROUND = 1
    OBJECTS = 2
    epsilon = 1e-6
    
    bg = (segmentation == BACKGROUND).sum()
    fg = (segmentation == OBJECTS).sum()
        
    lvl = fg / (fg+bg+epsilon)
    
    ys = [0, 0.2, 0.4, 0.6, 0.8, 1]
    for y in ys:
        if abs(y - lvl) <= 0.1:
            return y
        
watershed_hcmarkers_classify(gray_imgs[44]), gt[44]

In [None]:
y_pred_watershed1 = predict_all_and_get_metrics(watershed_hcmarkers_classify)

In [None]:
biggest_errors, idx_biggest_errors = get_biggest_errors(gt, y_pred_watershed1)

The results are not great on the whole data set. This method is too naive to generalize well.

In [None]:
for i in range(5):
    j = idx_biggest_errors[i]
    show_images(orig_imgs, gray_imgs, gt, j, y_pred_watershed1[j])

Smallest errors:

In [None]:
for i in range(20):
    j = idx_biggest_errors[-i-1]
    show_images(orig_imgs, gray_imgs, gt, j, y_pred_watershed1[j])

# Deep

# Data augmentation

In [None]:
from skimage.util import random_noise
import imgaug as ia
import imgaug.augmenters as iaa

In [None]:
ia.seed(42)

# Define the augmentation to apply
aug = iaa.Sequential([
    iaa.Fliplr(0.7), # horizontal flips
        
    # Small gaussian blur with random sigma between 0 and 0.5.
    iaa.Sometimes(0.5, iaa.GaussianBlur(sigma=(0, 0.5))),
    
    # Strengthen or weaken the contrast in each image.
    iaa.LinearContrast((0.75, 1.5)),
    
    # Make some images brighter and some darker.
    iaa.Multiply((0.7, 1.3), per_channel=0.2),
    
    # Scale/zoom them, translate/move them, rotate them and shear them.
    iaa.Affine(
        scale={"x": (1, 1.1), "y": (1, 1.1)},
    )
], random_order=True) # apply augmenters in random order

In [None]:
def _hflip(imgs, y):
    return (np.reshape([cv2.flip(img, 1) for img in imgs], imgs.shape), y)

def _noise(imgs, y):
    return (np.reshape([random_noise(img, mode='s&p', amount=0.061) for img in imgs], imgs.shape), y)

def _imgaug(imgs, y, aug_sequence, nb_aug=3):
    augmented_images = []
    augmented_y = []
    
    for i in range(len(imgs)):
        duplicated_imgs = np.array([imgs[i] for _ in range(nb_aug)])
        augmented_images += list(aug_sequence(images=duplicated_imgs))
        augmented_y += [y[i]] * nb_aug
        
    return np.array(augmented_images), np.array(augmented_y)


def data_augmentation(train_imgs, train_y, aug_sequence):
    '''
    Augment training data by applying tranformations on it.
    
    Parameters:
    -----------
    train_imgs:
        A list of the training images data
    train_y:
        A list of the inventory level for each training image
    
    Returns:
    --------
    aug_imgs:
        A list of training images and augmented images
    aug_y:
        A list of the inventory level for each augmented image
    '''
    
    aug_methods = []
        
    if WITH_IMGAUG_DATA_AUG:
        aug_methods.append(lambda x,y: _imgaug(x, y, aug_sequence))
        
    if WITH_OPENCV_DATA_AUG:
        aug_methods.append(_hflip)
        aug_methods.append(_noise)
    
    aug_imgs = train_imgs.copy()
    aug_y = train_y.copy()
        
    for method in aug_methods:
        imgs, y = method(train_imgs.copy(), train_y.copy())
        aug_imgs = np.concatenate((aug_imgs, imgs))
        aug_y = np.concatenate((aug_y, y))
        
    return (aug_imgs, aug_y)


In [None]:
train_aug_imgs, train_aug_y = data_augmentation(train_imgs, train_y, aug)

print(len(train_aug_imgs) / len(train_imgs), 'more images after data augmentation')
len(train_aug_imgs), len(train_aug_y)

In [None]:
plt.imshow(train_aug_imgs[0,:,:,0], cmap='gray'); plt.show()
plt.imshow(train_aug_imgs[-1,:,:,0], cmap='gray'); plt.show()

# Build Models

In [None]:
# Model 1: CNN

input_shape = train_imgs.shape[1:]

inputs = keras.Input(shape=input_shape, name='CNN_model_input')

conv1 = layers.Conv2D(32, 5, strides=2, activation='relu', name='CNN_model_conv1')(inputs)
conv2 = layers.Conv2D(32, 5, strides=2, activation='relu', name='CNN_model_conv2')(conv1)
conv3 = layers.Conv2D(32, 3, activation='relu', name='CNN_model_conv3')(conv2)

maxpool = layers.GlobalMaxPooling2D(name='CNN_model_globalmaxpool1')(conv3)

dense1 = layers.Dense(32, activation='relu', name='CNN_model_dense1')(maxpool)
dense2 = layers.Dense(32, activation='relu', name='CNN_model_dense2')(dense1)

dropout = layers.Dropout(0.25, name='CNN_model_dropout1')(dense2)
outputs = layers.Dense(1, activation='sigmoid', name='CNN_model_dense3')(dropout)

model1 = keras.Model(inputs, outputs)
model1.summary()

In [None]:
# Model 2: Transfer learning with MobileNetV2

class MobileNetPreprocess(layers.Layer):
    def call(self, inputs):
        # mobilenet preprocess asks for data in [0, 255]
        inputs *= 255

        # mobilenet preprocess asks for data in rgb
        inputs = tf.image.grayscale_to_rgb(inputs)
        
        # call mobilenet preprocess
        inputs = keras.applications.mobilenet_v2.preprocess_input(inputs)
        
        return inputs

inputs = keras.Input(shape=input_shape, name='Mobilenet_model_input')
preprocess = MobileNetPreprocess(name='Mobilenet_preprocess')(inputs)

# Use transfer learning and disable training on these weights
mobilenet = keras.applications.MobileNetV2(input_shape=(224, 224, 3), include_top=False)
for layer in mobilenet.layers:
    layer.trainable = False
mobilenet = mobilenet(preprocess)

pool = layers.GlobalMaxPool2D(name='Mobilenet_model_globalmaxpool1')(mobilenet)
dense1 = layers.Dense(32, activation='relu', name='Mobilenet_model_dense1')(pool)
dense2 = layers.Dense(32, activation='relu', name='Mobilenet_model_dense2')(dense1)
dropout = layers.Dropout(0.25, name='Mobilenet_model_dropout1')(dense2)
outputs = layers.Dense(1, activation='sigmoid', name='Mobilenet_model_dense3')(dropout)

model2 = keras.Model(inputs, outputs)
model2.summary()

In [None]:
# Model 3: Transfer learning with ResNet50

class Resnet50V2Preprocess(layers.Layer):
    def call(self, inputs):
        # resnet preprocess asks for data in [0, 255]
        inputs *= 255

        # resnet preprocess asks for data in rgb
        inputs = tf.image.grayscale_to_rgb(inputs)
        
        # call resnet preprocess
        inputs = keras.applications.resnet_v2.preprocess_input(inputs)
        
        return inputs

inputs = keras.Input(shape=input_shape, name='Resnet_model_input')
preprocess = Resnet50V2Preprocess()(inputs)

# Use transfer learning and disable training on these weights
resnet = keras.applications.ResNet50V2(include_top=False)
for layer in resnet.layers:
    layer.trainable = False
resnet = resnet(preprocess)

pool = layers.GlobalMaxPool2D(name='Resnet_model_globalmaxpool1')(resnet)
dense1 = layers.Dense(32, activation='relu', name='Resnet_model_dense1')(pool)
dense2 = layers.Dense(32, activation='relu', name='Resnet_model_dense2')(dense1)
dropout = layers.Dropout(0.25, name='Resnet_model_dropout1')(dense2)
outputs = layers.Dense(1, activation='sigmoid', name='Resnet_model_dense3')(dropout)

model3 = keras.Model(inputs, outputs)
model3.summary()

In [None]:
# Model 4: Wide-and-deep

input_shape = train_imgs.shape[1:]

inputs = keras.Input(shape=input_shape, name='WD_model_input')

# Wide
wide_conv = layers.Conv2D(128, 5, activation='relu', name='WD_model_wide_conv')(inputs)
wide_pool = layers.GlobalMaxPooling2D(name='WD_model_wide_maxpool')(wide_conv)

# Deep
conv1 = layers.Conv2D(32, 5, strides=2, activation='relu', name='WD_model_conv1')(inputs)
conv2 = layers.Conv2D(32, 5, strides=2, activation='relu', name='WD_model_conv2')(conv1)
conv3 = layers.Conv2D(32, 3, activation='relu', name='WD_model_conv3')(conv2)

maxpool = layers.GlobalMaxPooling2D(name='WD_model_globalmaxpool1')(conv3)

dense1 = layers.Dense(32, activation='relu', name='WD_model_dense1')(maxpool)
dense2 = layers.Dense(32, activation='relu', name='WD_model_dense2')(dense1)
dropout = layers.Dropout(0.25, name='WD_model_dropout1')(dense2)

concat = layers.Concatenate()([wide_pool, dropout])
outputs = layers.Dense(1, activation='sigmoid', name='WD_model_dense3')(concat)

model4 = keras.Model(inputs, outputs)
model4.summary()

In [None]:
# We want to predict the level at 20% precision
# so we create this custom metric 

def accuracy_at_20(y_true, y_pred):
    '''Custom metric for computing accuracy at 20% precision'''
    
    precision = 0.2
    
    y_true /= precision
    y_pred /= precision
    
    return keras.backend.mean(y_true == keras.backend.round(y_pred))

print(accuracy_at_20(
    tf.constant([0.2, 0.2, 0.2]),
    tf.constant([0.2, 0.29, 0.3])
    #            OK   OK    KO
))

accuracy_at_20(
    tf.constant([0, 0.2, 0.4]),
    tf.constant([0.09, 0.2, 0.49])
    #            OK    OK   OK
)

In [None]:
# Compile & train on sample

loss = 'mean_squared_error'
metrics = [keras.metrics.mean_absolute_error, accuracy_at_20]

models = {
    'CNN': model1,
    'MobileNetV2': model2,
    'ResNet50V2': model3,
    'Wide-and-Deep': model4
}

for model in models.values():
    model.compile(optimizer='adam', loss=loss, metrics=metrics)

history = {}
for name, model in models.items():
    print('---', name, '---')
    history[name] = model.fit(
        np.asarray(train_aug_imgs),
        train_aug_y,
        epochs=100,
        validation_data=(test_imgs, test_y)
    )

In [None]:
def rolling_average(x, w):
    return np.convolve(x, np.ones(w), 'valid') / w

f, ax = plt.subplots(2,len(models), figsize=(30,10))
colors = ['red', 'green', 'blue', 'black']

ravg_w = 1

for i, name in enumerate(models.keys()):
    model_hist = history[name].history
    
    ax[0,i].plot(rolling_average(model_hist['mean_absolute_error'], ravg_w),
               label=f'{name} (train)',
               linestyle='dashed',
               c=colors[i])
    ax[0,i].plot(rolling_average(model_hist['val_mean_absolute_error'], ravg_w),
               label=f'{name} (test)',
               c=colors[i])
    
    ax[1,i].plot(rolling_average(model_hist['accuracy_at_20'], ravg_w),
               label=f'{name} (train)',
               linestyle='dashed',
               c=colors[i])
    ax[1,i].plot(rolling_average(model_hist['val_accuracy_at_20'], ravg_w),
               label=f'{name} (test)',
               c=colors[i])

    ax[0,i].set_title(f'Rolling average ({ravg_w}) of the Loss')
    ax[0,i].set_ylabel('MAE')
    ax[0,i].set_xlabel('Epoch')
    ax[0,i].set_ylim([0, 0.3])
    ax[0,i].legend(loc='upper right')

    ax[1,i].set_title(f'Rolling average ({ravg_w}) of the Accuracy at 20%')
    ax[1,i].set_ylabel('Accuracy at 20%')
    ax[1,i].set_xlabel('Epoch')
    ax[1,i].set_ylim([0, 1])
    ax[1,i].legend(loc='lower right')

plt.show()

ravg_w = 5

f, ax = plt.subplots(1, 2, figsize=(30,10))
for i, name in enumerate(models.keys()):
    model_hist = history[name].history

    ax[0].plot(rolling_average(model_hist['accuracy_at_20'], ravg_w),
               label=f'{name} (train)',
               linestyle='dashed',
               c=colors[i])
    
    ax[1].plot(rolling_average(model_hist['val_accuracy_at_20'], ravg_w),
               label=f'{name} (test)',
               c=colors[i])

    ax[0].set_title(f'Rolling average ({ravg_w}) of the train Accuracy at 20%')
    ax[0].set_ylabel('Accuracy at 20%')
    ax[0].set_xlabel('Epoch')
    ax[0].legend(loc='lower right')

    ax[1].set_title(f'Rolling average ({ravg_w}) of the test Accuracy at 20%')
    ax[1].set_ylabel('Accuracy at 20%')
    ax[1].set_xlabel('Epoch')
    ax[1].legend(loc='lower right')

In [None]:
# Test the model
for name, model in models.items():
    loss, mae, acc = model.evaluate(test_imgs, test_y, verbose=0)
    print(f'{name:>15} : MAE = {mae:.3f} - Accuracy at 20%: {acc:.3f}')

In [None]:
# Check a test image
nb_samples = 6
nb_cols = 3

sample_imgs, sample_y = test_imgs[:nb_samples], test_y[:nb_samples]
sample_levels = {
    name: model.predict(sample_imgs).flatten()
    for name, model in models.items()
}

f, ax = plt.subplots(int(np.ceil(nb_samples / nb_cols)), 
                     nb_cols, figsize=(10,10))
for i in range(nb_samples):
    title = f'Ground truth: {sample_y[i]:.2f}'
    
    for name, scores in sample_levels.items():
        title += f'\n{name}: {scores[i]:.2f}'
    
    x, y = i // nb_cols, i % nb_cols
    ax[x,y].set_title(title)
    ax[x,y].imshow(sample_imgs[i,:,:,0], cmap='gray')
plt.tight_layout()
plt.show()