## Import Modules

In [1]:
import os                                                                        # used to handle files using system commands.
import pickle                                                                    # used to store numpy features extracted.
import numpy as np                                                               # used to perform a wide variety of mathematical operations on arrays.
import tensorflow as tf
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm                                                   # progress bar decorator for iterators. Includes a default range iterator printing to stderr.
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input          # imported modules for feature extraction from the image data.
from tensorflow.keras.preprocessing.image import load_img, img_to_array          # used for loading the image and converting the image to a numpy array.
from tensorflow.keras.preprocessing.text import Tokenizer                        # used for loading the text as convert them into a token.
from tensorflow.keras.preprocessing.sequence import pad_sequences                # used for equal distribution of words in sentences filling the remaining spaces with zeros.
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical, plot_model                    # used to visualize the architecture of the model through different images.
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, Add, LayerNormalization
from tensorflow.keras.layers import MultiHeadAttention
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard
import gc
import datetime
import re
from sklearn.model_selection import train_test_split

## Directories of the data

In [2]:
BASE_DIR = 'Project/Flickr 8k Dataset'
WORKING_DIR = 'Project'

## Load VGG16 model without top layer

In [3]:
# load vgg16 model
vgg_model = VGG16(weights='imagenet')  
# restructure the model
vgg_model = Model(inputs=vgg_model.inputs, outputs=vgg_model.layers[-2].output)
# summarize
print(vgg_model.summary())

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 224, 224, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 224, 224, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 112, 112, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 112, 112, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 112, 112, 128)     147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 56, 56, 128)       0     

## Extract the image features and load the data for preprocess

In [4]:
def extract_image_features(): 
    # Extract features from all images
    features = {}
    directory = os.path.join(BASE_DIR, 'Images')
    
    if not os.path.exists(directory):
        print(f" Error: Images directory not found at {directory}")
        return None
    
    image_files = [f for f in os.listdir(directory) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
    print(f"Found {len(image_files)} images")
    
    for img_name in tqdm(image_files, desc="Extracting features"):
        try:
            # Load and preprocess image
            img_path = os.path.join(directory, img_name)
            image = load_img(img_path, target_size=(224, 224))
            image = img_to_array(image)
            image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
            image = preprocess_input(image)
            
            # Extract features
            feature = vgg_model.predict(image, verbose=0)
            
            # Store feature with image ID (without extension)
            image_id = img_name.split('.')[0]
            features[image_id] = feature
            
        except Exception as e:
            print(f"Error processing {img_name}: {e}")
    
    print(f"Feature extraction completed. Extracted features for {len(features)} images")
    
    # Save features to pickle file
    pickle_path = os.path.join(WORKING_DIR, 'features.pkl')
    with open(pickle_path, 'wb') as f:
        pickle.dump(features, f)
    print(f"Features saved to {pickle_path}")
    
    return features


## Load features from pickle

In [5]:
def load_image_features():
    """Load pre-extracted image features"""
    pickle_path = os.path.join(WORKING_DIR, 'features.pkl')
    
    if os.path.exists(pickle_path):
        print("Loading pre-extracted features...")
        with open(pickle_path, 'rb') as f:
            features = pickle.load(f)
        print(f" Loaded features for {len(features)} images")
        return features
    else:
        print("No pre-extracted features found. Starting extraction...")
        return extract_image_features()

# Extract or load features
features = load_image_features()

if features is None:
    print(" Failed to extract features. Please check your image directory.")
    exit()


No pre-extracted features found. Starting extraction...
Found 8091 images


Extracting features:   0%|          | 0/8091 [00:00<?, ?it/s]

Feature extraction completed. Extracted features for 8091 images
Features saved to Project\features.pkl


## LOAD AND PROCESS CAPTIONS

In [6]:
def load_captions():
    """Load captions from the text file"""
    captions_path = os.path.join(BASE_DIR, 'captions.txt')
    
    if not os.path.exists(captions_path):
        print(f" Error: Captions file not found at {captions_path}")
        return None
    
    print("Loading captions...")
    with open(captions_path, 'r', encoding='utf-8') as f:
        next(f)  # Skip header
        captions_doc = f.read()
    
    # Create mapping of image_id to captions
    mapping = {}
    for line in tqdm(captions_doc.split('\n'), desc="Processing captions"):
        if len(line.strip()) < 2:
            continue
        
        # Split by comma - first part is image_id, rest is caption
        tokens = line.split(',')
        if len(tokens) < 2:
            continue
        
        image_id = tokens[0].split('.')[0]  # Remove file extension
        caption = ','.join(tokens[1:])  # Rejoin in case caption contains commas
        
        if image_id not in mapping:
            mapping[image_id] = []
        
        mapping[image_id].append(caption)
    
    print(f"Loaded captions for {len(mapping)} images")
    print(f"Total captions: {sum(len(caps) for caps in mapping.values())}")
    
    return mapping

# Load captions
mapping = load_captions()

if mapping is None:
    print("Failed to load captions. Please check your captions file.")
    exit()

Loading captions...


Processing captions:   0%|          | 0/40456 [00:00<?, ?it/s]

Loaded captions for 8091 images
Total captions: 40455


## TEXT PREPROCESSING

In [7]:
def clean_captions(mapping):
    """Clean and preprocess caption text"""
    print("Cleaning captions...")
    
    cleaned_count = 0
    for key, captions in mapping.items():
        for i in range(len(captions)):
            caption = captions[i]
            
            # Convert to lowercase
            caption = caption.lower()
            
            # Remove punctuation and special characters
            caption = re.sub(r'[^a-zA-Z\s]', '', caption)
            
            # Remove extra spaces
            caption = re.sub(r'\s+', ' ', caption).strip()
            
            # Remove very short words (length < 2)
            words = [word for word in caption.split() if len(word) >= 2]
            
            # Add start and end tokens
            caption = 'startseq ' + ' '.join(words) + ' endseq'
            
            captions[i] = caption
            cleaned_count += 1
    
    print(f" Cleaned {cleaned_count} captions")
    
    # Show some examples
    sample_key = list(mapping.keys())[0]
    print("\nSample cleaned captions:")
    for i, caption in enumerate(mapping[sample_key][:3]):
        print(f"  {i+1}. {caption}")
    
    return mapping

# Clean captions
mapping = clean_captions(mapping)


Cleaning captions...
 Cleaned 40455 captions

Sample cleaned captions:
  1. startseq child in pink dress is climbing up set of stairs in an entry way endseq
  2. startseq girl going into wooden building endseq
  3. startseq little girl climbing into wooden playhouse endseq


## TOKENIZATION AND VOCABULARY CREATION

In [8]:
def create_tokenizer(mapping):
    """Create tokenizer from all captions"""
    print("Creating tokenizer...")
    
    # Collect all captions
    all_captions = []
    for key in mapping:
        for caption in mapping[key]:
            all_captions.append(caption)
    
    print(f"Total captions for tokenization: {len(all_captions)}")
    
    # Create and fit tokenizer
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(all_captions)
    
    vocab_size = len(tokenizer.word_index) + 1
    
    # Calculate max sequence length
    max_length = max(len(caption.split()) for caption in all_captions)
    
    print(f"✓ Vocabulary size: {vocab_size}")
    print(f"✓ Maximum sequence length: {max_length}")
    
    # Show most common words
    word_counts = [(word, tokenizer.word_counts[word]) for word in tokenizer.word_index]
    word_counts.sort(key=lambda x: x[1], reverse=True)
    
    print("\nMost common words:")
    for word, count in word_counts[:10]:
        print(f"  '{word}': {count}")
    
    return tokenizer, vocab_size, max_length, all_captions

# Create tokenizer
tokenizer, vocab_size, max_length, all_captions = create_tokenizer(mapping)


Creating tokenizer...
Total captions for tokenization: 40455
✓ Vocabulary size: 8768
✓ Maximum sequence length: 34

Most common words:
  'startseq': 40455
  'endseq': 40455
  'in': 18974
  'the': 18418
  'on': 10743
  'is': 9345
  'and': 8851
  'dog': 8136
  'with': 7765
  'man': 7265


## TRAIN-TEST SPLIT

In [9]:
def split_data(mapping, features, test_size=0.1):
    """Split data into train and test sets"""
    print("Splitting data into train and test sets...")
    
    # Get image IDs that have both features and captions
    valid_image_ids = [img_id for img_id in mapping.keys() if img_id in features]
    
    print(f"Images with both captions and features: {len(valid_image_ids)}")
    
    # Split data
    train_ids, test_ids = train_test_split(
        valid_image_ids, 
        test_size=test_size, 
        random_state=42
    )
    
    print(f" Training images: {len(train_ids)}")
    print(f" Test images: {len(test_ids)}")
    
    return train_ids, test_ids

# Split data
train_ids, test_ids = split_data(mapping, features)

Splitting data into train and test sets...
Images with both captions and features: 8091
 Training images: 7281
 Test images: 810


## DATA GENERATOR

In [10]:
def create_data_generator(data_keys, mapping, features, tokenizer, max_length, vocab_size, batch_size):
    """Create data generator for training"""
    
    def data_generator():
        while True:
            X1, X2, y = [], [], []
            
            for key in data_keys:
                if key not in features:
                    continue
                
                captions = mapping[key]
                for caption in captions:
                    # Encode caption to sequence
                    seq = tokenizer.texts_to_sequences([caption])[0]
                    
                    # Only process sequences with reasonable length
                    if len(seq) < 3:  # Too short
                        continue
                    
                    # Pad sequence
                    seq = pad_sequences([seq], maxlen=max_length, padding='post')[0]
                    
                    # Create input-output pairs
                    input_seq = seq[:-1]  # All tokens except last
                    target_seq = seq[1:]  # All tokens except first
                    
                    # Add to batch
                    X1.append(features[key][0])
                    X2.append(input_seq)
                    y.append(target_seq)
                    
                    # Yield batch when ready
                    if len(X1) == batch_size:
                        yield [np.array(X1), np.array(X2)], np.array(y)
                        X1, X2, y = [], [], []
            
            # Yield remaining data
            if len(X1) > 0:
                yield [np.array(X1), np.array(X2)], np.array(y)
    
    return data_generator

## MODEL ARCHITECTURE

In [11]:
# Model parameters
EMBED_DIM = 256
LSTM_UNITS = 512
DROPOUT_RATE = 0.3
BATCH_SIZE = 32
EPOCHS = 25
LEARNING_RATE = 0.0005

In [12]:
def build_caption_model(vocab_size, max_length, embed_dim=EMBED_DIM, lstm_units=LSTM_UNITS):
    """Build the caption generation model"""
    print("Building model architecture...")
    
    # Image input
    image_input = Input(shape=(4096,), name='image')
    image_dense = Dense(embed_dim, activation='relu')(image_input)
    image_dense = Dropout(DROPOUT_RATE)(image_dense)
    image_dense = Dense(embed_dim, activation='relu')(image_dense)  # Additional layer
    
    # Text input
    text_input = Input(shape=(max_length-1,), name='text')
    text_embed = Embedding(vocab_size, embed_dim, mask_zero=True)(text_input)
    text_embed = Dropout(DROPOUT_RATE)(text_embed)
    
    # Repeat image features for each time step
    image_seq = tf.expand_dims(image_dense, 1)
    image_seq = tf.tile(image_seq, [1, max_length-1, 1])
    
    # Combine image and text features
    combined = tf.concat([text_embed, image_seq], axis=-1)
    
    # LSTM layers
    lstm1 = LSTM(lstm_units, return_sequences=True, dropout=DROPOUT_RATE, 
                 recurrent_dropout=DROPOUT_RATE)(combined)
    lstm2 = LSTM(lstm_units, return_sequences=True, dropout=DROPOUT_RATE, 
                 recurrent_dropout=DROPOUT_RATE)(lstm1)
    
    # Dense layers
    dense = Dense(512, activation='relu')(lstm2)
    dense = Dropout(0.4)(dense)
    dense = Dense(256, activation='relu')(dense)
    dense = Dropout(0.4)(dense)
    
    # Output layer
    output = Dense(vocab_size, activation='softmax')(dense)
    
    # Create model
    model = Model(inputs=[image_input, text_input], outputs=output)
    
    # Compile model
    optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=optimizer,
        metrics=['accuracy']
    )
    
    print(" Model built successfully")
    print(model.summary())
    
    return model

# Build model
model = build_caption_model(vocab_size, max_length)

# Plot model architecture
try:
    plot_model(model, to_file='caption_model_architecture.png', show_shapes=True, show_layer_names=True)
    print(" Model architecture saved to caption_model_architecture.png")
except:
    print(" Could not save model architecture plot")

Building model architecture...
 Model built successfully
Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 image (InputLayer)             [(None, 4096)]       0           []                               
                                                                                                  
 dense (Dense)                  (None, 256)          1048832     ['image[0][0]']                  
                                                                                                  
 dropout (Dropout)              (None, 256)          0           ['dense[0][0]']                  
                                                                                                  
 text (InputLayer)              [(None, 33)]         0           []                               
                                   

## TRAINING SETUP

In [13]:
def setup_training():
    """Setup training parameters and callbacks"""
    print("Setting up training...")
    
    # Calculate steps per epoch
    train_samples = sum(len(mapping[key]) for key in train_ids if key in mapping)
    val_samples = sum(len(mapping[key]) for key in test_ids if key in mapping)
    
    train_steps = max(1, train_samples // BATCH_SIZE)
    val_steps = max(1, val_samples // BATCH_SIZE)
    
    print(f"Training samples: {train_samples}")
    print(f"Validation samples: {val_samples}")
    print(f"Steps per epoch - Train: {train_steps}, Validation: {val_steps}")
    
    # Callbacks
    callbacks = [
        EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True,
            verbose=1
        ),
        ModelCheckpoint(
            'best_caption_model.h5',
            monitor='val_loss',
            save_best_only=True,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.8,
            patience=3,
            min_lr=1e-7,
            verbose=1
        )
    ]
    
    # Create generators
    train_gen_func = create_data_generator(
        train_ids, mapping, features, tokenizer, max_length, vocab_size, BATCH_SIZE
    )
    val_gen_func = create_data_generator(
        test_ids, mapping, features, tokenizer, max_length, vocab_size, BATCH_SIZE
    )
    
    train_generator = train_gen_func()
    val_generator = val_gen_func()
    
    return train_generator, val_generator, train_steps, val_steps, callbacks

# Setup training
train_generator, val_generator, train_steps, val_steps, callbacks = setup_training()


Setting up training...
Training samples: 36405
Validation samples: 4050
Steps per epoch - Train: 1137, Validation: 126


## MODEL TRAINING

In [14]:
def train_model():
    """Train the caption generation model"""
    print("\n" + "="*60)
    print("STARTING MODEL TRAINING")
    print("="*60)
    print(f"Epochs: {EPOCHS}")
    print(f"Batch size: {BATCH_SIZE}")
    print(f"Learning rate: {LEARNING_RATE}")
    print("="*60)
    
    # Train the model
    history = model.fit(
        train_generator,
        steps_per_epoch=train_steps,
        epochs=EPOCHS,
        validation_data=val_generator,
        validation_steps=val_steps,
        callbacks=callbacks,
        verbose=1
    )
    
    print("\n Training completed!")
    
    # Save final model
    model.save('final_caption_model.h5')
    print(" Final model saved")
    
    # Save tokenizer
    with open('tokenizer.pkl', 'wb') as f:
        pickle.dump(tokenizer, f)
    print(" Tokenizer saved")
    
    return history

# Start training
print("Ready to start training!")
print("Uncomment the next line to begin training:")
print("# history = train_model()")

# start training:
history = train_model()

Ready to start training!
Uncomment the next line to begin training:
# history = train_model()

STARTING MODEL TRAINING
Epochs: 25
Batch size: 32
Learning rate: 0.0005
Epoch 1/25
Epoch 1: val_loss improved from inf to 1.45058, saving model to best_caption_model.h5
Epoch 2/25
Epoch 2: val_loss improved from 1.45058 to 1.34676, saving model to best_caption_model.h5
Epoch 3/25
Epoch 3: val_loss improved from 1.34676 to 1.27622, saving model to best_caption_model.h5
Epoch 4/25
Epoch 4: val_loss improved from 1.27622 to 1.23011, saving model to best_caption_model.h5
Epoch 5/25
Epoch 5: val_loss improved from 1.23011 to 1.19651, saving model to best_caption_model.h5
Epoch 6/25
Epoch 6: val_loss improved from 1.19651 to 1.17029, saving model to best_caption_model.h5
Epoch 7/25
Epoch 7: val_loss improved from 1.17029 to 1.15341, saving model to best_caption_model.h5
Epoch 8/25
Epoch 8: val_loss improved from 1.15341 to 1.14223, saving model to best_caption_model.h5
Epoch 9/25
Epoch 9: val_loss 

## INFERENCE FUNCTIONS

In [15]:
def generate_caption(model, tokenizer, image_feature, max_length, method='greedy'):
    """Generate caption for an image"""
    
    if method == 'greedy':
        return generate_caption_greedy(model, tokenizer, image_feature, max_length)
    elif method == 'beam_search':
        return generate_caption_beam_search(model, tokenizer, image_feature, max_length)
    else:
        return generate_caption_greedy(model, tokenizer, image_feature, max_length)

In [16]:
def generate_caption_greedy(model, tokenizer, image_feature, max_length):
    """Generate caption using greedy search"""
    in_text = 'startseq'
    
    for i in range(max_length):
        # Encode input sequence
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        sequence = pad_sequences([sequence], maxlen=max_length-1, padding='post')
        
        # Predict next word
        y_pred = model.predict([image_feature.reshape(1, -1), sequence], verbose=0)
        
        # Get word with highest probability
        y_pred = np.argmax(y_pred[0, len(tokenizer.texts_to_sequences([in_text])[0])-1, :])
        
        # Convert index to word
        word = None
        for word_text, index in tokenizer.word_index.items():
            if index == y_pred:
                word = word_text
                break
        
        # Stop if no word found or end sequence
        if word is None or word == 'endseq':
            break
        
        # Add word to sequence
        in_text += ' ' + word
    
    # Remove start sequence
    caption = in_text.replace('startseq ', '')
    return caption

In [17]:
def generate_caption_beam_search(model, tokenizer, image_feature, max_length, beam_width=3):
    """Generate caption using beam search"""
    # Initialize beam
    sequences = [(['startseq'], 0.0)]
    
    for _ in range(max_length):
        all_candidates = []
        
        for seq, score in sequences:
            if seq[-1] == 'endseq':
                all_candidates.append((seq, score))
                continue
            
            # Encode sequence
            text = ' '.join(seq)
            encoded = tokenizer.texts_to_sequences([text])[0]
            padded = pad_sequences([encoded], maxlen=max_length-1, padding='post')
            
            # Get predictions
            preds = model.predict([image_feature.reshape(1, -1), padded], verbose=0)
            
            # Get top predictions
            position = min(len(encoded)-1, max_length-2)
            top_indices = np.argsort(preds[0, position, :])[-beam_width:]
            
            for idx in top_indices:
                # Find word for index
                word = None
                for w, i in tokenizer.word_index.items():
                    if i == idx:
                        word = w
                        break
                
                if word:
                    new_seq = seq + [word]
                    new_score = score + np.log(preds[0, position, idx] + 1e-8)
                    all_candidates.append((new_seq, new_score))
        
        # Keep top sequences
        sequences = sorted(all_candidates, key=lambda x: x[1], reverse=True)[:beam_width]
        
        # Check if all ended
        if all(seq[-1] == 'endseq' for seq, _ in sequences):
            break
    
    # Return best sequence
    best_seq = sequences[0][0]
    caption = ' '.join(best_seq[1:]).replace(' endseq', '')
    return caption

In [18]:
def predict_caption_for_image(image_path, model, tokenizer, max_length):
    """Complete pipeline for new image"""
    # Load VGG16 model
    vgg_model = VGG16(weights='imagenet')
    vgg_model = Model(inputs=vgg_model.inputs, outputs=vgg_model.layers[-2].output)
    
    # Extract features
    image = load_img(image_path, target_size=(224, 224))
    image = img_to_array(image)
    image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
    image = preprocess_input(image)
    feature = vgg_model.predict(image, verbose=0)
    
    # Generate caption
    caption = generate_caption(model, tokenizer, feature, max_length)
    return caption

## EVALUATION AND TESTING

In [19]:
def evaluate_model(model, test_ids, mapping, features, tokenizer, max_length, num_samples=5):
    """Evaluate model on test samples"""
    print("\n" + "="*60)
    print("MODEL EVALUATION")
    print("="*60)
    
    for i, image_id in enumerate(test_ids[:num_samples]):
        if image_id not in features or image_id not in mapping:
            continue
        
        print(f"\n--- Test {i+1}: Image {image_id} ---")
        
        # Actual captions
        actual_captions = mapping[image_id]
        print("Actual captions:")
        for j, caption in enumerate(actual_captions):
            print(f"  {j+1}. {caption}")
        
        # Generated caption
        try:
            generated = generate_caption(model, tokenizer, features[image_id], max_length)
            print(f"\nGenerated caption: {generated}")
            
            # Also try beam search
            generated_beam = generate_caption(model, tokenizer, features[image_id], max_length, method='beam_search')
            print(f"Generated (beam search): {generated_beam}")
            
        except Exception as e:
            print(f"Error generating caption: {e}")
        
        print("-" * 50)

In [20]:
def plot_training_history(history):
    """Plot training history"""
    if history is None:
        print("No training history available")
        return
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
    
    # Loss plot
    ax1.plot(history.history['loss'], label='Training Loss', marker='o')
    ax1.plot(history.history['val_loss'], label='Validation Loss', marker='s')
    ax1.set_title('Model Loss')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.legend()
    ax1.grid(True)
    
    # Accuracy plot
    ax2.plot(history.history['accuracy'], label='Training Accuracy', marker='o')
    ax2.plot(history.history['val_accuracy'], label='Validation Accuracy', marker='s')
    ax2.set_title('Model Accuracy')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Accuracy')
    ax2.legend()
    ax2.grid(True)
    
    plt.tight_layout()
    plt.savefig('training_history.png', dpi=300, bbox_inches='tight')
    plt.show()