# Setup

In [1]:
import datetime
from pathlib import Path

import pandas as pd
import tensorflow as tf
from tensorflow import keras

%load_ext tensorboard

In [2]:
gpus = tf.config.list_physical_devices("GPU")
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.list_logical_devices("GPU")
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

del gpus

1 Physical GPUs, 1 Logical GPUs


# Build a universal one hot encoder that encodes cross-dataset category and ingredients 

In [3]:
class OneHotEncoder:
    def __init__(self, all_category_list, all_ingredient_list):
        self.all_food_categories = all_category_list
        self.all_food_categories.sort()
        self.all_food_categories_integer_encoded = (
            self.__encode_categories_to_integers()
        )
        self.all_ingredients = all_ingredient_list
        self.all_ingredients.sort()
        self.all_ingredients_integer_encoded = self.__encode_ingredients_to_integers()

    def get_category_one_hot_encoding(self, category_name):
        index = self.all_food_categories_integer_encoded[category_name]
        assert index is not None, f"{category_name} does not have an integer mapping"
        num_classes = len(self.all_food_categories)
        return keras.utils.to_categorical(index, num_classes)

    def get_ingredients_one_hot_encoding(self, ingredient_list):
        ingredient_list = list(
            map(lambda x: self.__transform_ingredient_to_integer(x), ingredient_list)
        )
        multi_one_hot_layer = tf.keras.layers.CategoryEncoding(
            num_tokens=len(self.all_ingredients), output_mode="multi_hot"
        )
        return multi_one_hot_layer(ingredient_list)

    def __transform_ingredient_to_integer(self, ingredient_name):
        index = self.all_ingredients_integer_encoded[ingredient_name]
        assert index is not None, f"{ingredient_name} does not have an integer mapping"
        return index

    def __encode_categories_to_integers(self):
        return {
            category_name: index
            for index, category_name in enumerate(self.all_food_categories)
        }

    def __encode_ingredients_to_integers(self):
        return {
            ingredient_name: index
            for index, ingredient_name in enumerate(self.all_ingredients)
        }

# Build dataset loaders for each dataset

In [4]:
class DatasetLoader:
    def __init__(self, image_dir, metadata_dir, dataset_name):
        self.image_dir = Path(image_dir)
        self.metadata_dir = Path(metadata_dir)
        self.name = dataset_name
        self.metadata = self.load_metadata(
            self.metadata_dir / ("{dataset}_metadata.csv".format(dataset=dataset_name))
        )
        # Default : all_files = metadata, since metadata consists of records of all files
        self.all_files = self.metadata.copy()
        self.all_categories = self.extract_all_categories()
        self.all_ingredients = self.extract_all_ingredients()

    def load_image_to_arr(self, path):
        image = tf.keras.preprocessing.image.load_img(path)
        img_tensor = tf.keras.preprocessing.image.img_to_array(image)
        return tf.image.resize(img_tensor, (224, 224))

    def load_metadata(self, path):
        metadata = pd.read_csv(path, sep="\t")
        new_metadata = metadata.copy()
        new_metadata["dataset_name"] = self.name
        return new_metadata

    def extract_all_categories(self):
        return self.metadata["Category"].unique().tolist()

    def extract_all_ingredients(self):
        unique_ingredients = set()
        for ingredient_list in self.metadata["Ingredients"]:
            ingredient_list = ingredient_list.split(",")
            unique_ingredients.update(ingredient_list)
        return [*unique_ingredients]

    def extract_file_pointers(self):
        dataset_name_col = self.all_files["dataset_name"]
        index_col = self.all_files.index
        return pd.DataFrame(
            {"metadata_index": index_col, "dataset_name": dataset_name_col}
        )

    def get_tensors(self, index, one_hot_encoder):
        img_dir = self.image_dir
        row = self.all_files.loc[index]
        img_path = img_dir / row["Category"] / row["ID/File Name"]
        # img_tensor = self.load_image_to_arr(img_path)
        img_tensor = open(img_path,"rb").read()
        calorie_tensor = row["Calorie(kcal)"]
        carbs_tensor = row["Carbohydrate(g)"]
        protein_tensor = row["Protein(g)"]
        fat_tensor = row["Fat(g)"]
        # one_hot_category_tensor = one_hot_encoder.get_category_one_hot_encoding(
        #     row["Category"]
        # )
        # one_hot_ingredient_tensor = one_hot_encoder.get_ingredients_one_hot_encoding(
        #     row["Ingredients"].split(",")
        # )
        return tf.constant(img_tensor), {
            "category_output": tf.constant(row["Category"]),
            "calorie_output": tf.constant(calorie_tensor),
            "carbs_output": tf.constant(carbs_tensor),
            "protein_output": tf.constant(protein_tensor),
            "fat_output": tf.constant(fat_tensor),
            "ingredients_output": tf.constant(row["Ingredients"]),
        }

    def flatten_tensors(self, tensor):
        result = []
        img_data = tensor[0].numpy()
        others_data = [value.numpy() for key, value in tensor[1].items()]
        result.append(img_data)
        result.extend(others_data)
        return result

    def __len__(self):
        return len(self.metadata)

In [5]:
class Recipes5k(DatasetLoader):
    def __init__(self, image_dir, metadata_dir):
        super().__init__(image_dir, metadata_dir, "recipes5k")

In [6]:
class Nutrition5k(DatasetLoader):
    def __init__(self, image_dir, metadata_dir):
        super().__init__(image_dir, metadata_dir, "nutrition5k")
        # Modify all_files since nutrition5k metadata only consists dish_level metadata not image_level
        self.all_files = pd.read_csv(self.metadata_dir / "nutrition5k_all_images.csv")

    # Override method from DatasetLoader
    def get_tensors(self, index, one_hot_encoder):
        img_dir = self.image_dir
        row = self.all_files.loc[index]
        img_path = img_dir / "generic" / row["dish_id"] / row["ID/File Name"]
        # img_tensor = self.load_image_to_arr(img_path)
        img_tensor = open(img_path,"rb").read()
        dish_metadata_row = self.metadata.loc[
            self.metadata["dish_id"] == row["dish_id"]
        ].squeeze()
        calorie_tensor = dish_metadata_row["Calorie(kcal)"]
        carbs_tensor = dish_metadata_row["Carbohydrate(g)"]
        protein_tensor = dish_metadata_row["Protein(g)"]
        fat_tensor = dish_metadata_row["Fat(g)"]
        # one_hot_category_tensor = one_hot_encoder.get_category_one_hot_encoding(
        #     dish_metadata_row["Category"]
        # )
        # one_hot_ingredient_tensor = one_hot_encoder.get_ingredients_one_hot_encoding(
        #     dish_metadata_row["Ingredients"].split(",")
        # )
        return tf.constant(img_tensor), {
            "category_output": tf.constant(dish_metadata_row["Category"]),
            "calorie_output": tf.constant(calorie_tensor),
            "carbs_output": tf.constant(carbs_tensor),
            "protein_output": tf.constant(protein_tensor),
            "fat_output": tf.constant(fat_tensor),
            "ingredients_output": tf.constant(dish_metadata_row["Ingredients"]),
        }

    # Overrding the method from DatasetLoader
    def __len__(self):
        return len(self.all_files)

In [7]:
class Food101(DatasetLoader):
    def __init__(self, image_dir, metadata_dir):
        super().__init__(image_dir, metadata_dir, "food101")

# Initializing one hot encoder

In [8]:
# Get all the categories and ingredients from all datasets

# Initialize dataset loader without one-hot encoder to get all unique category and ingredients from each dataset
recipes5k = Recipes5k(
    image_dir="../Food Datasets/final-dataset/images",
    metadata_dir="../Food Datasets/final-dataset/metadata",
)
nutrition5k = Nutrition5k(
    image_dir="../Food Datasets/final-dataset/images",
    metadata_dir="../Food Datasets/final-dataset/metadata",
)
food101 = Food101(
    image_dir="../Food Datasets/final-dataset/images",
    metadata_dir="../Food Datasets/final-dataset/metadata",
)

DATASETS = [recipes5k, nutrition5k, food101]
DATASETS_NAME = [x.name for x in DATASETS]


def create_one_hot_encoder(datasets):
    all_categories = []
    all_ingredients = []
    for x in datasets:
        all_categories.extend(x.all_categories)
        all_ingredients.extend(x.all_ingredients)
    all_categories = set(all_categories)
    all_ingredients = set(all_ingredients)
    return OneHotEncoder([*all_categories], [*all_ingredients])

In [9]:
ONE_HOT_ENCODER = create_one_hot_encoder(DATASETS)

# Building dataset pipeline

In [10]:
def get_dataset_cardinality(datasets):
    dataset_samples = [len(x) for x in datasets]
    return sum(dataset_samples)


def get_file_data(index, dataset_index):
    index = index.numpy()
    dataset_index = dataset_index.numpy()
    target_dataset = DATASETS[dataset_index]
    return target_dataset.flatten_tensors(
        target_dataset.get_tensors(index, ONE_HOT_ENCODER)
    )


def transform_file_pointers(index, dataset_index):
    result = tf.py_function(
        get_file_data,
        [index, dataset_index],
        [
            tf.string,
            tf.string,
            tf.float32,
            tf.float32,
            tf.float32,
            tf.float32,
            tf.string,
        ],
    )
    return tf.data.Dataset.from_tensors(tuple(result))


def build_data_pipeline(datasets, sample_size=None):
    if sample_size is None:
        sample_size = [1.0] * len(datasets)
    assert len(sample_size) == len(
        datasets
    ), "Illegal array of sample sizes provided. Number of sample size does not match number of datasets"
    file_pointers = [
        x.extract_file_pointers().sample(frac=s) for x, s in zip(datasets, sample_size)
    ]
    all_file_pointers = pd.concat(file_pointers).sample(frac=1)
    print(f"Total samples : {len(all_file_pointers)}")

    all_file_pointers["dataset_name"] = all_file_pointers["dataset_name"].apply(
        lambda x: DATASETS_NAME.index(x)
    )

    final_dataset = tf.data.Dataset.from_tensor_slices(
        (
            all_file_pointers["metadata_index"].tolist(),
            all_file_pointers["dataset_name"].tolist(),
        )
    )
    final_dataset = final_dataset.interleave(
        lambda index, name: transform_file_pointers(index, name),
        num_parallel_calls=tf.data.AUTOTUNE,
    ).prefetch(tf.data.AUTOTUNE)
    return final_dataset

In [11]:
recipes5k_dataset = build_data_pipeline([recipes5k])
food101_dataset = build_data_pipeline([food101])
nutrition5k_dataset = build_data_pipeline([nutrition5k])

Total samples : 4826
Total samples : 101000
Total samples : 271407


In [12]:
list(recipes5k_dataset.take(2))

[(<tf.Tensor: shape=(), dtype=string, numpy=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00\x84\x00\x03\x02\x02\n\n\n\n\x0b\n\n\n\x0e\x0b\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\x08\n\n\x0b\n\n\n\n\n\n\n\n\x08\x08\n\n\n\n\n\x0b\r\n\n\r\x08\n\n\x08\x01\x03\x04\x04\x06\x05\x06\n\x06\x06\n\x10\x0e\x0b\x0e\x10\x10\x10\x10\x10\x10\x0f\x0f\x10\x10\x10\x10\x10\x0f\x0f\x0f\x0f\x0f\x0f\x0f\x0f\x0f\r\r\x0f\x0f\x10\r\x0f\r\r\x0f\x0f\r\r\r\r\r\r\r\r\r\r\r\r\r\r\r\r\r\xff\xc0\x00\x11\x08\x00\xf0\x01h\x03\x01\x11\x00\x02\x11\x01\x03\x11\x01\xff\xc4\x00\x1d\x00\x00\x02\x03\x01\x01\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x06\x07\x04\x05\x08\x03\t\x02\x01\x00\xff\xc4\x00B\x10\x00\x02\x02\x00\x04\x04\x04\x03\x06\x05\x03\x03\x03\x03\x05\x00\x01\x02\x03\x11\x00\x04\x12!\x05\x06"1\x07\x13AQ2aq\x08\x14#B\x81\x91R\xa1\xb1\xc1\xf03b\xd1\x15C\xf1$r\xe1\t\x16\x92\x17%c\x82\xc2\xff\xc4\x00\x1c\x01\x00\x02\x03\x01\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x06

## Serializing Data Pipeline to TFRecord

In [13]:
def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()  # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))


def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

In [14]:
def serialize_example(
    image_raw,
    category_feature,
    calorie_feature,
    carbs_feature,
    protein_feature,
    fat_feature,
    ingredient_feature,
):

    feature = {
        "image_raw": _bytes_feature(image_raw),
        "category": _bytes_feature(category_feature),
        "calorie": _float_feature(calorie_feature),
        "carbs": _float_feature(carbs_feature),
        "protein": _float_feature(protein_feature),
        "fat": _float_feature(fat_feature),
        "ingredients": _bytes_feature(ingredient_feature),
    }

    # Create a Features message using tf.train.Example.
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()

In [15]:
def tf_serialize_example(
    image_raw,
    category_feature,
    calorie_feature,
    carbs_feature,
    protein_feature,
    fat_feature,
    ingredient_feature,
):
    tf_string = tf.py_function(
        serialize_example,
        (
            image_raw,
            category_feature,
            calorie_feature,
            carbs_feature,
            protein_feature,
            fat_feature,
            ingredient_feature,
        ),
        tf.string,
    )
    return tf.reshape(tf_string, ())

In [16]:
serialized_recipes5k = recipes5k_dataset.map(tf_serialize_example)

In [None]:
serialized_food101 = food101_dataset.map(tf_serialize_example)

In [None]:
serialized_nutrition5k = nutrition5k_dataset.map(tf_serialize_example)

### Shard and write to TFRecord file

In [17]:
def shard_and_write(dataset, num_shards, path):
    for i in range(num_shards):
        dest_path = Path(path) / f"{i}.tfrecord"
        writer = tf.data.experimental.TFRecordWriter(dest_path.as_posix(), "GZIP")
        current_shard = dataset.shard(num_shards, i)
        writer.write(current_shard)

In [18]:
shard_and_write(
    serialized_recipes5k,
    10,
    f"../Food Datasets/final-dataset/tfrecord/{recipes5k.name}",
)

Instructions for updating:
To write TFRecords to disk, use `tf.io.TFRecordWriter`. To save and load the contents of a dataset, use `tf.data.experimental.save` and `tf.data.experimental.load`


In [None]:
shard_and_write(
    serialized_food101, 30, f"../Food Datasets/final_dataset/tfrecord/{food101.name}"
)

In [None]:
shard_and_write(
    serialized_nutrition5k,
    100,
    f"../Food Datasets/final_dataset/tfrecord/{nutrition5k.name}",
)

## Loading and deserializing TFRecord file 

In [None]:
# Create a description of the features.

feature_description = {
    "image_raw": tf.io.FixedLenFeature([], tf.string),
    "category": tf.io.FixedLenFeature([], tf.string),
    "calorie": tf.io.FixedLenFeature([], tf.float32),
    "carbs": tf.io.FixedLenFeature([], tf.float32),
    "protein": tf.io.FixedLenFeature([], tf.float32),
    "fat": tf.io.FixedLenFeature([], tf.float32),
    "ingredients": tf.io.FixedLenFeature([], tf.string),
}


def _parse_function(example_proto):
    # Parse the input `tf.train.Example` proto using the dictionary above.
    result = tf.io.parse_single_example(example_proto, feature_description)
    image_decoded = tf.io.parse_tensor(result["image_raw"], tf.float32)
    category_decoded = tf.io.parse_tensor(result["category"], tf.float32)
    ingredient_decoded = tf.io.parse_tensor(result["ingredients"], tf.float32)
    return (
        image_decoded,
        {
            "category_output": category_decoded,
            "calorie_output": result["calorie"],
            "carbs_output": result["carbs"],
            "protein_output": result["protein"],
            "fat_output": result["fat"],
            "ingredients_output": ingredient_decoded,
        },
    )

In [None]:
x = list(serialized_features_dataset.take(1))

In [None]:
y = _parse_function(x[0])

# Model Development

In [None]:
class BaseModel:
    def __init__(
        self, input_shape, total_food_category, total_ingredients_category, name
    ):
        self.input_shape = input_shape
        self.input_layer = self.get_input_layer()
        self.model = None
        self.name = name

    def get_input_layer(self):
        return keras.Input(shape=self.input_shape)

    def save_model(self, path):
        assert self.model is not None
        self.model.save(path, save_format="h5")

    def load_model(self, path):
        self.model = keras.models.load_model(path)

    def train_model(self, reset_cache=True, enforce_backup_empty=True, **kwargs):
        cache_dir = Path("./temp/dataset_cache")
        backup_dir = Path(f"./temp/backup/{self.name}")
        if reset_cache:
            for file in cache_dir.iterdir():
                file.unlink()
        if enforce_backup_empty and backup_dir.exists():
            assert (
                len(list(backup_dir.iterdir())) == 0
            ), f"WARNING : THERE IS A BACKUP FILE FOR THIS MODEL : {self.name}."

        return self.model.fit(**kwargs, callbacks=self.get_callbacks())

    def get_callbacks(self):
        tensorboard_dir = f"./models/{self.name}/tensorboard/{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}"
        tensorboard_callback = keras.callbacks.TensorBoard(
            log_dir=tensorboard_dir, profile_batch=(100, 500)
        )
        backup_restore_dir = f"./temp/backup/{self.name}"
        backup_restore_callback = keras.callbacks.BackupAndRestore(
            backup_dir=backup_restore_dir
        )
        checkpoint_dir = f"./temp/checkpoint/{self.name}"
        checkpoint_file = checkpoint_dir + "/{epoch:03d}-{val_loss:.2f}.hdf5"
        checkpoint_callback = keras.callbacks.ModelCheckpoint(
            checkpoint_file, monitor="val_loss", save_best_only=True, mode="min"
        )
        early_stopping_callback = keras.callbacks.EarlyStopping("val_loss", patience=5)
        return [
            tensorboard_callback,
            early_stopping_callback,
            backup_restore_callback,
            checkpoint_callback,
        ]

## Flat Model

In [None]:
class FlatModel(BaseModel):
    def __init__(
        self, input_shape, total_food_category, total_ingredients_category, name
    ):
        super().__init__(
            input_shape, total_food_category, total_ingredients_category, name
        )
        self.augmentation_layers = self.get_augmentation_layers()
        self.preprocess_layers = self.get_preprocess_layers(self.augmentation_layers)
        self.convolution_block = self.get_convolution_block()
        self.shared_layers = self.get_shared_layers(self.convolution_block)

        self.category_classification_layers = self.get_category_classification_layers(
            self.shared_layers, total_food_category
        )
        self.shared_nutrition_regression_layers = self.get_shared_nutrition_layers(
            self.shared_layers
        )
        self.calorie_regression_layers = self.get_calorie_regression_layers(
            self.shared_nutrition_regression_layers
        )
        self.carbs_regression_layers = self.get_carbs_regression_layers(
            self.shared_nutrition_regression_layers
        )
        self.protein_regression_layers = self.get_protein_regression_layers(
            self.shared_nutrition_regression_layers
        )
        self.fat_regression_layers = self.get_fat_regression_layers(
            self.shared_nutrition_regression_layers
        )
        self.ingredients_multilabel_layers = self.get_ingredients_multilabel_layers(
            self.shared_layers, total_ingredients_category
        )

    def get_augmentation_layers(self):
        input_layer = keras.layers.Input(shape=self.input_shape)
        augmentation_layer = keras.layers.RandomFlip()(input_layer)
        augmentation_layer = keras.layers.RandomRotation(0.2)(augmentation_layer)
        return keras.Model(
            inputs=input_layer, outputs=augmentation_layer, name="augmentation_layers"
        )

    def get_preprocess_layers(self, previous_layer):
        return None

    def get_convolution_block(self):
        return None

    def get_shared_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        shared_layer = keras.layers.Flatten()(input_layer)
        shared_layer = keras.layers.Dense(
            2048, activation="relu", name="shared_dense_1"
        )(shared_layer)
        shared_layer = keras.layers.BatchNormalization()(shared_layer)
        output_layer = keras.layers.Dropout(0.3)(shared_layer)
        return keras.Model(
            inputs=input_layer, outputs=output_layer, name="shared_layers"
        )

    def get_category_classification_layers(self, previous_layer, total_categories):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        category_classification_layer = keras.layers.Dense(
            total_categories, activation="softmax", name="category_output"
        )(input_layer)
        return keras.Model(
            inputs=input_layer,
            outputs=category_classification_layer,
            name="category_output",
        )

    def get_shared_nutrition_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        shared_nutrition_regression_layers = keras.layers.Dense(
            256, activation="relu", name="nutrition_dense_1"
        )(input_layer)
        shared_nutrition_regression_layers = keras.layers.BatchNormalization()(
            shared_nutrition_regression_layers
        )
        output_layer = keras.layers.Dropout(0.2)(shared_nutrition_regression_layers)

        return keras.Model(
            inputs=input_layer,
            outputs=output_layer,
            name="nutrition_regression_shared_layers",
        )

    def get_calorie_regression_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        calorie_regression_layers = keras.layers.Dense(1, name="calorie_output")(
            input_layer
        )
        return keras.Model(
            inputs=input_layer, outputs=calorie_regression_layers, name="calorie_output"
        )

    def get_carbs_regression_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        carbs_regression_layers = keras.layers.Dense(1, name="carbs_output")(
            input_layer
        )
        return keras.Model(
            inputs=input_layer, outputs=carbs_regression_layers, name="carbs_output"
        )

    def get_protein_regression_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        protein_regression_layers = keras.layers.Dense(1, name="protein_output")(
            input_layer
        )
        return keras.Model(
            inputs=input_layer, outputs=protein_regression_layers, name="protein_output"
        )

    def get_fat_regression_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        fat_regression_layers = keras.layers.Dense(1, name="fat_output")(input_layer)
        return keras.Model(
            inputs=input_layer, outputs=fat_regression_layers, name="fat_output"
        )

    def get_ingredients_multilabel_layers(self, previous_layer, total_ingredients):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        ingredients_multilabel_layers = []
        ingredients_multilabel_layers = keras.layers.Dense(
            512, activation="relu", name="ingredients_dense_1"
        )(input_layer)
        ingredients_multilabel_layers = keras.layers.BatchNormalization()(
            ingredients_multilabel_layers
        )
        ingredients_multilabel_layers = keras.layers.Dropout(0.2)(
            ingredients_multilabel_layers
        )
        output_layers = keras.layers.Dense(
            total_ingredients, activation="sigmoid", name="ingredients_output"
        )(ingredients_multilabel_layers)
        return keras.Model(
            inputs=input_layer, outputs=output_layers, name="ingredients_output"
        )

    def build_and_compile(
        self,
        category_classification_loss=keras.losses.CategoricalCrossentropy(),
        calorie_regression_loss=keras.losses.MeanAbsoluteError(),
        carbs_regression_loss=keras.losses.MeanAbsoluteError(),
        protein_regression_loss=keras.losses.MeanAbsoluteError(),
        fat_regression_loss=keras.losses.MeanAbsoluteError(),
        ingredient_multilabel_loss=keras.losses.BinaryCrossentropy(),
        category_classification_metrics=[
            keras.metrics.CategoricalAccuracy(),
            keras.metrics.Precision(),
            keras.metrics.Recall(),
        ],
        calorie_regression_metrics=[keras.metrics.MeanAbsoluteError()],
        carbs_regression_metrics=[keras.metrics.MeanAbsoluteError()],
        protein_regression_metrics=[keras.metrics.MeanAbsoluteError()],
        fat_regression_metrics=[keras.metrics.MeanAbsoluteError()],
        ingredient_multilabel_metrics=[
            keras.metrics.TopKCategoricalAccuracy(1, name="Ingredients_Top1_Acc"),
            keras.metrics.TopKCategoricalAccuracy(5, name="Ingredients_Top5_Acc"),
        ],
        category_classification_loss_weights=1.0,
        ingredient_multilabel_loss_weights=1.0,
        calorie_regression_loss_weights=1.0,
        carbs_regression_loss_weights=1.0,
        protein_regression_loss_weights=1.0,
        fat_regression_loss_weights=1.0,
    ):
        assert (
            self.preprocess_layers is not None
        ), "Error: No preprocess layers exists. Class FlatModel should not be instantiated."
        assert (
            self.convolution_block is not None
        ), "Error: No convolution block exists. Class FlatModel should not be instantiated."
        model_inputs = self.input_layer
        x = self.augmentation_layers(model_inputs)
        x = self.preprocess_layers(x)
        x = self.convolution_block(x)
        x = self.shared_layers(x)
        category_classification_head = self.category_classification_layers(x)
        nutrition_regression_head = self.shared_nutrition_regression_layers(x)
        ingredients_multilabel_head = self.ingredients_multilabel_layers(x)
        calorie_regression_head = self.calorie_regression_layers(
            nutrition_regression_head
        )
        carbs_regression_head = self.carbs_regression_layers(nutrition_regression_head)
        protein_regression_head = self.protein_regression_layers(
            nutrition_regression_head
        )
        fat_regression_head = self.fat_regression_layers(nutrition_regression_head)

        model = keras.Model(
            inputs=model_inputs,
            outputs=[
                category_classification_head,
                ingredients_multilabel_head,
                calorie_regression_head,
                carbs_regression_head,
                protein_regression_head,
                fat_regression_head,
            ],
            name=self.name,
        )
        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.001),
            loss={
                "category_output": category_classification_loss,
                "calorie_output": calorie_regression_loss,
                "carbs_output": carbs_regression_loss,
                "protein_output": protein_regression_loss,
                "fat_output": fat_regression_loss,
                "ingredients_output": ingredient_multilabel_loss,
            },
            metrics={
                "category_output": category_classification_metrics,
                "calorie_output": calorie_regression_metrics,
                "carbs_output": carbs_regression_metrics,
                "protein_output": protein_regression_metrics,
                "fat_output": fat_regression_metrics,
                "ingredients_output": ingredient_multilabel_metrics,
            },
            loss_weights={
                "category_output": category_classification_loss_weights,
                "calorie_output": calorie_regression_loss_weights,
                "carbs_output": carbs_regression_loss_weights,
                "protein_output": protein_regression_loss_weights,
                "fat_output": fat_regression_loss_weights,
                "ingredients_output": ingredient_multilabel_loss_weights,
            },
        )
        self.model = model
        return model

### MobileNetv2

In [None]:
class FlatMobileNetv2Model(FlatModel):
    def __init__(self, input_shape, total_food_category, total_ingredients_category):
        super().__init__(
            input_shape,
            total_food_category,
            total_ingredients_category,
            "FlatFoodNet_with_MobileNetv2",
        )
        self.preprocess_layers = self.get_preprocess_layers(self.augmentation_layers)
        self.convolution_block = self.get_convolution_block()

    def get_preprocess_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        output_layer = keras.applications.mobilenet_v2.preprocess_input(input_layer)
        return keras.Model(
            inputs=input_layer, outputs=output_layer, name="preprocessing_layers"
        )

    def get_convolution_block(self):
        mobilenet_v2_convolution_layers = keras.applications.MobileNetV2(
            input_shape=self.input_shape, include_top=False, weights="imagenet"
        )
        mobilenet_v2_convolution_layers.trainable = False
        return mobilenet_v2_convolution_layers

In [None]:
flat_mobilenetv2 = FlatMobileNetv2Model(
    input_shape=(224, 224, 3),
    total_food_category=len(ONE_HOT_ENCODER.all_food_categories),
    total_ingredients_category=len(ONE_HOT_ENCODER.all_ingredients),
)
flat_mobilenetv2.build_and_compile()
flat_mobilenetv2.model.summary()

In [None]:
keras.utils.plot_model(
    flat_mobilenetv2.model,
    "./models/flat_foodnet_mobilenetv2.png",
    show_shapes=True,
    expand_nested=False,
)

In [None]:
flat_mobilenetv2_history = flat_mobilenetv2.train_model(
    x=training_dataset, epochs=5, verbose=1, validation_data=validation_dataset
)

### ResNet50 Model

In [None]:
class FlatResNet50Model(FlatModel):
    def __init__(self, input_shape, total_food_category, total_ingredients_category):
        super().__init__(
            input_shape,
            total_food_category,
            total_ingredients_category,
            "FlatFoodNet_with_ResNet50",
        )
        self.preprocess_layers = self.get_preprocess_layers(self.augmentation_layers)
        self.convolution_block = self.get_convolution_block()

    def get_preprocess_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        output_layer = keras.applications.resnet50.preprocess_input(input_layer)
        return keras.Model(
            inputs=input_layer, outputs=output_layer, name="preprocessing_layers"
        )

    def get_convolution_block(self):
        resnet50_convolution_layers = keras.applications.ResNet50(
            input_shape=self.input_shape, include_top=False, weights="imagenet"
        )
        resnet50_convolution_layers.trainable = False
        return resnet50_convolution_layers

In [None]:
flat_resnet50 = FlatResNet50Model(
    input_shape=(224, 224, 3),
    total_food_category=len(ONE_HOT_ENCODER.all_food_categories),
    total_ingredients_category=len(ONE_HOT_ENCODER.all_ingredients),
)
flat_resnet50.build_and_compile()
flat_resnet50.model.summary()

In [None]:
flat_resnet50_history = flat_resnet50.train_model(
    training_dataset, epochs=1, verbose=1, validation_data=validation_dataset
)

### EfficientNet Model

In [None]:
class FlatEfficientNetV2B0Model(FlatModel):
    def __init__(self, input_shape, total_food_category, total_ingredients_category):
        super().__init__(
            input_shape,
            total_food_category,
            total_ingredients_category,
            "FlatFoodNet_with_EfficientNetV2B0",
        )
        self.preprocess_layers = self.get_preprocess_layers(self.augmentation_layers)
        self.convolution_block = self.get_convolution_block()

    def get_preprocess_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        output_layer = keras.applications.efficientnet_v2.preprocess_input(input_layer)
        return keras.Model(
            inputs=input_layer, outputs=output_layer, name="preprocessing_layers"
        )

    def get_convolution_block(self):
        efficientnetv2b0_convolution_layers = (
            keras.applications.efficientnet_v2.EfficientNetV2B0(
                input_shape=self.input_shape, include_top=False, weights="imagenet"
            )
        )
        efficientnetv2b0_convolution_layers.trainable = False
        return efficientnetv2b0_convolution_layers

In [None]:
flat_efficientnetv2b0 = FlatEfficientNetV2B0Model(
    input_shape=(224, 224, 3),
    total_food_category=len(ONE_HOT_ENCODER.all_food_categories),
    total_ingredients_category=len(ONE_HOT_ENCODER.all_ingredients),
)
flat_efficientnetv2b0.build_and_compile()
flat_efficientnetv2b0.model.summary()

In [None]:
flat_efficientnetv2b0_history = flat_efficientnetv2b0.train_model(
    training_dataset, epochs=1, verbose=1, validation_data=validation_dataset
)

## Region Wise Model

In [None]:
class RegionWiseModel(BaseModel):
    def __init__(
        self, input_shape, total_food_category, total_ingredients_category, name
    ):
        super().__init__(
            input_shape, total_food_category, total_ingredients_category, name
        )
        self.augmentation_layers = self.get_augmentation_layers()
        self.preprocess_layers = self.get_preprocess_layers(self.augmentation_layers)
        self.convolution_block = self.get_convolution_block()
        self.shared_layers = self.get_shared_layers(self.convolution_block)

        self.category_classification_layers = self.get_category_classification_layers(
            self.shared_layers, total_food_category
        )
        self.shared_nutrition_regression_layers = self.get_shared_nutrition_layers(
            self.shared_layers
        )
        self.calorie_regression_layers = self.get_calorie_regression_layers(
            self.shared_nutrition_regression_layers
        )
        self.carbs_regression_layers = self.get_carbs_regression_layers(
            self.shared_nutrition_regression_layers
        )
        self.protein_regression_layers = self.get_protein_regression_layers(
            self.shared_nutrition_regression_layers
        )
        self.fat_regression_layers = self.get_fat_regression_layers(
            self.shared_nutrition_regression_layers
        )
        self.ingredients_multilabel_layers = self.get_ingredients_multilabel_layers(
            self.convolution_block, total_ingredients_category
        )

    def get_augmentation_layers(self):
        input_layer = keras.layers.Input(shape=self.input_shape)
        augmentation_layer = keras.layers.RandomFlip()(input_layer)
        augmentation_layer = keras.layers.RandomRotation(0.2)(augmentation_layer)
        return keras.Model(
            inputs=input_layer, outputs=augmentation_layer, name="augmentation_layers"
        )

    def get_preprocess_layers(self, previous_layer):
        return None

    def get_convolution_block(self):
        return None

    def get_shared_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        shared_layer = keras.layers.Flatten()(input_layer)
        shared_layer = keras.layers.Dense(
            128, activation="relu", name="shared_dense_1"
        )(shared_layer)
        shared_layer = keras.layers.BatchNormalization()(shared_layer)
        output_layer = keras.layers.Dropout(0.2)(shared_layer)
        return keras.Model(
            inputs=input_layer, outputs=output_layer, name="shared_layers"
        )

    def get_category_classification_layers(self, previous_layer, total_categories):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        category_classification_layer = keras.layers.Dense(
            total_categories, activation="softmax", name="category_output"
        )(input_layer)
        return keras.Model(
            inputs=input_layer,
            outputs=category_classification_layer,
            name="category_output",
        )

    def get_shared_nutrition_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        nutrition_regression_layers = keras.layers.Dense(
            128, activation="relu", name="nutrition_dense_1"
        )(input_layer)
        nutrition_regression_layers = keras.layers.BatchNormalization()(
            nutrition_regression_layers
        )
        output_layer = keras.layers.Dropout(0.1)(nutrition_regression_layers)
        return keras.Model(
            inputs=input_layer,
            outputs=output_layer,
            name="nutrition_regression_shared_layers",
        )

    def get_calorie_regression_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        calorie_regression_layers = keras.layers.Dense(1, name="calorie_output")(
            input_layer
        )
        return keras.Model(
            inputs=input_layer, outputs=calorie_regression_layers, name="calorie_output"
        )

    def get_carbs_regression_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        carbs_regression_layers = keras.layers.Dense(1, name="carbs_output")(
            input_layer
        )
        return keras.Model(
            inputs=input_layer, outputs=carbs_regression_layers, name="carbs_output"
        )

    def get_protein_regression_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        protein_regression_layers = keras.layers.Dense(1, name="protein_output")(
            input_layer
        )
        return keras.Model(
            inputs=input_layer, outputs=protein_regression_layers, name="protein_output"
        )

    def get_fat_regression_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        fat_regression_layers = keras.layers.Dense(1, name="fat_output")(input_layer)
        return keras.Model(
            inputs=input_layer, outputs=fat_regression_layers, name="fat_output"
        )

    def get_ingredients_multilabel_layers(self, previous_layer, total_ingredients):
        # get the shape of the feature map (batch_size,height,width,channels)
        feature_map_shape = previous_layer.output_shape[1:]
        feature_map_height = feature_map_shape[0]
        feature_map_width = feature_map_shape[1]

        input_layer = keras.layers.Input(feature_map_shape)

        # crop the feature map into grids of feature_map_height * feature_map_width
        region_branches = []
        for row in range(1, feature_map_height + 1):
            top_crop = row - 1
            bottom_crop = feature_map_height - row
            for col in range(1, feature_map_width + 1):
                left_crop = col - 1
                right_crop = feature_map_width - col
                crop_layer = keras.layers.Cropping2D(
                    cropping=((top_crop, bottom_crop), (left_crop, right_crop))
                )(input_layer)
                flatten_layer = keras.layers.Flatten()(crop_layer)
                ingredient_prediction = keras.layers.Dense(
                    total_ingredients, activation="softmax"
                )(flatten_layer)
                reshape_layer = keras.layers.Reshape((1, total_ingredients))(
                    ingredient_prediction
                )  # reshape for global pooling 1D
                region_branches.append(reshape_layer)
        concatenate_layer = keras.layers.Concatenate(axis=1)(region_branches)
        global_max_pooling_layer = keras.layers.GlobalMaxPool1D()(concatenate_layer)
        output_layer = keras.layers.Dense(total_ingredients, activation="sigmoid")(
            global_max_pooling_layer
        )
        return keras.Model(input_layer, output_layer, name="ingredients_output")

    def build_and_compile(
        self,
        category_classification_loss=keras.losses.CategoricalCrossentropy(),
        calorie_regression_loss=keras.losses.MeanAbsoluteError(),
        carbs_regression_loss=keras.losses.MeanAbsoluteError(),
        protein_regression_loss=keras.losses.MeanAbsoluteError(),
        fat_regression_loss=keras.losses.MeanAbsoluteError(),
        ingredient_multilabel_loss=keras.losses.BinaryCrossentropy(),
        category_classification_metrics=[
            keras.metrics.CategoricalAccuracy(),
            keras.metrics.Precision(),
            keras.metrics.Recall(),
        ],
        calorie_regression_metrics=[keras.metrics.MeanAbsoluteError()],
        carbs_regression_metrics=[keras.metrics.MeanAbsoluteError()],
        protein_regression_metrics=[keras.metrics.MeanAbsoluteError()],
        fat_regression_metrics=[keras.metrics.MeanAbsoluteError()],
        ingredient_multilabel_metrics=[
            keras.metrics.TopKCategoricalAccuracy(5),
            keras.metrics.Precision(),
            keras.metrics.Recall(),
        ],
        category_classification_loss_weights=1.0,
        ingredient_multilabel_loss_weights=1.0,
        calorie_regression_loss_weights=1.0,
        carbs_regression_loss_weights=1.0,
        protein_regression_loss_weights=1.0,
        fat_regression_loss_weights=1.0,
    ):
        assert (
            self.preprocess_layers is not None
        ), "Error: No preprocess layers exists. Class RegionWiseModel should not be instantiated."
        assert (
            self.convolution_block is not None
        ), "Error: No convolution block exists. Class RegionWiseModel should not be instantiated."
        model_inputs = self.input_layer
        x = self.augmentation_layers(model_inputs)
        x = self.preprocess_layers(x)
        x = self.convolution_block(x)
        ingredients_multilabel_head = self.ingredients_multilabel_layers(x)
        x = self.shared_layers(x)
        category_classification_head = self.category_classification_layers(x)
        nutrition_regression_head = self.shared_nutrition_regression_layers(x)
        calorie_regression_head = self.calorie_regression_layers(
            nutrition_regression_head
        )
        carbs_regression_head = self.carbs_regression_layers(nutrition_regression_head)
        protein_regression_head = self.protein_regression_layers(
            nutrition_regression_head
        )
        fat_regression_head = self.fat_regression_layers(nutrition_regression_head)

        model = keras.Model(
            inputs=model_inputs,
            outputs=[
                category_classification_head,
                ingredients_multilabel_head,
                calorie_regression_head,
                carbs_regression_head,
                protein_regression_head,
                fat_regression_head,
            ],
            name=self.name,
        )
        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.0001),
            loss={
                "category_output": category_classification_loss,
                "calorie_output": calorie_regression_loss,
                "carbs_output": carbs_regression_loss,
                "protein_output": protein_regression_loss,
                "fat_output": fat_regression_loss,
                "ingredients_output": ingredient_multilabel_loss,
            },
            metrics={
                "category_output": category_classification_metrics,
                "calorie_output": calorie_regression_metrics,
                "carbs_output": carbs_regression_metrics,
                "protein_output": protein_regression_metrics,
                "fat_output": fat_regression_metrics,
                "ingredients_output": ingredient_multilabel_metrics,
            },
            loss_weights={
                "category_output": category_classification_loss_weights,
                "calorie_output": calorie_regression_loss_weights,
                "carbs_output": carbs_regression_loss_weights,
                "protein_output": protein_regression_loss_weights,
                "fat_output": fat_regression_loss_weights,
                "ingredients_output": ingredient_multilabel_loss_weights,
            },
        )
        self.model = model
        return model

### MobileNetv2 Model

In [None]:
class RegionWiseMobileNetv2Model(RegionWiseModel):
    def __init__(self, input_shape, total_food_category, total_ingredients_category):
        super().__init__(
            input_shape,
            total_food_category,
            total_ingredients_category,
            "RegionWiseFoodNet_with_MobileNetv2",
        )
        self.preprocess_layers = self.get_preprocess_layers(self.augmentation_layers)
        self.convolution_block = self.get_convolution_block()

    def get_preprocess_layers(self, previous_layer):
        input_layer = keras.layers.Input(shape=previous_layer.output_shape[1:])
        output_layer = keras.applications.mobilenet_v2.preprocess_input(input_layer)
        return keras.Model(
            inputs=input_layer, outputs=output_layer, name="preprocessing_layers"
        )

    def get_convolution_block(self):
        mobilenet_v2_convolution_layers = keras.applications.MobileNetV2(
            input_shape=self.input_shape, include_top=False, weights="imagenet"
        )
        mobilenet_v2_convolution_layers.trainable = False
        return mobilenet_v2_convolution_layers

In [None]:
regionwise_foodnet_mobilenetv2 = RegionWiseMobileNetv2Model(
    (224, 224, 3),
    len(ONE_HOT_ENCODER.all_food_categories),
    len(ONE_HOT_ENCODER.all_ingredients),
)
regionwise_foodnet_mobilenetv2.build_and_compile()
regionwise_foodnet_mobilenetv2.model.summary()

In [None]:
keras.utils.plot_model(
    regionwise_foodnet_mobilenetv2.model,
    "./models/regionwise_foodnet_mobilenetv2.png",
    show_shapes=True,
    expand_nested=False,
)

In [None]:
regionwise_foodnet_mobilenetv2.model.fit(
    training_dataset, epochs=1, verbose=1, validation_data=validation_dataset
)

# Model Evaluation

## Flat Model

### MobileNetv2

In [None]:
%tensorboard --logdir ./models/FlatFoodNet_with_MobileNetv2/tensorboard/

# Testing 

In [None]:
def load_recipe5k_metadata():
    directory = (
        Path("../Food Datasets/final-dataset") / "metadata" / "recipes5k_metadata.csv"
    )
    return pd.read_csv(directory, sep="\t")

In [None]:
data = load_recipe5k_metadata()

In [None]:
data.head()

In [None]:
test_recipes5k = Recipes5k()

In [None]:
len(test_recipes5k)

In [None]:
test_gen_func = test_recipes5k.generate_dataset()

In [None]:
test_dataset = test_recipes5k.get_dataset()

In [None]:
list(recipes5k.take(1))

In [None]:
test_model = Model()

In [None]:
test_model = test_model.build_and_compile()

In [None]:
test_model.summary()

In [None]:
test_model.fit(
    x=test_recipes5k.training_dataset,
    epochs=1,
    verbose=1,
    validation_data=test_recipes5k.validation_dataset,
)

In [None]:
test = tf.constant([[[1, 2, 3]]])

In [None]:
test.shape

In [None]:
tf.expand_dims(test, axis=0)

In [None]:
test_recipes5k.training_dataset

In [None]:
test_recipes5k.validation_dataset

In [None]:
row = 0
for x in test_recipes5k.training_dataset:
    row += 1
print(row)

In [None]:
# check training dataset (first 70%)
training_rows = int(len(recipes5k) * 0.7)
training_data = recipes5k.metadata.iloc[:training_rows]
validation_data = recipes5k.metadata.iloc[training_rows:,]

In [None]:
from pprint import pprint

In [None]:
# check categories of training
category_count = {x: 0 for x in recipes5k.extract_all_categories()}
for x in training_data["Category"].tolist():
    category_count[x] += 1

In [None]:
pprint(category_count)

In [None]:
validation_count = {x: 0 for x in recipes5k.extract_all_categories()}
for x in validation_data["Category"].tolist():
    validation_count[x] += 1

In [None]:
pprint(validation_count)

In [None]:
list(recipes5k.training_dataset.take(1))

In [None]:
mobilenet_v2_convolution_layers = keras.applications.MobileNetV2(
    input_shape=(224, 224, 3), include_top=False, weights="imagenet"
)

In [None]:
mobilenet_v2_convolution_layers.layers[-1].output_shape[1:3]

In [None]:
mobilenet_v2_convolution_layers.output_shape

In [None]:
test_model = RegionWise_FoodNet_MobileNetv2()

In [None]:
final_model = test_model.mobilenetv2_convolution_block(test_model.input_layer)

In [None]:
final_model = test_model.ingredient_classifier(final_model)

In [None]:
t = keras.Model(test_model.input_layer, final_model)

In [None]:
t.summary()

In [None]:
test_model.ingredient_classifier.summary()

In [None]:
import numpy as np

x = keras.layers.concatenate(
    [
        tf.constant([1, 0, 1]),
        tf.constant([0, 0, 2]),
        tf.constant([1, 0, 1]),
        tf.constant([5, 0, 1]),
    ],
    axis=0,
)

In [None]:
s = tf.reshape(x, (1, 4, 3))

In [None]:
keras.layers.MaxPooling1D()(s)

In [None]:
keras.layers.GlobalMaxPooling1D()(s)

In [None]:
df = tf.data.Dataset.range(300000)
df = df.shuffle(200000)
list(df.take(1))

In [None]:
#     # Overriding the method from DatasetLoader
#     def generate_dataset(self):
#         img_dir = self.dir_path / "images"
#         for index, row in self.metadata.iterrows():
#             dish_dir = img_dir / row["Category"] / row["dish_id"]
#             for img_path in dish_dir.iterdir():
#                 assert (
#                     img_path.suffix == ".jpeg" or img_path.suffix == ".png"
#                 ), f"{img_path} is not an expected image file"
#                 img_tensor = self.load_image_to_arr(img_path)
#                 calorie_tensor = row["Calorie(kcal)"]
#                 carbs_tensor = row["Carbohydrate(g)"]
#                 protein_tensor = row["Protein(g)"]
#                 fat_tensor = row["Fat(g)"]
#                 one_hot_category_tensor = (
#                     self.one_hot_encoder.get_category_one_hot_encoding(row["Category"])
#                 )
#                 one_hot_ingredient_tensor = (
#                     self.one_hot_encoder.get_ingredients_one_hot_encoding(
#                         row["Ingredients"].split(",")
#                     )
#                 )
#                 yield tf.constant(img_tensor), {
#                     "category_output": tf.constant(one_hot_category_tensor),
#                     "calorie_output": tf.constant(calorie_tensor),
#                     "carbs_output": tf.constant(carbs_tensor),
#                     "protein_output": tf.constant(protein_tensor),
#                     "fat_output": tf.constant(fat_tensor),
#                     "ingredients_output": one_hot_ingredient_tensor,
#                 }

#     # Overriding the method from DatasetLoader
#     def get_dataset(self):
#         dataset = tf.data.Dataset.from_generator(
#             self.generate_dataset,
#             output_signature=(
#                 tf.TensorSpec(shape=(224, 224, 3), dtype=tf.dtypes.float32),
#                 {
#                     "category_output": tf.TensorSpec(
#                         shape=(len(self.one_hot_encoder.all_food_categories)),
#                         dtype=tf.dtypes.float32,
#                     ),
#                     "calorie_output": tf.TensorSpec(shape=(), dtype=tf.dtypes.float32),
#                     "carbs_output": tf.TensorSpec(shape=(), dtype=tf.dtypes.float32),
#                     "protein_output": tf.TensorSpec(shape=(), dtype=tf.dtypes.float32),
#                     "fat_output": tf.TensorSpec(shape=(), dtype=tf.dtypes.float32),
#                     "ingredients_output": tf.TensorSpec(
#                         shape=(len(self.one_hot_encoder.all_ingredients)),
#                         dtype=tf.dtypes.float32,
#                     ),
#                 },
#             ),
#         )
#         return dataset.shuffle(len(self.metadata) * 30, seed=1234)

In [None]:
tf.constant("sting").numpy()

In [None]:
dataset = tf.data.Dataset.range(250)

In [None]:
# reset_index -> range(total_index) -> shuffle -> interleave

In [None]:
dataset = tf.data.Dataset.from_tensors((1, {"a": 1, "b": 2}))

In [None]:
list(dataset.as_numpy_iterator())

In [None]:
def get_dataset_cardinality(datasets):
    dataset_samples = [len(x) for x in datasets]
    return sum(dataset_samples)


def get_file_data(index, dataset_index):
    index = index.numpy()
    dataset_index = dataset_index.numpy()
    target_dataset = DATASETS[dataset_index]
    return target_dataset.flatten_tensors(
        target_dataset.get_tensors(index, ONE_HOT_ENCODER)
    )


def transform_file_pointers(index, dataset_index):
    result = tf.py_function(
        get_file_data,
        [index, dataset_index],
        [
            tf.float32,
            tf.float32,
            tf.float32,
            tf.float32,
            tf.float32,
            tf.float32,
            tf.float32,
        ],
    )
    return result[0], {
        "category_output": result[1],
        "calorie_output": result[2],
        "carbs_output": result[3],
        "protein_output": result[4],
        "fat_output": result[5],
        "ingredients_output": result[6],
    }


def build_data_pipeline(datasets, sample_size=None, training_split=0.7, batch_size=32):
    if sample_size is None:
        sample_size = [1.0] * len(datasets)
    assert len(sample_size) == len(
        datasets
    ), "Illegal array of sample sizes provided. Number of sample size does not match number of datasets"
    file_pointers = [
        x.extract_file_pointers().sample(frac=s) for x, s in zip(datasets, sample_size)
    ]
    all_file_pointers = pd.concat(file_pointers).sample(frac=1)
    print(f"Total samples : {len(all_file_pointers)}")

    all_file_pointers["dataset_name"] = all_file_pointers["dataset_name"].apply(
        lambda x: DATASETS_NAME.index(x)
    )
    training_size = int(len(all_file_pointers) * training_split)
    # training_dataset = (
    #     tf.data.Dataset.from_generator(
    #         get_generator(all_file_pointers.iloc[:training_size]),
    #         output_signature=(
    #             tf.TensorSpec(shape=(224, 224, 3), dtype=tf.dtypes.float32),
    #             {
    #                 "category_output": tf.TensorSpec(
    #                     shape=(len(ONE_HOT_ENCODER.all_food_categories)),
    #                     dtype=tf.dtypes.float32,
    #                 ),
    #                 "calorie_output": tf.TensorSpec(shape=(), dtype=tf.dtypes.float32),
    #                 "carbs_output": tf.TensorSpec(shape=(), dtype=tf.dtypes.float32),
    #                 "protein_output": tf.TensorSpec(shape=(), dtype=tf.dtypes.float32),
    #                 "fat_output": tf.TensorSpec(shape=(), dtype=tf.dtypes.float32),
    #                 "ingredients_output": tf.TensorSpec(
    #                     shape=(len(ONE_HOT_ENCODER.all_ingredients)),
    #                     dtype=tf.dtypes.float32,
    #                 ),
    #             },
    #         ),
    #     )
    #     .cache(filename="./dataset_cache/train")
    #     .batch(batch_size)
    #     .prefetch(tf.data.AUTOTUNE)
    # )
    # validation_dataset = (
    #     tf.data.Dataset.from_generator(
    #         get_generator(all_file_pointers.iloc[training_size:]),
    #         output_signature=(
    #             tf.TensorSpec(shape=(224, 224, 3), dtype=tf.dtypes.float32),
    #             {
    #                 "category_output": tf.TensorSpec(
    #                     shape=(len(ONE_HOT_ENCODER.all_food_categories)),
    #                     dtype=tf.dtypes.float32,
    #                 ),
    #                 "calorie_output": tf.TensorSpec(shape=(), dtype=tf.dtypes.float32),
    #                 "carbs_output": tf.TensorSpec(shape=(), dtype=tf.dtypes.float32),
    #                 "protein_output": tf.TensorSpec(shape=(), dtype=tf.dtypes.float32),
    #                 "fat_output": tf.TensorSpec(shape=(), dtype=tf.dtypes.float32),
    #                 "ingredients_output": tf.TensorSpec(
    #                     shape=(len(ONE_HOT_ENCODER.all_ingredients)),
    #                     dtype=tf.dtypes.float32,
    #                 ),
    #             },
    #         ),
    #     )
    #     .cache(filename="./dataset_cache/validation")
    #     .batch(batch_size)
    #     .prefetch(tf.data.AUTOTUNE)
    # )

    final_dataset = tf.data.Dataset.from_tensor_slices(
        (
            all_file_pointers["metadata_index"].tolist(),
            all_file_pointers["dataset_name"].tolist(),
        )
    )
    training_dataset = (
        final_dataset.take(training_size)
        .map(
            lambda index, name: transform_file_pointers(index, name),
            num_parallel_calls=tf.data.AUTOTUNE,
        )
        .cache(filename="./dataset_cache/train")
        .batch(batch_size)
        .prefetch(tf.data.AUTOTUNE)
    )
    validation_dataset = (
        final_dataset.skip(training_size)
        .map(
            lambda index, name: transform_file_pointers(index, name),
            num_parallel_calls=tf.data.AUTOTUNE,
        )
        .cache(filename="./dataset_cache/validation")
        .batch(batch_size)
        .prefetch(tf.data.AUTOTUNE)
    )
    print(f"Training size : {training_size}")
    print(f"Validation size : {len(all_file_pointers)-training_size}")
    return training_dataset, validation_dataset

In [None]:
backup_dir = Path(f"./temp/backup/")

In [None]:
list(backup_dir.iterdir())