## Import Necessary Packages

In [None]:
import string
import numpy as np
from PIL import Image
import os
from pickle import dump, load
import numpy as np

from keras.applications.xception import Xception, preprocess_input
from keras.preprocessing.image import load_img, img_to_array
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
#from keras.layers.merge import add
from keras.layers import Input, Dense, LSTM, Embedding, Dropout, add
from keras.models import Model, load_model

import nltk
nltk.download('punkt')
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize
from tensorflow.keras.callbacks import Callback

from tqdm.notebook import tqdm
tqdm.pandas()

## Data Cleaning Functions

In [None]:
# Read Image Captions from files in Flickr8k_text & organize into a dictionary
def all_img_captions(filename):
    """Read a caption file and group its captions by image name.

    Every non-empty line must contain an image field, a tab, and the caption
    text. The last two characters of the image field are dropped to form the
    dictionary key (presumably the ``#<n>`` caption index of the Flickr8k
    token format -- confirm against the data file).

    Args:
        filename: Path of the tab-separated caption file.

    Returns:
        dict mapping image name -> list of caption strings, in file order.

    Raises:
        ValueError: if a non-empty line contains no tab.
    """
    descriptions = {}
    with open(filename, 'r') as f:
        for line in f:
            line = line.rstrip('\n')
            # Skip blank lines instead of assuming that only the very last
            # line is empty; this also keeps the final caption when the file
            # does not end with a newline.
            if not line.strip():
                continue
            # Split on the first tab only so a tab inside the caption text
            # does not break the unpacking.
            img, caption = line.split('\t', 1)
            descriptions.setdefault(img[:-2], []).append(caption)
    return descriptions

In [None]:
# Caption cleaning - lower casing, removing punctuation and words containing numbers
def cleaning_text(captions):
    """Normalise every caption in ``captions`` in place.

    For each caption: hyphens become spaces, words are lower-cased, all
    ``string.punctuation`` characters are removed, and words that are a single
    character long or contain non-alphabetic characters are dropped.

    Args:
        captions: dict mapping image name -> list of caption strings. The
            lists are modified in place.

    Returns:
        The same ``captions`` dict that was passed in.
    """
    table = str.maketrans('', '', string.punctuation)
    for img, caps in captions.items():
        for i, img_caption in enumerate(caps):
            # str.replace returns a new string; the result has to be kept,
            # otherwise "well-known" is later collapsed into "wellknown" when
            # punctuation is stripped.
            img_caption = img_caption.replace("-", " ")

            kept_words = []
            for word in img_caption.split():
                # lower-case, then strip punctuation from the token
                word = word.lower().translate(table)
                # drop hanging 's / a and any token containing digits
                if len(word) > 1 and word.isalpha():
                    kept_words.append(word)

            captions[img][i] = ' '.join(kept_words)
    return captions


# Caption Cleaning - keep unique word from the 5 captions
def text_vocabulary(descriptions):
    """Return the set of distinct whitespace-separated words in all captions.

    Args:
        descriptions: dict mapping image name -> list of caption strings.

    Returns:
        set of every word that occurs in at least one caption.
    """
    return {
        word
        for caption_list in descriptions.values()
        for caption in caption_list
        for word in caption.split()
    }


# All descriptions in one file
def save_descriptions(descriptions, filename):
    """Write all captions to ``filename``, one ``<image>TAB<caption>`` per line.

    Lines are joined with a newline; no trailing newline is written.

    Args:
        descriptions: dict mapping image name -> list of caption strings.
        filename: Path of the text file to create or overwrite.
    """
    lines = [
        key + '\t' + desc
        for key, desc_list in descriptions.items()
        for desc in desc_list
    ]
    # The context manager closes the handle even if the write fails.
    with open(filename, "w") as file:
        file.write("\n".join(lines))

In [None]:
# Process images from the Flicker8k_Dataset
# extract features using Xception, ResNet50, and VGG16
from keras.applications.resnet50 import ResNet50, preprocess_input

def Xception_extract_features(directory):
    """Run every file in ``directory`` through Xception and collect the outputs.

    Args:
        directory: Folder whose entries are all readable image files.

    Returns:
        dict mapping file name -> the array returned by ``model.predict`` for
        that image. The leading batch axis is kept (the training generator in
        this notebook indexes ``features[key][0]``).
    """
    model = Xception(include_top=False, pooling='avg')
    features = {}
    for img in tqdm(os.listdir(directory)):
        filename = os.path.join(directory, img)
        # Force 3-channel RGB so grayscale / RGBA / palette files produce the
        # channel count the network expects, and close the file handle as
        # soon as the pixels have been read.
        with Image.open(filename) as raw_image:
            image = raw_image.convert('RGB').resize((299, 299))
        image = np.expand_dims(np.asarray(image), axis=0)
        # Scale pixels from [0, 255] to [-1, 1] by hand: the module-level
        # name ``preprocess_input`` is re-imported from other
        # keras.applications modules in this notebook, so it is not used here.
        image = image / 127.5
        image = image - 1.0

        feature = model.predict(image)
        features[img] = feature
    return features

def ResNet50_extract_features(directory):
    """Run every file in ``directory`` through ResNet50 and collect the outputs.

    Args:
        directory: Folder whose entries are all readable image files.

    Returns:
        dict mapping file name -> flattened (1-D) feature vector.

    NOTE(review): unlike ``Xception_extract_features`` the batch axis is
    flattened away here, while ``data_generator`` indexes
    ``features[key][0]`` -- confirm before feeding these features to it.
    """
    # Bind the ResNet50 preprocessing locally. The notebook imports
    # ``preprocess_input`` from several keras.applications modules, so the
    # global name refers to whichever import ran last.
    from keras.applications.resnet50 import preprocess_input as resnet50_preprocess_input

    model = ResNet50(include_top=False, pooling='avg', weights='imagenet')
    features = {}
    for img in tqdm(os.listdir(directory)):
        filename = os.path.join(directory, img)
        # Open, force 3-channel RGB, and resize to the 224x224 input used for
        # ResNet50; the handle is closed once the pixels are loaded.
        with Image.open(filename) as raw_image:
            image = raw_image.convert('RGB').resize((224, 224))
        # Convert the image to a numpy array and add a batch dimension
        image = np.expand_dims(np.array(image), axis=0)
        image = resnet50_preprocess_input(image)
        # Predict features with the model
        feature = model.predict(image)
        # Store the feature vector with the image file name as the key
        features[img] = feature.flatten()
    return features

from keras.applications.vgg16 import VGG16, preprocess_input

def VGG16_extract_features(directory):
    """Run every file in ``directory`` through VGG16 and collect the outputs.

    Args:
        directory: Folder whose entries are all readable image files.

    Returns:
        dict mapping file name -> flattened (1-D) feature vector.

    NOTE(review): the vectors are flattened, while ``data_generator`` indexes
    ``features[key][0]`` -- confirm before feeding these features to it.
    """
    # Bind the VGG16 preprocessing locally so the function does not depend on
    # which ``preprocess_input`` import happened to run last in the notebook.
    from keras.applications.vgg16 import preprocess_input as vgg16_preprocess_input

    # Load VGG16 without the top fully connected layers, with average pooling
    model = VGG16(include_top=False, pooling='avg', weights='imagenet')
    features = {}
    # Iterate through all images in the specified directory
    for img in tqdm(os.listdir(directory)):
        filename = os.path.join(directory, img)
        # Open, force 3-channel RGB, and resize to the 224x224 input used for
        # VGG16; the handle is closed once the pixels are loaded.
        with Image.open(filename) as raw_image:
            image = raw_image.convert('RGB').resize((224, 224))
        image = np.expand_dims(np.array(image), axis=0)
        image = vgg16_preprocess_input(image)
        feature = model.predict(image)
        features[img] = feature.flatten()

    return features

## Prepare Text Data & Image Data

In [None]:
# Mount Google Drive into the Colab runtime at /content/drive; the dataset and
# temp paths used by the following cells all live under that mount point.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
# --- Text data preparation ---
# The dataset sits in a shared Drive folder: add a shortcut to it under
# MyDrive before running this cell.
dataset_text = "/content/drive/MyDrive/DL_Project_2024/Flickr8k_text"
dataset_images = "/content/drive/MyDrive/DL_Project_2024/Flicker8k_Dataset"

# Token file with every caption -> {image name: [captions]}
filename = f"{dataset_text}/Flickr8k.token.txt"
descriptions = all_img_captions(filename)
print("Length of descriptions =", len(descriptions))

# Normalise the captions (cleaning_text edits the lists in place and hands
# back the same dict, so `descriptions` is cleaned as well).
clean_descriptions = cleaning_text(descriptions)

# Distinct words across all cleaned captions
vocabulary = text_vocabulary(clean_descriptions)
print("Length of vocabulary = ", len(vocabulary))

# Persist the cleaned captions, one "<image>TAB<caption>" per line
save_descriptions(
    clean_descriptions,
    "/content/drive/MyDrive/DL_Project_2024/temp/descriptions.txt",
)

Length of descriptions = 8092
Length of vocabulary =  8763


In [None]:
#Preparing our Image data
#2048 feature vector
features = Xception_extract_features(dataset_images)
# Use a context manager so the pickle file is flushed and closed right after
# the dump instead of whenever the handle happens to be garbage-collected.
with open("/content/drive/MyDrive/DL_Project_2024/temp/features.p", "wb") as features_file:
    dump(features, features_file)

## Load Processed Data

In [None]:
# Reload the cached image features written by the extraction cell.
# pickle.load executes arbitrary code from the file: only load files you created.
with open("/content/drive/MyDrive/DL_Project_2024/temp/features.p", "rb") as features_file:
    features = load(features_file)

In [None]:
#load the data
def load_photos(filename):
    """Return the image names listed in ``filename``, one per line.

    Blank lines are skipped, so the result is the same whether or not the
    file ends with a newline.

    Args:
        filename: Path of a text file with one image name per line.

    Returns:
        list of the non-blank lines, in file order.
    """
    with open(filename, 'r') as f:
        file = f.read()
    # Filtering blank lines (rather than unconditionally dropping the last
    # element) keeps the final name when there is no trailing newline.
    photos = [line for line in file.split("\n") if line.strip()]
    return photos

#loading clean_descriptions
def load_clean_descriptions(filename, photos):
    """Load the captions of the selected images and wrap them in start/end markers.

    Each non-empty line of ``filename`` is split on whitespace: the first token
    is the image name, the remaining tokens form the caption. Captions of
    images listed in ``photos`` are returned as
    ``'<start> ' + caption + ' <end>'``.

    Args:
        filename: Path of the cleaned descriptions file.
        photos: Iterable of image names to keep.

    Returns:
        dict mapping image name -> list of wrapped caption strings.
    """
    # A set gives O(1) membership tests; the original list lookup rescanned
    # all image names for every caption line. It also makes one-shot
    # iterables safe to pass.
    wanted = set(photos)
    descriptions = {}
    with open(filename, 'r') as f:
        for line in f:
            words = line.split()
            if len(words) < 1:
                continue

            image, image_caption = words[0], words[1:]
            if image not in wanted:
                continue

            desc = '<start> ' + " ".join(image_caption) + ' <end>'
            descriptions.setdefault(image, []).append(desc)

    return descriptions

def load_features(photos, features_path="/content/drive/MyDrive/DL_Project_2024/temp/features.p"):
    """Load the pickled feature dict and keep only the requested images.

    Args:
        photos: Iterable of image names to select.
        features_path: Pickle file holding a dict of image name -> features.
            Defaults to the location used by the extraction cell.

    Returns:
        dict mapping each name in ``photos`` to its stored features.

    Raises:
        KeyError: if a requested image has no entry in the pickle.
    """
    # pickle.load executes arbitrary code from the file: only point this at
    # files you created yourself.
    with open(features_path, "rb") as features_file:
        all_features = load(features_file)
    #selecting only needed features
    features = {k: all_features[k] for k in photos}
    return features


In [None]:
# Training split: image names, their cleaned captions and their CNN features
filename = f"{dataset_text}/Flickr_8k.trainImages.txt"

train_imgs = load_photos(filename)
train_descriptions = load_clean_descriptions(
    "/content/drive/MyDrive/DL_Project_2024/temp/descriptions.txt",
    train_imgs,
)
train_features = load_features(train_imgs)

In [None]:
# Train + test image names and their captions
all_imgs = train_imgs + load_photos(f"{dataset_text}/Flickr_8k.testImages.txt")
all_descriptions = load_clean_descriptions(
    "/content/drive/MyDrive/DL_Project_2024/temp/descriptions.txt",
    all_imgs,
)

In [None]:
#converting dictionary to clean list of descriptions
def dict_to_list(descriptions):
    """Flatten ``{image: [captions]}`` into one list of captions.

    Captions keep the dict's iteration order, then each list's own order.
    """
    return [
        caption
        for caption_list in descriptions.values()
        for caption in caption_list
    ]

#creating tokenizer class
#this will vectorise text corpus
#each integer will represent token in dictionary

from keras.preprocessing.text import Tokenizer

def create_tokenizer(descriptions):
    """Build a Keras ``Tokenizer`` fitted on every caption in ``descriptions``.

    Args:
        descriptions: dict mapping image name -> list of caption strings.

    Returns:
        The fitted ``Tokenizer`` instance.
    """
    fitted = Tokenizer()
    fitted.fit_on_texts(dict_to_list(descriptions))
    return fitted

In [None]:
# Preparing text data for training deep learning models

# give each word a index, and store that into tokenizer.p pickle file
tokenizer = create_tokenizer(train_descriptions)
# Close the pickle file deterministically instead of leaking the handle.
with open('/content/drive/MyDrive/DL_Project_2024/temp/tokenizer.p', 'wb') as tokenizer_file:
    dump(tokenizer, tokenizer_file)
# One more than the number of known words; the model's Embedding layer uses
# mask_zero=True, i.e. index 0 is treated as padding.
vocab_size = len(tokenizer.word_index) + 1
vocab_size

7577

In [None]:
#calculate maximum length of descriptions
def max_caption_length(descriptions):
    """Return the word count of the longest caption in ``descriptions``.

    Args:
        descriptions: dict mapping image name -> list of caption strings.

    Returns:
        int, the largest number of whitespace-separated words in any caption.
    """
    desc_list = dict_to_list(descriptions)
    return max(len(d.split()) for d in desc_list)

# The helper has its own name so that binding the integer result to
# `max_length` no longer overwrites the function: re-running this cell used to
# fail with "'int' object is not callable".
# NOTE(review): this measures `descriptions` (all images, without the
# <start>/<end> markers) rather than `train_descriptions`; confirm that is the
# intended sequence length before changing it -- saved models depend on it.
max_length = max_caption_length(descriptions)
max_length
# This will be the input shape
# This will be the input shape

32

In [None]:
# Peek at one cached entry; [0] selects the first row of the stored predict() output.
features['1000268201_693b08cb0e.jpg'][0]

array([0.4734095 , 0.01730889, 0.07334236, ..., 0.08557957, 0.02102294,
       0.2376553 ], dtype=float32)

## Define Our Model

In [None]:
# Define the model

#1 Photo feature extractor - we extracted features from pretrained model Xception.
#2 Sequence processor - word embedding layer that handles text, followed by LSTM
#3 Decoder - Models 1 and 2 each produce a fixed-length vector. The two vectors are merged and processed by a dense layer to make the final prediction

In [None]:
#create input-output sequence pairs from the image description.

#data generator, consumed by model.fit()
def data_generator(descriptions, features, tokenizer, max_length):
    """Endlessly yield one training batch per image.

    For every image in ``descriptions`` the first row of its feature entry and
    its captions are expanded by ``create_sequences``; the three resulting
    arrays are yielded as ``[[images, sequences], targets]``. After the last
    image the iteration starts over.
    """
    while True:
        for image_id in descriptions:
            captions = descriptions[image_id]
            # first row of the stored feature array for this image
            photo_vector = features[image_id][0]
            batch = create_sequences(tokenizer, max_length, captions, photo_vector)
            image_batch, sequence_batch, target_batch = batch
            yield [[image_batch, sequence_batch], target_batch]

def create_sequences(tokenizer, max_length, desc_list, feature, vocab_size=None):
    """Expand the captions of one image into (image, prefix, next-word) samples.

    Every caption is encoded with ``tokenizer``; for each position ``i >= 1``
    the first ``i`` tokens (padded to ``max_length``) form the input sequence
    and token ``i``, one-hot encoded, is the target.

    Args:
        tokenizer: Fitted Keras tokenizer.
        max_length: Length every input sequence is padded to.
        desc_list: Caption strings of a single image.
        feature: Feature vector of that image, repeated for every sample.
        vocab_size: Number of classes of the one-hot targets. Defaults to
            ``len(tokenizer.word_index) + 1`` so the function no longer
            depends on a notebook-level global of the same name.

    Returns:
        Tuple ``(X1, X2, y)`` of numpy arrays: image features, padded input
        sequences and one-hot targets, with one row per sample.
    """
    if vocab_size is None:
        vocab_size = len(tokenizer.word_index) + 1

    X1, X2, y = [], [], []
    # walk through each description for the image
    for desc in desc_list:
        # encode the sequence
        seq = tokenizer.texts_to_sequences([desc])[0]
        # split one sequence into multiple X,y pairs
        for i in range(1, len(seq)):
            # pad the input prefix and one-hot encode the next word
            in_seq = pad_sequences([seq[:i]], maxlen=max_length)[0]
            out_seq = to_categorical([seq[i]], num_classes=vocab_size)[0]
            X1.append(feature)
            X2.append(in_seq)
            y.append(out_seq)
    return np.array(X1), np.array(X2), np.array(y)

In [None]:
# Sanity check: draw a single batch from the generator and display the shapes
# of the image-feature, input-sequence and target arrays.
[a,b],c = next(data_generator(train_descriptions, features, tokenizer, max_length))
a.shape, b.shape, c.shape

((47, 2048), (47, 32), (47, 7577))

In [None]:
from keras.utils import plot_model

# define the captioning model
def define_model(vocab_size, max_length, feature_size=2048):
    """Build and compile the merge-style captioning model.

    The image branch applies dropout and a 256-unit dense layer to the CNN
    features; the text branch embeds the padded word sequence and runs it
    through an LSTM. Both 256-d outputs are added and decoded by two dense
    layers into a softmax over the vocabulary.

    Args:
        vocab_size: Size of the output softmax and of the embedding table.
        max_length: Length of the padded input word sequences.
        feature_size: Length of the image feature vector (2048 by default,
            the value this notebook uses for its Xception features).

    Returns:
        The compiled Keras ``Model`` taking ``[image_features, sequence]``.
    """
    # features from the CNN model squeezed from feature_size to 256 nodes
    inputs1 = Input(shape=(feature_size,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)

    # LSTM sequence model
    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)

    # Merging both models
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)

    # tie it together [image, seq] [word]
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')

    # summary() prints the table itself and returns None, so wrapping it in
    # print() only appended a stray "None" line.
    model.summary()
    plot_model(model, to_file='/content/drive/MyDrive/DL_Project_2024/temp/model.png', show_shapes=True)

    return model


## Training

In [None]:
# train our model
print('Dataset: ', len(train_imgs))
print('Descriptions: train=', len(train_descriptions))
print('Photos: train=', len(train_features))
print('Vocabulary Size:', vocab_size)
print('Description Length: ', max_length)

model = define_model(vocab_size, max_length)
epochs = 10
# one generator step per training image
steps = len(train_descriptions)

# NOTE(review): the cell that created the models/ folder was commented out in
# the original; confirm the folder exists before the first save.
for i in range(epochs):
    # a fresh generator per epoch restarts the pass at the first image
    generator = data_generator(train_descriptions, train_features, tokenizer, max_length)
    model.fit(generator, steps_per_epoch=steps, epochs=1, verbose=1)
    # checkpoint after every epoch
    model.save(f"/content/drive/MyDrive/DL_Project_2024/temp/models/model_{i}.h5")

Dataset:  6000
Descriptions: train= 6000
Photos: train= 6000
Vocabulary Size: 7577
Description Length:  32
Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 32)]                 0         []                            
                                                                                                  
 input_1 (InputLayer)        [(None, 2048)]               0         []                            
                                                                                                  
 embedding (Embedding)       (None, 32, 256)              1939712   ['input_2[0][0]']             
                                                                                                  
 dropout (Dropout)           (None, 2048)                 0         ['input_1[0][0]'] 

  saving_api.save_model(


1003/6000 [====>.........................] - ETA: 7:23 - loss: 3.0513

In [None]:
from keras.models import load_model
import os

# Resume from a previously saved checkpoint.
# NOTE(review): the training loop visible in this notebook saves model_0 ..
# model_9 only; model_12.h5 must come from a run that is not shown here.
model = load_model('/content/drive/MyDrive/DL_Project_2024/temp/models/model_12.h5')

# Re-compile so that accuracy is reported alongside the loss
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Number of additional epochs; one generator step per training image
epochs = 8
steps = len(train_descriptions)

for i in range(epochs):
    generator = data_generator(train_descriptions, train_features, tokenizer, max_length)
    history = model.fit(generator, steps_per_epoch=steps, epochs=1, verbose=1)
    print(f"Epoch {i+1}/{epochs}, Accuracy: {history.history['accuracy'][-1]}")
    # checkpoint after every epoch
    model.save(f"/content/drive/MyDrive/DL_Project_2024/temp/models/model_continued_{i}.h5")


Epoch 1/8, Accuracy: 0.3611343204975128
Epoch 2/8, Accuracy: 0.363608181476593
Epoch 3/8, Accuracy: 0.3669893443584442
Epoch 4/8, Accuracy: 0.367609441280365
Epoch 5/8, Accuracy: 0.36972755193710327
Epoch 6/8, Accuracy: 0.3719729483127594
Epoch 7/8, Accuracy: 0.37351667881011963
Epoch 8/8, Accuracy: 0.37462306022644043


## Training with BLEU score

In [None]:
class BLEUScoreCallback(Callback):
    """Keras callback that prints a stemmed corpus BLEU score after each epoch.

    At the end of every epoch a caption is generated for each image in
    ``descriptions``; generated and reference captions are tokenised and
    Porter-stemmed, and ``corpus_bleu`` (smoothing method 1) is computed over
    all images. The score is printed and stored in ``logs['val_bleu']``.

    Args:
        features: dict mapping image name -> feature array; row 0 is used.
        descriptions: dict mapping image name -> list of reference captions.
        tokenizer: Fitted Keras tokenizer used by the model.
        max_length: Padded sequence length expected by the model.
        caption_fn: Optional callable
            ``caption_fn(model, image_feature, tokenizer, max_length) -> str``.
            When omitted, a built-in greedy decoder is used. The original code
            called a global ``generate_caption`` that this notebook never
            defines; pass it here if it exists elsewhere.
    """

    # Sequence markers wrapped around the reference captions by the loader.
    _MARKERS = ('<start>', '<end>')

    def __init__(self, features, descriptions, tokenizer, max_length, caption_fn=None):
        # Let the Keras base class initialise its own bookkeeping attributes.
        super().__init__()
        self.features = features
        self.descriptions = descriptions
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.caption_fn = caption_fn
        self.stemmer = PorterStemmer()

    def _greedy_caption(self, image_feature, index_word):
        """Decode a caption word by word, always taking the most likely word."""
        photo = np.asarray(image_feature).reshape(1, -1)
        in_text = 'start'
        words = []
        for _ in range(self.max_length):
            sequence = self.tokenizer.texts_to_sequences([in_text])[0]
            sequence = pad_sequences([sequence], maxlen=self.max_length)
            probabilities = self.model.predict([photo, sequence], verbose=0)
            word = index_word.get(int(np.argmax(probabilities)))
            if word is None or word == 'end':
                break
            words.append(word)
            in_text += ' ' + word
        return ' '.join(words)

    def _stem_tokens(self, text):
        """Drop the sequence markers, tokenise, and stem every token."""
        for marker in self._MARKERS:
            text = text.replace(marker, ' ')
        return [self.stemmer.stem(word) for word in word_tokenize(text)]

    def on_epoch_end(self, epoch, logs=None):
        # Reverse vocabulary lookup, built once per epoch.
        index_word = {index: word for word, index in self.tokenizer.word_index.items()}

        actual, predicted = [], []
        for key, desc_list in self.descriptions.items():
            image_feature = self.features[key][0]
            if self.caption_fn is not None:
                generated_caption = self.caption_fn(
                    self.model, image_feature, self.tokenizer, self.max_length
                )
            else:
                generated_caption = self._greedy_caption(image_feature, index_word)

            # corpus_bleu expects token lists: one hypothesis and a list of
            # references per image.
            predicted.append(self._stem_tokens(generated_caption))
            actual.append([self._stem_tokens(ref) for ref in desc_list])

        if not predicted:
            # nothing to score
            return

        # compute the BLEU score
        bleu_score = corpus_bleu(actual, predicted, smoothing_function=SmoothingFunction().method1)
        print(f'Epoch {epoch + 1}: BLEU Score = {bleu_score:.4f}')
        if logs is not None:
            logs['val_bleu'] = bleu_score


In [None]:
print('Dataset: ', len(train_imgs))
print('Descriptions: train=', len(train_descriptions))
print('Photos: train=', len(train_features))
print('Vocabulary Size:', vocab_size)
print('Description Length: ', max_length)

# Fresh, untrained model
model = define_model(vocab_size, max_length)

# Callback that reports a BLEU score on the training captions after each epoch
bleu_callback = BLEUScoreCallback(train_features, train_descriptions, tokenizer, max_length)

# Ten epochs, one generator step per training image
epochs = 10
steps = len(train_descriptions)
generator = data_generator(train_descriptions, train_features, tokenizer, max_length)

# Train with BLEU score evaluation
model.fit(
    generator,
    steps_per_epoch=steps,
    epochs=epochs,
    callbacks=[bleu_callback],
    verbose=1,
)

## Image Captions Demo

In [None]:
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from keras.models import load_model
from keras.applications.xception import Xception
from keras.preprocessing.sequence import pad_sequences
from pickle import load
import os
import random

def extract_features(filename, model):
    """Load one image and return ``model``'s prediction for it.

    The image is converted to 3-channel RGB, resized to 299x299, scaled from
    [0, 255] to [-1, 1] and passed to ``model.predict`` as a batch of one.

    Args:
        filename: Path of the image file.
        model: Feature-extraction model exposing ``predict``.

    Returns:
        The array returned by ``model.predict``, or ``None`` (after printing
        an error) when the image cannot be opened or decoded.
    """
    try:
        # convert('RGB') covers RGBA as well as grayscale and palette images,
        # which the former "drop the 4th channel" slice did not handle.
        with Image.open(filename) as raw_image:
            image = raw_image.convert('RGB').resize((299, 299))
    except (OSError, ValueError):
        # Pillow reports missing and unreadable files as OSError subclasses;
        # anything else is a programming error and should surface.
        print("ERROR: Couldn't open image! Make sure the image path and extension is correct")
        return None
    image = np.expand_dims(np.array(image), axis=0)
    image = image / 127.5 - 1.0
    feature = model.predict(image)
    return feature

def word_for_id(integer, tokenizer):
    """Return the word whose index in ``tokenizer.word_index`` equals ``integer``.

    Returns ``None`` when no word has that index.
    """
    matches = (word for word, index in tokenizer.word_index.items() if index == integer)
    return next(matches, None)

def generate_desc(model, tokenizer, photo, max_length):
    """Greedily decode a caption for ``photo``.

    Starting from the seed word ``'start'``, the model is asked for the next
    word up to ``max_length`` times; decoding stops early when it predicts
    ``'end'`` or an index with no word.

    Args:
        model: Captioning model taking ``[photo, padded_sequence]``.
        tokenizer: Fitted Keras tokenizer used to train the model.
        photo: Image feature batch passed to the model unchanged.
        max_length: Padded sequence length, also the maximum number of steps.

    Returns:
        The generated words joined by spaces, without the start/end markers.
    """
    # Reverse vocabulary lookup built once instead of scanning word_index at
    # every decoding step.
    index_word = {index: word for word, index in tokenizer.word_index.items()}

    in_text = 'start'
    generated = []
    for _ in range(max_length):
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        sequence = pad_sequences([sequence], maxlen=max_length)
        pred = model.predict([photo, sequence], verbose=0)
        word = index_word.get(int(np.argmax(pred)))
        if word is None or word == 'end':
            break
        generated.append(word)
        in_text += ' ' + word
    # Only real words are collected, so nothing is sliced off afterwards: the
    # former words[1:-1] also removed the last real word whenever decoding
    # stopped without producing 'end'.
    return ' '.join(generated)

# Directory containing images
image_dir = '/content/drive/MyDrive/DL_Project_2024/Flicker8k_Dataset'

ps = PorterStemmer()
chencherry = SmoothingFunction()

# Load the tokenizer, the captioning model and the CNN encoder once; they do
# not change between samples, so reloading them per image only cost time.
max_length = 32
with open("/content/drive/MyDrive/DL_Project_2024/temp/tokenizer.p", "rb") as tokenizer_file:
    tokenizer = load(tokenizer_file)
model = load_model('/content/drive/MyDrive/DL_Project_2024/temp/models/model_20.h5')
xception_model = Xception(include_top=False, pooling="avg")

for _ in range(10):
    # Randomly choose an image
    random_image = random.choice(all_imgs)
    img_path = os.path.join(image_dir, random_image)
    print(random_image)

    # Extract features and generate description
    photo = extract_features(img_path, xception_model)
    if photo is None:
        print("Failed to extract features.")
        continue

    # true captions: drop the <start>/<end> markers, then tokenise and stem
    reference_captions = all_descriptions[random_image]
    true_caption_stems = []
    for reference_caption in reference_captions:
        reference_text = reference_caption.replace('<start>', ' ').replace('<end>', ' ')
        true_caption_stems.append([ps.stem(word) for word in word_tokenize(reference_text)])
    print("True Caption: ", reference_captions)

    # generated caption
    description = generate_desc(model, tokenizer, photo, max_length)
    generated = word_tokenize(description)
    generated_stem = [ps.stem(word) for word in generated]
    print("Generated Caption: ", ' '.join(generated))

    # Compute BLEU score. corpus_bleu works on token lists: passing joined
    # strings (as before) made it compare single characters instead of words.
    bleu_score = corpus_bleu([true_caption_stems], [generated_stem], smoothing_function=chencherry.method1)
    print("Score: ", bleu_score)

    with Image.open(img_path) as img:
        plt.imshow(img)
        plt.show()

Output hidden; open in https://colab.research.google.com to view.

---