<a href="https://colab.research.google.com/github/Mary82/cross-Modal-Learning/blob/master/TripletLoss.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import keras
from keras.models import Sequential
from keras.preprocessing.image import ImageDataGenerator
from keras.layers import Dense, Activation, Flatten, Dropout, BatchNormalization
from keras.layers import Conv2D, MaxPooling2D,Input
from keras import regularizers
from keras.callbacks import LearningRateScheduler
import numpy as np
import keras.backend as K
from keras.models import Model
from keras.layers.core import Lambda
from keras.optimizers import Adam
import random
import itertools
from keras.models import model_from_json
from google.colab import drive
import os
drive.mount('/content/gdrive')
os.chdir('/content/gdrive/My Drive/Colab Notebooks/')
from TripletLoss_utils import *

In [2]:
num_classes = 100
json_file = open('Models/cifar_model.json', 'r')
loaded_model_json = json_file.read()
json_file.close()
cifar_model = model_from_json(loaded_model_json)
# load weights into new model
cifar_model.load_weights("Models/CIFAR100_Weight.h5")
print("Loaded model from disk")
cifar_model.summary()

Loaded model from disk
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_1 (Conv2D)            (None, 32, 32, 128)       3584      
_________________________________________________________________
activation_1 (Activation)    (None, 32, 32, 128)       0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 30, 30, 128)       147584    
_________________________________________________________________
activation_2 (Activation)    (None, 30, 30, 128)       0         
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 15, 15, 128)       0         
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 15, 15, 256)       295168    
_________________________________________________________________
activation_3 (Activation)    (None, 15, 15, 256)     

In [0]:
Xs, Ys = load_cifar_flatten(cifar_model,num_classes)

In [4]:
print Xs.shape

(10000, 2048)


In [0]:
def build_model_tpl(n_in):
    X_input = Input(n_in)
    X = Dense(512, activation='relu', name = 'dense_layer_1')(X_input)
    X = BatchNormalization(name = 'Batch_layer_1')(X)
    X = Dropout(.2)(X)
    X = Dense(128, name='dense_layer_2')(X)
    X = Lambda(lambda  x: K.l2_normalize(x,axis=1))(X)
    emb_model = Model(inputs = X_input, outputs = X, name = 'ImageEmb')
    # load pre-trained weights into model
    emb_model.load_weights("Models/img_emb_model.h5")
    
    # anchor, positive, negative inputs
    a = Input(shape = n_in)
    p = Input(shape = n_in)
    n = Input(shape = n_in)

    a_emb = emb_model(a)
    p_emb = emb_model(p)
    n_emb = emb_model(n)

    loss = Lambda(triplet_loss)([a_emb,p_emb, n_emb])
    model = Model(inputs = [a, p, n], outputs = loss)

    model.compile(loss=identity_loss, optimizer=Adam(0.000003), metrics = [accuracy])
    return model, emb_model

In [6]:
# Create training data for triplet loss
random.seed(1)
subjects = np.unique(Ys).tolist()
anchors_inds = []
positives_inds = []
labels = []
train = random.sample(range(Xs.shape[0]),6000)
test = list(set(range(Xs.shape[0])) - set(train))

x_train = Xs[train]
y_train = Ys[train]
x_test = Xs[test]
y_test = Ys[test]
for subj in subjects:
    mask = y_train == subj
    inds = np.where(mask)[0]
    for a, p in itertools.permutations(inds, 2):
        a,p = random.sample(inds,2)
        anchors_inds.append(a)
        positives_inds.append(p)
        labels.append(subj)

anchors = x_train[anchors_inds]
positives = x_train[positives_inds]
n_anchors = len(anchors_inds)

print n_anchors


356246


In [0]:
tpl_model, emb_model = build_model_tpl(Xs[1].shape)


In [0]:
NB_EPOCH = 100
BATCH_SIZE = 128
BIG_BATCH_SIZE = 49000

inds = np.arange(n_anchors)

def get_batch(hard=False):
    batch_inds = np.random.choice(inds, size=BIG_BATCH_SIZE, replace=False)

    train_emb = emb_model.predict(x_train, batch_size=1024)
    scores = np.matmul(train_emb , train_emb.T)
    negative_inds = []

    for i in batch_inds:
        label = labels[i]
        mask = y_train == label
        if hard: # picking the closest negative to the anchor
            negative_inds.append(np.ma.array(scores[anchors_inds[i]], mask=mask).argmax())
        else: # picking randomly
            negative_inds.append(np.random.choice(np.where(np.logical_not(mask))[0], size=1)[0])

    return anchors[batch_inds], positives[batch_inds], x_train[negative_inds]


In [0]:
# Training the model each epoch with different set of data as (a,p,n)
z = np.zeros((BIG_BATCH_SIZE,))

for e in range(1):
    a, p, n = get_batch(e > 100)
    tpl_model.fit([a, p, n], z, batch_size=BATCH_SIZE, epochs=1, verbose = 0)


In [12]:
z = np.zeros((BIG_BATCH_SIZE,))
a, p, n = get_batch(True)
tpl_model.evaluate([a, p, n], z)



[0.0, 1.0]

In [0]:
def test_data():
    anchors_test_inds = []
    positives_test_inds = []
    negative_test_inds = []
    for subj in subjects:
        mask_test = y_test == subj
        mask_train = y_train == subj
        test_inds = np.where(mask_test)[0]
        train_inds = np.where(mask_train)[0]
        for p_ in test_inds:
            a_ = random.sample(train_inds,1)[0]
            anchors_test_inds.append(a_)
            positives_test_inds.append(p_)
            negative_test_inds.append(np.random.choice(np.where(np.logical_not(mask_train))[0], size=1)[0])


    a_test = x_train[anchors_test_inds]
    p_test = x_test[positives_test_inds]
    n_test = x_train[negative_test_inds]
    return a_test, p_test, n_test

In [0]:
a_test, p_test, n_test = test_data()
z = np.zeros((len(a_test),))



In [18]:
s = 0
c = 0
for i in range(len(a_test)):
    a_ = emb_model.predict(a_test[i:i+1])
    p_ = emb_model.predict(p_test[i:i+1])
    n_ = emb_model.predict(n_test[i:i+1])
    pos_dist = np.sum(np.square(a_ - p_))
    neg_dist = np.sum(np.square(a_ - n_))
    c += 1 if pos_dist- neg_dist +.2 > 0 else 0
    s += max(0.0,pos_dist- neg_dist+.2)
print "test accuracy: ", 1 - c*1.0/len(a_test)
print "test loss: ", s/len(a_test)
    

test accuracy:  0.97325
test loss:  0.0043714195847511204


In [0]:

model_json = emb_model.to_json()
with open('Models/img_emb_model.json', 'w') as json_file:
    json_file.write(model_json)
emb_model.save_weights('Models/img_emb_model.h5') 