In [0]:
import numpy as np
from sklearn.metrics import roc_curve
from sklearn.neighbors import KNeighborsClassifier
import random
import matplotlib.patheffects as PathEffects

In [3]:
from keras.layers import Input, Conv2D, Lambda, Dense, Flatten,MaxPooling2D, concatenate
from keras.models import Model, Sequential
from keras.regularizers import l2
from keras import backend as K
from keras.optimizers import SGD,Adam
from keras.losses import binary_crossentropy
import os
import pickle
import matplotlib.pyplot as plt

Using TensorFlow backend.


In [0]:
from itertools import permutations
import seaborn as sns
from keras.datasets import mnist
from sklearn.manifold import TSNE
from sklearn.svm import SVC
import json

In [5]:
x=np.load("/content/drive/My Drive/features.npy")
y=np.load("/content/drive/My Drive/labels.npy")
print(x.shape)
print(y.shape)

(30354, 30008)
(30354,)


In [6]:
a,b=np.unique(y,return_counts=True)
print(a)
print(b)
print(len(a))

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 14 15 16 17 18 19 20 21 22 23 24
 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69]
[5113 4345 2314 2366    3   75  109   37    5 2417 2237    8    7 1539
 1481    5    3 1372   20    5    3  734  762  754    3  569  610  417
  366  222  189  381  364  205  159  156  120   94   72   64   51   60
   57   48   38   29   27   45   21   21    9   32   15   21   13   21
   23   17   23   15    9    9    6    7    7   10    6    5    4]
69


In [0]:
def generate_triplet(x,y,testsize=0.3,ap_pairs=10,an_pairs=10):
    data_xy = tuple([x,y])

    trainsize = 1-testsize

    triplet_train_pairs = []
    triplet_test_pairs = []
    triplet_train_labels=[]
    triplet_test_labels=[]

    for data_class in sorted(set(data_xy[1])):

        same_class_idx = np.where((data_xy[1] == data_class))[0]
        diff_class_idx = np.where(data_xy[1] != data_class)[0]
        if(len(same_class_idx)<ap_pairs):
            continue
        A_P_pairs = random.sample(list(permutations(same_class_idx,2)),k=ap_pairs) #Generating Anchor-Positive pairs
        Neg_idx = random.sample(list(diff_class_idx),k=an_pairs)
        

        #train
        A_P_len = len(A_P_pairs)
        Neg_len = len(Neg_idx)
        for ap in A_P_pairs[:int(A_P_len*trainsize)]:
            Anchor = data_xy[0][ap[0]]
            Positive = data_xy[0][ap[1]]
            for n in Neg_idx:
                Negative = data_xy[0][n]
                triplet_train_pairs.append([Anchor,Positive,Negative])  
                triplet_train_labels.append(data_xy[1][n])             
        #test
        for ap in A_P_pairs[int(A_P_len*trainsize):]:
            Anchor = data_xy[0][ap[0]]
            Positive = data_xy[0][ap[1]]
            for n in Neg_idx:
                Negative = data_xy[0][n]
                triplet_test_pairs.append([Anchor,Positive,Negative])   
                triplet_test_labels.append(data_xy[1][n])  
                
    return np.array(triplet_train_pairs), np.array(triplet_train_labels), np.array(triplet_test_pairs), np.array(triplet_test_labels)

In [8]:
X_train, y_train, X_test, y_test = generate_triplet(x,y, testsize=0.2,ap_pairs=10, an_pairs=10)
print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)

(4080, 3, 30008)
(4080,)
(1020, 3, 30008)
(1020,)


In [0]:
def triplet_loss(y_true, y_pred, alpha = 0.4):
    """
    Implementation of the triplet loss function
    Arguments:
    y_true -- true labels, required when you define a loss in Keras, you don't need it in this function.
    y_pred -- python list containing three objects:
            anchor -- the encodings for the anchor data
            positive -- the encodings for the positive data (similar to anchor)
            negative -- the encodings for the negative data (different from anchor)
    Returns:
    loss -- real number, value of the loss
    """

    print('y_pred.shape = ',y_pred)
    
    total_length = y_pred.shape.as_list()[-1]
    anchor = y_pred[:,0:int(total_length*1/3)]
    positive = y_pred[:,int(total_length*1/3):int(total_length*2/3)]
    negative = y_pred[:,int(total_length*2/3):int(total_length*3/3)]

    # distance between the anchor and the positive
    pos_dist = K.sum(K.square(anchor-positive),axis=1)

    # distance between the anchor and the negative
    neg_dist = K.sum(K.square(anchor-negative),axis=1)

    # compute loss
    basic_loss = pos_dist-neg_dist+alpha
    loss = K.maximum(basic_loss,0.0)
 
    return loss

In [0]:
def create_base_network(in_dims,b_dims):
    """
    Base network to be shared.
    """
    inputs=Input(shape=(100,100,3),name="inputs")
    features=Input(shape=(8,),name="features")
    hidden1=Conv2D(32,(7,7),padding="same",activation="relu")(inputs)
    pool1=MaxPooling2D((2,2),padding="same")(hidden1)
    hidden2=Conv2D(64,(5,5),padding="same",activation="relu")(pool1)
    pool2=MaxPooling2D((2,2),padding="same")(hidden2)
    flat_features=Flatten(name="Flatten")(pool2)
    merged=concatenate([flat_features,features])
    #hidden3=Dense(100,activation="relu")(merged)
    #output=Dense(1,activation="softmax")(hidden3)
    output=Dense(7,name="embedding")(merged)
    model=Model(inputs=[inputs,features],outputs=output)
    return model

In [0]:
adam_optim = Adam(lr=0.0001, beta_1=0.9, beta_2=0.999)

In [12]:
anchor_input = Input((100,100,3,), name='anchor_input')
anchor_bb=Input((8,),name='anchor_bounding_box')
positive_input = Input((100,100,3, ), name='positive_input')
positive_bb=Input((8,),name='positive_bounding_box')
negative_input = Input((100,100,3, ), name='negative_input')
negative_bb=Input((8,),name='negative_bounding_box')

# Shared embedding layer for positive and negative items
Shared_DNN = create_base_network([100,100,3],[8])


encoded_anchor = Shared_DNN(inputs=[anchor_input,anchor_bb])
encoded_positive = Shared_DNN(inputs=[positive_input,positive_bb])
encoded_negative = Shared_DNN(inputs=[negative_input,negative_bb])
print(encoded_anchor.shape)
print(encoded_positive.shape)
print(encoded_negative.shape)

merged_vector = concatenate([encoded_anchor, encoded_positive, encoded_negative], axis=-1, name='merged_layer')

model = Model(inputs=[anchor_input,positive_input, negative_input,anchor_bb,positive_bb,negative_bb], outputs=merged_vector)
#model=create_base_network([100,100,3],[8])
model.compile(loss=triplet_loss, optimizer=adam_optim,metrics=['accuracy'])





(?, 7)
(?, 7)
(?, 7)

y_pred.shape =  Tensor("merged_layer/concat:0", shape=(?, 21), dtype=float32)


In [13]:
model.summary()

Model: "model_2"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
anchor_input (InputLayer)       (None, 100, 100, 3)  0                                            
__________________________________________________________________________________________________
anchor_bounding_box (InputLayer (None, 8)            0                                            
__________________________________________________________________________________________________
positive_input (InputLayer)     (None, 100, 100, 3)  0                                            
__________________________________________________________________________________________________
positive_bounding_box (InputLay (None, 8)            0                                            
____________________________________________________________________________________________

In [20]:
Anchor = X_train[:,0,:][:,:30000].reshape(-1,100,100,3)
Positive = X_train[:,1,:][:,:30000].reshape(-1,100,100,3)
Negative = X_train[:,2,:][:,:30000].reshape(-1,100,100,3)
Anchor_bb=X_train[:,0,:][:,30000:].reshape(-1,8)
Positive_bb=X_train[:,1,:][:,30000:].reshape(-1,8)
Negative_bb=X_train[:,2,:][:,30000:].reshape(-1,8)


Anchor_test = X_test[:,0,:][:,:30000].reshape(-1,100,100,3)
Positive_test = X_test[:,1,:][:,:30000].reshape(-1,100,100,3)
Negative_test = X_test[:,2,:][:,:30000].reshape(-1,100,100,3)
Anchor_test_bb=X_test[:,0,:][:,30000:].reshape(-1,8)
Positive_test_bb=X_test[:,1,:][:,30000:].reshape(-1,8)
Negative_test_bb=X_test[:,2,:][:,30000:].reshape(-1,8)

Y_dummy = np.empty((Anchor.shape[0],300))
Y_dummy2 = np.empty((Anchor_test.shape[0],1))

model.fit([Anchor,Positive,Negative,Anchor_bb,Positive_bb,Negative_bb],y=y_train,validation_data=([Anchor_test,Positive_test,Negative_test,Anchor_test_bb,Positive_test_bb,Negative_test_bb],y_test), batch_size=32, epochs=50)

Train on 4080 samples, validate on 1020 samples
Epoch 1/50
Epoch 2/50

KeyboardInterrupt: ignored

In [0]:
trained_model = Model(inputs=[anchor_input,anchor_bb], outputs=encoded_anchor)

In [17]:
from sklearn.preprocessing import LabelBinarizer
le=LabelBinarizer()
y_train_onehot=le.fit_transform(y_train)
y_test_onehot=le.fit_transform(y_test)
print(y_train_onehot.shape)
print(y_test_onehot.shape)

(4080, 37)
(1020, 37)


In [19]:
X_train_trm = trained_model.predict([X_train[:,0,:][:,:30000].reshape(-1,100,100,3),X_train[:,0,:][:,30000:].reshape(-1,8)])
X_test_trm = trained_model.predict([X_test[:,0,:][:,:30000].reshape(-1,100,100,3),X_test[:,0,:][:,30000:].reshape(-1,8)])

Classifier_input = Input((7,))

Classifier_output = Dense(37, activation='softmax')(Classifier_input)
Classifier_model = Model(Classifier_input, Classifier_output)


Classifier_model.compile(optimizer='adam',loss='categorical_crossentropy',metrics=['accuracy'])

Classifier_model.fit(X_train_trm,y_train_onehot, validation_data=(X_test_trm,y_test_onehot),epochs=50)

Train on 4080 samples, validate on 1020 samples
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.callbacks.History at 0x7fe5575d2be0>

In [0]:
from google.colab import drive
drive.mount('/content/drive')