# Initial setup

In [None]:
%matplotlib inline
%reload_ext autoreload
%autoreload 2

Import all needed packages

In [None]:
import os
import cv2
import tensorflow as tf
import numpy as np
import sklearn
import matplotlib.pyplot as plt
import json
from six.moves import urllib
from tensorflow.keras.preprocessing.image import img_to_array
from sklearn.utils import shuffle
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical

In [None]:
from keras import applications
preprocess_input = applications.mobilenet_v2.preprocess_input 
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Model
from tensorflow.keras.models import load_model
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score

Run models on GPU 1

In [None]:
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

In [None]:
# Restrict TensorFlow to a single GPU and let it allocate memory on demand.
# GPU index 1 is preferred; fall back gracefully instead of raising IndexError
# when the machine exposes fewer than two GPUs.
gpus = tf.config.experimental.list_physical_devices('GPU')
if len(gpus) > 1:
    selected_gpu = gpus[1]
elif gpus:
    selected_gpu = gpus[0]
    print("Only one GPU available: using GPU 0 instead of GPU 1")
else:
    selected_gpu = None
    print("No GPU available: running on CPU")

if selected_gpu is not None:
    tf.config.experimental.set_visible_devices(selected_gpu, 'GPU')
    tf.config.experimental.set_memory_growth(selected_gpu, True)

## Set useful paths

The folder structure is the following: there is a main folder *Dataset* that contains all the sub-folders holding the pictures to pre-process (*data_test*, *data_test2_esterni* and *test_surveillance*) and the sub-folders where the pre-processed pictures are stored (*testBN*, *test2BN_esterni* and *testIR*).

The class we want to recognize among all is still the *target class Persona*. 


In [None]:
path_ds = "Dataset"
target = "Persona"

## Losses computation

The two customized losses must be defined to import correctly the trained model

Quantities *batch_size* and *sub_batch_size* are defined, with also the constant *beta* which is used in the *compactness loss* function

In [None]:
# Batch layout assumed by the two custom losses (a batch_size of 256 was used previously).
batch_size = 32
# Size of each of the two halves the batch is split into.
sub_batch_size = batch_size // 2
# Correction factor of the compactness loss: n^2 / (n - 1)^2 with n = sub_batch_size
# (about 1.0158 when batch_size is 256).
beta = pow(sub_batch_size, 2) / pow(sub_batch_size - 1, 2)
print("beta = ", beta)

The two input quantities of *compactness loss* are:

• *y_true*: the true labels of the batch, of size (batch_size, n_classes_ref).
This quantity is not used in the lc computation because it has no role in
imposing similarity among person features;

• *y_pred*: predictions of the intermediate features for each element in the
batch, of size (batch_size, n_features). It is produced by the average pooling layer, so the number of features is 1280. We choose this layer because it has weights pre-trained on ImageNet, which speeds up the learning process compared to weights with random initialization.

In order to consider only features of person images, the first half part of the batch is isolated. Then, the following operations are performed: the variance of the feature distribution along the batch for each feature and the mean of all variances. This number is then multiplied by a correction factor beta.

In [None]:
def compactness_loss(y_true, y_pred):
    """Return the compactness loss computed on the first 16 rows of ``y_pred``.

    The variance of every column of ``y_pred[:16]`` is taken along axis 0
    (i.e. across the rows of the slice, one value per feature), the variances
    are averaged into a single scalar, and the result is scaled by the
    module-level constant ``beta``.

    Parameters:
        y_true: accepted only to match the Keras loss signature; never read.
        y_pred: feature predictions; only rows 0..15 are used.
            NOTE(review): the hard-coded 16 presumably equals sub_batch_size
            (batch_size = 32), i.e. the target-class half of the batch — confirm
            against the training input generator.

    Returns:
        Scalar tensor: mean per-feature variance multiplied by ``beta``.
    """
    # Taking the variance along axis=1 (per row) would be wrong here: similarity
    # has to be measured feature by feature across the samples.
    target_rows = y_pred[:16]
    column_variances = tf.keras.backend.var(target_rows, axis=0, keepdims=False)
    return tf.keras.backend.mean(column_variances) * beta

#when features are extracted from a convolutional layer: apply an average pooling layer -> compute the loss

The *descriptiveness loss* is computed using the *cross-entropy loss*, that is here defined.

In [None]:
# Categorical cross-entropy loss object used inside the descriptiveness loss.
# from_logits=False: predictions are treated as probabilities, not raw logits.
cce = tf.keras.losses.CategoricalCrossentropy(from_logits=False) 

# NOTE: using from_logits=True is more numerically stable, but it would require
# removing the softmax layer from the model.
# No `reduction` argument is passed, so the library default reduction is used.

The two input quantities of *descriptiveness loss* are:

• *y_true*: the true labels of the batch, of size (batch_size, n_classes_ref).
This quantity is provided by the inputgenerator, later defined;

• *y_pred*: predictions coming from the last fully connected layer, of size
(batch_size, n_classes_ref). The second dimension n_classes_ref is 20,
corresponding to the categorical label of classes from the reference dataset.
The label of the person class is not included because this is not a multiclass classification problem.

The descriptiveness loss is computed with respect to only elements of the reference dataset. Therefore, the second half part of the batch is considered both in *y_true* and in *y_pred*. The first part of them contains meaningless numbers, because we don’t care about person image labels.
Then, the categorical cross-entropy loss is evaluated between the predicted labels and the desired ones and it is minimized to realize a good classification.

In [None]:
def descriptiveness_loss(y_true, y_pred):
    """Return the categorical cross-entropy between rows 16..31 of the inputs.

    Only the slice ``[16:32]`` of both ``y_true`` and ``y_pred`` is passed to
    the module-level ``cce`` loss; rows 0..15 are ignored.

    NOTE(review): the hard-coded bounds presumably select the reference-dataset
    half of a batch of 32 (with batch_size = 256 they would be 128:256) —
    confirm against the training input generator.
    """
    reference_rows = slice(16, 32)
    return cce(y_true[reference_rows], y_pred[reference_rows])

## Load trained model and isolate *model_features* for feature extraction

In [None]:
path_model = os.path.join(path_ds, "my_model200_400.h5")

In [None]:
model_tot = load_model(path_model, custom_objects={'compactness_loss': compactness_loss, 'descriptiveness_loss': descriptiveness_loss})

We visualize properties of all layers that are part of the *model_tot*

In [None]:
model_tot.summary()

From *model_tot* we extrapolate the model for feature extraction: *model_features*

In [None]:
# Feature extractor: same inputs as model_tot, output taken from its second-to-last layer.
model_features = Model(model_tot.inputs, model_tot.layers[-2].output)  # recorded output: <tf.Tensor 'dense_2/Identity:0' shape=(None, 1280) dtype=float32>

We visualize properties of all layers from the *model_features*

In [None]:
model_features.summary()

# Testing part

The testing part is realized by a *template matching framework*: first, in the *template generation phase*, some baseline features of person instances are stored as templates; then, in the *matching phase*, a score is generated from the Euclidean distance between those templates and the features extracted from the test image.

## 1. Template generation

In template generation phase, some samples are selected from the target dataset and are sent into *model_features* to generate *templates*.

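First, templates are taken from folder *templates*, converted to grayscale images replicated on three channels, and saved in folder *templatesBN*; the resize to 224x224 is applied later by the data generator (`target_size=(224, 224)`). There are 40 templates for the grayscale datasets BN1 and BN2.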

In [None]:
path_images_templates = os.path.join(path_ds, "templates")  

In [None]:
path_images_templatesBN = os.path.join(path_ds, "templatesBN")  

In [None]:
# Convert every template picture to a 3-channel grayscale image and store it in
# <path_images_templatesBN>/templates_Persona. Pictures already present in the
# output folder are left untouched, so the cell can be re-run safely.
img_size = 224   # not used by this loop: no resize is applied here

# All templates end up in one single class folder, whatever their source folder
path_out = os.path.join(path_images_templatesBN, "templates_Persona")
os.makedirs(path_out, exist_ok=True)

for folder in os.listdir(path_images_templates):
    path_folder = os.path.join(path_images_templates, folder)
    if not os.path.isdir(path_folder):
        # a stray file next to the template folders cannot be listed as a folder
        continue
    print("\n------------------------------------------------------")
    print("\nFolder ", folder, " with ", len(os.listdir(path_folder)), "images inside")

    i = 0   #new images
    j = 0   #images already pre-processed
    for file in os.listdir(path_folder):
        path_dst = os.path.join(path_out, file)
        if os.path.exists(path_dst):
            j += 1
            print("Image " + file + " already pre-processed" )
            continue

        print("Processing ... ", file)

        #read the image; cv2.imread returns None when the file cannot be decoded
        image = cv2.imread(os.path.join(path_folder, file))
        if image is None:
            print("Skipping unreadable image: ", file)
            continue

        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)     #gray image
        image = cv2.merge((image, image, image))            #gray image on 3 channels
        #write the pre-processed image in the output folder
        cv2.imwrite(path_dst, image)
        i += 1   #count only the images that were actually written

    print("\nImages that have been previously pre-processed: " + str(j))
    print("\nNewly pre-processed images: " + str(i))
  

Then *templates* of size (40, 1280) are produced by *test_datagen*

In [None]:
# One batch holding every template image: the batch size is the number of files
# found in the single class folder "templates_Persona".
n_template_files = len(os.listdir(os.path.join(path_images_templatesBN, "templates_Persona")))

test_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)
test_generator = test_datagen.flow_from_directory(
    path_images_templatesBN,
    target_size=(224, 224),
    batch_size=n_template_files,
    class_mode='categorical',
    shuffle=False,
)

# A single step consumes that one batch -> (n_templates, n_features), e.g. (40, 1280)
templates = model_features.predict(test_generator, steps=1)

In [None]:
from matplotlib import pyplot as plt

# Display up to the first 10 images of one template batch.
batch = next(test_generator)   # batch[0] holds the images, batch[1] the labels
for image in batch[0][:10]:    # slicing avoids an IndexError with fewer than 10 images
    # The generator applies the MobileNetV2 preprocess_input, which rescales
    # pixels to [-1, 1]; imshow expects floats in [0, 1], so map them back.
    plt.imshow(np.clip((image + 1.0) / 2.0, 0.0, 1.0))
    plt.show()

In [None]:
templates.shape  #(40, 1280)

## 1.1 Creation of  predictions *features_test* and true labels *Y_test* (in DOC labels are 0: target class = Person, 1: alien class, no people inside)

In this part we extract features from test images. The two grayscale datasets each contain 1000 pictures with people and 1000 pictures without individuals.

## Pre-process test images

Folders *data_test* and *data_test2_esterni* contain RGB images of the two categories, to be pre-processed and stored respectively in folders *testBN* and *test2BN_esterni*.

The structure of folders is the following:
<pre>
<b>data_test or data_test2_esterni</b>
|__ <b>Persona</b>
|__ <b>Others</b>
</pre>

<pre>
<b>testBN or test2BN_esterni</b>
|__ <b>Persona</b>
   |__ <b>0</b>
|__ <b>Others</b>
   |__ <b>1</b>
</pre>

Set useful paths


In [None]:
path_data_test = os.path.join(path_ds, "data_test") 
path_test = os.path.join(path_ds, "testBN")

#path_data_test = os.path.join(path_ds, "data_test2_esterni")
#path_test = os.path.join(path_ds, "test2BN_esterni")

path_test_Persona = os.path.join(path_test, "Persona")

path_test_Others = os.path.join(path_test, "Others")

Pre-processing:

• each image is centrally cropped along its smaller size. In this way we
can resize it without altering the image aspect ratio and the properties
of objects within;

• each picture is resized to square format of 224×224 with a bilinear interpolation;

• each image is made a grayscale image with size of (224, 224, 1), having a
single channel;

• each grayscale image is brought back on three channels, repeating the single channel three times. This operation is done since the structure of
most of networks presents a three channel configuration.

In [None]:
!rm -rf `find -type d -name .ipynb_checkpoints`

In [None]:
# Pre-process every test picture: central square crop, resize to
# img_size x img_size, grayscale conversion replicated on 3 channels.
# Pictures of the target class go to <path_test_Persona>/0, all the others to
# <path_test_Others>/1. Pictures already present in the output folder are skipped.
img_size = 224
for folder in os.listdir(path_data_test):
    path_folder = os.path.join(path_data_test, folder)
    if not os.path.isdir(path_folder):
        # a stray file next to the class folders cannot be listed as a folder
        continue
    print("\n------------------------------------------------------")
    print("\nFolder ", folder, " with ", len(os.listdir(path_folder)), "images inside")

    # sub-folder names are the labels: 0 = target class, 1 = any other class
    if folder == target:
        path_out = os.path.join(path_test_Persona, "0")
    else:
        path_out = os.path.join(path_test_Others, "1")
    os.makedirs(path_out, exist_ok=True)

    i = 0   #new images
    j = 0   #images already pre-processed
    for file in os.listdir(path_folder):
        path_dst = os.path.join(path_out, file)
        if os.path.exists(path_dst):
            j += 1
            print("Image " + file + " already pre-processed" )
            continue

        print("Processing ... ", file)

        #read the image; cv2.imread returns None when the file cannot be decoded
        image = cv2.imread(os.path.join(path_folder, file))
        if image is None:
            print("Skipping unreadable image: ", file)
            continue

        #crop image -> square image along its min dimension
        h, w = image.shape[:2]
        if w > h:
            start = (w - h) // 2
            image = image[:, start:start + h]
        else:
            start = (h - w) // 2
            image = image[start:start + w, :]

        #resize
        image = cv2.resize(image, (img_size, img_size), interpolation=cv2.INTER_LINEAR)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)     #gray image
        image = cv2.merge((image, image, image))            #gray image on 3 channels
        #write the pre-processed image in the output folder
        cv2.imwrite(path_dst, image)
        i += 1   #count only the images that were actually written

    print("\nImages that have been previously pre-processed: " + str(j))
    print("\nNewly pre-processed images: " + str(i))
  

Extract features from Person images using *test_datagen0*

In [None]:
# Generator over the pre-processed Person test images, in fixed (unshuffled)
# order and in batches of 100.
test_datagen0 = ImageDataGenerator(preprocessing_function=preprocess_input)
test_generator0 = test_datagen0.flow_from_directory(
    path_test_Persona,
    target_size=(224, 224),
    batch_size=100,
    class_mode='categorical',
    shuffle=False,
)

In [None]:
# Extract features for every Person image. len(test_generator0) is the number
# of batches needed to cover the whole folder, final partial batch included;
# the former `n_images // 100` silently dropped up to 99 trailing images and
# gave 0 steps for folders with fewer than 100 images.
features_Persona = model_features.predict(test_generator0, steps = len(test_generator0))

In [None]:
features_Persona.shape

Extract features from Others images using *test_datagen1*

In [None]:
# Generator over the pre-processed Others test images, in fixed (unshuffled)
# order and in batches of 100.
test_datagen1 = ImageDataGenerator(preprocessing_function=preprocess_input)
test_generator1 = test_datagen1.flow_from_directory(
    path_test_Others,
    target_size=(224, 224),
    batch_size=100,
    class_mode='categorical',
    shuffle=False,
)

In [None]:
# Extract features for every Others image. len(test_generator1) is the number
# of batches needed to cover the whole folder, final partial batch included;
# the former `n_images // 100` silently dropped up to 99 trailing images and
# gave 0 steps for folders with fewer than 100 images.
features_Others = model_features.predict(test_generator1, steps = len(test_generator1))

In [None]:
features_Others.shape

Append all features in *features_test*

In [None]:
features_test = np.concatenate([features_Persona, features_Others])    #Y_test will be 00000 ... 11111

In [None]:
features_test.shape

Create a *features_test_tsne* for visualizing features and templates thanks to t-SNE

In [None]:
features_test_tsne = np.concatenate([features_Persona, features_Others, templates]) 

Create true labels of test images in *Y_test*

In [None]:
Y_test=np.concatenate([np.zeros(features_Persona.shape[0]), np.ones(features_Others.shape[0])])

In [None]:
Y_test.shape

Create a *Y_test_tsne* with inside true labels of features and templates for t-SNE visualization of features

In [None]:
Y_test_tsne=np.concatenate([np.zeros(features_Persona.shape[0]), np.ones(features_Others.shape[0]), np.ones(templates.shape[0])*2])

Implement t-SNE visualization of 1280 features extracted from each test image.

• red points with label 0 are the features associated with images containing people;

• green points labeled with 1 are the features extracted from pictures with no people;

• blue points with a fake label 2 are the templates from which the classification score is generated.

In [None]:
from __future__ import print_function
import time
import numpy as np
import pandas as pd
#from sklearn.datasets import fetch_mldata
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
%matplotlib inline
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import seaborn as sns

#feat_cols = [ 'pixel'+str(i) for i in range(features_test.shape[1]) ]
#df = pd.DataFrame(features_test,columns=feat_cols)
#df['y'] = Y_test

feat_cols = [ 'pixel'+str(i) for i in range(features_test_tsne.shape[1]) ]
df = pd.DataFrame(features_test_tsne,columns=feat_cols)
df['y'] = Y_test_tsne

df['label'] = df['y'].apply(lambda i: str(i))
#features_test, Y_test = None, None
print('Size of the dataframe: {}'.format(df.shape))

# For reproducibility of the row permutation below
np.random.seed(20)
rndperm = np.random.permutation(df.shape[0])

# Keep every row of df: it holds the test features AND the templates.
# Using Y_test.shape[0] here dropped as many randomly chosen rows as there are
# templates, so some points (possibly templates) were missing from the plot.
N = df.shape[0]
df_subset = df.loc[rndperm[:N],:].copy()
data_subset = df_subset[feat_cols].values

# Project the features onto 2 dimensions with t-SNE and time the fit.
# NOTE(review): recent scikit-learn releases rename `n_iter` to `max_iter` —
# confirm against the installed version.
time_start = time.time()
tsne = TSNE(n_components=2, verbose=1, perplexity=40, n_iter=300)
tsne_results = tsne.fit_transform(data_subset)
print('t-SNE done! Time elapsed: {} seconds'.format(time.time()-time_start))

# Scatter plot of the 2-D embedding, one colour per label value in column "y"
df_subset['tsne-2d-one'] = tsne_results[:,0]
df_subset['tsne-2d-two'] = tsne_results[:,1]
plt.figure(figsize=(16,10))
sns.scatterplot(
    x="tsne-2d-one", y="tsne-2d-two",
    hue="y",
    palette=sns.color_palette("hls", 3),
    data=df_subset,
    legend="full",
    alpha=1
)

## 2. Template matching

In matching phase, features extracted from test images are compared to templates using a *matching function f*, the Euclidean distance.

In particular the inputs provided to *scores_generation* function are:

• features_test: features extracted from test images, of size (n_test, n_features);

• templates: stored templates corresponding to baseline characteristics of
the person class, of size (n_templates, n_features).

The features of each test image are compared to all templates: the quantity *d* contains the Euclidean distances between them and has size (n_templates,). A vector *d* is computed for every image in the test dataset and collected in *distances_vector*, of size (n_test, n_templates). The scores, stored in the vector *scores* of size (n_test,), are obtained by taking, for each image, the minimum of its distances in *distances_vector*.

In [None]:
#def euclidean(v1, v2):
#  return sum((p-q)**2 for p, q in zip(v1, v2)) ** .5

#d = [euclidean(f, t) for t in templates]


In [None]:
def scores_generation(features_test, templates):
    """Return, for each test feature vector, its distance to the nearest template.

    Parameters:
        features_test: array-like of shape (n_test, n_features).
        templates: array-like of shape (n_templates, n_features).

    Returns:
        NumPy array of shape (n_test,): for every row of ``features_test`` the
        minimum Euclidean distance to any row of ``templates``. An empty
        ``features_test`` yields an empty array.

    The distances are kept local to the call. The previous implementation
    appended them to the module-level ``distances_vector`` list, so calling the
    function a second time accumulated rows and returned scores of the wrong
    length.
    """
    templates = np.asarray(templates)
    scores = [
        # distances from f to every template at once, then keep the smallest
        np.linalg.norm(templates - f, axis=1).min()
        for f in np.asarray(features_test)
    ]
    return np.array(scores)

In [None]:
scores = []
distances_vector = []

In [None]:
scores = scores_generation(features_test, templates)  #shape of (n_X_test,) for each image belonging to X_test

In [None]:
scores.shape

In [None]:
scores

## Plot ROC curve

The Receiver Operating Characteristic curve plots the True Positive Rate (TPR) versus the False Positive Rate (FPR) for all possible thresholds. It is used to evaluate DOC models.
Best ones have ROC curves very close to the top left corner of
the plot.

In [None]:
from sklearn.metrics import roc_curve, roc_auc_score
#from sklearn.metrics import roc_auc_score

In [None]:
def plot_roc_curve(fpr, tpr, label):
    """Draw a ROC curve on the current matplotlib axes.

    Plots ``tpr`` against ``fpr`` with the given legend ``label``, adds the
    dashed diagonal from (0, 0) to (1, 1), and sets title, axis labels and
    grid. The figure is not shown; the caller is expected to do that.
    """
    axes = plt.gca()
    axes.plot(fpr, tpr, linewidth=2, label=label)
    axes.legend()
    axes.set_title('ROC curve')
    axes.plot([0, 1], [0, 1], 'k--')  # dashed diagonal
    axes.set_xlabel('False Positive Rate')
    axes.set_ylabel('True Positive Rate')
    axes.grid(True)

In [None]:
fpr, tpr, thresholds = roc_curve(Y_test, scores)
AUC = roc_auc_score(Y_test, scores)
print(AUC)

In [None]:
#thresholds

In [None]:
plot_roc_curve(fpr, tpr, label='DeepOneClassification(AUC = %.2f)'%AUC)
plt.show()

Save fpr and tpr in folder *metrics* to retrieve them for plots

In [None]:
# Store the ROC points so that plots can be rebuilt later without re-running the model.
path_metrics = os.path.join(path_ds, "metrics")
# np.save does not create missing folders: make sure the target folder exists
os.makedirs(path_metrics, exist_ok=True)

path_fpr = os.path.join(path_metrics, "fpr200t.npy") 
path_tpr = os.path.join(path_metrics, "tpr200t.npy")
np.save(path_fpr, fpr, allow_pickle=True, fix_imports=True)
np.save(path_tpr, tpr, allow_pickle=True, fix_imports=True)

## Compute variance of Person features

This is the quantity minimized in the compactness loss

In [None]:
features_Persona = tf.convert_to_tensor(features_Persona, np.float32)

In [None]:
tf.keras.backend.var(features_Persona, axis = 0, keepdims=False)

In [None]:
var = tf.keras.backend.mean(tf.keras.backend.var(features_Persona, axis = 0, keepdims=False))
print(var)

## Optimal threshold and DOC output *y_pred*

Scores are finally transformed into a usable output for One-Class Classification by means of a threshold delta. Remember that labels in DOC are 0: person and 1: others, so the person class is the negative class.

The chosen $\delta$ in our Deep One-class Classification is the one that maximizes the quantity (TNR-FNR), producing a high value of TNR, the True Negative Rate, and a low value of FNR, the False Negative Rate.
The first one is the ratio of negative instances correctly classified as negative, while the second one is the ratio of positive instances incorrectly classified as negative.
Therefore, maximizing the term (TNR-FNR) yields a high number of instances classified as people that are actually people and a low number of alien objects wrongly classified as people.
Considering also that TNR=1-FPR and FNR=1-TPR, finding the maximum value of (TNR-FNR) means maximizing (TPR-FPR), since TNR-FNR=1-FPR-(1-TPR)=TPR-FPR; this corresponds to the point of the ROC curve with the largest vertical distance from the diagonal. These quantities have already been computed by the *roc_curve* command.

In [None]:
optimal_idx = np.argmax(tpr - fpr)
optimal_threshold = thresholds[optimal_idx]
print("Threshold value is:", optimal_threshold)

In [None]:
# Turn scores into DOC labels: 1 (others) when the score reaches the threshold,
# 0 (person) otherwise. `>=` matches roc_curve, whose (fpr, tpr) point for a
# threshold is computed from predictions with score >= threshold; a strict `>`
# misclassified the sample lying exactly on the selected threshold, so the
# operating point differed from the one chosen on the ROC curve.
y_pred = (scores >= optimal_threshold).astype(float)

In [None]:
y_pred.shape 

Labels are "reverse" in DOC (0: person, 1:others) w.r.t. binary models. 

The fact that the positive class is not the *person class* causes issues because metrics are closely related to the chosen positive class.

If we want to refer all metrics that are precision, recall, F1 score, accuracy to the target class, we need to reverse label values produced by DOC models -> 1-Y_test and 1-y_pred

Confusion matrix

In [None]:
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(1-Y_test, 1-y_pred)
print(cm)
#           predicted               0:negative - others   1:positive - person
           #   0  1                   FP=false positive, actual others but predicted person
#actual    #0 TN FP                   FN=false negative, actual person but predicted others
           #1 FN TP                   

F1 score

In [None]:
from sklearn.metrics import f1_score
#f1_score(Y_test, y_pred)
f1_score(1-Y_test, 1-y_pred)

Precision

In [None]:
from sklearn.metrics import precision_score
#precision_score(Y_test, y_pred)
precision_score(1-Y_test, 1-y_pred)

###### Recall

In [None]:
from sklearn.metrics import recall_score
#recall_score(Y_test, y_pred)
recall_score(1-Y_test, 1-y_pred)

Accuracy

In [None]:
from sklearn.metrics import accuracy_score
#tn, fp, fn, tp = confusion_matrix(y_true, y_pred_class).ravel()
#accuracy = (tp + tn) / (tp + fp + fn + tn)
# or simply
accuracy_score(1-Y_test, 1-y_pred)

## Plot DET curve

The Detection Error Tradeoff curve plots the False Positive Rate (FPR) against the False Negative Rate (FNR) for all possible threshold values

In [None]:
fps=fpr
fns=1-tpr

In [None]:
from matplotlib import pyplot as plt
def DETCurve(fps,fns):
    """
    Given false positive and false negative rates, produce a DET Curve.

    A new figure is created and ``fns`` is plotted against ``fps`` on log-log
    axes limited to [0.001, 50] with fixed tick positions. The figure is not
    shown or returned.

    The false positive rate is assumed to be increasing while the false
    negative rate is assumed to be decreasing.

    NOTE(review): the axis labels say "(%)" and the ticks go up to 50, so the
    inputs are presumably expected as percentages; fractions in [0, 1] are
    plotted as they are, without any conversion — confirm with the callers.
    """
    # The unused `axis_min` local was removed: it indexed fps[0] / fns[-1] and
    # raised IndexError on empty input without contributing to the plot.
    fig,ax = plt.subplots()
    plt.plot(fps,fns)
    plt.yscale('log')
    plt.xscale('log')
    plt.xlabel('False Positive Rate (%)')
    plt.ylabel('False Negative Rate (%)')
    ticks_to_use = [0.001,0.002,0.005,0.01,0.02,0.05,0.1,0.2,0.5,1,2,5,10,20,50]
    # plain numbers instead of powers of ten on the logarithmic axes
    ax.get_xaxis().set_major_formatter(plt.matplotlib.ticker.ScalarFormatter())
    ax.get_yaxis().set_major_formatter(plt.matplotlib.ticker.ScalarFormatter())
    ax.set_xticks(ticks_to_use)
    ax.set_yticks(ticks_to_use)
    plt.axis([0.001,50,0.001,50])
    plt.grid(True)

DETCurve(fps,fns)