In [1]:
import os
import torchmetrics 
from torchmetrics.text.bleu import BLEUScore
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.applications import VGG19
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, SimpleRNN, Embedding, Dense, Flatten, Reshape, Concatenate
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from collections import defaultdict
from tqdm import tqdm
import pickle 
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow import keras
from keras.preprocessing.text import tokenizer_from_json




In [2]:
df_i = pd.read_csv(r'C:\Users\ANN MARY\Desktop\FODL_2\image_names.csv')
df_ic = pd.read_csv(r'C:\Users\ANN MARY\Desktop\FODL_2\new_captions.csv')
train_df, test_df = train_test_split(df_i, test_size=0.2, random_state=42, shuffle=True)

In [3]:
#tokenizer : global since its required for test purposes as well : 
# tokens_ip = df_ic['caption'].values
# tokenizer = Tokenizer(oov_token="<unk>")
# tokenizer.fit_on_texts(tokens_ip)



#  #Save tokenizer to a file
# tokenizer_json = tokenizer.to_json()

# # Write the JSON string to a file
# with open("tokenizer.json", "w") as json_file:
#     json_file.write(tokenizer_json)


In [4]:
with open("tokenizer.json", "r") as json_file:
    tokenizer_json = json_file.read()
    tokenizer = tokenizer_from_json(tokenizer_json)


In [5]:
word_index = tokenizer.word_index 
index_word = {index: word for word, index in word_index.items()}

In [6]:
# Load GloVe embeddings
def load_glove_embeddings( embedding_dim=200):
    embeddings_index = {}
    with open("glove.6B.200d.txt", encoding="utf-8") as f:
        for line in f:
            values = line.split()
            word = values[0]
            coeffs = np.asarray(values[1:], dtype='float32')
            embeddings_index[word] = coeffs

    embedding_matrix = np.zeros((len(word_index) + 1, embedding_dim))
    for word, i in word_index.items():
        embedding_vector = embeddings_index.get(word)
        if embedding_vector is not None:
            embedding_matrix[i] = embedding_vector

    return embedding_matrix


In [7]:
# embedding_matrix = load_glove_embeddings()
# np.save("embedding_matrix.npy", embedding_matrix)

In [8]:
embedding_matrix = np.load("embedding_matrix.npy", allow_pickle=False)

In [9]:
# Load and preprocess images
def preprocess_images(df, img_p, target_size=(224, 224)):
    # Initialize model
    names = df['name'].values
    images = {}
    vgg = VGG19(weights='imagenet', include_top=True)
    model = Model(inputs=vgg.input, outputs=vgg.get_layer('fc2').output)  # Extract from fc2 layer (4096-dim)

    # Loop for extracting features
    for img_name in tqdm(names):
        img_path = os.path.join(img_p, img_name)
        img = load_img(img_path, target_size=target_size)
        img = img_to_array(img)
        img = np.expand_dims(img, axis=0)
        img = tf.keras.applications.vgg19.preprocess_input(img)
        features = model.predict(img, verbose=0)
        images[img_name] = features
    
    return images


In [10]:
# extract_to_folder = r'C:\Users\ANN MARY\Desktop\FODL_2\Images'
# images = preprocess_images( df_i, extract_to_folder)

# # Save the dictionary
# np.savez_compressed("image_features.npz", **images)

In [11]:
loaded = np.load("image_features.npz")
images = {key: loaded[key] for key in loaded.files}

In [12]:
def preprocess_data(df):

  #pre process captions 

  captions = df['caption'].values
  sequences = tokenizer.texts_to_sequences(captions)
  # Pad the sequences to ensure uniform length
  max_sequence_length = max([len(x) for x in sequences ])
  

  #preprocess image features 
  img_cap_dict = defaultdict(list)
  for img, seq in zip(df['name'].values, sequences):
        img_cap_dict[img].append(seq)
  return  img_cap_dict, max_sequence_length+1

In [13]:


def create_rnn_model(vocab_size, embedding_dim=200, max_caption_length=20, embedding_matrix=None):
    # Image input (from the fully connected layer of VGG16)
    image_input = Input(shape=(4096,))  # VGG16 FC layer outputs 4096 features
    img_dense = Dense(50, activation='relu')(image_input)  # Reduce dimensionality to match RNN hidden size
    
    # Caption input
    text_input = Input(shape=(max_caption_length,))
    text_embedding = Embedding(input_dim=vocab_size, 
                               output_dim=embedding_dim, 
                               weights=[embedding_matrix] if embedding_matrix is not None else None, 
                               trainable=False, 
                               mask_zero=True)(text_input)

    # RNN layer: use image features as initial state
    # SimpleRNN expects initial_state as a list with one element
    rnn_out = SimpleRNN(50, return_sequences=True)(
        text_embedding, initial_state=[img_dense]
    )

    # Output layer: project RNN outputs to vocab size
    token_output = Dense(vocab_size, activation='softmax')(rnn_out)

    # Define and compile the model
    model = Model(inputs=[image_input, text_input], outputs=token_output)
    model.compile(
        loss=SparseCategoricalCrossentropy(from_logits=False),
        optimizer=Adam(learning_rate=0.001),
        metrics=['accuracy']
    )
    return model


In [14]:
# Training the model
vocab_size = len(word_index)+1
def train_model(model, images, captions_dict, max_len):
    X_images, X_texts, Y_texts = [], [], []

    for img_name, captions in captions_dict.items():
        if img_name in train_df['name'].values:
            for caption in captions:
                img_feature = images[img_name].squeeze()  # Shape: (4096,)

                # Input sequence (e.g., [<start>, a, cat])
                # Output sequence (e.g., [a, cat, <end>])
                input_seq = caption[:-1]
                output_seq = caption[1:]

                # Pad input and output to max_len
                input_seq_padded = pad_sequences([input_seq], maxlen=max_len, padding='post')[0]
                output_seq_padded = pad_sequences([output_seq], maxlen=max_len, padding='post')[0]

                X_images.append(img_feature)
                X_texts.append(input_seq_padded)
                Y_texts.append(output_seq_padded)

    # Convert to numpy arrays
    X_images = np.array(X_images)                   # (num_samples, 4096)
    X_texts = np.array(X_texts)                     # (num_samples, max_len)
    Y_texts = np.array(Y_texts)                     # (num_samples, max_len)

    # Train model
    model.fit([X_images, X_texts], Y_texts,
              batch_size=100, epochs=30, validation_split=0.1)

    return model


In [16]:

img_cap_dict, max_len  = preprocess_data(df_ic) # img_cap_dict is a dictionary containing images mapped to a list of its captions 
model = create_rnn_model(len(word_index)+1, max_caption_length= max_len)
trained_model_rnn = train_model(model, images, img_cap_dict, max_len)
trained_model_rnn.save('trained_model_rnn.keras')

Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


In [17]:
trained_model_rnn = keras.models.load_model('trained_model_rnn.keras')

In [18]:
def generate_caption(model, image_feature, tokenizer, max_len):
    # Start with <start> token
    in_text = ['xyzw']

    for _ in range(max_len):
        # Convert words to token sequence
        sequence = tokenizer.texts_to_sequences([in_text])[0]

        # Pad sequence
        sequence = pad_sequences([sequence], maxlen=max_len, padding='post')

        # Predict next word
        yhat = model.predict([np.expand_dims(image_feature, axis=0), sequence], verbose=0)
        yhat = np.argmax(yhat[0], axis=1)[len(in_text)-1]  # Get next word token

        # Map token to word
        word = tokenizer.index_word.get(yhat, None)
        if word is None or word == 'abcd':
            break

        in_text.append(word)
        

    return ' '.join(in_text[1:])  # Skip <start>

In [19]:
train_dict = {k: v for k, v in img_cap_dict.items() if k in train_df['name'].values}
test_dict = {k : v for k, v in img_cap_dict.items() if k in test_df['name'].values} 

In [None]:
def evaluate_model_bar(dict_req): 
    references = []
    candidates = []
    blue_1 =[]
    blue_2 =[]
    blue_3 =[]
    blue_4 =[]
    i=0
    for image_name, ref_captions in tqdm(dict_req.items()): 
        if True: 
            image_features = images[image_name].squeeze()
            generated_caption = generate_caption(trained_model_rnn, image_features, tokenizer, max_len)
            generated_caption = generated_caption.split()
            candidates.append(' '.join(generated_caption))
            references.append([' '.join([index_word.get(token, '') for token in ref if token != 0 and token != tokenizer.word_index['xyzw']])for ref in ref_captions])
        
            #bleu k =1, 2, 3, 4
            candidate_tokens = candidates[-1].split()
            reference_tokens = [ref.split() for ref in references[-1]]  # multiple references per caption

            # Compute BLEU-1
            metric_1 = BLEUScore(n_gram=1)
            score_1 = metric_1([candidates[-1]], [references[-1]]).item()
            blue_1.append(score_1)

            metric_2 = BLEUScore(n_gram=2)
            score_2 = metric_2([candidates[-1]], [references[-1]]).item()
            blue_2.append(score_2)

            metric_3 = BLEUScore(n_gram=3)
            score_3 = metric_3([candidates[-1]], [references[-1]]).item()
            blue_3.append(score_3)

            metric_4 = BLEUScore(n_gram=4)
            score_4 = metric_4([candidates[-1]], [references[-1]]).item()
            blue_4.append(score_4)

        
            i+=1
            
    print(i)
    
    bleu1 = sum(blue_1)/len(blue_1)
    bleu2 = sum(blue_2)/len(blue_2)
    bleu3 = sum(blue_3)/len(blue_3)
    bleu4 = sum(blue_4)/len(blue_4)

    # Print
    print(f"BLEU-1: {bleu1:.4f}")
    print(f"BLEU-2: {bleu2:.4f}")
    print(f"BLEU-3: {bleu3:.4f}")
    print(f"BLEU-4: {bleu4:.4f}")


    return 

In [22]:
print('scores for test data')
evaluate_model_bar(test_dict)
print('scores for train data')
evaluate_model_bar(train_dict)

scores for test data


  0%|          | 0/800 [00:00<?, ?it/s]

100%|██████████| 800/800 [19:48<00:00,  1.49s/it]


800
BLEU-1: 0.4173
BLEU-2: 0.2247
BLEU-3: 0.0932
BLEU-4: 0.0393
scores for train data


100%|██████████| 3200/3200 [1:31:55<00:00,  1.72s/it]    

3200
BLEU-1: 0.4487
BLEU-2: 0.2559
BLEU-3: 0.1065
BLEU-4: 0.0459





In [None]:
# trained_model.save('trained_model.keras')