In [1]:
!pip install keras
!pip install tensorflow
!pip install numpy
!pip install matplotlib



In [3]:
# Import libraries and modules
import numpy as np
import tensorflow as tf
import keras.backend as K
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.models import Model
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers import Lambda
from keras.layers import Input
from keras.layers import Embedding
from keras.layers import Resizing
from tensorflow.keras import layers
from keras.layers import GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.utils import to_categorical
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.models import load_model
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
import seaborn as sns

### UTILITY FUNCTIONS

In [4]:
def normalize_pixels(image):
    """normalize pixels to be between 0 and 1"""

	# convert from integers to floats
    image_norm = image.astype('float32')
	# normalize to range -1 and 1
    image_norm = (image_norm - 127.5) / 127.5

	# return normalized images
    return image_norm

def de_normalize_pixels(image, _from = 0, _to = 1):
    """de-normalize pixels to be between 0 and 255"""

    # Normalize between 0 and 1
    image_de_norm = (image * 127.5) + 127.5

    # Normalize between 0 and 1
    image_de_norm = image_de_norm/255 

    # Normalize between _from and _to
    image_de_norm = (image_de_norm * (_to - _from)) + _from
    
    return image_de_norm

def load_image(path):
    """load image from path and convert to array"""

    img = load_img(path, target_size=(224, 224), interpolation='bilinear')
    x = img_to_array(img)
    x = normalize_pixels(x)
    x = np.expand_dims(x, axis=0)
    return x

def unison_shuffled_copies(a, b):
    """shuffle two arrays in unison"""

    assert len(a) == len(b)
    p = np.random.permutation(len(a))
    return a[p], b[p]

test_image_1 = load_image(r'./data/tiered_imagenet/train/n01530575/n0153057500000001.jpg')

test_image_1_de = de_normalize_pixels(test_image_1, 0, 255)
print(np.min(test_image_1))
print(np.max(test_image_1))
print(np.min(test_image_1_de))
print(np.max(test_image_1_de))

-1.0
0.9607843
0.0
250.0


### Load Data

In [5]:
# Load images into arrays
import pathlib


DATA_PATH = './data/tiered_imagenet/train'
DATA_TRAIN_SPLIT = 0.8
DATA_VALIDATION_SPLIT = 0.1
#specify a maximum how many pair that can be made pr class (value is for both positive and negative pairs )
MAX_PAIRS_PER_CLASS = 2


data_dir = pathlib.Path(DATA_PATH)

image_count = len(list(data_dir.glob('*/*.jpg')) + list(data_dir.glob('*/*.jpeg')) + list(data_dir.glob('*/*.png')))
print("number of available images: " + str(image_count))

folders = [x for x in data_dir.iterdir() if x.is_dir()]

img_array_data = []

for i, folder in enumerate(folders):
    img_array_data.append([])

    for j, img in enumerate(folder.iterdir()):
        if(j >= MAX_PAIRS_PER_CLASS):
            break
        img_array_data[i].append(load_image(img))
        
print("number of classes", len(img_array_data))
print("number of selected images: ", sum(len(item) for item in img_array_data))

number of available images: 448695
number of classes 351
number of selected images:  702


In [6]:
data = []
labels = []

# Create image pairs
def create_image_pairs(images):
    for i, array in enumerate(images):
        for j in range(len(array)-1):
            # True
            data.append([
                images[i][j], 
                images[i][np.random.randint(0, len(images[i]) - 1)]])
            labels.append(1)

            x_1 = np.random.randint(0, len(images) - 1)
            x_2 = np.random.randint(0, len(images[x_1]) - 1)

            # False
            data.append([
                images[i][j], 
                images[x_1][x_2]])
            labels.append(0)

create_image_pairs(img_array_data)

data = np.array(data)
labels = np.array(labels)
print("number of image pairs", len(data))

number of image pairs 702


In [7]:
data, labels = unison_shuffled_copies(data, labels)

show_amount = 10
print(data[:show_amount])
print(labels[:show_amount])

[[[[[[ 1.          1.          1.        ]
     [ 1.          1.          1.        ]
     [ 1.          1.          1.        ]
     ...
     [ 0.8980392   0.9137255   0.8901961 ]
     [ 0.8901961   0.90588236  0.88235295]
     [ 0.40392157  0.41960785  0.4117647 ]]

    [[ 1.          1.          1.        ]
     [ 1.          1.          1.        ]
     [ 1.          1.          1.        ]
     ...
     [ 0.8980392   0.9137255   0.8901961 ]
     [ 0.8901961   0.90588236  0.8980392 ]
     [ 0.40392157  0.41960785  0.4117647 ]]

    [[ 1.          1.          1.        ]
     [ 1.          1.          1.        ]
     [ 1.          1.          1.        ]
     ...
     [ 0.8980392   0.9137255   0.90588236]
     [ 0.8901961   0.90588236  0.8980392 ]
     [ 0.40392157  0.41960785  0.4117647 ]]

    ...

    [[ 0.84313726  0.8980392   0.94509804]
     [ 0.84313726  0.8980392   0.94509804]
     [ 0.84313726  0.8980392   0.94509804]
     ...
     [ 0.7411765   0.7019608   0.67058825]
   

Data splitting

In [8]:
train_split = int(DATA_TRAIN_SPLIT * len(data))
validation_split = train_split+int(DATA_VALIDATION_SPLIT * len(data))

print("TRAIN SPLIT: " + "0 - " + str(train_split) + " | " + str(train_split) + " | " + str(DATA_TRAIN_SPLIT*100) + '%')
print("VALIDATION SPLIT: " + str(train_split) + " - " + str(validation_split) + " | " + str(validation_split-train_split) + " | " + str(DATA_VALIDATION_SPLIT*100) + '%')
print("TEST SPLIT: " + str(validation_split) + " - " + str(len(data)) + " | " + str(len(data) - validation_split) + " | " + str(100 - DATA_TRAIN_SPLIT*100 - DATA_VALIDATION_SPLIT*100) + '%')

data_train = data[0:train_split]
data_validation = data[train_split:validation_split]
data_test = data[validation_split:]

labels_train = labels[0:train_split]
labels_validation = labels[train_split:validation_split]
labels_test = labels[validation_split:]

print("---")
print("Training Data Shape: " + str(data_train.shape))
print("Validation Data Shape: " + str(data_validation.shape))
print("Test Data Shape: " + str(data_test.shape))
print("---")
print("Training Labels Shape: " + str(labels_train.shape))
print("Validation Labels Shape: " + str(labels_validation.shape))
print("Test Labels Shape: " + str(labels_test.shape))

TRAIN SPLIT: 0 - 561 | 561 | 80.0%
VALIDATION SPLIT: 561 - 631 | 70 | 10.0%
TEST SPLIT: 631 - 702 | 71 | 10.0%
---
Training Data Shape: (561, 2, 1, 224, 224, 3)
Validation Data Shape: (70, 2, 1, 224, 224, 3)
Test Data Shape: (71, 2, 1, 224, 224, 3)
---
Training Labels Shape: (561,)
Validation Labels Shape: (70,)
Test Labels Shape: (71,)


### Define the Architecture 

Model With MobileNetV2

In [18]:
feaureExtracorInputSize = (224, 224, 3)

# define the inputshapes for the model
input_def_1 = Input(feaureExtracorInputSize)
input_def_2 = Input(feaureExtracorInputSize)

# define the feature extraction submodel
mobilenet = MobileNetV2(input_shape=feaureExtracorInputSize, include_top = False, weights='imagenet')
mobilenet.trainable = True
featureExtractor = Sequential()
featureExtractor.add(mobilenet)
featureExtractor.add(GlobalAveragePooling2D())

#Define our custom distance layer as a lambda layer
distanceLayer = Lambda(lambda tensors: K.abs(tensors[0] - tensors[1]))


# the inputs goes into the feature extractor
embedding_1 = featureExtractor(input_def_1)
embedding_2 = featureExtractor(input_def_2)

#now we use the distance layer, and we define the image embeddings as input to the distance layer
feature_distance = distanceLayer([embedding_1, embedding_2])

# use the feature distance to optain similairty measure
dense1 = Dense(1024, activation='relu')(feature_distance)
dense2 = Dense(512, activation='relu')(dense1)
output = Dense(1, activation='sigmoid')(dense2)


model = Model(inputs=[input_def_1, input_def_2], outputs=output )

model.summary()


Model: "model_7"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_15 (InputLayer)          [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 input_16 (InputLayer)          [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 sequential_8 (Sequential)      (None, 1280)         2257984     ['input_15[0][0]',               
                                                                  'input_16[0][0]']         

Train and validate

In [19]:
# Define the optimizer and compile the model
opt = Adam(learning_rate=0.0003, decay=1e-6)

model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['acc'])

model.history = model.fit([data_train[:, 0, 0], data_train[:, 1, 0]], labels_train, validation_data=([data_validation[:, 0, 0], data_validation[:, 1, 0]], labels_validation),epochs = 4, batch_size = 16)

Epoch 1/4
Epoch 2/4
Epoch 3/4
Epoch 4/4


In [16]:
export_models = []
output = Sequential()
input = Input((224, 224, 3))
print(model.layers[len(model.layers)-1].get_weights())
for layer in model.layers:
    print(layer.name)
           

    if("lambda" not in layer.name):
        output.add(layer)
    else:
        output = output(input)
        export_models.append(Model(inputs=input, outputs=output))
        #clear input and output to initialize the building of the next model
        input = Input(layer.output_shape)
        output = Sequential()     
output = output(input)
export_models.append(Model(inputs=input, outputs=output))
for index, m in enumerate(export_models):
    print(index)
    m.save('./models/siamese_model_p'+str(index)+'.h5')


[array([[ 3.71314920e-02],
       [ 8.88634101e-02],
       [ 1.02191366e-01],
       [ 1.62391782e-01],
       [-1.02589250e-01],
       [-9.75748226e-02],
       [ 9.45838243e-02],
       [-5.65782562e-02],
       [-7.06245825e-02],
       [-6.69293255e-02],
       [-2.55370289e-02],
       [ 6.03892021e-02],
       [ 1.04985319e-01],
       [ 6.29869401e-02],
       [ 1.04351096e-01],
       [-1.05153419e-01],
       [ 7.58277699e-02],
       [ 1.25475051e-02],
       [ 6.47496572e-03],
       [-5.04302904e-02],
       [ 8.25972036e-02],
       [-3.07112280e-02],
       [-7.47652724e-02],
       [ 6.91028172e-03],
       [ 5.23241572e-02],
       [ 8.93804133e-02],
       [-3.35796289e-02],
       [ 7.38402307e-02],
       [ 2.42665410e-02],
       [ 1.42928697e-02],
       [ 6.02909625e-02],
       [ 7.04955086e-02],
       [-2.85230018e-02],
       [-6.20728359e-02],
       [ 4.43840511e-02],
       [ 1.04646698e-01],
       [-4.34361733e-02],
       [-6.05964176e-02],
       [-3.