In [1]:
# Importing required libraries
import string
import numpy as np
from PIL import Image
import os
from pickle import dump, load
import time
import tensorflow as tf
import matplotlib.pyplot as plt

In [2]:
# Keras components for image preprocessing, model creation and layers
from keras.applications.xception import Xception, preprocess_input
from tensorflow.keras.utils import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical, get_file
from keras.layers import add
from keras.models import Model, load_model
from keras.layers import Input, Dense, LSTM, Embedding, Dropout

In [3]:
# tqdm for progress bars in loops
from tqdm.notebook import tqdm
tqdm.pandas()

In [4]:
# Load text file into memory
def load_doc(filename):
    file = open(filename, 'r')
    text = file.read()
    file.close()
    return text

In [5]:
# Map image filenames to their corresponding captions
def all_img_captions(filename):
    file = load_doc(filename)
    captions = file.split('\n')
    descriptions = {}
    for caption in captions[:-1]:
        img, caption = caption.split('\t')
        if img[:-2] not in descriptions:
            descriptions[img[:-2]] = [caption]
        else:
            descriptions[img[:-2]].append(caption)
    return descriptions

In [6]:
# Clean the text captions: lowercase, remove punctuation, numbers etc.
def cleaning_text(captions):
    table = str.maketrans('', '', string.punctuation)
    for img, caps in captions.items():
        for i, img_caption in enumerate(caps):
            img_caption.replace("-", " ")
            desc = img_caption.split()
            desc = [word.lower() for word in desc]
            desc = [word.translate(table) for word in desc]
            desc = [word for word in desc if len(word) > 1]
            desc = [word for word in desc if word.isalpha()]
            img_caption = ' '.join(desc)
            captions[img][i] = img_caption
    return captions

In [7]:
# Build vocabulary of all unique words
def text_vocabulary(descriptions):
    vocab = set()
    for key in descriptions.keys():
        [vocab.update(d.split()) for d in descriptions[key]]
    return vocab

In [8]:
# Save descriptions to file for later use
def save_descriptions(descriptions, filename):
    lines = list()
    for key, desc_list in descriptions.items():
        for desc in desc_list:
            lines.append(key + '\t' + desc)
    data = "\n".join(lines)
    file = open(filename, "w")
    file.write(data)
    file.close()

In [9]:
# Set dataset paths
dataset_text = "D://ClassAIML//DLWithPython//ClassNB//Projects//ClassProj2//Flickr8k_text"
dataset_images = "D://ClassAIML//DLWithPython//ClassNB//Projects//ClassProj2//Flicker8k_Dataset"

In [10]:
# # Prepare our text data #
# filename = dataset_text + "/" + "Flickr8k.token.txt"
# # loading the file that contains all data
# # mapping them into descriptions dictionary img to 5 captions
# descriptions = all_img_captions(filename)
# print("Length of descriptions =" ,len(descriptions))

In [11]:
# # cleaning the descriptions
# clean_descriptions = cleaning_text(descriptions)

# # #building vocabulary 
# vocabulary = text_vocabulary(clean_descriptions)
# print("Length of vocabulary = ", len(vocabulary))

# # #saving each description to file 
# save_descriptions(clean_descriptions, "descriptions.txt")

In [12]:
# def download_with_retry(url, filename, max_retries=3):
#      for attempt in range(max_retries):
#          try:
#              return get_file(filename, url)
#          except Exception as e:
#              if attempt == max_retries - 1:
#                  raise e
#              print(f"Download attempt {attempt + 1} failed. Retrying in 5 seconds...")
#              time.sleep(5)

In [13]:
# # # Replace the Xception model initialization with:
# weights_url = "https://storage.googleapis.com/tensorflow/keras-applications/xception/xception_weights_tf_dim_ordering_tf_kernels_notop.h5"
# weights_path = download_with_retry(weights_url, 'xception_weights.h5')
# model = Xception(include_top=False, pooling='avg', weights=weights_path)

In [14]:
# def extract_features(directory):
#      features = {}
#      valid_images = ['.jpg', '.jpeg', '.png']
    
#      for img in tqdm(os.listdir(directory)):
# #        Skip files that don't end with valid image extensions
#          ext = os.path.splitext(img)[1].lower()
#          if ext not in valid_images:
#              continue
            
#          filename = directory + "/" + img
#          image = Image.open(filename)
#          image = image.resize((299,299))
#          image = np.expand_dims(image, axis=0)
#          image = image/127.5
#          image = image - 1.0

#          feature = model.predict(image)
#          features[img] = feature
#      return features

In [15]:
# # # 2048 feature vector
# features = extract_features(dataset_images)
# dump(features, open("features.p","wb"))

In [16]:
# Load features previously saved using Xception model
features = load(open("features.p", "rb"))

In [17]:
# Load the list of photo filenames
def load_photos(filename):
    file = load_doc(filename)
    photos = file.split("\n")[:-1]
    photos_present = [photo for photo in photos if os.path.exists(os.path.join(dataset_images, photo))]
    return photos_present

In [18]:
# Load cleaned captions only for selected photos
def load_clean_descriptions(filename, photos): 
    file = load_doc(filename)
    descriptions = {}
    for line in file.split("\n"):
        words = line.split()
        if len(words) < 1:
            continue
        image, image_caption = words[0], words[1:]
        if image in photos:
            if image not in descriptions:
                descriptions[image] = []
            desc = '<start> ' + " ".join(image_caption) + ' <end>'
            descriptions[image].append(desc)
    return descriptions

In [19]:
# Select features only for those photos used in training
def load_features(photos):
    all_features = load(open("features.p", "rb"))
    features = {k: all_features[k] for k in photos}
    return features

In [20]:
# Load training image file names and corresponding descriptions & features
filename = dataset_text + "/" + "Flickr_8k.trainImages.txt"
train_imgs = load_photos(filename)
train_descriptions = load_clean_descriptions("descriptions.txt", train_imgs)
train_features = load_features(train_imgs)

In [21]:
# Convert descriptions dictionary into a flat list
def dict_to_list(descriptions):
    all_desc = []
    for key in descriptions.keys():
        [all_desc.append(d) for d in descriptions[key]]
    return all_desc

In [22]:
# Tokenize all the training captions
def create_tokenizer(descriptions):
    desc_list = dict_to_list(descriptions)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(desc_list)
    return tokenizer

In [23]:
# Create and save tokenizer
tokenizer = create_tokenizer(train_descriptions)
dump(tokenizer, open('tokenizer.p', 'wb'))
vocab_size = len(tokenizer.word_index) + 1
print(vocab_size)

7577


In [24]:
# Compute the maximum length of a caption
def max_length(descriptions):
    desc_list = dict_to_list(descriptions)
    return max(len(d.split()) for d in desc_list)

In [25]:
max_length = max_length(train_descriptions)
print(max_length)

34


In [26]:
# Generate data for training: input features, input sequences, output words
def data_generator(descriptions, features, tokenizer, max_length):
    def generator():
        while True:
            for key, description_list in descriptions.items():
                feature = features[key][0]
                input_image, input_sequence, output_word = create_sequences(tokenizer, max_length, description_list, feature)
                for i in range(len(input_image)):
                    yield {'input_1': input_image[i], 'input_2': input_sequence[i]}, output_word[i]

    output_signature = (
        {
            'input_1': tf.TensorSpec(shape=(2048,), dtype=tf.float32),
            'input_2': tf.TensorSpec(shape=(max_length,), dtype=tf.int32)
        },
        tf.TensorSpec(shape=(vocab_size,), dtype=tf.float32)
    )

    dataset = tf.data.Dataset.from_generator(generator, output_signature=output_signature)
    return dataset.batch(32)

In [27]:
# Convert each caption to multiple input-output sequence pairs
def create_sequences(tokenizer, max_length, desc_list, feature):
    X1, X2, y = list(), list(), list()
    for desc in desc_list:
        seq = tokenizer.texts_to_sequences([desc])[0]
        for i in range(1, len(seq)):
            in_seq, out_seq = seq[:i], seq[i]
            in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
            out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
            X1.append(feature)
            X2.append(in_seq)
            y.append(out_seq)
    return np.array(X1), np.array(X2), np.array(y)

In [28]:
# Debug: Print shape of one batch
dataset = data_generator(train_descriptions, features, tokenizer, max_length)
for (a, b) in dataset.take(1):
    print(a['input_1'].shape, a['input_2'].shape, b.shape)
    break

(32, 2048) (32, 34) (32, 7577)


In [29]:
from keras.utils import plot_model

In [30]:
# Define the image captioning model architecture
def define_model(vocab_size, max_length):
    # Image feature extractor model input
    inputs1 = Input(shape=(2048,), name='input_1')
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)

    # Sequence model input
    inputs2 = Input(shape=(max_length,), name='input_2')
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)

    # Decoder (combining both models)
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)

    # Define final model
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')

    print(model.summary())
    plot_model(model, to_file='model.png', show_shapes=True)

    return model

In [31]:
# Print dataset summary
print('Dataset: ', len(train_imgs))
print('Descriptions: train=', len(train_descriptions))
print('Photos: train=', len(train_features))
print('Vocabulary Size:', vocab_size)
print('Description Length: ', max_length)

Dataset:  6000
Descriptions: train= 6000
Photos: train= 6000
Vocabulary Size: 7577
Description Length:  34


In [32]:
# Build the model
model = define_model(vocab_size, max_length)
epochs = 10

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 34)]         0           []                               
                                                                                                  
 input_1 (InputLayer)           [(None, 2048)]       0           []                               
                                                                                                  
 embedding (Embedding)          (None, 34, 256)      1939712     ['input_2[0][0]']                
                                                                                                  
 dropout (Dropout)              (None, 2048)         0           ['input_1[0][0]']                
                                                                                              

In [33]:
# # Estimate number of steps per epoch
# def get_steps_per_epoch(train_descriptions):
#     total_sequences = 0
#     for img_captions in train_descriptions.values():
#         for caption in img_captions:
#             words = caption.split()
#             total_sequences += len(words) - 1
#     return max(1, total_sequences // 32)

# steps = get_steps_per_epoch(train_descriptions)

In [34]:
# Train model and save after every epoch
os.mkdir("avbmodels")
for i in range(epochs):
    dataset = data_generator(train_descriptions, train_features, tokenizer, max_length)
    model.fit(dataset, epochs=15, steps_per_epoch=5, verbose=1)
    model.save("avbmodels/model_" + str(i) + ".h5")

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/1

Epoch 15/15
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
