In [3]:
import string
import numpy as np
from PIL import Image
import os
from pickle import dump, load
import numpy as np

from keras.applications.xception import Xception, preprocess_input
from keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from tensorflow.keras.layers import Add
from keras.models import Model, load_model
from keras.layers import Input, Dense, LSTM, Embedding, Dropout

# small library for seeing the progress of loops.
from tqdm import tqdm_notebook as tqdm
tqdm().pandas()

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  tqdm().pandas()


0it [00:00, ?it/s]

In [4]:
# Loading a text file into memory
def load_doc(filename):
    # Opening the file as read only
    file = open(filename, 'r')
    text = file.read()
    file.close()
    return text

In [5]:
# get all imgs with their captions
def all_img_captions(filename):
    file = load_doc(filename)
    captions = file.split('\n')
    descriptions ={}
    for caption in captions[:-1]:
        img, caption = caption.split('\t')
        if img[:-2] not in descriptions:
            descriptions[img[:-2]] = [caption]
        else:
            descriptions[img[:-2]].append(caption)
    return descriptions

In [6]:
##Data cleaning- lower casing, removing puntuations and words containing numbers
def cleaning_text(captions):
    table = str.maketrans('','',string.punctuation)
    for img,caps in captions.items():
        for i,img_caption in enumerate(caps):

            img_caption.replace("-"," ")
            desc = img_caption.split()

            #converts to lower case
            desc = [word.lower() for word in desc]
            #remove punctuation from each token
            desc = [word.translate(table) for word in desc]
            #remove hanging 's and a 
            desc = [word for word in desc if(len(word)>1)]
            #remove tokens with numbers in them
            desc = [word for word in desc if(word.isalpha())]
            #convert back to string

            img_caption = ' '.join(desc)
            captions[img][i]= img_caption
    return captions

In [7]:
def text_vocabulary(descriptions):
    # build vocabulary of all unique words
    vocab = set()
    
    for key in descriptions.keys():
        [vocab.update(d.split()) for d in descriptions[key]]
    
    return vocab

In [8]:
#All descriptions in one file 
def save_descriptions(descriptions, filename):
    lines = list()
    for key, desc_list in descriptions.items():
        for desc in desc_list:
            lines.append(key + '\t' + desc )
    data = "\n".join(lines)
    file = open(filename,"w")
    file.write(data)
    file.close()

In [9]:
# all_train_captions = []
# for key, val in descriptions.items():
#     for cap in val:
#         all_train_captions.append(cap)

# # Consider only words which occur at least 8 times in the corpus
# word_count_threshold = 8
# word_counts = {}
# nsents = 0
# for sent in all_train_captions:
#     nsents += 1
#     for w in sent.split(' '):
#         word_counts[w] = word_counts.get(w, 0) + 1

# vocab = [w for w in word_counts if word_counts[w] >= word_count_threshold]

# print('preprocessed words %d ' % len(vocab))


In [10]:
dataset_text = "C:\\Users\\Dell\\Downloads\\python-project-image-caption-generator\\Flickr_Data\\Flickr_TextData\\Flickr8k.token.txt"
dataset_images = "C:\\Users\\Dell\\Downloads\\python-project-image-caption-generator\\Flickr_Data\\Images"

In [11]:
#we prepare our text data
filename = dataset_text 
#loading the file that contains all data
#mapping them into descriptions dictionary img to 5 captions
descriptions = all_img_captions(filename)
print("Length of descriptions =" ,len(descriptions))

#cleaning the descriptions
clean_descriptions = cleaning_text(descriptions)

#building vocabulary 
vocabulary = text_vocabulary(clean_descriptions)
print("Length of vocabulary = ", len(vocabulary))

#saving each description to file 
save_descriptions(clean_descriptions, "descriptions.txt")

Length of descriptions = 8092
Length of vocabulary =  8763


In [12]:

def extract_features(directory):
        model = Xception( include_top=False, pooling='avg' )
        features = {}
        for img in tqdm(os.listdir(directory)):
            filename = directory + "/" + img
            image = Image.open(filename)
            image = image.resize((299,299))
            image = np.expand_dims(image, axis=0)
            #image = preprocess_input(image)
            image = image/127.5
            image = image - 1.0
            
            feature = model.predict(image)
            features[img] = feature
        return features

In [13]:
#2048 feature vector
features = extract_features(dataset_images)
dump(features, open("features.p","wb"))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/xception/xception_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m83683744/83683744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 0us/step


Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for img in tqdm(os.listdir(directory)):


  0%|          | 0/8091 [00:00<?, ?it/s]

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 228ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 307ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 265ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 248ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 249ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 267ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 265ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 225ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 233ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 239ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 243ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 247ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1

In [14]:
features = load(open("features.p","rb"))


In [15]:
#load the data 
def load_photos(filename):
    file = load_doc(filename)
    photos = file.split("\n")[:-1]
    return photos


def load_clean_descriptions(filename, photos):   
    #loading clean_descriptions
    file = load_doc(filename)
    descriptions = {}
    for line in file.split("\n"):
        
        words = line.split()
        if len(words)<1 :
            continue
    
        image, image_caption = words[0], words[1:]
        
        if image in photos:
            if image not in descriptions:
                descriptions[image] = []
            desc = '<start> ' + " ".join(image_caption) + ' <end>'
            descriptions[image].append(desc)

    return descriptions


def load_features(photos):
    #loading all features
    all_features = load(open("features.p","rb"))
    #selecting only needed features
    features = {k:all_features[k] for k in photos}
    return features


In [17]:
filename = "C:\\Users\\Dell\\Downloads\\python-project-image-caption-generator\\Flickr_Data\\Flickr_TextData\\Flickr_8k.trainImages.txt"

#train = loading_data(filename)
train_imgs = load_photos(filename)
train_descriptions = load_clean_descriptions("descriptions.txt", train_imgs)
train_features = load_features(train_imgs)

In [19]:
#converting dictionary to clean list of descriptions
def dict_to_list(descriptions):
    all_desc = []
    for key in descriptions.keys():
        [all_desc.append(d) for d in descriptions[key]]
    return all_desc

#creating tokenizer class 
#this will vectorise text corpus
#each integer will represent token in dictionary 

from tensorflow.keras.preprocessing.text import Tokenizer


def create_tokenizer(descriptions):
    desc_list = dict_to_list(descriptions)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(desc_list)
    return tokenizer


In [20]:
# give each word a index, and store that into tokenizer.p pickle file
tokenizer = create_tokenizer(train_descriptions)
dump(tokenizer, open('tokenizer.p', 'wb'))
vocab_size = len(tokenizer.word_index) + 1
vocab_size 

7577

In [21]:
#calculate maximum length of descriptions
def max_length(descriptions):
    desc_list = dict_to_list(descriptions)
    return max(len(d.split()) for d in desc_list)

max_length = max_length(descriptions)
max_length

32

In [22]:
features['1000268201_693b08cb0e.jpg'][0]

array([0.4734095 , 0.01730902, 0.0733423 , ..., 0.08557954, 0.02102293,
       0.23765539], dtype=float32)

In [23]:
# Define the model

#1 Photo feature extractor - we extracted features from pretrained model Xception. 
#2 Sequence processor - word embedding layer that handles text, followed by LSTM 
#3 Decoder - Both 1 and 2 model produce fixed length vector. They are merged together and processed by dense layer to make final prediction

In [79]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical 
def data_generator(descriptions, features, tokenizer, max_length, vocab_size):
    for key, desc in descriptions.items():
        # Retrieve image features
        image_features = features[key]
        image_features = tf.reshape(image_features, (2048,))  # Ensure the shape is correct
        
        # Process description into a sequence of tokens
        seq = tokenizer.texts_to_sequences([desc])
        seq = seq[0]
        
        # Pad the sequence to a fixed length
        seq = pad_sequence([seq], maxlen=max_length, padding='post')[0]
        
        # Create the one-hot encoded output for each word in the sequence
        output = np.zeros((max_length, vocab_size), dtype=np.float32)
        for i, word_id in enumerate(seq):
            if word_id != 0:
                output[i, word_id] = 1
        
        # Yield both image features and sequence
        yield (image_features, seq), output


def create_sequences(tokenizer, max_length, desc_list, feature):
    X1, X2, y = list(), list(), list()
    # Walk through each description for the image
    for desc in desc_list:
        # Encode the sequence
        seq = tokenizer.texts_to_sequences([desc])[0]
        # Split one sequence into multiple X, y pairs
        for i in range(1, len(seq)):
            # Split into input and output pair
            in_seq, out_seq = seq[:i], seq[i]
            # Pad input sequence
            in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
            # Encode output sequence
            out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
            # Store
            X1.append(feature)
            X2.append(in_seq)
            y.append(out_seq)
    return np.array(X1), np.array(X2), np.array(y)
 
output_signature = (
    (
        tf.TensorSpec(shape=(2048,), dtype=tf.float32),  # Image feature tensor
        tf.TensorSpec(shape=(max_length,), dtype=tf.int32)  # Input sequence tensor
    ),
    tf.TensorSpec(shape=(max_length, vocab_size), dtype=tf.float32)  # Output word tensor
)

train_dataset = tf.data.Dataset.from_generator(
    lambda: data_generator(train_descriptions, train_features, tokenizer, max_length),
    output_signature=output_signature
)

# Set batch size and prefetch for better performance
train_dataset = train_dataset.batch(32).prefetch(tf.data.AUTOTUNE)


In [80]:
from keras.utils import plot_model

# define the captioning model
def define_model(vocab_size, max_length):

    # features from the CNN model squeezed from 2048 to 256 nodes
    inputs1 = Input(shape=(2048,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)

    # LSTM sequence model
    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)

    # Merging both models
    decoder1 = Add()([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)

    # tie it together [image, seq] [word]
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')

    # summarize model
    print(model.summary())
    plot_model(model, to_file='model.png', show_shapes=True)

    return model

In [85]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Add

# Inputs
image_input = Input(shape=(2048,))  # Image features (2048-dim vector)
sequence_input = Input(shape=(32,))  # Sequence input (32 tokens in a sentence)

# Image feature processing (e.g., a dense layer)
image_features = Dense(256, activation='relu')(image_input)

# Sequence input processing (Embedding layer + LSTM for sequence generation)
embedding = Embedding(input_dim=vocab_size, output_dim=256)(sequence_input)
sequence_lstm = LSTM(256, return_sequences=True)(embedding)

# Combine image features and sequence features
merged = Add()([image_features, sequence_lstm])

# Output layer with softmax to get probabilities for each token in the sequence
output = Dense(vocab_size, activation='softmax')(merged)

# Final model
model = Model(inputs=[image_input, sequence_input], outputs=output)




In [86]:
# train our model
print('Dataset: ', len(train_imgs))
print('Descriptions: train=', len(train_descriptions))
print('Photos: train=', len(train_features))
print('Vocabulary Size:', vocab_size)
print('Description Length: ', max_length)

model = define_model(vocab_size, max_length)
epochs = 10
steps = len(train_descriptions)
# making a directory models to save our models
if not os.path.exists('models'):
    os.mkdir('models')

# Now, use the generator in the model.fit function
for i in range(epochs): 
    model.fit(train_dataset, epochs=1, steps_per_epoch=steps, verbose=1)
    model.save("models/model_" + str(i) + ".h5")
 

Dataset:  6000
Descriptions: train= 6000
Photos: train= 6000
Vocabulary Size: 7577
Description Length:  32


None
You must install pydot (`pip install pydot`) for `plot_model` to work.


ValueError: Arguments `target` and `output` must have the same rank (ndim). Received: target.shape=(None, 32, 7577), output.shape=(None, 7577)

In [88]:
import os
import numpy as np
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, LSTM, Dense, Dropout, Add
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.text import Tokenizer

# Define the data generator
def data_generator(descriptions, features, tokenizer, max_length, vocab_size, batch_size=64):
    # Yield batches of data
    while True:
        batch_images = []
        batch_sequences = []
        batch_outputs = []
        
        for i, (key, desc) in enumerate(descriptions.items()):
            # Prepare the image features
            image_feature = features[key].reshape((2048,))  # Ensure shape is (2048,)
            batch_images.append(image_feature)
            
            # Prepare the sequence (tokenized description)
            seq = tokenizer.texts_to_sequences([desc])[0]
            seq = pad_sequences([seq], maxlen=max_length, padding='post')[0]
            
            # Create the one-hot encoded target for each word in the sequence
            output = np.zeros((max_length, vocab_size), dtype=np.float32)
            for j, word_id in enumerate(seq):
                if word_id != 0:
                    output[j, word_id] = 1
            
            # Prepare the batch
            batch_sequences.append(seq)
            batch_outputs.append(output)
            
            if len(batch_images) == batch_size:
                yield (np.array(batch_images), np.array(batch_sequences)), np.array(batch_outputs)
                batch_images, batch_sequences, batch_outputs = [], [], []

# Define the model architecture
def define_model(vocab_size, max_length):
    # Image feature input
    image_input = Input(shape=(2048,))
    image_features = Dropout(0.5)(image_input)
    image_features = Dense(256, activation='relu')(image_features)

    # Sequence input
    sequence_input = Input(shape=(max_length,))
    embedding = Embedding(vocab_size, 256)(sequence_input)
    sequence_features = Dropout(0.5)(embedding)
    sequence_features = LSTM(256)(sequence_features)

    # Combine image features and sequence features
    merged = Add()([image_features, sequence_features])
    merged = Dense(256, activation='relu')(merged)

    # Output layer
    output = Dense(vocab_size, activation='softmax')(merged)

    # Define the model
    model = Model(inputs=[image_input, sequence_input], outputs=output)
    
    return model

# Assuming these variables are defined earlier
# train_descriptions, train_features, vocab_size, max_length, tokenizer, etc.

# Print information about dataset
print('Dataset: ', len(train_imgs))
print('Descriptions: train=', len(train_descriptions))
print('Photos: train=', len(train_features))
print('Vocabulary Size:', vocab_size)
print('Description Length: ', max_length)

# Define the model
model = define_model(vocab_size, max_length)

# Compile the model
model.compile(optimizer=Adam(), loss='categorical_crossentropy')

# Number of epochs
epochs = 10
steps = len(train_descriptions) // 64  # assuming batch_size=64

# Making a directory to save the models if it doesn't exist
if not os.path.exists('models'):
    os.mkdir('models')

# Train the model
for i in range(epochs): 
    model.fit(data_generator(train_descriptions, train_features, tokenizer, max_length, vocab_size),
              epochs=1, 
              steps_per_epoch=steps, 
              verbose=1)
    model.save("models/model_" + str(i) + ".h5")


Dataset:  6000
Descriptions: train= 6000
Photos: train= 6000
Vocabulary Size: 7577
Description Length:  32


ValueError: Arguments `target` and `output` must have the same rank (ndim). Received: target.shape=(None, 32, 7577), output.shape=(None, 7577)