In [None]:
import json
import os
import pickle
import re

import matplotlib.pyplot as plt
import nltk
import numpy as np
import tensorflow as tf
from nltk.tokenize import word_tokenize
from nltk.translate.bleu_score import corpus_bleu
from PIL import Image
from sklearn.model_selection import train_test_split
from tensorflow.keras.applications import EfficientNetB4
from tensorflow.keras.callbacks import LearningRateScheduler, EarlyStopping
from tensorflow.keras.layers import (Layer, Dense, Embedding, MultiHeadAttention, LayerNormalization, Dropout, Input, Add, Concatenate)
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer

# Fetch the NLTK Punkt tokenizer resources (both resource names are requested).
for _resource in ('punkt', 'punkt_tab'):
    nltk.download(_resource)

In [None]:
# 1. Configuration
class Config:
  """Single namespace holding every tunable value of the captioning pipeline."""

  # --- Dataset locations ---
  Image_Dir = '/content/coco_dataset/images/train2017'
  Ann_path = '/content/coco_dataset/annotations/annotations/captions_train2017.json'

  # --- Model size ---
  EMBED_DIM = 512
  TRANSFORMER_HEADS = 8
  TRANSFORMER_LAYERS = 3
  DROPOUT_RATE = 0.2

  # --- Text limits ---
  MAX_SEQ_LENGTH = 40
  VOCAB_SIZE = 10_000

  # --- Optimisation ---
  LEARNING_RATE = 1e-4
  BATCH_SIZE = 64
  EPOCHS = 30

  # --- Image augmentation (consumed by CocoDataLoader) ---
  ROTATION_RANGE = 15
  WIDTH_SHIFT_RANGE = 0.1
  BRIGHTNESS_RANGE = [0.9, 1.1]

  # --- Output artefacts ---
  MODEL_SAVE_PATH = 'coco_captioning_transformer.h5'
  TOKENIZER_SAVE_PATH = 'tokenizer.pkl'


# Shared instance read by the rest of the notebook.
config = Config()

In [None]:
# 2. Data Loading & Augmentation

class CocoDataLoader:
    """Reads a COCO-style caption annotation file and pairs captions with image files.

    The constructor also builds a Keras ``ImageDataGenerator`` from the
    augmentation settings in ``config`` and stores it as ``self.augmentor``.
    """

    def __init__(self, config):
        """Store ``config`` and prepare the augmentation generator.

        Args:
            config: object exposing ``Image_Dir``, ``Ann_path``,
                ``ROTATION_RANGE``, ``WIDTH_SHIFT_RANGE`` and ``BRIGHTNESS_RANGE``.
        """
        self.config = config
        self.image_dir = config.Image_Dir
        self.augmentor = tf.keras.preprocessing.image.ImageDataGenerator(
            rotation_range=config.ROTATION_RANGE,
            width_shift_range=config.WIDTH_SHIFT_RANGE,
            brightness_range=config.BRIGHTNESS_RANGE,
            horizontal_flip=True
        )

    def load_data(self, max_images=2000):
        """Collect (image path, caption) pairs whose image file exists on disk.

        Args:
            max_images: maximum number of pairs to return. One pair is produced
                per annotation, so an image with several captions counts several
                times. ``None`` disables the limit.

        Returns:
            Tuple ``(images, captions)`` of two equally long lists, in annotation
            order.
        """
        # Parse first so the file handle is released before the (long) scan.
        with open(self.config.Ann_path) as f:
            data = json.load(f)

        images = []
        captions = []
        exists_cache = {}  # image path -> bool, avoids re-stat'ing the same file

        for ann in data['annotations']:
            if max_images is not None and len(images) >= max_images:
                break  # limit reached

            img_path = os.path.join(self.image_dir, f"{ann['image_id']:012d}.jpg")
            if img_path not in exists_cache:
                exists_cache[img_path] = os.path.exists(img_path)
            if exists_cache[img_path]:
                images.append(img_path)
                captions.append(ann['caption'])

        return images, captions

In [None]:
# 3. Feature Extraction
class FeatureExtractor:
  """Turns an image file into a flat feature vector using EfficientNetB4.

  The network is created without its classification head and with global
  average pooling, so ``predict`` yields one vector per image.
  """

  # Size every image is resized to before being fed to the network.
  INPUT_SIZE = (380, 380)

  def __init__(self):
    """Load the ImageNet-pretrained EfficientNetB4 backbone."""
    self.model = EfficientNetB4(weights='imagenet', include_top=False, pooling='avg')

  def extract(self, image_path):
    """Return the pooled feature vector for ``image_path``.

    Args:
      image_path: path of the image file to read.

    Returns:
      1-D numpy array of features, or ``None`` if anything fails (the error is
      printed, not raised, so one bad file does not abort a batch run).
    """
    try:
      # Context manager closes the file; convert() returns an independent copy.
      with Image.open(image_path) as raw:
        img = raw.convert('RGB').resize(self.INPUT_SIZE)
      # Keras EfficientNet rescales internally and expects raw [0, 255] pixels,
      # so the image must NOT be divided by 255 here.
      pixels = np.asarray(img, dtype=np.float32)
      pixels = tf.keras.applications.efficientnet.preprocess_input(pixels)
      features = self.model.predict(np.expand_dims(pixels, axis=0), verbose=0)
      return features.flatten()
    except Exception as e:
      print(f"Error extracting features from image {image_path}: {e}")
      return None

In [None]:
# 4. Text Processing
class TextProcessor:
  """Cleans captions, builds the vocabulary and encodes captions as id sequences.

  Every caption is wrapped as ``<start> caption <end>`` both when the vocabulary
  is fitted and when captions are encoded, so the two markers are ordinary
  vocabulary entries present in ``word_index`` and ``index_word`` alike.
  """

  START_TOKEN = '<start>'
  END_TOKEN = '<end>'

  def __init__(self, config):
    """Create the tokenizer.

    Args:
      config: object exposing ``VOCAB_SIZE`` and ``MAX_SEQ_LENGTH``.
    """
    self.config = config
    # filters='' because preprocess() already removes punctuation; the default
    # filter set contains '<' and '>' and would mangle the start/end markers.
    self.tokenizer = Tokenizer(
        num_words=config.VOCAB_SIZE,
        oov_token='<unk>',
        filters=''
    )

  def preprocess(self, text):
    """Lower-case ``text``, drop punctuation and re-join the NLTK word tokens."""
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)
    return ' '.join(word_tokenize(text))

  def _wrap(self, caption):
    """Return the cleaned caption surrounded by the start and end markers."""
    return f"{self.START_TOKEN} {self.preprocess(caption)} {self.END_TOKEN}"

  def build_vocab(self, captions):
    """Fit the tokenizer on ``captions`` (markers included) and return it."""
    self.tokenizer.fit_on_texts([self._wrap(c) for c in captions])
    return self.tokenizer

  def encode(self, captions):
    """Encode all ``captions`` into a padded integer matrix.

    Returns:
      Array of shape ``(len(captions), MAX_SEQ_LENGTH)``, zero-padded on the
      right. Over-long captions are cut at the end so ``<start>`` is kept.
    """
    sequences = self.tokenizer.texts_to_sequences(
        [self._wrap(cap) for cap in captions]
    )
    # The return sits outside any loop: every caption is encoded, not just the first.
    return pad_sequences(
        sequences,
        maxlen=self.config.MAX_SEQ_LENGTH,
        padding='post',
        truncating='post'
    )

In [None]:
# 5. Transformer Architecture
class TransformerBlock(Layer):
  """Self-attention block: causal multi-head attention + feed-forward network.

  Both sub-layers are followed by dropout, a residual connection and layer
  normalisation. The attention is causal so that, during teacher-forced
  training, a position cannot look at the tokens it is supposed to predict.

  Args:
    embed_dim: width of the token representations; also passed to
      ``MultiHeadAttention`` as its second positional argument (``key_dim``).
    num_heads: number of attention heads.
    dropout_rate: dropout probability used after each sub-layer.
    **kwargs: forwarded to ``Layer`` (e.g. ``name``), needed when the layer is
      re-created from a saved config.
  """

  def __init__(self, embed_dim, num_heads, dropout_rate=0.2, **kwargs):
    super().__init__(**kwargs)
    # Kept so get_config() can reproduce the constructor call.
    self.embed_dim = embed_dim
    self.num_heads = num_heads
    self.dropout_rate = dropout_rate
    # The block receives a masked embedding; declare that masks pass through.
    self.supports_masking = True

    self.att = MultiHeadAttention(num_heads, embed_dim)
    self.fin = tf.keras.Sequential([
        Dense(embed_dim, activation='relu'),
        Dense(embed_dim)
    ])
    self.layernorm1 = LayerNormalization()
    self.layernorm2 = LayerNormalization()
    self.dropout1 = Dropout(dropout_rate)
    self.dropout2 = Dropout(dropout_rate)

  def call(self, inputs, training=None):
    """Apply the block to ``inputs`` and return a tensor of the same shape."""
    attn_output = self.att(inputs, inputs, use_causal_mask=True, training=training)
    attn_output = self.dropout1(attn_output, training=training)
    out1 = self.layernorm1(inputs + attn_output)
    ffn_output = self.fin(out1)
    return self.layernorm2(out1 + self.dropout2(ffn_output, training=training))

  def get_config(self):
    """Return the constructor arguments so a saved model can rebuild the layer."""
    base = super().get_config()
    base.update({
        'embed_dim': self.embed_dim,
        'num_heads': self.num_heads,
        'dropout_rate': self.dropout_rate,
    })
    return base
class ImageCaptioningModel:
  """Builds the Keras captioning model from the values in ``config``."""

  def __init__(self, config):
    """Keep a reference to ``config`` (sizes, dropout, learning rate)."""
    self.config = config

  def build(self):
    """Create and compile the model.

    Inputs:
      * image features, shape ``(batch, 1792)``
      * caption token ids, shape ``(batch, T)`` with any ``T``. Training feeds
        ``MAX_SEQ_LENGTH - 1`` tokens and inference pads to ``MAX_SEQ_LENGTH``,
        so the time axis is left unspecified.

    Output:
      Softmax over the vocabulary for every caption position,
      shape ``(batch, T, VOCAB_SIZE)``.

    Returns:
      The compiled ``tf.keras`` model.
    """
    cfg = self.config

    # Project the image vector to the embedding width and give it a time axis
    # of length 1 so it can be broadcast over the caption positions.
    image_input = Input(shape=(1792, ))
    image_dense = Dense(cfg.EMBED_DIM, activation='relu')(image_input)
    image_features = tf.keras.layers.Reshape((1, cfg.EMBED_DIM))(image_dense)

    caption_input = Input(shape=(None, ))
    embedding = Embedding(
        cfg.VOCAB_SIZE,
        cfg.EMBED_DIM,
        mask_zero=True
    )(caption_input)

    # Condition every token on the image; without this the image input would
    # not influence the prediction at all.
    # NOTE(review): no positional encoding is added anywhere - consider one.
    x = Add()([embedding, image_features])
    for _ in range(cfg.TRANSFORMER_LAYERS):
      x = TransformerBlock(
          cfg.EMBED_DIM,
          cfg.TRANSFORMER_HEADS,
          cfg.DROPOUT_RATE
      )(x)
    outputs = Dense(cfg.VOCAB_SIZE, activation='softmax')(x)

    model = Model(inputs=[image_input, caption_input], outputs=outputs)
    model.compile(
        optimizer=Adam(cfg.LEARNING_RATE),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

In [None]:
# 6. Training Pipeline

def train():
  """Run the full training pipeline and persist the model and tokenizer.

  Steps: load (image, caption) pairs, extract image features, build the
  vocabulary, encode captions, split 80/20, fit the model with early stopping
  and an exponentially decaying learning rate, then save both artefacts to the
  paths given in ``config``.

  Returns:
    Tuple ``(model, tokenizer, history)``.

  Raises:
    ValueError: if no image yields features, so there is nothing to train on.
  """
  loader = CocoDataLoader(config)
  image_paths, captions = loader.load_data()

  fe = FeatureExtractor()
  # load_data returns one entry per caption, so the same image path can repeat;
  # run the CNN once per distinct path (failed extractions are cached as None).
  feature_cache = {}
  features = []
  valid_captions = []
  for img_path, cap in zip(image_paths, captions):
    if img_path not in feature_cache:
      feature_cache[img_path] = fe.extract(img_path)
    feat = feature_cache[img_path]
    if feat is not None:
      features.append(feat)
      valid_captions.append(cap)

  if not features:
    raise ValueError(
        "No training samples: no image features could be extracted. "
        "Check Config.Image_Dir and Config.Ann_path."
    )

  tp = TextProcessor(config)
  tokenizer = tp.build_vocab(valid_captions)
  sequences = tp.encode(valid_captions)

  # NOTE(review): the split is per caption, so captions of one image can land
  # in both train and validation sets.
  X_train, X_val, y_train, y_val = train_test_split(
      np.array(features),
      np.array(sequences),
      test_size=0.2,
      random_state=42
  )

  model = ImageCaptioningModel(config).build()

  callbacks = [
      EarlyStopping(patience=5, restore_best_weights=True),
      LearningRateScheduler(lambda epoch: config.LEARNING_RATE * (0.9 ** epoch))
  ]

  # Teacher forcing: the input is the caption without its last token, the
  # target is the caption shifted left by one.
  history = model.fit(
      x=[X_train, y_train[:, :-1]],
      y=y_train[:, 1:],
      validation_data=([X_val, y_val[:, :-1]], y_val[:, 1:]),
      epochs=config.EPOCHS,
      batch_size=config.BATCH_SIZE,
      callbacks=callbacks
  )

  model.save(config.MODEL_SAVE_PATH)
  with open(config.TOKENIZER_SAVE_PATH, 'wb') as f:
    pickle.dump(tokenizer, f)

  return model, tokenizer, history

In [None]:
# 7. Evaluation & Inference
class CaptionGenerator:
  """Loads a trained model + tokenizer and produces captions with beam search."""

  def __init__(self, model_path, tokenizer_path):
    """Load the saved model and the pickled tokenizer.

    Args:
      model_path: path of the saved Keras model.
      tokenizer_path: path of the pickled tokenizer.
    """
    self.model = tf.keras.models.load_model(
        model_path,
        custom_objects={'TransformerBlock': TransformerBlock}
    )
    # NOTE(security): pickle.load can execute arbitrary code; only load
    # tokenizer files produced by this notebook.
    with open(tokenizer_path, 'rb') as f:
      self.tokenizer = pickle.load(f)
    self._extractor = None  # created on first use, then reused

  def _get_extractor(self):
    """Return a shared FeatureExtractor, building it only once."""
    if getattr(self, '_extractor', None) is None:
      self._extractor = FeatureExtractor()
    return self._extractor

  def generate_caption(self, image_path, beam_width=3):
    """Generate a caption for ``image_path`` using beam search.

    Args:
      image_path: path of the image to describe.
      beam_width: number of partial captions kept after each step.

    Returns:
      The caption as a space-separated string, or ``None`` if feature
      extraction failed.
    """
    features = self._get_extractor().extract(image_path)
    if features is None:
      return None

    start_token = self.tokenizer.word_index['<start>']
    end_token = self.tokenizer.word_index['<end>']
    image_batch = np.array([features])

    # Each beam is (token ids, cumulative log-probability).
    beams = [([start_token], 0.0)]

    for _step in range(config.MAX_SEQ_LENGTH - 1):
      if all(seq[-1] == end_token for seq, _score in beams):
        break  # every beam is finished

      all_candidates = []
      for seq, score in beams:
        if seq[-1] == end_token:
          all_candidates.append((seq, score))
          continue

        input_seq = pad_sequences([seq], maxlen=config.MAX_SEQ_LENGTH, padding='post')
        preds = self.model.predict([image_batch, input_seq], verbose=0)[0]

        # The next-token distribution is at the last REAL token, not at the
        # last (padding) position of the padded input.
        step_probs = preds[len(seq) - 1]
        for word_id in np.argsort(step_probs)[-beam_width:]:
          # Accumulate; the tiny constant avoids log(0).
          log_prob = float(np.log(step_probs[word_id] + 1e-12))
          all_candidates.append((seq + [int(word_id)], score + log_prob))

      # Highest cumulative log-probability first.
      all_candidates.sort(key=lambda cand: cand[1], reverse=True)
      beams = all_candidates[:beam_width]

    best_seq = beams[0][0]
    caption = []
    for word_id in best_seq:
      # Compare ids rather than words so decoding does not depend on the
      # markers being present in index_word.
      if word_id == end_token:
        break
      if word_id == start_token or word_id == 0:
        continue  # skip the start marker and padding
      caption.append(self.tokenizer.index_word.get(word_id, '<unk>'))
    return ' '.join(caption)

  def evaluate(self, test_images, test_captions):
    """Compute corpus BLEU of generated captions against one reference each.

    References are lower-cased and stripped of punctuation because generated
    captions are built from a vocabulary that was normalised the same way.

    Args:
      test_images: iterable of image paths.
      test_captions: iterable of ground-truth captions, aligned with
        ``test_images``.

    Returns:
      The BLEU score, or ``0.0`` if no caption could be generated.
    """
    references = []
    hypotheses = []

    for img_path, true_cap in zip(test_images, test_captions):
      pred_cap = self.generate_caption(img_path)
      if pred_cap:
        normalised = re.sub(r'[^\w\s]', '', true_cap.lower())
        references.append([normalised.split()])
        hypotheses.append(pred_cap.split())

    if not hypotheses:
      return 0.0

    bleu4 = corpus_bleu(references, hypotheses)
    return bleu4


In [None]:
# 8. Main

if __name__ == "__main__":
  # Train, then reload the saved artefacts exactly as a separate session would.
  model, tokenizer, history = train()

  generator = CaptionGenerator(config.MODEL_SAVE_PATH, config.TOKENIZER_SAVE_PATH)

  test_image = "/content/coco_dataset/images/val2017/000000000139.jpg"
  print("Generated Caption: ", generator.generate_caption(test_image))

  # Fill these two aligned lists to run the BLEU evaluation.
  # (A literal `[...]` would be a list holding the Ellipsis object, not a placeholder.)
  sample_images = []  # List of test image paths
  sample_captions = []  # List of ground truth captions
  if sample_images and len(sample_images) == len(sample_captions):
    bleu_score = generator.evaluate(sample_images, sample_captions)
    print(f"BLEU-4 Score: {bleu_score:.4f}")
  else:
    print("Skipping BLEU evaluation: provide matching sample_images and sample_captions.")