# Second Attempt at Image Captioning

In [None]:
# Shared configuration dictionary; later cells store hyper-parameters in it
# (image shape, vocabulary size, caption length, layer sizes, batch size).
PARAMS = {}

## Import

In [None]:
import matplotlib.pyplot as plt
import tensorflow as tf
from keras.backend.tensorflow_backend import set_session
import keras
import sys, time, os, warnings 
import numpy as np
import pandas as pd 
import random
from tqdm import tqdm
from collections import Counter 
warnings.filterwarnings("ignore")

In [None]:
# Report the interpreter and library versions this notebook runs against.
version_lines = (
    ("python {}", sys.version),
    ("keras version {}", keras.__version__),
    ("tensorflow version {}", tf.__version__),
)
for template, value in version_lines:
    print(template.format(value))

## Config

In [None]:
import os
from urllib.parse import quote

# Host and port of the campus caching proxy.
PROXY_HOST = "cache.itb.ac.id:8080"


def build_proxy_settings(user, password, host=PROXY_HOST):
    """Build a scheme -> proxy URL mapping for an authenticated proxy.

    Args:
        user: proxy account name, or None/empty when no proxy should be used.
        password: proxy account password, or None/empty.
        host: "hostname:port" of the proxy.

    Returns:
        A dict with "http" and "https" keys holding the proxy URLs, or an
        empty dict when either credential is missing.
    """
    if not user or not password:
        return {}
    # Percent-encode the credentials so characters such as "@" or ":" cannot
    # corrupt the URL structure.
    auth = "{}:{}".format(quote(user, safe=""), quote(password, safe=""))
    return {scheme: "{}://{}@{}".format(scheme, auth, host)
            for scheme in ("http", "https")}


# Credentials must never be committed with the notebook: export PROXY_USER and
# PROXY_PASSWORD in the environment before starting the kernel instead.
# for TFBertModel
PROXIES = build_proxy_settings(os.environ.get("PROXY_USER"),
                               os.environ.get("PROXY_PASSWORD"))

# Only override the process-wide proxy variables when credentials are present.
if PROXIES:
    os.environ['http_proxy'] = PROXIES["http"]
    os.environ['https_proxy'] = PROXIES["https"]

In [None]:
# Create a TF1-style session with `allow_growth` enabled on the GPU options
# (per the TensorFlow docs this makes GPU memory grow on demand).
# NOTE(review): `set_session` is imported at the top of the notebook but is
# never called with this session -- confirm whether Keras is meant to use it.
config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth=True
session = tf.compat.v1.Session(config=config)

## Load dataset

In [None]:
# Locations of the Flickr8k images and of the caption (token) file.
Flickr_image_dir = "../Dataset/Flickr8k/Flicker8k_Dataset"
Flickr_text_dir = "../Dataset/Flickr8k/Flickr8k.token.txt"

# Feature arrays (*.npy) are stored next to the images; leave them out of the
# image listing.
image_filenames = [name for name in os.listdir(Flickr_image_dir) if ".npy" not in name]
print("The number of jpg flies in Flicker8k: {}".format(len(image_filenames)))

In [None]:
## read in the Flickr caption data
# Each line has the form "<image file>#<caption number>\t<caption>", e.g.
#   1000268201_693b08cb0e.jpg#0	A child in a pink...
# The context manager guarantees the file is closed even if reading fails.
with open(Flickr_text_dir, 'r') as file:
    text = file.read().strip().split('\n')

# dataset maps image file name -> {"captions": [...], "tokens": [...]};
# "tokens" stays empty here and is filled in once the tokenizer is fitted.
dataset = {}
for line in text:

    # Tolerate blank lines instead of failing on the unpacking below.
    if not line.strip():
        continue

    # Split on the first tab only so a caption containing tabs stays intact.
    image_ref, caption = line.split('\t', 1)
    # Drop the trailing "#<n>" caption index; split from the right so a '#'
    # inside the file name itself is preserved.
    image_path = image_ref.rsplit('#', 1)[0]

    entry = dataset.setdefault(image_path, {"captions": [], "tokens": []})
    entry["captions"].append(caption)

## Sample dataset

In [None]:
from keras.preprocessing.image import load_img, img_to_array


def show_dataset_sample(n_sample=5):
    """Display randomly chosen images side by side with their captions.

    Draws `n_sample` distinct entries from the global `dataset` and renders a
    two-column figure: the image on the left, its captions on the right.
    """
    figure = plt.figure(figsize=(10, 20))
    chosen_names = random.sample(list(dataset), n_sample)

    for row, image_name in enumerate(chosen_names):
        caption_lines = dataset[image_name]["captions"]
        picture = load_img(Flickr_image_dir + '/' + image_name, target_size=(224, 224, 3))

        # Left cell of this row: the image itself, without axis ticks.
        image_axis = figure.add_subplot(n_sample, 2, 2 * row + 1, xticks=[], yticks=[])
        image_axis.imshow(picture)

        # Right cell of this row: one text line per caption on a hidden axis.
        text_axis = figure.add_subplot(n_sample, 2, 2 * row + 2)
        text_axis.plot()
        text_axis.set_xlim(0, 1)
        text_axis.set_ylim(0, len(caption_lines))
        text_axis.axis('off')
        for line_no, caption_text in enumerate(caption_lines):
            text_axis.text(0, line_no, caption_text, fontsize=16)

    plt.show()


show_dataset_sample()

## Image feature extractor

In [None]:
# Xception network with ImageNet weights, built with `include_top=True`;
# get_image_feature() uses its prediction as the image feature vector.
image_extractor = keras.applications.xception.Xception(include_top=True, weights='imagenet')

In [None]:
from keras.preprocessing.image import load_img, img_to_array
from keras.applications.xception import preprocess_input

# Target size passed to load_img() before feature extraction.
PARAMS['image_shape'] = (299, 299, 3)
# Length of the image feature vector fed to the captioning model's image input.
# NOTE(review): presumably matches the extractor's output size -- confirm.
PARAMS['image_feature_size'] = 1000

In [None]:
def get_image_feature(image_path):
    """Return the flattened `image_extractor` prediction for one image file.

    The image is loaded at PARAMS['image_shape'], converted to an array and
    run through the Xception `preprocess_input` before prediction.
    """
    pixels = img_to_array(load_img(image_path, target_size=PARAMS['image_shape']))
    pixels = preprocess_input(pixels)

    # The extractor expects a batch, so add a leading batch axis of size 1.
    batch = pixels.reshape((1,) + pixels.shape[:3])
    prediction = image_extractor.predict(batch)

    return prediction.flatten()

In [None]:
# for image_path in tqdm(dataset.keys()):
    
#     filename = Flickr_image_dir + "/" + image_path
#     image_feature = get_image_feature(filename)
#     np.save(filename + ".npy", image_feature)

## Preprocessing text

In [None]:
# Markers wrapped around every caption by add_start_end_seq_token().
# NOTE(review): the Keras Tokenizer is created with default settings further
# down; its default filtering/lower-casing presumably turns these into
# "cls" / "sep" in the vocabulary -- confirm.
START_TOKEN = "[CLS]"
END_TOKEN = "[SEP]"

In [None]:
def add_start_end_seq_token(captions):
    """Return a new list with every caption wrapped in START_TOKEN / END_TOKEN."""
    wrapped = []
    for sentence in captions:
        wrapped.append("{} {} {}".format(START_TOKEN, sentence, END_TOKEN))
    return wrapped


# Replace each image's raw captions with their start/end-marked versions.
for key, entry in dataset.items():
    entry["captions"] = add_start_end_seq_token(entry["captions"])

## Prepare caption dataset

In [None]:
# Vocabulary size: caps the tokenizer and sizes the embedding input and the
# softmax output of the model.
PARAMS["vocab_size"] = 8000
# Fixed length every caption prefix is padded/truncated to before it is fed
# to the model.
PARAMS["max_caption_length"] = 25

In [None]:
# Flatten the per-image caption lists into one list (used to fit the tokenizer).
all_captions = []
for entry in dataset.values():
    all_captions.extend(entry["captions"])

In [None]:
from keras.preprocessing.text import Tokenizer

# `num_words` is the current name of this argument; `nb_words` is the
# deprecated Keras 1 spelling and is only accepted through a legacy shim.
# NOTE(review): default Tokenizer filtering/lower-casing presumably also
# applies to the "[CLS]" / "[SEP]" markers -- confirm how they end up in
# `word_index`.
tokenizer = Tokenizer(num_words=PARAMS["vocab_size"])
tokenizer.fit_on_texts(all_captions)

# Integer sequences for the flat caption list, and per image for the dataset.
dtexts = tokenizer.texts_to_sequences(all_captions)
for key in tqdm(dataset.keys()):
    dataset[key]["tokens"] = tokenizer.texts_to_sequences(dataset[key]["captions"])

# +1 because index 0 is not assigned to any word in `word_index`.
actual_size = len(tokenizer.word_index) + 1
print("using {} of {} unique tokens ({:.2f} %)".format(PARAMS["vocab_size"], actual_size, PARAMS["vocab_size"]/actual_size*100))

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences


def build_paralel_dataset(dataset):
    """Expand every caption into (image feature, padded prefix, next token) rows.

    For each image the stored feature array (`<image file>.npy`) is loaded
    once. Each tokenised caption then yields one row per position i >= 1:
    the first i tokens, padded to PARAMS["max_caption_length"], as input and
    token i as the target.

    Returns:
        Three equally long lists: image features, padded caption prefixes and
        target token ids.
    """
    max_len = PARAMS["max_caption_length"]
    feature_rows, prefix_rows, next_tokens = [], [], []

    for image_name in tqdm(dataset.keys()):

        feature = np.load(Flickr_image_dir + "/" + image_name + '.npy')

        for sequence in dataset[image_name]["tokens"]:
            for cut in range(1, len(sequence)):
                # Padding side is left at the pad_sequences default; only the
                # truncation side is set explicitly.
                padded = pad_sequences([sequence[:cut]],
                                       maxlen=max_len,
                                       truncating='post')

                feature_rows.append(feature)
                prefix_rows.append(padded.flatten())
                next_tokens.append(sequence[cut])

    return feature_rows, prefix_rows, next_tokens

In [None]:
# par_dt => parallel dataset: three aligned lists with one entry per sample.
# Despite its name, `par_dt_image_paths` receives the first list returned by
# build_paralel_dataset(), i.e. the loaded image feature arrays.
par_dt_image_paths, par_dt_captions, par_dt_targets = build_paralel_dataset(dataset)

DATA_SIZE = len(par_dt_image_paths)
assert DATA_SIZE == len(par_dt_captions) and DATA_SIZE == len(par_dt_targets)

In [None]:
# Number of samples per batch.
PARAMS["batch_size"] = 64

In [None]:
# def load_dataset(image_path, caption, target):
#     img_tensor = np.load(image_path.decode('utf-8') + '.npy')
#     return img_tensor, caption, target


# def create_dataset_object(par_dt_image_paths, par_dt_captions, par_dt_targets):
#     dataset = tf.data.Dataset.from_tensor_slices((par_dt_image_paths, par_dt_captions, par_dt_targets))
#     dataset = dataset.map(lambda item1, item2, item3: tf.numpy_function(
#               load_dataset, [item1, item2, item3], [tf.float32, tf.int32, tf.int32]),
#               num_parallel_calls=tf.data.experimental.AUTOTUNE)
#     dataset = dataset.batch(PARAMS["batch_size"])
#     dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
#     return dataset


# tf_dataset = create_dataset_object(par_dt_image_paths, par_dt_captions, par_dt_targets)

In [None]:
# # Split dataset 

# TRAIN_SPLIT = 0.7
# EVAL_SPLIT = 0.15
# TEST_SPLIT = 0.15  # approx

# n_batch = int(DATA_SIZE / PARAMS["batch_size"]) + 1
# n_train = int(n_batch * 0.7)
# n_eval = int(n_batch * 0.15)
# n_test = n_batch - (n_train + n_eval)

# train_tf_dataset = tf_dataset.take(n_train)
# eval_tf_dataset = tf_dataset.skip(n_train).take(n_eval)
# test_tf_dataset = tf_dataset.skip(n_train + n_eval)


# # """
# # tf_dataset => tuple of (image, captions, target)
# # image   => (batch_size = 16, image_feature = 1000)
# # caption => (batch_size = 16, max_length)
# # target  => (batch_size = 16,)
# # """

In [None]:
# print("train: {} batches, (total : {})".format(n_train, n_train * PARAMS["batch_size"]))
# print("eval : {} batches, (total : {})".format(n_eval, n_eval * PARAMS["batch_size"]))
# print("test : {} batches, (total : {} (aprx))".format(n_test, n_test * PARAMS["batch_size"]))

## Model

In [None]:
# Output dimension of the word embedding layer.
PARAMS["word_embedding_size"] = 64
# Units of the caption LSTM and of the dense layer after the merge.
PARAMS["decoder_units"] = 256
# Units of the dense layer applied to the image feature. The image and caption
# branches are merged with an element-wise add, so this must stay equal to
# "decoder_units".
PARAMS["encoder_units"] = 256

In [None]:
from tensorflow.keras import layers, models

## image model: project the image feature vector to the encoder size
input_image = layers.Input(shape=(PARAMS["image_feature_size"],))
fimage = layers.Dense(PARAMS["encoder_units"], activation='relu')(input_image)

## sequence model: embed the padded caption prefix and summarise it with an LSTM
input_caption = layers.Input(shape=(PARAMS["max_caption_length"],))
ftxt = layers.Embedding(PARAMS["vocab_size"], output_dim=PARAMS["word_embedding_size"], mask_zero=True)(input_caption)
ftxt = layers.LSTM(PARAMS["decoder_units"])(ftxt)

## combined model for decoder: add both branches and predict the next word
decoder = layers.add([ftxt, fimage])
decoder = layers.Dense(PARAMS["decoder_units"], activation='relu')(decoder)
output = layers.Dense(PARAMS["vocab_size"], activation='softmax')(decoder)
model = models.Model(inputs=[input_image, input_caption], outputs=output)

# The targets built by build_paralel_dataset() are integer token ids, not
# one-hot vectors, so the sparse variant of the cross-entropy is required;
# plain 'categorical_crossentropy' expects targets shaped like the softmax
# output.
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam')
model.summary()

In [None]:
# Number of leading samples of the parallel dataset used for training.
train = 30000

# Slice each aligned list to the training range and convert it to a tensor.
X_image_train = tf.convert_to_tensor(par_dt_image_paths[:train])
X_caption_train = tf.convert_to_tensor(par_dt_captions[:train])
y_train = tf.convert_to_tensor(par_dt_targets[:train])

In [None]:
# fit model
# Validate on samples the model is NOT trained on: take the slice that
# directly follows the training range. Validating on the training data itself
# makes val_loss a copy of the training signal and hides over-fitting.
# NOTE(review): samples are grouped by image, so the image at the slice
# boundary may contribute to both sets.
val_slice = slice(train, train + train // 5)
if not par_dt_targets[val_slice]:
    raise ValueError("no samples left after the first {} for validation".format(train))

X_image_val = tf.convert_to_tensor(par_dt_image_paths[val_slice])
X_caption_val = tf.convert_to_tensor(par_dt_captions[val_slice])
y_val = tf.convert_to_tensor(par_dt_targets[val_slice])

hist = model.fit([X_image_train, X_caption_train], y_train,
                  validation_data=([X_image_val, X_caption_val], y_val),
                  epochs=5, verbose=1,
                  batch_size=PARAMS["batch_size"])

## Plot training result

In [None]:
# Draw the training and validation loss curves recorded by model.fit().
history = hist.history
plt.plot(history["loss"], label="loss")
plt.plot(history["val_loss"], label="val_loss")

plt.legend()
plt.xlabel("epochs")
plt.ylabel("loss")
plt.show()

## Prediction

In [None]:
# Reverse lookup: token id -> word.
index_word = {index: word for word, index in tokenizer.word_index.items()}


def predict_caption(image):
    """Greedily decode a caption for one image feature vector.

    Args:
        image: array of shape (1, PARAMS["image_feature_size"]) holding the
            image feature.

    Returns:
        The generated caption as a space-separated string of vocabulary
        words, including the start marker and, when it was predicted, the
        end marker.
    """
    max_len = PARAMS["max_caption_length"]

    # Derive the marker ids through the tokenizer so they are exactly the
    # ids the model saw during training, whatever normalisation it applies.
    sequence = list(tokenizer.texts_to_sequences([START_TOKEN])[0])
    end_ids = tokenizer.texts_to_sequences([END_TOKEN])[0]
    end_id = end_ids[0] if end_ids else None

    for _ in range(max_len):
        # Same padding/truncation settings as used to build the training data.
        padded = pad_sequences([sequence], maxlen=max_len, truncating='post')
        yhat = int(np.argmax(model.predict([image, padded], verbose=0)))

        # Index 0 is the padding id and has no word; stop instead of failing.
        if yhat not in index_word:
            break

        sequence.append(yhat)
        if yhat == end_id:
            break

    return " ".join(index_word[token_id] for token_id in sequence)

In [None]:
npic = 5
npix = 224
target_size = (npix, npix, 3)

# The notebook defines no explicit test split. The model is fitted on the
# leading samples of the parallel dataset, which follows the order of
# `dataset`, so the last images are used here as unseen examples.
test_image_names = list(dataset)[-npic:]

count = 1
fig = plt.figure(figsize=(10, 20))

for jpgfnm in test_image_names:
    ## images
    filename = Flickr_image_dir + '/' + jpgfnm
    # load_img only needs (height, width).
    image_load = load_img(filename, target_size=target_size[:2])
    ax = fig.add_subplot(npic, 2, count, xticks=[], yticks=[])
    ax.imshow(image_load)
    count += 1

    ## captions
    # Features are read from the .npy file stored next to each image, the
    # same source build_paralel_dataset() uses.
    image_feature = np.load(filename + '.npy')
    caption = predict_caption(image_feature.reshape(1, len(image_feature)))
    ax = fig.add_subplot(npic, 2, count)
    ax.axis('off')
    ax.plot()
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.text(0, 0.5, caption, fontsize=20)
    count += 1

plt.show()