In [None]:
%matplotlib notebook

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

my_seed = 1508

import numpy as np
np.random.seed(my_seed)
import random
random.seed(my_seed)
import tensorflow as tf
tf.random.set_seed(my_seed)

from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.layers import Dense, Flatten, Input, Dropout, Activation, Conv2D, MaxPool2D
from keras.layers.normalization import BatchNormalization
from keras.models import Sequential, Model, load_model
from keras.initializers import he_uniform, he_normal
from keras.applications.resnet50 import preprocess_input as preprocess_input_resnet
from keras.applications.resnet50 import ResNet50
from keras.optimizers import Adam, RMSprop

import matplotlib.pyplot as plt
from PIL import Image
from tqdm import tqdm
import pickle as pkl

# Import Triplet Data

In [None]:
# parse a triplet file and return an array with one row of image indices per line
def process_triplets(tuple_file_path):
    """Read a whitespace-separated file of integer indices into an array.

    Args:
        tuple_file_path: path to a text file holding one triplet per line,
            the indices separated by whitespace.

    Returns:
        np.ndarray built from the list of per-line index lists
        (shape ``(n_lines, 3)`` when every line holds three indices).

    Raises:
        ValueError: if a token on a line cannot be parsed as an integer.
    """
    triplet_tuples = []
    with open(tuple_file_path) as fp:
        for line in fp:
            # split() without an argument strips the newline and tolerates
            # repeated spaces, tabs and trailing blanks; split(' ') produced
            # empty tokens in those cases, which int() rejects.
            fields = line.split()
            if not fields:
                # blank line, e.g. an extra newline at the end of the file
                continue
            triplet_tuples.append([int(index) for index in fields])
    return np.array(triplet_tuples)


def train_val_split_images(triplets, train_frac=0.9, seed=11):
    """Randomly split the distinct image ids found in ``triplets``.

    The ids are shuffled with a private ``RandomState`` so the split is
    reproducible without re-seeding NumPy's global generator (which the rest
    of the notebook seeds once at the top).

    Args:
        triplets: array of image ids; only the set of distinct values is used.
        train_frac: fraction of the distinct ids assigned to the first split.
        seed: seed of the private generator used for the shuffle.

    Returns:
        (train_images, val_images): two 1-D arrays that together contain every
        distinct id exactly once.
    """
    unique_images = np.unique(triplets)
    # A RandomState seeded with `seed` yields the same permutation as seeding
    # the global generator with it, but leaves the global state untouched.
    rng = np.random.RandomState(seed)
    shuffled_ids = unique_images[rng.permutation(len(unique_images))]
    n_train = round(len(shuffled_ids) * train_frac)

    return shuffled_ids[:n_train], shuffled_ids[n_train:]


def train_val_split_triplets(triplets, train_images, val_images):
    """Partition triplets by which image split all of their members belong to.

    A triplet goes to the training set when every image id in it is in
    ``train_images``; otherwise it goes to the validation set when every id is
    in ``val_images``. Triplets that mix both splits are dropped.

    Args:
        triplets: 2-D array with one triplet of image ids per row.
        train_images: ids of the training images.
        val_images: ids of the validation images.

    Returns:
        (train_triplets, val_triplets): the selected rows, in their original order.
    """
    triplets = np.asarray(triplets)
    if triplets.size == 0:
        # nothing to classify; also covers a 1-D empty array, which has no axis 1
        return triplets[:0], triplets[:0]

    # One vectorised membership test per split instead of a Python-level
    # `img in array` scan for every id of every triplet.
    all_in_train = np.isin(triplets, train_images).all(axis=1)
    # training membership takes precedence, mirroring an if / elif chain
    all_in_val = np.isin(triplets, val_images).all(axis=1) & ~all_in_train

    return triplets[all_in_train], triplets[all_in_val]

In [None]:
# Parse the two triplet files shipped next to the notebook.
train_triplets_full = process_triplets('train_triplets.txt')
print(train_triplets_full.shape)

# Triplets read from 'test_triplets.txt'; these are the ones fed to gen_output later on.
output_triplets = process_triplets('test_triplets.txt')
print(output_triplets.shape)

# Split the image ids first, then keep only the triplets whose images all fall
# on the same side of that split (mixed triplets are discarded by the helper).
# NOTE(review): `text_idx` holds the held-out image ids; the name looks like a
# typo for `test_idx` / `val_idx` — confirm before renaming.
train_idx, text_idx = train_val_split_images(train_triplets_full)
train_triplets, test_triplets = train_val_split_triplets(train_triplets_full, train_idx, text_idx)

print(train_triplets.shape, test_triplets.shape)

# Import Image Data

In [None]:
food_path = 'food'
img_H = 209
img_W = 300
img_C = 3

# read all images into memory, resize them and apply the ResNet50 preprocessing
def food_proc(flip=False):
    """Load every file in ``food_path`` into one preprocessed array.

    The directory is listed once and visited in sorted filename order, so row
    ``i`` of the result refers to the same file on every run; ``os.listdir``
    on its own returns entries in arbitrary order.
    NOTE(review): assumes the sorted filename order matches the image indices
    used in the triplet files — confirm against the dataset layout.

    Args:
        flip: when true, each image is mirrored left-to-right before
            preprocessing.

    Returns:
        Array of shape ``(n_files, img_H, img_W, img_C)`` with the
        preprocessed images.
    """
    image_files = sorted(os.listdir(food_path))
    food = np.zeros((len(image_files), img_H, img_W, img_C))
    # wrapping the list (not the enumerate object) lets tqdm know the total
    for index, img in enumerate(tqdm(image_files)):

        # load an image from file, resized to the common target size
        image = load_img(os.path.join(food_path, img), target_size=(img_H, img_W))
        # convert the image pixels to a numpy array
        image = img_to_array(image)
        # add a leading batch axis of size 1
        image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
        # flip image left to right (horizontally)
        if flip:
            # NOTE(review): the reseed is kept so the global TF seed state is
            # left exactly as before; the flip itself does not look random —
            # confirm whether this call can be dropped.
            tf.random.set_seed(my_seed)
            image = tf.image.flip_left_right(image)
        # apply the preprocessing that belongs to the ResNet50 application
        image = preprocess_input_resnet(image)

        food[index] = image

    print(food.shape)

    return food

In [None]:
# Process the (unflipped) food images into one in-memory array.
# The commented-out line below would cache that array as a pickle file.
food = food_proc(flip=False)
#with open('resnet_food_'+str(img_H)+'_'+str(img_W)+'.pkl','wb') as f: pkl.dump(food, f)

In [None]:
# process flipped food images
#food_flip = food_proc(flip=True)
#with open('resnet_food_flip_'+str(img_H)+'_'+str(img_W)+'.pkl','wb') as f: pkl.dump(food_flip, f)

In [None]:
# # load and test food images
#with open('resnet_food_'+str(img_H)+'_'+str(img_W)+'.pkl','rb') as f: food = pkl.load(f)
#print(food.shape)

#plt.figure(figsize=(6, 2))
#plt.imshow((food[9999]).astype(np.int32))

In [None]:
# load and test flipped food images
#with open('resnet_food_flip_'+str(img_H)+'_'+str(img_W)+'.pkl','rb') as f: food_flip = pkl.load(f)
#print(food_flip.shape)

#plt.figure(figsize=(6, 2))
#plt.imshow((food_flip[9999]*255.).astype(np.int32))

# Pretrained Embedding Model

In [None]:
# Build ResNet50 without its classification head (include_top=False) for the
# chosen input size.
mdl = ResNet50(include_top=False, input_shape=(img_H, img_W, img_C))

# Re-wrap it as a Model whose output is the output of the last remaining layer.
pretrained_model = Model(inputs=mdl.inputs, outputs=mdl.layers[-1].output)

# Output shape of that last layer; entries 1..3 are reused throughout the
# notebook as the shape of one image embedding.
m_shape = pretrained_model.layers[-1].output_shape

# pretrained_model.summary()
print(m_shape)

In [None]:
# get embeddings from the pretrained model for the food images

def embed_food(food, batch_size=1):
    """Run ``pretrained_model`` over ``food`` and collect its outputs.

    Args:
        food: array of preprocessed images, one per row.
        batch_size: number of images handed to each ``predict`` call. The
            default of 1 sends one image per call; larger values make fewer
            calls.

    Returns:
        Array of shape ``(food.shape[0], m_shape[1], m_shape[2], m_shape[3])``
        holding the model output for every image.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')
    print(food.shape)
    food_emb = np.zeros((food.shape[0], m_shape[1], m_shape[2], m_shape[3]))
    for start in tqdm(range(0, food.shape[0], batch_size)):
        stop = start + batch_size
        # slicing past the end is safe: the last chunk is simply shorter
        food_emb[start:stop] = pretrained_model.predict(food[start:stop])
    return food_emb

In [None]:
# Embed the food images with the pretrained model.
# The commented-out line below would cache the embeddings as a pickle file.
food_emb = embed_food(food)
#with open('resnet_food_emb_'+str(img_H)+'_'+str(img_W)+'.pkl','wb') as f: pkl.dump(food_emb, f)

In [None]:
# embed flip food images
#food_flip_emb = embed_food(food_flip)
#with open('resnet_food_flip_emb_'+str(img_H)+'_'+str(img_W)+'.pkl','wb') as f: pkl.dump(food_flip_emb, f)

In [None]:
# load and test food embeddings
#with open('resnet_food_emb_'+str(img_H)+'_'+str(img_W)+'.pkl','rb') as f: food_emb = pkl.load(f)
#print(food_emb.shape)

In [None]:
# load and test flipped food embeddings
#with open('resnet_food_flip_emb_'+str(img_H)+'_'+str(img_W)+'.pkl','rb') as f: food_flip_emb = pkl.load(f)
#print(food_flip_emb.shape)

# Functions

In [None]:
# look up the stored embeddings of the three images named by a triplet
def get_triplet_images(triplet_indices):
    """Return the ``food_emb`` entries for the first three indices, in order."""
    return [food_emb[triplet_indices[position]] for position in range(3)]

def get_triplet_images_flip(triplet_indices):
    """Return the flipped-image embeddings for the first three indices.

    Reads the module-level ``food_flip_emb`` array.

    Raises:
        NameError: if ``food_flip_emb`` has not been created yet; the message
            says how to create it.
    """
    # TODO: food_flip_emb is only produced by the commented-out flip cells;
    # enable them (or load the cached pickle) before using flip=True.
    try:
        flipped_emb = food_flip_emb
    except NameError:
        raise NameError(
            "food_flip_emb is not defined: run embed_food on the flipped images "
            "(or load the cached flipped embeddings) before requesting flipped triplets"
        ) from None
    return [flipped_emb[triplet_indices[0]], flipped_emb[triplet_indices[1]], flipped_emb[triplet_indices[2]]]

In [None]:
# Smoke test: fetch the embeddings of the first training triplet and show the
# shape of its second element.
sample = get_triplet_images(train_triplets[0])
sample[1].shape

# Triplet Batches

In [None]:
def create_batch(batch_size, triplet_list):
    """Draw a fixed pseudo-random sample of triplets and stack their embeddings.

    The sample is drawn without replacement from a private generator seeded
    with ``my_seed``, so every call with the same arguments returns the same
    batch and the global ``random`` state is left untouched.

    Args:
        batch_size: number of triplets to draw.
        triplet_list: indexable collection of triplets of image indices.

    Returns:
        ``[x_anchors, x_positives, x_negatives]``, each of shape
        ``(batch_size, m_shape[1], m_shape[2], m_shape[3])``.

    Raises:
        ValueError: if ``batch_size`` exceeds ``len(triplet_list)``
            (raised by ``sample``).
    """
    # corresponding zero arrays
    emb_shape = (batch_size, m_shape[1], m_shape[2], m_shape[3])
    x_anchors = np.zeros(emb_shape)
    x_positives = np.zeros(emb_shape)
    x_negatives = np.zeros(emb_shape)

    # random.Random(seed).sample gives the same draw as seeding the module-level
    # generator and sampling from it, without resetting that shared generator.
    rng = random.Random(my_seed)
    rand_list = rng.sample(range(0, len(triplet_list)), batch_size)

    for i, random_index in enumerate(rand_list):

        triplet = get_triplet_images(triplet_list[random_index])

        x_anchors[i] = triplet[0]
        x_positives[i] = triplet[1]
        x_negatives[i] = triplet[2]

    return [x_anchors, x_positives, x_negatives]

In [None]:
# Smoke test: build a batch of one training triplet and show the shape of the
# anchor array. Note this rebinds `sample` from the earlier cell.
sample = create_batch(1, train_triplets)
sample[0].shape

# Data Generator

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that serves batches of triplet embeddings.

    Every item is ``(X, y)`` where ``X = [anchors, positives, negatives]`` and
    ``y`` is an all-zero array with ``3 * emb_size`` columns and one row per
    row of ``X``. ``y`` is only a placeholder target: the notebook's triplet
    loss never reads ``y_true``.

    Args:
        triplet_list: 2-D array with one triplet of image indices per row.
        batch_size: number of triplets per batch.
        shuffle: reshuffle the triplet order at the end of every epoch.
        flip: when true, every triplet contributes two rows to the batch, the
            plain embeddings followed by the flipped-image embeddings.
    """

    def __init__(self, triplet_list, batch_size, shuffle=False, flip=False):
        self.triplet_list = triplet_list
        self.indices = np.arange(0, self.triplet_list.shape[0])
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.flip = flip
        self.on_epoch_end()

    def __len__(self):
        """Number of batches per epoch (the last batch may be shorter)."""
        n_triplets = self.triplet_list.shape[0]
        # integer ceiling division
        return (n_triplets + self.batch_size - 1) // self.batch_size

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(X, y)``."""
        batch = self.indices[index * self.batch_size : (index + 1) * self.batch_size]

        if self.flip:
            return self.__get_flip_data(batch)
        return self.__get_data(batch)

    def on_epoch_end(self):
        """Reset the triplet order and reshuffle it when shuffling is enabled."""
        self.indices = np.arange(0, self.triplet_list.shape[0])
        if self.shuffle:
            np.random.shuffle(self.indices)

    def _alloc_inputs(self, n_rows):
        """Return three zero arrays (anchors, positives, negatives) with ``n_rows`` rows."""
        shape = (n_rows, m_shape[1], m_shape[2], m_shape[3])
        return [np.zeros(shape) for _ in range(3)]

    def _fill_rows(self, X, rows, btc, fetch):
        """Write ``fetch(triplet)`` for each entry of ``btc`` into the given rows of ``X``."""
        for row, triplet_idx in zip(rows, btc):
            triplet = fetch(self.triplet_list[triplet_idx])
            X[0][row] = triplet[0]
            X[1][row] = triplet[1]
            X[2][row] = triplet[2]

    def __get_data(self, btc):
        """Build ``(X, y)`` with one row per triplet index in ``btc``."""
        n_rows = len(btc)
        X = self._alloc_inputs(n_rows)
        self._fill_rows(X, range(n_rows), btc, get_triplet_images)
        y = np.zeros((n_rows, 3*emb_size))

        return X, y

    def __get_flip_data(self, btc):
        """Build ``(X, y)`` with two interleaved rows per triplet index in ``btc``.

        Even rows hold the plain embeddings, odd rows the flipped-image ones.
        """
        n_rows = len(btc) * 2
        X = self._alloc_inputs(n_rows)
        self._fill_rows(X, range(0, n_rows, 2), btc, get_triplet_images)
        self._fill_rows(X, range(1, n_rows, 2), btc, get_triplet_images_flip)
        # y must have as many rows as X; it previously had only len(btc) rows,
        # half the number of samples in the batch
        y = np.zeros((n_rows, 3*emb_size))

        return X, y

# Trainable Embedding Model

In [None]:
emb_size = 64 
drop_val = 0.8

def gen_emb_model(emb_seed=my_seed):
    """Build the trainable head that maps a pretrained feature map to an embedding.

    Layers, in order: input of shape ``m_shape[1:4]``, batch normalisation,
    2x2 max pooling with stride 2, dropout with rate ``drop_val``, flatten,
    and a sigmoid dense layer of ``emb_size`` units initialised with
    ``he_normal``.

    Args:
        emb_seed: seed passed to both the dropout layer and the dense
            kernel initialiser.

    Returns:
        The assembled ``Sequential`` model (not compiled).
    """
    feature_shape = (m_shape[1], m_shape[2], m_shape[3],)

    # layers are created in the same order in which they are stacked
    layer_stack = [
        Input(shape=feature_shape),
        BatchNormalization(),
        MaxPool2D(pool_size=(2,2),strides=(2,2)),
        Dropout(drop_val, seed=emb_seed),
        Flatten(),
        Dense(emb_size, activation='sigmoid', kernel_initializer = he_normal(seed=emb_seed)),
    ]

    embedding_model = Sequential()
    for layer in layer_stack:
        embedding_model.add(layer)

    return embedding_model

In [None]:
# Smoke test for the embedding head: build it and print its layer summary.
embedding_model = gen_emb_model()
embedding_model.summary()

# Take the first element of the first training triplet, add a batch axis of
# size 1 and push it through the freshly initialised (untrained) head.
example = np.expand_dims(get_triplet_images(train_triplets[0])[0], axis=0)
example_emb = embedding_model.predict(example)[0]

print(example_emb.shape)

# Siamese Network

In [None]:
# Siamese network: three inputs pushed through one shared embedding model

def get_siam_net(emb_seed=my_seed):
    """Assemble the three-branch network used for triplet training.

    One embedding model is created and applied to each of the three inputs
    (anchor, positive, negative), so all branches share the same weights. The
    three embeddings are concatenated along axis 1, ready to be sliced apart
    again by the loss.

    Args:
        emb_seed: seed forwarded to ``gen_emb_model``.

    Returns:
        An uncompiled ``Model`` taking ``[anchor, positive, negative]`` inputs.
    """
    shared_embedder = gen_emb_model(emb_seed=emb_seed)

    feature_shape = (m_shape[1], m_shape[2], m_shape[3],)
    # order matters: anchor, positive, negative
    triplet_inputs = [Input(shape=feature_shape) for _ in range(3)]
    triplet_embeddings = [shared_embedder(branch_input) for branch_input in triplet_inputs]

    merged = tf.keras.layers.concatenate(triplet_embeddings, axis=1)

    return Model(triplet_inputs, merged)


# Triplet Loss

In [None]:
alpha = 0.35

# custom triplet loss function with margin alpha
def triplet_loss(y_true, y_pred):
    """Per-sample triplet loss on concatenated embeddings.

    ``y_pred`` holds ``[anchor | positive | negative]`` side by side, the
    first two being ``emb_size`` columns wide. The loss is
    ``max(d(anchor, positive) - d(anchor, negative) + alpha, 0)`` where ``d``
    is the mean squared difference along axis 1. ``y_true`` is not used.
    """
    anchor = y_pred[:, :emb_size]
    positive = y_pred[:, emb_size:2*emb_size]
    negative = y_pred[:, 2*emb_size:]

    def mean_sq_dist(other):
        # mean squared difference between the anchor and `other`, per row
        return tf.reduce_mean(tf.square(anchor - other), axis=1)

    margin_gap = mean_sq_dist(positive) - mean_sq_dist(negative) + alpha
    # hinge at zero
    return tf.maximum(margin_gap, 0.)

# Model Evaluation

In [None]:
def eval_output(output, model):
    """Classify one concatenated triplet embedding.

    Args:
        output: array of shape ``(1, 3 * width)`` holding
            ``[anchor | positive | negative]`` side by side.
        model: network that produced ``output``; when given, the width of one
            embedding is a third of its last layer's output width. When
            ``None``, the module-level ``emb_size`` is used.

    Returns:
        1 if the anchor is closer (mean squared difference) to the positive
        than to the negative, else 0.
    """
    # A separate local name is required here: assigning to `emb_size` inside
    # the function would make it local and leave it unbound when model is None.
    if model is not None:
        width = int(model.layers[-1].output.shape[1]) // 3
    else:
        width = emb_size
    anchor, positive, negative = output[:,:width], output[:,width:2*width], output[:,2*width:]
    positive_dist = tf.reduce_mean(tf.square(anchor - positive), axis=1)
    negative_dist = tf.reduce_mean(tf.square(anchor - negative), axis=1)
    return (1 if positive_dist < (negative_dist) else 0)

In [None]:
def eval_accuracy_batch(net, triplets, items, model=None, tr_set='', quiet=False):
    """Score ``net`` on a batch of ``items`` triplets drawn by ``create_batch``.

    Args:
        net: network whose ``predict`` returns concatenated triplet embeddings.
        triplets: collection of triplets to sample from.
        items: number of triplets to draw; also the denominator of the score.
        model: forwarded to ``eval_output``.
        tr_set: label printed in front of the score.
        quiet: the score is printed only when this equals ``False``.

    Returns:
        Percentage of sampled triplets for which ``eval_output`` returned 1.
    """
    outputs = net.predict(create_batch(items, triplets))

    # one single-row slice per prediction, as eval_output expects
    hits = sum(
        1
        for row in range(outputs.shape[0])
        if eval_output(outputs[row:row+1], model) == 1
    )

    res = hits/items * 100

    if quiet==False:
        print(tr_set+':',res)

    return res
    

def eval_accuracy(net, triplets, model=None, tr_set='', quiet=False):
    """Score ``net`` on every triplet in ``triplets``.

    Args:
        net: network whose ``predict`` takes ``[anchors, positives, negatives]``
            and returns concatenated triplet embeddings.
        triplets: collection of triplets of image indices.
        model: forwarded to ``eval_output``.
        tr_set: label printed in front of the score.
        quiet: the score is printed only when this equals ``False``.

    Returns:
        Percentage of triplets for which ``eval_output`` returned 1.
    """
    n_triplets = len(triplets)
    emb_shape = (n_triplets, m_shape[1], m_shape[2], m_shape[3])
    x_A = np.zeros(emb_shape)
    x_B = np.zeros(emb_shape)
    x_C = np.zeros(emb_shape)

    # gather the stored embeddings of every triplet into the three input arrays
    for row in range(n_triplets):
        fetched = get_triplet_images(triplets[row])
        x_A[row], x_B[row], x_C[row] = fetched[0], fetched[1], fetched[2]

    outputs = net.predict([x_A, x_B, x_C])

    # one single-row slice per prediction, as eval_output expects
    hits = sum(
        1
        for row in range(outputs.shape[0])
        if eval_output(outputs[row:row+1], model) == 1
    )

    res = hits/n_triplets * 100

    if quiet==False:
        print(tr_set+':', res)

    return res

# Model Training

In [None]:
# Training hyper-parameters for this run.
batch_size = 128
epochs = 2


# Fresh siamese network; the seed here (123) differs from the notebook-wide my_seed.
net = get_siam_net(emb_seed=123)

opt = RMSprop(learning_rate=0.001)
net.compile(loss=triplet_loss, optimizer=opt)

# Train on the training triplets only, served in file order (shuffle=False)
# and without the flipped-image augmentation (flip=False).
_ = net.fit(
    DataGenerator(train_triplets, batch_size=batch_size, shuffle=False, flip=False),
    epochs=epochs, verbose=True
)


# Train and output figures are computed on a 100-triplet sample; the held-out
# figure uses every triplet in test_triplets.
# NOTE(review): the 'Output Acc' line scores the triplets read from
# test_triplets.txt; whether the share of 1-predictions is a meaningful
# accuracy there depends on how that file is ordered — confirm.
eval_accuracy_batch(net, train_triplets, 100, model=net, tr_set=' Train Acc')
eval_accuracy(net, test_triplets, model=net, tr_set='  Test Acc')
eval_accuracy_batch(net, output_triplets, 100, model=net, tr_set='Output Acc')

# Generate Output

In [None]:
def gen_output(net, triplets, out_document, batch_size=128):
    """Predict every triplet in order and write one 0/1 label per line.

    Args:
        net: network whose ``predict`` takes ``[anchors, positives, negatives]``
            and returns concatenated triplet embeddings.
        triplets: 2-D array with one triplet of image indices per row.
        out_document: path of the text file to (over)write.
        batch_size: number of triplets predicted per ``predict`` call.

    Returns:
        (output_list, preds): the 0/1 labels and the raw per-triplet
        predictions, both in the order of ``triplets``.
    """
    dg = DataGenerator(triplets, batch_size=batch_size, shuffle=False, flip=False)

    preds = []
    for index in tqdm(range(len(dg))):
        # the generator yields (inputs, dummy targets); only the inputs belong
        # in predict()
        inputs, _ = dg[index]
        preds.extend(net.predict(inputs))

    output_list = []
    with open(out_document, 'w') as out_file:
        for pred in tqdm(preds):
            # eval_output expects a single row with a leading batch axis
            eval_pred = eval_output(pred.reshape((1, -1)), net)
            output_list.append(eval_pred)
            out_file.write(str(eval_pred)+'\n')

    return output_list, preds

In [None]:
# Write the predictions for the output triplets to results.txt.
output_list, preds = gen_output(net, output_triplets, 'results.txt', batch_size=64)

print(len(output_list))

# Count how many triplets were labelled 1.
count = 0
for i in output_list:
    if i == 1:
        count+=1
print(count)