# CNN + LSTM HyperModel V1
This hypermodel was created to help me select the best architecture for my image captioning model and so improve its results.

### Hyperparameters Tuned
1. CNN feature extractor
2. Sequence length
3. Vocab size
4. Embedding dimension

In [1]:
import tensorflow as tf
import numpy as np
import random
import datetime

import os

from tensorflow.keras.layers import Lambda

from pickle import load
from math import ceil
from numpy.random import choice
from keras import Model
from keras.applications import ResNet152V2
from keras.layers import Input, Dense, LSTM, Embedding, Add, Dropout
from keras.optimizers import Adam
from keras.losses import SparseCategoricalCrossentropy
from keras.callbacks import EarlyStopping, TensorBoard
import matplotlib.pyplot as plt

from functions.text_processing import create_vocab_mappings
from functions.training import data_generator
from functions.model_evaluation import evaluate_captions, generate_caption
from functions.testing import generate_and_evaluate_test_caption
from keras.utils import plot_model

from keras.layers import TextVectorization # type: ignore


from keras.layers import Layer
from keras.applications import VGG16, VGG19, ResNet50, ResNet152, ResNet50V2, ResNet152V2, InceptionV3, InceptionResNetV2, EfficientNetB0, EfficientNetB1, EfficientNetB2, NASNetMobile, ConvNeXtTiny, ConvNeXtBase, DenseNet121, DenseNet201   # type: ignore
from keras.applications import vgg16, vgg19, resnet, resnet_v2, inception_v3, inception_resnet_v2, efficientnet, nasnet, convnext, densenet


from keras.models import Model
from keras.layers import Input, Dense, LSTM, Dropout, Embedding, Add, LayerNormalization, GRU, Bidirectional
from keras.optimizers import Adam, RMSprop
from keras.losses import SparseCategoricalCrossentropy
from keras.regularizers import L1, L2, L1L2
from keras_tuner import HyperModel, HyperParameters, RandomSearch

from keras.initializers import HeUniform, Orthogonal, GlorotUniform

from functions.dataset_loading import load_flicker8k_split
from functions.text_processing import create_vocab_mappings
from functions.training import data_generator
from functions.model_evaluation import evaluate_captions#

from functions.dataset_loading import load_flicker8k_captions, load_flicker8k_split
from functions.text_processing import custom_standardization, vectorize_captions, create_vocab_mappings
from functions.dataset_loading import create_embedding_matrix
from functions.image_processing import batch_extract_features
from functions.model_evaluation import batch_generate_captions, evaluate_captions

from tensorflow.data import Dataset, AUTOTUNE  # type: ignore


[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\james\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [2]:
# Notebook-wide default settings.
# The hyperparameter search below samples its own sequence length and vocab
# size for every trial, so these values are only defaults.
# NOTE(review): HIDDEN_DIM, BATCH_SIZE, VOCAB_SIZE and DIR are not referenced
# in the visible cells — confirm whether they are still needed.
SEQ_LENGTH = 20   # default caption length, in tokens
HIDDEN_DIM = 1024  # default width of the hidden layers
BATCH_SIZE = 64   # default batch size
VOCAB_SIZE = 4096  # default vocabulary size
DIR = 'preprocessed_data/flicker8k/'  # presumably the folder of preprocessed Flickr8k data — confirm

# Hypermodel
This section defines a hypermodel for choosing the architecture of the image captioning model.

The parameters that I will be experimenting with are:
1. Pre-trained CNN
2. Embedding dimension
3. Sequence length
4. Vocab size

In [3]:
import time
import json



In [4]:
from keras.callbacks import Callback
from nltk import  meteor_score


class METEORCallback(Callback):
    """Keras callback that reports the mean METEOR score after every epoch.

    At the end of each epoch the callback pulls ``steps`` batches from
    ``val_data``, greedily decodes a caption for every image in those batches
    and scores it against the batch's reference caption.

    Args:
        model: Model used for decoding. Keras re-attaches the training model
            through ``set_model`` when the callback is passed to ``fit``.
        val_data: Iterator; ``next(val_data)`` must yield a 3-tuple whose first
            item is the images and whose third item is the reference captions.
            NOTE(review): assumes references are caption strings (or already
            tokenised word lists) — confirm against the generator in use.
        idx_to_word: Mapping (or sequence) from token index to word.
        batch_size: Stored for reference; not used by the scoring code.
        steps: Number of batches drawn from ``val_data`` per evaluation.
        seq_length: Length of the caption input the model expects.
    """

    # Marker tokens that are dropped from the decoded caption text.
    _SPECIAL_TOKENS = frozenset({'<start>', '<end>', '<pad>'})

    def __init__(self, model, val_data, idx_to_word, batch_size, steps, seq_length):
        super().__init__()
        # Attach the model through set_model() rather than assigning
        # ``self.model``: the attribute is managed by the Callback base class
        # (in Keras 3 it is a property, so direct assignment can fail).
        if model is not None:
            self.set_model(model)
        self.val_data = val_data
        self.idx_to_word = idx_to_word
        # Reverse lookup; generate_caption() needs the index of '<start>'.
        pairs = idx_to_word.items() if hasattr(idx_to_word, 'items') else enumerate(idx_to_word)
        self.word_to_idx = {word: idx for idx, word in pairs}
        self.batch_size = batch_size
        self.steps = steps
        self.seq_length = seq_length

    def generate_caption(self, image):
        """Greedily decode a caption for a single image.

        Args:
            image: One image without the batch axis.

        Returns:
            The generated words joined by single spaces, with the
            '<start>', '<end>' and '<pad>' markers removed.
        """
        # Integer dtype so the ids can be used directly as lookup keys.
        input_seq = np.zeros((1, self.seq_length), dtype='int32')
        input_seq[0, 0] = self.word_to_idx['<start>']
        image_batch = image[np.newaxis, ...]  # loop-invariant, add batch axis once

        generated_length = 1  # number of positions of input_seq actually filled
        for i in range(self.seq_length - 1):
            predictions = self.model.predict([image_batch, input_seq], verbose=0)
            predicted_id = int(np.argmax(predictions[0, i]))
            input_seq[0, i + 1] = predicted_id
            generated_length = i + 2

            if self.idx_to_word[predicted_id] == '<end>':
                break

        # Decode only the positions that were generated; the untouched tail of
        # input_seq is zero filler, not model output.
        words = (self.idx_to_word[int(idx)] for idx in input_seq[0, :generated_length])
        return ' '.join(word for word in words if word not in self._SPECIAL_TOKENS)

    def calculate_meteor(self):
        """Return the mean METEOR score over ``steps`` validation batches.

        Returns ``nan`` when no caption was scored (e.g. ``steps == 0``).
        """
        # Import the scoring *function* explicitly: the name ``meteor_score``
        # is also a submodule of nltk.translate, so a top-level
        # ``from nltk import meteor_score`` may bind the module instead.
        from nltk.translate.meteor_score import meteor_score as sentence_meteor

        scores = []

        for _ in range(self.steps):
            images, _, true_captions = next(self.val_data)

            for i in range(len(images)):
                hypothesis = self.generate_caption(images[i]).split()
                reference = true_captions[i]
                # NLTK expects pre-tokenised input; split plain strings and
                # pass anything else through unchanged.
                if isinstance(reference, str):
                    reference = reference.split()

                scores.append(sentence_meteor([reference], hypothesis))

        return float(np.mean(scores)) if scores else float('nan')

    def on_epoch_end(self, epoch, logs=None):
        """Compute the METEOR score, record it in ``logs`` and print it."""
        score = self.calculate_meteor()
        # Keras may call the hook without a logs dict.
        if logs is not None:
            logs['meteor_score'] = score
        print(f'\nEpoch {epoch + 1} - METEOR Score: {score:.4f}')

In [5]:
from tensorflow import expand_dims
from numpy import argmax
from tensorflow.keras.utils import pad_sequences
from functions.model_evaluation import decode_caption
        
class MyHyperModel(HyperModel):
    """KerasTuner hypermodel for a CNN-encoder / LSTM-decoder captioning model.

    Each trial samples a vocabulary size, a caption sequence length, an
    embedding dimension and a pre-trained CNN backbone, then builds and trains
    the resulting model on generators created from the supplied image data.

    Args:
        train_image_data: Dict mapping training image name -> image tensor.
        val_image_data: Dict mapping validation image name -> image tensor.
        caption_map: Mapping from image name to its captions, as consumed by
            ``vectorize_captions``.
        caption_dataset: Dataset of caption text used to adapt the
            ``TextVectorization`` layer.
        image_shape: Shape of one input image.
        hidden_dim: Width of the image dense layer, the LSTM and the dense
            layer after the merge.
        batch_size: Batch size passed to ``data_generator`` and used for the
            per-epoch step counts.
        dropout: Dropout rate used by every dropout layer.
        seed: Seed passed to ``tf.random.set_seed``.
    """

    # Multiplier applied to the image count when computing steps per epoch.
    # Presumably the number of captions per image in Flickr8k — confirm.
    CAPTIONS_PER_IMAGE = 5

    def __init__(
        self, train_image_data, val_image_data, caption_map, caption_dataset,
        image_shape=(224, 224, 3), hidden_dim=1024, batch_size=32, dropout=0.2, seed=17
    ):
        super().__init__()
        self.train_image_data = train_image_data
        self.train_images = list(train_image_data.keys())
        self.val_image_data = val_image_data
        self.val_images = list(val_image_data.keys())
        self.caption_map = caption_map
        self.caption_dataset = caption_dataset
        self.image_shape = image_shape
        self.hidden_dim = hidden_dim
        self.batch_size = batch_size
        self.dropout = dropout
        self.seed = seed

        # Backbone name -> (model constructor, matching preprocess_input).
        self.cnn_configs = {
            'vgg16': (VGG16, vgg16.preprocess_input),
            'vgg19': (VGG19, vgg19.preprocess_input),
            'resnet50': (ResNet50, resnet.preprocess_input),
            'resnet152': (ResNet152, resnet.preprocess_input),
            'resnet50v2': (ResNet50V2, resnet_v2.preprocess_input),
            'resnet152v2': (ResNet152V2, resnet_v2.preprocess_input),
            'inception': (InceptionV3, inception_v3.preprocess_input),
            'inceptionresnet': (InceptionResNetV2, inception_resnet_v2.preprocess_input),
            'efficientb0': (EfficientNetB0, efficientnet.preprocess_input),
            'efficientb1': (EfficientNetB1, efficientnet.preprocess_input),
            'efficientb2': (EfficientNetB2, efficientnet.preprocess_input),
            'nasnetmobile': (NASNetMobile, nasnet.preprocess_input),
            'densenet121': (DenseNet121, densenet.preprocess_input),
            'densenet201': (DenseNet201, densenet.preprocess_input),
            'convnexttiny': (ConvNeXtTiny, convnext.preprocess_input),
            'convnextbase': (ConvNeXtBase, convnext.preprocess_input)
        }

        tf.random.set_seed(self.seed)

    def build(self, hp):
        """Build and compile the captioning model for one trial.

        Besides returning the model, this stores the trial's sequence length,
        vocabulary mappings and train/validation generators on ``self`` so
        that ``fit`` can use them.

        Args:
            hp: ``HyperParameters`` object used to sample the trial's values.

        Returns:
            The compiled Keras model taking ``[image, caption]`` inputs.
        """
        vocab_size = hp.Choice('vocab_size', values=[2048, 4096, 8192])
        self.seq_length = hp.Int('sequence_length', min_value=15, max_value=30, step=5)
        embedding_dim = hp.Choice('embedding_dim', values=[100, 200, 300])

        # Text vectorization. One token longer than the model input;
        # presumably so input and target can be offset by one — confirm.
        text_vectorizer = TextVectorization(
            max_tokens=vocab_size,
            output_mode='int',
            output_sequence_length=self.seq_length + 1,
            standardize=custom_standardization,
        )

        text_vectorizer.adapt(self.caption_dataset)

        vocab = text_vectorizer.get_vocabulary()
        self.word_to_idx, self.idx_to_word = create_vocab_mappings(vocab)
        vectorized_caption_map = vectorize_captions(self.caption_map, text_vectorizer)

        # Training and validation data (re-created per trial because the
        # vectorized captions depend on the sampled vocab / sequence length).
        self.train_data = data_generator(self.train_images, vectorized_caption_map, self.train_image_data, self.batch_size)
        self.val_data = data_generator(self.val_images, vectorized_caption_map, self.val_image_data, self.batch_size)

        # Embedding matrix used to initialise the frozen embedding layer.
        embedding_matrix = create_embedding_matrix(embedding_dim, vocab_size, self.word_to_idx)

        # CNN choice
        cnn_choice = hp.Choice('cnn', list(self.cnn_configs.keys()))
        CNN, preprocess_func = self.cnn_configs[cnn_choice]

        feature_extractor = CNN(
            weights='imagenet',
            include_top=False,
            input_shape=self.image_shape,
            pooling='avg'
        )
        # The backbone is used as a fixed feature extractor.
        feature_extractor.trainable = False

        embedding_layer = Embedding(
            vocab_size, embedding_dim, input_length=self.seq_length,
            trainable=False, weights=[embedding_matrix], name='sequence_embedding'
        )

        # Image path: preprocess -> frozen CNN -> dense projection.
        image_input = Input(shape=self.image_shape, name='image_input')
        processed_image = preprocess_func(image_input)
        image_features = feature_extractor(processed_image)
        features_dense = Dense(self.hidden_dim, activation='relu', name='image_dense')(image_features)

        # Caption path: frozen embedding -> dropout -> LSTM over all steps.
        caption_input = Input(shape=(self.seq_length,), name='caption_input')
        caption_embedding = embedding_layer(caption_input)
        caption_dropout = Dropout(self.dropout, name='embedding_dropout')(caption_embedding)
        lstm = LSTM(self.hidden_dim, return_sequences=True, name='lstm_layer')(caption_dropout)

        # Merge both paths and predict a word distribution per time step.
        merging_layer = Add()([features_dense, lstm])
        merge_dropout = Dropout(self.dropout)(merging_layer)

        dense_layer = Dense(self.hidden_dim, activation='relu')(merge_dropout)
        prediction_dropout = Dropout(self.dropout)(dense_layer)
        output = Dense(vocab_size, activation='softmax')(prediction_dropout)

        decoder = Model(inputs=[image_input, caption_input], outputs=output)
        decoder.compile(
            optimizer=Adam(0.001),
            loss=SparseCategoricalCrossentropy(),
            metrics=['accuracy']
        )

        return decoder

    def _token_index(self, token, default):
        """Return the vocabulary index of ``token``, or ``default`` if absent."""
        try:
            return self.word_to_idx[token]
        except KeyError:
            return default

    def _preview_caption(self, model):
        """Greedily caption one random validation image and print the result.

        Prints the generated token ids, the decoded caption, and the decoded
        arg-max sequence of every decoding step.
        """
        test_image = choice(self.val_images)
        image_features = expand_dims(self.val_image_data[test_image], axis=0)

        # Use the sequence length this trial's model was built with; a fixed
        # global length does not match the model input for other trial values.
        seq_length = self.seq_length
        # Fallbacks 3 / 4 are the previously hard-coded ids, presumably the
        # '<start>' / '<end>' positions in the adapted vocabulary — confirm.
        start_idx = self._token_index('<start>', 3)
        end_idx = self._token_index('<end>', 4)

        caption = [start_idx]
        step_predictions = []

        for step in range(seq_length):
            input_seq = pad_sequences([caption], maxlen=seq_length, padding='post')
            prediction = model.predict([image_features, input_seq], verbose=0)[0]

            # Keep the full arg-max sequence of this step for inspection.
            step_predictions.append(argmax(prediction, 1))

            word_idx = int(argmax(prediction[step]))
            caption.append(word_idx)

            if word_idx == end_idx:
                break

        print(caption)

        print(decode_caption(caption, self.idx_to_word))

        for predicted_sequence in step_predictions:
            print(decode_caption(predicted_sequence, self.idx_to_word))

    def fit(self, hp, model, *args, **kwargs):
        """Train ``model`` on the trial's generators and return its best val loss.

        Args:
            hp: Hyperparameters of the current trial (unused here).
            model: Model returned by ``build`` for this trial.
            *args, **kwargs: Forwarded to ``model.fit`` (e.g. epochs, callbacks).

        Returns:
            The minimum ``val_loss`` recorded during training.
        """
        train_steps = ceil(len(self.train_images) * self.CAPTIONS_PER_IMAGE / self.batch_size)
        val_steps = ceil(len(self.val_images) * self.CAPTIONS_PER_IMAGE / self.batch_size)

        # Train the model
        history = model.fit(
            self.train_data,
            validation_data=self.val_data,
            steps_per_epoch=train_steps,
            validation_steps=val_steps,
            *args,
            **kwargs
        )

        # Qualitative sanity check on one validation image.
        self._preview_caption(model)

        # Return the best validation loss
        return min(history.history['val_loss'])

In [6]:
def load_resized_images(image_names, image_dir='datasets/flicker8k/Flicker8k_images', size=(224, 224)):
    """Read, decode and resize JPEG images.

    Args:
        image_names: Iterable of image file names inside ``image_dir``.
        image_dir: Directory that contains the image files.
        size: Target ``(height, width)`` passed to ``tf.image.resize``.

    Returns:
        Dict mapping each file name to its resized 3-channel image tensor.
    """
    images = {}
    for name in image_names:
        raw_bytes = tf.io.read_file(os.path.join(image_dir, name))
        decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
        images[name] = tf.image.resize(decoded, list(size))
    return images


train_images, val_images, test_images = load_flicker8k_split()
# Only the first 200 images of each split are used to keep the search fast.
train_images = train_images[:200]
val_images = val_images[:200]

train_image_data = load_resized_images(train_images)
val_image_data = load_resized_images(val_images)

# Captions: the per-image map feeds the generators, the flat list is batched
# into a dataset used to adapt the text vectorizer.
caption_map, total_captions = load_flicker8k_captions()
captions_dataset = Dataset.from_tensor_slices(total_captions)
captions_dataset = captions_dataset.batch(2048).prefetch(AUTOTUNE)

In [7]:
# TensorBoard logging, written once per epoch for every trial.
tensorboard_callback = TensorBoard(
    log_dir='./tensorboard_logs/architecture_search_v1.2/',
    histogram_freq=1,
    write_graph=True,
    update_freq='epoch',
)
callbacks = [tensorboard_callback]

# Hypermodel over the loaded image and caption data.
# NOTE(review): batch_size is left at the class default (32), not the
# BATCH_SIZE constant (64) from the config cell — confirm this is intended.
hypermodel = MyHyperModel(
    train_image_data=train_image_data,
    val_image_data=val_image_data,
    caption_map=caption_map,
    caption_dataset=captions_dataset,
)

# Random-search tuner that minimises validation loss over at most 40 trials.
# overwrite=True starts the project from scratch instead of resuming it.
tuner = RandomSearch(
    hypermodel=hypermodel,
    objective="val_loss",
    max_trials=40,
    directory="keras_tuning",
    project_name="cnn_lstm_architecture_search_v1",
    overwrite=True,
)

# Start the random search (one epoch per trial).
tuner.search(epochs=1, callbacks=callbacks)

Trial 1 Complete [00h 01m 36s]

Best val_loss So Far: None
Total elapsed time: 00h 01m 36s

Search: Running Trial #2

Value             |Best Value So Far |Hyperparameter
8192              |4096              |vocab_size
25                |30                |sequence_length
300               |100               |embedding_dim
convnextbase      |resnet152v2       |cnn



KeyboardInterrupt: 