In [0]:
# Colab-only setup: mount Google Drive under /content/gdrive so that the
# "gdrive/My Drive/..." paths used elsewhere in this notebook resolve.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
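# One-time setup: uncomment these lines to extract the image archives from Drive
# into the working directory (creates the denoised_* folders read below).
#!unzip "gdrive/My Drive/Colab Notebooks/Patrones/denoised_testing.zip"
#!unzip "gdrive/My Drive/Colab Notebooks/Patrones/denoised_training.zip"
#!unzip "gdrive/My Drive/Colab Notebooks/Patrones/denoised_val.zip" # validation split: 10% of training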

In [0]:
import numpy as np
import os
from keras.utils import Sequence, multi_gpu_model
from keras.layers import Input, Dense, LeakyReLU, Concatenate, Lambda, BatchNormalization, GlobalAveragePooling2D
from keras.applications.xception import Xception, preprocess_input
from keras.applications.vgg19 import VGG19
from keras import backend as K
from keras.models import Model, load_model
from keras.preprocessing import image
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint

import matplotlib.pyplot as plt
%matplotlib inline

In [0]:
def triplet_loss(y_true, y_pred, cosine = True, alpha = 0.2):
    """Triplet margin loss over concatenated (anchor, positive, negative) embeddings.

    Args:
        y_true: Unused. Present only because Keras losses receive
            ``(y_true, y_pred)``.
        y_pred: 2-D tensor whose last axis is laid out as
            ``[anchor | positive | negative]``; the embedding size is taken
            as one third (integer division) of that axis.
        cosine: If True, the distance is ``1 - dot(a, b)``, which is the
            cosine distance only when the embeddings are L2-normalised.
            If False, the Euclidean distance is used.
        alpha: Margin required between positive and negative distances.

    Returns:
        Per-sample tensor ``max(0, d(a, p) - d(a, n) + alpha)``.
    """
    embedding_size = K.int_shape(y_pred)[-1] // 3
    ind = 2 * embedding_size
    a_pred = y_pred[:, :embedding_size]
    p_pred = y_pred[:, embedding_size:ind]
    n_pred = y_pred[:, ind:]
    if cosine:
        positive_distance = 1 - K.sum((a_pred * p_pred), axis=-1)
        negative_distance = 1 - K.sum((a_pred * n_pred), axis=-1)
    else:
        # Clamp the squared distance away from zero before the square root:
        # d/dx sqrt(x) is unbounded at x == 0, so identical embeddings would
        # otherwise yield NaN gradients.
        eps = K.epsilon()
        positive_distance = K.sqrt(
            K.maximum(K.sum(K.square(a_pred - p_pred), axis=-1), eps))
        negative_distance = K.sqrt(
            K.maximum(K.sum(K.square(a_pred - n_pred), axis=-1), eps))
    loss = K.maximum(0.0, positive_distance - negative_distance + alpha)
    return loss
  
class TripletImageLoader(Sequence):
    """Keras ``Sequence`` yielding (anchor, positive, negative) image batches.

    The sorted directory listing is consumed two files at a time: the first
    file of each pair is stored as the "young" image and the second as the
    "old" image (this assumes file naming keeps each pair adjacent after
    sorting -- TODO confirm against the dataset). For every pair in a batch,
    ``examples`` triplets are produced: anchor = young image, positive = its
    paired old image, negative = the old image of another pair.

    Pixel values are returned as produced by ``img_to_array``; no rescaling
    or model-specific preprocessing is applied here.

    Args:
        path: Directory containing the image files.
        img_shape: Spatial size of the images, e.g. ``(128, 128)``.
        batchSize: Number of pairs drawn per batch (each batch holds
            ``examples * batchSize`` triplets, fewer for the last batch).
        flip: If True, each loaded image is mirrored horizontally with
            probability 0.5.
        examples: Number of triplets generated per pair.
    """

    def __init__(self, path, img_shape, batchSize = 16, flip=False, examples=2):
        self.path = path
        self.batchSize = batchSize
        self.images = self.load_dataset(path)
        self.N = len(self.images[0])
        self.shape = img_shape
        self.flip = flip
        self.examples = examples

    def load_dataset(self, path):
        """Return ``[young_paths, old_paths]`` as two aligned numpy arrays.

        A trailing unpaired file (odd number of entries) is ignored.
        """
        files = sorted(os.listdir(path))
        n_pairs = len(files) // 2
        young = np.asarray([os.path.join(path, f) for f in files[0:2 * n_pairs:2]])
        old = np.asarray([os.path.join(path, f) for f in files[1:2 * n_pairs:2]])
        return [young, old]

    def load_image(self, file):
        """Load one image as an array, resized to ``self.shape`` if needed."""
        # target_size keeps every image compatible with the preallocated batch
        # arrays; images that already have the right size are unaffected.
        img = image.load_img(file, target_size=tuple(self.shape))
        img = image.img_to_array(img)
        if self.flip and np.random.randint(0, 2):  # flip in 50% of the cases
            img = img[:, ::-1, :]
        return img

    # gets the number of batches this generator returns
    def __len__(self):
        return (self.N + self.batchSize - 1) // self.batchSize

    # shuffles data on epoch end
    def on_epoch_end(self):
        # One shared permutation keeps the young/old arrays aligned.
        order = np.random.permutation(self.N)
        self.images[0] = self.images[0][order]
        self.images[1] = self.images[1][order]

    def _negative_index(self, start, k, j, size):
        """Dataset index of the j-th negative for the k-th pair of a batch.

        Negatives come from the other pairs of the same batch. The offset is
        wrapped into ``[1, size - 1]`` so it can never land on the pair's own
        positive, even when the batch holds fewer pairs than ``examples``.
        A single-pair batch borrows a negative from the rest of the dataset.
        """
        if size > 1:
            return start + (k + 1 + j % (size - 1)) % size
        if self.N > 1:
            return (start + 1 + j % (self.N - 1)) % self.N
        return start  # only one pair exists: no distinct negative is available

    # gets a batch with index = i
    def __getitem__(self, i):
        """Return ``([ANCHOR, POSITIVE, NEGATIVE], dummy_labels)`` for batch ``i``."""
        import warnings

        n = self.examples
        start = i * self.batchSize
        stop = min((i + 1) * self.batchSize, self.N)  # clip stop index to be <= N
        size = stop - start

        # Memory preallocation
        batch_shape = (n * size,) + tuple(self.shape) + (3,)
        ANCHOR = np.zeros(batch_shape)
        POSITIVE = np.zeros(batch_shape)
        NEGATIVE = np.zeros(batch_shape)

        young, old = self.images[0], self.images[1]
        for k in range(size):
            for j in range(n):
                row = n * k + j
                neg = self._negative_index(start, k, j, size)
                try:
                    # Images are reloaded for every triplet so that random
                    # flipping is drawn independently each time.
                    ANCHOR[row] = self.load_image(young[start + k])
                    POSITIVE[row] = self.load_image(old[start + k])
                    NEGATIVE[row] = self.load_image(old[neg])
                except (OSError, ValueError) as err:
                    # Best effort: keep the batch usable, but blank the whole
                    # triplet and report the problem instead of hiding it.
                    ANCHOR[row] = 0
                    POSITIVE[row] = 0
                    NEGATIVE[row] = 0
                    warnings.warn(
                        'TripletImageLoader: could not load triplet for {!r}: {}'.format(
                            str(young[start + k]), err))

        # The loss ignores the labels, so a zero-filled dummy vector is returned.
        return [ANCHOR, POSITIVE, NEGATIVE], np.zeros(n * size)

In [0]:
# Passed as `batchSize` to the TripletImageLoader generators.
BATCH_SIZE = 8
# Spatial size of the face images; a channel axis of 3 is appended where the
# model input shape is built.
FACE_DEFAULT_SHAPE = (128, 128)

In [0]:
# Create base model (convolution features extractor).
# weights=None builds Xception without loading pretrained weights, and
# include_top=False drops its classification head.
xception = Xception(include_top=False, weights=None, input_shape = FACE_DEFAULT_SHAPE + (3,))
# Global average pooling turns the final feature map into one vector per image.
output = GlobalAveragePooling2D()(xception.output)
base_model = Model(xception.input, output)

def embedder(conv_feat_size):
    '''
    Build the embedding head that sits on top of the conv feature extractor.

    A feature vector of length `conv_feat_size` goes through
    Dense(512) -> LeakyReLU(0.1) -> Dense(128) and is then L2-normalised
    along its last axis, so the returned model emits unit-length
    128-dimensional embeddings.
    '''
    features = Input((conv_feat_size,), name = 'input')
    hidden = Dense(512)(features)
    hidden = LeakyReLU(alpha=0.1)(hidden)
    projected = Dense(128)(hidden)
    unit_length = Lambda(lambda t: K.l2_normalize(t, axis=-1), name='normalize')(projected)
    return Model(features, unit_length)
    
def get_siamese_model(base_model):
    '''
    Wrap `base_model` into a three-branch (anchor / positive / negative) network.

    All branches share the same `base_model` and the same embedding head, so
    the weights are tied. The three embeddings are concatenated along the last
    axis in the order anchor, positive, negative.
    '''
    image_shape = K.int_shape(base_model.input)[1:]
    feature_dim = K.int_shape(base_model.output)[-1]

    branch_inputs = [Input(image_shape, name=role)
                     for role in ('anchor', 'positive', 'negative')]
    head = embedder(feature_dim)
    branch_embeddings = [head(base_model(tensor)) for tensor in branch_inputs]

    joined = Concatenate(axis=-1)(branch_embeddings)
    return Model(inputs=branch_inputs, outputs=joined)

model = get_siamese_model(base_model)
# model.load_weights('siamese_xception.h5')

# PARALLEL is not assigned anywhere in this notebook, so a fresh
# "Restart & Run All" would stop here with a NameError. Default to
# single-device execution, but respect a value set by an earlier cell.
PARALLEL = globals().get('PARALLEL', False)
if PARALLEL:
    # NOTE(review): the compile/fit calls in this notebook use `model`, not
    # `parallel_model`; switch them over if multi-GPU training is intended.
    parallel_model = multi_gpu_model(model, 2)

W0617 16:48:47.934749 140075677566848 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:74: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead.

W0617 16:48:47.953429 140075677566848 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:517: The name tf.placeholder is deprecated. Please use tf.compat.v1.placeholder instead.

W0617 16:48:47.956917 140075677566848 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:4138: The name tf.random_uniform is deprecated. Please use tf.random.uniform instead.

W0617 16:48:47.978048 140075677566848 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:174: The name tf.get_default_session is deprecated. Please use tf.compat.v1.get_default_session instead.

W0617 16:48:47.979080 1400756775

In [0]:
# Triplet generators over the extracted training / validation folders
# (loader defaults apply: flip=False, examples=2).
train_gen = TripletImageLoader('denoised_training', FACE_DEFAULT_SHAPE, batchSize = BATCH_SIZE)
valid_gen = TripletImageLoader('denoised_val', FACE_DEFAULT_SHAPE, batchSize = BATCH_SIZE)

# triplet_loss only reads the concatenated embeddings in y_pred; the labels
# emitted by the generators are placeholders.
model.compile(Adam(lr = 0.0001), loss = triplet_loss)

# Keep only the weights of the epoch with the lowest validation loss.
checkpoint = ModelCheckpoint('siamese_xception.h5', monitor='val_loss', 
                             verbose=1, save_best_only=True, save_weights_only=True)


# One pass over every batch of each generator per epoch.
model.fit_generator(train_gen, steps_per_epoch=len(train_gen), 
                    epochs=5, validation_data=valid_gen, validation_steps=len(valid_gen), callbacks=[checkpoint])
# Load best model (the in-memory weights are those of the last epoch, which
# is not necessarily the best one)
model.load_weights('siamese_xception.h5')

W0617 16:49:00.848264 140075677566848 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/optimizers.py:790: The name tf.train.Optimizer is deprecated. Please use tf.compat.v1.train.Optimizer instead.

W0617 16:49:01.011344 140075677566848 deprecation.py:323] From /usr/local/lib/python3.6/dist-packages/tensorflow/python/ops/math_grad.py:1250: add_dispatch_support.<locals>.wrapper (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


Epoch 1/5

Epoch 00001: val_loss improved from inf to 0.18211, saving model to siamese_xception.h5
Epoch 2/5

Epoch 00002: val_loss improved from 0.18211 to 0.13748, saving model to siamese_xception.h5
Epoch 3/5

Epoch 00003: val_loss improved from 0.13748 to 0.13580, saving model to siamese_xception.h5
Epoch 4/5

Epoch 00004: val_loss did not improve from 0.13580
Epoch 5/5

Epoch 00005: val_loss improved from 0.13580 to 0.12717, saving model to siamese_xception.h5


In [0]:
# Restore the weights saved under the checkpoint file name used during training.
model.load_weights('siamese_xception.h5')

# First of the model's three inputs (the anchor branch); reused as the single
# input of the inference model.
inp = model.input[0]
# NOTE(review): positional lookup -- presumably model.layers lists the three
# Input layers first, then the shared feature extractor, then the embedding
# head. Confirm with model.summary() if the architecture changes.
base_model = model.layers[3]
emb_model = model.layers[4]

In [0]:
# Single-branch model for inference: image -> base_model features -> embedding.
infer_model = Model(inp, emb_model(base_model(inp)))

In [0]:
def get_embeddings(model, path):
  """Embed every (young, old) image pair found in `path`.

  The sorted directory listing is read two files at a time; the first file of
  each pair is treated as the young image and the second as the old one. A
  trailing unpaired file is ignored. Each image is passed through
  `model.predict` on its own, as a batch of one.

  Returns:
    Tuple `(young, old)` of arrays holding one embedding per pair, in
    listing order.
  """
  def embed(filename):
    # Load one file and run it through the model as a batch of one.
    pixels = image.img_to_array(image.load_img(os.path.join(path, filename)))
    return model.predict(pixels[None])[0]

  filenames = sorted(os.listdir(path))
  young_embeddings, old_embeddings = [], []
  # zip() stops at the shorter slice, which drops a trailing unpaired file.
  for young_name, old_name in zip(filenames[0::2], filenames[1::2]):
    young_embeddings.append(embed(young_name))
    old_embeddings.append(embed(old_name))
  return np.asarray(young_embeddings), np.asarray(old_embeddings)

In [0]:
# Compute the embeddings of every split with the single-branch inference model
# and save them to Drive; the *_y arrays hold the young images, *_o the old ones.
train_y, train_o = get_embeddings(infer_model, 'denoised_training')
np.save("gdrive/My Drive/Colab Notebooks/Patrones/train_y.npy", train_y)
np.save("gdrive/My Drive/Colab Notebooks/Patrones/train_o.npy", train_o)

# Validation split
val_y, val_o = get_embeddings(infer_model, 'denoised_val')
np.save("gdrive/My Drive/Colab Notebooks/Patrones/val_y.npy", val_y)
np.save("gdrive/My Drive/Colab Notebooks/Patrones/val_o.npy", val_o)

# Test split
test_y, test_o = get_embeddings(infer_model, 'denoised_testing')
np.save("gdrive/My Drive/Colab Notebooks/Patrones/test_y.npy", test_y)
np.save("gdrive/My Drive/Colab Notebooks/Patrones/test_o.npy", test_o)