In [117]:
import tensorflow as tf
import numpy

# INSTALL KERAS by running 'conda install -c conda-forge keras' in conda terminal.
# Keras Applications are deep learning models.
# These models can be used for prediction, feature extraction, and fine-tuning.
from tensorflow.keras.preprocessing import image          # For getting image features
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.applications.vgg16 import VGG16
from sklearn.model_selection import train_test_split
from zipfile import ZipFile

path = '/content/drive/MyDrive/TASK 4/'  # This is path to google drive folder
                                         # Change this if you dont run on google colab

In [129]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Unzipping folder with food images

with ZipFile(path+'food.zip', 'r') as zipObj:
   zipObj.extractall(path)

In [116]:
######### Load train triplets. Separate them randomly into 2 sets- training and validation set

train_triplets = '/content/drive/MyDrive/TASK 4/train_triplets.txt'
with open(train_triplets, 'r') as file:
  triplets = [x for x in file.readlines()]   # Matrix with triplets
train_set, validation_set = train_test_split(triplets, test_size=0.1, shuffle = True)  # Separate matrix of triplets into 2 matrices - train and validation set

In [118]:
########### Additional functions we need for creating dataset ############
# Function that reads image into tensorflow dataset
# Code from https://github.com/taki0112/Tensorflow-DatasetAPI
# Needs to be a function because of the lambda layer in the further code

def train_image_processing(img):
    img = tf.image.decode_jpeg(img, channels=3)  # 3 channels for RGB
    img = tf.cast(img, tf.float32)/ 127.5 - 1
    img = tf.image.resize(img, (224, 224))
    # Flipping the train images to ensure that the algorithm 
    # isn't only trained to recognize images and their mirrors.
    img = tf.image.random_flip_left_right(img)
    img = tf.image.random_flip_up_down(img)
    return img

def test_image_processing(img):  # Same function for test data just without flipping the image
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.cast(img, tf.float32)/ 127.5 - 1
    img = tf.image.resize(img, (224, 224))
    return img

In [119]:
# Loads image with ids corresponding to the numbers in a triplet
# Needs to be a function because of the lambda layer in the further code

def load_triplets(triplet):
    split_triplet = tf.strings.split(triplet)
    anchor = train_image_processing(tf.io.read_file(path+'food/' + split_triplet[0] + '.jpg'))
    positive = train_image_processing(tf.io.read_file(path+'food/' + split_triplet[1] + '.jpg'))
    negative = train_image_processing(tf.io.read_file(path+'food/' + split_triplet[2] + '.jpg'))
    ret_val = tf.stack([anchor, positive, negative], axis=0)
    return ret_val, 1

def load_test_triplets(triplet):
    split_triplet = tf.strings.split(triplet)
    anchor = test_image_processing(tf.io.read_file(path+'food/' + split_triplet[0] + '.jpg'))
    positive = test_image_processing(tf.io.read_file(path+'food/' + split_triplet[1] + '.jpg'))
    negative = test_image_processing(tf.io.read_file(path+'food/' + split_triplet[2] + '.jpg'))
    ret_val = tf.stack([anchor, positive, negative], axis=0)
    return ret_val

In [144]:
# Change type of data to datasets, because the model works with this type

# Train dataset
train_dataset = tf.data.Dataset.from_tensor_slices(train_set)
train_dataset = train_dataset.map(lambda x: load_triplets(x),num_parallel_calls=tf.data.experimental.AUTOTUNE)

# Validation dataset
validation_dataset = tf.data.Dataset.from_tensor_slices(validation_set)
validation_dataset = validation_dataset.map(lambda x: load_triplets(x),num_parallel_calls=tf.data.experimental.AUTOTUNE)

# Test dataset
test_dataset = tf.data.TextLineDataset(path+'test_triplets.txt')
test_dataset = test_dataset.map(lambda x: load_test_triplets(x), num_parallel_calls=tf.data.experimental.AUTOTUNE)
test_dataset = test_dataset.batch(256).prefetch(2)

In [122]:
# Since this is too big for the memory, allocate a buffer;
# repeat() will re-initialize the dataset;
# batch() will take first batch_size entries and make a batch out of them.

batch_size=32

train_dataset = train_dataset.shuffle(buffer_size = batch_size**2).repeat().batch(batch_size)  
validation_dataset = validation_dataset.batch(batch_size)

In [124]:
############ Create the base model  ############
# Code more or less from https://keras.io/api/applications/ and https://keras.io/guides/transfer_learning/

inputs = tf.keras.Input(shape=(3, 224, 224, 3))   # Define inputs

# Go tru all the images. Use a pre-trained modelin Keras, e.g, VGG16,19 or ResNet.:
base_model = tf.keras.applications.DenseNet121(include_top=False, input_shape=(224, 224, 3))
#base_model = VGG16(weights='imagenet', include_top=False)
#base_model = tf.keras.applications.resnet50.ResNet50(weights='imagenet')

base_model.trainable = False         # Freeze all layers in the base model so hyperparameters are the pretraiend ones

# Adding layers to neural network
embeddings = tf.keras.Sequential()
embeddings.add(tf.keras.layers.GlobalAveragePooling2D())
embeddings.add(tf.keras.layers.Dense(128, activation='linear'))
embeddings.add(tf.keras.layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=1)))

# Create outputs
anchor, positive, negative = inputs[:, 0, ...], inputs[:, 1, ...], inputs[:, 2, ...]
anchor_outputs = embeddings(base_model(anchor))
positive_outputs = embeddings(base_model(positive))
negative_outputs = embeddings(base_model(negative))
stacked_outputs = tf.stack([anchor_outputs, positive_outputs, negative_outputs], axis=-1)

In [125]:
############### Create and compile siamese model

# Create custom loss function
# Triplet loss: L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
def triplet_loss(_, triplet):
    anchor, positive, negative = triplet[..., 0], triplet[..., 1], triplet[..., 2]
    ret_val = tf.reduce_mean(tf.math.softplus(tf.reduce_sum(tf.square(anchor-positive), 1) - tf.reduce_sum(tf.square(anchor-negative), 1)))
    return ret_val

# Create custom accuracy function
def accuracy(_, triplet):
    anchor, positive, negative = triplet[..., 0], triplet[..., 1], triplet[..., 2]
    ret_val = tf.reduce_mean(tf.cast(tf.greater_equal(tf.reduce_sum(tf.square(anchor-negative), 1), tf.reduce_sum(tf.square(anchor-positive), 1)), tf.float32))
    return ret_val

siamese_model = tf.keras.Model(inputs=inputs, outputs=stacked_outputs)
opt = tf.keras.optimizers.Adam(learning_rate=0.001)
siamese_model.compile(optimizer=opt, loss=triplet_loss, metrics=[accuracy])


In [126]:
########### Fit the model
# SKIP THIS IF YOU ALREADY HAVE IMAGE FEATURES BCS RUNNING TAKES A LOT OF TIME

#calculated by 0.9*number_of_triplet/32 rounded up (bc batch size = 32)
siamese_model.fit(train_dataset, steps_per_epoch = 1674, epochs = 1, validation_data = validation_dataset, validation_steps = 1)

# Save weights
# out_path = 'trained_model.h5'
# siamese_model.save_weights(out_path)



<tensorflow.python.keras.callbacks.History at 0x7f446f227a50>

In [None]:
# Load weights if siamese model was already trained
# out_path = 'trained_model.h5'
# siamese_model.load_weights(out_path)

In [145]:
######### Predicting on test data

# Getting output types
anchor, positive, negative = siamese_model.output[..., 0], siamese_model.output[..., 1], siamese_model.output[..., 2]
pos_dist = tf.reduce_sum(tf.square(anchor - positive), 1)
neg_dist = tf.reduce_sum(tf.square(anchor - negative), 1)

# Creating predict model
compared_outputs = tf.cast(tf.greater_equal(neg_dist, pos_dist), tf.int8)   # Cast to int so its 0 or 1
predict_model = tf.keras.Model(inputs = siamese_model.inputs, outputs = compared_outputs)

num_test_samples = 59544
predictions = predict_model.predict(test_dataset, steps=int(numpy.ceil(num_test_samples / 256)), verbose = 1)



In [147]:
# Save the result to text file

numpy.savetxt(path+'output10.txt', predictions, fmt='%i')