<a href="https://colab.research.google.com/github/LapTQ/image_captioning/blob/main/image_captioning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!git clone https://github.com/hungpham13/Vietnamese-HTR.git

Cloning into 'Vietnamese-HTR'...
remote: Enumerating objects: 2403, done.[K
remote: Counting objects: 100% (4/4), done.[K
remote: Compressing objects: 100% (3/3), done.[K
remote: Total 2403 (delta 0), reused 4 (delta 0), pack-reused 2399[K
Receiving objects: 100% (2403/2403), 427.59 MiB | 38.26 MiB/s, done.
Checking out files: 100% (2395/2395), done.


In [26]:
import os
import re
import numpy as np
import tensorflow as tf

import matplotlib.pyplot as plt
from tensorflow import keras
from pathlib import Path

seed = 42
np.random.seed(seed)
tf.random.set_seed(seed)

In [27]:
!head -20 '/content/Vietnamese-HTR/Data 1: Handwriting OCR for Vietnamese Address/0825_DataSamples 1/labels.json'

{
    "1.jpg": "Số 3 Nguyễn Ngọc Vũ, Hà Nội",
    "2.jpg": "Số 30 Nguyên Hồng, Láng Hạ, Đống Đa, Hà Nội",
    "3.jpg": "58 Thái Thịnh, Đống Đa, Hà Nội",
    "4.jpeg": "Số 370/8 khu phố 5B, phường Tân Biên, Biên Hòa, Đồng Nai",
    "5.jpg": "Vĩnh Trung Plaza, B, 255-257 đường Hùng Vương, phường Vĩnh Trung",
    "6.jpg": "Tòa nhà 34T, Hoàng Đạo Thúy, Hà Nội",
    "7.jpg": "40 Cát Linh, Đống Đa, Hà Nội",
    "8.jpg": "phòng 101, tầng 1, lô 04-TT5B, khu đô thị Tây Nam Linh Đàm",
    "9.JPG": "Nhà 87 ngõ 416 Đê La Thành",
    "10.JPG": "Up coworking Space, 89 Láng Hạ, Hà Nội",
    "11.jpg": "192 Ngô Đức Kế, quận 1, Hồ Chí Minh",
    "12.jpg": "số 5 Công Trường Mê Linh, phường Bến Nghé, quận 1",
    "13.jpg": "90A đường Mai Xuân Thưởng, tỉnh Gia Lai",
    "14.jpg": "96/7/12B Phạm Văn Đồng, thành phố Pleiku",
    "15.jpg": "168 Ngô Gia Tự, thành phố Hà Tĩnh"
}

In [94]:
train_img_dir = '/content/Vietnamese-HTR/Data 1: Handwriting OCR for Vietnamese Address/0916_Data Samples 2'
test_img_dir = '/content/Vietnamese-HTR/Data 1: Handwriting OCR for Vietnamese Address/1015_Private Test'

image_height, image_width = 120, 1900
vocab_size = 10000

# Fixed length allowed for any sequence
seq_length = 25

# Dimension for the image embeddings and token embeddings
embedding_dim = 512

# Per-layer units in the feed-forward network
units = 512

batch_size = 4
epochs = 30
AUTOTUNE = tf.data.AUTOTUNE

In [95]:
print('Number of training images:', len(list(Path(train_img_dir).glob('*.png'))))
print('Number of testing images:', len(list(Path(test_img_dir).glob('*.png'))))

Number of training images: 1823
Number of testing images: 549


In [96]:
import json

train_json = json.load(
    open(train_img_dir + '/labels.json', 'r')
)

test_json = json.load(
    open(test_img_dir + '/labels.json', 'r')
)

train_data = {os.path.join(train_img_dir, image_name): '<start> ' + label + ' <end>' for image_name, label in train_json.items()}
test_data = {os.path.join(test_img_dir, image_name): '<start> ' + label + ' <end>' for image_name, label in test_json.items()}

In [97]:
def decode_and_resize(img_path):
    img_string = tf.io.read_file(img_path)
    img = tf.image.decode_png(img_string)

    # resize to desired shape
    # input is of int [0, 255], but output is of float [0, 255]
    img = tf.image.resize_with_pad(img, image_height, image_width)

    # preprocess_input accept input of type float [0, 255]
    img = keras.applications.densenet.preprocess_input(img)

    return img

strip_chars = "!\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~"
strip_chars = strip_chars.replace('<', '')
strip_chars = strip_chars.replace('>', '')
strip_chars = strip_chars.replace('/', '')
strip_chars = strip_chars.replace('-', '')

vectorization = keras.layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode='int',
    output_sequence_length=seq_length,
    standardize=lambda label: tf.strings.regex_replace(label, "[%s]" % re.escape(strip_chars), "")
)

vectorization.adapt(list(train_data.values()))

def preprocess_input(img_path, label):
    return decode_and_resize(img_path), vectorization(label)

def make_dataset(img_paths, labels, training):
    assert training is True or training is False

    dataset = tf.data.Dataset.from_tensor_slices((img_paths, labels))
    dataset = dataset.map(preprocess_input, num_parallel_calls=AUTOTUNE)
    dataset = dataset.prefetch(buffer_size=2000)
    # dataset = dataset.cache()
    if training: 
        dataset = dataset.shuffle(buffer_size=2000)
    dataset = dataset.batch(batch_size)

    return dataset


train_ds = make_dataset(
    train_data.keys(),
    train_data.values(),
    training = True
)
test_ds = make_dataset(
    test_data.keys(),
    test_data.values(),
    training = False
)

  "Even though the `tf.config.experimental_run_functions_eagerly` "


In [98]:
# for images, labels in train_ds.take(1):
#     for image, label in zip(images, labels):
#         plt.imshow(image)
#         plt.show()
#         print(label)

In [99]:
# i = 0
# for label in train_data.values():
#     print(
#         tf.strings.regex_replace(label, "[%s]" % re.escape("!\"#$%&'()*+,.:;=?@[\]^_`{|}~"), "")
#     )
#     i += 1
#     if i == 3:
#         break
    

In [100]:
# vectorization.get_vocabulary()

In [101]:
# print(list(json_file.values())[0])
# vectorization(list(train_data.values()))[:3]

In [102]:
base_model = keras.applications.DenseNet121(
    include_top=False,
    input_shape=(image_height, image_width, 3),

)
# get the feature map from DenseNet
base_model_out = base_model.output
# squash the feature map from shape [f_height, f_width, f_channel]
# to shape [f_height x f_width, f channel], we'll pass it through
# a CNN Encoder later on
base_model_out = keras.layers.Reshape(
    (-1, base_model_out.shape[-1])
)(base_model_out)

cnn_model = keras.models.Model(
    base_model.input,
    base_model_out
)

In [103]:
class CNN_Encoder(keras.Model):
    # Since you have already extracted the features and dumped it
    # This encoder passes those features through a Fully connected layer
    def __init__(self, embedding_dim):
        super(CNN_Encoder, self).__init__()

        self.feature_extractor = keras.applications.DenseNet121(
            include_top=False,
            input_shape=(image_height, image_width, 3),

        )
        
        self.fc = keras.layers.Dense(embedding_dim)

    def call(self, x):
        # get the feature map from DenseNet
        x = self.feature_extractor(x)
        # squash the feature map from shape [f_height, f_width, f_channel]
        # to shape [f_height x f_width, f channel]
        x = keras.layers.Reshape((-1, x.shape[-1]))(x)
        # shape after fc == (f_height x f_width, embedding_dim)
        x = self.fc(x)
        x = tf.nn.relu(x)
        return x

In [104]:
class BahdanauAttention(keras.Model):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = keras.layers.Dense(units)
        self.W2 = keras.layers.Dense(units)
        self.V = keras.layers.Dense(1)

    def call(self, features, hidden):
        # features(CNN_encoder output) shape = (batch_size, f_height x f_width, embedding_dim)

        # hidden shape == (batch_size, hidden_size)
        # hidden_with_time_axis shape == (batch_size, 1, hidden_size)
        hidden_with_time_axis = tf.expand_dims(hidden, 1)

        # print('BahdanauAttention: hidden_with_time_axis.shape == ', hidden_with_time_axis.shape)

        # attention_hidden_layer shape == (batch_size, f_height x f_width, units)
        attention_hidden_layer = tf.nn.tanh(
            self.W1(features) + self.W2(hidden_with_time_axis)
        )

        # score shape == (batch_size, f_height x f_width, 1)
        # this gives an unnormalized score for each image feature
        score = self.V(attention_hidden_layer)

        # print('BahdanauAttention: score.shape == ', score.shape)

        # attention_weights shape == (batch_size, f_height x f_width, 1)
        attention_weights = tf.nn.softmax(score, axis=1)

        # print('BahdanauAttention: attention_weights.shape == ', attention_weights.shape)

        # context vector shape after sum = (batch_size, hidden_size)
        context_vector = attention_weights * features
        # print('BahdanauAttention: context_vector.shape == ', context_vector.shape)
        context_vector = tf.reduce_sum(context_vector, axis=1)

        return context_vector, attention_weights


Bahdanau vs Luong

In [105]:
class RNN_Decoder(tf.keras.Model):
    def __init__(self, embedding_dim, units, vocab_size):
        super(RNN_Decoder, self).__init__()
        self.units = units

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.units,
                                    return_sequences=True,
                                    return_state=True,
                                    recurrent_initializer='glorot_uniform')
        self.fc1 = tf.keras.layers.Dense(self.units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)

        self.attention = BahdanauAttention(self.units)

    def call(self, x, features, hidden):
        # defining attention as a separate model
        context_vector, attention_weights = self.attention(features, hidden)

        # print('RNN_Decoder: context_vector.shape == ', context_vector.shape)

        # x shape after passing through embedding == (batch_size, 1, embedding_dim)
        x = self.embedding(x)

        # x shape after concatenation == (batch_size, 1, embedding_dim + hidden_size)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

        # passing the concatenated vector to the GRU
        output, state = self.gru(x)

        # shape == (batch_size, max_length, hidden_size)
        x = self.fc1(output)

        # x shape == (batch_size * max_length, hidden_size)
        x = tf.reshape(x, (-1, x.shape[2]))

        # output shape == (batch_size * max_length, vocab)
        x = self.fc2(x)

        return x, state, attention_weights

    def reset_state(self, batch_size):
        return tf.zeros((batch_size, self.units))

In [106]:
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vectorization.vocabulary_size())

In [107]:
optimizer = keras.optimizers.Adam()
loss_object = keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none'
)

def loss_function(trues, preds):
    loss_ = loss_object(trues, preds)
    
    mask = tf.cast(
        tf.math.logical_not(tf.math.equal(trues, 0)),
        dtype=loss_.dtype
    )

    loss_ *= mask

    return tf.reduce_mean(loss_)

In [108]:
!mkdir checkpoints
!mkdir checkpoints/train

mkdir: cannot create directory ‘checkpoints’: File exists
mkdir: cannot create directory ‘checkpoints/train’: File exists


In [109]:
checkpoint_path = 'checkpoints/train'
ckpt = tf.train.Checkpoint(
    encoder=encoder,
    decoder=decoder,
    optimizer=optimizer
)
ckpt_manager = tf.train.CheckpointManager(
    ckpt,
    checkpoint_path,
    max_to_keep=5
)

In [122]:
start_epoch = 0
if ckpt_manager.latest_checkpoint:
    start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])
    ckpt.restore(ckpt_manager.latest_checkpoint)

In [111]:
loss_plot = []

In [112]:
word_to_index = keras.layers.StringLookup(
    mask_token="",
    vocabulary=vectorization.get_vocabulary())
index_to_word = keras.layers.StringLookup(
    mask_token="",
    vocabulary=vectorization.get_vocabulary(),
    invert=True)

In [113]:
from tensorflow.python.ops.variables import trainable_variables
@tf.function
def train_step(images, targets):
    # print('train_step: images.shape == ', images.shape)
    loss = 0

    # initializing the hidden state for each batch
    # because the captions are not related from image to image
    hidden = decoder.reset_state(batch_size=targets.shape[0])

    dec_input = tf.expand_dims(
        [word_to_index('<start>')] * targets.shape[0],
        axis=1
    )

    with tf.GradientTape() as tape:
        features = encoder(images)

        for i in range(1, targets.shape[1]):
            # print('train_step: dec_input.shape == ', dec_input.shape)
            # print('train_step: features.shape == ', features.shape)
            # print('train_step: hidden.shape == ', hidden.shape)
            predictions, hidden, _ = decoder(dec_input, features, hidden)

            loss += loss_function(targets[:, i], predictions)

            dec_input = tf.expand_dims(targets[:, i], 1)
    
    total_loss = loss / targets.shape[1]

    trainable_variables = encoder.trainable_variables + decoder.trainable_variables

    gradients = tape.gradient(loss, trainable_variables)

    optimizer.apply_gradients(zip(gradients, trainable_variables))

    return loss, total_loss

In [121]:
tf.config.run_functions_eagerly(False)

In [None]:
import time

epochs = 20

for epoch in range(start_epoch, epochs):
    start = time.time()
    total_loss = 0

    for batch, (images, targets) in enumerate(train_ds):
        batch_loss, t_loss = train_step(images, targets)
        total_loss += t_loss

        if batch % 50 == 0:
            average_batch_loss = batch_loss.numpy() / int(targets.shape[1])
            print(f'Epoch: {epoch + 1} Batch: {batch} Loss: {average_batch_loss:.4f}')
        
    loss_plot.append(total_loss / train_ds.cardinality().numpy())

    if epoch % 5 == 0:
        ckpt_manager.save()

    print(f'Epoch: {epoch + 1} Loss: {total_loss/train_ds.cardinality().numpy():.6f}')
    print(f'Time taken for 1 epoch {time.time()-start:.2f} sec\n')

In [120]:
total_loss / train_ds.cardinality().numpy()

<tf.Tensor: shape=(), dtype=float32, numpy=3.1029227>

https://www.tensorflow.org/tutorials/text/image_captioning

https://www.tensorflow.org/text/tutorials/nmt_with_attention

https://colab.research.google.com/github/keras-team/keras-io/blob/master/examples/vision/ipynb/image_captioning.ipynb#scrollTo=StQK3dgDcri0

https://keras.io/examples/nlp/neural_machine_translation_with_transformer/

https://keras.io/examples/nlp/semantic_similarity_with_bert/


## References
1. https://arxiv.org/pdf/1703.09137.pdf
2. https://viblo.asia/p/a-guide-to-image-captioning-part-1-gioi-thieu-bai-toan-sinh-mo-ta-cho-anh-gAm5yr88Kdb
3. https://www.tensorflow.org/tutorials/text/image_captioning
4. https://arxiv.org/pdf/1502.03044.pdf
5. https://keras.io/examples/vision/image_captioning/
6. https://machinelearningmastery.com/the-bahdanau-attention-mechanism/
