In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
from numpy import array
from pickle import load,dump
import pandas as pd
import numpy as np
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.utils import plot_model
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Embedding
from keras.layers import Dropout
from keras.layers import Bidirectional
from keras.layers import *
from keras.callbacks import ModelCheckpoint
import pydot
from numpy import argmax
from nltk.translate.bleu_score import corpus_bleu
import  pickle
import tensorflow as tf

In [4]:
import pickle
def load_doc(filename):
	# open the file as read only
	file = open(filename, 'r')
	# read all text
	text = file.read()
	# close the file
	file.close()
	return text

# load a pre-defined list of photo identifiers
def load_set(filename):
	doc = load_doc(filename)
	dataset = list()
	# process line by line
	for line in doc.split('\n'):
		# skip empty lines
		if len(line) < 1:
			continue
		# get the image identifier
		identifier = line.split(' ')[0]
		dataset.append(identifier)
	return set(dataset)

# load clean descriptions into memory
def load_clean_descriptions(filename, dataset):
    document = open(filename, 'r', encoding='utf-8').read()
    descriptions = dict()
    for line in document.strip().split('\n'):
        tokens = line.split()
        if len(tokens) < 2:  # Check if the line has at least two elements
            print(f"Skipping problematic line: {line}")  # Print problematic lines
            continue  # Skip this iteration if the condition is not met

        image_id, image_desc = tokens[0], tokens[1:]
        if image_id in dataset:
            if image_id not in descriptions:
                descriptions[image_id] = list()
            desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
            descriptions[image_id].append(desc)
    return descriptions


# load photo features
def load_photo_features(filename, dataset):
    # load all features
    all_features = pickle.load(open(filename, 'rb'))
    #print(all_features)
    # filter features
    features = {k: all_features[k] for k in dataset}
    return features

# covert a dictionary of clean descriptions to a list of descriptions
def to_lines(descriptions):
	all_desc = list()
	for key in descriptions.keys():
		[all_desc.append(d) for d in descriptions[key]]
	return all_desc

# fit a tokenizer given caption descriptions
def create_tokenizer(descriptions):
	lines = to_lines(descriptions)
	tokenizer = Tokenizer()
	tokenizer.fit_on_texts(lines)
	return tokenizer

# calculate the length of the description with the most words
def max_length(descriptions):
	lines = to_lines(descriptions)
	return max(len(d.split()) for d in lines)

# create sequences of images, input sequences and output words for an image
def create_sequences(tokenizer, max_length, desc_list, photo):
	X1, X2, y = list(), list(), list()
	# walk through each description for the image
	for desc in desc_list:
		# encode the sequence
		seq = tokenizer.texts_to_sequences([desc])[0]
		# split one sequence into multiple X,y pairs
		for i in range(1, len(seq)):
			# split into input and output pair
			in_seq, out_seq = seq[:i], seq[i]
			# pad input sequence
			in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
			# encode output sequence
			out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
			# store
			X1.append(photo)
			X2.append(in_seq)
			y.append(out_seq)
	return array(X1), array(X2), array(y)

#data generator, intended to be used in a call to model.fit_generator()
def data_generator(descriptions, photos, tokenizer, max_length, vocab_size):
    while True:
        for key, desc_list in descriptions.items():
            photo = photos.get(key)
            if photo is None:
                continue
            photo = photo[0]
            for desc in desc_list:
                seq = tokenizer.texts_to_sequences([desc])[0]
                for i in range(1, len(seq)):
                    in_seq, out_seq = seq[:i], seq[i]
                    in_seq = pad_sequences([in_seq], maxlen=max_length, padding='post')[0]
                    out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
                    # Ensure the output is a tuple of tuples
                    yield ((np.array(photo, dtype=np.float32), np.array(in_seq, dtype=np.int32)), np.array(out_seq, dtype=np.float32))

# load training dataset (6K)
filename = 'drive/My Drive/MLE_NEW/Captions/train_captions.txt'
train = load_set(filename)
#print(sorted(train))
print('Dataset: %d' % len(train))
# descriptions
train_descriptions = load_clean_descriptions('drive/My Drive/MLE_NEW/Captions/train_captions.txt', train)
print('Descriptions: train=%d' % len(train_descriptions))
# photo features
#print(train)
#all_features = load(open('features_inceptionv3_uc.pkl', 'rb'))
#print(all_features)
train_features = load_photo_features('drive/My Drive/MLE_NEW/Features/train_features.pkl', train)
print('Photos: train=%d' % len(train_features))
# prepare tokenizer
tokenizer = create_tokenizer(train_descriptions)
#dump(tokenizer,open('tokenizer_resnet152.pkl','wb'))
vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary Size: %d' % vocab_size)
## determine the maximum sequence length
max_length = max_length(train_descriptions)
print('Description Length: %d' % max_length)


Dataset: 1176
Descriptions: train=1176
Photos: train=1176
Vocabulary Size: 311
Description Length: 24


In [6]:
# load val set
filename = 'drive/My Drive/MLE_NEW/Captions/val_captions.txt'
val = load_set(filename)
print('Dataset: %d' % len(val))
# descriptions
val_descriptions = load_clean_descriptions('drive/My Drive/MLE_NEW/Captions/val_captions.txt', val)
print('Descriptions: val=%d' % len(val_descriptions))
# photo features
val_features = load_photo_features('drive/My Drive/MLE_NEW/Features/val_features.pkl', val)
print('Photos: test=%d' % len(val_features))

Dataset: 252
Descriptions: val=252
Photos: test=252


In [26]:
from keras import backend as K
from keras.layers import Layer
from keras import initializers, regularizers, constraints

#@register_keras_serializable()
class Attention(Layer):
    def __init__(self, step_dim=max_length,
                 W_regularizer=None, b_regularizer=None,
                 W_constraint=None, b_constraint=None,
                 bias=True, **kwargs):
        self.supports_masking = True
        self.init = initializers.get('glorot_uniform')

        self.W_regularizer = regularizers.get(W_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias
        self.step_dim = step_dim
        self.features_dim = 0
        super(Attention, self).__init__(**kwargs)

    def build(self, input_shape):
        assert len(input_shape) == 3

        self.W = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer)
        self.features_dim = input_shape[-1]

        if self.bias:
            self.b = self.add_weight(shape=(input_shape[1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer)

        else:
            self.b = None

        self.built = True

    def compute_mask(self, input, input_mask=None):
        return None

    def call(self, x, mask=None):
        features_dim = self.features_dim
        step_dim = self.step_dim

        eij = K.reshape(K.dot(K.reshape(x, (-1, features_dim)),
                        K.reshape(self.W, (features_dim, 1))), (-1, step_dim))

        if self.bias:
            eij += self.b

        eij = K.tanh(eij)

        a = K.exp(eij)

        if mask is not None:
            a *= K.cast(mask, K.floatx())

        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        a = K.expand_dims(a)
        weighted_input = x * a
        return K.sum(weighted_input, axis=1)

    def compute_output_shape(self, input_shape):
        return input_shape[0],  self.features_dim

    def get_config(self):
        config = super(Attention, self).get_config()
        config.update({
            'step_dim': self.step_dim,
            'W_regularizer': initializers.serialize(self.W_regularizer),
            'b_regularizer': initializers.serialize(self.b_regularizer),
            'W_constraint': constraints.serialize(self.W_constraint),
            'b_constraint': constraints.serialize(self.b_constraint),
            'bias': self.bias
        })
        return config

In [27]:
from keras.optimizers import Adam
from keras.layers import Input, Dense, LSTM, Embedding, Concatenate, RepeatVector, TimeDistributed, Bidirectional, Dropout
from keras.models import Model

#from keras.utils.np_utils import to_categorical

def define_model(vocab_size, max_length):
    inputs1 = Input(shape=(2048,))
    fe2 = Dense(256, activation='relu')(inputs1)
    fe3 = RepeatVector(max_length)(fe2)

    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = LSTM(256,return_sequences=True)(se1)
    se3 = TimeDistributed(Dense(256,activation='relu'))(se2)

    decoder1 = concatenate([fe3, se3])
    decoder2 = Bidirectional(LSTM(256,return_sequences=True))(decoder1)
    decoder3 = Attention(max_length)(decoder2)
    outputs = Dense(vocab_size, activation='softmax')(decoder3)

    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    optimizer = Adam(learning_rate=0.001)
    model.compile(loss='categorical_crossentropy', optimizer=optimizer)
    model.summary()
    return model

model = define_model(vocab_size, max_length)

Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_8 (InputLayer)        [(None, 24)]                 0         []                            
                                                                                                  
 input_7 (InputLayer)        [(None, 2048)]               0         []                            
                                                                                                  
 embedding_3 (Embedding)     (None, 24, 256)              79616     ['input_8[0][0]']             
                                                                                                  
 dense_9 (Dense)             (None, 256)                  524544    ['input_7[0][0]']             
                                                                                            

In [28]:
def make_dataset(generator_func, descriptions, photos, tokenizer, max_length, vocab_size):
    output_signature = (
        (
            tf.TensorSpec(shape=(2048,), dtype=tf.float32),
            tf.TensorSpec(shape=(max_length,), dtype=tf.int32)
        ),
        tf.TensorSpec(shape=(vocab_size,), dtype=tf.float32)
    )
    return tf.data.Dataset.from_generator(
        lambda: generator_func(descriptions, photos, tokenizer, max_length, vocab_size),
        output_signature=output_signature
    )

# Prepare the dataset
train_dataset = make_dataset(data_generator, train_descriptions, train_features, tokenizer, max_length, vocab_size)
train_dataset = train_dataset.batch(16)

val_dataset = make_dataset(data_generator, val_descriptions, val_features, tokenizer, max_length, vocab_size)
val_dataset = val_dataset.batch(16)

# Fit the model
model.fit(train_dataset, epochs=125, steps_per_epoch=74)

Epoch 1/125
Epoch 2/125
Epoch 3/125
Epoch 4/125
Epoch 5/125
Epoch 6/125
Epoch 7/125
Epoch 8/125
Epoch 9/125
Epoch 10/125
Epoch 11/125
Epoch 12/125
Epoch 13/125
Epoch 14/125
Epoch 15/125
Epoch 16/125
Epoch 17/125
Epoch 18/125
Epoch 19/125
Epoch 20/125
Epoch 21/125
Epoch 22/125
Epoch 23/125
Epoch 24/125
Epoch 25/125
Epoch 26/125
Epoch 27/125
Epoch 28/125
Epoch 29/125
Epoch 30/125
Epoch 31/125
Epoch 32/125
Epoch 33/125
Epoch 34/125
Epoch 35/125
Epoch 36/125
Epoch 37/125
Epoch 38/125
Epoch 39/125
Epoch 40/125
Epoch 41/125
Epoch 42/125
Epoch 43/125
Epoch 44/125
Epoch 45/125
Epoch 46/125
Epoch 47/125
Epoch 48/125
Epoch 49/125
Epoch 50/125
Epoch 51/125
Epoch 52/125
Epoch 53/125
Epoch 54/125
Epoch 55/125
Epoch 56/125
Epoch 57/125
Epoch 58/125
Epoch 59/125
Epoch 60/125
Epoch 61/125
Epoch 62/125
Epoch 63/125
Epoch 64/125
Epoch 65/125
Epoch 66/125
Epoch 67/125
Epoch 68/125
Epoch 69/125
Epoch 70/125
Epoch 71/125
Epoch 72/125
Epoch 73/125
Epoch 74/125
Epoch 75/125
Epoch 76/125
Epoch 77/125
Epoch 78

<keras.src.callbacks.History at 0x78110b69ce20>

In [29]:
model.save('drive/My Drive/MLE_NEW/Main_model.keras')

# Evaluation BLEU Score

In [30]:
#val_dataset = data_generator(val_descriptions, val_features, tokenizer, max_length, vocab_size)
val_steps = len(val_descriptions)

In [31]:
def generate_caption(model, image, tokenizer, max_length):
    in_text = 'startseq'
    for i in range(max_length):
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        sequence = pad_sequences([sequence], maxlen=max_length)
        yhat = model.predict([np.array([image]), sequence], verbose=0)
        yhat = np.argmax(yhat)
        word = tokenizer.index_word[yhat] if yhat in tokenizer.index_word else None
        if word is None or word == 'endseq':
            break
        in_text += ' ' + word
    return in_text

In [32]:
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction

def evaluate_model(model, descriptions, photos, tokenizer, max_length):
    actual, predicted = [], []
    # Function to generate captions for the photos
    for key, desc_list in descriptions.items():
        yhat = generate_caption(model, photos[key][0], tokenizer, max_length)
        actual.append([d.split() for d in desc_list])
        predicted.append(yhat.split())

    # Create a SmoothingFunction object
    chencherry = SmoothingFunction()

    # Calculate BLEU scores with smoothing
    print('BLEU-1: %f' % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0), smoothing_function=chencherry.method1))
    print('BLEU-2: %f' % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0), smoothing_function=chencherry.method1))
    print('BLEU-3: %f' % corpus_bleu(actual, predicted, weights=(0.33, 0.33, 0.33, 0), smoothing_function=chencherry.method1))
    print('BLEU-4: %f' % corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25), smoothing_function=chencherry.method1))

# Call the evaluation function
evaluate_model(model, val_descriptions, val_features, tokenizer, max_length)

BLEU-1: 0.709641
BLEU-2: 0.627896
BLEU-3: 0.574487
BLEU-4: 0.524471


# CIDEr

In [33]:
generated_captions = {}
for img_id, features in val_features.items():
    caption = generate_caption(model, features[0], tokenizer, max_length)
    generated_captions[img_id] = [caption]

In [34]:
!pip install git+https://github.com/salaniz/pycocoevalcap

Collecting git+https://github.com/salaniz/pycocoevalcap
  Cloning https://github.com/salaniz/pycocoevalcap to /tmp/pip-req-build-p_drvink
  Running command git clone --filter=blob:none --quiet https://github.com/salaniz/pycocoevalcap /tmp/pip-req-build-p_drvink
  Resolved https://github.com/salaniz/pycocoevalcap to commit a24f74c408c918f1f4ec34e9514bc8a76ce41ffd
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pycocoevalcap
  Building wheel for pycocoevalcap (setup.py) ... [?25l[?25hdone
  Created wheel for pycocoevalcap: filename=pycocoevalcap-1.2-py3-none-any.whl size=104312246 sha256=11f89a148df0eead7365edee3bc4360cdb3594c21e957cde041d53433de06947
  Stored in directory: /tmp/pip-ephem-wheel-cache-bqw36rx0/wheels/43/54/73/3e2c6d4ace7657958cde52ac6fd47b342cd4aae5a7aa4fcbf9
Successfully built pycocoevalcap
Installing collected packages: pycocoevalcap
Successfully installed pycocoevalcap-1.2


In [42]:
from pycocoevalcap.cider.cider import Cider

def calculate_cider(refs, hyps):
    cider_scorer = Cider()
    score, scores = cider_scorer.compute_score(refs, hyps)
    return score

refs = {img_id: desc for img_id, desc in val_descriptions.items()}
cands = generated_captions

cider_score = calculate_cider(refs, cands)
print("CIDEr Score: ", cider_score)

CIDEr Score:  2.385882867433643
