In [None]:
# Download the Flickr8k image and caption archives, unpack them into the
# current working directory, then delete the zip files.
!wget -q https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_Dataset.zip
!wget -q https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_text.zip
!unzip -qq Flickr8k_Dataset.zip
!unzip -qq Flickr8k_text.zip
!rm Flickr8k_Dataset.zip Flickr8k_text.zip

In [None]:
import tensorflow as tf

# You'll generate plots of attention in order to see which parts of an image
# our model focuses on during captioning
import matplotlib.pyplot as plt

# Scikit-learn includes many helpful utilities
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from skimage import io

import collections
import random
import numpy as np
import pandas as pd
import os
import time
import json
from PIL import Image
import re
from glob import glob
import pickle
from os import listdir
from collections import Counter

from pickle import dump
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.models import Model

In [None]:
# Use this if you're working with official EN Repo
# import efficientnet.tfkeras as efn 

In [None]:
# Run the next cell only if you want to use a TPU; skip it if you're using a GPU

In [None]:
# try:
#   tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
#   print('Running on TPU ', tpu.cluster_spec().as_dict()['worker'])
# except ValueError:
#   raise BaseException('ERROR: Not connected to a TPU runtime; please see the previous cell in this notebook for instructions!')

# tf.config.experimental_connect_to_cluster(tpu)
# tf.tpu.experimental.initialize_tpu_system(tpu)
# tpu_strategy = tf.distribute.experimental.TPUStrategy(tpu)

In [None]:
# Directory holding the images. It must end with a path separator because
# file names are appended to it by plain string concatenation.
PATH = './Flicker8k_Dataset/'
# Caption file that load_doc() reads and load_descriptions() parses.
annotations = './Flickr8k.token.txt'

In [None]:
def image_filter(image_path):
  """
        Reduce image paths to bare image ids.

        Parameters:
              image_path (iterable of str): Paths of image files

        Returns:
              list of str: For every path, its file name cut at the first '.'
  """
  return [os.path.basename(path).split('.')[0] for path in image_path]

In [None]:
all_img_name_vector = glob(PATH + '*.jpg')
print("The total images present in the dataset: {}".format(len(all_img_name_vector)))
print(all_img_name_vector[-1])

In [None]:
all_img_name_vector_filtered= image_filter(all_img_name_vector)

In [None]:
print(all_img_name_vector_filtered[-1])
print(all_img_name_vector[-1])

In [None]:
def plot_image(images, captions=None, cmap=None ):
  """
        Show the given images side by side in a single row.

        Parameters:
              images (list of str): A list of the path of images
              captions (list of str, optional): Titles for the images, paired
                    with `images` by position. Images without a matching
                    caption get no title.
              cmap: Colormap forwarded to `imshow` (None keeps the default)

        Returns:
              None
  """
  if len(images) == 0:
      # plt.subplots cannot build a grid with zero columns
      return

  # squeeze=False always returns a 2-D array of axes; without it a single
  # image yields one bare Axes object, which cannot be zipped over.
  f, axes = plt.subplots(1, len(images), sharey=True, squeeze=False)
  f.set_figwidth(15)

  for idx, (ax, image) in enumerate(zip(axes.ravel(), images)):
      ax.imshow(io.imread(image), cmap)
      if captions is not None and idx < len(captions):
          ax.set_title(captions[idx])
        
# Preview the last two images; a negative slice does not depend on the exact
# number of files found in the dataset directory.
plot_image(all_img_name_vector[-2:])

In [None]:
len(all_img_name_vector)

In [None]:
# load doc into memory
def load_doc(filename):
  """
        Read a whole text file into memory.

        Parameters:
              filename (str): Path of the file to read

        Returns:
              str: The complete content of the file
  """
  # the context manager closes the file even when read() raises
  with open(filename, 'r') as file:
    return file.read()
 
# load descriptions
doc = load_doc(annotations)

In [None]:
# extract descriptions for images
def load_descriptions(doc):
  """
        Group the captions of an annotation document by image id.

        Each line is split on whitespace: the first token names the image
        and the remaining tokens form the caption.

        Parameters:
              doc (str): Full text of the annotation file

        Returns:
              dict: Maps an image id (first token cut at its first '.') to
                    the list of its captions, in file order
  """
  mapping = dict()
  for line in doc.split('\n'):
    # split line by white space
    tokens = line.split()
    # A usable line needs an image id plus at least one caption word.
    # Checking the token count (not the raw line length) also skips
    # whitespace-only lines, which would otherwise fail on tokens[0].
    if len(tokens) < 2:
      continue
    # take the first token as the image id, the rest as the description
    image_id, image_desc = tokens[0], tokens[1:]
    # drop the file extension (and anything after it) from the image id
    image_id = image_id.split('.')[0]
    # store the caption, creating the per-image list on first use
    mapping.setdefault(image_id, list()).append(' '.join(image_desc))
  return mapping
 
# parse descriptions
descriptions = load_descriptions(doc)
print('Loaded: %d ' % len(descriptions))

In [None]:
def Preprocessing(image_tag, captions):
  """
        Flatten the per-image captions into two parallel lists.

        Parameters:
              image_tag (list of str): Image ids to include, in the desired order
              captions (dict): Maps an image id to the list of its captions;
                    every id in `image_tag` must be present
        Returns:
              train_captions (list of str): All captions, image after image
              img_name_vector (list of str): The image id of each entry in
                    train_captions (same length, same order)
  """
  train_captions = []
  img_name_vector = []

  for tag in image_tag:
    for caption in captions[tag]:
      train_captions.append(caption)
      img_name_vector.append(tag)

  return train_captions, img_name_vector

In [None]:
def image_augmentation(train_caption):
  """
        Wrap every caption in the sequence markers used for training.

        Parameters:
              train_caption (list of str): Raw captions
        Returns:
              list of str: Each caption rewritten as "<start> caption <end>"
  """
  return [f"<start> {text} <end>" for text in train_caption]

In [None]:
train_captions, img_name_vector = Preprocessing(all_img_name_vector_filtered, descriptions)

In [None]:
train_captions2= image_augmentation(train_captions)

In [None]:
# The difference between the TF example and this notebook: I use the image id
# (file name without extension), while the TF example uses the full path.
# That is, I use all_img_name_vector_filtered where the TF example uses
# all_img_name_vector, so keep that in mind.
# The full path therefore has to be rebuilt as PATH + img_name_vector[i] + '.jpg'

In [None]:
print(train_captions[0])
Image.open(PATH + img_name_vector[0] + '.jpg')

In [None]:
#Debugging

# Rebuild the full image path for every entry of img_name_vector
refined = [PATH + name + '.jpg' for name in img_name_vector]


In [None]:
refined[0]

In [None]:
# Use this when running the official repo

# target_size = (300, 300,3)
# def load_image(image_path):
#     # image_path = PATH + image + '.jpg'
#     img = tf.io.read_file(image_path)
#     img = tf.image.decode_jpeg(img, channels=3)
#     img = tf.image.resize(img, (target_size[0],target_size[1])) 
#     img = efn.preprocess_input(img)
    
#     return img, image_path

In [None]:
# Use 'with open' when running on TPU instead of tf.io.read_file(image_path)

target_size = (300, 300,3)
def load_image(image_path):
    """
    Load a JPEG and prepare it as input for the EfficientNet feature extractor.

    The file is decoded to 3 channels, resized to the height and width given
    by `target_size`, and passed through the Keras EfficientNet
    `preprocess_input`.

    Returns:
        (image tensor, image_path) -- the path is passed through unchanged so
        that it stays paired with its image inside a tf.data pipeline.
    """
    # On TPU, read the bytes with Python's open(image_path, "rb") instead of
    # tf.io.read_file.
    height, width = target_size[0], target_size[1]
    pixels = tf.image.decode_jpeg(tf.io.read_file(image_path), channels=3)
    pixels = tf.image.resize(pixels, (height, width))
    return tf.keras.applications.efficientnet.preprocess_input(pixels), image_path
# load_image('/content/Flicker8k_Dataset/1357689954_72588dfdc4.jpg')

In [None]:
# Path of the pre-trained weight file handed to EfficientNetB3 below.
# NOTE(review): this is a relative '../input/...' location (looks like a
# Kaggle dataset mount) -- adjust it when running elsewhere.
pp= '../input/efficientnet-keras-noisystudent-weights-b0b7/noisystudent/noisy.student.notop-b3.h5'
# EfficientNetB3 without its classification head, sized for target_size inputs
image_model= tf.keras.applications.EfficientNetB3(weights=pp, input_shape=target_size, include_top=False)
new_input = image_model.input
# output of the last layer of the headless network
hidden_layer = image_model.layers[-1].output

# Model mapping an image batch to that last-layer output; used both for
# caching the training features and inside evaluate()
image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

In [None]:
# Use this with the EfficientNet (EN) repo; note that the weights are already included in it

# image_model = efn.EfficientNetB3(weights='noisy-student', input_shape=target_size, include_top=False)

# new_input = image_model.input
# hidden_layer = image_model.layers[-1].output

# image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

In [None]:
!pip install tqdm

In [None]:
from tqdm import tqdm

In [None]:
# Get the unique image paths, sorted so the processing order is stable
encode_train = sorted(set(all_img_name_vector))

# Dataset of (preprocessed image, path) pairs in batches of 16.
# Feel free to change the batch size according to your system configuration
image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)
image_dataset = image_dataset.map(load_image, num_parallel_calls=tf.data.AUTOTUNE).batch(16)

In [None]:
# Run every image through the feature extractor once and cache the result on
# disk next to the image, so training never has to touch the CNN again.
for img, path in tqdm(image_dataset):
  batch_features = image_features_extract_model(img)
  # collapse the two spatial axes into one: (batch, locations, channels)
  batch_features = tf.reshape(batch_features,
                              (batch_features.shape[0], -1, batch_features.shape[3]))

  for bf, p in zip(batch_features, path):
    # the save target is the image path itself; map_func later loads
    # "<image path>.npy", so np.save is relied on to add the ".npy" suffix
    path_of_feature = p.numpy().decode("utf-8")
    np.save(path_of_feature, bf.numpy())

In [None]:
# Preprocess and tokenize the captions

In [None]:
# Find the maximum length of any caption in the dataset
def calc_max_length(tensor):
    """Return the length of the longest sequence contained in `tensor`."""
    return max(map(len, tensor))

In [None]:
# Choose the top 5000 words from the vocabulary
top_k = 5000
# The filter set contains a literal backslash. It is written as a raw string
# because "\]" inside a normal string literal is an invalid escape sequence
# (a warning in current Python versions); the resulting characters are the same.
# "<" and ">" are not filtered, so "<start>", "<end>" and "<unk>" survive intact.
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters=r'!"#$%&()*+.,-/:;=?@[\]^_`{|}~')
tokenizer.fit_on_texts(train_captions2)

In [None]:
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

In [None]:
# Create the tokenized vectors
train_seqs = tokenizer.texts_to_sequences(train_captions2)

In [None]:
# Pad each vector to the max_length of the captions
# If you do not provide a max_length value, pad_sequences calculates it automatically
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')

In [None]:
# Calculates the max_length, which is used to store the attention weights
max_length = calc_max_length(train_seqs)

In [None]:
# Split the data into training, validation and testing

In [None]:
# Group the padded caption vectors by image path, so that all captions of an
# image end up in the same split.
img_to_cap_vector = collections.defaultdict(list)
for img, cap in zip(refined, cap_vector):
  img_to_cap_vector[img].append(cap)

# Shuffle the image keys reproducibly: the keys are sorted first (their
# original order follows the directory listing) and shuffled with a private,
# seeded generator. A stable split matters when training resumes from a
# checkpoint -- otherwise images trained on earlier could land in the
# validation or test split of the resumed run.
SPLIT_SEED = 42
img_keys = sorted(img_to_cap_vector.keys())
random.Random(SPLIT_SEED).shuffle(img_keys)

# 80% of the images for training; the remaining 20% is halved below
slice_index = int(len(img_keys)*0.8)
img_name_train_keys, img_name_val_key = img_keys[:slice_index], img_keys[slice_index:]

# first half of the hold-out -> test, second half -> validation
slice_index2 = int(len(img_name_val_key)*0.5)
img_name_test_keys, img_name_val_keys = img_name_val_key[:slice_index2], img_name_val_key[slice_index2:]


def _one_row_per_caption(image_keys):
  """Return parallel lists (image path, caption vector), one entry per caption."""
  names, caps = [], []
  for key in image_keys:
    caps_of_image = img_to_cap_vector[key]
    names.extend([key] * len(caps_of_image))
    caps.extend(caps_of_image)
  return names, caps


img_name_train, cap_train = _one_row_per_caption(img_name_train_keys)
img_name_val, cap_val = _one_row_per_caption(img_name_val_keys)

# The test split is stored differently on purpose: cap_test keeps ONE entry
# per image (the list of all its caption vectors), aligned with
# img_name_test_keys, because BLEU() reads cap_test[i] as the reference set
# of img_name_test_keys[i]. Hence len(cap_test) != len(img_name_test).
img_name_test = []
cap_test = []
for imgtest in img_name_test_keys:
  img_name_test.extend([imgtest] * len(img_to_cap_vector[imgtest]))
  cap_test.append(img_to_cap_vector[imgtest])

In [None]:
len(img_name_train), len(cap_train), len(img_name_val), len(cap_val), len(img_name_test), len(cap_test)

In [None]:
# Create a tf.data dataset for training

In [None]:
# Feel free to change these parameters according to your system's configuration

BATCH_SIZE = 64  # captions per training batch
BUFFER_SIZE = 1000  # size of the shuffle buffer of the training dataset
embedding_dim = 256  # width of the encoder output and of the word embeddings
units = 512  # GRU state size, also used for the attention projections
vocab_size = top_k + 1  # sizes the decoder's embedding and output layers
# number of full batches per epoch; the training loop divides the summed loss by it
num_steps = len(img_name_train) // BATCH_SIZE
# NOTE(review): (64, 2048) describes InceptionV3 features and seems to be left
# over from the TF example; the trailing 1536 is presumably the EfficientNetB3
# channel count -- confirm. features_shape is not referenced elsewhere in
# this notebook.
features_shape = 2048   # 1536
# Number of image locations attended over: evaluate() allocates its attention
# buffer with this width and plot_attention() draws it as a 10 x 10 map.
attention_features_shape = 100 # EN b3 is 100

In [None]:
# Load the numpy files
def map_func(img_name, cap):
  """
        Load the cached feature array that belongs to an image.

        Parameters:
              img_name (bytes): UTF-8 encoded image path; the features are
                    read from that path with '.npy' appended
              cap: Caption data, returned untouched
        Returns:
              (loaded array, cap)
  """
  feature_file = f"{img_name.decode('utf-8')}.npy"
  return np.load(feature_file), cap

In [None]:
# Training dataset of (image path, padded caption) pairs, one per caption
dataset = tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))

# Use map to load the numpy files in parallel. map_func is plain Python/NumPy,
# so it is wrapped in tf.numpy_function; the declared output types are
# float32 features and int32 captions.
dataset = dataset.map(lambda item1, item2: tf.numpy_function(
          map_func, [item1, item2], [tf.float32, tf.int32]),
          num_parallel_calls=tf.data.AUTOTUNE)

# Shuffle and batch, then prefetch so loading overlaps with training
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

In [None]:
# Model

In [None]:
class BahdanauAttention(tf.keras.Model):
  """Additive attention over the encoded image locations.

  Every location is scored against the current decoder state; the softmax of
  the scores weights the locations when they are summed into one context
  vector.
  """

  def __init__(self, units):
    super().__init__()
    self.W1 = tf.keras.layers.Dense(units)   # projects the image features
    self.W2 = tf.keras.layers.Dense(units)   # projects the decoder state
    self.V = tf.keras.layers.Dense(1)        # turns each location into a score

  def call(self, features, hidden):
    """Return (context_vector, attention_weights).

    features: encoder output, (batch_size, num_locations, embedding_dim)
    hidden:   decoder state, (batch_size, hidden_size)
    context_vector:    (batch_size, embedding_dim)
    attention_weights: (batch_size, num_locations, 1)
    """
    # (batch_size, hidden_size) -> (batch_size, 1, hidden_size), so the state
    # broadcasts against every image location
    hidden_with_time_axis = tf.expand_dims(hidden, 1)

    # (batch_size, num_locations, units)
    projected_features = self.W1(features)
    projected_hidden = self.W2(hidden_with_time_axis)
    attention_hidden_layer = tf.nn.tanh(projected_features + projected_hidden)

    # unnormalized score per location: (batch_size, num_locations, 1)
    score = self.V(attention_hidden_layer)

    # normalize across the locations axis
    attention_weights = tf.nn.softmax(score, axis=1)

    # weighted sum over the locations
    context_vector = tf.reduce_sum(attention_weights * features, axis=1)

    return context_vector, attention_weights

In [None]:
class CNN_Encoder(tf.keras.Model):
    """Projects pre-extracted image features to the embedding size.

    The CNN features are computed and cached beforehand, so this encoder is
    only a fully connected layer followed by a ReLU.
    """

    def __init__(self, embedding_dim):
        super().__init__()
        # output shape == (batch_size, num_locations, embedding_dim)
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        return tf.nn.relu(self.fc(x))

In [None]:
class RNN_Decoder(tf.keras.Model):
  """GRU decoder that predicts one word per call while attending to the image."""

  def __init__(self, embedding_dim, units, vocab_size):
    super().__init__()
    self.units = units

    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(self.units,
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')
    self.fc1 = tf.keras.layers.Dense(self.units)
    self.fc2 = tf.keras.layers.Dense(vocab_size)

    self.attention = BahdanauAttention(self.units)

  def call(self, x, features, hidden):
    """Run a single decoding step.

    x:        ids of the previous words, (batch_size, 1)
    features: encoder output
    hidden:   previous decoder state; it only feeds the attention, the GRU
              itself is called without an initial state
    Returns (logits over the vocabulary, new GRU state, attention weights).
    """
    # attention lives in its own model
    context_vector, attention_weights = self.attention(features, hidden)

    # (batch_size, 1) -> (batch_size, 1, embedding_dim)
    embedded = self.embedding(x)

    # prepend the context on the feature axis:
    # (batch_size, 1, context size + embedding_dim)
    gru_input = tf.concat([tf.expand_dims(context_vector, 1), embedded], axis=-1)

    output, state = self.gru(gru_input)

    # (batch_size, sequence length, units)
    projected = self.fc1(output)

    # fold batch and sequence axes together: (batch_size * sequence length, units)
    flat = tf.reshape(projected, (-1, projected.shape[2]))

    # (batch_size * sequence length, vocab_size)
    logits = self.fc2(flat)

    return logits, state, attention_weights

  def reset_state(self, batch_size):
    """Return an all-zero decoder state of shape (batch_size, units)."""
    return tf.zeros((batch_size, self.units))

In [None]:
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)

In [None]:
# Adam with its default settings
optimizer = tf.keras.optimizers.Adam()
# The decoder emits raw logits. reduction='none' keeps one loss value per
# example so that loss_function can mask padded positions before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')


def loss_function(real, pred):
  """Cross-entropy of one decoding step that ignores padded targets.

  real: target word ids of the step; id 0 marks padding
  pred: logits predicted for the step
  Returns the mean over the batch of the per-example losses, where examples
  whose target is padding contribute 0.
  """
  per_example = loss_object(real, pred)

  # 1.0 where the target is a real word, 0.0 where it is padding
  not_padding = tf.cast(tf.math.logical_not(tf.math.equal(real, 0)),
                        dtype=per_example.dtype)

  return tf.reduce_mean(per_example * not_padding)

In [None]:
# Checkpoint

In [None]:
# Directory that receives the training checkpoints
checkpoint_path = "./checkpoints/train"
# Track encoder, decoder and optimizer state together
ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer=optimizer)
# Keep only the five most recent checkpoints on disk
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

In [None]:
# Resume from the newest checkpoint if one exists, otherwise start at epoch 0
start_epoch = 0
if ckpt_manager.latest_checkpoint:
  # The number after the last '-' of the checkpoint path becomes the epoch.
  # NOTE(review): the training loop only saves when epoch % 5 == 0, so this
  # suffix presumably counts saves rather than epochs -- confirm that
  # resuming at this index is intended.
  start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])
  # restoring the latest checkpoint in checkpoint_path
  ckpt.restore(ckpt_manager.latest_checkpoint)

In [None]:
# adding this in a separate cell because if you run the training cell
# many times, the loss_plot array will be reset
loss_plot = []

In [None]:
@tf.function
def train_step(img_tensor, target):
  """Run one optimization step on a batch, using teacher forcing.

  img_tensor: batch of cached image features
  target:     batch of padded caption id sequences; column 0 is expected to
              hold the '<start>' token (it is never used as a prediction target)

  Returns (loss, total_loss): the loss summed over the decoding steps, and
  that sum divided by the caption length.
  """
  loss = 0

  # initializing the hidden state for each batch
  # because the captions are not related from image to image
  hidden = decoder.reset_state(batch_size=target.shape[0])

  # every sequence starts with the '<start>' token: shape (batch_size, 1)
  dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * target.shape[0], 1)

  with tf.GradientTape() as tape:
      features = encoder(img_tensor)

      # predict position i from position i-1
      for i in range(1, target.shape[1]):
          # passing the features through the decoder
          predictions, hidden, _ = decoder(dec_input, features, hidden)

          loss += loss_function(target[:, i], predictions)

          # using teacher forcing
          dec_input = tf.expand_dims(target[:, i], 1)

  # loss normalized by the (padded) caption length, for reporting
  total_loss = (loss / int(target.shape[1]))

  trainable_variables = encoder.trainable_variables + decoder.trainable_variables

  # gradients are taken of the summed loss, not of the normalized one
  gradients = tape.gradient(loss, trainable_variables)

  optimizer.apply_gradients(zip(gradients, trainable_variables))

  return loss, total_loss

In [None]:
# Total number of epochs; a resumed run continues from start_epoch
EPOCHS = 10

for epoch in range(start_epoch, EPOCHS):
    start = time.time()
    # sum of the length-normalized batch losses of this epoch
    total_loss = 0

    for (batch, (img_tensor, target)) in enumerate(dataset):
        batch_loss, t_loss = train_step(img_tensor, target)
        total_loss += t_loss

        # progress report every 100 batches
        if batch % 100 == 0:
            average_batch_loss = batch_loss.numpy()/int(target.shape[1])
            print(f'Epoch {epoch+1} Batch {batch} Loss {average_batch_loss:.4f}')
    # storing the epoch end loss value to plot later
    # NOTE(review): num_steps counts full batches only, while the dataset is
    # batched without dropping the remainder -- the average may be slightly
    # overestimated; confirm.
    loss_plot.append(total_loss / num_steps)

    # a checkpoint is written only for epoch indices divisible by 5
    if epoch % 5 == 0:
      ckpt_manager.save()

    print(f'Epoch {epoch+1} Loss {total_loss/num_steps:.6f}')
    print(f'Time taken for 1 epoch {time.time()-start:.2f} sec\n')

In [None]:
plt.plot(loss_plot)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss Plot')
plt.show()

In [None]:
# Caption!

In [None]:
def evaluate(image):
    """
    Generate a caption for one image by sampling from the decoder.

    Parameters:
        image (str): Path of the image file

    Returns:
        result (list of str): The generated words. The last one is '<end>'
            unless max_length words were produced without reaching it.
        attention_plot (np.ndarray): One row of attention weights per
            generated word, shape (len(result), attention_features_shape)
    """
    attention_plot = np.zeros((max_length, attention_features_shape))

    hidden = decoder.reset_state(batch_size=1)

    # image -> batch of one -> CNN features -> (1, locations, channels)
    temp_input = tf.expand_dims(load_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0],
                                                 -1,
                                                 img_tensor_val.shape[3]))

    features = encoder(img_tensor_val)

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0)
    result = []

    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(dec_input,
                                                         features,
                                                         hidden)

        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()

        # the next word is sampled from the predicted distribution (not the
        # argmax), so repeated calls can give different captions
        predicted_id = tf.random.categorical(predictions, 1)[0][0].numpy()
        predicted_word = tokenizer.index_word[predicted_id]
        result.append(predicted_word)

        if predicted_word == '<end>':
            break

        # feed the sampled word back in as the next input
        dec_input = tf.expand_dims([predicted_id], 0)

    # Trim the buffer on both exits, so the rows always match the words.
    # Previously the early '<end>' exit returned all max_length rows.
    attention_plot = attention_plot[:len(result), :]
    return result, attention_plot

In [None]:
def plot_attention(image, result, attention_plot):
    """
    Draw the image once per generated word with that word's attention map on top.

    Parameters:
        image (str): Path of the image file
        result (list of str): Generated words, as returned by evaluate()
        attention_plot: One row of attention weights per word; only the
            first len(result) rows are used

    Returns:
        None
    """
    temp_image = np.array(Image.open(image))

    fig = plt.figure(figsize=(10, 10))

    len_result = len(result)
    # Smallest square grid with room for every word. The former
    # len_result//2 x len_result//2 grid was too small for 1, 2, 3 or 5
    # words, which made add_subplot fail.
    grid_size = int(np.ceil(np.sqrt(len_result)))

    for idx in range(len_result):
        # side of the square attention map, e.g. 10 for 100 weights
        att_side = int(round(np.sqrt(np.size(attention_plot[idx]))))
        temp_att = np.resize(attention_plot[idx], (att_side, att_side))
        ax = fig.add_subplot(grid_size, grid_size, idx+1)
        ax.set_title(result[idx])
        img = ax.imshow(temp_image)
        # stretch the coarse attention map over the whole image
        ax.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent())

    plt.tight_layout()
    plt.show()

In [None]:
encoder.save_weights('encoder.h5')
decoder.save_weights('decoder.h5')

In [None]:
# captions on the validation set
# pick a random validation caption together with its image
rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]
# map the ids back to words, leaving out the padding id 0
real_caption = ' '.join(tokenizer.index_word[token_id]
                        for token_id in cap_val[rid] if token_id != 0)
result, attention_plot = evaluate(image)

print('Real Caption:', real_caption)
print('Prediction Caption:', ' '.join(result))
plot_attention(image, result, attention_plot)

In [None]:
# testing 

In [None]:
pathy= '/content/Kindness_FTR.jpg'

result, attention_plot = evaluate(pathy)
print('Prediction Caption:', ' '.join(result))

plot_attention(pathy, result, attention_plot)


In [None]:
Image.open(pathy)

In [None]:
from nltk.translate.bleu_score import corpus_bleu

In [None]:
# run time ~ 30 minutes
def BLEU():
  """
        Caption every test image and print corpus-level BLEU-1 to BLEU-4.

        The references of test image i are the caption vectors in
        cap_test[i]; the hypothesis is sampled with evaluate(). Prints the
        index of each processed image as progress output.

        Returns:
              None
  """
  actual, predicted = list(), list()

  for i in range(len(img_name_test_keys)):
    image = img_name_test_keys[i]
    # references: drop the padding id 0, map ids to words, then strip the
    # leading '<start>' and trailing '<end>' markers
    real_caption = [
        [tokenizer.index_word[w] for w in im if w != 0][1:-1]
        for im in cap_test[i]
    ]
    # generate description
    result, _ = evaluate(image)
    # strip the end marker only when it is there: a caption that ran into
    # the length limit has no '<end>' and would otherwise lose a real word
    if result and result[-1] == '<end>':
      result = result[:-1]
    # store actual and predicted
    actual.append(real_caption)
    predicted.append(result)
    print(i)
  # calculate BLEU score
  print('BLEU-1: %f' % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
  print('BLEU-2: %f' % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
  # equal weights that sum to 1 (the former 0.3/0.3/0.3 summed to 0.9)
  print('BLEU-3: %f' % corpus_bleu(actual, predicted, weights=(1/3, 1/3, 1/3, 0)))
  print('BLEU-4: %f' % corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25)))

In [None]:
BLEU()