In [None]:
import tensorflow as tf
import pickle
import numpy as np
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import Progbar
import random

# -----------------------
# **Load Preprocessed Data**
# -----------------------
with open("features_train_inception.pkl", "rb") as f:
    train_features = pickle.load(f)  # 512D PCA-reduced features for training

with open("features_valid_inception.pkl", "rb") as f:
    val_features = pickle.load(f)  # 512D PCA-reduced features for validation

with open("features_test_inception.pkl", "rb") as f:
    test_features = pickle.load(f)  # 512D PCA-reduced features for testing

with open("captions.pkl", "rb") as f:
    captions = pickle.load(f)

with open("word2idx.pkl", "rb") as f:
    word2idx = pickle.load(f)

with open("idx2word.pkl", "rb") as f:
    idx2word = pickle.load(f)

# -----------------------
# **Split Captions Accordingly**
# -----------------------
train_captions = {img_id: captions[img_id] for img_id in train_features.keys()}
val_captions = {img_id: captions[img_id] for img_id in val_features.keys()}
test_captions = {img_id: captions[img_id] for img_id in test_features.keys()}

# -----------------------
# **Define Model Hyperparameters**
# -----------------------
embedding_dim = 256
units = 512  
vocab_size = len(word2idx) + 1
batch_size = 64
num_epochs = 15  
patience = 5  

# **Track Best Validation Loss**
best_val_loss = float("inf")
early_stop_counter = 0  


# -----------------------
# **Define CNN Encoder**
# -----------------------
class CNN_Encoder(tf.keras.Model):
    def __init__(self, embed_dim):
        super(CNN_Encoder, self).__init__()
        self.fc = tf.keras.layers.Dense(embed_dim, activation="relu", kernel_regularizer=tf.keras.regularizers.l2(0.01))

    def call(self, x):
        x = self.fc(x)
        return tf.expand_dims(x, axis=1)  # Ensure correct shape


# -----------------------
# **Define RNN Decoder**
# -----------------------
class RNN_Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embed_dim, units):
        super(RNN_Decoder, self).__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embed_dim)
        self.lstm = tf.keras.layers.LSTM(units, return_sequences=True, return_state=True, dropout=0.3)
        self.fc = tf.keras.layers.Dense(vocab_size, activation="softmax")  # Use softmax for proper classification
        self.attention = tf.keras.layers.AdditiveAttention()

    def call(self, x, hidden, features):
        hidden = tf.expand_dims(hidden, axis=1)  
        context_vector, _ = self.attention([hidden, features], return_attention_scores=True)

        x = self.embedding(x)
        context_vector = tf.repeat(context_vector, repeats=x.shape[1], axis=1)
        x = tf.concat([context_vector, x], axis=-1)

        output, state_h, state_c = self.lstm(x)
        x = self.fc(output)
        return x, state_h, state_c, _ 


# -----------------------
# **Initialize Model and Optimizer**
# -----------------------
encoder = CNN_Encoder(512)  
decoder = RNN_Decoder(vocab_size, embedding_dim, units)

lr_schedule = ExponentialDecay(initial_learning_rate=0.001, decay_steps=1000, decay_rate=0.96, staircase=True)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# **Checkpoint Manager**
checkpoint = tf.train.Checkpoint(optimizer=optimizer, encoder=encoder, decoder=decoder)
checkpoint_manager = tf.train.CheckpointManager(checkpoint, "./checkpoints", max_to_keep=5)

# **Accuracy Metrics**
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
val_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()


# -----------------------
# **Define Training Step**
# -----------------------
def train_step(image_feature, caption_input, caption_target, training=True):
    loss = 0
    with tf.GradientTape() as tape:
        hidden = tf.zeros((caption_input.shape[0], units))

        image_feature = tf.convert_to_tensor(image_feature, dtype=tf.float32)
        image_feature = tf.expand_dims(image_feature, axis=0)  
        image_feature = encoder(image_feature)  

        output, state_h, state_c, _ = decoder(caption_input, hidden, image_feature)

        loss = tf.keras.losses.sparse_categorical_crossentropy(caption_target, output, from_logits=True)
        loss = tf.reduce_mean(loss)

        if training:
            train_accuracy.update_state(caption_target, output)
        else:
            val_accuracy.update_state(caption_target, output)

    if training:
        gradients = tape.gradient(loss, decoder.trainable_variables + encoder.trainable_variables)
        optimizer.apply_gradients(zip(gradients, decoder.trainable_variables + encoder.trainable_variables))

    return loss


# -----------------------
# **Training Loop**
# -----------------------
for epoch in range(num_epochs):
    print(f"\n Epoch {epoch+1}/{num_epochs}")  

    # **Training Phase**
    total_loss = 0
    batch_count = len(train_captions)
    progress_bar = Progbar(batch_count, stateful_metrics=["loss", "accuracy"])
    train_accuracy.reset_state()

    for i, (image_id, caption_words) in enumerate(train_captions.items()):
        image_feature = train_features.get(image_id)
        if image_feature is None:
            continue

        caption_sequence = [word2idx.get(word, word2idx["<unk>"]) for word in caption_words]
        caption_input = [word2idx['<start>']] + caption_sequence[:-1]
        caption_target = caption_sequence

        caption_input = pad_sequences([caption_input], maxlen=30, padding='post')
        caption_target = pad_sequences([caption_target], maxlen=30, padding='post')

        caption_input = tf.convert_to_tensor(caption_input, dtype=tf.int32)
        caption_target = tf.convert_to_tensor(caption_target, dtype=tf.int32)

        loss = train_step(image_feature, caption_input, caption_target, training=True)
        total_loss += loss

        if i % 10 == 0:
            progress_bar.update(i + 1, values=[("loss", loss.numpy()), ("accuracy", train_accuracy.result().numpy())])

    train_loss = total_loss.numpy() / batch_count  
    train_acc = train_accuracy.result().numpy()  

    # **Validation Phase**
    val_loss = 0
    val_accuracy.reset_state()

    for image_id, caption_words in val_captions.items():
        image_feature = val_features.get(image_id)
        if image_feature is None:
            continue

        caption_sequence = [word2idx.get(word, word2idx["<unk>"]) for word in caption_words]
        caption_input = [word2idx['<start>']] + caption_sequence[:-1]
        caption_target = caption_sequence

        caption_input = pad_sequences([caption_input], maxlen=30, padding='post')
        caption_target = pad_sequences([caption_target], maxlen=30, padding='post')

        caption_input = tf.convert_to_tensor(caption_input, dtype=tf.int32)
        caption_target = tf.convert_to_tensor(caption_target, dtype=tf.int32)

        loss = train_step(image_feature, caption_input, caption_target, training=False)
        val_loss += loss.numpy()

    val_loss /= len(val_captions)
    val_acc = val_accuracy.result().numpy()

    print(f"\n Epoch {epoch+1}, Loss: {train_loss:.4f}, Accuracy: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}")

    checkpoint_manager.save()



 Epoch 1/15
[1m5821/5824[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 303ms/step - loss: 6.8662e-06 - accuracy: 0.9986
 Epoch 1, Loss: 0.0110, Accuracy: 0.9986, Val Loss: 0.0000, Val Accuracy: 1.0000

 Epoch 2/15
[1m5821/5824[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 300ms/step - loss: 1.4146e-06 - accuracy: 1.0000
 Epoch 2, Loss: 0.0000, Accuracy: 1.0000, Val Loss: 0.0000, Val Accuracy: 1.0000

 Epoch 3/15
[1m5821/5824[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 304ms/step - loss: 7.9467e-06 - accuracy: 1.0000
 Epoch 3, Loss: 0.0001, Accuracy: 1.0000, Val Loss: 0.0000, Val Accuracy: 1.0000

 Epoch 4/15
[1m5821/5824[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 298ms/step - loss: 2.0742e-06 - accuracy: 1.0000
 Epoch 4, Loss: 0.0001, Accuracy: 1.0000, Val Loss: 0.0000, Val Accuracy: 1.0000

 Epoch 5/15
[1m5821/5824[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 301ms/step - loss: 5.1657e-08 - accuracy: 1.0000
 Epoch 5, Loss: 0.0000, Ac

In [3]:
import pickle
import numpy as np
from sklearn.decomposition import PCA

# -----------------------
# **Load Train, Validation, and Test Features**
# -----------------------
with open("features_train_inception.pkl", "rb") as f:
    features_train = pickle.load(f)

with open("features_valid_inception.pkl", "rb") as f:
    features_valid = pickle.load(f)

with open("features_test_inception.pkl", "rb") as f:
    features_test = pickle.load(f)

# Convert feature dictionaries to numpy arrays
train_vectors = np.array(list(features_train.values())).squeeze(axis=1)  # Shape (num_train, 2048)
valid_vectors = np.array(list(features_valid.values())).squeeze(axis=1)  # Shape (num_valid, 2048)
test_vectors = np.array(list(features_test.values())).squeeze(axis=1)  # Shape (num_test, 2048)

print(f"Original Train Feature Shape: {train_vectors.shape}")  # Expected (train_samples, 2048)
print(f"Original Validation Feature Shape: {valid_vectors.shape}")  # Expected (valid_samples, 2048)
print(f"Original Test Feature Shape: {test_vectors.shape}")  # Expected (test_samples, 2048)

# -----------------------
# **Apply PCA to Reduce to 512D**
# -----------------------
pca_512 = PCA(n_components=512)

train_reduced = pca_512.fit_transform(train_vectors)  # Shape (num_train, 512)
valid_reduced = pca_512.transform(valid_vectors)  # Shape (num_valid, 512)
test_reduced = pca_512.transform(test_vectors)  # Shape (num_test, 512)

print(f"Reduced Train Feature Shape: {train_reduced.shape}")  # Expected (train_samples, 512)
print(f"Reduced Validation Feature Shape: {valid_reduced.shape}")  # Expected (valid_samples, 512)
print(f"Reduced Test Feature Shape: {test_reduced.shape}")  # Expected (test_samples, 512)

# -----------------------
# **Overwrite Original Pickle Files with Reduced Features**
# -----------------------
# Convert back to dictionary format {image_id: reduced_feature}
reduced_train_dict = {key: train_reduced[i] for i, key in enumerate(features_train.keys())}
reduced_valid_dict = {key: valid_reduced[i] for i, key in enumerate(features_valid.keys())}
reduced_test_dict = {key: test_reduced[i] for i, key in enumerate(features_test.keys())}

# Overwrite the existing feature pickle files
with open("features_train_inception.pkl", "wb") as f:
    pickle.dump(reduced_train_dict, f)

with open("features_valid_inception.pkl", "wb") as f:
    pickle.dump(reduced_valid_dict, f)

with open("features_test_inception.pkl", "wb") as f:
    pickle.dump(reduced_test_dict, f)

print(" Features successfully reduced to 512D and overwritten in the original files.")


Original Train Feature Shape: (5824, 2048)
Original Validation Feature Shape: (648, 2048)
Original Test Feature Shape: (1619, 2048)
Reduced Train Feature Shape: (5824, 512)
Reduced Validation Feature Shape: (648, 512)
Reduced Test Feature Shape: (1619, 512)
 Features successfully reduced to 512D and overwritten in the original files.
