In [3]:
# ============================================================================
# IMPORT LIBRARIES
# ============================================================================
import os
import pickle
import numpy as np
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
    Input, Dense, Embedding, LSTM, Dropout, Add
)
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import warnings
warnings.filterwarnings('ignore')


# Seed both NumPy's and TensorFlow's global random generators with one value.
SEED = 42
tf.random.set_seed(SEED)
np.random.seed(SEED)

# Title banner: an 80-character rule above and below the notebook title.
banner_rule = "=" * 80
print(banner_rule, "IMAGE CAPTIONING - FLICKR8K (CNN-LSTM)", banner_rule, sep="\n")


IMAGE CAPTIONING - FLICKR8K (CNN-LSTM)


In [4]:
# ============================================================================
# PATH CONFIG
# ============================================================================
# Where model checkpoints are written; created up front (no error if it
# already exists).
CHECKPOINT_DIR = "../checkpoints"
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

# Root data folder and the sub-folder holding the pickled feature files.
DATA_DIR = "../data"
FEATURE_DIR = os.path.join(DATA_DIR, "features")

In [11]:
# ============================================================================
# LOAD FEATURES
# ============================================================================

def _load_pickle(filename):
    """Unpickle ``filename`` from FEATURE_DIR and return the stored object."""
    # NOTE(review): pickle.load can execute arbitrary code; only use it on
    # files you generated yourself.
    with open(os.path.join(FEATURE_DIR, filename), "rb") as handle:
        return pickle.load(handle)


# Image features, looked up by image id
image_features = _load_pickle("image_features.pkl")

# Captions, looked up by image id
image_to_captions = _load_pickle("image_to_captions.pkl")

# Vocabulary mappings in both directions
word_to_idx = _load_pickle("word_to_idx.pkl")
idx_to_word = _load_pickle("idx_to_word.pkl")

# One more than the number of vocabulary entries
# (presumably index 0 is reserved for padding - TODO confirm).
vocab_size = 1 + len(word_to_idx)

# Longest caption, measured in whitespace-separated tokens
all_captions = [
    caption
    for caption_list in image_to_captions.values()
    for caption in caption_list
]
max_length = max(map(len, (caption.split() for caption in all_captions)))


print("Total image features:", len(image_features))
print("Total images with captions:", len(image_to_captions))
print("Vocab size:", vocab_size)
print("Max caption length:", max_length)

Total image features: 8091
Total images with captions: 8091
Vocab size: 2567
Max caption length: 32


In [None]:
# ============================================================================
# SANITY CHECK: PRINT A SAMPLE
# ============================================================================
# Spot-check the first image id found in the caption mapping.
sample_img_id = list(image_to_captions)[0]

print("Image ID:", sample_img_id)
print("Feature shape:", image_features[sample_img_id].shape)
print("Sample captions:")
# Show at most the first two captions stored for that image.
first_two_captions = image_to_captions[sample_img_id][:2]
for caption in first_two_captions:
    print("-", caption)

Image ID: 1000268201_693b08cb0e.jpg
Feature shape: (2048,)
Sample captions:
- startseq seorang anak dengan gaun merah muda sedang menaiki seperangkat tangga dengan jalan masuk endseq
- startseq seorang gadis pergi ke sebuah bangunan kayu endseq


In [13]:
# ============================================================================
# DATA GENERATOR
# ============================================================================
def caption_to_sequence(caption, word_to_idx):
    """Translate a caption string into a list of vocabulary indices.

    The caption is split on whitespace; tokens that are not keys of
    ``word_to_idx`` are skipped, so the result may be shorter than the
    number of tokens in the caption.
    """
    token_ids = []
    for token in caption.split():
        if token in word_to_idx:
            token_ids.append(word_to_idx[token])
    return token_ids

def data_generator(image_to_captions, image_features, word_to_idx,
                   max_length, vocab_size, batch_size=32):
    """Endlessly yield batches of (image, caption prefix) -> next-word samples.

    Each caption is converted to word indices with ``caption_to_sequence``
    and expanded into one sample per prefix: the first ``i`` indices are the
    input sequence and index ``i`` is the target word.

    Args:
        image_to_captions: mapping of image id -> list of caption strings.
        image_features: mapping of image id -> feature vector of that image.
        word_to_idx: mapping of word -> integer index.
        max_length: length every input sequence is padded to (padding is
            added at the end of the sequence).
        vocab_size: number of classes of the one-hot encoded target.
        batch_size: number of samples per yielded batch (must be >= 1).

    Yields:
        ``((X_img, X_seq), y)`` where ``X_img`` stacks ``batch_size`` image
        features, ``X_seq`` has shape ``(batch_size, max_length)`` and ``y``
        has shape ``(batch_size, vocab_size)``.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1, or if a full pass
            over ``image_to_captions`` produces no sample at all (either case
            would otherwise loop forever without yielding).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    # The buffers live outside the loops on purpose: a partially filled batch
    # carries over into the next pass over the data.
    batch_features, batch_prefixes, batch_targets = [], [], []

    while True:
        samples_this_pass = 0

        for img_id, captions in image_to_captions.items():

            feature = image_features[img_id]

            for caption in captions:
                seq = caption_to_sequence(caption, word_to_idx)

                for i in range(1, len(seq)):
                    batch_features.append(feature)
                    batch_prefixes.append(seq[:i])
                    batch_targets.append(seq[i])
                    samples_this_pass += 1

                    if len(batch_features) == batch_size:
                        # Pad and one-hot encode once per batch instead of
                        # once per sample; the resulting arrays are the same.
                        X_seq = pad_sequences(
                            batch_prefixes,
                            maxlen=max_length,
                            padding='post'
                        )
                        y = to_categorical(
                            batch_targets,
                            num_classes=vocab_size
                        )

                        # Inputs are grouped in a tuple rather than a list:
                        # newer Keras generator adapters do not accept a list
                        # here, while tuples work on old and new versions.
                        yield (np.array(batch_features), X_seq), y
                        batch_features, batch_prefixes, batch_targets = [], [], []

        if samples_this_pass == 0:
            raise ValueError(
                "data_generator: no training samples could be built from "
                "image_to_captions"
            )

In [14]:
# ============================================================================
# SANITY CHECK DATA GENERATOR
# ============================================================================
# Pull a single two-sample batch from a fresh generator and check its shapes.
gen = data_generator(
    image_to_captions,
    image_features,
    word_to_idx,
    max_length,
    vocab_size,
    batch_size=2,
)

first_batch = next(gen)
(X_img, X_seq), y = first_batch

# Expected: (2, 2048), (2, max_length), (2, vocab_size)
print("Image input shape:", X_img.shape)
print("Seq input shape:", X_seq.shape)
print("Target shape:", y.shape)


Image input shape: (2, 2048)
Seq input shape: (2, 32)
Target shape: (2, 2567)


In [None]:
# ============================================================================
# BUILD MODEL
# ============================================================================
# Sizes used throughout the network
FEATURE_DIM = 2048    # length of the image feature vector fed to the model
HIDDEN_UNITS = 256    # width of the dense, embedding and LSTM layers
DROPOUT_RATE = 0.5

# Layers are created in the same order as before so that auto-generated layer
# names and seeded initialisation stay unchanged.

# --- Image branch: project the feature vector, then apply dropout ----------
image_input = Input(name="image_input", shape=(FEATURE_DIM,))
image_dense = Dense(
    units=HIDDEN_UNITS, activation="relu", name="image_dense"
)(image_input)
image_dense = Dropout(rate=DROPOUT_RATE)(image_dense)

# --- Text branch: embed the word indices (index 0 is masked), run the LSTM --
seq_input = Input(name="seq_input", shape=(max_length,))
seq_embed = Embedding(
    name="embedding",
    input_dim=vocab_size,
    output_dim=HIDDEN_UNITS,
    mask_zero=True,
)(seq_input)
seq_lstm = LSTM(units=HIDDEN_UNITS, name="lstm")(seq_embed)
seq_lstm = Dropout(rate=DROPOUT_RATE)(seq_lstm)

# --- Decoder: element-wise sum of both branches, then classify the next word
decoder = Add(name="merge")([image_dense, seq_lstm])
decoder = Dense(units=HIDDEN_UNITS, activation="relu")(decoder)
output = Dense(units=vocab_size, activation="softmax", name="output")(decoder)

# --- Assemble and compile ---------------------------------------------------
model = Model(outputs=output, inputs=[image_input, seq_input])
model.compile(optimizer="adam", loss="categorical_crossentropy")

model.summary()



Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 seq_input (InputLayer)      [(None, 32)]                 0         []                            
                                                                                                  
 image_input (InputLayer)    [(None, 2048)]               0         []                            
                                                                                                  
 embedding (Embedding)       (None, 32, 256)              657152    ['seq_input[0][0]']           
                                                                                                  
 image_dense (Dense)         (None, 256)                  524544    ['image_input[0][0]']         
                                                                                            

In [19]:
# ============================================================================
# TRAINING CONFIG
# ============================================================================
# Training hyper-parameters
BATCH_SIZE = 64   # samples per batch drawn from the data generator
EPOCHS = 30       # upper bound on the number of training epochs

# Steps per Epoch
def count_steps(image_to_captions, batch_size, word_to_idx=None):
    """Return how many full batches one pass over the captions produces.

    A caption with ``n`` usable tokens gives ``n - 1`` (prefix, next word)
    training pairs; captions with fewer than two usable tokens give none.

    Args:
        image_to_captions: mapping of image id -> list of caption strings.
        batch_size: number of pairs per batch.
        word_to_idx: optional vocabulary mapping. When given, only tokens
            present in it are counted, which matches ``data_generator``
            (it drops tokens missing from the vocabulary). When omitted,
            every whitespace-separated token is counted.

    Returns:
        Total number of pairs floor-divided by ``batch_size``.
    """
    total_pairs = 0
    for captions in image_to_captions.values():
        for caption in captions:
            tokens = caption.split()
            if word_to_idx is None:
                n_tokens = len(tokens)
            else:
                n_tokens = sum(1 for token in tokens if token in word_to_idx)
            # Clamp at zero so an empty caption cannot subtract from the total.
            total_pairs += max(n_tokens - 1, 0)
    return total_pairs // batch_size

# Pass the vocabulary so the step count agrees with what the generator emits.
steps_per_epoch = count_steps(image_to_captions, BATCH_SIZE, word_to_idx=word_to_idx)
print("Steps per epoch:", steps_per_epoch)

# Data Generator
train_generator = data_generator(
    image_to_captions=image_to_captions,
    image_features=image_features,
    word_to_idx=word_to_idx,
    max_length=max_length,
    vocab_size=vocab_size,
    batch_size=BATCH_SIZE
)

# Callbacks (Checkpoint & Early Stopping)
# Both callbacks monitor the training loss; fit() is called without
# validation data, so no validation metric is available to monitor.
checkpoint = ModelCheckpoint(
    # Build the path from CHECKPOINT_DIR so the file always lands in the
    # directory created in the path-config cell, even if that setting changes.
    filepath=os.path.join(CHECKPOINT_DIR, "caption_model_best.h5"),
    monitor="loss",
    save_best_only=True,
    verbose=1
)
early_stop = EarlyStopping(
    monitor="loss",
    patience=5,
    restore_best_weights=True
)


Steps per epoch: 6401


In [20]:
# ============================================================================
# TRAINING MODEL
# ============================================================================
# Fit from the endless generator; steps_per_epoch tells Keras where one epoch
# ends. The callbacks save the best checkpoint and stop training early.
history = model.fit(
    x=train_generator,
    steps_per_epoch=steps_per_epoch,
    epochs=EPOCHS,
    verbose=1,
    callbacks=[checkpoint, early_stop],
)

Epoch 1/30
Epoch 1: loss improved from inf to 3.42048, saving model to ../checkpoints\caption_model_best.h5
Epoch 2/30
Epoch 2: loss improved from 3.42048 to 3.31061, saving model to ../checkpoints\caption_model_best.h5
Epoch 3/30
Epoch 3: loss improved from 3.31061 to 3.24304, saving model to ../checkpoints\caption_model_best.h5
Epoch 4/30
Epoch 4: loss improved from 3.24304 to 3.19070, saving model to ../checkpoints\caption_model_best.h5
Epoch 5/30
Epoch 5: loss improved from 3.19070 to 3.15390, saving model to ../checkpoints\caption_model_best.h5
Epoch 6/30
Epoch 6: loss improved from 3.15390 to 3.12296, saving model to ../checkpoints\caption_model_best.h5
Epoch 7/30
Epoch 7: loss improved from 3.12296 to 3.10014, saving model to ../checkpoints\caption_model_best.h5
Epoch 8/30
Epoch 8: loss improved from 3.10014 to 3.08079, saving model to ../checkpoints\caption_model_best.h5
Epoch 9/30
Epoch 9: loss improved from 3.08079 to 3.06235, saving model to ../checkpoints\caption_model_best