In [None]:
import os
import pickle
from tqdm import tqdm
from textwrap import wrap

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import tensorflow as tf
import keras

In [None]:
# Dataset locations, given as relative paths (resolved against the working directory).
image_path = 'flickr30k/Images'
caption_path = 'flickr30k/captions.txt'

# Load the captions table; later cells read its 'image' and 'caption' columns.
data = pd.read_csv(caption_path)
data.head()

In [None]:
def readImage(path, img_size=224):
    """Load an image file, resize it to a square and rescale its pixel values.

    Args:
        path: Location of the image file on disk.
        img_size: Side length, in pixels, of the square the image is resized to.

    Returns:
        The array produced by ``img_to_array`` with every value divided by 255.
    """
    image_utils = tf.keras.preprocessing.image
    pil_image = image_utils.load_img(
        path,
        target_size=(img_size, img_size),
        color_mode='rgb',
    )
    return image_utils.img_to_array(pil_image) / 255.

def display_images(temp_df, max_images=15):
    """Plot a grid of images, each titled with its (wrapped) caption.

    Image files are resolved relative to the module-level ``image_path`` so the
    function works on any machine that has the dataset in that directory.

    Args:
        temp_df: DataFrame with an ``image`` column holding file names and a
            ``caption`` column holding the matching text.
        max_images: Upper bound on the number of rows plotted. Frames with
            fewer rows are plotted in full instead of raising.
    """
    n_images = max(0, min(len(temp_df), max_images))
    temp_df = temp_df.iloc[:n_images].reset_index(drop=True)

    n_cols = 5
    # Keep the 5x5 layout for the usual case; add rows only when more than
    # 25 images are requested.
    n_rows = max(5, -(-n_images // n_cols))

    plt.figure(figsize=(20, 4 * n_rows))
    pairs = zip(temp_df['image'], temp_df['caption'])
    for position, (file_name, caption) in enumerate(pairs, start=1):
        plt.subplot(n_rows, n_cols, position)
        image = readImage(f"{image_path}/{file_name}")
        plt.imshow(image)
        plt.title("\n".join(wrap(caption, 20)))
        plt.axis("off")
    # Spacing is a figure-level setting, so apply it once after all axes exist.
    plt.subplots_adjust(hspace=0.7, wspace=0.3)

# Preview 15 randomly sampled caption rows; the sample is unseeded, so it differs between runs.
display_images(data.sample(15))

In [None]:
def text_preprocessing(data):
    """Normalise the ``caption`` column and wrap it in ``<start>``/``<end>``.

    Steps: lower-case, drop every character that is neither a letter nor
    whitespace, drop one-character words, collapse whitespace, then add the
    sequence markers.

    Args:
        data: DataFrame with a ``caption`` column. The column is overwritten
            in place.

    Returns:
        The same DataFrame object, with the cleaned captions.
    """
    cleaned = data['caption'].astype(str).str.lower()
    # Python's str.replace is literal, so the pattern has to go through the
    # pandas regex-aware accessor. Whitespace is kept in the character class;
    # otherwise all words of a caption would be glued together.
    cleaned = cleaned.str.replace(r"[^A-Za-z\s]", "", regex=True)
    # split()/join() both removes one-character words and collapses any run of
    # whitespace into a single space.
    cleaned = cleaned.apply(
        lambda text: " ".join(word for word in text.split() if len(word) > 1)
    )
    data['caption'] = "<start> " + cleaned + " <end>"
    return data

# Clean the captions (text_preprocessing overwrites data['caption']) and keep them as a plain list.
# Re-running this cell processes the already wrapped captions again, so run it once per fresh load.
data = text_preprocessing(data)
captions = data['caption'].tolist()

In [None]:
# Fit a vectorizer on every caption, without extra standardisation and without
# a vocabulary cap, to measure the corpus vocabulary.
vectorizer = tf.keras.layers.TextVectorization(
    standardize=None, max_tokens=None, output_mode='int', output_sequence_length=None,
)
vectorizer.adapt(captions)

vocabulary = vectorizer.get_vocabulary()
VOCAB_SIZE = 1 + len(vocabulary)  # one spare slot for any special token, if needed
MAX_LENGTH = max(map(lambda caption: len(caption.split()), captions))  # longest caption, in words

sequences = vectorizer(captions).numpy()
padded_sequences = tf.keras.utils.pad_sequences(sequences, maxlen=MAX_LENGTH, padding='post')

# Split by image (70 % / 15 % / 15 %, in file order) so that all captions of
# one image land in the same subset.
images = data['image'].unique().tolist()
nimages = len(images)

train_split, val_split = round(0.7 * nimages), round(0.85 * nimages)

train_images, val_images, test_images = (
    images[:train_split],
    images[train_split:val_split],
    images[val_split:],
)

train_data, val_data, test_data = (
    data[data['image'].isin(subset)].reset_index(drop=True)
    for subset in (train_images, val_images, test_images)
)

In [None]:
def get_full_path(imgs):
    """Prefix every image file name with the dataset directory ``image_path``."""
    return list(map(lambda img: f"{image_path}/{img}", imgs))


# One image path and one caption per DataFrame row, for each of the three splits.
train_imgs, val_imgs, test_imgs = (
    get_full_path(split['image'].tolist())
    for split in (train_data, val_data, test_data)
)

train_captions, val_captions, test_captions = (
    split['caption'].tolist()
    for split in (train_data, val_data, test_data)
)

In [None]:
# Pair each split's image paths with that split's own captions. The validation
# and test datasets were previously built from the training lists.
train_dataset = tf.data.Dataset.from_tensor_slices((train_imgs, train_captions))
val_dataset = tf.data.Dataset.from_tensor_slices((val_imgs, val_captions))
test_dataset = tf.data.Dataset.from_tensor_slices((test_imgs, test_captions))

# Re-fit the vectorizer on the training captions only, capped at VOCAB_SIZE
# tokens and returning ragged (unpadded) sequences.
vectorizer = tf.keras.layers.TextVectorization(
    standardize=None,
    max_tokens=VOCAB_SIZE,
    ragged=True
)
vectorizer.adapt(train_captions)
vocabulary = vectorizer.get_vocabulary()

# Sanity check: vocabulary size, its first entries and one encoded caption.
print(len(vocabulary), vocabulary[:5], vectorizer(train_captions[0]), train_captions[0], end='\n\n')

ragged = vectorizer(train_captions)
padded = tf.keras.utils.pad_sequences(sequences=ragged.to_list(), padding='post', maxlen=MAX_LENGTH)

ragged[0], padded[0], train_captions[0]

In [None]:
# Feature extractor: DenseNet201 cut at its second-to-last layer.
# NOTE(review): inputs are only divided by 255 here; confirm whether the
# pretrained weights expect the DenseNet-specific preprocessing instead.
model = tf.keras.applications.DenseNet201()
fe = tf.keras.models.Model(inputs=model.input, outputs=model.layers[-2].output)

img_size = 224
imgpaths = np.unique(train_imgs + val_imgs + test_imgs)
features = {}
# One forward pass per distinct image; the result is stored under the image's full path.
for imgpath in tqdm(imgpaths):
    # readImage performs the same load / resize / divide-by-255 steps; add the batch axis.
    img = readImage(imgpath, img_size)[np.newaxis, ...]
    feature = fe.predict(img, verbose=0)
    features[imgpath] = feature

In [None]:
# Optional on-disk cache for the extracted features (left disabled): the first
# pair of lines saves the dict, the second pair reloads it.
# NOTE(review): pickle.load can execute arbitrary code; only load files you created.
# with open('features.pkl', 'wb') as f:
#     pickle.dump(features, f)

# with open('features.pkl', 'rb') as f:
#     features = pickle.load(f)
# Re-key the features by bare file name, matching the values stored in the 'image' column.
features = {os.path.basename(path): value for path, value in features.items()}

In [None]:
# Two inputs: the image features (declared as a length-1 sequence of 1920 values)
# and a caption prefix of MAX_LENGTH token ids.
input1 = tf.keras.layers.Input(shape=(1, 1920))
input2 = tf.keras.layers.Input(shape=(MAX_LENGTH,))

# Project the image features to 256 units, the same width as the word embeddings below.
img_features = tf.keras.layers.Dense(256, activation='relu')(input1)
# NOTE(review): img_features presumably already has shape (batch, 1, 256), which would make
# this Reshape a no-op and the input_shape=(256,) hint inaccurate; confirm and consider dropping it.
img_features_reshaped = tf.keras.layers.Reshape((1, 256), input_shape=(256,))(img_features)

# Embed the token ids and put the image projection in front of them along the time axis.
sentence_features = tf.keras.layers.Embedding(VOCAB_SIZE, 256, mask_zero=False)(input2)
merged = tf.keras.layers.concatenate([img_features_reshaped, sentence_features],axis=1)
sentence_features = tf.keras.layers.LSTM(256)(merged)
x = tf.keras.layers.Dropout(0.5)(sentence_features)
# Skip connection: add the image projection back onto the LSTM output.
# NOTE(review): the operands look like different ranks ((batch, 256) vs (batch, 1, 256)); this
# presumably relies on the Keras merge layer broadcasting them and is why Flatten() follows; confirm.
x = tf.keras.layers.add([x, img_features])
x = tf.keras.layers.Dense(128, activation='relu')(x)
x = tf.keras.layers.Dropout(0.5)(x)
x = tf.keras.layers.Flatten()(x)
# One softmax score per vocabulary slot: the prediction for the next token.
output = tf.keras.layers.Dense(VOCAB_SIZE, activation='softmax')(x)

# categorical_crossentropy expects one-hot targets of width VOCAB_SIZE.
caption_model = tf.keras.models.Model(inputs=[input1,input2], outputs=output)
caption_model.compile(loss='categorical_crossentropy',optimizer='adam', metrics=['accuracy'])
caption_model.summary()

In [None]:
class CustomDataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that yields next-token training samples for the captioner.

    Every DataFrame row (one image / caption pair) is expanded into one sample
    per caption prefix: the image feature vector and the left-padded prefix are
    the inputs, the one-hot encoded following token is the target.
    """

    def __init__(self, df, X_col, y_col, features, vectorizer, vocab_size, max_length, batch_size, shuffle=True):
        """Store the configuration and build the (optionally shuffled) row order.

        Args:
            df: DataFrame with one row per (image, caption) pair; it is copied.
            X_col: Name of the column holding the image key.
            y_col: Name of the column holding the caption text.
            features: Mapping from image key to an array whose first element is
                the feature vector of that image.
            vectorizer: Callable turning one caption into a tensor of token ids.
            vocab_size: Width of the one-hot targets.
            max_length: Length every input sequence is padded to.
            batch_size: Number of DataFrame rows per batch.
            shuffle: Whether to reshuffle the row order at construction and
                after every epoch.
        """
        # Keras 3 warns when a PyDataset/Sequence subclass skips the base initialiser.
        super().__init__()
        self.df = df.copy()
        self.X_col = X_col
        self.y_col = y_col
        self.features = features
        self.vectorizer = vectorizer
        self.vocab_size = vocab_size
        self.max_length = max_length
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.n = len(self.df)
        self.indices = np.arange(self.n)

        if self.shuffle:
            np.random.shuffle(self.indices)

    def __len__(self):
        """Return the number of batches per epoch, counting a final partial batch."""
        # Ceiling division: floor division dropped the trailing rows and gave
        # zero batches whenever the frame was smaller than one batch.
        return (self.n + self.batch_size - 1) // self.batch_size

    def on_epoch_end(self):
        """Reshuffle the row order between epochs when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __getitem__(self, index):
        """Build batch number ``index`` from the current row order."""
        start = index * self.batch_size
        end = start + self.batch_size
        batch_indices = self.indices[start:end]
        batch_df = self.df.iloc[batch_indices]

        return self.__get_data(batch_df)

    def __get_data(self, batch_df):
        """Expand the rows of ``batch_df`` into ``((X1, X2), y)`` arrays.

        Returns:
            ``X1``: image features with an extra axis inserted at position 1.
            ``X2``: caption prefixes, left-padded to ``max_length``.
            ``y``: one-hot encoding of the token following each prefix.
        """
        feature_rows, prefixes, next_tokens = [], [], []

        # Each row is exactly one (image, caption) pair. Looking the captions
        # up again by image name duplicated samples whenever an image occurred
        # more than once in the batch.
        for image, caption in zip(batch_df[self.X_col], batch_df[self.y_col]):
            feature = self.features[image][0]
            seq = self.vectorizer(caption).numpy()

            # One sample per position: tokens [0, i) predict token i.
            for i in range(1, len(seq)):
                feature_rows.append(feature)
                prefixes.append(seq[:i])
                next_tokens.append(seq[i])

        # Pad and one-hot encode the whole batch in one call each.
        X1 = np.expand_dims(np.array(feature_rows), axis=1)
        X2 = tf.keras.utils.pad_sequences(prefixes, maxlen=self.max_length, padding="pre")
        y = tf.keras.utils.to_categorical(next_tokens, num_classes=self.vocab_size)

        return (X1, X2), y

In [None]:
# Training batches: rows of the training split, reshuffled every epoch.
train_generator = CustomDataGenerator(
    df=train_data,
    X_col='image',
    y_col='caption',
    features=features,
    vectorizer=vectorizer,
    vocab_size=VOCAB_SIZE,
    max_length=MAX_LENGTH,
    batch_size=64,
    shuffle=True
)

# Validation batches come from the validation split. Using test_data here let
# the test images steer checkpointing, early stopping and the LR schedule.
validation_generator = CustomDataGenerator(
    df=val_data,
    X_col='image',
    y_col='caption',
    features=features,
    vectorizer=vectorizer,
    vocab_size=VOCAB_SIZE,
    max_length=MAX_LENGTH,
    batch_size=64,
    shuffle=False
)

In [None]:
model_name = 'image_captioning_model.keras'

# Write the model to disk only when the validation loss reaches a new minimum.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    model_name,
    monitor="val_loss",
    mode="min",
    save_best_only=True,
    verbose=1,
)

# Stop after 5 epochs without improvement and roll back to the best weights.
earlystopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=0,
    patience=5,
    verbose=1,
    restore_best_weights=True,
)

# Multiply the learning rate by 0.2 after 3 stagnant epochs, never going below 1e-8.
learning_rate_reduction = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    patience=3,
    verbose=1,
    factor=0.2,
    min_lr=1e-8,
)

In [None]:
# Train for at most 50 epochs; the callbacks checkpoint the model, stop early and
# lower the learning rate, all driven by the validation loss.
history = caption_model.fit(
        train_generator,
        epochs=50,
        validation_data=validation_generator,
        callbacks=[checkpoint,earlystopping,learning_rate_reduction])

In [None]:
# Plot the training loss and, when it was recorded, the validation loss that
# drives the checkpoint / early-stopping / learning-rate callbacks.
plt.figure(figsize=(20,8))
plt.plot(history.history['loss'], label='train')
if 'val_loss' in history.history:
    plt.plot(history.history['val_loss'], label='val')
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(loc='upper left')
plt.show()

In [None]:
# def predict_caption(model, image, vectorizer, max_length, features):
#     feature = features[image]
#     feature = tf.convert_to_tensor(feature)
#     feature = tf.expand_dims(feature, axis=0)
#     in_text = "<start>"
#     for i in range(max_length):
#         sequence = vectorizer([in_text])[0]
#         sequence = tf.keras.utils.pad_sequences([sequence], max_length)
        
#         y_pred = model.predict((feature, sequence), verbose=0)
#         y_pred = np.argmax(y_pred)
        
#         word = vectorizer.get_vocabulary()[y_pred]
        
#         if word is None:
#             break
            
#         in_text += " " + word
        
#         if word == '<end>':            
#             break
            
#     return in_text

In [None]:
# path = train_imgs[150]
# img = tf.keras.preprocessing.image.load_img(path, color_mode='rgb',target_size=(244,244))
# img = tf.keras.preprocessing.image.img_to_array(img)
# img /= 255
# plt.imshow(img)
# predict_caption(caption_model, path, vectorizer, MAX_LENGTH, features)