In [30]:
import os 
import random

import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import tensorflow as tf

from keras.preprocessing.image import ImageDataGenerator 
from keras.utils.np_utils import to_categorical 
from sklearn.model_selection import train_test_split 
from tensorflow.keras.datasets import fashion_mnist

In [9]:
(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()

x_train = x_train.reshape(-1, 28, 28, 1).astype('float32') / 255 
x_test = x_test.reshape(-1, 28, 28, 1).astype('float32') / 255 

print('Training', x_train.shape, x_train.max()) 
print('Testing', x_test.shape, x_test.max()) 

Training (60000, 28, 28, 1) 1.0
Testing (10000, 28, 28, 1) 1.0


In [10]:
# reorganize by groups 
train_groups = [x_train[np.where(y_train==i)[0]] for i in np.unique(y_train)] 
test_groups = [x_test[np.where(y_test==i)[0]] for i in np.unique(y_train)] 

print('train groups:', [x.shape[0] for x in train_groups]) 
print('test groups:', [x.shape[0] for x in test_groups]) 


train groups: [6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000]
test groups: [1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000]


In [24]:
def create_pairs(x, digit_indices, labels_num):
    '''
    Positive and negative pair creation.
    Alternates between positive and negative pairs.
    '''
    pairs = []
    labels = []
    n = min([len(digit_indices[d]) for d in range(labels_num)]) - 1
    for d in range(labels_num):
        for i in range(n):
            z1, z2 = digit_indices[d][i], digit_indices[d][i + 1]
            pairs += [[x[z1], x[z2]]]
            inc = random.randrange(1, labels_num)
            dn = (d + inc) % labels_num
            z1, z2 = digit_indices[d][i], digit_indices[dn][i]
            pairs += [[x[z1], x[z2]]]
            labels += [1, 0]
    return np.array(pairs), np.array(labels)

def make_pair_dataset(images, labels, labels_num):
    digit_indices = [np.where(labels == i)[0] for i in range(labels_num)]
    pairs, y = create_pairs(images, digit_indices, labels_num)
    return pairs, y

In [28]:
train_pairs, train_lables = make_pair_dataset(x_train, y_train, len(np.unique(y_train)))
test_pairs, test_lables = make_pair_dataset(x_test, y_test, len(np.unique(y_test)))

In [25]:
len(np.unique(y_train))

10

# XCeption

In [15]:
from tensorflow.keras.layers import Input, Conv2D, SeparableConv2D, \
     Add, Dense, BatchNormalization, ReLU, MaxPool2D, GlobalAvgPool2D
from tensorflow.keras import Model 


In [12]:
def conv_bn(x, filters, kernel_size, strides=1):
    x = Conv2D(filters=filters,
               kernel_size=kernel_size,
               strides=strides,
               padding='same',
               use_bias=False)(x)
    x = BatchNormalization()(x)
    return x

def sep_bn(x, filters, kernel_size, strides=1):
    x = SeparableConv2D(filters=filters,
                        kernel_size=kernel_size,
                        strides=strides,
                        padding='same',
                        use_bias=False)(x)
    x = BatchNormalization()(x)
    return x

In [14]:
def entry_flow(x):
    x = conv_bn(x, filters=32, kernel_size=3, strides=2)
    x = ReLU()(x)
    x = conv_bn(x, filters=64, kernel_size=3)
    tensor = ReLU()(x)
  
    x = sep_bn(tensor, filters=128, kernel_size=3)
    x = ReLU()(x)
    x = sep_bn(x, filters=128, kernel_size=3)
    x = MaxPool2D(pool_size=3, strides=2, padding='same')(x)

    tensor = conv_bn(tensor, filters=128, kernel_size=1, strides=2)

    x = Add()([tensor, x])
    x = ReLU()(x)
    x = sep_bn(x, filters=256, kernel_size=3)
    x = ReLU()(x)
    x = sep_bn(x, filters=256, kernel_size=3)
    x = MaxPool2D(pool_size=3, strides=2, padding='same')(x)

    tensor = conv_bn(tensor, filters=256, kernel_size=1, strides=2)

    x = Add()([tensor, x])
    x = ReLU()(x)
    x = sep_bn(x, filters=728, kernel_size=3)
    x = ReLU()(x)
    x = sep_bn(x, filters=728, kernel_size=3)
    x = MaxPool2D(pool_size=3, strides=2, padding='same')(x)

    tensor = conv_bn(tensor, filters=728, kernel_size=1, strides=2)
    x = Add()([tensor, x])

    return x

def middle_flow(tensor):
    for _ in range(8):
        x = ReLU()(tensor)
        x = sep_bn(x, filters=728, kernel_size=3)
        x = ReLU()(x)
        x = sep_bn(x, filters=728, kernel_size=3)
        x = ReLU()(x)
        x = sep_bn(x, filters=728, kernel_size=3)

        tensor = Add()([tensor, x])

    return tensor

def exit_flow(tensor):
    x = ReLU()(tensor)
    x = sep_bn(x, filters=728, kernel_size=3)
    x = ReLU()(x)
    x = sep_bn(x, filters=1024, kernel_size=3)
    x = MaxPool2D(3, strides=2, padding='same')(x)

    tensor = conv_bn(tensor, filters=1024, kernel_size=1, strides=2)

    x = Add()([tensor, x])
    x = sep_bn(x, filters=1536, kernel_size=3)
    x = ReLU()(x)
    x = sep_bn(x, filters=2048, kernel_size=3)
    x = ReLU()(x)
    x = GlobalAvgPool2D()(x)
#     x = Dense(units=1000, activation='softmax')(x)

    return x

In [49]:
def XCeption(img_shape: list):
    input = Input(shape=img_shape)

    x = entry_flow(input)
    x = middle_flow(x)
    output = exit_flow(x)

    model = Model(input, output, name='XCeption')
    return model


In [55]:
XCeption([30, 30, 1]).summary()

Model: "XCeption"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_7 (InputLayer)            [(None, 30, 30, 1)]  0                                            
__________________________________________________________________________________________________
conv2d_26 (Conv2D)              (None, 15, 15, 32)   288         input_7[0][0]                    
__________________________________________________________________________________________________
batch_normalization_160 (BatchN (None, 15, 15, 32)   128         conv2d_26[0][0]                  
__________________________________________________________________________________________________
re_lu_140 (ReLU)                (None, 15, 15, 32)   0           batch_normalization_160[0][0]    
___________________________________________________________________________________________

# Define Siamese

In [41]:
img_shape =  tuple(np.concatenate((x_test.shape[1:], [1])))

In [56]:
feature_model = XCeption(img_shape[:-1])

In [57]:
img_a_in = tf.keras.layers.Input(shape = img_shape, name = 'ImageA_Input')
img_b_in = tf.keras.layers.Input(shape = img_shape, name = 'ImageB_Input')
img_a_feat = feature_model(img_a_in)
img_b_feat = feature_model(img_b_in)
combined_features = tf.keras.layers.concatenate([img_a_feat, img_b_feat], name='merge_features')
combined_features = tf.keras.layers.Dense(16, activation = 'linear')(combined_features)
combined_features = tf.keras.layers.BatchNormalization()(combined_features)
combined_features = tf.keras.layers.Activation('relu')(combined_features)
combined_features = tf.keras.layers.Dense(4, activation = 'linear')(combined_features)
combined_features = tf.keras.layers.BatchNormalization()(combined_features)
combined_features = tf.keras.layers.Activation('relu')(combined_features)
combined_features = tf.keras.layers.Dense(1, activation = 'sigmoid')(combined_features)

In [58]:
siamese_model = tf.keras.Model(inputs = [img_a_in, img_b_in], outputs=[combined_features], name = 'Siamese')
siamese_model.summary()


Model: "Siamese"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
ImageA_Input (InputLayer)       [(None, 28, 28, 1, 1 0                                            
__________________________________________________________________________________________________
ImageB_Input (InputLayer)       [(None, 28, 28, 1, 1 0                                            
__________________________________________________________________________________________________
XCeption (Model)                (None, 2048)         20860904    ImageA_Input[0][0]               
                                                                 ImageB_Input[0][0]               
__________________________________________________________________________________________________
merge_features (Concatenate)    (None, 4096)         0           XCeption[1][0]             

# Training

In [59]:
siamese_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
                         loss='binary_crossentropy',
                         metrics=['mae'])

In [62]:
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=0,
    patience=3,
    verbose=0,
    mode='auto',
    baseline=None,
    restore_best_weights=True
)

save_model = tf.keras.callbacks.ModelCheckpoint(
    "/content/drive/MyDrive/Classroom/similarity_model",
    monitor='val_loss',
    verbose=0,
    save_best_only=True,
    save_weights_only=False,
)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2,
                              patience=3, min_lr=0.00001)

In [None]:
# train_pairs, train_lables
# test_pairs, test_lables

history = siamese_model.fit([train_pairs[:,0], train_pairs[:, 1]], train_lables, epochs=50, batch_size=16, 
                    validation_data=([test_pairs[:, 0], test_pairs[:, 1]], test_lables), callbacks=[early_stopping, save_model, reduce_lr])

Epoch 1/50
