In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import seaborn as sns
import plotly.express as ex

from tqdm.cli import tqdm
from glob import glob
from sklearn.manifold import TSNE
from plotly import express as ex
from os import path as pathlib,walk

%matplotlib inline

KeyboardInterrupt: 

In [None]:
class Dataset(object):
    """
    Dataset wrapper that reads a folder-per-class image tree into arrays.

    Expected layout is ``path/<class folder>/<image file>``. Files placed
    directly in ``path`` are ignored; every directory below it acts as a label.

    Parameters
    ----------
    path : str
        Root folder of the dataset (stored as an absolute path).
    n_faces : int
        Kept on the instance for callers; `parse` itself does not use it.
    resize : int
        Images are resized to ``resize x resize`` pixels.

    Attributes set by `parse`
    -------------------------
    data : str array of shape (N, 2) holding (folder, file path) per loaded image.
    y : array of shape (N,) with the folder path of each image (the label).
    x : float32 array of shape (N, resize, resize, 3), RGB, scaled by 1/255.
    skipped : list of file paths that OpenCV could not decode.
    """
    def __init__(self,path,n_faces,resize=110):
        self.path = pathlib.abspath(path)
        self.n_faces = n_faces
        self.resize = resize

    def __repr__(self,):
        return f"Dataset Parser @ {self.path}"

    def parse(self,progress_bar=True):
        """
        Walk the dataset folder, load every readable image and return ``self``.

        Set ``progress_bar`` to a falsy value to disable the tqdm wrapper.
        """
        # Build a flat list so folders with different file counts do not
        # produce a ragged array. The first walk() entry is the root itself
        # and is skipped, exactly as before. File names are sorted so the
        # order is reproducible between runs.
        candidates = [
            (root, pathlib.join(root, name))
            for root, _, names in list(walk(self.path))[1:]
            for name in sorted(names)
        ]

        kept = []
        images = []
        self.skipped = []
        for root, file_path in (tqdm(candidates) if progress_bar else candidates):
            image = cv2.imread(file_path)
            if image is None:
                # imread signals "not an image / unreadable" with None.
                self.skipped.append(file_path)
                continue
            image = cv2.cvtColor(
                cv2.resize(image, (self.resize, self.resize)),
                cv2.COLOR_BGR2RGB
            )
            # Cast per image so the float64 intermediate never covers the whole set.
            images.append((image / 255).astype(np.float32))
            kept.append((root, file_path))

        # The reshapes keep well-defined shapes even when nothing was loaded.
        self.data = np.array(kept, dtype=str).reshape(-1, 2)
        self.y = np.array(self.data[:, 0])
        self.x = np.array(images, dtype=np.float32).reshape(-1, self.resize, self.resize, 3)
        return self
            

In [None]:
# Point the parser at the training images. 100 is passed positionally as
# `n_faces`, so `resize` keeps its default value.
dataset = Dataset("./images/train/",100)

In [None]:
# Load every image into dataset.x / dataset.y; the truthy progress_bar
# wraps the file list in tqdm.
dataset.parse(progress_bar=1)

In [None]:
# Show the parser's repr (the absolute dataset path).
dataset

In [None]:
import tensorflow as tf

from tensorflow import keras
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import LearningRateScheduler,ModelCheckpoint,Callback

### Base Network

<!-- |layer  | size-in    | size-out   | kernel     | param | FLOPS |
|-------|------------|------------|------------|-------|-------|
|conv1  | 220×220×3  | 110×110×64 | 7×7×3, 2   | 9K    | 115M  |
|pool1  | 110×110×64 | 55×55×64   | 3×3×64, 2  | 0     |       |
|rnorm1 | 55×55×64   | 55×55×64   |            | 0     |       |
|conv2a | 55×55×64   | 55×55×64   | 1×1×64, 1  | 4K    | 13M   |
|conv2  | 55×55×64   | 55×55×192  | 3×3×64, 1  | 111K  | 335M  |
|rnorm2 | 55×55×192  | 55×55×192  |            | 0     |       |
|pool2  | 55×55×192  | 28×28×192  | 3×3×192, 2 | 0     |       |
|conv3a | 28×28×192  | 28×28×192  | 1×1×192, 1 | 37K   | 29M   |
|conv3  | 28×28×192  | 28×28×384  | 3×3×192, 1 | 664K  | 521M  |
|pool3  | 28×28×384  | 14×14×384  | 3×3×384, 2 | 0     |       |
|conv4a | 14×14×384  | 14×14×384  | 1×1×384, 1 | 148K  | 29M   |
|conv4  | 14×14×384  | 14×14×256  | 3×3×384, 1 | 885K  | 173M  |
|conv5a | 14×14×256  | 14×14×256  | 1×1×256, 1 | 66K   | 13M   |
|conv5  | 14×14×256  | 14×14×256  | 3×3×256, 1 | 590K  | 116M  |
|conv6a | 14×14×256  | 14×14×256  | 1×1×256, 1 | 66K   | 13M   |
|conv6  | 14×14×256  | 14×14×256  | 3×3×256, 1 | 590K  | 116M  |
|pool4  | 14×14×256  | 7×7×256    | 3×3×256, 2 | 0     |       |
|concat | 7×7×256    | 7×7×256    |            | 0     |       |
|fc1    | 7×7×256    | 1×32×128   | maxout p=2 | 103M  | 103M  |
|fc2    | 1×32×128   | 1×32×128   | maxout p=2 | 34M   | 34M   |
|fc7128 | 1×32×128   | 1×1×128    |            | 524K  | 0.5M  |
|L2     | 1×1×128    | 1×1×128    |            | 0     |       | -->

In [None]:
# Embedding dimensionality used across the notebook (default output width of `base_network`).
d = 32

In [None]:
def _dense_conv_block(tensor,filters,keep_input=True):
    """Three stacked 3x3 ReLU convolutions, concatenated (optionally with the block input) and max-pooled."""
    conv0 = Conv2D(filters,3,activation="relu",padding="same")(tensor)
    conv1 = Conv2D(filters,3,activation="relu",padding="same")(conv0)
    conv2 = Conv2D(filters,3,activation="relu",padding="same")(conv1)
    # Every convolution output is forwarded once. The earlier blocks listed
    # conv2 twice and dropped conv1; channel counts are unchanged by the fix.
    branches = [conv0,conv1,conv2]
    if keep_input:
        branches.insert(0,tensor)
    return MaxPool2D()(concatenate(branches))


def base_network(d=d,input_shape=(110,110,3),scale=32):
    """
    Build the embedding CNN shared by the three triplet branches.

    Parameters
    ----------
    d : int
        Size of the output embedding (defaults to the notebook-level ``d``
        at the time this function is defined).
    input_shape : tuple
        Shape of one input image, ``(height, width, channels)``.
    scale : number
        Factor applied to the tanh output, so each embedding component lies
        in ``(-scale, scale)``.

    Returns
    -------
    keras.Model mapping an image batch to a ``(batch, d)`` embedding.
    """
    _input = Input(shape=input_shape)

    # First and last blocks do not forward their input, as in the original layout.
    features = _dense_conv_block(_input,32,keep_input=False)
    for filters in (32,64,64,128):
        features = _dense_conv_block(features,filters)
    features = _dense_conv_block(features,256,keep_input=False)

    dense = Flatten()(features)
    dense = Dense(512,activation="linear")(dense)
    dense = Dense(d,activation="tanh")(dense)

    dense = tf.multiply(dense,scale)

    model = keras.Model(_input,dense)
    return model

### Pairs

In [None]:
class Pairs(object):
    """
    Hard-triplet generator: for each image (anchor) pick the farthest
    same-label image and the closest different-label image in embedding space.

    NOTE: the next cell defines `Pairs` again; that later definition is the
    one bound to the name when the rest of the notebook runs.

    Parameters
    ----------
    model : object with ``predict(x, batch_size=...)`` returning one embedding row per image.
    x : image array of shape (N, H, W, 3).
    y : array-like of N labels.
    """
    def __init__(self,model,x,y):
        assert len(x) == len(y)
        self.x = x.reshape(-1,*x[0].shape[:2],3)
        # Array form guarantees the element-wise label comparisons below.
        self.y = np.asarray(y)
        self.model = model
        self.dummy = np.array([[0]])

    def get_pair(self,e,x,y):
        """
        Return ``np.array([anchor, positive, negative])``, each of shape (1, H, W, 3).

        ``e`` is the anchor embedding, ``x`` the anchor image, ``y`` its label.
        Raises ValueError when no image with a different label exists.
        """
        image_shape = self.x.shape[1:]
        a = x.reshape(1,*image_shape)

        same = self.y == y
        p_index = np.flatnonzero(same)
        n_index = np.flatnonzero(~same)
        if n_index.size == 0:
            raise ValueError("Pairs needs at least two distinct labels to pick a negative")

        # Hardest positive is the farthest same-label embedding; hardest
        # negative is the closest other-label embedding.
        p_pick = np.sum(np.square(self.epoch_enc[p_index] - e),axis=1).argmax()
        n_pick = np.sum(np.square(self.epoch_enc[n_index] - e),axis=1).argmin()

        # Index the single chosen image instead of copying the whole subset first.
        p = self.x[p_index[p_pick]].reshape(1,*image_shape)
        n = self.x[n_index[n_pick]].reshape(1,*image_shape)

        return np.array([a,p,n])

    def flow(self,epochs=1):
        """
        Yield ``((anchor, positive, negative), dummy)`` for every image, ``epochs`` times.

        Embeddings are recomputed once at the start of each epoch. Triplets
        are produced lazily, so only one is held in memory at a time.
        """
        for epoch in range(epochs):
            self.epoch_enc = self.model.predict(self.x,batch_size=600)
            for e,x,y in zip(self.epoch_enc,self.x,self.y):
                a,p,n = self.get_pair(e,x,y)
                yield (a,p,n),self.dummy

In [None]:
class Pairs(object):
    """
    Hard-triplet generator: for each image (anchor) pick the farthest
    same-label image and the closest different-label image in embedding space.

    Parameters
    ----------
    model : object with ``predict(x, batch_size=...)`` returning one embedding row per image.
    x : image array of shape (N, H, W, 3).
    y : array-like of N labels.
    size : int
        Kept for backward compatibility and stored as ``self.size``. Reshaping
        uses the real image shape of ``x``, so it cannot disagree with the data.
    """
    def __init__(self,model:"tf.keras.Model",x:np.ndarray,y:np.ndarray,size:int=110):
        assert len(x) == len(y)
        self.x = x.reshape(-1,*x[0].shape[:2],3)
        # Array form guarantees the element-wise label comparisons below.
        self.y = np.asarray(y)
        self.model = model
        self.dummy = np.array([[0]])
        self.size = size

    def get_pair(self,e,x,y):
        """
        Return ``np.array([anchor, positive, negative])``, each of shape (1, H, W, 3).

        ``e`` is the anchor embedding, ``x`` the anchor image, ``y`` its label.
        Raises ValueError when no image with a different label exists.
        """
        image_shape = self.x.shape[1:]
        a = x.reshape(1,*image_shape)

        same = self.y == y
        p_index = np.flatnonzero(same)
        n_index = np.flatnonzero(~same)
        if n_index.size == 0:
            raise ValueError("Pairs needs at least two distinct labels to pick a negative")

        # Hardest positive is the farthest same-label embedding; hardest
        # negative is the closest other-label embedding.
        p_pick = np.sum(np.square(self.epoch_enc[p_index] - e),axis=1).argmax()
        n_pick = np.sum(np.square(self.epoch_enc[n_index] - e),axis=1).argmin()

        # Index the single chosen image instead of copying the whole subset first.
        p = self.x[p_index[p_pick]].reshape(1,*image_shape)
        n = self.x[n_index[n_pick]].reshape(1,*image_shape)

        return np.array([a,p,n])

    def flow(self,epochs=1,batch_size=600):
        """
        Yield ``((anchor, positive, negative), dummy)`` for every image, ``epochs`` times.

        Embeddings are recomputed once at the start of each epoch with
        ``model.predict(..., batch_size=batch_size)``.
        """
        for epoch in range(epochs):
            self.epoch_enc = self.model.predict(self.x,batch_size=batch_size)
            for e,x,y in zip(self.epoch_enc,self.x,self.y):
                yield (*self.get_pair(e,x,y),),self.dummy

<!-- class Pairs(tf.Module):
        def __init__(self,net,x,y):
            self.net = net
            self.x = tf.constant(x.reshape(-1,*x[0].shape[:2],1).copy(),tf.float32)
            self.y = tf.constant(y.copy(),tf.uint8)       
            self.y_ = tf.argmax(self.y,axis=1)
            self.range_tensor = tf.range(0,len(x),dtype=tf.int32)
            self.dummy = tf.constant([[0]],dtype=tf.float32)
            
        def __repr__(self,):
            return "Pairs Generator"
            
        @tf.function
        def l2(self,x,y):
            return tf.square(tf.subtract(x,y))
        
        @tf.function
        def get_pair(self,i):
            
            x = tf.slice(self.x,[i,0,0,0],[1,-1,-1,-1])
            y = tf.slice(self.y_,[i],[1])
            e = tf.slice(self.epoch_emb,[i,0],[1,-1])
    
            p_index = tf.equal(y,self.y_)
            n_index = tf.not_equal(y,self.y_)
    
            p_dist = self.epoch_emb[p_index]
            n_dist = self.epoch_emb[n_index]
    
            p = tf.argmax(tf.reduce_sum(tf.square(tf.subtract(e,p_dist)),axis=1))
            n = tf.argmin(tf.reduce_sum(tf.square(tf.subtract(e,n_dist)),axis=1))
            
            p = tf.expand_dims(self.x[p_index][p],0)
            n = tf.expand_dims(self.x[n_index][n],0)
    
            return (x,p,n),self.dummy
        
        def flow(self,epochs):
            for epoch in range(epochs):
                self.epoch_emb = tf.constant(self.net.predict(self.x,batch_size=512),tf.float32)
                for i in self.range_tensor:
                    yield self.get_pair(i)
                    " -->

In [None]:
# Build an embedding network and a triplet generator over the parsed dataset
# (used below to preview triplets before training).
model = base_network()
pairs = Pairs(model,dataset.x,dataset.y)

In [None]:
# One-epoch generator of ((anchor, positive, negative), dummy target) items.
flow = pairs.flow(1)

In [None]:
# Preview three mined triplets, one per row: anchor | positive | negative.
fig,axes = plt.subplots(3,3,figsize=(12,12))

for ax in axes:
    (a,p,n),r = next(flow)
    for column,(title,image) in enumerate(zip(("anchor","positive","negative"),(a,p,n))):
        # Each item carries a leading batch axis of 1; drop it rather than
        # hard-coding the image size.
        ax[column].imshow(image[0])
        ax[column].set_title(title)

### Triplet Loss

### Triplet 

![](triplet.png)

In [None]:
class Triplet(tf.Module):
    """
    Triplet loss: ``relu(||a - p||^2 - ||a - n||^2 + margin)`` per sample.

    ``y_pred`` is expected to hold the anchor, positive and negative
    embeddings concatenated along the last axis, i.e. shape ``(batch, 3 * d)``,
    which is what the siamese model's ``concatenate`` output produces.
    ``y_true`` is ignored.
    """
    __name__ = "TripletLoss"
    def __init__(self,margin=.75):
        super().__init__()
        self.margin = margin

    @tf.function
    def l2(self,x,y):
        """Squared Euclidean distance per row (summed over the embedding axis only)."""
        return tf.reduce_sum(tf.square(tf.subtract(x,y)),axis=-1)

    @tf.function
    def __call__(self,y_true,y_pred,*args,**kwargs):
        # Split along the feature axis. Reshaping to (3, -1, d) only lined up
        # for batch size 1 and mixed the three embeddings for larger batches.
        a,p,n = tf.split(y_pred,3,axis=-1)

        Dp = self.l2(a,p)
        Dn = self.l2(a,n)

        return tf.nn.relu(Dp - Dn + self.margin)

In [None]:
# Embedding network; the same `model` instance is applied to all three
# inputs below, so the three branches share their weights.
model = base_network()

# One image input per triplet member.
a_inp = Input(shape=(110,110,3),name="anc")
p_inp = Input(shape=(110,110,3),name="pos")
n_inp = Input(shape=(110,110,3),name="neg")

a_net = model(a_inp)
p_net = model(p_inp)
n_net = model(n_inp)

# Concatenate the three embeddings into a single output tensor for the loss.
out = concatenate([a_net,p_net,n_net],name="out")
train = keras.Model([a_inp,p_inp,n_inp],out)

In [None]:
class EpochOutputSave(Callback):
    """
    Save a 2-D t-SNE scatter of the embeddings at the end of every epoch.

    Parameters
    ----------
    x, y : arrays, optional
        Images and labels to embed. When omitted, ``dataset.x`` / ``dataset.y``
        from the notebook are looked up at epoch end.
    encoder : model, optional
        Network used for ``predict``. When omitted, the notebook-level
        ``model`` is looked up at epoch end.
    out_dir : str
        Folder receiving ``<epoch>.jpg``; created if missing.
    """
    tsne = TSNE(2)

    def __init__(self,x=None,y=None,encoder=None,out_dir="../doc/epoch/facenet"):
        super().__init__()
        self.x = x
        self.y = y
        self.encoder = encoder
        self.out_dir = out_dir

    def on_epoch_end(self,epoch,log=None,*args,**kwargs):
        from os import makedirs

        log = log or {}
        # Resolve fallbacks lazily so the callback can be created before the
        # notebook-level objects exist.
        x = dataset.x if self.x is None else self.x
        y = dataset.y if self.y is None else self.y
        encoder = model if self.encoder is None else self.encoder

        p = encoder.predict(x,batch_size=5)
        a = self.tsne.fit_transform(p)
        fig = plt.figure(figsize=(13,10))
        # Keyword x/y are required by current seaborn; legend=False hides the
        # legend, which plt.legend("off") did not do.
        sns.scatterplot(x=a[:,0],y=a[:,1],hue=y,legend=False)
        plt.title(f"epoch : {epoch} | loss : {log.get('loss')}")
        makedirs(self.out_dir,exist_ok=True)
        fig.savefig(pathlib.join(self.out_dir,f"{epoch}.jpg"))
        plt.close(fig)

In [None]:
class EpochHandler(Callback):
    """
    Stop training once the loss has been exactly zero for ``patience``
    consecutive epochs (3 by default, matching the previous behaviour).

    ``loss_history`` holds the loss reported at the end of each epoch.
    """
    def __init__(self,patience=3):
        super().__init__()
        self.patience = patience
        self.loss_history = []

    def on_epoch_end(self,epoch,logs=None):
        # `logs` may be None, and a shared mutable default is avoided.
        epoch_loss = (logs or {}).get('loss')
        self.loss_history.append(epoch_loss)

        recent = self.loss_history[-self.patience:]
        converged = (
            len(recent) == self.patience
            and all(loss is not None and loss == 0 for loss in recent)
        )
        if converged:
            self.model.stop_training = True
            print (f"Stopped Training At {epoch} Epochs.")

In [None]:
# Weights-only checkpoint callback and the t-SNE snapshot callback.
# NOTE(review): neither is in the `callbacks=[eh]` list of the training cell — confirm that is intended.
ckp = ModelCheckpoint(filepath="./checkpoints/facenet",save_weights_only=True)
save = EpochOutputSave()

In [None]:
# Early-stopping callback; this is the one passed to the training call.
eh = EpochHandler()

In [None]:
# Rebuild the generator so triplets are mined with the `model` that was
# recreated for the three-input training model.
pairs = Pairs(model,dataset.x,dataset.y)

In [None]:
# Adagrad optimizer with a learning rate of 1e-4.
opt = keras.optimizers.Adagrad(.0001)

In [None]:
# Triplet loss with margin 8 (overrides the class default of .75).
triplet = Triplet(margin=8)

In [None]:
# Compile the three-input training model with the triplet loss.
train.compile(optimizer=opt,loss=triplet)

In [None]:
# Number of training epochs; also passed to the triplet generator.
epochs = 250

In [None]:
# Generator that re-encodes the dataset once per epoch, then yields one triplet per image.
flow = pairs.flow(epochs=epochs)

In [None]:
# Model.fit accepts Python generators directly; fit_generator is deprecated.
# One step per image, because the generator yields single-triplet batches.
history = train.fit(flow,steps_per_epoch=len(dataset.x),epochs=epochs,callbacks=[eh])

In [None]:
# Embed every dataset image with the base network.
p = model.predict(dataset.x,batch_size=32)

In [None]:
# 2-component t-SNE for a flat scatter plot.
tsne = TSNE(2,)

In [None]:
# Project the embeddings to 2-D.
a = tsne.fit_transform(p)

In [None]:
# 2-D t-SNE of the final embeddings, coloured by label.
fig = plt.figure(figsize=(13,10))
# Keyword x/y are required by current seaborn; legend=False hides the
# legend, which plt.legend("off") did not do.
sns.scatterplot(x=a[:,0],y=a[:,1],hue=dataset.y,legend=False)
plt.title("Final Output")

In [None]:
# Repeat the projection with 3 components for the interactive plot.
tsne = TSNE(3,)
a = tsne.fit_transform(p)

In [None]:
# Short label = last path component of each class folder. `pathlib` is the
# notebook's alias for os.path, so this works with either path separator.
label = [pathlib.basename(i) for i in dataset.y]

In [None]:
# Interactive 3-D scatter of the t-SNE coordinates, coloured by label.
ex.scatter_3d(x=a[:,0],y=a[:,1],z=a[:,2],color=label)

In [None]:
# fig.savefig("../doc/2d_cluster.jpg")

In [None]:
# open("./checkpoints/facenet.json","w+").write(model.to_json())

### Testing On Faces

In [None]:
def mse(x,y):
    """Mean of the squared element-wise differences between ``x`` and ``y``."""
    delta = np.subtract(x,y)
    return np.mean(np.multiply(delta,delta))

In [None]:
i = 3

# `x` only existed as a leftover kernel variable; bind it explicitly so the
# cell runs on a fresh kernel.
x = dataset.x

face = x[i].copy()

# np.newaxis adds the batch axis without hard-coding the image size.
face_true = model.predict(face[np.newaxis])
fig,axes = plt.subplots(5,2,figsize=(12,35))

# Five random images to compare against the reference face.
index = np.random.randint(0,len(x),5)

for (ax0,ax1),j in zip(axes,index):
    img = x[j]
    enc = model.predict(img[np.newaxis])
    dist = mse(face_true,enc)
    ax0.imshow(face)
    ax1.imshow(img)
    ax1.set_title(f"Distance : {dist}")

    ax0.axis("off")
    ax1.axis("off")

# The first row is overwritten with the image that follows the reference
# in the dataset.
face_next = x[i+1]
enc = model.predict(face_next[np.newaxis])

dist = mse(face_true,enc)
axes[0,0].imshow(face)
axes[0,1].imshow(face_next)
axes[0,1].set_title(f"Distance : {dist}")

In [None]:
# Save the comparison grid. NOTE(review): assumes ../doc already exists — confirm.
fig.savefig("../doc/test.jpg")