# Triplet Loss with Online Triplet Mining

Sources:
1. https://github.com/bukhari-utp/Face-identification-with-cnn-triplet-loss
2. https://github.com/bukhari-utp/siamese-triplet (Siamese and triplet networks with online pair/triplet mining in PyTorch)
3. siamese-transfer-learning/siamese_dogs_vs_cats_vgg16.ipynb

- Note: this approach does not seem to work; training shows no improvement (see the sources above).

# train a cnn to extract the features

In [2]:
import os.path
import numpy as np

from keras.datasets import mnist
from keras.utils import plot_model
from keras.callbacks import ModelCheckpoint
from keras.preprocessing.image import ImageDataGenerator
from sklearn.preprocessing import OneHotEncoder
from keras.models import Model, Sequential
from keras.optimizers import SGD, Adam
from keras.layers import Dense, Lambda, Input, merge, Conv2D, Activation, Dropout, MaxPooling2D, Flatten, GlobalMaxPooling2D, BatchNormalization
import keras.backend as K

import dlib

from IPython.display import Image as image_disp

import skimage.transform as tr

from enum import Enum

import os.path
import numpy as np

Using TensorFlow backend.


In [3]:
class FaceDetector:
    """Wrapper around dlib's frontal face detector with filtering helpers."""

    def __init__(self):
        self.detector = dlib.get_frontal_face_detector()

    def detect_faces(self,
                     image, *,
                     upscale_factor=1,
                     greater_than=None,
                     get_top=None):
        """Detect faces and return their rectangles, largest area first.

        Args:
            image: image handed to the detector.
            upscale_factor: second argument forwarded to the detector.
            greater_than: if given, keep only rectangles whose height and
                width are both strictly greater than this value.
            get_top: if given, keep only the first ``get_top`` rectangles
                after sorting.

        Returns:
            A list of rectangles sorted by ``width() * height()``, descending.
        """
        detections = self.detector(image, upscale_factor)

        if greater_than is None:
            candidates = list(detections)
        else:
            candidates = [
                rect for rect in detections
                if rect.height() > greater_than and rect.width() > greater_than
            ]

        # Stable sort: rectangles with equal area keep their detection order.
        ranked = sorted(candidates,
                        key=lambda rect: rect.width() * rect.height(),
                        reverse=True)

        return ranked if get_top is None else ranked[:get_top]


class FaceAlignMask(Enum):
    """Triples of landmark indices used as anchor points for alignment.

    Each value is a list of three indices. FaceAligner.align_face uses it to
    select rows from both the detected landmarks and the face template.
    """
    # Presumably indices into dlib's 68-point landmark layout -- TODO confirm.
    INNER_EYES_AND_BOTTOM_LIP = [39, 42, 57]
    OUTER_EYES_AND_NOSE = [36, 45, 33]


class FaceAligner:
    """Warp detected faces onto a template using three landmark points."""

    def __init__(self,
                 dlib_predictor_path,
                 face_template_path):
        """Load the dlib shape predictor and the face template array."""
        self.predictor = dlib.shape_predictor(dlib_predictor_path)
        self.face_template = np.load(face_template_path)

    def get_landmarks(self,
                      image,
                      face_rect):
        """Return the predicted landmarks as an array of ``[x, y]`` rows."""
        shape = self.predictor(image, face_rect)
        return np.array([[pt.x, pt.y] for pt in shape.parts()])

    def align_face(self,
                   image,
                   face_rect, *,
                   dim=96,
                   border=0,
                   mask=FaceAlignMask.INNER_EYES_AND_BOTTOM_LIP):
        """Affine-warp one face so the masked landmarks match the template.

        Args:
            image: source image.
            face_rect: face rectangle passed to the shape predictor.
            dim: scale applied to the template coordinates.
            border: offset added to the scaled template coordinates; the
                output has ``dim + 2 * border`` pixels per side.
            mask: FaceAlignMask member selecting the three anchor landmarks.

        Returns:
            The warped image produced by ``skimage.transform.warp``.
        """
        anchor_ids = np.array(mask.value)

        detected = self.get_landmarks(image, face_rect)
        target = border + dim * self.face_template[anchor_ids]

        # Homogeneous coordinates: three points determine the affine map.
        ones_column = np.ones((3, 1))
        src = np.hstack([detected[anchor_ids], ones_column]).astype(np.float64)
        dst = np.hstack([target, ones_column]).astype(np.float64)
        transform = np.linalg.solve(src, dst).T

        side = dim + 2 * border
        return tr.warp(image,
                       tr.AffineTransform(transform).inverse,
                       output_shape=(side, side),
                       order=3,
                       mode='constant',
                       cval=0,
                       clip=True,
                       preserve_range=True)

    def align_faces(self,
                    image,
                    face_rects,
                    *args,
                    **kwargs):
        """Align every rectangle in ``face_rects``; extra args go to align_face."""
        return [self.align_face(image, rect, *args, **kwargs)
                for rect in face_rects]


def clip_to_range(img):
    """Divide ``img`` by 255.0 and return the result.

    Despite the name, no clipping is performed; values are only rescaled.
    """
    scaled = img / 255.0
    return scaled

In [4]:
# Passed as `greater_than` to FaceDetector.detect_faces in process_image.
GREATER_THAN = 32
# Batch size used for the CNN and embedding predictions in process_image.
BATCH_SIZE = 128
# Passed as `dim` / `border` to align_faces; the aligned output has
# IMSIZE + 2 * IMBORDER = 227 pixels per side.
IMSIZE = 217
IMBORDER = 5

def build_my_cnn(dim, n_class, n_channel=1):
    """Build and compile the CNN used as a feature extractor.

    Args:
        dim: height and width of the (square) input images.
        n_class: number of units in the final Dense layer.
        n_channel: number of input channels. Defaults to 1, the value that
            was previously hard-coded, so two-argument calls keep working.

    Returns:
        A compiled ``Sequential`` model.
    """
    model = Sequential()

    # The channel count now follows the argument instead of a fixed 1.
    model.add(BatchNormalization(input_shape=(dim, dim, n_channel)))

    model.add(Conv2D(32, kernel_size=(3, 3), padding='same'))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(64, (3, 3)))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(128, kernel_size=(3, 3), padding='same'))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(32, kernel_size=(3, 3)))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(GlobalMaxPooling2D())

    model.add(Dense(64))  # 512
    model.add(Activation('relu'))

    model.add(Dense(n_class))
    # NOTE(review): sigmoid + binary_crossentropy scores each class
    # independently; confirm this is intended for one-hot targets.
    model.add(Activation('sigmoid'))

    adam = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
    model.compile(loss='binary_crossentropy', optimizer=adam, metrics=['accuracy'])

    return model

def triplet_loss(y_true, y_pred):
    """Return the negative mean log-sigmoid of ``y_pred``.

    ``y_true`` is accepted to satisfy the Keras loss signature but is unused.
    """
    log_sigmoid = K.log(K.sigmoid(y_pred))
    return -K.mean(log_sigmoid)

def triplet_merge(inputs):
    """Combine (anchor, positive, negative) into ``sum(a * (p - n))`` over axis 1."""
    anchor, positive, negative = inputs
    difference = positive - negative
    return K.sum(anchor * difference, axis=1)

def triplet_merge_shape(input_shapes):
    """Return ``(batch, 1)``, taking the batch entry from the first input shape."""
    first_shape = input_shapes[0]
    batch_dim = first_shape[0]
    return (batch_dim, 1)

def build_tpe(n_in, n_out, W_pca=None):
    """Build the triplet embedding model and its single-input predictor.

    Args:
        n_in: size of each input feature vector.
        n_out: size of the embedding.
        W_pca: initial weights for the bias-free linear layer, shape
            ``(n_in, n_out)``; an all-zero matrix is used when omitted.

    Returns:
        ``(model, predict)``: ``model`` takes ``[anchor, positive, negative]``
        and is compiled with ``triplet_loss``; ``predict`` maps one input to
        its L2-normalised embedding through the same shared layers.
    """
    anchor_in, positive_in, negative_in = (Input(shape=(n_in,)) for _ in range(3))

    initial_weights = np.zeros((n_in, n_out)) if W_pca is None else W_pca

    # Shared projection: linear layer followed by L2 normalisation.
    base_model = Sequential()
    base_model.add(Dense(n_out, input_dim=n_in, bias=False,
                         weights=[initial_weights], activation='linear'))
    base_model.add(Lambda(lambda x: K.l2_normalize(x, axis=1)))

    anchor_emb, positive_emb, negative_emb = [
        base_model(tensor) for tensor in (anchor_in, positive_in, negative_in)
    ]

    merged = merge([anchor_emb, positive_emb, negative_emb],
                   mode=triplet_merge,
                   output_shape=triplet_merge_shape)

    model = Model(input=[anchor_in, positive_in, negative_in], output=merged)
    predict = Model(input=anchor_in, output=anchor_emb)

    model.compile(loss=triplet_loss, optimizer='rmsprop')

    return model, predict

class Bottleneck:
    """Evaluate one intermediate layer of a Keras model in batches."""

    def __init__(self, model, layer):
        """Compile a backend function from the model input to ``layer``'s output.

        Args:
            model: Keras model whose first layer provides the input tensor.
            layer: index into ``model.layers`` of the layer to read.
        """
        self.fn = K.function([model.layers[0].input, K.learning_phase()], [model.layers[layer].output])

    def predict(self, data_x, batch_size=32, learning_phase=False):
        """Run the layer function over ``data_x`` in batches.

        Args:
            data_x: sliceable sequence of samples.
            batch_size: number of samples per call to the backend function.
            learning_phase: when true, the function is evaluated with the
                learning-phase flag set to 1; otherwise 0.

        Returns:
            The batch outputs stacked with ``np.vstack``, or ``None`` when
            ``data_x`` is empty.
        """
        # Forward the requested phase; previously 0 was always passed, so
        # the `learning_phase` argument had no effect.
        phase = 1 if learning_phase else 0

        outputs = [
            self.fn([data_x[start:start + batch_size], phase])[0]
            for start in range(0, len(data_x), batch_size)
        ]

        if not outputs:
            return None
        if len(outputs) == 1:
            return outputs[0]
        # Stack once at the end instead of re-copying the result per batch.
        return np.vstack(outputs)

class FaceVerificator:
    """Pipeline: detect faces, align them, embed them and compare embeddings."""

    def __init__(self, model_dir):
        """Record the paths of the model files expected under ``model_dir``."""
        self._model_dir = model_dir

        filenames = (
            ('shape_predictor', 'shape_predictor_68_face_landmarks.dat'),
            ('face_template', 'face_template.npy'),
            ('mean', 'mean.npy'),
            ('stddev', 'stddev.npy'),
            ('cnn_weights', 'weights_cnn.h5'),
            ('tpe_weights', 'weights_tpe.h5'),
        )
        self._model_files = {
            key: os.path.join(model_dir, filename) for key, filename in filenames
        }

    def initialize_model(self):
        """Load the normalisation statistics, detector, aligner and both networks."""
        files = self._model_files

        self._mean = np.load(files['mean'])
        self._stddev = np.load(files['stddev'])
        self._fd = FaceDetector()
        self._fa = FaceAligner(files['shape_predictor'], files['face_template'])

        classifier = build_my_cnn(227, 24)
        classifier.load_weights(files['cnn_weights'])
        # -2 (written ~1 originally) selects the second-to-last layer.
        self._cnn = Bottleneck(classifier, -2)

        embedder = build_tpe(24, 24)[1]
        embedder.load_weights(files['tpe_weights'])
        self._tpe = embedder

    def normalize(self, img):
        """Rescale ``img`` with clip_to_range, then standardise with mean/stddev."""
        return (clip_to_range(img) - self._mean) / self._stddev

    def process_image(self, img):
        """Return ``(face_rect, embedding)`` pairs for every face found in ``img``.

        Returns an empty list when no face passes detection.
        """
        rects = self._fd.detect_faces(img, upscale_factor=2, greater_than=GREATER_THAN)
        if not rects:
            return []

        aligned = self._fa.align_faces(img, rects, dim=IMSIZE, border=IMBORDER)
        normalized = [self.normalize(face) for face in aligned]

        features = self._cnn.predict(normalized, batch_size=BATCH_SIZE)
        embeddings = self._tpe.predict(features, batch_size=BATCH_SIZE)

        return list(zip(rects, embeddings))

    def compare_many(self, dist, xs, ys):
        """Return the score matrix ``xs @ ys.T`` and the boolean ``scores > dist``."""
        scores = np.array(xs) @ np.array(ys).T
        return scores, scores > dist

In [5]:
def get_scores(data_y, protocol):
    """Split pairwise cosine similarities into target and imposter scores.

    Args:
        data_y: 2-D array with one embedding per row.
        protocol: mask over the pairwise score matrix; selected entries are
            returned as target scores, the remaining ones as imposter scores.

    Returns:
        ``(target_scores, imposter_scores)``.
    """
    row_norms = np.linalg.norm(data_y, axis=1)
    unit_rows = data_y / row_norms[:, np.newaxis]
    similarity = unit_rows @ unit_rows.T

    imposter_mask = np.logical_not(protocol)
    return similarity[protocol], similarity[imposter_mask]


def calc_metrics(targets_scores, imposter_scores):
    min_score = np.minimum(np.min(targets_scores), np.min(imposter_scores))
    max_score = np.maximum(np.max(targets_scores), np.max(imposter_scores))

    n_tars = len(targets_scores)
    n_imps = len(imposter_scores)

    N = 100

    fars = np.zeros((N,))
    frrs = np.zeros((N,))
    dists = np.zeros((N,))

    min_gap = float('inf')
    eer = 0

    for i, dist in enumerate(np.linspace(min_score, max_score, N)):
        far = len(np.where(imposter_scores > dist)[0]) / n_imps
        frr = len(np.where(targets_scores < dist)[0]) / n_tars
        fars[i] = far
        frrs[i] = frr
        dists[i] = dist

        gap = np.abs(far - frr)

        if gap < min_gap:
            min_gap = gap
            eer = (far + frr) / 2

    return eer, fars, frrs, dists


In [6]:
# Output directory for the model diagram and the best weights.
OUT_DIR = '../output/DR-Net_triplet_loss'
# makedirs also creates a missing '../output' parent and tolerates an
# already existing directory.
os.makedirs(OUT_DIR, exist_ok=True)

NB_EPOCH = 30
BATCH_SIZE = 16

AUGMENTATION = True

# Change this for MNIST (Notebook: DR-Net_contrastive_loss)
# Original loading code, kept for reference:
# train_x, train_y = np.load('data/train_x.npy'), np.load('data/train_y.npy')
# test_x, test_y = np.load('data/test_x.npy'), np.load('data/test_y.npy')

# the data, split between train and test sets
(train_x, train_y), (test_x, test_y) = mnist.load_data()
train_x = train_x.astype('float32')
test_x = test_x.astype('float32')
train_x /= 255
test_x /= 255

# MNIST images arrive as (n, height, width); Conv2D and
# ImageDataGenerator.flow need an explicit channel axis.
if train_x.ndim == 3:
    train_x = train_x[..., np.newaxis]
if test_x.ndim == 3:
    test_x = test_x[..., np.newaxis]

n_subjects = len(np.unique(train_y))
n_train = train_x.shape[0]
n_test = test_x.shape[0]

One = OneHotEncoder()
One.fit(train_y.reshape(-1, 1))

# toarray() yields a plain ndarray rather than the np.matrix from todense().
train_y = One.transform(train_y.reshape(-1, 1)).toarray()
test_y = One.transform(test_y.reshape(-1, 1)).toarray()

print('n_train: {}'.format(n_train))
print('n_test: {}'.format(n_test))
print('n_subjects: {}'.format(n_subjects))

n_train: 60000
n_test: 10000
n_subjects: 10


In [1]:
# Where the architecture diagram and the best checkpoint are written.
MODEL_IMAGE = os.path.join(OUT_DIR, 'Net_triplet_loss.png')
WEIGHTS_DIR = os.path.join(OUT_DIR, 'weights.best.h5')

# Keep only the weights with the highest validation accuracy.
checkpoint = ModelCheckpoint(WEIGHTS_DIR, monitor='val_acc', verbose=0, save_best_only=True, mode='max')

datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    zoom_range=0.1,
    horizontal_flip=True)

# The channel count is the last axis of a 4-D batch; shape[2] is the image
# width. Data without a channel axis is treated as single-channel.
n_channel = train_x.shape[3] if train_x.ndim == 4 else 1
model = build_my_cnn(train_x.shape[1], n_subjects, n_channel)  # default=227
model.summary()

plot_model(model, to_file=MODEL_IMAGE, show_shapes=True)
image_disp(filename=MODEL_IMAGE)

NameError: name 'os' is not defined

In [None]:
# Train the CNN on augmented batches from `datagen`, validating on the test
# split and saving the best weights through the `checkpoint` callback.
# NOTE(review): `samples_per_epoch` / `nb_epoch` look like Keras 1 argument
# names -- confirm against the installed Keras version.
model.fit_generator(datagen.flow(train_x, train_y, batch_size=BATCH_SIZE),
                        samples_per_epoch=train_x.shape[0],
                        nb_epoch=NB_EPOCH,
                        validation_data=[test_x, test_y],
                        callbacks=[checkpoint])

# train the model with triplet loss

In [None]:
import numpy as np


def get_scores(data_y, protocol):
    """Split pairwise cosine similarities into target and imposter scores.

    Args:
        data_y: 2-D array with one embedding per row.
        protocol: mask over the pairwise score matrix; selected entries are
            returned as target scores, the remaining ones as imposter scores.

    Returns:
        ``(target_scores, imposter_scores)``.
    """
    row_norms = np.linalg.norm(data_y, axis=1)
    unit_rows = data_y / row_norms[:, np.newaxis]
    similarity = unit_rows @ unit_rows.T

    imposter_mask = np.logical_not(protocol)
    return similarity[protocol], similarity[imposter_mask]


def calc_metrics(targets_scores, imposter_scores):
    min_score = np.minimum(np.min(targets_scores), np.min(imposter_scores))
    max_score = np.maximum(np.max(targets_scores), np.max(imposter_scores))

    n_tars = len(targets_scores)
    n_imps = len(imposter_scores)

    N = 100

    fars = np.zeros((N,))
    frrs = np.zeros((N,))
    dists = np.zeros((N,))

    min_gap = float('inf')
    eer = 0

    for i, dist in enumerate(np.linspace(min_score, max_score, N)):
        far = len(np.where(imposter_scores > dist)[0]) / n_imps
        frr = len(np.where(targets_scores < dist)[0]) / n_tars
        fars[i] = far
        frrs[i] = frr
        dists[i] = dist

        gap = np.abs(far - frr)

        if gap < min_gap:
            min_gap = gap
            eer = (far + frr) / 2

    return eer, fars, frrs, dists


In [None]:
import itertools
import numpy as np
#from model import build_my_cnn
#from model import build_tpe
#from model import Bottleneck
#from identification import get_scores, calc_metrics
from sklearn.decomposition import PCA

# Input and output sizes of the triplet embedding layer.
n_in = 24
n_out = 24

data_dir='data/'

# Restore the trained CNN and read features from its second-to-last layer
# (~1 == -2).
cnn = build_my_cnn(227, 24)
cnn.load_weights(data_dir+'weights/weights.best.h5')
bottleneck = Bottleneck(cnn, ~1)

train_x, train_y = np.load(data_dir+'train_x.npy'), np.load(data_dir+'train_y.npy')
test_x, test_y = np.load(data_dir+'test_x.npy'), np.load(data_dir+'test_y.npy')

# The test split is folded into the training data; evaluation uses the
# separate dev set loaded below.
train_x = np.vstack([train_x, test_x])
train_y = np.hstack([train_y, test_y])

dev_x = np.load(data_dir+'dev_x.npy')
dev_protocol = np.load(data_dir+'dev_protocol.npy')

# CNN features for every training and dev sample.
train_emb = bottleneck.predict(train_x, batch_size=256)
dev_emb = bottleneck.predict(dev_x, batch_size=256)

# The raw images are not used after this point.
del train_x

# The PCA components (transposed) initialise the embedding layer's weights.
pca = PCA(n_out)
pca.fit(train_emb)
W_pca = pca.components_

tpe, tpe_pred = build_tpe(n_in, n_out, W_pca.T)
# tpe.load_weights('data/weights/weights.tpe.mineer.h5')

train_y = np.array(train_y)
subjects = list(set(train_y))

# Parallel lists: for triplet k, anchors_inds[k] / positives_inds[k] index
# into train_emb and labels[k] is the subject both samples belong to.
anchors_inds = []
positives_inds = []
labels = []

# Every ordered pair of distinct samples of one subject is an
# (anchor, positive) pair.
for subj in subjects:
    mask = train_y == subj
    inds = np.where(mask)[0]
    for a, p in itertools.permutations(inds, 2):
        anchors_inds.append(a)
        positives_inds.append(p)
        labels.append(subj)

anchors = train_emb[anchors_inds]
positives = train_emb[positives_inds]
n_anchors = len(anchors_inds)

NB_EPOCH = 100
# The training loop requests hard negatives only when `e > COLD_START`; with
# COLD_START equal to NB_EPOCH that condition is never true.
COLD_START = NB_EPOCH
BATCH_SIZE = 4
BIG_BATCH_SIZE = 512

# Indices of all (anchor, positive) pairs; get_batch samples from these.
inds = np.arange(n_anchors)

def get_batch(hard=False):
    """Sample BIG_BATCH_SIZE triplets from the precomputed anchor/positive pairs.

    Args:
        hard: when true, each negative is the sample of a different subject
            whose current embedding scores highest against the anchor's
            embedding; otherwise the negative is drawn uniformly at random
            from the samples of other subjects.

    Returns:
        ``(anchors, positives, negatives)`` rows taken from ``train_emb``.
    """
    batch_inds = np.random.choice(inds, size=BIG_BATCH_SIZE, replace=False)

    if hard:
        # Scores are only needed for hard mining, and only for the rows of
        # the anchors in this batch, so the full N x N matrix is never built.
        train_emb2 = tpe_pred.predict(train_emb, batch_size=1024)
        anchor_rows = [anchors_inds[i] for i in batch_inds]
        scores = train_emb2[anchor_rows] @ train_emb2.T

    negative_inds = []

    for row, i in enumerate(batch_inds):
        same_subject = train_y == labels[i]
        if hard:
            # scores[row] belongs to this anchor's own sample; the row was
            # previously selected by the subject label. Samples of the same
            # subject are masked out before taking the argmax.
            negative_inds.append(np.ma.array(scores[row], mask=same_subject).argmax())
        else:
            negative_inds.append(np.random.choice(np.where(np.logical_not(same_subject))[0], size=1)[0])

    return anchors[batch_inds], positives[batch_inds], train_emb[negative_inds]


def test():
    """Embed the dev features with the current model and return the EER."""
    dev_embeddings = tpe_pred.predict(dev_emb)
    target_scores, imposter_scores = get_scores(dev_embeddings, dev_protocol)
    return calc_metrics(target_scores, imposter_scores)[0]

# Dummy targets: triplet_loss does not use y_true, but fit() requires a target.
z = np.zeros((BIG_BATCH_SIZE,))

# Lowest EER seen so far on the dev set.
mineer = float('inf')

for e in range(NB_EPOCH):
    print('epoch: {}'.format(e))
    # Hard negatives are requested only once the epoch index exceeds COLD_START.
    a, p, n = get_batch(e > COLD_START)
    tpe.fit([a, p, n], z, batch_size=BATCH_SIZE, epochs=1)
    eer = test()
    print('EER: {:.2f}'.format(eer * 100))
    # Save the weights whenever the dev EER improves.
    if eer < mineer:
        mineer = eer
        tpe.save_weights(data_dir+'weights/weights.tpe.h5')

# test some images

In [None]:
import os.path
import numpy as np
from preprocessing import FaceDetector, FaceAligner, clip_to_range
import keras.backend as K
from keras.models import Model, Sequential
from keras.optimizers import SGD, Adam
from keras.layers import Dense, Lambda, Input, merge, Conv2D, Activation, Dropout, MaxPooling2D, Flatten, GlobalMaxPooling2D, BatchNormalization

# Passed as `greater_than` to detect_faces in FaceVerificator.process_image.
GREATER_THAN = 32
# Batch size used for the CNN and embedding predictions in process_image.
BATCH_SIZE = 128
# Passed as `dim` / `border` to align_faces; IMSIZE + 2 * IMBORDER = 227,
# the input size given to build_my_cnn in initialize_model.
IMSIZE = 217
IMBORDER = 5

def build_my_cnn(dim, n_class, n_channel=3):
    """Build and compile the CNN used as a feature extractor.

    Args:
        dim: height and width of the (square) input images.
        n_class: number of units in the final Dense layer.
        n_channel: number of input channels. Defaults to 3, the value that
            was previously hard-coded, so existing two-argument calls build
            the same model.

    Returns:
        A compiled ``Sequential`` model.
    """
    model = Sequential()

    model.add(BatchNormalization(input_shape=(dim, dim, n_channel)))

    model.add(Conv2D(32, kernel_size=(3, 3), padding='same'))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(64, (3, 3)))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(128, kernel_size=(3, 3), padding='same'))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(32, kernel_size=(3, 3)))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(GlobalMaxPooling2D())

    model.add(Dense(64))  # 512
    model.add(Activation('relu'))

    model.add(Dense(n_class))
    # NOTE(review): sigmoid + binary_crossentropy scores each class
    # independently; confirm this is intended for one-hot targets.
    model.add(Activation('sigmoid'))

    adam = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
    model.compile(loss='binary_crossentropy', optimizer=adam, metrics=['accuracy'])

    return model

def triplet_loss(y_true, y_pred):
    """Return the negative mean log-sigmoid of ``y_pred``.

    ``y_true`` is accepted to satisfy the Keras loss signature but is unused.
    """
    log_sigmoid = K.log(K.sigmoid(y_pred))
    return -K.mean(log_sigmoid)

def triplet_merge(inputs):
    """Combine (anchor, positive, negative) into ``sum(a * (p - n))`` over axis 1."""
    anchor, positive, negative = inputs
    difference = positive - negative
    return K.sum(anchor * difference, axis=1)

def triplet_merge_shape(input_shapes):
    """Return ``(batch, 1)``, taking the batch entry from the first input shape."""
    first_shape = input_shapes[0]
    batch_dim = first_shape[0]
    return (batch_dim, 1)

def build_tpe(n_in, n_out, W_pca=None):
    """Build the triplet embedding model and its single-input predictor.

    Args:
        n_in: size of each input feature vector.
        n_out: size of the embedding.
        W_pca: initial weights for the bias-free linear layer, shape
            ``(n_in, n_out)``; an all-zero matrix is used when omitted.

    Returns:
        ``(model, predict)``: ``model`` takes ``[anchor, positive, negative]``
        and is compiled with ``triplet_loss``; ``predict`` maps one input to
        its L2-normalised embedding through the same shared layers.
    """
    anchor_in, positive_in, negative_in = (Input(shape=(n_in,)) for _ in range(3))

    initial_weights = np.zeros((n_in, n_out)) if W_pca is None else W_pca

    # Shared projection: linear layer followed by L2 normalisation.
    base_model = Sequential()
    base_model.add(Dense(n_out, input_dim=n_in, bias=False,
                         weights=[initial_weights], activation='linear'))
    base_model.add(Lambda(lambda x: K.l2_normalize(x, axis=1)))

    anchor_emb, positive_emb, negative_emb = [
        base_model(tensor) for tensor in (anchor_in, positive_in, negative_in)
    ]

    merged = merge([anchor_emb, positive_emb, negative_emb],
                   mode=triplet_merge,
                   output_shape=triplet_merge_shape)

    model = Model(input=[anchor_in, positive_in, negative_in], output=merged)
    predict = Model(input=anchor_in, output=anchor_emb)

    model.compile(loss=triplet_loss, optimizer='rmsprop')

    return model, predict

class Bottleneck:
    """Evaluate one intermediate layer of a Keras model in batches."""

    def __init__(self, model, layer):
        """Compile a backend function from the model input to ``layer``'s output.

        Args:
            model: Keras model whose first layer provides the input tensor.
            layer: index into ``model.layers`` of the layer to read.
        """
        self.fn = K.function([model.layers[0].input, K.learning_phase()], [model.layers[layer].output])

    def predict(self, data_x, batch_size=32, learning_phase=False):
        """Run the layer function over ``data_x`` in batches.

        Args:
            data_x: sliceable sequence of samples.
            batch_size: number of samples per call to the backend function.
            learning_phase: when true, the function is evaluated with the
                learning-phase flag set to 1; otherwise 0.

        Returns:
            The batch outputs stacked with ``np.vstack``, or ``None`` when
            ``data_x`` is empty.
        """
        # Forward the requested phase; previously 0 was always passed, so
        # the `learning_phase` argument had no effect.
        phase = 1 if learning_phase else 0

        outputs = [
            self.fn([data_x[start:start + batch_size], phase])[0]
            for start in range(0, len(data_x), batch_size)
        ]

        if not outputs:
            return None
        if len(outputs) == 1:
            return outputs[0]
        # Stack once at the end instead of re-copying the result per batch.
        return np.vstack(outputs)

class FaceVerificator:
    """Pipeline: detect faces, align them, embed them and compare embeddings."""

    def __init__(self, model_dir):
        """Record the paths of the model files expected under ``model_dir``."""
        self._model_dir = model_dir

        filenames = (
            ('shape_predictor', 'shape_predictor_68_face_landmarks.dat'),
            ('face_template', 'face_template.npy'),
            ('mean', 'mean.npy'),
            ('stddev', 'stddev.npy'),
            ('cnn_weights', 'weights_cnn.h5'),
            ('tpe_weights', 'weights_tpe.h5'),
        )
        self._model_files = {
            key: os.path.join(model_dir, filename) for key, filename in filenames
        }

    def initialize_model(self):
        """Load the normalisation statistics, detector, aligner and both networks."""
        files = self._model_files

        self._mean = np.load(files['mean'])
        self._stddev = np.load(files['stddev'])
        self._fd = FaceDetector()
        self._fa = FaceAligner(files['shape_predictor'], files['face_template'])

        classifier = build_my_cnn(227, 24)
        classifier.load_weights(files['cnn_weights'])
        # -2 (written ~1 originally) selects the second-to-last layer.
        self._cnn = Bottleneck(classifier, -2)

        embedder = build_tpe(24, 24)[1]
        embedder.load_weights(files['tpe_weights'])
        self._tpe = embedder

    def normalize(self, img):
        """Rescale ``img`` with clip_to_range, then standardise with mean/stddev."""
        return (clip_to_range(img) - self._mean) / self._stddev

    def process_image(self, img):
        """Return ``(face_rect, embedding)`` pairs for every face found in ``img``.

        Returns an empty list when no face passes detection.
        """
        rects = self._fd.detect_faces(img, upscale_factor=2, greater_than=GREATER_THAN)
        if not rects:
            return []

        aligned = self._fa.align_faces(img, rects, dim=IMSIZE, border=IMBORDER)
        normalized = [self.normalize(face) for face in aligned]

        features = self._cnn.predict(normalized, batch_size=BATCH_SIZE)
        embeddings = self._tpe.predict(features, batch_size=BATCH_SIZE)

        return list(zip(rects, embeddings))

    def compare_many(self, dist, xs, ys):
        """Return the score matrix ``xs @ ys.T`` and the boolean ``scores > dist``."""
        scores = np.array(xs) @ np.array(ys).T
        return scores, scores > dist


In [None]:
# from model import FaceVerificator
from skimage import io

###
# Images to compare and the similarity threshold above which two faces are
# reported as a match.
img_path_0 = 'data/dev/21.jpg'
img_path_1 = 'data/dev/22.jpg'
dist = 0.85
###

extractor = FaceVerificator('model')
extractor.initialize_model()

img_0 = io.imread(img_path_0)
img_1 = io.imread(img_path_1)

# Each entry is a (face_rect, embedding) pair.
faces_0 = extractor.process_image(img_0)
faces_1 = extractor.process_image(img_1)

n_faces_0 = len(faces_0)
n_faces_1 = len(faces_1)

if n_faces_0 == 0 or n_faces_1 == 0:
    print('Error: No faces found on the {}!'.format(img_path_0 if n_faces_0 == 0 else img_path_1))
    # exit() is an interactive helper installed by the site module; raise
    # SystemExit directly and report failure with a non-zero status.
    raise SystemExit(1)

rects_0 = [rect for rect, _ in faces_0]
rects_1 = [rect for rect, _ in faces_1]

embs_0 = [emb for _, emb in faces_0]
embs_1 = [emb for _, emb in faces_1]

# scores[i, j] compares face i of image 0 with face j of image 1.
scores, comps = extractor.compare_many(dist, embs_0, embs_1)

print('Rects on image 0: {}'.format(rects_0))
print('Rects on image 1: {}'.format(rects_1))

print('Embeddings of faces on image 0:')
print(embs_0)

print('Embeddings of faces on image 1:')
print(embs_1)

print('Score matrix:')
print(scores)

print('Decision matrix :')
print(comps)