In [None]:

import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from tqdm import tqdm
import re
import pickle
from sklearn.model_selection import train_test_split

# Report which TensorFlow release this kernel imported.
print("TensorFlow version:", tf.__version__)



In [None]:
# Precomputed image features plus the image name stored alongside them.
# (Rows of `features` are later looked up by a name's position in `image_names`.)
features = np.load('../features/image_features.npy')
image_names = np.load('../features/image_names.npy')

# Lookup table: image name -> position of that name in `image_names`.
image_to_index = dict(zip(image_names, range(len(image_names))))

# Caption table; every caption is wrapped in explicit start/end markers.
df = pd.read_csv('../data/captions.csv')
df['caption'] = df['caption'].map(lambda text: f"<start> {text} <end>")

# Keep only the caption rows whose image has an entry in the lookup table.
has_features = df['image'].isin(image_to_index)
df = df[has_features]

print("✅ Features and captions loaded:", len(df))



In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

all_captions = df['caption'].tolist()

# Build the tokenizer.
# `<` and `>` are absent from `filters`, so the `<start>`, `<end>` and `<unk>`
# markers are not stripped. The raw string has the same value as before but
# avoids the invalid-escape warning for `\]`.
tokenizer = Tokenizer(num_words=10000, oov_token="<unk>",
                      filters=r'!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
tokenizer.fit_on_texts(all_captions)

# Save tokenizer for later use
with open('../data/tokenizer.pkl', 'wb') as f:
    pickle.dump(tokenizer, f)

# Encode the same captions the tokenizer was fitted on. (This previously
# referenced an undefined name `captions`, which raises NameError on a fresh kernel.)
sequences = tokenizer.texts_to_sequences(all_captions)
max_len = max(len(seq) for seq in sequences)
print("Max caption length:", max_len)

# Zero-pad at the end of each sequence up to the longest caption.
padded_seqs = pad_sequences(sequences, padding='post')

# Match each caption to image feature (one feature entry per caption row, in df order)
features_matched = np.array([features[image_to_index[img]] for img in df['image']])


In [None]:
# Sanity check: the '<start>' marker must be in the vocabulary, because the
# training step looks up its id to seed the decoder input.
start_marker = '<start>'
print(start_marker in tokenizer.word_index)  # True
print(tokenizer.word_index[start_marker])    # the integer id assigned to '<start>'


In [None]:
# Split on unique image names rather than on caption rows. If an image has
# several captions (assumed here; verify against captions.csv), a row-level
# split puts the same image feature in both train and validation, so the
# validation set is not truly held out.
unique_images = df['image'].unique()
train_names, val_names = train_test_split(
    unique_images, test_size=0.1, random_state=42)

# Rows of df line up positionally with features_matched and padded_seqs.
in_val = df['image'].isin(val_names).to_numpy()

# Shuffle the row order inside each split so captions of one image are not
# adjacent (the original row-level split also returned shuffled rows).
split_rng = np.random.default_rng(42)
train_rows = split_rng.permutation(np.flatnonzero(~in_val))
val_rows = split_rng.permutation(np.flatnonzero(in_val))

train_img, val_img = features_matched[train_rows], features_matched[val_rows]
train_cap, val_cap = padded_seqs[train_rows], padded_seqs[val_rows]

print(f"Train samples: {len(train_img)}, Val samples: {len(val_img)}")


In [12]:
# Encoder: Accepts 2048-D feature vector
class CNN_Encoder(tf.keras.Model):
    """Projects precomputed image features through one ReLU dense layer."""

    def __init__(self, embedding_dim):
        """Create the projection layer with `embedding_dim` output units."""
        super().__init__()
        self.fc = tf.keras.layers.Dense(units=embedding_dim, activation='relu')

    def call(self, x):
        """Return the dense projection of `x`."""
        projected = self.fc(x)
        return projected

# Bahdanau Attention
class BahdanauAttention(tf.keras.Model):
    """Additive attention over encoded image features.

    Scores are V(tanh(W1(features) + W2(hidden))), softmax-normalised over
    axis 1; the context vector is the attention-weighted sum of `features`
    over that same axis.
    """

    def __init__(self, units):
        """Create the two scoring projections (`units` wide) and the 1-unit scorer."""
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, features, hidden):
        """Attend over `features` given the decoder state `hidden`.

        Args:
            features: encoder output. Either rank 3 with the attended axis at
                position 1, or rank 2 (one vector per sample), which is
                treated as a single location.
            hidden: decoder state; presumably (batch, units) -- TODO confirm.

        Returns:
            (context_vector, attention_weights): the weighted sum of
            `features` over axis 1, and the softmax weights used for it.
        """
        # Rank-2 features have no location axis. Without one, the sum below
        # broadcasts (batch, units) against (batch, 1, units) into
        # (batch, batch, units), so the softmax and weighted sum would mix
        # features of different samples in the batch.
        if len(features.shape) == 2:
            features = tf.expand_dims(features, 1)

        hidden_with_time_axis = tf.expand_dims(hidden, 1)
        score = tf.nn.tanh(self.W1(features) + self.W2(hidden_with_time_axis))
        attention_weights = tf.nn.softmax(self.V(score), axis=1)
        context_vector = attention_weights * features
        context_vector = tf.reduce_sum(context_vector, axis=1)
        return context_vector, attention_weights

# Decoder: uses LSTM
class RNN_Decoder(tf.keras.Model):
    """Single-step caption decoder: attention, embedding, LSTM and two dense layers."""

    def __init__(self, embedding_dim, units, vocab_size):
        """Build the embedding, LSTM, output projections and attention module."""
        super().__init__()
        self.units = units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.lstm = tf.keras.layers.LSTM(
            units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )
        self.fc1 = tf.keras.layers.Dense(units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(units)

    def call(self, x, features, hidden):
        """Run one decoding step.

        Args:
            x: token ids fed to the embedding layer.
            features: encoder output handed to the attention module.
            hidden: previous decoder state, used as the attention query.

        Returns:
            (logits, state, attention_weights): the output of `fc2` after the
            time axis is flattened away, the second value returned by the
            LSTM, and the attention weights.
        """
        context_vector, attention_weights = self.attention(features, hidden)

        # Prepend the context vector to the token embedding along the last axis.
        token_embedding = self.embedding(x)
        lstm_input = tf.concat(
            [tf.expand_dims(context_vector, 1), token_embedding], axis=-1)

        # NOTE(review): the LSTM is called without `initial_state` and its third
        # return value is discarded, so `hidden` only reaches this step through
        # the attention call above -- confirm this is intended.
        output, state, _ = self.lstm(lstm_input)

        projected = self.fc1(output)
        flattened = tf.reshape(projected, (-1, projected.shape[2]))
        logits = self.fc2(flattened)
        return logits, state, attention_weights

    def reset_state(self, batch_size):
        """Return an all-zero (batch_size, units) tensor to start decoding from."""
        initial_shape = (batch_size, self.units)
        return tf.zeros(initial_shape)


In [13]:
# Model and training hyperparameters
embedding_dim = 256
units = 512
# One more than the number of fitted words: id 0 is not assigned to any word
# and is the value the loss masks out as padding.
vocab_size = len(tokenizer.word_index) + 1
BATCH_SIZE = 64
BUFFER_SIZE = 1000
EPOCHS = 10

# Seed TensorFlow before any layer or shuffle op is created, so weight
# initialisation and dataset shuffling repeat across Restart & Run All
# (the train/val split is already fixed with random_state=42).
SEED = 42
tf.random.set_seed(SEED)

encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)

optimizer = tf.keras.optimizers.Adam()
# reduction='none' keeps one loss value per example so padding can be masked out.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    """Cross-entropy between `real` ids and `pred` logits, ignoring padding.

    Positions where `real` is 0 contribute zero loss. The result is the mean
    over every position, masked ones included.
    """
    per_example = loss_object(real, pred)
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_example.dtype)
    return tf.reduce_mean(per_example * keep)


In [14]:
# Training pipeline: (image feature, caption) pairs, shuffled through a
# BUFFER_SIZE-element buffer and then grouped into batches.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((train_img, train_cap))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

# Validation pipeline: same pairing and batching, kept in its original order.
val_dataset = (
    tf.data.Dataset.from_tensor_slices((val_img, val_cap))
    .batch(BATCH_SIZE)
)


In [None]:
@tf.function
def train_step(img_tensor, target):
    """Run one teacher-forced optimisation step on a batch.

    Args:
        img_tensor: batch of image features passed to the encoder.
        target: batch of padded caption id sequences; column 0 is skipped
            because decoding is seeded with the '<start>' id instead.

    Returns:
        (loss, total_loss): the loss summed over decoding steps, and that sum
        divided by the caption length `target.shape[1]`.
    """
    batch_size, seq_len = target.shape[0], target.shape[1]
    start_id = tokenizer.word_index['<start>']

    loss = 0
    hidden = decoder.reset_state(batch_size=batch_size)
    dec_input = tf.expand_dims([start_id] * batch_size, 1)

    with tf.GradientTape() as tape:
        features = encoder(img_tensor)
        for step in range(1, seq_len):
            predictions, hidden, _ = decoder(dec_input, features, hidden)
            expected = target[:, step]
            loss += loss_function(expected, predictions)
            # Teacher forcing: the ground-truth token becomes the next input.
            dec_input = tf.expand_dims(expected, 1)

    # Update encoder and decoder weights together from the summed loss.
    trainable_variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, trainable_variables)
    optimizer.apply_gradients(zip(gradients, trainable_variables))

    total_loss = loss / int(seq_len)
    return loss, total_loss

# Training loop: one pass over train_dataset per epoch, printing the loss
# every 100 batches and the mean per-batch loss at the end of the epoch.
for epoch in range(EPOCHS):
    total_loss = 0

    for batch, (img_tensor, target) in enumerate(train_dataset):
        step_losses = train_step(img_tensor, target)
        batch_loss, t_loss = step_losses
        total_loss = total_loss + t_loss

        report_progress = (batch % 100 == 0)
        if report_progress:
            print(f"Epoch {epoch+1} Batch {batch} Loss {t_loss:.4f}")

    print(f"✅ Epoch {epoch+1} Loss {total_loss / len(train_dataset):.6f}")
