# Image Outlier Detection: Mahalanobis Distance

In [None]:
# Global Confguration
CFG = {
    'gpus': 0,
    'image_size': 28,
    'channel_size': 1,
    'batch_size': 32,
    'epochs': 5,
    'learning_rate': 0.001,
    'conv_regularization_factor': 0.0001,
    'dense_regularization_factor': 0.0001,
    'kernel_size': 3,
    'layer_filters': [32, 64, 128],
}

## Imports

In [None]:
import os
import math
import types
import time
import pandas as pd
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from PIL import Image
import tensorflow as tf
from collections import defaultdict
import scipy
from sklearn.model_selection import train_test_split
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras import layers
from keras.models import Model
from skimage.transform import resize
from skimage.color import rgb2gray

In [None]:
tf.config.list_physical_devices('GPU')

## Data


In [None]:
def scale(img):
    """
    Scale input image between [-1.0, 1.0].
    """
    return img.astype(np.float32)/127.5 - 1.0

def unscale(img):
    """
    Unscale input image between [0, 255].
    """
    return ((img+1.0)*127.5).astype(np.uint8)

def preprocess_image(img):
    """
    Preprocess function for image data generator.
    """
    img = scale(img)
    return img

def create_datagen(X, y, aug=False, onehot=False):
    """
    Create an image data generator from input images.
    """
    # Apply augmentation
    if aug:
        datagen = tf.keras.preprocessing.image.ImageDataGenerator(
            rescale=None,
            preprocessing_function=preprocess_image,
            #vertical_flip=True,
            #horizontal_flip=True, 
            shear_range=0.1,
            zoom_range=0.1,
            height_shift_range=0.1,
            width_shift_range=0.1,
            rotation_range=10,
            brightness_range=[0.7, 1.3],
            fill_mode= "nearest",
        )   
    else:
        datagen = tf.keras.preprocessing.image.ImageDataGenerator(
            rescale=None,
            preprocessing_function=preprocess_image,
        )
    
    # One-hot encodeing 
    labels = y if not onehot else tf.keras.utils.to_categorical(y)
    
    # Create data generator from numpy arrays (X, y).
    batches = datagen.flow(X, labels, batch_size=CFG['batch_size'], shuffle=True)
    batches.samples = X.shape[0]
        
    return batches

In [None]:
def show_batch(gen, figs_per_row=8, show_one_line=True):
    """
    Show images from a batch of image generator.
    """
    stats = defaultdict(int)
    x, y = gen.next()
    y=np.squeeze(y)
    rows, columns = math.ceil(CFG['batch_size']/figs_per_row), figs_per_row
    if show_one_line:
        rows = 1
    fig=plt.figure(figsize=(columns*1.5, rows*1.5))
    try:
        for i in range(columns*rows):
            img, target = x[i, ...], y[i]
            fig.add_subplot(rows, columns, i+1)
            plt.title(f"{target}")
            if x.shape[3]==1:
                plt.imshow(unscale(img), cmap="gray")
            else:
                plt.imshow(unscale(img))
            plt.axis('off')
    except IndexError:
        pass
    plt.tight_layout()
    plt.show()

## Data: trainings, tests, and outliers

In [None]:
def get_dataset(name="mnist"):
    """
    Return entire dataset as a tuple (X, y).
    We need it to define explicitly outliers group.
    """
    if name=="mnist":
        (X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
    elif name=="fashion_mnist":
        (X_train, y_train), (X_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
    elif name=="cifar10":
        (X_train, y_train), (X_test, y_test) = tf.keras.datasets.cifar10.load_data()
    else:
        raise ValueError(f"Unknown dataset {name}")

    X = np.concatenate((X_train, X_test), axis=0)
    y = np.concatenate((y_train, y_test), axis=0)
    
    # Add channel dimension for gray images
    if X.ndim < 4:
        X = X[..., np.newaxis]
        y = y[..., np.newaxis]
    
    return X, y

In [None]:
X, y = get_dataset()

# Set some of categories as outliers and the rest for traning classifier
outlier_indices = np.squeeze( (y==9) | (y==8) )
X_outliers, y_outliers = X[outlier_indices], y[outlier_indices]
X_inliers, y_inliers = X[~outlier_indices], y[~outlier_indices]

# Split train test
X_train, X_test, y_train, y_test = train_test_split(X_inliers, y_inliers, test_size=0.20, random_state=2021)
num_labels = len(np.unique(y_train))

print("Number of unique labels:", num_labels)
print(f"Train   : {X_train.shape}")
print(f"Test    : {X_test.shape}")
print(f"Outliers: {X_outliers.shape}")

In [None]:
show_batch(create_datagen(X_train, y_train, aug=True), show_one_line=False)

In [None]:
show_batch(create_datagen(X_test, y_test))

In [None]:
show_batch(create_datagen(X_outliers, y_outliers))

## Model: Features extractor

In [None]:
def build_custom_model():
    """
    Build a convolutional classifier which is used as image features extractor.
    """
    conv_regulizer = tf.keras.regularizers.l2(CFG['conv_regularization_factor'])
    dense_regulizer = tf.keras.regularizers.l2(CFG['dense_regularization_factor'])
    # Convolutional layers
    inp = layers.Input(shape=(CFG['image_size'], CFG['image_size'], CFG['channel_size']))
    x = inp
    for filters in CFG['layer_filters']:
        x = layers.Conv2D(filters, CFG['kernel_size'], padding='same', activation='relu', kernel_regularizer=conv_regulizer)(x)
        x = layers.AveragePooling2D((2, 2), padding='same')(x)
    # Top dense layers  
    x = layers.Flatten()(x)
    x= layers.Dropout(0.2)(x)
    features = layers.Dense(64, activation='sigmoid', kernel_regularizer=dense_regulizer)(x)   
    out = layers.Dense(num_labels, activation='softmax', kernel_regularizer=dense_regulizer)(features)
    # Return classifier, features extractor
    return Model(inp, out), Model(inp, features)

In [None]:
# Save model checkpoint
save_dir = os.path.join(os.getcwd(), 'saved_models')
model_name = 'best-model.h5' #'model-checkpoint.{epoch:03d}.h5'
checkpoint_fn  = os.path.join(save_dir, model_name)
csvlog_fn = os.path.join(save_dir, "training.log")
if not os.path.isdir(save_dir):
    os.makedirs(save_dir)
    
# Prepare callbacks for model saving 
checkpoint = tf.keras.callbacks.ModelCheckpoint(checkpoint_fn, monitor='val_loss', verbose=1, save_best_only=True)

# CSV logging callbacks
csv_logger = tf.keras.callbacks.CSVLogger(csvlog_fn, separator=",", append=False)

# Early stop
earlystop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', mode="min", patience=6, verbose=1)

# Reduced learning rate
reducelr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=np.sqrt(0.1), cooldown=0, patience=5, min_lr=0.5e-6)

# List of callbacks
callbacks = [csv_logger, earlystop, reducelr, checkpoint]

# Compiler classifier
clf, fm = build_custom_model()
clf.compile(optimizer="adam", loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Classfier 
clf.summary()

In [None]:
# Features extractor as model 
fm.summary()

In [None]:
# Create training and validation image data generators
train_batches= create_datagen(X_train, y_train, aug=True, onehot=True)
val_batches = create_datagen(X_test, y_test, onehot=True)

In [None]:
clf.fit(
    train_batches,
    steps_per_epoch=train_batches.samples//CFG['batch_size'],
    validation_data=val_batches,
    validation_steps=val_batches.samples//CFG['batch_size'],
    epochs=CFG['epochs'],
    callbacks=callbacks,
    workers=8,
    verbose=1,
 )

In [None]:
log = pd.read_csv(csvlog_fn)
log.plot(x="epoch", y=["loss", "val_loss"])
# log.plot(x="epoch", y=["accuracy", "val_accuracy"])

## Mahalanobis outlier detector

In [None]:
class MahalanobisOutlierDetector:
    """
    An outlier detector which uses an input trained model as feature extractor and
    calculates the Mahalanobis distance as outlier score.
    """
    def __init__(self, features_extractor: Model):
        self.features_extractor = features_extractor
        self.features = None
        self.features_mean = None
        self.features_covmat = None
        self.features_covmat_inv = None
        self.threshold = None
        
    def _extract_features(self, gen, steps, verbose) -> np.ndarray:
        """
        Extract features from the base model.
        """
        if steps is None:
            steps = gen.samples//CFG['batch_size']
        return self.features_extractor.predict(gen, steps=steps, workers=8, verbose=1)
        
    def _init_calculations(self):
        """
        Calculate the prerequired matrices for Mahalanobis distance calculation.
        """
        self.features_mean = np.mean(self.features, axis=0)
        self.features_covmat = np.cov(self.features, rowvar=False)
        self.features_covmat_inv = scipy.linalg.inv(self.features_covmat)
        
    def _calculate_distance(self, x) -> float:
        """
        Calculate Mahalanobis distance for an input instance.
        """
        return scipy.spatial.distance.mahalanobis(x, self.features_mean, self.features_covmat_inv)
    
    def _infer_threshold(self, verbose):
        """
        Infer threshold based on the extracted featres from the training set.
        """
        scores = np.asarray([self._calculate_distance(feature) for feature in self.features])
        mean = np.mean(scores)
        std = np.std(scores)
        self.threshold = mean + 2 * std
        if verbose > 0:
            print("OD score mean:", mean)
            print("OD score std :", std)
            print("OD threshold :", self.threshold)  
            
    def fit(self, gen, steps=None, verbose=1):
        """
        Fit detector model.
        """
        self.features = self._extract_features(gen, steps, verbose)
        self._init_calculations()
        self._infer_threshold(verbose)
        
    def predict(self, gen, steps=None, verbose=1) -> np.ndarray:
        """
        Calculate outlier score (Mahalanobis distance).
        """
        features  =  self._extract_features(gen, steps, verbose)
        scores = np.asarray([self._calculate_distance(feature) for feature in features])
        if verbose > 0:
            print("OD score mean:", np.mean(scores))
            print("OD score std :", np.std(scores))
            print(f"Outliers     :{len(np.where(scores > self.threshold )[0])/len(scores): 1.2%}")
            
        if verbose > 1:
            # CFD
            #plt.hist(scores, cumulative=True, density=1, bins=100);
            #plt.axvline(self.threshold, c='k', ls='--', label='threshold')
            #plt.xlabel("Mahalanobis distance"); plt.ylabel("CFD");
            #plt.show()
            # Hist
            plt.hist(scores, bins=100);
            plt.axvline(self.threshold, c='k', ls='--', label='threshold')
            plt.xlabel("Mahalanobis distance"); plt.ylabel("Distribution");
            plt.show()
            
        return scores

In [None]:
# Instantiate and fit Mahalanobis outlier detector
od = MahalanobisOutlierDetector(features_extractor=fm)
od.fit(train_batches, steps=None, verbose=1)

In [None]:
od_results = defaultdict(dict)
train_batches = create_datagen(X_train, y_train, aug=False, onehot=True)
od_results["MNIST (training)"]["od_scores"] = od.predict(train_batches, verbose=2)

In [None]:
od_results["MNIST (validation)"]["od_scores"] =  od.predict(val_batches, verbose=2)

In [None]:
outlier_batches = create_datagen(X_outliers, y_outliers)
od_results["MNIST (outliers)"]["od_scores"] = od.predict(outlier_batches, verbose=2)

In [None]:
X, y = get_dataset("fashion_mnist")
fashion_batches = create_datagen(X, y)
show_batch(fashion_batches)
od_results["Fashion MNIST"]["od_scores"] = od.predict(fashion_batches, verbose=2)

In [None]:
X, y = get_dataset("cifar10")
X_ = np.asarray([(255*rgb2gray(img[2:-2, 2:-2])[..., np.newaxis]).astype(np.uint8) for img in X])
cifar10_datagen = tf.keras.preprocessing.image.ImageDataGenerator(rescale=None, preprocessing_function=preprocess_image)
cifar10_batches = cifar10_datagen.flow(X_, y, batch_size=CFG['batch_size'], shuffle=True)
cifar10_batches.samples = len(X_)
show_batch(cifar10_batches)
od_results["CIFAR10"]["od_scores"] = od.predict(cifar10_batches, verbose=2)

X, y = next(cifar10_batches)
X.shape

## Results

In [None]:
od_results_df = pd.DataFrame(od_results).transpose()
od_results_df["n_samples"] = od_results_df["od_scores"].map(lambda x: len(x))
od_results_df["od_score_mean"] = od_results_df["od_scores"].map(lambda x: np.mean(x))
od_results_df["od_score_std"] = od_results_df["od_scores"].map(lambda x: np.std(x))
od_results_df

In [None]:
fig, ax = plt.subplots(figsize=(9, 4))
# sns.barplot(x=od_results_df.index, y=od_results_df["n_samples"])
sns.barplot(x=od_results_df.index, y=od_results_df.od_score_mean, yerr=od_results_df.od_score_std)

In [None]:
fig, ax = plt.subplots(figsize=(10, 6))
red_square = dict(markerfacecolor='r', marker='s')
plt.boxplot([scores for scores in od_results_df.od_scores.values], flierprops=red_square)
plt.violinplot([scores for scores in od_results_df.od_scores.values]);
plt.xticks(range(1, len(od_results_df)+1), od_results_df.index); plt.ylabel("Mahalanobis distance");
# plt.axhline(od.threshold, c='k', ls='--', label='threshold')

In [None]:
from random import random
fig, ax = plt.subplots(figsize=(10, 6))
for subset, scores in zip(od_results_df.index, od_results_df.od_scores):
    sns.histplot(scores, ax=ax, label=subset, alpha=0.8, color=(random(), random(), random()), bins=100, stat="density")
    
# plt.xlim([0, 30])
plt.axvline(od.threshold, c='k', ls='--', label='Outlier threshold')
plt.legend(loc="upper right"); plt.xlabel("Mahalanobis distance"); 