# Import required packages

In [30]:
import os
import string
import numpy as np
import pandas as pd
import pickle
from tqdm import tqdm
import tensorflow as tf
from keras.applications import ResNet101
from keras.preprocessing import image
from keras.applications.resnet import preprocess_input, decode_predictions
from keras.models import Model
from keras.layers import GlobalAveragePooling2D, Reshape, concatenate
from keras.preprocessing.image import img_to_array,load_img

import cv2
from ultralytics import YOLO
from keras.layers import Dense, GlobalAveragePooling2D
import warnings
from sklearn.utils import shuffle

warnings.filterwarnings('ignore')

# Patch Features

In [None]:
def extract_features(directory, t_size):
    model = ResNet101(weights='imagenet', include_top=False)
    model = Model(inputs=model.input, outputs=model.get_layer('conv5_block3_out').output)
    # print(model.summary())
    features_all = []
    for name in tqdm(os.listdir(directory)[t_size-1000:t_size]):
        img = load_img(directory+'/'+name,target_size=(224,224))
        img_array = image.img_to_array(img)
        img_array = preprocess_input(img_array)
        img_array = img_array.reshape((1, *img_array.shape))
        features = model.predict(img_array, verbose=0)
        patch_size = 14
        features_reshaped = features.reshape((patch_size * patch_size, -1))
        features_all.append(features_reshaped)
    return np.array(features_all)

directory ='./flickr30k_images/flickr30k_images'
for t_size in range(1000, 31000, 1000):
    patch_features = extract_features(directory, t_size)
    with open('./Extracted_Features/resnet_features_'+str(round(t_size/1000))+'.pkl','wb') as f: 
        pickle.dump(patch_features, f)

# Object Detection

In [None]:
model = YOLO("./yolov8x.pt")       # for detection
base_model = ResNet101(weights='imagenet', include_top=False)


def extract_object_features(image, boxes, scores, top_n=3, output_dim=512):
    # Select top N predictions
    selected_indices = np.argsort(scores)[-top_n:][::-1]

    object_features = []
    for idx in (selected_indices):
        box = boxes[idx]

        # Extract region of interest (ROI) based on the bounding box
        x, y, w, h = map(int, box)
        roi = image[y:y+h, x:x+w]

        # Resize ROI to match ResNet50 input size
        roi = cv2.resize(roi, (224, 224))

        # Preprocess input for ResNet50
        roi = preprocess_input(np.expand_dims(roi, axis=0))

        # Use a pre-trained ResNet50 model to extract features
        x = base_model(roi)
        x = GlobalAveragePooling2D()(x)
        x = Dense(output_dim, activation='relu')(x)

        # Extracted features as a 1D vector
        object_vector = x.numpy().flatten()

        object_features.append(object_vector)
    if len(selected_indices) < top_n:
        for i in range(top_n - len(selected_indices)):
            try:
                object_features.append(np.zeros(object_vector.shape))
            except:
                object_features.append(np.zeros((512,)))

    return np.array(object_features)

# Example usage
directory ='./flickr30k_images/flickr30k_images'
for t_size in range(19000, 31000, 1000):
    all_object_features = []
    for name in tqdm(os.listdir(directory)[t_size-1000:t_size]):
        image_path = directory+'/'+name
        image = cv2.imread(image_path)

        results = model.predict(image, verbose = False)
        boxes = [list(np.array((b[:4]))) for b in results[0].boxes.data]
        scores = list(np.array(results[0].boxes.conf))
        classes = list(np.array(results[0].boxes.cls))

        boxes = list(np.array(boxes)[np.unique(classes, return_index=True)[1]])
        scores = list(np.array(scores)[np.unique(classes, return_index=True)[1]])

        # Extract object features
        object_features = extract_object_features(image, boxes, scores)
        all_object_features.append(object_features)

    # Convert the list of object features to a NumPy array
    all_object_features_array = np.array(all_object_features)
    with open('./Extracted_Features/yolo_features_'+str(round(t_size/1000))+'.pkl','wb') as f: 
        pickle.dump(all_object_features_array, f)

# Feature Concatenation

In [87]:
with open('./Extracted_Features/resnet_features_'+str(1)+'.pkl','rb') as f: 
    patch_features = pickle.load(f)
with open('./Extracted_Features/yolo_features_'+str(1)+'.pkl','rb') as f: 
    all_object_features_array = pickle.load(f)

for i in range(2,8):
    with open('./Extracted_Features/resnet_features_'+str(i)+'.pkl','rb') as f: 
        patch_features = np.concatenate((patch_features, pickle.load(f)))

    with open('./Extracted_Features/yolo_features_'+str(i)+'.pkl','rb') as f: 
        all_object_features_array = np.concatenate((all_object_features_array, pickle.load(f)))

In [88]:
image_features = np.concatenate((patch_features, all_object_features_array), axis=1)

# Caption Preprocessing

In [89]:
caption_df = pd.read_csv('./flickr30k_images/results.csv', sep='|')
caption_df['comment'] = caption_df['comment'].str.lower()
caption_df = caption_df.dropna()
caption_df = caption_df[caption_df['image_name'].isin(os.listdir(directory)[:7000])]
caption_df

Unnamed: 0,image_name,comment_number,comment
0,1000092795.jpg,0,two young guys with shaggy hair look at their ...
1,1000092795.jpg,1,"two young , white males are outside near many ..."
2,1000092795.jpg,2,two men in green shirts are standing in a yard .
3,1000092795.jpg,3,a man in a blue shirt standing in a garden .
4,1000092795.jpg,4,two friends enjoy time spent together .
...,...,...,...
34995,2570559405.jpg,0,three dogs are behind a rusty fence as one bar...
34996,2570559405.jpg,1,two dogs are looking through a rusty wire fence .
34997,2570559405.jpg,2,two dogs are in a fence and one is barking .
34998,2570559405.jpg,3,the dogs are behind the fence .


In [90]:
vocabulary = []
for txt in caption_df['comment']:
   vocabulary.extend(txt.split())
print('Vocabulary Size: %d' % len(set(vocabulary)))

Vocabulary Size: 10261


In [91]:
def remove_punctuation(text_original):
   text_no_punctuation = text_original.translate(string.punctuation)
   return(text_no_punctuation)

def remove_single_character(text):
   text_len_more_than1 = ""
   for word in text.split():
       if len(word) > 1:
           text_len_more_than1 += " " + word
   return(text_len_more_than1)

def remove_numeric(text):
   text_no_numeric = ""
   for word in text.split():
       isalpha = word.isalpha()
       if isalpha:
           text_no_numeric += " " + word
   return(text_no_numeric)

def text_clean(text_original):
   text = remove_punctuation(text_original)
   text = remove_single_character(text)
   text = remove_numeric(text)
   return(text)

for i, caption in tqdm(enumerate(caption_df['comment'].values)):
   newcaption = text_clean(caption)
   caption_df["comment"].iloc[i] = newcaption

34999it [00:06, 5440.62it/s]


In [92]:
clean_vocabulary = []
for txt in caption_df['comment'].values:
   clean_vocabulary.extend(txt.split())
print('Clean Vocabulary Size: %d' % len(set(clean_vocabulary)))

Clean Vocabulary Size: 9596


In [93]:
caption_df['comment'] = 'startseq ' + caption_df['comment']+ ' endseq'

In [94]:
train_descriptions = caption_df[['image_name', 'comment']].groupby('image_name')['comment'].apply(list).to_dict()

In [95]:
from keras.preprocessing.text import Tokenizer
from keras.utils import to_categorical
from keras.preprocessing.sequence import pad_sequences
from keras.layers import Input,Dense,LSTM,Embedding,Dropout,Flatten

def to_list(descriptions):
  all_desc_list = []
  for k,v in descriptions.items():
    for desc in v:
      all_desc_list.append(desc)
  return all_desc_list

def tokenization(descriptions):
  # list of all the descriptions
  all_desc_list = to_list(descriptions)  
  tokenizer = Tokenizer()
  tokenizer.fit_on_texts(all_desc_list)
  return tokenizer

# create tokenizer
tokenizer = tokenization(train_descriptions)

# word index is the dictionary /mappings of word-->integer
vocab_size = len(tokenizer.word_index)+1
print('Vocab size: ',vocab_size)

def max_length(descriptions):
  all_desc_list = to_list(descriptions)
  return (max(len(x.split()) for x in all_desc_list))


def create_sequences(tokenizer,desc_list,max_len,photo):
  X1,X2,y = [],[],[]
  # X1 will contain photo
  # X2 will contain current sequence
  # y will contain one hot encoded next word

  for desc in desc_list:
    # tokenize descriptions
    seq = tokenizer.texts_to_sequences([desc])[0]
    for i in range(1,len(seq)):
      # out seq is basically the next word in the sentence
      in_seq,out_seq = seq[:i],seq[i]
      # pad input sequence
      in_seq = pad_sequences([in_seq],maxlen=max_len)[0]
      # one hot encode output sequence
      out_seq = to_categorical([out_seq],num_classes=vocab_size)[0]
      X1.append(photo)
      X2.append(in_seq)
      y.append(out_seq)
  return np.array(X1),np.array(X2),np.array(y)

# maximum length that a description can have OR the biggest description we are having
max_len = max_length(train_descriptions)
print(max_len)

Vocab size:  9599
74


In [96]:
train_features = {}
j = 0
for i in os.listdir(directory)[:7000]:
    train_features[i] = image_features[j]#.flatten()
    j+=1

# Model Building and Training

In [97]:
def data_generator(descriptions,photos,tokenizer,max_len):
  while 1:
    for k,desc_list in descriptions.items():
      photo = photos[k]
      in_img,in_seq,out_seq = create_sequences(tokenizer,desc_list,max_len,photo)
      yield[[in_img,in_seq],out_seq]

def define_model(vocab_size, max_length):
    # image features extractor model
    inputs1 = Input(shape=(199,512,))
    fe1 = Dropout(0.5)(inputs1)
    fe1 = Flatten()(fe1)
    fe2 = Dense(256, activation='relu')(fe1)
 
    # input sequence model
    inputs2 = Input(shape=(max_length,))
     # embedding(input_dimension,output_dimension,)
     # input dim is always the vocabulary size 
    # output dimension tells the size of vector space in which the words will be embedded
    # mask zero is used when the input itself is 0 then to not confuse it with padded zeros it is used as True
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)

    # decoder model OR output word model
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)

    # tie it together [image, seq] [word]
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')

    # summarize model
    print(model.summary())
    return model

In [98]:
model = define_model(vocab_size,max_len)
# epochs = 2
steps = len(train_descriptions)/1000
# for i in range(epochs):
generator = data_generator(train_descriptions,train_features,tokenizer,max_len)
model.fit_generator(generator,epochs=100,steps_per_epoch=steps,verbose=1)

Model: "model_13"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_26 (InputLayer)       [(None, 199, 512)]           0         []                            
                                                                                                  
 input_27 (InputLayer)       [(None, 74)]                 0         []                            
                                                                                                  
 dropout_24 (Dropout)        (None, 199, 512)             0         ['input_26[0][0]']            
                                                                                                  
 embedding_12 (Embedding)    (None, 74, 256)              2457344   ['input_27[0][0]']            
                                                                                           

<keras.src.callbacks.History at 0x116770e2d30>

# Model Evaluation

In [102]:
from nltk.translate.bleu_score import sentence_bleu,corpus_bleu
def int2word(tokenizer,integer):
  for word,index in tokenizer.word_index.items():
    if index==integer:
      return word
  return None

def predict_desc(model,tokenizer,photo,max_len):
  in_seq = 'startseq'
  for i in range(max_len):
    seq = tokenizer.texts_to_sequences([in_seq])[0]
    seq = pad_sequences([seq],maxlen=max_len)
    y_hat = model.predict([photo, seq],verbose=0)
    y_hat = np.argmax(y_hat)
    word = int2word(tokenizer,y_hat)
    if word==None:
      break
    in_seq = in_seq+' '+word
    if word=='endseq':
      break
  return in_seq

def evaluate_model(model,descriptions,photos,tokenizer,max_len):
  actual,predicted = [],[]
  for key,desc in tqdm(descriptions.items()):
    y_hat = predict_desc(model,tokenizer,photos[key].reshape(-1,199,512),max_len)
    references = [d.split() for d in desc]
    actual.append(references)
    predicted.append(y_hat.split())
  print('BLEU-1: %f' %corpus_bleu(actual,predicted,weights=(1.0,0,0,0)))
  print('BLEU-2: %f' %corpus_bleu(actual,predicted,weights=(0.5,0.5,0,0)))
  print('BLEU-3: %f' %corpus_bleu(actual,predicted,weights=(0.33,0.33,0.33,0)))
  print('BLEU-4: %f' %corpus_bleu(actual,predicted,weights=(0.25,0.25,0.25,0.25)))

In [8]:
evaluate_model(model,train_descriptions,train_features,tokenizer,max_len)

BLEU-1: 81.100000
BLEU-2: 65.440000
BLEU-3: 49.220000
BLEU-4: 39.850000
