In [1]:
from numpy import array
from pickle import load

import tensorflow
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, Flatten, Activation
from tensorflow.keras.layers import RepeatVector, Permute, Multiply, Lambda, Add
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.backend import sum

In [3]:
# Load doc into memory
def load_doc(filename):
    """Read a whole text file and return its contents as a single string.

    Args:
        filename: Path of the file to open in text mode.

    Returns:
        The complete file contents.
    """
    # The context manager closes the handle even when read() raises.
    with open(filename, 'r') as file:
        return file.read()

# Load a pre-defined list of photo identifiers
def load_set(filename):
    """Collect the unique identifiers listed one per line in a file.

    Each non-empty line contributes the text that precedes its first '.'
    (the whole line when it contains no '.').

    Args:
        filename: Path of the listing file, read through load_doc().

    Returns:
        A set of identifier strings.
    """
    contents = load_doc(filename)

    # Empty lines carry no identifier and are ignored.
    return {row.split('.')[0] for row in contents.split('\n') if len(row) >= 1}

# Load clean descriptions into memory
def load_clean_descriptions(filename, dataset):
    """Load the descriptions of the images that belong to ``dataset``.

    Every line of the file is split on whitespace: the first token is the
    image identifier and the remaining tokens form one description. Each
    kept description is wrapped as 'startseq ... endseq'.

    Args:
        filename: Path of the descriptions file, read through load_doc().
        dataset: Container of image identifiers to keep.

    Returns:
        A dict mapping image identifier to the list of its wrapped
        descriptions, in file order.
    """
    doc = load_doc(filename)
    descriptions = dict()

    for line in doc.split('\n'):

        tokens = line.split()

        # Blank or whitespace-only lines (e.g. a trailing newline) have no
        # identifier; indexing tokens[0] on them would raise IndexError.
        if not tokens:
            continue

        image_id, image_desc = tokens[0], tokens[1:]

        # skip images not in the set
        if image_id not in dataset:
            continue

        # wrap description in tokens
        desc = 'startseq ' + ' '.join(image_desc) + ' endseq'

        descriptions.setdefault(image_id, list()).append(desc)

    return descriptions

# Load photo features
def load_photo_features(filename, dataset):
    """Load pickled photo features and keep only those listed in ``dataset``.

    Args:
        filename: Path of a pickle file holding a mapping keyed by image
            identifier.
        dataset: Iterable of image identifiers to keep.

    Returns:
        A dict with one entry per identifier in ``dataset``.

    Raises:
        KeyError: If an identifier in ``dataset`` is missing from the file.
    """
    # NOTE: unpickling can execute arbitrary code; only load files that
    # come from a trusted source.
    # The context manager closes the file handle once loading is done.
    with open(filename, 'rb') as handle:
        all_features = load(handle)

    features = {k: all_features[k] for k in dataset}

    return features

# Creating a list of descriptions
def to_lines(descriptions):
    """Flatten a dict of description lists into one flat list.

    Args:
        descriptions: Dict mapping a key to a list of description strings.

    Returns:
        A new list holding every description, grouped by key in the dict's
        iteration order.
    """
    all_desc = list()

    # extend() appends each group directly instead of building a throwaway
    # list of None values through a side-effect comprehension.
    for desc_list in descriptions.values():
        all_desc.extend(desc_list)

    return all_desc

# Fit a tokenizer given caption descriptions
def create_tokenizer(descriptions):
    """Build a Tokenizer fitted on every description in ``descriptions``.

    Args:
        descriptions: Dict mapping a key to a list of description strings.

    Returns:
        The Tokenizer after fit_on_texts() has been called with all
        descriptions flattened by to_lines().
    """
    fitted = Tokenizer()
    fitted.fit_on_texts(to_lines(descriptions))
    return fitted

# Length of the description with the most words
def max_length(descriptions):
    """Return the word count of the longest description.

    Words are counted by splitting each description on whitespace.

    Args:
        descriptions: Dict mapping a key to a list of description strings.

    Returns:
        The largest number of whitespace-separated words in any description.

    Raises:
        ValueError: If there are no descriptions at all.
    """
    word_counts = [len(caption.split()) for caption in to_lines(descriptions)]

    return max(word_counts)

# Creating sequences of images, input sequences and output words for an image
def create_sequences(tokenizer, max_length, desc_list, photo, vocab_size):
    """Expand one image's descriptions into (photo, prefix, next-word) samples.

    Each description is encoded with ``tokenizer``; for every position after
    the first, the preceding tokens become a padded input sequence and the
    token at that position becomes a one-hot target.

    Args:
        tokenizer: Object exposing texts_to_sequences().
        max_length: Length passed to pad_sequences() as ``maxlen``.
        desc_list: Descriptions belonging to the image.
        photo: Feature value repeated for every generated sample.
        vocab_size: Number of classes passed to to_categorical().

    Returns:
        A tuple of three arrays: repeated photos, padded input sequences and
        one-hot encoded output words.
    """
    images, prefixes, targets = [], [], []

    for caption in desc_list:
        # Integer-encode the caption
        encoded = tokenizer.texts_to_sequences([caption])[0]

        # Each token after the first is predicted from the tokens before it
        for split_at, next_word in enumerate(encoded[1:], start=1):
            prefix = pad_sequences([encoded[:split_at]], maxlen=max_length)[0]
            target = to_categorical([next_word], num_classes=vocab_size)[0]

            images.append(photo)
            prefixes.append(prefix)
            targets.append(target)

    return array(images), array(prefixes), array(targets)

In [4]:
# Captioning model

def define_model(vocab_size, max_length):
    """Build, compile, summarise and plot the two-input captioning model.

    The model takes a length-4096 feature vector and a token sequence of
    length ``max_length`` and outputs a softmax over ``vocab_size`` words.
    As side effects it prints the model summary and writes 'model.png'.

    Args:
        vocab_size: Size of the embedding vocabulary and of the output layer.
        max_length: Length of the token-sequence input.

    Returns:
        The compiled Model.
    """
    # Image branch: dropout, then projection to 256 units
    image_input = Input(shape=(4096, ))
    image_drop = Dropout(0.5)(image_input)
    image_vec = Dense(256, activation='relu')(image_drop)

    # Text branch: embedding -> dropout -> LSTM, then one tanh score
    caption_input = Input(shape=(max_length,))
    embedded = Embedding(vocab_size, 256, mask_zero=True)(caption_input)
    embedded = Dropout(0.5)(embedded)
    lstm_state = LSTM(256)(embedded)
    scores = Dense(1, activation='tanh')(lstm_state)

    # Soft-attention weighting of the LSTM output
    # NOTE(review): LSTM is created without return_sequences, so the softmax
    # below appears to act on a single score per sample - confirm whether
    # attention over the time steps was intended.
    scores = Flatten()(scores)
    weights = Activation('softmax')(scores)
    weights = RepeatVector(256)(weights)
    weights = Permute([2, 1])(weights)
    weighted = Multiply()([lstm_state, weights])

    # Collapse axis 1; `sum` is tensorflow.keras.backend.sum (imported at the
    # top of the file), not the Python builtin
    weighted = Lambda(lambda values: sum(values, axis=1))(weighted)
    caption_vec = Dense(256, activation='relu')(weighted)

    # Decoder: add both branches, one hidden layer, softmax over the vocabulary
    merged = Add()([image_vec, caption_vec])
    hidden = Dense(256, activation='relu')(merged)
    outputs = Dense(vocab_size, activation='softmax')(hidden)

    # [image, seq] -> [word]
    model = Model(inputs=[image_input, caption_input], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')

    # Report the architecture as text and as an image file
    model.summary()
    plot_model(model, to_file='model.png', show_shapes=True)

    return model

In [5]:
# Data generator, call in model.fit_generator()
def data_generator(descriptions, photos, tokenizer, max_length, vocab_size):
    """Endlessly yield the training samples of one image at a time.

    Args:
        descriptions: Dict mapping image identifier to its descriptions.
        photos: Dict mapping image identifier to a sequence whose first
            element is used as that image's feature.
        tokenizer: Forwarded to create_sequences().
        max_length: Forwarded to create_sequences().
        vocab_size: Forwarded to create_sequences().

    Yields:
        ``([images, input_sequences], output_words)`` as produced by
        create_sequences() for a single image, cycling over all images
        forever.
    """
    while True:
        for image_id, captions in descriptions.items():

            # Only the first element stored for the image is used
            feature = photos[image_id][0]
            samples = create_sequences(tokenizer, max_length, captions, feature, vocab_size)
            img_batch, seq_batch, word_batch = samples

            yield [img_batch, seq_batch], word_batch

In [None]:
# Load training dataset (6K)
filename = 'Flickr8k_text/Flickr_8k.trainImages.txt'
train = load_set(filename)
print('Dataset: %d' % len(train))

# Descriptions
train_descriptions = load_clean_descriptions('descriptions.txt', train)
print('Descriptions: train=%d' % len(train_descriptions))

# Photo features
train_features = load_photo_features('features.pkl', train)
print('Photos: train=%d' % len(train_features))

# Preparing tokenizer
tokenizer = create_tokenizer(train_descriptions)
# +1 because index 0 is not assigned to any word in word_index
vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary Size: %d' % vocab_size)

# Determine the maximum sequence length.
# The result gets its own name: assigning it to `max_length` would replace
# the helper function with an int, and re-running this cell would then fail
# with "TypeError: 'int' object is not callable".
max_desc_length = max_length(train_descriptions)
print('Description Length: %d' % max_desc_length)

# Model
model = define_model(vocab_size, max_desc_length)

# Train the model, run epochs manually and save after each epoch
epochs = 20
# One generator step covers all descriptions of one image
steps = len(train_descriptions)

for i in range(epochs):

    # Create the data generator
    generator = data_generator(train_descriptions, train_features, tokenizer, max_desc_length, vocab_size)
    # Fit for one epoch
    # NOTE(review): Model.fit_generator is deprecated in recent TensorFlow
    # releases in favour of Model.fit - confirm against the installed version.
    model.fit_generator(generator, epochs=1, steps_per_epoch=steps, verbose=1)
    # Save model
    model.save('model_' + str(i) + '.h5')