## Image Captioning with WER as a metric

#### Mount Google Drive for the caption and images to train on.

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install jiwer

Collecting jiwer
  Downloading jiwer-3.0.4-py3-none-any.whl (21 kB)
Collecting rapidfuzz<4,>=3 (from jiwer)
  Downloading rapidfuzz-3.9.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.4/3.4 MB[0m [31m16.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz, jiwer
Successfully installed jiwer-3.0.4 rapidfuzz-3.9.3


In [3]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, add
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
import json
import os
import jiwer
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [4]:
# Check if using GPU
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

Num GPUs Available:  1


#### Read the caption file from drive and load in dataframe.

In [5]:
# Define the path to your JSON file
json_file_path = '/content/drive/MyDrive/NLP/output_json.json'

# Read the JSON file into a DataFrame
caption_df = pd.read_json(json_file_path, orient='records', lines=True)

# Print the DataFrame to verify
print(caption_df.head())

        image/key                                                IIW  \
0  aar_test_04600  A close-up outdoor shot shows an Echinops Bann...   
1  aar_test_04601  In a low-angle shot, a wall decorated with a p...   
2  aar_test_04602  In an eye-level shot, a brick wall extends fro...   
3  aar_test_04603  A close-up view of a brick wall reveals the wo...   
4  aar_test_04604  A broad, pale-hued shelf encompasses a large m...   

                                       processed_IIW  \
0  a closeup outdoor shot shows an echinops banna...   
1  in a lowangle shot a wall decorated with a pat...   
2  in an eyelevel shot a brick wall extends from ...   
3  a closeup view of a brick wall reveals the wor...   
4  a broad palehued shelf encompasses a large mat...   

                                         summary_IIW  
0  a closeup outdoor shot shows an echinops banna...  
1  a wall decorated with a pattern of square tile...  
2  brick wall extends from the foreground on the ...  
3  a close

#### Read the images from drive and load to the same dataframe

In [6]:
# directory where images are stored
image_dir = '/content/drive/MyDrive/NLP/Image_set/'

# Function to load and preprocess images
def load_and_preprocess_image(image_key):
    image_path = os.path.join(image_dir, f"{image_key}.jpg")
    image = load_img(image_path, target_size=(224, 224))   ## preprocessing, change size to 224x224
    image = img_to_array(image)
    image = np.expand_dims(image, axis=0)
    image = preprocess_input(image)
    return image

# Load and preprocess all images
caption_df['image_data'] = caption_df['image/key'].apply(load_and_preprocess_image)


## Extracting features from image using VGG16-ImageNet.

In [7]:
# Load VGG16 model pre-trained on ImageNet
vgg_model = VGG16(weights='imagenet')
vgg_model = Model(inputs=vgg_model.inputs, outputs=vgg_model.layers[-2].output)

# Function to extract features using VGG16
def extract_features(image_data):
    features = vgg_model.predict(image_data, verbose=0)
    return features

# Extract features for all images
caption_df['image_features'] = caption_df['image_data'].apply(extract_features)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels.h5


## Preparing the Caption data

In [10]:
## Loading stopwords
stop_words = set(stopwords.words('english'))

## Removing stopwords
def remove_stop_words(text):
    tokens = text.split()
    filtered_tokens = [word for word in tokens if word.lower() not in stop_words]
    return ' '.join(filtered_tokens)

# Apply the function to the 'summary_IIW' column
caption_df['filtered_summary_IIW'] = caption_df['summary_IIW'].apply(remove_stop_words)

In [11]:
# Tokenize the captions
tokenizer = Tokenizer()
tokenizer.fit_on_texts(caption_df['filtered_summary_IIW'])
vocab_size = len(tokenizer.word_index) + 1

# Convert captions to sequences
sequences = tokenizer.texts_to_sequences(caption_df['filtered_summary_IIW'])

# Pad sequences
max_length = max(len(seq) for seq in sequences)
padded_sequences = pad_sequences(sequences, maxlen=max_length, padding='post')

## Split Data into Train and Test Sets

In [12]:
from sklearn.model_selection import train_test_split

# Prepare features and captions arrays
features = np.vstack(caption_df['image_features'])
captions = padded_sequences

# Split data into training and testing sets
train_features, test_features, train_captions, test_captions = train_test_split(features, captions, test_size=0.2, random_state=42)

# Image Captioning Model

In [13]:
# Image captioning model
# Image feature extractor
image_input = Input(shape=(4096,))
image_dense = Dropout(0.5)(image_input)
image_dense = Dense(256, activation='relu')(image_dense)

# Sequence processor
caption_input = Input(shape=(max_length,))
caption_embedding = Embedding(vocab_size, 256, mask_zero=True)(caption_input)
caption_lstm = Dropout(0.5)(caption_embedding)
caption_lstm = LSTM(256)(caption_lstm)

# Decoder (feed both outputs into a single model)
decoder = add([image_dense, caption_lstm])
decoder = Dense(256, activation='relu')(decoder)
outputs = Dense(vocab_size, activation='softmax')(decoder)

# Compile the model
model = Model(inputs=[image_input, caption_input], outputs=outputs)
model.compile(loss='categorical_crossentropy', optimizer='adam')

model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_3 (InputLayer)        [(None, 65)]                 0         []                            
                                                                                                  
 input_2 (InputLayer)        [(None, 4096)]               0         []                            
                                                                                                  
 embedding (Embedding)       (None, 65, 256)              976640    ['input_3[0][0]']             
                                                                                                  
 dropout (Dropout)           (None, 4096)                 0         ['input_2[0][0]']             
                                                                                            

# Train the Model

#### Define the parameters

In [14]:
# Define parameters
batch_size = 32
epochs = 20
steps = len(train_features) // batch_size

#### Creating data generators

In [15]:
# Prepare data generators
def data_generator(features, captions, batch_size):
    while True:
        for i in range(0, len(features), batch_size):
            batch_features = features[i:i+batch_size]
            batch_captions = captions[i:i+batch_size]
            X1, X2, y = [], [], []
            for j in range(len(batch_features)):
                seq = batch_captions[j]
                for k in range(1, len(seq)):
                    in_seq, out_seq = seq[:k], seq[k]
                    in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
                    out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
                    X1.append(batch_features[j])
                    X2.append(in_seq)
                    y.append(out_seq)
            yield [np.array(X1), np.array(X2)], np.array(y)

In [16]:
# Create train-test data generators
train_generator = data_generator(train_features, train_captions, batch_size)
test_generator = data_generator(test_features, test_captions, batch_size)

## Model Fit

### Introducing Word Error Rate (WER) into the training loop to fine tune the model.
Using WER as a metric to fine-tune an image captioning model helps ensure that the generated captions are as close as possible to the ground-truth captions. By integrating WER into the training loop, you can dynamically adjust the training process to improve model performance, achieve better generalization, and avoid overfitting. This process involves calculating WER for validation data, adjusting learning rates, implementing early stopping, and saving the best model based on WER scores.

In [17]:
def generate_caption(model, tokenizer, photo, max_length):
    in_text = 'startseq'
    for _ in range(max_length):
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        sequence = pad_sequences([sequence], maxlen=max_length)
        yhat = model.predict([photo, sequence], verbose=0)
        yhat = np.argmax(yhat)
        word = tokenizer.index_word.get(yhat)
        if word is None:
            break
        in_text += ' ' + word
        if word == 'endseq':
            break
    final_caption = in_text.split()[1:-1]
    final_caption = ' '.join(final_caption)
    return final_caption

In [18]:
## Implement a function to calculate WER between generated and reference captions.
def calculate_wer(reference, hypothesis):
    return jiwer.wer(reference, hypothesis)

def evaluate_model(model, tokenizer, test_features, test_captions, max_length):
    actual, predicted = list(), list()
    for img_features, caption in zip(test_features, test_captions):
        img_features = np.expand_dims(img_features, axis=0)  # Ensure the correct shape
        yhat = generate_caption(model, tokenizer, img_features, max_length)
        actual_caption = ' '.join([tokenizer.index_word[i] for i in caption if i > 0])
        actual.append(actual_caption)
        predicted.append(yhat)
    # Calculate WER for all predictions
    wer_scores = [calculate_wer(act, pred) for act, pred in zip(actual, predicted)]
    return sum(wer_scores) / len(wer_scores)

# Define the image captioning model, assuming it's already defined and compiled

# Training loop
best_wer = float('inf')
patience = 5
patience_counter = 0


In [19]:
for epoch in range(epochs):
    model.fit(train_generator, epochs=1, steps_per_epoch=steps, validation_data=test_generator, validation_steps=len(test_features) // batch_size)

    # Calculate WER on the validation set
    current_wer = evaluate_model(model, tokenizer, test_features, test_captions, max_length)
    print(f'Epoch {epoch+1}, WER: {current_wer}')

    # Check if the WER has improved
    if current_wer < best_wer:
        best_wer = current_wer
        model.save('best_model.h5')
        patience_counter = 0
    else:
        patience_counter += 1

    # Early stopping
    print("patience_counter: ", patience_counter)
    if patience_counter >= patience:
        print('Early stopping triggered')
        break

Epoch 1, WER: 1.0
patience_counter:  0


  saving_api.save_model(


Epoch 2, WER: 1.0
patience_counter:  1
Epoch 3, WER: 1.0
patience_counter:  2
Epoch 4, WER: 1.0
patience_counter:  3
Epoch 5, WER: 1.0
patience_counter:  4
Epoch 6, WER: 0.9994565217391305
patience_counter:  0
Epoch 7, WER: 1.0146932594463816
patience_counter:  1
Epoch 8, WER: 0.9981291806020067
patience_counter:  0
Epoch 9, WER: 0.9990049433785131
patience_counter:  1
Epoch 10, WER: 0.9967820483170854
patience_counter:  0
Epoch 11, WER: 1.0082535777012382
patience_counter:  1
Epoch 12, WER: 1.039985651887472
patience_counter:  2
Epoch 13, WER: 1.0377153171855196
patience_counter:  3
Epoch 14, WER: 1.2341906971052257
patience_counter:  4
Epoch 15, WER: 1.1015475019529126
patience_counter:  5
Early stopping triggered


# Save the trained model and tokenizer

In [20]:
# Save the trained model
model.save('/content/drive/MyDrive/NLP/new_model.h5')

In [21]:
# Save the tokenizer to a JSON file
tokenizer_json = tokenizer.to_json()
with open('/content/drive/MyDrive/NLP/new_tokenizer.json', 'w') as json_file:
    json_file.write(tokenizer_json)