In [40]:
def _get_triplet_mask(labels):
    """Return a 3D mask where mask[a, p, n] is True iff the triplet (a, p, n) is valid.
    A triplet (i, j, k) is valid if:
        - i, j, k are distinct
        - labels[i] == labels[j] and labels[i] != labels[k]
    Args:
        labels: tf.int32 `Tensor` with shape [batch_size]
    """
    # Check that i, j and k are distinct
    #labels = tf.reduce_sum(labels, axis=1)
    indices_equal = tf.cast(tf.eye(tf.shape(labels)[0]), tf.bool)
    indices_not_equal = tf.logical_not(indices_equal)
    i_not_equal_j = tf.expand_dims(indices_not_equal, 2)
    i_not_equal_k = tf.expand_dims(indices_not_equal, 1)
    j_not_equal_k = tf.expand_dims(indices_not_equal, 0)
    distinct_indices = tf.logical_and(tf.logical_and(i_not_equal_j, i_not_equal_k), j_not_equal_k)
    # Check if labels[i] == labels[j] and labels[i] != labels[k]
    label_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1))
    i_equal_j = tf.expand_dims(label_equal, 2)
    i_equal_k = tf.expand_dims(label_equal, 1)
    valid_labels = tf.logical_and(i_equal_j, tf.logical_not(i_equal_k))
    # Combine the two masks
    mask = tf.logical_and(distinct_indices, valid_labels)
    return mask
def _pairwise_distances(embeddings, squared=True):
    """Compute the 2D matrix of distances between all the embeddings.

    Args:
        embeddings: tensor of shape (batch_size, embed_dim)
        squared: Boolean. If true, output is the pairwise squared euclidean distance matrix.
                 If false, output is the pairwise euclidean distance matrix.

    Returns:
        pairwise_distances: tensor of shape (batch_size, batch_size)
    """
    # Get the dot product between all embeddings
    # shape (batch_size, batch_size)
    dot_product = tf.matmul(embeddings, tf.transpose(embeddings))

    # Get squared L2 norm for each embedding. We can just take the diagonal of `dot_product`.
    # This also provides more numerical stability (the diagonal of the result will be exactly 0).
    # shape (batch_size,)
    square_norm = tf.diag_part(dot_product)

    # Compute the pairwise distance matrix as we have:
    # ||a - b||^2 = ||a||^2  - 2 <a, b> + ||b||^2
    # shape (batch_size, batch_size)
    distances = tf.expand_dims(square_norm, 0) - 2.0 * dot_product + tf.expand_dims(square_norm, 1)

    # Because of computation errors, some distances might be negative so we put everything >= 0.0
    distances = tf.maximum(distances, 0.0)

    if not squared:
        # Because the gradient of sqrt is infinite when distances == 0.0 (ex: on the diagonal)
        # we need to add a small epsilon where distances == 0.0
        mask = K.to_float(tf.equal(distances, 0.0))
        distances = distances + mask * 1e-16

        distances = tf.sqrt(distances)

        # Correct the epsilon added: set the distances on the mask to be exactly 0.0
        distances = distances * (1.0 - mask)

    return distances

def _get_anchor_positive_triplet_mask(labels):
    """Return a 2D mask where mask[a, p] is True iff a and p are distinct and have same label.
    Args:
        labels: tf.int32 `Tensor` with shape [batch_size]
    Returns:
        mask: tf.bool `Tensor` with shape [batch_size, batch_size]
    """
    # Check that i and j are distinct
    indices_equal = tf.cast(tf.eye(tf.shape(labels)[0]), tf.bool)
    indices_not_equal = tf.logical_not(indices_equal)

    # Check if labels[i] == labels[j]
    # Uses broadcasting where the 1st argument has shape (1, batch_size) and the 2nd (batch_size, 1)
    labels_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1))

    # Combine the two masks
    mask = tf.logical_and(indices_not_equal, labels_equal)

    return mask

def _get_anchor_negative_triplet_mask(labels):
    """Return a 2D mask where mask[a, n] is True iff a and n have distinct labels.
    Args:
        labels: tf.int32 `Tensor` with shape [batch_size]
    Returns:
        mask: tf.bool `Tensor` with shape [batch_size, batch_size]
    """
    # Check if labels[i] != labels[k]
    # Uses broadcasting where the 1st argument has shape (1, batch_size) and the 2nd (batch_size, 1)
    labels_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1))

    mask = tf.logical_not(labels_equal)

    return mask


def batch_hard_triplet_loss(y_true,embeddings):
    """Build the triplet loss over a batch of embeddings.
    For each anchor, we get the hardest positive and hardest negative to form a triplet.
    Args:
        labels: labels of the batch, of size (batch_size,)
        embeddings: tensor of shape (batch_size, embed_dim)
        margin: margin for triplet loss
        squared: Boolean. If true, output is the pairwise squared euclidean distance matrix.
                 If false, output is the pairwise euclidean distance matrix.
    Returns:
        triplet_loss: scalar tensor containing the triplet loss
    """
    # Get the pairwise distance matrix
    margin=0.3
    labels=tf.reduce_sum(y_true,axis=1)
    pairwise_dist = _pairwise_distances(embeddings, squared=True)

    # For each anchor, get the hardest positive
    # First, we need to get a mask for every valid positive (they should have same label)
    mask_anchor_positive = _get_anchor_positive_triplet_mask(labels)
    mask_anchor_positive = tf.to_float(mask_anchor_positive)

    # We put to 0 any element where (a, p) is not valid (valid if a != p and label(a) == label(p))
    anchor_positive_dist = K.multiply(mask_anchor_positive, pairwise_dist)

    # shape (batch_size, 1)
    hardest_positive_dist = K.reduce_max(anchor_positive_dist, axis=1, keepdims=True)
    K.summary.scalar("hardest_positive_dist", K.reduce_mean(hardest_positive_dist))

    # For each anchor, get the hardest negative
    # First, we need to get a mask for every valid negative (they should have different labels)
    mask_anchor_negative = _get_anchor_negative_triplet_mask(labels)
    mask_anchor_negative = K.to_float(mask_anchor_negative)

    # We add the maximum value in each row to the invalid negatives (label(a) == label(n))
    max_anchor_negative_dist = tf.reduce_max(pairwise_dist, axis=1, keepdims=True)
    anchor_negative_dist = pairwise_dist + max_anchor_negative_dist * (1.0 - mask_anchor_negative)

    # shape (batch_size,)
    hardest_negative_dist = tf.reduce_min(anchor_negative_dist, axis=1, keepdims=True)
    tf.summary.scalar("hardest_negative_dist", tf.reduce_mean(hardest_negative_dist))

    # Combine biggest d(a, p) and smallest d(a, n) into final triplet loss
    triplet_loss = tf.maximum(hardest_positive_dist - hardest_negative_dist + margin, 0.0)

    # Get final mean triplet loss
    triplet_loss = tf.reduce_mean(triplet_loss)

    return triplet_loss

def  calculation_triplet_loss(margin,embeddings,labels):
    pairwise_dist = _pairwise_distances(embeddings, squared=True)
    anchor_positive_dist = tf.expand_dims(pairwise_dist, 2)
    anchor_negative_dist = tf.expand_dims(pairwise_dist, 1)
    triplet_loss=anchor_positive_dist - anchor_negative_dist + margin
    mask = _get_triplet_mask(labels)
    mask = tf.to_float(mask)
    triplet_loss=tf.multiply(mask,triplet_loss)
    # Remove negative losses (i.e. the easy triplets)
    triplet_loss = tf.maximum(triplet_loss, 0.0)
    valid_triplets= tf.to_float(tf.greater(triplet_loss, 1e-16))
    num_positive_triplets = tf.reduce_sum(valid_triplets)
    num_valid_triplets = tf.reduce_sum(mask)
    fraction_positive_triplets= num_positive_triplets / (num_valid_triplets + 1e-16)
    # Get final mean triplet loss over the positive valid triplets
    triplet_loss= tf.reduce_sum(triplet_loss) / (num_positive_triplets + 1e-16)
    return triplet_loss

def batch_all_triplet_loss(y_true_2):cd 
    def custom_loss(y_true,embeddings): 
        """Build the triplet loss over a batch of embeddings.

        We generate all the valid triplets and average the loss over the positive ones.

        Args:
            labels: labels of the batch, of size (batch_size,)
            embeddings: tensor of shape (batch_size, embed_dim)
            margin: margin for triplet loss
            squared: Boolean. If true, output is the pairwise squared euclidean distance matrix.
                     If false, output is the pairwise euclidean distance matrix.

        Returns:
            triplet_loss: scalar tensor containing the triplet loss
        """
        # Get the pairwise distance matrix
        margin=0.4
        labels_1=tf.reduce_sum(y_true,axis=1)
        labels_2=y_true_2
        embeddings_1,embeddings_2=tf.split(embeddings,2)
        triplet_loss_1=calculation_triplet_loss(margin,embeddings_1,labels_1)
        triplet_loss_2=calculation_triplet_loss(margin,embeddings_2,labels_2)                    
        triplet_loss=tf.reduce_mean(triplet_loss_1,triplet_loss_2)
        return triplet
    return custom_loss

In [41]:
from __future__ import print_function
import keras
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K
import keras
from keras import regularizers
from keras.models import Model
from keras.layers import Input, Dense, Conv2D, MaxPooling2D, Dropout, Flatten, Lambda
import numpy as np
from keras import optimizers
import matplotlib.pyplot as plt
from IPython.display import clear_output
from collections import defaultdict
from sklearn.datasets import fetch_mldata
from keras.datasets import cifar10,mnist
from keras.callbacks import ModelCheckpoint#save your model
import os
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.contrib.tensorboard.plugins import projector
from tensorflow.examples.tutorials.mnist import input_data
np.random.seed(1)
np.random.seed(21)
batch_size = 128
num_classes = 10
epochs = 5

# input image dimensions
img_rows, img_cols = 28, 28

# the data, split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

y_train_1=y_train
y_test_1=y_test
# convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

x_train shape: (60000, 28, 28, 1)
60000 train samples
10000 test samples


In [42]:
def convert_red(array): 
    array_size=np.shape(array)[0]
    add_mat=np.zeros((array_size,img_rows,img_cols,1))
    red_stuff=np.concatenate((array,add_mat,add_mat),axis=3)
    return red_stuff

def convert_green(array):
    array_size=np.shape(array)[0]
    add_mat=np.zeros((array_size,img_rows,img_cols,1))
    green_stuff=np.concatenate((add_mat,array,add_mat),axis=3)
    return green_stuff

def convert_blue(array):
    array_size=np.shape(array)[0]
    add_mat=np.zeros((array_size,img_rows,img_cols,1))
    green_stuff=np.concatenate((add_mat,add_mat,array),axis=3)
    return green_stuff

In [43]:
(x_train, y_train), (x_test, y_test) = mnist.load_data()
if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255

y_train_2=np.zeros(60000)    
sample_1_x=x_train[:20000]
sample_1_x=convert_blue(sample_1_x)

sample_2_x=x_train[20000:40000]
y_train_2[20000:40000]=1
sample_2_x=convert_red(sample_2_x)

sample_3_x=x_train[40000:]
y_train_2[40000:]=2
sample_3_x=convert_green(sample_3_x)

new_x_train=np.concatenate((sample_1_x,sample_2_x,sample_3_x),axis=0)
print(new_x_train.shape)
print(y_train_2.shape)
#plt.imshow(sample_1_x[5].reshape(28,28,3),cmap='gray')

(60000, 28, 28, 3)
(60000,)


In [44]:
def norm(inputs):
    return K.sqrt(K.sum(K.square(inputs), axis=1, keepdims=True) + 1e-16)
def put_on_sphere(inputs):
    return inputs/norm(inputs)

In [45]:
def k_nearest_neighbor(embedding_values,labels,test_value,k): 
    '''calculates the k nearest neighbor a a given test_value and a particular k'''
    similar=np.zeros((embedding_values.shape)[0])
    for i,emb_vec in enumerate(embedding_values):
        similar[i] = distance(test_value,emb_vec)
    arrangement= np.argsort(similar)
    arrangement=arrangement[:k]
    k_nearest=labels[arrangement]
    correct_index_list=stats.mode(k_nearest)
    correct_index=correct_index_list[0].tolist()
    return correct_index[0]

In [46]:
def triplet_knn_accuracy(k, labels_train, pairwise_dist, num_classes):
    
    '''accuracy function'''
    
    pairwise_dist = tf.matrix_set_diag(pairwise_dist, tf.fill([tf.shape(pairwise_dist)[0]], 1e+12))
    knn_values, knn_indices = tf.nn.top_k(-pairwise_dist, k=k, sorted=True)
    knn_labels = tf.gather(labels_train, knn_indices)
    print(knn_labels)
    knn_labels=tf.cast(knn_labels,tf.int32)
    knn_labels_onehot = tf.one_hot(knn_labels, num_classes, axis=2)
    knn_labels_dist = tf.reduce_sum(knn_labels_onehot, axis=1)
    pred = tf.argmax(knn_labels_dist, axis=1, output_type=tf.int32)
    x=tf.equal(pred,labels_train)
    print(labels_train)
    print(pred)
    accuracy = tf.reduce_mean(tf.to_float(tf.equal(pred, labels_train)))
    return accuracy

In [47]:
def custom_accuracy(y_true,embeddings):
    labels=tf.reduce_sum(y_true,axis=1)
    labels=tf.cast(labels,tf.int32)
    pairwise_dist=_pairwise_distances(embeddings)
    accuracy=triplet_knn_accuracy(1,labels,pairwise_dist,10)
    return accuracy

In [48]:
from keras.layers import BatchNormalization
from keras.callbacks import TensorBoard

tensorboard = TensorBoard(log_dir='./Desktop/logdir', histogram_freq=0,
                          write_graph=True, write_images=False)
K.clear_session()
model = Sequential()
model.add(Conv2D(16, kernel_size=(3, 3),
                 activation='relu',
                 input_shape=(28,28,3)))
model.add(BatchNormalization())
model.add(MaxPooling2D(pool_size=(2,2)))
model.add(BatchNormalization())
model.add(Flatten())
model.add(Dense(32, activation='relu'))
model.add(Dense(8, activation=None))
model.add(Lambda(put_on_sphere))
adam =keras.optimizers.Adam(0.001,beta_1=0.9, beta_2=0.999)
model.compile(loss=batch_all_triplet_loss(y_train_2), optimizer=adam) #metrics=['accuracy',custom_accuracy])
model.fit(new_x_train, y_train_1,batch_size=5,epochs=10,verbose=1) #validation_data=(x_test, y_test_1))

ValueError: Tensor("add_1:0", shape=(?, ?, ?), dtype=float32) must be from the same graph as Tensor("loss/lambda_1_loss/ToFloat:0", shape=(?, ?, ?), dtype=float32).

In [None]:
embedding = model.predict(x_test)
logdir="./Desktop/logdir"
#tf.reset_default_graph()
session=tf.Session()
embedding_var=tf.Variable(embedding,name='mnist_embedding')
saver=tf.train.Saver()
session.run(tf.global_variables_initializer())
saver.save(session,os.path.join(logdir,"model.ckpt"))

In [None]:
def write_meta_file(savedir, labels):
    with open(savedir+"/metadata.tsv", "w") as metafile:
        string_array = []
        for label in labels:
            string_array.append(str(label)+"\n")
        metafile.writelines(string_array)

In [None]:
write_meta_file(logdir,y_test_1)

In [None]:
def distance(emb1, emb2):
    return np.sqrt(np.sum(np.square(emb1-emb2)))

In [None]:
from scipy import stats
def k_nearest_neighbor(embedding_values,labels,test_value,k): 
    '''calculates the k nearest neighbor a a given test_value and a particular k'''
    similar=np.zeros((embedding_values.shape)[0])
    for i,emb_vec in enumerate(embedding_values):
        similar[i] = distance(test_value,emb_vec)
    arrangement= np.argsort(similar)
    arrangement=arrangement[:k]
    k_nearest=labels[arrangement]
    correct_index_list=stats.mode(k_nearest)
    correct_index=correct_index_list[0].tolist()
    return correct_index[0]

In [None]:
embedding_1 = model.predict(x_train)
count=0
for i in range(20):
    test_image=x_test[i]
    test_image=model.predict(np.expand_dims(test_image,axis=0))
    correct_index=k_nearest_neighbor(embedding_1,y_train_1,test_image,1)
    if (correct_index==y_test_1[i]):
        count=count+1
print(count)

In [None]:
test_image=x_train[100]
test_image=model.predict(np.expand_dims(test_image,axis=0))
correct_index=k_nearest_neighbor(embedding_1,y_train_1,test_image,1)
print(correct_index)
print(y_train_1[100])