# Lab12-2: Image Captioning

In the last lab, we used a combination of convolutional neural networks to obtain vector representations of images and recurrent neural networks to decode those representations into natural-language sentences.

In [1]:
import tensorflow as tf
# Configure GPU usage before any op touches the device: enable on-demand
# memory growth on every physical GPU, then expose only the first one.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        # Restrict TensorFlow to the first physical GPU.
        tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

1 Physical GPUs, 1 Logical GPUs


In [2]:
import matplotlib.pyplot as plt

# Scikit-learn includes many helpful utilities
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle
from tqdm import tqdm

In [3]:
# Read the label file once; every line is "<image name> <word>".
with open('./words_captcha/spec_train_val.txt') as f:
    labelled_rows = [line.strip().split() for line in f]

# Image paths for the labelled samples, in file order.
all_img_name_vector = [f'./words_captcha/{name}.png' for name, _ in labelled_rows]  # 140000 images

# Captions: the word's characters separated by spaces (one token per
# character), wrapped in <start> / <end> markers.
all_captions = ['<start> ' + ' '.join(word) + ' <end>' for _, word in labelled_rows]

# Paths a120000 .. a139999 are appended without captions.
all_img_name_vector.extend(f'./words_captcha/a{idx}.png' for idx in range(120000, 140000))

print(len(all_img_name_vector)) # 140000
print(all_img_name_vector[0])
print(all_captions[0])

140000
./words_captcha/a0.png
<start> t h u s <end>


## Preprocess and tokenize the captions

In [4]:
def calc_max_length(tensor):
    """Return the length of the longest sequence in `tensor`.

    Args:
        tensor: an iterable of sized items (e.g. a list of token-id lists).

    Returns:
        The maximum `len()` over the items, or 0 when `tensor` is empty
        (the bare `max()` would raise ValueError in that case).
    """
    return max((len(t) for t in tensor), default=0)

In [5]:
# Keep at most the 5000 most frequent tokens; anything else maps to <unk>.
top_k = 5000
# '<' and '>' are deliberately absent from `filters` so the <start>/<end>
# markers survive tokenization. The backslash is written as '\\' because
# '\]' is not a valid escape sequence (newer Python versions warn about it);
# the resulting string is unchanged.
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters='!"#$%&()*+.,-/:;=?@[\\]^_`{|}~ ')
tokenizer.fit_on_texts(all_captions)

# Create the tokenized vectors
train_seqs = tokenizer.texts_to_sequences(all_captions)

print(all_captions[0])
print(train_seqs[0])

<start> t h u s <end>
[2, 9, 18, 17, 6, 3]


In [6]:
# Register '<pad>' under index 0 in both lookup directions (word -> index and
# index -> word), so that index 0 has a printable token.
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

In [7]:
# Pad every tokenized caption at the end ('post') to a common length.
# No maxlen is given, so pad_sequences uses the length of the longest sequence.
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')
print(cap_vector[0])

[ 2  9 18 17  6  3  0]


In [8]:
# Length (in tokens, including the <start>/<end> markers) of the longest
# tokenized caption.
max_length = calc_max_length(train_seqs)
print(max_length)

7


## Split the data into training, validation, and test sets

In [9]:
# Split by position: the first 100000 samples are used for training and the
# next 20000 for validation. Nothing is shuffled here; the shuffled variant
# below is kept for reference but is disabled.
# img_name_train, caption_train = shuffle(all_img_name_vector[:100000], cap_vector[:100000], random_state=514)
# img_name_valid, caption_valid = shuffle(all_img_name_vector[100000:120000], cap_vector[100000:120000], random_state=514)

img_name_train, cap_train = all_img_name_vector[:100000], cap_vector[:100000]
img_name_valid, cap_val = all_img_name_vector[100000:120000], cap_vector[100000:120000]

# Remaining image paths (index 120000 onward); no captions are paired with them here.
img_name_test = all_img_name_vector[120000:]

## Create a tf.data dataset for training

In [10]:
# Training hyper-parameters and derived sizes.
BATCH_SIZE = 100  # samples per batch
BUFFER_SIZE = 5000  # shuffle buffer size for the training dataset
embedding_dim = 256  # passed to CNN_Encoder and RNN_Decoder
units = 512  # passed to RNN_Decoder as `units`
# Number of entries in the tokenizer's word_index, plus one.
vocab_size = len(tokenizer.word_index) + 1
# Full batches per epoch; used as the progress-bar total and to average the loss.
num_steps = len(img_name_train) // BATCH_SIZE

In [11]:
# Target (height, width) every image is resized to.
IMAGE_SIZE = (160, 300)

def load_image(image_path, cap):
    """Read one image file and return it as a normalized tensor with its caption.

    Args:
        image_path: path of the image file to read.
        cap: the caption paired with the image; passed through unchanged.

    Returns:
        (img, cap), where img has 3 channels, spatial size IMAGE_SIZE and
        values scaled from [0, 255] to [-1, 1].
    """
    img = tf.io.read_file(image_path)
    # The dataset files are .png, so use the PNG decoder rather than the
    # JPEG one the code previously relied on.
    img = tf.io.decode_png(img, channels=3)
    img = tf.image.resize(img, IMAGE_SIZE)
    # Map pixel values from [0, 255] to [-1, 1].
    img = img / 255 * 2 - 1
    return img, cap

In [12]:
# Training pipeline. Shuffling is applied to the (path, caption) pairs BEFORE
# the images are decoded, so the shuffle buffer holds BUFFER_SIZE small
# records instead of BUFFER_SIZE decoded float images.
dataset_train = (
    tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))
    .shuffle(BUFFER_SIZE)
    .map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)

# Validation pipeline: same decoding and batching, no shuffling.
dataset_valid = (
    tf.data.Dataset.from_tensor_slices((img_name_valid, cap_val))
    .map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)

## Model

In [13]:
# image_model = tf.keras.applications.InceptionV3(include_top=False,
#                                                 weights='imagenet')
# new_input = image_model.input
# hidden_layer = image_model.layers[-1].output

# image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

In [14]:
def conv_leaky_relu(inputs, filters, size, stride):
    """Apply Conv2D ("same" padding) -> BatchNormalization -> LeakyReLU(0.1).

    Args:
        inputs: tensor fed to the convolution.
        filters: number of convolution filters.
        size: convolution kernel size.
        stride: convolution stride.

    Returns:
        The output tensor of the LeakyReLU layer.
    """
    conv_out = layers.Conv2D(filters, kernel_size=size, strides=stride, padding="same")(inputs)
    normed = layers.BatchNormalization()(conv_out)
    return layers.LeakyReLU(0.1)(normed)

In [15]:
from tensorflow import keras
from tensorflow.keras import Input, layers, Model

# Backbone layout, applied in order. A tuple (filters, kernel size, stride)
# adds one conv_leaky_relu unit; the string "pool" adds a default MaxPool2D.
_BACKBONE_SPEC = [
    (64, 7, 2), "pool",
    (192, 3, 1), "pool",
    (128, 1, 1), (256, 3, 1), (256, 1, 1), (512, 3, 1), "pool",
    (256, 1, 1), (512, 3, 1),
    (256, 1, 1), (512, 3, 1),
    (256, 1, 1), (512, 3, 1),
    (256, 1, 1), (512, 3, 1),
    (512, 1, 1), (1024, 3, 1), "pool",
    (512, 1, 1), (1024, 3, 1),
    (512, 1, 1), (1024, 3, 1),
    (1024, 3, 1), (1024, 3, 2),
    (1024, 3, 1), (1024, 3, 1),
]

img_inputs = keras.Input(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3))

# Thread the tensor through the layout; `x` ends up as the backbone output.
x = img_inputs
for _spec in _BACKBONE_SPEC:
    if _spec == "pool":
        x = layers.MaxPool2D()(x)
    else:
        x = conv_leaky_relu(x, *_spec)

YOLO = keras.Model(inputs=img_inputs, outputs=x, name="YOLO")

YOLO.summary()

Model: "YOLO"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 160, 300, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 80, 150, 64)       9472      
                                                                 
 batch_normalization (BatchN  (None, 80, 150, 64)      256       
 ormalization)                                                   
                                                                 
 leaky_re_lu (LeakyReLU)     (None, 80, 150, 64)       0         
                                                                 
 max_pooling2d (MaxPooling2D  (None, 40, 75, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 75, 192)       110784 

 leaky_re_lu_15 (LeakyReLU)  (None, 10, 18, 1024)      0         
                                                                 
 max_pooling2d_3 (MaxPooling  (None, 5, 9, 1024)       0         
 2D)                                                             
                                                                 
 conv2d_16 (Conv2D)          (None, 5, 9, 512)         524800    
                                                                 
 batch_normalization_16 (Bat  (None, 5, 9, 512)        2048      
 chNormalization)                                                
                                                                 
 leaky_re_lu_16 (LeakyReLU)  (None, 5, 9, 512)         0         
                                                                 
 conv2d_17 (Conv2D)          (None, 5, 9, 1024)        4719616   
                                                                 
 batch_normalization_17 (Bat  (None, 5, 9, 1024)       4096      
 chNormali

In [16]:
class BahdanauAttention(tf.keras.Model):
    """Additive attention over the locations of an encoded image.

    Each location in `features` is scored against the decoder state `hidden`;
    the scores are normalized across locations and used to form a weighted
    sum of the features.
    """

    def __init__(self, units):
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)  # projects the image features
        self.W2 = tf.keras.layers.Dense(units)  # projects the decoder state
        self.V = tf.keras.layers.Dense(1)       # reduces each score vector to one value

    def call(self, features, hidden):
        """Compute the context vector and attention weights.

        Args:
            features: encoder output; assumed (batch, locations, feature_dim),
                as produced by the training step in this notebook.
            hidden: decoder state; assumed (batch, state_dim).

        Returns:
            (context_vector, attention_weights): the features summed over
            axis 1 after weighting, and the weights themselves (last axis of
            size 1, softmax-normalized over axis 1).
        """
        # Add a length-1 axis so the projected state broadcasts over locations.
        query = tf.expand_dims(hidden, 1)

        # Additive score; the last axis has `units` entries.
        energy = tf.nn.tanh(self.W1(features) + self.W2(query))

        # One weight per location, normalized across axis 1.
        attention_weights = tf.nn.softmax(self.V(energy), axis=1)

        # Weighted sum of the features over the location axis.
        context_vector = tf.reduce_sum(attention_weights * features, axis=1)

        return context_vector, attention_weights

In [17]:
class CNN_Encoder(tf.keras.Model):
    """Projects image feature vectors to `embedding_dim` and applies ReLU.

    The convolutional features are computed elsewhere (the training step in
    this notebook passes in the reshaped backbone output); this model only
    adds a fully connected layer on top of them.
    """

    def __init__(self, embedding_dim):
        super().__init__()
        # Dense acts on the last axis, so the output is (..., embedding_dim).
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        """Return relu(fc(x))."""
        projected = self.fc(x)
        return tf.nn.relu(projected)

In [18]:
class RNN_Decoder(tf.keras.Model):
    """Attention-based GRU decoder producing vocabulary logits for one step."""

    def __init__(self, embedding_dim, units, vocab_size):
        super().__init__()
        self.units = units

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(
            self.units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )
        self.fc1 = tf.keras.layers.Dense(self.units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)

        self.attention = BahdanauAttention(self.units)

    def call(self, x, features, hidden):
        """Run one decoding step.

        Args:
            x: input token ids; the training step passes shape (batch, 1).
            features: encoded image features handed to the attention module.
            hidden: previous decoder state handed to the attention module.

        Returns:
            (logits, state, attention_weights): logits with a last axis of
            size vocab_size and the time axis folded into the batch axis, the
            GRU's final state, and the attention weights.
        """
        # NOTE(review): `hidden` only feeds the attention; the GRU below is
        # called without an initial state.
        context_vector, attention_weights = self.attention(features, hidden)

        # Embed the input tokens.
        token_embedding = self.embedding(x)

        # Prepend the context vector (with a length-1 time axis) to the
        # embedding along the feature axis.
        gru_input = tf.concat(
            [tf.expand_dims(context_vector, 1), token_embedding], axis=-1)

        output, state = self.gru(gru_input)

        # Per-step projection, then fold (batch, time) into a single axis.
        projected = self.fc1(output)
        flattened = tf.reshape(projected, (-1, projected.shape[2]))

        # Vocabulary logits.
        logits = self.fc2(flattened)

        return logits, state, attention_weights

    def reset_state(self, batch_size):
        """Return an all-zero state of shape (batch_size, units)."""
        return tf.zeros((batch_size, self.units))

In [19]:
# Instantiate the feature projector and the attention decoder.
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)

In [20]:
LEARNING_RATE = 1e-4

optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
# reduction='none' keeps one loss value per example so that padded positions
# can be masked out before averaging; the decoder outputs raw logits.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    """Cross-entropy that ignores padded positions.

    Args:
        real: target token ids; id 0 marks padding.
        pred: logits passed to `loss_object` together with `real`.

    Returns:
        The mean over all entries of the per-entry loss, with entries whose
        target is 0 set to zero (they still count in the denominator).
    """
    per_token = loss_object(real, pred)

    # 1.0 where the target is a real token, 0.0 where it is padding.
    is_real = tf.math.logical_not(tf.math.equal(real, 0))
    per_token = per_token * tf.cast(is_real, dtype=per_token.dtype)

    return tf.reduce_mean(per_token)

## Checkpoint

In [21]:
# Track the backbone, encoder, decoder and optimizer in one checkpoint;
# the manager keeps only the three most recent checkpoints on disk.
checkpoint_path = './checkpoints/train/'
ckpt = tf.train.Checkpoint(feature_extractor = YOLO,
                           encoder=encoder,
                           decoder=decoder,
                           optimizer=optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=3)

In [22]:
# Resume from the most recent checkpoint, if there is one.
start_epoch = 0
if ckpt_manager.latest_checkpoint:
    # Checkpoint paths end in "-<number>"; use that number as the epoch to
    # continue from.
    # NOTE(review): the suffix is the number the checkpoint was saved with;
    # it matches the epoch count only if the training loop numbers its saves
    # by epoch — confirm.
    start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])
    # Actually load the saved weights and optimizer state. Without this the
    # epoch counter advances while training restarts from fresh variables.
    ckpt.restore(ckpt_manager.latest_checkpoint)

In [23]:
# Kept in its own cell on purpose: re-running the training cell then appends
# to the existing history instead of resetting it.
loss_plot = []

In [24]:
@tf.function
def train_step(img_tensor, target):
    """Run one optimization step on a batch with teacher forcing.

    Args:
        img_tensor: batch of images fed to the YOLO backbone.
        target: batch of padded caption token ids; column 0 is expected to be
            the <start> token since decoding starts from column 1.

    Returns:
        (loss, total_loss): the loss summed over the decoded time steps, and
        that sum divided by the caption length (target.shape[1]).
    """
    loss = 0

    # initializing the hidden state for each batch
    # because the captions are not related from image to image
    hidden = decoder.reset_state(batch_size=img_tensor.shape[0])

    # Every sequence starts with the <start> token; shape (batch_size, 1).
    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * img_tensor.shape[0], 1)

    with tf.GradientTape() as tape:
        
        features = YOLO(img_tensor, training=True)
        
        # Flatten the backbone's spatial grid into a single axis:
        # (batch, H, W, channels) -> (batch, H * W, channels).
        # NOTE(review): an earlier comment gave (batch, 2, 5, 1024) -> 10 locations;
        # the model summary for a 160x300 input suggests a 3x5 grid (15) — confirm.
        features = tf.reshape(features, (features.shape[0], -1, features.shape[3]))
        
        features = encoder(features)

        for i in range(1, target.shape[1]):
            # passing the features through the decoder
            predictions, hidden, _ = decoder(dec_input, features, hidden)

            loss += loss_function(target[:, i], predictions)

            # using teacher forcing
            # target[:, i] is the ground-truth token at time step i.
            # Expanding it to shape (batch_size, 1) makes it the decoder input for the next step.
            dec_input = tf.expand_dims(target[:, i], 1) 

    # Summed loss normalized by the caption length; returned for reporting.
    total_loss = (loss / int(target.shape[1]))

    # The backbone is trained jointly with the encoder and decoder.
    trainable_variables = YOLO.trainable_variables + encoder.trainable_variables + decoder.trainable_variables

    # Gradients are taken of the summed (un-normalized) loss.
    gradients = tape.gradient(loss, trainable_variables)

    optimizer.apply_gradients(zip(gradients, trainable_variables))

    return loss, total_loss

In [25]:
EPOCHS = 10
start = time.time()
for epoch in range(start_epoch, EPOCHS):

    # Running sum of the per-batch normalized losses for this epoch.
    total_loss = 0

    pbar = tqdm(dataset_train, total=num_steps, desc=f'Epoch {epoch + 1:2d}')

    for (batch, (img_tensor, target)) in enumerate(pbar):
        batch_loss, t_loss = train_step(img_tensor, target)
        total_loss += t_loss

#         if batch % 100 == 0:
#             print ('Epoch {} Batch {} Loss {:.4f}'.format(
#               epoch + 1, batch, batch_loss.numpy() / int(target.shape[1])))
    # storing the epoch end loss value to plot later
    loss_plot.append(total_loss / num_steps)

    # Save every second epoch and always after the last one, so the final
    # weights are never lost. The checkpoint is numbered with the count of
    # completed epochs (epoch + 1): the resume cell parses this number from
    # the checkpoint path to decide where to continue, and the manager's
    # default save counter would under-count because not every epoch is saved.
    if epoch % 2 == 0 or epoch == EPOCHS - 1:
        ckpt_manager.save(checkpoint_number=epoch + 1)

    print ('Epoch {} Loss {:.6f}'.format(epoch + 1,
                                         total_loss/num_steps))
print ('Time taken for {} epoch {} sec\n'.format(EPOCHS - start_epoch, time.time() - start))

Epoch  1:   1%|▏         | 14/1000 [01:05<1:17:25,  4.71s/it]


KeyboardInterrupt: 