In [27]:
import numpy as np
import pandas as pd
import string
import os
import tensorflow as tf
from collections import Counter
import spacy
import pickle
from tqdm import tqdm
from PIL import Image
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras import layers
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

In [4]:
# Load spaCy's small English pipeline once at module level; Vocabulary.tokenizer
# below calls `nlp(text)` for every caption and only reads `token.text`.
# NOTE(review): the whole pipeline runs per caption even though only the token
# text is used -- consider whether the extra components are needed.
nlp = spacy.load("en_core_web_sm")

class Vocabulary:
  """Word <-> integer-id mapping built from a list of caption strings.

  Ids 0-3 are reserved for the special tokens "<pad>", "<start>", "<end>"
  and "<unk>"; regular words receive ids from 4 upwards in first-seen order.

  Args:
    freq_threshold: minimum number of occurrences (inclusive) a token needs
      across all sentences passed to `build_vocab` to receive its own id.
  """

  def __init__(self, freq_threshold):
    self.freq_threshold = freq_threshold
    self.word2idx = {"<pad>" : 0, "<start>" : 1, "<end>" : 2, "<unk>" : 3}
    self.idx2word = {0 : "<pad>", 1 : "<start>", 2 : "<end>", 3 : "<unk>"}
    self.idx = 4  # next free id

  def tokenizer(self, text):
    """Return the lower-cased spaCy token texts of `text`."""
    return [token.text.lower() for token in nlp(text)]

  def build_vocab(self, sentence_list):
    """Count tokens over `sentence_list` and register the frequent ones.

    Tokens that already have an id (special tokens, or words added by an
    earlier call) are left untouched, so re-running this method -- e.g. when a
    notebook cell is executed twice -- does not shift ids or grow `self.idx`
    past `len(self.word2idx)`.
    """
    frequencies = Counter()
    for sentence in sentence_list:
      frequencies.update(self.tokenizer(sentence))

    for word, freq in frequencies.items():
      if freq >= self.freq_threshold and word not in self.word2idx:
        self.word2idx[word] = self.idx
        self.idx2word[self.idx] = word
        self.idx += 1

  # This used to be nested inside `build_vocab` (one indent level too deep),
  # which made it an unused local function instead of a method.
  def numericalize(self, text):
    """Convert `text` to a list of ids, mapping unknown tokens to "<unk>"."""
    unk_idx = self.word2idx["<unk>"]
    return [self.word2idx.get(token, unk_idx) for token in self.tokenizer(text)]

In [22]:
class Dataset:
  """Index-based access to the image/caption pairs listed in a captions CSV.

  Args:
    image_dir: directory that the CSV's `image` column is relative to.
    caption_file: path of a CSV file with `image` and `caption` columns.
    vocab: object exposing `word2idx` and `numericalize(text)`.
    transform: callable applied to the RGB PIL image before it is returned.
    mode: "train" yields `(image, caption_ids)`; any other value yields
      `(image, image_file_name)`.
  """

  def __init__(self, image_dir, caption_file, vocab, transform, mode="train"):
    self.image_dir = image_dir
    self.df = pd.read_csv(caption_file)
    self.images = self.df['image'].tolist()
    self.captions = self.df['caption'].tolist()
    self.vocab = vocab
    self.transform = transform
    self.mode = mode

  def __len__(self):
    """Number of rows in the captions file."""
    return len(self.df)

  def __getitem__(self, idx):
    """Load and transform image `idx`; in train mode also encode its caption."""
    img_path = os.path.join(self.image_dir, self.images[idx])
    # `convert` returns an independent, fully loaded copy, so the file handle
    # can be released right away instead of lingering until garbage collection.
    with Image.open(img_path) as handle:
      img = handle.convert("RGB")
    img = self.transform(img)

    if self.mode == "train":
      word2idx = self.vocab.word2idx
      # Build "<start>" + tokens + "<end>". Starting from the raw caption
      # string (as before) raised TypeError on `str += list` and never added
      # the "<start>" id.
      caption = [word2idx["<start>"]]
      caption.extend(self.vocab.numericalize(self.captions[idx]))
      caption.append(word2idx["<end>"])
      return img, np.array(caption, dtype=np.int32)
    return img, self.images[idx]

In [25]:
def transform_img(img):
  """Resize `img` to 224x224, convert it to an array and apply `preprocess_input`."""
  resized = img.resize((224, 224))
  pixels = np.array(resized)
  return preprocess_input(pixels)

def data_generator(dataset):
  """Yield `(image, caption)` pairs from `dataset` in index order.

  `dataset` only needs `__len__` and integer `__getitem__`. The per-item debug
  print was removed: it emitted one line per sample and interleaved with the
  progress bar wrapped around the training loop.
  """
  for i in range(len(dataset)):
    img, cap = dataset[i]
    yield img, cap

In [7]:
class EncoderCNN(tf.keras.Model):
    """Image encoder: headless ResNet50 -> global average pooling -> dense projection.

    Args:
        embedding_dim: number of units of the final dense layer, i.e. the size
            of the feature vector returned for each image.
    """

    def __init__(self, embedding_dim):
        super().__init__()
        # ImageNet-initialised backbone without its classification head.
        self.cnn = ResNet50(weights='imagenet', include_top=False)
        self.pool = layers.GlobalAveragePooling2D()
        self.fc = layers.Dense(embedding_dim)

    def call(self, x):
        """Return the dense projection of the pooled backbone activations of `x`."""
        feature_map = self.cnn(x)
        pooled = self.pool(feature_map)
        return self.fc(pooled)

In [8]:
class BahdanauAttention(tf.keras.Model):
    """Additive attention: score = V(tanh(W1(features) + W2(hidden))).

    Args:
        units: width of the two projection layers `W1` and `W2`.
    """

    def __init__(self, units):
        super().__init__()
        self.W1 = layers.Dense(units)
        self.W2 = layers.Dense(units)
        self.V = layers.Dense(1)

    def call(self, features, hidden):
        """Attend over axis 1 of `features` using `hidden` as the query.

        Args:
            features: (batch, locations, dim) tensor, or (batch, dim), which is
                treated as a single location per sample.
            hidden: (batch, units) query tensor.

        Returns:
            `(context_vector, attention_wts)` where `context_vector` is the
            attention-weighted sum of `features` over axis 1 and
            `attention_wts` holds the softmax weights along axis 1.
        """
        # A rank-2 `features` would broadcast (batch, units) against the
        # (batch, 1, units) query into (batch, batch, units): the softmax would
        # then run across *other samples of the batch* and every context vector
        # would blend features of different images. Give it an explicit
        # location axis so each sample only attends over its own features.
        if len(features.shape) == 2:
            features = tf.expand_dims(features, 1)

        hidden_with_time_axis = tf.expand_dims(hidden, 1)
        score = tf.nn.tanh(self.W1(features) + self.W2(hidden_with_time_axis))
        attention_wts = tf.nn.softmax(self.V(score), axis=1)
        context_vector = tf.reduce_sum(attention_wts * features, axis=1)
        return context_vector, attention_wts

In [9]:
class DecoderRNN(tf.keras.Model):
    """Attention-conditioned LSTM decoder producing vocabulary logits.

    Args:
        vocab_size: embedding input size and width of the output layer.
        embedding_dim: size of the token embeddings.
        units: LSTM state size, also used for the attention projections.
    """

    def __init__(self, vocab_size, embedding_dim, units):
        super().__init__()
        self.units = units
        self.embedding = layers.Embedding(vocab_size, embedding_dim)
        self.lstm = layers.LSTM(units, return_state=True, return_sequences=True)
        self.fc = layers.Dense(vocab_size)
        self.attention = BahdanauAttention(units=units)

    def call(self, x, features, hidden, cell):
        """Run the decoder on token ids `x` given image `features` and LSTM state.

        Returns:
            `(logits, state_h, state_c, attention_wts)`; the LSTM output is
            flattened to two dimensions before the output layer.
        """
        context, attn = self.attention(features, hidden)
        embedded = self.embedding(x)
        # Prepend nothing, widen instead: context and embedding are joined on
        # the last axis so the LSTM sees both at the same time step.
        lstm_in = tf.concat([tf.expand_dims(context, 1), embedded], axis=-1)
        seq_out, new_h, new_c = self.lstm(lstm_in, initial_state=[hidden, cell])
        flat = tf.reshape(seq_out, (-1, seq_out.shape[2]))
        return self.fc(flat), new_h, new_c, attn

    def reset_state(self, batch_size):
        """Return zero-filled `(hidden, cell)` states of shape (batch_size, units)."""
        state_shape = (batch_size, self.units)
        return tf.zeros(state_shape), tf.zeros(state_shape)

In [10]:
# Per-element cross-entropy on raw logits; reduction is done in loss_function.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(reduction='none', from_logits=True)

def loss_function(real, pred):
    """Cross-entropy between `real` ids and `pred` logits, zeroed where `real == 0`.

    Id 0 is the "<pad>" token. Padded positions contribute zero loss but are
    still counted in the denominator of the final mean.
    """
    per_position = loss_object(real, pred)
    is_real_token = tf.math.logical_not(tf.math.equal(real, 0))
    weights = tf.cast(is_real_token, dtype=per_position.dtype)
    return tf.reduce_mean(per_position * weights)

In [11]:
@tf.function
def train(img_tensor, target, encoder, decoder, optimizer, vocab):
    """Run one teacher-forced optimisation step on a batch.

    Args:
        img_tensor: batch of preprocessed images fed to `encoder`.
        target: (batch, seq_len) integer caption ids with a statically known
            shape; column 0 is expected to hold the "<start>" id because
            prediction starts at column 1.
        encoder, decoder: models whose trainable variables are updated.
        optimizer: optimizer used to apply the gradients.
        vocab: vocabulary providing the "<start>" id.

    Returns:
        The loss summed over decoding steps (not divided by `seq_len`).
    """
    batch_size = target.shape[0]
    seq_len = target.shape[1]
    hidden, cell = decoder.reset_state(batch_size=batch_size)
    dec_input = tf.expand_dims([vocab.word2idx['<start>']] * batch_size, 1)
    loss = 0

    # Record the forward pass on a tape instead of calling `tf.gradients`,
    # which only works while tracing a graph and raises in eager mode (e.g.
    # when the decorator is removed for debugging).
    with tf.GradientTape() as tape:
        features = encoder(img_tensor)
        for i in range(1, seq_len):
            predictions, hidden, cell, _ = decoder(dec_input, features, hidden, cell)
            loss += loss_function(target[:, i], predictions)
            # Teacher forcing: the ground-truth token is the next input.
            dec_input = tf.expand_dims(target[:, i], 1)
        total_loss = loss / int(seq_len)

    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(total_loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))

    return loss

In [12]:
def beam_search(img, encoder, decoder, vocab, beam_width=3, maxLen=50):
    """Generate a caption for a single preprocessed image with beam search.

    Args:
        img: one image without batch axis (a batch axis is added here).
        encoder, decoder: trained models.
        vocab: vocabulary providing `word2idx` / `idx2word`.
        beam_width: number of hypotheses kept after every step.
        maxLen: maximum number of decoding steps.

    Returns:
        The best hypothesis as a space-joined string with the "<start>",
        "<end>" and "<pad>" tokens removed.
    """
    img = tf.expand_dims(img, 0)
    features = encoder(img)
    hidden, cell = decoder.reset_state(batch_size=1)

    start_token = vocab.word2idx['<start>']
    end_token = vocab.word2idx['<end>']
    special_tokens = {start_token, end_token, vocab.word2idx['<pad>']}
    # Each beam: (token ids, hidden state, cell state, summed log-probability).
    beams = [([start_token], hidden, cell, 0.0)]

    for _ in range(maxLen):
        candidates = []
        for seq, h, c, score in beams:
            if seq[-1] == end_token:
                # Finished hypotheses are carried over unchanged.
                candidates.append((seq, h, c, score))
                continue

            dec_input = tf.expand_dims([seq[-1]], 0)
            logits, h_new, c_new, _ = decoder(dec_input, features, h, c)
            # Accumulate log-probabilities; the previous code summed raw
            # probabilities, which is not a valid sequence score.
            log_probs = tf.nn.log_softmax(logits, axis=-1)
            k = min(beam_width, int(log_probs.shape[-1]))
            top_k_log_probs, top_k_indices = tf.math.top_k(log_probs, k=k)
            top_k_log_probs = top_k_log_probs.numpy()[0]
            top_k_indices = top_k_indices.numpy()[0]

            for word_idx, word_log_prob in zip(top_k_indices, top_k_log_probs):
                candidates.append(
                    (seq + [int(word_idx)], h_new, c_new, score + float(word_log_prob))
                )

        # Rank by length-normalised score so longer captions are not penalised
        # merely for having more terms in the sum.
        ordered = sorted(candidates, key=lambda tup: tup[3] / len(tup[0]), reverse=True)
        beams = ordered[:beam_width]

        if all(seq[-1] == end_token for seq, _, _, _ in beams):
            break

    # Decode only after the search has finished. These lines used to sit inside
    # the loop, so the function returned after a single step (and returned None
    # when the loop ended through `break`).
    best_seq = beams[0][0]
    caption = [vocab.idx2word[idx] for idx in best_seq if idx not in special_tokens]
    return ' '.join(caption)

In [14]:

# Artifact names shared by save_model and load_model_for_inference. The
# ".weights.h5" suffix is accepted by both the legacy Keras HDF5 writer and
# Keras 3, which rejects weight files with any other suffix.
_ENCODER_WEIGHTS_FILE = "encoder.weights.h5"
_DECODER_WEIGHTS_FILE = "decoder.weights.h5"
_VOCAB_FILE = "vocab.pkl"


def save_model(encoder, decoder, vocab):
    """Persist encoder weights, decoder weights and the vocabulary to the cwd.

    Both models are stored as weights only: they are subclassed models, so
    restoring them from weights plus the class definitions avoids relying on
    whole-model serialisation of custom classes.
    """
    encoder.save_weights(_ENCODER_WEIGHTS_FILE)
    decoder.save_weights(_DECODER_WEIGHTS_FILE)

    with open(_VOCAB_FILE, "wb") as f:
        pickle.dump(vocab, f)


def load_model_for_inference(vocab_dim, embed_dim=256, units=512):
    """Rebuild encoder/decoder, restore their weights and load the vocabulary.

    Args:
        vocab_dim: vocabulary size the decoder was trained with.
        embed_dim: embedding size used for both encoder and decoder at training.
        units: decoder LSTM/attention size used at training.

    Returns:
        `(encoder, decoder, vocab)`.
    """
    encoder = EncoderCNN(embedding_dim=embed_dim)
    decoder = DecoderRNN(vocab_dim, embedding_dim=embed_dim, units=units)

    # Subclassed models only create their variables on the first call, and
    # `decoder.build(input_shape=...)` cannot be used because `DecoderRNN.call`
    # takes four inputs. Run one dummy forward pass through both models so that
    # `load_weights` has variables to fill.
    dummy_features = encoder(tf.zeros((1, 224, 224, 3)))
    hidden, cell = decoder.reset_state(batch_size=1)
    decoder(tf.zeros((1, 1), dtype=tf.int32), dummy_features, hidden, cell)

    encoder.load_weights(_ENCODER_WEIGHTS_FILE)
    decoder.load_weights(_DECODER_WEIGHTS_FILE)

    # SECURITY: pickle.load executes arbitrary code embedded in the file; only
    # load a vocab.pkl that was written by save_model on a trusted machine.
    with open(_VOCAB_FILE, "rb") as f:
        vocab = pickle.load(f)

    return encoder, decoder, vocab

In [26]:
# NOTE(review): absolute, machine-specific paths -- adjust before running elsewhere.
root_folder = "/home/devcontainers/Datasets/Images"
caption_file = "/home/devcontainers/Datasets/captions.txt"

vocab = Vocabulary(freq_threshold=5)
dataset = Dataset(root_folder, caption_file, vocab, transform_img)
# The dataset keeps a reference to `vocab`, so building it afterwards is fine.
vocab.build_vocab(dataset.captions)

train_ds = tf.data.Dataset.from_generator(
    lambda: data_generator(dataset),
    output_signature=(
        tf.TensorSpec(shape=(224, 224, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(None,), dtype=tf.int32)
    )
).padded_batch(32, padded_shapes=([224, 224, 3], [None]))

# Init Model
encoder = EncoderCNN(embedding_dim=256)
decoder = DecoderRNN(len(vocab.word2idx), embedding_dim=256, units=512)
optimizer = tf.keras.optimizers.Adam()

# Train
EPOCHS = 5
for epoch in range(EPOCHS):
    # Average over all batches: printing only the last batch's value (as
    # before) is a noisy, misleading summary of the epoch.
    epoch_loss = 0.0
    num_batches = 0
    for (img_tensor, target) in tqdm(train_ds, desc=f"Epoch {epoch+1}/{EPOCHS}"):
        loss = train(img_tensor, target, encoder, decoder, optimizer, vocab)
        epoch_loss += float(loss.numpy())
        num_batches += 1
    if num_batches == 0:
        raise RuntimeError("train_ds produced no batches; check caption_file and root_folder")
    print(f"Epoch {epoch+1}, Loss: {epoch_loss / num_batches:.4f}")

# Save model
save_model(encoder, decoder, vocab)

# Inference
encoder, decoder, vocab = load_model_for_inference(len(vocab.word2idx))

# Load test image
test_img_path = os.path.join(root_folder, dataset.images[0])
test_img = transform_img(Image.open(test_img_path).convert("RGB"))

# Generate caption
print("Generated Caption:", beam_search(test_img, encoder, decoder, vocab))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m94765736/94765736[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 0us/step


0it [00:00, ?it/s]2025-07-28 00:29:04.463035: W tensorflow/core/framework/op_kernel.cc:1844] UNKNOWN: NameError: name 'Image' is not defined
Traceback (most recent call last):

  File "/home/devcontainers/miniconda3/envs/agents-env/lib/python3.11/site-packages/tensorflow/python/ops/script_ops.py", line 269, in __call__
    ret = func(*args)
          ^^^^^^^^^^^

  File "/home/devcontainers/miniconda3/envs/agents-env/lib/python3.11/site-packages/tensorflow/python/autograph/impl/api.py", line 643, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^

  File "/home/devcontainers/miniconda3/envs/agents-env/lib/python3.11/site-packages/tensorflow/python/data/ops/from_generator_op.py", line 198, in generator_py_func
    values = next(generator_state.get_iterator(iterator_id))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/tmp/ipykernel_113036/1623902149.py", line 9, in data_generator
    img, cap = dataset[i]
               ~~~~~~~^^^

  File 

UnknownError: {{function_node __wrapped__IteratorGetNext_output_types_2_device_/job:localhost/replica:0/task:0/device:CPU:0}} NameError: name 'Image' is not defined
Traceback (most recent call last):

  File "/home/devcontainers/miniconda3/envs/agents-env/lib/python3.11/site-packages/tensorflow/python/ops/script_ops.py", line 269, in __call__
    ret = func(*args)
          ^^^^^^^^^^^

  File "/home/devcontainers/miniconda3/envs/agents-env/lib/python3.11/site-packages/tensorflow/python/autograph/impl/api.py", line 643, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^

  File "/home/devcontainers/miniconda3/envs/agents-env/lib/python3.11/site-packages/tensorflow/python/data/ops/from_generator_op.py", line 198, in generator_py_func
    values = next(generator_state.get_iterator(iterator_id))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/tmp/ipykernel_113036/1623902149.py", line 9, in data_generator
    img, cap = dataset[i]
               ~~~~~~~^^^

  File "/tmp/ipykernel_113036/3396009401.py", line 16, in __getitem__
    img = Image.open(img_path).convert("RGB")
          ^^^^^

NameError: name 'Image' is not defined


	 [[{{node PyFunc}}]] [Op:IteratorGetNext] name: 