<a href="https://colab.research.google.com/github/MS1034/2021-CS-35-CVIP/blob/master/SNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Require Libraries**

In [25]:
!pip install tensorflow
!pip install pillow
!pip install tqdm
!pip install keras
!pip install numpy



**Mount Drive**

In [26]:
from google.colab import drive
import os
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [27]:
import os
import pickle

import tqdm
from PIL import Image
import numpy as np


class DataLoader(object):
    """
    Class for loading data from image files
    """

    def __init__(self, width, height, cells, data_path, output_path):
        """
        Proper width and height for each image.
        """
        self.width = width
        self.height = height
        self.cells = cells
        self.data_path = data_path
        self.output_path = output_path

    def _open_image(self, path):
        """
        Using the Image library we open the image in the given path. The path must lead to a .jpg file.
        We then resize it to 105x105 like in the paper (the dataset contains 250x250 images.)

        Returns the image as a numpy array.
        """
        image = Image.open(path)
        image = image.resize((self.width, self.height))
        data = np.asarray(image)
        data = np.array(data, dtype='float64')
        return data

    def convert_image_to_array(self, person, image_num, data_path, predict=False):
        """
        Given a person, image number and datapath, returns a numpy array which represents the image.
        predict - whether this function is called during training or testing. If called when training, we must reshape
        the images since the given dataset is not in the correct dimensions.
        """
        max_zeros = 4
        image_num = '0' * max_zeros + image_num
        image_num = image_num[-max_zeros:]
        image_path = os.path.join(data_path, 'lfw2', person, f'{person}_{image_num}.jpg')
        image_data = self._open_image(image_path)
        if not predict:
            image_data = image_data.reshape(self.width, self.height, self.cells)
        return image_data

    def load(self, set_name):
        """
        Writes into the given output_path the images from the data_path.
        dataset_type = train or test
        """
        file_path = os.path.join(self.data_path, 'splits', f'{set_name}.txt')
        print(file_path)
        print('Loading dataset...')
        x_first = []
        x_second = []
        y = []
        names = []
        with open(file_path, 'r') as file:
            lines = file.readlines()
        for line in tqdm.tqdm(lines):
            line = line.split()
            if len(line) == 4:  # Class 0 - non-identical
                names.append(line)
                first_person_name, first_image_num, second_person_name, second_image_num = line[0], line[1], line[2], \
                                                                                           line[3]
                first_image = self.convert_image_to_array(person=first_person_name,
                                                          image_num=first_image_num,
                                                          data_path=self.data_path)
                second_image = self.convert_image_to_array(person=second_person_name,
                                                           image_num=second_image_num,
                                                           data_path=self.data_path)
                x_first.append(first_image)
                x_second.append(second_image)
                y.append(0)
            elif len(line) == 3:  # Class 1 - identical
                names.append(line)
                person_name, first_image_num, second_image_num = line[0], line[1], line[2]
                first_image = self.convert_image_to_array(person=person_name,
                                                          image_num=first_image_num,
                                                          data_path=self.data_path)
                second_image = self.convert_image_to_array(person=person_name,
                                                           image_num=second_image_num,
                                                           data_path=self.data_path)
                x_first.append(first_image)
                x_second.append(second_image)
                y.append(1)
            elif len(line) == 1:
                print(f'line with a single value: {line}')
        print('Done loading dataset')
        with open(self.output_path, 'wb') as f:
            pickle.dump([[x_first, x_second], y, names], f)


print("Loaded data loader")


Loaded data loader


In [28]:
import os
import pickle
import random

import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras import Input, Sequential, Model
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Lambda, BatchNormalization, Activation, \
    Dropout
from tensorflow.keras.regularizers import l2


class SiameseNetwork(object):
    def __init__(self, seed, width, height, cells, loss, metrics, optimizer, dropout_rate):
        """
        Seed - The seed used to initialize the weights
        width, height, cells - used for defining the tensors used for the input images
        loss, metrics, optimizer, dropout_rate - settings used for compiling the siamese model (e.g., 'Accuracy' and 'ADAM)
        """
        K.clear_session()
        self.load_file = None
        self.seed = seed
        self.initialize_seed()
        self.optimizer = optimizer

        # Define the matrices for the input images
        input_shape = (width, height, cells)
        left_input = Input(input_shape)
        right_input = Input(input_shape)

        # Get the CNN architecture as presented in the paper (read the readme for more information)
        model = self._get_architecture(input_shape)
        encoded_l = model(left_input)
        encoded_r = model(right_input)

        # Add a layer to combine the two CNNs
        L1_layer = Lambda(lambda tensors: K.abs(tensors[0] - tensors[1]))
        L1_siamese_dist = L1_layer([encoded_l, encoded_r])
        L1_siamese_dist = Dropout(dropout_rate)(L1_siamese_dist)

        # An output layer with Sigmoid activation function
        prediction = Dense(1, activation='sigmoid', bias_initializer=self.initialize_bias)(L1_siamese_dist)

        siamese_net = Model(inputs=[left_input, right_input], outputs=prediction)
        self.siamese_net = siamese_net
        self.siamese_net.compile(loss=loss, optimizer=optimizer, metrics=metrics)

    def initialize_seed(self):
        """
        Initialize seed all for environment
        """
        os.environ['PYTHONHASHSEED'] = str(self.seed)
        random.seed(self.seed)
        np.random.seed(self.seed)
        tf.random.set_seed(self.seed)

    def initialize_weights(self, shape, dtype=None):
        """
        Called when initializing the weights of the siamese model, uses the random_normal function of keras to return a
        tensor with a normal distribution of weights.
        """
        return K.random_normal(shape, mean=0.0, stddev=0.01, dtype=dtype, seed=self.seed)

    def initialize_bias(self, shape, dtype=None):
        """
        Called when initializing the biases of the siamese model, uses the random_normal function of keras to return a
        tensor with a normal distribution of weights.
        """
        return K.random_normal(shape, mean=0.5, stddev=0.01, dtype=dtype, seed=self.seed)

    def _get_architecture(self, input_shape):
        """
        Returns a Convolutional Neural Network based on the input shape given of the images. This is the CNN network
        that is used inside the siamese model. Uses parameters from the siamese one shot paper.
        """
        model = Sequential()
        model.add(
            Conv2D(filters=64,
                   kernel_size=(10, 10),
                   input_shape=input_shape,
                   kernel_initializer=self.initialize_weights,
                   kernel_regularizer=l2(2e-4),
                   name='Conv1'
                   ))
        model.add(BatchNormalization())
        model.add(Activation("relu"))
        model.add(MaxPooling2D())

        model.add(
            Conv2D(filters=128,
                   kernel_size=(7, 7),
                   kernel_initializer=self.initialize_weights,
                   bias_initializer=self.initialize_bias,
                   kernel_regularizer=l2(2e-4),
                   name='Conv2'
                   ))
        model.add(BatchNormalization())
        model.add(Activation("relu"))
        model.add(MaxPooling2D())

        model.add(
            Conv2D(filters=128,
                   kernel_size=(4, 4),
                   kernel_initializer=self.initialize_weights,
                   bias_initializer=self.initialize_bias,
                   kernel_regularizer=l2(2e-4),
                   name='Conv3'
                   ))
        model.add(BatchNormalization())
        model.add(Activation("relu"))
        model.add(MaxPooling2D())

        model.add(
            Conv2D(filters=256,
                   kernel_size=(4, 4),
                   kernel_initializer=self.initialize_weights,
                   bias_initializer=self.initialize_bias,
                   kernel_regularizer=l2(2e-4),
                   name='Conv4'
                   ))
        model.add(BatchNormalization())
        model.add(Activation("relu"))

        model.add(Flatten())
        model.add(
            Dense(4096,
                  activation='sigmoid',
                  kernel_initializer=self.initialize_weights,
                  kernel_regularizer=l2(2e-3),
                  bias_initializer=self.initialize_bias))
        return model

    def _load_weights(self, weights_file):
        """
        A function that attempts to load pre-existing weight files for the siamese model. If it succeeds then returns
        True and updates the weights, otherwise False.
        :return True if the file is already exists
        """
        # self.siamese_net.summary()
        self.load_file = weights_file
        if os.path.exists(weights_file):  # if the file is already exists, load and return true
            print('Loading pre-existed weights file')
            self.siamese_net.load_weights(weights_file)
            return True
        return False

    def fit(self, weights_file, train_path, validation_size, batch_size, epochs, early_stopping, patience, min_delta):
        """
        Function for fitting the model. If the weights already exist, just return the summary of the model. Otherwise,
        perform a whole train/validation/test split and train the model with the given parameters.
        """
        print(train_path)
        with open(train_path, 'rb') as f:
            x_train, y_train, names = pickle.load(f)
            print(f"Trining Length:{len(x_train[0])}")
        """
        X_train[0]:  |----------x_train_0---------------------------|-------x_val_0--------|
        X_train[1]:  |----------x_train_1---------------------------|-------x_val_1--------|
        y_train:     |----------y_train_0 = y_train_1---------------|----y_val_0=y_val_1---|
        """
        x_train_0, x_val_0, y_train_0, y_val_0 = train_test_split(x_train[0], y_train,
                                                                  test_size=validation_size,
                                                                  random_state=self.seed)
        x_train_1, x_val_1, y_train_1, y_val_1 = train_test_split(x_train[1], y_train,
                                                                  test_size=validation_size,
                                                                  random_state=self.seed)
        x_train_0 = np.array(x_train_0, dtype='float64')
        x_val_0 = np.array(x_val_0, dtype='float64')
        x_train_1 = np.array(x_train_1, dtype='float64')
        x_val_1 = np.array(x_val_1, dtype='float64')
        x_train = [x_train_0, x_train_1]
        x_val = [x_val_0, x_val_1]
        if y_train_0 != y_train_1 and y_val_0 != y_val_1:
            raise Exception("y train lists or y validation list do not equal")
        y_train_both = np.array(y_train_0, dtype='float64')
        y_val_both = np.array(y_val_0, dtype='float64')
        if not self._load_weights(weights_file=weights_file):
            print('No such pre-existed weights file')
            print('Beginning to fit the model')
            callback = []
            if early_stopping:
                """
                We used the EarlyStopping function monitoring on the validation loss with a minimum delta of 0.1
                (Minimum change in the monitored quantity to qualify as an improvement, i.e.
                an absolute change of less than min_delta, will count as no improvement.) and patience 5
                (Number of epochs with no improvement after which training will be stopped.).
                The direction is automatically inferred from the name of the monitored quantity (‘auto’).
                """
                es = EarlyStopping(monitor='val_loss', min_delta=min_delta, patience=patience, mode='auto', verbose=1)
                callback.append(es)
            self.siamese_net.fit(x_train, y_train_both, batch_size=batch_size, epochs=epochs,
                                 validation_data=(x_val, y_val_both), callbacks=callback, verbose=1)
            self.siamese_net.save_weights(self.load_file)
        # evaluate on the testing set
        loss, accuracy = self.siamese_net.evaluate(x_val, y_val_both, batch_size=batch_size)

        print(f'Loss on Validation set: {loss}')
        print(f'Accuracy on Validation set: {accuracy}')

    def evaluate(self, test_file, batch_size, analyze=False):
        """
        Function for evaluating the final model after training.
        test_file - file path to the test file.
        batch_size - the batch size used in training.

        Returns the loss and accuracy results.
        """
        with open(test_file, 'rb') as f:
            x_test, y_test, names = pickle.load(f)
        print(f'Available Metrics: {self.siamese_net.metrics_names}')
        y_test = np.array(y_test, dtype='float64')
        x_test[0] = np.array(x_test[0], dtype='float64')
        x_test[1] = np.array(x_test[1], dtype='float64')
        # evaluate on the test set
        loss, accuracy = self.siamese_net.evaluate(x_test, y_test, batch_size=batch_size)

        y_pred = self.siamese_net.predict(x_test)
        y_pred_binary = (y_pred > 0.5).astype(int)
        report = classification_report(y_test, y_pred_binary)
        print("Classification Report:")
        print(report)
        if analyze:
            self._analyze(x_test, y_test, names)
        return loss, accuracy

    def _analyze(self, x_test, y_test, names):
        """
        Function used for evaluating our network in the methods proposed in the assignment.
        We will find:
        - The person who has 2 images that are the most dissimilar to each other
        - The person with the two images that are the most similar to each other
        - Two people with the most dissimilar images, and
        - The two people with the most similar images.
        """
        best_class_0_prob = 1  # correct classification for different people, y=0, prediction->0
        best_class_0_name = None
        worst_class_0_prob = 0  # misclassification for different people, y=0, prediction->1
        worst_class_0_name = None
        best_class_1_prob = 0  # correct classification for same people, y=1, prediction->1
        best_class_1_name = None
        worst_class_1_prob = 1  # misclassification for same people, y=1, prediction->0
        worst_class_1_name = None
        prob = self.siamese_net.predict(x_test)
        for pair_index in range(len(names)):
            name = names[pair_index]
            y_pair = y_test[pair_index]
            pair_prob = prob[pair_index][0]
            if y_pair == 0:  # different people (actual)
                if pair_prob < best_class_0_prob:  # correct classification for different people, y=0, prediction->0
                    best_class_0_prob = pair_prob
                    best_class_0_name = name
                if pair_prob > worst_class_0_prob:  # misclassification for different people, y=0, prediction->1
                    worst_class_0_prob = pair_prob
                    worst_class_0_name = name
            else:  # the same person (actual)
                if pair_prob > best_class_1_prob:  # correct classification for same people, y=1, prediction->1
                    best_class_1_prob = pair_prob
                    best_class_1_name = name
                if pair_prob < worst_class_1_prob:  # misclassification for same people, y=1, prediction->0
                    worst_class_1_prob = pair_prob
                    worst_class_1_name = name

        print(f'correct classification for different people, y=0, prediction->0, name: {best_class_0_name} | prob: {best_class_0_prob}')
        print(f'misclassification for different people, y=0, prediction->1, name: {worst_class_0_name} | prob: {worst_class_0_prob}')
        print(f'correct classification for same people, y=1, prediction->1, name: {best_class_1_name} | prob: {best_class_1_prob}')
        print(f'misclassification for same people, y=1, prediction->0, name: {worst_class_1_name} | prob: {worst_class_1_prob}')


print("Loaded Siamese Network")

Loaded Siamese Network


In [29]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [30]:
import os
import random
import time

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import classification_report


path_separator = os.path.sep
# Environment settings
IS_COLAB = (os.name == 'posix')
LOAD_DATA = not (os.name == 'posix')
IS_EXPERIMENT = False
train_name = 'train'
test_name = 'test'
WIDTH = HEIGHT = 105
CEELS = 1
loss_type = "binary_crossentropy"
validation_size = 0.2
early_stopping = True

if IS_COLAB:
    # the google drive folder we used
    data_path = os.path.sep + os.path.join('content', 'drive', 'My\ Drive', 'datasets', 'lfw2').replace('\\', '')
else:
    # locally
    from data_loader import DataLoader
    from siamese_network import SiameseNetwork

    data_path = os.path.join('lfwa', 'lfw2')
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'


def run_combination(l, bs, ep, pat, md, seed, train_path, test_path):
    """
    This function gets the parameters and run the experiment.
    :return: loss - loss on the testing set, accuracy - accuracy on the testing set
    """
    # file types
    model_save_type = 'h5'
    # files paths
    initialize_seed(seed)
    parameters_name = f'seed_{seed}_lr_{l}_bs_{bs}_ep_{ep}_val_{validation_size}_' \
                      f'es_{early_stopping}_pa_{pat}_md_{md}'
    print(f'Running combination with {parameters_name}')
    # A path for the weights
    load_weights_path = os.path.join(data_path, 'weights', f'weights_{parameters_name}.{model_save_type}')

    siamese = SiameseNetwork(seed=seed, width=WIDTH, height=HEIGHT, cells=CEELS, loss=loss_type, metrics=['accuracy'],
                             optimizer=Adam(lr=l), dropout_rate=0.4)
    siamese.fit(weights_file=load_weights_path, train_path=train_path, validation_size=validation_size,
                batch_size=bs, epochs=ep, early_stopping=early_stopping, patience=pat,
                min_delta=md)
    loss, accuracy = siamese.evaluate(test_file=test_path, batch_size=bs, analyze=True)
    print(f'Loss on Testing set: {loss}')
    print(f'Accuracy on Testing set: {accuracy}')

    # predict_pairs(model)
    return loss, accuracy


def run():
    """
    The main function that runs the training and experiments. Uses the global variables above.
    """
    # file types
    data_set_save_type = 'pickle'
    train_path = os.path.join(data_path, f'{train_name}.{data_set_save_type}')  # A path for the train file
    test_path = os.path.join(data_path, f'{test_name}.{data_set_save_type}')  # A path for the test file
    if LOAD_DATA:  # If the training data already exists
        loader = DataLoader(width=WIDTH, height=HEIGHT, cells=CEELS, data_path=data_path, output_path=train_path)
        loader.load(set_name=train_name)
        loader = DataLoader(width=WIDTH, height=HEIGHT, cells=CEELS, data_path=data_path, output_path=test_path)
        loader.load(set_name=test_name)

    result_path = os.path.join(data_path, f'results.csv')  # A path for the train file
    results = {'lr': [], 'batch_size': [], 'epochs': [], 'patience': [], 'min_delta': [], 'seed': [], 'loss': [],
               'accuracy': []}
    for l in lr:
        for bs in batch_size:
            for ep in epochs:
                for pat in patience:
                    for md in min_delta:
                        for seed in seeds:
                            loss, accuracy = run_combination(l=l, bs=bs, ep=ep, pat=pat, md=md, seed=seed,
                                                             train_path=train_path, test_path=test_path)
                            results['lr'].append(l)
                            results['batch_size'].append(bs)
                            results['epochs'].append(ep)
                            results['patience'].append(pat)
                            results['min_delta'].append(md)
                            results['seed'].append(seed)
                            results['loss'].append(loss)
                            results['accuracy'].append(accuracy)
    df_results = pd.DataFrame.from_dict(results)
    df_results.to_csv(result_path)


def initialize_seed(seed):
    """
    Initialize all relevant environments with the seed.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)


if __name__ == '__main__':
    if IS_EXPERIMENT:
        # Experiments settings
        seeds = [0]
        lr = [0.00005]
        batch_size = [5]
        epochs = [5]
        patience = [5]
        min_delta = [0.1]
    else:
        # Final settings
        seeds = [0]
        lr = [0.00005]
        batch_size = [5]
        epochs = [5]
        patience = [5]
        min_delta = [0.1]

    print(os.name)
    start_time = time.time()
    print('Starting the experiments')
    run()
    print(f'Total Running Time: {time.time() - start_time}')




posix
Starting the experiments
Running combination with seed_0_lr_5e-05_bs_5_ep_5_val_0.2_es_True_pa_5_md_0.1
/content/drive/My Drive/datasets/lfw2/train.pickle
Trining Length:2200
Loading pre-existed weights file
Loss on Validation set: 1.7150386571884155
Accuracy on Validation set: 0.6522727012634277
Available Metrics: ['loss', 'accuracy']
Classification Report:
              precision    recall  f1-score   support

         0.0       0.71      0.53      0.61       500
         1.0       0.63      0.79      0.70       500

    accuracy                           0.66      1000
   macro avg       0.67      0.66      0.65      1000
weighted avg       0.67      0.66      0.65      1000

correct classification for different people, y=0, prediction->0, name: ['Jose_Bove', '1', 'Michalis_Chrisohoides', '1'] | prob: 0.10458244383335114
misclassification for different people, y=0, prediction->1, name: ['Bob_Iger', '1', 'Brian_Cowen', '1'] | prob: 0.8832272291183472
correct classification for 

In [31]:
import pickle


def load_pickle_file(file_path):
    with open(file_path, 'rb') as f:
        data = pickle.load(f)
    return data
train_name = 'train'

data_path = os.path.sep + os.path.join('content', 'drive', 'My\ Drive', 'datasets', 'lfw2').replace('\\', '')
data_set_save_type = 'pickle'
train_path = os.path.join(data_path, f'{train_name}.{data_set_save_type}')
print(train_path)
data = load_pickle_file(train_path)

# Now you can inspect the loaded data
print(data)
print("I Love u")


/content/drive/My Drive/datasets/lfw2/train.pickle


IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)

