# Image Captioning

In this project, we use the Flickr8k dataset to train a model that produces image captions for the test data.

<h4>Note :</h4>
The dataset is not included here, and my folder names differ from the original ones. You will have to change the folder names accordingly if you want to reproduce the results.
Sorry for the inconvenience :p

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from keras.models import Model, load_model
import re
from keras.preprocessing import image
import pickle
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.layers import *

## About Dataset :

The Flickr8k dataset has roughly 8,000 images, and 5 captions are provided for each image.

Text files list the images used for training, cross-validation and testing. Reading these files lets us load the matching images and captions for each split.

These files are also very helpful for building the data generator for the caption model, which is an important step in this project because the dataset is too large to load into RAM all at once.
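The captions file is read line by line, with the image name and the caption separated by a tab. Below is a minimal sketch of that grouping step, assuming each line has the layout `<file>.jpg#<n>\t<caption>`; the sample lines and the helper name `group_captions` are made up for illustration.

```python
from collections import defaultdict

# Hypothetical sample lines in the assumed "<file>.jpg#<n>\t<caption>" layout.
sample_lines = [
    "img_001.jpg#0\tA dog runs across the grass .",
    "img_001.jpg#1\tA brown dog is running outside .",
    "img_002.jpg#0\tTwo children play on a beach .",
]

def group_captions(lines):
    """Group captions by image id (file name without its extension)."""
    grouped = defaultdict(list)
    for line in lines:
        name_part, caption = line.split("\t", 1)
        image_id = name_part.split(".")[0]   # "img_001.jpg#0" -> "img_001"
        grouped[image_id].append(caption)
    return dict(grouped)

sample_descriptions = group_captions(sample_lines)
print(sample_descriptions["img_001"])
```

Every image id ends up mapped to the list of all its captions, which is the shape the rest of the notebook works with.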

In [None]:
with open("./flicker8k-dataset/Flickr8k_text/Flickr8k.token.txt") as f:
    captions = f.read()                                                   # Read the captions file.

In [None]:
captions = captions.split("\n")[:-1]   # Last string is empty , so we remove it

In [None]:
descriptions = {}                              # descriptions -> dictionary in the form of image id: captions

for ele in captions:
    i_to_c = ele.split('\t')
    img_name = i_to_c[0].split('.')[0]         # Image name
    cap = i_to_c[1]
    
    if descriptions.get(img_name) is None:
        descriptions[img_name] = []
        
    descriptions[img_name].append(cap)

In [None]:
len(descriptions)                 

## Data Cleaning

In [None]:
# 1. Convert to lower case
# 2. Remove punctuation (and any other non-letter characters)
# 3. Remove words shorter than 2 characters, because punctuation removal may leave stray letters like s, t etc.


def clean_text(sample):
    sample = sample.lower()
    
    sample = re.sub("[^a-z]+", " ", sample)
    
    sample = sample.split()
    
    sample = [s for s in sample if len(s)>1] # list comprehension
    
    sample = " ".join(sample)
    return sample

In [None]:
# Clean all captions in place
# Note : This process takes time, so once it is complete we save the dictionary

for key, desc_list in descriptions.items():
    for i in range(len(desc_list)):
        desc_list[i] = clean_text(desc_list[i])

In [None]:
with open('descriptions.txt', 'w') as f:
    f.write(str(descriptions))              # Save the descriptions as a string (.txt)

## Building Vocabulary

This is an important step in any language model. Here, we first create a set of words as an initial step, and then we further filter out words depending on their frequency. 

This vocabulary is important, as it will later help us build the word embedding matrix from GloVe vectors.
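The frequency filter can be sketched on toy data as follows. This is only an illustration of the idea, assuming a word is kept when it occurs more than a chosen number of times; `toy_captions` and `build_vocab` are hypothetical names, not part of the notebook.

```python
from collections import Counter

# Toy cleaned captions standing in for the real descriptions dictionary.
toy_captions = [
    "dog runs on grass",
    "dog jumps over log",
    "child runs on beach",
]

def build_vocab(captions, min_count):
    """Return the words that occur more than `min_count` times, most frequent first."""
    counts = Counter(word for cap in captions for word in cap.split())
    return [word for word, freq in counts.most_common() if freq > min_count]

toy_vocab = build_vocab(toy_captions, min_count=1)
print(toy_vocab)
```

Rare words are dropped because the model sees too few examples to learn them, and a smaller vocabulary also shrinks the output layer.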

In [None]:
# finding unique vocabulary

vocabulary = set()

for key in descriptions.keys():
    for cap in descriptions[key]:
        vocabulary.update(cap.split())
    

In [None]:
print("vocabulary size : " , len(vocabulary))

In [None]:
# All words in description dictionary

all_words = []

for key in descriptions.keys():
    for des in descriptions[key]:
        all_words.extend(des.split())

In [None]:
print("total words appearing : " , len(all_words))

### Filter excess words from vocab

In [None]:
from collections import Counter

counter = Counter(all_words)

In [None]:
dic_ = dict(counter)

In [None]:
# Sort according to frequency

sorted_dic = sorted(dic_.items(), key = lambda x: x[1], reverse=True)

In [None]:
threshold_value = 10   # Words that occur 10 times or fewer in the corpus are discarded

d = [x for x in sorted_dic if x[1]>threshold_value]

In [None]:
len(d)

In [None]:
filtered_words = [x[0] for x in d]

In [None]:
len(filtered_words)

## Load Training and Testing Data

In [None]:
# Read Train File

with open('flicker8k-dataset/Flickr8k_text/Flickr_8k.trainImages.txt') as f:
    train = f.read()

In [None]:
train = [e[:-4] for e in train.split('\n')[:-1]]    # remove .jpg from the image name

In [None]:
# Read test File

with open('flicker8k-dataset/Flickr8k_text/Flickr_8k.testImages.txt') as f:
    test = f.read()

In [None]:
test = [e[:-4] for e in test.split('\n')[:-1]]    # remove .jpg from the image name

### Trigger Words 

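Here, two trigger words, startseq and endseq, are added to every training caption. At prediction time, startseq gives the model a fixed first input to start generating from, and endseq tells us when the caption is finished.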
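To illustrate how the two markers are used when generating a caption, here is a minimal sketch in which a hypothetical lookup table stands in for the trained model. Generation starts from startseq and stops as soon as endseq is produced (or after a maximum number of steps).

```python
# Hypothetical next-word table standing in for a trained caption model.
toy_next_word = {
    "startseq": "dog",
    "dog": "runs",
    "runs": "endseq",
}

def toy_greedy_caption(next_word, max_len=10):
    """Generate words from 'startseq' until 'endseq' appears or max_len steps pass."""
    words = ["startseq"]
    for _ in range(max_len):
        word = next_word.get(words[-1], "endseq")
        if word == "endseq":
            break
        words.append(word)
    return " ".join(words[1:])   # drop the 'startseq' marker

print(toy_greedy_caption(toy_next_word))
```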

In [None]:
train_descriptions = {}                      # train_description -> training dictionary 

for t in train:
    train_descriptions[t] = []
    for cap in descriptions[t]:
        cap_to_append = "startseq " + cap + " endseq"
        train_descriptions[t].append(cap_to_append)

In [None]:
len(train_descriptions)

## Data Preprocessing - Images

In this project, we use the ResNet50 model with ImageNet weights for feature extraction. We take the output of the second-to-last layer, a global average pooling (GAP) layer, as the encoded image (dim = (2048,)).
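What the GAP layer does can be sketched with NumPy alone. The example below uses a random array in place of a real feature map, assuming the 7x7x2048 shape that ResNet50's last convolutional block produces for a 224x224 input; it is an illustration of the pooling step, not the Keras layer itself.

```python
import numpy as np

# Random stand-in for the final convolutional feature map of one image.
rng = np.random.default_rng(0)
feature_map = rng.random((1, 7, 7, 2048)).astype("float32")

# Global average pooling: average over the two spatial axes.
encoded = feature_map.mean(axis=(1, 2))        # shape (1, 2048)
encoded = encoded.reshape(encoded.shape[1])    # shape (2048,), one vector per image

print(encoded.shape)
```

Each of the 2048 channels is reduced to a single number, so every image becomes one fixed-length vector regardless of where objects appear in it.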

In [None]:
from keras.applications.resnet50 import ResNet50, preprocess_input

In [None]:
model = ResNet50(weights = 'imagenet', input_shape = (224,224,3))

In [None]:
model.summary()

In [None]:
model_new = Model(inputs = model.input, outputs =  model.layers[-2].output)

In [None]:
def preprocess_image(img):                            
    img = image.load_img(img, target_size=(224,224))     # Preprocess input according to ResNet requirements 
    img = image.img_to_array(img)
    img = preprocess_input(img)
    img = np.expand_dims(img, axis = 0)

    return img

In [None]:
def encode_image(img):
    img = preprocess_image(img)
    fea_vec = model_new.predict(img)
    fea_vec = fea_vec.reshape(fea_vec.shape[1], )
    return fea_vec

In [None]:
images = "./flicker8k-dataset/Flickr8k_Dataset/"

In [None]:
# Note : This process takes about 30min , depending on the PC . 
# So saving the file after the process is important.

encoding_train = {}

for ix, img in enumerate(train):
    
    img = images+train[ix]+".jpg"
    
    p = encode_image(img)
    
    encoding_train[img[len(images):]] = p
    
    
    if ix%100 == 0:
        print("Encoding image :" + str(ix))          # Printing after every 100th image

In [None]:
# Note : This process takes about 15min , depending on the PC . 
# So saving the file after the process is important.

encoding_test = {}

for ix, img in enumerate(test):
    
    img = images+test[ix]+".jpg"
    
    p = encode_image(img)
    
    encoding_test[ img[len(images):] ] = p
    
    
    if ix%100 == 0:
        print("Encoding image :" + str(ix))

#### Save the files as pickle files

In [None]:
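# saving/dumping features to disk (both train and test, since both are loaded back below)

with open("./encoded_train_images.pkl", 'wb') as f:
    pickle.dump(encoding_train, f)

with open("./encoded_test_images.pkl", 'wb') as f:
    pickle.dump(encoding_test, f)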

In [None]:
# loading pickle file

with open("./encoded_train_images.pkl", 'rb') as f:
    encoding_train = pickle.load(f)

    
with open("./encoded_test_images.pkl", 'rb') as f:
    encoding_test = pickle.load(f)

In [None]:
len(encoding_train)

In [None]:
len(encoding_test)

## Data Preprocessing - Captions

In [None]:
word_to_idx = {}              
idx_to_word = {}

ix = 1

for e in filtered_words:
    word_to_idx[e] = ix
    idx_to_word[ix] = e
    
    ix +=1

In [None]:
# Add the trigger words right after the last vocabulary index
# (1846 and 1847 when the filtered vocabulary has 1845 words)
for trigger in ('startseq', 'endseq'):
    if trigger not in word_to_idx:
        idx = len(word_to_idx) + 1
        word_to_idx[trigger] = idx
        idx_to_word[idx] = trigger

In [None]:
vocab_size = len(idx_to_word) + 1
print(vocab_size)

In [None]:
all_caption_len = []

for key in train_descriptions.keys():
    for cap in train_descriptions[key]:
        all_caption_len.append(len(cap.split()))

In [None]:
max_len = max(all_caption_len)
print(max_len)

The last two cells find the maximum caption length, which is needed to pad all the sentences to the same length.
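Padding simply fills the unused positions with zeros so that every sequence has the same length. A minimal sketch, assuming padding at the end of the sequence; `pad_post` is a hypothetical helper written for illustration and only mimics the post-padding behaviour (longer inputs are simply cut here, which may differ from Keras).

```python
import numpy as np

def pad_post(seq, max_len, value=0):
    """Right-pad a list of word indices with `value` up to `max_len`."""
    padded = np.full(max_len, value, dtype="int32")
    trimmed = seq[:max_len]
    padded[:len(trimmed)] = trimmed
    return padded

print(pad_post([5, 12, 7], max_len=6))
```

Index 0 is reserved for padding, which is why the word indices start from 1.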

## Generator Function

As mentioned earlier, a generator function is required because the full training data is too large to hold in memory at once.
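Each caption is expanded into several training examples: the partial caption seen so far is the input, and the next word is the target. A minimal sketch of that expansion on plain words (the real generator works on word indices and also attaches the image encoding); `caption_to_pairs` is a hypothetical helper.

```python
def caption_to_pairs(caption):
    """Split one caption into (partial caption, next word) training pairs."""
    words = caption.split()
    return [(" ".join(words[:i]), words[i]) for i in range(1, len(words))]

for prefix, target in caption_to_pairs("startseq dog runs endseq"):
    print(f"{prefix!r} -> {target!r}")
```

A caption with n words therefore yields n - 1 examples, which is why the data grows so quickly and is produced batch by batch.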

In [None]:
def data_generator(train_descriptions, encoding_train, word_to_idx, max_len, batch_size):
    X1, X2, y = [], [], []
    
    n=0
    
    while True:
        
        for key, desc_list in train_descriptions.items():
            n+=1
            
            photo = encoding_train[key+'.jpg']
            
            for desc in desc_list:
                
                seq = [word_to_idx[word] for word in desc.split() if word in word_to_idx]
                
                
                for i in range(1, len(seq)):
                    in_seq = seq[0:i]
                    out_seq = seq[i]
                    
                    in_seq = pad_sequences( [in_seq], maxlen=max_len, value= 0, padding='post')[0]
                
                    out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
                    
                    X1.append(photo)
                    X2.append(in_seq)
                    y.append(out_seq)
                    
            if n%batch_size == 0 :
                yield [[np.array(X1), np.array(X2)] , np.array(y) ]   #Yield instead of return 
                X1, X2, y = [], [], []

In [None]:
for i in data_generator(train_descriptions, encoding_train, word_to_idx, max_len, 3):
    X, y = i
    print(X[0].shape)
    print(X[1].shape)
    print(y.shape)
    break

## Word Embeddings
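Loading pretrained vectors comes down to two steps: parse each text line into a word and a float vector, then copy the vectors into a matrix whose row number is the word's index. Here is a minimal sketch on made-up data, assuming the plain `word v1 v2 ...` line layout; the `toy_*` names and the 3-dimensional vectors are hypothetical.

```python
import numpy as np

# Two made-up lines in the "<word> <v1> <v2> ..." layout (3 dims for brevity).
toy_glove_lines = [
    "dog 0.1 0.2 0.3",
    "runs 0.4 0.5 0.6",
]
toy_word_to_idx = {"dog": 1, "runs": 2, "startseq": 3}

# Step 1: parse each line into word -> vector.
toy_embeddings = {}
for line in toy_glove_lines:
    values = line.split()
    toy_embeddings[values[0]] = np.array(values[1:], dtype="float32")

# Step 2: fill the matrix row by row; row 0 is left for the padding index.
toy_matrix = np.zeros((len(toy_word_to_idx) + 1, 3))
for word, idx in toy_word_to_idx.items():
    vec = toy_embeddings.get(word)
    if vec is not None:          # words without a vector keep an all-zero row
        toy_matrix[idx] = vec

print(toy_matrix)
```

Words that have no pretrained vector, such as the trigger words, keep an all-zero row.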

In [None]:
embeddings = {}

with open('./GloVE/glove.6B.50d.txt', 'r', encoding='utf-8') as f:
    for line in f:
        values = line.split()
        word = values[0]
        coeffs = np.array(values[1:], dtype="float32")
        
        embeddings[word] = coeffs

In [None]:
def getOutputEmbeddings():

    emb_dim = 50
    embedding_matrix_output = np.zeros((vocab_size, emb_dim ))
    
    for word, idx in word_to_idx.items():
        
        emb_vec = embeddings.get(word)
        
        if emb_vec is not None:
            embedding_matrix_output[idx] = emb_vec
            
    return embedding_matrix_output

In [None]:
embedding_output = getOutputEmbeddings()

In [None]:
embedding_output.shape

## Model Architecture

In [None]:
#  image feature extractor model

input_img_feat = Input(shape=(2048,))
inp_img1 = Dropout(0.3)(input_img_feat)
inp_img2 = Dense(256, activation='relu')(inp_img1)

In [None]:
# partial caption sequence model

input_cap = Input(shape=(max_len,))
inp_cap1 = Embedding(input_dim= vocab_size, output_dim=50, mask_zero=True)(input_cap)
inp_cap2 = Dropout(0.3)(inp_cap1)
inp_cap3 = LSTM(256)(inp_cap2)

In [None]:
decoder1 = add([inp_img2, inp_cap3])

decoder2 = Dense(256, activation='relu')(decoder1)
output = Dense(vocab_size, activation='softmax')(decoder2)


model = Model(inputs = [input_img_feat, input_cap]  , outputs =  output )

In [None]:
model.summary()

In [None]:
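# Load the GloVe matrix into the Embedding layer and freeze it.
# This assumes model.layers[2] is the Embedding layer; check model.summary() to confirm.
model.layers[2].set_weights([embedding_output])
model.layers[2].trainable = False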

In [None]:
model.compile(loss="categorical_crossentropy", optimizer='adam')

## Model Training

In [None]:
epochs = 10
batch_size = 3
steps = len(train_descriptions)//batch_size

In [None]:
for i in range(epochs):
    generator = data_generator(train_descriptions, encoding_train, word_to_idx, max_len, batch_size)
    model.fit_generator(generator, epochs=1, steps_per_epoch=steps, verbose=1)
    model.save("best_model.h5")

In [None]:
model = load_model("best_model.h5")     # same path the training loop saves to

## Predictions

In [None]:
def predict(photo_enc):
    in_text = "startseq"
    
    for i in range(max_len):
        sequence = [word_to_idx[word] for word in in_text.split() if word in word_to_idx]
        sequence = pad_sequences([sequence], maxlen=max_len, padding='post')
        
        y_pred = model.predict([photo_enc, sequence])
        y_pred = np.argmax(y_pred)
        word = idx_to_word[y_pred]
        
        in_text += " "+word
        
        if word == 'endseq':
            break
        
        
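    final_caption = in_text.split()[1:]                     # drop 'startseq'
    if final_caption and final_caption[-1] == 'endseq':
        final_caption = final_caption[:-1]                  # drop 'endseq' only if it was generated
    final_caption = " ".join(final_caption)
    return final_caption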

In [None]:
rn = np.random.randint(0,1000)
img_id = list(encoding_test.keys())[rn]             # Random image to be tested

photo_enc = encoding_test[img_id].reshape((1,2048))
pred = predict(photo_enc)
print(pred)

path = images + img_id
img = plt.imread(path)
plt.imshow(img)
plt.show()