In [None]:
# Flickr8k text files: the list of training image names and the caption (token) file.
# The vocabulary is built from the captions of the training images only.
# NOTE(review): these paths are relative, while run() uses absolute
# '/content/drive/...' paths -- presumably the working directory is /content; confirm.
train_dir = 'drive/Shareddrives/Bionic_Eye_IoT_Script/Show-And-Tell-Keras/datasets/Flickr8k_text/Flickr_8k.trainImages.txt'
token_dir = 'drive/Shareddrives/Bionic_Eye_IoT_Script/Show-And-Tell-Keras/datasets/Flickr8k_text/Flickr8k.token.txt'
# Weights file (.h5) of the current best trained model; loaded by name into the inference model
model_dir = 'drive/Shareddrives/Bionic_Eye_IoT_Script/Show-And-Tell-Keras/model-params/current_best.h5'

In [1]:
def extract_feature_from_image(file_dir, model):
    """Run a single image file through ``model`` and return its prediction.

    The image is loaded at 299x299, converted to an array, given a leading
    batch axis and passed through ``preprocess_input`` before prediction.

    Args:
        file_dir: Path of the image file to load.
        model: Object exposing ``predict``. Presumably an InceptionV3 network
            truncated at its ``avg_pool`` layer -- confirm where it is built.

    Returns:
        Whatever ``model.predict`` returns for the one-image batch.
    """
    pixels = image.img_to_array(image.load_img(file_dir, target_size=(299, 299)))
    batch = preprocess_input(np.expand_dims(pixels, axis=0))
    return model.predict(batch)

def load_token_text(token_dir):
    """Read a caption (token) file into a mapping of image id -> captions.

    Each non-blank line is split on whitespace. The first field identifies
    the image; everything before its first '.' is used as the image id. The
    remaining fields are re-joined with single spaces to form the caption.

    Args:
        token_dir: Path of the token file to read.

    Returns:
        dict mapping each image id to the list of its captions, in file order.
    """
    sents_dict = {}
    with open(token_dir, 'r') as f:
        # Iterate the file lazily instead of materialising every line first.
        for line in f:
            words = line.split()
            if not words:
                # Blank / whitespace-only line: there is no image field to index.
                continue

            img_id = words[0].split('.')[0]
            sents_dict.setdefault(img_id, []).append(' '.join(words[1:]))

    return sents_dict


def load_dataset_token(dataset_dir, token_dir, start_end = True):
    """Collect every caption belonging to the images listed in a split file.

    Args:
        dataset_dir: Path of a text file with one image file name per line.
        token_dir: Path of the caption (token) file, read via load_token_text.
        start_end: When true, wrap each caption as
            'startseq <caption> endseq'.

    Returns:
        Flat list of captions, ordered by image as listed in ``dataset_dir``.

    Raises:
        KeyError: If a listed image has no captions in the token file.
    """
    all_sents = load_token_text(token_dir)

    with open(dataset_dir, 'r') as f:
        # Strip the newline and surrounding whitespace before taking the stem,
        # and skip blank lines, which would otherwise produce a bogus id.
        img_ids = [os.path.splitext(name)[0]
                   for name in (line.strip() for line in f)
                   if name]

    sent_list = []
    for img_id in img_ids:
        for sent in all_sents[img_id]:
            sent_list.append(('startseq ' + sent + ' endseq') if start_end else sent)

    return sent_list


def create_tokenizer(dataset_dir, token_dir, start_end = True, use_all = False, num_words = None):
    """Fit a Keras ``Tokenizer`` on the captions of one dataset split.

    Args:
        dataset_dir: Path of the split file listing image names.
        token_dir: Path of the caption (token) file.
        start_end: Passed to load_dataset_token; wraps captions in
            'startseq' / 'endseq' when true.
        use_all: When true, never cap the vocabulary, regardless of
            ``num_words``.
        num_words: Optional vocabulary cap handed to ``Tokenizer``
            (e.g. 6000). ``None`` (the default) or 0 means no cap.

    Returns:
        The fitted ``Tokenizer``.
    """
    sent_list = load_dataset_token(dataset_dir, token_dir, start_end)

    # A falsy cap (None or 0) and use_all both mean "no limit".
    vocab_limit = None if use_all else (num_words or None)
    tokenizer = Tokenizer(num_words=vocab_limit)
    tokenizer.fit_on_texts(sent_list)

    return tokenizer

def model(vocab_size, max_len, reg):
    """Build the training-time captioning network (Keras model named 'NIC').

    The image feature is embedded and fed to the LSTM as a single first step;
    the resulting states initialise a second pass of the same LSTM over the
    embedded caption tokens, followed by a per-step softmax over the vocabulary.

    Reads the module-level ``unit_size`` for the embedding / LSTM width.

    Args:
        vocab_size: Size of the embedding table and of the softmax output.
        max_len: Length of the token-sequence input.
        reg: L2 regularisation factor for the image dense layer and the
            output layer.

    Returns:
        ``Model`` with inputs ``[image feature, token sequence, a0, c0]`` and
        the time-distributed softmax as its output.
    """
    # Image embedding
    inputs1 = Input(shape=(2048,))
    X_img = Dropout(0.5)(inputs1)
    X_img = Dense(unit_size, use_bias = False, 
                        kernel_regularizer=regularizers.l2(reg),
                        name = 'dense_img')(X_img)
    X_img = BatchNormalization(name='batch_normalization_img')(X_img)
    # Insert an axis at position 1 so the embedded image forms a length-1 sequence for the LSTM
    X_img = Lambda(lambda x : K.expand_dims(x, axis=1))(X_img)

    # Text embedding
    inputs2 = Input(shape=(max_len,))
    X_text = Embedding(vocab_size, unit_size, mask_zero = True, name = 'emb_text')(inputs2)
    X_text = Dropout(0.5)(X_text)

    # Initial States
    a0 = Input(shape=(unit_size,))
    c0 = Input(shape=(unit_size,))

    # One LSTM instance is applied twice below, so both passes share weights
    LSTMLayer = LSTM(unit_size, return_sequences = True, return_state = True, dropout=0.5, name = 'lstm')

    # Take image embedding as the first input to LSTM
    _, a, c = LSTMLayer(X_img, initial_state=[a0, c0])

    # Run the caption tokens starting from the states produced by the image step
    A, _, _ = LSTMLayer(X_text, initial_state=[a, c])
    output = TimeDistributed(Dense(vocab_size, activation='softmax',
                                     kernel_regularizer = regularizers.l2(reg), 
                                     bias_regularizer = regularizers.l2(reg)), name = 'time_distributed_softmax')(A)

    return Model(inputs=[inputs1, inputs2, a0, c0], outputs=output, name='NIC')


def greedy_inference_model(vocab_size, max_len):
    """Build an unrolled greedy-decoding network (Keras model 'NIC_greedy_inference_v2').

    The layers are given the same names as in ``model`` ('dense_img',
    'emb_text', 'lstm', 'time_distributed_softmax', 'batch_normalization_img').
    No dropout or regularisers are used here.

    Reads the module-level ``unit_size`` for the embedding / LSTM width.

    Args:
        vocab_size: Size of the embedding table and of each softmax output.
        max_len: Number of decoding steps to unroll.

    Returns:
        ``Model`` with inputs ``[image feature, first token, a0, c0]`` and a
        list of ``max_len`` softmax outputs, one per decoding step.
    """
    EncoderDense = Dense(unit_size, use_bias=False, name = 'dense_img')
    EmbeddingLayer = Embedding(vocab_size, unit_size, mask_zero = True, name = 'emb_text')
    LSTMLayer = LSTM(unit_size, return_state = True, name = 'lstm')
    SoftmaxLayer = Dense(vocab_size, activation='softmax', name = 'time_distributed_softmax')
    BatchNormLayer = BatchNormalization(name='batch_normalization_img')

    # Image embedding
    inputs1 = Input(shape=(2048,))
    X_img = EncoderDense(inputs1)
    X_img = BatchNormLayer(X_img)
    # Insert an axis at position 1 so the embedded image forms a length-1 sequence for the LSTM
    X_img = Lambda(lambda x : K.expand_dims(x, axis=1))(X_img)

    # Text embedding
    inputs2 = Input(shape=(1,))
    X_text = EmbeddingLayer(inputs2)

    # Initial States
    a0 = Input(shape=(unit_size,))
    c0 = Input(shape=(unit_size,))

    # The LSTM returns three tensors (return_state=True): the first is carried
    # forward as `a`, the third as `c`; the middle one is discarded.
    a, _, c = LSTMLayer(X_img, initial_state=[a0, c0])

    x = X_text

    outputs = []
    for i in range(max_len):
        
        a, _, c = LSTMLayer(x, initial_state=[a, c])
        output = SoftmaxLayer(a)
        outputs.append(output)
        # Greedy step: take the arg-max token and embed it as the next step's input
        x = Lambda(lambda x : K.expand_dims(K.argmax(x)))(output)
        x = EmbeddingLayer(x)

    return Model(inputs=[inputs1, inputs2, a0, c0], outputs=outputs, name='NIC_greedy_inference_v2')

def decoder(inf_model, tokenizer, features, post_process = True):
    """Helper function for greedy search: turn image features into captions.

    Args:
        inf_model: Inference model whose ``predict`` yields one probability
            array per decoding step.
        tokenizer: Fitted tokenizer providing ``word_index`` and
            ``sequences_to_texts``.
        features: Array with at least one row and exactly 2048 columns.
        post_process: When true, cut each caption at its first 'endseq'.

    Returns:
        List with one caption string per row of ``features``.
    """
    assert features.shape[0] > 0 and features.shape[1] == 2048

    batch_size = features.shape[0]

    # Every caption starts from the 'startseq' token and zeroed LSTM states.
    start_tokens = np.repeat([tokenizer.word_index['startseq']], batch_size)
    init_a = np.zeros([batch_size, unit_size])
    init_c = np.zeros([batch_size, unit_size])

    # Stack the per-step outputs, then swap the first two axes so that the
    # batch axis comes first.
    step_probs = np.array(inf_model.predict([features, start_tokens, init_a, init_c], verbose = 1))
    batch_probs = np.transpose(step_probs, axes = [1, 0, 2])
    token_ids = np.argmax(batch_probs, axis = -1)
    sents = tokenizer.sequences_to_texts(token_ids)

    if not post_process:
        return sents

    def _cut_at_endseq(sent):
        # Keep only the words before the first 'endseq'; leave other captions untouched.
        words = sent.split()
        if 'endseq' in words:
            return ' '.join(words[:words.index('endseq')])
        return sent

    return [_cut_at_endseq(sent) for sent in sents]

Greedy Inference

In [2]:
# Load the vocabulary: fit the tokenizer on the training captions (wrapped in startseq/endseq)
tokenizer = create_tokenizer(train_dir, token_dir, start_end = True, use_all=True)

# Set relevant parameters
# vocab_size falls back to len(word_index) + 1 when the tokenizer has no num_words cap
# (the +1 presumably accounts for index 0, which the embeddings mask -- confirm)
vocab_size  = tokenizer.num_words or (len(tokenizer.word_index)+1)
max_len = 24 # use 24 as maximum sentence's length when training the model
# Build the greedy decoder, then fill its layers from the trained weights by layer name.
# NOTE(review): skip_mismatch=True presumably skips layers whose weights do not fit
# instead of raising -- confirm that all expected layers were actually loaded.
NIC_inference = greedy_inference_model(vocab_size, max_len)
NIC_inference.load_weights(model_dir, by_name = True, skip_mismatch=True)
def generate_caption_from_file(file_dir,model):
    """Caption one image file with the module-level greedy decoder.

    Args:
        file_dir: Path of the image file.
        model: Feature extractor handed to extract_feature_from_image.

    Returns:
        The list of captions produced by ``decoder`` (post-processed).
    """
    # Encoder step, then decoder step using the module-level NIC_inference / tokenizer.
    features = extract_feature_from_image(file_dir, model)
    return decoder(NIC_inference, tokenizer, features, post_process=True)



NameError: ignored

The `run` API is required by the interface for all modules. This is the method the server runs to start this module.

In [None]:
def run(num_imgs,model):
    """Caption input images 1..num_imgs and write each caption to a text file.

    For every index ``idx`` the image ``<idx>_input.jpg`` in the shared Input
    folder is captioned, and the first returned caption is written to
    ``<idx>_output.txt`` in the shared Output folder.

    Args:
        num_imgs: Number of images to process; indices start at 1.
        model: Feature extractor passed on to generate_caption_from_file.
    """
    for idx in range(1, num_imgs + 1):
        image_file_dir = '/content/drive/Shareddrives/Bionic_Eye_IoT_Data/Input/' + str(idx) + '_input.jpg'

        # Generate caption
        caption = generate_caption_from_file(image_file_dir, model)

        # Write the caption; the context manager closes the file even if the write fails
        out_name = '/content/drive/Shareddrives/Bionic_Eye_IoT_Data/Output/' + str(idx) + '_output.txt'
        with open(out_name, 'w') as out_txt:
            out_txt.write(caption[0])

Driver code for this module.

In [None]:
# Derive the number of images to caption from the entry count of the Input folder.
# NOTE(review): the "- 1" presumably discounts one non-image entry in that folder -- confirm.
num_imgs = len(os.listdir('/content/drive/Shareddrives/Bionic_Eye_IoT_Data/Input')) - 1
# NOTE(review): `model_` is not defined in the cells shown here; it must exist before this
# cell runs and is handed through run() to extract_feature_from_image.
run(num_imgs,model_)