<a href="https://colab.research.google.com/github/KaziNazmusSakib/Img_to_Caption_Generate/blob/main/Generate_Img_Caption.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Configuration

In [3]:
import os

class Config:
    """Static settings shared by the encoding, preprocessing and training steps.

    Every path is a plain string built under ``root_dataset_dir``; the last
    three attributes are training hyperparameters.
    """

    # Folder that holds both the raw dataset and every generated artifact.
    root_dataset_dir = 'dataset'

    # Raw inputs: image folder, train/test image-name lists, caption tokens.
    image_dir = '%s/Flicker8k_Dataset' % root_dataset_dir
    train_file_path = '%s/Flickr8k_text/Flickr_8k.trainImages.txt' % root_dataset_dir
    test_file_path = '%s/Flickr8k_text/Flickr_8k.testImages.txt' % root_dataset_dir
    token_path = '%s/Flickr8k_text/Flickr8k.bn_token.txt' % root_dataset_dir

    # Pickled image feature vectors written by the encoding step.
    train_features_file_path = '%s/train_features.pickle' % root_dataset_dir
    test_features_file_path = '%s/test_features.pickle' % root_dataset_dir

    # Cleaned captions and the word-vector text file used for the embedding.
    descriptions_file_path = '%s/bn_descriptions.txt' % root_dataset_dir
    word2vec_file_path = '%s/word2vec_bangla.txt' % root_dataset_dir

    # Where the training loop stores one saved model per epoch.
    checkpoint_dir = '%s/models' % root_dataset_dir

    # Training hyperparameters.
    embedding_dim = 300
    batch_size = 10
    epochs = 100


Encode image


In [None]:
import os
import glob
from time import time
import numpy as np
from PIL import Image
from pickle import dump, load
from keras.applications.inception_v3 import InceptionV3, preprocess_input
from keras.models import Model
from keras.preprocessing import image

def load_dataset(filename, image_dir):
    """Return the names of the ``*.jpg`` files in ``image_dir`` listed in ``filename``.

    Args:
        filename: Path of a text file holding one image file name per line.
        image_dir: Directory that is searched (non-recursively) for ``*.jpg``.

    Returns:
        A list of bare file names (no directory part), one for every ``*.jpg``
        found in ``image_dir`` whose name also appears in ``filename``. The
        order is whatever ``glob.glob`` returns.
    """
    # The context manager closes the list file even if reading raises.
    with open(filename, 'r') as names_file:
        image_names = set(names_file.read().strip().split('\n'))

    images = []
    for image_path in glob.glob(image_dir + '/*.jpg'):
        # os.path.basename honours the platform's path separator, whereas a
        # manual split on '/' can leave a directory prefix attached.
        image_name = os.path.basename(image_path)
        if image_name in image_names:
            images.append(image_name)
    return images

def preprocess(image_path):
    """Load one image file and return it as a preprocessed batch of one.

    The image is resized to 299x299 while loading (the input size the
    InceptionV3 model is fed in this file), converted to an array, given a
    leading axis with ``np.expand_dims(..., axis=0)`` and finally passed
    through the Inception ``preprocess_input`` function.
    """
    pil_image = image.load_img(image_path, target_size=(299, 299))
    single_batch = np.expand_dims(image.img_to_array(pil_image), axis=0)
    return preprocess_input(single_batch)

# Encode one image file into a flat feature vector with the given model.
def encode(model, image):
    """Run ``model`` on the preprocessed image and flatten the prediction.

    Args:
        model: Object exposing ``predict``; called with the batch returned by
            ``preprocess``.
        image: Path of the image file (handed straight to ``preprocess``).

    Returns:
        The prediction reshaped to one dimension of length ``shape[1]``;
        for the truncated InceptionV3 built in this file that is expected to
        be (1, 2048) -> (2048,).
    """
    prediction = model.predict(preprocess(image))
    return np.reshape(prediction, prediction.shape[1])

def _encode_and_save(model, image_names, image_dir, pickle_path):
    """Encode every named image with ``model`` and pickle the name -> vector dict."""
    encodings = {}
    for image_name in image_names:
        encodings[image_name] = encode(model, os.path.join(image_dir, image_name))
    print("Saving encoded...")
    with open(pickle_path, "wb") as encoded_pickle:
        dump(encodings, encoded_pickle)


def encode_images_into_pickle(config):
    """Encode the train and test images and pickle both feature dictionaries.

    Args:
        config: Object providing ``train_file_path``, ``test_file_path``,
            ``image_dir``, ``train_features_file_path`` and
            ``test_features_file_path``.

    Side effects:
        Loads InceptionV3 with ImageNet weights, writes two pickle files and
        prints progress plus the elapsed time of each phase. This is slow on
        CPU, so it is meant to be run once.
    """
    image_dir = config.image_dir

    train_images = load_dataset(config.train_file_path, image_dir)
    test_images = load_dataset(config.test_file_path, image_dir)

    print("Total train images : ", len(train_images))
    print("Total test images : ", len(test_images))

    # The train timer deliberately includes the model loading time.
    start = time()
    print("Loading InceptionV3 Model...")
    model = InceptionV3(weights='imagenet')
    # Drop the final (output) layer so the model emits the feature vector.
    model = Model(model.input, model.layers[-2].output)
    print("Train Imge encoding start..")
    _encode_and_save(model, train_images, image_dir, config.train_features_file_path)
    print("Finish encoding. Total time taken (s) = ", time()-start)

    start = time()
    print("Test Imge encoding start..")
    _encode_and_save(model, test_images, image_dir, config.test_features_file_path)
    print("Finish encoding. Total time taken (s) = ", time()-start)

# Script entry point for the image-encoding step.
# NOTE(review): relies on an importable `config` module that provides Config;
# confirm this holds when the code is run as a notebook cell.
if __name__ == '__main__':
	from config import Config
	config = Config()
	# encode train and test images and save to disk
	encode_images_into_pickle(config)

Data preprocessing


In [None]:
import os
import string

# When true, process_dataset() prints a raw-text excerpt and sample captions.
DEBUG = True
# load doc into memory
def load_doc(filename):
    """Return the whole content of the text file ``filename`` as one string.

    The file is decoded as UTF-8 explicitly instead of with the locale's
    default encoding, because the caption data contains Bangla text (the
    cleaning step strips the Bangla full stop). The context manager closes
    the file even when reading raises.
    """
    with open(filename, 'r', encoding='utf-8') as file:
        return file.read()

def load_descriptions(doc):
    """Parse raw caption text into a mapping of image id -> list of captions.

    Each non-trivial line is split on whitespace: the first token is the image
    reference, of which only the part before the first '.' is kept as the id,
    and the remaining tokens are re-joined with single spaces as the caption.

    Args:
        doc: The caption file content, lines separated by '\\n'.

    Returns:
        dict mapping image id to the list of its captions in input order.
    """
    mapping = dict()
    for line in doc.split('\n'):
        tokens = line.split()
        # Lines shorter than two characters are skipped as before; lines that
        # contain only whitespace are skipped too, since they yield no tokens
        # and would otherwise fail on tokens[0].
        if len(line) < 2 or not tokens:
            continue
        image_id = tokens[0].split('.')[0]
        mapping.setdefault(image_id, []).append(' '.join(tokens[1:]))
    return mapping

def clean_descriptions(descriptions):
    """Clean every caption in ``descriptions`` in place; returns nothing.

    For each caption: the Bangla full stop is removed, the text is split on
    whitespace, ASCII punctuation and ASCII letters are stripped from every
    token, and tokens that end up one character or shorter, or that consist
    only of digits, are dropped. The surviving tokens are re-joined with
    single spaces and written back to the same list slot.
    """
    # One translation table that deletes ASCII punctuation and ASCII letters.
    strip_table = str.maketrans('', '', string.punctuation + string.ascii_letters)
    for captions in descriptions.values():
        for index, caption in enumerate(captions):
            stripped = (token.translate(strip_table)
                        for token in caption.replace('।', '').split())
            kept = [token for token in stripped
                    if len(token) > 1 and not token.isdigit()]
            captions[index] = ' '.join(kept)

# Collect the distinct words used across all descriptions.
def to_vocabulary(descriptions):
    """Return the set of whitespace-separated words found in any description."""
    vocabulary = set()
    for desc_list in descriptions.values():
        for desc in desc_list:
            vocabulary.update(desc.split())
    return vocabulary

# save descriptions to file, one per line
def save_descriptions(descriptions, filename):
    """Write every caption to ``filename`` as ``"<key> <caption>"`` lines.

    Lines are joined with '\\n' and no trailing newline is written. The file
    is encoded as UTF-8 explicitly so Bangla captions can be written whatever
    the locale's default encoding is, and the context manager closes it even
    if the write fails.

    Args:
        descriptions: dict mapping a key to a list of caption strings.
        filename: Destination path; an existing file is overwritten.
    """
    lines = [key + ' ' + desc
             for key, desc_list in descriptions.items()
             for desc in desc_list]
    with open(filename, 'w', encoding='utf-8') as file:
        file.write('\n'.join(lines))

def process_dataset(config):
    """Load the raw captions, clean them and save the result to disk.

    Args:
        config: Object providing ``token_path`` (raw caption file) and
            ``descriptions_file_path`` (destination of the cleaned captions).

    When ``DEBUG`` is true, a raw-text excerpt and the captions of one sample
    image are printed before and after cleaning.
    """
    caption_file_path = config.token_path
    clean_caption_save_path = config.descriptions_file_path
    doc = load_doc(caption_file_path)
    if DEBUG:
        print('Raw descriptions : ', doc[:300])
    # parse descriptions
    descriptions = load_descriptions(doc)
    print('Total Loaded Captions: %d ' % len(descriptions))

    # Key whose captions are shown in debug output. The preferred sample id is
    # used when the dataset contains it; otherwise fall back to the first
    # loaded key so debug printing cannot raise KeyError on other datasets.
    key = None
    if DEBUG:
        print('Key of some captions : ', list(descriptions.keys())[:5])
        if '1002674143_1b742ab4b8' in descriptions:
            key = '1002674143_1b742ab4b8'
        elif descriptions:
            key = next(iter(descriptions))
        if key is not None:
            print('Captions for this key {}:\n{}'.format(key, descriptions[key]))
    # clean descriptions (in place)
    clean_descriptions(descriptions)
    if key is not None:
        print('Captions after cleaning : ', descriptions[key])
    # save captions after cleaning
    save_descriptions(descriptions, clean_caption_save_path)
    print("Clean captions saved to disk : Done")


# Script entry point for the caption-cleaning step.
# NOTE(review): relies on an importable `config` module that provides Config;
# confirm this holds when the code is run as a notebook cell.
if __name__ == '__main__':
	from config import Config
	config = Config()
	process_dataset(config)

train.py

In [None]:
import os

import numpy as np
from numpy import array
from pickle import dump, load
from time import time
from keras.layers import LSTM, Embedding, Dense, Dropout
from keras.layers.merge import add
from keras.models import Model
from keras import Input
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical

from config import Config
from utils import *
# Load pre-trained word vectors from a whitespace-separated text file.
def load_word_vector(file_path):
    """Read word vectors from ``file_path`` into a dict of word -> float32 array.

    Every non-blank line is split on whitespace; the first field is the word
    and the remaining fields are parsed as its float32 coefficients. Blank
    lines are skipped instead of failing on the missing first field. If a
    word occurs more than once, the last occurrence wins.

    Args:
        file_path: Path of the UTF-8 encoded vector file.

    Returns:
        dict mapping each word to a 1-D ``float32`` numpy array.
    """
    embeddings_index = {}
    # The context manager closes the file even if a line fails to parse.
    with open(file_path, encoding="utf-8") as f:
        for line in f:
            values = line.split()
            if not values:
                continue
            embeddings_index[values[0]] = np.asarray(values[1:], dtype='float32')
    print('Found %s word vectors.' % len(embeddings_index))
    return embeddings_index

def get_embedding_matrix(word2vec_file_path, vocab_size, wordtoix, embedding_dim=300):
    """Build a ``(vocab_size, embedding_dim)`` matrix of pre-trained vectors.

    Args:
        word2vec_file_path: Vector file passed to ``load_word_vector``.
        vocab_size: Number of rows in the matrix.
        wordtoix: dict mapping each vocabulary word to its row index.
        embedding_dim: Number of columns in the matrix.

    Returns:
        A numpy array in which row ``wordtoix[word]`` holds that word's
        vector; rows of words missing from the vector file stay all zeros.
    """
    word_vectors = load_word_vector(word2vec_file_path)
    matrix = np.zeros((vocab_size, embedding_dim))

    for token, row in wordtoix.items():
        vector = word_vectors.get(token)
        if vector is None:
            # No pre-trained vector: leave the zero row untouched.
            continue
        matrix[row] = vector
    return matrix

def load_features(pickle_file_path):
    """Load the pickled image-feature dictionary from ``pickle_file_path``.

    Uses ``load``, which this file imports from ``pickle`` directly, so the
    function does not depend on the name ``pickle`` arriving through a
    wildcard import. The file is closed as soon as it has been read.

    Returns:
        The unpickled object; its length is printed as the photo count.
    """
    # NOTE: unpickling can execute arbitrary code. Only load feature files
    # produced by this project's own encoding step.
    with open(pickle_file_path, "rb") as pickle_file:
        features = load(pickle_file)
    print('Photos: in pickle=%d' % len(features))
    return features

def data_generator(descriptions, photos, wordtoix, max_length, num_photos_per_batch, vocab_size=None):
    """Yield training batches endlessly, one batch per ``num_photos_per_batch`` images.

    Every caption is turned into a sequence of word indices (words missing
    from ``wordtoix`` are dropped) and expanded into one sample per prefix:
    the padded prefix is the text input and the next word, one-hot encoded,
    is the target.

    Args:
        descriptions: dict mapping image id -> list of caption strings.
        photos: dict mapping ``"<image id>.jpg"`` -> feature vector.
        wordtoix: dict mapping word -> integer index.
        max_length: Length that every input sequence is padded to.
        num_photos_per_batch: Number of images whose samples form one batch.
        vocab_size: Number of classes for the one-hot targets. Defaults to
            ``len(wordtoix) + 1``, which leaves room for index 0 (unused by
            the 1-based word indices built elsewhere in this file). It was
            previously read from an undefined name.

    Yields:
        ``[[image_features, input_sequences], targets]`` as numpy arrays.
    """
    if vocab_size is None:
        vocab_size = len(wordtoix) + 1

    X1, X2, y = list(), list(), list()
    n = 0
    # loop for ever over images
    while 1:
        for key, desc_list in descriptions.items():
            n += 1
            # retrieve the photo feature
            photo = photos[key+'.jpg']
            for desc in desc_list:
                # encode the sequence
                seq = [wordtoix[word] for word in desc.split(' ') if word in wordtoix]
                # split one sequence into multiple X, y pairs
                for i in range(1, len(seq)):
                    # split into input and output pair
                    in_seq, out_seq = seq[:i], seq[i]
                    # pad input sequence
                    in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
                    # encode output sequence
                    out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
                    # store
                    X1.append(photo)
                    X2.append(in_seq)
                    y.append(out_seq)
            # yield the batch data once enough images have been collected
            if n == num_photos_per_batch:
                yield [[array(X1), array(X2)], array(y)]
                X1, X2, y = list(), list(), list()
                n = 0

def get_model(vocab_size, max_length, embedding_dim):
    """Build the two-input captioning model (not compiled).

    Args:
        vocab_size: Input dimension of the embedding and size of the softmax
            output layer.
        max_length: Length of the word-index input sequence.
        embedding_dim: Output dimension of the embedding layer.

    Returns:
        A Keras ``Model`` taking ``[image_features, word_sequence]`` and
        producing a softmax over ``vocab_size`` classes.

    NOTE(review): train() reaches the embedding layer through
    ``model.layers[2]``, so the order in which layers are created here is
    presumably load-bearing -- re-check that index before reordering.
    """
    # Image branch: 2048-wide feature vector -> dropout -> 256-unit dense.
    inputs1 = Input(shape=(2048,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)
    # Text branch: word indices -> embedding (index 0 masked) -> dropout -> LSTM.
    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, embedding_dim, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)
    # Decoder: element-wise sum of both branches -> dense -> softmax over words.
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    return model

def train():
    """Train the captioning model and save a checkpoint after every epoch.

    Steps: load the cleaned training captions, build the vocabulary and the
    index maps, load the pre-trained word vectors and the pickled image
    features, build the model with a frozen pre-trained embedding, then run
    ``config.epochs`` single-epoch fits, saving ``model_<i>.h5`` into
    ``config.checkpoint_dir`` after each one.
    """
    config = Config()

    train_descriptions = load_clean_descriptions(config.descriptions_file_path, config.train_file_path)
    print("Total discriptions : ", len(train_descriptions))
    vocab = get_vocab(train_descriptions)
    print("Total vocab : ", len(vocab))
    ixtoword, wordtoix = convert_ixtoword_and_wordtoix(vocab)
    # +1 because word indices start at 1, leaving index 0 free.
    vocab_size = len(ixtoword) + 1
    print("Vocab size : ", vocab_size)
    max_length = get_max_length(train_descriptions)
    print("Max caption length : ", max_length)
    print("Loading word vector...")

    embedding_matrix = get_embedding_matrix(config.word2vec_file_path, vocab_size, wordtoix, config.embedding_dim)
    print("Loading features...")

    train_features = load_features(config.train_features_file_path)
    print("Initializing model...")

    model = get_model(vocab_size, max_length, config.embedding_dim)
    # Find the embedding layer by type rather than by the positional index 2,
    # so the pre-trained weights cannot silently land on a different layer if
    # the layer ordering of the model ever changes.
    embedding_layer = next(layer for layer in model.layers if isinstance(layer, Embedding))
    embedding_layer.set_weights([embedding_matrix])
    embedding_layer.trainable = False
    model.compile(loss='categorical_crossentropy', optimizer='adam')

    steps = len(train_descriptions)//config.batch_size

    os.makedirs(config.checkpoint_dir, exist_ok=True)
    print("Model training...")
    for i in range(config.epochs):
        # A fresh generator per epoch restarts the pass over the captions.
        train_generator = data_generator(train_descriptions, train_features, wordtoix, max_length, config.batch_size)
        model.fit_generator(train_generator, epochs=1, steps_per_epoch=steps, verbose=1)
        model.save(os.path.join(config.checkpoint_dir, 'model_' + str(i) + '.h5'))
        if i == 50:
            # NOTE(review): plain attribute assignment may not affect an
            # already-built training function on some Keras versions --
            # confirm the learning rate really changes after this epoch.
            model.optimizer.lr = 0.0001

# Script entry point: run the full training loop.
if __name__ == '__main__':
    train()

utils.py


In [None]:
import pickle
# load doc into memory
def load_doc(filename):
    """Return the whole content of the text file ``filename`` as one string.

    The file is decoded as UTF-8 explicitly rather than with the locale's
    default encoding, matching how this project opens its word-vector file.
    The context manager closes the file even when reading raises.
    """
    with open(filename, 'r', encoding='utf-8') as file:
        return file.read()

# Read a file of image names and return the set of image identifiers.
def load_set(filename):
    """Return the set of identifiers listed in ``filename``.

    The file is read with ``load_doc`` and split on '\\n'; empty lines are
    ignored and each remaining line contributes the text before its first '.'.
    """
    lines = load_doc(filename).split('\n')
    return {line.split('.')[0] for line in lines if len(line) >= 1}

# load clean descriptions into memory
def load_clean_descriptions(des_file_path, image_list_path):
    """Load the cleaned captions of the images named in ``image_list_path``.

    Args:
        des_file_path: Cleaned caption file with ``"<image id> <caption>"``
            lines.
        image_list_path: File listing the wanted images (read by ``load_set``).

    Returns:
        dict mapping image id -> list of captions, each wrapped as
        ``'startseq <caption> endseq'``. Ids that are not in the image list
        are left out.
    """
    doc = load_doc(des_file_path)
    image_name_list = load_set(image_list_path)
    descriptions = dict()
    for line in doc.split('\n'):
        # split line by white space
        tokens = line.split()
        # Blank lines (an empty file, a trailing newline) produce no tokens
        # and would otherwise fail on tokens[0].
        if not tokens:
            continue
        # split id from description
        image_id, image_desc = tokens[0], tokens[1:]
        # skip images not in the set
        if image_id not in image_name_list:
            continue
        # wrap description in start/end marker tokens and store it
        desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
        descriptions.setdefault(image_id, []).append(desc)
    return descriptions

def get_captions(descriptions):
    """Flatten the id -> captions mapping into one list of captions."""
    return [caption
            for caption_list in descriptions.values()
            for caption in caption_list]

def get_vocab(descriptions, word_count_threshold=10):
    """Return the words occurring at least ``word_count_threshold`` times.

    Captions are split on single spaces and occurrences are counted over all
    captions; the result keeps the order in which words were first seen. The
    number of distinct words before and after filtering is printed.
    """
    frequencies = {}
    for caption in get_captions(descriptions):
        for token in caption.split(' '):
            frequencies[token] = frequencies.get(token, 0) + 1
    vocab = [token for token, count in frequencies.items()
             if count >= word_count_threshold]
    print('preprocessed words %d -> %d' % (len(frequencies), len(vocab)))
    return vocab

def convert_ixtoword_and_wordtoix(vocab):
    """Number the vocabulary from 1 and return both lookup directions.

    Returns:
        ``(ixtoword, wordtoix)``: index -> word and word -> index. Indices
        start at 1 and follow the iteration order of ``vocab``.
    """
    ixtoword = dict(enumerate(vocab, start=1))
    wordtoix = {word: index for index, word in ixtoword.items()}
    return ixtoword, wordtoix
# Flatten a dictionary of clean descriptions into a single list.
def to_lines(descriptions):
    """Return every description of every key, in dictionary order."""
    return [desc for key in descriptions for desc in descriptions[key]]

# Find the word count of the longest description.
def get_max_length(descriptions):
    """Return the largest whitespace-separated word count of any description.

    Raises:
        ValueError: if there are no descriptions at all.
    """
    word_counts = [len(line.split()) for line in to_lines(descriptions)]
    return max(word_counts)

# Manual smoke check: load the training captions and print vocabulary stats.
# NOTE(review): relies on an importable `config` module that provides Config.
if __name__ == '__main__':
    from config import Config
    config = Config()
    train_descriptions = load_clean_descriptions(config.descriptions_file_path, config.train_file_path)
    print("Total discriptions : ", len(train_descriptions))
    vocab = get_vocab(train_descriptions)
    print("Total vocab : ", len(vocab))
    ixtoword, wordtoix = convert_ixtoword_and_wordtoix(vocab)
    # NOTE(review): train() uses len(ixtoword) + 1 for the same quantity;
    # confirm which value is intended here.
    vocab_size = len(ixtoword)
    print("Vocab size : ", vocab_size)
    max_length = get_max_length(train_descriptions)
    print("Max caption length : ", max_length)