## Transformer-based Encoder Decoder with Pretrained DenseNet121+CheXNet Weights

#### 1. Importing Libraries and Configuring the Session

In [None]:
import tensorflow as tf
import random
from collections import Counter
import os
import warnings
import pickle
from tqdm import tqdm
import numpy as np
import pandas as pd
import re
import nltk
from keras.layers import TextVectorization
import keras
from keras import layers, models
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Reshape, Conv2D, GlobalAveragePooling2D, Dense
from tensorflow.keras.applications import DenseNet121
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard
import datetime

In [None]:
def configure_gpus():
    """Turn on memory growth for every visible GPU and print a summary.

    Prints the number of physical and logical GPUs once configuration
    succeeds, the error text if TensorFlow raises ``RuntimeError`` while
    configuring, or a notice when no GPU is found.
    """
    physical_devices = tf.config.list_physical_devices('GPU')
    if not physical_devices:
        print("No GPU is available.")
        return

    try:
        for device in physical_devices:
            tf.config.experimental.set_memory_growth(device, True)
        logical_devices = tf.config.list_logical_devices('GPU')
        print(f"{len(physical_devices)} Physical GPU(s), {len(logical_devices)} Logical GPU(s) configured.")
    except RuntimeError as e:
        print(f"RuntimeError in configuring GPUs: {e}")

def seed_everything(seed=0):
    """Seed the Python, NumPy and TensorFlow RNGs and set the related env flags.

    Args:
        seed: Integer seed applied to all three generators and written to
            ``PYTHONHASHSEED``.
    """
    for apply_seed in (random.seed, np.random.seed, tf.random.set_seed):
        apply_seed(seed)
    os.environ.update({'PYTHONHASHSEED': str(seed), 'TF_DETERMINISTIC_OPS': '1'})

def check_jupyter_notebook():
    """Raise the notebook output rate limits when running under IPython.

    ``get_ipython`` only exists inside an IPython/Jupyter session; the
    resulting ``NameError`` elsewhere is used to detect a plain interpreter.
    """
    try:
        shell_config = get_ipython().config
        print("Jupyter Notebook environment detected. Configuring...")
        for option, value in (("iopub_msg_rate_limit", 10000.0), ("rate_limit_window", 5.0)):
            setattr(shell_config.NotebookApp, option, value)
    except NameError:
        print("Not running in a Jupyter Notebook environment.")

# Clear the TensorFlow/Keras session and suppress all Python warnings
tf.keras.backend.clear_session()
warnings.filterwarnings("ignore")

# GPU configuration and seed setting (seed defaults to 0)
configure_gpus()
seed_everything()

# Check if running in Jupyter Notebook and configure
check_jupyter_notebook()

# Initialize TensorFlow distributed strategy; the later cells define and build
# their objects inside `strategy.scope()`
strategy = tf.distribute.MirroredStrategy()

#### 2. Train, Test and Validation Data Preparation (Image Paths, Pre-processed Captions)

In [None]:
with strategy.scope():
    class CaptionDataProcessor:
        """Load image/caption CSV files, clean the captions and split the data.

        Attributes populated by ``load_data``/``process_data``:
            image_caption_pairs: dict mapping image path -> list of cleaned captions.
            caption_ls: flat list of every cleaned caption.
            vocab_list: set of every token seen in the cleaned captions.
            cap_len: token count of every cleaned caption (markers included).

        Attributes populated by ``train_val_test_split``:
            training_data, validation_data, test_data: dicts with the same
            layout as ``image_caption_pairs``.
        """

        def __init__(self, train_csv, valid_csv, train_images_path, valid_images_path):
            """Store the CSV and image-directory locations; no I/O happens here."""
            self.train_csv = train_csv
            self.valid_csv = valid_csv
            self.train_images_path = train_images_path
            self.valid_images_path = valid_images_path
            self.image_caption_pairs = {}
            self.caption_ls = []
            self.vocab_list = set()
            self.cap_len = []

        def load_data(self):
            """Read both tab-delimited CSV files and process every row."""
            df_train = pd.read_csv(self.train_csv, delimiter='\t')
            df_valid = pd.read_csv(self.valid_csv, delimiter='\t')
            self.process_data(df_train, self.train_images_path, "Training Image and Caption Data")
            self.process_data(df_valid, self.valid_images_path, "Validation Image and Caption Data")

        def process_caption(self, text):
            """Return a cleaned caption wrapped in ``<START>`` / ``<END>`` markers.

            The text is lowercased, hyphens become spaces, every character that
            is not a letter, digit or whitespace is removed, and any word that
            contains a digit is dropped.

            Args:
                text: Raw caption. Missing values (None/NaN) are treated as an
                    empty caption; other non-string values are converted with
                    ``str``.

            Returns:
                The tokens joined by single spaces, e.g. ``"<START> a b <END>"``.
                A caption with no surviving words yields ``"<START> <END>"``.
            """
            if not isinstance(text, str):
                # pandas yields float NaN for empty cells, which has no .lower()
                text = '' if pd.isna(text) else str(text)

            cleaned = text.lower().strip()

            # Replace hyphens with spaces so hyphenated words split into two tokens
            cleaned = re.sub(r'-', ' ', cleaned)

            # Remove special characters using regular expressions
            cleaned = re.sub(r'[^a-zA-Z0-9\s]', '', cleaned)

            # Drop numbers and any word containing a digit
            kept_words = [
                word for word in cleaned.split()
                if not any(char.isdigit() for char in word)
            ]

            # Joining the token list (instead of concatenating strings) avoids a
            # double space, and therefore an empty token, when no word survives.
            return ' '.join(['<START>', *kept_words, '<END>'])

        def process_data(self, df, image_path_prefix, desc):
            """Register every row of ``df`` whose image file exists on disk.

            Args:
                df: DataFrame with ``ID`` and ``caption`` columns.
                image_path_prefix: Directory holding ``<ID>.jpg`` files.
                desc: Label shown on the progress bar.
            """
            for index, row in tqdm(df.iterrows(), total=df.shape[0], desc=desc):
                img_id, img_caption = row['ID'], row['caption']
                img_path = os.path.join(image_path_prefix, str(img_id) + '.jpg')
                caption = self.process_caption(img_caption)

                if os.path.exists(img_path):
                    # Tokenise once; whitespace splitting never yields empty tokens
                    words = caption.split()
                    self.image_caption_pairs.setdefault(img_path, []).append(caption)
                    self.caption_ls.append(caption)
                    self.vocab_list.update(words)
                    self.cap_len.append(len(words))
                else:
                    print(f"File not found: {img_path}")

        def save_datasets(self, save_dir):
            """Saves the main dataset and the train, validation, and test splits to files.

            ``train_val_test_split`` must have been called first, because it is
            what creates the split attributes. Any failure is reported with a
            printed message instead of being raised.
            """
            try:
                os.makedirs(save_dir, exist_ok=True)
                self._save_dataset(os.path.join(save_dir, 'main_dataset.pkl'), self.image_caption_pairs)
                self._save_dataset(os.path.join(save_dir, 'train.pkl'), self.training_data)
                self._save_dataset(os.path.join(save_dir, 'valid.pkl'), self.validation_data)
                self._save_dataset(os.path.join(save_dir, 'test.pkl'), self.test_data)
                print("Datasets saved successfully.")
            except Exception as e:
                # Deliberately best-effort: report and continue
                print(f"Error saving datasets: {e}")

        def _save_dataset(self, file_path, dataset):
            """Helper method to save a single dataset to a file."""
            with open(file_path, 'wb') as file:
                pickle.dump(dataset, file)

        def train_val_test_split(self, train_size=0.8, val_size=0.15, shuffle=True):
            """Split ``image_caption_pairs`` by image into train/validation/test.

            Args:
                train_size: Fraction of images assigned to training.
                val_size: Fraction of images assigned to validation; the
                    remainder becomes the test set.
                shuffle: Shuffle the image order with ``np.random`` first.

            Returns:
                Tuple ``(training_data, validation_data, test_data)``; the same
                dicts are also stored on the instance.
            """
            all_images = list(self.image_caption_pairs.keys())
            if shuffle:
                np.random.shuffle(all_images)

            total_size = len(self.image_caption_pairs)
            train_end = int(total_size * train_size)
            val_end = train_end + int(total_size * val_size)

            def subset(image_names):
                return {name: self.image_caption_pairs[name] for name in image_names}

            self.training_data = subset(all_images[:train_end])
            self.validation_data = subset(all_images[train_end:val_end])
            self.test_data = subset(all_images[val_end:])

            # Return the splits for external use as well
            return self.training_data, self.validation_data, self.test_data

    # Build the processor from the ImageCLEFmedical 2023 label files and image folders
    processor = CaptionDataProcessor(
        'ImageCLEFmedical_Caption_2023_caption_prediction_train_labels.csv',
        'ImageCLEFmedical_Caption_2023_caption_prediction_valid_labels.csv',
        'ImageCLEFmedical_Caption_2023_train_images',
        'ImageCLEFmedical_Caption_2023_valid_images'
    )
    # Both CSVs are merged into one pool, which is then re-split with the
    # default 80/15/5 proportions and pickled under 'pickles_clef23'
    processor.load_data()
    train_data, valid_data, test_data = processor.train_val_test_split()
    processor.save_datasets('pickles_clef23')

    print("Number of records in the main dataset: ", len(processor.image_caption_pairs))
    print("Number of training samples: ", len(train_data))
    print("Number of validation samples: ", len(valid_data))
    print("Number of test samples: ", len(test_data))

In [None]:
def display_sample_data(data, title, sample_count=3):
    """Displays a sample of image paths and their corresponding captions.

    At most ``sample_count`` entries are printed; a dataset with fewer
    entries is printed in full instead of raising ``IndexError``.

    Args:
        data (dict): The dataset containing image paths as keys and captions as values.
        title (str): Title to describe the dataset being displayed.
        sample_count (int): Number of samples to display.
    """
    print(f"--- {title} ---")

    # max(..., 0) keeps a negative count meaning "show nothing"
    samples = list(data.items())[:max(sample_count, 0)]
    for position, (image_path, caption) in enumerate(samples, start=1):
        print(f"\nImage Path {position}: {image_path}")
        print(f"Pre-processed Caption {position}: {caption}")

    print("-------------------------------------------------------------------------\n")

# Display the first three entries (the default sample_count) of the train,
# validation, and test datasets
display_sample_data(train_data, "Sample Train Data")
display_sample_data(valid_data, "Sample Validation Data")
display_sample_data(test_data, "Sample Test Data")

#### 3. Max Length Determination with Other Hyperparameters

In [None]:
##Finding the Max Length

data = pd.read_pickle('pickles_clef23/main_dataset.pkl')

# Count how many captions have each length (lengths include the <START>/<END> markers)
frequency = Counter(processor.cap_len)
total_items = sum(frequency.values())

# Order by caption length (ascending), NOT by frequency: only then is the
# running total below a cumulative distribution, so the length at which it
# crosses the threshold is a real percentile.
sorted_frequency = sorted(frequency.items())

# Find the smallest length that covers 99% of the captions.
# (The variable keeps its historical name; the threshold is 99%.)
cumulative = 0
ninety_percent_mark = total_items * 0.99
for num, freq in sorted_frequency:
    cumulative += freq
    if cumulative >= ninety_percent_mark:
        print(f"99% images have maximum length of caption is: {num}")
        print ("This should be the max length")
        break

In [None]:
# Height and width every image is resized to before entering the CNN
IMAGE_SIZE = (224, 224)
# Number of distinct tokens collected while processing the captions.
# NOTE(review): this is later used as TextVectorization's max_tokens, which
# presumably also has to hold the padding/OOV entries — confirm the size.
VOCAB_SIZE = len(processor.vocab_list)
# `num` is the caption length left behind by the max-length loop in the previous cell
SEQ_LENGTH = num

# Dimension for the image embeddings and token embeddings
EMBED_DIM = 512

# Per-layer units in the feed-forward network
FF_DIM = 512

# Other training parameters
BATCH_SIZE = 128
EPOCHS = 50

#### 4. Text Vectorization

In [None]:
with strategy.scope():
    def custom_standardization(input_string):
        """Lowercase the text and delete every character listed in ``strip_chars``."""
        lowered = tf.strings.lower(input_string)
        removal_pattern = f"[{re.escape(strip_chars)}]"
        return tf.strings.regex_replace(lowered, removal_pattern, "")


    # Punctuation removed during standardization. "<" and ">" are taken out of
    # the set so the <START>/<END> markers survive.
    # The backslash is written as "\\": the resulting string is unchanged, but a
    # bare "\]" is an invalid escape sequence that newer Python versions warn about.
    strip_chars = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"
    strip_chars = strip_chars.replace("<", "")
    strip_chars = strip_chars.replace(">", "")


    # Maps each caption to a fixed-length (SEQ_LENGTH) sequence of integer token ids
    vectorization = TextVectorization(
        max_tokens=VOCAB_SIZE,
        output_mode="int",
        output_sequence_length=SEQ_LENGTH,
        standardize=custom_standardization,
    )
    # Learn the vocabulary from every processed caption
    vectorization.adapt(processor.caption_ls)

In [None]:
# Sanity check: take the first three processed captions
example_captions = processor.caption_ls[:3]

# Vectorize the example captions
vectorized_captions = vectorization(example_captions)

# Print each original caption next to its integer token ids
for original, vectorized in zip(example_captions, vectorized_captions.numpy()):
    print("Original:", original)
    print("Vectorized:", vectorized)

#### 5. Data Input Pipeline

In [None]:
with strategy.scope():
    class DatasetBuilder:
        """Turns image paths and captions into a batched ``tf.data`` pipeline."""

        def __init__(self, vectorization_layer, batch_size, img_size, autotune=tf.data.AUTOTUNE):
            """Keep the text vectorizer and the pipeline settings."""
            self.vectorization_layer, self.batch_size = vectorization_layer, batch_size
            self.img_size, self.autotune = img_size, autotune

        def decode_and_resize(self, img_path):
            """Read a JPEG file and return it as a float32 RGB tensor scaled to [0, 1]."""
            raw_bytes = tf.io.read_file(img_path)
            rgb = tf.image.decode_jpeg(raw_bytes, channels=3)
            resized = tf.image.resize(rgb, self.img_size)
            return tf.cast(resized, tf.float32) / 255.0

        def process_input(self, img_path, caption):
            """Map one (path, caption) element to (image tensor, token ids)."""
            pixels = self.decode_and_resize(img_path)
            # The vectorizer is fed a leading axis of size one, removed again afterwards
            token_ids = self.vectorization_layer(tf.expand_dims(caption, 0))
            return pixels, tf.squeeze(token_ids, 0)

        def make_dataset(self, image_paths, captions):
            """Build a shuffled, mapped, batched and prefetched dataset."""
            pairs = tf.data.Dataset.from_tensor_slices((image_paths, captions))
            return (
                pairs.shuffle(buffer_size=len(image_paths))
                .map(self.process_input, num_parallel_calls=self.autotune)
                .batch(self.batch_size)
                .prefetch(self.autotune)
            )


    dataset_builder = DatasetBuilder(vectorization,BATCH_SIZE,IMAGE_SIZE)

    # Build one pipeline per split; keys are image paths and values are the
    # caption lists produced by the train/validation/test split
    train_dataset = dataset_builder.make_dataset(list(train_data.keys()), list(train_data.values()))
    valid_dataset = dataset_builder.make_dataset(list(valid_data.keys()), list(valid_data.values()))
    test_dataset = dataset_builder.make_dataset(list(test_data.keys()), list(test_data.values()))

    # Preview a single training batch
    for img, caption in train_dataset.take(1):
        print("Image shape:", img.numpy().shape)

        # Display the first image in the batch
        plt.imshow(img.numpy()[0])
        plt.title("Sample Image")
        plt.show()

        # Token ids of the first caption in the batch
        print("Caption shape:", caption.numpy().shape)
        print("Caption:", caption.numpy()[0])

#### 6. DenseNet121 with CheXNet Weights for Extracting Image Features

In [None]:
# Path to the weights file loaded into the DenseNet121 + 14-unit classifier below
chexnet_weights = "brucechou1983_CheXNet_Keras_0.3.0_weights.h5"

with strategy.scope():
    def get_cnn_model():
        """Build the image feature extractor from DenseNet121 with CheXNet weights.

        A DenseNet121 backbone plus a 14-unit sigmoid head is created so that
        the weights file can be loaded, then the head is discarded: features are
        taken from the layer named ``'relu'``, projected to 2048 channels with a
        1x1 convolution and flattened into a sequence of spatial positions.

        Returns:
            A ``tf.keras.Model`` mapping an image batch to a tensor of shape
            (batch, positions, 2048); for 224x224 inputs ``positions`` is 49.
        """
        # weights=None: load_weights() below replaces the backbone weights, so
        # fetching the default ImageNet checkpoint first would be wasted work.
        base_model = DenseNet121(include_top=False, weights=None, input_shape=(*IMAGE_SIZE, 3))
        x = base_model.output
        x = GlobalAveragePooling2D()(x)
        x = Dense(14, activation="sigmoid", name="chexnet_output")(x)
        chexnet = tf.keras.Model(inputs = base_model.input,outputs = x)
        chexnet.load_weights(chexnet_weights)

        x = chexnet.get_layer('relu').output
        x = Conv2D(2048, (1, 1), padding='same', activation='relu')(x)
        # -1 lets Keras infer the number of spatial positions, so the model is
        # not tied to a 7x7 feature map
        x = Reshape((-1, 2048))(x)
        cnn_model = tf.keras.Model(inputs=base_model.input, outputs=x)
        return cnn_model
#cnn_model.summary()

# for layer in base_model.layers:
#     layer.trainable = False
# for layer in cnn_model.layers:
#     print(layer.name, layer.trainable)

#### 7. Transformer Encoder Block

In [None]:
with strategy.scope():
    class TransformerEncoderBlock(layers.Layer):
        """Encoder block applied to the CNN image features.

        The input is layer-normalised, projected to ``embed_dim`` with a ReLU
        dense layer, passed through unmasked self-attention, and the residual
        sum is layer-normalised again.
        """

        def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
            """Create the sub-layers.

            Args:
                embed_dim: Output width of the dense projection and attention key size.
                dense_dim: Stored on the instance; not used by any sub-layer here.
                num_heads: Number of attention heads.
                **kwargs: Forwarded to ``layers.Layer``.
            """
            super().__init__(**kwargs)
            self.embed_dim = embed_dim
            self.dense_dim = dense_dim
            self.num_heads = num_heads
            self.attention_1 = layers.MultiHeadAttention(
                num_heads=num_heads, key_dim=embed_dim, dropout=0.0
            )
            self.layernorm_1 = layers.LayerNormalization()
            self.layernorm_2 = layers.LayerNormalization()
            self.dense_1 = layers.Dense(embed_dim, activation="relu")

        def call(self, inputs, training, mask=None):
            """Encode ``inputs``; ``mask`` is accepted but not used."""
            inputs = self.layernorm_1(inputs)
            inputs = self.dense_1(inputs)

            # Self-attention over the projected features, without any mask
            attention_output_1 = self.attention_1(
                query=inputs,
                value=inputs,
                key=inputs,
                attention_mask=None,
                training=training,
            )
            # Residual connection followed by normalisation
            out_1 = self.layernorm_2(inputs + attention_output_1)
            return out_1

#### 8. Positional Encoding

In [None]:
with strategy.scope():
    class PositionalEmbedding(layers.Layer):
        """Sum of scaled token embeddings and learned position embeddings."""

        def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
            """Create the two embedding tables.

            Args:
                sequence_length: Number of rows in the position embedding table.
                vocab_size: Number of rows in the token embedding table.
                embed_dim: Width of both embeddings.
                **kwargs: Forwarded to ``layers.Layer``.
            """
            super().__init__(**kwargs)
            self.token_embeddings = layers.Embedding(
                input_dim=vocab_size, output_dim=embed_dim
            )
            self.position_embeddings = layers.Embedding(
                input_dim=sequence_length, output_dim=embed_dim
            )
            self.sequence_length = sequence_length
            self.vocab_size = vocab_size
            self.embed_dim = embed_dim
            # Token embeddings are multiplied by sqrt(embed_dim) in call()
            self.embed_scale = tf.math.sqrt(tf.cast(embed_dim, tf.float32))

        def call(self, inputs):
            """Embed token ids and add the embedding of each position index."""
            # Positions 0..length-1 along the last axis of the input
            length = tf.shape(inputs)[-1]
            positions = tf.range(start=0, limit=length, delta=1)
            embedded_tokens = self.token_embeddings(inputs)
            embedded_tokens = embedded_tokens * self.embed_scale
            embedded_positions = self.position_embeddings(positions)
            final_embeddings = embedded_tokens + embedded_positions

            return final_embeddings

        def compute_mask(self, inputs, mask=None):
            """Return a boolean mask that is False wherever the token id is 0."""
            return tf.math.not_equal(inputs, 0)

#### 9. Transformer Decoder Block

In [None]:
with strategy.scope():
    class TransformerDecoderBlock(layers.Layer):
        """Decoder block: token embedding, masked self-attention, cross-attention
        over the encoder output, a feed-forward network and a softmax over the
        vocabulary.

        The embedding and output layers are sized from the module-level
        ``EMBED_DIM``, ``SEQ_LENGTH`` and ``VOCAB_SIZE`` constants.
        """

        def __init__(self, embed_dim, ff_dim, num_heads, **kwargs):
            """Create the sub-layers.

            Args:
                embed_dim: Attention key size and output width of the second
                    feed-forward layer.
                ff_dim: Width of the first feed-forward layer.
                num_heads: Number of heads in both attention layers.
                **kwargs: Forwarded to ``layers.Layer``.
            """
            super().__init__(**kwargs)
            self.embed_dim = embed_dim
            self.ff_dim = ff_dim
            self.num_heads = num_heads
            self.attention_1 = layers.MultiHeadAttention(
                num_heads=num_heads, key_dim=embed_dim, dropout=0.1
            )
            self.attention_2 = layers.MultiHeadAttention(
                num_heads=num_heads, key_dim=embed_dim, dropout=0.1
            )
            self.ffn_layer_1 = layers.Dense(ff_dim, activation="relu")
            self.ffn_layer_2 = layers.Dense(embed_dim)

            self.layernorm_1 = layers.LayerNormalization()
            self.layernorm_2 = layers.LayerNormalization()
            self.layernorm_3 = layers.LayerNormalization()

            self.embedding = PositionalEmbedding(
                embed_dim=EMBED_DIM,
                sequence_length=SEQ_LENGTH,
                vocab_size=VOCAB_SIZE,
            )
            self.out = layers.Dense(VOCAB_SIZE, activation="softmax")

            self.dropout_1 = layers.Dropout(0.3)
            self.dropout_2 = layers.Dropout(0.5)
            self.supports_masking = True

        def call(self, inputs, encoder_outputs, training, mask=None):
            """Predict a vocabulary distribution for every position of ``inputs``.

            Args:
                inputs: Integer token ids of the (shifted) caption.
                encoder_outputs: Encoded image features attended to by the
                    second attention layer.
                training: Forwarded to the attention and dropout layers.
                mask: Optional boolean padding mask for ``inputs``. When it is
                    omitted, self-attention is still causal and cross-attention
                    is unmasked.

            Returns:
                Softmax probabilities over the vocabulary for every position.
            """
            inputs = self.embedding(inputs)
            causal_mask = self.get_causal_attention_mask(inputs)

            # Defaults for mask=None; previously both names were left unbound in
            # that case and the attention calls below raised UnboundLocalError.
            combined_mask = causal_mask
            padding_mask = None
            if mask is not None:
                padding_mask = tf.cast(mask[:, :, tf.newaxis], dtype=tf.int32)
                combined_mask = tf.cast(mask[:, tf.newaxis, :], dtype=tf.int32)
                # A position is attended to only if it is both non-padding and not in the future
                combined_mask = tf.minimum(combined_mask, causal_mask)

            # Masked self-attention over the caption tokens
            attention_output_1 = self.attention_1(
                query=inputs,
                value=inputs,
                key=inputs,
                attention_mask=combined_mask,
                training=training,
            )
            out_1 = self.layernorm_1(inputs + attention_output_1)

            # Cross-attention: caption positions query the encoded image features
            attention_output_2 = self.attention_2(
                query=out_1,
                value=encoder_outputs,
                key=encoder_outputs,
                attention_mask=padding_mask,
                training=training,
            )
            out_2 = self.layernorm_2(out_1 + attention_output_2)

            # Position-wise feed-forward network with a residual connection
            ffn_out = self.ffn_layer_1(out_2)
            ffn_out = self.dropout_1(ffn_out, training=training)
            ffn_out = self.ffn_layer_2(ffn_out)

            ffn_out = self.layernorm_3(ffn_out + out_2, training=training)
            ffn_out = self.dropout_2(ffn_out, training=training)
            preds = self.out(ffn_out)
            return preds

        def get_causal_attention_mask(self, inputs):
            """Return an int32 lower-triangular mask of shape (batch, seq, seq).

            Entry ``[b, i, j]`` is 1 when ``i >= j``, i.e. position ``i`` may
            attend to position ``j`` and everything before it.
            """
            input_shape = tf.shape(inputs)
            batch_size, sequence_length = input_shape[0], input_shape[1]
            i = tf.range(sequence_length)[:, tf.newaxis]
            j = tf.range(sequence_length)
            mask = tf.cast(i >= j, dtype="int32")
            mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
            # Repeat the single (1, seq, seq) mask once per batch element
            mult = tf.concat(
                [
                    tf.expand_dims(batch_size, -1),
                    tf.constant([1, 1], dtype=tf.int32),
                ],
                axis=0,
            )
            return tf.tile(mask, mult)

#### 10. Image Captioning Model

In [None]:
with strategy.scope():
    class ImageCaptioningModel(keras.Model):
        """Captioning model made of a CNN feature extractor, an encoder and a decoder.

        Training and evaluation use custom ``train_step``/``test_step``
        implementations that report a masked loss and a masked token accuracy
        under the keys ``"loss"`` and ``"acc"``.
        """

        def __init__(
            self,
            cnn_model,
            encoder,
            decoder,
            image_aug=None,
        ):
            """Store the sub-models and create the running-mean metric trackers.

            Args:
                cnn_model: Model turning an image batch into feature sequences.
                encoder: Layer applied to the CNN features.
                decoder: Layer predicting tokens from a caption and encoder output.
                image_aug: Optional callable applied to images during training only.
            """
            super().__init__()
            self.cnn_model = cnn_model
            self.encoder = encoder
            self.decoder = decoder
            self.loss_tracker = keras.metrics.Mean(name="loss")
            self.acc_tracker = keras.metrics.Mean(name="accuracy")
            self.image_aug = image_aug

        def calculate_loss(self, y_true, y_pred, mask):
            """Return the loss averaged over the positions where ``mask`` is set."""
            loss = self.loss(y_true, y_pred)
            mask = tf.cast(mask, dtype=loss.dtype)
            # Zero out the loss of masked (padding) positions
            loss *= mask
            return tf.reduce_sum(loss) / tf.reduce_sum(mask)

        def calculate_accuracy(self, y_true, y_pred, mask):
            """Return the fraction of unmasked positions whose argmax equals the target."""
            accuracy = tf.equal(y_true, tf.argmax(y_pred, axis=2))
            accuracy = tf.math.logical_and(mask, accuracy)
            accuracy = tf.cast(accuracy, dtype=tf.float32)
            mask = tf.cast(mask, dtype=tf.float32)
            return tf.reduce_sum(accuracy) / tf.reduce_sum(mask)

        def _compute_caption_loss_and_acc(self, img_embed, batch_seq, training=True):
            """Run encoder and decoder on one batch and return ``(loss, accuracy)``.

            The decoder input is the caption without its last token and the
            target is the caption without its first token; target positions
            equal to 0 are masked out.
            """
            # Process a single caption per image
            encoder_out = self.encoder(img_embed, training=training)
            batch_seq_inp = batch_seq[:, :-1]
            batch_seq_true = batch_seq[:, 1:]
            mask = tf.math.not_equal(batch_seq_true, 0)
            batch_seq_pred = self.decoder(
                batch_seq_inp, encoder_out, training=training, mask=mask
            )
            loss = self.calculate_loss(batch_seq_true, batch_seq_pred, mask)
            acc = self.calculate_accuracy(batch_seq_true, batch_seq_pred, mask)
            return loss, acc

        def train_step(self, batch_data):
            """Run one optimisation step and return the running loss/accuracy."""
            batch_img, batch_seq = batch_data

            if self.image_aug:
                batch_img = self.image_aug(batch_img)
            # The CNN features are computed outside the gradient tape
            img_embed = self.cnn_model(batch_img)
            with tf.GradientTape() as tape:
                loss, acc = self._compute_caption_loss_and_acc(
                    img_embed, batch_seq, training=True
                )
                self.loss_tracker.update_state(loss)
                self.acc_tracker.update_state(acc)

            # Only the encoder and decoder variables are updated
            train_vars = self.encoder.trainable_variables + self.decoder.trainable_variables
            grads = tape.gradient(loss, train_vars)
            self.optimizer.apply_gradients(zip(grads, train_vars))

            return {"loss": self.loss_tracker.result(), "acc": self.acc_tracker.result()}

        def test_step(self, batch_data):
            """Evaluate one batch (no augmentation, ``training=False``) and return the running metrics."""
            batch_img, batch_seq = batch_data
            img_embed = self.cnn_model(batch_img)
            loss, acc = self._compute_caption_loss_and_acc(
                img_embed, batch_seq, training=False
            )
            self.loss_tracker.update_state(loss)
            self.acc_tracker.update_state(acc)
            return {"loss": self.loss_tracker.result(), "acc": self.acc_tracker.result()}

        @property
        def metrics(self):
            """Trackers exposed to Keras."""
            # We need to list our metrics here so the `reset_states()` can be
            # called automatically.
            return [self.loss_tracker, self.acc_tracker]

#### 11. Building Final Model

In [None]:
with strategy.scope():
    # Assemble the captioning model: CNN feature extractor, a single-head
    # encoder and a two-head decoder; no image augmentation is passed
    cnn_model = get_cnn_model()
    encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1)
    decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2)
    caption_model = ImageCaptioningModel(
        cnn_model=cnn_model,
        encoder=encoder,
        decoder=decoder,
    )

In [None]:
with strategy.scope():

    # Define the loss function.
    # from_logits=False because the decoder's output layer already applies a
    # softmax; reduction is NONE because the model masks and averages the
    # per-token loss itself in calculate_loss().
    cross_entropy = keras.losses.SparseCategoricalCrossentropy(
        from_logits=False,
        reduction=tf.keras.losses.Reduction.NONE,
    )

    # EarlyStopping criteria: stop after 5 epochs without improvement and
    # restore the best weights
    early_stopping = keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)

    

    # Learning Rate Scheduler for the optimizer
    class LRSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
        """Linear warm-up from zero followed by a constant learning rate."""

        def __init__(self, post_warmup_learning_rate, warmup_steps):
            """Remember the target rate and the number of warm-up steps."""
            super().__init__()
            self.warmup_steps = warmup_steps
            self.post_warmup_learning_rate = post_warmup_learning_rate

        def __call__(self, step):
            """Return the learning rate to use at optimizer step ``step``."""
            current_step = tf.cast(step, tf.float32)
            total_warmup = tf.cast(self.warmup_steps, tf.float32)
            # Rate grows proportionally to the fraction of warm-up completed
            ramped_rate = self.post_warmup_learning_rate * (current_step / total_warmup)

            def during_warmup():
                return ramped_rate

            def after_warmup():
                return self.post_warmup_learning_rate

            return tf.cond(current_step < total_warmup, during_warmup, after_warmup)


    # Create a learning rate schedule: warm up over the first 1/15 of all
    # training steps (batches per epoch * EPOCHS), then hold 1e-4
    num_train_steps = len(train_dataset) * EPOCHS
    num_warmup_steps = num_train_steps // 15
    lr_schedule = LRSchedule(post_warmup_learning_rate=1e-4, warmup_steps=num_warmup_steps)

    # Compile the model with Adam driven by the schedule and the unreduced loss
    caption_model.compile(optimizer=tf.keras.optimizers.Adam(lr_schedule), loss=cross_entropy)

#### 12. Training the Final Model

In [None]:
with strategy.scope():
    # Train for up to EPOCHS epochs, validating on valid_dataset each epoch;
    # the early-stopping callback may end training sooner
    history = caption_model.fit(
        train_dataset,
        epochs=EPOCHS,
        validation_data=valid_dataset,
        callbacks=[early_stopping],
    )

#### 13. Loss and Accuracy Presentation

In [None]:
# Extracting loss and other metrics from the history object
loss = history.history['loss']
val_loss = history.history['val_loss']

# The model's train_step/test_step report accuracy under the key "acc",
# so the validation values appear as "val_acc"
acc = history.history['acc']
val_acc = history.history['val_acc']

# One x-axis entry per completed epoch, starting at 1
epochs = range(1, len(loss) + 1)

# Plotting training and validation loss (left panel)
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(epochs, loss, 'b-', label='Training Loss')
plt.plot(epochs, val_loss, 'r-', label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

# Plotting training and validation accuracy (right panel)
plt.subplot(1, 2, 2)
plt.plot(epochs, acc, 'b-', label='Training Accuracy')
plt.plot(epochs, val_acc, 'r-', label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.show()

#### 14. Test Dataset Predicted Outputs based on Greedy Search

In [None]:
# Greedy decoding demo on the first image of one test batch
for img, caption in test_dataset.take(1):
    sample_img = img.numpy()[0]
    print("Image shape:", sample_img.shape)
    plt.imshow(sample_img)
    plt.title("Sample Image")
    plt.show()
    # Add a leading batch axis of size one before running the CNN and encoder
    image = tf.expand_dims(sample_img, 0)
    print("Image shape after reshaping:", image.shape)
    cnn_img = caption_model.cnn_model(image)
    encoded_img = caption_model.encoder(cnn_img, training=False)

    # Map token ids back to words
    vocab = vectorization.get_vocabulary()
    index_lookup = dict(zip(range(len(vocab)), vocab))
    max_decoded_sentence_length = SEQ_LENGTH - 1

    sample_cap = caption.numpy()[0]
    print("Caption shape:", sample_cap.shape)
    print("Caption Tokens:", sample_cap)
    
    # Convert each token in sample_cap to the corresponding word and join them
    # (id 0 is skipped)
    actual_caption = " ".join([index_lookup.get(token, '') for token in sample_cap if token != 0])
    print("Ground Truth Caption:", actual_caption)

    # The markers are lowercase here because the vectorizer lowercases its input
    decoded_caption = "<start> "

    # At step i the caption decoded so far is re-vectorized, the decoder is run,
    # and the most probable token at position i is appended
    for i in range(max_decoded_sentence_length):
        tokenized_caption = vectorization([decoded_caption])[:, :-1]
        mask = tf.math.not_equal(tokenized_caption, 0)
        predictions = caption_model.decoder(
            tokenized_caption, encoded_img, training=False, mask=mask
        )
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = index_lookup[sampled_token_index]
        if sampled_token == "<end>":
            break
        decoded_caption += " " + sampled_token

    # Strip the start/end markers before printing
    decoded_caption = decoded_caption.replace("<start> ", "")
    decoded_caption = decoded_caption.replace(" <end>", "").strip()
    print("Predicted Caption:", decoded_caption)

In [None]:
from tqdm import tqdm

predicted_captions = []
actual_captions = []

# Loop invariants: the id -> word lookup and the decoding limit do not change
# between images, so they are built once instead of once per sample.
vocab = vectorization.get_vocabulary()
index_lookup = dict(zip(range(len(vocab)), vocab))
max_decoded_sentence_length = SEQ_LENGTH - 1

# Each element of test_dataset is a batch of (images, vectorized captions)
for img, caption in tqdm(test_dataset, desc="Processing Test Dataset"):
    # Convert the batch to NumPy once rather than once per image
    batch_images = img.numpy()
    batch_captions = caption.numpy()

    for idx in range(img.shape[0]):  # Loop through the batch
        sample_img = batch_images[idx]
        image = tf.expand_dims(sample_img, 0)
        cnn_img = caption_model.cnn_model(image)
        encoded_img = caption_model.encoder(cnn_img, training=False)

        # Ground truth: ids back to words, padding (id 0) and markers removed
        sample_cap = batch_captions[idx]
        actual_caption = " ".join([index_lookup.get(token, '') for token in sample_cap if token != 0]).replace("<start>", "").replace("<end>", "").strip()
        actual_captions.append(actual_caption)

        # Greedy decoding: at step i, append the most probable token at position i
        decoded_caption = "<start> "
        for i in range(max_decoded_sentence_length):
            tokenized_caption = vectorization([decoded_caption])[:, :-1]
            mask = tf.math.not_equal(tokenized_caption, 0)
            predictions = caption_model.decoder(
                tokenized_caption, encoded_img, training=False, mask=mask
            )
            sampled_token_index = np.argmax(predictions[0, i, :])
            sampled_token = index_lookup[sampled_token_index]
            if sampled_token == "<end>":
                break
            decoded_caption += " " + sampled_token

        decoded_caption = decoded_caption.replace("<start> ", "").replace("<end>", "").strip()
        predicted_captions.append(decoded_caption)

#### 15. Evaluation Metric

In [None]:
def get_bleu(reference,prediction):
    """
    Given a reference and prediction string, outputs the 1-gram,2-gram,3-gram and 4-gram bleu scores

    Both strings are split on whitespace before scoring.

    Args:
        reference: Ground-truth caption as a single string.
        prediction: Generated caption as a single string.

    Returns:
        Tuple ``(bleu1, bleu2, bleu3, bleu4)`` of cumulative BLEU scores.
    """
    # Imported locally so the function also works when it is called before the
    # later cell that imports sentence_bleu at module level has been executed.
    from nltk.translate.bleu_score import sentence_bleu

    reference = [reference.split()] #should be in an array (cos of multiple references can be there here only 1)
    prediction = prediction.split()

    # Uniform weights for BLEU-3 must sum to 1, so use exact thirds (not 0.33)
    third = 1 / 3
    weight_sets = (
        (1, 0, 0, 0),
        (0.5, 0.5, 0, 0),
        (third, third, third, 0),
        (0.25, 0.25, 0.25, 0.25),
    )
    bleu1, bleu2, bleu3, bleu4 = (
        sentence_bleu(reference, prediction, weights=weights) for weights in weight_sets
    )

    return bleu1,bleu2,bleu3,bleu4

In [None]:
from nltk.translate.bleu_score import sentence_bleu
def mean_bleu(pred_ls, act_ls, **kwargs):
    """Return the mean BLEU-1..4 scores over paired predictions and references.

    Captions given as strings are split into words before scoring. Passing the
    raw strings to ``sentence_bleu`` (as was done before) makes it iterate over
    characters, producing character-level rather than word-level BLEU.
    Captions that are already token sequences are used as they are.

    Args:
        pred_ls: Predicted captions (strings or token sequences).
        act_ls: Reference captions, aligned with ``pred_ls`` by index.
        **kwargs: Accepted for backward compatibility; ignored.

    Returns:
        Tuple ``(bleu1, bleu2, bleu3, bleu4)`` of mean scores.
    """

    def _as_tokens(caption):
        # Tokenize strings on whitespace; leave pre-tokenized input untouched
        return caption.split() if isinstance(caption, str) else list(caption)

    # Uniform weights for BLEU-3 must sum to 1, so use exact thirds (not 0.33)
    third = 1 / 3
    weight_sets = (
        (1, 0, 0, 0),
        (0.5, 0.5, 0, 0),
        (third, third, third, 0),
        (0.25, 0.25, 0.25, 0.25),
    )
    scores = [[] for _ in weight_sets]

    for k, predicted in enumerate(pred_ls):
        # Tokenize the true and predicted captions
        true_tokens = _as_tokens(act_ls[k])
        predict_tokens = _as_tokens(predicted)

        # Calculate BLEU scores
        for bucket, weights in zip(scores, weight_sets):
            bucket.append(sentence_bleu([true_tokens], predict_tokens, weights=weights))

    bleu1, bleu2, bleu3, bleu4 = (np.array(bucket).mean() for bucket in scores)
    return bleu1, bleu2, bleu3, bleu4
# Mean BLEU-1..4 over every (prediction, reference) pair from the test set
bleu1,bleu2,bleu3,bleu4 = mean_bleu(predicted_captions,actual_captions)
print ("Bleu Score 1-gram: ",bleu1)
print ("Bleu Score 2-gram: ",bleu2)
print ("Bleu Score 3-gram: ",bleu3)
print ("Bleu Score 4-gram: ",bleu4)

In [None]:
from rouge import Rouge

def calculate_rouge_scores(predictions, references):
    """Return the ROUGE scores of ``predictions`` against ``references``, averaged over all pairs."""
    return Rouge().get_scores(predictions, references, avg=True)

# Averaged ROUGE scores for the test-set predictions
rouge_scores = calculate_rouge_scores(predicted_captions, actual_captions)
print(rouge_scores)

    ROUGE-1:
        r: 19.85% (This means 19.85% of the words in the reference appear in the generated text)
        p: 26.59% (This means 26.59% of the words in the generated text are also in the reference)
        f: 21.08% (This is the F1-score, balancing precision and recall for unigrams)

    ROUGE-2:
        r: 6.10% (Lower recall for bigrams suggests less exact matches of word pairs)
        p: 8.59% (Precision for bigrams is also lower, indicating less exact pairings of words)
        f: 6.47% (The F1-score for bigrams is significantly lower than for unigrams)

    ROUGE-L:
        r: 18.00% (Recall for the longest common subsequence)
        p: 23.81% (Precision for the longest common subsequence)
        f: 18.98% (F1-score for the longest common subsequence)

In [None]:
from nltk.translate.meteor_score import meteor_score
import numpy as np

# Tokenize the captions on whitespace.
# actual_captions and predicted_captions are the lists of strings built in the
# test-set decoding cell.
# NOTE(review): meteor_score presumably needs NLTK's WordNet data to be
# available locally — confirm it is downloaded in this environment.
actual_captions_tokenized = [[act.split()] for act in actual_captions]  # List of lists of lists for references
predicted_captions_tokenized = [pred.split() for pred in predicted_captions]  # List of lists for hypothesis

# Calculate one METEOR score per (references, hypothesis) pair, then average
meteor_scores = [meteor_score(ref, pred) for ref, pred in zip(actual_captions_tokenized, predicted_captions_tokenized)]
mean_meteor_score = np.mean(meteor_scores)

print(f"Mean METEOR score: {mean_meteor_score}")