In [None]:
import numpy as np
from PIL import Image
import os
import string
from pickle import dump
from pickle import load
from keras.applications.xception import Xception #to get pre-trained model Xception
from keras.applications.xception import preprocess_input
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.preprocessing.text import Tokenizer #for text tokenization
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from tensorflow.keras.layers import Add

from keras.models import Model, load_model
from keras.layers import Input, Dense#Keras to build our CNN and LSTM
from keras.layers import LSTM, Embedding, Dropout
from tqdm import tqdm_notebook as tqdm #to check loop progress
# NOTE(review): this instantiates a tqdm progress-bar object only to call .pandas()
# on it; presumably it registers tqdm's pandas integration - confirm it is needed,
# since no pandas usage is visible in this notebook.
tqdm().pandas()

In [1]:
import string

# Load the document file into memory
def load_fp(filename):
    """Read a whole text file and return its contents as one string.

    Args:
        filename: Path of the text file to read.

    Returns:
        The complete file contents as a ``str``.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # The context manager closes the handle even when read() raises.
    with open(filename, 'r') as file:
        return file.read()

# Get all images with their captions
def img_capt(filename):
    """Map every image id in a caption file to the list of its captions.

    Each non-blank line is split at its first tab into an image field and a
    caption. The image field is cut at its last '#', so a trailing
    ``#<index>`` suffix of any length is removed (the previous fixed
    two-character slice only handled single-digit suffixes).

    Args:
        filename: Path of the tab-separated caption file.

    Returns:
        dict mapping image id -> list of caption strings, in file order.

    Raises:
        ValueError: If a non-blank line contains no tab.
        OSError: If the file cannot be read.
    """
    descriptions = {}
    for line in load_fp(filename).split('\n'):
        # Skip blank lines instead of assuming exactly one trailing newline;
        # this also keeps the last caption when the file has none.
        if not line.strip():
            continue
        # maxsplit=1 keeps captions that themselves contain a tab intact.
        img, caption = line.split('\t', 1)
        image_id = img.rsplit('#', 1)[0]
        descriptions.setdefault(image_id, []).append(caption)
    return descriptions

# Data cleaning: lowercase every caption, split hyphenated words, strip punctuation,
# and drop one-character tokens and tokens that are not purely alphabetic
def txt_clean(captions):
    """Normalise every caption in ``captions`` in place.

    For each caption: hyphens become spaces, tokens are lower-cased,
    punctuation is removed, and tokens shorter than two characters or
    containing non-alphabetic characters are dropped.

    Args:
        captions: dict mapping image id -> list of caption strings. The lists
            are modified in place.

    Returns:
        The same ``captions`` object that was passed in (not a copy).
    """
    table = str.maketrans('', '', string.punctuation)
    for caps in captions.values():
        for i, img_caption in enumerate(caps):
            # str.replace returns a new string; the result was previously
            # discarded, so "black-and-white" collapsed into "blackandwhite".
            tokens = img_caption.replace("-", " ").split()
            tokens = (wrd.lower().translate(table) for wrd in tokens)
            # len > 1 removes leftovers such as a hanging "s" or "a";
            # isalpha removes tokens containing digits.
            caps[i] = ' '.join(wrd for wrd in tokens if len(wrd) > 1 and wrd.isalpha())
    return captions

def txt_vocab(descriptions):
    """Return the set of distinct whitespace-separated words across all captions.

    Args:
        descriptions: dict mapping image id -> list of caption strings.

    Returns:
        A ``set`` of every word that occurs in any caption.
    """
    words = set()
    for caption_list in descriptions.values():
        for caption in caption_list:
            words |= set(caption.split())
    return words

# To save all descriptions in one file
def save_descriptions(descriptions, filename):
    """Write all captions to ``filename`` as ``<image id>\\t<caption>`` lines.

    Lines are joined with a newline and no trailing newline is written.

    Args:
        descriptions: dict mapping image id -> list of caption strings.
        filename: Destination path; an existing file is overwritten.

    Raises:
        OSError: If the file cannot be opened or written.
    """
    lines = [
        key + '\t' + desc
        for key, desc_list in descriptions.items()
        for desc in desc_list
    ]
    # The context manager guarantees the handle is flushed and closed even if
    # write() fails part-way.
    with open(filename, "w") as file:
        file.write("\n".join(lines))

# Set these paths according to the project folder in your system
dataset_text = "C:\\Users\\king\\Downloads\\Flickr8k_text"
dataset_images = "C:\\Users\\king\\Downloads\\Flickr8k_Dataset\\Flicker8k_Dataset"

# To prepare our text data
filename = os.path.join(dataset_text, "Flickr8k.token.txt")

# Loading the file that contains all data and mapping them into descriptions dictionary
descriptions = img_capt(filename)
print("Length of descriptions =", len(descriptions))

# Cleaning the descriptions.
# txt_clean works in place and returns its argument, so `descriptions` and
# `clean_descriptions` refer to the same (cleaned) dictionary from here on.
clean_descriptions = txt_clean(descriptions)

# To build vocabulary
vocabulary = txt_vocab(clean_descriptions)
print("Length of vocabulary =", len(vocabulary))

# Saving all descriptions in one file
save_descriptions(clean_descriptions, "descriptions.txt")

# (A stray bare name `a` used to end this cell; it raised NameError on every run
# and has been removed.)


In [None]:
def extract_features(directory):
    """Run every file in ``directory`` through Xception and collect the outputs.

    Each image is converted to RGB, resized to 299x299, scaled to [-1, 1] and
    passed through an Xception network built with ``include_top=False`` and
    average pooling.

    Args:
        directory: Folder whose entries are all readable image files.

    Returns:
        dict mapping file name -> the array returned by ``model.predict`` for
        that single image (the rest of this notebook treats it as shape
        (1, 2048)).

    Raises:
        OSError: If an entry cannot be opened as an image.
    """
    model = Xception(include_top=False, pooling='avg')
    features = {}
    for pic in tqdm(os.listdir(directory)):
        file = os.path.join(directory, pic)
        # Force three colour channels: grayscale or RGBA files would otherwise
        # produce arrays the network cannot accept. The context manager
        # releases the file handle after the pixel data has been copied.
        with Image.open(file) as handle:
            image = handle.convert('RGB').resize((299, 299))
        batch = np.expand_dims(image, axis=0)
        # Manual equivalent of the commented-out preprocess_input call:
        # map pixel values from [0, 255] to [-1, 1].
        batch = batch / 127.5
        batch = batch - 1.0
        features[pic] = model.predict(batch)
    return features

# 2048 feature vector
features = extract_features(dataset_images)
# Use context managers so the pickle file is flushed and closed deterministically.
with open("features.p", "wb") as feature_file:
    dump(features, feature_file)

# To directly load the features from the pickle file.
# NOTE: pickle can execute arbitrary code on load; only load files you produced.
with open("features.p", "rb") as feature_file:
    features = load(feature_file)


In [None]:
# Function to load text document
def load_doc(filename):
    """Read a whole text file and return its contents as one string.

    Args:
        filename: Path of the text file to read.

    Returns:
        The complete file contents as a ``str``.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # The context manager closes the handle even when read() raises.
    with open(filename, 'r') as file:
        return file.read()

# Load the data
def load_photos(filename):
    """Read a list of image ids, one per line, from ``filename``.

    Surrounding whitespace is stripped from every line and blank lines are
    skipped, so an empty file yields an empty list rather than ``['']``.

    Args:
        filename: Path of the text file listing one image id per line.

    Returns:
        list of image id strings in file order.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(filename, 'r') as file:
        stripped = (line.strip() for line in file)
        return [name for name in stripped if name]

def load_clean_descriptions(filename, photos):
    """Load the captions of the requested images from a descriptions file.

    Every line is split on whitespace: the first token is the image id and the
    remaining tokens, re-joined with single spaces, form the caption. Lines
    with fewer than two tokens are ignored.

    NOTE(review): captions are returned exactly as stored, without any
    start/end marker words, while the caption generators in this notebook seed
    the text with 'start' and stop on 'end' - confirm that is intended.

    Args:
        filename: Path of the descriptions file.
        photos: Iterable of image ids to keep.

    Returns:
        dict mapping image id -> list of caption strings, restricted to ids in
        ``photos`` that have at least one caption in the file.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # A set gives O(1) membership tests; `photos` is usually a list with
    # thousands of entries and is probed once per caption line.
    wanted = set(photos)
    descriptions = {}
    with open(filename, 'r') as file:
        for line in file:
            words = line.split()
            if len(words) < 2:
                continue
            image, image_caption = words[0], ' '.join(words[1:])
            if image in wanted:
                descriptions.setdefault(image, []).append(image_caption)
    return descriptions

def load_features(photos):
    """Return the stored feature entries for the given image ids.

    Reads ``features.p`` from the current working directory.

    Args:
        photos: Iterable of image ids to select.

    Returns:
        dict mapping each id in ``photos`` to its stored feature entry.

    Raises:
        KeyError: If an id in ``photos`` is not present in ``features.p``.
        OSError: If ``features.p`` cannot be opened.
    """
    # NOTE: pickle can execute arbitrary code on load; only load a features
    # file that this notebook produced itself.
    with open("features.p", "rb") as feature_file:
        all_features = load(feature_file)
    # Selecting only needed features
    return {k: all_features[k] for k in photos}

# Build the path from `dataset_text` (set in the data-preparation cell) instead of
# repeating the absolute folder here.
filename = os.path.join(dataset_text, "Flickr_8k.trainImages.txt")
train_imgs = load_photos(filename)
train_descriptions = load_clean_descriptions("descriptions.txt", train_imgs)
train_features = load_features(train_imgs)


In [None]:
# Flatten the description dictionary into one list of captions
def dict_to_list(descriptions):
    """Concatenate the caption lists of every image into a single flat list.

    Args:
        descriptions: dict mapping image id -> list of caption strings.

    Returns:
        list of all captions, in dictionary iteration order.
    """
    flat = []
    for caption_group in descriptions.values():
        flat.extend(caption_group)
    return flat

# Creating tokenizer class
# This will vectorize text corpus
# Each integer will represent a token in the dictionary
from keras.preprocessing.text import Tokenizer

def create_tokenizer(descriptions):
    """Fit a Keras ``Tokenizer`` on every caption in ``descriptions``.

    Args:
        descriptions: dict mapping image id -> list of caption strings.

    Returns:
        The fitted ``Tokenizer`` instance.
    """
    fitted = Tokenizer()
    fitted.fit_on_texts(dict_to_list(descriptions))
    return fitted

# Give each word an index, and store that into tokenizer.p pickle file
tokenizer = create_tokenizer(train_descriptions)
# Context manager so the pickle file is flushed and closed deterministically.
with open('tokenizer.p', 'wb') as tokenizer_file:
    dump(tokenizer, tokenizer_file)

# One extra slot on top of the known words; presumably index 0 is left for padding
# (the Embedding layer in define_model is built with mask_zero=True).
vocab_size = len(tokenizer.word_index) + 1
vocab_size  # The size of our vocabulary is 7577 words.

# Calculate maximum length of descriptions to decide the model structure parameters.
def calc_max_length(descriptions):
    """Return the word count of the longest caption in ``descriptions``.

    Args:
        descriptions: dict mapping image id -> list of caption strings.

    Returns:
        The largest number of whitespace-separated words in any caption, or 0
        when there are no captions at all.
    """
    return max((len(d.split()) for d in dict_to_list(descriptions)), default=0)

# The helper has its own name so that `max_length` can hold the resulting integer.
# Previously the function itself was called `max_length` and was overwritten by its
# result, so running this cell a second time failed with "'int' object is not callable".
max_length = calc_max_length(descriptions)
max_length  # Max length of description is 32


In [None]:
def data_generator(descriptions, features, tokenizer, max_length, max_iterations=None):
    """Yield one training batch per image, cycling over ``descriptions``.

    For every image the batch holds all (image feature, partial caption) ->
    next-word samples produced by ``create_sequences``.

    Args:
        descriptions: dict mapping image id -> list of caption strings.
        features: dict mapping image id -> feature array; element ``[0]`` of
            each entry is used as the image feature vector.
        tokenizer: Fitted tokenizer passed through to ``create_sequences``.
        max_length: Padded caption length passed through to ``create_sequences``.
        max_iterations: Number of complete passes over ``descriptions`` before
            the generator stops; ``None`` means cycle forever.

    Yields:
        ``([image_features, input_sequences], next_words)`` tuples.

    Raises:
        KeyError: If an image id in ``descriptions`` is missing from ``features``.
    """
    # Without data the endless variant would spin forever without yielding.
    if not descriptions:
        return
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        for key, description_list in descriptions.items():
            # Retrieve photo features
            feature = features[key][0]
            inp_image, inp_seq, op_word = create_sequences(tokenizer, max_length, description_list, feature)
            # Yield an (inputs, targets) tuple: a bare outer list is not
            # interpreted as an (x, y) pair by recent Keras versions.
            yield ([inp_image, inp_seq], op_word)
        # Count the finished pass; this counter was never advanced before, so
        # max_iterations had no effect.
        iterations += 1
        
def create_sequences(tokenizer, max_length, desc_list, feature, vocab_size=None):
    """Expand the captions of one image into next-word training samples.

    Every caption is encoded with ``tokenizer`` and split into all
    (prefix -> next word) pairs. Prefixes are padded to ``max_length`` and the
    next word is one-hot encoded.

    Args:
        tokenizer: Fitted tokenizer exposing ``texts_to_sequences`` and
            ``word_index``.
        max_length: Length every input sequence is padded to.
        desc_list: Caption strings belonging to one image.
        feature: Feature vector of that image; repeated once per sample.
        vocab_size: Number of classes for the one-hot targets. Defaults to
            ``len(tokenizer.word_index) + 1`` so the function no longer
            depends on a notebook-level global.

    Returns:
        Tuple ``(image_features, input_sequences, next_words)`` of NumPy
        arrays with one row per sample.
    """
    if vocab_size is None:
        vocab_size = len(tokenizer.word_index) + 1
    x_1, x_2, y = [], [], []
    # Move through each description for the image
    for desc in desc_list:
        # Encode the sequence
        seq = tokenizer.texts_to_sequences([desc])[0]
        # Divide one sequence into various X,y pairs
        for i in range(1, len(seq)):
            # Divide into input and output pair
            in_seq, out_seq = seq[:i], seq[i]
            # Pad input sequence
            in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
            # Encode output sequence
            out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
            # Store
            x_1.append(feature)
            x_2.append(in_seq)
            y.append(out_seq)
    return np.array(x_1), np.array(x_2), np.array(y)


In [None]:
from keras.models import Model
from keras.layers import Input, Dense, LSTM, Embedding, Dropout, add
from keras.metrics import categorical_accuracy


def define_model(vocab_size, max_length):
    """Build and compile the image-captioning model.

    The image branch maps a 2048-value feature vector through dropout to a
    256-unit dense layer. The text branch embeds a ``max_length`` token
    sequence, applies dropout, two stacked 512-unit LSTMs and a 256-unit dense
    layer. Both branches are added, passed through a 512-unit dense layer and a
    softmax over ``vocab_size`` words.

    Args:
        vocab_size: Size of the output softmax and of the embedding input.
        max_length: Length of the token sequences fed to the text branch.

    Returns:
        The compiled Keras ``Model`` taking ``[image_features, sequences]``.
    """
    # Features from the CNN model compressed from 2048 to 256 nodes
    inputs1 = Input(shape=(2048,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)

    # LSTM sequence model
    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(512, return_sequences=True)(se2)  # Feeds the full sequence to the next LSTM
    se4 = LSTM(512)(se3)  # Second LSTM layer
    se5 = Dense(256, activation='relu')(se4)

    # Merging both models
    decoder1 = add([fe2, se5])
    decoder2 = Dense(512, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)

    # Merge it [image, seq] [word]
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    # The metric must be given by name; the bare identifier `accuracy` was
    # undefined and raised NameError. The name also matches the
    # logs['accuracy'] key read by the logging callbacks.
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    # Summarize model (summary() prints by itself; wrapping it in print()
    # only appended a stray "None").
    model.summary()

    # plot_model was never imported in this notebook; import it here. Drawing
    # the diagram needs optional packages, so a failure must not block training.
    try:
        from keras.utils import plot_model
        plot_model(model, to_file='model.png', show_shapes=True)
    except (ImportError, OSError) as exc:
        print("Skipping model diagram:", exc)

    return model


In [None]:
# Load the validation data
# os.path.join avoids mixing "/" with the backslashes already in dataset_text.
filename_val = os.path.join(dataset_text, "Flickr_8k.devImages.txt")
val_imgs = load_photos(filename_val)
val_descriptions = load_clean_descriptions("descriptions.txt", val_imgs)
val_features = load_features(val_imgs)

# Validation data generator: one batch per image, so one full pass is
# len(val_descriptions) steps.
val_steps = len(val_descriptions)
val_generator = data_generator(val_descriptions, val_features, tokenizer, max_length)


In [None]:
import os
from keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, Callback

# Custom callback to print loss and accuracy after each epoch
class LossAccuracyLogger(Callback):
    """Keras callback that prints the training loss and accuracy per epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """Print one summary line for the epoch that just finished.

        Args:
            epoch: Zero-based epoch index supplied by Keras.
            logs: Metric dict supplied by Keras; may be ``None``.
        """
        # Keras may pass logs=None, and a metric can be absent; fall back to
        # NaN instead of raising inside the training loop.
        logs = logs or {}
        # Keras normally stores the fit() settings in self.params; reading the
        # epoch count there removes the dependency on a notebook-level global.
        params = getattr(self, 'params', None) or {}
        total_epochs = params.get('epochs', '?')
        loss = logs.get('loss', float('nan'))
        accuracy = logs.get('accuracy', float('nan'))
        print(f"Epoch {epoch + 1}/{total_epochs} - Loss: {loss:.4f} - Accuracy: {accuracy:.4f}")

# Print dataset information
print('Dataset:', len(train_imgs))
print('Descriptions (train):', len(train_descriptions))
print('Photos (train):', len(train_features))
print('Vocabulary Size:', vocab_size)
print('Description Length:', max_length)

# Create the model
model = define_model(vocab_size, max_length)

# Number of epochs
epochs = 10

# Steps per epoch: the generator yields one batch per image
steps = len(train_descriptions)

# ModelCheckpoint writes into "models/"; create it so the first save cannot fail.
os.makedirs("models", exist_ok=True)

# Define callbacks
checkpoint = ModelCheckpoint("models/model6_{epoch}.h5", monitor='loss', verbose=1, save_best_only=True, mode='min')
csv_logger = CSVLogger('training.log')
reduce_lr = ReduceLROnPlateau(monitor='loss', factor=0.2, patience=3, min_lr=0.0001)
loss_accuracy_logger = LossAccuracyLogger()  # Custom callback
# `max_iterations` was passed here as an undefined name (NameError). The generator is
# left endless; epochs * steps_per_epoch bounds how many batches Keras consumes.
train_generator = data_generator(train_descriptions, train_features, tokenizer, max_length)
# Train the model for the specified number of epochs
# NOTE(review): fit_generator is deprecated in newer Keras releases in favour of
# model.fit - confirm which Keras version this notebook targets before switching.
model.fit_generator(train_generator, epochs=epochs, steps_per_epoch=steps,
                    callbacks=[checkpoint, csv_logger, reduce_lr, loss_accuracy_logger], verbose=1)


In [None]:
from keras.callbacks import Callback, CSVLogger, ReduceLROnPlateau
from keras.preprocessing.sequence import pad_sequences
from nltk.translate.bleu_score import corpus_bleu
from tqdm import tqdm

def word_for_id(integer, tokenizer):
    """Return the word whose index in ``tokenizer.word_index`` equals ``integer``.

    Args:
        integer: Word index to look up.
        tokenizer: Object exposing a ``word_index`` mapping of word -> index.

    Returns:
        The matching word, or ``None`` when no word has that index.
    """
    # The former special case looked up tokenizer.word_index['end'] for every
    # match, which raised KeyError whenever 'end' was not in the vocabulary, and
    # otherwise returned exactly the word the scan had already found.
    return next(
        (word for word, index in tokenizer.word_index.items() if index == integer),
        None,
    )

class LossAccuracyBLEULogger(Callback):
    """Keras callback that logs loss/accuracy and a BLEU score after each epoch.

    After every epoch a caption is generated greedily for each validation
    image, the corpus BLEU score against the reference captions is printed
    together with the first 50 generated captions, and training is stopped as
    soon as the score fails to improve on the best one seen so far.
    """

    def __init__(self, val_descriptions, val_features, tokenizer, max_length, epochs):
        """Store the evaluation data and settings.

        Args:
            val_descriptions: dict mapping image id -> list of reference captions.
            val_features: dict mapping image id -> feature array; element
                ``[0]`` is reshaped to (1, 2048) before prediction.
            tokenizer: Fitted tokenizer used to encode the partial caption.
            max_length: Padded sequence length and maximum number of
                generation steps.
            epochs: Total epoch count, used only in the printed summary.
        """
        super().__init__()
        self.val_descriptions = val_descriptions
        self.val_features = val_features
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.epochs = epochs
        self.best_bleu_score = None

    def on_epoch_end(self, epoch, logs=None):
        """Print metrics, evaluate BLEU, and stop training when BLEU stalls."""
        # Keras may pass logs=None, and a metric can be absent; fall back to
        # NaN instead of raising inside the training loop.
        logs = logs or {}
        loss = logs.get('loss', float('nan'))
        accuracy = logs.get('accuracy', float('nan'))
        print(f"Epoch {epoch + 1}/{self.epochs} - Loss: {loss:.4f} - Accuracy: {accuracy:.4f}")

        # Evaluate BLEU score on validation set
        bleu_score, predicted_captions = self.evaluate_bleu()
        print(f"BLEU Score: {bleu_score:.4f}")

        # Print the first 50 predicted captions
        print("First 50 Predicted Captions:")
        for i, caption in enumerate(predicted_captions[:50]):
            print(f"{i+1}. {caption}")

        # Stop training if BLEU score does not improve
        if self.best_bleu_score is None or bleu_score > self.best_bleu_score:
            self.best_bleu_score = bleu_score
        else:
            self.model.stop_training = True

    def evaluate_bleu(self):
        """Generate a caption per validation image and score them with BLEU.

        NOTE(review): generated captions keep the leading 'start' seed (and a
        trailing 'end' when predicted) while the references are used as
        stored - confirm both sides are meant to be compared in this form.

        Returns:
            Tuple ``(bleu_score, predicted_captions)``.
        """
        actual, predicted = [], []
        predicted_captions = []  # every generated caption, in evaluation order
        desc_items = list(self.val_descriptions.items())
        for key, desc_list in tqdm(desc_items, desc="Calculating BLEU Score"):
            image_feature = self.val_features[key][0].reshape((1, 2048))
            generated_caption = self.generate_caption(image_feature)
            actual.append([desc.split() for desc in desc_list])
            predicted.append(generated_caption.split())
            predicted_captions.append(generated_caption)
        bleu_score = corpus_bleu(actual, predicted)
        return bleu_score, predicted_captions

    def generate_caption(self, photo_feature):
        """Greedily decode a caption for one image feature.

        Starts from the text 'start' and appends the most probable next word
        until 'end' is produced, an index without a word is predicted, or
        ``max_length`` steps have run.

        Returns:
            The generated text, including the 'start' seed and, if it was
            predicted, the closing 'end'.
        """
        in_text = 'start'
        for _ in range(self.max_length):
            sequence = self.tokenizer.texts_to_sequences([in_text])[0]
            sequence = pad_sequences([sequence], maxlen=self.max_length)
            yhat = self.model.predict([photo_feature, sequence], verbose=0)
            yhat = np.argmax(yhat)
            word = word_for_id(yhat, self.tokenizer)
            if word is None:
                break
            in_text += ' ' + word
            if word == 'end':
                break

        return in_text

# Print dataset information
print('Dataset:', len(val_imgs))
print('Descriptions (val):', len(val_descriptions))
print('Photos (val):', len(val_features))
print('Vocabulary Size:', vocab_size)
print('Description Length:', max_length)

# Create the model
# NOTE(review): this builds a fresh, untrained model and rebinds `model`, discarding
# the network fitted in the training cell; the fit below then trains it on the
# validation split - confirm this is intended.
model = define_model(vocab_size, max_length)

# Number of epochs
epochs = 10

# Steps per epoch
steps = len(val_descriptions)

# Define callbacks
csv_logger = CSVLogger('val.log')
reduce_lr = ReduceLROnPlateau(monitor='loss', factor=0.2, patience=3, min_lr=0.0001)
loss_accuracy_bleu_logger = LossAccuracyBLEULogger(val_descriptions, val_features, tokenizer, max_length, epochs)
# Two independent generator objects: a single shared generator would be advanced by
# both the training and the validation loop, so each would miss batches taken by the other.
fit_generator_input = data_generator(val_descriptions, val_features, tokenizer, max_length)
val_generator = data_generator(val_descriptions, val_features, tokenizer, max_length)

# Train the model for the specified number of epochs
model.fit_generator(fit_generator_input, epochs=epochs, steps_per_epoch=steps,
                    validation_data=val_generator, validation_steps=val_steps,
                    callbacks=[csv_logger, reduce_lr, loss_accuracy_bleu_logger], verbose=1)


In [None]:
# Load the test data
# os.path.join avoids mixing "/" with the backslashes already in dataset_text.
filename_test = os.path.join(dataset_text, "Flickr_8k.testImages.txt")
test_imgs = load_photos(filename_test)
test_descriptions = load_clean_descriptions("descriptions.txt", test_imgs)
test_features = load_features(test_imgs)

# Test data generator: one batch per image, so one full pass is
# len(test_descriptions) steps.
test_steps = len(test_descriptions)
test_generator = data_generator(test_descriptions, test_features, tokenizer, max_length)


In [None]:
import os
import numpy as np
from keras.models import load_model
from keras.preprocessing.sequence import pad_sequences
from nltk.translate.bleu_score import corpus_bleu
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import pickle
from tqdm import tqdm

# Function to display image with caption
def display_image_with_caption(image_path, caption):
    """Show the image at ``image_path`` with ``caption`` as the figure title.

    Args:
        image_path: Path of the image file to display.
        caption: Text placed above the image.
    """
    img = mpimg.imread(image_path)
    # Draw on an explicit, freshly created axes rather than whatever figure
    # happens to be current.
    fig, ax = plt.subplots()
    ax.imshow(img)
    ax.set_title(caption)
    ax.axis('off')
    plt.show()
    # This helper is called in a loop over many images; closing the figure
    # keeps them from accumulating in memory.
    plt.close(fig)

# Read a pickled tokenizer back from disk
def load_tokenizer(filename):
    """Unpickle and return the object stored in ``filename``.

    Args:
        filename: Path of the pickle file.

    Returns:
        The unpickled object.
    """
    with open(filename, 'rb') as handle:
        return pickle.load(handle)

# Greedy caption decoder for a single image
def generate_description(model, tokenizer, photo_feature, max_length):
    """Greedily decode a caption for one image feature.

    Starts from the text 'start' and, for at most ``max_length`` steps,
    appends the most probable next word. Decoding stops early when the
    predicted index has no word or when the word 'end' is predicted; 'end'
    itself is not appended.

    Args:
        model: Model whose ``predict`` takes ``[photo_feature, sequence]``.
        tokenizer: Fitted tokenizer used to encode the partial caption.
        photo_feature: Image feature batch passed to the model unchanged.
        max_length: Padded sequence length and maximum number of steps.

    Returns:
        The generated text, beginning with the 'start' seed.
    """
    caption = 'start'
    for _step in range(max_length):
        encoded = tokenizer.texts_to_sequences([caption])[0]
        padded = pad_sequences([encoded], maxlen=max_length)
        next_id = np.argmax(model.predict([photo_feature, padded], verbose=0))
        next_word = word_for_id(next_id, tokenizer)

        # Stop on an unknown index or on the 'end' word.
        if next_word is None or next_word == 'end':
            break

        caption = caption + ' ' + next_word

    return caption

# Load the test data
filename_test = os.path.join(dataset_text, "Flickr_8k.testImages.txt")
test_imgs = load_photos(filename_test)
test_descriptions = load_clean_descriptions("descriptions.txt", test_imgs)
test_features = load_features(test_imgs)

# Generate captions for test dataset
test_image_names = []         # image id for every entry of the lists below
test_actual_captions = []     # all reference captions of an image joined, for display
test_predicted_captions = []
actual = []                   # BLEU references: one list of token lists per image
for key in tqdm(test_imgs, desc="Generating Captions"):
    # Images without any stored caption cannot be scored; skip them instead of
    # failing with KeyError.
    if key not in test_descriptions:
        continue
    photo_feature = test_features[key].reshape((1, 2048))
    predicted_caption = generate_description(model, tokenizer, photo_feature, max_length)
    test_image_names.append(key)
    test_actual_captions.append(' '.join(test_descriptions[key]))
    test_predicted_captions.append(predicted_caption)
    # Keep every reference caption separate: concatenating them into a single
    # pseudo-reference distorts the BLEU n-gram and length statistics.
    actual.append([desc.split() for desc in test_descriptions[key]])

# Calculate BLEU score
# generate_description always seeds its output with 'start'; drop that token so it is
# not scored against references that do not contain it.
predicted = [desc.split()[1:] for desc in test_predicted_captions]
# The score is computed once: the former loop over `epochs` recomputed the identical
# value without any training in between, and left `best_predicted_captions` undefined
# whenever the score was 0.
best_bleu_score = corpus_bleu(actual, predicted)
best_predicted_captions = test_predicted_captions.copy()
print("BLEU score:", best_bleu_score)

# Display images with the best predicted captions
print("Examples with the best predicted captions:")
for image_name, actual_caption, predicted_caption in zip(
        test_image_names, test_actual_captions, best_predicted_captions):
    image_path = os.path.join(dataset_images, image_name)
    display_image_with_caption(image_path, f"Actual: {actual_caption}\nPredicted: {predicted_caption}")


In [None]:
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from keras.preprocessing.sequence import pad_sequences
from keras.models import load_model
from keras.applications.xception import Xception
from keras.applications.xception import preprocess_input
from keras.preprocessing.image import img_to_array, load_img
from pickle import load

# Path of the single image to caption in this cell.
# NOTE(review): absolute, machine-specific path - adjust before running elsewhere.
img_path = "C:\\Users\\king\\Downloads\\xx.jpg"

def extract_features(filename, model):
    """Compute the feature array of a single image with the given model.

    The image is loaded at 299x299, converted to an array, given a batch
    dimension, preprocessed with ``preprocess_input`` and passed to
    ``model.predict``.

    NOTE: this definition replaces the directory-based ``extract_features``
    defined earlier in the notebook once this cell has run.

    Args:
        filename: Path of the image file.
        model: Model whose ``predict`` produces the feature array.

    Returns:
        The array returned by ``model.predict`` for the one-image batch.

    Raises:
        OSError: If the image cannot be opened. The previous bare ``except``
            only printed a message and then failed with an unrelated
            UnboundLocalError on the next line.
    """
    try:
        image = load_img(filename, target_size=(299, 299))
    except OSError:
        print("ERROR: Can't open image! Ensure that image path and extension is correct")
        raise

    image = img_to_array(image)
    image = np.expand_dims(image, axis=0)
    image = preprocess_input(image)

    feature = model.predict(image)
    return feature

import language_tool_python

# Create a Grammar Checker instance (module-level; used by generate_desc below).
# NOTE(review): presumably this starts or contacts a LanguageTool backend and can be
# slow on first use - confirm against the language_tool_python documentation.
grammar_checker = language_tool_python.LanguageTool('en-US')

def generate_desc(model, tokenizer, photo_feature, max_length):
    """Decode a caption for one image without repeating any word.

    Starts from the text 'start'. At every step the candidate words are
    visited from most to least probable and the first one not used so far is
    appended (after being passed through ``grammar_checker.correct``).
    Decoding stops after ``max_length`` steps, or as soon as the best
    remaining candidate is 'end' or an index without a word.

    Previously a repeated word triggered ``continue`` without changing the
    input text, so the model predicted the same word again on every remaining
    step and the caption silently stopped growing.

    Args:
        model: Model whose ``predict`` takes ``[photo_feature, sequence]``.
        tokenizer: Fitted tokenizer used to encode the partial caption.
        photo_feature: Image feature batch passed to the model unchanged.
        max_length: Padded sequence length and maximum number of steps.

    Returns:
        The generated text, beginning with the 'start' seed.
    """
    generated_description = 'start'
    used_words = set()  # words already placed in the caption
    for _ in range(max_length):
        sequence = tokenizer.texts_to_sequences([generated_description])[0]
        sequence = pad_sequences([sequence], maxlen=max_length)
        scores = model.predict([photo_feature, sequence], verbose=0)[0]

        word = None
        # Stable sort on the negated scores: highest probability first, and on
        # ties the lowest index first, matching what np.argmax would pick.
        for candidate_id in np.argsort(-scores, kind='stable'):
            candidate = word_for_id(candidate_id, tokenizer)
            # Stopping conditions: unknown index or the 'end' word.
            if candidate is None or candidate == 'end':
                break
            if candidate not in used_words:
                word = candidate
                break
        if word is None:
            break

        used_words.add(word)

        # NOTE(review): the checker is applied to a single isolated word, not to
        # the sentence - confirm this gives the intended result.
        corrected_word = grammar_checker.correct(word)

        generated_description += ' ' + corrected_word

    return generated_description

# `max_length` is deliberately not reassigned here: the value computed from the
# captions earlier in the notebook is the sequence length `model` was built with, and
# a hard-coded 32 would break prediction whenever the two differ.

xception_model = Xception(include_top=False, pooling="avg")
photo = extract_features(img_path, xception_model)
description = generate_desc(model, tokenizer, photo, max_length)
print("\nGenerated Description:\n", description)
# Keep the file open only while it is being drawn.
with Image.open(img_path) as img:
    plt.imshow(img)
    plt.axis('off')
    plt.show()
