In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
%cd drive/MyDrive/final_proj

/content/drive/.shortcut-targets-by-id/1YMOPkl5pAMR5Y440QTaZiAbd1YXrDEUb/final_proj


In [None]:
%tensorflow_version 2.x
import tensorflow as tf
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

Found GPU at: /device:GPU:0


In [None]:
import os
import pickle
import numpy as np
import numpy.random as rng
from tensorflow.keras import layers, models
from keras import backend as K
from keras.regularizers import l2
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.models import Model

In [None]:
train_in = open("mini-imagenet-cache-train.pkl", "rb")
train = pickle.load(train_in)
ungrouped_Xtrain = train["image_data"]
val_in = open("mini-imagenet-cache-val.pkl", "rb")
val = pickle.load(val_in)
ungrouped_Xval = val["image_data"]

In [None]:
train_mean = ungrouped_Xtrain.mean(axis=(0,1,2)) 
train_std = ungrouped_Xtrain.std(axis=(0,1,2))

ungrouped_Xtrain = ungrouped_Xtrain.astype('float32')
ungrouped_Xval = ungrouped_Xval.astype('float32')

In [None]:
ungrouped_Xtrain -= train_mean
ungrouped_Xtrain /= train_std
ungrouped_Xval -= train_mean
ungrouped_Xval /= train_std

In [None]:
train_data = ungrouped_Xtrain.reshape([64, 600, 84, 84, 3])
val_data = ungrouped_Xval.reshape([16, 600, 84, 84, 3])
train_data.shape

(64, 600, 84, 84, 3)

In [None]:
# returns three lists: anchor, positive, negative
def get_triple_batch(batch_size, train_data, input_shape):
    """
    Create three lists of anchor images, positive images, negative images
    """
    n_classes, n_examples, w, h, d = train_data.shape
    new_w, new_h, new_d = input_shape
    

    rng = np.random.default_rng()

    # randomly sample several classes to use in the batch
    categories = rng.choice(n_classes,size=(batch_size,),replace=False)
    
    # initialize 2 empty arrays for the input image batch
    anchor = np.zeros((batch_size, new_w, new_h, new_d))

    positive = np.zeros((batch_size, new_w, new_h, new_d))

    negative = np.zeros((batch_size, new_w, new_h, new_d))

    
    for i in range(batch_size):
        category = categories[i]
        idx_1 = np.random.randint(0, n_examples)
        anchor[i,:,:,:] = tf.image.resize(train_data[category, idx_1].reshape(w, h, d), (224, 224)).numpy()
        
        idx_pos = np.random.randint(0, n_examples)
        cat_pos = category
        positive[i,:,:,:] = tf.image.resize(train_data[cat_pos,idx_pos].reshape(w, h, d), (224, 224)).numpy()

        idx_neg = np.random.randint(0, n_examples)
        cat_neg = (category + np.random.randint(1,n_classes)) % n_classes
        negative[i,:,:,:] = tf.image.resize(train_data[cat_neg,idx_neg].reshape(w, h, d), (224, 224)).numpy()

    return anchor, positive, negative


def data_generator(data, batch_size, input_shape):
    while True:
        anchor_input, pos_input, neg_input = get_triple_batch(batch_size, data, input_shape)
        y = np.ones((batch_size))
        yield [pos_input,neg_input, anchor_input], y

In [None]:
def fn_loss(margin=1):
    """Provides 'triplet loss' an enclosing scope with variable 'margin'.

  Arguments:
      margin: Integer, defines the baseline for distance for which pairs
              should be classified as dissimilar. - (default is 1).

  Returns:
      'triplet_loss' function with data ('margin') attached.
  """
    # L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    def triplet_loss(y_true, y_pred):
        anchor, positive, negative = y_pred[:,:emb_size], y_pred[:,emb_size:2*emb_size], y_pred[:,2*emb_size:]
        positive_dist = tf.reduce_sum(tf.square(anchor - positive), axis=1)
        negative_dist = tf.reduce_sum(tf.square(anchor - negative), axis=1)
      
        return tf.reduce_mean(tf.maximum(positive_dist - negative_dist + margin, 0.))
    

    return triplet_loss

In [None]:
def L2_Norm(vectors):
    # unpack the vectors into separate lists
    (featsA, featsB) = vectors
    # compute the sum of squared distances between the vectors
    sumSquared = K.sum(K.square(featsA - featsB), axis=1,
      keepdims=True)
    # return the euclidean distance between the vectors
    return K.sqrt(K.maximum(sumSquared, K.epsilon()))

In [None]:
input_shape = (224, 224, 3)
input_p = layers.Input(input_shape)
input_a = layers.Input(input_shape)
input_n = layers.Input(input_shape)

model = tf.keras.applications.MobileNetV2(input_shape=input_shape,
                                          include_top=False,
                                          weights='imagenet')
model.trainable = True

encoded_p = model(input_p)
encoded_p = layers.GlobalAveragePooling2D()(encoded_p)
encoded_a = model(input_a)
encoded_a = layers.GlobalAveragePooling2D()(encoded_a)
encoded_n = model(input_n)
encoded_n = layers.GlobalAveragePooling2D()(encoded_n)

embedded_layers = tf.keras.layers.concatenate([encoded_a, encoded_p, encoded_n], axis=1)

model = Model(inputs=[input_p, input_n, input_a],outputs=embedded_layers)

In [None]:
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 input_3 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                            

In [None]:
# Hyper params
lr = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=1e-5,
    decay_steps=1000,
    decay_rate=0.98)
momentum = 0.9
margin = 1

In [None]:
# Hyper parameters
evaluate_every = 20 
# download_every = 1000
batch_size = 32
n_iter = 400 # No. of training iterations

# used for one batching testing
N_way = 10 # how many classes for testing one-shot tasks. has to be less than num classes in dataset
n_val = 250 # how many one-shot tasks to validate on

# used for straight validation testing
val_batch_size = 16

best = -1
epochs = 10
steps_per_epoch = 100
emb_size = 1280

In [None]:
optimizer = RMSprop(learning_rate = lr, momentum=momentum)
model.compile(loss=fn_loss(margin=margin),optimizer=optimizer)

In [None]:
history = model.fit(
    data_generator(train_data, batch_size, (224, 224, 3)),
    steps_per_epoch=steps_per_epoch,
    validation_data = data_generator(val_data, val_batch_size, (224, 224, 3)),
    validation_steps = 10,
    epochs=epochs, verbose=True)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
model_path = './weights/'
model_name = "mobilenet_pretrained_triplet_2"

In [None]:
model.save(os.path.join(model_path, 'weights_{}.h5'.format(model_name)))

  layer_config = serialize_layer_fn(layer)
