In [None]:
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.applications.inception_v3 import InceptionV3
import tensorflow.keras.applications.inception_v3
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Embedding
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.nn import relu,softmax,tanh
from sklearn.model_selection import train_test_split
import tensorflow.keras.preprocessing.image
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import pickle
import nltk
import cv2
import string
import os
import glob
import time
from tqdm import tqdm
from google.colab.patches import cv2_imshow

In [None]:
# Mount Google Drive under /content/gdrive; the dataset and checkpoint paths
# used by later cells all live below this mount point.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
#PROJECT ROOT AND IMAGE FOLDER
# The image folder lives directly under the project root, so build it from
# the root instead of repeating the prefix.
root_captioning="/content/gdrive/My Drive/projects/captions/"
image_path=os.path.join(root_captioning,"Flicker8k_Dataset")

In [None]:
#LIST OF ALL THE IMAGES
# glob returns matches in arbitrary order; sort so the list (and anything
# derived from it) is the same on every run.
img_names1=sorted(glob.glob(os.path.join(image_path,'*.jpg')))

In [None]:
def img_slices(img):
  """Wrap `img` in a tf.data.Dataset with one element per entry of `img`."""
  return tf.data.Dataset.from_tensor_slices(img)

def decode_image(img):
  """Decode JPEG bytes into a 3-channel image and resize it to 299x299."""
  decoded=tf.image.decode_jpeg(img,channels=3)
  return tf.image.resize(decoded,(299,299))

def inception_preprocess(img):
  """Apply the InceptionV3 `preprocess_input` transform to `img`."""
  return tf.keras.applications.inception_v3.preprocess_input(img)
  
def preprocess(image_dataset):
  """Read one image file and prepare it for the feature extractor.

  `image_dataset` is the path of a single image file. Returns the decoded,
  resized and Inception-preprocessed image together with the unchanged path,
  so the caller can tell which file each image came from.
  """
  raw_bytes=tf.io.read_file(image_dataset)
  prepared=inception_preprocess(decode_image(raw_bytes))
  return prepared,image_dataset

def extract_features(img):
  """Run `img` through the module-level `inception_model` and return its output."""
  return inception_model(img)

def features(image_path_list):
  """Extract features for every image path and cache them on disk.

  Images are preprocessed in parallel and pushed through the feature
  extractor in batches of 16. Each image's feature map is flattened over its
  middle axes and written with `np.save` using the image path as the file
  name (np.save appends the `.npy` suffix).
  """
  batches=(img_slices(image_path_list)
           .map(preprocess,num_parallel_calls=tf.data.experimental.AUTOTUNE)
           .batch(16))
  for images,paths in batches:
    feats=extract_features(images)
    # Collapse everything between the batch axis and the last (channel) axis.
    feats=tf.reshape(feats,(feats.shape[0],-1,feats.shape[3]))
    for feat,path_tensor in zip(feats,paths):
      np.save(path_tensor.numpy().decode('utf-8'),feat.numpy())

In [None]:
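#features(img_names1)  # one-off step: uncomment to cache image features as <image path>.npy (map_func reads these files back)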

In [None]:
#READ THE RAW CAPTION FILE (ONE ANNOTATION PER LINE)
# Use a context manager so the file handle is closed as soon as it is read.
with open('/content/gdrive/My Drive/projects/captions/Flickr8k_text/Flickr8k.lemma.token1.txt') as captions_file:
  captions=captions_file.read().split("\n")

In [None]:
#PreProcessed Dictionary
# Maps each image path to the list of its captions, each caption being a
# list of whitespace-separated tokens.
pre_processed_captions=dict()
for line in captions:
  token=line.split()
  # Same length filter as before, plus a non-empty check so a whitespace-only
  # line cannot raise IndexError on token[0].
  if len(line)>=2 and token:
    # The first token is the annotation id; the part before '#' is used as
    # the image file name.
    image_id=token[0].split('#')[0]
    img_path=os.path.join(image_path,image_id)
    # token[1:] is already a fresh list of stripped words, so no extra
    # strip/flatten pass is needed.
    pre_processed_captions.setdefault(img_path,[]).append(token[1:])

In [None]:
#CAPTIONS LIST
# One '<start> ... <end>' string per annotation line: drop the leading id
# token, re-join the words, and keep only the text before the first '.'.
captions_list=[
  '<start> '+' '.join(line.split()[1:]).split('.')[0]+' <end>'
  for line in captions
  if len(line)>=2
]

In [None]:
def tokenization(max_number_words):
  """Build the caption Tokenizer.

  Args:
    max_number_words: passed to Tokenizer as `num_words`.

  Returns:
    An unfitted Tokenizer that maps out-of-vocabulary words to '<unk>'.
    The filter set omits '<' and '>' so that the '<start>' / '<end>' markers
    are not stripped.
  """
  # Raw string: the filter set contains a literal backslash, and '\]' in a
  # normal string literal is an invalid escape sequence (warned about by
  # recent Python versions). The runtime value is unchanged.
  token=Tokenizer(num_words=max_number_words,oov_token='<unk>',filters=r'!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
  return token

def maxlength(sequences):
  """Return the length of the longest sequence in `sequences`.

  Returns 0 for an empty collection instead of raising ValueError.
  """
  return max((len(s) for s in sequences),default=0)

def padsequences(sequences,max_len):
  """Pad `sequences` to `max_len`, adding the padding after each sequence."""
  return pad_sequences(sequences,maxlen=max_len,padding='post')

In [None]:
#TOKENIZE THE TOP 5000 WORDS
max_number_words=5000
tokenizer=tokenization(max_number_words)
tokenizer.fit_on_texts(captions_list)
# Register index 0 as the padding token in both lookup tables.
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'
sequences=tokenizer.texts_to_sequences(captions_list)
max_len=maxlength(sequences)
# Use the post-padding helper. Calling keras pad_sequences(sequences, max_len)
# directly pads at the FRONT by default, which pushes '<start>' away from
# position 0 and shifts every caption by a different amount.
padded_captions=padsequences(sequences,max_len)

In [None]:
#GIVE THE IMG_IDS WHOLE IMAGE PATH
# One full image path per annotation line (same line filter as the caption
# list, so the two stay index-aligned). The text before the first '#' is the
# image file name.
img_names1=[
  os.path.join(image_path,line.split('#')[0])
  for line in captions
  if len(line)>=2
]

In [None]:
#DIVIDE THE DATASET INTO TRAIN AND TEST
# 75/25 split with a fixed seed; image paths and padded captions are split
# together so each pair stays matched.
(img_names_train,
 img_names_test,
 captions_train,
 captions_test) = train_test_split(
    img_names1,
    padded_captions,
    test_size=0.25,
    random_state=123,
)

In [None]:
#DEFINE INCEPTION MODEL
# ImageNet-pretrained InceptionV3 without its classification head, wrapped
# as a Model from the network input to its last remaining output.
inceptionV3 = InceptionV3(weights='imagenet', include_top=False)
inception_model = Model(inputs=inceptionV3.input, outputs=inceptionV3.output)

In [None]:
#GET POSITIONAL ANGLES
def get_angles(pos, i, d_model):
    """Return pos / 10000 ** (2 * (i // 2) / d_model).

    Indices 2k and 2k+1 share the same exponent because of the floor division
    by two. `pos` and `i` are combined with ordinary NumPy broadcasting.
    """
    exponent = (2 * (i//2)) / np.float32(d_model)
    rates = 1 / np.power(10000, exponent)
    return pos * rates

In [None]:
#DEFINE POSITIONAL ENCODING
def positional_encoding(position, d_model):
    """Build a sinusoidal position table of shape (1, position, d_model).

    Even columns hold the sine of the angle and odd columns the cosine.
    The result is returned as a float32 tensor.
    """
    positions = np.arange(position)[:, np.newaxis]
    dims = np.arange(d_model)[np.newaxis, :]
    table = get_angles(positions, dims, d_model)
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])
    # Leading axis of size 1 so the table broadcasts over a batch.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

In [None]:
#CREATE PADDING MASK
def create_padding_mask(seq):
    """Return a float mask that is 1.0 wherever `seq` equals 0.

    Two singleton axes are inserted after the first axis so the mask can be
    broadcast against attention logits.
    """
    is_pad = tf.cast(tf.math.equal(seq, 0), tf.float32)
    return is_pad[:, tf.newaxis, tf.newaxis, :]

In [None]:
#CREATE LOOKAHEAD MASK
def create_look_ahead_mask(size):
    """Return a (size, size) mask that is 1 strictly above the diagonal.

    band_part(..., -1, 0) keeps the lower triangle including the diagonal;
    subtracting it from 1 marks the later positions each step must not see.
    """
    return 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)

In [None]:
# CREATE SCALED DOT PRODUCT ATTENTION
def scaled_dot_product_attention(q, k, v, mask):
    """Compute softmax(q k^T / sqrt(d_k)) v.

    Args:
        q, k, v: query, key and value tensors.
        mask: optional tensor broadcastable to the logits; entries equal to 1
            get -1e9 added so they receive (almost) zero weight. May be None.

    Returns:
        (output, attention_weights)
    """
    key_depth = tf.cast(tf.shape(k)[-1], tf.float32)
    logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(key_depth)
    if mask is not None:
        logits += (mask * -1e9)
    weights = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(weights, v), weights

In [None]:
# CREATE MULTIHEAD ATTENTION
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention: project q/k/v, attend per head, merge, project.

    `d_model` must be divisible by `num_heads`; each head operates on
    d_model // num_heads features.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % num_heads == 0
        self.depth = d_model // num_heads
        # Input projections for query, key and value, then the output projection.
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)
        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Split the last axis into (num_heads, depth) and move heads to axis 1."""
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        """Return (output, attention_weights) for the given value/key/query."""
        batch_size = tf.shape(q)[0]
        q_heads = self.split_heads(self.wq(q), batch_size)
        k_heads = self.split_heads(self.wk(k), batch_size)
        v_heads = self.split_heads(self.wv(v), batch_size)
        attended, attention_weights = scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)
        # Move the head axis back next to depth, then merge the two into d_model.
        attended = tf.transpose(attended, perm=[0, 2, 1, 3])
        merged = tf.reshape(attended, (batch_size, -1, self.d_model))
        return self.dense(merged), attention_weights

In [None]:
# POINT WISE FEED FORWARD NEURAL NETWORK
def point_wise_feed_forward_network(d_model, dff):
    """Return a two-layer network: Dense(dff, relu) followed by Dense(d_model)."""
    hidden = tf.keras.layers.Dense(dff, activation='relu')
    projection = tf.keras.layers.Dense(d_model)
    return tf.keras.Sequential([hidden, projection])

In [None]:
# CREATE ENCODER LAYER
class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: self-attention then feed-forward.

    Each sub-layer is followed by dropout, a residual add and layer norm.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        """Apply the block to `x`; `mask` is forwarded to the attention."""
        # Self-attention: value, key and query are all `x`.
        attended, _ = self.mha(x, x, x, mask)
        attended = self.dropout1(attended, training=training)
        after_attention = self.layernorm1(x + attended)
        transformed = self.dropout2(self.ffn(after_attention), training=training)
        return self.layernorm2(after_attention + transformed)

In [None]:
# CREATE DECODER LAYER
class DecoderLayer(tf.keras.layers.Layer):
    """One decoder block: self-attention, encoder attention, feed-forward.

    Each of the three sub-layers is followed by dropout, a residual add and
    layer norm.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        """Return (output, self_attention_weights, encoder_attention_weights)."""
        # 1) Self-attention over the target sequence.
        self_attended, self_weights = self.mha1(x, x, x, look_ahead_mask)
        self_attended = self.dropout1(self_attended, training=training)
        after_self = self.layernorm1(self_attended + x)
        # 2) Attention over the encoder output: values and keys come from the
        #    encoder, queries from the decoder state.
        cross_attended, cross_weights = self.mha2(
            enc_output, enc_output, after_self, padding_mask)
        cross_attended = self.dropout2(cross_attended, training=training)
        after_cross = self.layernorm2(cross_attended + after_self)
        # 3) Position-wise feed-forward network.
        transformed = self.dropout3(self.ffn(after_cross), training=training)
        result = self.layernorm3(transformed + after_cross)
        return result, self_weights, cross_weights

In [None]:
# CREATE ENCODER MODULE
class Encoder(tf.keras.layers.Layer):
    """Stack of EncoderLayers applied to a densely projected input.

    The input is projected to d_model with a Dense layer (`embedding2`) and a
    relu, scaled by sqrt(d_model), and offset by the positional encoding
    before it enters the layer stack.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
        super().__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        # NOTE: this Embedding is created but call() never uses it; the input
        # goes through `embedding2` instead.
        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model)
        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)
        self.embedding2 = Dense(d_model)

    def call(self, x, training, mask):
        """Encode `x`; `mask` is forwarded to every encoder layer."""
        seq_len = tf.shape(x)[1]
        projected = relu(self.embedding2(x))
        projected *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        projected += self.pos_encoding[:, :seq_len, :]
        hidden = self.dropout(projected, training=training)
        for layer in self.enc_layers:
            hidden = layer(hidden, training, mask)
        return hidden

In [None]:
# CREATE DECODER MODULE
class Decoder(tf.keras.layers.Layer):
    """Stack of DecoderLayers applied to embedded target tokens.

    Token ids are embedded, scaled by sqrt(d_model) and offset by the
    positional encoding before they enter the layer stack.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1):
        super().__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)
        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        """Return (decoder_output, attention_weights).

        `attention_weights` maps 'decoder_layer{n}_block1' / '_block2'
        (n starting at 1) to the two attention maps of each layer.
        """
        seq_len = tf.shape(x)[1]
        hidden = self.embedding(x)
        hidden *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        hidden += self.pos_encoding[:, :seq_len, :]
        hidden = self.dropout(hidden, training=training)

        attention_weights = {}
        for layer_no, layer in enumerate(self.dec_layers, start=1):
            hidden, self_weights, cross_weights = layer(
                hidden, enc_output, training, look_ahead_mask, padding_mask)
            attention_weights['decoder_layer{}_block1'.format(layer_no)] = self_weights
            attention_weights['decoder_layer{}_block2'.format(layer_no)] = cross_weights
        return hidden, attention_weights


In [None]:
#CREATE TRANSFORMER 
class Transformer(tf.keras.Model):
    """Encoder-decoder Transformer with a Dense layer producing vocabulary logits."""

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input, pe_target, rate=0.1):
        super().__init__()
        self.encoder = Encoder(num_layers, d_model, num_heads, dff, input_vocab_size, pe_input, rate)
        self.decoder = Decoder(num_layers, d_model, num_heads, dff, target_vocab_size, pe_target, rate)
        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
        """Return (logits, attention_weights) for input `inp` and target `tar`."""
        encoded = self.encoder(inp, training, enc_padding_mask)
        decoded, attention_weights = self.decoder(
            tar, encoded, training, look_ahead_mask, dec_padding_mask)
        return self.final_layer(decoded), attention_weights

In [None]:
#SET HYPER PARAMETERS
batch_size = 128          # examples per training batch (dataset.batch)
num_layers = 4            # encoder layers and decoder layers in the Transformer
d_model = 128             # model width; must be divisible by num_heads
buffer_size = 1000        # shuffle buffer size of the training dataset
dff = 256                 # hidden width of the point-wise feed-forward network
num_heads = 4             # attention heads per MultiHeadAttention layer
# NOTE(review): presumably max_number_words (5000) + 1 -- confirm and keep in sync.
target_vocab_size = 5001
input_vocab_size = target_vocab_size
dropout_rate = 0          # passed to the Transformer as `rate`; 0 disables dropout

In [None]:
#DEFINE TRANSFORMER
# Every argument is passed by keyword so the mapping to the constructor is
# explicit. Both positional-encoding tables are sized by the vocabulary sizes.
transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=input_vocab_size,
    target_vocab_size=target_vocab_size,
    pe_input=input_vocab_size,
    pe_target=target_vocab_size,
    rate=dropout_rate,
)

In [None]:
#CREATE MASK
def create_masks(inp, tar):
    """Build the three attention masks for one batch.

    Returns:
        (enc_padding_mask, combined_mask, dec_padding_mask). The two padding
        masks are both derived from `inp`; `combined_mask` is the element-wise
        maximum of the target padding mask and the look-ahead mask, so a
        position is masked if either one masks it.
    """
    enc_padding_mask = create_padding_mask(inp)
    dec_padding_mask = create_padding_mask(inp)

    target_len = tf.shape(tar)[1]
    combined_mask = tf.maximum(
        create_padding_mask(tar),
        create_look_ahead_mask(target_len),
    )
    return enc_padding_mask, combined_mask, dec_padding_mask

In [None]:
#CREATE MAP FUNCTION
def map_func(img_name, cap):
    """Load the cached feature array for one image.

    Args:
        img_name: image path as UTF-8 bytes; the features are read from that
            path with '.npy' appended.
        cap: the caption paired with the image, returned unchanged.

    Returns:
        (feature_array, cap)
    """
    feature_file = img_name.decode('utf-8') + '.npy'
    return np.load(feature_file), cap

In [None]:
#CREATE DATASET
def _load_cached_features(img_name, cap):
    """Run map_func as a NumPy op so the cached .npy features can be read inside the pipeline."""
    return tf.numpy_function(map_func, [img_name, cap], [tf.float32, tf.int32])

# (image path, caption) pairs -> (features, caption), shuffled, batched and
# prefetched.
dataset = (
    tf.data.Dataset.from_tensor_slices((img_names_train, captions_train))
    .map(_load_cached_features, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .shuffle(buffer_size)
    .batch(batch_size)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)

In [None]:
#CUSTOM LEARNING RATE SCHEDULE
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Warm-up then inverse-square-root decay learning-rate schedule.

  lr(step) = d_model ** -0.5 * min(step ** -0.5, step * warmup_steps ** -1.5)

  Args:
    d_model: model width; scales the whole schedule by d_model ** -0.5.
    warmup_steps: number of steps over which the rate grows linearly before
      it starts to decay.
  """

  def __init__(self, d_model, warmup_steps=5):
    super(CustomSchedule, self).__init__()

    self.d_model = tf.cast(d_model, tf.float32)
    self.warmup_steps = warmup_steps

  def __call__(self, step):
    # The optimizer may pass its iteration counter as an integer tensor;
    # rsqrt is only defined for floating-point inputs, so cast first.
    step = tf.cast(step, tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps ** -1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

In [None]:
#DEFINE LEARNING RATE
# The schedule is scaled by the model width; Adam takes it in place of a
# fixed learning rate.
learning_rate = CustomSchedule(d_model)

optimizer = tf.keras.optimizers.Adam(
    learning_rate=learning_rate,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9,
)

In [None]:
#CALCULATE LOSS AND ACCURACY
# from_logits=True: the model's final Dense layer has no softmax.
# reduction='none': keep one loss value per token so loss_function can zero
# out padded positions before averaging.
loss_object = SparseCategoricalCrossentropy(from_logits=True, reduction='none')

# custom-loss function
def loss_function(real, pred):
    """Mean cross-entropy over the non-padding positions of `real`.

    Positions where `real` is 0 (padding) contribute neither to the sum nor
    to the count the sum is divided by.
    """
    per_token = loss_object(real, pred)
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_token.dtype)
    return tf.reduce_sum(per_token * keep)/tf.reduce_sum(keep)

def accuracy_function(real, pred):
    """Fraction of non-padding positions where argmax(pred) equals `real`."""
    predicted_ids = tf.cast(tf.argmax(pred, axis=2), tf.float32)
    matches = tf.equal(tf.cast(real, tf.float32), predicted_ids)

    # Only positions whose target is not the padding id 0 are counted.
    not_pad = tf.math.not_equal(real, 0)
    hits = tf.cast(tf.math.logical_and(not_pad, matches), dtype=tf.float32)
    counted = tf.cast(not_pad, dtype=tf.float32)
    return tf.reduce_sum(hits)/tf.reduce_sum(counted)

In [None]:
#DEFINE CHECK POINT
# Track both the model and the optimizer so training can resume with the
# optimizer state intact; only the most recent checkpoint is kept on disk.
chkpt_path = '/content/gdrive/My Drive/transformer/train'
chkpt = tf.train.Checkpoint(transformer=transformer, optimizer=optimizer)
chkpt_manager = tf.train.CheckpointManager(chkpt, chkpt_path, max_to_keep=1)
# Resume from the latest checkpoint in chkpt_path when one exists.
if chkpt_manager.latest_checkpoint:
    print("Found a checkpoint")
    chkpt.restore(chkpt_manager.latest_checkpoint)

Found a checkpoint


In [None]:
#CREATE TRAIN STEP FOR LOSS FUNCTION
@tf.function
def train_step(inp, tar):
    """Run one optimisation step on a batch and return its loss.

    Args:
        inp: batch of encoder inputs (the cached image features).
        tar: batch of padded caption token ids.

    The decoder is fed tar[:, :-1] and trained to predict tar[:, 1:]
    (teacher forcing). Only the combined target mask is passed to the model;
    both padding masks on the encoder side are None.
    """
    tar_inp = tar[:, :-1]
    tar_real = tar[:, 1:]
    _, combined_mask, _ = create_masks(inp, tar_inp)
    with tf.GradientTape() as tape:
        predictions, _ = transformer(inp, tar_inp, True, None, combined_mask, None)
        loss = loss_function(tar_real, predictions)
    # Compute and apply gradients outside the tape context so the tape does
    # not also record the backward pass and the optimizer update.
    gradients = tape.gradient(loss, transformer.trainable_variables)
    optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))
    return loss

In [None]:
#CREATE TRAIN STEP FOR ACCURACY FUNCTION
@tf.function
def train_step1(inp, tar):
    """Return the token accuracy of the current model on one batch.

    Uses the same teacher-forcing split and masks as train_step, but performs
    no parameter update. Nothing is differentiated here, so no GradientTape
    is opened. The forward pass still runs with training=True, as before.
    """
    tar_inp = tar[:, :-1]
    tar_real = tar[:, 1:]
    _, combined_mask, _ = create_masks(inp, tar_inp)
    predictions, _ = transformer(inp, tar_inp, True, None, combined_mask, None)
    return accuracy_function(tar_real, predictions)

In [None]:
#TRAINING LOOP
EPOCHS = 199
start_epoch=0
train_losses=[]
# Epoch indices run from start_epoch to EPOCHS inclusive.
for epoch in range(start_epoch, EPOCHS+1):
    start = time.time()
    total_loss = 0
    num_batches = 0

    for (batch, (img_tensor, target)) in enumerate(dataset):
        batch_loss = train_step(img_tensor, target)
        batch_acc = train_step1(img_tensor, target)
        total_loss += batch_loss
        num_batches += 1

        if batch % 50 == 0:
            # loss_function and accuracy_function already average over the
            # non-padding tokens, so the values are reported as-is; dividing
            # by the sequence length again would under-report both.
            print ('Epoch {} Batch {} Loss {:.4f}'.format(
              epoch + 1, batch, batch_loss.numpy()))

            print ('Epoch {} Batch {} Accuracy {:.4f}'.format(
              epoch + 1, batch, batch_acc.numpy()))

    # Average over the batches actually seen (the last batch may be partial,
    # so len(img_names_train) // batch_size undercounts), stored as a plain
    # float so the pickled history does not hold tensors.
    epoch_loss = float(total_loss) / max(num_batches, 1)
    train_losses.append(epoch_loss)
    if epoch % 5 == 0:
        # Every fifth epoch: save the loss history and a checkpoint.
        with open("/content/gdrive/My Drive/transformer/losses.p", "wb") as losses_file:
            pickle.dump(train_losses, losses_file)
        chkpt_manager.save()

    print ('Epoch {} Loss {:.6f}'.format(epoch + 1, epoch_loss))
    print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

Epoch 1 Batch 0 Loss 0.1288
Epoch 1 Batch 0 Accuracy 0.0035
Epoch 1 Batch 50 Loss 0.1287
Epoch 1 Batch 50 Accuracy 0.0035
Epoch 1 Batch 100 Loss 0.1298
Epoch 1 Batch 100 Accuracy 0.0035
Epoch 1 Batch 150 Loss 0.1278
Epoch 1 Batch 150 Accuracy 0.0036
Epoch 1 Batch 200 Loss 0.1290
Epoch 1 Batch 200 Accuracy 0.0032
Epoch 1 Loss 5.014317
Time taken for 1 epoch 84.32505011558533 sec

Epoch 2 Batch 0 Loss 0.1301
Epoch 2 Batch 0 Accuracy 0.0034
Epoch 2 Batch 50 Loss 0.1283
Epoch 2 Batch 50 Accuracy 0.0033
Epoch 2 Batch 100 Loss 0.1257
Epoch 2 Batch 100 Accuracy 0.0037
Epoch 2 Batch 150 Loss 0.1291
Epoch 2 Batch 150 Accuracy 0.0034
Epoch 2 Batch 200 Loss 0.1271
Epoch 2 Batch 200 Accuracy 0.0035
Epoch 2 Loss 5.015590
Time taken for 1 epoch 66.1200020313263 sec

Epoch 3 Batch 0 Loss 0.1300
Epoch 3 Batch 0 Accuracy 0.0033
Epoch 3 Batch 50 Loss 0.1275
Epoch 3 Batch 50 Accuracy 0.0034
Epoch 3 Batch 100 Loss 0.1279
Epoch 3 Batch 100 Accuracy 0.0035
Epoch 3 Batch 150 Loss 0.1281
Epoch 3 Batch 150 Acc