In [None]:
import contextlib
import io
import json
import os
from datetime import datetime
from typing import Dict, Any, Tuple

import keras_tuner as kt
import numpy as np
import tensorflow as tf

class TunableBahdanauAttention(tf.keras.Model):
    """Additive attention with a configurable width, score dropout and score activation.

    Args:
        units: Output width of the two Dense projections applied to `features`
            and `hidden`.
        dropout_rate: Rate of the Dropout layer applied to the activated score.
        activation: 'tanh' or 'relu'; any other value selects leaky ReLU.
    """
    def __init__(self, units, dropout_rate=0.0, activation='tanh'):
        super().__init__()
        self.units = units
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.activation = activation

    def call(self, features, hidden, training=None):
        """Return `(context_vector, attention_weights)` for one decoding step.

        Args:
            features: Encoder output; assumed (batch, locations, channels) — TODO confirm.
            hidden: Decoder state; an axis is inserted at position 1 so it
                broadcasts against the projected features.
            training: Forwarded to the score dropout.
        """
        # Pick the score non-linearity once; unknown names fall through to leaky ReLU.
        if self.activation == 'tanh':
            squash = tf.nn.tanh
        elif self.activation == 'relu':
            squash = tf.nn.relu
        else:
            squash = tf.nn.leaky_relu

        query = tf.expand_dims(hidden, 1)
        score = squash(self.W1(features) + self.W2(query))
        score = self.dropout(score, training=training)

        # Softmax over axis 1 so the weights of one sample sum to 1 across that axis.
        attention_weights = tf.nn.softmax(self.V(score), axis=1)
        context_vector = tf.reduce_sum(attention_weights * features, axis=1)
        return context_vector, attention_weights

class TunableDecoder(tf.keras.Model):
    """Attention-based caption decoder whose architecture is driven by hyperparameters.

    Args:
        embedding_dim: Output width of the token embedding.
        units: Width of every LSTM layer, of the first dense layer and of the
            attention projections.
        vocab_size: Embedding input size and width of the output logits.
        dropout_rate: Dropout applied after the embedding, on the LSTM inputs
            and after the first dense layer.
        recurrent_dropout: Recurrent dropout of every LSTM layer.
        attention_dropout: Dropout applied to the attention score.
        attention_activation: Activation name forwarded to the attention block.
        dense_activation: Activation of the first dense layer.
        use_layer_norm: When True, layer-normalise the LSTM input and the
            output of the first dense layer.
        num_lstm_layers: Number of stacked LSTM layers; must be at least 1.

    Raises:
        ValueError: If `num_lstm_layers` is smaller than 1.
    """
    def __init__(self, embedding_dim, units, vocab_size, dropout_rate=0.0, 
                 recurrent_dropout=0.0, attention_dropout=0.0, attention_activation='tanh',
                 dense_activation='relu', use_layer_norm=False, num_lstm_layers=1):
        # An empty stack would only fail later, inside call(), with an opaque IndexError.
        if num_lstm_layers < 1:
            raise ValueError(
                f"num_lstm_layers must be at least 1, got {num_lstm_layers}"
            )

        super().__init__()
        self.units = units
        self.num_lstm_layers = num_lstm_layers
        self.use_layer_norm = use_layer_norm
        
        # Embedding layer
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.embedding_dropout = tf.keras.layers.Dropout(dropout_rate)
        
        # LSTM stack: every layer returns the full sequence (call() reads
        # x.shape[2] afterwards); only the last one also returns its states.
        last_index = num_lstm_layers - 1
        self.lstm_layers = [
            tf.keras.layers.LSTM(
                units,
                return_sequences=True,
                return_state=(index == last_index),
                dropout=dropout_rate,
                recurrent_dropout=recurrent_dropout
            )
            for index in range(num_lstm_layers)
        ]
        
        # Dense layers
        self.fc1 = tf.keras.layers.Dense(units, activation=dense_activation)
        self.fc1_dropout = tf.keras.layers.Dropout(dropout_rate)
        self.fc2 = tf.keras.layers.Dense(vocab_size)
        
        # Attention mechanism
        self.attention = TunableBahdanauAttention(units, attention_dropout, attention_activation)
        
        # Layer normalization (optional)
        if use_layer_norm:
            self.layer_norm1 = tf.keras.layers.LayerNormalization()
            self.layer_norm2 = tf.keras.layers.LayerNormalization()
    
    def call(self, x, features, hidden, training=None):
        """Run one decoding step.

        Args:
            x: Token ids for the current step; assumed (batch, 1) — TODO confirm.
            features: Encoder output attended over.
            hidden: State used as the attention query. It is not fed to the
                LSTM layers as an initial state.
            training: Forwarded to the dropout, normalisation and LSTM layers.

        Returns:
            `(logits, state_h, attention_weights)` where `state_h` is the
            hidden state returned by the last LSTM layer.
        """
        context_vector, attention_weights = self.attention(features, hidden, training=training)
        
        # Embedding
        x = self.embedding(x)
        x = self.embedding_dropout(x, training=training)
        
        # Concatenate context vector with embeddings
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
        
        # Apply layer normalization if enabled
        if self.use_layer_norm:
            x = self.layer_norm1(x, training=training)
        
        # Intermediate layers return only the sequence; the last returns states too.
        for lstm_layer in self.lstm_layers[:-1]:
            x = lstm_layer(x, training=training)
        x, state_h, _state_c = self.lstm_layers[-1](x, training=training)
        
        # Dense layers
        x = self.fc1(x)
        if self.use_layer_norm:
            x = self.layer_norm2(x, training=training)
        x = self.fc1_dropout(x, training=training)
        # Merge the leading axes so fc2 sees a 2-D (rows, units) tensor.
        x = tf.reshape(x, (-1, x.shape[2]))
        x = self.fc2(x)
        
        return x, state_h, attention_weights
    
    def reset_state(self, batch_size):
        """Return an all-zero `(batch_size, units)` tensor to start decoding from."""
        return tf.zeros((batch_size, self.units))

class ImageCaptionHyperModel(kt.HyperModel):
    """Hypermodel that declares the caption-decoder search space.

    Args:
        vocab_size: Vocabulary size handed to the decoder.
        cnn_encoder: Image encoder handed to the training wrapper; it is not
            tuned here.
    """
    
    def __init__(self, vocab_size, cnn_encoder=None):
        # Let the base class set up its own state before adding ours.
        super().__init__()
        self.vocab_size = vocab_size
        self.cnn_encoder = cnn_encoder
    
    def build(self, hp):
        """Sample one configuration from `hp` and return the wrapped model.

        Args:
            hp: Keras Tuner hyperparameter container for the current trial.

        Returns:
            A `CaptionModelWrapper` around the shared encoder and a freshly
            built `TunableDecoder`.
        """
        
        # Decoder hyperparameters
        embedding_dim = hp.Int('embedding_dim', min_value=128, max_value=512, step=64)
        units = hp.Int('lstm_units', min_value=256, max_value=1024, step=128)
        
        # Dropout hyperparameters
        dropout_rate = hp.Float('dropout_rate', min_value=0.0, max_value=0.5, step=0.1)
        recurrent_dropout = hp.Float('recurrent_dropout', min_value=0.0, max_value=0.3, step=0.1)
        attention_dropout = hp.Float('attention_dropout', min_value=0.0, max_value=0.3, step=0.1)
        
        # Attention mechanism hyperparameters
        attention_activation = hp.Choice('attention_activation', ['tanh', 'relu', 'leaky_relu'])
        
        # Dense layer hyperparameters
        dense_activation = hp.Choice('dense_activation', ['relu', 'gelu'])
        
        # Architecture hyperparameters
        use_layer_norm = hp.Boolean('use_layer_norm')
        num_lstm_layers = hp.Int('num_lstm_layers', min_value=1, max_value=3)
        
        # Learning rate
        learning_rate = hp.Float('learning_rate', min_value=1e-5, max_value=1e-2, sampling='LOG')
        
        # Build decoder
        decoder = TunableDecoder(
            embedding_dim=embedding_dim,
            units=units,
            vocab_size=self.vocab_size,
            dropout_rate=dropout_rate,
            recurrent_dropout=recurrent_dropout,
            attention_dropout=attention_dropout,
            attention_activation=attention_activation,
            dense_activation=dense_activation,
            use_layer_norm=use_layer_norm,
            num_lstm_layers=num_lstm_layers
        )
        
        # The wrapper owns the optimizer and the custom training step.
        return CaptionModelWrapper(self.cnn_encoder, decoder, learning_rate)

class CaptionModelWrapper(tf.keras.Model):
    """Keras model that trains a caption decoder on top of a frozen image encoder.

    The wrapper carries its own Adam optimizer and masked cross-entropy loss
    and implements teacher-forced `train_step` / `test_step`, so it can be
    driven by `fit()` (and therefore by Keras Tuner).

    Args:
        cnn_encoder: Model mapping images to the features the decoder attends over.
        decoder: Decoder exposing `reset_state(batch_size)` and
            `call(x, features, hidden, training)`.
        learning_rate: Learning rate of the Adam optimizer.
    """
    
    def __init__(self, cnn_encoder, decoder, learning_rate):
        super().__init__()
        self.cnn_encoder = cnn_encoder
        self.decoder_model = decoder
        self.loss_tracker = tf.keras.metrics.Mean(name="loss")
        
        # Optimizer
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
        
        # Per-element loss; masking and reduction happen in loss_function().
        self.loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
            from_logits=True, reduction='none'
        )

        # fit()/evaluate() reject a model that was never compiled, and nothing
        # else compiles the wrapper, so make it ready to train right away.
        self.compile()
    
    def compile(self, optimizer=None, loss=None, metrics=None, **kwargs):
        """Compile while keeping the optimizer built in `__init__`.

        Args:
            optimizer: Optional replacement optimizer. When omitted, the Adam
                instance created in `__init__` is kept instead of letting
                Keras substitute its default optimizer.
            loss: Ignored; the loss is computed by `loss_function`.
            metrics: Ignored; only the running loss is tracked.
            **kwargs: Forwarded to `tf.keras.Model.compile`.
        """
        if optimizer is None:
            optimizer = self.optimizer
        super().compile(optimizer=optimizer, **kwargs)
    
    def loss_function(self, real, pred):
        """Cross-entropy with positions whose target id is 0 zeroed out.

        The mean is taken over all positions, masked ones included.
        """
        mask = tf.math.logical_not(tf.math.equal(real, 0))
        loss_ = self.loss_object(real, pred)
        mask = tf.cast(mask, dtype=loss_.dtype)
        loss_ *= mask
        return tf.reduce_mean(loss_)

    def _teacher_forced_loss(self, img_tensor, target, training):
        """Sum the per-step losses of one teacher-forced pass over `target`."""
        # Dynamic batch size: the static one is None when the dataset does not
        # use a fixed batch dimension.
        batch_size = tf.shape(target)[0]
        hidden = self.decoder_model.reset_state(batch_size=batch_size)
        dec_input = tf.fill([batch_size, 1], 1)  # Assuming start token is 1

        features = self.cnn_encoder(img_tensor, training=False)  # CNN frozen

        loss = 0
        for i in range(1, target.shape[1]):
            predictions, hidden, _ = self.decoder_model(
                dec_input, features, hidden, training=training
            )
            loss += self.loss_function(target[:, i], predictions)
            # Teacher forcing: feed the ground-truth token as the next input.
            dec_input = tf.expand_dims(target[:, i], 1)
        return loss
    
    @tf.function
    def train_step(self, data):
        """Run one optimisation step on an `(images, captions)` batch.

        Returns:
            `{"loss": ...}` holding the running mean of the per-timestep loss.
        """
        img_tensor, target = data
        
        with tf.GradientTape() as tape:
            loss = self._teacher_forced_loss(img_tensor, target, training=True)
        
        total_loss = loss / int(target.shape[1])
        
        # Only train decoder variables
        trainable_vars = self.decoder_model.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        
        # Update metrics
        self.loss_tracker.update_state(total_loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        """Evaluate one `(images, captions)` batch with dropout disabled.

        Without this override, validation would fall back to the default
        test step, which needs a `call` implementation this wrapper lacks.
        """
        img_tensor, target = data
        loss = self._teacher_forced_loss(img_tensor, target, training=False)
        self.loss_tracker.update_state(loss / int(target.shape[1]))
        return {"loss": self.loss_tracker.result()}
    
    @property
    def metrics(self):
        """Metrics Keras resets between epochs: only the running loss."""
        return [self.loss_tracker]

class ImageCaptionTuner:
    """Convenience facade around a Keras Tuner search for the caption decoder.

    Args:
        vocab_size: Vocabulary size of the decoder being tuned.
        cnn_encoder: Image encoder shared by every trial.
        project_name: Sub-directory name Keras Tuner stores this search under.
    """
    
    def __init__(self, vocab_size: int, cnn_encoder, project_name: str = "image_caption_tuning"):
        self.vocab_size = vocab_size
        self.cnn_encoder = cnn_encoder
        self.project_name = project_name
        self.best_params = None
        self.tuner = None
    
    def create_tuner(self, 
                    tuner_type: str = 'bayesian',
                    max_trials: int = 50,
                    executions_per_trial: int = 1,
                    directory: str = 'hyperparameter_tuning',
                    overwrite: bool = False,
                    objective='loss') -> kt.Tuner:
        """Create the underlying Keras Tuner instance and remember it.

        Args:
            tuner_type: 'bayesian', 'random' or 'hyperband'.
            max_trials: Trial budget for the bayesian and random tuners
                (not used by hyperband).
            executions_per_trial: Number of times each configuration is trained.
            directory: Root directory for the tuner's results.
            overwrite: Whether to discard results of a previous search with
                the same project name.
            objective: Metric the tuner optimises, e.g. 'loss' or 'val_loss'.

        Returns:
            The created tuner, also stored on `self.tuner`.

        Raises:
            ValueError: If `tuner_type` is not one of the supported names.
        """
        
        hypermodel = ImageCaptionHyperModel(self.vocab_size, self.cnn_encoder)

        # Arguments every tuner type takes.
        shared_kwargs = dict(
            objective=objective,
            executions_per_trial=executions_per_trial,
            directory=directory,
            project_name=self.project_name,
            overwrite=overwrite,
        )
        
        if tuner_type == 'bayesian':
            tuner = kt.BayesianOptimization(hypermodel, max_trials=max_trials, **shared_kwargs)
        elif tuner_type == 'random':
            tuner = kt.RandomSearch(hypermodel, max_trials=max_trials, **shared_kwargs)
        elif tuner_type == 'hyperband':
            tuner = kt.Hyperband(hypermodel, max_epochs=10, factor=3, **shared_kwargs)
        else:
            raise ValueError(f"Unsupported tuner type: {tuner_type}")
        
        self.tuner = tuner
        return tuner
    
    def search(self, 
               train_dataset,
               validation_dataset=None,
               epochs: int = 10,
               callbacks: list = None) -> None:
        """Run the hyperparameter search.

        Args:
            train_dataset: Training data forwarded to the tuner's `search`.
            validation_dataset: Optional validation data.
            epochs: Epochs per trial.
            callbacks: Keras callbacks; when omitted, early stopping and
                learning-rate reduction are installed.

        Raises:
            ValueError: If `create_tuner()` has not been called.
        """
        
        if self.tuner is None:
            raise ValueError("Tuner not created. Call create_tuner() first.")
        
        # Default callbacks
        if callbacks is None:
            # 'val_loss' only exists when validation data is supplied;
            # otherwise fall back to the training loss so the callbacks can act.
            monitor = 'val_loss' if validation_dataset is not None else 'loss'
            callbacks = [
                tf.keras.callbacks.EarlyStopping(
                    monitor=monitor, patience=3, restore_best_weights=True
                ),
                tf.keras.callbacks.ReduceLROnPlateau(monitor=monitor, factor=0.2, patience=2)
            ]
        
        # The trial budget lives on the oracle, not on the tuner itself, and
        # may be unset depending on the tuner type.
        max_trials = getattr(self.tuner.oracle, 'max_trials', None)
        if max_trials is None:
            print("Starting hyperparameter search...")
        else:
            print(f"Starting hyperparameter search with {max_trials} trials...")
        
        self.tuner.search(
            train_dataset,
            validation_data=validation_dataset,
            epochs=epochs,
            callbacks=callbacks,
            verbose=1
        )
        
        print("Hyperparameter search completed!")
    
    def get_best_hyperparameters(self, num_trials: int = 1) -> Dict[str, Any]:
        """Return the values of the best hyperparameter set, or None if there is none.

        The result is also cached on `self.best_params`.

        Raises:
            ValueError: If no tuner has been created.
        """
        
        if self.tuner is None:
            raise ValueError("No tuner found. Run search() first.")
        
        best_hps = self.tuner.get_best_hyperparameters(num_trials=num_trials)
        self.best_params = best_hps[0].values if best_hps else None
        
        return self.best_params
    
    def get_best_model(self):
        """Return the best model as reported by the tuner.

        Raises:
            ValueError: If no tuner has been created.
        """
        
        if self.tuner is None:
            raise ValueError("No tuner found. Run search() first.")
        
        return self.tuner.get_best_models(num_models=1)[0]
    
    def build_best_model(self) -> Tuple[TunableDecoder, Dict[str, Any]]:
        """Build a new, untrained decoder from the best hyperparameters.

        Returns:
            `(decoder, best_params)`.

        Raises:
            ValueError: If no tuner exists or the search produced no result.
        """
        
        best_params = self.get_best_hyperparameters()
        
        if best_params is None:
            raise ValueError("No best parameters found. Run search() first.")
        
        # Build decoder with best parameters
        decoder = TunableDecoder(
            embedding_dim=best_params['embedding_dim'],
            units=best_params['lstm_units'],
            vocab_size=self.vocab_size,
            dropout_rate=best_params['dropout_rate'],
            recurrent_dropout=best_params['recurrent_dropout'],
            attention_dropout=best_params['attention_dropout'],
            attention_activation=best_params['attention_activation'],
            dense_activation=best_params['dense_activation'],
            use_layer_norm=best_params['use_layer_norm'],
            num_lstm_layers=best_params['num_lstm_layers']
        )
        
        return decoder, best_params
    
    def save_results(self, filepath: str = None):
        """Write the best hyperparameters and the tuner summary to a JSON file.

        Args:
            filepath: Destination path; defaults to a timestamped
                `tuning_results_<YYYYmmdd_HHMMSS>.json` in the working directory.

        Raises:
            ValueError: If no tuner has been created.
        """
        
        if filepath is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filepath = f"tuning_results_{timestamp}.json"

        # Raises when there is no tuner, so self.tuner is set from here on.
        best_hyperparameters = self.get_best_hyperparameters()

        # results_summary() prints its report and returns None; capture the
        # printed text so the saved file actually contains the summary.
        summary_buffer = io.StringIO()
        with contextlib.redirect_stdout(summary_buffer):
            self.tuner.results_summary()
        
        results = {
            'best_hyperparameters': best_hyperparameters,
            'tuner_summary': summary_buffer.getvalue(),
            'timestamp': datetime.now().isoformat()
        }
        
        with open(filepath, 'w') as f:
            # default=str stringifies values json cannot encode natively.
            json.dump(results, f, indent=2, default=str)
        
        print(f"Results saved to {filepath}")
    
    def results_summary(self):
        """Print the best hyperparameters followed by the tuner's own summary."""
        
        if self.tuner is None:
            print("No tuner found. Run search() first.")
            return
        
        print("\n" + "="*50)
        print("HYPERPARAMETER TUNING RESULTS SUMMARY")
        print("="*50)
        
        # Best hyperparameters
        best_params = self.get_best_hyperparameters()
        if best_params:
            print("\nBest Hyperparameters:")
            print("-" * 30)
            for param, value in best_params.items():
                print(f"{param}: {value}")
        
        # Tuner summary
        print(f"\nTotal trials completed: {len(self.tuner.oracle.trials)}")
        self.tuner.results_summary()

# Example usage function
def example_usage(vocab_size):
    """Walk through a complete tuning run and return its outcome.

    `train_dataset` and `validation_dataset` are not parameters: they are
    looked up in the enclosing scope and must exist before this is called.

    Args:
        vocab_size: Vocabulary size of the decoder being tuned.

    Returns:
        `(best_decoder, best_params)` as produced by
        `ImageCaptionTuner.build_best_model()`.
    """
    # Wire the frozen CNN encoder into the tuning facade.
    tuner = ImageCaptionTuner(vocab_size, create_cnn_encoder())

    # Configure a 30-trial Bayesian search stored under 'tuning_results'.
    tuner.create_tuner(
        directory='tuning_results',
        max_trials=30,
        tuner_type='bayesian',
    )

    # Five epochs per trial on the datasets from the enclosing scope.
    tuner.search(train_dataset, validation_dataset, epochs=5)

    # Rebuild the winning decoder, then report and persist the results.
    outcome = tuner.build_best_model()
    tuner.results_summary()
    tuner.save_results()

    best_decoder, best_params = outcome
    return best_decoder, best_params

In [None]:
def create_cnn_encoder():
    """Build a frozen InceptionV3 feature extractor.

    The ImageNet-weighted network is created without its classification head,
    and its output is reshaped so everything between the batch axis and the
    channel axis is merged into one axis.

    Returns:
        A `tf.keras.Model` from the InceptionV3 input to the reshaped features.
    """
    backbone = tf.keras.applications.InceptionV3(weights='imagenet', include_top=False)
    backbone.trainable = False

    feature_map = backbone.output
    channels = feature_map.shape[-1]
    # (batch, H, W, channels) -> (batch, H*W, channels)
    flattened = tf.keras.layers.Reshape((-1, channels))(feature_map)

    return tf.keras.Model(inputs=backbone.input, outputs=flattened)

In [None]:
class BahdanauAttention(tf.keras.layers.Layer):
    """Additive (tanh-scored) attention over a set of encoder features.

    Args:
        units: Output width of the two Dense projections.
    """

    def __init__(self, units):
        super().__init__()
        dense = tf.keras.layers.Dense
        self.W1 = dense(units)
        self.W2 = dense(units)
        self.V = dense(1)

    def call(self, features, hidden):
        """Return `(context_vector, attention_weights)` for the given query state.

        Args:
            features: Encoder output; assumed (batch, locations, channels) — TODO confirm.
            hidden: Query state; an axis is inserted at position 1 so it
                broadcasts against the projected features.
        """
        query = tf.expand_dims(hidden, 1)
        projected = self.W1(features) + self.W2(query)
        score = tf.nn.tanh(projected)

        # Normalise along axis 1, then take the weighted sum over that axis.
        attention_weights = tf.nn.softmax(self.V(score), axis=1)
        weighted_features = attention_weights * features
        context_vector = tf.reduce_sum(weighted_features, axis=1)
        return context_vector, attention_weights


class CaptionDecoder(tf.keras.Model):
    """Single-layer LSTM caption decoder with Bahdanau attention.

    Args:
        vocab_size: Embedding input size and width of the output logits.
        embedding_dim: Output width of the token embedding.
        units: Width of the LSTM, the hidden dense layer and the attention
            projections.
        dropout_rate: Input dropout of the LSTM layer.
    """

    def __init__(self, vocab_size, embedding_dim, units, dropout_rate=0.0):
        super().__init__()
        self.units = units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.lstm = tf.keras.layers.LSTM(units, return_sequences=True, return_state=True, dropout=dropout_rate)
        self.fc1 = tf.keras.layers.Dense(units, activation='tanh')
        self.fc2 = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(units)

    def call(self, x, features, hidden, training=None):
        """Run one decoding step.

        Args:
            x: Token ids for the current step; assumed (batch, 1) — TODO confirm.
            features: Encoder output attended over.
            hidden: State used as the attention query. It is not fed to the
                LSTM as an initial state.
            training: Forwarded to the LSTM so its dropout can be switched on
                for training and off for evaluation. With the default of None
                the mode is left to Keras, as before.

        Returns:
            `(logits, state_h, attention_weights)`.
        """
        context_vector, attn_weights = self.attention(features, hidden)
        x = self.embedding(x)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
        output, state_h, _state_c = self.lstm(x, training=training)
        x = self.fc1(output)
        # Merge the leading axes so fc2 sees a 2-D (rows, units) tensor.
        x = tf.reshape(x, (-1, x.shape[2]))
        logits = self.fc2(x)
        return logits, state_h, attn_weights

    def reset_state(self, batch_size):
        """Return an all-zero `(batch_size, units)` tensor to start decoding from."""
        return tf.zeros((batch_size, self.units))

In [None]:
class CaptionHyperModel(kt.HyperModel):
    """Hypermodel producing a `CaptionDecoder` with its encoder and optimizer attached.

    Args:
        vocab_size: Vocabulary size handed to the decoder.
    """

    def __init__(self, vocab_size):
        # Let the base class set up its own state before adding ours.
        super().__init__()
        self.vocab_size = vocab_size

    def build(self, hp):
        """Sample one configuration from `hp` and build the decoder for it.

        Args:
            hp: Keras Tuner hyperparameter container for the current trial.

        Returns:
            A `CaptionDecoder` carrying two extra attributes: `cnn_encoder`
            (a frozen encoder) and `optimizer` (Adam with the sampled
            learning rate).
        """
        # Tune hyperparameters
        embedding_dim = hp.Int("embedding_dim", min_value=128, max_value=512, step=64)
        units = hp.Int("lstm_units", min_value=128, max_value=512, step=64)
        dropout_rate = hp.Float("dropout", 0.0, 0.5, step=0.1, default=0.1)
        learning_rate = hp.Float("lr", 1e-4, 1e-2, sampling="log")

        # Build encoder (frozen)
        cnn_encoder = create_cnn_encoder()
        cnn_encoder.trainable = False

        # Build decoder
        decoder = CaptionDecoder(
            vocab_size=self.vocab_size,
            embedding_dim=embedding_dim,
            units=units,
            dropout_rate=dropout_rate
        )

        # Optimizer
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

        # Attached to the decoder so a custom training loop can reach both
        # through the single object this method returns.
        decoder.cnn_encoder = cnn_encoder
        decoder.optimizer = optimizer

        return decoder

In [None]:
class CaptionTuner(kt.Tuner):
    """Tuner that trains each trial's decoder with a custom teacher-forcing loop."""

    def run_trial(self, trial, train_data, val_data, epochs=3, steps_per_epoch=50, validation_steps=20):
        """Train and validate one hyperparameter configuration.

        Args:
            trial: Keras Tuner trial whose hyperparameters are used.
            train_data: Dataset yielding `(images, captions)` batches.
            val_data: Dataset yielding `(images, captions)` batches.
            epochs: Number of passes to run.
            steps_per_epoch: Maximum number of training batches per epoch.
            validation_steps: Maximum number of validation batches per epoch.

        Raises:
            ValueError: If either dataset yields no batch at all.
        """
        # Build model from hypermodel
        decoder = self.hypermodel.build(trial.hyperparameters)

        def compute_loss(real, pred):
            """Mean cross-entropy over the positions whose target id is not 0."""
            # The functional loss already returns one value per position and
            # does not take a `reduction` argument.
            per_position = tf.keras.losses.sparse_categorical_crossentropy(
                real, pred, from_logits=True
            )
            mask = tf.cast(tf.not_equal(real, 0), per_position.dtype)
            # divide_no_nan yields 0 (not NaN) for a timestep where every
            # caption in the batch is already padding.
            return tf.math.divide_no_nan(
                tf.reduce_sum(per_position * mask), tf.reduce_sum(mask)
            )

        def sequence_loss(images, captions, training):
            """Teacher-forced loss of one batch, averaged over the caption length."""
            hidden = decoder.reset_state(batch_size=captions.shape[0])
            dec_input = tf.expand_dims([word2idx('<start>')] * captions.shape[0], 1)
            total_loss = 0.0

            features = decoder.cnn_encoder(images, training=False)  # frozen
            for t in range(1, captions.shape[1]):
                predictions, hidden, _ = decoder(dec_input, features, hidden, training=training)
                total_loss += compute_loss(captions[:, t], predictions)
                # Teacher forcing: feed the ground-truth token as the next input.
                dec_input = tf.expand_dims(captions[:, t], 1)

            return total_loss / captions.shape[1]

        # Training step
        @tf.function
        def train_step(images, captions):
            with tf.GradientTape() as tape:
                # training=True so the tuned dropout rate actually takes effect.
                avg_loss = sequence_loss(images, captions, training=True)
            grads = tape.gradient(avg_loss, decoder.trainable_variables)
            decoder.optimizer.apply_gradients(zip(grads, decoder.trainable_variables))
            return avg_loss

        # Validation step
        @tf.function
        def val_step(images, captions):
            return sequence_loss(images, captions, training=False)

        def mean_batch_loss(step_fn, dataset, split_name):
            """Average `step_fn` over the batches `dataset` actually yields."""
            loss_sum, num_batches = 0.0, 0
            for images, captions in dataset:
                loss_sum += float(step_fn(images, captions))
                num_batches += 1
            if num_batches == 0:
                # A silent 0.0 here would look like a perfect score to the oracle.
                raise ValueError(f"{split_name} dataset produced no batches")
            return loss_sum / num_batches

        # Run epochs
        for epoch in range(epochs):
            print(f"Trial {trial.trial_id}, Epoch {epoch + 1}/{epochs}")

            avg_train_loss = mean_batch_loss(
                train_step, train_data.take(steps_per_epoch), "training"
            )
            avg_val_loss = mean_batch_loss(
                val_step, val_data.take(validation_steps), "validation"
            )

            # Report each epoch under its own step so the oracle keeps a
            # per-epoch history instead of piling every value onto step 0.
            self.oracle.update_trial(
                trial.trial_id,
                {
                    "train_loss": avg_train_loss,
                    "val_loss": avg_val_loss,
                },
                step=epoch + 1,
            )

        self.save_model(trial.trial_id, decoder)

In [None]:
# Define hypermodel
hypermodel = CaptionHyperModel(vocab_size=18366)

# Current keras_tuner releases name the Bayesian oracle
# `BayesianOptimizationOracle`; fall back to the un-suffixed name for
# installations that only provide that one.
bayesian_oracle_cls = getattr(kt.oracles, "BayesianOptimizationOracle", None)
if bayesian_oracle_cls is None:
    bayesian_oracle_cls = kt.oracles.BayesianOptimization

# Create tuner with Bayesian Optimization, minimising the validation loss
# reported by CaptionTuner.run_trial.
tuner = CaptionTuner(
    oracle=bayesian_oracle_cls(
        objective=kt.Objective("val_loss", "min"),
        max_trials=15,
        seed=42,
        num_initial_points=3,
    ),
    hypermodel=hypermodel,
    directory="tuner_results",
    project_name="image_captioning_modern",
    overwrite=True,
)

# Start search; the keyword arguments are forwarded to CaptionTuner.run_trial.
# `train_dataset` and `val_dataset` must already exist in the notebook namespace.
tuner.search(
    train_data=train_dataset,
    val_data=val_dataset,
    epochs=3,
    steps_per_epoch=50,
    validation_steps=20
)

In [None]:
# Top-ranked hyperparameter set of the finished search
best_hp = tuner.get_best_hyperparameters()[0]

print("🔤 Best Hyperparameters:")
for param in best_hp.values:
    param_value = best_hp.get(param)
    print(f"  {param}: {param_value}")

# Best decoder as returned by the tuner; the hypermodel's build() attaches the
# encoder to the decoder as `cnn_encoder`.
best_models = tuner.get_best_models(num_models=1)
best_decoder = best_models[0]
best_cnn_encoder = best_decoder.cnn_encoder