In [0]:
import numpy as np

from tensorflow.keras import datasets
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D, Lambda, Dropout, concatenate, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as K

from sklearn.neighbors import KNeighborsClassifier

In [0]:
 # Load the MNIST train/test split.
 (X_train, y_train), (X_test, y_test) = datasets.mnist.load_data()

 # Give every image an explicit trailing channel axis: (n, 28, 28, 1).
 X_train = X_train.reshape((X_train.shape[0], 28, 28, 1))
 X_test = X_test.reshape((X_test.shape[0], 28, 28, 1))

In [0]:
def sort_data(X_train, y_train, X_test, y_test, n_classes=10):
  '''
  Group the train and test images by class label.

  X_train, X_test: image arrays, one sample per index along axis 0
  y_train, y_test: integer label arrays, one label per sample
  n_classes: number of labels to group by; labels 0 .. n_classes - 1 are used

  Returns (data_train, data_test): two 1-D object arrays of length n_classes
  where element k holds every image whose label equals k.
  '''
  def group_by_label(X, y):
    # The classes generally hold different numbers of samples, so the groups
    # are ragged. np.array(list_of_groups) raises ValueError on recent NumPy
    # for such input, hence the explicit object-dtype container.
    X = np.asarray(X)
    y = np.asarray(y)
    groups = np.empty(n_classes, dtype=object)
    for label in range(n_classes):
      groups[label] = X[y == label]
    return groups

  return group_by_label(X_train, y_train), group_by_label(X_test, y_test)

In [0]:
# Regroup both splits so that data_train[k] / data_test[k] hold the images labelled k.
data_train, data_test = sort_data(
    X_train=X_train,
    y_train=y_train,
    X_test=X_test,
    y_test=y_test,
)

In [0]:
## get a batch of randomly sampled triplets (no hard-triplet mining)
def get_rd_batch(batch_size, datas):
  '''
  Build a batch of random (anchor, positive, negative) triplets.

  batch_size: number of triplets to build
  datas: images grouped by label (data_train or data_test); datas[k] is an
         array holding every image of class k along axis 0

  Returns a list [anchors, positives, negatives] of three float arrays, each
  of shape (batch_size,) + image shape.

  The anchor and the positive are drawn independently from the same class,
  so they can occasionally be the same image. The negative always comes from
  a different class.

  Raises ValueError if fewer than two classes are given.
  '''
  n_classes = len(datas)
  if n_classes < 2:
    raise ValueError('need at least two classes to draw a negative sample')

  # Take the image shape from the data instead of hard-coding (28, 28, 1).
  img_shape = tuple(datas[0].shape[1:])
  triplets = [np.zeros((batch_size,) + img_shape) for _ in range(3)]

  for i in range(batch_size):
    ## build one triplet
    rd_label = np.random.randint(0, n_classes)
    nb_sample = datas[rd_label].shape[0]

    anchor_pos = np.random.randint(0, nb_sample)
    positive_pos = np.random.randint(0, nb_sample)

    # A non-zero offset modulo n_classes can never land back on rd_label.
    rd_label_neg = (rd_label + np.random.randint(1, n_classes)) % n_classes
    nb_sample_neg = datas[rd_label_neg].shape[0]
    negative_pos = np.random.randint(0, nb_sample_neg)

    triplets[0][i] = datas[rd_label][anchor_pos]
    triplets[1][i] = datas[rd_label][positive_pos]
    triplets[2][i] = datas[rd_label_neg][negative_pos]
  return triplets

In [0]:
# Sample the random triplets used for fitting (40000) and for validation (1000).
triplets_train = get_rd_batch(batch_size=40000, datas=data_train)
triplets_test = get_rd_batch(batch_size=1000, datas=data_test)

In [0]:
## triplet loss function
def triplet_loss(y_target, y_pred, margin=0.4):
  '''
  Triplet loss computed on concatenated embeddings.

  y_target: unused; kept because the loss is called with (target, prediction)
  y_pred: tensor indexed as [sample, feature] whose feature axis is cut into
          three consecutive parts: anchor | positive | negative
  margin: constant added to (positive distance - negative distance)

  Returns, for every sample, max(d(a, p) - d(a, n) + margin, 0) where d is
  the squared Euclidean distance.
  '''
  width = y_pred.shape[-1]
  first_cut = int(width // 3)
  second_cut = int(width * 2 // 3)

  anchor = y_pred[:, 0:first_cut]
  positive = y_pred[:, first_cut:second_cut]
  negative = y_pred[:, second_cut:width]

  def squared_distance(u, v):
    # sum of squared differences over the feature axis
    return K.sum(K.square(u - v), axis=1)

  gap = squared_distance(anchor, positive) - squared_distance(anchor, negative)
  return K.maximum(gap + margin, 0.0)

In [0]:
## create our model
def create_triplet_model(input_shape=(28, 28, 1), embedding_size=10, learning_rate=0.0001):
  '''
  Build and compile the triplet network.

  input_shape: shape of a single input image
  embedding_size: number of units of the final Dense layer (embedding width)
  learning_rate: learning rate given to the Adam optimizer

  The model takes three image inputs (anchor, positive, negative), encodes
  each of them with the same CNN (shared weights) and outputs the three
  embeddings concatenated along the last axis, which is the layout that
  triplet_loss splits back into thirds.

  Returns the compiled Keras Model.
  '''
  inpt_a = Input(input_shape)
  inpt_p = Input(input_shape)
  inpt_n = Input(input_shape)

  ## create our CNN model (one instance, reused for the three inputs)
  cnn = Sequential()
  cnn.add(Conv2D(32, (3, 3), input_shape=input_shape, activation='relu'))
  cnn.add(MaxPooling2D(2, 2))
  cnn.add(Conv2D(64, (3, 3), activation='relu'))
  cnn.add(MaxPooling2D(2, 2))
  cnn.add(Dropout(0.1))
  cnn.add(Flatten())
  cnn.add(Dense(embedding_size, activation='relu')) ## embeddings
  ## scale every embedding to unit L2 norm
  cnn.add(Lambda(lambda x:K.l2_normalize(x, axis=1)))

  ## link the cnn with the input to create encoded vector
  encoded_a = cnn (inpt_a)
  encoded_p = cnn (inpt_p)
  encoded_n = cnn (inpt_n)

  ## create merged layer (with output of a, p and n)
  merged_layer = concatenate([encoded_a, encoded_p, encoded_n])

  ## create final model
  model = Model([inpt_a, inpt_p, inpt_n], merged_layer)

  ## compile
  ## `learning_rate` replaces the deprecated `lr` alias, which newer Keras
  ## releases no longer accept
  adm = Adam(learning_rate=learning_rate)
  model.compile(loss=triplet_loss, optimizer=adm)

  return model



In [0]:
# Build and compile the triplet network (three inputs sharing one CNN encoder).
model = create_triplet_model()

In [99]:
# Print the layers of the model with their output shapes and parameter counts.
model.summary()

Model: "model_5"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_16 (InputLayer)           [(None, 28, 28, 1)]  0                                            
__________________________________________________________________________________________________
input_17 (InputLayer)           [(None, 28, 28, 1)]  0                                            
__________________________________________________________________________________________________
input_18 (InputLayer)           [(None, 28, 28, 1)]  0                                            
__________________________________________________________________________________________________
sequential_5 (Sequential)       (None, 10)           34826       input_16[0][0]                   
                                                                 input_17[0][0]             

In [103]:
# triplet_loss ignores its target argument, but fit still needs one target
# entry per sample. Zeros are used instead of np.empty so that no
# uninitialised memory is handed to the training loop.
y_empty_tr = np.zeros((triplets_train[0].shape[0],))
y_empty_te = np.zeros((triplets_test[0].shape[0],))
# NOTE: despite its name, `acc` holds the object returned by model.fit, not an
# accuracy value; the name is kept for compatibility with the rest of the notebook.
acc = model.fit(
    [triplets_train[0], triplets_train[1], triplets_train[2]],
    y_empty_tr,
    batch_size=64,
    epochs=3,
    validation_data=([triplets_test[0], triplets_test[1], triplets_test[2]], y_empty_te),
)

Train on 40000 samples, validate on 1000 samples
Epoch 1/3
Epoch 2/3
Epoch 3/3


In [0]:
def compute_accuracy(y_true, y_pred):
  '''
  Fraction of predictions that are equal to the true labels.

  y_true: true labels (array or sequence)
  y_pred: predicted labels (array or sequence) with the same shape as y_true

  Returns a float between 0 and 1.
  Raises ValueError if the inputs are empty or their shapes differ.
  '''
  y_true = np.asarray(y_true)
  y_pred = np.asarray(y_pred)

  # A shape mismatch would otherwise be compared only partially (or broadcast).
  if y_true.shape != y_pred.shape:
    raise ValueError(
        'y_true and y_pred must have the same shape, got %s and %s'
        % (y_true.shape, y_pred.shape))
  # Accuracy over zero samples is undefined; fail with an explicit message
  # instead of a division by zero.
  if y_true.size == 0:
    raise ValueError('cannot compute accuracy on empty input')

  return float(np.mean(y_true == y_pred))

In [0]:
def find_embedding(model, data_train, embedding_size):
  '''
  Build one reference embedding per label.

  model: triplet model; its predict method takes [anchor, positive, negative]
         image batches and returns the three embeddings concatenated along
         the last axis
  data_train: images grouped by label; data_train[k] holds the images of
              class k along axis 0
  embedding_size: width of one embedding inside the model output

  Returns (embeddings, labels): embeddings has one row per label (the
  embedding of one randomly chosen image of that label) and labels is
  np.arange(number of labels).
  '''
  n_labels = len(data_train)

  # Draw one random image per label from the data that was passed in, rather
  # than from a notebook-level global.
  picked = []
  for label in range(n_labels):
    candidates = data_train[label]
    rd_pos = np.random.randint(0, candidates.shape[0])
    picked.append(candidates[rd_pos])
  anchors = np.stack(picked)

  # Only the anchor branch is of interest; the positive and negative inputs
  # are fed all-zero images of the same shape.
  nothing = np.zeros(anchors.shape)

  # One batched forward pass instead of one predict call per label.
  y_pred = np.asarray(model.predict([anchors, nothing, nothing]))

  # The anchor embedding occupies the first embedding_size columns.
  return y_pred[:, :embedding_size], np.arange(n_labels)

In [0]:
## one reference embedding (width 10) per label, with the matching labels
embedding_ref, embedding_labels = find_embedding(
    model=model,
    data_train=data_train,
    embedding_size=10,
)

In [134]:
## KNN model
## 1-nearest-neighbour classifier fitted on the reference embeddings: a sample
## is given the label of the closest reference vector
neigh_model = KNeighborsClassifier(n_neighbors=1)
neigh_model.fit(embedding_ref, embedding_labels)

KNeighborsClassifier(algorithm='auto', leaf_size=30, metric='minkowski',
                     metric_params=None, n_jobs=None, n_neighbors=1, p=2,
                     weights='uniform')

In [0]:
## make predictions
to_pred = X_test[:1000] ## images to classify: the first 1000 test images
y_true = y_test[:1000] ## true labels of those images
## the model has three inputs; only the anchor branch is needed here, so the
## positive and negative inputs are fed all-zero images
nothing = np.zeros((to_pred.shape[0], 28, 28, 1))

## the model output is anchor | positive | negative concatenated, so the first
## 10 columns are the anchor embeddings
pred = model.predict([to_pred, nothing, nothing])[:, 0:10]
## label every embedding with the class of its nearest reference embedding
y_pred = neigh_model.predict(pred)

In [152]:
# Share of the evaluated images whose predicted label equals the true label.
acc = compute_accuracy(y_true, y_pred)
print(acc)

0.843


In [0]:
### The accuracy is fairly low, most likely because the model only learned from easy triplets rather than hard ones:
### the triplet batches used here were generated at random.
### One way to improve this work: write a function that generates batches of hard triplets.