In [1]:
# !pip install optuna

In [2]:
import os
import pickle
import random
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from official.nlp import optimization
import optuna


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 

 The versions of TensorFlow you are currently using is 2.10.1 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons
  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Class for loading image and text data

class ITM_DataLoader:
    BATCH_SIZE = 16
    IMAGE_SIZE = (224, 224)
    IMAGE_SHAPE = (224, 224, 3)
    SENTENCE_EMBEDDING_SHAPE = 384
    AUTOTUNE = tf.data.AUTOTUNE
    DATA_PATH = "D:\_GITHUB_\Image-Text-Matching\data"
    IMAGES_PATH = DATA_PATH + "/images"
    train_data_file = DATA_PATH + "/flickr8k.TrainImages.txt"
    dev_data_file = DATA_PATH + "/flickr8k.DevImages.txt"
    test_data_file = DATA_PATH + "/flickr8k.TestImages.txt"
    sentence_embeddings_file = (DATA_PATH + "/flickr8k.cmp9137.sentence_transformers.pkl")
    sentence_embeddings = {}
    train_ds = None
    val_ds = None
    test_ds = None

    def __init__(self):
        self.sentence_embeddings = self.load_sentence_embeddings()
        self.train_ds = self.load_classifier_data(self.train_data_file)
        self.val_ds = self.load_classifier_data(self.dev_data_file)
        self.test_ds = self.load_classifier_data(self.test_data_file)

    def load_sentence_embeddings(self):
        with open(self.sentence_embeddings_file, "rb") as f:
            return pickle.load(f)

    def process_input(self, img_path, dense_vector, text, label):
        img = tf.io.read_file(img_path)
        img = tf.image.decode_jpeg(img, channels=3)
        img = tf.image.resize(img, (224, 224))
        img = tf.cast(img, tf.float32) / 255.0
        return {'image_input': img, 'text_embedding': dense_vector, 'caption': text, 'file_name': img_path}, label

    def load_classifier_data(self, data_files):
        print("LOADING data from " + str(data_files))
        print("=========================================")
        image_data = []
        text_data = []
        embeddings_data = []
        label_data = []

        # get image, text, label of image_files
        with open(data_files) as f:
            lines = f.readlines()
            for line in lines:
                line = line.rstrip("\n")
                img_name, text, raw_label = line.split("	")
                img_name = os.path.join(self.IMAGES_PATH, img_name.strip())

                # get binary labels from match/no-match answers
                label = [1, 0] if raw_label == "match" else [0, 1]
                # print("I=%s T=%s _L=%s L=%s" % (img_name, text, raw_label, label))

                # get sentence embeddings (of textual captions)
                text_sentence_embedding = self.sentence_embeddings[text]
                text_sentence_embedding = tf.constant(text_sentence_embedding)

                image_data.append(img_name)
                embeddings_data.append(text_sentence_embedding)
                text_data.append(text)
                label_data.append(label)

        print("|image_data|=" + str(len(image_data)))
        print("|text_data|=" + str(len(text_data)))
        print("|label_data|=" + str(len(label_data)))

        # prepare a tensorflow dataset using the lists generated above
        dataset = tf.data.Dataset.from_tensor_slices(
            (image_data, embeddings_data, text_data, label_data)
        )
        dataset = dataset.shuffle(self.BATCH_SIZE * 8)
        dataset = dataset.map(self.process_input, num_parallel_calls=self.AUTOTUNE)
        dataset = dataset.batch(self.BATCH_SIZE).prefetch(self.AUTOTUNE)
        return dataset

# Main class for the Image-Text Matching (ITM) task

class ITM_Classifier(ITM_DataLoader):
    epochs = 10
    learning_rate = 3e-5
    class_names = {"match", "no-match"}
    num_classes = len(class_names)
    classifier_model = None
    history = None
    classifier_model_name = "ITM_Classifier-flickr"

    def __init__(self,num_projection_layers=1,projection_dims=128,dropout_rate=0.1,learning_rate=3e-5,):
        self.num_projection_layers = num_projection_layers
        self.projection_dims = projection_dims
        self.dropout_rate = dropout_rate
        self.learning_rate = learning_rate
        super().__init__()
        if not self.train_ds:
            raise Exception("Training dataset not initialized properly.")
        self.build_classifier_model()
        self.train_classifier_model()
        self.test_classifier_model()

    # return learnt feature representations of input data (images)
    def create_vision_encoder(
        self, num_projection_layers, projection_dims, dropout_rate
    ):
        img_input = layers.Input(shape=self.IMAGE_SHAPE, name="image_input")
        cnn_layer = layers.Conv2D(16, 3, padding="same", activation="relu")(img_input)
        cnn_layer = layers.MaxPooling2D()(cnn_layer)
        cnn_layer = layers.Conv2D(32, 3, padding="same", activation="relu")(cnn_layer)
        cnn_layer = layers.MaxPooling2D()(cnn_layer)
        cnn_layer = layers.Conv2D(64, 3, padding="same", activation="relu")(cnn_layer)
        cnn_layer = layers.MaxPooling2D()(cnn_layer)
        cnn_layer = layers.Dropout(dropout_rate)(cnn_layer)
        cnn_layer = layers.Flatten()(cnn_layer)
        outputs = self.project_embeddings(
            cnn_layer, num_projection_layers, projection_dims, dropout_rate
        )
        return img_input, outputs

    # return learnt feature representations based on dense layers, dropout, and layer normalisation
    def project_embeddings(
        self, embeddings, num_projection_layers, projection_dims, dropout_rate
    ):
        projected_embeddings = layers.Dense(units=projection_dims)(embeddings)
        for _ in range(num_projection_layers):
            x = tf.nn.gelu(projected_embeddings)
            x = layers.Dense(projection_dims)(x)
            x = layers.Dropout(dropout_rate)(x)
            x = layers.Add()([projected_embeddings, x])
            projected_embeddings = layers.LayerNormalization()(x)
        return projected_embeddings

    # return learnt feature representations of input data (text embeddings in the form of dense vectors)
    def create_text_encoder(self, num_projection_layers, projection_dims, dropout_rate):
        text_input = keras.Input(
            shape=self.SENTENCE_EMBEDDING_SHAPE, name="text_embedding"
        )
        outputs = self.project_embeddings(
            text_input, num_projection_layers, projection_dims, dropout_rate
        )
        return text_input, outputs

    # put together the feature representations above to create the image-text (multimodal) deep learning model
    def build_classifier_model(self):
        print(f"BUILDING model")
        img_input, vision_net = self.create_vision_encoder(
            num_projection_layers=self.num_projection_layers,
            projection_dims=self.projection_dims,
            dropout_rate=self.dropout_rate,
        )
        text_input, text_net = self.create_text_encoder(
            num_projection_layers=self.num_projection_layers,
            projection_dims=self.projection_dims,
            dropout_rate=self.dropout_rate,
        )
        net = tf.keras.layers.Concatenate(axis=1)([vision_net, text_net])
        net = tf.keras.layers.Dropout(self.dropout_rate)(net)
        net = tf.keras.layers.Dense(
            self.num_classes, activation="softmax", name=self.classifier_model_name
        )(net)
        self.classifier_model = tf.keras.Model(
            inputs=[img_input, text_input], outputs=net
        )
        self.classifier_model.summary()

    def train_classifier_model(self):
        print(f"TRAINING model")
        steps_per_epoch = tf.data.experimental.cardinality(self.train_ds).numpy()
        num_train_steps = steps_per_epoch * self.epochs
        num_warmup_steps = int(0.2 * num_train_steps)

        loss = tf.keras.losses.KLDivergence()
        metrics = tf.keras.metrics.BinaryAccuracy()
        optimizer = optimization.create_optimizer(
            init_lr=self.learning_rate,
            num_train_steps=num_train_steps,
            num_warmup_steps=num_warmup_steps,
            optimizer_type="adamw",
        )

        self.classifier_model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

        # uncomment the next line if you wish to make use of early stopping during training
        # callbacks = [tf.keras.callbacks.EarlyStopping(patience=11, restore_best_weights=True)]

        self.history = self.classifier_model.fit(
            x=self.train_ds, validation_data=self.val_ds, epochs=self.epochs
        )  # , callbacks=callbacks)
        print("model trained!")

    def test_model_for_tuning(self):
        print("TESTING classifier model for tuning...")
        total_samples = 0
        total_correct_predictions = 0

        # Iterate over the test dataset
        for features, groundtruth in self.test_ds:
            predictions = self.classifier_model.predict(
                features
            )  # Get model predictions
            predicted_classes = tf.argmax(predictions, axis=1)
            actual_classes = tf.argmax(groundtruth, axis=1)

            # Calculate correct predictions
            correct_predictions = tf.reduce_sum(
                tf.cast(predicted_classes == actual_classes, tf.float32)
            )
            total_correct_predictions += correct_predictions.numpy()
            total_samples += groundtruth.shape[0]

            # Optionally print some predictions
            if (
                random.random() < 0.1
            ):  # Roughly 10% chance to print a batch's sample prediction
                sample_index = random.randint(0, groundtruth.shape[0] - 1)
                caption = features["caption"][sample_index].numpy().decode("utf-8")
                match_probability = predictions[sample_index][0]
                print(
                    f"Sample Caption: '{caption}', Match Probability: {match_probability:.4f}"
                )

        # Calculate and print the overall accuracy
        accuracy = total_correct_predictions / total_samples
        print(f"Overall Accuracy: {accuracy:.4f}")

        # Evaluate using TensorFlow's built-in metrics
        loss, tf_accuracy = self.classifier_model.evaluate(self.test_ds)
        print(f"TensorFlow Evaluate Loss: {loss:.4f}, Accuracy: {tf_accuracy:.4f}")

        return tf_accuracy

In [4]:
def create_model(trial):
    # Define hyperparameters
    num_projection_layers = trial.suggest_int('num_projection_layers', 1, 3)
    projection_dims = trial.suggest_int('projection_dims', 64, 256)
    dropout_rate = trial.suggest_uniform('dropout_rate', 0.1, 0.5)
    learning_rate = trial.suggest_loguniform('learning_rate', 1e-5, 1e-3)

    # Initialize the ITM Classifier with the suggested hyperparameters
    itm_classifier = ITM_Classifier(
        num_projection_layers=num_projection_layers, 
        projection_dims=projection_dims, 
        dropout_rate=dropout_rate, 
        learning_rate=learning_rate
    )
    return itm_classifier

def objective(trial):
    tf.keras.backend.clear_session()  # Clear the TensorFlow graph
    itm_classifier = create_model(trial)
    accuracy = itm_classifier.test_model_for_tuning()
    return 1 - accuracy

In [5]:
def main():
    study = optuna.create_study(direction='minimize')
    study.optimize(objective, n_trials=20)  # Set the number of trials based on your resource availability

    print('Best trial:')
    trial = study.best_trial

    print(f'  Value: {1 - trial.value}')
    print('  Params: ')
    for key, value in trial.params.items():
        print(f'    {key}: {value}')

In [6]:
if __name__ == "__main__":
    main()

[I 2024-04-13 14:45:35,882] A new study created in memory with name: no-name-789e1666-6bc1-437f-b986-2773c54f5416
  dropout_rate = trial.suggest_uniform('dropout_rate', 0.1, 0.5)
  learning_rate = trial.suggest_loguniform('learning_rate', 1e-5, 1e-3)


LOADING data from D:\_GITHUB_\Image-Text-Matching\data/flickr8k.TrainImages.txt
|image_data|=19386
|text_data|=19386
|label_data|=19386
LOADING data from D:\_GITHUB_\Image-Text-Matching\data/flickr8k.DevImages.txt
|image_data|=1164
|text_data|=1164
|label_data|=1164
LOADING data from D:\_GITHUB_\Image-Text-Matching\data/flickr8k.TestImages.txt
|image_data|=1161
|text_data|=1161
|label_data|=1161
BUILDING model
Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 image_input (InputLayer)       [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 224, 224, 16  448         ['image_inp

  inputs = self._flatten_to_reference_inputs(inputs)
