In [239]:
# -*- coding:utf-8 -*-
"""
Description: 
    1) Use the pre-processed data (image descriptions and image features) to train the caption generation model
    2) firstly quick choose a model to train
Author: allocator
"""

'\nDescription: \n    1) Use the pre handled data(image description and image features to train the caption geneartion model\n    2) firstly quick choose a model to train\nAuthor: allocator\n'

In [240]:
import keras
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.utils import plot_model
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import LSTM
from keras.layers import RepeatVector
from keras.layers import TimeDistributed
from keras.layers import Embedding
from keras.layers.merge import concatenate
from keras.layers.pooling import GlobalMaxPooling2D
import numpy as np
import os
import numpy.random as rd
import json
import h5py

In [241]:
# Directory listed by seperate_dataset() to obtain the candidate image ids.
img_dir = '../data/img'
# Directory and file name of the HDF5 file read by load_image_feature().
img_feature_dir = '../data/img_feature'
img_feature_file = 'image_features.h5'
# Directory and file name of the JSON description file read by load_txt().
clean_txt_dir = '../data/clean_txt'
clean_txt_file = 'image_descs.json'
# Total number of images drawn for the development set; seperate_dataset()
# puts the first half in 'train' and the rest in 'test'.
set_category = 200
# Seed handed to numpy.random before sampling the development set.
seed = 10
# Directory and file name used by save_dataset() to persist the sampled ids.
output_dir = '../data/res'
output_filename = 'development_dataset_id.json'

In [242]:
# compare the image feature list and the image desc list
def compare_feature_desc(feature_file, desc_file):
    """Print the keys of the image-feature file and of the description file.

    Intended as a manual sanity check that both files cover the same images.

    Args:
        feature_file: path of the HDF5 file holding the image features.
        desc_file: path of the JSON file holding the image descriptions
            (assumed to be a JSON object at the top level).

    Returns:
        None. Both key collections are printed to stdout.
    """
    # Context managers guarantee both files are closed again.
    with h5py.File(feature_file, 'r') as image_features:
        # list(), not dict(): keys() yields plain names, not (key, value) pairs,
        # so dict() would raise on them.
        image_feature_keys = list(image_features.keys())
    with open(desc_file, 'r') as desc_fp:
        image_descs = json.load(desc_fp)
    image_descs_keys = image_descs.keys()
    print(' current image_feature_keys')
    print(image_feature_keys)
    print(' current image_descs_keys')
    print(image_descs_keys)

In [243]:
# first separate the data set into train and test ids
def seperate_dataset(img_dir, category, seed):
    """Sample a small development set used to quickly choose model and configuration.

    Args:
        img_dir: directory whose file names (text before the first '.') are
            used as image ids.
        category: total number of image ids to draw.
        seed: seed for numpy.random, so the draw can be repeated.

    Returns:
        dict with keys 'train' and 'test': the first ``int(category / 2)``
        drawn ids and the remaining ones, in draw order.

    Raises:
        ValueError: if ``category`` exceeds the number of distinct image ids
            (the sampling loop could otherwise never terminate).
    """
    # Sorted + de-duplicated: os.listdir order is arbitrary and two files may
    # share the same stem. Entries with an empty stem (e.g. '.DS_Store') are
    # skipped because they cannot be a valid image id.
    stems = {item.split('.')[0] for item in os.listdir(img_dir)}
    img_list = sorted(stem for stem in stems if stem)
    img_size = len(img_list)
    if category > img_size:
        raise ValueError(
            'requested %d images but only %d distinct ids found in %s'
            % (category, img_size, img_dir))
    rd.seed(seed)
    # Keep the draw order in a list: iterating a set of strings is not stable
    # across interpreter runs, which would make the split irreproducible.
    seen = set()
    selected = []
    while len(selected) < category:
        curr_item = img_list[rd.randint(img_size)]
        if curr_item not in seen:
            seen.add(curr_item)
            selected.append(curr_item)
    print(' extract set generated and length %d ' % len(selected))
    set_len = int(category / 2)
    return {'train': selected[:set_len], 'test': selected[set_len:]}

In [244]:
# save the dataset id
def save_dataset(filename, dataset):
    """Write the dataset id mapping as JSON into the configured output directory.

    Args:
        filename: name of the JSON file to create inside ``output_dir``
            (module-level setting).
        dataset: JSON-serialisable object, e.g. the dict returned by
            seperate_dataset().
    """
    file_path = os.path.join(output_dir, filename)
    # The with-block closes the file, so the JSON is flushed to disk.
    with open(file_path, 'w') as out_fp:
        json.dump(dataset, out_fp)

In [245]:
def load_txt(filename, dataset):
    """Load the descriptions of the train and test images and add sequence markers.

    Args:
        filename: path of the JSON file mapping image id -> description.
            Each description is joined with spaces, so it is assumed to be a
            list of tokens -- TODO confirm against the pre-processing step.
        dataset: dict with the id lists under 'train' and 'test'.

    Returns:
        (train_set, test_set): two dicts mapping image id to the string
        'startseq <description> endseq'.

    Raises:
        KeyError: if an id from ``dataset`` has no description in the file.
    """
    # The with-block closes the file handle the original version leaked.
    with open(filename, 'r') as desc_fp:
        image_descs = json.load(desc_fp)

    def _wrap(id_list):
        # Surround each caption with the start/end tokens the decoder relies on.
        return {
            item: 'startseq ' + ' '.join(image_descs[item]) + ' endseq'
            for item in id_list
        }

    return _wrap(dataset['train']), _wrap(dataset['test'])

In [246]:
# load the features about the images
def load_image_feature(filename, dataset):
    """Load the image features for the train and test image id lists.

    Args:
        filename: path of the HDF5 file with one entry per image id.
        dataset: dict with the id lists under 'train' and 'test'.

    Returns:
        (train_set, test_set): two dicts mapping image id to a numpy array
        holding that image's features.

    Raises:
        KeyError: if an id from ``dataset`` is missing in the HDF5 file.
    """
    # np.array() copies each entry into memory, so the file can be closed
    # as soon as the with-block ends.
    with h5py.File(filename, 'r') as image_features:
        train_set = {item: np.array(image_features[item]) for item in dataset['train']}
        test_set = {item: np.array(image_features[item]) for item in dataset['test']}
    return train_set, test_set

In [None]:
# to encode the descriptions before training
# from words to unique integer values
def create_tokenizer(descriptions):
    """Fit a Keras Tokenizer on every description so words map to integer ids.

    Args:
        descriptions: dict whose values are the caption strings.

    Returns:
        The fitted Tokenizer.
    """
    corpus = [caption for caption in descriptions.values()]
    encoder = Tokenizer()
    encoder.fit_on_texts(corpus)
    return encoder

In [None]:
# the important part: generating the training sequences

In [None]:
# create the sequence of the images input sequences and output words for an image
def create_sequences(tokenizer, image_desc, image, max_length):
    """Expand one caption into (image, padded prefix, one-hot next word) samples.

    Args:
        tokenizer: fitted tokenizer used to integer-encode the caption.
        image_desc: caption string of the image.
        image: feature array of the image, repeated for every sample.
        max_length: length every input prefix is padded to.

    Returns:
        A list ``[images, prefixes, targets]`` of three equally long lists.
    """
    # +1 because index 0 is never assigned to a word by the tokenizer
    n_classes = len(tokenizer.word_index) + 1
    encoded = tokenizer.texts_to_sequences([image_desc])[0]
    images, prefixes, targets = [], [], []
    # every position after the first becomes one prediction target
    for split_at, next_word in enumerate(encoded[1:], start=1):
        padded_prefix = pad_sequences([encoded[:split_at]], maxlen=max_length)[0]
        one_hot = to_categorical([next_word], num_classes=n_classes)[0]
        images.append(image)
        prefixes.append(padded_prefix)
        targets.append(one_hot)
    return [images, prefixes, targets]

In [None]:
# some explanation of the model

In [None]:
# define and compile the model (fitting happens separately)
def define_model(vocab_size=None, max_length=None):
    """Define and compile the image-captioning model.

    The model takes two inputs: an image feature map of shape (7, 7, 512)
    and an integer-encoded caption prefix of length ``max_length``. It outputs
    a softmax distribution over ``vocab_size`` words for the next word.

    Args:
        vocab_size: size of the output vocabulary / embedding input. When
            omitted, the module-level ``vocab_size`` is used (the original
            zero-argument behaviour).
        max_length: padded length of the caption prefix. When omitted, the
            module-level ``max_length`` is used.

    Returns:
        The compiled Keras Model.

    Raises:
        NameError: if a value is neither passed nor defined at module level.
    """
    # Backward-compatible fallback: the original body read both names from
    # module globals, which are not defined in the visible notebook cells.
    if vocab_size is None:
        vocab_size = globals().get('vocab_size')
    if max_length is None:
        max_length = globals().get('max_length')
    if vocab_size is None or max_length is None:
        raise NameError('define_model needs vocab_size and max_length '
                        '(pass them as arguments or define them globally)')
    # image feature branch: pool the feature map, project it, and repeat it
    # once per time step so it can be concatenated with the text branch
    inputs1 = Input(shape=(7, 7, 512))
    fe1 = GlobalMaxPooling2D()(inputs1)
    fe2 = Dense(128, activation='relu')(fe1)
    fe3 = RepeatVector(max_length)(fe2)
    # embedding
    inputs2 = Input(shape=(max_length,))
    emb2 = Embedding(vocab_size, 50, mask_zero=True)(inputs2)
    emb3 = LSTM(256, return_sequences=True)(emb2)
    emb4 = TimeDistributed(Dense(128, activation='relu'))(emb3)
    # merge inputs
    merged = concatenate([fe3, emb4])
    # language model (decoder)
    lm2 = LSTM(500)(merged)
    lm3 = Dense(500, activation='relu')(lm2)
    outputs = Dense(vocab_size, activation='softmax')(lm3)
    # tie it together [image, seq] [word]
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    # summary() prints the table itself and returns None, so it is not wrapped in print()
    model.summary()
    plot_model(model, show_shapes=True, to_file='plot.png')
    return model

In [None]:
# data generator, intended to be used in a call to model.fit_generator()
def data_generator(descriptions, features, tokenizer, max_length, n_step):
    """Endlessly yield training batches built from ``n_step`` images at a time.

    Intended to be used in a call to model.fit_generator().

    Args:
        descriptions: dict mapping image id to its caption string.
        features: dict mapping image id to its feature array; element ``[0]``
            of each array is used (assumes a leading batch axis of size 1 --
            TODO confirm against the feature extraction step).
        tokenizer: fitted tokenizer, passed on to create_sequences().
        max_length: padded caption prefix length.
        n_step: number of images whose samples form one batch.

    Yields:
        ``[[images, sequences], targets]`` where each element is a numpy array
        with one row per generated (prefix, next word) sample.
    """
    # loop until we finish training
    while True:
        # loop over photo identifiers in the dataset
        keys = list(descriptions.keys())
        for i in range(0, len(keys), n_step):
            Ximages, XSeq, y = list(), list(), list()
            for image_id in keys[i:i + n_step]:
                # retrieve photo feature input
                image = features[image_id][0]
                # retrieve text input
                desc = descriptions[image_id]
                # generate input-output pairs
                in_img, in_seq, out_word = create_sequences(tokenizer, desc, image, max_length)
                Ximages.extend(in_img)
                XSeq.extend(in_seq)
                y.extend(out_word)
            # yield this batch of samples to the model
            # np.array: the bare name `array` is not imported in this notebook
            yield [[np.array(Ximages), np.array(XSeq)], np.array(y)]

In [None]:
# map an integer to a word
def word_for_id(integer, tokenizer):
    """Return the word whose tokenizer index equals ``integer``, or None if absent."""
    # scan the word -> index mapping and stop at the first hit
    hits = (token for token, idx in tokenizer.word_index.items() if idx == integer)
    return next(hits, None)

In [None]:
# generate a description for an image
def generate_desc(model, tokenizer, photo, max_length):
    """Greedily generate a caption for one image, word by word.

    Args:
        model: trained captioning model taking ``[photo, sequence]``.
        tokenizer: fitted tokenizer used for encoding and for the reverse lookup.
        photo: image features as passed to ``model.predict`` (assumes a single
            sample with a leading batch axis -- TODO confirm).
        max_length: padded input length and maximum number of generated words.

    Returns:
        The generated text, starting with 'startseq' and ending with 'endseq'
        when the model predicted the end token within ``max_length`` steps.
    """
    # seed the generation process
    in_text = 'startseq'
    # iterate over the whole length of the sequence
    for _ in range(max_length):
        # integer encode input sequence
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        # pad input
        sequence = pad_sequences([sequence], maxlen=max_length)
        # predict next word
        yhat = model.predict([photo, sequence], verbose=0)
        # convert probability to integer
        # np.argmax: the bare name `argmax` is not imported in this notebook
        yhat = np.argmax(yhat)
        # map integer to word
        word = word_for_id(yhat, tokenizer)
        # stop if we cannot map the word
        if word is None:
            break
        # append as input for generating the next word
        in_text += ' ' + word
        # stop if we predict the end of the sequence
        if word == 'endseq':
            break
    return in_text

In [None]:
# evaluate the skill of the model
def evaluate_model(model, descriptions, photos, tokenizer, max_length):
    """Score the model with corpus-level BLEU over all given descriptions.

    Args:
        model: trained captioning model.
        descriptions: dict mapping image id to its reference caption string.
        photos: dict mapping image id to the features passed to generate_desc().
        tokenizer: fitted tokenizer.
        max_length: maximum caption length used during generation.

    Returns:
        The value returned by ``corpus_bleu`` for the references and predictions.
    """
    references = []
    hypotheses = []
    # walk through every image of the set
    for image_id in descriptions:
        generated = generate_desc(model, tokenizer, photos[image_id], max_length)
        # a single reference caption per image, wrapped in its own list
        references.append([descriptions[image_id].split()])
        hypotheses.append(generated.split())
    # NOTE(review): corpus_bleu is not imported in the visible notebook cells --
    # presumably nltk.translate.bleu_score.corpus_bleu; confirm and import it.
    return corpus_bleu(references, hypotheses)

In [247]:
# Draw the development split: `set_category` image ids in total, divided
# between dataset['train'] and dataset['test'].
dataset = seperate_dataset(img_dir, set_category, seed)
# save_dataset(output_filename, dataset)

 extract set generated and length 200 


In [248]:
# compare_feature_desc(os.path.join(img_feature_dir, img_feature_file), os.path.join(clean_txt_dir, clean_txt_file))
# Load the captions (wrapped in startseq/endseq) for the sampled train/test ids.
train_desc, test_desc = load_txt(os.path.join(clean_txt_dir, clean_txt_file), dataset)
# Load the matching image features for the same ids.
train_img, test_img = load_image_feature(os.path.join(img_feature_dir, img_feature_file), dataset)

In [249]:
# some information about the training and test data sets
# print(' train_desc length %d test_desc length %d ' % (len(train_desc), len(test_desc)))
# print(' train_img length %d test_img length %d ' % (len(train_img), len(test_img)))

 train_desc length 100 test_desc length 100 
 train_img length 100 test_img length 100 


In [None]:
# test the tokenizer: fit it on the training captions only
train_desc_tokenizer = create_tokenizer(train_desc)
# print(' training desc vocabulary is %d ' % len(train_desc_tokenizer.word_index))