In [1]:
import tensorflow as tf
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
import matplotlib.pyplot as plt
import numpy as np
import random
import pickle
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_olivetti_faces

In [2]:
# Fetch the Olivetti faces dataset through scikit-learn and pull the image
# array and the label array out of the returned container.
faces = fetch_olivetti_faces()
data = faces["data"]
targets = faces["target"]

In [3]:
# Side length (in pixels) of the square face images; also used further down
# as the spatial size of the network input.
n = 64
# Reshape with `n` instead of repeating the literal 64, so the image size is
# defined in exactly one place.
data = data.reshape(-1, n, n)

In [4]:
# Hold out 20% of the images for testing. The split is stratified on the
# labels and uses a fixed random_state so it is the same on every run.
x_train, x_test, y_train, y_test = train_test_split(
    data, targets,
    test_size=0.2,
    stratify=targets,
    random_state=0,
)

In [5]:
# Number of training images; used below to derive steps_per_epoch.
m = len(x_train)

In [6]:
def plot_triplets(examples):
    """Show the first three entries of ``examples`` side by side in grayscale.

    The image side length is read from axis 1 of the first entry, and each
    entry is reshaped to ``(side, side)`` before plotting, so every entry must
    hold exactly ``side * side`` values (for example a single-image batch of
    shape ``(1, side, side, 1)``).
    """
    side = examples[0].shape[1]
    plt.figure(figsize=(6, 2))
    for position in (1, 2, 3):
        plt.subplot(1, 3, position)
        image = examples[position - 1].reshape(side, side)
        plt.imshow(image, cmap='gray')
        # Tick marks would only show pixel coordinates, so hide them.
        plt.xticks([])
        plt.yticks([])
    plt.show()

In [7]:
def create_batch(x_train, y_train, batch_size):
    """Sample a random batch of (anchor, positive, negative) image triplets.

    For every triplet an anchor image is drawn uniformly from ``x_train``.
    The positive is a *different* image with the same label whenever the
    label has more than one sample (otherwise the anchor itself is reused),
    and the negative is an image with any other label.

    Parameters
    ----------
    x_train : array of shape (m, height, width)
        Single-channel images.
    y_train : array-like of length m
        Label of each image.
    batch_size : int
        Number of triplets to draw.

    Returns
    -------
    tuple of three arrays, each of shape (batch_size, height, width, 1)
        Anchors, positives and negatives, in that order.

    Raises
    ------
    ValueError
        If ``y_train`` contains a single label, so no negative exists.
    """
    y_train = np.asarray(y_train)
    m = x_train.shape[0]
    height, width = x_train.shape[1], x_train.shape[2]

    x_anchors = np.zeros((batch_size, height, width))
    x_positives = np.zeros((batch_size, height, width))
    x_negatives = np.zeros((batch_size, height, width))

    for i in range(batch_size):
        anchor_index = random.randint(0, m - 1)
        label = y_train[anchor_index]

        # flatnonzero always yields a 1-D index array, even for a single
        # match (np.squeeze(np.where(...)) collapses that case to 0-d and
        # breaks len()).
        same_label = np.flatnonzero(y_train == label)
        other_label = np.flatnonzero(y_train != label)
        if other_label.size == 0:
            raise ValueError(
                "create_batch needs at least two distinct labels to pick a negative example"
            )

        # Avoid degenerate triplets whose positive is the anchor itself
        # (zero positive distance); fall back to the anchor only when the
        # label has no other sample.
        positive_candidates = same_label[same_label != anchor_index]
        if positive_candidates.size == 0:
            positive_candidates = same_label

        positive_index = positive_candidates[random.randint(0, len(positive_candidates) - 1)]
        negative_index = other_label[random.randint(0, len(other_label) - 1)]

        x_anchors[i] = x_train[anchor_index]
        x_positives[i] = x_train[positive_index]
        x_negatives[i] = x_train[negative_index]

    # Append the trailing channel axis expected by the convolutional model.
    x_anchors = x_anchors.reshape(-1, height, width, 1)
    x_positives = x_positives.reshape(-1, height, width, 1)
    x_negatives = x_negatives.reshape(-1, height, width, 1)

    return x_anchors, x_positives, x_negatives

In [8]:
# Length of the embedding vector produced for each face image.
emb_size = 16

# Seed every random number generator used from here on (Python's `random`
# for triplet sampling, NumPy, and TensorFlow for weight initialisation and
# dropout) so that a Restart & Run All is repeatable as far as the backend
# allows.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# CNN mapping an (n, n, 1) grayscale image to an `emb_size`-dimensional
# embedding. Each convolutional stage is Conv -> BatchNorm -> ReLU ->
# 2x2 max-pool -> Dropout.
embedding_model = Sequential([
    # Stage 1: 16 filters, 3x3 kernel
    Conv2D(16, (3, 3), padding='same', input_shape=(n, n, 1)),
    BatchNormalization(),
    Activation('relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),

    # Stage 2: 32 filters, 5x5 kernel
    Conv2D(32, (5, 5), padding='same'),
    BatchNormalization(),
    Activation('relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),

    # Stage 3: 64 filters, 3x3 kernel
    Conv2D(64, (3, 3), padding='same'),
    BatchNormalization(),
    Activation('relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),

    # Flatten the feature maps for the dense head
    Flatten(),

    # Fully connected layer
    Dense(64),
    BatchNormalization(),
    Activation('relu'),
    Dropout(0.25),

    # Embedding layer; the sigmoid keeps every component in (0, 1)
    Dense(emb_size, activation='sigmoid'),
])

embedding_model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 64, 64, 16)        160       
_________________________________________________________________
batch_normalization (BatchNo (None, 64, 64, 16)        64        
_________________________________________________________________
activation (Activation)      (None, 64, 64, 16)        0         
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 32, 32, 16)        0         
_________________________________________________________________
dropout (Dropout)            (None, 32, 32, 16)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 32, 32, 32)        12832     
_________________________________________________________________
batch_normalization_1 (Batch (None, 32, 32, 32)        1

In [9]:
# Three image inputs (anchor, positive, negative) that all pass through the
# same `embedding_model`, so the weights are shared between the branches.
triplet_inputs = [tf.keras.layers.Input(shape=(n, n, 1)) for _ in range(3)]
input_anchor, input_positive, input_negative = triplet_inputs

embedding_anchor, embedding_positive, embedding_negative = (
    embedding_model(branch_input) for branch_input in triplet_inputs
)

# Lay the three embeddings side by side along the feature axis, giving one
# row of width 3 * emb_size per triplet; the loss slices it apart again.
output = tf.keras.layers.concatenate(
    [embedding_anchor, embedding_positive, embedding_negative],
    axis=1,
)

net = tf.keras.models.Model(
    [input_anchor, input_positive, input_negative],
    output,
)
net.summary()

Model: "functional_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 64, 64, 1)]  0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            [(None, 64, 64, 1)]  0                                            
__________________________________________________________________________________________________
input_3 (InputLayer)            [(None, 64, 64, 1)]  0                                            
__________________________________________________________________________________________________
sequential (Sequential)         (None, 16)           295440      input_1[0][0]                    
                                                                 input_2[0][0]         

In [10]:
def triplet_loss(alpha, emb_size):
    """Build a Keras-compatible triplet loss with margin ``alpha``.

    The returned ``loss(y_true, y_pred)`` expects ``y_pred`` to hold the
    anchor, positive and negative embeddings concatenated along axis 1
    (``emb_size`` columns each). ``y_true`` is ignored. The distance used is
    the mean squared difference over the embedding dimensions, i.e. a scaled
    squared Euclidean distance. One loss value is returned per row.
    """

    def _mean_squared_diff(left, right):
        return tf.reduce_mean(tf.square(left - right), axis=1)

    def loss(y_true, y_pred):
        anchor = y_pred[:, :emb_size]
        positive = y_pred[:, emb_size:2 * emb_size]
        negative = y_pred[:, 2 * emb_size:]

        positive_dist = _mean_squared_diff(anchor, positive)
        negative_dist = _mean_squared_diff(anchor, negative)

        # Hinge: zero once the negative is at least `alpha` farther away than
        # the positive.
        margin_violation = positive_dist - negative_dist + alpha
        return tf.maximum(margin_violation, 0.)

    return loss

In [11]:
def data_generator(x_train, y_train, batch_size, emb_size):
    """Yield an endless stream of ``(triplets, dummy_targets)`` pairs.

    ``triplets`` is the (anchors, positives, negatives) tuple returned by
    ``create_batch``. ``dummy_targets`` is an all-zero array of shape
    ``(batch_size, 3 * emb_size)``; the triplet loss never reads it, it is
    only there because a target is yielded alongside the inputs.
    """
    target_shape = (batch_size, 3 * emb_size)
    while True:
        triplets = create_batch(x_train, y_train, batch_size)
        placeholder_targets = np.zeros(target_shape)
        yield triplets, placeholder_targets

In [12]:
# Training configuration.
batch_size = 16
epochs = 100
# One epoch draws about as many random triplets as there are training images.
steps_per_epoch = m // batch_size

# Triplet loss with margin 0.2, optimised with Adam.
net.compile(
    optimizer='adam',
    loss=triplet_loss(0.2, emb_size),
)

# The generator never ends, so steps_per_epoch defines the epoch length.
history = net.fit(
    data_generator(x_train, y_train, batch_size, emb_size),
    epochs=epochs,
    steps_per_epoch=steps_per_epoch,
    verbose=True,
)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

Epoch 100/100


In [13]:
# Show the per-epoch loss values recorded by `net.fit`.
history.history

{'loss': [0.1509251743555069,
  0.10886970907449722,
  0.09359375387430191,
  0.08184546232223511,
  0.08012211322784424,
  0.06901495903730392,
  0.05989028885960579,
  0.06119062751531601,
  0.055470384657382965,
  0.0481996051967144,
  0.05493689700961113,
  0.044487349689006805,
  0.049256786704063416,
  0.048073265701532364,
  0.03907403349876404,
  0.04024697095155716,
  0.04073173925280571,
  0.03780272603034973,
  0.03703644499182701,
  0.03154565393924713,
  0.031303487718105316,
  0.033992134034633636,
  0.03340541571378708,
  0.026767298579216003,
  0.03024202212691307,
  0.030400436371564865,
  0.026095133274793625,
  0.03145259618759155,
  0.02768944576382637,
  0.025003736838698387,
  0.03377087041735649,
  0.029419507831335068,
  0.027707591652870178,
  0.026433557271957397,
  0.02624903991818428,
  0.024844948202371597,
  0.024600300937891006,
  0.023549597710371017,
  0.0222068652510643,
  0.01800914853811264,
  0.027073483914136887,
  0.024587135761976242,
  0.0251270

In [25]:
# Embed three hand-picked test images; each is reshaped to a single-image
# batch of shape (1, n, n, 1) before being passed to the model.
# NOTE(review): indices 2 and 7 are assumed to share a label and index 12 to
# carry a different one -- verify against y_test.
test_anchor, test_positive, test_negative = (
    embedding_model.predict(x_test[index].reshape(-1, n, n, 1))
    for index in (2, 7, 12)
)

In [26]:
# Cosine similarity between the anchor embedding and the positive embedding.
cosine_similarity(test_anchor, test_positive)

array([[0.9560251]], dtype=float32)

In [27]:
# Cosine similarity between the anchor embedding and the negative embedding.
cosine_similarity(test_anchor, test_negative)

array([[0.53414106]], dtype=float32)

In [147]:
# Draw a stratified subset of 40 training images (with their labels) to serve
# as the embedding database; the rest of the split is discarded.
# NOTE(review): 40 presumably equals the number of distinct labels, which
# would give one image per label -- confirm.
_, x_embed, _, y_embed = train_test_split(
    x_train, y_train,
    test_size=40,
    stratify=y_train,
    random_state=0,
)

In [163]:
# Embed the whole gallery in a single forward pass instead of issuing one
# `predict` call per image.
gallery_embeddings = embedding_model.predict(x_embed.reshape(-1, n, n, 1))

# Map each label (as a string key) to its embedding, stored as a
# (1, emb_size) row exactly as a single-image `predict` call would return it.
# If a label occurs more than once in y_embed, the last occurrence wins.
db_embeddings = {
    str(label): embedding[np.newaxis, :]
    for label, embedding in zip(y_embed, gallery_embeddings)
}

In [168]:
# Persist the label -> embedding lookup table to disk.
with open("embeddings.pickle", "wb") as pickle_file:
    pickle.dump(db_embeddings, pickle_file)

In [85]:
# Save only the embedding sub-network (not the three-input triplet wrapper
# `net`) to embedding_model.h5.
embedding_model.save("embedding_model.h5")

# Reload sketch, kept commented out.
# NOTE(review): only `net` is compiled in this notebook, never
# `embedding_model`, and the triplet loss expects 3 * emb_size output columns
# while this model emits emb_size -- so the `custom_objects` entry and the
# re-compile below look unnecessary for inference; confirm before reviving.
# mod =  tf.keras.models.load_model(
#     "embedding_model.h5", 
#     custom_objects = {
#         'loss': triplet_loss(0.2, 16)
#     },
#     compile = False
# )
# mod.compile( 
#     loss = triplet_loss(0.2, 16), 
#     optimizer = "adam"
# )