In [None]:
import string
import numpy as np
import PIL.Image

from os import listdir
from pickle import dump, load
import matplotlib.pyplot as plt
from numpy import array
from numpy import argmax
import tensorflow as tf
from tensorflow.keras.applications.resnet import ResNet101, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout
from tensorflow.keras.callbacks import ModelCheckpoint , EarlyStopping

from nltk.translate.bleu_score import corpus_bleu

In [None]:
# Report how many GPUs TensorFlow can see, using the stable
# tf.config.list_physical_devices API rather than the experimental alias.
import tensorflow as tf
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

In [None]:
def define_model_resnet(vocab_size, max_length):
    """Build and compile the merge-style captioning network.

    The model takes two inputs -- a 2048-dimensional image feature vector and
    a sequence of ``max_length`` word indices -- and outputs a softmax
    distribution over ``vocab_size`` words for the next token.

    Args:
        vocab_size: Size of the output softmax and of the embedding table.
        max_length: Length of the (padded) input word sequence.

    Returns:
        A compiled ``Model`` (categorical cross-entropy, Adam with learning
        rate 4e-4, accuracy metric).
    """
    # Image branch: dropout followed by a 512-unit projection.
    image_input = Input(shape=(2048,))
    image_dropped = Dropout(0.5)(image_input)
    image_projected = Dense(512, activation='relu')(image_dropped)

    # Text branch: embedding (index 0 is masked), dropout, then an LSTM.
    caption_input = Input(shape=(max_length,))
    caption_embedded = Embedding(vocab_size, 512, mask_zero=True)(caption_input)
    caption_dropped = Dropout(0.5)(caption_embedded)
    caption_encoded = LSTM(512)(caption_dropped)

    # Decoder: element-wise sum of both branches, then two dense layers.
    merged = tf.keras.layers.add([image_projected, caption_encoded])
    hidden = Dense(256, activation='relu')(merged)
    next_word_probs = Dense(vocab_size, activation='softmax')(hidden)

    # Tie it together: [image, seq] -> [word]
    captioner = Model(inputs=[image_input, caption_input], outputs=next_word_probs)
    captioner.compile(
        loss='categorical_crossentropy',
        optimizer=tf.keras.optimizers.Adam(learning_rate=4e-4),
        metrics=['accuracy'],
    )

    return captioner

In [None]:
def load_photo_identifiers(filename):
    """Return the set of photo identifiers listed in ``filename``.

    The file is read via ``load_file`` and split on newlines; empty lines are
    ignored. Each identifier is the text before the first '.' of its line, so
    an image file name loses its extension.
    """
    listing = load_file(filename)
    return {
        entry.split('.')[0]
        for entry in listing.split('\n')
        if len(entry) >= 1
    }

In [None]:
def load_clean_descriptions(filename, photos):
    """Load the descriptions of the requested photos, wrapped in sequence markers.

    Each non-blank line of ``filename`` is split on whitespace: the first
    token is the image identifier and the remaining tokens form one
    description of that image.

    Args:
        filename: Path of the cleaned descriptions file (read via ``load_file``).
        photos: Collection of image identifiers to keep; lines for any other
            identifier are ignored.

    Returns:
        dict mapping image identifier -> list of description strings, each of
        the form ``'startseq <words> endseq'``.
    """
    descriptions = dict()

    for line in load_file(filename).split('\n'):
        words = line.split()

        # Blank lines (e.g. the one produced by a trailing newline) carry no
        # identifier; skip them instead of failing on words[0].
        if not words:
            continue

        image_id, image_description = words[0], words[1:]

        # Only keep descriptions of the photos we were asked for.
        if image_id not in photos:
            continue

        # Captions are generated one word at a time from the previously
        # generated words, so each caption needs a first word to kick off
        # generation and a last word to signal its end: 'startseq'/'endseq'.
        desc = 'startseq ' + ' '.join(image_description) + ' endseq'
        descriptions.setdefault(image_id, []).append(desc)

    return descriptions

In [None]:
# function to load the pre-computed photo features of the requested photos
def load_photo_features(filename, photos):
    """Load pickled photo features and keep only those of ``photos``.

    Args:
        filename: Path of a pickle file holding a mapping from photo
            identifier to its feature data.
        photos: Iterable of photo identifiers to select.

    Returns:
        dict mapping each identifier in ``photos`` to its stored features.

    Raises:
        KeyError: If an identifier in ``photos`` is missing from the file.
    """
    # NOTE: unpickling can execute arbitrary code -- only load trusted files.
    # The context manager closes the file handle even if unpickling fails.
    with open(filename, 'rb') as feature_file:
        all_features = load(feature_file)

    # we are interested in the features of the required photos only
    features = {k: all_features[k] for k in photos}

    return features

In [None]:
def load_file(filename):
    """Read the whole UTF-8 text file ``filename`` and return its contents.

    The file is opened in a ``with`` block so the handle is closed even when
    reading raises.
    """
    with open(filename, 'r', encoding='utf-8') as file:
        return file.read()

In [None]:
def to_lines(descriptions):
    """Flatten a mapping of key -> list of descriptions into one list.

    Descriptions are returned in the mapping's iteration order, keeping the
    order of each per-key list.
    """
    return [desc for desc_list in descriptions.values() for desc in desc_list]

In [None]:
# Load the training split: photo identifiers, their marked-up descriptions,
# the fitted tokenizer and the pre-computed photo features.
filename = './datasets/Flickr8k_text/Flickr_8k.trainImages.txt'
train = load_photo_identifiers(filename)
train_descriptions = load_clean_descriptions('./datasets/descriptions.txt', train)
# NOTE: unpickling can execute arbitrary code -- only load trusted files.
# The with-block makes sure the file handle is closed after loading.
# NOTE(review): the tokenizer comes from './flickr30k/' while the photo list
# is Flickr8k -- confirm this pairing is intended.
with open('./flickr30k/tokenizer30k.pkl', 'rb') as tokenizer_file:
    tokenizer = load(tokenizer_file)
train_features = load_photo_features('./features_resnet.pkl', train)
# +1 because the vocabulary size is one more than the number of known words.
vocab_size = len(tokenizer.word_index) + 1
lines = to_lines(train_descriptions)
# Length to which input word sequences are padded.
max_length = 16

In [None]:
def data_generator(descriptions, photos, tokenizer, max_length,bs=32):
    """Endlessly yield training samples, one photo per step.

    For every ``(key, description list)`` entry of ``descriptions`` the first
    element of ``photos[key]`` is paired with all input/output word sequences
    built by ``create_sequences``, and ``([images, sequences], next_words)``
    is yielded. After the last entry, iteration restarts from the first.

    ``bs`` is accepted but not used by this implementation.
    """
    while True:
        for image_id in descriptions:
            captions = descriptions[image_id]
            # Stored features are indexed with [0] to get the feature vector.
            feature_vector = photos[image_id][0]
            image_inputs, sequence_inputs, targets = create_sequences(
                tokenizer, max_length, captions, feature_vector
            )
            yield [image_inputs, sequence_inputs], targets

def create_sequences(tokenizer, max_length, desc_list, photo, vocab_size=None):
    """Expand the descriptions of one photo into supervised training samples.

    Every description is encoded with ``tokenizer`` and split into
    ``(prefix, next word)`` pairs: the prefix is padded to ``max_length`` and
    the next word is one-hot encoded.

    Args:
        tokenizer: Fitted tokenizer providing ``texts_to_sequences`` and
            ``word_index``.
        max_length: Length every input prefix is padded to.
        desc_list: Descriptions (strings) of the photo.
        photo: Feature data of the photo, repeated once per sample.
        vocab_size: Number of classes of the one-hot output. Defaults to
            ``len(tokenizer.word_index) + 1`` so the function no longer
            depends on a notebook-level global.

    Returns:
        Tuple ``(X1, X2, y)`` of arrays: photo features, padded input
        sequences and one-hot encoded next words.
    """
    if vocab_size is None:
        vocab_size = len(tokenizer.word_index) + 1

    X1, X2, y = list(), list(), list()
    # walk through each description for the image
    for desc in desc_list:
        # encode the sequence
        seq = tokenizer.texts_to_sequences([desc])[0]
        # split one sequence into multiple X,y pairs
        for i in range(1, len(seq)):
            # split into input and output pair
            in_seq, out_seq = seq[:i], seq[i]
            # pad input sequence
            in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
            # encode output sequence
            out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
            # store
            X1.append(photo)
            X2.append(in_seq)
            y.append(out_seq)
    return array(X1), array(X2), array(y)

In [None]:
# Build the captioning model with the same sequence length used for padding
# (max_length) instead of repeating the literal value.
model = define_model_resnet(vocab_size, max_length)

In [None]:
# Print the layer-by-layer summary of the captioning model.
model.summary()

In [None]:
# Continue training from previously saved weights.
# NOTE(review): `steps` (number of training photos) is computed here but is not
# passed to fit(); steps_per_epoch is the literal 16, so each "epoch" covers
# only 16 generator steps (one photo per step) -- confirm this is intended.
steps = len(train_descriptions)
# The generator pads input sequences to length 16 (same value as max_length).
generator = data_generator(train_descriptions, train_features, tokenizer, 16)
# Start from the weights stored under './saved8k/flick'.
filename = './saved8k/flick'
model.load_weights(filename)
# The checkpoint callback writes weights only to 'final_model.h5', and only
# when the monitored training loss improves (save_best_only=True).
history = model.fit(generator, epochs=5000,verbose=0,steps_per_epoch=16,callbacks = [ModelCheckpoint(filepath='final_model.h5',
        save_weights_only=True,                                                                                 
        monitor='loss',
        save_best_only=True)])


In [None]:
# Training loss recorded for the last completed epoch.
history.history['loss'][-1]

In [None]:
# Plot training loss and accuracy per epoch as two explicitly labelled curves.
# Each series gets its own plot call with the epoch axis: when both are passed
# to one plt.plot call, the second is plotted against its own index, unlabelled.
fig, ax = plt.subplots()
ax.plot(history.epoch, history.history['loss'], label='loss')
ax.plot(history.epoch, history.history['accuracy'], label='accuracy')
ax.set_xlabel('epoch')
ax.set_title('Training history')
ax.legend();

In [None]:
# Load the weights stored under './saved8k/flick' into the model.
# NOTE(review): if run after the training cell this replaces the weights just
# trained; the training checkpoints are written to 'final_model.h5' instead --
# confirm which weights are meant to be used for prediction.
filename = './saved8k/flick'
model.load_weights(filename)

# Predictions

In [None]:
# Holds the feature extractor once built, so repeated calls do not re-create
# (and re-load the weights of) ResNet101 every time.
_RESNET_EXTRACTOR_CACHE = {}


def _get_resnet_extractor():
    """Return the cached ResNet101 model truncated at its second-to-last layer."""
    if 'model' not in _RESNET_EXTRACTOR_CACHE:
        base = ResNet101(weights="imagenet")
        _RESNET_EXTRACTOR_CACHE['model'] = Model(
            inputs=base.inputs, outputs=base.layers[-2].output
        )
    return _RESNET_EXTRACTOR_CACHE['model']


def extract_features_resnet(filename):
    """Compute the ResNet101 feature vector of the image stored at ``filename``.

    The image is loaded at 224x224, turned into a batch of one, preprocessed
    with the ResNet ``preprocess_input`` and passed through ResNet101
    (ImageNet weights) truncated at its second-to-last layer.

    Args:
        filename: Path of the image file.

    Returns:
        The prediction of the truncated network for the single-image batch.
    """
    model = _get_resnet_extractor()
    image = load_img(filename, target_size=(224, 224))
    image = img_to_array(image)
    # Add the leading batch dimension expected by the network.
    image = np.expand_dims(image, axis=0)
    image = preprocess_input(image)
    feature = model.predict(image, verbose=0)
    return feature

def word_for_id(integer, tokenizer):
    """Return the word whose index is ``integer``, or ``None`` if there is none.

    When the tokenizer exposes an ``index_word`` mapping it is consulted first
    (a direct lookup). If that attribute is missing, or does not contain the
    index, ``tokenizer.word_index`` is scanned instead.

    Args:
        integer: Word index to look up.
        tokenizer: Object with a ``word_index`` mapping (word -> index) and,
            optionally, an ``index_word`` mapping (index -> word).
    """
    index_word = getattr(tokenizer, 'index_word', None)
    if index_word:
        word = index_word.get(integer)
        if word is not None:
            return word

    # Fallback: linear scan of the word -> index mapping.
    for word, index in tokenizer.word_index.items():
        if index == integer:
            return word
    return None

def generate_desc(model, tokenizer, photo, max_length):
    """Generate a caption for ``photo`` one word at a time.

    Starting from 'startseq', the text generated so far is encoded, padded to
    ``max_length`` and fed to ``model`` together with ``photo``; the
    highest-scoring word is appended. Generation stops after ``max_length``
    words, when 'endseq' is produced, or when the predicted index has no word.

    Returns:
        The generated text, including the leading 'startseq' (and the
        trailing 'endseq' when it was produced).
    """
    caption = 'startseq'
    for _ in range(max_length):
        encoded = tokenizer.texts_to_sequences([caption])[0]
        padded = pad_sequences([encoded], maxlen=max_length)
        probabilities = model.predict([photo, padded], verbose=0)
        # Pick the index of the most probable next word.
        next_word = word_for_id(argmax(probabilities), tokenizer)
        if next_word is None:
            break
        caption = caption + ' ' + next_word
        if next_word == 'endseq':
            break
    return caption

In [None]:
# Image to caption and its ResNet101 feature vector.
# NOTE(review): this is a hard-coded absolute Windows path; the notebook will
# only run on a machine with this exact layout -- consider a configurable,
# relative data directory.
path = "D://GP//Image_captioning//datasets/Flicker8k_Dataset/700884207_d3ec546494.jpg"
photo_resnet = extract_features_resnet(path)

In [None]:
# Generate a caption for the sample photo and display it with the image.
description_resnet = generate_desc(model, tokenizer, photo_resnet, max_length)
plt.imshow(plt.imread(path))
# Drop the 'startseq'/'endseq' marker words. str.strip() must not be used for
# this: its argument is a set of characters, so it would also eat leading and
# trailing caption letters such as 's', 't', 'a', 'r', 'e', 'n', 'd'.
caption_words = [
    word for word in description_resnet.split()
    if word not in ('startseq', 'endseq')
]
print("ResNet101 output: " + ' '.join(caption_words))