In [None]:
# Mount Google Drive at /content/drive; the dataset and working directories
# used by this notebook are both addressed through that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# install the required libraries for BLIP
!pip install transformers timm torch torchvision matplotlib

In [None]:
import os
import pickle
import unicodedata

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import torch
from nltk.translate.bleu_score import corpus_bleu
from PIL import Image
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, Add
from tensorflow.keras.models import Model
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.utils import to_categorical, plot_model
from tqdm.notebook import tqdm
from transformers import BlipProcessor, BlipForConditionalGeneration

In [None]:

# Root folder on Google Drive that holds the dataset (images and Hindi captions).
BASE_DIR = '/content/drive/MyDrive/Hindi Image Captioning'
# Folder where the extracted features and the trained model are written.
# NOTE(review): this lives under 'Image Captioning', not 'Hindi Image Captioning'
# like BASE_DIR — confirm the different root is intended.
WORKING_DIR = '/content/drive/MyDrive/Image Captioning/working'

In [None]:
# Fetch the pretrained BLIP captioning checkpoint once: the processor prepares
# images for the network, and the model provides the vision encoder used for
# feature extraction.
BLIP_CHECKPOINT = "Salesforce/blip-image-captioning-base"
processor = BlipProcessor.from_pretrained(BLIP_CHECKPOINT)
model_blip = BlipForConditionalGeneration.from_pretrained(BLIP_CHECKPOINT)


In [None]:
# Extract one BLIP feature array per image, keyed by image id.
features = {}
directory = os.path.join(BASE_DIR, 'Flicker 8K Dataset/Images')

for img_name in tqdm(os.listdir(directory)):
    # Load the image. The context manager closes the underlying file handle
    # right away, and unreadable entries (sub-folders, non-image files) are
    # skipped instead of aborting the whole extraction run.
    img_path = os.path.join(directory, img_name)
    try:
        with Image.open(img_path) as img_file:
            image = img_file.convert("RGB")
    except OSError as err:
        print(f"Skipping {img_name}: {err}")
        continue

    # Preprocess and run the BLIP vision encoder. no_grad() prevents autograd
    # from recording a graph for every forward pass; gradients are never used
    # here, so recording them only costs memory.
    inputs = processor(image, return_tensors="pt")
    with torch.no_grad():
        hidden_states = model_blip.vision_model(inputs["pixel_values"]).last_hidden_state
    # Average over dim 1 of the encoder output and convert to a NumPy array.
    feature = hidden_states.mean(dim=1).numpy()

    # Key by file name without extension, the same id scheme the caption
    # mapping uses.
    image_id = img_name.split('.')[0]
    features[image_id] = feature

In [None]:
# Store features in pickle
os.makedirs(WORKING_DIR, exist_ok=True)  # the dump fails if the folder is missing
# `with` guarantees the file is flushed and closed before it is read back.
with open(os.path.join(WORKING_DIR, 'features_blip.pkl'), 'wb') as f:
    pickle.dump(features, f)


In [None]:
# Read the cached feature dictionary back from the working directory.
# Only unpickle files this notebook wrote itself: pickle can execute code.
features_path = os.path.join(WORKING_DIR, 'features_blip.pkl')
with open(features_path, 'rb') as pickle_file:
    features = pickle.load(pickle_file)


In [None]:
# Read the raw caption file. The captions are Hindi text, so decode explicitly
# as UTF-8 instead of relying on the platform's default encoding.
captions_path = os.path.join(BASE_DIR, 'Hindi captions/UnClean-5Sentences_withComma.txt')
with open(captions_path, 'r', encoding='utf-8') as f:
    next(f, None)  # skip the first line; the default avoids an error on an empty file
    captions_doc = f.read()


In [None]:
# Create mapping of image id -> list of captions
mapping = {}
for line in tqdm(captions_doc.split('\n')):
    # Each row is "<image file name>,<caption>". The caption may itself contain
    # commas, so every field after the first belongs to it.
    tokens = line.split(',')
    # Skip blank rows and rows that have no caption field. Checking the number
    # of fields (not only the number of characters) keeps comma-less rows from
    # being stored as empty captions.
    if len(line) < 2 or len(tokens) < 2:
        continue
    image_id, caption = tokens[0], tokens[1:]
    # Drop the file extension so ids match the keys of the feature dictionary.
    image_id = image_id.split('.')[0]
    # Re-join the caption fields (the separating commas become spaces).
    caption = " ".join(caption)
    mapping.setdefault(image_id, []).append(caption)

In [None]:
 # create mapping of image to captions
mapping = {}
# process lines
for line in tqdm(captions_doc.split('\n')):
    # split the line by comma(,): first field is the image file name, the
    # remaining fields together form the caption
    tokens = line.split(',')
    # ignore blank rows and rows without any caption field; testing the field
    # count prevents comma-less rows from producing empty captions
    if len(line) < 2 or len(tokens) < 2:
        continue
    image_id, caption = tokens[0], tokens[1:]
    # remove extension from image ID
    image_id = image_id.split('.')[0]
    # convert caption list to string
    caption = " ".join(caption)
    # create the list on first sight of the image, then store the caption
    mapping.setdefault(image_id, []).append(caption)

In [None]:
def clean(mapping):
    """Normalise every caption in ``mapping`` in place.

    For each caption the function lowercases the text, deletes digits,
    punctuation and symbols, collapses whitespace, drops one-character words
    and wraps the result in ``startseq`` / ``endseq`` tags.

    Characters are filtered by Unicode category rather than by an ASCII
    letter class, so Devanagari letters and their combining vowel signs are
    preserved. (The previous ``str.replace`` calls treated regex patterns as
    literal text and therefore never removed anything.)

    Captions that already carry both tags are left untouched, which makes it
    safe to run the cell more than once.

    Args:
        mapping: dict of image id -> list of caption strings; the lists are
            modified in place.

    Returns:
        None.
    """
    for captions in mapping.values():
        for i, caption in enumerate(captions):
            # Already processed by an earlier call: do not tag twice.
            if caption.startswith('startseq ') and caption.endswith(' endseq'):
                continue
            caption = caption.lower()
            # Drop numbers (N*), punctuation (P*) and symbols (S*); letters
            # (L*) and combining marks (M*) are kept.
            caption = "".join(
                ch for ch in caption
                if unicodedata.category(ch)[0] not in 'NPS'
            )
            # split() also collapses any run of whitespace.
            words = [word for word in caption.split() if len(word) > 1]
            captions[i] = 'startseq ' + " ".join(words) + ' endseq'


In [None]:
# Captions of one sample image before clean() has been applied.
mapping['1000268201_693b08cb0e']

In [None]:
# Preprocess the text: clean() rewrites the caption lists inside `mapping` in place.
clean(mapping)

In [None]:
# The same sample image after clean(): captions now carry startseq/endseq tags.
mapping['1000268201_693b08cb0e']

In [None]:
# Flatten the per-image caption lists into one list, then show how many
# captions there are in total.
all_captions = [caption for key in mapping for caption in mapping[key]]
len(all_captions)

In [None]:
# Fit the tokenizer on every caption of every image.
tokenizer = Tokenizer()
caption_corpus = []
for image_captions in mapping.values():
    caption_corpus.extend(image_captions)
tokenizer.fit_on_texts(caption_corpus)
# One extra slot on top of the number of distinct words in word_index.
vocab_size = 1 + len(tokenizer.word_index)

In [None]:
# Length of the longest caption, counted in whitespace-separated tokens.
caption_lengths = [len(caption.split()) for caption in all_captions]
max_length = max(caption_lengths)
max_length

In [None]:
# Split the image ids 90% / 10% in their existing order (no shuffling).
image_ids = [*mapping]
split = int(0.90 * len(image_ids))
train, test = image_ids[:split], image_ids[split:]

In [None]:
# Create data generator
def data_generator(data_keys, mapping, features, tokenizer, max_length, vocab_size, batch_size):
    """Yield training batches for the captioning model, indefinitely.

    Every caption is expanded into (prefix -> next word) pairs: for a token
    sequence ``s`` the pairs are ``(s[:i], s[i])`` for ``i = 1 .. len(s) - 1``.
    A batch is emitted after ``batch_size`` *images* have been processed, so
    the number of rows per batch depends on how many captions and tokens
    those images have.

    Args:
        data_keys: image ids to iterate over (cycled forever).
        mapping: dict of image id -> list of caption strings.
        features: dict of image id -> feature array; row 0 is used.
        tokenizer: object exposing ``texts_to_sequences``.
        max_length: length every input prefix is padded to.
        vocab_size: number of classes for the one-hot targets.
        batch_size: number of images per emitted batch.

    Yields:
        ``((image_features, padded_prefixes), one_hot_next_words)`` as NumPy
        arrays.

    Raises:
        ValueError: if ``data_keys`` is empty (the loop below would otherwise
            spin forever without ever yielding).
    """
    if len(data_keys) == 0:
        raise ValueError("data_keys is empty; there is nothing to generate batches from")

    X1, X2, y = [], [], []
    n = 0
    while True:
        for key in data_keys:
            image_feature = features[key][0]
            for caption in mapping[key]:
                seq = tokenizer.texts_to_sequences([caption])[0]
                for i in range(1, len(seq)):
                    in_seq = pad_sequences([seq[:i]], maxlen=max_length)[0]
                    out_seq = to_categorical([seq[i]], num_classes=vocab_size)[0]
                    X1.append(image_feature)
                    X2.append(in_seq)
                    y.append(out_seq)
            n += 1
            if n == batch_size:
                # The two model inputs are grouped in a tuple, not a list:
                # newer Keras versions build a tf.data signature from the
                # yielded structure and reject lists inside it, while tuples
                # are accepted by older and newer versions alike.
                yield (np.array(X1), np.array(X2)), np.array(y)
                X1, X2, y = [], [], []
                n = 0

In [None]:
from keras.utils import plot_model

# Width shared by the dense, embedding and LSTM layers, and the dropout rate
# applied at the start of each branch.
HIDDEN_UNITS = 256
DROPOUT_RATE = 0.4

# Image branch: feature vector -> dropout -> dense projection.
# The input width is read off the last axis of the first stored feature.
feature_dim = next(iter(features.values())).shape[-1]
inputs1 = Input(shape=(feature_dim,))
fe1 = Dropout(DROPOUT_RATE)(inputs1)
fe2 = Dense(HIDDEN_UNITS, activation='relu')(fe1)

# Text branch: padded token ids -> embedding (index 0 masked) -> dropout -> LSTM.
inputs2 = Input(shape=(max_length,))
se1 = Embedding(vocab_size, HIDDEN_UNITS, mask_zero=True)(inputs2)
se2 = Dropout(DROPOUT_RATE)(se1)
se3 = LSTM(HIDDEN_UNITS)(se2)

# Decoder: add both branches, then predict a distribution over the vocabulary.
decoder1 = Add()([fe2, se3])
decoder2 = Dense(HIDDEN_UNITS, activation='relu')(decoder1)
outputs = Dense(vocab_size, activation='softmax')(decoder2)

model = Model(inputs=[inputs1, inputs2], outputs=outputs)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Render the architecture diagram to a file and show it as the cell output.
plot_model(model, show_shapes=True, show_layer_names=True, to_file='model_plot.png')

In [None]:
# Set up the checkpoint directory
checkpoint_dir = './training_checkpoint_1'
os.makedirs(checkpoint_dir, exist_ok=True)
# ModelCheckpoint fills in `{epoch}` with the 1-based epoch number, so the file
# name records how many epochs had completed when the weights were written.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt-{epoch:02d}")

# Create the checkpoint callback.
# The model is fitted without validation data, so 'val_accuracy' is never
# logged and a callback monitoring it would never save anything. Monitor the
# training 'accuracy' metric (the one the model is compiled with) instead.
# NOTE(review): recent Keras releases require weights-only checkpoint paths to
# end in '.weights.h5' — confirm against the installed version.
checkpoint_callback = ModelCheckpoint(filepath=checkpoint_prefix, save_weights_only=True,
                                      monitor='accuracy',
                                      mode='max', save_best_only=True)


In [None]:
import os
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint

# Set up the checkpoint directory
checkpoint_dir = './training_checkpoint_1'
os.makedirs(checkpoint_dir, exist_ok=True)
# `{epoch}` is filled in by ModelCheckpoint with the 1-based epoch number, so
# the file name records how many epochs had completed at save time.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt-{epoch:02d}")

# Create the checkpoint callback.
# Training is run without validation data, hence 'val_accuracy' never appears
# in the logs and monitoring it would mean no checkpoint is ever written.
# The training 'accuracy' metric is available and is monitored instead.
# NOTE(review): recent Keras releases require weights-only checkpoint paths to
# end in '.weights.h5' — confirm against the installed version.
checkpoint_callback = ModelCheckpoint(filepath=checkpoint_prefix,
                                      save_weights_only=True,
                                      monitor='accuracy',
                                      mode='max',
                                      save_best_only=True)

In [None]:

# Define the number of epochs and batch size
epochs = 10
batch_size = 32
# The generator emits one batch per `batch_size` images; never ask for 0 steps.
steps = max(1, len(train) // batch_size)

# Check if there are existing checkpoints and load the model
initial_epoch = 0
if os.path.exists(checkpoint_dir):
    latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
    if latest_checkpoint:
        print(f"Resuming from {latest_checkpoint}")
        model.load_weights(latest_checkpoint)
        # Recover the epoch count from a trailing "-<number>" in the file name.
        # A checkpoint without such a suffix used to crash int(); now the
        # weights are still restored and epoch counting restarts at 0.
        # ModelCheckpoint numbers epochs from 1, so the suffix already equals
        # the 0-based index of the next epoch to run — no "+ 1" is needed.
        epoch_suffix = os.path.basename(latest_checkpoint).rsplit('-', 1)[-1]
        if epoch_suffix.isdigit():
            initial_epoch = int(epoch_suffix)

# Train the model and store the history
history = model.fit(
    data_generator(train, mapping, features, tokenizer, max_length, vocab_size, batch_size),
    epochs=epochs,
    steps_per_epoch=steps,
    initial_epoch=initial_epoch,
    callbacks=[checkpoint_callback],
    verbose=1
)


In [None]:
# Persist the model as it stands after training into the working directory.
model_path = os.path.join(WORKING_DIR, 'best_model_blip_o1.h5')
model.save(model_path)

In [None]:
# Function to plot the accuracy and loss graphs
def plot_history(history):
    """Draw training accuracy and loss side by side.

    Args:
        history: object whose ``history`` attribute is a dict containing the
            per-epoch ``'accuracy'`` and ``'loss'`` series.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    # (metric key, y-axis label, panel title) for each of the two panels.
    panels = (
        ('accuracy', 'Accuracy', 'Model Accuracy'),
        ('loss', 'Loss', 'Model Loss'),
    )

    plt.figure(figsize=(12, 5))
    for position, (metric, y_label, title) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history.history[metric], label=metric)
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.title(title)
        plt.legend()

    plt.show()

# Plot the accuracy and loss curves recorded by model.fit in `history`.
plot_history(history)

In [None]:
# Function to predict captions
def predict_caption(model, image, tokenizer, max_length):
    """Greedily decode a caption for one image.

    Starting from ``'startseq'``, the model is asked for the most likely next
    word given the image and the text so far; the word is appended and the
    step repeats. Decoding stops at ``'endseq'``, at a predicted index that
    has no word, or after ``max_length`` steps.

    Args:
        model: object with ``predict([image, sequence], verbose=0)`` returning
            scores over the vocabulary.
        image: image feature passed unchanged to ``model.predict``.
        tokenizer: object exposing ``word_index`` and ``texts_to_sequences``.
        max_length: padding length and upper bound on generated words.

    Returns:
        The decoded text, beginning with ``'startseq'`` and without
        ``'endseq'``.
    """
    # Build the index -> word table once; the previous code scanned the whole
    # vocabulary for every generated token.
    index_to_word = {idx: word for word, idx in tokenizer.word_index.items()}

    in_text = 'startseq'
    for _ in range(max_length):
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        sequence = pad_sequences([sequence], maxlen=max_length)
        yhat = model.predict([image, sequence], verbose=0)
        # int() turns the NumPy integer into a plain dict key.
        word = index_to_word.get(int(np.argmax(yhat)))
        if word is None or word == 'endseq':
            break
        in_text += " " + word
    return in_text

In [None]:
# Calculate BLEU scores for model evaluation
# The sequence tags are bookkeeping, not caption content. References carry both
# tags while predictions never contain 'endseq', so leaving them in would
# distort the n-gram statistics; score the real words only.
SPECIAL_TOKENS = {'startseq', 'endseq'}

actual, predicted = [], []
for key in tqdm(test):
    captions = mapping[key]
    y_pred = predict_caption(model, features[key], tokenizer, max_length).split()
    actual_captions = [
        [word for word in caption.split() if word not in SPECIAL_TOKENS]
        for caption in captions
    ]
    actual.append(actual_captions)
    predicted.append([word for word in y_pred if word not in SPECIAL_TOKENS])

# Cumulative BLEU-1 .. BLEU-4: uniform weights over the first n n-gram orders.
bleu_weights = [
    (1.0, 0, 0, 0),
    (0.5, 0.5, 0, 0),
    (1/3, 1/3, 1/3, 0),
    (0.25, 0.25, 0.25, 0.25),
]
for order, weights in enumerate(bleu_weights, start=1):
    print("BLEU-%d: %f" % (order, corpus_bleu(actual, predicted, weights=weights)))

In [None]:
import os
from PIL import Image
import matplotlib.pyplot as plt
from tkinter import Tk, filedialog

# Function to open a file dialog and select an image
def select_image():
    """Ask the user to pick an image file through a Tk file dialog.

    Returns:
        The selected path, or an empty string when the dialog is cancelled or
        no graphical display is available (for example on a hosted notebook
        runtime).
    """
    from tkinter import TclError  # local import: only this GUI helper needs it

    try:
        root = Tk()
    except TclError as err:
        # Without a display Tk cannot start; report it instead of crashing.
        print(f"File dialog unavailable: {err}")
        return ""

    try:
        root.withdraw()  # Hide the root window
        file_path = filedialog.askopenfilename(
            initialdir=os.path.expanduser("~/Desktop"),
            title="Select an image",
            filetypes=(("jpeg files", "*.jpg"), ("all files", "*.*"))
        )
    finally:
        # Release the hidden root window even if the dialog raises.
        root.destroy()
    # A cancelled dialog may return an empty non-string value; normalise it.
    return file_path or ""

# Function to generate caption for the selected image
def generate_caption(image_path):
    """Print reference and predicted captions for an image, then display it.

    Only the file name of ``image_path`` is used: the image is opened from the
    dataset image folder under ``BASE_DIR``, and its captions and features are
    looked up by the file name without extension.

    Note: a later cell defines another function with this same name.

    Args:
        image_path: path whose base name identifies a dataset image.
    """
    file_name = os.path.basename(image_path)
    key = file_name.split('.')[0]
    picture = Image.open(os.path.join(BASE_DIR, "Flicker 8K Dataset/Images", file_name))

    print('--- Actual Captions ---')
    for reference in mapping[key]:
        print(reference)

    predicted_text = predict_caption(model, features[key], tokenizer, max_length)
    print('--- Predicted Caption ---')
    print(predicted_text)

    plt.imshow(picture)
    plt.show()

# Example usage
# NOTE(review): inside a notebook kernel __name__ is presumably "__main__", so
# this guard passes and the dialog opens every time the cell runs — confirm
# that is intended.
if __name__ == "__main__":
    image_path = select_image()
    # A falsy path (nothing selected) skips caption generation.
    if image_path:
        generate_caption(image_path)


In [None]:
# Function to generate caption for a given image
def generate_caption(image_name):
    """Print reference and predicted captions for a dataset image and show it.

    This definition replaces the earlier function of the same name, so it
    accepts both forms of argument: a bare file name such as ``"123_abc.jpg"``
    or a full path to such a file. In either case only the base name is used
    to locate the image inside the dataset folder under ``BASE_DIR``.

    Args:
        image_name: file name (or path) of an image from the dataset.

    Raises:
        KeyError: if no captions or features are stored for the image id.
        FileNotFoundError: if the image is not in the dataset folder.
    """
    # basename() leaves a bare file name unchanged and strips any directories.
    image_name = os.path.basename(image_name)
    image_id = image_name.split('.')[0]
    img_path = os.path.join(BASE_DIR, "Flicker 8K Dataset/Images", image_name)

    # The context manager closes the image file once it has been displayed.
    with Image.open(img_path) as image:
        print('--- Actual Captions ---')
        for caption in mapping[image_id]:
            print(caption)

        y_pred = predict_caption(model, features[image_id], tokenizer, max_length)
        print('--- Predicted Caption ---')
        print(y_pred)

        plt.imshow(image)
        plt.show()


In [None]:
# Sample 1: print the reference and predicted captions, then show the image.
generate_caption("1057251835_6ded4ada9c.jpg")

In [None]:
# Sample 2: print the reference and predicted captions, then show the image.
generate_caption("1191338263_a4fa073154.jpg")

In [None]:
# Sample 3: print the reference and predicted captions, then show the image.
generate_caption("1343426964_cde3fb54e8.jpg")

In [None]:
# Sample 4: print the reference and predicted captions, then show the image.
generate_caption("1404832008_68e432665b.jpg")

In [None]:
# Sample 5: print the reference and predicted captions, then show the image.
generate_caption("109202801_c6381eef15.jpg")