In [1]:
import string
import numpy as np
import os, time
from PIL import Image
from pickle import dump, load
import tensorflow as tf
import matplotlib.pyplot as plt
from keras.applications.xception import Xception, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical, get_file
from keras.layers import add, Input, Dense, LSTM, Embedding, Dropout
from keras.models import Model, load_model
from keras.utils import plot_model

# small library for seeing the progress of loops.
from tqdm import tqdm_notebook as tqdm
tqdm().pandas()

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  tqdm().pandas()


0it [00:00, ?it/s]

In [2]:
#Load Files
def load_file(filename):
    file = open(filename, 'r')
    text = file.read()
    file.close()
    return text

In [3]:
#Load all images with their Captions
def all_img_captions(filename):
    file = load_file(filename)
    captions = file.split('\n')
    descriptions = {}
    for caption in captions[:-1]:
        img, caption = caption.split('\t')
        if img[:-2] not in descriptions:
            descriptions[img[:-2]] = [ caption ]
        else:
            descriptions[img[:-2]].append(caption)
    return descriptions
        

In [4]:
#Data cleaning- lower casing, removing puntuations and words containing numbers
def cleaning_text(captions):
    table = str.maketrans('', '', string.punctuation)
    for img, caps in captions.items():
        for i, img_caption in enumerate(caps):
            img_caption = img_caption.replace("-","")
            desc = img_caption.split()
            #converts to lowercase
            desc = [word.lower() for word in desc]
            #remove punctuation from each token
            desc = [word.translate(table) for word in desc]
            #remove hanging 's and a 
            desc = [word for word in desc if(len(word)>1)]
            #remove tokens with numbers in them
            desc = [word for word in desc if(word.isalpha())]
            #convert back to string

            img_caption = ' '.join(desc)
            captions[img][i] = img_caption
    return captions

In [5]:
# build vocabulary of all unique words
def text_vocabolary(descriptions):
    vocab = set()
    for key in descriptions.keys():
        [vocab.update(d.split()) for d in descriptions[key]]
    return vocab

In [6]:
#Save all descriptions in one file 
def save_descriptions(descriptions, filename):
    lines = list()
    for key, desc_list in descriptions.items():
        lines.append(key + '\t' + ' '.join(desc_list))
    data = '\n'.join(lines)
    file = open(filename, 'w')
    file.write(data)
    file.close()

In [7]:
# Set these path according to project folder in you system
dataset_text = "D:\DataCamp Projects\LLM\Flickr8k_text"
dataset_images = "D:\DataCamp Projects\LLM\Flickr8k_Dataset\Flicker8k_Dataset"

#we prepare our text data
#filename = dataset_text + "/" + "Flickr8k.token.txt"
#loading the file that contains all data
#mapping them into descriptions dictionary img to 5 captions
#descriptions = all_img_captions(filename)
#print("Length of descriptions =" ,len(descriptions))

In [8]:
#Cleaning the descriptions
#clean_descriptions = cleaning_text(descriptions)

#Building Vocabolary
#vocabolary = text_vocabolary(clean_descriptions)
#print("Length of Vocabolary = ", len(vocabolary))

#Saving each description to file 
#save_descriptions(clean_descriptions, "descriptions.txt")

In [9]:
#Download a model
def download_with_retry(url, filename, max_retries = 3):
    for attempt in range(max_retries):
        try:
            return get_file(filename, url)
        except Exception as e:
            if attempt == max_retries -1:
                raise e
            print(f"Download attempt failed")
            time.sleep(3)

weights_url = "https://storage.googleapis.com/tensorflow/keras-applications/xception/xception_weights_tf_dim_ordering_tf_kernels_notop.h5"
weights_path = download_with_retry(weights_url, 'xception_weights.h5')

#model = Xception(include_top=False, pooling="avg", weights=weights_path)

In [10]:
#Extract Features from Images
def extract_features(directory):
    features = {}
    valid_images = ['.jpg', 'jpeg', '.png']
    for img in tqdm(os.listdir(directory)):
        ext = os.path.splitext(img)[1].lower()
        if ext not in valid_images:
            continue
        filename = directory + "/" + img
        image = Image.open(filename)
        image = image.resize((299,299))
        image = np.expand_dims(image, axis=0)
        image = image/127.5
        image = image - 1.0

        feature = model.predict(image)
        features[img] = feature

    return features

In [11]:
#2048 Feature Vector
#features = extract_features(dataset_images)
#dump(features, open("features.p", 'wb'))

In [12]:
features = load(open("features.p", 'rb'))

#Load the Data
def load_photos(filename):
    file = load_file(filename)
    photos = file.split("\n")[:-1]
    photos_present = [photo for photo in photos if os.path.exists(os.path.join(dataset_images, photo))]
    return photos_present

def load_clean_descriptions(filename, photos):
    file = load_file(filename)
    descriptions = {}
    for line in file.split("\n"):
        words = line.split()
        if len(words) < 1:
            continue

        image, image_caption = words[0], words[1:]
        if image in photos:
            if image not in descriptions:
                descriptions[image] = []
            desc = '<start>' + " ".join(image_caption) + '<end>'
            descriptions[image].append(desc)

    return descriptions

def load_features(photos):
    all_features = load(open("features.p", "rb"))
    features = {k:all_features[k] for k in photos}
    return features


In [13]:
filename = dataset_text + "/" + "Flickr_8k.trainImages.txt"

train_imgs = load_photos(filename)
train_descriptions = load_clean_descriptions("descriptions.txt", train_imgs)
train_features = load_features(train_imgs)

In [14]:
#converting dictionary to clean list of descriptions
def dict_to_list(descriptions):
    all_desc = []
    for key in descriptions.keys():
        [all_desc.append(d) for d in descriptions[key]]
    return all_desc

#creating tokenizer class 
#this will vectorise text corpus
#each integer will represent token in dictionary
def create_tokenizer(descriptions):
    desc_list = dict_to_list(descriptions)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(desc_list)
    return tokenizer

In [15]:
# give each word an index, and store that into tokenizer.p pickle file
tokenizer = create_tokenizer(train_descriptions)
#dump(tokenizer, open("tokenizer.p", "wb"))

In [16]:
vocab_size = len(tokenizer.word_index) + 1
print(vocab_size)

7577


In [17]:
#calculate maximum length of descriptions
def max_length(descriptions):
    desc_list = dict_to_list(descriptions)
    return max(len(d.split()) for d in desc_list)
    
max_length = max_length(train_descriptions)
print(max_length)

86


### Create input-output sequence pairs from the image description.

In [18]:
#Create Sequences
def create_sequences(tokenizer, max_length, desc_list, feature):
    X1, X2, y = list(), list(), list()
    for desc in desc_list:
        seq = tokenizer.texts_to_sequences([desc])[0]
        for i in range(1, len(seq)):
            in_seq, out_seq = seq[:i], seq[i]
            in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
            out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
            X1.append(feature)
            X2.append(in_seq)
            y.append(out_seq)
        
    return np.array(X1), np.array(X2), np.array(y)

In [19]:
#Data generator, used by model.fit()
def data_generator(descriptions, features, tokenizer, max_length):
    def generator():
        for key, descriptions_list in descriptions.items():
            while True:
                feature = features[key]
                input_image, input_sequence, output_word = create_sequences(tokenizer, max_length, descriptions_list, feature)
                for i in range(len(input_image)):
                    yield {'input_1': input_image[i].squeeze(), 'input_2': input_sequence[i]}, output_word[i]

    # Define the output signature for the generator
    output_signature = (
        {
            'input_1': tf.TensorSpec(shape=(2048,), dtype = (tf.float32)),
            'input_2': tf.TensorSpec(shape=(max_length,), dtype= (tf.int32))
        },
        tf.TensorSpec(shape=(vocab_size,), dtype=(tf.float32))
    )

    # Create the dataset
    dataset = tf.data.Dataset.from_generator(
        generator,
        output_signature=output_signature
    )

    return dataset.batch(32)

In [20]:
#Check the shape of the input and output for your model
#dataset = data_genertor(train_descriptions, features, tokenizer, max_length)
#for (a, b) in dataset.take(1):
#    print(a['input_1'].shape, a['input_2'].shape, b.shape)
#    break

In [21]:
# define the captioning model
def define_model(vocab_size, max_length):

    # features from the CNN model squeezed from 2048 to 256 nodes
    inputs1 = Input(shape=(2048,), name='input_1')
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)

    # LSTM sequence model
    inputs2 = Input(shape=(max_length,), name='input_2')
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)

    # Merging both models
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)

    # tie it together [image, seq] [word]
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'], run_eagerly=True)

    # summarize model
    print(model.summary())
    #plot_model(model, to_file='model.png', show_shapes=True)

    return model

In [None]:
# train our model
print('Dataset: ', len(train_imgs))
print('Descriptions: train=', len(train_descriptions))
print('Photos: train=', len(train_features))
print('Vocabulary Size:', vocab_size)
print('Description Length: ', max_length)

model = define_model(vocab_size, max_length)
epochs = 10

def get_steps_per_epoch(train_descriptions):
    total_sequences = 0
    for img_captions in train_descriptions.values():
        for caption in img_captions:
            words = caption.split()
            total_sequences += len(words) - 1
    # Ensure at least 1 step, even if sequences < batch_size
    return max(1, total_sequences // 32)

# Update training loop
steps = get_steps_per_epoch(train_descriptions)
print("Steps per epoch: ", steps)  # Debugging line

# making a directory models to save our models
os.makedirs("models", exist_ok=True)
for i in range(epochs):
    dataset = data_generator(train_descriptions, train_features, tokenizer, max_length)
    # Verify dataset is not empty
    for data in dataset.take(1):  # Take one batch to inspect it
        print("Sample data:", data)

    model.fit(dataset, epochs=4, steps_per_epoch=steps, verbose=1)
    model.save(f"models/model_{i}.h5")

Dataset:  6000
Descriptions: train= 6000
Photos: train= 6000
Vocabulary Size: 7577
Description Length:  86
Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 86)]         0           []                               
                                                                                                  
 input_1 (InputLayer)           [(None, 2048)]       0           []                               
                                                                                                  
 embedding (Embedding)          (None, 86, 256)      1939712     ['input_2[0][0]']                
                                                                                                  
 dropout (Dropout)              (None, 2048)         0           ['input_1[0][0]']    