## Load the Data

In [None]:
# Read Text Captions.
def readTextFile(path, encoding=None):
    """Return the entire contents of the text file at `path` as one string.

    Args:
        path: Location of the file to read.
        encoding: Optional text encoding passed to `open`. The default
            (`None`) keeps the platform's default encoding, exactly as before;
            pass e.g. "utf-8" to make the read independent of the platform.

    Returns:
        The file contents as a single `str`.
    """
    with open(path, encoding=encoding) as f:
        return f.read()

# Load the raw caption file as one string, then break it into one record per line.
captions = readTextFile("./Data/archive/Flickr_Data/Flickr_Data/Flickr_TextData/Flickr8k.token.txt")
# Keep every non-blank line. Blindly dropping the last element with [:-1] would
# discard a real caption whenever the file does not end with a newline.
captions = [line for line in captions.split("\n") if line.strip()]
len(captions)

In [None]:
# Dictionary to map each image to the list of captions it has.


In [None]:
# Map each image id (the part of the file name before the first '.') to its captions.
descriptions = {}

for x in captions:
    # Split on the first tab only, so a caption that itself contains a tab
    # stays intact instead of breaking the two-value unpacking.
    first, second = x.split('\t', 1)
    img_name = first.split('.')[0]
    descriptions.setdefault(img_name, []).append(second)
        

In [None]:
# Inspect the raw captions collected for one sample image.
descriptions["1000268201_693b08cb0e"]

In [None]:
# plot Image
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
def plot_image(img):
    """Display `img` in a new figure with the axes hidden."""
    _, axes = plt.subplots()
    axes.imshow(img)
    axes.axis("off")
    plt.show()
    

In [None]:
# Load Images
!pip install opencv-python
import cv2
import os
def load_images_from_folder(folder):
    """Load every readable image in `folder` as an RGB array.

    Args:
        folder: Directory whose entries are passed to `cv2.imread`.

    Returns:
        A list of images converted from BGR to RGB, ordered by file name.
        Entries that `cv2.imread` cannot decode (it returns None for them)
        are skipped.
    """
    images = []
    # Sort the listing so the result order does not depend on the file system.
    for filename in sorted(os.listdir(folder)):
        img = cv2.imread(os.path.join(folder,filename))
        # The None check must come before cvtColor: converting None raises.
        if img is None:
            continue
        images.append(cv2.cvtColor(img,cv2.COLOR_BGR2RGB)) # Convert BGR Format to RGB.
    return images

In [None]:
# NOTE(review): this keeps every decoded image in memory at once; for the full
# dataset that may be several GB — confirm this is acceptable.
folder = "./Data/archive/Flickr_Data/Flickr_Data/Images"
images = load_images_from_folder(folder)
print(len(images))
# Show the shape of the first image instead of dumping its whole pixel array,
# and do not index into an empty list.
if images:
    print(images[0].shape)

In [None]:
# Preview the first 101 images. Slicing (rather than indexing up to a fixed
# count) avoids an IndexError when fewer images were loaded.
for preview_img in images[:101]:
    plot_image(preview_img)

In [None]:
# Load and display the sample image whose captions were inspected above.
# NOTE(review): cv2.imread returns None for an unreadable path, in which case
# the cvtColor call below would fail — confirm the file exists.
targetImage = cv2.imread(os.path.join(folder,"1000268201_693b08cb0e.jpg"))
targetImage = cv2.cvtColor(targetImage,cv2.COLOR_BGR2RGB) # Convert BGR Format to RGB
plot_image(targetImage)

### Data Cleaning

In [None]:
import re

In [None]:
def clean_text(sentence):
    """Normalise a caption to lowercase alphabetic words.

    The text is lowercased, every maximal run of the letters a-z is taken as
    a word (anything else acts as a separator), one-letter words are dropped,
    and the remaining words are joined with single spaces.
    """
    tokens = re.findall("[a-z]+", sentence.lower())
    kept = (token for token in tokens if len(token) > 1)
    return " ".join(kept)

In [None]:
# Quick check: the one-letter word "A" and the number "64" should be removed.
clean_text("A cat is sitting over the house number 64")

In [None]:
# Clean all captions

In [None]:
# Clean every caption in place, so each list object stored in `descriptions`
# keeps its identity but now holds the cleaned text.
for caption_list in descriptions.values():
    caption_list[:] = [clean_text(raw_caption) for raw_caption in caption_list]

In [None]:
# Inspect the same sample image's captions after cleaning.
descriptions["1000268201_693b08cb0e"]

In [None]:
# Write the data to text file so that we dont have to do the above steps again and again.
import json

# Serialise as real JSON instead of str(dict): the output can then be parsed
# without any quote rewriting, and `with` closes the file even if writing fails.
with open("descriptions.txt","w") as f:
    json.dump(descriptions, f)

## Create a Vocabulary (the set of all unique words our model can predict)

In [None]:
import json

In [None]:
import ast

# Load back the data we stored earlier from our text file.
with open("descriptions.txt",'r') as f:
    raw_descriptions = f.read()

# Parse the file as JSON. If it was written as a Python dict repr (single-quoted
# strings), fall back to ast.literal_eval, which parses that repr safely.
# Rewriting ' to " by hand would corrupt any text containing a quote character.
try:
    descriptions = json.loads(raw_descriptions)
except json.JSONDecodeError:
    descriptions = ast.literal_eval(raw_descriptions)
print(type(descriptions))
    

In [None]:
# Scratch demo of set.update: repeated words ("hello") are stored only once.
vocab = set()
vocab.update(["hello","apple"])
vocab.update(["hello","hii","Mango"])
vocab.update(["hi"])
print(vocab)

In [None]:
#Vocab
# Set of every distinct word across all captions. A set comprehension avoids
# building throwaway lists purely for their side effects.
vocab = {
    word
    for caption_list in descriptions.values()
    for sentence in caption_list
    for word in sentence.split()
}

len(vocab)
    

In [None]:
# Total No of words across all the sentences.
# Starts empty; the following cell fills it with one entry per word occurrence.
total_words = []

In [None]:
# Rebuild the list from scratch every time this cell runs, so re-running the
# cell cannot append the same words a second time and inflate the counts.
total_words = [
    word
    for caption_list in descriptions.values()
    for sentence in caption_list
    for word in sentence.split()
]

print(len(total_words))

In [None]:
# Filter words whose frequency is >10.
from collections import Counter

word_count_threshold = 10  # keep only words occurring more often than this

# Counter is a dict subclass that keeps first-seen order, so final_words comes
# out in the same order as with a hand-built counting dict.
count_frequency = Counter(total_words)

final_words = [
    word for word, count in count_frequency.items() if count > word_count_threshold
]

print(len(final_words))

## Prepare Train/Test Data

In [None]:
# Raw contents of the train/test split files, each read as one string.
train_file_data = readTextFile("./Data/archive/Flickr_Data/Flickr_Data/Flickr_TextData/Flickr_8k.trainImages.txt")
test_file_data = readTextFile("./Data/archive/Flickr_Data/Flickr_Data/Flickr_TextData/Flickr_8k.testImages.txt")

In [None]:
train_file_data[-1] # Last character of the file (expected to be a newline), so splitting on "\n" leaves a trailing empty row.

In [None]:
# Image ids (file name up to the first '.') for each split. Blank rows are
# filtered explicitly; dropping the last row with [:-1] would lose a real
# image whenever the file does not end with a newline.
train = [row.split(".")[0] for row in train_file_data.split("\n") if row.strip()]
test = [row.split(".")[0] for row in test_file_data.split("\n") if row.strip()]

print(len(train))
print(len(test))

In [None]:
# Prepare Description for the training data
# Tweak - Add 'startseq' and 'endseq' tokens to each caption in our training data.

In [None]:
# For every training image, wrap each of its captions in the sequence markers.
train_descriptions = {
    image_id: ["startseq " + text + " endseq" for text in descriptions[image_id]]
    for image_id in train
}


In [None]:
# Inspect the wrapped captions for one sample training image.
train_descriptions["1000268201_693b08cb0e"]

## Transfer Learning
* Convert Images to Features
* Convert Text to Features

### Image Feature Extraction

In [None]:
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
import numpy as np
import keras

In [None]:
# Build ResNet50 with ImageNet weights for 224x224x3 inputs and print its layers.
# NOTE(review): the name `model` is rebound to the captioning model further down.
model = ResNet50(weights="imagenet",input_shape=(224,224,3))
model.summary()

In [None]:
# Inspect the second-to-last layer; its output is used as the image feature.
model.layers[-2]

In [None]:
# Feature extractor: same input as ResNet50, but it outputs the second-to-last
# layer's activations instead of the final layer's output.
model_new = keras.Model(inputs=model.input,outputs=model.layers[-2].output)

In [None]:
def preprocess_img(img):
    """Load the image at path `img` and prepare it as a one-item ResNet50 batch.

    The image is loaded at 224x224, converted to an array, given a leading
    batch axis, and passed through `preprocess_input`.
    """
    loaded = image.load_img(img, target_size=(224, 224))
    pixels = image.img_to_array(loaded)
    batch = np.expand_dims(pixels, axis=0)
    # Normalisation
    return preprocess_input(batch)

In [None]:
# Preprocess one sample image and display the first (only) item of the batch.
# The picture shows the values after preprocess_input, not the original pixels.
img = preprocess_img("./Data/archive/Flickr_Data/Flickr_Data/Images/1000268201_693b08cb0e.jpg")
import matplotlib.pyplot as plt
plt.imshow(img[0])
plt.show()

In [None]:
def encode_image(img):
    """Return the feature vector that `model_new` produces for the image at path `img`."""
    batch = preprocess_img(img)
    features = model_new.predict(batch)
    # Flatten the prediction, e.g. from (1, 2048) to (2048,).
    return features.reshape((-1,))

In [None]:
# Sanity check: encode one sample image and look at the resulting vector.
encode_image("./Data/archive/Flickr_Data/Flickr_Data/Images/1000268201_693b08cb0e.jpg")

In [None]:
# image_id --> feature_vector extracted from Resnet Image
encoding_train = {}

for step, image_id in enumerate(train):
    image_path = "./Data/archive/Flickr_Data/Flickr_Data/Images/" + image_id + ".jpg"
    encoding_train[image_id] = encode_image(image_path)

    # Report progress once every 100 images.
    if step % 100 == 0:
        print("Encoding in Progress Time Step %d " % step)
        

In [None]:
# Store all the computed features to the disk.
import pickle
# `with` closes the file even if pickling raises part-way through.
with open('encoded_train_features.pkl','wb') as f:
    pickle.dump(encoding_train,f)

In [None]:
# image_id --> feature_vector extracted from Resnet Image
encoding_test = {}

for step, image_id in enumerate(test):
    image_path = "./Data/archive/Flickr_Data/Flickr_Data/Images/" + image_id + ".jpg"
    encoding_test[image_id] = encode_image(image_path)

    # Report progress once every 100 images.
    if step % 100 == 0:
        print("Encoding in Progress Time Step %d " % step)
        

In [None]:
# Store all the computed features to the disk.
import pickle
# `with` closes the file even if pickling raises part-way through.
with open('encoded_test_features.pkl','wb') as f:
    pickle.dump(encoding_test,f)

## Caption Preprocessing

In [None]:
# Number of words that passed the frequency filter.
len(final_words)

In [None]:
# Number the kept words from 1 upward and build the lookup in both directions.
idx_to_word = dict(enumerate(final_words, start=1))
word_to_idx = {token: position for position, token in idx_to_word.items()}

In [None]:
# Spot-check: the word assigned to the first index.
idx_to_word[1]

In [None]:
# Add two special words 'startseq' and 'endseq'
# Give each the next free index (existing indices run 1..len(word_to_idx))
# rather than fixed numbers. A hard-coded index overwrites a real word or
# leaves a gap whenever the filtered vocabulary has a different size.
for special_word in ('startseq', 'endseq'):
    if special_word not in word_to_idx:
        special_idx = len(word_to_idx) + 1
        word_to_idx[special_word] = special_idx
        idx_to_word[special_idx] = special_word

# +1 because index 0 is not assigned to any word.
vocab_size = len(word_to_idx) + 1
print("Vocab Size = ",vocab_size)

In [None]:
# Longest training caption, measured in whitespace-separated words (0 if there are none).
max_len = max(
    (
        len(cap.split())
        for caption_list in train_descriptions.values()
        for cap in caption_list
    ),
    default=0,
)

print(max_len)

## Data Loader (Generator)

In [1]:
# Demo of the growing prefixes arr[0:i]; the data generator slices each caption
# sequence the same way to form its partial-caption inputs.
arr = [1,2,3,4,5]
for i in range(1,len(arr)):
    print(arr[0:i])

[1]
[1, 2]
[1, 2, 3]
[1, 2, 3, 4]


In [4]:
# Make Custom Data Loader
from keras import preprocessing
import keras
from keras.utils.np_utils import to_categorical

def data_generator(train_descriptions,encoding_train,word_to_idx,max_len,batch_size):
    """Yield training batches forever, `batch_size` images at a time.

    For every caption of every image, each prefix seq[0:i] becomes one sample
    whose target is the next word seq[i].

    Args:
        train_descriptions: dict mapping image id -> list of caption strings.
        encoding_train: dict mapping image id -> image feature vector.
        word_to_idx: dict mapping word -> integer index; unknown words are skipped.
        max_len: length every partial caption is padded to (zeros appended).
        batch_size: number of images whose samples make up one batch.

    Yields:
        A tuple `([image_features, padded_partial_captions], one_hot_next_words)`
        of numpy arrays. One-hot targets use the module-level `vocab_size`.
    """
    X1,X2,y = [],[],[]
    n = 0
    while True:
        for key,desc_list in train_descriptions.items():
            n+=1

            photo = encoding_train[key]
            for desc in desc_list:
                seq = [word_to_idx[word] for word in desc.split() if word in word_to_idx]
                for i in range(1,len(seq)):
                    xi = seq[0:i]
                    yi = seq[i]

                    #0 denotes padding words.
                    xi = preprocessing.sequence.pad_sequences([xi],maxlen=max_len,value=0,padding='post') #[xi] -> it accepts 2D
                    xi = xi[0]

                    yi = to_categorical([yi],num_classes=vocab_size)[0]

                    X1.append(photo)
                    X2.append(xi)
                    y.append(yi)

            # Check once per image, after all of its captions were added. Doing
            # this inside the caption loop emits the batch after the image's
            # first caption and never fires for an image with no captions,
            # which lets n run past batch_size so the generator stops yielding.
            if n==batch_size:
                # Yield a tuple: Keras treats a tuple as (inputs, targets),
                # whereas a top-level list may be read as a list of inputs only.
                yield ([np.array(X1),np.array(X2)],np.array(y))
                X1,X2,y = [],[],[]
                n = 0
                    

In [3]:
## Word Embeddings

In [None]:
# Open the GloVe vectors file (50-dimensional, judging by the file name).
# The handle stays open across cells: it is read in the next cell and closed after that.
f = open("./saved/glove.6B.50d.txt",encoding='utf-8')

In [None]:
# word --> embedding vector parsed from the open GloVe file
embedding_index = {}

for raw_line in f:
    fields = raw_line.split()
    # First field is the word; the remaining fields are its vector components.
    embedding_index[fields[0]] = np.array(fields[1:], dtype='float')

In [None]:
# Close the GloVe file handle opened earlier.
f.close()

In [None]:
def get_embedding_matrix(emb_dim=50):
    """Build the embedding matrix for the caption vocabulary.

    Args:
        emb_dim: Length of each embedding vector. It must match the vectors
            stored in `embedding_index`. Defaults to 50, the previously
            hard-coded value.

    Returns:
        A `(vocab_size, emb_dim)` array in which row `idx` holds the vector of
        the word mapped to `idx` in `word_to_idx`. Rows for words missing from
        `embedding_index`, and any index no word maps to, stay all zeros.
    """
    matrix = np.zeros((vocab_size,emb_dim))
    for word,idx in word_to_idx.items():
        embedding_vector = embedding_index.get(word)

        if embedding_vector is not None:
            matrix[idx] = embedding_vector
    return matrix

In [None]:
# Build the embedding matrix used to initialise the caption embedding layer.
embedding_matrix = get_embedding_matrix()

In [None]:
# Expect (vocab_size, 50): one row per word index.
print(embedding_matrix.shape)

In [None]:
#embedding_matrix[1847] # startseq and endseq are not present in glove embeddings. So, they give 0.

## Model Architecture

In [None]:
from tensorflow.keras import layers

In [None]:
# Image branch: 2048-value feature vector -> dropout (rate 0.3) -> 256-unit ReLU layer.
input_img_features = keras.Input(shape=(2048,))
inp_img1 = layers.Dropout(0.3)(input_img_features)
inp_img2 = layers.Dense(256,activation='relu')(inp_img1)

In [None]:
# Captions as Input
# Caption branch: padded word indices -> 50-dim embedding -> dropout (0.3) -> LSTM.
# mask_zero=True marks index 0 (the padding value) as masked for later layers.
input_captions = keras.Input(shape=(max_len,))
inp_cap1 = layers.Embedding(input_dim=vocab_size,output_dim=50,mask_zero=True)(input_captions)
inp_cap2 = layers.Dropout(0.3)(inp_cap1)
inp_cap3 = layers.LSTM(256)(inp_cap2) # 256-> size of output.

In [None]:
# Decoder: sum the two 256-unit branch outputs, then predict a probability
# distribution over the vocab_size word indices.
decoder1 = layers.add([inp_img2,inp_cap3]) # Add these two tensors.
decoder2 = layers.Dense(256,activation='relu')(decoder1)
outputs = layers.Dense(vocab_size,activation='softmax')(decoder2)

In [None]:
# Captioning model: (image features, partial caption) -> next-word distribution.
# NOTE(review): this rebinds `model`, which earlier referred to the ResNet50 network.
model = keras.Model(inputs=[input_img_features,input_captions],outputs=outputs)

In [None]:
# Print the layers of the captioning model.
model.summary()

In [None]:
# Find the embedding layer by type instead of relying on it sitting at
# position 2 of model.layers, which depends on how Keras orders the layers.
embedding_layer = next(
    layer for layer in model.layers if isinstance(layer, layers.Embedding)
)
# Load the pre-computed word vectors and keep them fixed during training.
embedding_layer.set_weights([embedding_matrix])
embedding_layer.trainable = False

In [None]:
# Categorical cross-entropy matches the one-hot next-word targets from data_generator.
model.compile(loss='categorical_crossentropy',optimizer="adam")

### Training of Model

In [None]:
# Training settings. batch_size counts images per generator batch, and steps
# is the number of such batches needed to cover the training images once.
epochs = 20
batch_size = 3
steps = len(train_descriptions)//batch_size

In [None]:
def train():
    """Train the captioning model for `epochs` epochs, saving a checkpoint after each.

    Uses the module-level `model`, `epochs`, `steps`, `batch_size`, `max_len`,
    `train_descriptions`, `encoding_train` and `word_to_idx`. The checkpoint
    for epoch `i` is written to ./model_weights/model_<i>.h5.

    NOTE(review): defining this function rebinds the name `train`, which
    earlier cells use for the list of training image ids. Re-running those
    cells afterwards requires rebuilding that list first.
    """
    import os

    # Make sure the checkpoint directory exists before the first save.
    os.makedirs("./model_weights", exist_ok=True)
    for i in range(epochs):
        generator = data_generator(train_descriptions,encoding_train,word_to_idx,max_len,batch_size)
        # epochs=1 because this outer loop already iterates over the epochs;
        # each call makes a single pass of `steps` batches.
        # NOTE(review): fit_generator is deprecated in newer Keras releases in
        # favour of fit — confirm against the installed version.
        model.fit_generator(generator,epochs=1,steps_per_epoch=steps,verbose=1)
        model.save("./model_weights/model_"+str(i)+'.h5')

In [None]:
# Run the full training loop (writes one checkpoint file per epoch).
train()

In [None]:
from keras.models import load_model

In [None]:
# Reload the checkpoint written after epoch index 9 (the tenth epoch).
# NOTE(review): training runs 20 epochs; confirm that model_9 is the intended checkpoint.
model= load_model('./model_weights/model_9.h5')

In [None]:
def predict_caption(photo):
    """Greedily generate a caption for one image feature vector.

    Starting from "startseq", the module-level `model` is asked for the most
    probable next word, which is appended to the running text. This repeats
    until "endseq" is predicted or `max_len` words have been generated.

    Args:
        photo: Image features in the shape the model expects for one image
            (callers in this notebook pass a (1, 2048) array).

    Returns:
        The generated words joined by spaces, without the start/end markers.
    """
    generated = []
    in_text = "startseq"
    for _ in range(max_len):
        sequence = [word_to_idx[w] for w in in_text.split() if w in word_to_idx]
        sequence = preprocessing.sequence.pad_sequences([sequence],maxlen=max_len,padding='post')

        ypred = model.predict([photo,sequence])
        ypred = int(ypred.argmax()) # Word with max prob always --> Greedy Sampling
        # An index with no word (e.g. 0, the padding value) cannot extend the
        # caption, so treat it like the end marker instead of raising KeyError.
        word = idx_to_word.get(ypred)
        if word is None or word == 'endseq':
            break

        generated.append(word)
        in_text += (' '+word)

    # Only real words are collected above, so nothing is trimmed here. Slicing
    # off the last word unconditionally would lose a genuine word whenever the
    # loop stops at max_len without ever producing 'endseq'.
    return ' '.join(generated)

In [None]:
# Pick some Random Images and See Results
# Build the id list once, and draw indices from its real length so the loop
# works for any number of encoded test images.
all_img_names = list(encoding_test.keys())
for _ in range(15):
    idx = np.random.randint(0, len(all_img_names))
    img_name = all_img_names[idx]
    photo_2048 = encoding_test[img_name].reshape((1,2048))
    caption = predict_caption(photo_2048)
    # Use a dedicated name for the picture instead of rebinding the loop variable.
    sample_img = plt.imread("./Data/archive/Flickr_Data/Flickr_Data/Images/"+img_name+".jpg")
    plt.imshow(sample_img)
    plt.axis("off")
    plt.show()
    print(caption)