# Siamese architecture with triplet loss function

Prepare images for use in classification

In [2]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import cv2
from os.path import join


def fetch(img_dir, name):
    """Load an image from disk and return it in RGB channel order.

    :param img_dir: directory that contains the image file
    :param name: file name of the image inside ``img_dir``
    :return: the image array read by OpenCV, converted to 3-channel RGB
    :raises IOError: if OpenCV cannot read the file
    """
    path = join(img_dir, name)
    img = cv2.imread(path)
    if img is None:
        # cv2.imread reports a missing or unreadable file by returning None
        # instead of raising; fail here with the offending path rather than
        # with an AttributeError further down.
        raise IOError('Could not read image: ' + str(path))
    # Compare the number of dimensions, not the shape tuple itself: a tuple
    # never equals an int, so the conversion would otherwise be skipped.
    if img.ndim == 2:
        img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
    elif img.ndim == 3:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img


def resize(img, size=(1024, 768)):
    """Rescale an image to ``size`` with bicubic interpolation.

    :param img: image array accepted by ``cv2.resize``
    :param size: 2-element target size, handed to ``cv2.resize`` unchanged
                 (OpenCV interprets it as (width, height))
    :return: the resized image
    """
    assert len(size) == 2
    resized = cv2.resize(img, size, interpolation=cv2.INTER_CUBIC)
    return resized


def pad(img, size=(1024, 768)):
    """Centre an image on a white canvas of the requested size.

    :param img: 3-D image array laid out as (height, width, channels)
    :param size: canvas size as (width, height)
    :return: ``np.uint8`` array of shape (size[1], size[0], 3); pixels not
             covered by the image are 255
    :raises ValueError: if the image is larger than the canvas in either dimension
    """
    assert len(img.shape) == 3
    assert len(size) == 2
    h, w, _ = img.shape
    canvas_w, canvas_h = int(size[0]), int(size[1])
    if h > canvas_h or w > canvas_w:
        # A negative margin used to be cast to an unsigned integer, which
        # wraps around and fails much later with an unrelated message.
        raise ValueError(
            'image of size (%d, %d) does not fit into canvas (%d, %d)'
            % (w, h, canvas_w, canvas_h))

    # Integer ceil-division: when the free space is odd, the extra pixel of
    # margin goes to the top/left side.
    pad_vert = (canvas_h - h + 1) // 2
    pad_hor = (canvas_w - w + 1) // 2

    padded = np.full((canvas_h, canvas_w, 3), 255, dtype=np.uint8)
    padded[pad_vert:pad_vert + h, pad_hor:pad_hor + w, :] = img
    return padded

Word sequence class that applies the data preparation above to the dataset

In [3]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys 
sys.path.insert(0, '../')

import numpy as np
import pandas as pd

from keras.utils import Sequence
from keras.preprocessing.image import ImageDataGenerator


class WordsSequence(Sequence):
    """Keras ``Sequence`` that serves batches of preprocessed word images.

    With labels (``y_set`` given) every batch is a random draw from the
    samples that have not been served yet in the current epoch and is
    returned as ``(images, labels)``; the batch index is ignored.
    Without labels, batches are contiguous slices of ``x_set`` and only the
    images are returned.
    """

    def __init__(self, img_dir, input_shape, x_set, y_set=None, batch_size=16):
        """
        :param img_dir: directory the image files are read from
        :param input_shape: target shape; index 0 is used as height, index 1 as width
        :param x_set: sequence of image file names
        :param y_set: optional sequence of labels aligned with ``x_set``
        :param batch_size: number of samples per batch
        """
        self.x, self.y = x_set, y_set
        self.img_dir = img_dir
        self.input_shape = input_shape
        self.batch_size = batch_size
        if self.y is not None:
            self._reset_dataset()

    def _reset_dataset(self):
        """Rebuild the bookkeeping frame with every sample marked as unused."""
        self.dataset = pd.DataFrame(data={
            'x': self.x,
            'y': self.y,
            # Explicit integer zeros: np.zeros_like(self.y) produces '' for
            # string labels, and the `used == 0` filter would match nothing.
            'used': np.zeros(len(self.y), dtype=np.int64),
        })
        self.dataset['class_count'] = self.dataset.groupby('y')['y'].transform('count')

    def _load_batch(self, names):
        """Read and preprocess the named images into one array."""
        return np.array([self.preprocess(fetch(self.img_dir, name)) for name in names])

    def __len__(self):
        """Number of batches per epoch (the last one may be partial)."""
        return int(np.ceil(len(self.x) / float(self.batch_size)))

    def __getitem__(self, idx):
        """Return batch ``idx`` (unlabelled) or a random unused batch (labelled)."""
        if self.y is None:
            batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
            return self._load_batch(batch_x)

        unused = self.dataset.loc[self.dataset['used'] == 0]
        if unused.empty:
            # Every sample has been served already (more batches requested
            # than len(self) before on_epoch_end ran). Start a new pass
            # instead of sampling from an empty frame, which raises.
            self._reset_dataset()
            unused = self.dataset

        # Sample with replacement only when too few unused samples remain to
        # fill a whole batch.
        needs_replacement = len(unused) < self.batch_size
        batch_indices = unused.sample(n=self.batch_size, replace=needs_replacement).index

        # Use label-based access for both the write and the read so the two
        # always address the same rows.
        self.dataset.loc[batch_indices, 'used'] = 1
        batch = self.dataset.loc[batch_indices]
        return self._load_batch(batch['x'].values), np.array(batch['y'].values)

    def preprocess(self, img):
        """Resize ``img`` to fit ``input_shape`` keeping its aspect ratio,
        pad it to the full shape and scale pixel values by 1/255.

        :param img: 3-D image array laid out as (height, width, channels)
        :return: float array with the padded image divided by 255
        """
        assert len(img.shape) == 3

        h, w, _ = img.shape
        target_h, target_w = self.input_shape[0], self.input_shape[1]
        # max(1, ...) keeps extremely thin images from being resized to a
        # zero-sized dimension after integer truncation.
        if h / w <= target_h / target_w:
            # Relatively wider than the target: the width is the limiting side.
            new_size = (target_w, max(1, int(target_w * h / w)))
        else:
            # Relatively taller than the target: the height is the limiting side.
            new_size = (max(1, int(target_h * w / h)), target_h)

        img = resize(img, new_size)
        img = pad(img, (target_w, target_h))
        return img / 255.

    def on_epoch_end(self):
        """Mark every sample as unused again for the next epoch."""
        if self.y is not None:
            self._reset_dataset()



Using TensorFlow backend.


Triplet loss functions

In [4]:
import tensorflow as tf


def valid_triplets_mask(labels):
    """Build the 3-D boolean mask of valid (anchor, positive, negative) triplets.

    ``mask[a, p, n]`` is True when a, p and n are pairwise distinct indices,
    ``labels[a] == labels[p]`` and ``labels[a] != labels[n]``.

    :param labels: tensor of shape (batch_size,)
    :return mask: tf.bool tensor of shape (batch_size, batch_size, batch_size)
    """
    n_items = tf.shape(labels)[0]
    index_differs = tf.logical_not(tf.cast(tf.eye(n_items), tf.bool))

    # Broadcast the 2-D "indices differ" matrix along each of the three axes
    # to cover the pairs (a, p), (a, n) and (p, n).
    a_differs_p = tf.expand_dims(index_differs, 2)
    a_differs_n = tf.expand_dims(index_differs, 1)
    p_differs_n = tf.expand_dims(index_differs, 0)
    all_distinct = tf.logical_and(tf.logical_and(a_differs_p, a_differs_n), p_differs_n)

    same_label = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1))
    anchor_matches_positive = tف_placeholder = None
    anchor_matches_positive = tf.expand_dims(same_label, 2)
    anchor_matches_negative = tf.expand_dims(same_label, 1)
    labels_ok = tf.logical_and(anchor_matches_positive, tf.logical_not(anchor_matches_negative))

    return tf.logical_and(all_distinct, labels_ok)


def euclidean_distance(embeddings, squared=False):
    """Pairwise Euclidean distance matrix between all rows of ``embeddings``.

    output[i, j] = || feature[i, :] - feature[j, :] ||_2

    :param embeddings: 2-D Tensor of size [number of data, feature dimension].
    :param squared: Boolean, whether to return squared distances instead.
    :return dist: 2-D Tensor of size [number of data, number of data].
    """
    transposed = tf.transpose(embeddings)
    row_norms = tf.reduce_sum(tf.square(embeddings), axis=1, keepdims=True)
    col_norms = tf.reduce_sum(tf.square(transposed), axis=0, keepdims=True)
    cross_terms = tf.matmul(embeddings, transposed)
    # ||a - b||^2 = ||a||^2 + ||b||^2 - 2 * <a, b>
    dist_squared = tf.add(row_norms, col_norms) - 2.0 * cross_terms

    # Rounding can push tiny distances below zero; clamp them.
    dist_squared = tf.maximum(dist_squared, 0.0)
    # Entries that are exactly zero after clamping.
    is_zero = tf.less_equal(dist_squared, 0.0)

    if squared:
        dist = dist_squared
    else:
        # Offset the zero entries by 1e-16 before the square root
        # (presumably to keep sqrt's gradient finite at zero).
        dist = tf.sqrt(dist_squared + tf.cast(is_zero, dtype=tf.float32) * 1e-16)
    # Force the zero entries back to exactly zero, undoing the offset.
    dist = tf.multiply(dist, tf.cast(tf.logical_not(is_zero), dtype=tf.float32))

    # Zero the diagonal explicitly: an item's distance to itself is 0.
    n_data = tf.shape(embeddings)[0]
    off_diagonal = tf.ones_like(dist) - tf.linalg.diag(tf.ones([n_data]))
    return tf.multiply(dist, off_diagonal)


def masked_maximum(data, mask, dim=1):
    """Maximum along ``dim`` taken over the masked-in elements only.

    :param data: 2-D float `Tensor` of size [n, m].
    :param mask: 2-D `Tensor` of size [n, m] selecting the elements; it is
        multiplied with ``data``, and callers in this file pass it cast to float32.
    :param dim: The dimension over which to compute the maximum.
    :return masked_maximums: N-D `Tensor`. The maximized dimension is of size 1 after the operation.
    """
    # Shift so every value is >= 0; masked-out entries then become 0 and
    # cannot exceed any selected entry.
    lowest = tf.reduce_min(data, axis=dim, keepdims=True)
    shifted = tf.multiply(data - lowest, mask)
    return tf.reduce_max(shifted, axis=dim, keepdims=True) + lowest


def masked_minimum(data, mask, dim=1):
    """Minimum along ``dim`` taken over the masked-in elements only.

    :param data: 2-D float `Tensor` of size [n, m].
    :param mask: 2-D `Tensor` of size [n, m] selecting the elements; it is
        multiplied with ``data``, and callers in this file pass it cast to float32.
    :param dim: The dimension over which to compute the minimum.
    :return masked_minimums: N-D `Tensor`. The minimized dimension is of size 1 after the operation.
    """
    # Shift so every value is <= 0; masked-out entries then become 0 and
    # cannot fall below any selected entry.
    highest = tf.reduce_max(data, axis=dim, keepdims=True)
    shifted = tf.multiply(data - highest, mask)
    return tf.reduce_min(shifted, axis=dim, keepdims=True) + highest


def triplet_loss(margin=1.0, strategy='batch_semi_hard'):
    """Build a triplet loss over a batch of embeddings. tf contrib inspired:
    https://github.com/tensorflow/tensorflow/blob/master/tensorflow/contrib/losses/python/metric_learning/metric_loss_ops.py

    :param margin: margin enforced by the triplet loss, or the string 'soft'
        to use the softplus (soft-margin) formulation instead
    :param strategy: mining strategy: 'batch_semi_hard', 'batch_hard' or
        'batch_all'. Spaces are accepted in place of underscores (e.g.
        'batch hard'). Any other value falls back to 'batch_all'.
    :return: a function ``loss(labels, embeddings)`` implementing the chosen strategy
    """
    def get_loss_tensor(positive_dists, negative_dists):
        """Compute the triplet loss tensor using the configured margin.

        :param positive_dists: anchor-positive distances tensor
        :param negative_dists: anchor-negative distances tensor
        :return: resulting triplet loss tensor
        """
        if margin == 'soft':
            return tf.nn.softplus(positive_dists - negative_dists)

        return tf.maximum(positive_dists - negative_dists + margin, 0.0)

    def batch_semi_hard(labels, embeddings):
        """Computes the triplet loss with semi-hard negative mining.

        For every anchor-positive pair the negative is the closest one that
        is still farther away than the positive (semi-hard negative). If no
        such negative exists in the mini-batch, the largest negative
        distance is used instead.
        See: https://arxiv.org/abs/1503.03832.

        :param labels: 1-D tf.int32 `Tensor` with shape [batch_size] of multiclass integer labels.
        :param embeddings: 2-D float `Tensor` of embedding vectors. Embeddings should be l2 normalized.
        :return loss: tf.float32 scalar.
        """
        labels = tf.reshape(labels, [-1, 1])
        batch_size = tf.size(labels)
        # Build pairwise squared distance matrix.
        dist = euclidean_distance(embeddings, squared=True)
        # Build pairwise binary adjacency matrix (equal label mask).
        adjacency = tf.equal(labels, tf.transpose(labels))
        # Invert so we can select negatives only.
        adjacency_not = tf.logical_not(adjacency)

        # Compute the mask: for every (anchor, positive) pair, flattened into
        # rows, mark the negatives whose distance exceeds D_ap.
        dist_tile = tf.tile(dist, [batch_size, 1])  # stack dist matrix batch_size times, axis=0
        mask = tf.logical_and(tf.tile(adjacency_not, [batch_size, 1]),
                              tf.greater(dist_tile, tf.reshape(dist, [-1, 1])))
        mask = tf.cast(mask, dtype=tf.float32)
        # True where at least one negative lies farther away than the positive.
        is_negatives_outside = tf.reshape(tf.greater(tf.reduce_sum(mask, axis=1, keepdims=True), 0.0),
                                          [batch_size, batch_size])
        is_negatives_outside = tf.transpose(is_negatives_outside)

        # negatives_outside: smallest D_an where D_an > D_ap.
        negatives_outside = tf.reshape(masked_minimum(dist_tile, mask), [batch_size, batch_size])
        negatives_outside = tf.transpose(negatives_outside)

        # negatives_inside: largest D_an.
        adjacency_not = tf.cast(adjacency_not, dtype=tf.float32)
        negatives_inside = tf.tile(masked_maximum(dist, adjacency_not), [1, batch_size])

        semi_hard_negatives = tf.where(is_negatives_outside, negatives_outside, negatives_inside)

        # In lifted-struct, the authors multiply 0.5 for upper triangular
        #   in semihard, they take all positive pairs except the diagonal.
        mask_positives = tf.cast(adjacency, dtype=tf.float32) - tf.linalg.diag(tf.ones([batch_size]))
        n_positives = tf.reduce_sum(mask_positives)

        loss_mat = get_loss_tensor(dist, semi_hard_negatives)
        loss = tf.math.divide_no_nan(tf.reduce_sum(tf.multiply(loss_mat, mask_positives)), n_positives)
        return loss

    def batch_all(labels, embeddings):
        """Compute the loss over all valid triplets, averaged over those with a positive loss.

        :param labels: 1-D tf.int32 `Tensor` with shape [batch_size] of multiclass integer labels.
        :param embeddings: 2-D float `Tensor` of embedding vectors. Embeddings should be l2 normalized.
        :return loss: tf.float32 scalar.
        """
        dist = euclidean_distance(embeddings, squared=True)
        mask = tf.cast(valid_triplets_mask(labels), dtype=tf.float32)

        # Broadcast to [anchor, positive, negative].
        anchor_positive_dist = tf.expand_dims(dist, 2)
        anchor_negative_dist = tf.expand_dims(dist, 1)

        loss_tensor = get_loss_tensor(anchor_positive_dist, anchor_negative_dist)
        loss_tensor = tf.multiply(loss_tensor, mask)

        # Average only over triplets that still contribute a loss.
        num_non_easy_triplets = tf.reduce_sum(tf.cast(tf.greater(loss_tensor, 1e-16), dtype=tf.float32))
        loss = tf.math.divide_no_nan(tf.reduce_sum(loss_tensor), num_non_easy_triplets)
        return loss

    def batch_hard(labels, embeddings):
        """Compute the loss from the hardest triplet of each anchor, averaged over those with a positive loss.
        One triplet per embedding, i.e. per anchor.

        :param labels: 1-D tf.int32 `Tensor` with shape [batch_size] of multiclass integer labels.
        :param embeddings: 2-D float `Tensor` of embedding vectors. Embeddings should be l2 normalized.
        :return loss: tf.float32 scalar.
        """
        dist = euclidean_distance(embeddings, squared=True)
        adjacency = tf.cast(tf.equal(tf.reshape(labels, (-1, 1)), tf.reshape(labels, (1, -1))), tf.float32)

        # Hardest positive: the farthest sample sharing the anchor's label.
        pos_dist = tf.reduce_max(adjacency * dist, axis=1)
        # Hardest negative: the closest sample with another label. Same-label
        # entries are pushed out of the way by adding a large constant.
        inf = tf.constant(1e+9, tf.float32)
        neg_dist = tf.reduce_min((adjacency * inf) + dist, axis=1)

        loss_mat = get_loss_tensor(pos_dist, neg_dist)

        # Same ops as the other strategies; tf.to_float / tf.div_no_nan are
        # not available in current TensorFlow releases.
        num_non_easy_triplets = tf.reduce_sum(tf.cast(tf.greater(loss_mat, 1e-16), dtype=tf.float32))
        loss = tf.math.divide_no_nan(tf.reduce_sum(loss_mat), num_non_easy_triplets)
        return loss

    # Normalise the spelling so that both 'batch_hard' and 'batch hard' work;
    # previously 'batch_hard' silently selected the batch-all strategy.
    normalized = str(strategy).replace(' ', '_')
    if normalized == 'batch_semi_hard':
        return batch_semi_hard
    elif normalized == 'batch_hard':
        return batch_hard
    else:
        # Unknown names keep falling back to batch_all, as before.
        return batch_all


Neural network building

In [6]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import random
import numpy as np
import pandas as pd
from random import  sample
from collections import Counter
from itertools import combinations


from keras.models import load_model
from keras.models import Model, Sequential
from keras import backend as K
from keras.optimizers import RMSprop, Adam
from keras.applications.mobilenet import MobileNet
from keras.layers import Input, Lambda, Dense, Flatten
from keras.callbacks import ModelCheckpoint
from sklearn.neighbors import KNeighborsClassifier
from keras.utils.generic_utils import CustomObjectScope


from keras.callbacks import Callback
from tensorflow.python.keras.utils.generic_utils import Progbar


def get_str2numb_numb2dict(vect):
    """Assign consecutive integer ids to the distinct values of ``vect``.

    Ids start at 0 and follow the order in which values first appear.

    :param vect: iterable of hashable values
    :return: tuple ``(value_to_id, id_to_value)`` of two dictionaries
    """
    value_to_id = {}
    for value in vect:
        # len() is evaluated before a possible insertion, so a new value
        # receives the next free id and a known value keeps its id.
        value_to_id.setdefault(value, len(value_to_id))
    id_to_value = dict((index, value) for value, index in value_to_id.items())
    return value_to_id, id_to_value

def apply_dict(dict_keys, X):
    """Translate every element of ``X`` through the mapping ``dict_keys``.

    :param dict_keys: mapping used for the lookup
    :param X: iterable of keys present in ``dict_keys``
    :return: list with the mapped values, in the order of ``X``
    """
    return [dict_keys[item] for item in X]

class ProgbarLossLogger(Callback):
    """Callback that draws a per-epoch progress bar showing the logged metrics.

    'loss' is registered with the progress bar as a stateful metric.
    """

    def __init__(self):
        super(ProgbarLossLogger, self).__init__()

    def on_train_begin(self, logs=None):
        """Remember the total number of epochs for the epoch header."""
        self.epochs = self.params['epochs']

    def on_epoch_begin(self, epoch, logs=None):
        """Reset the step counter and create a fresh progress bar."""
        self.seen = 0
        self.target = self.params['steps']

        if self.epochs > 1:
            print('Epoch %d/%d' % (epoch + 1, self.epochs))
        self.progbar = Progbar(target=self.target, verbose=True, stateful_metrics=['loss'])

    def on_batch_begin(self, batch, logs=None):
        """Start a new list of metric values while the epoch is unfinished."""
        if self.seen < self.target:
            self.log_values = []

    def on_batch_end(self, batch, logs=None):
        """Advance the bar by the finished steps and show the batch metrics."""
        batch_logs = logs if logs else {}
        self.seen += batch_logs.get('num_steps', 1)

        reported = [(metric, batch_logs[metric])
                    for metric in self.params['metrics']
                    if metric in batch_logs]
        self.log_values.extend(reported)
        self.progbar.update(self.seen, self.log_values)
        
class TripletModel:
    """MobileNet-based embedding network trained with a triplet loss.

    Images are classified by a 1-nearest-neighbour classifier fitted on the
    embeddings of a reference set (see ``make_embeddings``).
    """

    def __init__(self, alpha, input_shape, cache_dir):
        """
        :param alpha: ``alpha`` argument passed to ``MobileNet``
        :param input_shape: input shape passed to ``MobileNet`` and the data sequences
        :param cache_dir: directory for training checkpoints; created if missing
        """
        self.alpha = alpha
        self.input_shape = input_shape
        self.cache_dir = cache_dir
        if not os.path.isdir(self.cache_dir):
            os.makedirs(self.cache_dir)
        self.model = self.build_model()
        self.embeddings = None
        # Set by make_embeddings(); initialised here so that predict() can
        # detect a missing classifier instead of failing on a missing attribute.
        self.clf = None
        self.str2ind_test_dict = None
        self.ind2str_test_dict = None

    def build_model(self):
        """Build the embedding network.

        ImageNet-pretrained MobileNet without its top and with average
        pooling, followed by a 128-unit dense layer and L2 normalisation.

        :return: the Keras model (its summary is printed as a side effect)
        """
        base_network = MobileNet(input_shape=self.input_shape, alpha=self.alpha, weights='imagenet', include_top=False,
                                 pooling='avg')
        x = Dense(128)(base_network.output)
        x = Lambda(lambda x: K.l2_normalize(x, axis=1))(x)
        model = Model(inputs=base_network.input, outputs=x)
        model.summary()
        return model

    def train(self, train_dir, train_csv, validation_dir, validation_csv, epochs, batch_size=32, learning_rate=0.001, margin=0.5):
        """Train the embedding network with the batch-all triplet loss.

        :param train_dir: directory with the training images
        :param train_csv: CSV file with 'file_name' and 'label' columns
        :param validation_dir: currently unused
        :param validation_csv: currently unused
        :param epochs: number of training epochs
        :param batch_size: number of samples per batch
        :param learning_rate: currently unused, see the note in the body
        :param margin: currently unused, see the note in the body
        """
        train = pd.read_csv(train_csv)
        # Series.as_matrix() no longer exists in current pandas; .values is
        # the equivalent that works on old and new versions alike.
        x_train, y_train = train['file_name'].values, train['label'].values

        str2ind_train_dict, ind2str_train_dict = get_str2numb_numb2dict(y_train)
        y_train = np.array(apply_dict(str2ind_train_dict, y_train))

        self.num_classes = len(np.unique(y_train))
        train_generator = WordsSequence(train_dir, input_shape=self.input_shape, x_set=x_train, y_set=y_train, batch_size=batch_size)

        # NOTE(review): the optimiser and loss use hard-coded values (Adam
        # with lr=0.00001, margin=1.0); the `learning_rate` and `margin`
        # arguments are ignored. Left unchanged so that existing training
        # runs keep their behaviour - confirm whether this is intended.
        optimize = Adam(lr=0.00001)
        self.model.summary()
        # triplet_loss is the factory function defined in this file, not a
        # module, so it is called directly.
        self.model.compile(loss=triplet_loss(margin=1.0, strategy="batch_all"), optimizer=optimize)

        checkpoint = ModelCheckpoint(filepath=os.path.join(self.cache_dir, 'checkpoint-{epoch:02d}.h5'),
                                     save_weights_only=True)
        self.model.fit_generator(train_generator, shuffle=True, epochs=epochs, verbose=1,
                                 callbacks=[checkpoint])

        self.model.save('final_model.h5')
        self.save_weights('final_weights.h5')

    def save_embeddings(self, filename):
        """Pickle the stored embeddings frame to ``filename``."""
        self.embeddings.to_pickle(filename)

    def load_embeddings(self, filename):
        """Load a pickled embeddings frame from ``filename``."""
        # NOTE(review): unpickling executes arbitrary code; only load files
        # from a trusted source.
        self.embeddings = pd.read_pickle(filename)

    def save_weights(self, filename):
        """Write the model weights to ``filename``."""
        self.model.save_weights(filename)

    def load_weights(self, filename):
        """Load model weights by layer name, skipping mismatching layers."""
        self.model.load_weights(filename, by_name=True, skip_mismatch=True)

    def make_embeddings(self, img_dir, csv, batch_size=32):
        """Fit the nearest-neighbour classifier on reference embeddings.

        If embeddings are already stored they are reused; otherwise they are
        computed from the images listed in ``csv`` and saved to
        'embeddings.pkl'.

        :param img_dir: directory with the reference images
        :param csv: CSV file with 'file_name' and 'label' columns
        :param batch_size: number of samples per prediction batch
        """
        if self.embeddings is not None:
            # NOTE(review): this indexing reads single cells of the frame
            # built below from [pred, y]; it looks unlikely to recover the
            # full embedding matrix and label vector - TODO confirm the
            # stored layout before relying on cached embeddings.
            print(self.embeddings[0][0])
            self.clf = KNeighborsClassifier(n_neighbors=1, metric='euclidean')
            self.clf.fit(self.embeddings[0][0], self.embeddings[0][1])
        else:
            data = pd.read_csv(csv)
            x, y = data['file_name'].values, data['label'].values

            self.str2ind_test_dict, self.ind2str_test_dict = get_str2numb_numb2dict(y)
            y = np.array(apply_dict(self.str2ind_test_dict, y))

            words = WordsSequence(img_dir, input_shape=self.input_shape, x_set=x, batch_size=batch_size)
            pred = self.model.predict_generator(words, verbose=1)

            self.clf = KNeighborsClassifier(n_neighbors=1, metric='euclidean')
            self.clf.fit(pred, y)

            self.embeddings = pd.DataFrame(data=[pred, y])
            self.save_embeddings('embeddings.pkl')

    def predict(self, img_dir, test_csv, batch_size=32):
        """Classify the test images and print word- and author-level accuracy.

        :param img_dir: directory with the test images
        :param test_csv: CSV file with 'file_name' and 'label' columns
        :param batch_size: number of samples per prediction batch
        :raises RuntimeError: if ``make_embeddings`` has not been called yet
        """
        if self.clf is None:
            raise RuntimeError('make_embeddings() must be called before predict()')

        self.model.summary()
        test = pd.read_csv(test_csv)
        x_test, y_test = test['file_name'].values, test['label'].values

        # The classifier returns indices in the encoding it was fitted with,
        # so decode with the mapping stored by make_embeddings(). The mapping
        # derived from the test labels is only a fallback for the case where
        # no stored mapping exists (classifier fitted from cached embeddings).
        ind2str_dict = self.ind2str_test_dict
        if ind2str_dict is None:
            _, ind2str_dict = get_str2numb_numb2dict(y_test)

        words = WordsSequence(img_dir, input_shape=self.input_shape, x_set=x_test, batch_size=batch_size)
        test_embeddings = self.model.predict_generator(words, verbose=1)

        res = self.clf.predict(test_embeddings)
        predict = np.array(apply_dict(ind2str_dict, res))

        count = sum(1 for predicted, actual in zip(predict, y_test) if predicted == actual)
        print('word accuracy: ', count / len(y_test))

        # Votes per author: how often each label was predicted for the words
        # that truly belong to that author.
        autors = np.unique(y_test)
        votes = [Counter(np.ravel(predict[np.argwhere(y_test == a)])) for a in autors]

        # Top-1: the most frequently predicted label must be the author.
        count = sum(1 for autor, vote in zip(autors, votes)
                    if vote.most_common(1)[0][0] == autor)
        print('top-1 autor accuracy: ', count / len(autors))

        # Top-5: the author must be among the five most frequent labels.
        count = sum(1 for autor, vote in zip(autors, votes)
                    if autor in [pair[0] for pair in vote.most_common(5)])
        print('top-5 autor accuracy: ', count / len(autors))
 
