In [None]:
import os
import tensorflow as tf
from keras import layers
from keras import metrics
from keras import Model
from keras.applications import efficientnet_v2
from tensorflow.python.data.ops.dataset_ops import TensorSliceDataset
from PIL import Image
import tensorflow_hub as hub

# Number of triplets per batch in every tf.data pipeline built below.
BATCH_SIZE = 16

# Read the three input files up front. `with` guarantees each handle is closed
# even if reading raises part-way through.
with open('../sample.txt', 'r') as result:
	result_lines = result.readlines()
with open('../train_triplets.txt', 'r') as train_triplet:
	train_triplet_lines = train_triplet.readlines()
with open('../test_triplets.txt', 'r') as test_triplet:
	test_triplet_lines = test_triplet.readlines()

# Directory the triplet image ids are resolved against ('<id>.jpg').
# NOTE(review): resize_images() writes to '../food_224/' while this points at
# '../food_240/' -- confirm which directory is intended.
images_dir = '../food_240/'


# (height, width) every image is resized to before entering the network.
target_shape = (224, 224)


print('Num GPUs Available: ', len(tf.config.list_physical_devices('GPU')))


class DistanceLayer(layers.Layer):
	"""
	Keras layer that turns three embeddings into two squared distances.

	Given the anchor, positive and negative embeddings, it returns the squared
	Euclidean distance anchor-positive and anchor-negative, each obtained by
	summing the squared differences over the last axis.
	"""
	def __init__(self, **kwargs):
		super().__init__(**kwargs)

	@staticmethod
	def _squared_distance(left, right):
		# Sum of element-wise squared differences along the last axis.
		return tf.reduce_sum(tf.square(left - right), -1)

	def call(self, anchor, positive, negative):
		to_positive = self._squared_distance(anchor, positive)
		to_negative = self._squared_distance(anchor, negative)
		return (to_positive, to_negative)


class SiameseModel(Model):
	"""
	Wrapper model that trains a Siamese network with the triplet loss.

	The wrapped network is expected to return the pair
	(anchor-positive distance, anchor-negative distance); the loss is

		L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)

	Custom `train_step` / `test_step` are provided, and a running mean of the
	loss is exposed as the single metric 'loss'.
	"""
	def __init__(self, siamese_network, margin=0.5):
		super().__init__()
		self.siamese_network = siamese_network
		self.margin = margin
		self.loss_tracker = metrics.Mean(name='loss')

	def call(self, inputs):
		# Forward pass is delegated unchanged to the wrapped network.
		return self.siamese_network(inputs)

	def _track(self, loss):
		# Fold the new loss values into the running mean and report it.
		self.loss_tracker.update_state(loss)
		return {'loss': self.loss_tracker.result()}

	def train_step(self, data):
		# Record the forward pass so the loss can be differentiated.
		with tf.GradientTape() as tape:
			loss = self._compute_loss(data)
		# Differentiate w.r.t. the wrapped network's trainable weights and let
		# the optimizer given to `compile()` apply the update.
		weights = self.siamese_network.trainable_weights
		grads = tape.gradient(loss, weights)
		self.optimizer.apply_gradients(zip(grads, weights))
		return self._track(loss)

	def test_step(self, data):
		# Evaluation: same loss, no gradient update.
		return self._track(self._compute_loss(data))

	def _compute_loss(self, data):
		# The network yields (anchor-positive, anchor-negative) distances.
		positive_dist, negative_dist = self.siamese_network(data)
		# Hinge on the distance gap plus margin, clamped at zero.
		return tf.maximum(positive_dist - negative_dist + self.margin, 0.0)

	@property
	def metrics(self):
		# Listing the tracker here lets Keras reset it automatically.
		return [self.loss_tracker]


def resize_images(src_dir='../food/', dst_dir='../food_224/', size=None):
	"""
	Resize every image found in `src_dir` and save it under `dst_dir`.

	Args:
		src_dir: directory whose entries are opened as images.
		dst_dir: directory the resized copies are written to, keeping the
			original file names. Created if it does not exist.
		size: size passed to `Image.resize`; defaults to the module-level
			`target_shape` (looked up at call time).

	Entries that cannot be opened, resized or saved are skipped (best effort),
	but each failure is printed instead of being silently discarded.
	"""
	if size is None:
		size = target_shape
	# Without the output directory every save would fail (and previously did
	# so silently), so make sure it exists first.
	os.makedirs(dst_dir, exist_ok=True)
	for filename in os.listdir(src_dir):
		src_path = os.path.join(src_dir, filename)
		try:
			# The context manager releases the source file handle promptly.
			with Image.open(src_path) as img:
				img.resize(size).save(os.path.join(dst_dir, filename))
		except Exception as exc:
			# Keep going on bad files, but leave a trace of what was skipped.
			print('Skipping {}: {}'.format(src_path, exc))
		

def preprocess_image(filename):
	"""
	Load the file at `filename` as a 3-channel JPEG, convert it to float32
	with `tf.image.convert_image_dtype` and resize it to `target_shape`.
	"""
	raw_bytes = tf.io.read_file(filename)
	pixels = tf.image.convert_image_dtype(
		tf.image.decode_jpeg(raw_bytes, channels=3),
		tf.float32,
	)
	return tf.image.resize(pixels, target_shape)


def preprocess_triplets(anchor, positive, negative):
	"""
	Run `preprocess_image` on the three file names of a triplet and return
	the resulting images as an (anchor, positive, negative) tuple.
	"""
	anchor_image = preprocess_image(anchor)
	positive_image = preprocess_image(positive)
	negative_image = preprocess_image(negative)
	return (anchor_image, positive_image, negative_image)


def preprocess_triplets_test(anchor, positive, negative):
	"""
	Same image loading as `preprocess_triplets`, but the image triplet is
	paired with the constant 1, i.e. the result is ((a, p, n), 1).
	"""
	images = preprocess_triplets(anchor, positive, negative)
	return images, 1


def get_datasets() -> tuple[tf.data.Dataset, tf.data.Dataset, tf.data.Dataset]:
	"""
	Build the anchor / positive / negative file-name datasets for training.

	Each non-blank line of `train_triplet_lines` is split on whitespace into
	three image ids; id k becomes the path `images_dir + id + '.jpg'`. The
	first id is the anchor, the second the positive and the third the
	negative.

	Returns:
		(anchor_dataset, positive_dataset, negative_dataset), three datasets
		of path strings in the same line order.
	"""
	anchor_images = []
	positive_images = []
	negative_images = []
	for line in train_triplet_lines:
		ids = line.split()
		if not ids:
			# Blank line (e.g. trailing newline at end of file): nothing to add.
			continue
		# The previous `== '1'` check compared the whole line to '1' and both
		# of its branches did the same thing, so the order is simply fixed.
		anchor_id, positive_id, negative_id = ids[:3]
		anchor_images.append(images_dir + anchor_id + '.jpg')
		positive_images.append(images_dir + positive_id + '.jpg')
		negative_images.append(images_dir + negative_id + '.jpg')
	return (
		tf.data.Dataset.from_tensor_slices(anchor_images),
		tf.data.Dataset.from_tensor_slices(positive_images),
		tf.data.Dataset.from_tensor_slices(negative_images),
	)


def get_train_val(train_fraction=0.8, seed=None):
	"""
	Build the batched training and validation datasets of image triplets.

	Args:
		train_fraction: share of the triplets assigned to training; the rest
			becomes the validation set.
		seed: optional seed for the one-off shuffle that decides the split.

	Returns:
		(train_dataset, val_dataset), each yielding batches of
		(anchor, positive, negative) images of size `BATCH_SIZE`.
	"""
	anchor_dataset, positive_dataset, negative_dataset = get_datasets()

	dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))
	num_triplets = len(dataset)
	num_train = round(num_triplets * train_fraction)

	# Shuffle ONCE with a fixed order. With the default
	# reshuffle_each_iteration=True, take() and skip() would each see a
	# different order on every pass, so training and validation samples would
	# overlap. The buffer covers the whole dataset (only file names are held
	# at this point) so the split is a uniform one.
	dataset = dataset.shuffle(
		buffer_size=max(num_triplets, 1),
		seed=seed,
		reshuffle_each_iteration=False,
	)
	train_dataset = dataset.take(num_train)
	val_dataset = dataset.skip(num_train)

	# Only the training part is reshuffled every epoch; this happens on file
	# names, before the images are decoded.
	train_dataset = train_dataset.shuffle(buffer_size=1024)
	train_dataset = train_dataset.map(preprocess_triplets, num_parallel_calls=tf.data.AUTOTUNE)
	train_dataset = train_dataset.batch(BATCH_SIZE, drop_remainder=False)
	train_dataset = train_dataset.prefetch(8)

	val_dataset = val_dataset.map(preprocess_triplets, num_parallel_calls=tf.data.AUTOTUNE)
	val_dataset = val_dataset.batch(BATCH_SIZE, drop_remainder=False)
	val_dataset = val_dataset.prefetch(8)
	return (train_dataset, val_dataset)


def get_datasets_test() -> tf.data.Dataset:
	"""
	Build the batched dataset of test triplets.

	Each non-blank line of `test_triplet_lines` is split on whitespace into
	three image ids (anchor, positive, negative), which are turned into
	`images_dir + id + '.jpg'` paths and loaded with
	`preprocess_triplets_test`.

	Returns:
		A dataset yielding batches of ((anchor, positive, negative), 1) of
		size `BATCH_SIZE`, in the order of the input lines.
	"""
	anchor_images = []
	positive_images = []
	negative_images = []
	for line in test_triplet_lines:
		ids = line.split()
		if not ids:
			# Blank line (e.g. trailing newline at end of file): nothing to add.
			continue
		# The previous `== '1'` check compared the whole line to '1' and both
		# of its branches did the same thing, so the order is simply fixed.
		anchor_id, positive_id, negative_id = ids[:3]
		anchor_images.append(images_dir + anchor_id + '.jpg')
		positive_images.append(images_dir + positive_id + '.jpg')
		negative_images.append(images_dir + negative_id + '.jpg')
	dataset = tf.data.Dataset.zip((
		tf.data.Dataset.from_tensor_slices(anchor_images),
		tf.data.Dataset.from_tensor_slices(positive_images),
		tf.data.Dataset.from_tensor_slices(negative_images)
	))
	dataset = dataset.map(preprocess_triplets_test)
	dataset = dataset.batch(BATCH_SIZE, drop_remainder=False)
	dataset = dataset.prefetch(8)
	return dataset


def get_datasets_test_test() -> tf.data.Dataset:
	"""
	Build a batched dataset containing every image in `images_dir`.

	Returns:
		A dataset yielding batches of `BATCH_SIZE` images produced by
		`preprocess_image`, in sorted file-name order.
	"""
	# os.listdir() returns entries in arbitrary order; sorting makes the
	# element order reproducible so outputs can be matched back to files.
	anchor_images = [images_dir + filename for filename in sorted(os.listdir(images_dir))]
	dataset = tf.data.Dataset.from_tensor_slices(anchor_images)
	dataset = dataset.map(preprocess_image)
	dataset = dataset.batch(BATCH_SIZE, drop_remainder=False)
	dataset = dataset.prefetch(8)
	return dataset


def build_model(margin=0.5, learning_rate=0.00001) -> SiameseModel:
	"""
	Assemble and compile the Siamese triplet model.

	A single embedding network (TF-Hub EfficientNetV2-B0 feature vector,
	dropout, two 4096-unit ReLU dense layers, L2 normalisation) is applied to
	the anchor, positive and negative inputs; `DistanceLayer` turns the three
	embeddings into the two distances consumed by `SiameseModel`.

	Args:
		margin: margin passed to `SiameseModel`.
		learning_rate: learning rate of the Adam optimizer.

	Returns:
		The compiled `SiameseModel`.
	"""
	input_shape = target_shape + (3,)

	embedding = tf.keras.Sequential()
	embedding.add(
		hub.KerasLayer(
		'https://tfhub.dev/google/imagenet/efficientnet_v2_imagenet1k_b0/feature_vector/2',
		input_shape=input_shape,
		trainable=True)
	)
	embedding.add(layers.Dropout(0.7))
	embedding.add(layers.Dense(4096, activation='relu'))
	embedding.add(layers.Dense(4096, activation='relu'))
	# Unit-length embeddings, normalised per sample.
	embedding.add(layers.Lambda(lambda  x: tf.keras.backend.l2_normalize(x, axis=1)))

	anchor_input = layers.Input(name='anchor', shape=input_shape)
	positive_input = layers.Input(name='positive', shape=input_shape)
	negative_input = layers.Input(name='negative', shape=input_shape)

	# The same `embedding` instance is called three times, so its weights are
	# shared between the branches.
	# NOTE(review): efficientnet_v2.preprocess_input is documented by Keras as
	# a pass-through -- confirm the hub module's expected input range.
	distances = DistanceLayer()(
		embedding(efficientnet_v2.preprocess_input(anchor_input)),
		embedding(efficientnet_v2.preprocess_input(positive_input)),
		embedding(efficientnet_v2.preprocess_input(negative_input)),
	)

	siamese_network = Model(
			inputs=[anchor_input, positive_input, negative_input], outputs=distances
	)
	siamese_model = SiameseModel(siamese_network, margin=margin)
	siamese_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate))
	return siamese_model


In [None]:
# Build the training/validation pipelines, the test pipeline and the compiled
# model used by the cells below.
train_dataset, val_dataset = get_train_val()
test_dataset = get_datasets_test()
siamese_model = build_model()

In [None]:
# Train for 6 epochs, evaluating the loss on the validation split each epoch.
siamese_model.fit(train_dataset, epochs=6, validation_data=val_dataset)
# Uncomment to persist the trained weights:
# siamese_model.save_weights('./weights.h5')

In [None]:
# siamese_model.load_weights('./weights.h5')

In [None]:
# Run the model over the test triplets. The next cell reads pred[0] as the
# anchor-positive distances and pred[1] as the anchor-negative distances.
pred = siamese_model.predict(
	test_dataset,
	verbose=1
)

In [None]:
# Distances anchor-positive and anchor-negative for every test triplet.
pos = pred[0]
neg = pred[1]
# Write one line per triplet: 1 when the anchor is closer to the positive
# candidate than to the negative one, otherwise 0. `with` closes the file even
# if a write fails.
with open('result.txt', 'w') as result:
	for pos_distance, neg_distance in zip(pos, neg):
		result.write('{}\n'.format(1 if pos_distance < neg_distance else 0))