In [None]:
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
import os
from glob import glob

from keras.applications import ResNet50
from keras.models import Model

from prepare_images import extract_image_features
from prepare_text import clean_text_data, preprocessed
from image_model import return_model, generator

In [None]:
# Sanity check: report whether the installed TensorFlow build has CUDA support.
# `tf` was never imported in this notebook, and the function was referenced
# without being called, so the original cell raised NameError on a fresh kernel.
import tensorflow as tf

tf.test.is_built_with_cuda()

## Loading the Training Images

In [None]:
# Collect every JPEG in the training folder.
# glob() returns paths in arbitrary, filesystem-dependent order; sorting makes
# the listing (and anything indexed from it, e.g. the preview) reproducible.
# NOTE(review): folder is spelled 'training_Images' (capital I) while the test
# folder is 'testing_images' -- confirm both match the directories on disk.
train_images_path = 'Data/training_Images/'
train_images = sorted(glob(train_images_path + '*.jpg'))
len(train_images)

## Loading the Testing Images

In [None]:
# Collect every JPEG in the testing folder.
# Sorted because glob() makes no ordering guarantee; a stable order keeps
# re-runs of the notebook reproducible.
test_images_path = 'Data/testing_images/'
test_images = sorted(glob(test_images_path + '*.jpg'))
len(test_images)

In [None]:
# Preview the first few training images in a single row.
columns = 5
rows = 1

# Slice instead of indexing 1..N: the original loop skipped train_images[0]
# and raised IndexError when fewer than columns*rows + 1 images were present.
preview_paths = train_images[:columns * rows]

fig = plt.figure(figsize=(20, 20))
for position, image_path in enumerate(preview_paths, start=1):
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread signals an unreadable/corrupt file by returning None;
        # report it instead of crashing inside cvtColor.
        print("Could not read image:", image_path)
        continue
    # OpenCV loads BGR; matplotlib expects RGB.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    fig.add_subplot(rows, columns, position)  # subplot positions are 1-based
    plt.imshow(img)
plt.show()

In [None]:
# Instantiate Keras' ResNet50 including its top (classification) layers.
# NOTE(review): the variable is named `incept_model` but holds a ResNet50, not
# an Inception network; the name is kept because later cells reference it.
# Presumably this loads Keras' default pretrained weights -- confirm.
incept_model = ResNet50(include_top=True)

In [None]:
# Build the feature extractor: same input as the ResNet50, but the output is
# taken from the second-to-last layer (layers[-2]), i.e. the network with its
# final layer cut off.
# NOTE(review): for Keras' ResNet50 this is presumably the global-average-pool
# layer -- confirm against the summary file written below.
last_layers = incept_model.layers[-2].output
model = Model(inputs=incept_model.input, outputs=last_layers)

# Dump the architecture to disk for reference. The encoding is set explicitly
# so the write does not depend on the platform default (a summary containing
# non-ASCII table characters would otherwise fail on non-UTF-8 locales).
with open('resnet_model_summary.txt', 'w', encoding='utf-8') as fh:
    # model.summary hands its text to print_fn; forward it to the file.
    model.summary(print_fn=lambda x: fh.write(x + '\n'))

In [None]:
# Run both image sets (training first, then testing) through the truncated
# ResNet50 defined above to obtain their features.
train_images_features, test_images_features = (
    extract_image_features(image_paths, model)
    for image_paths in (train_images, test_images)
)

In [None]:
# Report how many feature entries were produced for each split
# (training first, then testing).
for features in (train_images_features, test_images_features):
    print(len(features))

In [None]:
def _read_caption_lines(path):
    """Return the UTF-8 decoded contents of `path`, split on '\\n'.

    The file is read in binary mode and decoded manually (as the original code
    did) so line endings are not translated. The `with` block guarantees the
    handle is closed; the original left both files open.
    """
    with open(path, 'rb') as caption_file:
        return caption_file.read().decode('utf-8').split('\n')


# Training captions: read raw lines, then let clean_text_data match them
# against the extracted image features. It returns the cleaned captions
# (used as a dict of lists by later cells) and an error count.
train_caption_path = 'Data/training_captions.txt'
train_captions = _read_caption_lines(train_caption_path)
train_captions, train_error_count = clean_text_data(train_captions, train_images_features)
print(len(train_captions))
print("Error in : ", train_error_count)

# Testing captions: same procedure.
# NOTE(review): this file is 'testing_caption.txt' (singular) while the
# training file is 'training_captions.txt' -- confirm the name on disk.
test_caption_path = 'Data/testing_caption.txt'
test_captions = _read_caption_lines(test_caption_path)
test_captions, test_error_count = clean_text_data(test_captions, test_images_features)
print(len(test_captions))
print("Error in : ", test_error_count)

In [None]:
# Normalise every caption string in place with `preprocessed`.
# Captions are addressed by position (enumerate) rather than by value lookup:
# the original `v.index(vv)` was quadratic and resolves to the first *equal*
# entry, which is the wrong slot whenever an already-processed caption
# compares equal to a later raw one.
for captions_by_image in (train_captions, test_captions):
    for captions in captions_by_image.values():
        for position, caption in enumerate(captions):
            captions[position] = preprocessed(caption)

In [None]:
# Count how often each whitespace-separated token occurs in the training
# captions. The original stored 0 for a word's first occurrence, so every
# count was one too low; counts now start at 1.
count_words = {}
for captions in train_captions.values():
    for caption in captions:
        for word in caption.split():
            count_words[word] = count_words.get(word, 0) + 1

In [None]:
# Build the word -> integer id vocabulary from the training word counts.
# Only words whose count exceeds THRESH are kept; counts are never negative,
# so THRESH = -1 keeps every word.
THRESH = -1
count = 1  # ids start at 1 so that 0 stays free for sequence padding
new_dict = {}
for word, occurrences in count_words.items():
    if occurrences > THRESH:
        new_dict[word] = count
        count += 1

print(len(new_dict))

# Id 0 is what padded positions contain; registering it keeps len(new_dict)
# equal to the number of distinct ids, which later cells use as the vocabulary
# size.
new_dict['<PAD>'] = 0
# Out-of-vocabulary marker. The original used len(new_dict) here, which equals
# the id already assigned to the last word, so unknown words were silently
# encoded as that word. `count` is the next unused id.
new_dict['<OUT>'] = count

In [None]:
# Keep untouched copies of the text captions and create working copies that
# the next cell encodes to integer ids.
# dict.copy() is shallow: the copies would share the inner caption lists, so
# encoding the working copy in place would also overwrite the "backup" and the
# original. Copy each inner list as well (the captions themselves are strings
# at this point, hence immutable).
train_captions_backup = {key: list(caps) for key, caps in train_captions.items()}
train_captions_dict = {key: list(caps) for key, caps in train_captions_backup.items()}

test_captions_backup = {key: list(caps) for key, caps in test_captions.items()}
test_captions_dict = {key: list(caps) for key, caps in test_captions_backup.items()}

In [None]:
def _encode_caption(caption, vocab):
    """Map each whitespace-separated word of `caption` to its id in `vocab`.

    Words missing from `vocab` are mapped to vocab['<OUT>'].
    """
    unknown_id = vocab['<OUT>']
    return [vocab.get(word, unknown_id) for word in caption.split()]


# Replace every caption string with its list of word ids.
# A fresh list is built per image instead of overwriting entries located with
# `vv.index(v)`: that lookup was quadratic and mutated lists that may be shared
# with the other caption dicts.
for image_key, captions in list(train_captions_dict.items()):
    train_captions_dict[image_key] = [
        _encode_caption(caption, new_dict) for caption in captions
    ]
print(len(train_captions_dict))

for image_key, captions in list(test_captions_dict.items()):
    test_captions_dict[image_key] = [
        _encode_caption(caption, new_dict) for caption in captions
    ]
print(len(test_captions_dict))

In [None]:
def _longest_caption_length(encoded_captions):
    """Return the length of the longest encoded caption (0 if there are none).

    Every caption that sets a new record while scanning is printed.
    """
    longest = 0
    for caption_list in encoded_captions.values():
        for encoded in caption_list:
            current = len(encoded)
            if current <= longest:
                continue
            longest = current
            print(encoded)
    return longest


# Longest encoded caption per split.
train_MAX_LEN = _longest_caption_length(train_captions_dict)

test_MAX_LEN = _longest_caption_length(test_captions_dict)

In [None]:
from keras.utils import to_categorical
from keras.preprocessing.sequence import pad_sequences
def generator(photo, caption, MAX_LEN, VOCAB_SIZE):
    """Expand encoded captions into next-word prediction samples.

    Despite the name, this is not a Python generator: the whole dataset is
    materialised in memory and returned at once. Note that this definition
    replaces the `generator` imported from `image_model` at the top of the
    notebook.

    For every caption `v` and every position i in 1..len(v)-1 one sample is
    produced: the image features, the prefix v[:i], and the next id v[i].

    Parameters
    ----------
    photo : mapping
        Image key -> feature vector; must contain every key of `caption`.
    caption : mapping
        Image key -> list of captions, each caption a list of integer word ids.
    MAX_LEN : int
        Length the prefixes are brought to: shorter prefixes are right-padded
        with 0, longer ones keep only their first MAX_LEN ids.
    VOCAB_SIZE : int
        Width of the one-hot target; every id must be smaller than this.

    Returns
    -------
    tuple of np.ndarray
        (X, y_in, y_out): stacked image features, float64 padded prefixes with
        MAX_LEN columns, and float64 one-hot targets with VOCAB_SIZE columns.

    Raises
    ------
    KeyError
        If a caption key is missing from `photo`.
    IndexError
        If a target id is not smaller than VOCAB_SIZE.
    """
    X = []
    y_in = []
    y_out = []
    for k, vv in caption.items():
        features = photo[k]
        for v in vv:
            for i in range(1, len(v)):
                # Padded/truncated prefix, same result as Keras'
                # pad_sequences(..., padding='post', truncating='post').
                prefix = v[:i][:MAX_LEN]
                in_seq = np.zeros(MAX_LEN, dtype="float64")
                in_seq[:len(prefix)] = prefix

                # One-hot target for the next word.
                out_seq = np.zeros(VOCAB_SIZE, dtype="float64")
                out_seq[v[i]] = 1.0

                X.append(features)
                y_in.append(in_seq)
                y_out.append(out_seq)
    return np.array(X), np.array(y_in, dtype="float64"), np.array(y_out, dtype="float64")

In [None]:
# NOTE(review): Batch_size is not used by any visible cell (the fit call
# passes batch_size=512) -- confirm which value is intended.
Batch_size = 32
VOCAB_SIZE = len(new_dict)

X_train, y_in_train, y_out_train = generator(train_images_features, train_captions_dict, train_MAX_LEN, VOCAB_SIZE)
# The test sequences are padded to train_MAX_LEN as well: the model is built
# with max_len = train_MAX_LEN, so padding to test_MAX_LEN produced validation
# inputs of a different width whenever the two maxima differ. Test prefixes
# longer than train_MAX_LEN are cut to their first train_MAX_LEN ids.
X_test, y_in_test, y_out_test = generator(test_images_features, test_captions_dict, train_MAX_LEN, VOCAB_SIZE)

In [None]:
# Hyper-parameters handed to return_model: embedding width, the padded
# sequence length (longest training caption) and the vocabulary size.
embedding_size, max_len, vocab_size = 128, train_MAX_LEN, len(new_dict)

image_model = return_model(embedding_size, max_len, vocab_size)

In [None]:
# Train on (image features, caption prefix) -> next word, validating on the
# test split after every epoch.
image_model.fit(
    [X_train, y_in_train],
    y_out_train,
    validation_data=([X_test, y_in_test], y_out_test),
    batch_size=512,
    epochs=50,
)

In [None]:
# Persist the trained model to 'model.h5' in the current working directory.
# An existing file of that name is presumably overwritten -- confirm.
image_model.save('model.h5')