# Imports

In [4]:
# Mount Google Drive at /content/drive; the data paths used further down
# (food.zip, train_triplets.txt) point into this mount.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [1]:
!pip install pyyaml h5py
import tensorflow as tf
import numpy as np
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt
import numpy as np
import os
import random
from pathlib import Path
from tensorflow.keras import applications
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras import Model
from tensorflow.keras.applications import resnet
from tensorflow.data import Dataset
from zipfile import ZipFile

# Every image is resized to IMG_HEIGHT x IMG_WIDTH pixels before it is fed to
# the network.
IMG_HEIGHT, IMG_WIDTH = 200, 200

# Spatial size (height, width) used when declaring the model inputs.
target_shape = (IMG_HEIGHT, IMG_WIDTH)



# Helper Functions

### Data Read in

In [2]:
def unzip():
    """
    Extract the food.zip archive stored on Google Drive into the current
    working directory and print 'Done' once extraction has finished.
    """
    archive_path = '/content/drive/MyDrive/Colab Notebooks/task3/food.zip'
    # `archive` instead of `zip`, so the builtin zip() is not shadowed.
    with ZipFile(archive_path, 'r') as archive:
        archive.extractall()
    print('Done')


In [3]:
def make_training_labels(test_size=0.2, random_state=42):
    """
    Split the triplet list into a training and a validation file.

    Reads every triplet line from train_triplets.txt on Google Drive, splits
    the lines into a training and a validation subset, and writes them to
    'train_samples.txt' and 'val_samples.txt' in the current working
    directory (one triplet per line).

    Args:
        test_size: fraction of the triplets that goes into the validation
            file (forwarded to train_test_split).
        random_state: seed for the split. A fixed seed keeps the split
            identical across runs, so resuming training from a checkpoint
            does not move previously-trained-on triplets into validation.
            Pass None for a different random split on every call.

    Returns:
        The number of training triplets.
    """
    samples = '/content/drive/MyDrive/Colab Notebooks/task3/train_triplets.txt'
    with open(samples, 'r') as file:
        # Strip line endings and drop blank lines: a last line without a
        # trailing newline would otherwise be glued to the next triplet after
        # shuffling, and an empty line cannot be parsed into three image ids.
        triplets = [line.strip() for line in file if line.strip()]

    train_samples, val_samples = train_test_split(
        triplets, test_size=test_size, random_state=random_state)

    for filename, subset in (('val_samples.txt', val_samples),
                             ('train_samples.txt', train_samples)):
        with open(filename, 'w') as file:
            file.writelines(item + '\n' for item in subset)

    return len(train_samples)


### Data preprocessing

In [4]:
def make_dataset(dataset_filename, training=True):
    """
    Build a tf.data pipeline from a text file of triplets.

    Each line of `dataset_filename` is passed to `load_triplets`, which turns
    it into the three loaded images. `training` is forwarded unchanged and
    controls whether random flips are applied while loading.

    Returns the mapped (unbatched, unshuffled) dataset.
    """
    def parse_line(text_line):
        return load_triplets(text_line, training)

    lines = tf.data.TextLineDataset(dataset_filename)
    # tf.data.AUTOTUNE is the named constant for -1: let tf.data pick the
    # level of parallelism.
    return lines.map(parse_line, num_parallel_calls=tf.data.AUTOTUNE)


def load_triplets(triplet, training):
    """
    Turn one whitespace-separated triplet line into its three images.

    The three ids on the line are mapped to 'food/<id>.jpg' and each file is
    loaded through `preprocess_image`.

    Returns a 1-tuple whose only element is the tuple
    (anchor, positive, negative). The extra wrapping is deliberate: the
    dataset element then has a single component, so the three images stay
    together as one input instead of being read as three separate components.
    """
    ids = tf.strings.split(triplet)
    paths = ['food/' + ids[k] + '.jpg' for k in range(3)]
    images = tuple(preprocess_image(path, training) for path in paths)
    return (images,)


def preprocess_image(img, training):
    """
    Load a JPEG file and return it as a float32 image of size
    (IMG_HEIGHT, IMG_WIDTH) with 3 channels.

    `img` is the path of the file to read. No value scaling is done here; the
    model applies `resnet.preprocess_input` to its inputs. When `training` is
    truthy the image is additionally flipped at random, horizontally and
    vertically.
    """
    raw_bytes = tf.io.read_file(img)
    pixels = tf.image.decode_jpeg(raw_bytes, channels=3)
    pixels = tf.cast(pixels, tf.float32)
    pixels = tf.image.resize(pixels, (IMG_HEIGHT, IMG_WIDTH))
    if not training:
        return pixels
    # Augmentation is applied to training data only.
    pixels = tf.image.random_flip_left_right(pixels)
    return tf.image.random_flip_up_down(pixels)

 


 


# Creating the neural net

### Embedding generator model

In [5]:
# Backbone: ResNet50 with ImageNet weights and without its classification head.
base_cnn = resnet.ResNet50(
    weights="imagenet", input_shape=(IMG_HEIGHT, IMG_WIDTH) + (3,), include_top=False
)

# Embedding head: flatten the backbone features and project them to a
# 256-dimensional vector.
flatten = layers.Flatten()(base_cnn.output)
dense1 = layers.Dense(512, activation="relu")(flatten)
dense1 = layers.BatchNormalization()(dense1)
dense2 = layers.Dense(256, activation="relu")(dense1)
dense2 = layers.BatchNormalization()(dense2)
output = layers.Dense(256)(dense2)

embedding = Model(base_cnn.input, output, name="Embedding")

# Partial fine-tuning: every backbone layer before "conv5_block1_out" is
# frozen, so its pretrained weights are not updated. That layer and all
# backbone layers after it stay trainable, as do the dense head layers above.
trainable = False
for layer in base_cnn.layers:
    if layer.name == "conv5_block1_out":
        trainable = True
    layer.trainable = trainable

# Print the summary only after the trainable flags are set, so the
# trainable / non-trainable parameter counts it reports are the real ones.
embedding.summary()

Model: "Embedding"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 200, 200, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, 206, 206, 3)  0           ['input_1[0][0]']                
                                                                                                  
 conv1_conv (Conv2D)            (None, 100, 100, 64  9472        ['conv1_pad[0][0]']              
                                )                                                                 
                                                                                          

### Embedding using EfficientNetB3

In [10]:
# # Currently unused: alternative embedding backbone based on EfficientNetB3.
# base_cnn = tf.keras.applications.EfficientNetB3(
#         include_top=False, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3))
# base_cnn.trainable = False
# flatten = layers.Flatten()(base_cnn.output)
# dense1 = layers.Dense(512, activation="relu")(flatten)
# dense1 = layers.BatchNormalization()(dense1)
# dense2 = layers.Dense(256, activation="relu")(dense1)
# dense2 = layers.BatchNormalization()(dense2)
# output = layers.Dense(256)(dense2)

# embedding = Model(base_cnn.input, output, name="Embedding")

# embedding.summary()

# # This block freezes the pretrained ImageNet weights up to "conv5_block1_out" (a ResNet50 layer name), which saves a lot of training cost.
# trainable = False
# for layer in base_cnn.layers:
#     if layer.name == "conv5_block1_out":
#         trainable = True
#     layer.trainable = trainable


# Distance Layer


In [6]:
class DistanceLayer(layers.Layer):
    """
    Computes two squared Euclidean distances between embeddings:
    anchor <-> positive and anchor <-> negative.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        """Return the tuple (anchor-positive distance, anchor-negative distance)."""

        def squared_l2(u, v):
            # Sum of squared differences over the last (embedding) axis.
            return tf.reduce_sum(tf.square(u - v), -1)

        return (squared_l2(anchor, positive), squared_l2(anchor, negative))


# One image input per role in the triplet; all three share the same shape.
triplet_input_shape = target_shape + (3,)
anchor_input = layers.Input(name="anchor", shape=triplet_input_shape)
positive_input = layers.Input(name="positive", shape=triplet_input_shape)
negative_input = layers.Input(name="negative", shape=triplet_input_shape)

# The same `embedding` model (shared weights) is applied to each input after
# ResNet input preprocessing; the distance layer then compares the embeddings.
distance_layer = DistanceLayer()
anchor_embedding = embedding(resnet.preprocess_input(anchor_input))
positive_embedding = embedding(resnet.preprocess_input(positive_input))
negative_embedding = embedding(resnet.preprocess_input(negative_input))
distances = distance_layer(anchor_embedding, positive_embedding, negative_embedding)

# Maps (anchor, positive, negative) images to (ap_distance, an_distance).
siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input],
    outputs=distances,
)
siamese_network.summary()


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 anchor (InputLayer)            [(None, 200, 200, 3  0           []                               
                                )]                                                                
                                                                                                  
 positive (InputLayer)          [(None, 200, 200, 3  0           []                               
                                )]                                                                
                                                                                                  
 negative (InputLayer)          [(None, 200, 200, 3  0           []                               
                                )]                                                            

# Siamese Model

In [7]:
class SiameseModel(Model):
    """The Siamese Network model with custom training and testing loops.

    Wraps a network that maps a triplet of images to the pair
    (anchor-positive distance, anchor-negative distance) and trains it with
    the triplet loss:

       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)

    Two running means are tracked: "loss" and "accuracy", where accuracy is
    the fraction of triplets whose negative distance is greater than or equal
    to the positive distance.

    Args:
        siamese_network: model returning (ap_distance, an_distance).
        margin: margin added to the distance difference in the loss.
    """

    def __init__(self, siamese_network, margin=0.5):
        super(SiameseModel, self).__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")
        self.accuracy_tracker = metrics.Mean(name="accuracy")

    def call(self, inputs):
        """Forward the inputs to the wrapped network and return its distances."""
        return self.siamese_network(inputs)

    def train_step(self, data):
        """Run one optimisation step and return the running loss/accuracy."""
        # The forward pass is recorded by the tape so the loss can be
        # differentiated with respect to the trainable weights. It is run
        # once only; both loss and accuracy are derived from its distances.
        with tf.GradientTape() as tape:
            ap_distance, an_distance = self.siamese_network(data)
            loss = self._triplet_loss(ap_distance, an_distance)

        weights = self.siamese_network.trainable_weights
        gradients = tape.gradient(loss, weights)

        # Apply the gradients with the optimizer given to `compile()`.
        self.optimizer.apply_gradients(zip(gradients, weights))

        # Accuracy uses the same distances as the loss (i.e. the weights
        # before this step's update), so no second forward pass is needed.
        accuracy = self._triplet_accuracy(ap_distance, an_distance)
        return self._update_metrics(loss, accuracy)

    def test_step(self, data):
        """Evaluate one batch and return the running loss/accuracy."""
        ap_distance, an_distance = self.siamese_network(data)
        loss = self._triplet_loss(ap_distance, an_distance)
        accuracy = self._triplet_accuracy(ap_distance, an_distance)
        return self._update_metrics(loss, accuracy)

    def _triplet_loss(self, ap_distance, an_distance):
        """Per-sample triplet loss: max(ap - an + margin, 0)."""
        return tf.maximum(ap_distance - an_distance + self.margin, 0.0)

    def _triplet_accuracy(self, ap_distance, an_distance):
        """Fraction of samples with an_distance >= ap_distance (a scalar)."""
        is_correct = tf.greater_equal(an_distance, ap_distance)
        return tf.reduce_mean(tf.cast(is_correct, tf.float32))

    def _update_metrics(self, loss, accuracy):
        """Feed both trackers and return their current results by name."""
        self.loss_tracker.update_state(loss)
        self.accuracy_tracker.update_state(accuracy)
        return {
            "loss": self.loss_tracker.result(),
            "accuracy": self.accuracy_tracker.result(),
        }

    def _compute_loss(self, data):
        """Run the network on `data` and return the per-sample triplet loss."""
        ap_distance, an_distance = self.siamese_network(data)
        return self._triplet_loss(ap_distance, an_distance)

    def compute_accuracy(self, data):
        """Run the network on `data` and return the triplet accuracy."""
        ap_distance, an_distance = self.siamese_network(data)
        return self._triplet_accuracy(ap_distance, an_distance)

    @property
    def metrics(self):
        # We need to list our metrics here so the `reset_states()` can be
        # called automatically.
        return [self.loss_tracker, self.accuracy_tracker]


# Training


### Checkpointing




In [13]:
# Keep checkpoints on the mounted Google Drive, next to the other task files.
# A Windows-style path is not a directory path on the Colab (Linux) runtime:
# it would be treated as a single backslash-named file in the working
# directory.
checkpoint_path = '/content/drive/MyDrive/Colab Notebooks/task3/checkpoints/cp.ckpt'
checkpoint_dir = os.path.dirname(checkpoint_path)
# Make sure the target folder exists before the first checkpoint is written.
os.makedirs(checkpoint_dir, exist_ok=True)

# Save the model weights (weights only) to checkpoint_path during training.
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 save_weights_only=True,
                                                 verbose=1)
siamese_model = SiameseModel(siamese_network)

### Training step

In [15]:
# Extract the images into the working directory.
unzip()

# Resume from an earlier checkpoint when one exists; on a first run there is
# nothing to load, and an unconditional load_weights would raise.
# A weights-only checkpoint in TensorFlow format writes a '<path>.index' file.
if os.path.exists(checkpoint_path + '.index'):
    siamese_model.load_weights(checkpoint_path)

num_train_samples = make_training_labels()
train_batch_size = 32
# The training dataset repeats forever, so the epoch length must be given.
steps_per_epoch = int(np.ceil(num_train_samples / train_batch_size))

train_dataset = make_dataset('train_samples.txt', training=True)
train_dataset = train_dataset.shuffle(
    1024, reshuffle_each_iteration=True).repeat().batch(train_batch_size).prefetch(8)

# Validation data must not be randomly flipped, hence training=False.
val_dataset = make_dataset('val_samples.txt', training=False)
val_dataset = val_dataset.batch(train_batch_size).prefetch(8)

siamese_model.compile(optimizer=optimizers.Adam(0.0001))
siamese_model.fit(train_dataset, epochs=5, validation_data=val_dataset,
                  steps_per_epoch=steps_per_epoch, callbacks=[cp_callback])



Done
Epoch 1/5
   1/1488 [..............................] - ETA: 22:55:12 - loss: 0.1902 - accuracy: 0.9688

KeyboardInterrupt: ignored

# Prediction

In [14]:
def create_eval_model(model):
    """
    Wrap a distance-producing model into a model that outputs 0/1 labels.

    `model` must output the pair (positive distance, negative distance). The
    returned model takes the same inputs and outputs, as int8, 1 where the
    negative distance is greater than or equal to the positive distance and
    0 otherwise. The input shape and output tensors of `model` are printed
    for inspection.
    """
    print(model.input_shape)
    print(model.output)
    ap_distance, an_distance = model.output
    anchor_closer_to_positive = tf.greater_equal(an_distance, ap_distance)
    labels = tf.cast(anchor_closer_to_positive, tf.int8)
    return tf.keras.Model(inputs=model.inputs, outputs=labels)

In [15]:
# Restore the trained weights; siamese_model wraps siamese_network, so the
# inference model built from siamese_network below uses the restored weights.
siamese_model.load_weights(checkpoint_path)
inference_batch_size = 256
test_dataset = make_dataset('test_triplets.txt', training=False).batch(
    inference_batch_size).prefetch(2)

inference_model = create_eval_model(siamese_network)
inference_model.summary()

# The test dataset is finite (not repeated), so predict() simply runs until
# it is exhausted. No hard-coded sample count is needed, and a count that
# disagrees with the file can no longer cut the predictions short.
predictions = inference_model.predict(test_dataset, verbose=1)
num_test_samples = len(predictions)

# One 0/1 label per test triplet, one per line.
np.savetxt('predictions.txt', predictions, fmt='%i')
print("=== finished ===")

[(None, 200, 200, 3), (None, 200, 200, 3), (None, 200, 200, 3)]
(<KerasTensor: shape=(None,) dtype=float32 (created by layer 'distance_layer')>, <KerasTensor: shape=(None,) dtype=float32 (created by layer 'distance_layer')>)
Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 anchor (InputLayer)            [(None, 200, 200, 3  0           []                               
                                )]                                                                
                                                                                                  
 positive (InputLayer)          [(None, 200, 200, 3  0           []                               
                                )]                                                                
                                                                 

KeyboardInterrupt: 

In [None]:
# Show the predicted labels (1 where the negative distance is >= the positive
# distance, 0 otherwise).
print(predictions)