In [21]:
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow.keras.backend as K

In [61]:
# Load MNIST through the Keras dataset helper; the call returns a
# (train, test) pair, each of which is an (images, labels) tuple.
from tensorflow.keras import datasets as dt
((x_train, y_train), (x_test, y_test)) = dt.mnist.load_data(path='mnist.npz')
# Echo the shapes of the test split as the cell output.
(x_test.shape, y_test.shape)

((10000, 28, 28), (10000,))

In [62]:
# Reformat to the (samples, height, width, channels) layout expected by Conv2D.
# reshape() to "first three dims + one channel" is used instead of
# np.expand_dims() so that re-running this cell is a no-op rather than
# appending yet another trailing axis on every execution.
x_train = x_train.reshape(x_train.shape[:3] + (1,))
x_test = x_test.reshape(x_test.shape[:3] + (1,))
x_test.shape

(10000, 28, 28, 1)

If you want to make it a bit more challenging, remove most of the training examples from one or two classes, leaving just 0-10 examples of each.

In [68]:
#build a simple model based on a 2D convolution

def get_CNN_model(input_shape):
    """Build the shared convolutional encoder (one tower of the Siamese net).

    The network stacks three 3x3 Conv2D layers (32, 64 and 128 filters, ReLU),
    with 2x2 max-pooling after the first two, then flattens the result and
    projects it through a 1024-unit Dense layer. The activations of that last
    layer are the feature vector (embedding) of the input image.

    Args:
        input_shape: shape of a single input sample, e.g. ``(28, 28, 1)``;
            it is set on the first layer.

    Returns:
        An uncompiled keras ``Sequential`` model.
    """
    from keras.models import Sequential
    from keras.layers import Conv2D, MaxPooling2D, Dense, Flatten

    filter_size = 3
    pool_size = 2

    model = Sequential([
        Conv2D(32, filter_size, input_shape=input_shape, activation='relu'),
        MaxPooling2D(pool_size=pool_size),
        Conv2D(64, filter_size, activation='relu'),
        MaxPooling2D(pool_size=pool_size),
        Conv2D(128, filter_size, activation='relu'),
        Flatten(),
        # Output of this network: the feature vector. A per-unit sigmoid is
        # used rather than softmax: softmax would force the 1024 components
        # to sum to 1 (values around 1/1024), leaving almost no signal in the
        # element-wise distance computed on top of the embeddings.
        Dense(1024, activation='sigmoid'),
    ])

    return model


In [69]:
# Instantiate the encoder for 28x28 single-channel inputs and print its layers.
model = get_CNN_model((28,28,1))
model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_25 (Conv2D)           (None, 26, 26, 32)        320       
_________________________________________________________________
max_pooling2d_17 (MaxPooling (None, 13, 13, 32)        0         
_________________________________________________________________
conv2d_26 (Conv2D)           (None, 11, 11, 64)        18496     
_________________________________________________________________
max_pooling2d_18 (MaxPooling (None, 5, 5, 64)          0         
_________________________________________________________________
conv2d_27 (Conv2D)           (None, 3, 3, 128)         73856     
_________________________________________________________________
flatten_9 (Flatten)          (None, 1152)              0         
_________________________________________________________________
dense_19 (Dense)             (None, 1024)              1180672   
Total para

In [70]:
#construct a Siamese network (two towers) from the previous CNN model.
#Use some distance/similarity metric (L1, L2, cosine sim) and possibly an activation on top of it (or a simple tf.reduce_sum)
def get_Siammese_model(model, input_shape):
    """Wrap a shared encoder into a two-tower Siamese similarity model.

    Both inputs are passed through the *same* ``model`` instance, so the two
    towers share all their parameters. The element-wise absolute difference
    (L1) of the two encodings is fed to a single sigmoid unit that produces
    the similarity score of the pair.

    Args:
        model: encoder mapping one input sample to a feature vector.
        input_shape: shape of a single input sample, e.g. ``(28, 28, 1)``.

    Returns:
        An uncompiled keras ``Model`` that takes ``[left, right]`` inputs and
        outputs one score per pair.
    """
    from keras.models import Model
    from keras.layers import Input, Lambda, Dense
    import keras.backend as K

    # Input tensors for the left and right tower.
    left_input = Input(input_shape)
    right_input = Input(input_shape)

    # Parameter sharing: the same encoder produces both feature vectors.
    encoded_l = model(left_input)
    encoded_r = model(right_input)

    # Custom layer computing the absolute difference between the encodings.
    L1_layer = Lambda(lambda tensors: K.abs(tensors[0] - tensors[1]))
    L1_distance = L1_layer([encoded_l, encoded_r])

    # Single sigmoid unit turning the distance vector into a similarity score.
    prediction = Dense(1, activation='sigmoid')(L1_distance)

    # Connect the two inputs with the score output.
    model_siam = Model(inputs=[left_input, right_input], outputs=prediction)

    return model_siam
    

In [71]:
# Build the two-tower model on top of the encoder created above and print its layers.
model_siam = get_Siammese_model(model, (28,28,1))
model_siam.summary()

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_23 (InputLayer)           (None, 28, 28, 1)    0                                            
__________________________________________________________________________________________________
input_24 (InputLayer)           (None, 28, 28, 1)    0                                            
__________________________________________________________________________________________________
sequential_9 (Sequential)       (None, 1024)         1273344     input_23[0][0]                   
                                                                 input_24[0][0]                   
__________________________________________________________________________________________________
lambda_12 (Lambda)              (None, 1024)         0           sequential_9[1][0]               
          

In [72]:
#compile the model with Adam optimizer and e.g. binary cross entropy loss
#or define your own loss function, see e.g. here: 
#you may add the metrics of your choice

# Binary cross-entropy on the sigmoid pair score, optimised with Adam;
# accuracy is reported as an additional metric.
model_siam.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

In [73]:
# the source data are in an incompatible format - we need to construct pairs
# a batch generator would be better, but let's resort to simple static samples for the sake of simplicity
def create_pairs(x, y, size, seed=None):
    """Sample random pairs of items and label each pair as same/different class.

    Args:
        x: array of samples, indexed along its first axis.
        y: array of class labels aligned with ``x``.
        size: number of pairs to draw; indices are drawn with replacement,
            independently for the left and the right member.
        seed: optional integer seed for reproducible sampling. When ``None``
            (the default) the global NumPy random state is used.

    Returns:
        Tuple ``(x1, x2, labels)``: the left and right members of every pair
        and an integer array holding 1 where both members share a class and
        0 otherwise.
    """
    x = np.asarray(x)
    y = np.asarray(y)

    # A dedicated RandomState keeps seeded calls reproducible without
    # touching the global generator; unseeded calls use np.random directly.
    rng = np.random if seed is None else np.random.RandomState(seed)

    # Select random indices for both sides of the pairs.
    a = rng.choice(len(y), size)
    b = rng.choice(len(y), size)

    # Vectorised same-class indicator (1 = same class, 0 = different).
    labels = (y[a] == y[b]).astype(int)

    return (x[a], x[b], labels)

In [74]:
#run the model training (depending on your PC performance, you may set lower/higher train set sizes)
#do not increase epochs too much (overfitting)

# Draw 300000 random training pairs and 10000 random test pairs, then fit the
# Siamese model for one epoch, using the test pairs as validation data.
x1, x2, y = create_pairs(x_train, y_train, 300000)
x1_test, x2_test, y_pair_test = create_pairs(x_test, y_test, 10000)
model_siam.fit([x1, x2], y,
          batch_size=128,
          epochs=1,
          validation_data=([x1_test, x2_test], y_pair_test))

Train on 300000 samples, validate on 10000 samples
Epoch 1/1


<keras.callbacks.History at 0x121100da0>

In [None]:
# real task: classification: get the class of the most similar item
# get K samples of each class, then label the target with the class of the sample most similar to it

def get_samples(x, y, k=10):
    """Pick the first ``k`` samples of every class as a reference set.

    Args:
        x: array of samples, indexed along its first axis.
        y: array of class labels aligned with ``x``.
        k: maximum number of samples to take per class (default 10). Classes
            with fewer than ``k`` samples contribute all the samples they have.

    Returns:
        Tuple ``(x_selected, y_selected)`` ordered by class label, and by
        original position within each class.
    """
    x = np.asarray(x)
    y = np.asarray(y)

    per_class = [(y == label).nonzero()[0][:k] for label in np.unique(y)]
    if not per_class:
        # No labels at all: return empty selections with matching shapes.
        return (x[:0], y[:0])

    # concatenate() copes with classes of different sizes, whereas stacking
    # the per-class index arrays with np.array() needs equal lengths.
    indices = np.concatenate(per_class)
    return (x[indices], y[indices])

In [32]:
# a simple validation for a single sample. Extend this to get some overall results for the whole test set. 
# Return, e.g., accuracy
i=0
# Reference set drawn from the training data via get_samples().
x0_sample, y0_sample = get_samples(x_train, y_train)
example = x_test[i]
# Repeat the test image once per reference sample so that every
# (test image, reference sample) pair is scored in a single predict() call.
x0_test = np.repeat(example[np.newaxis, :, :, :], len(y0_sample), axis=0)
# Index of the reference sample with the highest predicted score.
pred = np.argmax(model_siam.predict([x0_test, x0_sample]))
# (true label of the test image, label of the best-scoring reference sample)
(y_test[i], y0_sample[pred])

(7, 0)

## Extensions

- Get the embeddings of individual data samples (use the original model)
- Do some dimensionality reduction (PCA) and display the selected items based on their embeddings
- What similarity metrics did you use & why? What are the consequences?
- How do the individual convolutional filters work? Can you display their values for selected inputs?
