<a href="https://colab.research.google.com/github/John-p-v1999/Sonnet-generator-final/blob/main/caption.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
import os
import pickle
import numpy as np

########################################################################


def cache(cache_path, fn, *args, **kwargs):
    """
    Cache-wrapper for a function or class. If the cache-file exists
    then the data is reloaded and returned, otherwise the function
    is called and the result is saved to cache. The fn-argument can
    also be a class instead, in which case an object-instance is
    created and saved to the cache-file.

    The cache-file is written to a temporary sibling file first and
    then moved into place, so a failed or interrupted write never
    leaves a truncated cache-file behind.

    :param cache_path:
        File-path (string) for the cache-file.
    :param fn:
        Function or class to be called.
    :param args:
        Arguments to the function or class-init.
    :param kwargs:
        Keyword arguments to the function or class-init.
    :return:
        The result of calling the function or creating the object-instance.
    """

    # If the cache-file exists, reload it and return immediately.
    if os.path.exists(cache_path):
        # NOTE: unpickling can run arbitrary code, so cache_path must
        # only ever point at files written by this function.
        with open(cache_path, mode='rb') as file:
            obj = pickle.load(file)

        print("- Data loaded from cache-file: " + cache_path)
        return obj

    # The cache-file does not exist.
    # Call the function / class-init with the supplied arguments.
    obj = fn(*args, **kwargs)

    # Write to a temporary file and move it into place only once the
    # dump has succeeded.
    tmp_path = cache_path + ".tmp"
    try:
        with open(tmp_path, mode='wb') as file:
            pickle.dump(obj, file)
        os.replace(tmp_path, cache_path)
    finally:
        # Only present here if the dump or the move failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

    print("- Data saved to cache-file: " + cache_path)

    return obj


In [28]:
from tensorflow import keras
# Show which Keras version this runtime provides.
print(keras.__version__)

2.4.0


In [13]:
import json
import os



########################################################################

# Directory where you want to download and save the data-set.
# Set this before you start calling any of the functions below.
data_dir = "/content/gdrive/My Drive/coco/"

# Sub-directories for the training- and validation-sets. They are derived
# from data_dir (at the time this cell runs) so the three paths cannot
# drift apart.
# NOTE(review): no set_data_dir() helper is visible in this notebook; after
# changing data_dir, re-run this cell so val_dir/train_dir are recomputed.
val_dir = os.path.join(data_dir, "val2014/")
train_dir = os.path.join(data_dir, "train2014/")

# Base-URL for the data-sets on the internet.
data_url = "http://images.cocodataset.org/"


########################################################################
# Private helper-functions.

def _load_records(train=False):
    """
    Read the caption annotations of one split of the data-set.

    The JSON file "captions_train2014.json" (train=True) or
    "captions_val2014.json" (train=False) is read from the
    "annotations" sub-directory of data_dir.

    :param train:
        Bool whether to load the training-set (True) or validation-set (False).
    :return:
        Three tuples (ids, filenames, captions) ordered by image-id,
        where captions[i] is the list of captions for image ids[i].
    """

    # Pick the annotation-file for the requested split.
    json_name = "captions_train2014.json" if train else "captions_val2014.json"
    json_path = os.path.join(data_dir, "annotations", json_name)

    with open(json_path, "r", encoding="utf-8") as fh:
        coco = json.load(fh)

    image_entries = coco['images']
    caption_entries = coco['annotations']

    # Map each image-id to its filename and a (still empty) caption list.
    by_id = {}
    for entry in image_entries:
        by_id[entry['id']] = {'filename': entry['file_name'],
                              'captions': []}

    # Attach every caption to the image it belongs to. The image-id is
    # expected to be known from the loop above.
    for entry in caption_entries:
        owner_id = entry['image_id']
        text = entry['caption']
        by_id[owner_id]['captions'].append(text)

    # One (id, filename, captions) row per image, in ascending id order.
    rows = [(image_id, by_id[image_id]['filename'], by_id[image_id]['captions'])
            for image_id in sorted(by_id)]

    # Transpose the rows into three parallel tuples.
    ids, filenames, captions = zip(*rows)

    return ids, filenames, captions







def load_records(train=False):
    """
    Return the image ids, filenames and text-captions for either the
    training-set or the validation-set.

    The work is delegated to _load_records() through cache(): when the
    cache-file for the split already exists in data_dir it is loaded,
    otherwise _load_records() is called and its result is saved there.

    :param train:
        Bool whether to load the training-set (True) or validation-set (False).
    :return:
        ids, filenames, captions for the images in the data-set.
    """

    # Each split has its own cache-file inside data_dir.
    cache_filename = "records_train.pkl" if train else "records_val.pkl"

    return cache(cache_path=os.path.join(data_dir, cache_filename),
                 fn=_load_records,
                 train=train)

In [5]:
from google.colab import drive
# Mount Google Drive at /content/gdrive/; data_dir and the other paths
# used in this notebook are located below this mount point.
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


Listing the names of the images in the validation directory.


In [None]:
# Names of the entries found in the validation image directory.
name=os.listdir('/content/gdrive/My Drive/coco/val2014')

In [15]:
# train=False selects the validation split, so despite their names
# filename_train / caption_train hold validation-set filenames and captions.
# The image ids (first return value) are discarded.
_,filename_train,caption_train=load_records(train=False)

- Data loaded from cache-file: /content/gdrive/My Drive/coco/records_val.pkl


In [2]:
from PIL import Image
import numpy as np
import os
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras.preprocessing.sequence import pad_sequences

Loading the image features (transfer values) that were precomputed with the VGG16 model.

In [None]:
# Load the pickled transfer values from data_dir.
# NOTE: unpickling can run arbitrary code; only load files you created yourself.
with open(os.path.join(data_dir, "transfer_values_true.pickle"), mode='rb') as file:
            transfer_values_val = pickle.load(file)
# Notebook display: the transfer values of the first entry.
transfer_values_val[0]

array([0.   , 0.   , 1.619, ..., 0.   , 0.   , 0.658], dtype=float16)

Loading the captions and tokenizer files.

In [17]:
# Load the pickled captions and the pickled tokenizer from data_dir.
# NOTE: unpickling can run arbitrary code; only load files you created yourself.
# NOTE(review): judging by the file name, the captions presumably already
# carry the 'sos'/'eos' markers looked up further below - confirm.
with open(os.path.join(data_dir, "caption_sos.pickle"), mode='rb') as file:
            captions = pickle.load(file)
with open(os.path.join(data_dir, "tokenizer.pickle"), mode='rb') as file:
            tokeniser = pickle.load(file)

In [None]:
# For every entry of `captions`, convert its texts into integer-token
# sequences with the tokenizer; sequence_final[i] holds the token
# sequences that belong to captions[i].
sequence_final=[tokeniser.texts_to_sequences(caption_group)
                for caption_group in captions]

Preparing the captions and the preprocessed images for training.

In [None]:
def get_random_caption_tokens(idx):
  """
  Pick one token sequence at random for each of the given indices.

  :param idx:
      Iterable of integer indices into sequence_final.
  :return:
      List with one randomly chosen element of sequence_final[i]
      for every i in idx, in the same order as idx.
  """
  return [
      sequence_final[i][np.random.choice(len(sequence_final[i]))]
      for i in idx
  ]

In [None]:
def batch_generator(batch_size):
  """
  Endlessly yield random training batches for the decoder model.

  Each batch pairs the transfer values of randomly drawn entries with
  one randomly chosen token sequence per entry.

  :param batch_size:
      Number of entries per batch.
  :return:
      Generator of (x_data, y_data) tuples. x_data is a dict with the keys
      'decoders_input' and 'transfer_values_input', y_data is a dict with
      the key 'decoders_op'.
  """
  while True:
    # Draw the indices from the array that is indexed below, so the index
    # range always matches the data (a directory listing may contain more
    # or fewer entries than there are transfer values).
    idx=np.random.randint(len(transfer_values_val),size=batch_size)

    # NOTE(review): assumes transfer_values_val is a numpy array whose rows
    # line up with sequence_final - confirm.
    transfer_values = transfer_values_val[idx]
    tokens = get_random_caption_tokens(idx)

    # Pad (with zeros, at the end) to the longest sequence in this batch.
    num_tokens = [len(t) for t in tokens]
    max_tokens=np.max(num_tokens)
    tokens_padded = pad_sequences(tokens,maxlen=max_tokens, padding='post',truncating='post')

    # The target is the input shifted one step to the left, so at every
    # position the decoder is trained to predict the following token.
    decoder_input_data = tokens_padded[:, 0:-1]
    decoder_output_data = tokens_padded[:,1:]

    x_data = \
    {
        'decoders_input':decoder_input_data,
        'transfer_values_input':transfer_values
    }
    y_data = \
    {
        'decoders_op':decoder_output_data
    }
    yield (x_data, y_data)

In [None]:
# Number of entries per training batch.
batch_size = 384
# Infinite generator of training batches; passed to decoder_model.fit() below.
generator = batch_generator(batch_size=batch_size)

In [18]:
# Number of batches taken from the generator per epoch.
# NOTE(review): 1541 is hard-coded; presumably derived from the number of
# captions and batch_size - confirm.
steps_per_epoch = 1541
# Number of words in the tokenizer's word_index.
num_words=len(tokeniser.word_index)

In [3]:
# Input for the image feature vector (4096 values per image).
transfer_values_input = keras.layers.Input(shape=(4096,),
                              name='transfer_values_input')

# Dense layer that maps the 4096 features to 512 values, the same size as
# the GRU layers below; connect_decoder() uses its output as their
# initial state.
decoder_transfer_map = keras.layers.Dense(512,
                             activation='tanh',
                             name='decoder_transfer_map')

# Input for the token sequences; the sequence length is left unspecified.
decoders_input = keras.layers.Input(shape=(None, ), name='decoders_input')
# Embedding with num_words+1 rows of 128 values each.
# NOTE(review): the +1 presumably accounts for index 0 used as padding - confirm.
decoder_embedding = keras.layers.Embedding(input_dim=num_words+1,
                              output_dim=128,
                              name='decoder_embedding')

# Three GRU layers that return the full output sequence.
# NOTE: the variables are called decoder_LSTM*, but the layers are GRUs.
decoder_LSTM1 = keras.layers.GRU(512, name='decoder_gru1',
                   return_sequences=True)
decoder_LSTM2 = keras.layers.GRU(512, name='decoder_gru2',
                   return_sequences=True)
decoder_LSTM3 = keras.layers.GRU(512, name='decoder_gru3',
                   return_sequences=True)

# Output layer: softmax over num_words+1 classes. Its name 'decoders_op'
# matches the key of y_data produced by batch_generator().
decoder_dense=keras.layers.Dense(num_words+1,activation='softmax',name='decoders_op')

NameError: ignored

In [20]:
def connect_decoder(transfer_values_input):
    """
    Wire the module-level decoder layers into one computation graph.

    :param transfer_values_input:
        Tensor fed to decoder_transfer_map; the result is used as the
        initial state of all three GRU layers.
    :return:
        Output tensor of the final dense layer (decoder_dense).
    """
    # The mapped transfer values seed the state of every GRU layer.
    gru_seed = decoder_transfer_map(transfer_values_input)

    # The decoder starts at its input-layer, followed by the embedding.
    sequence = decoder_embedding(decoders_input)

    # Run the GRU layers in order, each starting from the same state.
    for gru_layer in (decoder_LSTM1, decoder_LSTM2, decoder_LSTM3):
        sequence = gru_layer(sequence, initial_state=gru_seed)

    # The final dense layer produces the per-step output over the vocabulary.
    return decoder_dense(sequence)

In [21]:
# Connect the decoder layers, starting from the transfer-values input.
decoders_output = connect_decoder(transfer_values_input)

# Model with two inputs (transfer values, token sequence) and one output.
decoder_model = keras.Model(inputs=[transfer_values_input, decoders_input],
                      outputs=[decoders_output])

In [22]:
# Integer targets with a softmax output layer -> sparse categorical
# cross-entropy. `learning_rate` is the supported argument name; `lr` is a
# deprecated alias that newer Keras versions reject.
decoder_model.compile(optimizer=keras.optimizers.RMSprop(learning_rate=1e-3),
                      loss='sparse_categorical_crossentropy')

In [None]:
# Train on batches from the generator: 5 epochs of steps_per_epoch batches each.
decoder_model.fit(x=generator,
                  steps_per_epoch=steps_per_epoch,
                  epochs=5)
# Save only the weights to Google Drive, overwriting an existing file.
decoder_model.save_weights('/content/gdrive/My Drive/coco/decoder_model_weights_VGG16_finese',overwrite=True)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [27]:
# Save the whole decoder model to Google Drive.
decoder_model.save('/content/gdrive/My Drive/coco/decoder_model')

INFO:tensorflow:Assets written to: /content/gdrive/My Drive/coco/decoder_model/assets


In [2]:
from tensorflow import keras


In [23]:
# Restore the weights written by save_weights() after training.
decoder_model.load_weights('/content/gdrive/My Drive/coco/decoder_model_weights_VGG16_finese')

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7fd65d7cdeb8>

In [3]:
# Import VGG16 from tensorflow.keras so that it comes from the same Keras
# implementation as `keras.Model` below (`keras` is tensorflow.keras in this
# notebook); mixing it with the standalone `keras` package can fail.
from tensorflow.keras.applications import VGG16

# VGG16 with ImageNet weights, including its dense layers on top.
base_model=VGG16(weights='imagenet',include_top=True)

# Freeze all layers; the network is only used to extract features.
for layer in base_model.layers:
  layer.trainable=False

# Use the output of the third layer from the end as the image features.
# NOTE(review): presumably the first fully-connected layer with 4096
# outputs, matching transfer_values_input - confirm in the summary.
last_layer=base_model.layers[-3]
last_output=last_layer.output

img_model=keras.Model(base_model.input,last_output)
img_model.summary()

Model: "functional_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_2 (InputLayer)         [(None, 224, 224, 3)]     0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 224, 224, 64)      1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 224, 224, 64)      36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 112, 112, 64)      0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 112, 112, 128)     73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 112, 112, 128)     147584    
_________________________________________________________________
block2_pool (MaxPooling2D)   (None, 56, 56, 128)      

TESTING PHASE


In [11]:
# Save the feature-extraction model to Google Drive as an HDF5 file.
img_model.save('/content/gdrive/My Drive/coco/VGG16foruse.h5')

In [None]:
# Convert to RGB so that grayscale, RGBA or CMYK files also give the three
# channels the image model expects; RGB files are unaffected.
img=Image.open('/content/gdrive/My Drive/coco/val2014/COCO_val2014_000000000536.jpg').convert('RGB')

# Resize to the 224x224 input size of the image model.
img=img.resize(size=(224,224), resample=Image.LANCZOS)
image=np.array(img)
# Scale the 8-bit pixel values to the range 0..1.
# NOTE(review): this has to match the preprocessing that was used when the
# stored transfer values were computed - confirm.
img=image/255.0
# Add a leading batch axis: (224, 224, 3) -> (1, 224, 224, 3).
image_batch = np.expand_dims(img, axis=0)
print(image_batch.shape)
print(img.shape)

(1, 224, 224, 3)
(224, 224, 3)


In [None]:
# Image features of the single test image, computed by the image model.
transfered_values = img_model.predict(image_batch)

In [None]:
# Integer tokens of the start ('sos') and end ('eos') markers.
start_word=tokeniser.word_index['sos']
end_word=tokeniser.word_index['eos']
# corpus_index first refers to the tokenizer's word -> integer mapping ...
corpus_index=tokeniser.word_index
# ... and is then replaced by its inverse, integer -> word.
corpus_index={value:key for key,value in corpus_index.items()}
# Inverting once more gives a word -> integer mapping again.
reverse_corpus_index={value:key for key,value in corpus_index.items()}
# Notebook display: integer token of the word 'horse'.
reverse_corpus_index['horse']

107

In [None]:
def sample(preds,temperature=1.0):
  """
  Draw one index at random from `preds` after temperature re-weighting.

  The values are converted to float64, their logarithms are divided by
  `temperature`, and the exponentials are normalised to sum to one. A
  single multinomial draw from that distribution selects the index.

  :param preds:
      Array-like of non-negative weights (1-D).
  :param temperature:
      Divisor applied to the log-weights.
  :return:
      Index of the drawn element.
  """
  log_weights = np.log(np.asarray(preds).astype('float64'))
  weights = np.exp(log_weights / temperature)
  distribution = weights / np.sum(weights)

  # One draw of a single trial; the result is one-hot, so argmax
  # recovers the index that was drawn.
  one_hot = np.random.multinomial(1, distribution, 1)
  return np.argmax(one_hot)

In [None]:
# Greedy decoding: starting from the start token, repeatedly ask the decoder
# for the most likely next token until the end token appears or max_token
# tokens have been generated.
curr_token=start_word
count_tokens=0
max_token=10
output_text = ''

# Buffer holding the tokens fed to the decoder, sized by max_token so the
# two cannot get out of step. The builtin `int` replaces `np.int`, which was
# only an alias for it and no longer exists in current NumPy releases.
decode_input=np.zeros(shape=(1,max_token),dtype=int)

while curr_token!=end_word and count_tokens<max_token:
  # Place the latest token at the current position of the input sequence.
  decode_input[0,count_tokens]=curr_token
  x_data={
      'transfer_values_input': transfered_values,
      'decoders_input': decode_input
  }
  decode_output = decoder_model.predict(x_data)

  # Scores for the current position; take the highest-scoring token.
  token_output=decode_output[0,count_tokens,:]
  pred=np.argmax(token_output)

  curr_token=pred
  # The word is appended before the loop condition is re-checked, so a
  # predicted end marker is included in output_text.
  sampled_word = corpus_index[pred]
  output_text += " " + sampled_word
  count_tokens += 1

In [None]:
output_text

' a man is holding a box of pizza in a'

In [None]:
# Second definition of the decoder layers. Unlike the earlier cell, the
# output layer here uses a linear activation, i.e. it emits raw scores
# instead of softmax probabilities.

# Input for the image feature vector (4096 values per image).
transfer_values_input = keras.layers.Input(shape=(4096,),
                              name='transfer_values_input')

# Dense layer that maps the 4096 features to 512 values, the same size as
# the GRU layers below.
decoder_transfer_map = keras.layers.Dense(512,
                             activation='tanh',
                             name='decoder_transfer_map')
# NOTE(review): this pooling layer is not referenced anywhere else in the
# visible part of the notebook.
decoder_transfer_map_averaged=keras.layers.GlobalAveragePooling1D()
# Input for the token sequences; the sequence length is left unspecified.
decoders_input = keras.layers.Input(shape=(None, ), name='decoders_input')
# Embedding with num_words+1 rows of 128 values each.
decoder_embedding = keras.layers.Embedding(input_dim=num_words+1,
                              output_dim=128,
                              name='decoder_embedding')

# Three GRU layers that return the full output sequence.
# NOTE: the variables are called decoder_LSTM*, but the layers are GRUs.
decoder_LSTM1 = keras.layers.GRU(512, name='decoder_gru1',
                   return_sequences=True)
decoder_LSTM2 = keras.layers.GRU(512, name='decoder_gru2',
                   return_sequences=True)
decoder_LSTM3 = keras.layers.GRU(512, name='decoder_gru3',
                   return_sequences=True)

# Output layer with num_words+1 units and linear activation.
decoder_dense=keras.layers.Dense(num_words+1,activation='linear',name='decoders_op')

In [None]:
def connect_decoder(transfer_values_input):
    """
    Wire the module-level decoder layers into one computation graph.

    :param transfer_values_input:
        Tensor fed to decoder_transfer_map; the result is used as the
        initial state of all three GRU layers.
    :return:
        Output tensor of the final dense layer (decoder_dense).
    """
    # The mapped transfer values seed the state of every GRU layer.
    gru_seed = decoder_transfer_map(transfer_values_input)

    # The decoder starts at its input-layer, followed by the embedding.
    sequence = decoder_embedding(decoders_input)

    # Run the GRU layers in order, each starting from the same state.
    for gru_layer in (decoder_LSTM1, decoder_LSTM2, decoder_LSTM3):
        sequence = gru_layer(sequence, initial_state=gru_seed)

    # The final dense layer produces the per-step output over the vocabulary.
    return decoder_dense(sequence)

In [None]:
# Connect the decoder layers, starting from the transfer-values input.
decoders_output = connect_decoder(transfer_values_input)

# Model with two inputs (transfer values, token sequence) and one output.
decoder_model = keras.Model(inputs=[transfer_values_input, decoders_input],
                      outputs=[decoders_output])

In [None]:
import tensorflow as tf
def sparse_crossentropy(y_true, y_pred):
  """
  Mean sparse softmax cross-entropy between integer labels and logits.

  :param y_true:
      Integer class labels, passed as `labels`.
  :param y_pred:
      Unnormalised scores (logits), passed as `logits`.
  :return:
      Scalar tensor: the mean of the cross-entropy over all elements.
  """
  per_element = tf.nn.sparse_softmax_cross_entropy_with_logits(
      labels=y_true,
      logits=y_pred)
  return tf.reduce_mean(per_element)

In [None]:
# int32 placeholder with two unspecified dimensions.
# NOTE(review): not referenced by the compile() call in the visible part of
# the notebook - confirm whether it is still needed.
decoder_target= tf.keras.backend.placeholder(dtype='int32', shape=(None,None))

In [None]:
# Compile with the custom logits-based loss. `learning_rate` is the supported
# argument name; `lr` is a deprecated alias that newer Keras versions reject.
decoder_model.compile(optimizer=keras.optimizers.RMSprop(learning_rate=1e-3),
                      loss=sparse_crossentropy,
                      )