In [None]:
from glob import glob
from google.colab import drive
drive.mount('/content/drive', force_remount = True)
from keras.utils import to_categorical
from numpy import array
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tqdm import tqdm
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pickle
import PIL
import os
import re
import tensorflow as tf
import tensorflow_datasets as tfds

Mounted at /content/drive


In [None]:
#locations of the dataset files on the mounted Google Drive
imagePath = "/content/drive/My Drive/archive/Images/"
captionPath = "/content/drive/My Drive/archive/captions.txt"
embeddingPath = "/content/drive/My Drive/archive/glove.6B.200d.txt"

#paths of every .jpg file directly inside imagePath
images = glob(f"{imagePath}*.jpg")

In [None]:
#shows the first five images, one figure each
for i in range(5):
  plt.figure()
  #cv2 returns BGR pixel order; convert to RGB before handing to matplotlib
  image = cv2.cvtColor(cv2.imread(images[i]), cv2.COLOR_BGR2RGB)
  plt.imshow(image)

In [None]:
#function to load captions
def load(fileName):
  """Return the whole content of the text file ``fileName`` as one string.

  The file is opened in text mode ('r') with the default encoding.
  """
  #the with-block closes the handle even when read() raises
  with open(fileName, 'r') as file:
    return file.read()

#this file contains only the captions; the original file has two unneeded lines that cause issues
file = "/content/drive/My Drive/archive/captions1.txt"
info = load(file)
#prints just the first character of the loaded text as a quick sanity check
print(info[:1])

1


In [None]:
#creates a dictionary for the images and their respective captions
def load_captions(info):
  """Group the captions in ``info`` by image code.

  ``info`` is the caption text, one entry per line, each of the form
  ``<imageCode>.jpg,<caption>``. The text before the first ``.jpg,`` becomes
  the dictionary key (so the key has no ``.jpg`` extension) and everything
  after it is appended to that key's caption list.

  Lines without ``.jpg,`` (blank lines, a trailing newline, a header row) are
  skipped rather than raising IndexError.

  Returns a dict mapping image code -> list of caption strings in file order.
  """
  dict0 = dict()
  for line in info.split('\n'):
    #partition cuts at the first '.jpg,' only, so a caption that itself
    #contains '.jpg,' is kept whole
    imageCode, separator, imageCaption = line.partition('.jpg,')
    if not separator:
      continue

    dict0.setdefault(imageCode, list()).append(imageCaption)

  return dict0

#data maps each image code (the text before '.jpg,') to the list of its raw captions
data = load_captions(info)

In [None]:
#preprocessing the data
def cleanse_data(data):
  """Normalise every caption in ``data`` (dict of key -> list of captions).

  Each caption is split on whitespace, tokens shorter than two characters are
  dropped and the remaining tokens are lower-cased. Every kept token is
  followed by a single space, so a non-empty cleaned caption ends with a space.

  Returns a new dict with the same keys and caption order. A key whose caption
  list is empty does not appear in the result.
  """
  cleaned = {}
  for imageCode, captions in data.items():
    for caption in captions:
      kept = [token.lower() for token in caption.split() if len(token) >= 2]
      cleaned.setdefault(imageCode, []).append("".join(token + " " for token in kept))
  return cleaned

#data1 holds the cleaned captions returned by cleanse_data, keyed like data
data1 = cleanse_data(data)

In [None]:
#converts text into a vocabulary of words and calculates the words
def vocab(data):
  """Return the set of distinct whitespace-separated tokens found in all captions of ``data``."""
  words = set()
  for captions in data.values():
    for caption in captions:
      words.update(caption.split())
  return words

#builds the vocabulary (set of unique words) from the cleaned captions
vocabData = vocab(data1)

In [None]:
#saves the descriptions to a different captions.txt file
def save_dict(data, fileName):
  """Write the captions in ``data`` to the text file ``fileName``.

  ``data`` maps a key to a list of descriptions. Each description becomes one
  output line of the form ``<key> <description>``; lines are joined with a
  newline and no trailing newline is written. An existing file is overwritten.
  """
  lines = [key + ' ' + desc for key, value in data.items() for desc in value]
  #write the formatted lines (not the dict repr) and let the with-block close the file
  with open(fileName, 'w') as file:
    file.write('\n'.join(lines))

#the relative file name means the file is written to the current working directory
save_dict(data1, "captions1.txt")

In [None]:
#preprocesses the images to for the inceptionv3 model
def preprocessInception(imagePath):
  """Load the image file ``imagePath`` and prepare it for the InceptionV3 model.

  The image is resized to 299x299 while loading, converted to an array, given a
  leading batch axis of size 1 and passed through
  ``inception_v3.preprocess_input``. The preprocessed array is returned.
  """
  keras_image = tf.keras.preprocessing.image
  resized = keras_image.load_img(imagePath, target_size = (299, 299))
  batch = np.expand_dims(keras_image.img_to_array(resized), axis = 0)
  return tf.keras.applications.inception_v3.preprocess_input(batch)

In [None]:
#preprocesses images for EfficientNetB0
def preprocessEfficient(imagePath):
  """Load the image file ``imagePath`` and prepare it for the EfficientNetB0 model.

  The image is resized to 224x224 while loading, converted to an array, given a
  leading batch axis of size 1 and passed through
  ``efficientnet.preprocess_input``. The preprocessed array is returned.
  """
  keras_image = tf.keras.preprocessing.image
  resized = keras_image.load_img(imagePath, target_size = (224, 224))
  batch = np.expand_dims(keras_image.img_to_array(resized), axis = 0)
  return tf.keras.applications.efficientnet.preprocess_input(batch)

In [1]:
#preprocess images for ResNet50
def preprocessResNet(imagePath):
  """Load the image file ``imagePath`` and prepare it for the ResNet feature extractor.

  The image is resized to 224x224 while loading, converted to an array, given a
  leading batch axis of size 1 and passed through the ResNet V2 preprocessing.
  The preprocessed array is returned.
  """
  #converts images to size 224x224, the input size of the ImageNet ResNet models
  #built with their classification top (244 was a typo)
  img = tf.keras.preprocessing.image.load_img(imagePath, target_size = (224, 224))

  #converts image to 3D np array, then adds the batch axis
  x = tf.keras.preprocessing.image.img_to_array(img)
  x = np.expand_dims(x, axis = 0)

  #the notebook builds its extractor from tf.keras.applications.ResNet50V2, so the
  #matching resnet_v2 preprocessing is used rather than the V1 (resnet50) one
  x = tf.keras.applications.resnet_v2.preprocess_input(x)

  return x

#preprocess images for VGG19
def preprocessVGG19(imagePath):
  """Load the image file ``imagePath`` and prepare it for the VGG19 model.

  The image is resized to 224x224 while loading, converted to an array, given a
  leading batch axis of size 1 and passed through ``vgg19.preprocess_input``.
  The preprocessed array is returned.
  """
  #converts images to size 224x224, the input size of ImageNet VGG19 built with
  #its classification top (244 was a typo)
  img = tf.keras.preprocessing.image.load_img(imagePath, target_size = (224, 224))

  #converts image to 3D np array, then adds the batch axis
  x = tf.keras.preprocessing.image.img_to_array(img)
  x = np.expand_dims(x, axis = 0)

  x = tf.keras.applications.vgg19.preprocess_input(x)

  return x

In [None]:
#creating a new model based on inception v3 and using the imagenet weights
#NOTE(review): the name `input` shadows the Python builtin input() from here on
input = tf.keras.applications.InceptionV3(weights = 'imagenet')
#creates the new model and removes the output layer: the new model ends at the
#second-to-last layer of the pretrained network
model = tf.keras.models.Model(input.input, input.layers[-2].output)
model.summary()

#using EfficientNetB0, truncated at its second-to-last layer in the same way
effNetInput = tf.keras.applications.EfficientNetB0(weights = 'imagenet')
model0 = tf.keras.models.Model(effNetInput.input, effNetInput.layers[-2].output)
model0.summary()

#using ResNet50V2 (the V2 variant is what is actually loaded here)
resNetInput = tf.keras.applications.ResNet50V2(weights = 'imagenet')
model50 = tf.keras.models.Model(resNetInput.input, resNetInput.layers[-2].output)
model50.summary()

#using VGG19
vggInput = tf.keras.applications.VGG19(weights = 'imagenet')
model19 = tf.keras.models.Model(vggInput.input, vggInput.layers[-2].output)
model19.summary()


In [None]:
#encodes an image into a vector of size (2048, ) using inceptionv3
def encodeInception(image):
  """Return the feature vector that ``model`` predicts for the image file ``image``.

  The prediction for the single preprocessed image is reshaped to one
  dimension whose length is the prediction's second axis.
  """
  batch = preprocessInception(image)
  prediction = model.predict(batch)
  return np.reshape(prediction, prediction.shape[1])

#maps image file name (the path with the imagePath prefix cut off) -> feature vector
encoding = dict()

for i in tqdm(images):
  encoding[i[len(imagePath):]] = encodeInception(i)

#stores the encoded images in a pickle file in the current working directory
with open("images1.pkl", "wb") as encodedPickle:
  pickle.dump(encoding, encodedPickle)

In [None]:
#same as the inceptionv3 encoding but using efficientnetB0
def encodeEfficient(image):
  """Return the feature vector that ``model0`` predicts for the image file ``image``.

  The prediction for the single preprocessed image is reshaped to one
  dimension whose length is the prediction's second axis.
  """
  batch = preprocessEfficient(image)
  prediction = model0.predict(batch)
  return np.reshape(prediction, prediction.shape[1])

#maps image file name (the path with the imagePath prefix cut off) -> feature vector
encoding = dict()

for i in tqdm(images):
  encoding[i[len(imagePath):]] = encodeEfficient(i)

#making a 2nd pickle file
with open("images2.pkl", "wb") as encodedPickle:
  pickle.dump(encoding, encodedPickle)

In [None]:
#encoding images w/ the resnet feature extractor (model50)
def encodeResNet(image):
  """Return the feature vector that ``model50`` predicts for the image file ``image``.

  The prediction for the single preprocessed image is reshaped to one
  dimension whose length is the prediction's second axis.
  """
  batch = preprocessResNet(image)
  prediction = model50.predict(batch)
  return np.reshape(prediction, prediction.shape[1])

#maps image file name (the path with the imagePath prefix cut off) -> feature vector
encoding = dict()

for i in tqdm(images):
  encoding[i[len(imagePath):]] = encodeResNet(i)

#making a 3rd pickle file
with open("images3.pkl", "wb") as encodedPickle:
  pickle.dump(encoding, encodedPickle)

In [None]:
#encodes images using vgg19
def encodeVGG(image):
  """Return the feature vector that ``model19`` predicts for the image file ``image``.

  The prediction for the single preprocessed image is reshaped to one
  dimension whose length is the prediction's second axis.
  """
  batch = preprocessVGG19(image)
  prediction = model19.predict(batch)
  return np.reshape(prediction, prediction.shape[1])

#maps image file name (the path with the imagePath prefix cut off) -> feature vector
encoding = dict()

for i in tqdm(images):
  encoding[i[len(imagePath):]] = encodeVGG(i)

#making a 4th pickle file
with open("images4.pkl", "wb") as encodedPickle:
  pickle.dump(encoding, encodedPickle)

In [None]:
#creates a list of all training captions
trainingCaptions = []
for key, val in data1.items():
  for caption in val:
    trainingCaptions.append(caption)

#considers only words which occur at least 10 times
threshold = 10
wordCounts = {}
nsents = 0

for sent in trainingCaptions:
  nsents += 1
  #split() with no argument drops empty tokens; split(' ') turned the trailing
  #space that cleanse_data leaves on each caption into an empty-string "word"
  for w in sent.split():
    wordCounts[w] = wordCounts.get(w, 0) + 1

vocabulary = [w for w in wordCounts if wordCounts[w] >= threshold]
print('preprocessed words %d -> %d' % (len(wordCounts), len(vocabulary)))

#converts the words to indices and vice versa
#indices start at 1, so index 0 is never assigned to a word
indexWord = {}
wordIndex = {}

index = 1
for w in vocabulary:
  wordIndex[w] = index
  indexWord[index] = w
  index += 1

#+1 accounts for the unused index 0
vocabSize = len(indexWord) + 1

#the tutorial I'm using says to save the two dictionaries above as pickle files; again, I'll ignore that

#converts a dictionary of clean descriptions to a list of descriptions
def to_lines(desc):
  """Flatten ``desc`` (dict of key -> list of descriptions) into one list, in dict order."""
  flattened = []
  for descriptions in desc.values():
    flattened.extend(descriptions)
  return flattened

#calculates the length of the description w/ the most words, not sure if needed
def max_length(desc):
  """Return the largest whitespace-token count among all descriptions in ``desc``.

  Raises ValueError (from max) when ``desc`` contains no descriptions at all.
  """
  token_counts = (len(line.split()) for line in to_lines(desc))
  return max(token_counts)

#determines the max. sequence length (in words) over the cleaned captions;
#passed to data_generator(...) and used as the padded input length
maxLength = max_length(data1)
print('Description Length: %d' % maxLength)

In [None]:
#data generator function to be used as a parameter for model.fit_generator(), or model.fit()
def data_generator(descriptions, photos, wordIndex, maxLength, photoNumPerBatch, vocabSize = None):
  """Endlessly yield training batches for the captioning model.

  Args:
    descriptions: dict mapping image key -> list of caption strings.
    photos: dict mapping ``key + '.jpg'`` -> feature vector of that image.
    wordIndex: dict mapping word -> integer index.
    maxLength: length every input word sequence is padded to.
    photoNumPerBatch: number of images whose samples are grouped into one batch.
    vocabSize: number of classes for the one-hot target. When omitted it is
      derived from ``wordIndex`` as (largest index + 1), so the generator no
      longer depends on a module-level ``vocabSize``.

  Yields:
    ``([imageFeatures, paddedInputSequences], oneHotNextWords)`` where the three
    arrays have one row per (caption prefix, next word) pair.

  Raises:
    KeyError: if ``photos`` has no entry for ``key + '.jpg'``.
  """
  if vocabSize is None:
    vocabSize = max(wordIndex.values(), default = 0) + 1

  x1, x2, y = list(), list(), list()
  n = 0
  #loops over the images forever; the consumer decides how many batches to draw
  while True:
    for key, descList in descriptions.items():
      n += 1
      #retrieves the photo feature
      photo = photos[key + '.jpg']
      for desc in descList:
        #encodes the sequence, silently dropping words that are not in wordIndex
        seq = [wordIndex[word] for word in desc.split(' ') if word in wordIndex]
        #splits a sequence into multiple x, y pairs: every prefix predicts its next word
        for i in range(1, len(seq)):
          #splits into input and output pairs
          inSeq, outSeq = seq[:i], seq[i]
          #pads the input sequence
          inSeq = pad_sequences([inSeq], maxlen = maxLength)[0]
          #encodes the output word as a one-hot vector
          outSeq = to_categorical([outSeq], num_classes = vocabSize)[0]
          #stores the values into their respective lists
          x1.append(photo)
          x2.append(inSeq)
          y.append(outSeq)

      #yields the batch once photoNumPerBatch images were consumed, then starts a new one
      if n == photoNumPerBatch:
        yield ([array(x1), array(x2)], array(y))
        x1, x2, y = list(), list(), list()
        n = 0

In [None]:
#creates a word embedding vector for each unique word for a fixed length
#using glove.6B.200d.txt from https://github.com/stanfordnlp/GloVe
#otherwise, I think we would need to create a custom embedding vector
embeddingIndex = {}

#each line is parsed as: a word followed by its coefficients, whitespace separated;
#the with-block closes the file even if parsing a line fails
with open(embeddingPath, encoding = "utf-8") as f:
  for line in f:
    values = line.split()
    #skips blank lines instead of failing on values[0]
    if not values:
      continue
    word = values[0]
    coefs = np.asarray(values[1:], dtype = 'float32')
    embeddingIndex[word] = coefs

#print('Found %s word vectors.' % len(embeddingIndex))

#must match the vector length stored in the embedding file
embeddingDim = 200
#row i holds the vector of the word with index i; rows of words that are
#missing from the embedding file stay all-zero
embeddingMatrix = np.zeros((vocabSize, embeddingDim))

for word, i in wordIndex.items():
  embeddingVector = embeddingIndex.get(word)
  if embeddingVector is not None:
    embeddingMatrix[i] = embeddingVector

#print(embeddingMatrix.shape)

In [None]:
#additional encoding/decoding layers initially for inceptionv3, need to change the model variable if using a different encoding model
#image branch: 2048-wide feature vector -> dropout -> 256-unit dense layer
#NOTE(review): 2048 has to equal the length of the stored image feature vectors - confirm when switching encoders
input1 = tf.keras.layers.Input(shape = (2048, ))
fe1 = tf.keras.layers.Dropout(0.5)(input1)
fe2 = tf.keras.layers.Dense(256, activation = 'relu')(fe1)

#text branch: padded word indices -> embedding (index 0 is masked) -> dropout -> 256-unit GRU
input2 = tf.keras.layers.Input(shape = (maxLength,))
se1 = tf.keras.layers.Embedding(vocabSize, embeddingDim, mask_zero = True)(input2)
se2 = tf.keras.layers.Dropout(0.5)(se1)
se3 = tf.keras.layers.GRU(256)(se2)#se3 = tf.keras.layers.LSTM(256)(se2)

#decoder: element-wise sum of both branches -> dense -> softmax over the vocabulary
decoder1 = tf.keras.layers.add([fe2, se3])
decoder2 = tf.keras.layers.Dense(256, activation = 'relu')(decoder1)
output = tf.keras.layers.Dense(vocabSize, activation = 'softmax')(decoder2)

model3 = tf.keras.models.Model(inputs = [input1, input2], outputs = output)
model3.summary()

In [None]:
#training the inceptionv3 model
#loads the pretrained vectors into the Embedding layer and freezes it; the layer is
#looked up by type so the cell does not depend on its position in model3.layers
embeddingLayer = next(layer for layer in model3.layers if isinstance(layer, tf.keras.layers.Embedding))
embeddingLayer.set_weights([embeddingMatrix])
embeddingLayer.trainable = False

model3.compile(loss = 'categorical_crossentropy', optimizer = 'adam')

epochs = 10
#number of images per generator batch
batchNum = 3
#number of generator batches drawn per epoch
steps = len(data1) // batchNum // 100

#tutorial uses the pickle file here as the features
#pickle runs arbitrary code on load - only open files written by this notebook
with open("/content/images1.pkl", "rb") as encodedPickle:
  features = pickle.load(encodedPickle)

#stops an error where function tries to create variables on non-first call
tf.config.run_functions_eagerly(True)

generator = data_generator(data1, features, wordIndex, maxLength, batchNum)
model3.fit(generator, steps_per_epoch = steps, epochs = epochs, verbose = 1)
#model3.save('model3.h5')

In [None]:
#trains efficientnetB0 model
#NOTE(review): in the cells shown in this notebook `model0` is the EfficientNetB0
#feature extractor, not a two-input caption model like model3. This cell presumably
#expects model0 to have been rebuilt as a caption model (1280-wide image input)
#beforehand - confirm, otherwise layers[2] is not an Embedding layer and fit() gets
#inputs the model does not accept
#number of generator batches drawn per epoch
steps = len(data1) // 300
model0.layers[2].set_weights([embeddingMatrix])
model0.layers[2].trainable = False

model0.compile(loss = 'categorical_crossentropy', optimizer = 'adam')

#features written by the efficientnetB0 encoding cell
features = pickle.load(open("/content/images2.pkl", "rb"))

#stops an error where function tries to create variables on non-first call
tf.config.run_functions_eagerly(True)

#3 images per generator batch
generator = data_generator(data1, features, wordIndex, maxLength, 3)
model0.fit(generator, steps_per_epoch = steps, epochs = 10, verbose = 1)
model0.save('model0.h5')

In [None]:
#trains resnet50 model
#NOTE(review): in the cells shown in this notebook `model50` is the ResNet feature
#extractor, not a two-input caption model like model3. This cell presumably expects
#model50 to have been rebuilt as a caption model beforehand - confirm, otherwise
#layers[2] is not an Embedding layer and fit() gets inputs the model does not accept
#number of generator batches drawn per epoch
steps = len(data1) // 300
model50.layers[2].set_weights([embeddingMatrix])
model50.layers[2].trainable = False

model50.compile(loss = 'categorical_crossentropy', optimizer = 'adam')

#features written by the resnet encoding cell
features = pickle.load(open("/content/images3.pkl", "rb"))

#stops an error where function tries to create variables on non-first call
tf.config.run_functions_eagerly(True)

#3 images per generator batch
generator = data_generator(data1, features, wordIndex, maxLength, 3)
model50.fit(generator, steps_per_epoch = steps, epochs = 10, verbose = 1)
model50.save('model50.h5')

In [None]:
#trains vgg19 model
#NOTE(review): in the cells shown in this notebook `model19` is the VGG19 feature
#extractor, not a two-input caption model like model3. This cell presumably expects
#model19 to have been rebuilt as a caption model (4096-wide image input)
#beforehand - confirm
#number of generator batches drawn per epoch
steps = len(data1) // 300
model19.layers[2].set_weights([embeddingMatrix])
model19.layers[2].trainable = False

#the closing quote after adam was missing, which made this cell a syntax error
model19.compile(loss = 'categorical_crossentropy', optimizer = 'adam')

#features written by the vgg19 encoding cell; the with-block closes the file handle
with open("/content/images4.pkl", "rb") as encodedPickle:
  features = pickle.load(encodedPickle)

#stops an error where function tries to create variables on non-first call
tf.config.run_functions_eagerly(True)

#3 images per generator batch
generator = data_generator(data1, features, wordIndex, maxLength, 3)
model19.fit(generator, steps_per_epoch = steps, epochs = 10, verbose = 1)
model19.save('model19.h5')

In [None]:
#making predictions w/ the model
def image_caption(picture, captionModel = None):
  """Greedily generate a caption for one encoded image.

  Args:
    picture: image feature array passed to the model as its first input.
    captionModel: model whose ``predict([picture, seq])`` returns scores over
      the word indices. Defaults to the notebook-level ``model0``, which is what
      this function always used before the parameter existed.

  Returns:
    The generated words joined by single spaces, without the 'startseq' seed
    and without 'endseq'.

  Generation stops after ``maxLength`` words, when 'endseq' is predicted, or
  when the predicted index has no entry in ``indexWord`` (for example index 0,
  which is never assigned to a word).
  """
  if captionModel is None:
    captionModel = model0

  in_text = 'startseq'
  for _ in range(maxLength):
    #words missing from wordIndex (including the seed, if it is not in the vocabulary) are skipped
    seq = [wordIndex[w] for w in in_text.split() if w in wordIndex]
    seq = pad_sequences([seq], maxlen = maxLength)
    yhat = captionModel.predict([picture, seq], verbose = 0)
    #.get avoids a KeyError for an index that maps to no word
    word = indexWord.get(int(np.argmax(yhat)))
    if word is None or word == 'endseq':
      break
    in_text += ' ' + word

  #drops only the seed token; the end marker is never appended, so no real
  #word is cut off when generation ends without 'endseq'
  return ' '.join(in_text.split()[1:])

In [None]:
#picks the image at a fixed position in `features`, then visualizes it together with the generated caption
z = 78 #20
pic = list(features.keys())[z]
#NOTE(review): 4096 has to equal the length of the stored feature vectors; the
#commented alternatives use 2048 and 1280 - confirm which encoder's pickle is
#currently loaded in `features` and that image_caption uses the matching model
image = features[pic].reshape((1, 4096))
#image = features[pic].reshape((1, 2048))
#image = features[pic].reshape((1, 1280))
x = plt.imread(imagePath + pic)
plt.imshow(x)
plt.show()
print("Caption: ", image_caption(image))

In [None]:
#same visualization as the previous cell for a second, fixed image position
z = 587 #200
pic = list(features.keys())[z]
#NOTE(review): 4096 has to equal the length of the stored feature vectors - confirm
image = features[pic].reshape((1, 4096))
#image = features[pic].reshape((1, 2048))
#image = features[pic].reshape((1, 1280))
x = plt.imread(imagePath + pic)
plt.imshow(x)
plt.show()
print("Caption: ", image_caption(image))

In [None]:
#[:-100] because early stopping restored best values (100 before it stopped)
#NOTE(review): `history` is not assigned in any cell shown in this notebook (the
#fit() calls do not keep their return value) and no validation data or early
#stopping callback is passed to fit() - presumably it comes from a different
#run; confirm before relying on this plot

plt.plot(history['loss'][:-100])
plt.plot(history['val_loss'][:-100])
plt.title('Loss Graph for InceptionV3')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend(['train', 'validation'], loc = 'upper left')
plt.show()

In [None]:
#plots training/validation accuracy, dropping the last 100 recorded values
#NOTE(review): `history` is not assigned in any cell shown in this notebook and
#the compile() calls shown request no 'categorical_accuracy' metric - confirm
#where these values come from
plt.plot(history['categorical_accuracy'][:-100])
plt.plot(history['val_categorical_accuracy'][:-100])
plt.title('Accuracy Graph for InceptionV3')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend(['train', 'validation'], loc = 'upper left')
plt.show()