# Lab12-2: Image Caption
---
111062117 黃祥陞

In this assignment, we built a CAPTCHA recognition model that combines a CNN with an attention-based RNN. The CNN extracts image features, and the RNN generates the character sequence one token at a time, using attention to focus on the relevant image regions at each step. Together they translate a CAPTCHA image into its text.

In [1]:
import tensorflow as tf
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

## Data Loading

In [2]:
# import zipfile

# with zipfile.ZipFile('words_captcha.zip') as zip_ref:
#     zip_ref.extractall('words_captcha')

In [3]:
import glob

# Load image paths and captions
img_name_list = []
cap_list = []

with open('./words_captcha/spec_train_val.txt') as file:
    for line in file:
        image_name, caption = line.strip().split()
        img_name_list.append(f'./words_captcha/{image_name}.png')
        cap_list.append('<start> ' + ' '.join(caption) + ' <end>')

test_img_name_list = set(glob.glob('./words_captcha/*.png')) - set(img_name_list)
img_name_list += list(sorted(test_img_name_list))

print(f"Loaded {len(img_name_list)} images and {len(cap_list)} labels.")

Loaded 140000 images and 120000 labels.


## Data Preprocessing
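This section tokenizes the captions at the character level: each caption becomes a `<start>` token, the word's characters separated by spaces, and an `<end>` token. The tokens are converted into integer sequences and zero-padded to a uniform length so they can be batched as model input.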

In [4]:
# Tokenize captions
tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='!"#$%&()*+,-./:;=?@[\\]^_`{|}~\t\n', oov_token='')
tokenizer.fit_on_texts(cap_list)

# Convert captions to sequences and pad them
cap_seqs = tf.keras.preprocessing.sequence.pad_sequences(tokenizer.texts_to_sequences(cap_list), padding='post')
max_length = cap_seqs.shape[1]

# Verify results
print(cap_list[0])  # Example caption in text
print(cap_seqs[0])  # Example caption as indices

<start> t h u s <end>
[ 2  9 18 17  6  3  0]
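
As an illustrative sketch of what the tokenization and padding step does, the following pure-Python version builds a character-level vocabulary and zero-pads the index sequences. The helper names (`build_vocab`, `encode`, `pad_post`) are hypothetical, and indices are assigned in first-seen order rather than Keras's frequency order, so they will not match the output above.

```python
def build_vocab(captions):
    """Map each token to an integer; 0 is reserved for padding, 1 for unknown tokens."""
    vocab = {}
    for caption in captions:
        for token in caption.split():
            if token not in vocab:
                vocab[token] = len(vocab) + 2
    return vocab

def encode(caption, vocab):
    """Convert a space-separated caption into a list of token indices."""
    return [vocab.get(token, 1) for token in caption.split()]

def pad_post(seqs, pad_value=0):
    """Right-pad every sequence to the length of the longest one."""
    max_len = max(len(s) for s in seqs)
    return [s + [pad_value] * (max_len - len(s)) for s in seqs]

# Same caption format as the notebook: characters separated by spaces
captions = ['<start> ' + ' '.join(word) + ' <end>' for word in ['thus', 'hello']]
vocab = build_vocab(captions)
padded = pad_post([encode(c, vocab) for c in captions])
```

The shorter caption ends with a trailing 0, which is the same padding value that the loss function later masks out.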


## Data Splitting

In [5]:
# Split the data into training, validation, and test sets
train_image_paths, val_image_paths, test_image_paths = img_name_list[:100000], img_name_list[100000:120000], img_name_list[120000:]
train_captions, val_captions = cap_seqs[:100000], cap_seqs[100000:]

# Verify the lengths of the splits
print(f"Training set size: {len(train_image_paths)}")
print(f"Validation set size: {len(val_image_paths)}")
print(f"Test set size: {len(test_image_paths)}")

Training set size: 100000
Validation set size: 20000
Test set size: 20000


## Parameter Setting

In [6]:
# Parameters
IMAGE_SIZE = (160, 300)
BATCH_SIZE = 32
BUFFER_SIZE = 1000
EMBEDDING_DIM = 256
UNITS = 512
VOCAB_SIZE = len(tokenizer.word_index) + 1
NUM_STEPS = len(train_image_paths) // BATCH_SIZE
EPOCHS = 3

## Build Dataset

This section defines a function to preprocess images and pairs them with their corresponding captions. TensorFlow `Dataset` objects are created for training and validation data, enabling efficient batching, shuffling, and prefetching to optimize data loading during model training.

In [7]:
def load_image(image_path):
    img = tf.io.read_file(image_path)
    img = tf.image.decode_png(img, channels=3)
    img = tf.image.resize(img, IMAGE_SIZE)
    img = tf.keras.applications.densenet.preprocess_input(img)
    return img

def map_func(img_name, cap):
    img_tensor = load_image(img_name)
    return img_tensor, cap

# Create TensorFlow Dataset objects
train_dataset = tf.data.Dataset.from_tensor_slices((train_image_paths, train_captions))\
                               .map(map_func, num_parallel_calls=tf.data.experimental.AUTOTUNE)\
                               .shuffle(BUFFER_SIZE)\
                               .batch(BATCH_SIZE)\
                               .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

val_dataset = tf.data.Dataset.from_tensor_slices((val_image_paths, val_captions))\
                             .map(map_func, num_parallel_calls=tf.data.experimental.AUTOTUNE)\
                             .batch(BATCH_SIZE)\
                             .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

# Verify the dataset
# for img, cap in train_dataset.take(1):
#     print(img.shape)
#     print(cap.shape)

## Build Model

### Feature Extractor
The DenseNet121 model, pre-trained on ImageNet, is used as the feature extractor. Its classification head is removed (`include_top=False`), so the final convolutional feature maps serve as input to the encoder.

In [8]:
from tensorflow.keras.applications import DenseNet121
# Load the DenseNet121 model pre-trained on ImageNet
feature_extractor = DenseNet121(include_top=False, weights='imagenet', input_shape=(*IMAGE_SIZE, 3))

# feature_extractor.summary()
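
To see how many spatial locations the attention mechanism will attend over, the sketch below traces the input size through the downsampling stages of DenseNet121. It assumes the layer arrangement of the Keras implementation (a stride-2 7x7 convolution with padding 3, a stride-2 3x3 max pool with padding 1, then three stride-2 2x2 average-pooling transitions); the helper names are hypothetical.

```python
def conv_out(size, kernel, stride, pad):
    """Output length of a convolution or pooling layer along one axis."""
    return (size + 2 * pad - kernel) // stride + 1

def densenet121_feature_size(size):
    """Trace one spatial dimension through the assumed DenseNet121 downsampling stages."""
    size = conv_out(size, kernel=7, stride=2, pad=3)  # stem convolution
    size = conv_out(size, kernel=3, stride=2, pad=1)  # stem max pool
    for _ in range(3):                                # three transition layers
        size = conv_out(size, kernel=2, stride=2, pad=0)
    return size

height, width = 160, 300  # IMAGE_SIZE from the parameter cell
fh = densenet121_feature_size(height)
fw = densenet121_feature_size(width)
num_locations = fh * fw
```

Under these assumptions a 160x300 input yields a 5x9 grid, so 45 locations per image. Inspecting `feature_extractor.output_shape` would confirm the actual value.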

### CNN Encoder
The encoder processes the feature maps from the CNN. It projects each spatial location to `embedding_dim` channels with a fully connected layer followed by a ReLU activation, preparing the features for the attention mechanism.

In [9]:
class CNN_Encoder(tf.keras.Model):
    def __init__(self, embedding_dim):
        super(CNN_Encoder, self).__init__()
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        x = self.fc(x)
        x = tf.nn.relu(x)
        return x

### Bahdanau Attention
The Bahdanau attention mechanism lets the model focus on different parts of the input features while generating each character of the output sequence. It takes the encoder's output feature maps and the decoder's hidden state as inputs. The hidden state is given an extra axis so that it broadcasts across all feature locations, and both inputs are passed through separate dense layers. Their sum goes through a `tanh` activation and a final dense layer to produce one score per location, and a softmax over the locations turns the scores into attention weights. These weights indicate the importance of each location and are used to compute a weighted sum of the features, known as the context vector. The context vector guides the decoder in predicting the next character.

In [10]:
class BahdanauAttention(tf.keras.Model):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, features, hidden):
        # features (CNN_Encoder output) shape == (batch_size, num_locations, embedding_dim),
        # where num_locations is the flattened height * width of the DenseNet feature map

        # hidden shape == (batch_size, units)
        # hidden_with_time_axis shape == (batch_size, 1, units)
        hidden_with_time_axis = tf.expand_dims(hidden, 1)

        # score shape == (batch_size, num_locations, units)
        score = tf.nn.tanh(self.W1(features) + self.W2(hidden_with_time_axis))

        # attention_weights shape == (batch_size, num_locations, 1)
        # the last axis is 1 because self.V projects each score vector to a scalar
        attention_weights = tf.nn.softmax(self.V(score), axis=1)

        # context_vector shape after sum == (batch_size, embedding_dim)
        context_vector = attention_weights * features
        context_vector = tf.reduce_sum(context_vector, axis=1)

        return context_vector, attention_weights
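
The additive attention computation can be illustrated on a tiny example without TensorFlow. This is a minimal sketch under simplifying assumptions: a single sample, no bias terms (the `Dense` layers above do have biases), and hand-picked weights; the names `matvec`, `softmax`, and `additive_attention` are hypothetical.

```python
import math

def matvec(W, x):
    """Multiply matrix W (list of rows) by vector x."""
    return [sum(w * xi for w, xi in zip(row, x)) for row in W]

def softmax(xs):
    """Numerically stable softmax over a list of scores."""
    m = max(xs)
    exps = [math.exp(x - m) for x in xs]
    total = sum(exps)
    return [e / total for e in exps]

def additive_attention(features, hidden, W1, W2, v):
    """features: one vector per location; hidden: decoder state vector."""
    proj_h = matvec(W2, hidden)  # shared across all locations
    scores = []
    for f in features:
        proj_f = matvec(W1, f)
        act = [math.tanh(a + b) for a, b in zip(proj_f, proj_h)]
        scores.append(sum(vi * ai for vi, ai in zip(v, act)))  # scalar score
    weights = softmax(scores)  # normalised over locations
    dim = len(features[0])
    # context vector: attention-weighted sum of the location features
    context = [sum(w * f[d] for w, f in zip(weights, features)) for d in range(dim)]
    return context, weights

features = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]  # three locations, two channels
hidden = [0.5, -0.5]
W1 = [[1.0, 0.0], [0.0, 1.0]]
W2 = [[1.0, 0.0], [0.0, 1.0]]
v = [1.0, 1.0]
context, weights = additive_attention(features, hidden, W1, W2, v)
```

The weights sum to 1 across locations, and the context vector has the same number of channels as each feature vector, which matches the shapes in the class above.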

### RNN Decoder
An RNN decoder generates the captions. It combines an embedding layer, a GRU, and two fully connected layers. At each step the decoder concatenates the context vector from the attention mechanism with the embedded input token and predicts the next character of the caption.

In [11]:
class RNN_Decoder(tf.keras.Model):
    def __init__(self, embedding_dim, units, vocab_size):
        super(RNN_Decoder, self).__init__()
        self.units = units

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc1 = tf.keras.layers.Dense(self.units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)

        self.attention = BahdanauAttention(self.units)

    def call(self, x, features, hidden):
        # attention is defined as a separate model
        context_vector, attention_weights = self.attention(features, hidden)

        # x shape after passing through embedding == (batch_size, 1, embedding_dim)
        x = self.embedding(x)

        # x shape after concatenation == (batch_size, 1, 2 * embedding_dim),
        # since the context vector has the encoder's embedding_dim channels
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

        # passing the concatenated vector to the GRU; no initial_state is given,
        # so the previous hidden state influences this step only through attention
        output, state = self.gru(x)

        # shape == (batch_size, 1, units)
        x = self.fc1(output)

        # x shape == (batch_size, units)
        x = tf.reshape(x, (-1, x.shape[2]))

        # output shape == (batch_size, vocab_size)
        x = self.fc2(x)

        return x, state, attention_weights

    def reset_state(self, batch_size):
        return tf.zeros((batch_size, self.units))

In [12]:
encoder = CNN_Encoder(EMBEDDING_DIM)
decoder = RNN_Decoder(EMBEDDING_DIM, UNITS, VOCAB_SIZE)
# optimizer = tf.keras.optimizers.Adam()
optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=1e-4)
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    return tf.reduce_mean(loss_)
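
The effect of the padding mask can be checked with a small standalone example. This sketch assumes probabilities rather than logits as input to keep the arithmetic simple, and the name `masked_cross_entropy` is hypothetical. Like `loss_function`, it averages over the whole batch, so padded positions contribute zero to the sum but still count in the denominator.

```python
import math

def masked_cross_entropy(targets, probs, pad_id=0):
    """Cross-entropy for one decoding step, ignoring padded targets.

    targets: one token id per batch item.
    probs: one probability distribution over the vocabulary per batch item.
    """
    losses = []
    for t, p in zip(targets, probs):
        # padded positions are masked to zero loss
        losses.append(0.0 if t == pad_id else -math.log(p[t]))
    return sum(losses) / len(losses)  # mean over the batch, padded items included

targets = [2, 0]  # the second item is padding at this step
probs = [[0.1, 0.1, 0.8], [0.5, 0.25, 0.25]]
step_loss = masked_cross_entropy(targets, probs)
all_padding_loss = masked_cross_entropy([0, 0], probs)
```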

## Model Training

This section sets up the training process: checkpointing, a training step function, and the main training loop. A `CheckpointManager` saves and restores the model weights and optimizer state, and the checkpoint number is used to resume from the correct epoch. The `train_step` function computes the teacher-forced loss and updates the weights of the feature extractor, encoder, and decoder together. The training loop iterates over the dataset for the specified number of epochs, showing the running average loss in the progress bar and saving a checkpoint at the end of each epoch.

In [13]:
checkpoint_path = "./checkpoints/densenet"
ckpt = tf.train.Checkpoint(feature_extractor=feature_extractor,
                           encoder=encoder,
                           decoder=decoder,
                           optimizer=optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

In [14]:
start_epoch = 0
if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint).expect_partial()
    start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])
    print(f"Restored from {ckpt_manager.latest_checkpoint}")

Restored from ./checkpoints/densenet/ckpt-3
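
The resume logic can be sketched in isolation. The function name `epochs_to_run` is hypothetical; it assumes, as in the training loop, that exactly one checkpoint is saved per epoch, so the checkpoint number equals the number of completed epochs.

```python
def epochs_to_run(latest_checkpoint, total_epochs):
    """Return the epoch indices still to be trained, given a path like '.../ckpt-3'."""
    start_epoch = 0
    if latest_checkpoint:
        # the checkpoint manager appends '-<save count>' to the checkpoint prefix
        start_epoch = int(latest_checkpoint.split('-')[-1])
    return list(range(start_epoch, total_epochs))

remaining_after_full_run = epochs_to_run('./checkpoints/densenet/ckpt-3', 3)
remaining_after_one_epoch = epochs_to_run('./checkpoints/densenet/ckpt-1', 3)
fresh_run = epochs_to_run(None, 3)
```

With `ckpt-3` restored and `EPOCHS = 3`, no epochs remain, so the training loop body does not execute in that session and the restored weights are used directly for evaluation.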


In [15]:
@tf.function
def train_step(img_tensor, target):
    loss = 0

    # initializing the hidden state for each batch
    hidden = decoder.reset_state(batch_size=target.shape[0])

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * target.shape[0], 1)

    with tf.GradientTape() as tape:
        features = feature_extractor(img_tensor, training=True)
        features = tf.reshape(features, (features.shape[0], -1, features.shape[3]))
        features = encoder(features)

        for i in range(1, target.shape[1]):
            # passing the features through the decoder
            predictions, hidden, _ = decoder(dec_input, features, hidden)
            loss += loss_function(target[:, i], predictions)
            # using teacher forcing
            dec_input = tf.expand_dims(target[:, i], 1)

    total_loss = (loss / int(target.shape[1]))
    trainable_variables = encoder.trainable_variables + decoder.trainable_variables + feature_extractor.trainable_variables
    gradients = tape.gradient(loss, trainable_variables)
    optimizer.apply_gradients(zip(gradients, trainable_variables))

    return loss, total_loss
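
Teacher forcing in `train_step` means the decoder input at each step is the ground-truth token from the previous position, not the model's own prediction. The sketch below lists the resulting (input, expected output) pairs for a single caption; the helper name `teacher_forcing_pairs` is hypothetical.

```python
def teacher_forcing_pairs(target):
    """Return (decoder_input, expected_output) for each decoding step."""
    pairs = []
    dec_input = target[0]  # the <start> token
    for i in range(1, len(target)):
        pairs.append((dec_input, target[i]))
        dec_input = target[i]  # feed the ground truth, not the prediction
    return pairs

# "<start> t h u s <end>" plus one padding token, from the preprocessing output
target = [2, 9, 18, 17, 6, 3, 0]
pairs = teacher_forcing_pairs(target)
```

The last pair expects the padding token 0, which is exactly the kind of step that the masked loss zeroes out.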

In [16]:
from tqdm import tqdm
import time
import matplotlib.pyplot as plt

loss_plot = []

for epoch in range(start_epoch, EPOCHS):
    start = time.time()
    total_loss = 0
    
    pbar = tqdm(train_dataset, total=NUM_STEPS, desc=f'Epoch {epoch + 1:2d}')
    for (step, (img_tensor, target)) in enumerate(pbar):
        batch_loss, t_loss = train_step(img_tensor, target)
        total_loss += t_loss
        pbar.set_postfix({'loss': total_loss.numpy() / (step + 1)})

    # Save a checkpoint after every epoch
    ckpt_manager.save()

    epoch_loss = total_loss / NUM_STEPS
    loss_plot.append(epoch_loss)


## Evaluation

This section defines functions for evaluation and accuracy calculation. The `process` function converts an index sequence back into text, skipping the `<start>` marker and stopping at the `<end>` marker. The `evaluate` function generates captions for a batch of image tensors by passing them through the feature extractor, encoder, and decoder, greedily choosing the most likely character at each step and feeding it back as the next input. Finally, the `calculate_accuracy` function compares the generated captions with the ground-truth captions over the whole validation dataset and reports the fraction of exact matches.

In [17]:
def process(caption):
    cap = ''
    for i in caption:
        if tokenizer.index_word[i] == '<start>':
            continue
        elif tokenizer.index_word[i] == '<end>':
            break
        cap += tokenizer.index_word[i]
    return cap
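
As a small worked example of the decoding step, the sketch below uses a hand-written index-to-token mapping taken from the preprocessing output shown earlier (`<start> t h u s <end>` maps to `[2 9 18 17 6 3 0]`). The function name `decode` is hypothetical. Unlike `process`, it treats an index missing from the mapping (such as the padding index 0) as the end of the caption, whereas a plain dictionary lookup would raise a `KeyError`.

```python
index_word = {2: '<start>', 3: '<end>', 6: 's', 9: 't', 17: 'u', 18: 'h'}

def decode(indices, index_word):
    """Turn a sequence of token indices into a string."""
    chars = []
    for i in indices:
        token = index_word.get(i)
        if token == '<start>':
            continue
        if token == '<end>' or token is None:
            break  # stop at <end>, or at padding / unknown indices
        chars.append(token)
    return ''.join(chars)

decoded = decode([2, 9, 18, 17, 6, 3, 0], index_word)
truncated = decode([2, 9, 0, 18], index_word)
```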

In [18]:
import numpy as np

def evaluate(img_tensor):
    batch_size = img_tensor.shape[0]
    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * batch_size, 1)
    
    features = feature_extractor(img_tensor)
    features = tf.reshape(features, (features.shape[0], -1, features.shape[3]))
    features = encoder(features)

    hidden = decoder.reset_state(batch_size=batch_size)

    result = tf.expand_dims([tokenizer.word_index['<start>']] * batch_size, 1)
    for _ in range(max_length):
        predictions, hidden, _ = decoder(dec_input, features, hidden)
        predicted_id = tf.argmax(predictions, axis=1).numpy()
        dec_input = tf.expand_dims(predicted_id, 1)
        result = tf.concat([result, predicted_id.reshape((batch_size, 1))], axis=1)
    return result.numpy()


In [19]:
def calculate_accuracy():
    pbar = tqdm(val_dataset, desc='Calculating Accuracy')
    correct = 0
    for img_tensor, cap in pbar:
        preds = evaluate(img_tensor)
        for pred_seq, cap_seq in zip(preds, cap.numpy()):
            if process(pred_seq) == process(cap_seq):
                correct += 1
        pbar.set_postfix({'accuracy': correct / ((pbar.n + 1) * BATCH_SIZE)})
    return correct / len(val_image_paths)
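
The metric used here is exact-match accuracy on whole captions. A minimal standalone sketch, with the hypothetical name `exact_match_accuracy` and made-up example strings:

```python
def exact_match_accuracy(predictions, references):
    """Fraction of predictions that equal their reference string exactly."""
    if len(predictions) != len(references):
        raise ValueError("predictions and references must have the same length")
    if not references:
        return 0.0
    correct = sum(p == r for p, r in zip(predictions, references))
    return correct / len(references)

acc = exact_match_accuracy(['thus', 'hallo', 'world'], ['thus', 'hello', 'world'])
```

A caption with a single wrong character counts as entirely wrong under this metric, so it is stricter than per-character accuracy.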

In [20]:
print(f'Accuracy: {calculate_accuracy()}')

Calculating Accuracy: 100%|██████████| 625/625 [10:58<00:00,  1.05s/it, accuracy=0.91] 

Accuracy: 0.9097

In [21]:
# # Plot the loss
# plt.plot(loss_plot)
# plt.xlabel('Epochs')
# plt.ylabel('Loss')
# plt.title('Loss Plot')
# plt.show()

## Predict Testing Data

In [22]:
def map_func_test(img_path):
    img_tensor = load_image(img_path)
    return img_tensor, img_path

test_dataset = tf.data.Dataset.from_tensor_slices(test_image_paths)\
                              .map(map_func_test, num_parallel_calls=tf.data.experimental.AUTOTUNE)\
                              .batch(BATCH_SIZE)\
                              .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

with open("Lab12-2_111062117.txt", "w") as file:
    for img_tensor, img_path in tqdm(test_dataset, desc="Processing Test Images"):
        result = evaluate(img_tensor)
        for pred, path in zip(result, img_path):
            cap = process(pred)
            path = path.numpy().decode('utf-8')
            file.write(f"{path[-11:-4]} {cap}\n")

Processing Test Images: 100%|██████████| 625/625 [12:20<00:00,  1.19s/it]


We used DenseNet as the feature extractor for the CAPTCHA recognition model. DenseNet's dense connections let each layer reuse features from all earlier layers, which helps capture fine image detail with relatively few parameters. The DenseNet backbone was fine-tuned together with the encoder and decoder. The encoder, attention mechanism, and decoder components followed the structure provided by the TA's code.

The model was trained for 3 epochs and reached an exact-match accuracy of 0.9097 on the validation set. This shows that DenseNet features, combined with attention and a GRU decoder, can transcribe most CAPTCHA images correctly, and that the architecture integrates these components well for a vision-to-text task.